iiiiiiiiiiiiiiiiiiiiiiiiiio_contexttttttttttttttttttttttttt

https://www.cnblogs.com/bwbfight/p/17594353.html

谈一谈linux下线程池 - 白伟碧一些小心得 - 博客园 (cnblogs.com)

谈一谈linux下线程池 - 白伟碧一些小心得 - 博客园 (cnblogs.com)

https://www.cnblogs.com/bwbfight/p/10901574.html

前面的设计,我们对asio的使用都是单线程模式,为了提升网络io并发处理的效率,这一次我们设计多线程模式下asio的使用方式。总体来说asio有两个多线程模型,第一个是启动多个线程,每个线程管理一个iocontext。第二种是只启动一个iocontext,被多个线程共享,后面的文章会对比两个模式的区别,这里先介绍第一种模式,多个线程,每个线程管理独立的iocontext服务。

单线程和多线程对比

之前的单线程模式图如下

我们设计的IOServicePool类型的多线程模型如下:

IOServicePool多线程模式特点

1   每一个io_context跑在不同的线程里,所以同一个socket会被注册在同一个io_context里,它的回调函数也会被单独的一个线程回调,那么对于同一个socket,他的回调函数每次触发都是在同一个线程里,就不会有线程安全问题,网络io层面上的并发是线程安全的。

2   但是对于不同的socket,回调函数的触发可能是同一个线程(两个socket被分配到同一个io_context),也可能不是同一个线程(两个socket被分配到不同的io_context里)。所以如果两个socket对应的上层逻辑处理,如果有交互或者访问共享区,会存在线程安全问题。比如socket1代表玩家1,socket2代表玩家2,玩家1和玩家2在逻辑层存在交互,比如两个玩家都在做工会任务,他们属于同一个工会,工会积分的增加就是共享区的数据,需要保证线程安全。可以通过加锁或者逻辑队列的方式解决安全问题,我们目前采取了后者。

3   多线程相比单线程,极大的提高了并发能力,因为单线程仅有一个io_context服务用来监听读写事件,就绪后回调函数在一个线程里串行调用, 如果一个回调函数的调用时间较长肯定会影响后续的函数调用,毕竟是穿行调用。而采用多线程方式,可以在一定程度上减少前一个逻辑调用影响下一个调用的情况,比如两个socket被部署到不同的iocontext上,但是当两个socket部署到同一个iocontext上时仍然存在调用时间影响的问题。不过我们已经通过逻辑队列的方式将网络线程和逻辑线程解耦合了,不会出现前一个调用时间影响下一个回调触发的问题。

 其中代码如下:

const.h

复制代码

#pragma once
#define MAX_LENGTH  1024*2
//头部总长度
#define HEAD_TOTAL_LEN 4
//头部id长度
#define HEAD_ID_LEN 2
//头部数据长度
#define HEAD_DATA_LEN 2
#define MAX_RECVQUE  10000
#define MAX_SENDQUE 1000


enum MSG_IDS {
    MSG_HELLO_WORD = 1001
};

复制代码

Singleton.h

复制代码

#pragma once
#include <memory>
#include <mutex>
#include <iostream>
using namespace std;
template <typename T>
class Singleton {
protected:
    Singleton() = default;
    Singleton(const Singleton<T>&) = delete;
    Singleton& operator=(const Singleton<T>& st) = delete;
    
    static std::shared_ptr<T> _instance;
public:
    static std::shared_ptr<T> GetInstance() {
        static std::once_flag s_flag;
        std::call_once(s_flag, [&]() {
            _instance = shared_ptr<T>(new T);
            });

        return _instance;
    }
    void PrintAddress() {
        std::cout << _instance.get() << endl;
    }
    ~Singleton() {
        std::cout << "this is singleton destruct" << std::endl;
    }
};

template <typename T>
std::shared_ptr<T> Singleton<T>::_instance = nullptr;

复制代码

MsgNode.h

复制代码

#pragma once
#include <string>
#include "const.h"
#include <iostream>
#include <boost/asio.hpp>
using namespace std;
using boost::asio::ip::tcp;
class LogicSystem;
class MsgNode
{
public:
    MsgNode(short max_len) :_total_len(max_len), _cur_len(0) {
        _data = new char[_total_len + 1]();
        _data[_total_len] = '\0';
    }

