RtspDecoderByFFmpeg/RTSPDecoder/RTSPDecoder.cpp

539 lines
16 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "RTSPDecoder.h"

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <exception>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>

#include <cuda_runtime_api.h>
#define MAX_RETRY_COUNT 5
#define RETRY_DELAY_MS 1000
/// Constructs the decoder and performs FFmpeg's global network initialisation.
/// The matching avformat_network_deinit() call is made by the destructor.
RTSPDecoder::RTSPDecoder()
{
    avformat_network_init();
}
/// Stops every decode thread, releases all per-stream resources and undoes
/// the FFmpeg network initialisation performed by the constructor.
RTSPDecoder::~RTSPDecoder() {
    for (auto& stream : streams_) {
        if (!stream) {
            continue;  // slot was emptied by removeStream()
        }
        // Ask the worker to leave its decode loop.
        stream->running = false;
        // Join unconditionally: the worker clears `running` by itself when it
        // gives up reconnecting, yet its std::thread is still joinable.
        // Destroying a joinable std::thread calls std::terminate().
        if (stream->decode_thread.joinable()) {
            stream->decode_thread.join();
        }
        cleanupStream(stream.get());
    }
    avformat_network_deinit();
}
/// Stores the configuration and prepares GPU assignment.
///
/// @param config  Decoder settings; `max_streams` must lie in [1, 60].
/// @return true once the decoder is initialised (also when it already was).
/// @throws std::invalid_argument if `max_streams` is out of range.
bool RTSPDecoder::init(const DecoderConfig& config)
{
    // Repeated calls are a no-op.
    if (initialized_) {
        return true;
    }

    config_ = config;

    // Validate the configuration.
    const bool stream_limit_ok = (config_.max_streams >= 1) && (config_.max_streams <= 60);
    if (!stream_limit_ok) {
        throw std::invalid_argument("Max streams must be between 1 and 60");
    }

    // Set up GPU assignment.
    if (config_.use_hw_accel) {
        const int available_gpus = getGPUCount();
        if (available_gpus == 0) {
            // No usable device: silently fall back to software decoding.
            config_.use_hw_accel = false;
        } else if (config_.gpu_id >= 0 && config_.gpu_id < available_gpus) {
            // Start the round-robin assignment at the requested device.
            next_gpu_id_ = config_.gpu_id;
        }
    }

    initialized_ = true;
    return true;
}
int RTSPDecoder::addStream(const std::string& rtsp_url) {
if (!initialized_) {
throw std::runtime_error("Decoder not initialized");
}
std::lock_guard<std::mutex> lock(streams_mutex_);
if (streams_.size() >= config_.max_streams) {
throw std::runtime_error("Maximum number of streams reached");
}
auto ctx = std::make_unique<StreamContext>();
int stream_id = static_cast<int>(streams_.size());
// 分配GPU
int gpu_id = config_.use_hw_accel ? allocateGPU() : -1;
// 初始化格式上下文
ctx->format_ctx = avformat_alloc_context();
if (!ctx->format_ctx) {
throw std::runtime_error("Failed to allocate format context");
}
// 设置RTSP参数
AVDictionary* options = nullptr;
av_dict_set(&options, "rtsp_transport", "tcp", 0);
av_dict_set(&options, "stimeout", "5000000", 0); // 5秒超时
// 打开输入流
int retry_count = 0;
while (retry_count < MAX_RETRY_COUNT) {
int ret = avformat_open_input(&ctx->format_ctx, rtsp_url.c_str(), nullptr, &options);
if (ret == 0) break;
retry_count++;
if (retry_count < MAX_RETRY_COUNT) {
std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_DELAY_MS));
}
}
av_dict_free(&options);
if (retry_count == MAX_RETRY_COUNT) {
cleanupStream(ctx.get());
throw std::runtime_error("Failed to open RTSP stream after retries");
}
// 查找流信息
if (avformat_find_stream_info(ctx->format_ctx, nullptr) < 0) {
cleanupStream(ctx.get());
throw std::runtime_error("Failed to find stream information");
}
// 查找视频流
for (unsigned int i = 0; i < ctx->format_ctx->nb_streams; i++) {
if (ctx->format_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
ctx->video_stream_idx = i;
break;
}
}
if (ctx->video_stream_idx == -1) {
cleanupStream(ctx.get());
throw std::runtime_error("No video stream found");
}
// 获取解码器参数
AVCodecParameters* codecpar = ctx->format_ctx->streams[ctx->video_stream_idx]->codecpar;
// 查找解码器
const AVCodec* codec = nullptr;
if (config_.use_hw_accel && gpu_id >= 0) {
codec = avcodec_find_decoder(codecpar->codec_id);
} else {
codec = avcodec_find_decoder(codecpar->codec_id);
}
if (!codec) {
cleanupStream(ctx.get());
throw std::runtime_error("Unsupported codec");
}
// 创建解码器上下文
ctx->codec_ctx = avcodec_alloc_context3(codec);
if (!ctx->codec_ctx) {
cleanupStream(ctx.get());
throw std::runtime_error("Failed to allocate codec context");
}
// 复制参数到解码器上下文
if (avcodec_parameters_to_context(ctx->codec_ctx, codecpar) < 0) {
cleanupStream(ctx.get());
throw std::runtime_error("Failed to copy codec parameters");
}
// 初始化硬件加速
if (config_.use_hw_accel && gpu_id >= 0) {
if (!initHWAccel(*ctx, gpu_id)) {
cleanupStream(ctx.get());
throw std::runtime_error("Failed to initialize hardware acceleration");
}
}
// 打开解码器
if (avcodec_open2(ctx->codec_ctx, codec, nullptr) < 0) {
cleanupStream(ctx.get());
throw std::runtime_error("Failed to open codec");
}
// 设置线程数 (根据CPU核心数自动调整)
ctx->codec_ctx->thread_count = std::thread::hardware_concurrency();
if (ctx->codec_ctx->thread_count == 0) {
ctx->codec_ctx->thread_count = 4; // 默认值
}
// 启动解码线程
ctx->gpu_id = gpu_id;
ctx->running = true;
ctx->decode_thread = std::thread(&RTSPDecoder::decodeThreadFunc, this, stream_id, ctx.get());
streams_.push_back(std::move(ctx));
return stream_id;
}
bool RTSPDecoder::removeStream(int stream_id) {
std::lock_guard<std::mutex> lock(streams_mutex_);
if (stream_id < 0 || stream_id >= streams_.size() || !streams_[stream_id]) {
return false;
}
auto& ctx = streams_[stream_id];
// 停止解码线程
ctx->running = false;
if (ctx->decode_thread.joinable()) {
ctx->decode_thread.join();
}
// 清理资源
cleanupStream(ctx.get());
// 从列表中移除
streams_[stream_id].reset();
return true;
}
bool RTSPDecoder::getFrame(int stream_id, cv::Mat& frame, int timeout_ms) {
if (stream_id < 0 || stream_id >= streams_.size() || !streams_[stream_id]) {
return false;
}
auto& ctx = streams_[stream_id];
std::unique_lock<std::mutex> lock(ctx->mutex);
if (ctx->frame_queue.empty()) {
if (timeout_ms <= 0) {
return false;
}
if (ctx->cond.wait_for(lock, std::chrono::milliseconds(timeout_ms)) == std::cv_status::timeout) {
return false;
}
}
if (ctx->frame_queue.empty()) {
return false;
}
frame = ctx->frame_queue.front();
ctx->frame_queue.pop();
return true;
}
int RTSPDecoder::getActiveStreamCount() const {
int count = 0;
for (const auto& stream : streams_) {
if (stream && stream->running) {
count++;
}
}
return count;
}
int RTSPDecoder::getGPUCount() {
int count = 0;
cudaError_t err = cudaGetDeviceCount(&count);
if (err != cudaSuccess) {
return 0;
}
return count;
}
/// Creates a CUDA hardware device context for `gpu_id` and attaches it to
/// both the stream context and its codec context.
///
/// @param ctx     Stream whose `codec_ctx` has already been allocated.
/// @param gpu_id  CUDA device index, passed to FFmpeg as a decimal string.
/// @return true on success; false if the device context could not be created
///         or referenced, in which case neither `ctx.hw_device_ctx` nor
///         `ctx.codec_ctx->hw_device_ctx` is modified.
bool RTSPDecoder::initHWAccel(StreamContext& ctx, int gpu_id) {
    char device_name[32];
    snprintf(device_name, sizeof(device_name), "%d", gpu_id);

    // Create the CUDA device context.
    AVBufferRef* device_ref = nullptr;
    if (av_hwdevice_ctx_create(&device_ref, AV_HWDEVICE_TYPE_CUDA, device_name, nullptr, 0) < 0) {
        return false;
    }

    // The codec context needs its own reference; av_buffer_ref() returns
    // nullptr on allocation failure, which must not go unnoticed.
    AVBufferRef* codec_ref = av_buffer_ref(device_ref);
    if (!codec_ref) {
        av_buffer_unref(&device_ref);
        return false;
    }

    // The stream context takes over the reference returned by the create call.
    ctx.hw_device_ctx = device_ref;
    ctx.codec_ctx->hw_device_ctx = codec_ref;
    return true;
}
void RTSPDecoder::decodeThreadFunc(int stream_id, StreamContext* ctx) {
AVFrame* frame = av_frame_alloc();
AVFrame* sw_frame = nullptr;
AVPacket* pkt = av_packet_alloc();
if (!frame || !pkt) {
ctx->running = false;
if (frame) av_frame_free(&frame);
if (pkt) av_packet_free(&pkt);
return;
}
// 如果使用硬件加速,准备软件帧
if (ctx->hw_device_ctx) {
sw_frame = av_frame_alloc();
if (!sw_frame) {
ctx->running = false;
av_frame_free(&frame);
av_packet_free(&pkt);
return;
}
}
// 解码循环
while (ctx->running) {
// 读取数据包
int ret = av_read_frame(ctx->format_ctx, pkt);
if (ret < 0) {
if (ret == AVERROR(EAGAIN)) {
continue;
}
// 处理错误或EOF
if (ret != AVERROR_EOF) {
// 重试逻辑
int retry_count = 0;
while (retry_count < MAX_RETRY_COUNT && ctx->running) {
avformat_close_input(&ctx->format_ctx);
AVDictionary* options = nullptr;
av_dict_set(&options, "rtsp_transport", "tcp", 0);
av_dict_set(&options, "stimeout", "5000000", 0);
ret = avformat_open_input(&ctx->format_ctx, ctx->format_ctx->url, nullptr, &options);
av_dict_free(&options);
if (ret == 0) {
avformat_find_stream_info(ctx->format_ctx, nullptr);
break;
}
retry_count++;
std::this_thread::sleep_for(std::chrono::milliseconds(RETRY_DELAY_MS));
}
if (retry_count == MAX_RETRY_COUNT) {
ctx->running = false;
break;
}
} else {
// EOF, 重新开始
av_seek_frame(ctx->format_ctx, ctx->video_stream_idx, 0, AVSEEK_FLAG_BACKWARD);
}
av_packet_unref(pkt);
continue;
}
// 检查是否是视频流
if (pkt->stream_index != ctx->video_stream_idx) {
av_packet_unref(pkt);
continue;
}
// 发送数据包到解码器
ret = avcodec_send_packet(ctx->codec_ctx, pkt);
av_packet_unref(pkt);
if (ret < 0 && ret != AVERROR(EAGAIN)) {
continue;
}
// 接收解码后的帧
while (ctx->running) {
ret = avcodec_receive_frame(ctx->codec_ctx, frame);
if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
break;
}
if (ret < 0) {
break;
}
AVFrame* output_frame = frame;
// 如果使用硬件加速需要将帧从GPU内存复制到CPU内存
if (ctx->hw_device_ctx) {
if (av_hwframe_transfer_data(sw_frame, frame, 0) < 0) {
av_frame_unref(frame);
break;
}
output_frame = sw_frame;
}
// 转换为OpenCV Mat
int width = config_.width > 0 ? config_.width : output_frame->width;
int height = config_.height > 0 ? config_.height : output_frame->height;
// 检查是否需要缩放
if (width != output_frame->width || height != output_frame->height ||
output_frame->format != AV_PIX_FMT_BGR24) {
if (!ctx->sws_ctx) {
ctx->sws_ctx = sws_getContext(
output_frame->width, output_frame->height,
static_cast<AVPixelFormat>(output_frame->format),
width, height, AV_PIX_FMT_BGR24,
SWS_BILINEAR, nullptr, nullptr, nullptr);
if (!ctx->sws_ctx) {
av_frame_unref(output_frame);
break;
}
}
// 创建目标帧
AVFrame* bgr_frame = av_frame_alloc();
if (!bgr_frame) {
av_frame_unref(output_frame);
break;
}
bgr_frame->format = AV_PIX_FMT_BGR24;
bgr_frame->width = width;
bgr_frame->height = height;
if (av_frame_get_buffer(bgr_frame, 0) < 0) {
av_frame_free(&bgr_frame);
av_frame_unref(output_frame);
break;
}
// 执行转换
sws_scale(ctx->sws_ctx,
output_frame->data, output_frame->linesize, 0, output_frame->height,
bgr_frame->data, bgr_frame->linesize);
// 创建OpenCV Mat
cv::Mat mat(height, width, CV_8UC3, bgr_frame->data[0], bgr_frame->linesize[0]);
// 复制数据 (因为bgr_frame会被释放)
cv::Mat mat_copy = mat.clone();
av_frame_free(&bgr_frame);
// 将帧添加到队列
{
std::lock_guard<std::mutex> lock(ctx->mutex);
if (ctx->frame_queue.size() >= config_.buffer_size) {
ctx->frame_queue.pop();
}
ctx->frame_queue.push(mat_copy);
ctx->cond.notify_one();
}
} else {
// 直接转换格式
cv::Mat mat(output_frame->height, output_frame->width, CV_8UC3);
// 根据像素格式处理
if (output_frame->format == AV_PIX_FMT_NV12) {
// NV12转BGR
cv::Mat y_plane(output_frame->height, output_frame->width, CV_8UC1, output_frame->data[0]);
cv::Mat uv_plane(output_frame->height / 2, output_frame->width / 2, CV_8UC2, output_frame->data[1]);
cv::cvtColorTwoPlane(y_plane, uv_plane, mat, cv::COLOR_YUV2BGR_NV12);
} else if (output_frame->format == AV_PIX_FMT_YUV420P) {
// YUV420P转BGR
cv::Mat yuv_mat(output_frame->height * 3 / 2, output_frame->width, CV_8UC1, output_frame->data[0]);
cv::cvtColor(yuv_mat, mat, cv::COLOR_YUV2BGR_I420);
} else if (output_frame->format == AV_PIX_FMT_BGR24) {
// 直接复制
cv::Mat(output_frame->height, output_frame->width, CV_8UC3, output_frame->data[0]).copyTo(mat);
} else {
// 其他格式使用SWS转换
if (!ctx->sws_ctx) {
ctx->sws_ctx = sws_getContext(
output_frame->width, output_frame->height,
static_cast<AVPixelFormat>(output_frame->format),
width, height, AV_PIX_FMT_BGR24,
SWS_BILINEAR, nullptr, nullptr, nullptr);
}
if (ctx->sws_ctx) {
AVFrame* bgr_frame = av_frame_alloc();
bgr_frame->format = AV_PIX_FMT_BGR24;
bgr_frame->width = width;
bgr_frame->height = height;
if (av_frame_get_buffer(bgr_frame, 0) == 0) {
sws_scale(ctx->sws_ctx,
output_frame->data, output_frame->linesize, 0, output_frame->height,
bgr_frame->data, bgr_frame->linesize);
mat = cv::Mat(height, width, CV_8UC3, bgr_frame->data[0]);
}
av_frame_free(&bgr_frame);
}
}
// 将帧添加到队列
if (!mat.empty()) {
std::lock_guard<std::mutex> lock(ctx->mutex);
if (ctx->frame_queue.size() >= config_.buffer_size) {
ctx->frame_queue.pop();
}
ctx->frame_queue.push(mat);
ctx->cond.notify_one();
}
}
av_frame_unref(output_frame);
}
}
// 清理资源
av_frame_free(&frame);
av_frame_free(&sw_frame);
av_packet_free(&pkt);
if (ctx->sws_ctx) {
sws_freeContext(ctx->sws_ctx);
ctx->sws_ctx = nullptr;
}
}
/// Releases the FFmpeg objects owned by a stream context and empties its
/// frame queue. Each pointer is checked first, so the function can be called
/// on a partially initialised context.
///
/// @param ctx  Stream context to clean up; nullptr is a no-op.
void RTSPDecoder::cleanupStream(StreamContext* ctx)
{
    if (ctx == nullptr) {
        return;
    }

    if (ctx->sws_ctx != nullptr) {
        sws_freeContext(ctx->sws_ctx);
        ctx->sws_ctx = nullptr;
    }

    if (ctx->codec_ctx != nullptr) {
        avcodec_free_context(&ctx->codec_ctx);
    }

    if (ctx->format_ctx != nullptr) {
        avformat_close_input(&ctx->format_ctx);
    }

    if (ctx->hw_device_ctx != nullptr) {
        av_buffer_unref(&ctx->hw_device_ctx);
    }

    // Drop any frames still queued, under the queue's mutex.
    std::lock_guard<std::mutex> queue_guard(ctx->mutex);
    for (; !ctx->frame_queue.empty(); ctx->frame_queue.pop()) {
    }
}
/// Picks the GPU for the next stream by cycling through the available
/// devices.
///
/// @return A device index in [0, device count), or -1 when hardware
///         acceleration is disabled or no device is reported.
int RTSPDecoder::allocateGPU()
{
    if (!config_.use_hw_accel) {
        return -1;
    }

    const int available = getGPUCount();
    // Round-robin: use the current counter value, then advance it.
    return (available == 0) ? -1 : (next_gpu_id_++ % available);
}