redis中的io多线程(线程池)

文章目录

    • redis多线程模型
      • redis为什么引入I/O多线程
      • I/O多线程模型
    • 源码解析
      • 测试设置
      • 连接建立
      • 数据传输
      • 线程调度
        • 开启io线程`startThreadedIO`
        • 关闭io线程`stopThreadedIO`

redis多线程模型

redis为什么引入I/O多线程

Redis 的性能瓶颈在网络 IO 的处理上。Redis 是网络 IO 密集型,需要同时处理多条并发请求,读写 IO 的问题(请求大量数据,写日志业务等)。多线程处理网络 IO,单线程执行命令。

Redis 线程池作用读写 IO 阶段,即 read, decode 和 encode, send 阶段。主线程处理业务逻辑,之所以用单线程执行命令,是因为 Redis 采用高效的数据结构,其业务逻辑处理较快。

在这里插入图片描述

I/O多线程模型

主线程拥有两个全局队列clients_pending_readclients_pending_write,每个 io 线程(主线程同时也是 io 线程)拥有一个专属队列 io_threads_list[id]。主线程既作为生产者,产生任务;又作为消费者,获取任务执行。

首先,主线程将一次循环的所有就绪的读事件收集到自己的全局任务队列clients_pending_read中,再把每个事件负载均衡地分配到每个 io 线程的专属任务队列中。一次事件循环中不会出现同名 fd,不同的 fd 分配到每个 io 线程各自的队列中,避免了多个 io 线程同时从全局队列中取数据,因此,不需要加锁操作。

接下来,io 线程从自己的专属队列中取出任务,(除主线程外)并发执行 read 和 decode 操作。主线程将解析后的任务做 compute 操作。最后,io 线程(包括主线程)并发执行 encode 和 send 操作。

在这里插入图片描述

redis的单线程是指,命令执行(logic)都是在单线程中运行的
接受数据read和发送数据write都是可以在io多线程(线程池)中去运行

在Redis中,生产者也可以作为消费者,反之亦然,没有明确界限。

源码解析

测试设置

redis 线程池默认作用在 encode, send 阶段,这是因为客户端从 redis 获取大量数据需要并发处理。若想作用在 read, decode 阶段,需要手动开启。在 redis.conf 文件中,可以设置:

 # 开启io线程的数量
 io-threads 4
 
 # 优化:read deconde 过程。默认优化,encode send从 redis 获取大量数据
 io-threads-do-reads yes

开启 io 多线程的前提是有多个并发连接。如何在单个连接的情况下,开启 io 多线程调试,需要修改 redis 源码:

 // networking.c
 int stopThreadedIOIfNeeded(void) {
     // 单个连接的情况下,开启多线程调试,永远不关闭 io 多线程
     return 0;   
     ...
 }

连接建立

