网络编程 io_uring

io_uring

1、概述

io_uring是Linux(内核版本在5.1以后)在2019年加入到内核中的一种新型的异步I/O模型;

io_uring使用共享内存,解决高IOPS场景中的用户态和内核态的切换过程,减少系统调用;用户可以直接向共享内存提交要发起的I/O操作,内核线程可以直接获取共享内存中的I/O操作,并进行相应的读写操作;io_uring是一种proactor模式的网络架构;

  • Reactor 是非阻塞同步网络模式,感知的是就绪可读写事件。在每次感知到有事件发生(比如可读就绪事件)后,就需要应用进程主动调用 read 方法来完成数据的读取,也就是要应用进程主动将 socket 接收缓存中的数据读到应用进程内存中,这个过程是同步的,读取完数据后应用进程才能处理数据。

  • Proactor 是异步网络模式, 感知的是已完成的读写事件。在发起异步读写请求时,需要传入数据缓冲区的地址(用来存放结果数据)等信息,这样系统内核才可以自动帮我们把数据的读写工作完成,这里的读写工作全程由操作系统来做,并不需要像 Reactor 那样还需要应用进程主动发起 read/write 来读写数据,操作系统完成读写工作后,就会通知应用进程直接处理数据。

优点
  • 避免了提交I/O事件和完成事件中存在的内存拷贝(使用共享内存)

  • 减少的了I/O任务提交和完成事件任务是的系统调用过程

  • 采取无锁队列,减少了锁资源的竞争

主要内存结构
  • 提交队列(Submission Queue,SQ)连续的内存空间,环形队列,存放将要执行的I/O操作数据
  • 完成队列(Completion Queue, CQ)连续的内存空间,环形队列,存放执行完成I/O操作后的返回结果
  • 提交队列项数组提(Submission Queue Entry,SQE):方便通过环形缓冲区提交内存请求
2、主要接口

io_uring提供三个用户态的系统调用接口

  1. io_uring_setup:初始化一个新的io_uring对象,一个SQ和一个CQ,通过使用共享内存进行数据操作
  2. io_uring_register:注册用于异步I/O的文件或用户缓冲区(buffers)
  3. io_uring_enter:提交I/O任务,等待I/O完成

在这里插入图片描述

SQ和CQ保存的都是SQEs数据的索引,不是真正的请求,真实是请求保存在SQE数组中,在提交请求时可以批量提交一组SQE数值上不连续的请求;

SQ、CQ、SQE中的内存区域都是有内核进行分配的,用户初始化会返回对应的fd,通过fd进行mmap和内核共享内存空间;

3、第三方库

liburing通过对io_uring进行分装,提供了一个简单的API,通过一下命令可以安装该动态库

git clone https://github.com/axboe/liburing.git
cd liburing
./configure
make
sudo make install
sudo ldconfig #更新动态库连接缓存
4、主要使用流程
1. io_uring初始化

io_uring通过io_uring_setup函数初始化,在liburing库中,通过io_uring_queue_init_params函数进行初始化,创建sumbmit队列和complete队列,以及SQE内存数组;

//io_uring实现异步的方式
struct io_uring_params pragma;
memset(&pragma, 0, sizeof(pragma));
struct io_uring ring;
// 初始化io_uring 创建submit队列和complite队列
io_uring_queue_init_params(1024, &ring, &pragma);
2. io_uring 提交(注册)到SQ环形队列

io_uring通过io_uring_register函数提交(注册)到用于异步I/O的缓冲区中,在liburing中通过io_uring_prep_accept函数对io_uring_refister进行封装使用;

// 获取ringbuffer的头
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
connect_info_t accept_info = {sockfd, EVENT_ACCEPT};
// 注册一个I/O事件
io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
3. io_uring_enter 提交I/O

io_uring中通过io_uring_enter函数来提交I/O,并等待事件的完成;在liburing中通过io_uring_submit来提交SQE的读写请求,io_uring_wait_cqe来等待I/O的处理结果,io_uring_peek_batch_cqe来获取CQ中的处理结果;

 // 提交worker中执行
