Train_Identify_arm/nvidia_ascend_engine/common_engine/DataUploadEngine/DataToFtpSrvEngine.cpp

341 lines
10 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "DataToFtpSrvEngine.h"
#undef DISABLE_SSH_AGENT
namespace
{
// Percent-encoding table for the special characters that may appear in the
// FTP/SFTP user name or password.
// Key  : the literal character (as a one-character string).
// Value: the "%XX" escape sequence that replaces it.
std::map<std::string, std::string> mapAscii{
    {"!", "%21"}, {"#", "%23"}, {"$", "%24"}, {"%", "%25"},
    {"&", "%26"}, {"*", "%2A"}, {"@", "%40"},
};
}
// Constructor: performs no work of its own; configuration is loaded in Init().
DataToFtpSrvEngine::DataToFtpSrvEngine() = default;
// Destructor: performs no work of its own; the curl handle is released in DeInit().
DataToFtpSrvEngine::~DataToFtpSrvEngine() = default;
/**
 * Load the FTP/SFTP upload settings from the YAML configuration and build
 * the base URL that every upload target is appended to.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK
 */
APP_ERROR DataToFtpSrvEngine::Init()
{
    // Key used to look up this engine's input queue (port 0) in inputQueMap_.
    strPort0_ = engineName_ + "_" + std::to_string(engineId_) + "_0";

    // Server and account settings.
    strType_ = MyYaml::GetIns()->GetStringValue("gc_ftp_type");
    strIp_ = MyYaml::GetIns()->GetStringValue("gc_ftp_ip");
    iPort_ = MyYaml::GetIns()->GetIntValue("gc_ftp_port");
    strUser_ = MyYaml::GetIns()->GetStringValue("gc_ftp_username");
    strPwd_ = MyYaml::GetIns()->GetStringValue("gc_ftp_password");
    strImgPath_ = MyYaml::GetIns()->GetPathValue("gc_ftp_image_path");
    iQuitTime_ = MyYaml::GetIns()->GetIntValue("gc_ftp_quit_time");

    // Files that record uploads which failed and have to be retried later.
    strFailSavePath_ = MyYaml::GetIns()->GetPathValue("gc_result_path") + "ftpfailcontent.csv";
    strFailSaveBakPath_ = MyYaml::GetIns()->GetPathValue("gc_result_path") + "ftpfailcontent_bak.csv";

    // Escape special characters in the user name and password so they can be
    // embedded in the URL.
    Symbol2Ascii(strUser_);
    Symbol2Ascii(strPwd_);

    // Base URL: <scheme>://<user>:<password>@<ip>:<port>[/~]<image path>
    // ("/~" is inserted for sftp only).
    const bool bUseSftp = (strType_ == "sftp");
    std::ostringstream urlStream;
    urlStream << (bUseSftp ? "sftp://" : "ftp://")
              << strUser_ << ":" << strPwd_ << "@" << strIp_ << ":" << iPort_;
    if (bUseSftp)
    {
        urlStream << "/~";
    }
    urlStream << strImgPath_;
    strURL_ = urlStream.str();

    LogInfo << "DataToFtpSrvEngine Init ok";
    return APP_ERR_OK;
}
/**
 * Release the curl easy handle owned by this engine.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK
 */
APP_ERROR DataToFtpSrvEngine::DeInit()
{
    LogDebug << "释放资源";
    if (nullptr != pCurl_)
    {
        curl_easy_cleanup(pCurl_);
        // Reset the handle (as Process() does after its own cleanup) so that a
        // repeated DeInit() or a later DataToFtpSrv() never touches a freed handle.
        pCurl_ = nullptr;
    }
    // curl_global_cleanup() is deliberately not called here: that step was
    // moved to main so it is not invoked from multiple threads.
    LogInfo << "DataToFtpSrvEngine DeInit ok";
    return APP_ERR_OK;
}
/**
 * Percent-encode the special characters listed in mapAscii so that a user
 * name or password containing them can be embedded in a URL.
 * inParam : std::string &str    text before conversion
 * outParam: std::string &str    text after conversion
 * return  : N/A
 */
