/* * Copyright(C) 2020. Huawei Technologies Co.,Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "ModuleManager.h" #include "Log.h" #ifdef ASCEND_MODULE_USE_ACL #include "ResourceManager.h" #endif namespace ascendBaseModule { const int MODULE_QUEUE_SIZE = 200; ModuleManager::ModuleManager() {} ModuleManager::~ModuleManager() {} APP_ERROR ModuleManager::Init(std::string &configPath, std::string &aclConfigPath) { LogDebug << "ModuleManager: begin to init."; // load and parse config file APP_ERROR ret = configParser_.ParseConfig(configPath); if (ret != APP_ERR_OK) { LogFatal << "ModuleManager: cannot parse file."; return ret; } // Init Acl #ifdef ASCEND_MODULE_USE_ACL ret = InitAcl(aclConfigPath); if (ret != APP_ERR_OK) { LogFatal << "ModuleManager: fail to init Acl."; return ret; } #endif // Init pipeline module ret = InitPipelineModule(); if (ret != APP_ERR_OK) { LogFatal << "ModuleManager: fail to init pipeline module."; return ret; } return ret; } #ifdef ASCEND_MODULE_USE_ACL APP_ERROR ModuleManager::InitAcl(std::string &aclConfigPath) { LogDebug << "ModuleManager: begin to init Acl."; std::string itemCfgStr; itemCfgStr = "SystemConfig.deviceId"; APP_ERROR ret = configParser_.GetIntValue(itemCfgStr, deviceId_); if (ret != APP_ERR_OK) { LogFatal << "ModuleManager: fail to get device id."; return ret; } ret = aclrtGetRunMode(&runMode_); if (ret != APP_ERR_OK) { LogFatal << "ModuleManager: fail to get run mode of device, ret=" << ret << "."; return ret; } if (runMode_ == ACL_DEVICE) { deviceId_ = 0; } else if (deviceId_ < 0) { LogFatal << "ModuleManager: invalid device id, deviceId=" << deviceId_ << "."; return APP_ERR_COMM_INVALID_PARAM; } ResourceInfo resourceInfo; resourceInfo.aclConfigPath = aclConfigPath; resourceInfo.deviceIds.insert(deviceId_); return ResourceManager::GetInstance()->InitResource(resourceInfo); } #endif APP_ERROR ModuleManager::InitModuleInstance(std::shared_ptr moduleInstance, int instanceId, std::string pipelineName, std::string moduleName) { LogDebug << "ModuleManager: begin to init module instance." << moduleName; ModuleInitArgs initArgs; #ifdef ASCEND_MODULE_USE_ACL initArgs.context = ResourceManager::GetInstance()->GetContext(deviceId_); initArgs.runMode = runMode_; #endif initArgs.pipelineName = pipelineName; initArgs.moduleName = moduleName; initArgs.instanceId = instanceId; // Initialize the Init function of each module APP_ERROR ret = moduleInstance->Init(configParser_, initArgs); if (ret != APP_ERR_OK) { LogFatal << "ModuleManager: fail to init module, name = " << moduleName.c_str() << ", instance id = " << instanceId << "."; return ret; } LogDebug << "ModuleManager: module " << initArgs.moduleName << "[" << instanceId << "] init success."; return ret; } APP_ERROR ModuleManager::RegisterModules(std::string pipelineName, ModuleDesc *modulesDesc, int moduleTypeCount, int defaultCount) { auto iter = pipelineMap_.find(pipelineName); std::map modulesInfoMap; if (iter != pipelineMap_.end()) { modulesInfoMap = iter->second; } std::shared_ptr moduleInstance = nullptr; // create new object of module // auto initialize the Init function of each module for (int i = 0; i < moduleTypeCount; i++) { ModuleDesc moduleDesc = modulesDesc[i]; int moduleCount = (moduleDesc.moduleCount == -1) ? defaultCount : moduleDesc.moduleCount; ModulesInfo modulesInfo; for (int j = 0; j < moduleCount; j++) { moduleInstance.reset(static_cast(ModuleFactory::MakeModule(moduleDesc.moduleName))); APP_ERROR ret = InitModuleInstance(moduleInstance, j, pipelineName, moduleDesc.moduleName); if (ret != APP_ERR_OK) { return ret; } modulesInfo.moduleVec.push_back(moduleInstance); } modulesInfoMap[moduleDesc.moduleName] = modulesInfo; } pipelineMap_[pipelineName] = modulesInfoMap; return APP_ERR_OK; } APP_ERROR ModuleManager::RegisterModuleConnects(std::string pipelineName, ModuleConnectDesc *connnectDesc, int moduleConnectCount) { auto iter = pipelineMap_.find(pipelineName); std::map modulesInfoMap; if (iter == pipelineMap_.end()) { return APP_ERR_COMM_INVALID_PARAM; } modulesInfoMap = iter->second; std::shared_ptr>> dataQueue = nullptr; // add connect for (int i = 0; i < moduleConnectCount; i++) { ModuleConnectDesc connectDesc = connnectDesc[i]; LogDebug << "Add Connect " << connectDesc.moduleSend << " " << connectDesc.moduleRecv << " type " << connectDesc.connectType; auto iterSend = modulesInfoMap.find(connectDesc.moduleSend); auto iterRecv = modulesInfoMap.find(connectDesc.moduleRecv); if (iterSend == modulesInfoMap.end() || iterRecv == modulesInfoMap.end()) { LogFatal << "Cann't find Module"; return APP_ERR_COMM_INVALID_PARAM; } ModulesInfo moduleInfoSend = iterSend->second; ModulesInfo moduleInfoRecv = iterRecv->second; // create input queue for recv module for (unsigned int j = 0; j < moduleInfoRecv.moduleVec.size(); j++) { dataQueue = std::make_shared>>(MODULE_QUEUE_SIZE); moduleInfoRecv.inputQueueVec.push_back(dataQueue); } RegisterInputVec(pipelineName, connectDesc.moduleRecv, moduleInfoRecv.inputQueueVec); // RegisterOutputModule(pipelineName, connectDesc.moduleSend, connectDesc.moduleRecv, connectDesc.connectType, moduleInfoRecv.inputQueueVec); } return APP_ERR_OK; } APP_ERROR ModuleManager::RegisterInputVec(std::string pipelineName, std::string moduleName, std::vector>>> inputQueVec) { auto pipelineIter = pipelineMap_.find(pipelineName); std::map modulesInfoMap; if (pipelineIter == pipelineMap_.end()) { return APP_ERR_COMM_INVALID_PARAM; } modulesInfoMap = pipelineIter->second; // set inputQueue auto iter = modulesInfoMap.find(moduleName); if (iter != modulesInfoMap.end()) { ModulesInfo moduleInfo = iter->second; if (moduleInfo.moduleVec.size() != inputQueVec.size()) { return APP_ERR_COMM_FAILURE; } std::shared_ptr>> inputQueue = nullptr; for (unsigned int j = 0; j < moduleInfo.moduleVec.size(); j++) { std::shared_ptr moduleInstance = moduleInfo.moduleVec[j]; inputQueue = inputQueVec[j]; moduleInstance->SetInputVec(inputQueue); } } return APP_ERR_OK; } APP_ERROR ModuleManager::RegisterOutputModule(std::string pipelineName, std::string moduleSend, std::string moduleRecv, ModuleConnectType connectType, std::vector>>> outputQueVec) { auto pipelineIter = pipelineMap_.find(pipelineName); std::map modulesInfoMap; if (pipelineIter == pipelineMap_.end()) { return APP_ERR_COMM_INVALID_PARAM; } modulesInfoMap = pipelineIter->second; // set outputInfo auto iter = modulesInfoMap.find(moduleSend); if (iter != modulesInfoMap.end()) { ModulesInfo moduleInfo = iter->second; for (unsigned int j = 0; j < moduleInfo.moduleVec.size(); j++) { std::shared_ptr moduleInstance = moduleInfo.moduleVec[j]; moduleInstance->SetOutputInfo(moduleRecv, connectType, outputQueVec); } } return APP_ERR_OK; } APP_ERROR ModuleManager::InitPipelineModule() { return APP_ERR_OK; } APP_ERROR ModuleManager::RunPipeline() { LogInfo << "ModuleManager: begin to run pipeline."; // start the thread of the corresponding module std::map modulesInfoMap; std::shared_ptr moduleInstance; for (auto pipelineIter = pipelineMap_.begin(); pipelineIter != pipelineMap_.end(); pipelineIter++) { modulesInfoMap = pipelineIter->second; for (auto iter = modulesInfoMap.begin(); iter != modulesInfoMap.end(); iter++) { ModulesInfo modulesInfo = iter->second; for (uint32_t i = 0; i < modulesInfo.moduleVec.size(); i++) { moduleInstance = modulesInfo.moduleVec[i]; APP_ERROR ret = moduleInstance->Run(); if (ret != APP_ERR_OK) { LogFatal << "ModuleManager: fail to run module "; return ret; } } } } LogInfo << "ModuleManager: run pipeline success."; return APP_ERR_OK; } APP_ERROR ModuleManager::DeInit(void) { LogInfo << "begin to deinit module manager."; APP_ERROR ret = APP_ERR_OK; // DeInit pipeline module ret = DeInitPipelineModule(); if (ret != APP_ERR_OK) { LogFatal << "ModuleManager: fail to deinit pipeline module, ret[%d]" << ret << "."; return ret; } #ifdef ASCEND_MODULE_USE_ACL ResourceManager::GetInstance()->Release(); #endif return ret; } void ModuleManager::StopModule(std::shared_ptr module) { LogDebug << module->GetModuleName() << "[" << module->GetInstanceId() << "] stop begin"; APP_ERROR ret = module->Stop(); if (ret != APP_ERR_OK) { LogFatal << module->GetModuleName() << "[" << module->GetInstanceId() << "] stop failed"; } LogInfo << module->GetModuleName() << "[" << module->GetInstanceId() << "] stop success"; } APP_ERROR ModuleManager::DeInitPipelineModule() { LogDebug << "begin to deinit pipeline module."; std::map modulesInfoMap; for (auto pipelineIter = pipelineMap_.begin(); pipelineIter != pipelineMap_.end(); pipelineIter++) { modulesInfoMap = pipelineIter->second; for (auto iter = modulesInfoMap.begin(); iter != modulesInfoMap.end(); iter++) { ModulesInfo modulesInfo = iter->second; std::vector threadVec; for (auto &moduleInstance : modulesInfo.moduleVec) { threadVec.emplace_back(ModuleManager::StopModule, moduleInstance); } for (auto &t : threadVec) { t.join(); } } LogInfo << "Deinit " << pipelineIter->first << " success."; } return APP_ERR_OK; } }