200 lines
6.6 KiB
C++
200 lines
6.6 KiB
C++
/*
|
|
* Copyright(C) 2020. Huawei Technologies Co.,Ltd. All rights reserved.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#include "ModuleBase.h"
|
|
#include <chrono>
|
|
#include "Log.h"
|
|
#include "BlockingQueue.h"
|
|
#include "ErrorCode.h"
|
|
|
|
namespace ascendBaseModule {
|
|
const int INPUTQUEUE_WARN_SIZE = 32;
|
|
const double TIME_COUNTS = 1000.0;
|
|
|
|
void ModuleBase::AssignInitArgs(const ModuleInitArgs &initArgs)
|
|
{
|
|
#ifdef ASCEND_MODULE_USE_ACL
|
|
aclContext_ = initArgs.context;
|
|
runMode_ = initArgs.runMode;
|
|
#endif
|
|
pipelineName_ = initArgs.pipelineName;
|
|
moduleName_ = initArgs.moduleName;
|
|
instanceId_ = initArgs.instanceId;
|
|
isStop_ = false;
|
|
}
|
|
|
|
// run module instance in a new thread created
|
|
APP_ERROR ModuleBase::Run()
|
|
{
|
|
LogDebug << moduleName_ << "[" << instanceId_ << "] Run";
|
|
processThr_ = std::thread(&ModuleBase::ProcessThread, this);
|
|
return APP_ERR_OK;
|
|
}
|
|
|
|
// get the data from input queue then call Process function in the new thread
|
|
void ModuleBase::ProcessThread()
|
|
{
|
|
APP_ERROR ret;
|
|
#ifdef ASCEND_MODULE_USE_ACL
|
|
ret = aclrtSetCurrentContext(aclContext_);
|
|
if (ret != APP_ERR_OK) {
|
|
LogFatal << "Fail to set context for " << moduleName_ << "[" << instanceId_ << "]"
|
|
<< ", ret=" << ret << "(" << GetAppErrCodeInfo(ret) << ").";
|
|
return;
|
|
}
|
|
#endif
|
|
// if the module has no input queue, call Process function directly.
|
|
if (withoutInputQueue_ == true) {
|
|
ret = Process(nullptr);
|
|
if (ret != APP_ERR_OK) {
|
|
LogError << "Fail to process data for " << moduleName_ << "[" << instanceId_ << "]"
|
|
<< ", ret=" << ret << "(" << GetAppErrCodeInfo(ret) << ").";
|
|
}
|
|
return;
|
|
}
|
|
if (inputQueue_ == nullptr) {
|
|
LogFatal << "Invalid input queue of " << moduleName_ << "[" << instanceId_ << "].";
|
|
return;
|
|
}
|
|
LogDebug << "Input queue for " << moduleName_ << "[" << instanceId_ << "], inputQueue=" << inputQueue_;
|
|
// repeatly pop data from input queue and call the Process funtion. Results will be pushed to output queues.
|
|
while (!isStop_) {
|
|
std::shared_ptr<void> frameInfo = nullptr;
|
|
ret = inputQueue_->Pop(frameInfo);
|
|
if (ret == APP_ERR_QUEUE_STOPED) {
|
|
LogDebug << moduleName_ << "[" << instanceId_ << "] input queue Stopped";
|
|
break;
|
|
} else if (ret != APP_ERR_OK || frameInfo == nullptr) {
|
|
LogError << "Fail to get data from input queue for " << moduleName_ << "[" << instanceId_ << "]"
|
|
<< ", ret=" << ret << "(" << GetAppErrCodeInfo(ret) << ").";
|
|
continue;
|
|
}
|
|
CallProcess(frameInfo);
|
|
}
|
|
LogInfo << moduleName_ << "[" << instanceId_ << "] process thread End";
|
|
}
|
|
|
|
void ModuleBase::CallProcess(const std::shared_ptr<void> &sendData)
|
|
{
|
|
auto startTime = std::chrono::high_resolution_clock::now();
|
|
APP_ERROR ret = Process(sendData);
|
|
auto endTime = std::chrono::high_resolution_clock::now();
|
|
double costMs = std::chrono::duration<double, std::milli>(endTime - startTime).count();
|
|
int queueSize = inputQueue_->GetSize();
|
|
if (queueSize > INPUTQUEUE_WARN_SIZE) {
|
|
LogWarn << "[Statistic] [Module] [" << moduleName_ << "] [" << instanceId_ << "] [QueueSize] [" << queueSize \
|
|
<< "] [Process] [" << costMs << " ms]";
|
|
}
|
|
|
|
if (ret != APP_ERR_OK) {
|
|
LogError << "Fail to process data for " << moduleName_ << "[" << instanceId_ << "]"
|
|
<< ", ret=" << ret << "(" << GetAppErrCodeInfo(ret) << ").";
|
|
}
|
|
}
|
|
|
|
void ModuleBase::SetOutputInfo(std::string moduleName, ModuleConnectType connectType,
|
|
std::vector<std::shared_ptr<BlockingQueue<std::shared_ptr<void>>>> outputQueVec)
|
|
{
|
|
if (outputQueVec.size() == 0) {
|
|
LogFatal << "outputQueVec is Empty! " << moduleName;
|
|
return;
|
|
}
|
|
|
|
ModuleOutputInfo outputInfo;
|
|
outputInfo.moduleName = moduleName;
|
|
outputInfo.connectType = connectType;
|
|
outputInfo.outputQueVec = outputQueVec;
|
|
outputInfo.outputQueVecSize = outputQueVec.size();
|
|
outputQueMap_[moduleName] = outputInfo;
|
|
}
|
|
|
|
const std::string ModuleBase::GetModuleName()
|
|
{
|
|
return moduleName_;
|
|
}
|
|
|
|
const int ModuleBase::GetInstanceId()
|
|
{
|
|
return instanceId_;
|
|
}
|
|
|
|
void ModuleBase::SetInputVec(std::shared_ptr<BlockingQueue<std::shared_ptr<void>>> inputQueue)
|
|
{
|
|
inputQueue_ = inputQueue;
|
|
}
|
|
|
|
void ModuleBase::SendToNextModule(std::string moduleName, std::shared_ptr<void> outputData, int channelId)
|
|
{
|
|
if (isStop_) {
|
|
LogDebug << moduleName_ << "[" << instanceId_ << "] is Stopped, can't send to next module";
|
|
return;
|
|
}
|
|
|
|
if (outputQueMap_.find(moduleName) == outputQueMap_.end()) {
|
|
LogFatal << "No Next Module " << moduleName;
|
|
return;
|
|
}
|
|
|
|
auto itr = outputQueMap_.find(moduleName);
|
|
if (itr == outputQueMap_.end()) {
|
|
LogFatal << "No Next Module " << moduleName;
|
|
return;
|
|
}
|
|
ModuleOutputInfo outputInfo = itr->second;
|
|
|
|
if (outputInfo.connectType == MODULE_CONNECT_ONE) {
|
|
outputInfo.outputQueVec[0]->Push(outputData, true);
|
|
} else if (outputInfo.connectType == MODULE_CONNECT_CHANNEL) {
|
|
uint32_t ch = channelId % outputInfo.outputQueVecSize;
|
|
if (ch >= outputInfo.outputQueVecSize) {
|
|
LogFatal << "No Next Module!";
|
|
return;
|
|
}
|
|
outputInfo.outputQueVec[ch]->Push(outputData, true);
|
|
} else if (outputInfo.connectType == MODULE_CONNECT_PAIR) {
|
|
outputInfo.outputQueVec[instanceId_]->Push(outputData, true);
|
|
} else if (outputInfo.connectType == MODULE_CONNECT_RANDOM) {
|
|
outputInfo.outputQueVec[sendCount_ % outputInfo.outputQueVecSize]->Push(outputData, true);
|
|
}
|
|
sendCount_++;
|
|
}
|
|
|
|
// clear input queue and stop the thread of the instance, called before destroy the instance
|
|
APP_ERROR ModuleBase::Stop()
|
|
{
|
|
#ifdef ASCEND_MODULE_USE_ACL
|
|
APP_ERROR ret = aclrtSetCurrentContext(aclContext_);
|
|
if (ret != APP_ERR_OK) {
|
|
LogFatal << "ModuleManager: fail to set context, ret[%d]" << ret << ".";
|
|
return ret;
|
|
}
|
|
#endif
|
|
|
|
// stop input queue
|
|
isStop_ = true;
|
|
|
|
if (inputQueue_ != nullptr) {
|
|
inputQueue_->Stop();
|
|
}
|
|
|
|
if (processThr_.joinable()) {
|
|
processThr_.join();
|
|
}
|
|
|
|
return DeInit();
|
|
}
|
|
}
|