#include "SocketEngine.h" SocketEngine::SocketEngine() { isStop_ = false; } SocketEngine::~SocketEngine() {} APP_ERROR SocketEngine::Init() { strPort0_ = engineName_ + "_" + std::to_string(engineId_) + "_0"; this->socketOpenType_ = MyYaml::GetIns()->GetIntValue("socket_server_open"); this->socketPort_ = MyYaml::GetIns()->GetIntValue("socket_server_port"); this->socketQueueLen_ = MyYaml::GetIns()->GetIntValue("socket_server_queue_len"); MyShellInfo << "SocketEngine init ok"; return APP_ERR_OK; } APP_ERROR SocketEngine::DeInit() { for (int fd = 0; fd <= max_fd; ++fd) { if (FD_ISSET(fd, &master_set)) { close(fd); } } MyShellInfo << "SocketEngine deinit ok"; return APP_ERR_OK; } APP_ERROR SocketEngine::Process() { int ret = APP_ERR_OK; if (this->socketOpenType_) { while (!isStop_) { if (!this->Socket_(this->socketPort_)) continue; if (!this->Bind()) continue; this->Listen(this->socketQueueLen_); this->Run(); } } else { while (!isStop_) { usleep(1000); } } return APP_ERR_OK; } bool SocketEngine::Socket_(int port) { bzero(&server_addr, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = htons(INADDR_ANY); server_addr.sin_port = htons(port); // create socket to listen listen_fd = socket(PF_INET, SOCK_STREAM, 0); if (listen_fd < 0) { LogError << "Create Scoket_Server Failed!"; return false; } int opt = 1; setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); return true; } bool SocketEngine::Bind() { if (-1 == (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)))) { LogError << "Scoket_Server Bind Failed!"; return false; } LogInfo << "Scoket_Server Bind Successfully."; return true; } bool SocketEngine::Listen(int queue_len) { if (-1 == listen(listen_fd, queue_len)) { LogError << "Scoket_Server Listen Failed!"; return false; } LogInfo << "Scoket_Server Listen Successfully."; return true; } bool SocketEngine::Accept() { struct sockaddr_in client_addr; socklen_t client_addr_len = sizeof(client_addr); int new_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_addr_len); if (new_fd < 0) { LogError << "Scoket_Server Accept Failed!"; return false; } std::string ip(inet_ntoa(client_addr.sin_addr)); // 获取客户端IP LogInfo << ip << " new connection was accepted."; mmap.insert(std::make_pair(new_fd, std::make_pair(ip, 0))); // 将新建立的连接的fd加入master_set FD_SET(new_fd, &master_set); if (new_fd > max_fd) { max_fd = new_fd; } return true; } bool SocketEngine::Recv(int nums) { for (int fd = 0; fd <= max_fd; ++fd) { if (FD_ISSET(fd, &working_set)) { bool close_conn = false; // 标记当前连接是否断开了 char order_str[512] = { 0 }; int size_get = recv(fd, (char*)&order_str, sizeof(order_str) - 1, 0); if (size_get <= 0) { continue; } else { bool isHeart_beat = (std::string(order_str) == "matrixai"); if (isHeart_beat) { mmap[fd].second = 0; // 每次收到心跳包,count置0 //LogDebug << "Scoket_Server Received heart-beat from client."; } else { LogInfo << "Received message from client:" << std::string(order_str); } } if (close_conn) // 当前这个连接有问题,关闭它 { close(fd); FD_CLR(fd, &master_set); if (fd == max_fd) // 需要更新max_fd; { while (FD_ISSET(max_fd, &master_set) == false) --max_fd; } } } } return true; } bool SocketEngine::Run() { pthread_t id; // 创建心跳检测线程 int ret = pthread_create(&id, NULL, heart_handler, (void*)this); if (ret != 0) { LogError << "Scoket_Server Can not create heart-beat checking thread."; return false; } ret = pthread_create(&id, NULL, sendInfo, (void*)this); if (ret != 0) { LogError << "Scoket_Server Can not create message send thread."; return false; } max_fd = listen_fd; // 初始化max_fd FD_ZERO(&master_set); FD_SET(listen_fd, &master_set); // 添加监听fd while (!isStop_) { FD_ZERO(&working_set); memcpy(&working_set, &master_set, sizeof(master_set)); timeout.tv_sec = 30; timeout.tv_usec = 0; int nums = select(max_fd + 1, &working_set, NULL, NULL, &timeout); /*if (nums < 0) { LogError << "Scoket_Server select() error!"; return false; }*/ if (nums <= 0) { //cout << "select() is timeout!"; continue; } if (FD_ISSET(listen_fd, &working_set)) Accept(); // 有新的客户端请求 else Recv(nums); // 接收客户端的消息 } return true; } //解析命令 bool SocketEngine::getOrder(const std::string &recv, Json::Value &order) { Json::CharReaderBuilder readerBuilder; std::shared_ptr reader(readerBuilder.newCharReader()); JSONCPP_STRING errs; if (!reader->parse(recv.data(), recv.data() + recv.size(), &order, &errs)) return false; if (order.isArray()) { if (order.size() > 0) order = order[0]; return true; } return true; } std::string SocketEngine::getFeedBack(const std::string poundNo, const std::string type, const std::string info) { Json::Value feedBack; Json::StreamWriterBuilder strbuild; feedBack["poundNo"] = poundNo; feedBack["type"] = type; feedBack["info"] = info; // return Json::writeString(strbuild, feedBack); } void* SocketEngine::sendInfo(void* arg) { SocketEngine* s = (SocketEngine*)arg; while (!s->isStop_) { //pop端口0 std::shared_ptr pVoidData0 = nullptr; s->inputQueMap_[s->strPort0_]->pop(pVoidData0); if (nullptr == pVoidData0) { usleep(1000); //1ms continue; } std::shared_ptr pMessage = std::static_pointer_cast(pVoidData0); SendAllClient(s->mmap, *pMessage); } } // thread function void* SocketEngine::heart_handler(void* arg) { LogInfo << "Scoket_Server The heartbeat checking thread started.\n"; SocketEngine* s = (SocketEngine*)arg; while (1) { std::map >::iterator it = s->mmap.begin(); for (; it != s->mmap.end(); ) { if (it->second.second == 5) // sleep(3)*5没有收到心跳包,判定客户端掉线 { LogInfo << "The client " << it->second.first << " has be offline.\n"; int fd = it->first; close(fd); // 关闭该连接 FD_CLR(fd, &s->master_set); if (fd == s->max_fd) // 需要更新max_fd; { while (FD_ISSET(s->max_fd, &s->master_set) == false) s->max_fd--; } s->mmap.erase(it++); // 从map中移除该记录 } else if (it->second.second < 5 && it->second.second >= 0) { it->second.second += 1; ++it; } else { ++it; } } sleep(1); // 定时三秒 } } bool SocketEngine::SendClient(std::map > mmap, const std::string ip, const std::string message) { try { std::map >::iterator it = mmap.begin(); for (; it != mmap.end(); ++it) { if (it->second.first == ip) // 遍历找雷达ip { int fd = it->first; if (send(fd, message.c_str(), message.size(), 0) <= 0) { LogError << "Socker Server send message to IP:" << ip << " failed, message:" << message; return false; } else { char clientFeedBack[256] = { 0 }; if (recv(fd, clientFeedBack, sizeof(clientFeedBack), 0) > 0) { Json::Value feedBack; if (!getOrder(clientFeedBack, feedBack)) { LogError << "Get Client IP:" << ip << " feekBack format is error : " << clientFeedBack; return false; } if (feedBack.get("success", "").asString() == "true") { return true; } else { LogError << "Client FeedBack Error: " << feedBack.get("error_msg", "").asString(); return false; } } else { LogError << "Socker Server send message to IP:" << ip << " successful, But recv error!"; return false; } } } } } catch (const std::exception&) { LogError << "Socker Server send message to IP:" << ip << " failed, message:" << message; return false; } LogError << "Socker Server send message to IP:" << ip << " failed, because no find connection"; return false; } bool SocketEngine::SendAllClient(std::map > mmap, const std::string message) { try { std::map >::iterator it = mmap.begin(); for (; it != mmap.end(); ++it) { int fd = it->first; LogInfo << "Socket send IP: " << it->second.first << " msg:" << message; if (send(fd, message.c_str(), message.size(), MSG_NOSIGNAL) <= 0) { LogError << "Socker Server send message to IP:" << it->second.first << " failed, message:" << message; return false; } else { // char clientFeedBack[256] = { 0 }; // if (recv(fd, clientFeedBack, sizeof(clientFeedBack), 0) > 0) // { // Json::Value feedBack; // if (!getOrder(clientFeedBack, feedBack)) { // LogError << "Get Client IP: "<< it->second.first << " feekBack format is error : " << clientFeedBack; // return false; // } // if (feedBack.get("success", "").asString() == "true") // { // return true; // } // else // { // LogError << "Client FeedBack Error: " << feedBack.get("error_msg", "").asString(); // return false; // } // } // else { // LogError << "Socker Server send message to IP:" << it->second.first << " successful, But recv error!"; // return false; // } } } } catch (const std::exception&) { LogError << "Socker Server send message to all client failed, message:" << message; return false; } return false; } std::string SocketEngine::HexToStr(const std::string &str) { std::string hex = str; long len = hex.length(); std::string newString; for (long i = 0; i < len; i += 2) { std::string byte = hex.substr(i, 2); char chr = (char)(int)strtol(byte.c_str(), NULL, 16); newString.push_back(chr); } return newString; } std::string SocketEngine::StrToHex(const std::string &str) { unsigned char c; char buf[2]; std::string result = ""; std::stringstream ss; ss << str; while (ss.read((char*)(&c), sizeof(c))) { sprintf(buf, "%02x", c); result += buf; } return result; }