// Train_Identify/nvidia_ascend_engine/common_engine/DataDealEngine/DataDealEngine.cpp
// (Repository web-viewer header captured with this file: 375 lines, 13 KiB, C++,
//  "Raw / Normal View / History", timestamp shown: 2024-01-23 02:46:26 +00:00.)
#include "DataDealEngine.h"
using namespace ai_matrix;
// Re-run request state, defined in another translation unit.
// MakeProcessData() snapshots these three values and resets them to 0 whenever
// g_iReRunOrigFrameId is non-zero.
extern std::atomic_uint64_t g_i64ReRunTimeStamp;
extern std::atomic_uint32_t g_iReRunOrigFrameId;
extern std::atomic_uint32_t g_iReRunFrameId;
// Construction does no work in its body; configuration is loaded later by Init().
DataDealEngine::DataDealEngine() {}
// The destructor body is empty.
DataDealEngine::~DataDealEngine() {}
/**
 * Initialise the engine: derive the three port names, load settings from MyYaml,
 * build the per-source list of output ports and reset the working state.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK (always)
 */
APP_ERROR DataDealEngine::Init()
{
    // Port names have the form "<engineName>_<engineId>_<index>".
    const std::string strPortPrefix = engineName_ + "_" + std::to_string(engineId_) + "_";
    strPort0_ = strPortPrefix + "0";
    strPort1_ = strPortPrefix + "1";
    strPort2_ = strPortPrefix + "2";

    strResultPath_ = MyYaml::GetIns()->GetPathValue("gc_result_path");
    iPushDirection_ = MyYaml::GetIns()->GetIntValue("gc_push_direction");
    dataSourceConfig_ = MyYaml::GetIns()->GetDataSourceConfigById(engineId_); // camera parameters
    // Pause between frame groups; the value is later handed to usleep(), i.e. microseconds.
    // NOTE(review): the 40 is presumably the per-frame period in ms (25 fps) - confirm.
    iIntervalTime_ = (dataSourceConfig_.iSkipInterval - 1) * 40 * 1000;
    mapUseDataSouceCfg_ = MyYaml::GetIns()->GetUseDataSourceConfig();

    // Translate each source's comma-separated target list into the ports it must be pushed to.
    std::string strSeparator(",");
    for (auto &sourceEntry : mapUseDataSouceCfg_)
    {
        int iSourceId = sourceEntry.first;
        std::vector<std::string> vecTargets = MyUtils::getins()->split(sourceEntry.second.strTarget, strSeparator);

        std::vector<std::string> vecPorts;
        for (const std::string &strTarget : vecTargets)
        {
            // Unrecognised target names contribute no port.
            const std::string *pPort = nullptr;
            if (strTarget == "NUM")
            {
                pPort = &strPort0_;
            }
            else if (strTarget == "CHKDATE")
            {
                pPort = &strPort1_;
            }
            else if (strTarget == "CONTAINER" || strTarget == "CONTAINER_T")
            {
                pPort = &strPort2_;
            }

            if (pPort != nullptr)
            {
                vecPorts.push_back(*pPort);
            }
        }
        mapSourcePushPort_[iSourceId] = vecPorts;
    }

    InitParam();
    LogInfo << "engineId_:" << engineId_ << " DataDealEngine Init ok";
    return APP_ERR_OK;
}
/**
 * Teardown hook: writes a log line and reports success; nothing else is released here.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK (always)
 */
APP_ERROR DataDealEngine::DeInit()
{
LogInfo << "engineId_:" << engineId_ << " DataDealEngine DeInit ok";
return APP_ERR_OK;
}
/**
 * Reset the per-train working state to its initial values.
 * Called from Init() and again from Process() after a train's end frame has been handled.
 * inParam : N/A
 * outParam: N/A
 * return  : N/A
 */
