Train_Identify_arm/nvidia_ascend_engine/common_engine/DataUploadEngine/DataToFtpSrvEngine.cpp

341 lines
10 KiB
C++
Raw Permalink Normal View History

2024-06-19 06:35:05 +00:00
#include "DataToFtpSrvEngine.h"
#undef DISABLE_SSH_AGENT
namespace
{
// Percent-encoding table used for the FTP user name and password:
// each key is a one-character string, each value its "%XX" URL escape.
std::map<std::string, std::string> mapAscii{
    {"!", "%21"},
    {"#", "%23"},
    {"$", "%24"},
    {"%", "%25"},
    {"&", "%26"},
    {"*", "%2A"},
    {"@", "%40"},
};
} // namespace
// Default construction: no work is done here; configuration is loaded in Init().
DataToFtpSrvEngine::DataToFtpSrvEngine() = default;
// Nothing is released here; the curl handle is cleaned up in DeInit().
DataToFtpSrvEngine::~DataToFtpSrvEngine() = default;
/**
 * Read the FTP/SFTP upload settings from the YAML configuration and build the
 * base upload URL (strURL_).
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK (there is no failure path in this function)
 */
APP_ERROR DataToFtpSrvEngine::Init()
{
    // Name of input port 0: "<engineName>_<engineId>_0".
    strPort0_ = engineName_ + "_" + std::to_string(engineId_) + "_0";

    auto &&pYaml = MyYaml::GetIns();

    // Server connection settings.
    strType_ = pYaml->GetStringValue("gc_ftp_type");
    strIp_ = pYaml->GetStringValue("gc_ftp_ip");
    iPort_ = pYaml->GetIntValue("gc_ftp_port");
    strUser_ = pYaml->GetStringValue("gc_ftp_username");
    strPwd_ = pYaml->GetStringValue("gc_ftp_password");
    strImgPath_ = pYaml->GetPathValue("gc_ftp_image_path");
    iQuitTime_ = pYaml->GetIntValue("gc_ftp_quit_time");

    // Files that record failed uploads; both live in the result directory.
    const auto strResultDir = pYaml->GetPathValue("gc_result_path");
    strFailSavePath_ = strResultDir + "ftpfailcontent.csv";
    strFailSaveBakPath_ = strResultDir + "ftpfailcontent_bak.csv";

    // Escape special characters in the credentials before they are embedded in the URL.
    Symbol2Ascii(strUser_);
    Symbol2Ascii(strPwd_);

    // "sftp" gets the sftp scheme plus a "/~" prefix in front of the image path;
    // every other type value is treated as plain ftp.
    const bool bUseSftp = (strType_ == "sftp");
    std::ostringstream urlBuilder;
    urlBuilder << (bUseSftp ? "sftp://" : "ftp://")
               << strUser_ << ":" << strPwd_ << "@" << strIp_ << ":" << iPort_
               << (bUseSftp ? "/~" : "")
               << strImgPath_;
    strURL_ = urlBuilder.str();

    LogInfo << "DataToFtpSrvEngine Init ok";
    return APP_ERR_OK;
}
/**
 * Release the curl easy handle owned by this engine.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK
 */
APP_ERROR DataToFtpSrvEngine::DeInit()
{
    LogDebug << "释放资源";
    if (nullptr != pCurl_)
    {
        curl_easy_cleanup(pCurl_);
        // Clear the pointer so that a repeated DeInit() or a later upload does not
        // touch a freed handle (Process() does the same after its idle cleanup).
        pCurl_ = nullptr;
    }
    // curl_global_cleanup() is deliberately not called here; the original note says
    // it is handled in main (presumably for thread-safety reasons — confirm).
    LogInfo << "DataToFtpSrvEngine DeInit ok";
    return APP_ERR_OK;
}
/**
 * Percent-encode, in place, every character of str that has an entry in mapAscii
 * (used for the user name and password that are embedded in the upload URL).
 * inParam : std::string &str   text to encode
 * outParam: std::string &str   encoded text
 * return  : N/A
 */
