// Generated from zhangwei/Matrixai (245 lines, 9.5 KiB, C++).
#include "ToMinioSrvEngine.h"
// Default construction: no work is done here; setup happens in Init().
ToMinioSrvEngine::ToMinioSrvEngine() = default;
// Destruction does nothing itself; the S3 client is released in DeInit().
ToMinioSrvEngine::~ToMinioSrvEngine() = default;
/**
 * Initialise the engine.
 * Builds the name of input port 0, loads the MinIO and base configuration,
 * starts the AWS SDK and creates the S3 client used by the other methods.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK
 */
APP_ERROR ToMinioSrvEngine::Init()
{
    strPort0_ = engineName_ + "_" + std::to_string(engineId_) + "_0";

    this->minioConfig_ = Config::getins()->getMinioConfig();
    this->baseConfig_ = Config::getins()->getBaseConfig();

    Aws::InitAPI(options_);

    const auto &strUrl = this->minioConfig_.strUrl;

    Aws::Client::ClientConfiguration config;
    // A URL that starts with "https" selects HTTPS, anything else plain HTTP.
    config.scheme = (strUrl.rfind("https", 0) == 0) ? Aws::Http::Scheme::HTTPS
                                                    : Aws::Http::Scheme::HTTP;

    // The endpoint is the authority part of the URL ("host[:port]"): skip an
    // optional "<scheme>://" prefix and stop at the first '/' that follows.
    // Taking everything after the LAST '/' (the previous behaviour) produced an
    // empty endpoint for "http://host:9000/" and "path" for "http://host:9000/path".
    const std::string::size_type schemeSep = strUrl.find("://");
    const std::string::size_type hostBegin = (std::string::npos == schemeSep) ? 0 : schemeSep + 3;
    const std::string::size_type hostEnd = strUrl.find('/', hostBegin);
    const std::string::size_type hostLen =
        (std::string::npos == hostEnd) ? std::string::npos : hostEnd - hostBegin;
    config.endpointOverride = strUrl.substr(hostBegin, hostLen);

    // NOTE(review): TLS certificate verification is switched off here; confirm
    // this is intended for the deployment before using an HTTPS endpoint.
    config.verifySSL = false;

    // Payload signing is disabled; the trailing `false` is the constructor's
    // useVirtualAddressing flag (see the S3Client constructor documentation).
    client_ = new Aws::S3::S3Client(
        Aws::Auth::AWSCredentials(this->minioConfig_.strAccessKey, this->minioConfig_.strSecretKey),
        config,
        Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
        false);

    LogInfo << "ToMinioSrvEngine Init ok";
    return APP_ERR_OK;
}
/**
 * Release the S3 client and shut the AWS SDK down.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK
 */
APP_ERROR ToMinioSrvEngine::DeInit()
{
    // Deleting a null pointer is a no-op, so no explicit null test is needed.
    delete client_;
    client_ = nullptr;

    Aws::ShutdownAPI(options_);

    LogInfo << "ToMinioSrvEngine DeInit ok";
    return APP_ERR_OK;
}
/**
|
||
* Create bucket in S3 server
|
||
* inParam : bucketName The bucket name to which the CREATE action was initiated.
|
||
* outParam: N/A
|
||
* return : true in case of success, false in case of failure
|
||
*/
|
||
bool ToMinioSrvEngine::CreateBucket(const std::string &strBucketName)
|
||
{
|
||
LogDebug << "strBucketName:" << strBucketName;
|
||
Aws::S3::Model::CreateBucketRequest request;
|
||
request.WithBucket(strBucketName);
|
||
auto result = client_->CreateBucket(request);
|
||
if (!result.IsSuccess())
|
||
{
|
||
LogError << "CreateBucket error: " << result.GetError().GetExceptionName() << " " << result.GetError().GetMessage();
|
||
return false;
|
||
}
|
||
return true;
|
||
}
/**
|
||
* Delete bucket on S3 server
|
||
* inParam : bucketName The bucket name to which the DELETE action was initiated.
|
||
* outParam: N/A
|
||
* return : true in case of success, false in case of failure
|
||
*/
|
||
bool ToMinioSrvEngine::DeleteBucket(const std::string &strBucketName)
|
||
{
|
||
LogDebug << "strBucketName:" << strBucketName;
|
||
Aws::S3::Model::DeleteBucketRequest request;
|
||
request.WithBucket(strBucketName);
|
||
auto result = client_->DeleteBucket(request);
|
||
if (!result.IsSuccess())
|
||
{
|
||
LogError << "DeleteBucket error: " << result.GetError().GetExceptionName() << " " << result.GetError().GetMessage();
|
||
return false;
|
||
}
|
||
return true;
|
||
}
/**
|
||
* Push file to S3 server
|
||
* inParam : std::string strBucketName bucket name
|
||
* : std::string strObjectKey File name in bucket.
|
||
* : std::string strPathkey Local file path.
|
||
* outParam: N/A
|
||
* return : true in case of success, false in case of failure
|
||
*/
|
||
bool ToMinioSrvEngine::PutObject(std::string strBucketName, std::string strObjectKey, std::string strPathkey)
|
||
{
|
||
// LogDebug << "strBucketName:" << strBucketName;
|
||
Aws::S3::Model::PutObjectRequest request;
|
||
request.WithBucket(strBucketName).WithKey(strObjectKey);
|
||
auto inputData = Aws::MakeShared<Aws::FStream>("PutObjectInputStream", strPathkey.c_str(), std::ios_base::in | std::ios_base::binary);
|
||
request.SetBody(inputData);
|
||
auto result = client_->PutObject(request);
|
||
if (!result.IsSuccess())
|
||
{
|
||
LogError << "PutObject error: " << result.GetError().GetExceptionName() << " " << result.GetError().GetMessage();
|
||
return false;
|
||
}
|
||
return true;
|
||
}
/**
|
||
* Delete file on S3 server
|
||
* inParam : std::string strBucketName bucket name
|
||
* : std::string strObjectKey File name in bucket.
|
||
* outParam: N/A
|
||
* return : true in case of success, false in case of failure
|
||
*/
|
||
bool ToMinioSrvEngine::DeleteObject(std::string strBucketName, std::string strObjectKey)
|
||
{
|
||
Aws::S3::Model::DeleteObjectRequest request;
|
||
request.WithBucket(strBucketName).WithKey(strObjectKey);
|
||
auto result = client_->DeleteObject(request);
|
||
if (!result.IsSuccess())
|
||
{
|
||
LogError << "DeleteObject error: " << result.GetError().GetExceptionName() << " " << result.GetError().GetMessage();
|
||
return false;
|
||
}
|
||
return true;
|
||
}
/**
 * Worker loop of the engine.
 * Repeatedly pops a VTrainInfo from input port 0 and uploads the image files
 * it names (strTNum_image, strTPro_image and every vecContainer[i].strImg)
 * to the configured MinIO bucket. Each file is read from
 *   <strResultPath>/<strTrainDate>/<strTrainTime with ':' -> '-'>/<file>
 * and stored under the object key
 *   <strTrainDate>/<strTrainTime with ':' -> '-'>/<file>.
 * A failed upload is logged and does not stop the loop.
 * inParam : N/A
 * outParam: N/A
 * return  : APP_ERR_OK once isStop_ is set
 */
