【Linux后端服务器开发】Reactor模式实现网络计算器

目录

一、Reactor模式概述

二、日志模块:Log.hpp

三、TCP连接模块:Sock.hpp

四、非阻塞通信模块:Util.hpp

五、多路复用I/O模块:Epoller.hpp

六、协议定制模块:Protocol.hpp

七、服务器模块:Server.hpp    server.cc

八、客户端模块:Client.hpp    client.cc


前情提示:在学习Reactor模式之前,需要熟悉socket套接字及TCP网络通信,需要熟悉 select / poll / epoll 三种多路转接IO,需要理解Linux文件系统的文件描述符与基础IO,需要理解服务器server与客户端client的设计。

【Linux后端服务器开发】基础IO与文件系统_命运on-9的博客-CSDN博客

【Linux后端服务器开发】TCP通信设计_命运on-9的博客-CSDN博客

【Linux后端服务器开发】协议定制(序列化与反序列化)_命运on-9的博客-CSDN博客

【Linux后端服务器开发】select多路转接IO服务器_命运on-9的博客-CSDN博客

【Linux后端服务器开发】poll/epoll多路转接IO服务器_命运on-9的博客-CSDN博客

一、Reactor模式概述

Reactor是什么?reactor的英文翻译是【反应堆】,reactor设计模式是一种事件处理模式。

如何让一个server服务器连接多个client客户端并处理业务?我们可以采用多进程 / 多线程的方法,但是无论是进程还是线程,对系统资源的消耗都是较大的,于是我们可以采用 select / poll / epoll 的多路复用IO方法,而在三种不同的多路复用方法中,性能最优的是epoll。

epoll的LT模式和ET模式该如何选择?LT和ET是不同的事件通知策略,LT水平触发是阻塞式通知,ET边缘触发是非阻塞式通知,ET模式会倒逼程序员一次性将就绪的数据读写完毕,这样也就使得一般情况下ET模式的效率更高,所以在Reactor模式的设计中,采用ET模式。

Reactor模式也叫Dispatcher(分派器)模式,它的工作原理是什么呢? I/O多路复用监听事件,当有事件就绪时,根据事件类型分配给某个进程/线程。

Reactor模式主要由Reactor分派器和资源处理这两个部分组成:

  1. Reactor分派器负责监听和分发事件,事件类型包含连接、读写、异常
  2. 资源处理负责业务处理,通常流程是 read -> 业务逻辑 -> send

Reactor模式是灵活多变的,根据不同的业务场景有不同的设计,可以是【单Reactor】也可以是【多Reactor】,可以是【单进程/线程】 也可以是【多进程/线程】,不过此文中的Reactor网络计算器设计采用的是【单Reactor 单进程/线程】模式。

【单Reactor 单进程/线程】模式设计

初始化TcpServer,创建listensock,并将listensock添加进epoller模型中进行监听,listensock会生成第一个Connection对象(注册_Accepter接口处理读取连接任务),之后的TcpClient的连接请求都是由这个绑定了_Accepter接口的listensock进行监听。

epoller模型Wait()等待TcpClient的请求,如果是连接请求,则TcpServer会派发任务给listensock对象,让其调用Sock对象的Accept接口创建新的套接字sock进行IO,sock会创建一个新的Connection对象(注册_Reader、_Sender、_Excepter接口处理读/写/异常任务)。

Connection进行业务处理的时候,根据TCP通信协议的通信流程是 read -> 业务逻辑 -> send,若是在通信中遇到异常,则会调用_Excepter接口关闭连接。

在TCP通信设计中,我们需要设计通信数据的序列化和反序列化,这便是在Connection对象读取到数据之后的业务处理逻辑,通过序列化和反序列化将数据发送给TcpClient,TcpClient收到服务器发送数据后再通过序列化和反序列化拿到想要的数据。

在服务器设计的时候,日志功能是可以省略的,但是加上日志功能的服务器功能更完整并且方便调试和服务器维护。

二、日志模块:Log.hpp

日志模块里面将日志分为(DEBUG、NORMAL、WARNING、ERROR、FATAL)五个记录等级,并且定义了不同的错误类型,日志记录需要记录进程的IP和端口号以及记录时间。

由于Linux系统的gdb调试是很复杂的,通过在代码中添加DEBUG的打印日志更方便调试。

#pragma once

#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <cstdarg>
#include <unistd.h>

#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4

#define NUM 1024

enum
{
    USAGE_ERR = 1,
    SOCKET_ERR,
    BIND_ERR,
    LISTEN_ERR,
    EPOLL_CREATE_ERR
};

