多路转接-epoll/Reactor(2)

epoll

上次说到了poll,它存在效率问题,因此出现了改进的poll----epoll。

目前epoll是公认的效率最高的多路转接的方案。

快速了解epoll接口

 epoll_create:

这个参数其实已经被废弃了。 这个值只要大于0就可以了。

 这是用来创建一个epoll模型的。

创建成功了就返回一个文件描述符。失败了返回-1

epoll_wait:

第一个参数就是epoll_create的返回值。后面俩参数是用户定义的缓冲区,用来返回就绪的文件描述符和事件。这里的timeout的单位是毫秒,跟poll那里一样。

这个接口的返回值是:已经就绪的fd的个数。

epoll_event结构体

 这里依旧是以位图的形式传递标记位。

epoll_ctl:

第一个参数依旧是epfd,op表示我们要进行哪些操作

 看名字也能大概看出来,分别是新增,修改和删除。

epoll原理

在Linux内核中,专门为epoll设计了一套独立的模型

 首先在内核中设计了一颗红黑树,它以fd为键值,其结点的值一般是一个结构体,里面包含了要关心的fd,以及这个fd关心的事件,还有一些链接字段等,然后还会有一个等待队列,如果有fd就绪了,就会将结点链入到这个就绪队列中,等待上层来处理,注意:这个结点既可以在红黑树中,也可以同时在就绪队列中。我们用epoll的时候,OS会将一个回调函数注册到底层,底层一旦就绪,就会自动调用这个回调函数,比如将数据向上交付,交给tcp的接受队列,查找红黑树中的fd,然后构建就绪结点插入到就绪队列中。

  这样,用户只需要从就绪队列中获取就绪结点就可以了。因此我们把这三个组合起来就是epoll模型。

这样回想起来之间了解的接口,比如epoll_create,它创建的就是一套epoll模型和注册底层的回调机制。但是有没有可能创建了多个epoll模型呢?那么多个模型OS也要管理起来,(比如可以记录红黑树的头结点指针和队列的头结点指针,就可以将这两个结构放在一起。)所以内核会创建一个struct file,把它也当作一个文件,struct file中也存在一个指针指向这个模型,OS再将这个文件添加到进程的文件描述符表里,通过fd来找到,这就解释了为什么epoll_create返回的也是一个文件描述符,因为epoll模型也被接入到了struct file中了。

如下

到这里,我们发现,select&&poll,他俩跟epoll完全不一样。 

所以现在对着图来看epoll的接口,就会发现简单的很多了,epoll_create就是为了创建epoll模型,epoll_ctl就是拿着epfd,进行的操作其实就是在修改红黑树。 

epoll_wait也是拿着epfd,看就绪队列是否有fd就绪。

另外这里的红黑树的性质,像不像我们在使用select/poll时用户维护的数组?性质是一样的,不过这次是由内核维护的。所以epoll是单独设计的一点,还体现在用户不需要再使用额外的数据结构来管理文件描述符和要关心的事件了。

epoll的优势:

1.检测就绪的时间复杂度是O(1)。因为只要看就绪队列是否为空就可以了。

获取就绪fd的时间复杂度是O(n)。

2.fd和event没有上限。

3.epoll的返回值 n,表示有几个fd就绪了,并且在内核中,会把这些结点一个一个弹出。就绪事件是连续的。这样就不需要用户再进行遍历排除非法的fd了,也就是需要浪费用户额外的时间了。

在进入代码前,可以先了解一下cmake

CMake

cmake是一个自动生成makefile的工具。

在CentOS下可以用

sudo yum install -y cmake

 来安装Cmake,如果是Ubuntu的话把yum改成apt就好了。

cmake --version

这个命令来查看cmake的版本。 

关于使用

首先要创建一个文件:CMakeLists.txt

因为我的CentOS只能安装到2.8.12.2这样的版本,所以在这里我选择最低的版本为2.8这样的,写好后,我们执行

cmake .

然后就会生成一大堆文件,包括makefile,

 vim一下这个Makefile

发现已经帮我们写好了

我们可以直接make

接下来就跟我们熟悉的一样了。

所以其实makefile已经有取代方案了,今后我们面对复杂的项目,使用CMake会简单很多。 

