总结了一下中继引擎(can中继器,TCP总机器)开发实际经验

多路数据进行中继的研究

1.数据中继的概念

数据中继是一种数据传输技术,用于在两个通信设备之间提供数字信号的传输。它利用数字信道传输数据信号,可以提供永久性和半永久性连接的数字数据传输信道。

数据中继的主要作用是提高通信质量和可靠性,同时实现多路复用,即在同一个物理链路上传输多个信号。

在数字通信网络中,数据中继可以用于计算机之间的通信,传送数字化传真、数字话音、数字图像信号或其它数字化信号等。

 简单来说:中继的核心就是数据传输,比如传输简单的基础数据、话音、传真、图像信息等;

最简单就是1到2和2到1的数据交互,如下模型,就是左右之间数据的交互。

         

 

2.中继扩展

 在简单的1<>2工作模型,扩展开来,比如3<>4, 5<>6...

        等成千上万,上百万个job处理时。就是中继引擎

​                             

 如上,左边的方式,适合于生命周期短的方案(倾向于通道互斥);

右边的方式时适合于生命周期长的方案(倾向于通道的信息共享);

3.中继方案1

实现步骤1

最简单就是1到2和2到1的数据交互的模型实现

拆分读写:

        

可以用epoll/select/poll 模型都可以;

                举例如下:

​
#include<stdio.h>
#include<stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include<errno.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/epoll.h>
#define     TTY1       "/dev/tty11"
#define     TTY2       "/dev/tty12"
#define     BUFSIZE     1024
enum {
	//几种状态
	STATE_R,
	STATE_W,
	STATE_AUTO,
	STATE_EX,
	STATE_T
};
struct fsm_st 
{
	int state;//记录状态
	int sfd;//源文件
	int dfd;//目的文件
	char buf[BUFSIZE];//中间缓冲区
	int len;//读到的长度
	int pos;//写的过程如果一次没有写完,记录上次写的位置
	char* err;//错误信息
};
static void fsm_driver(struct fsm_st* fsm) 
{
	int ret;
	switch (fsm->state) 
	{
		case STATE_R:
			fsm->len = read(fsm->sfd, fsm->buf, BUFSIZE);
			if (fsm->len == 0)
				fsm->state = STATE_T;
			else if (fsm->len < 0) {
				if (errno == EAGAIN)
					fsm->state = STATE_R;
				else {
					fsm->err = "read()";
					fsm->state = STATE_EX;
				}
			}
			else {
				fsm->pos = 0;
				fsm->state = STATE_W;
			}
			break;
		case STATE_W:
			ret = write(fsm->dfd, fsm->buf + fsm->pos, BUFSIZE);
			if (ret < 0) {
				if (errno == EAGAIN)
					fsm->state = STATE_W;
				else {
					fsm->err = "write()";
					fsm->state = STATE_EX;
				}
			}
			else {
				fsm->pos += ret;
				fsm->len -= ret;
				if (fsm->len == 0)
					fsm->state = STATE_R;//写完了再去读
				else
					fsm->state = STATE_W;//没写完继续写
			}
			break;
		case STATE_EX:
			perror(fsm->err);
			fsm->state = STATE_T;
			break;
		case STATE_T:
			/*  do smoething*/
			break;
		default:
			abort();
			break;
	}
}
static int max(int a, int b) 
{
	return a > b ? a : b;
}
static void relay(int fd1, int fd2) 
{
	struct fsm_st fsm12, fsm21;
	int epfd;
	struct epoll_event ev;
	int fd1_save = fcntl(fd1, F_GETFL);
	fcntl(fd1, F_SETFL, fd1_save | O_NONBLOCK);		//非阻塞	打开
	int fd2_save = fcntl(fd2, F_GETFL);
	fcntl(fd2, F_SETFL, fd2_save | O_NONBLOCK);		//非阻塞	打开
	//初始状态
	fsm12.state = STATE_R;
	fsm12.sfd = fd1;
	fsm12.dfd = fd2;
	
	fsm21.state = STATE_R;
	fsm21.sfd = fd2;
	fsm21.dfd = fd1;
	/*******创建epoll实例:告诉内核这个监听的数目是10*/
	epfd = epoll_create(10);
	if (epfd < 0) 
	{
		perror("epoll_create()");
		exit(1);
	}
	/*控制、设置:注册要监听的事件类型*/
	ev.events = 0;//暂时不确定监视何种行为 - 位图清0 
	ev.data.fd = fd1;

	epoll_ctl(epfd, EPOLL_CTL_ADD, fd1, &ev);//增删改操作:加入fd1-确保系统监视fd1
	ev.events = 0;//暂时不确定监视何种行为 - 位图清0 
	ev.data.fd = fd2;

	epoll_ctl(epfd, EPOLL_CTL_ADD, fd2, &ev);//增删改操作  在循环外添加避免循环中来回操作
	while (fsm12.state != STATE_T || fsm21.state != STATE_T) 
	{
		//1.为fd1布置监视任务
		ev.data.fd = fd1;
		ev.events = 0;//位图清0
		
		if (fsm12.state == STATE_R)   //1->2:说明如果fd1可读
			ev.events |= EPOLLIN;//读事件
			
		if (fsm21.state == STATE_W)   //2->1:说明如果fd1可写
			ev.events |= EPOLLOUT;//写事件
		epoll_ctl(epfd, EPOLL_CTL_MOD, fd1, &ev);//增删改操作
		
		ev.data.fd = fd2;
		ev.events = 0;//位图清0
		if (fsm12.state == STATE_W)//1->2:说明如果fd2可写
			ev.events |= EPOLLOUT;
		if (fsm21.state == STATE_R)//2->1:说明如果fd1可读
			ev.events |= EPOLLIN;
		//epoll_ctl(epfd, EPOLL_CTL_MOD, fd2, &ev);//增删改操作
		
		//2.监视--等待事件发生
		if (fsm12.state < STATE_AUTO || fsm21.state < STATE_AUTO) {					
			//epfd组,事件结构体,事件数量-元素个数ev,-1-死等
			while (epoll_wait(epfd, &ev, 1, -1) < 0) 
			{			 //
				if (errno == EINTR) //假错
				{
					continue;
				}
				perror("epoll_wait()");
				exit(1);
			}
		}
		//3.查看监视结果
		//监视结果是fd1 且fd1可读
		if (ev.data.fd == fd1 && ev.events & EPOLLIN || ev.data.fd == fd2 \
			&& ev.events & EPOLLOUT || fsm12.state > STATE_AUTO)
			fsm_driver(&fsm12);//如果1可读2可写或者处于EX,T态
		if (ev.data.fd == fd1 && ev.events & EPOLLOUT || ev.data.fd == fd2 \
			&& ev.events & EPOLLIN || fsm21.state > STATE_AUTO)//如果2可读或者1可写
			fsm_driver(&fsm21);
	}
	//复原退出
	fcntl(fd1, F_SETFL, fd1_save);
	fcntl(fd2, F_SETFL, fd2_save);
	close(epfd);
}
int main(int argc, char** argv) 
{
	int fd1, fd2;
	fd1 = open(TTY1, O_RDWR);//先以阻塞打开(故意先阻塞形式)
	if (fd1 < 0) 
	{
		perror("open()");
		exit(1);
	}
	write(fd1, "TTY1\n", 5);
	fd2 = open(TTY2, O_RDWR | O_NONBLOCK);//非阻塞
	if (fd2 < 0) {
		perror("open()");
		exit(1);
	}
	write(fd2, "TTY2\n", 5);
	relay(fd1, fd2);		//核心代码
    close(fd2);
    close(fd1);
	exit(0);
}

​

实现步骤2 将该步骤,扩展

每一个job加入数组进行管理,然后遍历数组,实现管理每一个job任务(下方例子仅作参考,有待对每个job加入epoll)