const char* To_Level_Str(int level)
{
    switch (level)
    {
    case DEBUG:
        return "DEBUG";
    case NORMAL:
        return "NORMAL";
    case WARNING:
        return "WARNING";
    case ERROR:
        return "ERROR";
    case FATAL:
        return "FATAL";
    default:
        return nullptr;
    }
}

std::string To_Time_Str(long int t)
{
    // 将时间戳转化为tm结构体
    struct tm* cur;
    cur = gmtime(&t);
    cur->tm_hour = (cur->tm_hour + 8) % 24; // 东八区

    char tmp[NUM];
    std::string my_format = "%Y-%m-%d %H:%M:%S";
    strftime(tmp, sizeof(tmp), my_format.c_str(), cur);
    
    std::string cur_time = tmp;
    return cur_time;
}

void Log_Message(int level, const char *format, ...)
{
    char logprefix[NUM];
    std::string cur_time = To_Time_Str((long int)time(nullptr));
    snprintf(logprefix, sizeof(logprefix), "[%s][%s][pid: %d]",
        To_Level_Str(level), cur_time.c_str(), getpid());

    char logcontent[NUM];
    va_list arg;
    va_start(arg, format);
    vsnprintf(logcontent, sizeof(logcontent), format, arg);

    std::cout << logprefix << logcontent << std::endl;
}

三、TCP连接模块:Sock.hpp

Sock对象里面将TCP网络连接的底层接口进行了封装,更方便其他模块对于TCP连接的调用。

Sock对象不再对Accept()的连接失败做处理,将处理权交给了TcpServer。

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "Log.hpp"

const static int g_defaultfd = -1;
const static int g_backlog = 32;

class Sock
{
public:
    Sock()
        : _listensock(g_defaultfd)
    {}

    Sock(int listensock)
        : _listensock(listensock)
    {}

    int Fd()
    {
        return _listensock;
    }

    void Socket()
    {
        // 1. 创建socket文件套接字对象
        _listensock = socket(AF_INET, SOCK_STREAM, 0);
        if (_listensock < 0)
        {
            Log_Message(FATAL, "create socket error");
            exit(SOCKET_ERR);
        }
        Log_Message(NORMAL, "create socket success: %d", _listensock);

        int opt = 1;
        setsockopt(_listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
    }

    void Bind(int port)
    {
        // 2. bind绑定自己的网络信息
        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY;
        if (bind(_listensock, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            Log_Message(FATAL, "bind socket error");
            exit(BIND_ERR);
        }
        Log_Message(NORMAL, "bind socket success");
    }

    void Listen()
    {
        // 3. 设置socket 为监听状态
        if (listen(_listensock, g_backlog) < 0) // 第二个参数backlog后面在填这个坑
        {
            Log_Message(FATAL, "listen socket error");
            exit(LISTEN_ERR);
        }
        Log_Message(NORMAL, "listen socket success");
    }

    int Accept(std::string *clientip, uint16_t *clientport, int* err)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int sock = accept(_listensock, (struct sockaddr *)&peer, &len);
        *err = errno;
        if (sock >= 0)
        {
            *clientip = inet_ntoa(peer.sin_addr);
            *clientport = ntohs(peer.sin_port);
        }
            
        return sock;
    }

    void Close()
    {
        if (_listensock != g_defaultfd)
            close(_listensock);
    }

    ~Sock()
    {
        this->Close();
    }

private:
    int _listensock;
};

四、非阻塞通信模块:Util.hpp

Util.hpp设置静态成员函数Set_Noblock(),将ET通知策略的套接字sock设置为非阻塞模式。

本次网络计算器的设计是ET模式,故所有的连接在新建连接时都需要设置为非阻塞模式。

#pragma once

#include <iostream>
#include <fcntl.h>
#include <unistd.h>

class Util
{
public:
    static bool Set_Nonblock(int fd)
    {
        int fl = fcntl(fd, F_GETFL);
        if (fl < 0)
            return false;
        fcntl(fd, F_SETFL, fl | O_NONBLOCK);
        return true;
    }
};

五、多路复用I/O模块:Epoller.hpp

epoll模型是一个操作系统层面的模型,我们将控制epoll的系统接口封装在Epoller对象中方便TcpServer的调用。

无论是TcpClient的连接请求还是业务请求,从epoll的角度来看,都是一个对文件描述符的读事件,epoll只需要做事件通知即可。

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <sys/epoll.h>
#include <unistd.h>

#include "Log.hpp"

const static int g_default_epfd = -1;
const static int g_size = 64;

class Epoller
{
public:
    Epoller()
        : _epfd(g_default_epfd)
    {}

    void Create()
    {
        _epfd = epoll_create(g_size);
        if (_epfd < 0)
        {
            Log_Message(FATAL, "epoll create error: %s", strerror(errno));
            exit(EPOLL_CREATE_ERR);
        }
    }