epoll的工作模式 

  epoll有两种工作模式:

LT(Level Triggered): 水平触发

一般epoll的默认工作模式就是LT模式。

当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分.
如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait
仍然会立刻返回并通知socket读事件就绪.
直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.
支持阻塞读写和非阻塞读写

也就是说,在LT模式下,如果上层不及时处理数据,LT会一直通知,直到数据全部被处理。 

ET(Edge Triggered):边缘触发

当epoll检测到socket上事件就绪时, 必须立刻处理.
如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候,
epoll_wait 不会再返回了.
也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会.
ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epoll.
只支持非阻塞的读写
也就是说只有因为新增而引起的变化时,ET才会通知。
一般普遍的认为ET的工作效率要比LT要高一些。
ET的效率高主要是它的 通知效率高。
因为ET模式下,只会通知一遍,因此在ET模式下,每次通知,必须把数据全部读走,一般是用循环读取,直到读取的字节数少于预期读取的字节数,或者读取出错才停止。另外因为fd是默认阻塞的,所以在ET模式下,要讲fd设置成非阻塞的,避免服务器因为读取被挂起。(这里有可能会成为面试题的)
ET的效率更高,不仅仅是通知效率更高(因为通知一次就要全部读走),它的IO效率也要更高一些,但是IO效率更高要如何理解呢?
我们以TCP协议为例,如果我们能一次将数据全部读走,那么我们就能给对方通知一个更大的窗口,从概率上让对方能一次发送更多的数据,因此IO效率也可能就更高。
但是话说回来,LT的效率一定比ET要低吗?如果在LT模式下,我们也是循环读取,在通知第一次的时候就将数据全部读走的话,不就和ET一样了吗?所以这个时候ET就不一定比LT高效了。

epollecho服务简单实现 

首先对epoll进行简单的封装

Epoller.hpp

#pragma once

#include "nocopy.hpp"
#include "Log.hpp"
#include <cerrno>
#include <cstring>
#include <sys/epoll.h>

class Epoller : nocopy
{
    static const int size = 128;
public:
    Epoller()
    {
        _epfd = epoll_create(size);
        if(_epfd == -1)
        {
            lg(Error,"epoll_create error: %s",strerror(errno));
        }
        else
        {
            lg(Info,"epoll_create success: %d",_epfd);
        }
    }

    int Epollwait(struct epoll_event revents[],int num)
    {
        int n = epoll_wait(_epfd,revents,num,-1); // -1表示阻塞等待
        return n;
    }

    int EpollUpdata(int oper,int sock,uint32_t event)
    {
        int n = 0;
        if(event == EPOLL_CTL_DEL) // 这里对于删除操作单独处理了
        {
            n = epoll_ctl(_epfd,oper,sock,nullptr);
            if(n != 0)
            {
                lg(Error,"epoll_ctl delete error");
            }
        }
        else 
        {
            // EPOLL_CTL_ADD || EPOLL_CTL_MOD
            struct epoll_event ev;
            ev.events = event;
            ev.data.fd = sock; // 为了方便后期它就绪时,我们知道是哪个fd。
            n = epoll_ctl(_epfd,oper,sock,&ev); // 本质上是对红黑树进行插入或修改结点
            if(n != 0)
            {
                lg(Error,"epoll_ctl error");
            }
        }

        return n;
    }

    ~Epoller()
    {
        if(_epfd >= 0)
            close(_epfd);
    }
private:
    int _epfd;
    int _timeout;
};

 EpollServer.hpp

#pragma once

#include <iostream>
#include <memory>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Log.hpp"
#include "Epoller.hpp"
#include "nocopy.hpp"

uint32_t EVENT_IN = (EPOLLIN);
uint32_t EVENT_OUT = (EPOLLOUT);

class EpollServer : nocopy
{
    const int num = 64;
public:
    EpollServer(uint16_t port = 8080)
        :_port(port)
        ,_listensocket_ptr(new Sock())
        ,_epoller_ptr(new Epoller())
        {}
    
    void Init()
    {
        _listensocket_ptr->Socket();
        _listensocket_ptr->Bind(_port);
        _listensocket_ptr->Listen();

        lg(Info,"create listen socket success :%d\n",_listensocket_ptr->Fd());
    }

