rtmp协议转websocketflv的去队列积压

websocket server的优点

websocket server的好处:WebSocket 服务器能够实现实时的数据推送,服务器可以主动向客户端发送数据

1 不需要客户端不断轮询。
2 不需要实现httpserver跨域。
在需要修改协议的时候比较灵活,我们发送数据的时候比较方便,因为两边可以随时发送协议, 且做客户端的程序更为方便,websocket协议头部已经定义了包长,使用大部分库可以直接收数据,解决了粘包的问题,所以websocket协议是一个使用比较顺畅的协议

实现websocket server

先用boost 的协程顶一下,主要是需要将http协议升级到websocket, 因此在一个函数里面实现两种server的接收,http协议顺便就接收了,同时在客户端里面存储所有的链接对象,以下是主要实现的握手协议代码,以供参考

bool func_hand_shake(boost::asio::yield_context &yield)
	{
		DEFINE_EC
		asio::streambuf content_;
		size_t length = asio::async_read_until(v_socket, content_, "\r\n\r\n", yield[ec]);
		ERROR_RETURN_FALSE
		asio::streambuf::const_buffers_type bufs = content_.data();
		std::string lines(asio::buffers_begin(bufs), asio::buffers_begin(bufs) + length);
		//std::cout<<lines<<std::endl;
		c_header_map hmap;
		//std::string get;
		int protocol = fetch_head_info(lines, hmap, v_app_stream);
		if (protocol != GET)
			return false;
		cout << "GET:" << v_app_stream << endl; //like this--> live/1001 rtmp server must like this

		auto iter = hmap.find("Upgrade");
		if (iter == hmap.end())
		{
			//it is the http protocol ,not websocket
			//func_hand_http(m, yield);
			size_t ret = boost::asio::async_write(v_socket, boost::asio::buffer(FLV_HTTP_HEADERS,
				FLV_HTTP_HEADERS_LEN), yield[ec]);
			//ERROR_RETURN_FALSE
		
			v_key = hash_add(v_app_stream.c_str(), HASH_PRIME_MIDDLE);
			if (c_hubs::instance()->push(v_key, shared_from_this(), true) !=0)
			{
				//we can not find the stream 
				//return 404 error
				if (ret == -1)
				{
					size_t len_ = sizeof(buffer404) - 1; //remove the '\0' one bytes
					asio::async_write(v_socket, asio::buffer(buffer404, len_), yield[ec]);
					//ERROR_RETURN_FALSE
					//return false;
				}
				return false;
			}
			return true;
		}
		else
		{
			v_iswebsocket = true;
			std::string response, key, encrypted_key;
			//find the get
			//std::string request;
			size_t n = lines.find_first_of('\r');
			//find the Sec-WebSocket-Key
			size_t pos = lines.find("Sec-WebSocket-Key");
			if (pos == lines.npos)
				return false;
			size_t end = lines.find("\r\n", pos);
			key = lines.substr(pos + 19, end - pos - 19) + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
			//get the base64 encode string with sha1
#if 0
			boost::uuids::detail::sha1 sha1;
			sha1.process_bytes(key.c_str(), key.size());
			unsigned int digest[5];
			sha1.get_digest(digest);
#endif
#if 1
			SHA1 sha;
			unsigned int digest[5];
			sha.Reset();
			sha << key.c_str();
			sha.Result(digest);
#endif


			for (int i = 0; i < 5; i++) {
				digest[i] = htonl(digest[i]);
			}

			encrypted_key = base64_encode(reinterpret_cast<const uint8_t*>(&digest[0]), 20);
			//base64_encode(first, encrypted_key);

			/*
			The handshake from the server looks as follows :

			HTTP / 1.1 101 Switching Protocols
			Upgrade : websocket
			Connection : Upgrade
			Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK + xOo =
			Sec-WebSocket-Protocol: chat
			 */
			 //set the response text
			response.append("HTTP/1.1 101 WebSocket Protocol Handshake\r\n");
			response.append("Upgrade: websocket\r\n");
			response.append("Connection: Upgrade\r\n");
			response.append("Sec-WebSocket-Accept: " + encrypted_key + "\r\n\r\n");
			//response.append("Sec-WebSocket-Protocol: chat\r\n");
			//response.append("Sec-WebSocket-Version: 13\r\n\r\n");
			size_t ret = boost::asio::async_write(v_socket, boost::asio::buffer(response), yield[ec]);
			//ERROR_RETURN_FALSE
				//calculate the hash key 
			v_key = hash_add(v_app_stream.c_str(), HASH_PRIME_MIDDLE);

			c_hubs::instance()->push(v_key, shared_from_this(),false);
		}
		return true;
	}

