#include "WSServerEngine.h"
#include "AppCommon.h"

using namespace ai_matrix;

namespace
{

/// Formats an IPv4 socket address as "a.b.c.d:port".
std::string formatPeer(const struct sockaddr_in &addr)
{
    std::string peer = inet_ntoa(addr.sin_addr);
    peer.append(":");
    peer.append(std::to_string(ntohs(addr.sin_port)));
    return peer;
}

} // namespace

WSServerEngine::WSServerEngine() {}

WSServerEngine::~WSServerEngine() {}

/// Builds the two port names ("<engineName>_<engineId>_0" / "_1") and loads
/// the websocket configuration from the global Config instance.
///
/// @return APP_ERR_OK (this function has no failure path).
APP_ERROR WSServerEngine::Init()
{
    strPort0_ = engineName_ + "_" + std::to_string(engineId_) + "_0";
    strPort1_ = engineName_ + "_" + std::to_string(engineId_) + "_1";
    this->wsServerConfig_ = Config::getins()->getWSocketConfig();

    LogInfo << "engineId_:" << engineId_ << " WSServerEngine Init ok";
    return APP_ERR_OK;
}

/// Joins the broadcast thread, clears the running flag and joins the
/// websocket polling thread if it is still joinable.
///
/// Both worker loops exit on `isStop_`, which is not set anywhere in this
/// file.
/// NOTE(review): presumably the owning framework sets `isStop_` before calling
/// DeInit(); otherwise the first join would block - confirm.
///
/// @return APP_ERR_OK.
APP_ERROR WSServerEngine::DeInit()
{
    if (this->broadcastThread_.joinable())
    {
        this->broadcastThread_.join();
    }

    this->stop();

    if (this->ws_thr_.joinable())
    {
        this->ws_thr_.join();
    }

    LogInfo << "WSServerEngine engineId_:" << engineId_ << " DeInit ok";
    return APP_ERR_OK;
}

/// Segment callback of the websocket server. This engine does not expect it
/// to be invoked and only logs an error when it is.
void WSServerEngine::onWSSegment(WSConn &conn, uint8_t opcode, const uint8_t *payload, uint32_t pl_len,
                                 uint32_t pl_start_idx, bool fin)
{
    LogError << "error: onWSSegment should not be called";
}

/// Message callback of the websocket server.
///
/// PING frames are answered with a PONG carrying the same payload. Any other
/// payload is split into shell-like arguments and handed to onCMD():
///   - unquoted spaces separate arguments;
///   - '...' copies its content verbatim;
///   - "..." copies its content, with \\ and \" unescaped;
///   - outside quotes, a backslash makes the next character literal.
///
/// Messages that would not fit into the fixed 4096-byte token buffer are
/// logged and dropped instead of overflowing the stack.
///
/// @param conn     connection the message arrived on
/// @param opcode   websocket opcode of the message
/// @param payload  message bytes (not required to be NUL terminated)
/// @param pl_len   number of bytes in @p payload
void WSServerEngine::onWSMsg(WSConn &conn, uint8_t opcode, const uint8_t *payload, uint32_t pl_len)
{
    if (opcode == websocket::OPCODE_PING)
    {
        conn.send(websocket::OPCODE_PONG, payload, pl_len);
        return;
    }

    char buf[4096] = {0};
    const char *argv[4096];

    // The tokenizer writes at most one byte per input byte, starting at
    // buf[1], plus one terminating NUL. It also records at most one argv entry
    // per input byte. Capping the payload at sizeof(buf) - 2 therefore keeps
    // both `buf` and `argv` in bounds.
    if (pl_len > sizeof(buf) - 2)
    {
        LogError << "ws message too long, dropped. length: " << pl_len;
        return;
    }

    const char *data = reinterpret_cast<const char *>(payload);
    const char *data_end = data + pl_len;
    char *out = buf + 1; // buf[0] stays 0 so the first token sees a separator before it
    int argc = 0;
    bool in_quote = false;
    bool single_quote = false;

    while (data < data_end)
    {
        char ch = *data++;
        if (!in_quote)
        {
            if (ch == ' ')
            {
                *out++ = 0;
            }
            else
            {
                // A preceding NUL marks the start of a new argument.
                if (*(out - 1) == 0)
                {
                    argv[argc++] = out;
                }

                if (ch == '\'')
                {
                    in_quote = single_quote = true;
                }
                else if (ch == '"')
                {
                    in_quote = true;
                }
                else if (ch == '\\' && data < data_end)
                {
                    // Escaped character outside quotes: copy it literally.
                    *out++ = *data++;
                }
                else
                {
                    // Ordinary character, or a trailing backslash with
                    // nothing left to escape.
                    *out++ = ch;
                }
            }
        }
        else if (single_quote)
        {
            if (ch == '\'')
            {
                in_quote = single_quote = false;
            }
            else
            {
                *out++ = ch;
            }
        }
        else
        {
            if (ch == '"')
            {
                in_quote = false;
            }
            else if (ch == '\\' && data < data_end && (*data == '\\' || *data == '"'))
            {
                *out++ = *data++;
            }
            else
            {
                *out++ = ch;
            }
        }
    }

    if (argc)
    {
        *out = 0;

        struct sockaddr_in addr;
        conn.getPeername(addr);

        CMDConnData cmdConnData;
        cmdConnData.ip = formatPeer(addr);

        std::string resp = onCMD(cmdConnData, argc, argv);
        if (resp.size())
        {
            // conn.send(websocket::OPCODE_TEXT, (const uint8_t *)resp.data(), resp.size());
        }
    }
}