void DataToFtpSrvEngine::Symbol2Ascii(std::string &str)
{
    // Encode in a single left-to-right pass over the ORIGINAL characters.
    // Repeated find/replace from the start of the string cannot handle '%':
    // its replacement "%25" itself contains '%', so it would be matched again
    // forever, and the '%' produced by earlier replacements (e.g. "%21")
    // would be encoded a second time.
    std::string strEncoded;
    strEncoded.reserve(str.size());
    for (const char ch : str)
    {
        // find() instead of operator[] so an unknown character never inserts
        // an empty entry into the table.
        const auto itEscape = mapAscii.find(std::string(1, ch));
        if (itEscape != mapAscii.end())
        {
            strEncoded += itEscape->second;
        }
        else
        {
            strEncoded += ch;
        }
    }
    str.swap(strEncoded);
}
/**
 * libcurl read callback: fills libcurl's buffer with data read from the file
 * being uploaded.
 * inParam : FILE* pFile        handle of the file to read from
 *         : size_t size        size of a single data item
 *         : size_t nmemb       number of data items requested
 * outParam: void* pBbuffer     buffer that receives the data
 * return  : the value returned by fread (number of items read)
 */
size_t DataToFtpSrvEngine::ReadCallBack(void* pBbuffer, size_t size, size_t nmemb, FILE* pFile)
{
    const size_t itemsRead = fread(pBbuffer, size, nmemb, pFile);
    return itemsRead;
}
/**
 * Upload one local file to the FTP/SFTP server.
 * inParam : Json::Value &jvFtpData    upload description; the keys read here are
 *                                     "localFile", "ftpFilePath" and "ftpFileName"
 * outParam: N/A
 * return  : true  - upload succeeded, or the local file no longer exists
 *                   (nothing left to upload, so it is not treated as a failure)
 *           false - curl could not be initialised, the file could not be
 *                   inspected/opened, or the transfer failed
 */
