#include "RTSPDecoder.h" #include #include #include #include #include #define MAX_RETRY_COUNT 5 #define RETRY_DELAY_MS 1000 RTSPDecoder::RTSPDecoder() { avformat_network_init(); } RTSPDecoder::~RTSPDecoder() { // 停止所有解码线程 for (auto& stream : streams_) { if (stream && stream->running) { stream->running = false; if (stream->decode_thread.joinable()) { stream->decode_thread.join(); } } cleanupStream(stream.get()); } avformat_network_deinit(); } bool RTSPDecoder::init(const DecoderConfig& config) { if (initialized_) return true; config_ = config; // 验证配置 if (config_.max_streams < 1 || config_.max_streams > 60) { throw std::invalid_argument("Max streams must be between 1 and 60"); } // 初始化GPU分配 if (config_.use_hw_accel) { int gpu_count = getGPUCount(); if (gpu_count == 0) { config_.use_hw_accel = false; } else if (config_.gpu_id >= 0 && config_.gpu_id < gpu_count) { next_gpu_id_ = config_.gpu_id; } } initialized_ = true; return true; } int RTSPDecoder::addStream(const std::string& rtsp_url) { if (!initialized_) { throw std::runtime_error("Decoder not initialized"); } std::lock_guard lock(streams_mutex_); if (streams_.size() >= config_.max_streams) { throw std::runtime_error("Maximum number of streams reached"); } auto ctx = std::make_unique(); int stream_id = static_cast(streams_.size()); // 分配GPU int gpu_id = config_.use_hw_accel ? allocateGPU() : -1; // 初始化格式上下文 ctx->format_ctx = avformat_alloc_context(); if (!ctx->format_ctx) { throw std::runtime_error("Failed to allocate format context"); } // 设置RTSP参数 AVDictionary* options = nullptr; av_dict_set(&options, "rtsp_transport", "tcp", 0); av_dict_set(&options, "stimeout", "5000000", 0); // 5秒超时 // 打开输入流 int retry_count = 0; while (retry_count < MAX_RETRY_COUNT) { int ret = avformat_open_input(&ctx->format_ctx, rtsp_url.c_str(), nullptr, &options); if (ret == 0) break; retry_count++; if (retry_count < MAX_RETRY_COUNT) { std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_DELAY_MS)); } } av_dict_free(&options); if (retry_count == MAX_RETRY_COUNT) { cleanupStream(ctx.get()); throw std::runtime_error("Failed to open RTSP stream after retries"); } // 查找流信息 if (avformat_find_stream_info(ctx->format_ctx, nullptr) < 0) { cleanupStream(ctx.get()); throw std::runtime_error("Failed to find stream information"); } // 查找视频流 for (unsigned int i = 0; i < ctx->format_ctx->nb_streams; i++) { if (ctx->format_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { ctx->video_stream_idx = i; break; } } if (ctx->video_stream_idx == -1) { cleanupStream(ctx.get()); throw std::runtime_error("No video stream found"); } // 获取解码器参数 AVCodecParameters* codecpar = ctx->format_ctx->streams[ctx->video_stream_idx]->codecpar; // 查找解码器 const AVCodec* codec = nullptr; if (config_.use_hw_accel && gpu_id >= 0) { codec = avcodec_find_decoder(codecpar->codec_id); } else { codec = avcodec_find_decoder(codecpar->codec_id); } if (!codec) { cleanupStream(ctx.get()); throw std::runtime_error("Unsupported codec"); } // 创建解码器上下文 ctx->codec_ctx = avcodec_alloc_context3(codec); if (!ctx->codec_ctx) { cleanupStream(ctx.get()); throw std::runtime_error("Failed to allocate codec context"); } // 复制参数到解码器上下文 if (avcodec_parameters_to_context(ctx->codec_ctx, codecpar) < 0) { cleanupStream(ctx.get()); throw std::runtime_error("Failed to copy codec parameters"); } // 初始化硬件加速 if (config_.use_hw_accel && gpu_id >= 0) { if (!initHWAccel(*ctx, gpu_id)) { cleanupStream(ctx.get()); throw std::runtime_error("Failed to initialize hardware acceleration"); } } // 打开解码器 if (avcodec_open2(ctx->codec_ctx, codec, nullptr) < 0) { cleanupStream(ctx.get()); throw std::runtime_error("Failed to open codec"); } // 设置线程数 (根据CPU核心数自动调整) ctx->codec_ctx->thread_count = std::thread::hardware_concurrency(); if (ctx->codec_ctx->thread_count == 0) { ctx->codec_ctx->thread_count = 4; // 默认值 } // 启动解码线程 ctx->gpu_id = gpu_id; ctx->running = true; ctx->decode_thread = std::thread(&RTSPDecoder::decodeThreadFunc, this, stream_id, ctx.get()); streams_.push_back(std::move(ctx)); return stream_id; } bool RTSPDecoder::removeStream(int stream_id) { std::lock_guard lock(streams_mutex_); if (stream_id < 0 || stream_id >= streams_.size() || !streams_[stream_id]) { return false; } auto& ctx = streams_[stream_id]; // 停止解码线程 ctx->running = false; if (ctx->decode_thread.joinable()) { ctx->decode_thread.join(); } // 清理资源 cleanupStream(ctx.get()); // 从列表中移除 streams_[stream_id].reset(); return true; } bool RTSPDecoder::getFrame(int stream_id, cv::Mat& frame, int timeout_ms) { if (stream_id < 0 || stream_id >= streams_.size() || !streams_[stream_id]) { return false; } auto& ctx = streams_[stream_id]; std::unique_lock lock(ctx->mutex); if (ctx->frame_queue.empty()) { if (timeout_ms <= 0) { return false; } if (ctx->cond.wait_for(lock, std::chrono::milliseconds(timeout_ms)) == std::cv_status::timeout) { return false; } } if (ctx->frame_queue.empty()) { return false; } frame = ctx->frame_queue.front(); ctx->frame_queue.pop(); return true; } int RTSPDecoder::getActiveStreamCount() const { int count = 0; for (const auto& stream : streams_) { if (stream && stream->running) { count++; } } return count; } int RTSPDecoder::getGPUCount() { int count = 0; cudaError_t err = cudaGetDeviceCount(&count); if (err != cudaSuccess) { return 0; } return count; } bool RTSPDecoder::initHWAccel(StreamContext& ctx, int gpu_id) { AVBufferRef* hw_device_ctx = nullptr; // 创建CUDA设备上下文 char device_name[32]; snprintf(device_name, sizeof(device_name), "%d", gpu_id); if (av_hwdevice_ctx_create(&hw_device_ctx, AV_HWDEVICE_TYPE_CUDA, device_name, nullptr, 0) < 0) { return false; } ctx.hw_device_ctx = av_buffer_ref(hw_device_ctx); ctx.codec_ctx->hw_device_ctx = av_buffer_ref(hw_device_ctx); av_buffer_unref(&hw_device_ctx); return true; } void RTSPDecoder::decodeThreadFunc(int stream_id, StreamContext* ctx) { AVFrame* frame = av_frame_alloc(); AVFrame* sw_frame = nullptr; AVPacket* pkt = av_packet_alloc(); if (!frame || !pkt) { ctx->running = false; if (frame) av_frame_free(&frame); if (pkt) av_packet_free(&pkt); return; } // 如果使用硬件加速,准备软件帧 if (ctx->hw_device_ctx) { sw_frame = av_frame_alloc(); if (!sw_frame) { ctx->running = false; av_frame_free(&frame); av_packet_free(&pkt); return; } } // 解码循环 while (ctx->running) { // 读取数据包 int ret = av_read_frame(ctx->format_ctx, pkt); if (ret < 0) { if (ret == AVERROR(EAGAIN)) { continue; } // 处理错误或EOF if (ret != AVERROR_EOF) { // 重试逻辑 int retry_count = 0; while (retry_count < MAX_RETRY_COUNT && ctx->running) { avformat_close_input(&ctx->format_ctx); AVDictionary* options = nullptr; av_dict_set(&options, "rtsp_transport", "tcp", 0); av_dict_set(&options, "stimeout", "5000000", 0); ret = avformat_open_input(&ctx->format_ctx, ctx->format_ctx->url, nullptr, &options); av_dict_free(&options); if (ret == 0) { avformat_find_stream_info(ctx->format_ctx, nullptr); break; } retry_count++; std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_DELAY_MS)); } if (retry_count == MAX_RETRY_COUNT) { ctx->running = false; break; } } else { // EOF, 重新开始 av_seek_frame(ctx->format_ctx, ctx->video_stream_idx, 0, AVSEEK_FLAG_BACKWARD); } av_packet_unref(pkt); continue; } // 检查是否是视频流 if (pkt->stream_index != ctx->video_stream_idx) { av_packet_unref(pkt); continue; } // 发送数据包到解码器 ret = avcodec_send_packet(ctx->codec_ctx, pkt); av_packet_unref(pkt); if (ret < 0 && ret != AVERROR(EAGAIN)) { continue; } // 接收解码后的帧 while (ctx->running) { ret = avcodec_receive_frame(ctx->codec_ctx, frame); if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { break; } if (ret < 0) { break; } AVFrame* output_frame = frame; // 如果使用硬件加速,需要将帧从GPU内存复制到CPU内存 if (ctx->hw_device_ctx) { if (av_hwframe_transfer_data(sw_frame, frame, 0) < 0) { av_frame_unref(frame); break; } output_frame = sw_frame; } // 转换为OpenCV Mat int width = config_.width > 0 ? config_.width : output_frame->width; int height = config_.height > 0 ? config_.height : output_frame->height; // 检查是否需要缩放 if (width != output_frame->width || height != output_frame->height || output_frame->format != AV_PIX_FMT_BGR24) { if (!ctx->sws_ctx) { ctx->sws_ctx = sws_getContext( output_frame->width, output_frame->height, static_cast(output_frame->format), width, height, AV_PIX_FMT_BGR24, SWS_BILINEAR, nullptr, nullptr, nullptr); if (!ctx->sws_ctx) { av_frame_unref(output_frame); break; } } // 创建目标帧 AVFrame* bgr_frame = av_frame_alloc(); if (!bgr_frame) { av_frame_unref(output_frame); break; } bgr_frame->format = AV_PIX_FMT_BGR24; bgr_frame->width = width; bgr_frame->height = height; if (av_frame_get_buffer(bgr_frame, 0) < 0) { av_frame_free(&bgr_frame); av_frame_unref(output_frame); break; } // 执行转换 sws_scale(ctx->sws_ctx, output_frame->data, output_frame->linesize, 0, output_frame->height, bgr_frame->data, bgr_frame->linesize); // 创建OpenCV Mat cv::Mat mat(height, width, CV_8UC3, bgr_frame->data[0], bgr_frame->linesize[0]); // 复制数据 (因为bgr_frame会被释放) cv::Mat mat_copy = mat.clone(); av_frame_free(&bgr_frame); // 将帧添加到队列 { std::lock_guard lock(ctx->mutex); if (ctx->frame_queue.size() >= config_.buffer_size) { ctx->frame_queue.pop(); } ctx->frame_queue.push(mat_copy); ctx->cond.notify_one(); } } else { // 直接转换格式 cv::Mat mat(output_frame->height, output_frame->width, CV_8UC3); // 根据像素格式处理 if (output_frame->format == AV_PIX_FMT_NV12) { // NV12转BGR cv::Mat y_plane(output_frame->height, output_frame->width, CV_8UC1, output_frame->data[0]); cv::Mat uv_plane(output_frame->height / 2, output_frame->width / 2, CV_8UC2, output_frame->data[1]); cv::cvtColorTwoPlane(y_plane, uv_plane, mat, cv::COLOR_YUV2BGR_NV12); } else if (output_frame->format == AV_PIX_FMT_YUV420P) { // YUV420P转BGR cv::Mat yuv_mat(output_frame->height * 3 / 2, output_frame->width, CV_8UC1, output_frame->data[0]); cv::cvtColor(yuv_mat, mat, cv::COLOR_YUV2BGR_I420); } else if (output_frame->format == AV_PIX_FMT_BGR24) { // 直接复制 cv::Mat(output_frame->height, output_frame->width, CV_8UC3, output_frame->data[0]).copyTo(mat); } else { // 其他格式使用SWS转换 if (!ctx->sws_ctx) { ctx->sws_ctx = sws_getContext( output_frame->width, output_frame->height, static_cast(output_frame->format), width, height, AV_PIX_FMT_BGR24, SWS_BILINEAR, nullptr, nullptr, nullptr); } if (ctx->sws_ctx) { AVFrame* bgr_frame = av_frame_alloc(); bgr_frame->format = AV_PIX_FMT_BGR24; bgr_frame->width = width; bgr_frame->height = height; if (av_frame_get_buffer(bgr_frame, 0) == 0) { sws_scale(ctx->sws_ctx, output_frame->data, output_frame->linesize, 0, output_frame->height, bgr_frame->data, bgr_frame->linesize); mat = cv::Mat(height, width, CV_8UC3, bgr_frame->data[0]); } av_frame_free(&bgr_frame); } } // 将帧添加到队列 if (!mat.empty()) { std::lock_guard lock(ctx->mutex); if (ctx->frame_queue.size() >= config_.buffer_size) { ctx->frame_queue.pop(); } ctx->frame_queue.push(mat); ctx->cond.notify_one(); } } av_frame_unref(output_frame); } } // 清理资源 av_frame_free(&frame); av_frame_free(&sw_frame); av_packet_free(&pkt); if (ctx->sws_ctx) { sws_freeContext(ctx->sws_ctx); ctx->sws_ctx = nullptr; } } void RTSPDecoder::cleanupStream(StreamContext* ctx) { if (!ctx) return; if (ctx->sws_ctx) { sws_freeContext(ctx->sws_ctx); ctx->sws_ctx = nullptr; } if (ctx->codec_ctx) { avcodec_free_context(&ctx->codec_ctx); } if (ctx->format_ctx) { avformat_close_input(&ctx->format_ctx); } if (ctx->hw_device_ctx) { av_buffer_unref(&ctx->hw_device_ctx); } // 清空帧队列 std::lock_guard lock(ctx->mutex); while (!ctx->frame_queue.empty()) { ctx->frame_queue.pop(); } } int RTSPDecoder::allocateGPU() { if (!config_.use_hw_accel) return -1; int gpu_count = getGPUCount(); if (gpu_count == 0) return -1; // 简单的轮询分配策略 int gpu_id = next_gpu_id_++ % gpu_count; return gpu_id; }