/// Close callback of the websocket server: logs the ip stored in the
/// connection's user data together with the close status and reason.
///
/// NOTE(review): nothing in this file writes `conn.user_data.ip`, so the
/// logged ip is whatever the user data was initialised with - confirm whether
/// onWSConnect() is meant to store the peer address there.
void WSServerEngine::onWSClose(WSConn &conn, uint16_t status_code, const char *reason)
{
    CMDConnData cdata = conn.user_data;
    LogInfo << cdata.ip << ",ws close, status_code: " << status_code
            << ", reason: " << (reason != nullptr ? reason : "");
}

/// Connect callback of the websocket server: logs the peer address and
/// accepts every connection.
///
/// @return always true.
bool WSServerEngine::onWSConnect(WSConn &conn, const char *request_uri, const char *host, const char *origin,
                                 const char *protocol, const char *extensions, char *resp_protocol,
                                 uint32_t resp_protocol_size, char *resp_extensions,
                                 uint32_t resp_extensions_size)
{
    struct sockaddr_in addr;
    conn.getPeername(addr);
    std::string ip = formatPeer(addr);

    LogInfo << "ws connection from: " << ip;
    return true;
}

/// Clears the running flag.
void WSServerEngine::stop()
{
    this->running_ = false;
}

/// Handles one parsed client command.
///
/// Only the first argument is used: it is logged, pushed to the output queue
/// of port 0 as a shared std::string, and a notice containing it is broadcast
/// through sendAllClient().
///
/// @param conn  data about the sending connection; only `ip` is read
/// @param argc  number of entries in @p argv
/// @param argv  parsed arguments; argv[0] is the message
/// @return an empty string: no direct reply is produced, the notice is sent
///         through the broadcast instead.
std::string WSServerEngine::onCMD(CMDConnData &conn, int argc, const char **argv)
{
    if (argc < 1 || argv == nullptr || argv[0] == nullptr)
    {
        return std::string();
    }

    std::string msg = argv[0];
    std::string strShow = "VCarContainer 收到 " + conn.ip + " 的信息:" + msg;
    LogInfo << strShow;

    outputQueMap_[strPort0_]->push(std::static_pointer_cast<void>(std::make_shared<std::string>(msg)), true);

    this->sendAllClient(strShow);

    // The original fell off the end of a non-void function, which is
    // undefined behaviour. Return an explicit "no reply".
    return std::string();
}

/// Logs @p msg and passes it to the websocket server's sendMsg().
///
/// NOTE(review): this is reached from both the broadcast thread and the
/// polling thread (via onCMD); no locking is visible here - confirm that
/// `wsserver_.sendMsg` is safe to call concurrently with `poll`.
void WSServerEngine::sendAllClient(const std::string &msg)
{
    LogInfo << "发送广播:" << msg;
    wsserver_.sendMsg(msg);
}

/// Engine main loop.
///
/// Starts a broadcast thread that pops shared std::string items from the
/// input queue of port 0 and forwards each to sendAllClient(). Then, until
/// `isStop_` is set, it initialises the websocket server on 0.0.0.0 at the
/// configured port (retrying once per second on failure) and runs a polling
/// thread, blocking until that thread finishes.
///
/// @return APP_ERR_OK once `isStop_` has been observed.
APP_ERROR WSServerEngine::Process()
{
    this->broadcastThread_ = std::thread([this]() {
        while (!this->isStop_)
        {
            std::shared_ptr<void> pData = nullptr;
            inputQueMap_[strPort0_]->pop(pData, true);
            if (nullptr == pData)
            {
                usleep(1000); //1ms
                continue;
            }

            std::shared_ptr<std::string> sdata = std::static_pointer_cast<std::string>(pData);
            this->sendAllClient(*sdata);
        }
    });

    while (!this->isStop_)
    {
        // NOTE(review): the collapsed source reads
        // "iPort, // 1000, 1000);" - reconstructed as one commented-out
        // argument followed by one live argument; confirm against the server's
        // init() signature.
        this->listenfd_ = wsserver_.init("0.0.0.0",
                                         this->wsServerConfig_.iPort,
                                         // 1000,
                                         1000);
        if (!this->listenfd_)
        {
            LogError << "Web socket server init failed ! Listen port: " << this->wsServerConfig_.iPort
                     << " Error:" << this->wsserver_.getLastError();
            usleep(1000 * 1000);
            continue;
        }

        this->running_ = true;
        this->ws_thr_ = std::thread([this]() {
            while (!this->isStop_)
            {
                this->wsserver_.poll(this);
                std::this_thread::yield();
                usleep(1000);
            }
        });

        LogInfo << "Server running...";
        this->ws_thr_.join();
        LogInfo << "Server stopped...";
        usleep(1000 * 1000);
    }

    return APP_ERR_OK;
}