// VCarContainer/engine/WSServerEngine/WSServerEngine.cpp
#include "WSServerEngine.h"
#include "AppCommon.h"
using namespace ai_matrix;
/** Default construction; no work is done here (setup happens in Init()). */
WSServerEngine::WSServerEngine() = default;

/** Destruction performs no explicit cleanup (threads are joined in DeInit()). */
WSServerEngine::~WSServerEngine() = default;
/**
 * Prepare the engine for use.
 *
 * Builds the two port names "<engineName_>_<engineId_>_0" and
 * "<engineName_>_<engineId_>_1", and caches the WebSocket configuration
 * obtained from Config::getins()->getWSocketConfig().
 *
 * @return APP_ERR_OK (this function has no failure path).
 */
APP_ERROR WSServerEngine::Init()
{
    // Both port names share everything except the trailing index.
    const std::string portPrefix = engineName_ + "_" + std::to_string(engineId_) + "_";
    strPort0_ = portPrefix + "0";
    strPort1_ = portPrefix + "1";

    wsServerConfig_ = Config::getins()->getWSocketConfig();

    LogInfo << "engineId_:" << engineId_ << " WSServerEngine Init ok";
    return APP_ERR_OK;
}
/**
 * Shut the engine down.
 *
 * Order of operations: wait for the broadcast thread, call stop(), then wait
 * for the WebSocket thread. Each thread is only joined if it is joinable.
 *
 * @return APP_ERR_OK (this function has no failure path).
 */
APP_ERROR WSServerEngine::DeInit()
{
    // Joining a non-joinable std::thread throws, hence the guard.
    const auto joinIfJoinable = [](std::thread &worker) {
        if (worker.joinable())
        {
            worker.join();
        }
    };

    joinIfJoinable(broadcastThread_);
    stop();
    joinIfJoinable(ws_thr_);

    LogInfo << "WSServerEngine engineId_:" << engineId_ << " DeInit ok";
    return APP_ERR_OK;
}
/**
 * Handler for a partial (segmented) WebSocket message.
 *
 * This engine does not process segments: every argument is ignored and an
 * error is logged, since the handler is not expected to be invoked.
 */
void WSServerEngine::onWSSegment(WSConn &conn,
                                 uint8_t opcode,
                                 const uint8_t *payload,
                                 uint32_t pl_len,
                                 uint32_t pl_start_idx,
                                 bool fin)
{
    LogError << "error: onWSSegment should not be called";
}
/**
 * Split a command line into NUL-terminated arguments.
 *
 * Tokenizing rules:
 *  - outside quotes, a space ends the current argument;
 *  - '...' copies everything up to the closing quote verbatim;
 *  - "..." copies up to the closing quote, honouring the escapes \\ and \";
 *  - outside quotes, a backslash makes the following character literal.
 *    A backslash that is the very last input byte has nothing to escape and
 *    is kept as a literal character.
 *
 * Every input byte produces at most one output byte, so the storage
 * requirements below are sufficient for any input.
 *
 * @param data      first input byte
 * @param data_end  one past the last input byte (never dereferenced)
 * @param buf       output storage, at least (data_end - data) + 2 bytes
 * @param argv      receives pointers into buf, at least (data_end - data) slots
 * @return number of arguments written to argv
 */
static int splitCommandLine(const char *data, const char *data_end, char *buf, const char **argv)
{
    // buf[0] is a sentinel: "previous output byte is 0" marks the start of a new argument.
    buf[0] = 0;
    char *out = buf + 1;
    int argc = 0;
    bool in_quote = false;
    bool single_quote = false;

    while (data < data_end)
    {
        const char ch = *data++;
        if (!in_quote)
        {
            if (ch == ' ')
            {
                *out++ = 0;
                continue;
            }
            if (*(out - 1) == 0)
                argv[argc++] = out;
            if (ch == '\'')
                in_quote = single_quote = true;
            else if (ch == '"')
                in_quote = true;
            else if (ch == '\\' && data < data_end)
                *out++ = *data++; // escaped character, bounded by data_end
            else
                *out++ = ch; // ordinary character (or a trailing backslash)
        }
        else if (single_quote)
        {
            if (ch == '\'')
                in_quote = single_quote = false;
            else
                *out++ = ch;
        }
        else
        {
            if (ch == '"')
                in_quote = false;
            else if (ch == '\\' && data < data_end && (*data == '\\' || *data == '"'))
                *out++ = *data++; // only look ahead while input remains
            else
                *out++ = ch;
        }
    }
    *out = 0;
    return argc;
}

/**
 * Handle one complete WebSocket message.
 *
 * PING frames are answered with a PONG carrying the same payload. Any other
 * message is tokenized as a command line and dispatched to onCMD() together
 * with the sender's "ip:port" string.
 *
 * Messages longer than the fixed parsing buffer allows are logged and
 * dropped instead of being written past the end of the stack buffers.
 *
 * @param conn     connection the message arrived on
 * @param opcode   WebSocket opcode of the message
 * @param payload  message bytes; not required to be NUL-terminated
 * @param pl_len   number of bytes in payload
 */