主线程处理连接建立,listenfd

  • 连接到达,触发读事件回调:acceptTcpHandler
  • 接收连接:acceptTcpHandler
  • 初始化新连接:createClient
 // server.c
 void initServer(void) {
    ...
    // 1、连接到来,触发读事件回调
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
             acceptTcpHandler,NULL) == AE_ERR)  
    ...
 }
 
 // networking.c
 void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
     ...
     while(max--) {
         // 2、接收连接:内部封装 accept
         cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
         ...
         // 为 cfd 初始化新连接,内部调用 createClient
         acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
     }
 }
 
 static void acceptCommonHandler(connection *conn, int flags, char *ip) {
     ...
     /* Create connection and client */
     // 3、创建新的连接
     if ((c = createClient(conn)) == NULL) {
         ...
     }
     ...
 }
 
 client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    /* passing NULL as conn it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (conn) {
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        // 4.接收数据的读事件触发,回调readQueryFromClient函数
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }
}

数据传输

clientfd

  • 读事件回调:readQueryFromClient

  • 分割并处理数据包 processInputBuffer

    • 分割数据包:processInlineBuffer 和 processMultibulkBuffer
    • 处理数据包:processCommandAndResetClient
  • 数据写到 buffer:addReply

  • 数据写到 socket:writeToClient

  • 写事件回调:sendReplyToClient

当读事件触发时,执行读事件回调函数。主线程收集读事件就绪的连接放入全局任务队列``clients_pending_read,并设置连接状态为CLIENT_PENDING_READ`。子线程从该全局队列中获取任务后,也调用该读事件回调函数,进行 read 和 decode 的业务逻辑处理。

// networking.c
 void readQueryFromClient(connection *conn) {
     ...
     /* Check if we want to read from the client later when exiting from
      * the event loop. This is the case if threaded I/O is enabled. */
     // 开启 io 线程后,延迟处理客户端的读,将任务丢到全局队列,再分配给 io 线程
     // 主线程返回 1,不执行业务逻辑处理;
     // 子线程返回 0,继续往下,执行业务逻辑处理
     if (postponeClientRead(c)) return;  
 
     // 1、read 阶段,(io 线程)将任务读到缓冲区 
     nread = connRead(c->conn, c->querybuf+qblen, readlen);
 
     // 2、decode 阶段,(io 线程)解析数据包
      processInputBuffer(c);
 }
 
 int postponeClientRead(client *c) {
     if (server.io_threads_active &&
         server.io_threads_do_reads &&
         !clientsArePaused() &&
         !ProcessingEventsWhileBlocked &&
         !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
     {
         // 主线程,返回 1
         // 将连接状态设置为 CLIENT_PENDING_READ
         c->flags |= CLIENT_PENDING_READ;
         // 收集任务,把客户端连接放到全局队列中,后续会分配到 io 线程
         listAddNodeHead(server.clients_pending_read,c);
         return 1;
     } else {
         // 子线程,即 io 线程,返回 0
         return 0;
     }
 }

子线程(IO 线程)从专属任务队列 io_threads_pending获取任务,执行 read decode 和 encode write 业务逻辑处理。

// networking.c
 // 线程池入口函数:子线程
 void *IOThreadMain(void *myid) {
     ...
     while(1) {
         /* Wait for start */
         // 等待获取专属任务队列中的任务
         for (int j = 0; j < 1000000; j++) {
             if (io_threads_pending[id] != 0) break;
         }      
         ...
         /* Process: note that the main thread will never touch our list
          * before we drop the pending count to 0. */
         listIter li;
         listNode *ln;
         // 从专属任务队列中取出任务
         listRewind(io_threads_list[id],&li);    
         while((ln = listNext(&li))) {
             client *c = listNodeValue(ln);
             if (io_threads_op == IO_THREADS_OP_WRITE) {
                 // encode 和 write
                 writeToClient(c,0);
             } else if (io_threads_op == IO_THREADS_OP_READ) {
                 // read 和 decode,读事件回调函数
                 readQueryFromClient(c->conn);
             } else {
                 serverPanic("io_threads_op value is unknown");
             }
         }
         listEmpty(io_threads_list[id]);
         io_threads_pending[id] = 0;
         ...
     }
 }

子线程 decode 结束后,设置连接状态 CLIENT_PENDING_COMMAND,交给主线程来 compute,退出读事件回调函数。主线程负责 compute ,解析 redis 命令。

// networking.c
 // readQueryFromClient 函数中 decode 阶段调用
 void processInputBuffer(client *c) {
     /* Keep processing while there is something in the input buffer */
     while(c->qb_pos < sdslen(c->querybuf)) {
     ...
         if (c->reqtype == PROTO_REQ_INLINE) {
             // 分割数据包。并判断是否完整
             if (processInlineBuffer(c) != C_OK) break;
             ...
         } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
             // 分割 pipline 的数据包,并判断是否完整
             if (processMultibulkBuffer(c) != C_OK) break;
         }
         ...
         else {
             /* If we are in the context of an I/O thread, we can't really
              * execute the command here. All we can do is to flag the client
              * as one that needs to process the command. */
             // io 线程设置任务状态,交给主线程compute,退出读事件回调函数
             if (c->flags & CLIENT_PENDING_READ) {
                 c->flags |= CLIENT_PENDING_COMMAND;
                 break;
             }/* We are finally ready to execute the command. */
             // 3、compute,主线程解析命令
             if (processCommandAndResetClient(c) == C_ERR) {
                 /* If the client is no longer valid, we avoid exiting this
                  * loop and trimming the client buffer later. So we return
                  * ASAP in that case. */
                 return;
             }
         }
     }
     ...
 }

主线程 compute 结束后,调用 addReply 函数,将处理完的连接放到全局任务队列clients_pending_write,并将待发送的数据写到缓冲区。