    void Start()
    {
        // 首先将listensock添加到epoll中(还有其关心的事件),也就是添加到内核的红黑树中
        _epoller_ptr->EpollUpdata(EPOLL_CTL_ADD,_listensocket_ptr->Fd(),EVENT_IN);
        struct epoll_event revs[num]; // num是我们自己定的,不受接口限制
        while(true)
        {
            int n = _epoller_ptr->Epollwait(revs,num);
            if(n > 0)
            {
                // 有事件就绪了
                lg(Debug,"event happend, fd is : %d",revs[0].data.fd);
                Dispatcher(revs,n);
            }
            else if(n == 0)
            {
                lg(Info,"time out ....\n");
            }
            else 
            {
                lg(Error,"epoll_wait eroor\n");
            }
        }
    }

    void Dispatcher(struct epoll_event* revs,int num)
    {
        for(int i = 0; i < num; ++i)
        {
            uint32_t events = revs[i].events;
            int fd = revs[i].data.fd;
            if(events & EVENT_IN)
            {
                if(fd == _listensocket_ptr->Fd())
                {
                    //新链接
                    Accepter();
                }
                else 
                {
                    // 普通读取
                    Recver(fd);
                }
            }
            else // 这里只考虑读取事件了
            {}
        }
    }

    void Accepter()
    {
        std::string clientip;
        uint16_t clientport;
        int sock = _listensocket_ptr->Accept(&clientip,&clientport);
        if(sock < 0)
        {
            lg(Error,"Accept error, fd is : %d",sock);
        }

        // 获取到链接后,不能直接read,而是将fd添加进epoll
        _epoller_ptr->EpollUpdata(EPOLL_CTL_ADD,sock,EVENT_IN);
        lg(Info,"get a new link, clientip : %s, clientport: %d",clientip.c_str(),clientport);
    }
    
    void Recver(int fd)
    {
        // 这里依旧只是一个demo,并没有处理到位
        char buffer[1024];
        ssize_t n = read(fd,buffer,sizeof(buffer) - 1);
        if(n > 0)
        {
            buffer[n] = 0;
            std::cout << "get a massge: " << buffer << std::endl;
            //然后回显给对方
            std::string echo_str = "server echo $ ";
            echo_str += buffer;
            write(fd,echo_str.c_str(),echo_str.size());
        }
        else if(n == 0)
        {
            lg(Info,"client quit, me too,close fd is: %d",fd);
            // 这里关闭时有一个细节,要先从epoll中移除
            // 再close,避免先close导致fd非法再从epoll中移除而报错
            _epoller_ptr->EpollUpdata(EPOLL_CTL_DEL,fd,0);
            close(fd);
        }
        else 
        {
            lg(Warning,"recver error,close ..");
            _epoller_ptr->EpollUpdata(EPOLL_CTL_DEL,fd,0);
            close(fd);
        }
    }

    ~EpollServer()
    {
        _listensocket_ptr->Close();
    }

private:
    std::unique_ptr<Sock> _listensocket_ptr;
    std::unique_ptr<Epoller> _epoller_ptr;
    uint16_t _port;
};

在读取这里,以TCP为例,因为TCP是面向字节流的,我们并不能保证每次读取都是能读到完整的数据, 如果当前缓冲区里只有半个请求,我们读吗?读了之后放哪呢?在这里我们并没有处理,因此会在Ractor中解决。

我们可以通过单独设计一个类来进行防拷贝

nocopy.hpp

#pragma once

class nocopy
{
public: 
    nocopy(){}
    nocopy(const nocopy &) = delete;
    const nocopy& operator=(const nocopy &) = delete;
};

Reactor

 Reactor模式是一种事件驱动的并发编程模型,它解决了在高并发环境下处理大量客户端请求的问题。

  Reactor其实是一种半同步半异步的模型,同步是因为调用epoll,要自己等,异步体现在它可以进行回调处理。

代码:

首先我们先对设置非阻塞的函数进行一个封装

Comm.hpp

#pragma once

#include <unistd.h>
#include <fcntl.h>