io_uring_submit(&ring);
struct io_uring_cqe *cqe;
//等待complete队列中的结果
io_uring_wait_cqe(&ring, &cqe);
struct io_uring_cqe *cqes[128];
// 获取CQ环形队列中的处理结果
int count = io_uring_peek_batch_cqe(&ring, cqes, 128);
5、实现

io_uring_server.c

#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <netinet/in.h>

enum event_type {
    EVENT_ACCEPT,
    EVENT_READ,
    EVENT_WRITE
};

typedef struct connect_info{
    int conn_fd;
    int event;
}connect_info_t;

struct conn_info {
	int fd;
	int event;
};

int init_server(unsigned short port) 
{   
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket");
        return -1;
    }
    struct sockaddr_in serveraddr;;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons(port);
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0) {
        perror("bind error");
        return -1;
    }

    int opt = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
        perror("setsockopt");
        return -1;
    }

    listen(sockfd, 10);
    return sockfd; 
}

int set_event_recv(struct io_uring *ring, int sockfd, void *buf, int len, int flags)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    connect_info_t accept_info = {sockfd, EVENT_READ};
    io_uring_prep_recv(sqe, sockfd, buf, len, flags);
	memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
    printf("set event recv----\n");
    return 0;
}

int set_event_send(struct io_uring *ring, int sockfd, const void *buf, int len, int flags)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    connect_info_t accept_info = {sockfd, EVENT_WRITE};
    io_uring_prep_send(sqe, sockfd, buf, len, flags);
	memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
    printf("set event send----\n");
    return 0;
}

int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *clientaddr,
					socklen_t *addrlen, int flags) {

	// 获取sqe
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
	// 初始化accept_info
    connect_info_t accept_info = {sockfd, EVENT_ACCEPT};
	// 准备accept操作
	io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);
	// 设置用户数据
	memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
    printf("set event accept\n");
    return 0;
}

int main(int argc, char *argv[])
{
    // 初始化服务器
    unsigned short port = 9999;
    // 初始化服务器
    int socketfd = init_server(port);
    if (socketfd < 0)
        return -1;
    //io_uring实现异步的方式
    struct io_uring_params pragma;
    // 初始化io_uring 创建submit队列和complite队列
    memset(&pragma, 0, sizeof(pragma));
    struct io_uring ring;
    io_uring_queue_init_params(1024, &ring, &pragma);

    struct sockaddr_in clientaddr;
    socklen_t addrlen = sizeof(struct sockaddr);
    // 提交到submit队列中
    set_event_accept(&ring, socketfd, (struct sockaddr*)&clientaddr, &addrlen, 0);

    char buffer[1024] = {0};

    while (1)
    {
        // 提交worker中执行
        io_uring_submit(&ring);
        printf("complete\n");
        struct io_uring_cqe *cqe;
        //等待complete队列中的结果
        io_uring_wait_cqe(&ring, &cqe);
        printf("complete end\n");

        struct io_uring_cqe *cqes[128];
        int count = io_uring_peek_batch_cqe(&ring, cqes, 128);

        for (int i = 0; i < count; i++)
        {
            struct io_uring_cqe *entries = cqes[i];
            connect_info_t result;
            //struct conn_info result;
	        memcpy(&result, &entries->user_data, sizeof(connect_info_t));
            if (result.event == EVENT_ACCEPT) 
            {
                // 设置读事件
                set_event_accept(&ring, socketfd, (struct sockaddr*)&clientaddr, &addrlen, 0);
                printf("accept success\n");
                int conn_fd = entries->res;
                printf("conn_fd = %d  res = %d\n", conn_fd, entries->res);
                // 设置读事件
                set_event_recv(&ring, conn_fd, buffer, 1024,0);
            }
            else if (result.event == EVENT_READ)
            {
                int ret = entries->res;
                printf("set_event_recv ret: %d, %s\n", ret, buffer);

                if (ret == 0)
                {
                    close(result.conn_fd);
                    continue;
                }
                else if (ret > 0)
                {
                    // 设置写事件
                    set_event_send(&ring, result.conn_fd, buffer, ret,0);
                }
                printf("read success\n");
            }
            else if (result.event == EVENT_WRITE)
            {
                int ret = entries->res;
                set_event_recv(&ring, result.conn_fd, buffer, 1024,0);
                printf("write success\n");
            }
        }
        io_uring_cq_advance(&ring, count);
    }
    
    return 0;
}