在这里插入图片描述

rtmp协议

这个协议太过出名,实在没什么好说的

实现http协议

实现websocket协议的时候顺带实现,使用map数据结构存储

转发单线程,去除队列

1 数据共享
在发送数据的时候,rtmp 和 httpflv发送 以及 websocket发送使用同一缓存,这样有一个问题,即使我们使用共享的数据结构同时使用同一个内存,也不一定会共享申请内存时多余的头部,以下是数据结构

typedef struct s_memory
{
	//head 
	uint8_t *v_data = NULL;
	//v_data_h =>rtmp use it
	uint8_t *v_data_h = NULL;
	//real data
	uint8_t *v_data_r = NULL;
	//uint8_t *v_data   = NULL;
	size_t   v_len;
	uint32_t v_ts; // timestamp
	en_flv_header v_av_type = en_flv_null;
	void memory_create(uint32_t size, int header = 18)
	{
		//zero copy
		//the last reserve 4 bytes for flv write 4 bytes tail for av data size
		//the header 18 bytes for max rtmp use
		v_data = new uint8_t[size + header +4];
		v_len    = size; //not include the header and tail 
		//we do not know the head where
		//v_data_h = v_data;
		v_data_r = v_data + header;
		
	}

	void memory_create(uint32_t size, int header, int tail)
	{
		v_data = new uint8_t[size + header + tail];
		v_len = size; //not include the header and tail 
		v_data_h = v_data;
		v_data_r = v_data + header;
	}
	~s_memory()
	{
		if (v_data != NULL)
			delete[] v_data;
	}
}s_memory;

这边要做的就是在申请内存时多申请上头部和尾部,这样,使用的时候就可以在数据前面增加不同协议的数据头部。
所以是下面这句话

v_data = new uint8_t[size + header + tail];

读者自行理解就好
在这里插入图片描述
收到数据以后不进行任何的数据拷贝, 在缓冲数据前面加上数据头部,立刻发送出去,上图可以看到,rtmp协议和websocket flv 同时打开,vlc的rtmp协议稍稍会延后一点时间。两路内存占用如下图所示:

在这里插入图片描述
可以看到去除队列积压,内存占用比较小

多个线程需要修改头部的情况

如果使用多个线程,如何在各类协议之间共享数据呢,这是个问题,我们退而求其次,利用tcp 协议的特点,它是可以分开来发送批量数据,下图是使用websocket协议发送flv数据的示例,包括发送tag,taglen,data,datalen, 以及自身websocket发送的头部字节,分了三次发送,head和headlen 是实现websocket的头部而写。

/*
 sock      : need send socket
 data      : flv av data
 datalen   : flv av data len
*/
bool c_flvserver::func_set_head_send(tcp::socket &sock,
	uint8_t* tag, int taglen, uint8_t *data, size_t datalen,
	asio::yield_context &yield)
{
	uint8_t buffer[10];
	uint8_t *head = NULL;// buf;// 0x82;
	int headlen = 0;
	int totallen = taglen + datalen;
	if (totallen <= 65535)
	{
		if (totallen < 126)
		{
			head = &buffer[0] +8;
			//relen += 1;
			*head = 0x82; //0x81:1000 0001 text code ; // 1000 0010 binary code
			*(head + 1) = (uint8_t)totallen;
			headlen = 2;
		}
		else //>=126 <65536
		{
			head = &buffer[0] +6;
			*head = 0x82;
			*(head + 1) = 126;
			*(head + 2) = (uint8_t)((totallen >> 8) & 0xFF);
			*(head + 3) = (uint8_t)(totallen & 0xFF);
			headlen = 4;
		}
	}
	else //>65535
	{
		head = &buffer[0];
		*head = 0x82;
		*(head + 1) = 127;
		*(head + 2) = 0;   //>>56
		*(head + 3) = 0;   //>>48
		*(head + 4) = 0;// >>40
		*(head + 5) = 0; // >> 32;
		*(head + 6) = (uint8_t)(totallen >> 24);
		*(head + 7) = (uint8_t)(totallen >> 16);
		*(head + 8) = (uint8_t)(totallen >> 8);
		*(head + 9) = (uint8_t)(totallen & 0xFF);
		headlen = 10;
	}


	DEFINE_EC
	asio::async_write(sock, asio::buffer(head, headlen), yield[ec]);
	asio::async_write(sock, asio::buffer(tag, taglen), yield[ec]);
	asio::async_write(sock, asio::buffer(data, datalen), yield[ec]);
	//send the data
	//flv_const_buffer bb(frame, framelen,tag,taglen, data, dlen);
	//asio::async_write(sock, bb, yield[ec]);
	return ec? false : true;
}

