GB28181学习(十七)——基于jrtplib实现tcp被动和主动发流

前言

GB/T28181-2022实时流的传输方式介绍:https://blog.csdn.net/www_dong/article/details/134255185

基于jrtplib实现tcp被动和主动收流介绍:https://blog.csdn.net/www_dong/article/details/134451387

本文主要介绍下级平台或设备发流功能,用于对接特定的SIP服务器或上级平台。

UDP发流

流程图

在这里插入图片描述

发送端流程

  • 初始化rtp参数;
  • 裸流数据做PS复用;
  • 组RTP包发送;

设计

  1. 初始化rtp参数
int CUdp::InitRtp_()
{
	RTPSessionParams sessionParams;
	sessionParams.SetMinimumRTCPTransmissionInterval(10);
	sessionParams.SetOwnTimestampUnit(1.0 / 90000.0);
	sessionParams.SetAcceptOwnPackets(true);
	sessionParams.SetMaximumPacketSize(1450);

	RTPUDPv4TransmissionParams transParams;
	transParams.SetRTPSendBuffer(2*1024*1024);
	transParams.SetBindIP(m_ip);
	transParams.SetPortbase((uint16_t)m_port);

	if (0 != Create(sessionParams, &transParams))
	{
		return -1;
	}

	SetDefaultPayloadType((uint8_t)m_payload);
	SetDefaultTimestampIncrement(3600);
	SetDefaultMark(true);

	RTPIPv4Address addr(ntohl(inet_addr(m_ip), (uint16_t)m_port);
	if(0 != AddDestination(addr))
    {
		return -1;
	}

	return 0;
}
  1. 流数据复用为PS
// 使用ireader开源库进行ps复用
// 初始化
CData2PS::CData2PS()
{
	struct ps_muxer_func_t func;
	func.alloc = Alloc;
	func.free = Free;
	func.write = Packet;
	m_ps = ps_muxer_create(&func, this);
	// TODO codecid待补充
	m_ps_stream = ps_muxer_add_stream(m_ps, PSI_STREAM_H264, nullptr, 0);
}


// 塞数据
int CData2PS::InputData(void* data, int len)
{
	if (!m_ps)
		return -1;

	uint64_t clock = time64_now();
	if (0 == m_ps_clock)
		m_ps_clock = clock;

	return ps_muxer_input(m_ps, m_ps_stream, 0, (clock - m_ps_clock) * 90, (clock - m_ps_clock) * 90, data, len);
}
  1. 发送rtp包
// 调用jrtplib中SendPacket(data, len);接口发送数据

// 以下为SendPacket部分源码
// 主要流程:
// 1. 构建packet
// 2. 发送rtp数据
int RTPSession::SendPacket(const void *data,size_t len,
                uint8_t pt,bool mark,uint32_t timestampinc)
{
	int status;

	if (!created)
		return ERR_RTP_SESSION_NOTCREATED;
	
	BUILDER_LOCK
	if ((status = packetbuilder.BuildPacket(data,len,pt,mark,timestampinc)) < 0)
	{
		BUILDER_UNLOCK
		return status;
	}
	if ((status = SendRTPData(packetbuilder.GetPacket(),packetbuilder.GetPacketLength())) < 0)
	{
		BUILDER_UNLOCK
		return status;
	}
	BUILDER_UNLOCK
	
	SOURCES_LOCK
	sources.SentRTPPacket();
	SOURCES_UNLOCK
	PACKSENT_LOCK
	sentpackets = true;
	PACKSENT_UNLOCK
	return 0;
}

// 构建包
int RTPPacketBuilder::PrivateBuildPacket(const void *data,size_t len,
	                  uint8_t pt,bool mark,uint32_t timestampinc,bool gotextension,
	                  uint16_t hdrextID,const void *hdrextdata,size_t numhdrextwords)
{
	RTPPacket p(pt,data,len,seqnr,timestamp,ssrc,mark,numcsrcs,csrcs,gotextension,hdrextID,
	            (uint16_t)numhdrextwords,hdrextdata,buffer,maxpacksize,GetMemoryManager());
	int status = p.GetCreationError();

	if (status < 0)
		return status;
	packetlength = p.GetPacketLength();

	if (numpackets == 0) // first packet
	{
		lastwallclocktime = RTPTime::CurrentTime();
		lastrtptimestamp = timestamp;
		prevrtptimestamp = timestamp;
	}
	else if (timestamp != prevrtptimestamp)
	{
		lastwallclocktime = RTPTime::CurrentTime();
		lastrtptimestamp = timestamp;
		prevrtptimestamp = timestamp;
	}
	
	numpayloadbytes += (uint32_t)p.GetPayloadLength();
	numpackets++;
	timestamp += timestampinc;
	seqnr++;

	return 0;
}

// 发送包
int RTPSession::SendRTPData(const void *data, size_t len)
{
	if (!m_changeOutgoingData)
		return rtptrans->SendRTPData(data, len);

	void *pSendData = 0;
	size_t sendLen = 0;
	int status = 0;

	status = OnChangeRTPOrRTCPData(data, len, true, &pSendData, &sendLen);
	if (status < 0)
		return status;

	if (pSendData)
	{
		status = rtptrans->SendRTPData(pSendData, sendLen);
		OnSentRTPOrRTCPData(pSendData, sendLen, true);
	}

	return status;
}

// 底层实现
int RTPUDPv4Transmitter::SendRTPData(const void *data,size_t len)	
{
	if (!init)
		return ERR_RTP_UDPV4TRANS_NOTINIT;

	MAINMUTEX_LOCK
	
	if (!created)
	{
		MAINMUTEX_UNLOCK
		return ERR_RTP_UDPV4TRANS_NOTCREATED;
	}
	if (len > maxpacksize)
	{
		MAINMUTEX_UNLOCK
		return ERR_RTP_UDPV4TRANS_SPECIFIEDSIZETOOBIG;
	}
	
	destinations.GotoFirstElement();
	while (destinations.HasCurrentElement())
	{
        // 调用sendto函数实现udp包的发送
		sendto(rtpsock,(const char *)data,len,0,(const struct sockaddr *)destinations.GetCurrentElement().GetRTPSockAddr(),sizeof(struct sockaddr_in));
		destinations.GotoNextElement();
	}
	
	MAINMUTEX_UNLOCK
	return 0;
}

tcp passive发流

流程图

在这里插入图片描述

发送端流程:

  • 上级平台或sip服务器以主动方式连接,对于下级平台或者设备(数据发送端)为被动方式;
  • 下级平台或者设备(数据发送端)启动端口监听;
  • 接收上级平台或sip服务器tcp连接请求;
  • 向上级平台或sip服务器发送流数据;

设计

  1. 创建socket、bind、listen,启动数据接收线程;
// TcpServer为封装的socket类

int CGBTcpServer::Start()
{
	if (0 != m_localPort || m_tcpServer.get())
		return 0;

	int ret = -1;
	do 
	{
		m_tcpServer = std::make_shared<TcpServer>(nullptr, this);
		if (!m_tcpServer.get())
			break;

		ret = m_tcpServer->TcpCreate();
		if (0 != ret)
			break;

		ret = m_tcpServer->TcpBind(m_localPort);
		if (0 != ret)
			break;

		ret = m_tcpServer->TcpListen(5);
		if (0 != ret)
			break;

		m_thread = std::thread(TCPData2PSThread, this);
		return 0;
	} while (0);

	Stop();
	return ret;
}
  1. 在线程内等待连接,连接成功后接收数据并回调至应用层处理
void CGBTcpServer::TCPData2PSWorker()
{
	if (!m_pspacker)
		m_pspacker = new(std::nothrow) CData2PS(PSTCPDataCB, this);

	bool bAccept = false;
	while (m_running)
	{
		if (!bAccept)
		{
			if (0 == m_tcpServer->TcpAccept())
			{
				bAccept = true;
				if (0 != InitRtp_())
				{
					break;
				}
			}

			continue;
		}

		std::this_thread::sleep_for(std::chrono::seconds(1));
	}
}
  1. 初始化rtp参数
int CGBTcpServer::InitRtp_()
{
	const int packetSize = 45678;
	RTPSessionParams sessionparams;
	sessionparams.SetProbationType(RTPSources::NoProbation);
	sessionparams.SetOwnTimestampUnit(1.0 / packetSize);
	sessionparams.SetMaximumPacketSize(packetSize + 64);

	m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);
	m_rtpTcpTransmitter->Init(true);
	m_rtpTcpTransmitter->Create(65535, 0);

	int status = Create(sessionparams, m_rtpTcpTransmitter);
	if (status < 0)
	{
		return status;
	}

	status = AddDestination(RTPTCPAddress(m_tcpServer->GetClientSocket()));
	if (0 != status)
		return status;

	SetDefaultPayloadType(96);
	SetDefaultMark(false);
	SetDefaultTimestampIncrement(160);
	return 0;
}
  1. 将数据复用为PS;
  2. tcp方式发包