#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<pthread.h>
#include <unistd.h>
#include<string.h>
#include<fcntl.h>
#include"relayer.h"
#define   BUFSIZE  1024
static struct rel_job_st* rel_job[REL_JOBMAX];
static pthread_mutex_t mut_rel_job = PTHREAD_MUTEX_INITIALIZER;
static pthread_once_t init_once = PTHREAD_ONCE_INIT;
enum {
	//状态机的几种状态
	STATE_R,
	STATE_W,
	STATE_EX,
	STATE_T
};
enum {//job的状态
	STATE_RUNNING = 1,
	STATE_CANCELED,
	STATE_OVER
};
//状态机
struct rel_fsm_st {
	int state;//记录状态机的状态
	int sfd;//源文件
	int dfd;//目的文件
	char buf[BUFSIZE];//中间缓冲区
	int len;//读到的长度
	int pos;//写的过程如果一次没有写完,记录上次写的位置
	char* err;//错误信息
	int64_t count; //输出字符数量
};
//每一对终端结构体
struct rel_job_st{
	int fd1;//两个终端
	int fd2;
	//该对终端状态STATE_RUNNING,STATE_CANCELED, STATE_OVER
	int job_state;
	//两个终端的状态机结构体
	struct rel_fsm_st fsm12, fsm21;
	//用来退出复原状态
	int fd1_save, fd2_save;
};
//状态转移函数
static void fsm_driver(struct rel_fsm_st* fsm) {
	int ret;
	switch (fsm->state) {
	case STATE_R:
		fsm->len = read(fsm->sfd, fsm->buf, BUFSIZE);
		if (fsm->len == 0)
			fsm->state = STATE_T;
		else if (fsm->len < 0) {
			if (errno == EAGAIN)
				fsm->state = STATE_R;
			else {
				fsm->err = "read()";
				fsm->state = STATE_EX;
			}
		}
		else {
			fsm->pos = 0;
			fsm->state = STATE_W;
		}
		break;
	case STATE_W:
		ret = write(fsm->dfd, fsm->buf + fsm->pos, fsm->len);
		if (ret < 0) {
			if (errno == EAGAIN)
				fsm->state = STATE_W;
			else {
				fsm->err = "write()";
				fsm->state = STATE_EX;
			}
		}
		else {
			fsm->pos += ret;
			fsm->len -= ret;
			if (fsm->len == 0)
				fsm->state = STATE_R;//写完了再去读
			else
				fsm->state = STATE_W;//没写完继续写
		}
		break;
	case STATE_EX:
		perror(fsm->err);
		fsm->state = STATE_T;
		break;
	case STATE_T:
		/*  do smoething*/
		break;
	default:
		abort();
		break;
	}
}
static void *thr_relayer(void *p) {
	int i;
	while (1) 
	{
		pthread_mutex_lock(&mut_rel_job);//死等
		for (i = 0; i < REL_JOBMAX; i++) 
		{
			if (rel_job[i] != NULL) //不断的找到一个任务然后推送执行
			{
				if (rel_job[i]->job_state == STATE_RUNNING)//运行态
				{
					fsm_driver(&rel_job[i]->fsm12);//先推再判断
					fsm_driver(&rel_job[i]->fsm21);
					if (rel_job[i]->fsm12.state == STATE_T && rel_job[i]->fsm21.state == STATE_T)
						rel_job[i]->job_state = STATE_OVER;
				}
			}
		}
		pthread_mutex_unlock(&mut_rel_job);
	}
	
}
static void module_load(void) 
{
	int err;
	pthread_t tid_relayer;
	err = pthread_create(&tid_relayer, NULL, thr_relayer, NULL);
	if (err) {
		fprintf(stderr, "pthread_create():%s\n", strerror(err));
		exit(1);
	}
}
static int get_free_pos_unlocked() {
	int i;
	for (i = 0; i < REL_JOBMAX; i++) {
		if (rel_job[i] == NULL)
			return i;
	}
	return -1;
}
int rel_addjob(int fd1, int fd2) {
	struct rel_job_st *me;
	int pos;
	pthread_once(&init_once, module_load);//单次调用:pthread_once
	me = malloc(sizeof(*me));
	if (me == NULL)   //空间问题
		return -ENOMEM;
	me->fd1 = fd1;
	me->fd2 = fd2;
	me->job_state = STATE_RUNNING;//该对终端设置正在运行
	me->fd1_save = fcntl(me->fd1, F_GETFL);
	fcntl(me->fd1, F_SETFL, me->fd1_save | O_NONBLOCK);  //非阻塞	打开
	me->fd2_save = fcntl(me->fd2, F_GETFL);
	fcntl(me->fd2, F_SETFL, me->fd2_save | O_NONBLOCK);//非阻塞	打开
	me->fsm12.sfd = me->fd1;
	me->fsm12.dfd = me->fd2;
	me->fsm12.state = STATE_R;
	me->fsm21.sfd = me->fd2;
	me->fsm21.dfd = me->fd1;
	me->fsm21.state = STATE_R;
	pthread_mutex_lock(&mut_rel_job);
	pos = get_free_pos_unlocked();//临界状态-需要加入互斥锁
	if (pos < 0) {
		pthread_mutex_unlock(&mut_rel_job);
		fcntl(me->fd1, F_SETFL, me->fd1_save);//恢复现场
		fcntl(me->fd2, F_SETFL, me->fd2_save);
		free(me);//释放空间
		return -ENOSPC;
	}
	rel_job[pos] = me;
	pthread_mutex_unlock(&mut_rel_job);
	return pos;
}
int rel_canceljob(int id);
/*
	return == 0			成功,指定任务成功取消
		   == -EINVAL	失败,参数非法
		   == -EBUSY    失败,任务早已被取消
*/
int rel_waitjob(int id, struct rel_stat_st*);
int rel_statjob(int id, struct rel_stat_st*);