    ~MsgNode() {
        std::cout << "destruct MsgNode" << endl;
        delete[] _data;
    }

    void Clear() {
        ::memset(_data, 0, _total_len);
        _cur_len = 0;
    }

    short _cur_len;
    short _total_len;
    char* _data;
};

class RecvNode :public MsgNode {
    friend class LogicSystem;
public:
    RecvNode(short max_len, short msg_id);
private:
    short _msg_id;
};

class SendNode:public MsgNode {
    friend class LogicSystem;
public:
    SendNode(const char* msg,short max_len, short msg_id);
private:
    short _msg_id;
};

复制代码

MsgNode.cpp

复制代码

#include "MsgNode.h"
RecvNode::RecvNode(short max_len, short msg_id):MsgNode(max_len),
_msg_id(msg_id){

}


SendNode::SendNode(const char* msg, short max_len, short msg_id):MsgNode(max_len + HEAD_TOTAL_LEN)
, _msg_id(msg_id){
    //先发送id, 转为网络字节序
    short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);
    memcpy(_data, &msg_id_host, HEAD_ID_LEN);
    //转为网络字节序
    short max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
    memcpy(_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN);
    memcpy(_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len);
}

复制代码

LogicSystem.h

复制代码

#pragma once
#include "Singleton.h"
#include <queue>
#include <thread>
#include "CSession.h"
#include <queue>
#include <map>
#include <functional>
#include "const.h"
#include <json/json.h>
#include <json/value.h>
#include <json/reader.h>

typedef  function<void(shared_ptr<CSession>, const short &msg_id, const string &msg_data)> FunCallBack;
class LogicSystem:public Singleton<LogicSystem>
{
    friend class Singleton<LogicSystem>;
public:
    ~LogicSystem();
    void PostMsgToQue(shared_ptr < LogicNode> msg);
private:
    LogicSystem();
    void DealMsg();
    void RegisterCallBacks();
    void HelloWordCallBack(shared_ptr<CSession>, const short &msg_id, const string &msg_data);
    std::thread _worker_thread;
    std::queue<shared_ptr<LogicNode>> _msg_que;
    std::mutex _mutex;
    std::condition_variable _consume;
    bool _b_stop;
    std::map<short, FunCallBack> _fun_callbacks;
};

复制代码

LogicSystem.cpp

复制代码

#include "LogicSystem.h"

using namespace std;

LogicSystem::LogicSystem():_b_stop(false){
    RegisterCallBacks();
    _worker_thread = std::thread (&LogicSystem::DealMsg, this);
}

LogicSystem::~LogicSystem(){
    _b_stop = true;
    _consume.notify_one();
    _worker_thread.join();
}

void LogicSystem::PostMsgToQue(shared_ptr < LogicNode> msg) {
    std::unique_lock<std::mutex> unique_lk(_mutex);
    _msg_que.push(msg);
    //由0变为1则发送通知信号
    if (_msg_que.size() == 1) {
        unique_lk.unlock();
        _consume.notify_one();
    }
}

void LogicSystem::DealMsg() {
    for (;;) {
        std::unique_lock<std::mutex> unique_lk(_mutex);
        //判断队列为空则用条件变量阻塞等待,并释放锁
        while (_msg_que.empty() && !_b_stop) {
            _consume.wait(unique_lk);
        }

        //判断是否为关闭状态,把所有逻辑执行完后则退出循环
        if (_b_stop ) {
            while (!_msg_que.empty()) {
                auto msg_node = _msg_que.front();
                cout << "recv_msg id  is " << msg_node->_recvnode->_msg_id << endl;
                auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);
                if (call_back_iter == _fun_callbacks.end()) {
                    _msg_que.pop();
                    continue;
                }
                call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id,
                    std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));
                _msg_que.pop();
            }
            break;
        }

        //如果没有停服,且说明队列中有数据
        auto msg_node = _msg_que.front();
        cout << "recv_msg id  is " << msg_node->_recvnode->_msg_id << endl;
        auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);
        if (call_back_iter == _fun_callbacks.end()) {
            _msg_que.pop();
            continue;
        }
        call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id, 
            std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));
        _msg_que.pop();
    }
}

void LogicSystem::RegisterCallBacks() {
    _fun_callbacks[MSG_HELLO_WORD] = std::bind(&LogicSystem::HelloWordCallBack, this,
        placeholders::_1, placeholders::_2, placeholders::_3);
}

