156 lines
		
	
	
		
			2.8 KiB
		
	
	
	
		
			C++
		
	
	
	
		
		
			
		
	
	
			156 lines
		
	
	
		
			2.8 KiB
		
	
	
	
		
			C++
		
	
	
	
|  | #include "TcpConnection.h"
 | ||
|  | #include "SocketUtil.h"
 | ||
|  | 
 | ||
|  | using namespace xop; | ||
|  | 
 | ||
|  | TcpConnection::TcpConnection(TaskScheduler *task_scheduler, SOCKET sockfd) | ||
|  | 	: task_scheduler_(task_scheduler) | ||
|  | 	, read_buffer_(new BufferReader) | ||
|  | 	, write_buffer_(new BufferWriter(500)) | ||
|  | 	, channel_(new Channel(sockfd)) | ||
|  | { | ||
|  | 	is_closed_ = false; | ||
|  | 
 | ||
|  | 	channel_->SetReadCallback([this]() { this->HandleRead(); }); | ||
|  | 	channel_->SetWriteCallback([this]() { this->HandleWrite(); }); | ||
|  | 	channel_->SetCloseCallback([this]() { this->HandleClose(); }); | ||
|  | 	channel_->SetErrorCallback([this]() { this->HandleError(); }); | ||
|  | 
 | ||
|  | 	SocketUtil::SetNonBlock(sockfd); | ||
|  | 	SocketUtil::SetSendBufSize(sockfd, 100 * 1024); | ||
|  | 	SocketUtil::SetKeepAlive(sockfd); | ||
|  | 
 | ||
|  | 	channel_->EnableReading(); | ||
|  | 	task_scheduler_->UpdateChannel(channel_); | ||
|  | } | ||
|  | 
 | ||
|  | TcpConnection::~TcpConnection() | ||
|  | { | ||
|  | 	SOCKET fd = channel_->GetSocket(); | ||
|  | 	if (fd > 0) { | ||
|  | 		SocketUtil::Close(fd); | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | void TcpConnection::Send(std::shared_ptr<char> data, uint32_t size) | ||
|  | { | ||
|  | 	if (!is_closed_) { | ||
|  | 		mutex_.lock(); | ||
|  | 		write_buffer_->Append(data, size); | ||
|  | 		mutex_.unlock(); | ||
|  | 
 | ||
|  | 		this->HandleWrite(); | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | void TcpConnection::Send(const char *data, uint32_t size) | ||
|  | { | ||
|  | 	if (!is_closed_) { | ||
|  | 		mutex_.lock(); | ||
|  | 		write_buffer_->Append(data, size); | ||
|  | 		mutex_.unlock(); | ||
|  | 
 | ||
|  | 		this->HandleWrite(); | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | void TcpConnection::Disconnect() | ||
|  | { | ||
|  | 	std::lock_guard<std::mutex> lock(mutex_); | ||
|  | 	auto conn = shared_from_this(); | ||
|  | 	task_scheduler_->AddTriggerEvent([conn]() { | ||
|  | 		conn->Close(); | ||
|  | 	}); | ||
|  | } | ||
|  | 
 | ||
|  | void TcpConnection::HandleRead() | ||
|  | { | ||
|  | 	{ | ||
|  | 		std::lock_guard<std::mutex> lock(mutex_); | ||
|  | 
 | ||
|  | 		if (is_closed_) { | ||
|  | 			return; | ||
|  | 		} | ||
|  | 		 | ||
|  | 		int ret = read_buffer_->Read(channel_->GetSocket()); | ||
|  | 		if (ret <= 0) { | ||
|  | 			this->Close(); | ||
|  | 			return; | ||
|  | 		} | ||
|  | 	} | ||
|  | 
 | ||
|  | 	if (read_cb_) { | ||
|  | 		bool ret = read_cb_(shared_from_this(), *read_buffer_); | ||
|  | 		if (false == ret) { | ||
|  | 			std::lock_guard<std::mutex> lock(mutex_); | ||
|  | 			this->Close(); | ||
|  | 		} | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | void TcpConnection::HandleWrite() | ||
|  | { | ||
|  | 	if (is_closed_) { | ||
|  | 		return; | ||
|  | 	} | ||
|  | 	 | ||
|  | 	//std::lock_guard<std::mutex> lock(mutex_);
 | ||
|  | 	if (!mutex_.try_lock()) { | ||
|  | 		return; | ||
|  | 	} | ||
|  | 
 | ||
|  | 	int ret = 0; | ||
|  | 	bool empty = false; | ||
|  | 	do | ||
|  | 	{ | ||
|  | 		ret = write_buffer_->Send(channel_->GetSocket()); | ||
|  | 		if (ret < 0) { | ||
|  | 			this->Close(); | ||
|  | 			mutex_.unlock(); | ||
|  | 			return; | ||
|  | 		} | ||
|  | 		empty = write_buffer_->IsEmpty(); | ||
|  | 	} while (0); | ||
|  | 
 | ||
|  | 	if (empty) { | ||
|  | 		if (channel_->IsWriting()) { | ||
|  | 			channel_->DisableWriting(); | ||
|  | 			task_scheduler_->UpdateChannel(channel_); | ||
|  | 		} | ||
|  | 	} | ||
|  | 	else if(!channel_->IsWriting()) { | ||
|  | 		channel_->EnableWriting(); | ||
|  | 		task_scheduler_->UpdateChannel(channel_); | ||
|  | 	} | ||
|  | 
 | ||
|  | 	mutex_.unlock(); | ||
|  | } | ||
|  | 
 | ||
|  | void TcpConnection::Close() | ||
|  | { | ||
|  | 	if (!is_closed_) { | ||
|  | 		is_closed_ = true; | ||
|  | 		task_scheduler_->RemoveChannel(channel_); | ||
|  | 
 | ||
|  | 		if (close_cb_) { | ||
|  | 			close_cb_(shared_from_this()); | ||
|  | 		}			 | ||
|  | 
 | ||
|  | 		if (disconnect_cb_) { | ||
|  | 			disconnect_cb_(shared_from_this()); | ||
|  | 		}	 | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | void TcpConnection::HandleClose() | ||
|  | { | ||
|  | 	std::lock_guard<std::mutex> lock(mutex_); | ||
|  | 	this->Close(); | ||
|  | } | ||
|  | 
 | ||
|  | void TcpConnection::HandleError() | ||
|  | { | ||
|  | 	std::lock_guard<std::mutex> lock(mutex_); | ||
|  | 	this->Close(); | ||
|  | } |