// networking.c
 int processCommandAndResetClient(client *c) {
     ...
     // 处理命令
     if (processCommand(c) == C_OK) {
         commandProcessed(c);
     }
     ...
 }
 
 // server.c
 int processCommand(client *c) {
     ...
     /* Exec the command */
     // 开启 io 多线程,且不是事务命令
     if (c->flags & CLIENT_MULTI &&
         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
     {
         // 把数据写到缓冲区
         addReply(c,shared.queued);
     } else {
         // 执行 redis 命令
         call(c,CMD_CALL_FULL);
         ...
     }
     ...
 }// networking.c
 // 数据写到发送缓冲区
 void addReply(client *c, robj *obj) {
     if (prepareClientToWrite(c) != C_OK) return;
     ...
 }int prepareClientToWrite(client *c) {
     ...
     if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
             clientInstallWriteHandler(c);   // 任务写到全局队列中
     ...
 }

接下来,子线程和主线程都可以从自己的专属任务队列中获得该任务,执行 encode 和 send 的业务逻辑处理 writeToClient。若数据未发送完,则注册写事件回调,等待再次发送。

// 子线程:线程池入口函数
 void *IOThreadMain(void *myid) {
     ... 
         if (io_threads_op == IO_THREADS_OP_WRITE) {
             // encode 和 write
             writeToClient(c,0); // 数据写到 socket
         } else if (io_threads_op == IO_THREADS_OP_READ) {
             // read 和 decode
             readQueryFromClient(c->conn); // 读事件回调函数
     ...
 }
 // 主线程
 int handleClientsWithPendingWritesUsingThreads(void) {
     int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {//判断是否有必要开启IO多线程
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();//开启io多线程

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);//创建一个迭代器li,用于遍历任务队列clients_pending_write
    int item_id = 0;//默认是0,先分配给主线程去做(生产者也可能是消费者),如果设置成1,则先让io线程1去做
    //io_threads_list[0] 主线程
    //io_threads_list[1] io线程
    //io_threads_list[2] io线程   
    //io_threads_list[3] io线程   
    //io_threads_list[4] io线程
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);//取出一个任务
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {//表示该客户端的输出缓冲区超过了服务器允许范围,将在下一次循环进行一个关闭,也不返回任何信息给客户端,删除待读客户端
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        /* Since all replicas and replication backlog use global replication
         * buffer, to guarantee data accessing thread safe, we must put all
         * replicas client into io_threads_list[0] i.e. main thread handles
         * sending the output buffer of all replicas. */
        if (getClientType(c) == CLIENT_TYPE_SLAVE) {
            listAddNodeTail(io_threads_list[0],c);
            continue;
        }
        //负载均衡:将任务队列中的任务 添加 到不同的线程消费队列中去,每个线程就可以从当前线程的消费队列中取任务就行了
        //这样做的好处是,避免加锁。当前是在主线程中,进行分配任务
        //通过取余操作,将任务均分给不同io线程
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);//设置io线程启动条件,启动io线程
    }
     
     /* Also use the main thread to process a slice of clients. */
     // 让主线程去处理一部分任务
     listRewind(io_threads_list[0],&li);
     while((ln = listNext(&li))) {
         client *c = listNodeValue(ln);
         writeToClient(c,0); 
     }
     listEmpty(io_threads_list[0]);
     
      /* Wait for all the other threads to end their work. */
    while(1) {//剩下的任务io_threads_list[1],io_threads_list[2].....给io线程去做,等待io线程完成任务
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);//等待io线程结束,并返回处理的数量
        if (pending == 0) break;
    }

    io_threads_op = IO_THREADS_OP_IDLE;
     
     /* Run the list of clients again to install the write handler where
      * needed. */
     listRewind(server.clients_pending_write,&li);
     while((ln = listNext(&li))) {
         client *c = listNodeValue(ln);
         /* Install the write handler if there are pending writes in some
          * of the clients. */
         // 数据没写完,注册写事件回调
         if (clientHasPendingReplies(c) &&
                 connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
         {
             freeClientAsync(c);
         }
     }
     listEmpty(server.clients_pending_write);
     ...
 }

负载均衡:将任务队列中的任务 添加 到不同的线程消费队列中去,每个线程就可以从当前线程的消费队列中取任务就行了。这样做的好处是,避免加锁。当前是在主线程中,进行分配任务通过取余操作,将任务均分给不同的io线程。