    // user -> kernel
    bool Add_Event(int sock, uint32_t events)
    {
        struct epoll_event ev;
        ev.events = events;
        ev.data.fd = sock;

        int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev);
        return n == 0;
    }

    // kernel -> user
    int Wait(struct epoll_event revs[], int num, int timeout)
    {
        return epoll_wait(_epfd, revs, num, timeout);
    }

    bool Control(int sock, uint32_t event, int action)
    {
        int n = 0;
        if (action == EPOLL_CTL_MOD)
        {
            struct epoll_event ev;
            ev.events = event;
            ev.data.fd = sock;
            n = epoll_ctl(_epfd, action, sock, &ev);
        }
        else if (action == EPOLL_CTL_DEL)
        {
            n = epoll_ctl(_epfd, action, sock, nullptr);
        }
        else
        {
            n = -1;
        }

        return n == 0;
    }

    void Close()
    {
        if (_epfd != g_default_epfd)
            close(_epfd);
    }

    ~Epoller()
    {
        this->Close();
    }

private:
    int _epfd;
};

六、协议定制模块:Protocol.hpp

TCP通信的序列化与反序列化就是网络服务器的业务逻辑,因为TCP通信是字节流传输,无法传输结构化数据,所有我们需要自定义协议做序列化与反序列化处理,进行字符串数据与结构化数据的转换。

序列化与反序列化可以完全编写函数做字符串数据处理,也可以通过调用Json库做字符串数据处理,调用Json库需要加上 -ljsoncpp 动态链接库。

这里的序列化与反序列化,包含了客户端Client和服务器Serer双端的业务逻辑。

#pragma once
 
#include <iostream>
#include <cstring>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <jsoncpp/json/json.h>

#include "Log.hpp"

using namespace std;
 
#define SEP " "
#define LINE_SEP "\r\n"
 
enum
{
    OK = 0,
    DIV_ZERO,
    MOD_ZERO,
    OP_ERR
};
 
// "x op y" -> "content_len"\r\n"x op y"\r\n
string En_Length(const string& text)
{
    string send_str = to_string(text.size());
    send_str += LINE_SEP;
    send_str += text;
    send_str += LINE_SEP;
 
    return send_str;
}
 
// "content_len"\r\n"x op y"\r\n
bool De_Length(const string& package, string* text)
{
    auto pos = package.find(LINE_SEP);
    if (pos == string::npos)
        return false;
    string text_len_str = package.substr(0, pos);
    int text_len = stoi(text_len_str);
    *text = package.substr(pos + strlen(LINE_SEP), text_len);
    return true;
}

// 通信协议不止一种,需要将协议进行编号,以供os分辨
// "content_len"\r\n"协议编号"\r\n"x op y"\r\n
 
class Request
{
public:
    Request(int x = 0, int y = 0, char op = 0)
        : _x(x), _y(y), _op(op)
    {}
 
    // 序列化
    bool Serialize(string* out)
    {
        Json::Value root;
        root["first"] = _x;
        root["second"] = _y;
        root["oper"] = _op;
 
        Json::FastWriter write;
        *out = write.write(root);
        return true;
    }
 
    // 反序列化
    bool Deserialiaze(const string& in)
    {
        Json::Value root;
        Json::Reader reader;
        reader.parse(in, root);
 
        _x = root["first"].asInt();
        _y = root["second"].asInt();
        _op = root["oper"].asInt();
        return true;
    }
 
public:
    int _x, _y;
    char _op;
};
 
class Response
{
public:
    Response(int exitcode = 0, int res = 0)
        : _exitcode(exitcode), _res(res)
    {}
 
    bool Serialize(string* out)
    {
        Json::Value root;
        root["exitcode"] = _exitcode;
        root["result"] = _res;
 
        Json::FastWriter writer;
        *out = writer.write(root);
        return true;
    }
 
    bool Deserialize(const string& in)
    {
        Json::Value root;
        Json::Reader reader;
        reader.parse(in, root);
 
        _exitcode = root["exitcode"].asInt();
        _res = root["result"].asInt();
        return true;
    }
 
public:
    int _exitcode;
    int _res;
};
 
// 读取数据包
// "content_len"\r\n"x op y"\r\n
bool Parse_One_Package(string& inbuf, string* text)
{
    *text = "";
    
    // 分析处理
    auto pos = inbuf.find(LINE_SEP);
    if (pos == string::npos)
        return false;

    string text_len_string = inbuf.substr(0, pos);
    int text_len = stoi(text_len_string);
    int total_len = text_len_string.size() + 2 * strlen(LINE_SEP) + text_len;

    if (inbuf.size() < total_len)
        return false;

    // 至少有一个完整的报文
    *text = inbuf.substr(0, total_len);
    inbuf.erase(0, total_len);

    return true;
}