这样在发送rtmp协议的时候,使用申请内存的多余头部空间,发送flv的时候 previous tag 长度四字节放在尾部,发送http协议的时候和flv类似,不需要发送websocket的头部字节,后面加上各类协议,比如rtsp 的tcp等等,也可以这样做,我们可以拷贝,但也可以不拷贝数据而进行零拷贝,零队列发送。

下面多打开几路观察内存

在这里插入图片描述

如下图所示:和刚才区别不大,小于10路内存都在1兆多以内
在这里插入图片描述

后面需要做的实现

实现更多的具体行业应用层服务和比较标准的协议输出, 将会做客户端发流,客户端收留,服务器对接,服务调用gpu等等,会比较谨慎。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/886052.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux云计算 |【第四阶段】RDBMS1-DAY3

主要内容&#xff1a; 子查询&#xff08;单行单列、多行单列、单行多列、多行多列&#xff09;、分页查询limit、联合查询union、插入语句、修改语句、删除语句 一、子查询 子查询就是指的在一个完整的查询语句之中&#xff0c;嵌套若干个不同功能的小查询&#xff0c;从而一…

安宝特案例 | 某知名日系汽车制造厂,借助AR实现智慧化转型

案例介绍 在全球制造业加速数字化的背景下&#xff0c;工厂的生产管理与设备维护效率愈发重要。 某知名日系汽车制造厂当前面临着设备的实时监控、故障维护&#xff0c;以及跨地域的管理协作等挑战&#xff0c;由于场地分散和突发状况的不可预知性&#xff0c;传统方式已无法…

大模型部署——NVIDIA NIM 和 LangChain 如何彻底改变 AI 集成和性能

DigiOps与人工智能 人工智能已经从一个未来主义的想法变成了改变全球行业的强大力量。人工智能驱动的解决方案正在改变医疗保健、金融、制造和零售等行业的企业运营方式。它们不仅提高了效率和准确性&#xff0c;还增强了决策能力。人工智能的价值不断增长&#xff0c;这从它处…

Html 转为 MarkDown

在 RAG 中,通常需要将 HTML 转为 Markdown,有很多第三方 API 都支持 HTML 的转换,本文使用一个代码文档的例子 https://www.joinquant.com/help/api/help#name:Stock,将聚宽 API 转为 Markdown。本文通过两种方式进行实现,使用收费和开源的解决方案。聚宽 API 格式转为 Ma…

【Linux】几种常见配置文件介绍

配置文件目录 linux 系统中有很多配置文件目录 /etc/systemd/system /lib/systemd/system /usr/lib/systemd/system 【结果就是这个目录配置文件是源头】 这三者有什么样的关系呢&#xff1f; 以下是网络上找的资料汇总&#xff0c;并加了一些操作验证。方便后期使用 介…

鸿蒙NEXT开发环境搭建(基于最新api12稳定版)

注意&#xff1a;博主有个鸿蒙专栏&#xff0c;里面从上到下有关于鸿蒙next的教学文档&#xff0c;大家感兴趣可以学习下 如果大家觉得博主文章写的好的话&#xff0c;可以点下关注&#xff0c;博主会一直更新鸿蒙next相关知识 专栏地址: https://blog.csdn.net/qq_56760790/…

Linux 进程的基本概念及描述

目录 0.前言 1. 什么是进程 1.1 进程的定义与特性 1.2 进程与线程的区别 2.描述进程 2.1 PCB (进程控制块) 2.2 task_struct 3.查看进程 3.1 查看进程信息 3.1.1 /proc 文件系统 3.1.2 ps 命令 3.1.2 top 和 htop 命令 3.2 获取进程标识符 3.2.1使用命令获取PID 3.2.2 使用C语言…

中原台球展,2025郑州台球展会,中国台球产业链发展大会

阳春三月&#xff0c;万物复苏&#xff0c;商机无限&#xff1b;品牌宣传正当季&#xff0c;产品招商正当时&#xff0c;新品发布好时期。抓住台球发展的这波财富机遇&#xff0c;借助壹肆柒郑州台球展这个超级平台&#xff0c;将品牌和产品快速打造成为覆盖全国市场的顶流。20…

数据治理003-数据域

数据仓库是面向主题&#xff08;数据综合、归类并进行分析利用的抽象&#xff09;的应用。 数据仓库模型设计除横向的分层外&#xff0c;通常也需要根据业务情况进行纵向划分数据域。数据域是联系较为紧密的数据主题的集合&#xff0c;通常是根据业务类别、数据来源、数据用途…

