改造muduo,不依赖boost,用C++11重构

组件的实现

1. 序

1.1. 总述

        muduo库是基于多Reactor-多线程模型实现的TCP网络编程库,性能良好。如libev作者:“One loop per thread is usually a good model”,muduo库的作者陈硕在其《Linux多线程服务端编程》中也力荐这种“One loop per thread”的IO模型,使我们仅需要关注EventLoop的设计与实现,然后每个线程run一个loop即可。不过由于当时C++11并没有进入实用,在这一书中,作者没有谈及C++11,整个muduo库的实现,也依赖了boost库。

        而在项目设计与实现中,按照C++11标准对muduo库中核心部分进行重写,主要涉及了以下模块:Channel、Poller、EventLoop、Thread、EventLoopThread、EventLoopThreadPool、Socket、Acceptor、Buffer、TcpConnection、TcpServer,下面将进行分述。

1.2. One loop per thread

        在多Reactor-多线程模型中,运用one loop per thread的思想,由一个mainReactor负责accept连接,然后把该连接挂载到某个subReactor,多个连接分配到多个线程,充分利用CPU。

2. 核心部分

        在手写muduo库项目之中,存在三个核心部分,分别是Channel类、Poller类和EventLoop类,这三大类的组合,实现了reactor用以监听fd并同时处理相应的回调函数。其中Poller和Channel之间通过EventLoop相互通信。

2.1. Channel
  1. fd_:封装sockfd,两种Channel:listenfd-acceptorChannel,connfd-ConnectionChannel;
  2. events_:fd监听的事件类型;
  3. revents_:Poller返回的具体监听到的事件。
  4. callback:上层设置的各种类型事件回调;
  5. tie_:weak_ptr<void>,在事件监听器返回监听结果后,就会调用Channel中的handleEvent()函数。首先会把tie_这个weak_ptr提升为shared_ptr,它会指向当前的TcpConnection对象,即使外面调用了删除析构了其他所有指向该TcpConnection的智能指针,只要没有handleEvent()完,这个TcpConnection都不会被析构释放堆内存。
2.2. Poller/EpollPoller

        muduo库提供poll和epoll两种IO多路复用方法来实现事件监听,重写时,通过基类Poller和派生类EpollPoller,支持了Epoll。Poller主要扮演Reactor模型中Demultiplex事件分发器(也可以说是事件监听器)的角色。

  1. epollfd_:记录epoll_create返回的句柄
  2. channels_:用来记录注册在其上的Channel的unordered_map。
2.3. EventLoop

        EventLoop扮演Reactor模型中Reactor的角色,是对epoll的封装。EventLoop在epoll_create,注册各个Channel之后,处于epoll_wait阻塞状态,要想唤醒当前的EventLoop去执行新的连接,通过往wakefd上写入一个字符,唤醒当前的EventLoop。(而并非生产者-消费者模型)。

  1. 包含了所有的Channel
  2. 每一个loop都有一个wakeupFd
2.4. 具体方法的部分代码实现
  • EventLoop::loop()——开启事件循环
// 开启事件循环
void EventLoop::loop()
{
    // ...   
    while(!quit_)
    {
        activeChannels_.clear();
        // 监听两种fd: client的fd、wakeupfd
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); // epoll_wait发生的位置
        for(Channel *channel : activeChannels_)
        {   
            // Poller监听哪些Channel发生事件了,然后上报给EventLoop,EventLoop通知处理相应的事件
            // handleEevent根据具体事件类型调用相应类型的回调函数
            channel->handleEvent(pollReturnTime_);
        }
        // ...
    }
    LOG_INFO("EventLoop %p stop looping. \n", this);
    // ...
}
  • EpollPoller::poll()——开启Poller事件监听,调用了::epoll_wait()
// 通过epoll_wait监听哪些Channel/fd发生事件
Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels)
{
    // ...
    int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);
    int savedErrno = errno;
    Timestamp now(Timestamp::now());

    if(numEvents > 0)   // 有事件发生
    {
        LOG_DEBUG("%d events happened \n", numEvents);
        fillActiveChannels(numEvents, activeChannels);
        if(numEvents == events_.size())
        {
            events_.resize(events_.size() * 2);
        }
    }
    else if(numEvents == 0) // 超时
    {
        LOG_DEBUG("%s timeout! \n", __FUNCTION__);
    }
    else
    {
        if(savedErrno != EINTR)
        {
            errno = savedErrno;
            LOG_ERROR("EPollPoller::poll() err! \n");
        }
    }
    return now;
}
  • 唤醒机制——通过向eventfd写一个数据

        在Linux操作系统上,可以通过三种方式唤醒fd:1. 通过管道pipe向绑定到epollfd的一端写一个字节;2. 使用Linux内核2.6版本之后的eventfd;3. 使用socketpair。而在本项目中,采用的是创建eventfd然后在需要唤醒的时候写数据(8个字节)来唤醒subLoop。