void DataToFtpSrvEngine::Symbol2Ascii(std::string &str)
{
    // Encode in a single left-to-right pass into a fresh buffer. Each input
    // character is looked at exactly once, so the '%' that an escape such as
    // "%21" introduces is never re-encoded, and '%' itself can be escaped
    // without looping forever.
    std::string strEncoded;
    strEncoded.reserve(str.size());
    for (const char ch : str)
    {
        // find() instead of operator[] so that unknown characters do not add
        // empty entries to the table.
        const auto itEscape = mapAscii.find(std::string(1, ch));
        if (itEscape != mapAscii.end())
        {
            strEncoded += itEscape->second;
        }
        else
        {
            strEncoded += ch;
        }
    }
    str.swap(strEncoded);
}
/**
* libcurl回调函数
* inParam : FILE* pFile
* outParam: void *pBuffer
: size_t size
: size_t nmemb
* return :
*/
size_t DataToFtpSrvEngine::ReadCallBack(void* pBbuffer, size_t size, size_t nmemb, FILE* pFile)
{
return fread(pBbuffer, size, nmemb, pFile);
}
/**
 * Upload one local file to the FTP/SFTP server.
 * inParam : Json::Value &jvFtpData  reads "localFile" (local path), "ftpFilePath"
 *                                   and "ftpFileName" (appended to strURL_)
 * outParam: N/A
 * return  : true  - uploaded, or the local file does not exist (nothing to retry)
 *           false - curl could not be initialised, the file could not be
 *                   inspected/opened, or the transfer failed
 */
bool DataToFtpSrvEngine::DataToFtpSrv(Json::Value &jvFtpData)
{
    //1. Create the curl easy handle on first use (it is released again after idle time).
    if (nullptr == pCurl_)
    {
        LogDebug<<"pCurl_ is null, invoke curl_easy_init";
        pCurl_ = curl_easy_init();
        if (nullptr == pCurl_)
        {
            LogError << "curl_easy_init failed !";
            return false;
        }
    }

    //2. Inspect and open the file to upload.
    const std::string strLocalPath = jvFtpData["localFile"].asString();
    struct stat fileStat;
    if (stat(strLocalPath.c_str(), &fileStat))
    {
        if (errno == ENOENT) // a missing file is reported as success
        {
            LogWarn << "file no exist " << strLocalPath;
            return true;
        }
        LogError << "Couldn't open " << strLocalPath;
        return false;
    }
    const curl_off_t fileSize = static_cast<curl_off_t>(fileStat.st_size);

    FILE *pFile = fopen(strLocalPath.c_str(), "rb");
    if (nullptr == pFile)
    {
        // The file can vanish or be unreadable even though stat() succeeded.
        // Never hand a NULL stream to the read callback or to fclose().
        if (errno == ENOENT)
        {
            LogWarn << "file no exist " << strLocalPath;
            return true;
        }
        LogError << "Couldn't open " << strLocalPath;
        return false;
    }

    //3. Configure the transfer. Numeric options are passed as long / curl_off_t
    //   because curl_easy_setopt is variadic and reads exactly those types.
    const std::string strFtpUrl =
        strURL_ + jvFtpData["ftpFilePath"].asString() + jvFtpData["ftpFileName"].asString(); // upload target
    curl_easy_setopt(pCurl_, CURLOPT_CONNECTTIMEOUT, 1L);            // give up connecting after 1 s
    curl_easy_setopt(pCurl_, CURLOPT_URL, strFtpUrl.c_str());        // upload target
    curl_easy_setopt(pCurl_, CURLOPT_UPLOAD, 1L);                    // enable upload
    curl_easy_setopt(pCurl_, CURLOPT_READFUNCTION, ReadCallBack);    // data source callback
    curl_easy_setopt(pCurl_, CURLOPT_READDATA, pFile);               // stream handed to the callback
    curl_easy_setopt(pCurl_, CURLOPT_INFILESIZE_LARGE, fileSize);    // size of the upload (optional)
    curl_easy_setopt(pCurl_, CURLOPT_FTP_CREATE_MISSING_DIRS, 1L);   // create missing remote directories
    // Abort when the transfer stays below 30 bytes/s for 10 s (both options are needed).
    curl_easy_setopt(pCurl_, CURLOPT_LOW_SPEED_TIME, 10L);
    curl_easy_setopt(pCurl_, CURLOPT_LOW_SPEED_LIMIT, 30L);
    if (strType_ == "sftp")
    {
#ifndef DISABLE_SSH_AGENT
        curl_easy_setopt(pCurl_, CURLOPT_SSH_AUTH_TYPES, static_cast<long>(CURLSSH_AUTH_PASSWORD));
#endif
    }

    //4. Run the transfer.
    bool bRet = true;
    const CURLcode res = curl_easy_perform(pCurl_);
    if (res != CURLE_OK)
    {
        LogError << "LocalFile:" << jvFtpData["localFile"].asString() << " curl_easy_perform() failed: res=" << res << " " << curl_easy_strerror(res);
        bRet = false;
    }

    // Reset the handle's options so the next upload starts clean, then close the file.
    curl_easy_reset(pCurl_);
    fclose(pFile);
    return bRet;
}
/**
 * Retry the uploads recorded in the failure file (strFailSavePath_), one JSON
 * record per line. Records that fail again, or that are skipped because new input
 * arrived, are written to the backup file (strFailSaveBakPath_), which then
 * replaces the failure file. Lines that cannot be parsed as JSON are dropped.
 * inParam : N/A
 * outParam: N/A
 * return  : N/A
 */
