156 lines
2.8 KiB
C++
156 lines
2.8 KiB
C++
|
|
#include "TcpConnection.h"
|
||
|
|
#include "SocketUtil.h"
|
||
|
|
|
||
|
|
using namespace xop;
|
||
|
|
|
||
|
|
TcpConnection::TcpConnection(TaskScheduler *task_scheduler, SOCKET sockfd)
|
||
|
|
: task_scheduler_(task_scheduler)
|
||
|
|
, read_buffer_(new BufferReader)
|
||
|
|
, write_buffer_(new BufferWriter(500))
|
||
|
|
, channel_(new Channel(sockfd))
|
||
|
|
{
|
||
|
|
is_closed_ = false;
|
||
|
|
|
||
|
|
channel_->SetReadCallback([this]() { this->HandleRead(); });
|
||
|
|
channel_->SetWriteCallback([this]() { this->HandleWrite(); });
|
||
|
|
channel_->SetCloseCallback([this]() { this->HandleClose(); });
|
||
|
|
channel_->SetErrorCallback([this]() { this->HandleError(); });
|
||
|
|
|
||
|
|
SocketUtil::SetNonBlock(sockfd);
|
||
|
|
SocketUtil::SetSendBufSize(sockfd, 100 * 1024);
|
||
|
|
SocketUtil::SetKeepAlive(sockfd);
|
||
|
|
|
||
|
|
channel_->EnableReading();
|
||
|
|
task_scheduler_->UpdateChannel(channel_);
|
||
|
|
}
|
||
|
|
|
||
|
|
TcpConnection::~TcpConnection()
|
||
|
|
{
|
||
|
|
SOCKET fd = channel_->GetSocket();
|
||
|
|
if (fd > 0) {
|
||
|
|
SocketUtil::Close(fd);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void TcpConnection::Send(std::shared_ptr<char> data, uint32_t size)
|
||
|
|
{
|
||
|
|
if (!is_closed_) {
|
||
|
|
mutex_.lock();
|
||
|
|
write_buffer_->Append(data, size);
|
||
|
|
mutex_.unlock();
|
||
|
|
|
||
|
|
this->HandleWrite();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void TcpConnection::Send(const char *data, uint32_t size)
|
||
|
|
{
|
||
|
|
if (!is_closed_) {
|
||
|
|
mutex_.lock();
|
||
|
|
write_buffer_->Append(data, size);
|
||
|
|
mutex_.unlock();
|
||
|
|
|
||
|
|
this->HandleWrite();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void TcpConnection::Disconnect()
|
||
|
|
{
|
||
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
||
|
|
auto conn = shared_from_this();
|
||
|
|
task_scheduler_->AddTriggerEvent([conn]() {
|
||
|
|
conn->Close();
|
||
|
|
});
|
||
|
|
}
|
||
|
|
|
||
|
|
void TcpConnection::HandleRead()
|
||
|
|
{
|
||
|
|
{
|
||
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
||
|
|
|
||
|
|
if (is_closed_) {
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
|
||
|
|
int ret = read_buffer_->Read(channel_->GetSocket());
|
||
|
|
if (ret <= 0) {
|
||
|
|
this->Close();
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
if (read_cb_) {
|
||
|
|
bool ret = read_cb_(shared_from_this(), *read_buffer_);
|
||
|
|
if (false == ret) {
|
||
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
||
|
|
this->Close();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void TcpConnection::HandleWrite()
|
||
|
|
{
|
||
|
|
if (is_closed_) {
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
|
||
|
|
//std::lock_guard<std::mutex> lock(mutex_);
|
||
|
|
if (!mutex_.try_lock()) {
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
|
||
|
|
int ret = 0;
|
||
|
|
bool empty = false;
|
||
|
|
do
|
||
|
|
{
|
||
|
|
ret = write_buffer_->Send(channel_->GetSocket());
|
||
|
|
if (ret < 0) {
|
||
|
|
this->Close();
|
||
|
|
mutex_.unlock();
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
empty = write_buffer_->IsEmpty();
|
||
|
|
} while (0);
|
||
|
|
|
||
|
|
if (empty) {
|
||
|
|
if (channel_->IsWriting()) {
|
||
|
|
channel_->DisableWriting();
|
||
|
|
task_scheduler_->UpdateChannel(channel_);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
else if(!channel_->IsWriting()) {
|
||
|
|
channel_->EnableWriting();
|
||
|
|
task_scheduler_->UpdateChannel(channel_);
|
||
|
|
}
|
||
|
|
|
||
|
|
mutex_.unlock();
|
||
|
|
}
|
||
|
|
|
||
|
|
void TcpConnection::Close()
|
||
|
|
{
|
||
|
|
if (!is_closed_) {
|
||
|
|
is_closed_ = true;
|
||
|
|
task_scheduler_->RemoveChannel(channel_);
|
||
|
|
|
||
|
|
if (close_cb_) {
|
||
|
|
close_cb_(shared_from_this());
|
||
|
|
}
|
||
|
|
|
||
|
|
if (disconnect_cb_) {
|
||
|
|
disconnect_cb_(shared_from_this());
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void TcpConnection::HandleClose()
|
||
|
|
{
|
||
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
||
|
|
this->Close();
|
||
|
|
}
|
||
|
|
|
||
|
|
void TcpConnection::HandleError()
|
||
|
|
{
|
||
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
||
|
|
this->Close();
|
||
|
|
}
|