io_uring_test.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>

#include <sys/socket.h>
#include <arpa/inet.h>

#define TIMESUB_MS(tv1, tv2)  (((tv2).tv_sec - (tv1).tv_sec) * 1000 + ((tv2).tv_usec - (tv1).tv_usec) / 1000)
#define TEST_MESSAGE   "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n"
#define RBUFFER_LENGTH 2048
#define WBUFFER_LENGTH 2048

typedef struct test_conttext
{
    char server_ip[16];
    int server_port;
    int thread_num;
    int connection_num;
    int request_num;
    int fail_num;
} test_conttext_t;

int send_recv_tcp(int sockfd)
{
    char wbuffer[WBUFFER_LENGTH];
    char rbuffer[RBUFFER_LENGTH];
    memset(wbuffer, 0, sizeof(wbuffer));
    memset(rbuffer, 0, sizeof(rbuffer));
    for (int i = 0; i < 8; i++)
    {
        strcpy(wbuffer + i * strlen(TEST_MESSAGE), TEST_MESSAGE);
    }

    int res = send(sockfd, wbuffer, strlen(wbuffer), 0);
    if (res <= 0)
    {
        return -1;
    }

    res = recv(sockfd, rbuffer, sizeof(rbuffer), 0);
    if (res <= 0)
    {
        return -1;
    }

    if (strcmp(rbuffer, wbuffer) != 0)
    {
        printf("failed: '%s' != '%s'\n", rbuffer, wbuffer);
        return -1;
    }
    return 0;
}

int connect_tcpserver(char *ip, int port)
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        perror("socket");
        return -1;
    }
   
    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = inet_addr(ip);
    if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0)
    {
        perror("connect");
        close(sockfd);
        return -1;
    }
    return sockfd;
}

static void *test_qps(void *arg)
{
    test_conttext_t *ctx = (test_conttext_t *)arg;
    int sockfd = connect_tcpserver(ctx->server_ip, ctx->server_port);
    if (sockfd < 0)
    {
        printf("connect server failed\n");
        return NULL;
    }
    int conut = ctx->request_num / ctx->connection_num;
    int indx = 0;
    int res;
    while (indx++ < conut)
    {
        res = send_recv_tcp(sockfd);
        if (res < 0)
        {
            printf("send_recv_tcp failed\n");
            ctx->fail_num++;
            continue;
        }
    }
    return NULL;
}