线程调度

开启io线程startThreadedIO

每个io线程都有一把锁,如果主线程把锁还回去了,那么io线程就会启动,不再阻塞
并设置io线程标识为活跃状态io_threads_active=1

void startThreadedIO(void) {
    serverAssert(server.io_threads_active == 0);
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_unlock(&io_threads_mutex[j]);
    server.io_threads_active = 1;
}
关闭io线程stopThreadedIO

每个io线程都有一把锁,如果主线程拿了,那么io线程就会阻塞等待,也就是停止了IO线程
并设置io线程标识为非活跃状态io_threads_active=0

void stopThreadedIO(void) {
    /* We may have still clients with pending reads when this function
     * is called: handle them before stopping the threads. */
    handleClientsWithPendingReadsUsingThreads();
    serverAssert(server.io_threads_active == 1);
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_lock(&io_threads_mutex[j]);//
    server.io_threads_active = 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/105959.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

《ATTCK视角下的红蓝对抗实战指南》一本书构建完整攻防知识体系

一. 网络安全现状趋势分析 根据中国互联网络信息中心&#xff08;CNNIC&#xff09;发布的第51次《中国互联网络发展状况统计报告》&#xff0c;截至2022年12月&#xff0c;我国网民规模为10.67亿&#xff0c;互联网普及率达75.6%。我国有潜力建设全球规模最大、应用渗透最强的…

雪糕冰淇淋经营配送小程序商城效果如何

雪糕冰淇淋是很多年轻人喜欢的食品之一&#xff0c;市面上的雪糕品牌类型众多&#xff0c;销售模式主要为厂家批发、经销商零售等&#xff0c;由于雪糕冰淇淋的易化性&#xff0c;很多用户会选择就近购买&#xff0c;但制作技术升级和长途冷藏技术下&#xff0c;网购成为另一种…

建模仿真软件 Comsol Multiphysics mac中文版软件介绍

COMSOL Multiphysics mac是一款全球通用的基于高级数值方法和模拟物理场问题的通用软件&#xff0c;拥有、网格划分、研究和优化、求解器、可视化和后处理、仿真 App等相关功能&#xff0c;轻松实现各个环节的流畅进行&#xff0c;它能够解释耦合或多物理现象。 附加产品扩展了…

基于Java的电动车实名挂牌系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09; 代码参考数据库参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…

NLog详解

目录 1.简介 2.项目中使用NLog 2.1 快速使用NLog 2.2 通过配置文件使用NLog 3.NLog配置参数详解 3.1 全局配置 3.2 根元素 3.2.1 targets 3.2.1.1 layout 3.2.2 rules 3.2.3 extensions 3.2.4 include 3.2.5 variable 4.附录 1.简介 NLog是一个基于.NET平台编写…

minio + linux + docker + spring boot实现文件上传与下载

minio docker spring boot实现文件上传与下载 1.在linux上安装并启动docker2.在docker中拉取minio并启动3.Spring Boot 整合 minio4.测试 minio 文件上传、下载及图片预览等功能 1.在linux上安装并启动docker 检查linux内核&#xff0c;必须是3.10以上 uname ‐r安装docker…

基于aop 代理 Sentinel Nacos配置控制包装类实现原理

基于aop & 代理 & Sentinel & Nacos配置控制包装类实现原理 Hi&#xff0c;我是阿昌&#xff0c;今天记录下看sentinel源码结合业务实现的思路基于aop & 代理 & Sentinel & Nacos配置控制包装类实现原理&#xff1b;下面并不会手把手的记录方案的实现…

绿野仙踪不仅是童话,还是便宜又好用的产品测试法!

以 ChatGPT 为代表的大语言模型爆火后&#xff0c;推动了对话类人工智能产品的高速发展&#xff0c;我们已经看到了如智能助理、问答系统、自动写作等多种类型的个性化对话类 AI 服务。 AI 能力的提升让人们对智能 AI 产品的期望越来越高&#xff0c;相关产品的用户体验也因此变…

基于springboot实现校友社交平台管理系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现校友社交平台管理系统演示 摘要 校友社交系统提供给用户一个校友社交信息管理的网站&#xff0c;最新的校友社交信息让用户及时了解校友社交动向,完成校友社交的同时,还能通过论坛中心进行互动更方便。本系统采用了B/S体系的结构&#xff0c;使用了java技…

PPT文档图片设计素材资源下载站模板源码/织梦内核(带用户中心+VIP充值系统+安装教程)

源码简介&#xff1a; PPT文档图片设计素材资源下载站模板源码&#xff0c;作为织梦内核素材资源下载站源码&#xff0c;它自带了用户中心和VIP充值系统&#xff0c;也有安装教程。 织梦最新内核开发的模板&#xff0c;该模板属于素材下载、文档下载、图库下载、PPT下载、办公…

疯狂java 三-六章

第三章 数据类型和运算符 Java语言是强类型语言&#xff0c;意思是每个变量和每个表达式都有一个在编译时就确定的类型&#xff0c;所有的变量都必须显式声明类型 标识符就是类&#xff0c;变量、方法命名的符号 标识符不能包含空格 标识符只能包含美元符($)&#xff0c;不…

python自动化测试平台开发:自动化测试平台简介

一.测试平台简介 为什么需要测试平台 已有的开源测试平台不能满足需要&#xff0c;不要轻易造轮子 需要公司级别的定制 需要整合公司内部的多套平台 例子&#xff1a;DevOps平台、精准化测试平台、质量监控平台等等 常见的测试平台开发模式 大一统模式&#xff08;适合简单的…

基于springboot实现校友社交平台管理系统项目【项目源码+论文说明】

基于springboot实现校友社交平台管理系统演示 摘要 校友社交系统提供给用户一个校友社交信息管理的网站&#xff0c;最新的校友社交信息让用户及时了解校友社交动向,完成校友社交的同时,还能通过论坛中心进行互动更方便。本系统采用了B/S体系的结构&#xff0c;使用了java技…

STM32-程序占用内存大小计算

STM32中程序占用内存容量 Keil MDK下Code, RO-data,RW-data,ZI-data这几个段: Code存储程序代码。 RO-data存储const常量和指令。 RW-data存储初始化值不为0的全局变量。 ZI-data存储未初始化的全局变量或初始化值为0的全局变量。 占用的FlashCode RO Data RW Data; 运行消…

如何为你的地图数据设置地图样式?

地图样式设置是GIS系统中非常重要的功能模块&#xff0c;水经微图Web版本最近对符号样式功能模块进行了升级。 你可以通过以下网址直接打开访问&#xff1a; https://map.wemapgis.com 现在我们为大家分享一下水经微图Web版中&#xff0c;如何为你标注的地图数据设置地图样式…

微信小程序——后台交互

目录 后台准备 pom.xml 配置数据源 整合mtbatis 前后端交互 method1 method2 后台准备 pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org…

postgresSQL 数据库本地创建表空间读取本地备份SQL文件

使用pgAdmin4&#xff0c;你安装PG得文件夹****/16/paAdmin 4 /runtime/pgAdmin4.exe 第一步&#xff1a;找到Tablespaces 第二步&#xff1a;创建表空间名称 第三步&#xff1a;指向数据文件 第四步&#xff1a;找到Databases&#xff0c;创建表空间 第五步&#xff1a;输入数…

Jmeter性能测试:高并发分布式性能测试

​一、为什么要进行分布式性能测试 当进行高并发性能测试的时候&#xff0c;受限于Jmeter工具本身和电脑硬件的原因&#xff0c;无法满足我们对大并发性能测试的要求。 基于这种场景下&#xff0c;我们就需要采用分布式的方式来实现我们高并发的性能测试要求。 二、分布式性能…

Zeth:首个Type 0 zkEVM

1. 引言 一年前&#xff0c;V神博客The different types of ZK-EVMs中指出&#xff1a; 以太坊初始设计未围绕ZK友好性&#xff0c;因此&#xff0c;以太坊协议的很多部分都需要大量计算来做ZK-prove。Type 1 zkEVM致力于精准复制以太坊&#xff0c;因此它没有办法减轻这些低…

台积电2纳米黑科技 - 晶背供电 | 百能云芯

近期&#xff0c;台积电总裁魏哲家在一次法说会中透露了有关2纳米芯片的最新进展&#xff0c;并提到了“晶背供电”技术&#xff0c;这个领域的神秘黑科技正逐渐引起人们的兴趣。 在最近的台积电法说会上&#xff0c;总裁魏哲家不仅提到了2纳米制程的进展&#xff0c;还透露&am…