generated from zhangwei/Train_Identify
341 lines
10 KiB
C++
341 lines
10 KiB
C++
#include "DataToFtpSrvEngine.h"
|
||
|
||
#undef DISABLE_SSH_AGENT
|
||
namespace
|
||
{
|
||
std::map<std::string, std::string> mapAscii = {
|
||
{"!", "%21"},
|
||
{"#", "%23"},
|
||
{"$", "%24"},
|
||
{"%", "%25"},
|
||
{"&", "%26"},
|
||
{"@", "%40"},
|
||
{"*", "%2A"}
|
||
};
|
||
}
|
||
|
||
DataToFtpSrvEngine::DataToFtpSrvEngine() {}
|
||
|
||
DataToFtpSrvEngine::~DataToFtpSrvEngine() {}
|
||
|
||
APP_ERROR DataToFtpSrvEngine::Init()
|
||
{
|
||
strPort0_ = engineName_ + "_" + std::to_string(engineId_) + "_0";
|
||
|
||
strType_ = MyYaml::GetIns()->GetStringValue("gc_ftp_type");
|
||
strIp_ = MyYaml::GetIns()->GetStringValue("gc_ftp_ip");
|
||
iPort_ = MyYaml::GetIns()->GetIntValue("gc_ftp_port");
|
||
strUser_ = MyYaml::GetIns()->GetStringValue("gc_ftp_username");
|
||
strPwd_ = MyYaml::GetIns()->GetStringValue("gc_ftp_password");
|
||
strImgPath_ = MyYaml::GetIns()->GetPathValue("gc_ftp_image_path");
|
||
iQuitTime_ = MyYaml::GetIns()->GetIntValue("gc_ftp_quit_time");
|
||
|
||
strFailSavePath_ = MyYaml::GetIns()->GetPathValue("gc_result_path") + "ftpfailcontent.csv";
|
||
strFailSaveBakPath_ = MyYaml::GetIns()->GetPathValue("gc_result_path") + "ftpfailcontent_bak.csv";
|
||
|
||
//户名、密码中的特殊字符 并转化为Ascii
|
||
Symbol2Ascii(strUser_);
|
||
Symbol2Ascii(strPwd_);
|
||
|
||
std::ostringstream ostr;
|
||
if(strType_ == "sftp")
|
||
{
|
||
ostr << "sftp://" << strUser_ << ":" << strPwd_ << "@" << strIp_ << ":" << iPort_ << "/~" << strImgPath_;
|
||
}
|
||
else
|
||
{
|
||
ostr << "ftp://" << strUser_ << ":" << strPwd_ << "@" << strIp_ << ":" << iPort_ << strImgPath_;
|
||
}
|
||
strURL_ = ostr.str();
|
||
|
||
LogInfo << "DataToFtpSrvEngine Init ok";
|
||
return APP_ERR_OK;
|
||
}
|
||
|
||
APP_ERROR DataToFtpSrvEngine::DeInit()
|
||
{
|
||
LogDebug << "释放资源";
|
||
curl_easy_cleanup(pCurl_);
|
||
/*
|
||
这个处理移动到main中, 防止多线程调用
|
||
LogDebug << "内存清理";
|
||
curl_global_cleanup();
|
||
*/
|
||
LogInfo << "DataToFtpSrvEngine DeInit ok";
|
||
return APP_ERR_OK;
|
||
}
|
||
|
||
/**
|
||
* 将字符串中的特殊字符转换为Ascii值(用于防止用户名密码中存在特殊符号)
|
||
* inParam : std::string &str 转换前数据
|
||
* outParam: std::string &str 转换后数据
|
||
* return : N/A
|
||
*/
|
||
void DataToFtpSrvEngine::Symbol2Ascii(std::string &str)
|
||
{
|
||
std::vector<std::string> vecTokens = { "!","@","#","$","%%","&","*" };
|
||
for (int i = 0; i < vecTokens.size(); ++i)
|
||
{
|
||
int iIndex = 0;
|
||
iIndex = str.find(vecTokens[i]);
|
||
|
||
while (iIndex != std::string::npos)
|
||
{
|
||
str.replace(iIndex, vecTokens[i].length(), mapAscii[vecTokens[i]]);
|
||
iIndex = str.find(vecTokens[i]);
|
||
}
|
||
}
|
||
}
|
||
|
||
/**
|
||
* libcurl回调函数
|
||
* inParam : FILE* pFile 读取的文件句柄
|
||
* outParam: void *pBuffer 回调内容地址
|
||
: size_t size 回调单个数据大小
|
||
: size_t nmemb 回调数据个数
|
||
* return : 读取大小
|
||
*/
|
||
size_t DataToFtpSrvEngine::ReadCallBack(void* pBbuffer, size_t size, size_t nmemb, FILE* pFile)
|
||
{
|
||
return fread(pBbuffer, size, nmemb, pFile);
|
||
}
|
||
|
||
/**
|
||
* 数据上传ftp服务器
|
||
* inParam : Json::Value &jvFtpData 上传数据
|
||
* outParam:
|
||
* return : true:成功; false:失败
|
||
*/
|
||
bool DataToFtpSrvEngine::DataToFtpSrv(Json::Value &jvFtpData)
|
||
{
|
||
//1. 获得curl操作符
|
||
if (nullptr == pCurl_)
|
||
{
|
||
LogDebug<<"pCurl_ is null, invoke curl_easy_init";
|
||
pCurl_ = curl_easy_init();
|
||
if (nullptr == pCurl_)
|
||
{
|
||
LogError << "curl_easy_init failed !";
|
||
return false;
|
||
}
|
||
}
|
||
|
||
//2. 获取待上传的文件信息
|
||
std::string strLocalPath = jvFtpData["localFile"].asString();
|
||
struct stat fileStat;
|
||
curl_off_t fileSize;
|
||
FILE *pFile;
|
||
if (stat(strLocalPath.c_str(), &fileStat))
|
||
{
|
||
if (errno == ENOENT) //文件不存在返回上传成功
|
||
{
|
||
LogWarn << "file no exist " << strLocalPath;
|
||
return true;
|
||
}
|
||
LogError << "Couldn't open " << strLocalPath;
|
||
return false;
|
||
}
|
||
fileSize = (curl_off_t)fileStat.st_size;
|
||
pFile = fopen(strLocalPath.c_str(), "rb");
|
||
|
||
//3. 设置ftp信息
|
||
std::string strFtpUrl;
|
||
strFtpUrl = strURL_ + jvFtpData["ftpFilePath"].asString() + jvFtpData["ftpFileName"].asString(); //设置文件上传的位置
|
||
|
||
curl_easy_setopt(pCurl_, CURLOPT_CONNECTTIMEOUT, 1); //连接超时(1s连接不上服务器返回超时)
|
||
curl_easy_setopt(pCurl_, CURLOPT_URL, strFtpUrl.c_str()); //设置特定目标
|
||
curl_easy_setopt(pCurl_, CURLOPT_UPLOAD, 1L); //上传使能
|
||
curl_easy_setopt(pCurl_, CURLOPT_READFUNCTION, ReadCallBack); //使用curl提供的Read功能
|
||
curl_easy_setopt(pCurl_, CURLOPT_READDATA, pFile); //指定上传文件
|
||
curl_easy_setopt(pCurl_, CURLOPT_INFILESIZE_LARGE, (curl_off_t)fileSize); //设置要上传的文件的大小(可选)
|
||
curl_easy_setopt(pCurl_, CURLOPT_FTP_CREATE_MISSING_DIRS, 1L); // 设置当要上传的文件目录不存在时,则创建
|
||
//设置传输过程中超时返回 10秒内低于30字节每秒,则中止 (下面两个同时设置才会生效)
|
||
curl_easy_setopt(pCurl_, CURLOPT_LOW_SPEED_TIME, 10L); //对应持续时间
|
||
curl_easy_setopt(pCurl_, CURLOPT_LOW_SPEED_LIMIT, 30L); //设置对应每秒的平均速率(Bps,字节)
|
||
if (strType_ == "sftp")
|
||
{
|
||
#ifndef DISABLE_SSH_AGENT
|
||
curl_easy_setopt(pCurl_, CURLOPT_SSH_AUTH_TYPES, CURLSSH_AUTH_PASSWORD);
|
||
#endif
|
||
}
|
||
|
||
//4. 运行
|
||
bool bRet = true;
|
||
CURLcode res = curl_easy_perform(pCurl_);
|
||
if (res != CURLE_OK)
|
||
{
|
||
LogError << "LocalFile:" << jvFtpData["localFile"].asString() << " curl_easy_perform() failed: res=" << res << " " << curl_easy_strerror(res);
|
||
bRet = false;
|
||
}
|
||
|
||
curl_easy_reset(pCurl_); //重置curl
|
||
fclose(pFile);
|
||
|
||
return bRet;
|
||
}
|
||
|
||
/**
|
||
* 处理上传失败的信息
|
||
* inParam : N/A
|
||
* outParam: N/A
|
||
* return : N/A
|
||
*/
|
||
void DataToFtpSrvEngine::DealFtpFailInfo()
|
||
{
|
||
//队列有待处理数据,则先不处理异常数据。
|
||
if (inputQueMap_[strPort0_]->getSize() > 0)
|
||
{
|
||
LogDebug << "have new data to process";
|
||
return;
|
||
}
|
||
|
||
//文件不存在不处理
|
||
if (access(strFailSavePath_.c_str(), F_OK) == -1)
|
||
{
|
||
LogDebug << "no exist file:" << strFailSavePath_;
|
||
return;
|
||
}
|
||
|
||
bool bAllSucc = true;
|
||
std::ifstream inFile(strFailSavePath_.c_str(), std::ios::in);
|
||
if (!inFile.is_open())
|
||
{
|
||
LogError << strFailSavePath_ << " open fail";
|
||
return;
|
||
}
|
||
|
||
int iDealCnt = 0;
|
||
std::string strLine;
|
||
while (getline(inFile, strLine))
|
||
{
|
||
Json::CharReaderBuilder readerBuilder;
|
||
std::shared_ptr<Json::CharReader> reader(readerBuilder.newCharReader());
|
||
Json::Value jvFtpData;
|
||
JSONCPP_STRING errs;
|
||
if (!reader->parse(strLine.data(), strLine.data() + strLine.size(), &jvFtpData, &errs))
|
||
{
|
||
LogError << "json parse fail content:" << strLine;
|
||
continue;
|
||
}
|
||
|
||
/*
|
||
新数据到达后,异常数据还未开始处理,则直接关闭文件返回。先处理正常数据。
|
||
新数据到达后,异常数据处理中,则把异常未处理的数据全部当处理失败写入新文件中。先处理正常数据。
|
||
*/
|
||
if (inputQueMap_[strPort0_]->getSize() > 0)
|
||
{
|
||
LogDebug << "Abnormal data processing, have new data to process";
|
||
if (0 == iDealCnt)
|
||
{
|
||
LogDebug << "Abnormal data processing not start";
|
||
inFile.close();
|
||
return;
|
||
}
|
||
SaveFtpFailInfo(jvFtpData, strFailSaveBakPath_);
|
||
bAllSucc = false;
|
||
continue;
|
||
}
|
||
|
||
iDealCnt++;
|
||
if (!DataToFtpSrv(jvFtpData))
|
||
{
|
||
LogError << "re DataToFtoSrv err:" << strLine;
|
||
SaveFtpFailInfo(jvFtpData, strFailSaveBakPath_);
|
||
bAllSucc = false;
|
||
continue;
|
||
}
|
||
}
|
||
inFile.close();
|
||
|
||
if(bAllSucc)
|
||
{
|
||
//都处理成功,文件删除
|
||
remove(strFailSavePath_.c_str());
|
||
}
|
||
else
|
||
{
|
||
//部分处理成功,重命名后再次被处理
|
||
rename(strFailSaveBakPath_.c_str(), strFailSavePath_.c_str());
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 保存上传失败的信息
|
||
* inParam : Json::Value &jvFtpData 上传ftp失败信息
|
||
* : std::string &strFilePath 保存路径
|
||
* outParam: N/A
|
||
* return : true(成功);false(失败)
|
||
*/
|
||
bool DataToFtpSrvEngine::SaveFtpFailInfo(Json::Value &jvFtpData, std::string &strFilePath)
|
||
{
|
||
std::ofstream outFile;
|
||
outFile.open(strFilePath, std::ios::app);
|
||
if (!outFile.is_open())
|
||
{
|
||
LogError << strFilePath << " open fail";
|
||
return false;
|
||
}
|
||
|
||
Json::StreamWriterBuilder jswBuilder;
|
||
jswBuilder["indentation"] = "";
|
||
std::string strFtpData = Json::writeString(jswBuilder, jvFtpData);
|
||
|
||
outFile << strFtpData << std::endl;
|
||
outFile.close();
|
||
return true;
|
||
}
|
||
|
||
APP_ERROR DataToFtpSrvEngine::Process()
|
||
{
|
||
int iRet = APP_ERR_OK;
|
||
if (0 == MyYaml::GetIns()->GetIntValue("gc_ftp_open"))
|
||
{
|
||
LogDebug << " gc_ftp_open value is 0";
|
||
return APP_ERR_OK;
|
||
}
|
||
|
||
while (!isStop_)
|
||
{
|
||
//pop端口0,图片
|
||
std::shared_ptr<void> pVoidData0 = nullptr;
|
||
inputQueMap_[strPort0_]->pop(pVoidData0);
|
||
if (nullptr == pVoidData0)
|
||
{
|
||
usleep(1000); //1ms
|
||
|
||
//无数据大于xxx秒 处理失败信息完毕后断开连接
|
||
iNoDataCnt_++;
|
||
if (iNoDataCnt_ > (iQuitTime_ * 1000))
|
||
{
|
||
DealFtpFailInfo();
|
||
curl_easy_cleanup(pCurl_);
|
||
pCurl_ = nullptr;
|
||
iNoDataCnt_ = 0;
|
||
}
|
||
continue;
|
||
}
|
||
iNoDataCnt_ = 0;
|
||
|
||
std::shared_ptr<FtpData> pFtpData = std::static_pointer_cast<FtpData>(pVoidData0);
|
||
LogDebug<<"strLocalFile:"<<pFtpData->strLocalFile;
|
||
//组装post信息
|
||
Json::Value jvFtpData;
|
||
jvFtpData["localFile"] = pFtpData->strLocalFile;
|
||
jvFtpData["ftpFilePath"] = pFtpData->strFtpFilePath;
|
||
jvFtpData["ftpFileName"] = pFtpData->strFtpFileName;
|
||
jvFtpData["isEnd"] = pFtpData->bIsEnd;
|
||
|
||
if (!DataToFtpSrv(jvFtpData))
|
||
{
|
||
SaveFtpFailInfo(jvFtpData, strFailSavePath_);
|
||
}
|
||
|
||
//列车结束后再次处理失败的信息
|
||
if (pFtpData->bIsEnd)
|
||
{
|
||
DealFtpFailInfo();
|
||
}
|
||
}
|
||
return APP_ERR_OK;
|
||
}
|