bool Recv_Package(int sock, string& inbuf, string* text)
{
    char buf[1024];
    while (true)
    {
        ssize_t n = recv(sock, buf, sizeof(buf) - 1, 0);
        if (n > 0)
        {
            buf[n] = 0;
            inbuf += buf;
 
            auto pos = inbuf.find(LINE_SEP);
            if (pos == string::npos)
                continue;
            string text_len_str = inbuf.substr(0, pos);
            int text_len = stoi(text_len_str);
            int total_len = text_len_str.size() + 2 * strlen(LINE_SEP) + text_len;
            cout << "\n收到响应报文:\n" << inbuf;
 
            if (inbuf.size() < total_len)
            {
                cout << "输入不符合协议规定" << endl;
                continue;
            }
 
            *text = inbuf.substr(0, total_len);
            inbuf.erase(0, total_len);
 
            break;
        }
        else
        {
            return false;
        }
    }
 
    return true;
}

七、服务器模块:Server.hpp    server.cc

Server初始化创建listensock、创建epoll模型、创建事件就绪队列(struct epoll_event* _recv),listensock套接字会创建一个注册了_Accepter接口的Connection对象,负责新建Client的连接。

所有的Connection对象通过哈希表管理,极大的提高了效率。每一个Connection对象都有读写缓冲区(_inbuffer / _outbuffer),可以绑定回调的_Recver、_Sender、_Excepter函数,以对事件做读、写、异常处理。

服务器的本质就是一个死循环,循环的等待epoll模型的事件通知然后再Dispatch分派任务做读写处理,若遇到异常问题,将其转化为读写问题再去分派任务。

我们将网络计算服务器的计算任务放入了server.cc中进行函数定义,为了解耦我们也可以再单独创建一个Task()对象,但是此处由于计算任务比较简单,我们就直接在server.cc源文件中定义了。

服务器中的所有监听的Connection对象,我们将其读监听设置为一直开启,将其写监听设置为按需开启,当写任务完成后即关闭写监听。

Server.hpp

#pragma once

#include <iostream>
#include <string>
#include <functional>
#include <unordered_map>
#include <cassert>
#include <unistd.h>

#include "Sock.hpp"
#include "Epoller.hpp"
#include "Log.hpp"
#include "Util.hpp"
#include "Protocol.hpp"

using namespace std;

class Connection;
class TcpServer;

static const uint16_t g_defaultport = 8080;
static const int g_num = 64;

using func_t = function<void(Connection*)>;

class Connection
{
public:
    Connection(int sock, TcpServer* tcp)
        : _sock(sock), _tcp(tcp)
    {}

    void Register(func_t recver, func_t sender, func_t excepter)
    {
        _recver = recver;
        _sender = sender;
        _excepter = excepter;
    }

    void Close()
    {
        close(_sock);
    }

public:
    int _sock;
    string _inbuffer;   // 输入缓冲区
    string _outbuffer;  // 输出缓冲区

    func_t _recver;   // 读
    func_t _sender;   // 写
    func_t _excepter; // 异常

    TcpServer *_tcp; // 可以省略

    // uint64_t last_time;     // 记录最近访问时间,可用于主动关闭某时间段内未访问的连接
};

class TcpServer
{
public:
    TcpServer(func_t service, uint16_t port = g_defaultport)
        : _service(service), _port(port), _revs(nullptr)
    {}

    void InitServer()
    {
        // 1. 创建socket
        _sock.Socket();
        _sock.Bind(_port);
        _sock.Listen();

        // 2. 创建epoller
        _epoller.Create();

        // 3. 将目前唯一的一个sock,添加到Epoller中
        Add_Connection(_sock.Fd(), EPOLLIN | EPOLLET,
                       bind(&TcpServer::Accepter, this, placeholders::_1), nullptr, nullptr);

        _revs = new struct epoll_event[g_num];
        _num = g_num;
    }