// 调用jrtplib中SendPacket(data, len);接口发送数据

// 以下为tcp方式SendPacket部分源码
int RTPTCPTransmitter::SendRTPData(const void *data,size_t len)	
{
	return SendRTPRTCPData(data, len);
}

int RTPTCPTransmitter::SendRTPRTCPData(const void *data, size_t len)
{
	if (!m_init)
		return ERR_RTP_TCPTRANS_NOTINIT;

	MAINMUTEX_LOCK
	
	if (!m_created)
	{
		MAINMUTEX_UNLOCK
		return ERR_RTP_TCPTRANS_NOTCREATED;
	}
    
    // #define RTPTCPTRANS_MAXPACKSIZE							65535
	if (len > RTPTCPTRANS_MAXPACKSIZE)
	{
		MAINMUTEX_UNLOCK
		return ERR_RTP_TCPTRANS_SPECIFIEDSIZETOOBIG;
	}
	
	std::map<SocketType, SocketData>::iterator it = m_destSockets.begin();
	std::map<SocketType, SocketData>::iterator end = m_destSockets.end();

	vector<SocketType> errSockets;
	int flags = 0;
#ifdef RTP_HAVE_MSG_NOSIGNAL
	flags = MSG_NOSIGNAL;
#endif // RTP_HAVE_MSG_NOSIGNAL

	while (it != end)
	{
		uint8_t lengthBytes[2] = { (uint8_t)((len >> 8)&0xff), (uint8_t)(len&0xff) };
		SocketType sock = it->first;

        // 调用send接口发送数据
        // 1. 先发送2字节头(固定格式)
        // 2. 再发送数据
		if (send(sock,(const char *)lengthBytes,2,flags) < 0 ||
			send(sock,(const char *)data,len,flags) < 0)
			errSockets.push_back(sock);
		++it;
	}
	
	MAINMUTEX_UNLOCK

	if (errSockets.size() != 0)
	{
		for (size_t i = 0 ; i < errSockets.size() ; i++)
			OnSendError(errSockets[i]);
	}

	// Don't return an error code to avoid the poll thread exiting
	// due to one closed connection for example

	return 0;
}

