Train_Identify/nvidia_ascend_base/Base/Framework/ModuleManager/ModuleManager.cpp

326 lines
11 KiB
C++

/*
* Copyright(C) 2020. Huawei Technologies Co.,Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ModuleManager.h"
#include "Log.h"
#ifdef ASCEND_MODULE_USE_ACL
#include "ResourceManager.h"
#endif
namespace ascendBaseModule {
const int MODULE_QUEUE_SIZE = 200;
ModuleManager::ModuleManager() {}
ModuleManager::~ModuleManager() {}
APP_ERROR ModuleManager::Init(std::string &configPath, std::string &aclConfigPath)
{
LogDebug << "ModuleManager: begin to init.";
// load and parse config file
APP_ERROR ret = configParser_.ParseConfig(configPath);
if (ret != APP_ERR_OK) {
LogFatal << "ModuleManager: cannot parse file.";
return ret;
}
// Init Acl
#ifdef ASCEND_MODULE_USE_ACL
ret = InitAcl(aclConfigPath);
if (ret != APP_ERR_OK) {
LogFatal << "ModuleManager: fail to init Acl.";
return ret;
}
#endif
// Init pipeline module
ret = InitPipelineModule();
if (ret != APP_ERR_OK) {
LogFatal << "ModuleManager: fail to init pipeline module.";
return ret;
}
return ret;
}
#ifdef ASCEND_MODULE_USE_ACL
APP_ERROR ModuleManager::InitAcl(std::string &aclConfigPath)
{
LogDebug << "ModuleManager: begin to init Acl.";
std::string itemCfgStr;
itemCfgStr = "SystemConfig.deviceId";
APP_ERROR ret = configParser_.GetIntValue(itemCfgStr, deviceId_);
if (ret != APP_ERR_OK) {
LogFatal << "ModuleManager: fail to get device id.";
return ret;
}
ret = aclrtGetRunMode(&runMode_);
if (ret != APP_ERR_OK) {
LogFatal << "ModuleManager: fail to get run mode of device, ret=" << ret << ".";
return ret;
}
if (runMode_ == ACL_DEVICE) {
deviceId_ = 0;
} else if (deviceId_ < 0) {
LogFatal << "ModuleManager: invalid device id, deviceId=" << deviceId_ << ".";
return APP_ERR_COMM_INVALID_PARAM;
}
ResourceInfo resourceInfo;
resourceInfo.aclConfigPath = aclConfigPath;
resourceInfo.deviceIds.insert(deviceId_);
return ResourceManager::GetInstance()->InitResource(resourceInfo);
}
#endif
APP_ERROR ModuleManager::InitModuleInstance(std::shared_ptr<ModuleBase> moduleInstance, int instanceId,
std::string pipelineName, std::string moduleName)
{
LogDebug << "ModuleManager: begin to init module instance." << moduleName;
ModuleInitArgs initArgs;
#ifdef ASCEND_MODULE_USE_ACL
initArgs.context = ResourceManager::GetInstance()->GetContext(deviceId_);
initArgs.runMode = runMode_;
#endif
initArgs.pipelineName = pipelineName;
initArgs.moduleName = moduleName;
initArgs.instanceId = instanceId;
// Initialize the Init function of each module
APP_ERROR ret = moduleInstance->Init(configParser_, initArgs);
if (ret != APP_ERR_OK) {
LogFatal << "ModuleManager: fail to init module, name = " << moduleName.c_str() << ", instance id = " <<
instanceId << ".";
return ret;
}
LogDebug << "ModuleManager: module " << initArgs.moduleName << "[" << instanceId << "] init success.";
return ret;
}
APP_ERROR ModuleManager::RegisterModules(std::string pipelineName, ModuleDesc *modulesDesc, int moduleTypeCount,
int defaultCount)
{
auto iter = pipelineMap_.find(pipelineName);
std::map<std::string, ModulesInfo> modulesInfoMap;
if (iter != pipelineMap_.end()) {
modulesInfoMap = iter->second;
}
std::shared_ptr<ModuleBase> moduleInstance = nullptr;
// create new object of module
// auto initialize the Init function of each module
for (int i = 0; i < moduleTypeCount; i++) {
ModuleDesc moduleDesc = modulesDesc[i];
int moduleCount = (moduleDesc.moduleCount == -1) ? defaultCount : moduleDesc.moduleCount;
ModulesInfo modulesInfo;
for (int j = 0; j < moduleCount; j++) {
moduleInstance.reset(static_cast<ModuleBase *>(ModuleFactory::MakeModule(moduleDesc.moduleName)));
APP_ERROR ret = InitModuleInstance(moduleInstance, j, pipelineName, moduleDesc.moduleName);
if (ret != APP_ERR_OK) {
return ret;
}
modulesInfo.moduleVec.push_back(moduleInstance);
}
modulesInfoMap[moduleDesc.moduleName] = modulesInfo;
}
pipelineMap_[pipelineName] = modulesInfoMap;
return APP_ERR_OK;
}
APP_ERROR ModuleManager::RegisterModuleConnects(std::string pipelineName, ModuleConnectDesc *connnectDesc,
int moduleConnectCount)
{
auto iter = pipelineMap_.find(pipelineName);
std::map<std::string, ModulesInfo> modulesInfoMap;
if (iter == pipelineMap_.end()) {
return APP_ERR_COMM_INVALID_PARAM;
}
modulesInfoMap = iter->second;
std::shared_ptr<BlockingQueue<std::shared_ptr<void>>> dataQueue = nullptr;
// add connect
for (int i = 0; i < moduleConnectCount; i++) {
ModuleConnectDesc connectDesc = connnectDesc[i];
LogDebug << "Add Connect " << connectDesc.moduleSend << " " << connectDesc.moduleRecv << " type " <<
connectDesc.connectType;
auto iterSend = modulesInfoMap.find(connectDesc.moduleSend);
auto iterRecv = modulesInfoMap.find(connectDesc.moduleRecv);
if (iterSend == modulesInfoMap.end() || iterRecv == modulesInfoMap.end()) {
LogFatal << "Cann't find Module";
return APP_ERR_COMM_INVALID_PARAM;
}
ModulesInfo moduleInfoSend = iterSend->second;
ModulesInfo moduleInfoRecv = iterRecv->second;
// create input queue for recv module
for (unsigned int j = 0; j < moduleInfoRecv.moduleVec.size(); j++) {
dataQueue = std::make_shared<BlockingQueue<std::shared_ptr<void>>>(MODULE_QUEUE_SIZE);
moduleInfoRecv.inputQueueVec.push_back(dataQueue);
}
RegisterInputVec(pipelineName, connectDesc.moduleRecv, moduleInfoRecv.inputQueueVec);
//
RegisterOutputModule(pipelineName, connectDesc.moduleSend, connectDesc.moduleRecv, connectDesc.connectType,
moduleInfoRecv.inputQueueVec);
}
return APP_ERR_OK;
}
APP_ERROR ModuleManager::RegisterInputVec(std::string pipelineName, std::string moduleName,
std::vector<std::shared_ptr<BlockingQueue<std::shared_ptr<void>>>> inputQueVec)
{
auto pipelineIter = pipelineMap_.find(pipelineName);
std::map<std::string, ModulesInfo> modulesInfoMap;
if (pipelineIter == pipelineMap_.end()) {
return APP_ERR_COMM_INVALID_PARAM;
}
modulesInfoMap = pipelineIter->second;
// set inputQueue
auto iter = modulesInfoMap.find(moduleName);
if (iter != modulesInfoMap.end()) {
ModulesInfo moduleInfo = iter->second;
if (moduleInfo.moduleVec.size() != inputQueVec.size()) {
return APP_ERR_COMM_FAILURE;
}
std::shared_ptr<BlockingQueue<std::shared_ptr<void>>> inputQueue = nullptr;
for (unsigned int j = 0; j < moduleInfo.moduleVec.size(); j++) {
std::shared_ptr<ModuleBase> moduleInstance = moduleInfo.moduleVec[j];
inputQueue = inputQueVec[j];
moduleInstance->SetInputVec(inputQueue);
}
}
return APP_ERR_OK;
}
APP_ERROR ModuleManager::RegisterOutputModule(std::string pipelineName, std::string moduleSend, std::string moduleRecv,
ModuleConnectType connectType, std::vector<std::shared_ptr<BlockingQueue<std::shared_ptr<void>>>> outputQueVec)
{
auto pipelineIter = pipelineMap_.find(pipelineName);
std::map<std::string, ModulesInfo> modulesInfoMap;
if (pipelineIter == pipelineMap_.end()) {
return APP_ERR_COMM_INVALID_PARAM;
}
modulesInfoMap = pipelineIter->second;
// set outputInfo
auto iter = modulesInfoMap.find(moduleSend);
if (iter != modulesInfoMap.end()) {
ModulesInfo moduleInfo = iter->second;
for (unsigned int j = 0; j < moduleInfo.moduleVec.size(); j++) {
std::shared_ptr<ModuleBase> moduleInstance = moduleInfo.moduleVec[j];
moduleInstance->SetOutputInfo(moduleRecv, connectType, outputQueVec);
}
}
return APP_ERR_OK;
}
APP_ERROR ModuleManager::InitPipelineModule()
{
return APP_ERR_OK;
}
APP_ERROR ModuleManager::RunPipeline()
{
LogInfo << "ModuleManager: begin to run pipeline.";
// start the thread of the corresponding module
std::map<std::string, ModulesInfo> modulesInfoMap;
std::shared_ptr<ModuleBase> moduleInstance;
for (auto pipelineIter = pipelineMap_.begin(); pipelineIter != pipelineMap_.end(); pipelineIter++) {
modulesInfoMap = pipelineIter->second;
for (auto iter = modulesInfoMap.begin(); iter != modulesInfoMap.end(); iter++) {
ModulesInfo modulesInfo = iter->second;
for (uint32_t i = 0; i < modulesInfo.moduleVec.size(); i++) {
moduleInstance = modulesInfo.moduleVec[i];
APP_ERROR ret = moduleInstance->Run();
if (ret != APP_ERR_OK) {
LogFatal << "ModuleManager: fail to run module ";
return ret;
}
}
}
}
LogInfo << "ModuleManager: run pipeline success.";
return APP_ERR_OK;
}
APP_ERROR ModuleManager::DeInit(void)
{
LogInfo << "begin to deinit module manager.";
APP_ERROR ret = APP_ERR_OK;
// DeInit pipeline module
ret = DeInitPipelineModule();
if (ret != APP_ERR_OK) {
LogFatal << "ModuleManager: fail to deinit pipeline module, ret[%d]" << ret << ".";
return ret;
}
#ifdef ASCEND_MODULE_USE_ACL
ResourceManager::GetInstance()->Release();
#endif
return ret;
}
void ModuleManager::StopModule(std::shared_ptr<ModuleBase> module)
{
LogDebug << module->GetModuleName() << "[" << module->GetInstanceId() << "] stop begin";
APP_ERROR ret = module->Stop();
if (ret != APP_ERR_OK) {
LogFatal << module->GetModuleName() << "[" << module->GetInstanceId() << "] stop failed";
}
LogInfo << module->GetModuleName() << "[" << module->GetInstanceId() << "] stop success";
}
APP_ERROR ModuleManager::DeInitPipelineModule()
{
LogDebug << "begin to deinit pipeline module.";
std::map<std::string, ModulesInfo> modulesInfoMap;
for (auto pipelineIter = pipelineMap_.begin(); pipelineIter != pipelineMap_.end(); pipelineIter++) {
modulesInfoMap = pipelineIter->second;
for (auto iter = modulesInfoMap.begin(); iter != modulesInfoMap.end(); iter++) {
ModulesInfo modulesInfo = iter->second;
std::vector<std::thread> threadVec;
for (auto &moduleInstance : modulesInfo.moduleVec) {
threadVec.emplace_back(ModuleManager::StopModule, moduleInstance);
}
for (auto &t : threadVec) {
t.join();
}
}
LogInfo << "Deinit " << pipelineIter->first << " success.";
}
return APP_ERR_OK;
}
}