前言
GB/T28181-2022实时流的传输方式介绍:https://blog.csdn.net/www_dong/article/details/134255185
tcp passive收流
流程图
注意:
- m字段指定传输方式为TCP/RTP/AVP;
- sdp信息中增加"a=setup:passive";
- SIP服务器启动端口监听,设备发起tcp连接请求;
设计
- 创建socket、bind、listen,启动数据接收线程;
// TcpServer为封装的socket类
int CGBTcpServerStreamReceiver::Start()
{
if (m_localIP.empty() || m_localPort <= 0)
return -1;
if (m_tcpServer.get())
return 0;
m_tcpServer = std::make_shared<TcpServer>(TcpDataCB, this);
if (!m_tcpServer.get())
return -1;
if (0 != m_tcpServer->TcpCreate()
|| 0 != m_tcpServer->TcpBind(m_localPort)
|| 0 != m_tcpServer->TcpListen(5))
return -1;
m_thread = std::thread(TcpDataThread, this);
return 0;
}
- 在线程内等待连接,连接成功后接收数据并回调至应用层处理
void CGBTcpServerStreamReceiver::TcpDataWorker()
{
bool bAccept = false;
uint8_t payload;
while (m_running)
{
if (!bAccept)
{
// 等待设备连接
if (0 == m_tcpServer->TcpAccept())
{
bAccept = true;
// 连接成功后,初始化rtp参数
if (0 != InitRtpSession_())
{
break;
}
}
continue;
}
Poll();
BeginDataAccess();
// 开始接收数据
if (GotoFirstSourceWithData())
{
do
{
RTPPacket* packet = nullptr;
while (nullptr != (packet = GetNextPacket()))
{
payload = packet->GetPayloadType();
if (0 == payload)
{
DeletePacket(packet);
continue;
}
struct rtp_packet_tcp data;
data.mark = packet->HasMarker();
data.pts = packet->GetTimestamp();
data.seq = packet->GetSequenceNumber();
data.data = packet->GetPayloadData();
data.len = (int)packet->GetPayloadLength();
m_payload = payload;
if (m_lastSeq < 0)
{
m_lastSeq = data.seq - 1;
}
if (m_lastSeq = data.seq - 1)
{
PackData_(data.data, data.len);
}
DeletePacket(packet);
}
} while (GotoNextSourceWithData());
}
EndDataAccess();
Sleep(30);
}
Destroy();
}
- 初始化rtp参数
int CGBTcpServerStreamReceiver::InitRtpSession_()
{
const int packSize = 45678;
RTPSessionParams sessionParams;
sessionParams.SetProbationType(RTPSources::NoProbation);
sessionParams.SetOwnTimestampUnit(90000.0 / 25.0);
sessionParams.SetMaximumPacketSize(packSize + 64);
m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);
m_rtpTcpTransmitter->Init(true);
m_rtpTcpTransmitter->Create(65535, 0);
if (0 != Create(sessionParams, m_rtpTcpTransmitter))
return -1;
if (0 != AddDestination(RTPTCPAddress(m_tcpServer->GetClientSocket())))
return -1;
return 0;
}
注意:RTP over TCP模式比RTP over UDP模式多了两字节的长度字段。
tcp active收流
流程图
注意:
- m字段指定传输方式为TCP/RTP/AVP;
- sdp信息中增加"a=setup:active";
- 设备返回200 OK,报文的SDP信息中包含tcp监听端口;
- SIP服务器根据设备监听端口发起TCP连接请求;
设计
- 创建socket、connect、初始化rtp,启动数据接收线程
// TcpClient为封装的客户端socket类
int CGBTcpClientStreamReceiver::Start(int streamType)
{
if (m_localIP.empty() || m_localPort <= 0)
return -1;
if (m_tcpClient.get())
return 0;
// 创建socket
m_tcpClient = std::make_shared<TcpClient>(TcpDataCB, this);
if (!m_tcpClient.get() || 0 != m_tcpClient->TcpCreate())
return -1;
// connect
int ret = m_tcpClient->TcpConnectByTime(m_localIP.c_str(), m_localPort, 5);
if (0 != ret)
return -1;
// 初始化rtp session
if (0 != InitRtpSession_())
return -1;
// 启动接收线程
m_thread = std::thread(TcpDataThread, this);
}
- 初始化rtp参数
int CGBTcpClientStreamReceiver::InitRtpSession_()
{
const int packSize = 45678;
RTPSessionParams sessionParams;
sessionParams.SetProbationType(RTPSources::NoProbation);
sessionParams.SetOwnTimestampUnit(90000.0 / 25.0);
sessionParams.SetMaximumPacketSize(packSize + 64);
m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);
m_rtpTcpTransmitter->Init(true);
m_rtpTcpTransmitter->Create(65535, 0);
if (0 != Create(sessionParams, m_rtpTcpTransmitter))
return -1;
// 添加客户端socket
if (0 != AddDestination(RTPTCPAddress(m_tcpClient->GetClientSocket())))
return -1;
return 0;
}
- 在线程内等待连接,连接成功后接收数据并回调至应用层处理
同tcp passive收流流程。
rtp解包工具
基于qt+ireader库实现tcp解包(ps封装+264载荷)
解包流程
- 打开文件,创建解码器
void RtpUnpackDlg::StartRtpUnpack()
{
if (m_thread.joinable())
return;
QString filePath = ui.le_filePath->text();
if (filePath.isEmpty())
{
QMessageBox::critical(this, QString::fromLocal8Bit("错误"), QString::fromLocal8Bit("文件路径为空"), QMessageBox::Ok);
return;
}
m_rtpPayloadParam.payload = 100;
m_rtpPayloadParam.encoding = "PS";
m_rtpPayloadParam.frtp = fopen(filePath.toStdString().c_str(), "rb");
if (!m_rtpPayloadParam.frtp)
{
QMessageBox::critical(this, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("打开输入文件失败"), QMessageBox::Ok);
return;
}
m_rtpPayloadParam.fout = fopen(outFilePath.toStdString().c_str(), "wb");
if (!m_rtpPayloadParam.fout)
{
QMessageBox::critical(this, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("打开输出文件失败"), QMessageBox::Ok);
return;
}
struct rtp_payload_t handler;
handler.alloc = RtpPacketAlloc;
handler.free = RtpPacketFree;
handler.packet = RtpDecodePacket;
m_rtpPayloadParam.decoder = rtp_payload_decode_create(100, "PS", &handler, this);
m_thread = std::thread(DataReadThread, this);
}
- 读数据
void RtpUnpackDlg::RtpDataReadWorker()
{
while (m_running)
{
// 先读两个字节的头
unsigned char s2[2];
if (2 != fread(s2, 1, 2, m_rtpPayloadParam.frtp))
break;
m_rtpPayloadParam.size = (s2[0] << 8) | s2[1];
assert(ctx.size < sizeof(ctx.packet));
if (m_rtpPayloadParam.size != (int)fread(m_rtpPayloadParam.packet, 1, m_rtpPayloadParam.size, m_rtpPayloadParam.frtp))
break;
// 塞数据
if (m_rtpPayloadParam.packet[1] < RTCP_FIR || m_rtpPayloadParam.packet[1] > RTCP_LIMIT)
rtp_payload_decode_input(m_rtpPayloadParam.decoder, m_rtpPayloadParam.packet, m_rtpPayloadParam.size);
}
fclose(m_rtpPayloadParam.frtp);
fclose(m_rtpPayloadParam.fout);
}
- 解包
int RtpUnpackDlg::DecodePacket(const void* packet, int bytes, uint32_t timestamp, int flags)
{
static const unsigned char start_code[4] = { 0, 0, 0, 1 };
static unsigned char buffer[2 * 1024 * 1024] = {0, };
size_t size = 0;
if (0 == strcmp("H264", m_rtpPayloadParam.encoding)
|| 0 == strcmp("H265", m_rtpPayloadParam.encoding)
|| 0 == strcmp("PS", m_rtpPayloadParam.encoding))
{
memcpy(buffer, start_code, sizeof(start_code));
size += sizeof(start_code);
}
memcpy(buffer + size, packet, bytes);
size += bytes;
fwrite(buffer, 1, size, m_rtpPayloadParam.fout);
// 新增界面播放功能
if (m_playWidget)
m_playWidget->AddData(CODEC_VIDEO_H264, (void*)buffer, size);
return 0;
}
界面示例
参考:https://github.com/ireader中的demo示例