RtspDecoderByFFmpeg/RTSPDecoder/RTSPDecoder.cpp

539 lines
16 KiB
C++
Raw Permalink Normal View History

2025-04-27 05:54:21 +00:00
#include "RTSPDecoder.h"

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>

#include <cuda_runtime_api.h>
#define MAX_RETRY_COUNT 5
#define RETRY_DELAY_MS 1000
/// Constructs the decoder and initializes FFmpeg's networking layer.
/// The matching avformat_network_deinit() call is made by the destructor.
RTSPDecoder::RTSPDecoder()
{
    avformat_network_init();
}
/// Stops every decode thread, releases all per-stream resources and shuts
/// down FFmpeg's networking layer.
RTSPDecoder::~RTSPDecoder() {
    for (auto& stream : streams_) {
        if (!stream) {
            continue;  // slot emptied by removeStream()
        }
        // Ask the worker to leave its loop.
        stream->running = false;
        // Join unconditionally: a worker that stopped on its own (for example
        // after exhausting its reconnect attempts) has already cleared
        // `running` but is still joinable, and destroying a joinable
        // std::thread calls std::terminate().
        if (stream->decode_thread.joinable()) {
            stream->decode_thread.join();
        }
        cleanupStream(stream.get());
    }
    avformat_network_deinit();
}
/// Stores the configuration and prepares GPU assignment.
///
/// @param config Decoder settings; a copy is kept in `config_`.
/// @return Always true. A second call on an initialized decoder is a no-op.
/// @throws std::invalid_argument if `max_streams` is outside [1, 60].
bool RTSPDecoder::init(const DecoderConfig& config) {
    if (initialized_) {
        return true;
    }

    config_ = config;

    // Validate the configuration.
    const bool stream_limit_ok = config_.max_streams >= 1 && config_.max_streams <= 60;
    if (!stream_limit_ok) {
        throw std::invalid_argument("Max streams must be between 1 and 60");
    }

    // Set up GPU assignment.
    if (config_.use_hw_accel) {
        const int available_gpus = getGPUCount();
        if (available_gpus == 0) {
            // No usable CUDA device: silently fall back to software decoding.
            config_.use_hw_accel = false;
        } else if (config_.gpu_id >= 0 && config_.gpu_id < available_gpus) {
            // A valid preferred GPU becomes the starting point of the rotation.
            next_gpu_id_ = config_.gpu_id;
        }
    }

    initialized_ = true;
    return true;
}
int RTSPDecoder::addStream(const std::string& rtsp_url) {
if (!initialized_) {
throw std::runtime_error("Decoder not initialized");
}
std::lock_guard<std::mutex> lock(streams_mutex_);
if (streams_.size() >= config_.max_streams) {
throw std::runtime_error("Maximum number of streams reached");
}
auto ctx = std::make_unique<StreamContext>();
int stream_id = static_cast<int>(streams_.size());
// 分配GPU
int gpu_id = config_.use_hw_accel ? allocateGPU() : -1;
// 初始化格式上下文
ctx->format_ctx = avformat_alloc_context();
if (!ctx->format_ctx) {
throw std::runtime_error("Failed to allocate format context");
}
// 设置RTSP参数
AVDictionary* options = nullptr;
av_dict_set(&options, "rtsp_transport", "tcp", 0);
av_dict_set(&options, "stimeout", "5000000", 0); // 5秒超时
// 打开输入流
int retry_count = 0;
while (retry_count < MAX_RETRY_COUNT) {
int ret = avformat_open_input(&ctx->format_ctx, rtsp_url.c_str(), nullptr, &options);
if (ret == 0) break;
retry_count++;
if (retry_count < MAX_RETRY_COUNT) {
std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_DELAY_MS));
}
}
av_dict_free(&options);
if (retry_count == MAX_RETRY_COUNT) {
cleanupStream(ctx.get());
throw std::runtime_error("Failed to open RTSP stream after retries");
}
// 查找流信息
if (avformat_find_stream_info(ctx->format_ctx, nullptr) < 0) {
cleanupStream(ctx.get());
throw std::runtime_error("Failed to find stream information");
}
// 查找视频流
for (unsigned int i = 0; i < ctx->format_ctx->nb_streams; i++) {
if (ctx->format_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
ctx->video_stream_idx = i;
break;
}
}
if (ctx->video_stream_idx == -1) {
cleanupStream(ctx.get());
throw std::runtime_error("No video stream found");
}
// 获取解码器参数
AVCodecParameters* codecpar = ctx->format_ctx->streams[ctx->video_stream_idx]->codecpar;
// 查找解码器
const AVCodec* codec = nullptr;
if (config_.use_hw_accel && gpu_id >= 0) {
codec = avcodec_find_decoder(codecpar->codec_id);
} else {
codec = avcodec_find_decoder(codecpar->codec_id);
}
if (!codec) {
cleanupStream(ctx.get());
throw std::runtime_error("Unsupported codec");
}
// 创建解码器上下文
ctx->codec_ctx = avcodec_alloc_context3(codec);
if (!ctx->codec_ctx) {
cleanupStream(ctx.get());
throw std::runtime_error("Failed to allocate codec context");
}
// 复制参数到解码器上下文
if (avcodec_parameters_to_context(ctx->codec_ctx, codecpar) < 0) {
cleanupStream(ctx.get());
throw std::runtime_error("Failed to copy codec parameters");
}
// 初始化硬件加速
if (config_.use_hw_accel && gpu_id >= 0) {
if (!initHWAccel(*ctx, gpu_id)) {
cleanupStream(ctx.get());
throw std::runtime_error("Failed to initialize hardware acceleration");
}
}
// 打开解码器
if (avcodec_open2(ctx->codec_ctx, codec, nullptr) < 0) {
cleanupStream(ctx.get());
throw std::runtime_error("Failed to open codec");
}
// 设置线程数 (根据CPU核心数自动调整)
ctx->codec_ctx->thread_count = std::thread::hardware_concurrency();
if (ctx->codec_ctx->thread_count == 0) {
ctx->codec_ctx->thread_count = 4; // 默认值
}
// 启动解码线程
ctx->gpu_id = gpu_id;
ctx->running = true;
ctx->decode_thread = std::thread(&RTSPDecoder::decodeThreadFunc, this, stream_id, ctx.get());
streams_.push_back(std::move(ctx));
return stream_id;
}
/// Stops and tears down one stream.
///
/// @param stream_id Id previously returned by addStream().
/// @return false if the id is out of range or the slot is already empty,
///         true once the stream has been stopped and released.
bool RTSPDecoder::removeStream(int stream_id) {
    std::lock_guard<std::mutex> guard(streams_mutex_);

    const bool in_range =
        stream_id >= 0 && static_cast<std::size_t>(stream_id) < streams_.size();
    if (!in_range || !streams_[stream_id]) {
        return false;
    }

    auto& slot = streams_[stream_id];

    // Signal the decode thread to stop and wait for it to finish.
    slot->running = false;
    if (slot->decode_thread.joinable()) {
        slot->decode_thread.join();
    }

    // Release FFmpeg resources and queued frames.
    cleanupStream(slot.get());

    // Empty the slot; the vector keeps its size so other ids stay valid.
    slot.reset();
    return true;
}
bool RTSPDecoder::getFrame(int stream_id, cv::Mat& frame, int timeout_ms) {
if (stream_id < 0 || stream_id >= streams_.size() || !streams_[stream_id]) {
return false;
}
auto& ctx = streams_[stream_id];
std::unique_lock<std::mutex> lock(ctx->mutex);
if (ctx->frame_queue.empty()) {
if (timeout_ms <= 0) {
return false;
}
if (ctx->cond.wait_for(lock, std::chrono::milliseconds(timeout_ms)) == std::cv_status::timeout) {
return false;
}
}
if (ctx->frame_queue.empty()) {
return false;
}
frame = ctx->frame_queue.front();
ctx->frame_queue.pop();
return true;
}
int RTSPDecoder::getActiveStreamCount() const {
int count = 0;
for (const auto& stream : streams_) {
if (stream && stream->running) {
count++;
}
}
return count;
}
int RTSPDecoder::getGPUCount() {
int count = 0;
cudaError_t err = cudaGetDeviceCount(&count);
if (err != cudaSuccess) {
return 0;
}
return count;
}
/// Creates a CUDA hardware device context for `gpu_id` and attaches it to the
/// stream and to its codec context.
///
/// @param ctx    Stream whose `codec_ctx` has already been allocated.
/// @param gpu_id CUDA device index, passed to FFmpeg as a decimal string.
/// @return true on success; false if the codec context is missing, the device
///         cannot be created, or a buffer reference cannot be allocated. On
///         failure `ctx` is left unmodified.
bool RTSPDecoder::initHWAccel(StreamContext& ctx, int gpu_id) {
    if (!ctx.codec_ctx) {
        return false;
    }

    // FFmpeg takes the CUDA device index as a string.
    char device_name[32];
    snprintf(device_name, sizeof(device_name), "%d", gpu_id);

    AVBufferRef* device = nullptr;
    if (av_hwdevice_ctx_create(&device, AV_HWDEVICE_TYPE_CUDA, device_name, nullptr, 0) < 0) {
        return false;
    }

    // The codec context needs its own reference. Check it before touching
    // `ctx`, so an allocation failure cannot leave the stream marked as
    // hardware-accelerated while the codec has no device attached.
    AVBufferRef* codec_ref = av_buffer_ref(device);
    if (!codec_ref) {
        av_buffer_unref(&device);
        return false;
    }

    ctx.hw_device_ctx = device;  // the stream takes over the initial reference
    ctx.codec_ctx->hw_device_ctx = codec_ref;
    return true;
}
void RTSPDecoder::decodeThreadFunc(int stream_id, StreamContext* ctx) {
AVFrame* frame = av_frame_alloc();
AVFrame* sw_frame = nullptr;
AVPacket* pkt = av_packet_alloc();
if (!frame || !pkt) {
ctx->running = false;
if (frame) av_frame_free(&frame);
if (pkt) av_packet_free(&pkt);
return;
}
// 如果使用硬件加速,准备软件帧
if (ctx->hw_device_ctx) {
sw_frame = av_frame_alloc();
if (!sw_frame) {
ctx->running = false;
av_frame_free(&frame);
av_packet_free(&pkt);
return;
}
}
// 解码循环
while (ctx->running) {
// 读取数据包
int ret = av_read_frame(ctx->format_ctx, pkt);
if (ret < 0) {
if (ret == AVERROR(EAGAIN)) {
continue;
}
// 处理错误或EOF
if (ret != AVERROR_EOF) {
// 重试逻辑
int retry_count = 0;
while (retry_count < MAX_RETRY_COUNT && ctx->running) {
avformat_close_input(&ctx->format_ctx);
AVDictionary* options = nullptr;
av_dict_set(&options, "rtsp_transport", "tcp", 0);
av_dict_set(&options, "stimeout", "5000000", 0);
ret = avformat_open_input(&ctx->format_ctx, ctx->format_ctx->url, nullptr, &options);
av_dict_free(&options);
if (ret == 0) {
avformat_find_stream_info(ctx->format_ctx, nullptr);
break;
}
retry_count++;
std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_DELAY_MS));
}
if (retry_count == MAX_RETRY_COUNT) {
ctx->running = false;
break;
}
} else {
// EOF, 重新开始
av_seek_frame(ctx->format_ctx, ctx->video_stream_idx, 0, AVSEEK_FLAG_BACKWARD);
}
av_packet_unref(pkt);
continue;
}
// 检查是否是视频流
if (pkt->stream_index != ctx->video_stream_idx) {
av_packet_unref(pkt);
continue;
}
// 发送数据包到解码器
ret = avcodec_send_packet(ctx->codec_ctx, pkt);
av_packet_unref(pkt);
if (ret < 0 && ret != AVERROR(EAGAIN)) {
continue;
}
// 接收解码后的帧
while (ctx->running) {
ret = avcodec_receive_frame(ctx->codec_ctx, frame);
if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
break;
}
if (ret < 0) {
break;
}
AVFrame* output_frame = frame;
// 如果使用硬件加速需要将帧从GPU内存复制到CPU内存
if (ctx->hw_device_ctx) {
if (av_hwframe_transfer_data(sw_frame, frame, 0) < 0) {
av_frame_unref(frame);
break;
}
output_frame = sw_frame;
}
// 转换为OpenCV Mat
int width = config_.width > 0 ? config_.width : output_frame->width;
int height = config_.height > 0 ? config_.height : output_frame->height;
// 检查是否需要缩放
if (width != output_frame->width || height != output_frame->height ||
output_frame->format != AV_PIX_FMT_BGR24) {
if (!ctx->sws_ctx) {
ctx->sws_ctx = sws_getContext(
output_frame->width, output_frame->height,
static_cast<AVPixelFormat>(output_frame->format),
width, height, AV_PIX_FMT_BGR24,
SWS_BILINEAR, nullptr, nullptr, nullptr);
if (!ctx->sws_ctx) {
av_frame_unref(output_frame);
break;
}
}
// 创建目标帧
AVFrame* bgr_frame = av_frame_alloc();
if (!bgr_frame) {
av_frame_unref(output_frame);
break;
}
bgr_frame->format = AV_PIX_FMT_BGR24;
bgr_frame->width = width;
bgr_frame->height = height;
if (av_frame_get_buffer(bgr_frame, 0) < 0) {
av_frame_free(&bgr_frame);
av_frame_unref(output_frame);
break;
}
// 执行转换
sws_scale(ctx->sws_ctx,
output_frame->data, output_frame->linesize, 0, output_frame->height,
bgr_frame->data, bgr_frame->linesize);
// 创建OpenCV Mat
cv::Mat mat(height, width, CV_8UC3, bgr_frame->data[0], bgr_frame->linesize[0]);
// 复制数据 (因为bgr_frame会被释放)
cv::Mat mat_copy = mat.clone();
av_frame_free(&bgr_frame);
// 将帧添加到队列
{
std::lock_guard<std::mutex> lock(ctx->mutex);
if (ctx->frame_queue.size() >= config_.buffer_size) {
ctx->frame_queue.pop();
}
ctx->frame_queue.push(mat_copy);
ctx->cond.notify_one();
}
} else {
// 直接转换格式
cv::Mat mat(output_frame->height, output_frame->width, CV_8UC3);
// 根据像素格式处理
if (output_frame->format == AV_PIX_FMT_NV12) {
// NV12转BGR
cv::Mat y_plane(output_frame->height, output_frame->width, CV_8UC1, output_frame->data[0]);
cv::Mat uv_plane(output_frame->height / 2, output_frame->width / 2, CV_8UC2, output_frame->data[1]);
cv::cvtColorTwoPlane(y_plane, uv_plane, mat, cv::COLOR_YUV2BGR_NV12);
} else if (output_frame->format == AV_PIX_FMT_YUV420P) {
// YUV420P转BGR
cv::Mat yuv_mat(output_frame->height * 3 / 2, output_frame->width, CV_8UC1, output_frame->data[0]);
cv::cvtColor(yuv_mat, mat, cv::COLOR_YUV2BGR_I420);
} else if (output_frame->format == AV_PIX_FMT_BGR24) {
// 直接复制
cv::Mat(output_frame->height, output_frame->width, CV_8UC3, output_frame->data[0]).copyTo(mat);
} else {
// 其他格式使用SWS转换
if (!ctx->sws_ctx) {
ctx->sws_ctx = sws_getContext(
output_frame->width, output_frame->height,
static_cast<AVPixelFormat>(output_frame->format),
width, height, AV_PIX_FMT_BGR24,
SWS_BILINEAR, nullptr, nullptr, nullptr);
}
if (ctx->sws_ctx) {
AVFrame* bgr_frame = av_frame_alloc();
bgr_frame->format = AV_PIX_FMT_BGR24;
bgr_frame->width = width;
bgr_frame->height = height;
if (av_frame_get_buffer(bgr_frame, 0) == 0) {
sws_scale(ctx->sws_ctx,
output_frame->data, output_frame->linesize, 0, output_frame->height,
bgr_frame->data, bgr_frame->linesize);
mat = cv::Mat(height, width, CV_8UC3, bgr_frame->data[0]);
}
av_frame_free(&bgr_frame);
}
}
// 将帧添加到队列
if (!mat.empty()) {
std::lock_guard<std::mutex> lock(ctx->mutex);
if (ctx->frame_queue.size() >= config_.buffer_size) {
ctx->frame_queue.pop();
}
ctx->frame_queue.push(mat);
ctx->cond.notify_one();
}
}
av_frame_unref(output_frame);
}
}
// 清理资源
av_frame_free(&frame);
av_frame_free(&sw_frame);
av_packet_free(&pkt);
if (ctx->sws_ctx) {
sws_freeContext(ctx->sws_ctx);
ctx->sws_ctx = nullptr;
}
}
/// Releases the FFmpeg resources of a stream and empties its frame queue.
/// The decode thread is not touched here.
///
/// @param ctx Stream to clean up; a null pointer is ignored.
void RTSPDecoder::cleanupStream(StreamContext* ctx) {
    if (ctx == nullptr) {
        return;
    }

    if (ctx->sws_ctx != nullptr) {
        sws_freeContext(ctx->sws_ctx);
        ctx->sws_ctx = nullptr;
    }

    if (ctx->codec_ctx != nullptr) {
        avcodec_free_context(&ctx->codec_ctx);
    }

    if (ctx->format_ctx != nullptr) {
        avformat_close_input(&ctx->format_ctx);
    }

    if (ctx->hw_device_ctx != nullptr) {
        av_buffer_unref(&ctx->hw_device_ctx);
    }

    // Discard any frames still waiting in the queue.
    std::lock_guard<std::mutex> guard(ctx->mutex);
    for (; !ctx->frame_queue.empty(); ctx->frame_queue.pop()) {
    }
}
/// Picks the GPU for the next stream using a simple rotation over the
/// available devices.
///
/// @return A device index in [0, device count), or -1 when hardware
///         acceleration is disabled or no device is available.
int RTSPDecoder::allocateGPU() {
    if (!config_.use_hw_accel) {
        return -1;
    }

    const int available = getGPUCount();
    if (available == 0) {
        return -1;
    }

    // Take the current counter value and advance it for the next caller.
    const int ticket = next_gpu_id_++;
    return ticket % available;
}