// 创建wakeupfd,用来notify唤醒subReactor处理新的Channel
//O_CLOEXEC避免文件描述符被继承到子进程中
int createEventFd()
{
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    // ...
    return evtfd;
}

// 用于唤醒loop所在线程: 向wakefd写一个数据
//wakeupFd_在构造函数中通过createsEventFd()函数初始化
void EventLoop::wakeup()
{
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_, &one, sizeof(one));
    if(n != sizeof(one))
    {
        LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
    }
}

3. 其他部分

3.1. EventLoopThreadPool

        EventLoopThreadPool类,可以理解为subLoop池,主要是对EventLoopThread的封装,而EventLoopThread又是对EventLoop(Reactor)和Thread(记录线程的详细信息)的封装。

        其中,初始化时,会提供一个baseLoop(mainLoop)来进行基本的事件循环。通过设置numthreads_来创建对应数量的subReactor,每当创建一个线程,就会生成一个EventLoop。

        在工作方式上,通过getNextLoop()方法,实现对subReactor的轮询。

// ...
class EventLoopThreadPool : noncopyable
{
public:
    using ThreadInitCallback = std::function<void(EventLoop *)>;

    EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg);
    ~EventLoopThreadPool();

    void setThreadNum(int numThreads) { numThreads_ = numThreads; }
    void start(const ThreadInitCallback &cb = ThreadInitCallback());

    // 如果工作在多线程中,baseLoop默认以轮询的方式分配Channel给subLoop
    EventLoop *getNextLoop();

    std::vector<EventLoop *> getAllLoops();

    bool started() const { return started_; }
    const std::string &name() const { return name_;}

private:
    EventLoop *baseLoop_;   //EventLoop loop 用户线程
    std::string name_;
    bool started_;
    int numThreads_;
    int next_;
    std::vector<std::unique_ptr<EventLoopThread>> threads_;
    std::vector<EventLoop *> loops_;
};
3.2. Acceptor

        Acceptor类,封装的是服务器监听socketfd和相关处理函数。接收新用户连接后,通过轮询来选择subReactor并给它分发连接。

3.3. TcpConnection

        每个连接进来的客户端,对应一个TcpConnection,封装了一个connfd,一个Channel,各种回调函数(Callback)和读写缓冲区(Buffer)。

        state_:记录当前连接状态,一共有四种:kConnected、kConnecting、kDisconnecting、kDisconnected。

整个TcpConnection的工作流程

  1. TcpServer通过Acceptor监听用户新连接,用accept拿到connfd
  2. TcpConnection设置回调给Channel,Channel注册到Poller
  3. Poller监听到事件就通知调用Channel的回调
3.4. Buffer

        Buffer缓冲区通过vector来实现,空间不足时,通过vector类的成员函数resize()即可实现扩容。在空间的设计上,主要分为如下图三个区域(和Netty中Buffer的设计类似?)

3.5. TcpServer

        在TcpServer类中,有一个Acceptor,一个EventLoopThreadPool,一些回调函数,一个记录所有连接的unordered_map<string, TcpConnectionPtr>。

// 对外服务器编程需要使用的类
class TcpServer : noncopyable
{
public:
    using ThreadInitCallback = std::function<void(EventLoop *)>;

    enum Option
    {
        kNoReusePort,
        kReusePort,
    };

    TcpServer(EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg,Option option = kNoReusePort);
    ~TcpServer();

    void setThreadInitCallback(const ThreadInitCallback &cb) { threadInitCallback_ = cb; }
    void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }
    void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }
    void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }

    // 设置subLoop个数
    void setThreadNum(int numThreads);

    // 开启服务器监听
    void start();