bool DataToFtpSrvEngine::DataToFtpSrv(Json::Value &jvFtpData)
{
    //1. Obtain the curl handle (created on first use, or after it was released).
    if (nullptr == pCurl_)
    {
        LogDebug<<"pCurl_ is null, invoke curl_easy_init";
        pCurl_ = curl_easy_init();
        if (nullptr == pCurl_)
        {
            LogError << "curl_easy_init failed !";
            return false;
        }
    }

    //2. Collect information about the file to upload.
    const std::string strLocalPath = jvFtpData["localFile"].asString();
    struct stat fileStat;
    if (stat(strLocalPath.c_str(), &fileStat))
    {
        if (errno == ENOENT) // a missing file is reported as "uploaded"
        {
            LogWarn << "file no exist " << strLocalPath;
            return true;
        }
        LogError << "Couldn't open " << strLocalPath;
        return false;
    }
    const curl_off_t fileSize = (curl_off_t)fileStat.st_size;

    FILE *pFile = fopen(strLocalPath.c_str(), "rb");
    if (nullptr == pFile)
    {
        // stat() succeeding does not guarantee the file can be opened
        // (permissions, removal in between, descriptor exhaustion). A null
        // FILE* must never reach fread()/fclose().
        LogError << "Couldn't open " << strLocalPath;
        return false;
    }

    //3. Configure the transfer.
    // Destination = base URL + remote directory + remote file name.
    const std::string strFtpUrl = strURL_ + jvFtpData["ftpFilePath"].asString() + jvFtpData["ftpFileName"].asString();
    // curl_easy_setopt is variadic: numeric options must be passed as long.
    curl_easy_setopt(pCurl_, CURLOPT_CONNECTTIMEOUT, 1L);                     // give up connecting after 1s
    curl_easy_setopt(pCurl_, CURLOPT_URL, strFtpUrl.c_str());                 // upload target
    curl_easy_setopt(pCurl_, CURLOPT_UPLOAD, 1L);                             // enable upload
    curl_easy_setopt(pCurl_, CURLOPT_READFUNCTION, ReadCallBack);             // callback that supplies the data
    curl_easy_setopt(pCurl_, CURLOPT_READDATA, pFile);                        // file handed to the callback
    curl_easy_setopt(pCurl_, CURLOPT_INFILESIZE_LARGE, (curl_off_t)fileSize); // size of the upload (optional)
    curl_easy_setopt(pCurl_, CURLOPT_FTP_CREATE_MISSING_DIRS, 1L);            // create missing remote directories
    // Abort the transfer when it stays below 30 bytes/s for 10 seconds
    // (both options must be set for the limit to take effect).
    curl_easy_setopt(pCurl_, CURLOPT_LOW_SPEED_TIME, 10L);
    curl_easy_setopt(pCurl_, CURLOPT_LOW_SPEED_LIMIT, 30L);
    if (strType_ == "sftp")
    {
#ifndef DISABLE_SSH_AGENT
        curl_easy_setopt(pCurl_, CURLOPT_SSH_AUTH_TYPES, CURLSSH_AUTH_PASSWORD);
#endif
    }

    //4. Run the transfer.
    bool bRet = true;
    const CURLcode res = curl_easy_perform(pCurl_);
    if (res != CURLE_OK)
    {
        LogError << "LocalFile:" << jvFtpData["localFile"].asString() << " curl_easy_perform() failed: res=" << res << " " << curl_easy_strerror(res);
        bRet = false;
    }
    curl_easy_reset(pCurl_); // clear all options so the handle can be reused for the next file
    fclose(pFile);
    return bRet;
}
/**
* 处理上传失败的信息
* inParam : N/A
* outParam: N/A
* return : N/A
*/
void DataToFtpSrvEngine::DealFtpFailInfo()
{
//队列有待处理数据,则先不处理异常数据。
if (inputQueMap_[strPort0_]->getSize() > 0)
{
LogDebug << "have new data to process";
return;
}
//文件不存在不处理
if (access(strFailSavePath_.c_str(), F_OK) == -1)
{
LogDebug << "no exist file:" << strFailSavePath_;
return;
}
bool bAllSucc = true;
std::ifstream inFile(strFailSavePath_.c_str(), std::ios::in);
if (!inFile.is_open())
{
LogError << strFailSavePath_ << " open fail";
return;
}
int iDealCnt = 0;
std::string strLine;
while (getline(inFile, strLine))
{
Json::CharReaderBuilder readerBuilder;
std::shared_ptr<Json::CharReader> reader(readerBuilder.newCharReader());
Json::Value jvFtpData;
JSONCPP_STRING errs;
if (!reader->parse(strLine.data(), strLine.data() + strLine.size(), &jvFtpData, &errs))
{
LogError << "json parse fail content:" << strLine;
continue;
}
/*
新数据到达后,异常数据还未开始处理,则直接关闭文件返回。先处理正常数据。
新数据到达后,异常数据处理中,则把异常未处理的数据全部当处理失败写入新文件中。先处理正常数据。
*/
if (inputQueMap_[strPort0_]->getSize() > 0)
{
LogDebug << "Abnormal data processing, have new data to process";
if (0 == iDealCnt)
{
LogDebug << "Abnormal data processing not start";
inFile.close();
return;
}
SaveFtpFailInfo(jvFtpData, strFailSaveBakPath_);
bAllSucc = false;
continue;
}
iDealCnt++;
if (!DataToFtpSrv(jvFtpData))
{
LogError << "re DataToFtoSrv err:" << strLine;
SaveFtpFailInfo(jvFtpData, strFailSaveBakPath_);
bAllSucc = false;
continue;
}
}
inFile.close();
if(bAllSucc)
{
//都处理成功,文件删除
remove(strFailSavePath_.c_str());
}
else
{
//部分处理成功,重命名后再次被处理
rename(strFailSaveBakPath_.c_str(), strFailSavePath_.c_str());
}
}
/**
 * Append one failed upload description to a file, as a single line of
 * unindented JSON.
 * inParam : Json::Value &jvFtpData     description of the upload that failed
 *         : std::string &strFilePath   file to append to
 * outParam: N/A
 * return  : true  - the record was written
 *           false - the file could not be opened or the write failed
 */