4.中继方案2

方案2不采用方案1的经验,是因为方案2,在多路io复用的时候,容易形成“群惊”效应。

还有,方案1,是通道排斥,2是信息分享;

因此,采用如下方式:每一个收发都配置一个接收队列,发送直接发给对方的队列装载

                                   

实现步骤1

队列的实现(队列的实现--数量少采用静态数组/数量多动态数组(进程内),进程间的处理反射光hi,采用共享内存+数组方式)

队列实现-例子:数量少采用静态数组(进程内)

/****************************************************
* 函数名:udp_msg_enqueue
* 功能描述:UDP消息入队
* 输入参数:msg_type-消息类型,msg_in_data-消息数据
* 输出参数:
* 返回值:
* 备注:Fhsj YJ
****************************************************/
int udp2_msg_enqueue(const UDP_MSG_STRU *msg_in_data) {
    if (NULL == msg_in_data) {
        printf("func:%s : 指针为NULL", __func__);
        return 0;
    }
    // 数据入队并返回结果
    if (((udp2_msg_queue.rear + 1) % MAX_UDP_MSG_NUM) == udp2_msg_queue.front) {
        return 0;
    } else {
        memset(&udp2_msg_queue.date[udp2_msg_queue.rear], 0, sizeof(UDP_MSG_STRU));
        memcpy(&udp2_msg_queue.date[udp2_msg_queue.rear], msg_in_data, sizeof(UDP_MSG_STRU));
        udp2_msg_queue.rear = (udp2_msg_queue.rear + 1) % MAX_UDP_MSG_NUM;
        return 1;
    }
}

/****************************************************
* 函数名:dsrc_msg_dequeue
* 功能描述:UDP消息出队
* 输入参数:msg_type-消息类型
* 输出参数:
* 返回值:msg_in_data-消息数据
* 备注:Fhsj YJ
****************************************************/
UDP_MSG_STRU *udp2_msg_dequeue(void) {
    u16 u16obj_num = 0;
    // 数据出队
    if (udp2_msg_queue.rear == udp2_msg_queue.front) {
        return NULL;
    } else {
        u16obj_num = udp2_msg_queue.front;
        udp2_msg_queue.front = (udp2_msg_queue.front + 1) % MAX_UDP_MSG_NUM;
        
        return &udp2_msg_queue.date[u16obj_num];
    }
}

/****************************************************
* 函数名:get_udp_msg_num
* 功能描述:UDP消息数量获取
* 输入参数:msg_type-消息类型
* 输出参数:
* 返回值:消息数量
* 备注:Fhsj YJ
****************************************************/
u8 get_udp2_msg_num(void) {
    u8 msg_num;
    if (udp2_msg_queue.rear >= udp2_msg_queue.front) {
        msg_num = udp2_msg_queue.rear - udp2_msg_queue.front;
    } else {
        msg_num = udp2_msg_queue.rear + MAX_UDP_MSG_NUM - udp2_msg_queue.front;
    }
    return msg_num;
}

 实现步骤2,