void DataToFtpSrvEngine::DealFtpFailInfo()
{
    // Fresh input has priority over retries.
    if (inputQueMap_[strPort0_]->getSize() > 0)
    {
        LogDebug << "have new data to process";
        return;
    }
    // Nothing to do when there is no failure file.
    if (access(strFailSavePath_.c_str(), F_OK) == -1)
    {
        LogDebug << "no exist file:" << strFailSavePath_;
        return;
    }
    std::ifstream inFile(strFailSavePath_.c_str(), std::ios::in);
    if (!inFile.is_open())
    {
        LogError << strFailSavePath_ << " open fail";
        return;
    }

    // A backup file left over from an interrupted run only holds records that are
    // still present in the failure file; drop it so that the appends below do not
    // duplicate them. Failure is ignored on purpose (the file usually does not exist).
    (void)remove(strFailSaveBakPath_.c_str());

    // One reader is enough for all lines.
    Json::CharReaderBuilder readerBuilder;
    std::shared_ptr<Json::CharReader> reader(readerBuilder.newCharReader());

    bool bAllSucc = true;
    int iDealCnt = 0; // number of records for which a retry was attempted
    std::string strLine;
    while (getline(inFile, strLine))
    {
        Json::Value jvFtpData;
        JSONCPP_STRING errs;
        if (!reader->parse(strLine.data(), strLine.data() + strLine.size(), &jvFtpData, &errs))
        {
            LogError << "json parse fail content:" << strLine;
            continue;
        }

        // New input arrived while retrying.
        if (inputQueMap_[strPort0_]->getSize() > 0)
        {
            LogDebug << "Abnormal data processing, have new data to process";
            if (0 == iDealCnt)
            {
                // No retry has been attempted yet: leave the failure file untouched.
                LogDebug << "Abnormal data processing not start";
                inFile.close();
                return;
            }
            // Retries already started: keep the remaining records for the next round.
            SaveFtpFailInfo(jvFtpData, strFailSaveBakPath_);
            bAllSucc = false;
            continue;
        }

        iDealCnt++;
        if (!DataToFtpSrv(jvFtpData))
        {
            LogError << "re DataToFtoSrv err:" << strLine;
            SaveFtpFailInfo(jvFtpData, strFailSaveBakPath_);
            bAllSucc = false;
        }
    }
    inFile.close();

    if (bAllSucc)
    {
        // Every record was handled: the failure file is no longer needed.
        if (0 != remove(strFailSavePath_.c_str()))
        {
            LogError << "remove " << strFailSavePath_ << " fail, errno=" << errno;
        }
    }
    else
    {
        // Some records remain: the backup becomes the new failure file.
        if (0 != rename(strFailSaveBakPath_.c_str(), strFailSavePath_.c_str()))
        {
            LogError << "rename " << strFailSaveBakPath_ << " to " << strFailSavePath_ << " fail, errno=" << errno;
        }
    }
}
/**
 * Append one upload record to a file as a single line of compact JSON.
 * inParam : Json::Value &jvFtpData    record to store
 *         : std::string &strFilePath  file to append to
 * outParam: N/A
 * return  : true when the file could be opened; false otherwise
 */