void SetNonBlockOrDie(int sock)
{
    int fl = fcntl(sock,F_GETFL);
    if(fl < 0)
        exit(4);
    fcntl(sock,F_SETFL,fl | O_NONBLOCK);
}

TcpServer.hpp

#pragma once

#include <iostream>
#include <string>
#include <memory>
#include <cerrno>
#include <unordered_map>
#include <functional>
#include "Comm.hpp"
#include "Log.hpp"
#include "nocopy.hpp"
#include "Epoller.hpp"
#include "Socket.hpp"

class Connection;
class TcpServer;

uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);

const static int g_buffer_size = 128;

using func_t = std::function<void(std::weak_ptr<Connection>)>;
using except_func = std::function<void(std::weak_ptr<Connection>)>;

// 对每个链接进行管理
class Connection
{
public:
    Connection(int sock)
        : _sock(sock)
    {
    }

    void SetHandler(func_t recv_cb, func_t send_cb, except_func except_cb)
    {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }

    int SockFd()
    {
        return _sock;
    }

    void AppendInBuffer(const std::string &info)
    {
        _inbuffer += info;
    }

    void AppendOutBuffer(const std::string &info)
    {
        _outbuffer += info;
    }

    std::string &InBuffer()
    {
        return _inbuffer;
    }

    std::string &OutBuffer()
    {
        return _outbuffer;
    }

    void SetWeakPtr(std::weak_ptr<TcpServer> tcp_server_ptr)
    {
        _tcp_server_ptr = tcp_server_ptr;
    }

    ~Connection()
    {
    }

private:
    int _sock;
    std::string _inbuffer; // 输入缓冲区,但是无法接收二进制流
    std::string _outbuffer;

public:
    // 回调方法
    func_t _recv_cb;
    func_t _send_cb;
    func_t _except_cb;

    // 回指指针
    std::weak_ptr<TcpServer> _tcp_server_ptr;

    std::string _ip;
    uint16_t _port;
};

class TcpServer : public std::enable_shared_from_this<TcpServer>, public nocopy
{
    static const int num = 64;

public:
    TcpServer(uint16_t port, func_t OnMessage)
        : _port(port),
          _OnMessage(OnMessage),
          _quit(true),
          _epoller_ptr(new Epoller()),
          _listensock_ptr(new Sock())
    {
    }

    void Init()
    {
        _listensock_ptr->Socket();
        SetNonBlockOrDie(_listensock_ptr->Fd());
        _listensock_ptr->Bind(_port);
        _listensock_ptr->Listen();
        lg(Info, "create listen socket success: %d", _listensock_ptr->Fd());
        Addconnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
    }