每一个fd,对应一个数据处理线程,在线程,在每一个线程收到数据后,进入对应队列

然后再另外一个地方(线程或者进程),出队,处理...

对应代码,已经实现,暂不展示了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/370870.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

问题:金属电化学反应的实质是氧化还原反应,被腐蚀金属发生还原反应( ) #知识分享#知识分享#媒体

问题&#xff1a;金属电化学反应的实质是氧化还原反应&#xff0c;被腐蚀金属发生还原反应(  ) A、正确 B、错误 参考答案如图所示

解析Python中HTTP代理的常见问题

在Python编程中&#xff0c;HTTP代理是一个经常被提及的概念&#xff0c;尤其在处理网络请求和爬虫时。但与此同时&#xff0c;使用HTTP代理也经常会遇到一些令人头疼的问题。接下来&#xff0c;就让我们一起解析一下Python中使用HTTP代理时常见的那些问题。 1. 代理服务器无响…

如何在本地部署chatGLM3

文章目录 1. 参考2. ChatGLM3 介绍3. 本地运行3.1 硬件配置3.2 下载ChatGLM3代码3.3 下载需要加载的模型3.4 运行大模型3.4.1 ChatGLM3目录介绍3.4.2 安装依赖3.4.2 综合demo演示3.4.3 启动对话模式工具模式代码解释器 4. 总结 前面一章节有讲到 基于MacBook Pro M1芯片运行ch…

Debian系统显示中文

开发板上的debian默认不显示中文。 安装字体 sudo apt install fonts-wqy-zenhei 安装locals sudo apt install locales &#xff08;无必要&#xff09;设置/etc/locale.gen、设置/etc/locale.conf 运行dpkg-reconfigure locales dpkg-reconfigure locales 可以选择UT…

基于动作合成视频、线免费使用不需要注册,支持多种视频任务:图像生成视频、文本生成视频、视频修改、视频风格化、用Transformer构建世界模型

基于动作合成视频、线免费使用不需要注册&#xff0c;支持多种视频任务&#xff1a;图像生成视频、文本生成视频、视频修改、视频风格化、用Transformer构建世界模型。 WorldDreamer无缝逐帧AI模型: 基于Transformer生成高质量电影级别视频的通用世界模型"。从20亿数据中…

【linux】git和gdb调试工具

在linux下提交代码同步到gitee 1.创建一个新的仓库&#xff08;演示步骤&#xff09; 2.init 这两个步骤用于识别提交代码的身份&#xff0c;一个你的名字&#xff0c;一个你的邮箱 开启本地仓库 克隆本地仓库成功 我们将这个仓库拷到了111目录底下. 我们发现少了一个.gitig…

Fink CDC数据同步(五)Kafka数据同步Hive

