VTrain_BothSides/tools/rtsp_server/net/SelectTaskScheduler.cpp

203 lines
3.9 KiB
C++

// PHZ
// 2018-5-15
#include "SelectTaskScheduler.h"
#include "Logger.h"
#include "Timer.h"
#include <cstring>
#include <forward_list>
using namespace xop;
#define SELECT_CTL_ADD 0
#define SELECT_CTL_MOD 1
#define SELECT_CTL_DEL 2
SelectTaskScheduler::SelectTaskScheduler(int id)
: TaskScheduler(id)
{
FD_ZERO(&fd_read_backup_);
FD_ZERO(&fd_write_backup_);
FD_ZERO(&fd_exp_backup_);
this->UpdateChannel(wakeup_channel_);
}
SelectTaskScheduler::~SelectTaskScheduler()
{
}
void SelectTaskScheduler::UpdateChannel(ChannelPtr channel)
{
std::lock_guard<std::mutex> lock(mutex_);
SOCKET socket = channel->GetSocket();
if(channels_.find(socket) != channels_.end()) {
if(channel->IsNoneEvent()) {
is_fd_read_reset_ = true;
is_fd_write_reset_ = true;
is_fd_exp_reset_ = true;
channels_.erase(socket);
}
else {
//is_fd_read_reset_ = true;
is_fd_write_reset_ = true;
}
}
else {
if(!channel->IsNoneEvent()) {
channels_.emplace(socket, channel);
is_fd_read_reset_ = true;
is_fd_write_reset_ = true;
is_fd_exp_reset_ = true;
}
}
}
void SelectTaskScheduler::RemoveChannel(ChannelPtr& channel)
{
std::lock_guard<std::mutex> lock(mutex_);
SOCKET fd = channel->GetSocket();
if(channels_.find(fd) != channels_.end()) {
is_fd_read_reset_ = true;
is_fd_write_reset_ = true;
is_fd_exp_reset_ = true;
channels_.erase(fd);
}
}
bool SelectTaskScheduler::HandleEvent(int timeout)
{
if(channels_.empty()) {
if (timeout <= 0) {
timeout = 10;
}
Timer::Sleep(timeout);
return true;
}
fd_set fd_read;
fd_set fd_write;
fd_set fd_exp;
FD_ZERO(&fd_read);
FD_ZERO(&fd_write);
FD_ZERO(&fd_exp);
bool fd_read_reset = false;
bool fd_write_reset = false;
bool fd_exp_reset = false;
if(is_fd_read_reset_ || is_fd_write_reset_ || is_fd_exp_reset_) {
if (is_fd_exp_reset_) {
maxfd_ = 0;
}
std::lock_guard<std::mutex> lock(mutex_);
for(auto iter : channels_) {
int events = iter.second->GetEvents();
SOCKET fd = iter.second->GetSocket();
if (is_fd_read_reset_ && (events&EVENT_IN)) {
FD_SET(fd, &fd_read);
}
if(is_fd_write_reset_ && (events&EVENT_OUT)) {
FD_SET(fd, &fd_write);
}
if(is_fd_exp_reset_) {
FD_SET(fd, &fd_exp);
if(fd > maxfd_) {
maxfd_ = fd;
}
}
}
fd_read_reset = is_fd_read_reset_;
fd_write_reset = is_fd_write_reset_;
fd_exp_reset = is_fd_exp_reset_;
is_fd_read_reset_ = false;
is_fd_write_reset_ = false;
is_fd_exp_reset_ = false;
}
if(fd_read_reset) {
FD_ZERO(&fd_read_backup_);
memcpy(&fd_read_backup_, &fd_read, sizeof(fd_set));
}
else {
memcpy(&fd_read, &fd_read_backup_, sizeof(fd_set));
}
if(fd_write_reset) {
FD_ZERO(&fd_write_backup_);
memcpy(&fd_write_backup_, &fd_write, sizeof(fd_set));
}
else {
memcpy(&fd_write, &fd_write_backup_, sizeof(fd_set));
}
if(fd_exp_reset) {
FD_ZERO(&fd_exp_backup_);
memcpy(&fd_exp_backup_, &fd_exp, sizeof(fd_set));
}
else {
memcpy(&fd_exp, &fd_exp_backup_, sizeof(fd_set));
}
if(timeout <= 0) {
timeout = 10;
}
struct timeval tv = { timeout/1000, timeout%1000*1000 };
int ret = select((int)maxfd_+1, &fd_read, &fd_write, &fd_exp, &tv);
if (ret < 0) {
#if defined(__linux) || defined(__linux__)
if(errno == EINTR) {
return true;
}
#endif
return false;
}
std::forward_list<std::pair<ChannelPtr, int>> event_list;
if(ret > 0) {
std::lock_guard<std::mutex> lock(mutex_);
for(auto iter : channels_) {
int events = 0;
SOCKET socket = iter.second->GetSocket();
if (FD_ISSET(socket, &fd_read)) {
events |= EVENT_IN;
}
if (FD_ISSET(socket, &fd_write)) {
events |= EVENT_OUT;
}
if (FD_ISSET(socket, &fd_exp)) {
events |= (EVENT_HUP); // close
}
if(events != 0) {
event_list.emplace_front(iter.second, events);
}
}
}
for(auto& iter: event_list) {
iter.first->HandleEvent(iter.second);
}
return true;
}