private:
    void newConnection(int sockfd, const InetAddress &peerAddr);
    void removeConnection(const TcpConnectionPtr &conn);
    void removeConnectionInLoop(const TcpConnectionPtr &conn);

    using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;

    EventLoop *loop_;   // baseLoop

    const std::string ipPort_;
    const std::string name_;

    std::unique_ptr<Acceptor> acceptor_;    // 运行在mainLoop,监听新连接事件

    std::shared_ptr<EventLoopThreadPool> threadPool_; // one loop per thread
    
    ConnectionCallback connectionCallback_;         // 有新连接时的回调
    MessageCallback messageCallback_;               // 有读写消息时的回调
    WriteCompleteCallback writeCompleteCallback_;   // 消息发送完成后的回调

    ThreadInitCallback threadInitCallback_;         // loop线程初始化的回调

    std::atomic_int started_; 

    int nextConnId_;
    ConnectionMap connections_; // 保存连接的HashMap
};

        start():启动EventLoopThreadPool,调用acceptor_的listen()方法,监听客户端的连接套接字。

        newConnection():该方法被注册到了acceptor_中,当acceptor_监听到新用户连接时会执行该回调,轮询选择一个subReactor;根据连接成功的sockfd,创建一个连接对象并加入到TcpServer的存储连接信息的connections_中;给这个连接设置回调;然后在mainLoop执行connectEstablished();

        上面提到的关闭连接的回调函数,真实的调用过程:TcpConnection::setCloseCallBack() --> TcpServer::removeConnection() --> TcpServer::removeConnectionInLoop() --> TcpConnection::connectionDestroyed()

4. 工作流程

4.1. 安装

下载到文件夹后,sudo ./autobuild.sh,运行编译和安装脚本,相关头文件也会添加到系统路径。

4.2. 测试代码

下面的内容是一个回射服务器,可以编译运行后,使用telnet、netcat等工具进行简单测试。

#include <ee_muduo_cpp11/TcpServer.h>
#include <ee_muduo_cpp11/Logger.h>

#include <string>
#include <functional>

class EchoServer
{
public:
    EchoServer(EventLoop *loop, const InetAddress &addr, const std::string &name)
        : server_(loop, addr, name)
        , loop_(loop)
    {
        // 注册回调函数
        server_.setConnectionCallback(
            std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
        );

        server_.setMessageCallback(
            std::bind(&EchoServer::onMessage, this,
                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
        );

        // 设置合适的loop线程数量 loopthread
        server_.setThreadNum(3);
    }
    void start()
    {
        server_.start();
    }
private:
    // 连接建立或者断开的回调
    void onConnection(const TcpConnectionPtr &conn)
    {
        if (conn->connected())
        {
            LOG_INFO("Connection UP : %s", conn->peerAddress().toIpPort().c_str());
        }
        else
        {
            LOG_INFO("Connection DOWN : %s", conn->peerAddress().toIpPort().c_str());
        }
    }

    // 可读写事件回调
    void onMessage(const TcpConnectionPtr &conn,
                Buffer *buf,
                Timestamp time)
    {
        std::string msg = buf->retrieveAllAsString();
        conn->send(msg);
        conn->shutdown(); // 写端   EPOLLHUP =》 closeCallback_
    }

    EventLoop *loop_;
    TcpServer server_;
};

int main()
{
    EventLoop loop;
    InetAddress addr(8000);
    EchoServer server(&loop, addr, "EchoServer"); // Acceptor non-blocking listenfd  create bind 
    server.start(); // listen  loopthread  listenfd => acceptChannel => mainLoop =>
    loop.loop(); // 启动mainLoop的底层Poller

    return 0;
}
4.3. 工作流程

对于整个库:

  1. 用户创建mainLoop,主线程作为mainReactor,主要用来接收/断开用户连接。
  2. 给TcpServer设置连接和读写事件回调,TcpServer再给TcpConnection设置回调(用户设置的),TcpConnection再给Channel设置回调(先执行这个,再执行用户回调)。
  3. TcpServer根据用户设置传入的线程数,去ThreadPool中开启几个线程。如果没有设置,mainLoop还要负责读写事件的任务。
  4. 当有新连接进来,创建一个TcpConnection,然后由Acceptor轮询唤醒subLoop来提供服务。
  5. 每个subLoop在服务时,其所包含的Poller没有事件就会处于循环阻塞状态,发生事件之后,根据类型再去执行相应的回调操作。

5. 参考资料