void DataDealEngine::InitParam()
{
    // Both frame counters restart at 1.
    iDataNO_ = 1;
    iOrigDataNO_ = 1;

    // Forget which train directory / name was being read.
    strDataDir_.clear();
    strTrainDate_.clear();
    strTrainName_.clear();

    // Clear the cached movement state so Process() idles until a new train is reported.
    moveData_.bIsEnd = false;
    moveData_.bHasTrain = false;
    moveData_.i64TimeStamp = 0;

    // Direction is re-read for the next train; the "frames were skipped" marker is cleared.
    bNotPushFlag_ = false;
    iDirection_ = DIRECTION_UNKNOWN;
}
/**
* Placeholder: the body performs no decoding and does not touch either argument.
* inParam : RawData &rawData : unused by the current implementation
* outParam: std::shared_ptr<DecodedData> pDecodedData : unused by the current implementation
* return : true (always)
*/
bool DataDealEngine::GetJpegdOut(std::shared_ptr<DecodedData> pDecodedData, RawData &rawData)
{
return true;
}
/**
 * Verify that a frame's info txt and its image both exist, then parse the txt as JSON.
 * inParam : std::string &strFileName : path of the frame-info txt file
 *         : std::string &strImgName  : path of the frame image
 * outParam: Json::Value &jvFrameInfo : filled by MyUtils::ReadJsonInfo on success
 *         : RawData &rawData         : not modified (image loading is disabled, see below)
 * return  : true when both files exist and the txt was read; false otherwise
 */