InternLM + LlamaIndex RAG 实践

llamaindexInternlm2 RAG实践 参考教程 正式介绍检索增强生成&#xff08;Retrieval Augmented Generation&#xff0c;RAG&#xff09;技术以前&#xff0c;大家不妨想想为什么会出现这样一个技术。 给模型注入新知识的方式&#xff0c;可以简单分为两种方式&#xff0c;一种…

线性代数(持续更新)

一.矩阵及其计算 1.矩阵的概念 矩阵就是一个数表 元素全是0&#xff0c;是零矩阵&#xff0c;用0来表示 当mn时&#xff0c;称为n阶矩阵&#xff08;方阵&#xff09; 只有一行的叫行矩阵&#xff0c;只有一列的叫列矩阵 只有对角线有元素的叫做对角矩阵&#xff0c;用dia…

(Linux驱动学习 - 4).Linux 下 DHT11 温湿度传感器驱动编写

DHT11的通信协议是单总线协议&#xff0c;可以用之前学习的pinctl和gpio子系统完成某IO引脚上数据的读与写。 一.在设备树下添加dht11的设备结点 1.流程图 2.设备树代码 &#xff08;1&#xff09;.在设备树的 iomuxc结点下添加 pinctl_dht11 &#xff08;2&#xff09;.在根…

HuggingChat macOS 版现已发布

Hugging Face 的开源聊天应用程序 Hugging Chat&#xff0c;现已推出适用于 macOS 的版本。 主要特点 Hugging Chat macOS 版本具有以下亮点: 强大的模型支持: 用户可以一键访问多个顶尖的开源大语言模型&#xff0c;包括 Qwen 2.5 72B、Command R、Phi 3.5、Mistral 12B 等等&…

WebRTC入门

主要参考资料&#xff1a; WebRTC 在 ESP32 系列硬件平台上的实现: https://www.bilibili.com/video/BV1AEHseWEda/?spm_id_from333.337.search-card.all.click&vd_sourcedd284033cd0c4d1f3f59a2cd40ae4ef9 火山 RTC豆包大模型&#xff0c;给用户体验装上银色子弹: https:…

【网络安全】Cookie与ID未强绑定导致账户接管

未经许可,不得转载。 文章目录 前言正文前言 DigiLocker 是一项在线服务,旨在为公民提供一个安全的数字平台,用于存储和访问重要的文档,如 Aadhaar 卡、PAN 卡和成绩单等。DigiLocker 通过多因素身份验证(MFA)来保护用户账户安全,通常包括 6 位数的安全 PIN 和一次性密…

【RabbitMQ】面试题

在本篇文章中&#xff0c;主要是介绍RabbitMQ一些常见的面试题。对于前几篇文章的代码&#xff0c;都已经在码云中给出&#xff0c;链接是mq-test: 学习RabbitMQ的一些简单案例 (gitee.com)&#xff0c;如果存在问题的话欢迎各位提出&#xff0c;望共同进步。 MQ的作用以及应用…

sentinel原理源码分析系列(一)-总述

背景 微服务是目前java主流开发架构&#xff0c;微服务架构技术栈有&#xff0c;服务注册中心&#xff0c;网关&#xff0c;熔断限流&#xff0c;服务同学&#xff0c;配置中心等组件&#xff0c;其中&#xff0c;熔断限流主要3个功能特性&#xff0c;限流&#xff0c;熔断&…

《OpenCV》—— 指纹验证

用两张指纹图片中的其中一张对其验证 完整代码 import cv2def cv_show(name, img):cv2.imshow(name, img)cv2.waitKey(0)def verification(src, model):sift cv2.SIFT_create()kp1, des1 sift.detectAndCompute(src, None)kp2, des2 sift.detectAndCompute(model, None)fl…

使用 Llama 3.1 和 Qdrant 构建多语言医疗保健聊天机器人的步骤

长话短说&#xff1a; 准备好深入研究&#xff1a; 矢量存储的复杂性以及如何利用 Qdrant 进行高效数据摄取。掌握 Qdrant 中的集合管理以获得最佳性能。释放上下文感知响应的相似性搜索的潜力。精心设计复杂的 LangChain 工作流程以增强聊天机器人的功能。将革命性的 Llama …

在线代码编辑器

在线代码编辑器 文章说明前台核心代码后台核心代码效果展示源码下载 文章说明 采用Java结合vue3设计实现的在线代码编辑功能&#xff0c;支持在线编辑代码、运行代码&#xff0c;同时支持导入文件&#xff0c;支持图片识别&#xff0c;支持复制代码&#xff0c;可将代码导出为图…