void LogicSystem::HelloWordCallBack(shared_ptr<CSession> session, const short &msg_id, const string &msg_data) {
    Json::Reader reader;
    Json::Value root;
    reader.parse(msg_data, root);
    std::cout << "recevie msg id  is " << root["id"].asInt() << " msg data is "
        << root["data"].asString() << endl;
    root["data"] = "server has received msg, msg data is " + root["data"].asString();
    std::string return_str = root.toStyledString();
    session->Send(return_str, root["id"].asInt());
}

复制代码

CSession.h

复制代码

#pragma once
#include <boost/asio.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <queue>
#include <mutex>
#include <memory>
#include "const.h"
#include "MsgNode.h"
using namespace std;

using boost::asio::ip::tcp;
class CServer;
class LogicSystem;

class CSession: public std::enable_shared_from_this<CSession>
{
public:
    CSession(boost::asio::io_context& io_context, CServer* server);
    ~CSession();
    tcp::socket& GetSocket();
    std::string& GetUuid();
    void Start();
    void Send(char* msg,  short max_length, short msgid);
    void Send(std::string msg, short msgid);
    void Close();
    std::shared_ptr<CSession> SharedSelf();
private:
    void HandleRead(const boost::system::error_code& error, size_t  bytes_transferred, std::shared_ptr<CSession> shared_self);
    void HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self);
    tcp::socket _socket;
    std::string _uuid;
    char _data[MAX_LENGTH];
    CServer* _server;
    bool _b_close;
    std::queue<shared_ptr<SendNode> > _send_que;
    std::mutex _send_lock;
    //收到的消息结构
    std::shared_ptr<RecvNode> _recv_msg_node;
    bool _b_head_parse;
    //收到的头部结构
    std::shared_ptr<MsgNode> _recv_head_node;
};

class LogicNode {
    friend class LogicSystem;
public:
    LogicNode(shared_ptr<CSession>, shared_ptr<RecvNode>);
private:
    shared_ptr<CSession> _session;
    shared_ptr<RecvNode> _recvnode;
};

复制代码

CSession.cpp

复制代码

#include "CSession.h"
#include "CServer.h"
#include <iostream>
#include <sstream>
#include <json/json.h>
#include <json/value.h>
#include <json/reader.h>
#include "LogicSystem.h"

CSession::CSession(boost::asio::io_context& io_context, CServer* server):
    _socket(io_context), _server(server), _b_close(false),_b_head_parse(false){
    boost::uuids::uuid  a_uuid = boost::uuids::random_generator()();
    _uuid = boost::uuids::to_string(a_uuid);
    _recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
}
CSession::~CSession() {
    std::cout << "~CSession destruct" << endl;
}

tcp::socket& CSession::GetSocket() {
    return _socket;
}

std::string& CSession::GetUuid() {
    return _uuid;
}

void CSession::Start(){
    ::memset(_data, 0, MAX_LENGTH);
    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, 
        std::placeholders::_1, std::placeholders::_2, SharedSelf()));
}

void CSession::Send(std::string msg, short msgid) {
    std::lock_guard<std::mutex> lock(_send_lock);
    int send_que_size = _send_que.size();
    if (send_que_size > MAX_SENDQUE) {
        std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
        return;
    }

    _send_que.push(make_shared<SendNode>(msg.c_str(), msg.length(), msgid));
    if (send_que_size > 0) {
        return;
    }
    auto& msgnode = _send_que.front();
    boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
        std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

void CSession::Send(char* msg, short max_length, short msgid) {
    std::lock_guard<std::mutex> lock(_send_lock);
    int send_que_size = _send_que.size();
    if (send_que_size > MAX_SENDQUE) {
        std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
        return;
    }

    _send_que.push(make_shared<SendNode>(msg, max_length, msgid));
    if (send_que_size>0) {
        return;
    }
    auto& msgnode = _send_que.front();
    boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len), 
        std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

void CSession::Close() {
    _socket.close();
    _b_close = true;
}

std::shared_ptr<CSession>CSession::SharedSelf() {
    return shared_from_this();
}

void CSession::HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) {
    //增加异常处理
    try {
        if (!error) {
            std::lock_guard<std::mutex> lock(_send_lock);
            //cout << "send data " << _send_que.front()->_data+HEAD_LENGTH << endl;
            _send_que.pop();
            if (!_send_que.empty()) {
                auto& msgnode = _send_que.front();
                boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
                    std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_self));
            }
        }
        else {
            std::cout << "handle write failed, error is " << error.what() << endl;
            Close();
            _server->ClearSession(_uuid);
        }
    }
    catch (std::exception& e) {
        std::cerr << "Exception code : " << e.what() << endl;
    }
    
}

