前言
GB/T28181-2022实时流的传输方式介绍:https://blog.csdn.net/www_dong/article/details/134255185
基于jrtplib实现tcp被动和主动收流介绍:https://blog.csdn.net/www_dong/article/details/134451387
本文主要介绍下级平台或设备发流功能,用于对接特定的SIP服务器或上级平台。
UDP发流
流程图
发送端流程:
- 初始化rtp参数;
- 裸流数据做PS复用;
- 组RTP包发送;
设计
- 初始化rtp参数
int CUdp::InitRtp_()
{
RTPSessionParams sessionParams;
sessionParams.SetMinimumRTCPTransmissionInterval(10);
sessionParams.SetOwnTimestampUnit(1.0 / 90000.0);
sessionParams.SetAcceptOwnPackets(true);
sessionParams.SetMaximumPacketSize(1450);
RTPUDPv4TransmissionParams transParams;
transParams.SetRTPSendBuffer(2*1024*1024);
transParams.SetBindIP(m_ip);
transParams.SetPortbase((uint16_t)m_port);
if (0 != Create(sessionParams, &transParams))
{
return -1;
}
SetDefaultPayloadType((uint8_t)m_payload);
SetDefaultTimestampIncrement(3600);
SetDefaultMark(true);
RTPIPv4Address addr(ntohl(inet_addr(m_ip), (uint16_t)m_port);
if(0 != AddDestination(addr))
{
return -1;
}
return 0;
}
- 流数据复用为PS
// 使用ireader开源库进行ps复用
// 初始化
CData2PS::CData2PS()
{
struct ps_muxer_func_t func;
func.alloc = Alloc;
func.free = Free;
func.write = Packet;
m_ps = ps_muxer_create(&func, this);
// TODO codecid待补充
m_ps_stream = ps_muxer_add_stream(m_ps, PSI_STREAM_H264, nullptr, 0);
}
// 塞数据
int CData2PS::InputData(void* data, int len)
{
if (!m_ps)
return -1;
uint64_t clock = time64_now();
if (0 == m_ps_clock)
m_ps_clock = clock;
return ps_muxer_input(m_ps, m_ps_stream, 0, (clock - m_ps_clock) * 90, (clock - m_ps_clock) * 90, data, len);
}
- 发送rtp包
// 调用jrtplib中SendPacket(data, len);接口发送数据
// 以下为SendPacket部分源码
// 主要流程:
// 1. 构建packet
// 2. 发送rtp数据
int RTPSession::SendPacket(const void *data,size_t len,
uint8_t pt,bool mark,uint32_t timestampinc)
{
int status;
if (!created)
return ERR_RTP_SESSION_NOTCREATED;
BUILDER_LOCK
if ((status = packetbuilder.BuildPacket(data,len,pt,mark,timestampinc)) < 0)
{
BUILDER_UNLOCK
return status;
}
if ((status = SendRTPData(packetbuilder.GetPacket(),packetbuilder.GetPacketLength())) < 0)
{
BUILDER_UNLOCK
return status;
}
BUILDER_UNLOCK
SOURCES_LOCK
sources.SentRTPPacket();
SOURCES_UNLOCK
PACKSENT_LOCK
sentpackets = true;
PACKSENT_UNLOCK
return 0;
}
// 构建包
int RTPPacketBuilder::PrivateBuildPacket(const void *data,size_t len,
uint8_t pt,bool mark,uint32_t timestampinc,bool gotextension,
uint16_t hdrextID,const void *hdrextdata,size_t numhdrextwords)
{
RTPPacket p(pt,data,len,seqnr,timestamp,ssrc,mark,numcsrcs,csrcs,gotextension,hdrextID,
(uint16_t)numhdrextwords,hdrextdata,buffer,maxpacksize,GetMemoryManager());
int status = p.GetCreationError();
if (status < 0)
return status;
packetlength = p.GetPacketLength();
if (numpackets == 0) // first packet
{
lastwallclocktime = RTPTime::CurrentTime();
lastrtptimestamp = timestamp;
prevrtptimestamp = timestamp;
}
else if (timestamp != prevrtptimestamp)
{
lastwallclocktime = RTPTime::CurrentTime();
lastrtptimestamp = timestamp;
prevrtptimestamp = timestamp;
}
numpayloadbytes += (uint32_t)p.GetPayloadLength();
numpackets++;
timestamp += timestampinc;
seqnr++;
return 0;
}
// 发送包
int RTPSession::SendRTPData(const void *data, size_t len)
{
if (!m_changeOutgoingData)
return rtptrans->SendRTPData(data, len);
void *pSendData = 0;
size_t sendLen = 0;
int status = 0;
status = OnChangeRTPOrRTCPData(data, len, true, &pSendData, &sendLen);
if (status < 0)
return status;
if (pSendData)
{
status = rtptrans->SendRTPData(pSendData, sendLen);
OnSentRTPOrRTCPData(pSendData, sendLen, true);
}
return status;
}
// 底层实现
int RTPUDPv4Transmitter::SendRTPData(const void *data,size_t len)
{
if (!init)
return ERR_RTP_UDPV4TRANS_NOTINIT;
MAINMUTEX_LOCK
if (!created)
{
MAINMUTEX_UNLOCK
return ERR_RTP_UDPV4TRANS_NOTCREATED;
}
if (len > maxpacksize)
{
MAINMUTEX_UNLOCK
return ERR_RTP_UDPV4TRANS_SPECIFIEDSIZETOOBIG;
}
destinations.GotoFirstElement();
while (destinations.HasCurrentElement())
{
// 调用sendto函数实现udp包的发送
sendto(rtpsock,(const char *)data,len,0,(const struct sockaddr *)destinations.GetCurrentElement().GetRTPSockAddr(),sizeof(struct sockaddr_in));
destinations.GotoNextElement();
}
MAINMUTEX_UNLOCK
return 0;
}
tcp passive发流
流程图
发送端流程:
- 上级平台或sip服务器以主动方式连接,对于下级平台或者设备(数据发送端)为被动方式;
- 下级平台或者设备(数据发送端)启动端口监听;
- 接收上级平台或sip服务器tcp连接请求;
- 向上级平台或sip服务器发送流数据;
设计
- 创建socket、bind、listen,启动数据接收线程;
// TcpServer为封装的socket类
int CGBTcpServer::Start()
{
if (0 != m_localPort || m_tcpServer.get())
return 0;
int ret = -1;
do
{
m_tcpServer = std::make_shared<TcpServer>(nullptr, this);
if (!m_tcpServer.get())
break;
ret = m_tcpServer->TcpCreate();
if (0 != ret)
break;
ret = m_tcpServer->TcpBind(m_localPort);
if (0 != ret)
break;
ret = m_tcpServer->TcpListen(5);
if (0 != ret)
break;
m_thread = std::thread(TCPData2PSThread, this);
return 0;
} while (0);
Stop();
return ret;
}
- 在线程内等待连接,连接成功后接收数据并回调至应用层处理
void CGBTcpServer::TCPData2PSWorker()
{
if (!m_pspacker)
m_pspacker = new(std::nothrow) CData2PS(PSTCPDataCB, this);
bool bAccept = false;
while (m_running)
{
if (!bAccept)
{
if (0 == m_tcpServer->TcpAccept())
{
bAccept = true;
if (0 != InitRtp_())
{
break;
}
}
continue;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
- 初始化rtp参数
int CGBTcpServer::InitRtp_()
{
const int packetSize = 45678;
RTPSessionParams sessionparams;
sessionparams.SetProbationType(RTPSources::NoProbation);
sessionparams.SetOwnTimestampUnit(1.0 / packetSize);
sessionparams.SetMaximumPacketSize(packetSize + 64);
m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);
m_rtpTcpTransmitter->Init(true);
m_rtpTcpTransmitter->Create(65535, 0);
int status = Create(sessionparams, m_rtpTcpTransmitter);
if (status < 0)
{
return status;
}
status = AddDestination(RTPTCPAddress(m_tcpServer->GetClientSocket()));
if (0 != status)
return status;
SetDefaultPayloadType(96);
SetDefaultMark(false);
SetDefaultTimestampIncrement(160);
return 0;
}
- 将数据复用为PS;
- tcp方式发包
// 调用jrtplib中SendPacket(data, len);接口发送数据
// 以下为tcp方式SendPacket部分源码
int RTPTCPTransmitter::SendRTPData(const void *data,size_t len)
{
return SendRTPRTCPData(data, len);
}
int RTPTCPTransmitter::SendRTPRTCPData(const void *data, size_t len)
{
if (!m_init)
return ERR_RTP_TCPTRANS_NOTINIT;
MAINMUTEX_LOCK
if (!m_created)
{
MAINMUTEX_UNLOCK
return ERR_RTP_TCPTRANS_NOTCREATED;
}
// #define RTPTCPTRANS_MAXPACKSIZE 65535
if (len > RTPTCPTRANS_MAXPACKSIZE)
{
MAINMUTEX_UNLOCK
return ERR_RTP_TCPTRANS_SPECIFIEDSIZETOOBIG;
}
std::map<SocketType, SocketData>::iterator it = m_destSockets.begin();
std::map<SocketType, SocketData>::iterator end = m_destSockets.end();
vector<SocketType> errSockets;
int flags = 0;
#ifdef RTP_HAVE_MSG_NOSIGNAL
flags = MSG_NOSIGNAL;
#endif // RTP_HAVE_MSG_NOSIGNAL
while (it != end)
{
uint8_t lengthBytes[2] = { (uint8_t)((len >> 8)&0xff), (uint8_t)(len&0xff) };
SocketType sock = it->first;
// 调用send接口发送数据
// 1. 先发送2字节头(固定格式)
// 2. 再发送数据
if (send(sock,(const char *)lengthBytes,2,flags) < 0 ||
send(sock,(const char *)data,len,flags) < 0)
errSockets.push_back(sock);
++it;
}
MAINMUTEX_UNLOCK
if (errSockets.size() != 0)
{
for (size_t i = 0 ; i < errSockets.size() ; i++)
OnSendError(errSockets[i]);
}
// Don't return an error code to avoid the poll thread exiting
// due to one closed connection for example
return 0;
}
tcp active发流
流程图
发送端流程:
- 上级平台或sip服务器启动tcp监听连接,对于下级平台或者设备(数据发送端)为主动方式;
- 下级平台或者设备(数据发送端)发起tcp连接;
- 接收上级平台或sip服务器tcp响应;
- 向上级平台或sip服务器发送流数据;
设计
- 创建socket、connect、初始化rtp,启动数据接收线程
// TcpClient为封装的客户端socket类
int CGBTcpClient::Start()
{
if (0 != m_localPort || m_tcpClient.get())
return 0;
int ret = -1;
do
{
m_tcpClient = std::make_shared<TcpClient>(nullptr, this);
if (!m_tcpClient.get() || 0 != m_tcpClient->TcpCreate())
break;
ret = m_tcpClient->TcpConnectByTime(m_localIP.c_str(), m_localPort, 5);
if (0 != ret)
break;
ret = InitRtp_();
if (0 != ret)
break;
m_thread = std::thread(RTPPackerThread, this);
return 0;
} while (0);
Stop();
return ret;
}
- 初始化rtp参数
int CGBTcpClient::InitRtp_()
{
const int packSize = 45678;
RTPSessionParams sessionParams;
sessionParams.SetProbationType(RTPSources::NoProbation);
sessionParams.SetOwnTimestampUnit(90000.0 / 25.0);
sessionParams.SetMaximumPacketSize(packSize + 64);
m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);
m_rtpTcpTransmitter->Init(true);
m_rtpTcpTransmitter->Create(65535, 0);
if (0 != Create(sessionParams, m_rtpTcpTransmitter))
return -1;
if (0 != AddDestination(RTPTCPAddress(m_tcpClient->GetClientSocket())))
return -1;
return 0;
}
- 视音频数据复用为PS
- 发送数据,同tcp passive发流