VTrain/base/Framework/ModuleManager/ModuleBase.cpp

200 lines
6.6 KiB
C++

/*
* Copyright(C) 2020. Huawei Technologies Co.,Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ModuleBase.h"
#include <chrono>
#include "Log.h"
#include "BlockingQueue.h"
#include "ErrorCode.h"
namespace ascendBaseModule {
const int INPUTQUEUE_WARN_SIZE = 32;
const double TIME_COUNTS = 1000.0;
void ModuleBase::AssignInitArgs(const ModuleInitArgs &initArgs)
{
#ifdef ASCEND_MODULE_USE_ACL
aclContext_ = initArgs.context;
runMode_ = initArgs.runMode;
#endif
pipelineName_ = initArgs.pipelineName;
moduleName_ = initArgs.moduleName;
instanceId_ = initArgs.instanceId;
isStop_ = false;
}
// run module instance in a new thread created
APP_ERROR ModuleBase::Run()
{
LogDebug << moduleName_ << "[" << instanceId_ << "] Run";
processThr_ = std::thread(&ModuleBase::ProcessThread, this);
return APP_ERR_OK;
}
// get the data from input queue then call Process function in the new thread
void ModuleBase::ProcessThread()
{
APP_ERROR ret;
#ifdef ASCEND_MODULE_USE_ACL
ret = aclrtSetCurrentContext(aclContext_);
if (ret != APP_ERR_OK) {
LogFatal << "Fail to set context for " << moduleName_ << "[" << instanceId_ << "]"
<< ", ret=" << ret << "(" << GetAppErrCodeInfo(ret) << ").";
return;
}
#endif
// if the module has no input queue, call Process function directly.
if (withoutInputQueue_ == true) {
ret = Process(nullptr);
if (ret != APP_ERR_OK) {
LogError << "Fail to process data for " << moduleName_ << "[" << instanceId_ << "]"
<< ", ret=" << ret << "(" << GetAppErrCodeInfo(ret) << ").";
}
return;
}
if (inputQueue_ == nullptr) {
LogFatal << "Invalid input queue of " << moduleName_ << "[" << instanceId_ << "].";
return;
}
LogDebug << "Input queue for " << moduleName_ << "[" << instanceId_ << "], inputQueue=" << inputQueue_;
// repeatly pop data from input queue and call the Process funtion. Results will be pushed to output queues.
while (!isStop_) {
std::shared_ptr<void> frameInfo = nullptr;
ret = inputQueue_->Pop(frameInfo);
if (ret == APP_ERR_QUEUE_STOPED) {
LogDebug << moduleName_ << "[" << instanceId_ << "] input queue Stopped";
break;
} else if (ret != APP_ERR_OK || frameInfo == nullptr) {
LogError << "Fail to get data from input queue for " << moduleName_ << "[" << instanceId_ << "]"
<< ", ret=" << ret << "(" << GetAppErrCodeInfo(ret) << ").";
continue;
}
CallProcess(frameInfo);
}
LogInfo << moduleName_ << "[" << instanceId_ << "] process thread End";
}
void ModuleBase::CallProcess(const std::shared_ptr<void> &sendData)
{
auto startTime = std::chrono::high_resolution_clock::now();
APP_ERROR ret = Process(sendData);
auto endTime = std::chrono::high_resolution_clock::now();
double costMs = std::chrono::duration<double, std::milli>(endTime - startTime).count();
int queueSize = inputQueue_->GetSize();
if (queueSize > INPUTQUEUE_WARN_SIZE) {
LogWarn << "[Statistic] [Module] [" << moduleName_ << "] [" << instanceId_ << "] [QueueSize] [" << queueSize \
<< "] [Process] [" << costMs << " ms]";
}
if (ret != APP_ERR_OK) {
LogError << "Fail to process data for " << moduleName_ << "[" << instanceId_ << "]"
<< ", ret=" << ret << "(" << GetAppErrCodeInfo(ret) << ").";
}
}
void ModuleBase::SetOutputInfo(std::string moduleName, ModuleConnectType connectType,
std::vector<std::shared_ptr<BlockingQueue<std::shared_ptr<void>>>> outputQueVec)
{
if (outputQueVec.size() == 0) {
LogFatal << "outputQueVec is Empty! " << moduleName;
return;
}
ModuleOutputInfo outputInfo;
outputInfo.moduleName = moduleName;
outputInfo.connectType = connectType;
outputInfo.outputQueVec = outputQueVec;
outputInfo.outputQueVecSize = outputQueVec.size();
outputQueMap_[moduleName] = outputInfo;
}
const std::string ModuleBase::GetModuleName()
{
return moduleName_;
}
const int ModuleBase::GetInstanceId()
{
return instanceId_;
}
void ModuleBase::SetInputVec(std::shared_ptr<BlockingQueue<std::shared_ptr<void>>> inputQueue)
{
inputQueue_ = inputQueue;
}
void ModuleBase::SendToNextModule(std::string moduleName, std::shared_ptr<void> outputData, int channelId)
{
if (isStop_) {
LogDebug << moduleName_ << "[" << instanceId_ << "] is Stopped, can't send to next module";
return;
}
if (outputQueMap_.find(moduleName) == outputQueMap_.end()) {
LogFatal << "No Next Module " << moduleName;
return;
}
auto itr = outputQueMap_.find(moduleName);
if (itr == outputQueMap_.end()) {
LogFatal << "No Next Module " << moduleName;
return;
}
ModuleOutputInfo outputInfo = itr->second;
if (outputInfo.connectType == MODULE_CONNECT_ONE) {
outputInfo.outputQueVec[0]->Push(outputData, true);
} else if (outputInfo.connectType == MODULE_CONNECT_CHANNEL) {
uint32_t ch = channelId % outputInfo.outputQueVecSize;
if (ch >= outputInfo.outputQueVecSize) {
LogFatal << "No Next Module!";
return;
}
outputInfo.outputQueVec[ch]->Push(outputData, true);
} else if (outputInfo.connectType == MODULE_CONNECT_PAIR) {
outputInfo.outputQueVec[instanceId_]->Push(outputData, true);
} else if (outputInfo.connectType == MODULE_CONNECT_RANDOM) {
outputInfo.outputQueVec[sendCount_ % outputInfo.outputQueVecSize]->Push(outputData, true);
}
sendCount_++;
}
// clear input queue and stop the thread of the instance, called before destroy the instance
APP_ERROR ModuleBase::Stop()
{
#ifdef ASCEND_MODULE_USE_ACL
APP_ERROR ret = aclrtSetCurrentContext(aclContext_);
if (ret != APP_ERR_OK) {
LogFatal << "ModuleManager: fail to set context, ret[%d]" << ret << ".";
return ret;
}
#endif
// stop input queue
isStop_ = true;
if (inputQueue_ != nullptr) {
inputQueue_->Stop();
}
if (processThr_.joinable()) {
processThr_.join();
}
return DeInit();
}
}