bool DataToFtpSrvEngine::SaveFtpFailInfo(Json::Value &jvFtpData, std::string &strFilePath)
{
    std::ofstream failLog(strFilePath, std::ios::app);
    if (!failLog.is_open())
    {
        LogError << strFilePath << " open fail";
        return false;
    }

    // An empty indentation string makes the writer emit the record on one line.
    Json::StreamWriterBuilder writerCfg;
    writerCfg["indentation"] = "";
    failLog << Json::writeString(writerCfg, jvFtpData) << std::endl;
    failLog.close();
    return true;
}
/**
 * Worker loop: pops upload requests from input port 0 and uploads them, recording
 * failures in the failure file. Does nothing when "gc_ftp_open" is 0.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK
 */
APP_ERROR DataToFtpSrvEngine::Process()
{
    if (0 == MyYaml::GetIns()->GetIntValue("gc_ftp_open"))
    {
        LogDebug << " gc_ftp_open value is 0";
        return APP_ERR_OK;
    }

    while (!isStop_)
    {
        // Pop the next request from port 0.
        std::shared_ptr<void> pVoidData0 = nullptr;
        inputQueMap_[strPort0_]->pop(pVoidData0);

        if (nullptr == pVoidData0)
        {
            usleep(1000); //1ms
            // Each idle pass is about 1 ms, so the threshold is iQuitTime_ in seconds.
            // Once exceeded: retry recorded failures, then drop the connection.
            iNoDataCnt_++;
            if (iNoDataCnt_ > (iQuitTime_ * 1000))
            {
                DealFtpFailInfo();
                curl_easy_cleanup(pCurl_);
                pCurl_ = nullptr;
                iNoDataCnt_ = 0;
            }
            continue;
        }
        iNoDataCnt_ = 0;

        std::shared_ptr<FtpData> pFtpData = std::static_pointer_cast<FtpData>(pVoidData0);
        LogDebug<<"strLocalFile:"<<pFtpData->strLocalFile;

        // Pack the request as JSON so that it can be written to the failure file as-is.
        Json::Value jvFtpData;
        jvFtpData["localFile"] = pFtpData->strLocalFile;
        jvFtpData["ftpFilePath"] = pFtpData->strFtpFilePath;
        jvFtpData["ftpFileName"] = pFtpData->strFtpFileName;
        jvFtpData["isEnd"] = pFtpData->bIsEnd;

        if (!DataToFtpSrv(jvFtpData))
        {
            SaveFtpFailInfo(jvFtpData, strFailSavePath_);
        }

        // The end-of-train marker triggers another pass over the recorded failures.
        if (pFtpData->bIsEnd)
        {
            DealFtpFailInfo();
        }
    }
    return APP_ERR_OK;
}