void CSession::HandleRead(const boost::system::error_code& error, size_t  bytes_transferred, std::shared_ptr<CSession> shared_self){
    try {
        if (!error) {
            //已经移动的字符数
            int copy_len = 0;
            while (bytes_transferred > 0) {
                if (!_b_head_parse) {
                    //收到的数据不足头部大小
                    if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {
                        memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
                        _recv_head_node->_cur_len += bytes_transferred;
                        ::memset(_data, 0, MAX_LENGTH);
                        _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                            std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                        return;
                    }
                    //收到的数据比头部多
                    //头部剩余未复制的长度
                    int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len;
                    memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);
                    //更新已处理的data长度和剩余未处理的长度
                    copy_len += head_remain;
                    bytes_transferred -= head_remain;
                    //获取头部MSGID数据
                    short msg_id = 0;
                    memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
                    //网络字节序转化为本地字节序
                    msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
                    std::cout << "msg_id is " << msg_id << endl;
                    //id非法
                    if (msg_id > MAX_LENGTH) {
                        std::cout << "invalid msg_id is " << msg_id << endl;
                        _server->ClearSession(_uuid);
                        return;
                    }
                    short msg_len = 0;
                    memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);
                    //网络字节序转化为本地字节序
                    msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
                    std::cout << "msg_len is " << msg_len << endl;
                    //id非法
                    if (msg_len > MAX_LENGTH) {
                        std::cout << "invalid data length is " << msg_len << endl;
                        _server->ClearSession(_uuid);
                        return;
                    }

                    _recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);

                    //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
                    if (bytes_transferred < msg_len) {
                        memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
                        _recv_msg_node->_cur_len += bytes_transferred;
                        ::memset(_data, 0, MAX_LENGTH);
                        _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                            std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                        //头部处理完成
                        _b_head_parse = true;
                        return;
                    }

                    memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len);
                    _recv_msg_node->_cur_len += msg_len;
                    copy_len += msg_len;
                    bytes_transferred -= msg_len;
                    _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
                    //cout << "receive data is " << _recv_msg_node->_data << endl;
                    //此处将消息投递到逻辑队列中
                    LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));
                
                    //继续轮询剩余未处理数据
                    _b_head_parse = false;
                    _recv_head_node->Clear();
                    if (bytes_transferred <= 0) {
                        ::memset(_data, 0, MAX_LENGTH);
                        _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                            std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                        return;
                    }
                    continue;
                }

                //已经处理完头部,处理上次未接受完的消息数据
                //接收的数据仍不足剩余未处理的
                int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
                if (bytes_transferred < remain_msg) {
                    memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
                    _recv_msg_node->_cur_len += bytes_transferred;
                    ::memset(_data, 0, MAX_LENGTH);
                    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                        std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                    return;
                }
                memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
                _recv_msg_node->_cur_len += remain_msg;
                bytes_transferred -= remain_msg;
                copy_len += remain_msg;
                _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
                //cout << "receive data is " << _recv_msg_node->_data << endl;
                //此处将消息投递到逻辑队列中
                LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));
                
                //继续轮询剩余未处理数据
                _b_head_parse = false;
                _recv_head_node->Clear();
                if (bytes_transferred <= 0) {
                    ::memset(_data, 0, MAX_LENGTH);
                    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                        std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                    return;
                }
                continue;
            }
        }
        else {
            std::cout << "handle read failed, error is " << error.what() << endl;
            Close();
            _server->ClearSession(_uuid);
        }
    }
    catch (std::exception& e) {
        std::cout << "Exception code is " << e.what() << endl;
    }
}

LogicNode::LogicNode(shared_ptr<CSession>  session, 
    shared_ptr<RecvNode> recvnode):_session(session),_recvnode(recvnode) {
    
}

复制代码

CServer.h

复制代码

