#include "DataToFtpSrvEngine.h" #undef DISABLE_SSH_AGENT namespace { std::map mapAscii = { {"!", "%21"}, {"#", "%23"}, {"$", "%24"}, {"%", "%25"}, {"&", "%26"}, {"@", "%40"}, {"*", "%2A"} }; } DataToFtpSrvEngine::DataToFtpSrvEngine() {} DataToFtpSrvEngine::~DataToFtpSrvEngine() {} APP_ERROR DataToFtpSrvEngine::Init() { strPort0_ = engineName_ + "_" + std::to_string(engineId_) + "_0"; strType_ = MyYaml::GetIns()->GetStringValue("gc_ftp_type"); strIp_ = MyYaml::GetIns()->GetStringValue("gc_ftp_ip"); iPort_ = MyYaml::GetIns()->GetIntValue("gc_ftp_port"); strUser_ = MyYaml::GetIns()->GetStringValue("gc_ftp_username"); strPwd_ = MyYaml::GetIns()->GetStringValue("gc_ftp_password"); strImgPath_ = MyYaml::GetIns()->GetPathValue("gc_ftp_image_path"); iQuitTime_ = MyYaml::GetIns()->GetIntValue("gc_ftp_quit_time"); strFailSavePath_ = MyYaml::GetIns()->GetPathValue("gc_result_path") + "ftpfailcontent.csv"; strFailSaveBakPath_ = MyYaml::GetIns()->GetPathValue("gc_result_path") + "ftpfailcontent_bak.csv"; //户名、密码中的特殊字符 并转化为Ascii Symbol2Ascii(strUser_); Symbol2Ascii(strPwd_); std::ostringstream ostr; if(strType_ == "sftp") { ostr << "sftp://" << strUser_ << ":" << strPwd_ << "@" << strIp_ << ":" << iPort_ << "/~" << strImgPath_; } else { ostr << "ftp://" << strUser_ << ":" << strPwd_ << "@" << strIp_ << ":" << iPort_ << strImgPath_; } strURL_ = ostr.str(); LogInfo << "DataToFtpSrvEngine Init ok"; return APP_ERR_OK; } APP_ERROR DataToFtpSrvEngine::DeInit() { LogDebug << "释放资源"; curl_easy_cleanup(pCurl_); /* 这个处理移动到main中, 防止多线程调用 LogDebug << "内存清理"; curl_global_cleanup(); */ LogInfo << "DataToFtpSrvEngine DeInit ok"; return APP_ERR_OK; } /** * 将字符串中的特殊字符转换为Ascii值(用于防止用户名密码中存在特殊符号) * inParam : std::string &str 转换前数据 * outParam: std::string &str 转换后数据 * return : N/A */ void DataToFtpSrvEngine::Symbol2Ascii(std::string &str) { std::vector vecTokens = { "!","@","#","$","%%","&","*" }; for (int i = 0; i < vecTokens.size(); ++i) { int iIndex = 0; iIndex = str.find(vecTokens[i]); while (iIndex != std::string::npos) { str.replace(iIndex, vecTokens[i].length(), mapAscii[vecTokens[i]]); iIndex = str.find(vecTokens[i]); } } } /** * libcurl回调函数 * inParam : FILE* pFile 读取的文件句柄 * outParam: void *pBuffer 回调内容地址 : size_t size 回调单个数据大小 : size_t nmemb 回调数据个数 * return : 读取大小 */ size_t DataToFtpSrvEngine::ReadCallBack(void* pBbuffer, size_t size, size_t nmemb, FILE* pFile) { return fread(pBbuffer, size, nmemb, pFile); } /** * 数据上传ftp服务器 * inParam : Json::Value &jvFtpData 上传数据 * outParam: * return : true:成功; false:失败 */ bool DataToFtpSrvEngine::DataToFtpSrv(Json::Value &jvFtpData) { //1. 获得curl操作符 if (nullptr == pCurl_) { LogDebug<<"pCurl_ is null, invoke curl_easy_init"; pCurl_ = curl_easy_init(); if (nullptr == pCurl_) { LogError << "curl_easy_init failed !"; return false; } } //2. 获取待上传的文件信息 std::string strLocalPath = jvFtpData["localFile"].asString(); struct stat fileStat; curl_off_t fileSize; FILE *pFile; if (stat(strLocalPath.c_str(), &fileStat)) { if (errno == ENOENT) //文件不存在返回上传成功 { LogWarn << "file no exist " << strLocalPath; return true; } LogError << "Couldn't open " << strLocalPath; return false; } fileSize = (curl_off_t)fileStat.st_size; pFile = fopen(strLocalPath.c_str(), "rb"); //3. 设置ftp信息 std::string strFtpUrl; strFtpUrl = strURL_ + jvFtpData["ftpFilePath"].asString() + jvFtpData["ftpFileName"].asString(); //设置文件上传的位置 curl_easy_setopt(pCurl_, CURLOPT_CONNECTTIMEOUT, 1); //连接超时(1s连接不上服务器返回超时) curl_easy_setopt(pCurl_, CURLOPT_URL, strFtpUrl.c_str()); //设置特定目标 curl_easy_setopt(pCurl_, CURLOPT_UPLOAD, 1L); //上传使能 curl_easy_setopt(pCurl_, CURLOPT_READFUNCTION, ReadCallBack); //使用curl提供的Read功能 curl_easy_setopt(pCurl_, CURLOPT_READDATA, pFile); //指定上传文件 curl_easy_setopt(pCurl_, CURLOPT_INFILESIZE_LARGE, (curl_off_t)fileSize); //设置要上传的文件的大小(可选) curl_easy_setopt(pCurl_, CURLOPT_FTP_CREATE_MISSING_DIRS, 1L); // 设置当要上传的文件目录不存在时,则创建 //设置传输过程中超时返回 10秒内低于30字节每秒,则中止 (下面两个同时设置才会生效) curl_easy_setopt(pCurl_, CURLOPT_LOW_SPEED_TIME, 10L); //对应持续时间 curl_easy_setopt(pCurl_, CURLOPT_LOW_SPEED_LIMIT, 30L); //设置对应每秒的平均速率(Bps,字节) if (strType_ == "sftp") { #ifndef DISABLE_SSH_AGENT curl_easy_setopt(pCurl_, CURLOPT_SSH_AUTH_TYPES, CURLSSH_AUTH_PASSWORD); #endif } //4. 运行 bool bRet = true; CURLcode res = curl_easy_perform(pCurl_); if (res != CURLE_OK) { LogError << "LocalFile:" << jvFtpData["localFile"].asString() << " curl_easy_perform() failed: res=" << res << " " << curl_easy_strerror(res); bRet = false; } curl_easy_reset(pCurl_); //重置curl fclose(pFile); return bRet; } /** * 处理上传失败的信息 * inParam : N/A * outParam: N/A * return : N/A */ void DataToFtpSrvEngine::DealFtpFailInfo() { //队列有待处理数据,则先不处理异常数据。 if (inputQueMap_[strPort0_]->getSize() > 0) { LogDebug << "have new data to process"; return; } //文件不存在不处理 if (access(strFailSavePath_.c_str(), F_OK) == -1) { LogDebug << "no exist file:" << strFailSavePath_; return; } bool bAllSucc = true; std::ifstream inFile(strFailSavePath_.c_str(), std::ios::in); if (!inFile.is_open()) { LogError << strFailSavePath_ << " open fail"; return; } int iDealCnt = 0; std::string strLine; while (getline(inFile, strLine)) { Json::CharReaderBuilder readerBuilder; std::shared_ptr reader(readerBuilder.newCharReader()); Json::Value jvFtpData; JSONCPP_STRING errs; if (!reader->parse(strLine.data(), strLine.data() + strLine.size(), &jvFtpData, &errs)) { LogError << "json parse fail content:" << strLine; continue; } /* 新数据到达后,异常数据还未开始处理,则直接关闭文件返回。先处理正常数据。 新数据到达后,异常数据处理中,则把异常未处理的数据全部当处理失败写入新文件中。先处理正常数据。 */ if (inputQueMap_[strPort0_]->getSize() > 0) { LogDebug << "Abnormal data processing, have new data to process"; if (0 == iDealCnt) { LogDebug << "Abnormal data processing not start"; inFile.close(); return; } SaveFtpFailInfo(jvFtpData, strFailSaveBakPath_); bAllSucc = false; continue; } iDealCnt++; if (!DataToFtpSrv(jvFtpData)) { LogError << "re DataToFtoSrv err:" << strLine; SaveFtpFailInfo(jvFtpData, strFailSaveBakPath_); bAllSucc = false; continue; } } inFile.close(); if(bAllSucc) { //都处理成功,文件删除 remove(strFailSavePath_.c_str()); } else { //部分处理成功,重命名后再次被处理 rename(strFailSaveBakPath_.c_str(), strFailSavePath_.c_str()); } } /** * 保存上传失败的信息 * inParam : Json::Value &jvFtpData 上传ftp失败信息 * : std::string &strFilePath 保存路径 * outParam: N/A * return : true(成功);false(失败) */ bool DataToFtpSrvEngine::SaveFtpFailInfo(Json::Value &jvFtpData, std::string &strFilePath) { std::ofstream outFile; outFile.open(strFilePath, std::ios::app); if (!outFile.is_open()) { LogError << strFilePath << " open fail"; return false; } Json::StreamWriterBuilder jswBuilder; jswBuilder["indentation"] = ""; std::string strFtpData = Json::writeString(jswBuilder, jvFtpData); outFile << strFtpData << std::endl; outFile.close(); return true; } APP_ERROR DataToFtpSrvEngine::Process() { int iRet = APP_ERR_OK; if (0 == MyYaml::GetIns()->GetIntValue("gc_ftp_open")) { LogDebug << " gc_ftp_open value is 0"; return APP_ERR_OK; } while (!isStop_) { //pop端口0,图片 std::shared_ptr pVoidData0 = nullptr; inputQueMap_[strPort0_]->pop(pVoidData0); if (nullptr == pVoidData0) { usleep(1000); //1ms //无数据大于xxx秒 处理失败信息完毕后断开连接 iNoDataCnt_++; if (iNoDataCnt_ > (iQuitTime_ * 1000)) { DealFtpFailInfo(); curl_easy_cleanup(pCurl_); pCurl_ = nullptr; iNoDataCnt_ = 0; } continue; } iNoDataCnt_ = 0; std::shared_ptr pFtpData = std::static_pointer_cast(pVoidData0); LogDebug<<"strLocalFile:"<strLocalFile; //组装post信息 Json::Value jvFtpData; jvFtpData["localFile"] = pFtpData->strLocalFile; jvFtpData["ftpFilePath"] = pFtpData->strFtpFilePath; jvFtpData["ftpFileName"] = pFtpData->strFtpFileName; jvFtpData["isEnd"] = pFtpData->bIsEnd; if (!DataToFtpSrv(jvFtpData)) { SaveFtpFailInfo(jvFtpData, strFailSavePath_); } //列车结束后再次处理失败的信息 if (pFtpData->bIsEnd) { DealFtpFailInfo(); } } return APP_ERR_OK; }