#include "DataDealEngine.h"

using namespace ai_matrix;

// NOTE(review): every template argument list in this file (std::shared_ptr<ProcessData>,
// std::shared_ptr<MoveData>, std::shared_ptr<DecodedData>, std::shared_ptr<void>,
// std::vector<std::string>) was missing from the reviewed text and has been reconstructed
// from variable names and usage. Confirm them against DataDealEngine.h.

// Re-run request values defined in another translation unit. MakeProcessData() consumes
// them (and resets them to 0) whenever g_iReRunOrigFrameId is non-zero.
extern std::atomic_uint64_t g_i64ReRunTimeStamp;
extern std::atomic_uint32_t g_iReRunOrigFrameId;
extern std::atomic_uint32_t g_iReRunFrameId;

DataDealEngine::DataDealEngine() {}

DataDealEngine::~DataDealEngine() {}

/**
 * Engine initialisation: builds the port names, loads configuration and computes,
 * for every configured data source, the list of output ports its frames are pushed to.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK
 */
APP_ERROR DataDealEngine::Init()
{
    const std::string strPortPrefix = engineName_ + "_" + std::to_string(engineId_) + "_";
    strPort0_ = strPortPrefix + "0";
    strPort1_ = strPortPrefix + "1";
    strPort2_ = strPortPrefix + "2";

    strResultPath_ = MyYaml::GetIns()->GetPathValue("gc_result_path");
    iPushDirection_ = MyYaml::GetIns()->GetIntValue("gc_push_direction");
    dataSourceConfig_ = MyYaml::GetIns()->GetDataSourceConfigById(engineId_); // camera parameters

    // Pause (in microseconds, passed to usleep) between two consecutive groups of frames.
    // NOTE(review): presumably 40 ms per skipped source frame (25 fps) - confirm.
    // NOTE(review): iSkipInterval is also used as a divisor in MakeProcessData()/Process();
    // a configured value of 0 is not guarded against here.
    iIntervalTime_ = (dataSourceConfig_.iSkipInterval - 1) * 40 * 1000;

    mapUseDataSouceCfg_ = MyYaml::GetIns()->GetUseDataSourceConfig();

    std::string delimiter(",");
    for (auto &sourceCfg : mapUseDataSouceCfg_)
    {
        const int iSourceId = sourceCfg.first;
        auto vecTargets = MyUtils::getins()->split(sourceCfg.second.strTarget, delimiter);

        // Map every recognised target keyword to the output port that serves it;
        // unknown keywords are ignored.
        std::vector<std::string> vecPushPorts;
        for (const auto &strTarget : vecTargets)
        {
            if (strTarget == "NUM")
            {
                vecPushPorts.push_back(strPort0_);
            }
            else if (strTarget == "CHKDATE")
            {
                vecPushPorts.push_back(strPort1_);
            }
            else if (strTarget == "CONTAINER" || strTarget == "CONTAINER_T")
            {
                vecPushPorts.push_back(strPort2_);
            }
        }
        mapSourcePushPort_[iSourceId] = vecPushPorts;
    }

    InitParam();
    LogInfo << "engineId_:" << engineId_ << " DataDealEngine Init ok";
    return APP_ERR_OK;
}

/**
 * Engine de-initialisation; only logs.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK
 */
APP_ERROR DataDealEngine::DeInit()
{
    LogInfo << "engineId_:" << engineId_ << " DataDealEngine DeInit ok";
    return APP_ERR_OK;
}

/**
 * Resets the per-train state (must be called when a train ends).
 * inParam : N/A
 * outParam: N/A
 * return  : N/A
 */
void DataDealEngine::InitParam()
{
    iOrigDataNO_ = 1;
    strDataDir_ = "";
    moveData_.i64TimeStamp = 0;
    moveData_.bHasTrain = false;
    moveData_.bIsEnd = false;
    iDataNO_ = 1;
    strTrainDate_ = "";
    strTrainName_ = "";
    iDirection_ = DIRECTION_UNKNOWN;
    bNotPushFlag_ = false;
}

/**
 * Image decoding hook. Currently a stub: it performs no decoding and always succeeds.
 * inParam : RawData &rawData                         : image data (unused)
 * outParam: std::shared_ptr<DecodedData> pDecodedData : decoded data (unused)
 * return  : always true
 */
bool DataDealEngine::GetJpegdOut(std::shared_ptr<DecodedData> pDecodedData, RawData &rawData)
{
    return true;
}