#pragma once
#include <boost/asio.hpp>
#include "CSession.h"
#include <memory.h>
#include <map>
#include <mutex>
using namespace std;
using boost::asio::ip::tcp;
class CServer
{
public:
    CServer(boost::asio::io_context& io_context, short port);
    ~CServer();
    void ClearSession(std::string);
private:
    void HandleAccept(shared_ptr<CSession>, const boost::system::error_code & error);
    void StartAccept();
    boost::asio::io_context &_io_context;
    short _port;
    tcp::acceptor _acceptor;
    std::map<std::string, shared_ptr<CSession>> _sessions;
    std::mutex _mutex;
};

复制代码

CServer.cpp

复制代码

#include "CServer.h"
#include <iostream>
#include "AsioIOServicePool.h"
CServer::CServer(boost::asio::io_context& io_context, short port):_io_context(io_context), _port(port),
_acceptor(io_context, tcp::endpoint(tcp::v4(),port))
{
    cout << "Server start success, listen on port : " << _port << endl;
    StartAccept();
}

CServer::~CServer() {
    cout << "Server destruct listen on port : " << _port << endl;
}

void CServer::HandleAccept(shared_ptr<CSession> new_session, const boost::system::error_code& error){
    if (!error) {
        new_session->Start();
        lock_guard<mutex> lock(_mutex);
        _sessions.insert(make_pair(new_session->GetUuid(), new_session));
    }
    else {
        cout << "session accept failed, error is " << error.what() << endl;
    }

    StartAccept();
}

void CServer::StartAccept() {
    auto &io_context = AsioIOServicePool::GetInstance()->GetIOService();
    shared_ptr<CSession> new_session = make_shared<CSession>(io_context, this);
    _acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1));
}

void CServer::ClearSession(std::string uuid) {
    lock_guard<mutex> lock(_mutex);
    _sessions.erase(uuid);
}

复制代码

AsioIOServicePool.h

复制代码

#pragma once
#include <vector>
#include <boost/asio.hpp>
#include "Singleton.h"
class AsioIOServicePool:public Singleton<AsioIOServicePool>
{
    friend Singleton<AsioIOServicePool>;
public:
    using IOService = boost::asio::io_context;
    using Work = boost::asio::io_context::work;
    using WorkPtr = std::unique_ptr<Work>;
    ~AsioIOServicePool();
    AsioIOServicePool(const AsioIOServicePool&) = delete;
    AsioIOServicePool& operator=(const AsioIOServicePool&) = delete;
    // 使用 round-robin 的方式返回一个 io_service
    boost::asio::io_context& GetIOService();
    void Stop();
private:
    AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency());
    std::vector<IOService> _ioServices;
    std::vector<WorkPtr> _works;
    std::vector<std::thread> _threads;
    std::size_t                        _nextIOService;
};

复制代码

AsioIOServicePool.cpp

复制代码

#include "AsioIOServicePool.h"
#include <iostream>
using namespace std;
AsioIOServicePool::AsioIOServicePool(std::size_t size):_ioServices(size),
_works(size), _nextIOService(0){
    for (std::size_t i = 0; i < size; ++i) {
        _works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));
    }

    //遍历多个ioservice,创建多个线程,每个线程内部启动ioservice
    for (std::size_t i = 0; i < _ioServices.size(); ++i) {
        _threads.emplace_back([this, i]() {
            _ioServices[i].run();
            });
    }
}

AsioIOServicePool::~AsioIOServicePool() {
    std::cout << "AsioIOServicePool destruct" << endl;
}

boost::asio::io_context& AsioIOServicePool::GetIOService() {
    auto& service = _ioServices[_nextIOService++];
    if (_nextIOService == _ioServices.size()) {
        _nextIOService = 0;
    }
    return service;
}

void AsioIOServicePool::Stop(){
    //因为仅仅执行work.reset并不能让iocontext从run的状态中退出
    //当iocontext已经绑定了读或写的监听事件后,还需要手动stop该服务。
    for (auto& work : _works) {
        //把服务先停止
        work->get_io_context().stop();
        work.reset();
    }

    for (auto& t : _threads) {
        t.join();
    }
}

复制代码

main.cpp 

复制代码

#include <iostream>
#include "CServer.h"
#include "Singleton.h"
#include "LogicSystem.h"
#include <csignal>
#include <thread>
#include <mutex>
#include "AsioIOServicePool.h"
using namespace std;
bool bstop = false;
std::condition_variable cond_quit;
std::mutex mutex_quit;