    void Addconnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, except_func except_cb,
                       const std::string &ip = "0.0.0.0", uint16_t port = 0)
    {
        // 1.给sock建立一个Connection对象,并插入到map中
        std::shared_ptr<Connection> new_connection(new Connection(sock));
        new_connection->SetWeakPtr(shared_from_this()); // shared_from_this()是返回当前对象的shared_ptr,目的是设置回指
        new_connection->SetHandler(recv_cb, send_cb, except_cb);
        new_connection->_ip = ip;
        new_connection->_port = port;

        // 2.添加到unordered_map中
        _connections.insert(std::make_pair(sock, new_connection));

        // 3.添加对应的fd和事件到epoll中
        _epoller_ptr->EpollUpdata(EPOLL_CTL_ADD, sock, event);
    }

    // 链接管理器
    void Accepter(std::weak_ptr<Connection> conn)
    {
        auto connection = conn.lock();
        while (true)
        {
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);
            int sock = ::accept(connection->SockFd(), (struct sockaddr *)&peer, &len); // :: 表示使用系统原生的接口
            if (sock > 0)
            {
                uint16_t peerport = ntohs(peer.sin_port);
                char ipbuf[128];
                inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf));
                lg(Debug, "get a new client,get info[%s : %d],sockfd : %d", ipbuf, peerport, sock);

                SetNonBlockOrDie(sock);
                // listensock只要设置recv_cb,而其他sock读写异常都要关心设置
                Addconnection(sock, EVENT_IN,
                              std::bind(&TcpServer::Recver, this, std::placeholders::_1),
                              std::bind(&TcpServer::Sender, this, std::placeholders::_1),
                              std::bind(&TcpServer::Excepter, this, std::placeholders::_1),
                              ipbuf, peerport);
            }
            else
            {
                if (errno == EWOULDBLOCK)
                    break;
                else if (errno == EINTR)
                    continue;
                else
                    break; // 说明是真出错了
            }
        }
    }

    // 事件管理器
    // 这里不需要关心数据的格式,服务器只要IO数据即可,有没有读完,报文的细节交给上层
    void Recver(std::weak_ptr<Connection> conn)
    {
        if (conn.expired())
            return;
        auto connection = conn.lock();
        int sock = connection->SockFd();
        while (true) // 循环读取数据,因为是ET模式
        {
            char buffer[g_buffer_size];
            memset(buffer, 0, sizeof(buffer));
            int n = recv(sock, buffer, sizeof(buffer) - 1, 0); // 虽然这里flags是0,但是sock已经设置成非阻塞了
            if (n > 0)
            {
                connection->AppendInBuffer(buffer);
            }
            else if (n == 0)
            {
                lg(Info, "sockfd : %d,client info %s:%d,quit...", connection->SockFd(), connection->_ip.c_str(), connection->_port);
                connection->_except_cb(connection);
                return;
            }
            else
            {
                if (errno = EWOULDBLOCK)
                    break;
                if (errno == EINTR)
                    continue;
                else
                {
                    lg(Warning, "sockfd: %d,client info %s:%d recv error...", connection->SockFd(), connection->_ip.c_str(), connection->_port);
                    connection->_except_cb(connection);
                    return;
                }
            }
        }

        // 数据读上来了,但是不一定完整,交给上层处理
        _OnMessage(connection);
    }

    // 发送这里会麻烦一些
    void Sender(std::weak_ptr<Connection> conn)
    {
        if (conn.expired())
            return;
        auto connection = conn.lock();
        auto &outbuffer = connection->OutBuffer();
        while (true)
        {
            ssize_t n = send(connection->SockFd(), outbuffer.c_str(), sizeof(outbuffer), 0); // 同理,不会再阻塞了
            if (n > 0)
            {
                outbuffer.erase(0, n);
                if (outbuffer.empty())
                    break;
            }
            else if (n == 0)
            {
                return;
            }
            else
            {
                if (errno = EWOULDBLOCK)
                    break;
                if (errno == EINTR)
                    continue;
                else
                {
                    lg(Warning, "sockfd: %d,client info %s:%d send error...", connection->SockFd(), connection->_ip.c_str(), connection->_port);
                    connection->_except_cb(connection);
                    return;
                }
            }
        }

        // 如果发送缓冲区被打满了,可能还没发完就退出循环了
        // 此时才来判断是否要关心写事件就绪
        if(!outbuffer.empty())
        {
            // 开启对写事件的关心
            EnableEvent(connection->SockFd(),true,true);
        }
        else 
        {
            // 关闭对写事件的关心
            EnableEvent(connection->SockFd(),true,false);
        }
    }

    void Excepter(std::weak_ptr<Connection> conn)
    {
        if(conn.expired());
        auto connection = conn.lock();

        int fd = connection->SockFd();
        lg(Warning,"Excepter hander sockfd: %d,client info %s : %d, excepter hander",
            fd,connection->_ip.c_str(),connection->_port);

        // 1.移除对该fd的关心
        _epoller_ptr->EpollUpdata(EPOLL_CTL_DEL,fd,0);

        // 2.再关闭fd
        close(fd);
        lg(Debug,"close fd done %d  ...",fd);

        // 3.再从unordered_map中移除
        _connections.erase(fd);
    }

    void EnableEvent(int sock,bool readable,bool writeable)
    {
        uint32_t events = 0;
        events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
        _epoller_ptr->EpollUpdata(EPOLL_CTL_MOD,sock,events);
    }

    bool IsConnectionSafe(int fd)
    {
        auto iter = _connections.find(fd);
        if(iter == _connections.end())
            return false;
        else 
            return true;
    }

    void Dispatcher(int timeout)
    {
        int n = _epoller_ptr->Epollwait(revs,num,timeout);
        for(int i = 0; i < n; ++i)
        {
            uint32_t event = revs[i].events;
            int sock = revs[i].data.fd;

            //这里只处理读写事件
            if((event & EPOLLIN) && IsConnectionSafe(sock))
            {
                if(_connections[sock]->_recv_cb)
                    _connections[sock]->_recv_cb(_connections[sock]);
            }
            if((event & EPOLLOUT) && IsConnectionSafe(sock))
            {
                if(_connections[sock]->_send_cb)
                    _connections[sock]->_send_cb(_connections[sock]);
            }
        }
    }

    void Loop()
    {
        _quit = false;

        while(!_quit)
        {
            Dispatcher(-1); // 阻塞式

        }

        _quit = true;
    }

    void PrintConnection()
    {
        std::cout << "_connections fd list: ";
        for (auto &connection : _connections)
        {
            std::cout << connection.second->SockFd() << ", ";
            std::cout << "inbuffer: " << connection.second->InBuffer().c_str();
        }
        std::cout << std::endl;
    }

    ~TcpServer()
    {}