/**
 * Checks that both the frame description file and the image exist, then parses the
 * description file. The image bytes are NOT read here (rawData is left untouched);
 * MakeProcessData() loads the pixels itself. This function does not sleep - the caller
 * is responsible for waiting and retrying.
 * inParam : std::string &strFileName : path of the frame description (.txt, JSON content)
 *         : std::string &strImgName  : path of the image
 * outParam: Json::Value &jvFrameInfo : parsed frame description
 *         : RawData &rawData         : unused
 * return  : true(both files exist and the description was parsed)/false(otherwise)
 */
bool DataDealEngine::ReadFileInfo(Json::Value &jvFrameInfo, RawData &rawData, std::string &strFileName, std::string &strImgName)
{
    const bool bTxtExists = (access(strFileName.c_str(), F_OK) == 0);
    const bool bImgExists = (access(strImgName.c_str(), F_OK) == 0);
    if (!bTxtExists || !bImgExists)
    {
        LogWarn << "txt:" << strFileName << " or image:" << strImgName << " no exist";
        return false;
    }

    if (!MyUtils::getins()->ReadJsonInfo(jvFrameInfo, strFileName))
    {
        LogError << "Failed to read txt:" << strFileName;
        return false;
    }
    return true;
}

/**
 * Pushes data to an output queue. While the push returns code 2 (the original comment
 * describes this as "queue full") it sleeps 10 ms and tries again; any other failure
 * is logged and the data is dropped.
 * inParam : const std::string &strPort                        : port to push to
 *         : const std::shared_ptr<ProcessData> &pProcessData  : data to push
 * outParam: N/A
 * return  : N/A
 */
void DataDealEngine::PushData(const std::string &strPort, const std::shared_ptr<ProcessData> &pProcessData)
{
    for (;;)
    {
        const int iRet = outputQueMap_[strPort]->push(std::static_pointer_cast<void>(pProcessData));
        if (iRet == 0)
        {
            return;
        }

        LogDebug << "sourceid:" << pProcessData->iDataSource << " frameid:" << pProcessData->iFrameId << " push fail iRet:" << iRet;
        if (iRet != 2)
        {
            return;
        }
        usleep(10000); // 10ms
    }
}

/**
 * Builds one ProcessData per configured data source for the current frame number and
 * pushes it to every port configured for that source, then advances the frame counters.
 * If a re-run was requested through the g_iReRun* globals, the pushed frame is flagged as
 * the end frame with the re-run ids, and afterwards a new train date/name is derived from
 * the re-run timestamp.
 * inParam : N/A
 * outParam: N/A
 * return  : N/A
 */