6、Kafka同步到Hive 6.1 建映射表 通过flink sql client 建Kafka topic的映射表 CREATE TABLE kafka_user_topic(id int,name string,birth string,gender string ) WITH (connector kafka,topic flink-cdc-user,properties.bootstrap.servers 192.168.0.4:6668…

微信小程序使用ucharts折线图,有负数显示0刻度线

当数据有负数和正数的时候默认不会显示0刻度线&#xff0c;不方便看出正负对比 实现思路&#xff1a;显示的刻度线是根据数据的最大值和最小值自动分配到刻度线上面&#xff0c;把最大值和最小值设置为一样&#xff0c;然后平均分配给五个刻度线中间的刻度线就会为0就实现了显…

uniapp /微信小程序 使用map组件实现手绘地图方案

获取地图范围 点图拾取坐标-地图开放平台|腾讯位置服务 获取需要手绘地图左下角和右上角GPS坐标 以北京故宫为例&#xff1a; 截取需要手绘地图进行手绘地图制作 ​​​​​​​​​​​​​​ 素材处理 由于地图素材文件比较大&#xff0c;小程序又限制包大小<2M,无…

13.从桥接模式细品人生的几座桥

“物理学不存在了&#xff0c;今后也不会存在。”——《三体》 在《三体》中&#xff0c;有这样一个桥段&#xff0c;顶级的物理学家杨冬在三体文明超级计算机“智子”的干扰和误导下&#xff0c;得出了物理实验的结果在实验之前就会被某种力量确定的结论&#xff0c;导致自己…

PyTorch 2.2 中文官方教程(九)

在生产环境中部署 PyTorch 模型 通过 Flask 在 Python 中部署 PyTorch 的 REST API 原文&#xff1a;pytorch.org/tutorials/intermediate/flask_rest_api_tutorial.html 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 注意 点击这里下载完整的示例代码 作者&#…

Windows鼠标右键菜单闪一下就没了?说不定是这个搞的鬼!

前言 这几天接到有些小伙伴反馈&#xff1a;Windows的右键菜单闪一下就没了。 本来是要按鼠标右键进行界面刷新或者新建文件夹等操作的&#xff0c;结果闪一下就没有了&#xff0c;感觉这个系统就好像中了病毒了一样。 相信很多小伙伴应该也遇到过同样的情况&#xff0c;但具…

BUGKU-WEB Simple_SSTI_1

02 Simple_SSTI_1 题目描述 解题思路 进入场景后&#xff0c;显示&#xff1a; You need pass in a parameter named flag。ctrlu 查看源码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Simpl…

ElementUI 组件Layout布局 el-row和el-col 简介

Layout布局 el-row属性简介 el-row 组件 提供 gutter 属性来指定每一栏之间的间隔&#xff0c;默认间隔为 0。 提醒&#xff1a; el-row :gutter需要与el-col :span 一起使用才能生效 el-col属性简介 el-col的span属性 默认值为24&#xff0c;表示每一行共24份&#xff0c;:s…

030 可变参数

可变参数定义 public static void main(String[] args) {// 多参数方式传递System.out.println(max(1,3,5,3,6,1,2));// 数组方式传递System.out.println(max(new int[]{1,3,5,3,6,1,2})); }static int max(int... nums){int max Integer.MIN_VALUE;for (int num : nums) {if(…

Mysql架构系列——生产常用的高可用部署模式介绍

模式 高可用模式 Galera Cluster是由Codership开发的MySQL多主集群&#xff0c;包含在MariaDB中&#xff0c;同时支持Percona xtradb、MySQL&#xff0c;是一个易于使用的高可用解决方案&#xff0c;在数据完整性、可扩展性及高性能方面都有可接受的表现。 将会基于Galera C…

三层交换组网实验(华为)

思科设备参考&#xff1a;三层交换组网实验&#xff08;思科&#xff09; 一&#xff0c;技术简介 三层交换技术的出现&#xff0c;解决子网必须依赖路由器进行管理的问题&#xff0c;解决传统路由器低速、复杂所造成的网络瓶颈问题。一个具有三层交换功能的设备可简单理解为…

2.4日总结

第一题&#xff1a;选数 题解&#xff1a;思路还是很简单的&#xff0c;只需要想清楚dfs里的函数都是什么就可以了&#xff0c;还有一个简单的判断素数的函数&#xff0c;这题真没啥难度&#xff0c;就是属于基础题吧&#xff0c;请看AC代码 #include <stdio.h> #includ…

redis的缓存击穿和缓存雪崩和缓存穿透问题解决方法

Redis的缓存击穿&#xff1a; 热点的key&#xff0c;在不停的扛着大并发&#xff0c;当这个key失效时&#xff0c;一瞬间大量的请求冲到持久层的数据库中&#xff0c;就像在一堵墙上某个点凿开了一个洞&#xff01; 解决方法&#xff1a; 1.热点key永不过期&#xff1a; 统计访…

Facebook的数字合作愿景:创新与未来发展

随着科技的飞速发展&#xff0c;Facebook一直处于数字创新的前沿&#xff0c;致力于构建开放、智能、社交的数字社交体验。本文将深入探讨Facebook的数字合作愿景&#xff0c;探索其在创新与未来发展方面的雄心壮志。 引言 在当今数字化时代&#xff0c;社交媒体不仅是人们沟通…