386 lines
13 KiB
C++
386 lines
13 KiB
C++
#include "DataDealEngine.h"
|
||
|
||
using namespace ai_matrix;
|
||
|
||
extern std::atomic_uint64_t g_i64ReRunTimeStamp;
|
||
extern std::atomic_uint32_t g_iReRunOrigFrameId;
|
||
extern std::atomic_uint32_t g_iReRunFrameId;
|
||
|
||
// Nothing to set up at construction time; runtime state is prepared in Init().
DataDealEngine::DataDealEngine() = default;
|
||
|
||
// No explicit teardown is performed here.
DataDealEngine::~DataDealEngine() = default;
|
||
|
||
/**
 * Initialise the engine: derive the three port names, load configuration values
 * from MyYaml, build the per-source list of output ports and reset the per-train state.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK
 */
APP_ERROR DataDealEngine::Init()
{
    // Port names have the form "<engineName>_<engineId>_<index>".
    const std::string strPortPrefix = engineName_ + "_" + std::to_string(engineId_) + "_";
    strPort0_ = strPortPrefix + "0";
    strPort1_ = strPortPrefix + "1";
    strPort2_ = strPortPrefix + "2";

    strResultPath_ = MyYaml::GetIns()->GetPathValue("gc_result_path");
    iPushDirection_ = MyYaml::GetIns()->GetIntValue("gc_push_direction");

    // Camera (data source) parameters for this engine id.
    dataSourceConfig_ = MyYaml::GetIns()->GetDataSourceConfigById(engineId_);
    // Pause between two processed groups, in microseconds (argument of usleep).
    iIntervalTime_ = (dataSourceConfig_.iSkipInterval - 1) * 40 * 1000;
    mapUseDataSouceCfg_ = MyYaml::GetIns()->GetUseDataSourceConfig();

    // Translate each source's comma-separated target list into output port names.
    // Targets other than the ones listed below are ignored.
    std::string strDelimiter(",");
    for (auto &cfgEntry : mapUseDataSouceCfg_)
    {
        int iSourceId = cfgEntry.first;
        std::vector<std::string> vecTargets = MyUtils::getins()->split(cfgEntry.second.strTarget, strDelimiter);

        std::vector<std::string> vecPushPorts;
        for (const std::string &strTarget : vecTargets)
        {
            if (strTarget == "NUM")
            {
                vecPushPorts.push_back(strPort0_);
                continue;
            }
            if (strTarget == "CHKDATE")
            {
                vecPushPorts.push_back(strPort1_);
                continue;
            }
            if (strTarget == "CONTAINER" || strTarget == "CONTAINER_T")
            {
                vecPushPorts.push_back(strPort2_);
            }
        }
        mapSourcePushPort_[iSourceId] = vecPushPorts;
    }

    InitParam();
    LogInfo << "engineId_:" << engineId_ << " DataDealEngine Init ok";
    return APP_ERR_OK;
}
|
||
|
||
/**
 * De-initialise the engine. Only writes a log line; no resources are released here.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK
 */