private:
    std::shared_ptr<Epoller> _epoller_ptr;                             // 内核
    std::shared_ptr<Sock> _listensock_ptr;                             // 监听socket
    std::unordered_map<int, std::shared_ptr<Connection>> _connections; // 哈希表管理链接
    struct epoll_event revs[num];
    uint16_t _port;
    bool _quit;
    // 处理上层信息(跟上层的应用场景有关)
    func_t _OnMessage;
};

其中在写,也就是Sender那里要说明:

select/poll/epoll因为发送缓冲区经常有空间,因此写事件是经常就绪的,如果我们对它进行EPOLLOUT设置关心,EPOLLOUT几乎每次都就绪,server经常返回,浪费CPU资源,

因此:对于读事件,我们设置常关心,对于写事件,按需关心 -> 也就是直接写入,如果写入完成,那么就结束了,不需要关心,如果因为一些原因(比如发送缓冲区满了)而没写完,也就是outbuffer里面还有数据,此时再设置关心,等下次写完了,再去掉关心。 

Main.cc

#include <iostream>
#include <functional>
#include <memory>
#include "Log.hpp"
#include "TcpServer.hpp"  // 只处理IO
// 这里可以把之前的网络计算器搬过来,用来当作上层应用


// for debug
void DefaultOnmessage(std::weak_ptr<Connection> conn)
{
    if(conn.expired()) return;
    auto connection_ptr = conn.lock();

    std::cout << "上层得到了数据: " << connection_ptr->InBuffer() << std::endl;

    // 在这里可以加入多线程(线程池),那么主线程只负责读取数据,在这里交给其他线程处理数据

    //std::string response_str = ?
    // if(response_str.empty()) return;
    // lg(Debug, "%s", response_str.c_str());
    // response_str 发送出去
    // connection_ptr->AppendOutBuffer(response_str);
    // 正确的理解发送?
    // connection_ptr->_send_cb(connection_ptr);
    
    auto tcpserver = connection_ptr->_tcp_server_ptr.lock();
    tcpserver->Sender(connection_ptr);
}

int main()
{
    std::shared_ptr<TcpServer> epoll_svr(new TcpServer(8080,DefaultOnmessage));
    epoll_svr->Init();
    epoll_svr->Loop();

    return 0;
}

总之我们的服务仅仅只是处理数据,将读取上来,而并没有做处理,我们也可以加入上层应用,比如之前的网络版本计算器,现在没有加入,导致读取上来的数据是堆积在一起的。

在大型服务器中,我们还可以加入多线程,主线程专门只做连接,和读取数据,其他线程处理数据。

还可以加入链接管理机制,比如用一个小堆,里面放入每个链接的链接时长,对于长时间没有相应的链接,我们可以判定超时而将它关闭,如果在超时前发送了消息,那么就重置它的超时时间。 

总之Reactor就是一个反应堆,它是一个半同步半异步模型,说是同步,体现在它也要进行等待,它参与了这个过程,异步体现在它可以将数据交给其他线程处理,然后只将结果返回。

就像打地鼠游戏一样,玩家监视着很多地洞,哪个洞冒出了地鼠,我们玩家就要去处理。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/519673.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

阿里云服务器资费:一年或1个月费用价格,2024年更新