bool DataDealEngine::ReadFileInfo(Json::Value &jvFrameInfo, RawData &rawData, std::string &strFileName, std::string &strImgName)
{
    // If either file is missing, report it and let the caller decide whether to wait and retry
    // (no sleeping is done in this function).
    const bool bBothPresent = access(strFileName.c_str(), F_OK) == 0 && access(strImgName.c_str(), F_OK) == 0;
    if (!bBothPresent)
    {
        LogWarn << "txt:" << strFileName << " or image:" << strImgName << " no exist";
        return false;
    }

    if (MyUtils::getins()->ReadJsonInfo(jvFrameInfo, strFileName))
    {
        // Reading the image bytes into rawData is currently disabled:
        // int iRet = ReadFile(strImgName, rawData);
        // if (iRet != APP_ERR_OK)
        // {
        //     LogError << "Failed to read image:" << strImgName;
        //     return false;
        // }
        return true;
    }

    LogError << "Failed to read txt:" << strFileName;
    return false;
}
/**
* push数据到队列push
* inParam : const std::string strPort push的端口
: const std::shared_ptr<ProcessData> &pProcessData push的数据
* outParam: N/A
* return : N/A
*/
void DataDealEngine::PushData(const std::string &strPort, const std::shared_ptr<ProcessData> &pProcessData)
{
while (true)
{
int iRet = outputQueMap_[strPort]->push(std::static_pointer_cast<void>(pProcessData));
if (iRet != 0)
{
LogDebug << "sourceid:" << pProcessData->iDataSource << " frameid:" << pProcessData->iFrameId << " push fail iRet:" << iRet;
if (iRet == 2)
{
usleep(10000); // 10ms
continue;
}
}
break;
}
}
/**
* push
* inParam : N/A
* outParam: N/A
* return : N/A
*/
void DataDealEngine::MakeProcessData()
{
int iRet = APP_ERR_OK;
uint64_t i64TimeStampTemp = 0;
bool bIsEndByStop = false;
uint64_t i64ReRunTimeStamp = 0;
uint32_t iReRunOrigFrameId = 0;
uint32_t iReRunFrameId = 0;
if (g_iReRunOrigFrameId != 0)
{
LogDebug << "g_i64ReRunTimeStamp:" << g_i64ReRunTimeStamp << " g_iReRunOrigFrameId:" << g_iReRunOrigFrameId
<< " g_iReRunFrameId:" << g_iReRunFrameId;
i64ReRunTimeStamp = g_i64ReRunTimeStamp;
iReRunOrigFrameId = g_iReRunOrigFrameId;
iReRunFrameId = g_iReRunFrameId;
g_i64ReRunTimeStamp = 0;
g_iReRunOrigFrameId = 0;
g_iReRunFrameId = 0;
bIsEndByStop = true;
}
for (auto iter = mapUseDataSouceCfg_.begin(); iter!=mapUseDataSouceCfg_.end(); iter++)
{
int iSourceId = iter->first;
char szCameraNo[5] = {0};
sprintf(szCameraNo, "%03d/", iSourceId + 1);
uint32_t iOrigFrameId = iOrigDataNO_ * dataSourceConfig_.iSkipInterval;
uint32_t iFrameId = iDataNO_ * dataSourceConfig_.iSkipInterval;
bool bIsEndFlag = (moveData_.bIsEnd && moveData_.iFrameId == iOrigFrameId);
if (bIsEndByStop)
{
bIsEndFlag = true;
iOrigFrameId = iReRunOrigFrameId;
iFrameId = iReRunFrameId;
}
LogInfo << "sourceid:" << iSourceId << " MakeProcessData origtime:" << moveData_.strTrainName << " iOrigFrameId:" << iOrigFrameId
<< " time:" << strTrainName_ << " iFrameId:" << iFrameId << " bIsEndFlag:" << bIsEndFlag;
std::string strImgName = strDataDir_ + szCameraNo + std::to_string(iOrigFrameId);
strImgName += (iter->second.iRotate != 0) ? "_rotate.jpg" : ".jpg";
std::string strFileName = strDataDir_ + szCameraNo + std::to_string(iOrigFrameId) + ".txt";
//摄像头读取失败后重试2000次。
Json::Value jvFrameInfo;
RawData rawData;
bool bRet = false;
int iNoDataCnt = 0;
while (!isStop_ && iNoDataCnt < 30)
{
bRet = ReadFileInfo(jvFrameInfo, rawData, strFileName, strImgName);
if (bRet)
{
break;
}
usleep(500 * 1000); // 500ms
iNoDataCnt++;
}
//组织数据
std::shared_ptr<ProcessData> pProcessData = std::make_shared<ProcessData>();
pProcessData->iDataSource = iSourceId;
pProcessData->iFrameId = iFrameId;
pProcessData->strPicFilePath = strImgName;
pProcessData->i64TimeStamp = i64TimeStampTemp;
pProcessData->strOrigTrainDate = moveData_.strTrainDate;
pProcessData->strOrigTrainName = moveData_.strTrainName;
pProcessData->iOrigFrameId = iOrigFrameId;
pProcessData->strTrainDate = strTrainDate_;
pProcessData->strTrainName = strTrainName_;
pProcessData->iStatus = TRAINSTATUS_RUN;
pProcessData->bIsEnd = bIsEndFlag;
pProcessData->iDataNO = iDataNO_;
pProcessData->nMonitorState = moveData_.nMonitorState;
if (bRet)
{
i64TimeStampTemp = jvFrameInfo["timeStamp"].asUInt64();
pProcessData->i64TimeStamp = i64TimeStampTemp;
pProcessData->iWidth = jvFrameInfo["width"].asInt();
pProcessData->iHeight = jvFrameInfo["height"].asInt();
pProcessData->iDirection = jvFrameInfo["direction"].asInt();
pProcessData->iRate = jvFrameInfo["rate"].asInt();
cv::Mat cvframe = cv::imread(pProcessData->strPicFilePath);
int iBufferSize = pProcessData->iWidth * pProcessData->iHeight * 3;
void* pBGRBufferobj = nullptr;
pBGRBufferobj = new uint8_t[iBufferSize];
memcpy(pBGRBufferobj, cvframe.data, iBufferSize);
pProcessData->pData.reset(pBGRBufferobj, [](void* data){if(data) {delete[] data; data = nullptr;}});
pProcessData->iSize = iBufferSize;
}
std::vector<std::string> vecPushPorts = mapSourcePushPort_[iSourceId];
for (int iPort = 0; iPort < vecPushPorts.size(); iPort++)
{
if (iPort == vecPushPorts.size() - 1)
{
//iRet = outputQueMap_[vecPushPorts[iPort]]->push(std::static_pointer_cast<void>(pProcessData));
PushData(vecPushPorts[iPort], pProcessData);
continue;
}
std::shared_ptr<ProcessData> pNewProcessData = std::make_shared<ProcessData>();
*pNewProcessData = *pProcessData;
//iRet = outputQueMap_[vecPushPorts[iPort]]->push(std::static_pointer_cast<void>(pNewProcessData));
PushData(vecPushPorts[iPort], pNewProcessData);
}
}
iOrigDataNO_++;
iDataNO_++;
//每组处理数据需间隔一定时间
usleep(iIntervalTime_);
if (bIsEndByStop)
{
iDataNO_ = 1;
iOrigDataNO_ = iReRunOrigFrameId / dataSourceConfig_.iSkipInterval;
std::string strDateTime = MyUtils::getins()->GetDateTimeByMilliSeconds(i64ReRunTimeStamp);
strTrainDate_ = strDateTime.substr(0, 10);
strTrainName_ = strDateTime.substr(11, strDateTime.length());
std::string strTrainPath = strResultPath_ + strTrainDate_ + "/" + strTrainName_ + "/";
MyUtils::getins()->CreateDirPath(strTrainPath);
LogDebug << "rerun traindate:" << strTrainDate_ << " trainname:" << strTrainName_;
}
}
/**
 * Engine main loop. Until isStop_ is set it:
 *   1. pops the latest MoveData (if any) from input port 0 and caches it in moveData_;
 *   2. idles while no train is reported;
 *   3. resolves the train directory and, once available, the direction from direction.txt;
 *   4. while the train is running, emits one frame group per iteration via MakeProcessData()
 *      unless the direction does not match the configured push direction;
 *   5. when the end flag arrives, emits the remaining frames up to the end frame id and then
 *      resets the working state with InitParam().
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK once the loop exits
 */