APP_ERROR DataDealEngine::DeInit()
{
    LogInfo << "engineId_:" << engineId_ << " DataDealEngine DeInit ok";

    return APP_ERR_OK;
}
|
||
|
||
/**
|
||
* 参数初始化(列车结束时需调用)
|
||
* inParam : N/A
|
||
* outParam: N/A
|
||
* return : N/A
|
||
*/
|
||
void DataDealEngine::InitParam()
|
||
{
|
||
iOrigDataNO_ = 1;
|
||
strDataDir_ = "";
|
||
moveData_.i64TimeStamp = 0;
|
||
moveData_.bHasTrain = false;
|
||
moveData_.bIsEnd = false;
|
||
iDataNO_ = 1;
|
||
strTrainDate_ = "";
|
||
strTrainName_ = "";
|
||
iDirection_ = DIRECTION_UNKNOWN;
|
||
bNotPushFlag_ = false;
|
||
}
|
||
|
||
/**
|
||
* 图片数据解码
|
||
* inParam : RawData &rawData :图片数据
|
||
* outParam: std::shared_ptr<DecodedData> pDecodedData :解码数据
|
||
* return : true(编码成功)/false(编码失败)
|
||
*/
|
||
bool DataDealEngine::GetJpegdOut(std::shared_ptr<DecodedData> pDecodedData, RawData &rawData)
|
||
{
|
||
return true;
|
||
}
|
||
|
||
/**
|
||
* 读取图片和文本
|
||
* inParam : N/A
|
||
* outParam: N/A
|
||
* return : N/A
|
||
*/
|
||
bool DataDealEngine::ReadFileInfo(Json::Value &jvFrameInfo, RawData &rawData, std::string &strFileName, std::string &strImgName)
|
||
{
|
||
//图片或文件不存在时,休眠10ms直接return,重新获取。
|
||
if (access(strFileName.c_str(), F_OK) != 0 || access(strImgName.c_str(), F_OK) != 0)
|
||
{
|
||
LogWarn << "txt:" << strFileName << " or image:" << strImgName << " no exist";
|
||
return false;
|
||
}
|
||
|
||
if (!MyUtils::getins()->ReadJsonInfo(jvFrameInfo, strFileName))
|
||
{
|
||
LogError << "Failed to read txt:" << strFileName;
|
||
return false;
|
||
}
|
||
|
||
//读取图片内容
|
||
// int iRet = ReadFile(strImgName, rawData);
|
||
// if (iRet != APP_ERR_OK)
|
||
// {
|
||
// LogError << "Failed to read image:" << strImgName;
|
||
// return false;
|
||
// }
|
||
|
||
return true;
|
||
}
|
||
|
||
/**
|
||
* push数据到队列,队列满时则休眠一段时间再push。
|
||
* inParam : const std::string strPort push的端口
|
||
: const std::shared_ptr<ProcessData> &pProcessData push的数据
|
||
* outParam: N/A
|
||
* return : N/A
|
||
*/
|
||
void DataDealEngine::PushData(const std::string &strPort, const std::shared_ptr<ProcessData> &pProcessData)
|
||
{
|
||
while (true)
|
||
{
|
||
int iRet = outputQueMap_[strPort]->push(std::static_pointer_cast<void>(pProcessData));
|
||
if (iRet != 0)
|
||
{
|
||
LogDebug << "sourceid:" << pProcessData->iDataSource << " frameid:" << pProcessData->iFrameId << " push fail iRet:" << iRet;
|
||
if (iRet == 2)
|
||
{
|
||
usleep(10000); // 10ms
|
||
continue;
|
||
}
|
||
}
|
||
break;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 构造处理数据并push
|
||
* inParam : N/A
|
||
* outParam: N/A
|
||
* return : N/A
|
||
*/
|
||
void DataDealEngine::MakeProcessData()
|
||
{
|
||
int iRet = APP_ERR_OK;
|
||
uint64_t i64TimeStampTemp = 0;
|
||
|
||
bool bIsEndByStop = false;
|
||
uint64_t i64ReRunTimeStamp = 0;
|
||
uint32_t iReRunOrigFrameId = 0;
|
||
uint32_t iReRunFrameId = 0;
|
||
if (g_iReRunOrigFrameId != 0)
|
||
{
|
||
LogDebug << "g_i64ReRunTimeStamp:" << g_i64ReRunTimeStamp << " g_iReRunOrigFrameId:" << g_iReRunOrigFrameId
|
||
<< " g_iReRunFrameId:" << g_iReRunFrameId;
|
||
i64ReRunTimeStamp = g_i64ReRunTimeStamp;
|
||
iReRunOrigFrameId = g_iReRunOrigFrameId;
|
||
iReRunFrameId = g_iReRunFrameId;
|
||
g_i64ReRunTimeStamp = 0;
|
||
g_iReRunOrigFrameId = 0;
|
||
g_iReRunFrameId = 0;
|
||
bIsEndByStop = true;
|
||
}
|
||
|
||
for (auto iter = mapUseDataSouceCfg_.begin(); iter!=mapUseDataSouceCfg_.end(); iter++)
|
||
{
|
||
int iSourceId = iter->first;
|
||
char szCameraNo[5] = {0};
|
||
sprintf(szCameraNo, "%03d/", iSourceId + 1);
|
||
uint32_t iOrigFrameId = iOrigDataNO_ * dataSourceConfig_.iSkipInterval;
|
||
uint32_t iFrameId = iDataNO_ * dataSourceConfig_.iSkipInterval;
|
||
bool bIsEndFlag = (moveData_.bIsEnd && moveData_.iFrameId == iOrigFrameId);
|
||
if (bIsEndByStop)
|
||
{
|
||
bIsEndFlag = true;
|
||
iOrigFrameId = iReRunOrigFrameId;
|
||
iFrameId = iReRunFrameId;
|
||
}
|
||
|
||
LogInfo << "sourceid:" << iSourceId << " MakeProcessData origtime:" << moveData_.strTrainName << " iOrigFrameId:" << iOrigFrameId
|
||
<< " time:" << strTrainName_ << " iFrameId:" << iFrameId << " bIsEndFlag:" << bIsEndFlag;
|
||
|
||
std::string strImgName = strDataDir_ + szCameraNo + std::to_string(iOrigFrameId);
|
||
strImgName += (iter->second.iRotate != 0) ? "_rotate.jpg" : ".jpg";
|
||
std::string strFileName = strDataDir_ + szCameraNo + std::to_string(iOrigFrameId) + ".txt";
|
||
|
||
//摄像头读取失败后重试30次。
|
||
Json::Value jvFrameInfo;
|
||
RawData rawData;
|
||
bool bRet = false;
|
||
int iNoDataCnt = 0;
|
||
while (!isStop_ && iNoDataCnt < 30)
|
||
{
|
||
bRet = ReadFileInfo(jvFrameInfo, rawData, strFileName, strImgName);
|
||
if (bRet)
|
||
{
|
||
break;
|
||
}
|
||
usleep(500 * 1000); // 500ms
|
||
iNoDataCnt++;
|
||
}
|
||
|
||
//组织数据
|
||
std::shared_ptr<ProcessData> pProcessData = std::make_shared<ProcessData>();
|
||
pProcessData->iDataSource = iSourceId;
|
||
pProcessData->iFrameId = iFrameId;
|
||
pProcessData->strPicFilePath = strImgName;
|
||
pProcessData->i64TimeStamp = i64TimeStampTemp;
|
||
pProcessData->strOrigTrainDate = moveData_.strTrainDate;
|
||
pProcessData->strOrigTrainName = moveData_.strTrainName;
|
||
pProcessData->iOrigFrameId = iOrigFrameId;
|
||
pProcessData->strTrainDate = strTrainDate_;
|
||
pProcessData->strTrainName = strTrainName_;
|
||
pProcessData->iStatus = TRAINSTATUS_RUN;
|
||
pProcessData->bIsEnd = bIsEndFlag;
|
||
pProcessData->iDataNO = iDataNO_;
|
||
pProcessData->nMonitorState = moveData_.nMonitorState;
|
||
|
||
if (bRet)
|
||
{
|
||
i64TimeStampTemp = jvFrameInfo["timeStamp"].asUInt64();
|
||
pProcessData->i64TimeStamp = i64TimeStampTemp;
|
||
pProcessData->iWidth = jvFrameInfo["width"].asInt();
|
||
pProcessData->iHeight = jvFrameInfo["height"].asInt();
|
||
pProcessData->iDirection = jvFrameInfo["direction"].asInt();
|
||
pProcessData->iRate = jvFrameInfo["rate"].asInt();
|
||
|
||
cv::Mat cvframe = cv::imread(pProcessData->strPicFilePath);
|
||
int iBufferSize = pProcessData->iWidth * pProcessData->iHeight * 3;
|
||
void* pBGRBufferobj = nullptr;
|
||
pBGRBufferobj = new uint8_t[iBufferSize];
|
||
memcpy(pBGRBufferobj, cvframe.data, iBufferSize);
|
||
pProcessData->pData.reset(pBGRBufferobj, [](void* data){if(data) {delete[] data; data = nullptr;}});
|
||
pProcessData->iSize = iBufferSize;
|
||
}
|
||
|
||
std::vector<std::string> vecPushPorts = mapSourcePushPort_[iSourceId];
|
||
for (int iPort = 0; iPort < vecPushPorts.size(); iPort++)
|
||
{
|
||
if (iPort == vecPushPorts.size() - 1)
|
||
{
|
||
//iRet = outputQueMap_[vecPushPorts[iPort]]->push(std::static_pointer_cast<void>(pProcessData));
|
||
PushData(vecPushPorts[iPort], pProcessData);
|
||
continue;
|
||
}
|
||
std::shared_ptr<ProcessData> pNewProcessData = std::make_shared<ProcessData>();
|
||
*pNewProcessData = *pProcessData;
|
||
//iRet = outputQueMap_[vecPushPorts[iPort]]->push(std::static_pointer_cast<void>(pNewProcessData));
|
||
PushData(vecPushPorts[iPort], pNewProcessData);
|
||
}
|
||
|
||
}
|
||
|
||
iOrigDataNO_++;
|
||
iDataNO_++;
|
||
//每组处理数据需间隔一定时间
|
||
usleep(iIntervalTime_);
|
||
|
||
if (bIsEndByStop)
|
||
{
|
||
iDataNO_ = 1;
|
||
iOrigDataNO_ = iReRunOrigFrameId / dataSourceConfig_.iSkipInterval;
|
||
std::string strDateTime = MyUtils::getins()->GetDateTimeByMilliSeconds(i64ReRunTimeStamp);
|
||
strTrainDate_ = strDateTime.substr(0, 10);
|
||
strTrainName_ = strDateTime.substr(11, strDateTime.length());
|
||
std::string strTrainPath = strResultPath_ + strTrainDate_ + "/" + strTrainName_ + "/";
|
||
MyUtils::getins()->CreateDirPath(strTrainPath);
|
||
LogDebug << "rerun traindate:" << strTrainDate_ << " trainname:" << strTrainName_;
|
||
}
|
||
}
|
||
|
||
/**
 * Engine main loop; runs until isStop_ is set.
 * Each iteration takes the latest MoveData (if any) from input port 0, and while a train is
 * present calls MakeProcessData() to emit frames. When the end of the train is reported the
 * remaining frames up to the end frame are emitted and the per-train state is reset.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK
 */
APP_ERROR DataDealEngine::Process()
{
    while (!isStop_)
    {
        // Pick up the state detected by the main camera, when a new one is queued.
        std::shared_ptr<void> pVoidData0 = nullptr;
        inputQueMap_[strPort0_]->pop(pVoidData0);
        if (pVoidData0 != nullptr)
        {
            moveData_ = *std::static_pointer_cast<MoveData>(pVoidData0);
            LogDebug << "traindate:" << moveData_.strTrainDate << " trainname:" << moveData_.strTrainName
                     << " MoveData frameid:" << moveData_.iFrameId << " IsEnd:" << moveData_.bIsEnd;
        }

        // Nothing to do until a train is present.
        if (!moveData_.bHasTrain)
        {
            usleep(1000); // 1ms
            continue;
        }

        // For the first frame, wait 1 s so the images can be written to local storage.
        if (iOrigDataNO_ == 1)
        {
            usleep(1000 * 1000); // 1s
        }

        // First iteration for this train: remember its data directory, date and name.
        if (strDataDir_.empty())
        {
            strDataDir_ = strResultPath_ + moveData_.strTrainDate + "/" + moveData_.strTrainName + "/";
            strTrainDate_ = moveData_.strTrainDate;
            strTrainName_ = moveData_.strTrainName;
        }

        // Until the direction is known, try to read it from the train's direction.txt.
        if (iDirection_ == DIRECTION_UNKNOWN)
        {
            std::string strFilePath = strResultPath_ + strTrainDate_ + "/" + strTrainName_ + "/" + "direction.txt";
            Json::Value jvDirectionInfo;
            if (MyUtils::getins()->ReadJsonInfo(jvDirectionInfo, strFilePath, 0))
            {
                iDirection_ = jvDirectionInfo["direction"].asInt();
            }
        }

        if (moveData_.bIsEnd)
        {
            // The end frame is never filtered by direction: it must reach the downstream
            // engines so that they can finish correctly.
            if (bNotPushFlag_)
            {
                // Frames were being filtered out: jump straight to the end frame.
                iOrigDataNO_ = moveData_.iFrameId / dataSourceConfig_.iSkipInterval;
                iDataNO_ = moveData_.iFrameId / dataSourceConfig_.iSkipInterval;
            }

            if (iOrigDataNO_ * dataSourceConfig_.iSkipInterval > moveData_.iFrameId)
            {
                // Already past the end frame: step back to it and emit it once.
                LogDebug << "dealFrameid:" << iOrigDataNO_ * dataSourceConfig_.iSkipInterval << " moveFrameid:" << moveData_.iFrameId;
                iOrigDataNO_ = moveData_.iFrameId / dataSourceConfig_.iSkipInterval;
                MakeProcessData();
            }
            else
            {
                // Keep emitting frames until the end frame has been handled.
                while (!isStop_ && iOrigDataNO_ * dataSourceConfig_.iSkipInterval <= moveData_.iFrameId)
                {
                    MakeProcessData();
                }
            }

            InitParam();
            continue;
        }

        // Train still running: when a push direction is configured, frames of a train moving
        // in a different (known) direction are skipped.
        const bool bPushAllowed = (iDirection_ == DIRECTION_UNKNOWN || iDirection_ == iPushDirection_ || iPushDirection_ == DIRECTION_UNKNOWN);
        if (!bPushAllowed)
        {
            LogDebug << "traindate:" << strTrainDate_ << " trainname:" << strTrainName_ << " iOrigDataNO_:" << iOrigDataNO_ << " continue.";
            bNotPushFlag_ = true;
            usleep(iIntervalTime_); // each group of processing data is separated by a fixed interval
            continue;
        }

        MakeProcessData();
    }
    return APP_ERR_OK;
}
|
||
|