  • 《高性能服务结构设计思想——one-thread-one-loop》,张小方,CppGuide,05. 高性能服务结构设计思想——one-thread-one-loop
  • 《Linux多线程服务器编程:使用muduo C++网络库》,陈硕
  • 《Muduo网络库源代码分析:EventLoopThread和EventLoopThreadPool的封装》,blfbuaa,https://www.cnblogs.com/blfbuaa/p/7263398.html
  • 《图解操作系统》,小林coding,https://mp.weixin.qq.com/mp/appmsgalbum?action=getalbum&__biz=MzUxODAzNDg4NQ==&scene=1&album_id=1408057986861416450&count=3#wechat_redirect

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/429308.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

linux中对信号的认识

信号的概念与相关知识认识 信号是向目标进程发送消息通知的的一种机制。 信号可以以异步的方式发送给进程&#xff0c;也就是说&#xff0c;进程无需主动等待&#xff0c;而是在任何时间都可以接收到信号。 信号的种类 用kill-l命令查看系统定义的信号列表&#xff1a; 前台…

初识Hive

官网地址为&#xff1a; Design - Apache Hive - Apache Software Foundation 一、架构 先来看下官网给的图&#xff1a; 图上显示了Hive的主要组件及其与Hadoop的交互。Hive的主要组件有&#xff1a; UI&#xff1a; 用户向系统提交查询和其他操作的用户界面。截至2011年&…

Linux - 安装 maven(详细教程)

目录 一、下载二、安装三、配置环境变量四、镜像资源配置 一、下载 官网&#xff1a;https://maven.apache.org/download.cgi 打开 maven 的官网下载页面&#xff0c;点击 bin.tar.gz 文件链接 即可下载最新版本的 maven 如果想要下载旧版本的 meven&#xff0c;则点击 Maven…

【短时交通流量预测】基于GRNN神经网络

课题名称&#xff1a;基于GRNN神经网络的短时交通流量预测 版本时间&#xff1a;2023-04-27 代码获取方式&#xff1a;QQ&#xff1a;491052175 或者 私聊博主获取 模型简介&#xff1a; 城市交通路网中交通路段上某时刻的交通流量与本路段前几个时段的交通流量有关&#x…

Python类 __init__() 是一个特殊的方法

设计者&#xff1a;ISDF工软未来 版本&#xff1a;v1.0 日期&#xff1a;2024/3/5__init__() 是一个特殊的方法 类似c# C的构造函数 两头都包含两个下划线&#xff0c;这是约定&#xff0c;用于与普通的函数保持区分class User:用户类def __init__(self,first_name,last_name):…

软件应用,财务收支系统试用版操作教程,佳易王记录账单的软件系统

软件应用&#xff0c;财务收支系统试用版操作教程&#xff0c;佳易王记录账单的软件系统 一、前言 以下软件操作教程以 佳易王账单记账统计管理系统V17.0为例说明 软件文件下载可以点击最下方官网卡片——软件下载——试用版软件下载 如上图&#xff0c;统计报表包含 收支汇…

JavaScript基础2之运算符、函数

JavaScript基础 运算符一元操作符递增/递减一元加和减 布尔操作符逻辑非逻辑与逻辑或 乘性操作符乘法操作符除法操作符取模操作符 加性操作符加法操作符减法操作符 比较操作符相等操作符关系操作符 函数函数声明函数表达式箭头函数函数的实参和形参arguments 默认参数参数的拓展…

QUIC来了!

什么是QUIC QUIC&#xff0c;快速UDP网络连接(Quick UDP Internet Connection)的简称&#xff0c;即RFC文档描述它为一个面向连接的安全通用传输协议。其基于UDP协议实现了可靠传输及拥塞控制&#xff0c;简单来说&#xff0c;QUIC TCP TLS。 为什么有了QUIC HTTP2.0为了为了…

如何处理微服务之间的通信和数据一致性?

✨✨祝屏幕前的兄弟姐妹们每天都有好运相伴左右&#xff0c;一定要天天开心哦&#xff01;✨✨ &#x1f388;&#x1f388;作者主页&#xff1a; 喔的嘛呀&#x1f388;&#x1f388; 目录 引言 一、微服务通信 1、同步通信&#xff1a;HTTP 1.1.同步通信示例代码&#xf…

第四十九回 吴学究双掌连环计 宋公明三打祝家庄-Python与HTTP服务交互

吴用请戴宗从梁山请来铁面孔目裴宣、圣手书生萧让、通臂猿侯健、玉臂匠金大坚来帮忙。又告诫扈家庄的扈成&#xff0c;打起来不要去帮祝家庄。 孙立把旗号改成“登州兵马提辖孙立”&#xff0c;来祝家庄找峦廷玉&#xff0c;被热情接待。 第三天&#xff0c;宋江派小李广花荣…

Vue 路由功能

安装路由 npm install vue-router4创建路由器并导出 //导入vue-router import { createRouter, createWebHistory } from vue-router //导入组件 import LoginVue from /views/Login.vue import LayoutVue from /views/Layout.vue//定义路由关系 const routes [{ path: /log…

安卓玩机工具推荐----ADB状态读写分区 备份分区 恢复分区 查看分区号 工具操作解析

在以往玩机过程中。很多机型备份分区 备份固件需要借助adb手动指令或者第三方手机软件或者特定的一些工具来操作。有些朋友需要查看当前机型分区名称和对应的分区号。此类操作我前面的博文专门说过对应的adb指令。但有些界面化的工具比较方便简单。 相关分区同类博文&#xff…

WPF中如何设置自定义控件(二)

前一篇文章中简要讲解了圆角按钮、圆形按钮的使用,以及在windows.resource和app.resource中设置圆角或圆形按钮的样式。 这篇主要讲解Polygon(多边形)、Ellipse(椭圆)、Path(路径)这三个内容。 Polygon 我们先看一下的源码: namespace System.Windows.Shapes { pu…

性能问题分析排查思路之机器(3)

本文是性能问题分析排查思路的展开内容之一&#xff0c;第2篇&#xff0c;主要分为日志1期&#xff0c;机器4期、环境2期共7篇系列文章&#xff0c;本期是第三篇&#xff0c;讲机器&#xff08;硬件&#xff09;的网络方面的排查方法和最佳实践。 主要内容如图所示&#xff1a…

【短时交通流量预测】基于单层BP神经网络

课题名称&#xff1a;基于单层BP神经网络的短时交通流量预测 版本时间&#xff1a;2023-04-27 代码获取方式&#xff1a;QQ&#xff1a;491052175 或者 私聊博主获取 模型简介&#xff1a; 城市交通路网中交通路段上某时刻的交通流量与本路段前几个时段的交通流量有关&…

【计算机学习】-- 电脑的组装和外设

系列文章目录 文章目录 系列文章目录前言一、电脑的组装1.CPU2.主板3.显卡4.硬盘5.内存6.散热器7.电源8.机箱 二、电脑外设选用1.显示器2.鼠标3.键盘4.音响 总结 前言 一、电脑的组装 1.CPU 返回目录 认识CPU CPU&#xff0c;即中央处理器&#xff0c;负责电脑资源的调度安…

器件选型【电容,电阻篇】

电阻篇&#xff1a; 一句话先做总结&#xff1a;电阻的选型主要考虑额定电压和过流能力&#xff08;基于封装大小&#xff09; 电阻封装规格越大功率越大。但其功率也与温度有关&#xff0c;如果温度超过 70℃&#xff0c;其额定功率是会下降的。并且&#xff0c;R01005 和 R0…

#QT(串口助手-实现)

1.IDE&#xff1a;QTCreator 2.实验 3.记录 &#xff08;1&#xff09;在widget.h中加入必要文件&#xff0c;并且定义一个类指针 &#xff08;2&#xff09;如果有类的成员不知道怎么写&#xff0c;可以通过以下途径搜索 &#xff08;2&#xff09;设置串口数据 void Widget…

AI大全-通往AGI之路

背景 自从AI大模型出来之后&#xff0c;就有很多做资源整理的社区&#xff0c;整理学习资料&#xff0c;整理各种AI工具大全&#xff0c;我也整理过一段时间的最新AI的资讯&#xff0c;也曾尝试去弄一个AI的入口类的东西。但是最近看到一个在飞书上的分享&#xff0c;我觉得他…

IDEA自带 .http 请求工具文档

基础语法 请求格式 基础格式 Method Request-URI HTTP-Version Header-field: Header-valueRequest-Body其中&#xff0c;GET 请求可以省略 Method 不写&#xff1b;HTTP-Version 可以省略不写&#xff0c;默认使用 1.1 版本。 示例&#xff1a; GET https://www.baidu.co…