void WSServerEngine::onWSMsg(WSConn &conn, uint8_t opcode, const uint8_t *payload, uint32_t pl_len)
{
    if (opcode == websocket::OPCODE_PING)
    {
        conn.send(websocket::OPCODE_PONG, payload, pl_len);
        return;
    }

    constexpr uint32_t kBufSize = 4096;
    // The tokenizer needs pl_len + 2 bytes of output and at most pl_len argv slots.
    if (pl_len > kBufSize - 2)
    {
        LogError << "ws message too long, dropped. length: " << pl_len << ", max: " << (kBufSize - 2);
        return;
    }

    char buf[kBufSize];
    const char *argv[kBufSize];
    const char *data = reinterpret_cast<const char *>(payload);
    const int argc = splitCommandLine(data, data + pl_len, buf, argv);
    if (argc == 0)
    {
        return;
    }

    // Zero-initialised so a failed getPeername() does not leave garbage to format.
    struct sockaddr_in addr = {};
    conn.getPeername(addr);
    std::string ip = inet_ntoa(addr.sin_addr);
    ip.append(":");
    ip.append(std::to_string(ntohs(addr.sin_port)));

    CMDConnData cmdConnData;
    cmdConnData.ip = ip;
    const std::string resp = onCMD(cmdConnData, argc, argv);
    // The reply is intentionally not sent back to the client:
    // conn.send(websocket::OPCODE_TEXT, (const uint8_t *)resp.data(), resp.size());
    (void)resp;
}
/**
 * Called when a WebSocket connection is closed; logs the ip stored in the
 * connection's user data together with the close status code and reason.
 *
 * @param conn         connection that was closed
 * @param status_code  close status code
 * @param reason       close reason text
 */
void WSServerEngine::onWSClose(WSConn &conn, uint16_t status_code, const char *reason)
{
    const CMDConnData &peer = conn.user_data;
    LogInfo << peer.ip << ",ws close, status_code: " << status_code << ", reason: " << reason;
}
/**
 * Called when a client requests a WebSocket connection.
 *
 * Logs the peer's "ip:port" and stores it in conn.user_data.ip so that later
 * callbacks on the same connection (e.g. onWSClose, which logs that field)
 * can identify the client. The request details (uri, host, origin, protocol,
 * extensions) are not inspected and the response buffers are left untouched.
 *
 * @return always true, i.e. every connection is accepted.
 */
bool WSServerEngine::onWSConnect(WSConn &conn, const char *request_uri, const char *host, const char *origin, const char *protocol,
                                 const char *extensions, char *resp_protocol, uint32_t resp_protocol_size, char *resp_extensions,
                                 uint32_t resp_extensions_size)
{
    // Zero-initialised so a failed getPeername() does not leave garbage to format.
    struct sockaddr_in addr = {};
    conn.getPeername(addr);
    std::string ip = inet_ntoa(addr.sin_addr);
    ip.append(":");
    ip.append(std::to_string(ntohs(addr.sin_port)));

    // Remember the peer on the connection; without this the close log has no ip to print.
    conn.user_data.ip = ip;

    LogInfo << "ws connection from: " << ip ;
    return true;
}
void WSServerEngine::stop()
{
this->running_ = false;
}
/**
 * Process one parsed client command.
 *
 * Only the first argument is used. "heartbeat" is silently ignored. Any
 * other command is logged, pushed to the output queue registered under
 * strPort0_, and echoed to clients through sendAllClient().
 *
 * @param conn  data about the sending client; only conn.ip is read
 * @param argc  number of entries in argv
 * @param argv  command arguments; argv[0] is the command text
 * @return always an empty string (no direct reply is produced)
 */
std::string WSServerEngine::onCMD(CMDConnData &conn, int argc, const char **argv)
{
    // Nothing to do without a command word; also avoids dereferencing a null argv[0].
    if (argc < 1 || argv == nullptr || argv[0] == nullptr)
    {
        return "";
    }

    const std::string msg = argv[0];
    if (msg == "heartbeat")
    {
        return "";
    }

    const std::string strShow = "VCarContainer 收到 " + conn.ip + " 的信息:" + msg;
    LogInfo << strShow;

    outputQueMap_[strPort0_]->push(std::make_shared<std::string>(msg), true);
    this->sendAllClient(strShow);
    return "";
}
void WSServerEngine::sendAllClient(const std::string &msg)
{
LogInfo << "发送广播:" << msg;
wsserver_.sendMsg(msg);
}
/**
 * Main body of the engine.
 *
 * Starts a broadcast thread that pops strings from the input queue registered
 * under strPort0_ and passes them to sendAllClient(). Then, until isStop_ is
 * set, it (re)initialises the WebSocket server on 0.0.0.0:<configured port>
 * and runs a polling thread, blocking until that thread finishes. Each round
 * ends with a one-second pause, whether initialisation failed or the polling
 * thread stopped.
 *
 * @return APP_ERR_OK once isStop_ has been observed.
 */
APP_ERROR WSServerEngine::Process()
{
    // Queue -> clients forwarder.
    auto broadcastLoop = [this]() {
        while (!this->isStop_)
        {
            std::shared_ptr<void> pData = nullptr;
            inputQueMap_[strPort0_]->pop(pData, true);
            if (!pData)
            {
                usleep(1000); // 1ms
                continue;
            }
            // Queue items are type-erased; entries on this port are std::string.
            this->sendAllClient(*std::static_pointer_cast<std::string>(pData));
        }
    };

    // Drives the WebSocket server; this engine receives the server callbacks.
    auto pollLoop = [this]() {
        while (!this->isStop_)
        {
            this->wsserver_.poll(this);
            std::this_thread::yield();
            usleep(1000);
        }
    };

    this->broadcastThread_ = std::thread(broadcastLoop);

    while (!this->isStop_)
    {
        this->listenfd_ = wsserver_.init("0.0.0.0", this->wsServerConfig_.iPort, 1000);
        if (!this->listenfd_)
        {
            LogError << "Web socket server init failed ! Listen port: " << this->wsServerConfig_.iPort
                     << " Error:" << this->wsserver_.getLastError();
        }
        else
        {
            this->running_ = true;
            this->ws_thr_ = std::thread(pollLoop);
            LogInfo << "Server running...";
            this->ws_thr_.join();
            LogInfo << "Server stopped...";
        }
        // Pause for one second before the next round.
        usleep(1000*1000);
    }
    return APP_ERR_OK;
}