tcp active发流

流程图

在这里插入图片描述

发送端流程:

  • 上级平台或sip服务器启动tcp监听连接,对于下级平台或者设备(数据发送端)为主动方式;
  • 下级平台或者设备(数据发送端)发起tcp连接;
  • 接收上级平台或sip服务器tcp响应;
  • 向上级平台或sip服务器发送流数据;

设计

  1. 创建socket、connect、初始化rtp,启动数据接收线程
// TcpClient为封装的客户端socket类

int CGBTcpClient::Start()
{
	if (0 != m_localPort || m_tcpClient.get())
		return 0;

	int ret = -1;
	do
	{
		m_tcpClient = std::make_shared<TcpClient>(nullptr, this);
		if (!m_tcpClient.get() || 0 != m_tcpClient->TcpCreate())
			break;

		ret = m_tcpClient->TcpConnectByTime(m_localIP.c_str(), m_localPort, 5);
		if (0 != ret)
			break;

		ret = InitRtp_();
		if (0 != ret)
			break;

		m_thread = std::thread(RTPPackerThread, this);
		return 0;
	} while (0);

	Stop();
	return ret;
}
  1. 初始化rtp参数
int CGBTcpClient::InitRtp_()
{
	const int packSize = 45678;
	RTPSessionParams sessionParams;
	sessionParams.SetProbationType(RTPSources::NoProbation);
	sessionParams.SetOwnTimestampUnit(90000.0 / 25.0);
	sessionParams.SetMaximumPacketSize(packSize + 64);

	m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);
	m_rtpTcpTransmitter->Init(true);
	m_rtpTcpTransmitter->Create(65535, 0);

	if (0 != Create(sessionParams, m_rtpTcpTransmitter))
		return -1;

	if (0 != AddDestination(RTPTCPAddress(m_tcpClient->GetClientSocket())))
		return -1;

	return 0;
}
  1. 视音频数据复用为PS
  2. 发送数据,同tcp passive发流

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/177623.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