    void Enable_Read_Write(Connection* conn, bool readable, bool writeable)
    {
        uint32_t event = (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET;
        _epoller.Control(conn->_sock, event, EPOLL_CTL_MOD);
    }

    // 事件派发
    void Dispatch()
    {
        int timeout = -1;
        while (1)
        {
            Loop(timeout);
            // Log_Message(DEBUG, "timeout ...");

            // 遍历_conn_map,计算每一个节点的最近访问时间做节点控制
        }
    }

    ~TcpServer()
    {
        _sock.Close();
        _epoller.Close();
        if (nullptr == _revs)
            delete[] _revs;
    }

private:
    void Add_Connection(int sock, uint32_t events, func_t recver, func_t sender, func_t excepter)
    {
        // 1. 首先为该sock创建Connection并初始化,并添加到_conn_map
        if (events & EPOLLET)
            Util::Set_Nonblock(sock);
        Connection *conn = new Connection(sock, this);

        // 2. 给对应的sock设置对应的回调方法
        conn->Register(recver, sender, excepter);

        // 3. 其次将sock与它要关心的时间"写透式"注册到epoll中,让epoll帮我们关心
        bool f = _epoller.Add_Event(sock, events);
        assert(f);

        // 4. 将kv添加到_conn_map中
        _conn_map.insert(pair<int, Connection*>(sock, conn));

        Log_Message(DEBUG, "Add_Connection: add new sock: %d in epoll and unordered_map", sock);
    }

    void Recver(Connection *conn)
    {
        char buffer[1024];
        while (1)
        {
            ssize_t s = recv(conn->_sock, buffer, sizeof(buffer) - 1, 0);
            if (s > 0)
            {
                buffer[s] = 0;
                conn->_inbuffer += buffer;      // 将读到的数据入队列
                Log_Message(DEBUG, "\n收到client[%d]请求报文:\n%s", conn->_sock, conn->_inbuffer.c_str());

                _service(conn);
            }
            else if (s == 0)
            {
                // 异常回调
                if (conn->_excepter)
                {
                    conn->_excepter(conn);
                    return;
                }
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    break;
                }
                else if (errno == EINTR)
                {
                    continue;
                }
                else
                {
                    if (conn->_excepter)
                    {
                        conn->_excepter(conn);
                        return;
                    }
                }
            }
        }
    }

    void Sender(Connection *conn)
    {
        while (1)
        {
            ssize_t s = send(conn->_sock, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);
            if (s >= 0)
            {
                if (conn->_outbuffer.empty())
                    break;
                else
                    conn->_outbuffer.erase(0, s);
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    break;
                }
                else if (errno == EINTR)
                {
                    continue;
                }
                else
                {
                    if (conn->_excepter)
                    {
                        conn->_excepter(conn);
                        return;
                    }
                }
            }
        }

        // 如果没有发送完毕,需要对对应的sock开启写事件的关心,发完了,关闭对写事件的关心
        if (!conn->_outbuffer.empty())
            conn->_tcp->Enable_Read_Write(conn, true, true);
        else
            conn->_tcp->Enable_Read_Write(conn, true, false);
    }

    void Excepter(Connection *conn)
    {
        _epoller.Control(conn->_sock, 0, EPOLL_CTL_DEL);
        conn->Close();
        _conn_map.erase(conn->_sock);

        Log_Message(DEBUG, "关闭 %d 文件描述符的所有资源", conn->_sock);
        delete conn;
    }

    void Accepter(Connection *conn)
    {
        while (1)
        {
            string clientip;
            uint16_t clientport;
            int err = 0;
            int sock = _sock.Accept(&clientip, &clientport, &err);
            if (sock > 0)
            {
                Add_Connection(sock, EPOLLIN | EPOLLET, 
                               bind(&TcpServer::Recver, this, placeholders::_1),
                               bind(&TcpServer::Sender, this, placeholders::_1), 
                               bind(&TcpServer::Excepter, this, placeholders::_1));
                Log_Message(DEBUG, "get a new link, info: [%s : %d]", clientip.c_str(), clientport);
            }
            else
            {
                if (err == EAGAIN || err == EWOULDBLOCK)
                    break;
                else if (err == EINTR)
                    continue;
                else
                    break;
            }
        }
    }

    bool Is_Connection_Exists(int sock)
    {
        auto iter = _conn_map.find(sock);
        return iter != _conn_map.end();
    }

    void Loop(int timeout)
    {
        int n = _epoller.Wait(_revs, _num, timeout); // 获取已经就绪的事件
        for (int i = 0; i < n; ++i)
        {
            int sock = _revs[i].data.fd;
            uint32_t events = _revs[i].events;

            // 将所有异常问题,转化成读写问题
            if (events & EPOLLERR)
                events |= (EPOLLIN | EPOLLOUT);
            if (events & EPOLLHUP)
                events |= (EPOLLIN | EPOLLOUT);

            // 读写事件就绪
            if ((events & EPOLLIN) && Is_Connection_Exists(sock) && _conn_map[sock]->_recver)
                _conn_map[sock]->_recver(_conn_map[sock]);
            if ((events & EPOLLOUT) && Is_Connection_Exists(sock) && _conn_map[sock]->_sender)
                _conn_map[sock]->_sender(_conn_map[sock]);
        }
    }