int main(int argc, char *argv[])
{
    int i;
    printf("----%d\n", argc);
    // for (i = 1; i < argc; i++)
    //     printf("%s\n", argv[i]);
    
    test_conttext_t ctx = {0};
    int opt;
    while ((opt = getopt(argc, argv, "s:p:t:c:n:")) != -1)
    {
        switch (opt)
        {
        case 's':
            strcpy(ctx.server_ip, optarg);
            printf("-s: %s\n", optarg);
            break;
        case 'p':
            ctx.server_port = atoi(optarg);
            printf("-p: %s\n", optarg);
            break;
        case 't':
            ctx.thread_num = atoi(optarg);
            printf("-t: %s\n", optarg);
            break;
        case 'c':
            ctx.connection_num = atoi(optarg);
            printf("-c: %s\n", optarg);
            break;
        case 'n':
            ctx.request_num = atoi(optarg);
            printf("-n: %s\n", optarg);
            break;
        default:
            return 
                EXIT_FAILURE;
        }
    }

    pthread_t *threads = (pthread_t *)malloc(sizeof(pthread_t) * ctx.thread_num);
    struct timeval start, end;
    gettimeofday(&start, NULL);
    for (i = 0; i < ctx.thread_num; i++)
    {
        printf("thread %d pthread_create\n", i);
        pthread_create(&threads[i], NULL, test_qps, &ctx);
    }
    for (i = 0; i < ctx.thread_num; i++)
    {
        pthread_join(threads[i], NULL);
        printf("thread %d finished\n", i);
    }
    gettimeofday(&end, NULL);
    int time_used = TIMESUB_MS(start, end);
    printf("success :%d, failed:%d,  time used: %d , qps %d\n", 
        ctx.request_num-ctx.fail_num, ctx.fail_num, time_used, ctx.request_num * 1000 / time_used);
    free(threads);
    return EXIT_SUCCESS;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/421870.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2024年腾讯云会员老用户续费优惠活动,可领代金券

腾讯云优惠活动2024新春采购节活动上线&#xff0c;云服务器价格已经出来了&#xff0c;云服务器61元一年起&#xff0c;配置和价格基本上和上个月没什么变化&#xff0c;但是新增了8888元代金券和会员续费优惠&#xff0c;腾讯云百科txybk.com整理腾讯云最新优惠活动云服务器配…

数电实验之流水灯、序列发生器

最近又用到了数电实验设计的一些操作和设计思想&#xff0c;遂整理之。 广告流水灯 实验内容 用触发器、组合函数器件和门电路设计一个广告流水灯&#xff0c;该流水灯由 8 个 LED 组成&#xff0c;工作时始终为 1 暗 7 亮&#xff0c;且这一个暗灯循环右移。 1) 写出设计过…

MYSQL--JDBC优化

一.JDBC优化: 优化前提: 有时候我们并不清楚某些表当中一共有多少列,以及这些列的数据类型,这个时候我们就需要提前通过一些方法提前了解到这些数据,从而更好的进行输出 具体语句: package cn.jdbc;import java.sql.*;public class JDBCDEmo1 {public static void main(String…

MySQL篇—执行计划介绍(第二篇,总共三篇)

☘️博主介绍☘️&#xff1a; ✨又是一天没白过&#xff0c;我是奈斯&#xff0c;DBA一名✨ ✌✌️擅长Oracle、MySQL、SQLserver、Linux&#xff0c;也在积极的扩展IT方向的其他知识面✌✌️ ❣️❣️❣️大佬们都喜欢静静的看文章&#xff0c;并且也会默默的点赞收藏加关注❣…

力扣SQL50 大的国家 查询

Problem: 595. 大的国家 Code select name,population,area from World where area > 3000000 or population > 25000000;

JS:原型与原型链(附带图解与代码)

一、原型 写在前面&#xff1a; 任何对象都有原型。 函数也是对象&#xff0c;所以函数也有原型。 1.什么是原型 在 JavaScript 中&#xff0c;对象有一个特殊的隐藏属性 [[Prototype]]&#xff0c;它要么为 null&#xff0c;要么就是对另一个对象的引用&#xff0c;该对象…

数据可视化原理-腾讯-热力图

在做数据分析类的产品功能设计时&#xff0c;经常用到可视化方式&#xff0c;挖掘数据价值&#xff0c;表达数据的内在规律与特征展示给客户。 可是作为一个产品经理&#xff0c;&#xff08;1&#xff09;如果不能够掌握各类可视化图形的含义&#xff0c;就不知道哪类数据该用…

特殊设计模式

▶实现一个类&#xff0c;不能被拷贝 ▶实现一个类&#xff0c;只能在堆上创建 ❗实现一个类&#xff0c;只能创建在栈上 ❗设计一个不能继承的类 ❗单例模式——一个类只能生成一个对象   ❔饿汉模式——在每次程序启动都会自动生成一个对象   ❓懒汉模式——在第一次需要…

【数学建模获奖经验】2023第八届数维杯数学建模:华中科技大学本科组创新奖获奖分享

2024年第九届数维杯大学生数学建模挑战赛将于&#xff1a;2024年5月10日08:00-5月13日09:00举行&#xff0c;近期同学们都开始陆续进入了备赛阶段&#xff0c;今天我们就一起来看看上一届优秀的创新奖选手都有什么获奖感言吧~希望能帮到更多热爱数学建模的同学。据说点赞的大佬…

javaWebssh票据管理系统myeclipse开发mysql数据库MVC模式java编程计算机网页设计

一、源码特点 java ssh票据管理系统是一套完善的web设计系统&#xff08;系统采用ssh框架进行设计开发&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模 式开发。开发环境为TOMCAT7.0,My…

npm digital envelope routines::unsupported

问题描述&#xff1a;npm运行命令报错&#xff1a;digital envelope routines::unsupported 原因&#xff1a;node版本过高 解决方案&#xff1a;在运行命令之前加上 SET NODE_OPTIONS--openssl-legacy-provider && SET NODE_OPTIONS--openssl-legacy-provider &&a…

【机器学习基础】层次聚类-BIRCH聚类

&#x1f680;个人主页&#xff1a;为梦而生~ 关注我一起学习吧&#xff01; &#x1f4a1;专栏&#xff1a;机器学习 欢迎订阅&#xff01;相对完整的机器学习基础教学&#xff01; ⭐特别提醒&#xff1a;针对机器学习&#xff0c;特别开始专栏&#xff1a;机器学习python实战…

【JavaEE】_Spring Web MVC简介

目录 1. Spring Web MVC简介 2. MVC简介 3. Spring MVC 1. Spring Web MVC简介 官网对于Spring Web MVC的介绍如下&#xff1a; 链接如下&#xff1a; https://docs.spring.io/spring-framework/reference/web/webmvc.html#https://docs.spring.io/spring-framework/refer…

14.网络游戏逆向分析与漏洞攻防-网络通信数据包分析工具-数据包分析工具界面与通信设计

内容参考于&#xff1a; 易道云信息技术研究院VIP课 上一个内容&#xff1a;13.如果没有工具就创造工具 码云地址&#xff08;master 分支&#xff09;&#xff1a;https://gitee.com/dye_your_fingers/titan 码云版本号&#xff1a;fef5089bd11dfb86ae8b4e26f25cf59e85f896…

缓存穿透解决方案之布隆过滤器

布隆过滤器可以快速判断数据是否存在&#xff0c;避免从数据库中查询数据是否存在&#xff0c;减轻数据库的压力 布隆过滤器是由一个初值为0的bit数组和N个哈希函数&#xff0c;可以用来快速的判断某个数据是否存在 当我们想要标记某个数据是否存在时&#xff0c;布隆过滤器会…

《Spring Security 简易速速上手小册》第6章 Web 安全性(2024 最新版)

文章目录 6.1 CSRF 防护6.1.1 基础知识详解CSRF 攻击原理CSRF 防护机制最佳实践 6.1.2 重点案例&#xff1a;Spring Security 中的 CSRF 防护案例 Demo测试 CSRF 防护 6.1.3 拓展案例 1&#xff1a;自定义 CSRF 令牌仓库案例 Demo测试自定义 CSRF 令牌仓库 6.1.4 拓展案例 2&am…

动态规划(算法竞赛、蓝桥杯)--分组背包DP

1、B站视频链接&#xff1a;E16 背包DP 分组背包_哔哩哔哩_bilibili #include <bits/stdc.h> using namespace std; const int N110; int v[N][N],w[N][N],s[N]; // v[i,j]:第i组第j个物品的体积 s[i]:第i组物品的个数 int f[N][N]; // f[i,j]:前i组物品&#xff0c;能放…

【如何像网吧一样弄个游戏菜单在家里】

GGmenu 个人家庭版游戏、应用管理 桌面图标管理器

Tomcat概念、安装及相关文件介绍

目录 一、web技术 1、C/S架构与B/S架构 1.1 http协议与C/S架构 1.2 http协议与B/S架构 2、前端三大核心技术 2.1 HTML&#xff08;Hypertext Markup Language&#xff09; 2.2 css&#xff08;Cascading Style Sheets&#xff09; 2.3 JavaScript 3、同步和异步 4、…

day08_分类品牌管理商品规格管理商品管理

文章目录 1 分类品牌管理1.1 菜单添加1.2 表结构介绍1.3 页面制作1.4 品牌列表加载1.4.1 后端接口BrandControllerBrandServiceBrandMapperBrandMapper.xml 1.4.2 前端对接brand.jscategoryBrand.vue 1.5 分类数据加载1.6 列表查询1.6.1 需求说明1.6.2 后端接口需求分析Categor…