bool DataToFtpSrvEngine::SaveFtpFailInfo(Json::Value &jvFtpData, std::string &strFilePath)
{
    std::ofstream outFile(strFilePath, std::ios::app);
    if (!outFile.is_open())
    {
        LogError << strFilePath << " open fail";
        return false;
    }

    // An empty indentation string keeps the record on one line, which is what
    // the line-by-line reader of this file relies on.
    Json::StreamWriterBuilder jswBuilder;
    jswBuilder["indentation"] = "";
    outFile << Json::writeString(jswBuilder, jvFtpData) << std::endl;

    // std::endl flushed the record, so a failed write (e.g. disk full) is
    // visible in the stream state now; close() may report a late error too.
    const bool bWritten = !outFile.fail();
    outFile.close();
    if (!bWritten || outFile.fail())
    {
        LogError << strFilePath << " write fail";
        return false;
    }
    return true;
}
/**
 * Engine main loop: take upload requests from input port 0 and send each
 * file to the FTP/SFTP server until isStop_ is set.
 *
 * Uploads that fail are recorded in the failure file and retried when the
 * engine has been idle long enough or when a request marked bIsEnd arrives.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK
 */
APP_ERROR DataToFtpSrvEngine::Process()
{
    // Uploading is switched off in the configuration.
    if (0 == MyYaml::GetIns()->GetIntValue("gc_ftp_open"))
    {
        LogDebug << " gc_ftp_open value is 0";
        return APP_ERR_OK;
    }

    while (!isStop_)
    {
        // Fetch the next request from input port 0.
        std::shared_ptr<void> pVoidData0 = nullptr;
        inputQueMap_[strPort0_]->pop(pVoidData0);
        if (nullptr == pVoidData0)
        {
            usleep(1000); //1ms
            // iNoDataCnt_ counts the 1ms idle sleeps. Once more than
            // iQuitTime_ * 1000 of them have accumulated, retry the recorded
            // failures and then release the curl handle.
            iNoDataCnt_++;
            if (iNoDataCnt_ > (iQuitTime_ * 1000))
            {
                DealFtpFailInfo();
                curl_easy_cleanup(pCurl_);
                pCurl_ = nullptr; // DataToFtpSrv() creates a new handle on demand
                iNoDataCnt_ = 0;
            }
            continue;
        }
        iNoDataCnt_ = 0;

        std::shared_ptr<FtpData> pFtpData = std::static_pointer_cast<FtpData>(pVoidData0);
        LogDebug<<"strLocalFile:"<<pFtpData->strLocalFile;

        // Describe the upload as JSON so it can be stored verbatim in the
        // failure file if the upload does not succeed.
        Json::Value jvFtpData;
        jvFtpData["localFile"] = pFtpData->strLocalFile;
        jvFtpData["ftpFilePath"] = pFtpData->strFtpFilePath;
        jvFtpData["ftpFileName"] = pFtpData->strFtpFileName;
        jvFtpData["isEnd"] = pFtpData->bIsEnd;

        if (!DataToFtpSrv(jvFtpData) && !SaveFtpFailInfo(jvFtpData, strFailSavePath_))
        {
            // The upload failed AND it could not be recorded for a retry:
            // make the loss visible instead of dropping it silently.
            LogError << "upload failed and could not be saved for retry, localFile:" << pFtpData->strLocalFile;
        }

        // At the end of a train, retry the recorded failures once more.
        if (pFtpData->bIsEnd)
        {
            DealFtpFailInfo();
        }
    }
    return APP_ERR_OK;
}