阿里云服务器资费多少钱&#xff1f;一年或1个月费用价格&#xff1a;2核2G3M轻量服务器61元一年、ECS云服务器2核2G3M 99元一年&#xff0c;2核4G轻量165元一年&#xff0c;2核4G ECS 199元一年&#xff0c;阿里云服务器网aliyunfuwuqi.com整理如下&#xff1a; 1、ECS经济型e…

SpringBoot快速入门笔记(4)

文章目录 一、Vue框架1、前端环境准备2、简介3、快速开始4、事件绑定 二、Vue组件化开发1、NPM2、Vue Cli3、组件化开发4、SayHello自定义组件5、Movie自定义组件 一、Vue框架 1、前端环境准备 编码工具&#xff1a;VSCode 依赖管理&#xff1a;NPM 项目构建&#xff1a;VueCl…

Word·VBA文档合并

目录 1&#xff0c;复制法&#xff0c;不保留原文档格式2&#xff0c;复制法&#xff0c;保留原文档格式3&#xff0c;插入法&#xff0c;保留原文档格式 之前的文章《WordVBA实现邮件合并》虽然可以生成邮件合并文档结果&#xff0c;但是不能像《python实现word邮件合并》一样…

LeetCode-79. 单词搜索【数组 字符串 回溯 矩阵】

LeetCode-79. 单词搜索【数组 字符串 回溯 矩阵】 题目描述&#xff1a;解题思路一&#xff1a;回溯 回溯三部曲。这里比较关键的是给board做标记&#xff0c;防止之后搜索时重复访问。解题思路二&#xff1a;回溯算法 dfs,直接看代码,很容易理解。visited哈希&#xff0c;防止…

Android面试题之Listview篇

秋招在即&#xff0c;计蒙准备在国庆假期结束前整理一套Android初级面试题籍&#xff0c;希望对大家有所帮助 提示&#xff1a;以下是本篇文章正文内容 ListView 1.当 ListView 数据集改变后&#xff0c;如何更新 ListView 使用该 ListView 的 adapter 的 notifyDataSetChange…

记录一下前端定时器清除失效的问题

目录 一、问题引入 二、错误代码&#xff1a; 三、错误原因 四、修正的代码 附 vue提供的线上运行代码网址以便证实可用性 一、问题引入 按理说&#xff0c;打开定时器 xxx setInterval(()>{ },100)&#xff0c;之后只要 clearInterval(xxx) 就可以顺利关闭定时器…

【浅尝C++】继承机制=>虚基表/菱形虚继承/继承的概念、定义/基类与派生类对象赋值转换/派生类的默认成员函数等详解

&#x1f3e0;专栏介绍&#xff1a;浅尝C专栏是用于记录C语法基础、STL及内存剖析等。 &#x1f3af;每日格言&#xff1a;每日努力一点点&#xff0c;技术变化看得见。 文章目录 继承的概念及定义继承的概念继承的定义定义格式继承关系与访问限定符 基类和派生类对象赋值转换继…

如果你正在投简历,一定要试试这款AI工具!

今天给大家分享一款AI简历神器 - BitBitFly AI 简历助手&#xff0c;这个工具可以帮助大家快速、精准投简历&#xff0c;并且提供职位匹配度分析报告&#xff0c;提供专业优化简历建议提高简历和职位匹配度&#xff0c;轻松拿下offer。 如果你在找工作的时候遇到以下问题&…

主流验证码对比及选型

