// PHZ // 2018-6-10 #include "RtspConnection.h" #include "RtspServer.h" #include "MediaSession.h" #include "MediaSource.h" #include "SocketUtil.h" #define USER_AGENT "-_-" #define RTSP_DEBUG 0 #define MAX_RTSP_MESSAGE_SIZE 2048 using namespace xop; using namespace std; RtspConnection::RtspConnection(std::shared_ptr rtsp, TaskScheduler *task_scheduler, SOCKET sockfd) : TcpConnection(task_scheduler, sockfd) , rtsp_(rtsp) , task_scheduler_(task_scheduler) , rtp_channel_(new Channel(sockfd)) , rtsp_request_(new RtspRequest) , rtsp_response_(new RtspResponse) { this->SetReadCallback([this](std::shared_ptr conn, xop::BufferReader& buffer) { return this->OnRead(buffer); }); this->SetCloseCallback([this](std::shared_ptr conn) { this->OnClose(); }); alive_count_ = 1; rtp_channel_->SetReadCallback([this]() { this->HandleRead(); }); rtp_channel_->SetWriteCallback([this]() { this->HandleWrite(); }); rtp_channel_->SetCloseCallback([this]() { this->HandleClose(); }); rtp_channel_->SetErrorCallback([this]() { this->HandleError(); }); for(int chn=0; chnhas_auth_info_) { has_auth_ = false; auth_info_.reset(new DigestAuthentication(rtsp->realm_, rtsp->username_, rtsp->password_)); } } RtspConnection::~RtspConnection() { } bool RtspConnection::OnRead(BufferReader& buffer) { KeepAlive(); int size = buffer.ReadableBytes(); if (size <= 0) { return false; //close } if (conn_mode_ == RTSP_SERVER) { if (!HandleRtspRequest(buffer)){ return false; } } else if (conn_mode_ == RTSP_PUSHER) { if (!HandleRtspResponse(buffer)) { return false; } } if (buffer.ReadableBytes() > MAX_RTSP_MESSAGE_SIZE) { buffer.RetrieveAll(); } return true; } void RtspConnection::OnClose() { if(session_id_ != 0) { auto rtsp = rtsp_.lock(); if (rtsp) { MediaSession::Ptr media_session = rtsp->LookMediaSession(session_id_); if (media_session) { media_session->RemoveClient(this->GetSocket()); } } } for(int chn=0; chnIsNoneEvent()) { task_scheduler_->RemoveChannel(rtcp_channels_[chn]); } } } bool RtspConnection::HandleRtspRequest(BufferReader& buffer) { #if RTSP_DEBUG string str(buffer.Peek(), buffer.ReadableBytes()); if (str.find("rtsp") != string::npos || str.find("RTSP") != string::npos) { std::cout << str << std::endl; } #endif if (rtsp_request_->ParseRequest(&buffer)) { RtspRequest::Method method = rtsp_request_->GetMethod(); if(method == RtspRequest::RTCP) { HandleRtcp(buffer); return true; } else if(!rtsp_request_->GotAll()) { return true; } switch (method) { case RtspRequest::OPTIONS: HandleCmdOption(); break; case RtspRequest::DESCRIBE: HandleCmdDescribe(); break; case RtspRequest::SETUP: HandleCmdSetup(); break; case RtspRequest::PLAY: HandleCmdPlay(); break; case RtspRequest::TEARDOWN: HandleCmdTeardown(); break; case RtspRequest::GET_PARAMETER: HandleCmdGetParamter(); break; default: break; } if (rtsp_request_->GotAll()) { rtsp_request_->Reset(); } } else { return false; } return true; } bool RtspConnection::HandleRtspResponse(BufferReader& buffer) { #if RTSP_DEBUG string str(buffer.Peek(), buffer.ReadableBytes()); if (str.find("rtsp") != string::npos || str.find("RTSP") != string::npos) { cout << str << endl; } #endif if (rtsp_response_->ParseResponse(&buffer)) { RtspResponse::Method method = rtsp_response_->GetMethod(); switch (method) { case RtspResponse::OPTIONS: if (conn_mode_ == RTSP_PUSHER) { SendAnnounce(); } break; case RtspResponse::ANNOUNCE: case RtspResponse::DESCRIBE: SendSetup(); break; case RtspResponse::SETUP: SendSetup(); break; case RtspResponse::RECORD: HandleRecord(); break; default: break; } } else { return false; } return true; } void RtspConnection::SendRtspMessage(std::shared_ptr buf, uint32_t size) { #if RTSP_DEBUG cout << buf.get() << endl; #endif this->Send(buf, size); return; } void RtspConnection::HandleRtcp(BufferReader& buffer) { char *peek = buffer.Peek(); if(peek[0] == '$' && buffer.ReadableBytes() > 4) { uint32_t pkt_size = peek[2]<<8 | peek[3]; if(pkt_size +4 >= buffer.ReadableBytes()) { buffer.Retrieve(pkt_size +4); } } } void RtspConnection::HandleRtcp(SOCKET sockfd) { char buf[1024] = {0}; if(recv(sockfd, buf, 1024, 0) > 0) { KeepAlive(); } } void RtspConnection::HandleCmdOption() { std::shared_ptr res(new char[2048], std::default_delete()); int size = rtsp_request_->BuildOptionRes(res.get(), 2048); this->SendRtspMessage(res, size); } void RtspConnection::HandleCmdDescribe() { if (auth_info_!=nullptr && !HandleAuthentication()) { return; } if (rtp_conn_ == nullptr) { rtp_conn_.reset(new RtpConnection(shared_from_this())); } int size = 0; std::shared_ptr res(new char[4096], std::default_delete()); MediaSession::Ptr media_session = nullptr; auto rtsp = rtsp_.lock(); if (rtsp) { media_session = rtsp->LookMediaSession(rtsp_request_->GetRtspUrlSuffix()); } if(!rtsp || !media_session) { size = rtsp_request_->BuildNotFoundRes(res.get(), 4096); } else { session_id_ = media_session->GetMediaSessionId(); media_session->AddClient(this->GetSocket(), rtp_conn_); for(int chn=0; chnGetMediaSource((MediaChannelId)chn); if(source != nullptr) { rtp_conn_->SetClockRate((MediaChannelId)chn, source->GetClockRate()); rtp_conn_->SetPayloadType((MediaChannelId)chn, source->GetPayloadType()); } } std::string sdp = media_session->GetSdpMessage(SocketUtil::GetSocketIp(this->GetSocket()), rtsp->GetVersion()); if(sdp == "") { size = rtsp_request_->BuildServerErrorRes(res.get(), 4096); } else { size = rtsp_request_->BuildDescribeRes(res.get(), 4096, sdp.c_str()); } } SendRtspMessage(res, size); return ; } void RtspConnection::HandleCmdSetup() { if (auth_info_ != nullptr && !HandleAuthentication()) { return; } int size = 0; std::shared_ptr res(new char[4096], std::default_delete()); MediaChannelId channel_id = rtsp_request_->GetChannelId(); MediaSession::Ptr media_session = nullptr; auto rtsp = rtsp_.lock(); if (rtsp) { media_session = rtsp->LookMediaSession(session_id_); } if(!rtsp || !media_session) { goto server_error; } if(media_session->IsMulticast()) { std::string multicast_ip = media_session->GetMulticastIp(); if(rtsp_request_->GetTransportMode() == RTP_OVER_MULTICAST) { uint16_t port = media_session->GetMulticastPort(channel_id); uint16_t session_id = rtp_conn_->GetRtpSessionId(); if (!rtp_conn_->SetupRtpOverMulticast(channel_id, multicast_ip.c_str(), port)) { goto server_error; } size = rtsp_request_->BuildSetupMulticastRes(res.get(), 4096, multicast_ip.c_str(), port, session_id); } else { goto transport_unsupport; } } else { if(rtsp_request_->GetTransportMode() == RTP_OVER_TCP) { uint16_t rtp_channel = rtsp_request_->GetRtpChannel(); uint16_t rtcp_channel = rtsp_request_->GetRtcpChannel(); uint16_t session_id = rtp_conn_->GetRtpSessionId(); rtp_conn_->SetupRtpOverTcp(channel_id, rtp_channel, rtcp_channel); size = rtsp_request_->BuildSetupTcpRes(res.get(), 4096, rtp_channel, rtcp_channel, session_id); } else if(rtsp_request_->GetTransportMode() == RTP_OVER_UDP) { uint16_t peer_rtp_port = rtsp_request_->GetRtpPort(); uint16_t peer_rtcp_port = rtsp_request_->GetRtcpPort(); uint16_t session_id = rtp_conn_->GetRtpSessionId(); if(rtp_conn_->SetupRtpOverUdp(channel_id, peer_rtp_port, peer_rtcp_port)) { SOCKET rtcp_fd = rtp_conn_->GetRtcpSocket(channel_id); rtcp_channels_[channel_id].reset(new Channel(rtcp_fd)); rtcp_channels_[channel_id]->SetReadCallback([rtcp_fd, this]() { this->HandleRtcp(rtcp_fd); }); rtcp_channels_[channel_id]->EnableReading(); task_scheduler_->UpdateChannel(rtcp_channels_[channel_id]); } else { goto server_error; } uint16_t serRtpPort = rtp_conn_->GetRtpPort(channel_id); uint16_t serRtcpPort = rtp_conn_->GetRtcpPort(channel_id); size = rtsp_request_->BuildSetupUdpRes(res.get(), 4096, serRtpPort, serRtcpPort, session_id); } else { goto transport_unsupport; } } SendRtspMessage(res, size); return ; transport_unsupport: size = rtsp_request_->BuildUnsupportedRes(res.get(), 4096); SendRtspMessage(res, size); return ; server_error: size = rtsp_request_->BuildServerErrorRes(res.get(), 4096); SendRtspMessage(res, size); return ; } void RtspConnection::HandleCmdPlay() { if (auth_info_ != nullptr) { if (!HandleAuthentication()) { return; } } if (rtp_conn_ == nullptr) { return; } conn_state_ = START_PLAY; rtp_conn_->Play(); uint16_t session_id = rtp_conn_->GetRtpSessionId(); std::shared_ptr res(new char[2048], std::default_delete()); int size = rtsp_request_->BuildPlayRes(res.get(), 2048, nullptr, session_id); SendRtspMessage(res, size); } void RtspConnection::HandleCmdTeardown() { if (rtp_conn_ == nullptr) { return; } rtp_conn_->Teardown(); uint16_t session_id = rtp_conn_->GetRtpSessionId(); std::shared_ptr res(new char[2048], std::default_delete()); int size = rtsp_request_->BuildTeardownRes(res.get(), 2048, session_id); SendRtspMessage(res, size); //HandleClose(); } void RtspConnection::HandleCmdGetParamter() { if (rtp_conn_ == nullptr) { return; } uint16_t session_id = rtp_conn_->GetRtpSessionId(); std::shared_ptr res(new char[2048], std::default_delete()); int size = rtsp_request_->BuildGetParamterRes(res.get(), 2048, session_id); SendRtspMessage(res, size); } bool RtspConnection::HandleAuthentication() { if (auth_info_ != nullptr && !has_auth_) { std::string cmd = rtsp_request_->MethodToString[rtsp_request_->GetMethod()]; std::string url = rtsp_request_->GetRtspUrl(); if (_nonce.size() > 0 && (auth_info_->GetResponse(_nonce, cmd, url) == rtsp_request_->GetAuthResponse())) { _nonce.clear(); has_auth_ = true; } else { std::shared_ptr res(new char[4096], std::default_delete()); _nonce = auth_info_->GetNonce(); int size = rtsp_request_->BuildUnauthorizedRes(res.get(), 4096, auth_info_->GetRealm().c_str(), _nonce.c_str()); SendRtspMessage(res, size); return false; } } return true; } void RtspConnection::SendOptions(ConnectionMode mode) { if (rtp_conn_ == nullptr) { rtp_conn_.reset(new RtpConnection(shared_from_this())); } auto rtsp = rtsp_.lock(); if (!rtsp) { HandleClose(); return; } conn_mode_ = mode; rtsp_response_->SetUserAgent(USER_AGENT); rtsp_response_->SetRtspUrl(rtsp->GetRtspUrl().c_str()); std::shared_ptr req(new char[2048], std::default_delete()); int size = rtsp_response_->BuildOptionReq(req.get(), 2048); SendRtspMessage(req, size); } void RtspConnection::SendAnnounce() { MediaSession::Ptr media_session = nullptr; auto rtsp = rtsp_.lock(); if (rtsp) { media_session = rtsp->LookMediaSession(1); } if (!rtsp || !media_session) { HandleClose(); return; } else { session_id_ = media_session->GetMediaSessionId(); media_session->AddClient(this->GetSocket(), rtp_conn_); for (int chn = 0; chn<2; chn++) { MediaSource* source = media_session->GetMediaSource((MediaChannelId)chn); if (source != nullptr) { rtp_conn_->SetClockRate((MediaChannelId)chn, source->GetClockRate()); rtp_conn_->SetPayloadType((MediaChannelId)chn, source->GetPayloadType()); } } } std::string sdp = media_session->GetSdpMessage(SocketUtil::GetSocketIp(this->GetSocket()), rtsp->GetVersion()); if (sdp == "") { HandleClose(); return; } std::shared_ptr req(new char[4096], std::default_delete()); int size = rtsp_response_->BuildAnnounceReq(req.get(), 4096, sdp.c_str()); SendRtspMessage(req, size); } void RtspConnection::SendDescribe() { std::shared_ptr req(new char[2048], std::default_delete()); int size = rtsp_response_->BuildDescribeReq(req.get(), 2048); SendRtspMessage(req, size); } void RtspConnection::SendSetup() { int size = 0; std::shared_ptr buf(new char[2048], std::default_delete()); MediaSession::Ptr media_session = nullptr; auto rtsp = rtsp_.lock(); if (rtsp) { media_session = rtsp->LookMediaSession(session_id_); } if (!rtsp || !media_session) { HandleClose(); return; } if (media_session->GetMediaSource(channel_0) && !rtp_conn_->IsSetup(channel_0)) { rtp_conn_->SetupRtpOverTcp(channel_0, 0, 1); size = rtsp_response_->BuildSetupTcpReq(buf.get(), 2048, channel_0); } else if (media_session->GetMediaSource(channel_1) && !rtp_conn_->IsSetup(channel_1)) { rtp_conn_->SetupRtpOverTcp(channel_1, 2, 3); size = rtsp_response_->BuildSetupTcpReq(buf.get(), 2048, channel_1); } else { size = rtsp_response_->BuildRecordReq(buf.get(), 2048); } SendRtspMessage(buf, size); } void RtspConnection::HandleRecord() { conn_state_ = START_PUSH; rtp_conn_->Record(); }