void DataDealEngine::MakeProcessData()
{
    const int kMaxReadRetries = 30;             // attempts to find the frame files
    const useconds_t kRetrySleepUs = 500 * 1000; // 500ms between attempts

    // Timestamp of the most recently read frame in this call; a source whose files could
    // not be read inherits the value of the previous source.
    uint64_t i64TimeStampTemp = 0;

    // Take over (and clear) a pending re-run request.
    bool bIsEndByStop = false;
    uint64_t i64ReRunTimeStamp = 0;
    uint32_t iReRunOrigFrameId = 0;
    uint32_t iReRunFrameId = 0;
    if (g_iReRunOrigFrameId != 0)
    {
        LogDebug << "g_i64ReRunTimeStamp:" << g_i64ReRunTimeStamp << " g_iReRunOrigFrameId:" << g_iReRunOrigFrameId
                 << " g_iReRunFrameId:" << g_iReRunFrameId;
        i64ReRunTimeStamp = g_i64ReRunTimeStamp;
        iReRunOrigFrameId = g_iReRunOrigFrameId;
        iReRunFrameId = g_iReRunFrameId;
        g_i64ReRunTimeStamp = 0;
        g_iReRunOrigFrameId = 0;
        g_iReRunFrameId = 0;
        bIsEndByStop = true;
    }

    for (auto &sourceCfg : mapUseDataSouceCfg_)
    {
        const int iSourceId = sourceCfg.first;

        // Per-camera sub directory, e.g. "001/". Bounded formatting: the previous
        // sprintf into char[5] overflowed once the camera number needed 4 digits.
        char szCameraNo[16] = {0};
        snprintf(szCameraNo, sizeof(szCameraNo), "%03d/", iSourceId + 1);

        uint32_t iOrigFrameId = iOrigDataNO_ * dataSourceConfig_.iSkipInterval;
        uint32_t iFrameId = iDataNO_ * dataSourceConfig_.iSkipInterval;
        bool bIsEndFlag = (moveData_.bIsEnd && moveData_.iFrameId == iOrigFrameId);
        if (bIsEndByStop)
        {
            bIsEndFlag = true;
            iOrigFrameId = iReRunOrigFrameId;
            iFrameId = iReRunFrameId;
        }

        const std::string strFrameBase = strDataDir_ + szCameraNo + std::to_string(iOrigFrameId);
        std::string strImgName = strFrameBase + ((sourceCfg.second.iRotate != 0) ? "_rotate.jpg" : ".jpg");
        std::string strFileName = strFrameBase + ".txt";

        // Wait for the frame files to appear: up to 30 attempts, 500 ms apart.
        Json::Value jvFrameInfo;
        RawData rawData;
        bool bRet = false;
        for (int iNoDataCnt = 0; !isStop_ && iNoDataCnt < kMaxReadRetries; iNoDataCnt++)
        {
            bRet = ReadFileInfo(jvFrameInfo, rawData, strFileName, strImgName);
            if (bRet)
            {
                break;
            }
            usleep(kRetrySleepUs);
        }

        // Assemble the data. It is pushed even when the files could not be read, in which
        // case it carries no pixel buffer.
        std::shared_ptr<ProcessData> pProcessData = std::make_shared<ProcessData>();
        pProcessData->iDataSource = iSourceId;
        pProcessData->iFrameId = iFrameId;
        pProcessData->strPicFilePath = strImgName;
        pProcessData->i64TimeStamp = i64TimeStampTemp;
        pProcessData->strOrigTrainDate = moveData_.strTrainDate;
        pProcessData->strOrigTrainName = moveData_.strTrainName;
        pProcessData->iOrigFrameId = iOrigFrameId;
        pProcessData->strTrainDate = strTrainDate_;
        pProcessData->strTrainName = strTrainName_;
        pProcessData->iStatus = TRAINSTATUS_RUN;
        pProcessData->bIsEnd = bIsEndFlag;
        pProcessData->iDataNO = iDataNO_;

        if (bRet)
        {
            i64TimeStampTemp = jvFrameInfo["timeStamp"].asUInt64();
            pProcessData->i64TimeStamp = i64TimeStampTemp;
            pProcessData->iWidth = jvFrameInfo["width"].asInt();
            pProcessData->iHeight = jvFrameInfo["height"].asInt();
            pProcessData->iDirection = jvFrameInfo["direction"].asInt();
            pProcessData->iRate = jvFrameInfo["rate"].asInt();
            pProcessData->nMonitorState = jvFrameInfo["moveType"].asInt();

            cv::Mat cvframe = cv::imread(pProcessData->strPicFilePath);
            const int iBufferSize = pProcessData->iWidth * pProcessData->iHeight * 3;
            const size_t iFrameBytes = cvframe.empty() ? 0 : cvframe.total() * cvframe.elemSize();

            // Copy only when the decoded image really holds at least width*height*3 bytes;
            // an unreadable or smaller image previously made memcpy read invalid memory.
            if (iBufferSize > 0 && iFrameBytes >= static_cast<size_t>(iBufferSize))
            {
                uint8_t *pBGRBuffer = new uint8_t[iBufferSize];
                memcpy(pBGRBuffer, cvframe.data, iBufferSize);
                // Typed deleter: delete[] on a void* is undefined behaviour.
                pProcessData->pData.reset(pBGRBuffer, [](uint8_t *pBuf) { delete[] pBuf; });
                pProcessData->iSize = iBufferSize;
            }
            else
            {
                LogError << "sourceid:" << iSourceId << " frameid:" << iFrameId << " image:" << strImgName
                         << " load fail or smaller than width*height*3";
            }
        }

        // Every port except the last receives its own ProcessData copy; the last port
        // receives the original object.
        const auto &vecPushPorts = mapSourcePushPort_[iSourceId];
        const size_t iPortCnt = vecPushPorts.size();
        for (size_t iPort = 0; iPort < iPortCnt; ++iPort)
        {
            std::shared_ptr<ProcessData> pSendData = pProcessData;
            if (iPort + 1 != iPortCnt)
            {
                pSendData = std::make_shared<ProcessData>();
                *pSendData = *pProcessData;
            }

            const int iRet = outputQueMap_[vecPushPorts[iPort]]->push(std::static_pointer_cast<void>(pSendData), true);
            if (iRet != 0)
            {
                LogWarn << "sourceid:" << iSourceId << " frameid:" << iFrameId << " port:" << vecPushPorts[iPort]
                        << " push fail iRet:" << iRet;
            }
        }
    }

    iOrigDataNO_++;
    iDataNO_++;

    // Each group of frames must be separated by the configured interval.
    usleep(iIntervalTime_);

    if (bIsEndByStop)
    {
        // Restart numbering for the re-run; frames are still read from strDataDir_,
        // only the train date/name used for output change.
        iDataNO_ = 1;
        iOrigDataNO_ = iReRunOrigFrameId / dataSourceConfig_.iSkipInterval;

        std::string strDateTime = MyUtils::getins()->GetDateTimeByMilliSeconds(i64ReRunTimeStamp);
        // NOTE(review): assumes a "<10-char date><separator><name>" layout - confirm the
        // format returned by GetDateTimeByMilliSeconds.
        strTrainDate_ = strDateTime.substr(0, 10);
        strTrainName_ = strDateTime.substr(11, strDateTime.length());

        std::string strTrainPath = strResultPath_ + strTrainDate_ + "/" + strTrainName_ + "/";
        MyUtils::getins()->CreateDirPath(strTrainPath);
        LogDebug << "rerun traindate:" << strTrainDate_ << " trainname:" << strTrainName_;
    }
}