private:
    uint16_t _port;
    Sock _sock;
    Epoller _epoller;
    unordered_map<int, Connection*> _conn_map;
    struct epoll_event* _revs;
    int _num;
    func_t _service;
};

server.cc

#include "Server.hpp"
#include <memory>

using namespace std;

// 计算任务
bool Cal(const Request& req, Response& resp)
{
    resp._exitcode = OK;
    resp._res = 0;
 
    if (req._op == '/' && req._y == 0)
    {
        resp._exitcode = DIV_ZERO;
        return false;
    }
    if (req._op == '%' && req._y == 0)
    {
        resp._exitcode = MOD_ZERO;
        return false;
    }
 
    switch (req._op)
    {
    case '+':
        resp._res = req._x + req._y;
        break;
    case '-':
        resp._res = req._x - req._y;
        break;
    case '*':
        resp._res = req._x * req._y;
        break;
    case '/':
        resp._res = req._x / req._y;
        break;
    case '%':
        resp._res = req._x % req._y;
        break;
    default:
        resp._exitcode = OP_ERR;
        break;
    }
 
    return true;
}

void Calculate(Connection* conn)
{
    string one_package;
    while (Parse_One_Package(conn->_inbuffer, &one_package))
    {
        string req_str;
        if (!De_Length(one_package, &req_str))
            return;

        // 对请求体Request反序列化,得到一个结构化的请求对象
        Request req;
        if (!req.Deserialiaze(req_str))
            return;
        
        Response resp;
        Cal(req, resp);

        string resp_str;
        resp.Serialize(&resp_str);

        conn->_outbuffer += En_Length(resp_str);
        cout << "构建完整的响应报文: \n" << conn->_outbuffer << endl;
    }

    // 直接发
    if (conn->_sender)
        conn->_sender(conn);
}

static void Usage(std::string proc)
{
    std::cerr << "Usage:\n\t" << proc << " port" << "\n\n";
    exit(1);
}

string Transaction(const string &request)
{
    return request;
}

// ./select_server 8080
int main(int argc, char *argv[])
{
    // if(argc != 2)
    //     Usage();

    // unique_ptr<SelectServer> svr(new SelectServer(atoi(argv[1])));

    // std::cout << "test: " << sizeof(fd_set) * 8 << std::endl;
    unique_ptr<TcpServer> svr(new TcpServer(Calculate));

    svr->InitServer();
    svr->Dispatch();

    return 0;
}

八、客户端模块:Client.hpp    client.cc

Client.hpp

#pragma once
 
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
 
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
 
#include "Protocol.hpp"
 
using namespace std;
 
class Client
{
public:
    Client(const std::string& server_ip, const uint16_t& server_port)
        : _sock(-1), _server_ip(server_ip), _server_port(server_port)
    {}
 
    void Init()
    {
        _sock = socket(AF_INET, SOCK_STREAM, 0);
        if (_sock < 0)
        {
            std::cerr << "socket error" << std::endl;
            exit(1);
        }
    }
 
    void Run()
    {
        struct sockaddr_in server;
        memset(&server, 0, sizeof(server));
        server.sin_family = AF_INET;
        server.sin_port = htons(_server_port);
        server.sin_addr.s_addr = inet_addr(_server_ip.c_str());
 
        if (connect(_sock, (struct sockaddr*)&server, sizeof(server)) < 0)
        {
            std::cerr << "connect error" << std::endl;
            exit(1);
        }
        else
        {
            string line;
            string inbuf;
            while (true)
            {
                cout << "mycal>>> ";
                getline(cin, line);
                Request req = Parse_Line(line);     // 输入字符串,生成Request对象
 
                string content;
                req.Serialize(&content);                // Request对象序列化
                string send_str = En_Length(content);   // 序列化字符串编码 -> "content_len"\r\n"x op y"\r\n

                send(_sock, send_str.c_str(), send_str.size(), 0);
 
                // 将服务器的返回结果序列化与反序列化
                string package, text;
                if (!Recv_Package(_sock, inbuf, &package))
                    continue;
                if (!De_Length(package, &text))
                    continue;
                
                Response resp;
                resp.Deserialize(text);
                cout << "计算结果: " << endl;
                cout << "exitcode: " << resp._exitcode << ", ";
                cout << "result: " << resp._res << endl << endl;
            }
        }
    }
 