目录 一、什么是验证码二、验证码的作用三、验证码的类型四、验证码厂商1、 [腾讯云验证码](https://cloud.tencent.com/document/product/1110)1.1 验证方式1.2 费用 2、[阿里云验证码](https://www.aliyun.com/activity/security/wafcaptcha)2.1 验证方式2.2 费用 3、[顶象验…

计算机网络——35什么是网络安全

什么是网络安全 机密性&#xff1a;只有发送方和预订的接收方能否理解传输的报文内容 发送方加密报文接收方解密报文 认证&#xff1a;发送方和接收方需要确认对方的身份报文完整性&#xff1a;发送方、接收方需要确认的报文在传输的过程中或者事后没有被改变访问控制和服务的…

android11 SystemUI入門之KeyguardPatternView解析

view层级树为&#xff1a; 被包含在 keyguard_host_view.xml中 。 <?xml version"1.0" encoding"utf-8"?> <!-- This is the host view that generally contains two sub views: the widget viewand the security view. --> <com.andro…

麻了,别再为难软件测试员了

前言 有不少技术友在测试群里讨论&#xff0c;近期的面试越来越难了&#xff0c;要背的八股文越来越多了,考察得越来越细&#xff0c;越来越底层&#xff0c;明摆着就是想让我们徒手造航母嘛&#xff01;实在是太为难我们这些测试工程师了。 这不&#xff0c;为了帮大家节约时…

RAG知识分享

文章目录 1.为什么要做RAG1.1. 解决幻觉问题1.1.1 直接输入问题1.1.2. 问题 相关知识 2. 什么是RAG2.1. 基本概念2.2. 基本RAG方法2.2.1. 知识预处理2.2.2. 知识检索2.2.3. 答案生成 3. RAG 与 Long Context3.1. Long Context3.2. RAG 与Long Context3.3 RAG对比Long Context的…

(2024,超分辨率,膨胀卷积和低通滤波,SD)FouriScale:免训练高分辨率图像合成的频率视角

FouriScale: A Frequency Perspective on Training-Free High-Resolution Image Synthesis 公和众和号&#xff1a;EDPJ&#xff08;进 Q 交流群&#xff1a;922230617 或加 VX&#xff1a;CV_EDPJ 进 V 交流群&#xff09; 目录 0. 摘要 2. 相关工作 2.2 通过扩散模型进行…

【攻防世界】ics-05

php://filter 伪协议查看源码 preg_replace 函数漏洞 1.获取网页源代码。多点点界面&#xff0c;发现点云平台设备维护中心时&#xff0c;页面发生变化。 /?pageindex 输入什么显示什么&#xff0c;有回显。 用php://filter读取网页源代码 ?pagephp://filter/readconvert.…

PC版复古珠宝饰品网站模板 基于pbootcms的首饰类源码下载

PbootCMS复古珠宝饰品网站模板&#xff1a;PCWAP双端同步&#xff0c;数据即时共享&#xff0c;轻松打造专业饰品首饰平台 本模板基于PbootCMS内核开发&#xff0c;专为饰品首饰网站、复古珠宝饰品网站等企业量身定制。同时&#xff0c;其他行业同样适用&#xff0c;只需替换文…

【PostgreSQL】技术传承:使用Docker快速部署PostgreSQL数据库

前言 PostgreSQL的重要贡献者Simon Riggs因一起坠机事故不幸离世。Simon Riggs是英国著名的软件与服务领导者&#xff0c;也是PostgreSQL的主要开发者和贡献者。事故发生在英国当地时间3月26日13:41分&#xff0c;当时他驾驶的私人通用航空Cirrus SR22飞机在英国达克斯福德机场…

SpringBoot整合Netty整合WebSocket-带参认证

文章目录 一. VectorNettyApplication启动类配置二.WebSocketServerBoot初始化服务端Netty三. WebsocketServerChannelInitializer初始化服务端Netty读写处理器四.initParamHandler处理器-去参websocket识别五.MessageHandler核心业务处理类-采用工厂策略模式5.1 策略上下文 六…

Thinkphp5萤火商城B2C小程序源码

源码介绍 Thinkphp5萤火商城B2C小程序源码&#xff0c;是一款开源的电商系统&#xff0c;为中小企业提供最佳的新零售解决方案。采用稳定的MVC框架开发&#xff0c;执行效率、扩展性、稳定性值得信赖。 环境要求 Nginx/Apache/IIS PHP5.4 MySQL5.1 建议使用环境&#xff…

代码随想录第三十天 | 回溯问题P6 | ● 332● 51● 37● 总结

332.重新安排行程 给你一份航线列表 tickets &#xff0c;其中 tickets[i] [fromi, toi] 表示飞机出发和降落的机场地点。请你对该行程进行重新规划排序。 所有这些机票都属于一个从 JFK&#xff08;肯尼迪国际机场&#xff09;出发的先生&#xff0c;所以该行程必须从 JFK…