APP_ERROR ToMinioSrvEngine::Process()
{
    while (!isStop_)
    {
        // Pop port 0: train record carrying the image file names.
        std::shared_ptr<void> pVoidData0 = nullptr;
        inputQueMap_[strPort0_]->pop(pVoidData0);

        // Nothing was popped, or MinIO upload is disabled (the popped item is
        // dropped): wait 1 ms and poll again.
        if (nullptr == pVoidData0 || !this->minioConfig_.bIsUse)
        {
            usleep(1000); //1ms
            continue;
        }

        std::shared_ptr<VTrainInfo> pTrain = std::static_pointer_cast<VTrainInfo>(pVoidData0);

        std::string strTrainTime = StringUtil::getins()->replace_all_distinct(pTrain->strTrainTime, ":", "-");

        // Prefixes shared by every file of this train: the object key inside the
        // bucket and the matching directory below the local result path.
        const std::string strKeyPrefix = pTrain->strTrainDate + "/" + strTrainTime + "/";
        const std::string strLocalPrefix = this->baseConfig_.strResultPath + "/" + strKeyPrefix;

        if (!pTrain->strTNum_image.empty())
        {
            const std::string strObjectKey = strKeyPrefix + pTrain->strTNum_image;
            const std::string strLocalFile = strLocalPrefix + pTrain->strTNum_image;
            if (!PutObject(this->minioConfig_.strBucket, strObjectKey, strLocalFile))
            {
                LogWarn << "数据上传失败! -- " << strLocalFile;
            }
            else
            {
                LogInfo << "数据上传MinIO 成功!!" << this->minioConfig_.strBucket + "/" + strObjectKey
                        << " -->"
                        << strLocalFile;
            }
        }

        if (!pTrain->strTPro_image.empty())
        {
            const std::string strObjectKey = strKeyPrefix + pTrain->strTPro_image;
            const std::string strLocalFile = strLocalPrefix + pTrain->strTPro_image;
            if (!PutObject(this->minioConfig_.strBucket, strObjectKey, strLocalFile))
            {
                LogWarn << "数据上传失败! -- " << strLocalFile;
            }
            else
            {
                LogInfo << "数据上传MinIO 成功!!" << this->minioConfig_.strBucket + "/" + strObjectKey
                        << " -->"
                        << strLocalFile;
            }
        }

        for (const auto &container : pTrain->vecContainer)
        {
            if (container.strImg.empty())
            {
                continue;
            }

            const std::string strObjectKey = strKeyPrefix + container.strImg;
            const std::string strLocalFile = strLocalPrefix + container.strImg;
            if (!PutObject(this->minioConfig_.strBucket, strObjectKey, strLocalFile))
            {
                // This failure message carries only the file name, not the full path.
                LogWarn << "数据上传失败! -- " << container.strImg;
            }
            else
            {
                LogInfo << "数据上传MinIO 成功!!" << this->minioConfig_.strBucket + "/" + strObjectKey
                        << " -->"
                        << strLocalFile;
            }
        }
    }
    return APP_ERR_OK;
}