    // 将输入转化为Request结构
    Request Parse_Line(const string& line)
    {
        int status = 0;     // 0:操作符之前    1:遇到操作符    2:操作符之后
        int cnt = line.size();
        string left, right;
        char op;
        int i = 0;
        while (i < cnt)
        {
            switch (status)
            {
            case 0:
                if (!isdigit(line[i]))
                {
                    if (line[i] == ' ')
                    {
                        i++;
                        break;
                    }
                    op = line[i];
                    status = 1;
                }
                else
                {
                    left.push_back(line[i++]);
                }
                break;
            case 1:
                i++;
                if (line[i] == ' ')
                    break;
                status = 2;
                break;
            case 2:
                right.push_back(line[i++]);
                break;
            }
        }
        return Request(stoi(left), stoi(right), op);
    }
 
    ~Client()
    {
        if (_sock >= 0)
            close(_sock);
    }
 
private:
    int _sock;
    string _server_ip;
    uint16_t _server_port;
};

client.cc

#include "Client.hpp"
#include <memory>
 
using namespace std;
 
static void Usage(string proc)
{
    cout << "\nUsage:\n\t" << proc << " local_port\n\n";
    exit(1);
}
 
int main(int argc, char* argv[])
{
    if (argc != 3)
        Usage(argv[0]);
 
    string server_ip = argv[1];
    uint16_t server_port = atoi(argv[2]);
 
    unique_ptr<Client> tcli(new Client(server_ip, server_port));
    tcli->Init();
    tcli->Run();
 
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/59655.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

神策新一代分析引擎架构演进

近日&#xff0c;神策数据已经推出全新的神策分析 2.5 版本&#xff0c;该版本支持分析模型与外部数据的融合性接入&#xff0c;构建全域数据融合模型&#xff0c;实现从用户到经营的全链路、全场景分析。新版本的神策分析能够为企业提供更全面、更有效的市场信息和经营策略&am…

React Native元素旋转一定的角度

mMeArrowIcon: {fontSize: 30, color: #999, transform: [{rotate: 180deg}]},<Icon name"arrow" style{styles.mMeArrowIcon}></Icon>参考链接&#xff1a; https://reactnative.cn/docs/transforms https://chat.xutongbao.top/

HDFS集群滚动升级以及回滚相关

HDFS集群滚动升级以及回滚相关 介绍不停机滚动升级非联邦HA集群联邦HA集群 停机升级--非HA集群HDFS集群降级和回滚异同点共同点不同点 HA集群降级&#xff08;downgrade&#xff09;注意事项 集群回滚操作 介绍 在hadoop v2中&#xff0c;HDFS支持namenode高可用&#xff08;H…

windows编译zookeeker动态库供C++链接使用以及遇到的错误处理方法

windows下面C链接zookeeper资料不多&#xff0c;特此记录一下 编译环境VS 2015 一. 相关安装包安装下载 1. zookeeper zookeeper3.6.4 下载zip包解压即可 2. ant apache-ant-1.9.16 将包进行解压D:project\apache-ant-1.9.16&#xff0c;然后配置环境变量 新建 ANT_HOME 系…

OpenSource - 一站式API服务

文章目录 概述功能模块运行启动Docker启动功能预览首页接口开发接口属性请求参数在线预览应用创建接口列表数据库类型限流熔断接口申请申请审批 概述 Crabc是低代码开发平台&#xff0c;企业级API发布管理系统&#xff0c;采用SpringBoot、JWT、Mybatis等框架和SPI插件机制实现…

四、Unity中颜色空间

Unity中的设置 通过点击菜单Edit->Project Settings->Player页签->Other Settings下的Rendering部分进行修改&#xff0c;参数Color Space可以选择Gamma或Linear。 当选择Gamma Space时&#xff0c;Unity不会做任何处理。当选择Linear Space时&#xff0c;引擎的渲染…

排序八卦炉之冒泡、快排【完整版】

文章目录 1.冒泡排序1.1代码实现1.2复杂度 2.快速排序2.1人物及思想介绍【源于百度】2.2hoare【霍尔】版本1.初识代码2.代码分析3.思其因果 2.3挖坑版本1.初始代码2.代码分析3.思想比较 2.4指针版本1.初识代码2.代码分析3.问题探讨 2.5集体优化2.6极致优化2.7非递归版本1.初识代…

【ConcurrentHashMap1.7源码】十分钟带你深入ConcurrentHashMap并发解析

ConcurrentHashMap1.7源码 四个核心要点 初始化PUT扩容GET Unsafe 初始化 五个构造方法 /*** Creates a new, empty map with the default initial table size (16).*/public ConcurrentHashMap() {}/*** Creates a new, empty map with an initial table size* accommodati…

FFmpeg下载安装及Windows开发环境设置

1 FFmpeg简介 FFmpeg&#xff1a;FFmpeg是一套可以用来记录、转换数字音频、视频&#xff0c;并能将其转化为流的开源计算机程序。采用LGPL或GPL许可证。它提供了录制、转换以及流化音视频的完整解决方案。项目的名称来自MPEG视频编码标准&#xff0c;前面的"FF"代表…

【Spring】Spring中的设计模式

文章目录 责任链模式工厂模式适配器模式代理模式模版方法观察者模式构造器模式 责任链模式 Spring中的Aop的通知调用会使用责任链模式责任链模式介绍 角色&#xff1a;抽象处理者&#xff08;Handler&#xff09;具体处理者&#xff08;ConcreteHandler1&#xff09;客户类角…

【Spring Cloud 五】OpenFeign服务调用

这里写目录标题 系列文章目录背景一、OpenFeign是什么Feign是什么Feign的局限性 OpenFeign是什么 二、为什么要有OpenFeign三、如何使用OpenFeign服务提供者order-servicepom文件yml配置文件启动类实体ParamController 服务消费者user-servicepom文件yml配置文件启动类接口类Us…

微信小程序真机防盗链referer问题处理

公司使用百度云存储一些资源&#xff0c;然后现在要做防盗链&#xff0c;在CDN加入Referer白名单后发现PC是正常的&#xff0c;微信小程序无法正常访问资源了。然后是各种查啊&#xff0c;然后发现是微信小程序不支持Referer的修改&#xff0c;且在小程序开发工具是Referer是固…

ATFX汇评:非农就业报告来袭,汇市或迎剧烈波动

ATFX汇评&#xff1a;美国非农就业报告每月发布一次&#xff0c;其中非农就业人口和失业率两项数据最受关注。7月季调后非农就业人口&#xff0c;将于今日20:30公布&#xff0c;前值为20.9万人&#xff0c;预期值20万人&#xff1b;7月失业率&#xff0c;同一时间公布&#xff…

极光笔记 | 浅谈企业级SaaS产品的客户成长旅程管理(上)—— 分析篇

本文作者&#xff1a;陈伟&#xff08;极光用户体验部高级总监&#xff09; “企业级SaaS产品与C端互联网产品特征差异很大&#xff0c;有些甚至是截然相反&#xff0c;这些特征也会成为后续客户成长旅程的重要影响变量。本文就如何设计并服务好企业级SaaS产品客户成长旅程进行…

概念解析 | 利用IAA迭代自适应方法实现高精度角度估计

利用IAA迭代自适应方法实现高精度角度估计 注1:本文系“概念辨析”系列之一,致力于简洁清晰地解释、辨析复杂而专业的概念。本次辨析的概念是:IAA迭代自适应方法在雷达角度估计中的应用。 背景介绍 在雷达目标检测与定位中,准确估计目标角度是实现高精度定位的关键。传统的基于…

面试热题(前中序遍历构建树)

给定两个整数数组 preorder 和 inorder &#xff0c;其中 preorder 是二叉树的先序遍历&#xff0c; inorder 是同一棵树的中序遍历&#xff0c;请构造二叉树并返回其根节点。 题目中是给定两个数组&#xff0c;一个是存放这颗树的前序遍历的数组&#xff0c;一个是存放这棵树的…

消息队列项目(2)

我们使用 SQLite 来进行对 Exchange, Queue, Binding 的硬盘保存 对 Message 就保存在硬盘的文本中 SQLite 封装 这里是在 application.yaml 中来引进对 SQLite 的封装 spring:datasource:url: jdbc:sqlite:./data/meta.dbusername:password:driver-class-name: org.sqlite.…

字典与数组第5讲:数组区域内,数组公式的编辑和删除

【分享成果&#xff0c;随喜正能量】我们的心和宇宙本是相通的&#xff0c;所以生命内在蕴含了无限的智慧&#xff0c;但在没有开发没有证悟之前&#xff0c;生命是渺小而短暂的……..。 《VBA数组与字典方案》教程&#xff08;10144533&#xff09;是我推出的第三套教程&#…

cmake配置Qt工程

cmake 工程配置 # 指定版本和项目 cmake_minimum_required(VERSION 3.10) set(TARGET_NAME labelDeviceView) project(${TARGET_NAME} ) include(${CMAKE_CURRENT_LIST_DIR}/../../../../../../ossLib/ossLib/env.cmake) set(CMAKE_PREFIX_PATH "D:/Qt6/6.5.2/msvc2019…

MyBatis核心 - SqlSession如何通过Mapper接口生成Mapper对象

书接上文 MyBatis – 执行流程 我们通过SqlSession获取到了UserMapper对象&#xff0c;代码如下&#xff1a; // 获取SqlSession对象 SqlSession sqlSession sqlSessionFactory.openSession();// 执行查询操作 try {// 获取映射器接口UserMapper userMapper sqlSession.get…