APP_ERROR DataDealEngine::Process()
{
// NOTE(review): iRet only receives the result of pop() below and is never inspected.
int iRet = APP_ERR_OK;
//return APP_ERR_OK;
while (!isStop_)
{
// Fetch the movement state reported by the main camera's detection.
std::shared_ptr<void> pVoidData0 = nullptr;
iRet = inputQueMap_[strPort0_]->pop(pVoidData0);
if (nullptr != pVoidData0)
{
std::shared_ptr<MoveData> pMoveData = std::static_pointer_cast<MoveData>(pVoidData0);
moveData_ = *pMoveData;
LogDebug << "traindate:" << moveData_.strTrainDate << " trainname:" << moveData_.strTrainName
<< " MoveData frameid:" << moveData_.iFrameId << " IsEnd:" << moveData_.bIsEnd;
}
// No train in view: poll again shortly.
if (!moveData_.bHasTrain)
{
usleep(1000); //1ms
continue;
}
// Before the first frame, sleep 1 s so the images have time to be written to local storage.
if (iOrigDataNO_ == 1)
{
usleep(1000000); //1s
}
// First iteration for this train: remember its data directory and date/name.
if (strDataDir_.empty())
{
strDataDir_ = strResultPath_ + moveData_.strTrainDate + "/" + moveData_.strTrainName + "/";
strTrainDate_ = moveData_.strTrainDate;
strTrainName_ = moveData_.strTrainName;
}
// If a push direction is configured, frames of the wrong direction are filtered out directly.
// The end frame must not be filtered: it has to flow on to the downstream engines so that
// their processing finishes correctly.
if (iDirection_ == DIRECTION_UNKNOWN)
{
// Direction not known yet: try to read it from the train's direction.txt.
std::string strFilePath = strResultPath_ + strTrainDate_ + "/" + strTrainName_ + "/" + "direction.txt";
Json::Value jvDirectionInfo;
if (MyUtils::getins()->ReadJsonInfo(jvDirectionInfo, strFilePath, 0))
{
iDirection_ = jvDirectionInfo["direction"].asInt();
}
}
if (!moveData_.bIsEnd)
{
// Push when the direction is still unknown, matches, or no push direction is configured.
if (iDirection_ == DIRECTION_UNKNOWN || iDirection_ == iPushDirection_ || iPushDirection_ == DIRECTION_UNKNOWN)
{
MakeProcessData();
}
else
{
LogDebug << "traindate:" << strTrainDate_ << " trainname:" << strTrainName_ << " iOrigDataNO_:" << iOrigDataNO_ << " continue.";
// Remember that frames were skipped so the end branch can jump straight to the end frame.
bNotPushFlag_ = true;
usleep(iIntervalTime_); // each group of frames must be spaced by the configured interval
continue;
}
}
else
{
// NOTE(review): the divisions below assume dataSourceConfig_.iSkipInterval != 0 - confirm.
if (bNotPushFlag_)
{
// Frames were being filtered: position both counters on the end frame.
iOrigDataNO_ = moveData_.iFrameId / dataSourceConfig_.iSkipInterval;
iDataNO_ = moveData_.iFrameId / dataSourceConfig_.iSkipInterval;
}
if (iOrigDataNO_ * dataSourceConfig_.iSkipInterval > moveData_.iFrameId)
{
// Already past the end frame: step back to it and emit it once.
LogDebug << "dealFrameid:" << iOrigDataNO_ * dataSourceConfig_.iSkipInterval << " moveFrameid:" << moveData_.iFrameId;
iOrigDataNO_ = moveData_.iFrameId / dataSourceConfig_.iSkipInterval;
MakeProcessData();
}
else
{
while (!isStop_ && iOrigDataNO_ * dataSourceConfig_.iSkipInterval <= moveData_.iFrameId)
{
// Keep processing until the end frame has been emitted.
MakeProcessData();
}
}
// Train finished: reset the working state for the next one.
InitParam();
}
}
return APP_ERR_OK;
}