int main()
{
    try {
        auto pool = AsioIOServicePool::GetInstance();
        boost::asio::io_context  io_context;
        boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
        signals.async_wait([&io_context,pool](auto, auto) {
            io_context.stop();
            pool->Stop();
            });
        CServer s(io_context, 10086);
        io_context.run();
    }
    catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << endl;
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/770011.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Kafka集群安装部署

简介 Kafka是一款分布式的、去中心化的、高吞吐低延迟、订阅模式的消息队列系统。 同RabbitMQ一样&#xff0c;Kafka也是消息队列。不过RabbitMQ多用于后端系统&#xff0c;因其更加专注于消息的延迟和容错。 Kafka多用于大数据体系&#xff0c;因其更加专注于数据的吞吐能力…

AI网络爬虫006:从当当网批量获取图书信息

文章目录 一、目标二、输入内容三、输出内容一、目标 用户输入一个图书名称,然后程序自动从当当网批量获取图书信息 查看相关元素在源代码中的位置: 二、输入内容 第一步:在deepseek中输入提示词: 你是一个Python爬虫专家,一步步的思考,完成以下网页爬取的Python脚本任…

WEB攻防-XSS跨站反射型存储型DOM型标签闭合输入输出JS代码解析

文章目录 XSS跨站-输入输出-原理&分类&闭合XSS跨站-分类测试-反射&存储&DOM反射型XSS存储型XSSDOM-base型XSS XSS跨站-输入输出-原理&分类&闭合 漏洞原理&#xff1a;接受输入数据&#xff0c;输出显示数据后解析执行 基础类型&#xff1a;反射(非持续…

ffmpeg下载/配置环境/测试

一、下载 1、访问FFmpeg官方网站下载页面&#xff1a;FFmpeg Download Page&#xff1b; 2、选择适合Windows的版本&#xff08;将鼠标移动到windows端&#xff09;。通常&#xff0c;你会找到“Windows builds from gyan.dev”或者“BtbN GitHub Releases”等选项&#xff0…

Java的异常处理体系

目录 异常处理1、Java的异常类层次结构2、try-catch-finally 使用注意事项3、在Web应用中如何实现全局异常处理机制 异常处理 1、Java的异常类层次结构 其中Error表示程序运行错误 常见的错误类型有&#xff1a; OutOfMemoryError (内存溢出错误) StackOverFlowError (栈内存溢…

ctfshow-web入门-命令执行(web118详解)Linux 内置变量与Bash切片

输入数字和小写字母&#xff0c;回显 evil input 查看源码&#xff0c;发现这里会将提交的参数 code 传给 system 函数 使用 burpsuite 抓包进行单个字符的模糊测试 fuzz&#xff1a; 发现过滤掉了数字和小写字母以及一些符号&#xff0c;下面框起来的部分是可用的 结合题目提…

vue2使用use注册自定义指令实现输入控制与快捷复制

使用场景 在一些form表单填写内容的时候&#xff0c;要限制输入的内容必须是数值、浮点型&#xff0c;本来el-input-number就可以实现&#xff0c;但是它本身带那个数值控制操作&#xff0c;等一系列感觉不舒服的地方。如果只是使用el-input该多好&#xff0c;只要监听一下输入…

爬虫笔记20——票星球抢票脚本的实现

以下内容仅供交流学习使用&#xff01;&#xff01;&#xff01; 思路分析 前面的爬虫笔记一步一步走过来我们的技术水平也有了较大的提升了&#xff0c;现在我们来进行一下票星球抢票实战项目&#xff0c;实现票星球的自动抢票。 我们打开票星球的移动端页面&#xff0c;分…

身份证OCR识别的深度解读

引言 随着信息技术的飞速发展&#xff0c;光学字符识别&#xff08;OCR&#xff09;技术在各个领域得到了广泛应用。身份证OCR识别&#xff0c;作为OCR技术的一个重要分支&#xff0c;以其高效、准确的特点&#xff0c;在身份验证、信息录入等方面发挥着重要作用。本文将深入解…

【Linux】Linux用户,用户组,其他人

1.文件拥有者 初次接触Linux的朋友大概会觉得很怪异&#xff0c;怎么“Linux有这么多用户&#xff0c;还分什么用户组&#xff0c;有什用呢&#xff1f;”&#xff0c;这个“用户与用户组”的功能可是相当健全而且好用的一个安全防护措施。 怎么说呢&#xff1f;由于Linux是个…

Chapter10 高级纹理——Shader入门精要学习笔记

Chapter10 高级纹理 一、立方体纹理1.基本概念①组成②采样 2.天空盒子 Sky Box3.环境映射三种方法①特殊布局的纹理创建②手动创建Cubemap——老方法③脚本生成 4.反射5.折射6.菲涅尔反射 二、渲染1.镜子效果2.玻璃效果3.渲染纹理 vs GrabPass 三、程序纹理1.简单程序纹理2.Un…

使用 bend-ingest-kafka 将数据流实时导入到 Databend

作者&#xff1a;韩山杰 Databend Cloud 研发工程师 https://github.com/hantmac Databend是一个开源、高性能、低成本易于扩展的新一代云数据仓库。bend-ingest-kafka 是一个专为 Databend 设计的实时数据导入工具&#xff0c;它允许用户从 Apache Kafka 直接将数据流导入到 D…

MacOS下更新curl

苹果自带的curl不支持Https,我们可以通过curl -V看到如下结果 curl 7.72.0 (x86_64-apple-darwin18.6.0) libcurl/7.72.0 zlib/1.2.12 libidn2/2.3.7 librtmp/2.3 Release-Date: 2020-08-19 Protocols: dict file ftp gopher http imap ldap ldaps pop3 rtmp rtsp smtp telne…

LabVIEW汽车ECU测试系统

开发了一个基于LabVIEW开发的汽车发动机控制单元&#xff08;ECU&#xff09;测试系统。该系统使用了NI的硬件和LabVIEW软件&#xff0c;能够自动执行ECU的功能测试和性能测试&#xff0c;确保其在不同工作条件下的可靠性和功能性。通过自动化测试系统&#xff0c;大大提高了测…

基于xilinx FPGA的GTX/GTH/GTY位置信息查看方式(如X0Y0在bank几)

目录 1 概述2 参考文档3 查看方式4查询总结&#xff1a; 1 概述 本文用于介绍如何查看xilinx fpga GTX得位置信息&#xff08;如X0Y0在哪个BANK/Quad&#xff09;。 2 参考文档 《ug476_7Series_Transceivers》 《pg156-ultrascale-pcie-gen3-en-us-4.4》 3 查看方式 通过…

linux——IPC 进程间通信

IPC 进程间通信 interprocess communicate IPC&#xff08;Inter-Process Communication&#xff09;&#xff0c;即进程间通信&#xff0c;其产生的原因主要可以归纳为以下几点&#xff1a; 进程空间的独立性 资源隔离&#xff1a;在现代操作系统中&#xff0c;每个进程都…

Hadoop-12-Hive 基本介绍 下载安装配置 MariaDB安装 3台云服务Hadoop集群 架构图 对比SQL HQL

章节内容 上一节我们完成了&#xff1a; Reduce JOIN 的介绍Reduce JOIN 的具体实现DriverMapperReducer运行测试 背景介绍 这里是三台公网云服务器&#xff0c;每台 2C4G&#xff0c;搭建一个Hadoop的学习环境&#xff0c;供我学习。 之前已经在 VM 虚拟机上搭建过一次&am…

独立开发者系列(18)——js的window对象

独立开发者&#xff0c;必然要面对JS代码&#xff0c;基本可以认为在脚本语言里面&#xff0c;JS门槛最低&#xff0c;正因为如此&#xff0c;JS也是最受欢迎的开发语言之一。JS的代码运行规律&#xff0c;按照代码模块执行&#xff0c;也就是<script></script> 每…

2024年上半年网络工程师下午真题及答案解析

试题一(20分) 某高校网络拓扑如下图所示&#xff0c;两校区核心&#xff08;CORE-1、CORE-2&#xff09;&#xff0c;出口防火墙&#xff08;NGFW-1、NGFW-2&#xff09;通过校区间光缆互联&#xff0c;配置OSPF实现全校路由收敛&#xff0c;两校区相距40km。两校区默认由本地…

「媒体邀约」苏州媒体宣传服务公司

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 媒体宣传加速季&#xff0c;100万补贴享不停&#xff0c;一手媒体资源&#xff0c;全国100城线下落地执行。详情请联系胡老师。 苏州的媒体资源相当丰富&#xff0c;涵盖了报纸、电视、广…