做自动驾驶的同学看过来:场景理解、辅助功能、导航、寻路、避障数据集

SANPO&#xff1a;第一个具有大规模密集全景分割和深度注释的人类以自我中心的视频数据集&#xff0c;有助于推动视频分割、深度估计、多任务视觉建模和合成到真实域适应任务发展&#xff0c;同时支持人类导航系统&#xff0c; SANPO&#xff1a;一个大规模的以自我为中心的视…

【20年扬大真题】试写一算法在带头结点的单链表结构上实现线性表操作LENGTH(L)

【20年扬大真题】 试写一算法在带头结点的单链表结构上实现线性表操作LENGTH&#xff08;L&#xff09;。 #define _CRT_SECURE_NO_WARNINGS #include<stdio.h> #include<stdbool.h> #include<malloc.h> //单链表定义 //链表结点 int A[10] { 1,2,3,4,5,6,…

windows系统玩游戏找不到d3dx9_35.dll缺失的解决方法

分享一个我们在打开游戏或许软件过程中遇到的问题——“由于找不到d3dx9_35.dll,无法继续执行代码”的五个修复方案。这个问题可能会影响到我们的工作和娱乐效率&#xff0c;甚至可能导致工作的延期。因此&#xff0c;我希望通过今天的文章&#xff0c;能够帮助大家更好地解决这…

详解StringBuilder和StringBuffer(区别,使用方法,含源码讲解)

目录 一.为什么要使用StringBuilder和StringBuffer 字符串的不可变性 性能损耗 二.StringBuilder和StringBuffer StringBuffer源码讲解 使用方式 三.常用方法总结 示例&#xff1a; 四.StringBuilder和StringBuffer的区别 一.为什么要使用StringBuilder和StringBuffe…

C++多线程学习(二):多线程通信和锁

参考引用 C11 14 17 20 多线程从原理到线程池实战代码运行环境&#xff1a;Visual Studio 2019 1. 多线程状态 1.1 线程状态说明 初始化 (lnit)&#xff1a;该线程正在被创建就绪 (Ready)&#xff1a;该线程在就绪列表中&#xff0c;等待 CPU 调度运行 (Running)&#xff1a;…

实验7设计建模工具的使用(三)

二&#xff0c;实验内容与步骤 1. 百度搜索1-2张状态图&#xff0c;请重新绘制它们&#xff0c;并回答以下问题&#xff1a; 1&#xff09;有哪些状态&#xff1b; 2&#xff09;简要描述该图所表达的含义&#xff1b; 要求&#xff1a;所绘制的图不得与本文中其它习题一样…

电磁优化的并行空间映射方法

空间映射(SM)是一种公认的加速电磁优化的方法。现有的SM方法大多基于顺序计算机制。本文提出了一种用于电磁优化的并行SM方法。在该方法中&#xff0c;每次迭代开发的代理模型被训练以同时匹配多个点的精细模型。多点训练和SM使代理模型在比标准SM更大的邻域内有效。本文提出的…

五大资源之Service(可以固定IP)

Service可以看作是一组同类Pod对外访问接口,借助Service应用可以方便的实现服务发现与负载均衡 创建集群内部可以访问Service #暴露Service(也创建在了namespace dev下) [root@master ~]# kubectl expose deployment(pod控制器) nginx --name=svc-nginx1 --type=Cluste…

MySQL数据库入门到大牛_基础_12_MySQL数据类型精讲

文章目录 1. MySQL中的数据类型2. 整数类型2.1 类型介绍2.2 可选属性2.2.1 M2.2.2 UNSIGNED2.2.3 ZEROFILL 2.3 适用场景2.4 如何选择&#xff1f; 3. 浮点类型3.1 类型介绍3.2 数据精度说明3.3 精度误差说明 4. 定点数类型4.1 类型介绍4.2 开发中经验 5. 位类型&#xff1a;BI…

PyTorch 之 Dataset 类入门学习