/**
 * Engine main loop: polls the movement state from input port 0 and, while a train is
 * present, produces frame data via MakeProcessData(). When the end state arrives, the
 * remaining frames up to the end frame are produced and the per-train state is reset.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK
 */
APP_ERROR DataDealEngine::Process()
{
    while (!isStop_)
    {
        // Fetch the state detected by the main camera; pVoidData0 stays null when there
        // is nothing new, in which case the previous moveData_ remains in effect.
        std::shared_ptr<void> pVoidData0 = nullptr;
        inputQueMap_[strPort0_]->pop(pVoidData0);
        if (nullptr != pVoidData0)
        {
            std::shared_ptr<MoveData> pMoveData = std::static_pointer_cast<MoveData>(pVoidData0);
            moveData_ = *pMoveData;
            LogDebug << "traindate:" << moveData_.strTrainDate << " trainname:" << moveData_.strTrainName
                     << " MoveData frameid:" << moveData_.iFrameId << " IsEnd:" << moveData_.bIsEnd;
        }

        if (!moveData_.bHasTrain)
        {
            usleep(1000); // 1ms
            continue;
        }

        // First frame group: wait 1 s so the images can be stored locally.
        if (iOrigDataNO_ == 1)
        {
            usleep(1000000); // 1s
        }

        if (strDataDir_.empty())
        {
            strDataDir_ = strResultPath_ + moveData_.strTrainDate + "/" + moveData_.strTrainName + "/";
            strTrainDate_ = moveData_.strTrainDate;
            strTrainName_ = moveData_.strTrainName;
        }

        // Until the direction is known, keep trying to read it from direction.txt.
        if (iDirection_ == DIRECTION_UNKNOWN)
        {
            std::string strFilePath = strResultPath_ + strTrainDate_ + "/" + strTrainName_ + "/" + "direction.txt";
            Json::Value jvDirectionInfo;
            if (MyUtils::getins()->ReadJsonInfo(jvDirectionInfo, strFilePath, 0))
            {
                iDirection_ = jvDirectionInfo["direction"].asInt();
            }
        }

        if (!moveData_.bIsEnd)
        {
            // When a push direction is configured, frames of a train running the other way
            // are filtered out. The end frame is never filtered (handled below) so that
            // the downstream engines can finish correctly.
            const bool bDirectionAccepted = (iDirection_ == DIRECTION_UNKNOWN ||
                                             iDirection_ == iPushDirection_ ||
                                             iPushDirection_ == DIRECTION_UNKNOWN);
            if (!bDirectionAccepted)
            {
                LogDebug << "traindate:" << strTrainDate_ << " trainname:" << strTrainName_
                         << " iOrigDataNO_:" << iOrigDataNO_ << " continue.";
                bNotPushFlag_ = true;
                usleep(iIntervalTime_); // each group of frames is separated by the interval
                continue;
            }

            MakeProcessData();
            continue;
        }

        // Train ended. If frames were filtered, jump straight to the end frame.
        if (bNotPushFlag_)
        {
            iOrigDataNO_ = moveData_.iFrameId / dataSourceConfig_.iSkipInterval;
            iDataNO_ = moveData_.iFrameId / dataSourceConfig_.iSkipInterval;
        }

        if (iOrigDataNO_ * dataSourceConfig_.iSkipInterval > moveData_.iFrameId)
        {
            // Already past the end frame: step back and emit it once.
            LogDebug << "dealFrameid:" << iOrigDataNO_ * dataSourceConfig_.iSkipInterval
                     << " moveFrameid:" << moveData_.iFrameId;
            iOrigDataNO_ = moveData_.iFrameId / dataSourceConfig_.iSkipInterval;
            MakeProcessData();
        }
        else
        {
            // Keep processing until the end frame has been emitted.
            while (!isStop_ && iOrigDataNO_ * dataSourceConfig_.iSkipInterval <= moveData_.iFrameId)
            {
                MakeProcessData();
            }
        }
        InitParam();
    }
    return APP_ERR_OK;
}