PyTorch 之 Dataset 类入门学习 Dataset 类简介 PyTorch 中的 Dataset 类是一个抽象类&#xff0c;用来表示数据集。通过继承 Dataset 类可以进行自定义数据集的格式、大小和其它属性&#xff0c;供后续使用&#xff1b; 可以看到官方封装好的数据集也是直接或间接的继承自 …

《微信小程序案例大全》大学生期末大作业可以直接使用!!

前言 在大学生活中&#xff0c;期末大作业是锻炼和展示自己所学知识的重要时刻。微信小程序作为一种快速、便捷的应用开发方式&#xff0c;成为了大学生开发实践的热门选择。本文将为大家推荐一系列可以直接使用的微信小程序案例&#xff0c;包括仿真社交、图书管理、学习工具…

智慧城市内涝积水监测仪功能,提升城市预防功能

内涝积水监测仪不仅改变了人们应对城市内涝的老办法&#xff0c;还让智慧城市往前迈了一大步。这个监测仪是怎么做到的呢&#xff1f;就是靠它精准的数据监测和预警&#xff0c;让城市管理有了更科学高效的解决妙招。它就像有了个聪明又负责任的助手&#xff0c;让城市管理更加…

排序算法-----快速排序(非递归实现)

目录 前言 快速排序 基本思路 非递归代码实现 前言 很久没跟新数据结构与算法这一栏了&#xff0c;因为数据结构与算法基本上都发布完了&#xff0c;哈哈&#xff0c;那今天我就把前面排序算法那一块的快速排序完善一下&#xff0c;前面只发布了快速排序递归算法&#xff0c;…

不到十个例题带你拿下c++双指针算法(leetcode)

移动零问题 https://leetcode.cn/problems/move-zeroes/submissions/ 1.题目解析 必须在原数组进行修改&#xff0c;不可以新建一个数组 非零元素相对顺序不变 2.算法原理 【数组划分】【数组分块】 这一类题会给我们一个数组&#xff0c;让我们划分区间&#xff0c;比如…

C++虚析构和纯虚析构解决delete堆区父类指针无法调用子类的构造函数

#include<iostream> #include<string>using namespace std;//虚析构和纯虚析构 class Animal { public:Animal(){cout<<"执行Animal的构造函数"<<endl;}~Animal(){cout<<"执行Animal的析构函数"<<endl;}virtual void …

对接苹果支付退款退单接口

前言 一般而言&#xff0c;我们其实很少对接退款接口&#xff0c;因为退款基本都是商家自己决定后进行操作的&#xff0c;但是苹果比较特殊&#xff0c;用户可以直接向苹果发起退款请求&#xff0c;苹果觉得合理会退给用户&#xff0c;但是目前公司业务还是需要对接这个接口&am…

蓝桥杯每日一题2023.11.22

题目描述 题目分析 由题目知其每个品牌积分一定小于315故直接暴力枚举每个品牌如果符合要求直接输出即可 &#xff08;答案&#xff1a;150&#xff09; #include<bits/stdc.h> using namespace std; int main() {for(int i 1; i < 315; i ){for(int j 1; j <…

【无标题】dp80采集机和机器人通信相关框架总结

采血机器人通信解析相关框架总结: 类似于dp80,将整个过程进行了分解如下: 类似于dp80,将整个过程进行了分解如下: 上位机界面在进行点击操作的时候,先是通信协议的解析,解析后改变采血的控制状态如下: Dp80主要框架解析࿱

层次分析法--可以帮助你做决策的简单算法

作用 层次分析法是一个多指标的评价算法&#xff0c;主要用来在做决策时&#xff0c;给目标的多个影响因子做权重评分。特别是那些需要主观决策的、或者需要用经验判断的决策方案&#xff0c;例如&#xff1a; 买房子&#xff08;主观决策&#xff09;选择旅游地&#xff08;…

RabbitMQ快速入门(简单收发消息)

文章目录 前言一、数据隔离1.用户管理2.virtual host 二、控制台收发1.交换机2.队列3.绑定 三、编程式收发1.依赖和配置2.收发信息 总结 前言 1.了解数据隔离 2.RabbitMQ控制台收发信息 3.SpringBoot整合RabbitMQ收发信息 一、数据隔离 1.用户管理 点击Admin选项卡&#xff0…