仿RabbitMQ实现消息队列服务端(二)

文章目录

  • ⽹络通信协议设计
  • 信道管理模块
  • 连接管理模块
  • 服务器模块实现

⽹络通信协议设计

在这里插入图片描述
其中⽣产者和消费者都是客⼾端,它们都需要通过⽹络和BrokerServer进⾏通信。具体通信的过程我们使⽤Muduo库来实现,使⽤TCP作为通信的底层协议,同时在这个基础上⾃定义应⽤层协议,完成客⼾端对服务器功能的远端调⽤。我们要实现的远端调⽤接⼝包括:

在这里插入图片描述
使⽤⼆进制的⽅式设计应⽤层协议。因为MQMessage的消息体是使⽤Protobuf进⾏序列化的,本⾝是按照⼆进制存储的,所以不太适合⽤json等⽂本格式来定义协议。

下⾯我们设计⼀下应⽤层协议:请求/响应报⽂设计

在这里插入图片描述

  • len:4个字节,表⽰整个报⽂的⻓度
  • nameLen:4个字节,表⽰typeName数组的⻓度
  • typeName:是个字节数组,占nameLen个字节,表⽰请求/响应报⽂的类型名,作⽤是分发不同消息到对应的远端接⼝调⽤中
  • protobufData:是个字节数组,占len-nameLen-8个字节,表⽰请求/响应参数数据通过protobuf序列化之后的⼆进制
  • checkSum:4个字节,表⽰整个消息的校验和,作⽤是为了校验请求/响应报⽂的完整性

⼀个创建交换机的请求,如下图⽰:

在这里插入图片描述

信道管理模块

在AMQP模型中,除了通信连接Connection概念外,还有⼀个Channel的概念,Channel是针对Connection连接的⼀个更细粒度的通信信道,多个Channel可以使⽤同⼀个通信连接Connection进⾏通信,但是同⼀个Connection的Channel之间相互独⽴。

⽽信道模块就是再次将上述模块进⾏整合提供服务的模块

proto.proto

syntax = "proto3";
package nzq;

import "msg.proto";

//信道的打开与关闭
message openChannelRequest{
    string rid = 1;
    string cid = 2;
};
message closeChannelRequest{
    string rid = 1;
    string cid = 2;
};
//交换机的声明与删除
message declareExchangeRequest{
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
    ExchangeType exchange_type = 4;
    bool durable = 5;
    bool auto_delete = 6;
    map<string, string> args = 7;
};
message deleteExchangeRequest{
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
};
//队列的声明与删除
message declareQueueRequest{
    string rid = 1;
    string cid = 2;
    string queue_name = 3;
    bool exclusive = 4;
    bool durable = 5;
    bool auto_delete = 6;
    map<string, string> args = 7;
};
message deleteQueueRequest{
    string rid = 1;
    string cid = 2;
    string queue_name = 3;
};
//队列的绑定与解除绑定
message queueBindRequest{
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
    string queue_name = 4;
    string binding_key = 5;
};
message queueUnBindRequest{
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
    string queue_name = 4;
};
//消息的发布
message basicPublishRequest {
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
    string body = 4;
    BasicProperties properties = 5;
};
//消息的确认
message basicAckRequest {
    string rid = 1;
    string cid = 2;
    string queue_name = 3;
    string message_id = 4;
};
//队列的订阅
message basicConsumeRequest {
    string rid = 1;
    string cid = 2;
    string consumer_tag  =3;
    string queue_name = 4;
    bool auto_ack = 5;
};
//订阅的取消
message basicCancelRequest {
    string rid = 1;
    string cid = 2;
    string consumer_tag = 3;
    string queue_name = 4;
};
//消息的推送
message basicConsumeResponse {
    string cid = 1;
    string consumer_tag = 2;
    string body = 3;
    BasicProperties properties = 4;
};
//通用响应
message basicCommonResponse {
    string rid = 1;
    string cid = 2;
    bool ok = 3;
}

在这里插入图片描述

  1. 管理信息:
  • a. 信道ID:信道的唯⼀标识
  • b. 信道关联的消费者:⽤于消费者信道在关闭的时候取消订阅,删除订阅者信息
  • c. 信道关联的连接:⽤于向客⼾端发送数据(响应,推送的消息)
  • d. protobuf协议处理句柄:⽹络通信前的协议处理
  • e. 消费者管理句柄:信道关闭/取消订阅的时候,通过句柄删除订阅者信息
  • f. 虚拟机句柄:交换机/队列/绑定/消息数据管理
  • g. ⼯作线程池句柄(⼀条消息被发布到队列后,需要将消息推送给订阅了对应队列的消费者,过
    程由线程池完成)
  1. 管理操作:
  • a. 提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息)
  • b. 提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)
  • c. 提供绑定&解绑队列操作
  • d. 提供订阅&取消订阅队列消息操作
  • e. 提供发布&确认消息操作
    using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
    using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;
    using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;
    using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;
    using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;
    using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;
    using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;
    using queueBindRequestPtr = std::shared_ptr<queueBindRequest>;
    using queueUnBindRequestPtr = std::shared_ptr<queueUnBindRequest>;
    using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;
    using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;
    using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;
    using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;
    class Channel {
        public:
            using ptr = std::shared_ptr<Channel>;
            Channel(const std::string &id, 
                const VirtualHost::ptr &host, 
                const ConsumerManager::ptr &cmp, 
                const ProtobufCodecPtr &codec, 
                const muduo::net::TcpConnectionPtr &conn,
                const threadpool::ptr &pool):
                _cid(id),
                _conn(conn),
                _codec(codec),
                _cmp(cmp),
                _host(host),
                _pool(pool){
                DLOG("new Channel: %p", this);
            }
            ~Channel() {
                if (_consumer.get() != nullptr) {
                    _cmp->remove(_consumer->tag, _consumer->qname);
                }
                DLOG("del Channel: %p", this);
            }
            //交换机的声明与删除
            void declareExchange(const declareExchangeRequestPtr &req) {
                bool ret = _host->declareExchange(req->exchange_name(), 
                    req->exchange_type(), req->durable(), 
                    req->auto_delete(), req->args());
                return basicResponse(ret, req->rid(), req->cid());
            }
            void deleteExchange(const deleteExchangeRequestPtr &req) {
                _host->deleteExchange(req->exchange_name());
                return basicResponse(true, req->rid(), req->cid());
            }
            //队列的声明与删除
            void declareQueue(const declareQueueRequestPtr &req) {
                bool ret = _host->declareQueue(req->queue_name(),
                    req->durable(), req->exclusive(),
                    req->auto_delete(), req->args());
                if (ret == false) {
                    return basicResponse(false, req->rid(), req->cid());
                }
                _cmp->initQueueConsumer(req->queue_name());//初始化队列的消费者管理句柄
                return basicResponse(true, req->rid(), req->cid());
            }
            void deleteQueue(const deleteQueueRequestPtr &req) {
                _cmp->destroyQueueConsumer(req->queue_name());
                _host->deleteQueue(req->queue_name());
                return basicResponse(true, req->rid(), req->cid());
            }
            //队列的绑定与解除绑定
            void queueBind(const queueBindRequestPtr &req) {
                bool ret = _host->bind(req->exchange_name(), 
                    req->queue_name(), req->binding_key());
                return basicResponse(ret, req->rid(), req->cid());
            }
            void queueUnBind(const queueUnBindRequestPtr &req) {
                _host->unBind(req->exchange_name(), req->queue_name());
                return basicResponse(true, req->rid(), req->cid());
            }
            //消息的发布
            void basicPublish(const basicPublishRequestPtr &req) {
                //1. 判断交换机是否存在
                auto ep = _host->selectExchange(req->exchange_name());
                if (ep.get() == nullptr) {
                    return basicResponse(false, req->rid(), req->cid());
                }
                //2. 进行交换路由,判断消息可以发布到交换机绑定的哪个队列中
                MsgQueueBindingMap mqbm = _host->exchangeBindings(req->exchange_name());
                BasicProperties *properties = nullptr;
                std::string routing_key;
                if (req->has_properties()) {
                    properties = req->mutable_properties();
                    routing_key = properties->routing_key();
                }
                for (auto &binding : mqbm) {
                    if (Router::route(ep->type, routing_key, binding.second->binding_key)) {
                        //3. 将消息添加到队列中(添加消息的管理)
                        _host->basicPublish(binding.first, properties, req->body());
                        //4. 向线程池中添加一个消息消费任务(向指定队列的订阅者去推送消息--线程池完成)
                        auto task = std::bind(&Channel::consume, this, binding.first);
                        _pool->push(task);
                    }
                }
                return basicResponse(true, req->rid(), req->cid());
            }
            //消息的确认
            void basicAck(const basicAckRequestPtr &req) {
                _host->basicAck(req->queue_name(), req->message_id());
                return basicResponse(true, req->rid(), req->cid());
            }
            //订阅队列消息
            void basicConsume(const basicConsumeRequestPtr &req) {
                //1. 判断队列是否存在
                bool ret = _host->existsQueue(req->queue_name());
                if (ret == false) {
                    return basicResponse(false, req->rid(), req->cid());
                }
                //2. 创建队列的消费者
                auto cb = std::bind(&Channel::callback, this, std::placeholders::_1,
                    std::placeholders::_2, std::placeholders::_3);
                //创建了消费者之后,当前的channel角色就是个消费者
                _consumer = _cmp->create(req->consumer_tag(), req->queue_name(), req->auto_ack(), cb);
                return basicResponse(true, req->rid(), req->cid());
            }
            //取消订阅
            void basicCancel(const basicCancelRequestPtr &req) {
                _cmp->remove(req->consumer_tag(), req->queue_name());
                return basicResponse(true, req->rid(), req->cid());
            }
        private:
            void callback(const std::string tag, const BasicProperties *bp, const std::string &body) {
                //针对参数组织出推送消息请求,将消息推送给channel对应的客户端
                basicConsumeResponse resp;
                resp.set_cid(_cid);
                resp.set_body(body);
                resp.set_consumer_tag(tag);
                if (bp) {
                    resp.mutable_properties()->set_id(bp->id());
                    resp.mutable_properties()->set_delivery_mode(bp->delivery_mode());
                    resp.mutable_properties()->set_routing_key(bp->routing_key());
                }
                _codec->send(_conn, resp);
            }
            void consume(const std::string &qname) {
                //指定队列消费消息
                //1. 从队列中取出一条消息
                MessagePtr mp = _host->basicConsume(qname);
                if (mp.get() == nullptr) {
                    DLOG("执行消费任务失败,%s 队列没有消息!", qname.c_str());
                    return;
                }
                //2. 从队列订阅者中取出一个订阅者
                Consumer::ptr cp = _cmp->choose(qname);
                if (cp.get() == nullptr) {
                    DLOG("执行消费任务失败,%s 队列没有消费者!", qname.c_str());
                    return;
                }
                //3. 调用订阅者对应的消息处理函数,实现消息的推送
                cp->callback(cp->tag, mp->mutable_payload()->mutable_properties(), mp->payload().body());
                //4. 判断如果订阅者是自动确认---不需要等待确认,直接删除消息,否则需要外部收到消息确认后再删除
                if (cp->auto_ack) _host->basicAck(qname, mp->payload().properties().id());
            }
            void basicResponse(bool ok, const std::string &rid, const std::string &cid) {
                basicCommonResponse resp;
                resp.set_rid(rid);
                resp.set_cid(cid);
                resp.set_ok(ok);
                _codec->send(_conn, resp);
            }
        private:
            std::string _cid;
            Consumer::ptr _consumer;
            muduo::net::TcpConnectionPtr _conn;
            ProtobufCodecPtr _codec;
            ConsumerManager::ptr _cmp;
            VirtualHost::ptr _host;
            threadpool::ptr _pool;
    };
  1. 信道管理
  • a. 信道的增删查。
    class ChannelManager {
        public:
            using ptr = std::shared_ptr<ChannelManager>;
            ChannelManager(){}
            bool openChannel(const std::string &id, 
                const VirtualHost::ptr &host, 
                const ConsumerManager::ptr &cmp, 
                const ProtobufCodecPtr &codec, 
                const muduo::net::TcpConnectionPtr &conn,
                const threadpool::ptr &pool) {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _channels.find(id);
                if (it != _channels.end()) {
                    DLOG("信道:%s 已经存在!", id.c_str());
                    return false;
                }
                auto channel = std::make_shared<Channel>(id, host, cmp, codec, conn, pool);
                _channels.insert(std::make_pair(id, channel));
                return true;
            }
            void closeChannel(const std::string &id){
                std::unique_lock<std::mutex> lock(_mutex);
                _channels.erase(id);
            }
            Channel::ptr getChannel(const std::string &id) {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _channels.find(id);
                if (it == _channels.end()) {
                    return Channel::ptr();
                }
                return it->second;
            }
        private:
            std::mutex _mutex;
            std::unordered_map<std::string, Channel::ptr> _channels;
    };

连接管理模块

向⽤⼾提供⼀个⽤于实现⽹络通信的Connection对象,从其内部可创建出粒度更轻的Channel对象,⽤于与客⼾端进⾏⽹络通信。
在这里插入图片描述

  1. 成员信息:
  • a. 连接关联的信道管理句柄(实现信道的增删查)
  • b. 连接关联的实际⽤于通信的muduo::net::Connection连接
  • c. protobuf协议处理的句柄(ProtobufCodec对象)
  • d. 消费者管理句柄
  • e. 虚拟机句柄
  • f. 异步⼯作线程池句柄
  1. 连接操作:
  • a. 提供创建Channel信道的操作
  • b. 提供删除Channel信道的操作
    class Connection {
        public:
            using ptr = std::shared_ptr<Connection>;
            Connection(const VirtualHost::ptr &host, 
                const ConsumerManager::ptr &cmp, 
                const ProtobufCodecPtr &codec, 
                const muduo::net::TcpConnectionPtr &conn,
                const threadpool::ptr &pool) :
                _conn(conn),
                _codec(codec),
                _cmp(cmp),
                _host(host),
                _pool(pool),
                _channels(std::make_shared<ChannelManager>()){}
            void openChannel(const openChannelRequestPtr &req) {
                //1. 判断信道ID是否重复,创建信道
                bool ret = _channels->openChannel(req->cid(), _host, _cmp, _codec, _conn, _pool);
                if (ret == false) {
                    DLOG("创建信道的时候,信道ID重复了");
                    return basicResponse(false, req->rid(), req->cid());
                }
                DLOG("%s 信道创建成功!", req->cid().c_str());
                //3. 给客户端进行回复
                return basicResponse(true, req->rid(), req->cid());
            }
            void closeChannel(const closeChannelRequestPtr &req) {
                _channels->closeChannel(req->cid());
                return basicResponse(true, req->rid(), req->cid());
            }
            Channel::ptr getChannel(const std::string &cid) {
                return _channels->getChannel(cid);
            }
        private:
            void basicResponse(bool ok, const std::string &rid, const std::string &cid) {
                basicCommonResponse resp;
                resp.set_rid(rid);
                resp.set_cid(cid);
                resp.set_ok(ok);
                _codec->send(_conn, resp);
            }
        private:
            muduo::net::TcpConnectionPtr _conn;
            ProtobufCodecPtr _codec;
            ConsumerManager::ptr _cmp;
            VirtualHost::ptr _host;
            threadpool::ptr _pool;
            ChannelManager::ptr _channels;
    };
  1. 连接管理:
  • a. 连接的增删查
    class ConnectionManager {
        public:
            using ptr = std::shared_ptr<ConnectionManager>;
            ConnectionManager() {}
            void newConnection(const VirtualHost::ptr &host, 
                const ConsumerManager::ptr &cmp, 
                const ProtobufCodecPtr &codec, 
                const muduo::net::TcpConnectionPtr &conn,
                const threadpool::ptr &pool) {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _conns.find(conn);
                if (it != _conns.end()) {
                    return ;
                }
                Connection::ptr self_conn = std::make_shared<Connection>(host, cmp, codec, conn, pool);
                _conns.insert(std::make_pair(conn, self_conn));
            }
            void delConnection(const muduo::net::TcpConnectionPtr &conn) {
                std::unique_lock<std::mutex> lock(_mutex);
                _conns.erase(conn);
            }
            Connection::ptr getConnection(const muduo::net::TcpConnectionPtr &conn) {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _conns.find(conn);
                if (it == _conns.end()) {
                    return Connection::ptr();
                }
                return it->second;
            }
        private:
            std::mutex _mutex;
            std::unordered_map<muduo::net::TcpConnectionPtr, Connection::ptr> _conns;
    };

注意:
在RabbitMQ中,虚拟主机是可以随意创建/删除的,但是咱们此处为了实现简单,并没有实现虚拟主机的管理,因此我们默认就只有⼀个虚拟主机的存在,但是在数据结构的设计上我们预留了对于多虚拟主机的管理,从⽽保证不同虚拟主机中的Exchange、Queue、Binding、Message等资源都是相互隔离的。

服务器模块实现

服务器模块我们借助Muduo⽹络库来实现。

在这里插入图片描述
在这里插入图片描述

BrokerServer模块是对整体服务器所有模块的整合,接收客⼾端的请求,并提供服务。

基于前边实现的简单的翻译服务器代码,进⾏改造,只需要实现服务器内部提供服务的各个业务接即可。

在各个业务处理函数中,也⽐较简单,创建信道后,每次请求过来后,找到请求对应的信道句柄,通过句柄调⽤前边封装好的处理接⼝进⾏请求处理,最终返回处理结果。

    #define DBFILE "/meta.db"
    #define HOSTNAME "MyVirtualHost"
    class Server {
        public:
            typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
            Server(int port, const std::string &basedir): _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), 
                "Server", muduo::net::TcpServer::kReusePort),
                _dispatcher(std::bind(&Server::onUnknownMessage, this, std::placeholders::_1, 
                    std::placeholders::_2, std::placeholders::_3)),
                _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, 
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),
                _virtual_host(std::make_shared<VirtualHost>(HOSTNAME, basedir, basedir + DBFILE)),
                _consumer_manager(std::make_shared<ConsumerManager>()),
                _connection_manager(std::make_shared<ConnectionManager>()),
                _threadpool(std::make_shared<threadpool>()){
                //针对历史消息中的所有队列,别忘了,初始化队列的消费者管理结构
                QueueMap qm = _virtual_host->allQueues();
                for (auto &q : qm) {
                    _consumer_manager->initQueueConsumer(q.first);
                }
                //注册业务请求处理函数
                _dispatcher.registerMessageCallback<nzq::openChannelRequest>(std::bind(&Server::onOpenChannel, this, 
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
                _dispatcher.registerMessageCallback<nzq::closeChannelRequest>(std::bind(&Server::onCloseChannel, this, 
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
                _dispatcher.registerMessageCallback<nzq::declareExchangeRequest>(std::bind(&Server::onDeclareExchange, this, 
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
                _dispatcher.registerMessageCallback<nzq::deleteExchangeRequest>(std::bind(&Server::onDeleteExchange, this, 
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
                _dispatcher.registerMessageCallback<nzq::declareQueueRequest>(std::bind(&Server::onDeclareQueue, this, 
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
                _dispatcher.registerMessageCallback<nzq::deleteQueueRequest>(std::bind(&Server::onDeleteQueue, this, 
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
                _dispatcher.registerMessageCallback<nzq::queueBindRequest>(std::bind(&Server::onQueueBind, this, 
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
                _dispatcher.registerMessageCallback<nzq::queueUnBindRequest>(std::bind(&Server::onQueueUnBind, this, 
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
                _dispatcher.registerMessageCallback<nzq::basicPublishRequest>(std::bind(&Server::onBasicPublish, this, 
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
                _dispatcher.registerMessageCallback<nzq::basicAckRequest>(std::bind(&Server::onBasicAck, this, 
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
                _dispatcher.registerMessageCallback<nzq::basicConsumeRequest>(std::bind(&Server::onBasicConsume, this, 
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
                _dispatcher.registerMessageCallback<nzq::basicCancelRequest>(std::bind(&Server::onBasicCancel, this, 
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

                _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),
                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
                _server.setConnectionCallback(std::bind(&Server::onConnection, this, std::placeholders::_1));
            }
            void start() {
                _server.start();
                _baseloop.loop();
            }
        private:
            //打开信道
            void onOpenChannel(const muduo::net::TcpConnectionPtr& conn, const openChannelRequestPtr& message, muduo::Timestamp) {
                Connection::ptr mconn = _connection_manager->getConnection(conn);
                if (mconn.get() == nullptr) {
                    DLOG("打开信道时,没有找到连接对应的Connection对象!");
                    conn->shutdown();
                    return;
                }
                return mconn->openChannel(message);
            }
            //关闭信道
            void onCloseChannel(const muduo::net::TcpConnectionPtr& conn, const closeChannelRequestPtr& message, muduo::Timestamp) {
                Connection::ptr mconn = _connection_manager->getConnection(conn);
                if (mconn.get() == nullptr) {
                    DLOG("关闭信道时,没有找到连接对应的Connection对象!");
                    conn->shutdown();
                    return;
                }
                return mconn->closeChannel(message);
            }
            //声明交换机
            void onDeclareExchange(const muduo::net::TcpConnectionPtr& conn, const declareExchangeRequestPtr& message, muduo::Timestamp) {
                Connection::ptr mconn = _connection_manager->getConnection(conn);
                if (mconn.get() == nullptr) {
                    DLOG("声明交换机时,没有找到连接对应的Connection对象!");
                    conn->shutdown();
                    return;
                }
                Channel::ptr cp = mconn->getChannel(message->cid());
                if (cp.get() == nullptr) {
                    DLOG("声明交换机时,没有找到信道!");
                    return;
                }
                return cp->declareExchange(message);
            }
            //删除交换机
            void onDeleteExchange(const muduo::net::TcpConnectionPtr& conn, const deleteExchangeRequestPtr& message, muduo::Timestamp) {
                Connection::ptr mconn = _connection_manager->getConnection(conn);
                if (mconn.get() == nullptr) {
                    DLOG("删除交换机时,没有找到连接对应的Connection对象!");
                    conn->shutdown();
                    return;
                }
                Channel::ptr cp = mconn->getChannel(message->cid());
                if (cp.get() == nullptr) {
                    DLOG("删除交换机时,没有找到信道!");
                    return;
                }
                return cp->deleteExchange(message);
            }
            //声明队列
            void onDeclareQueue(const muduo::net::TcpConnectionPtr& conn, const declareQueueRequestPtr& message, muduo::Timestamp) {
                Connection::ptr mconn = _connection_manager->getConnection(conn);
                if (mconn.get() == nullptr) {
                    DLOG("声明队列时,没有找到连接对应的Connection对象!");
                    conn->shutdown();
                    return;
                }
                Channel::ptr cp = mconn->getChannel(message->cid());
                if (cp.get() == nullptr) {
                    DLOG("声明队列时,没有找到信道!");
                    return;
                }
                return cp->declareQueue(message);
            }
            //删除队列
            void onDeleteQueue(const muduo::net::TcpConnectionPtr& conn, const deleteQueueRequestPtr& message, muduo::Timestamp) {
                Connection::ptr mconn = _connection_manager->getConnection(conn);
                if (mconn.get() == nullptr) {
                    DLOG("删除队列时,没有找到连接对应的Connection对象!");
                    conn->shutdown();
                    return;
                }
                Channel::ptr cp = mconn->getChannel(message->cid());
                if (cp.get() == nullptr) {
                    DLOG("删除队列时,没有找到信道!");
                    return;
                }
                return cp->deleteQueue(message);
            }
            //队列绑定
            void onQueueBind(const muduo::net::TcpConnectionPtr& conn, const queueBindRequestPtr& message, muduo::Timestamp) {
                Connection::ptr mconn = _connection_manager->getConnection(conn);
                if (mconn.get() == nullptr) {
                    DLOG("队列绑定时,没有找到连接对应的Connection对象!");
                    conn->shutdown();
                    return;
                }
                Channel::ptr cp = mconn->getChannel(message->cid());
                if (cp.get() == nullptr) {
                    DLOG("队列绑定时,没有找到信道!");
                    return;
                }
                return cp->queueBind(message);
            }
            //队列解绑
            void onQueueUnBind(const muduo::net::TcpConnectionPtr& conn, const queueUnBindRequestPtr& message, muduo::Timestamp) {
                Connection::ptr mconn = _connection_manager->getConnection(conn);
                if (mconn.get() == nullptr) {
                    DLOG("队列解除绑定时,没有找到连接对应的Connection对象!");
                    conn->shutdown();
                    return;
                }
                Channel::ptr cp = mconn->getChannel(message->cid());
                if (cp.get() == nullptr) {
                    DLOG("队列解除绑定时,没有找到信道!");
                    return;
                }
                return cp->queueUnBind(message);
            }
            //消息发布
            void onBasicPublish(const muduo::net::TcpConnectionPtr& conn, const basicPublishRequestPtr& message, muduo::Timestamp) {
                Connection::ptr mconn = _connection_manager->getConnection(conn);
                if (mconn.get() == nullptr) {
                    DLOG("发布消息时,没有找到连接对应的Connection对象!");
                    conn->shutdown();
                    return;
                }
                Channel::ptr cp = mconn->getChannel(message->cid());
                if (cp.get() == nullptr) {
                    DLOG("发布消息时,没有找到信道!");
                    return;
                }
                return cp->basicPublish(message);
            }
            //消息确认
            void onBasicAck(const muduo::net::TcpConnectionPtr& conn, const basicAckRequestPtr& message, muduo::Timestamp) {
                Connection::ptr mconn = _connection_manager->getConnection(conn);
                if (mconn.get() == nullptr) {
                    DLOG("确认消息时,没有找到连接对应的Connection对象!");
                    conn->shutdown();
                    return;
                }
                Channel::ptr cp = mconn->getChannel(message->cid());
                if (cp.get() == nullptr) {
                    DLOG("确认消息时,没有找到信道!");
                    return;
                }
                return cp->basicAck(message);
            }
            //队列消息订阅
            void onBasicConsume(const muduo::net::TcpConnectionPtr& conn, const basicConsumeRequestPtr& message, muduo::Timestamp) {
                Connection::ptr mconn = _connection_manager->getConnection(conn);
                if (mconn.get() == nullptr) {
                    DLOG("队列消息订阅时,没有找到连接对应的Connection对象!");
                    conn->shutdown();
                    return;
                }
                Channel::ptr cp = mconn->getChannel(message->cid());
                if (cp.get() == nullptr) {
                    DLOG("队列消息订阅时,没有找到信道!");
                    return;
                }
                return cp->basicConsume(message);
            }
            //队列消息取消订阅
            void onBasicCancel(const muduo::net::TcpConnectionPtr& conn, const basicCancelRequestPtr& message, muduo::Timestamp) {
                Connection::ptr mconn = _connection_manager->getConnection(conn);
                if (mconn.get() == nullptr) {
                    DLOG("队列消息取消订阅时,没有找到连接对应的Connection对象!");
                    conn->shutdown();
                    return;
                }
                Channel::ptr cp = mconn->getChannel(message->cid());
                if (cp.get() == nullptr) {
                    DLOG("队列消息取消订阅时,没有找到信道!");
                    return;
                }
                return cp->basicCancel(message);
            }
            void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp) {
                LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
                conn->shutdown();
            }
            void onConnection(const muduo::net::TcpConnectionPtr &conn) {
                if (conn->connected()) {
                    _connection_manager->newConnection(_virtual_host, _consumer_manager, _codec, conn, _threadpool);
                }else {
                    _connection_manager->delConnection(conn);
                }
            }
        private:
            muduo::net::EventLoop _baseloop;
            muduo::net::TcpServer _server;//服务器对象
            ProtobufDispatcher _dispatcher;//请求分发器对象--要向其中注册请求处理函数
            ProtobufCodecPtr _codec;//protobuf协议处理器--针对收到的请求数据进行protobuf协议处理
            VirtualHost::ptr _virtual_host;
            ConsumerManager::ptr _consumer_manager;
            ConnectionManager::ptr _connection_manager;
            threadpool::ptr _threadpool;
    };

server.cc

#include "broker.hpp"

int main()
{
    nzq::Server server(8085, "./data/");
    server.start();
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/887033.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【智能大数据分析 | 实验二】Spark实验:部署Spark集群

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈智能大数据分析 ⌋ ⌋ ⌋ 智能大数据分析是指利用先进的技术和算法对大规模数据进行深入分析和挖掘&#xff0c;以提取有价值的信息和洞察。它结合了大数据技术、人工智能&#xff08;AI&#xff09;、机器学习&#xff08;ML&a…

如何编写一个优雅的commit message

在Git中&#xff0c;git commit 命令扮演着至关重要的角色。它的主要作用是将暂存区&#xff08;staging area&#xff09;里的改动内容提交到本地仓库&#xff08;repository&#xff09;中&#xff0c;形成一个新的版本或提交&#xff08;commit&#xff09;。这个过程是 Git…

【HarmonyOS】时间处理Dayjs

背景 在项目中经常会使用要时间的格式转换&#xff0c;比如数据库返回一个Date数据&#xff0c;你需要转成2024-10-2的格式&#xff0c;鸿蒙的原生SDK中是没有办法实现的&#xff0c;因此&#xff0c;在这里介绍第三方封装好并且成熟使用的库Dayjs。 安装 切换到Entry文件夹下…

【学习资源】人在环路的机器学习

说明&#xff1a;本文图片和内容来源 Human-in-the-Loop Machine Learning Human-in-the-Loop Machine Learning Active learning and annotation for human-centered AI by Robert (Munro) Monarch, June 2021 介绍Human-in-the-Loop的目标&#xff0c;学习过程&#xff0c…

gdb 调试 linux 应用程序的技巧介绍

使用 gdb 来调试 Linux 应用程序时&#xff0c;可以显著提高开发和调试的效率。gdb&#xff08;GNU 调试器&#xff09;是一款功能强大的调试工具&#xff0c;适用于调试各类 C、C 程序。它允许我们在运行程序时检查其状态&#xff0c;设置断点&#xff0c;跟踪变量值的变化&am…

基于Arduino的宠物食物分配器

创作本文的初衷是本人的一个养宠物的梦想&#xff08;因为家里人对宠物过敏&#xff0c;因此养宠物的action一直没有落实&#xff09;&#xff0c;但是梦想总是要有的哈哈哈哈哈。上周正好是和一个很好的朋友见面&#xff0c;聊到了养宠物的事情&#xff0c;她大概是讲到了喂宠…

震撼!工业史上第一家万级别规模的工业数字化设备效果图平台

耗时八年打造&#xff0c;国内第一家万级别规模的工业数字化设备效果图平台 平台&#xff1a;www.kingview3d.cn 创作者&#xff1a;kingview3d郭工 行业&#xff1a;煤矿综合自动化、污水处理、净水处理、楼宇暖通、环保工程、医药废水处理、二供、无负压加压站、提升泵站、一…

《NoSQL》非关系型数据库MongoDB 学习笔记!

Mongo基础&#xff1a; 使用数据库&#xff1a; 使用use 命令 后面跟着要使用的数据库名字即可&#xff0c; 例如&#xff1a;use cities, 值得注意的是&#xff0c; mongo中不像mysql&#xff0c; 还需要先创建数据库&#xff0c;后访问&#xff0c; mongo中&#xff0c;你无…

【WebGis开发 - Cesium】如何确保Cesium场景加载完毕

目录 引言一、监听场景加载进度1. 基础代码2. 加工代码 二、进一步封装代码1. 已知存在的弊端2. 封装hooks函数 三、使用hooks方法1. 先看下效果2. 如何使用该hooks方法 三、总结 引言 本篇为Cesium开发的一些小技巧。 判断Cesium场景是否加载完毕这件事是非常有意义的。 加载…

在 Elasticsearch Serverless 上使用 Eland

作者&#xff1a;来自 Elastic Quentin Pradet 本博客将向你展示如何使用 Eland 将机器学习模型导入 Elasticsearch Serverless&#xff0c;然后如何使用类似 Pandas 的 API 探索 Elasticsearch。 Elasticsearch Serverless 中的 NLP 自 Elasticsearch 8.0 起&#xff0c;可以…

SQL专项练习第二天

在数据处理和分析中&#xff0c;Hive 是一个强大的工具。本文将通过五个 Hive 相关的问题展示其在不同场景下的应用技巧。 先在home文件夹下建一个hivedata文件夹&#xff0c;把我们所需的数据写成txt文件导入到/home/hivedata/文件夹下面。 一、找出连续活跃 3 天及以上的用户…

【AI论文精读1】针对知识密集型NLP任务的检索增强生成(RAG原始论文)

目录 一、简介一句话简介作者、引用数、时间论文地址开源代码地址 二、摘要三、引言四、整体架构&#xff08;用一个例子来阐明&#xff09;场景例子&#xff1a;核心点&#xff1a; 五、方法 &#xff08;架构各部分详解&#xff09;5.1 模型1. RAG-Sequence Model2. RAG-Toke…

Python+Matplotlib创建y=sinx、y=cosx、y=sinx+cosx可视化

y sin x (奇函数)&#xff1a; 图像关于原点对称。 对于任何 x&#xff0c;sin(-x) -sin(x)&#xff0c;符合奇函数定义。 y cos x (偶函数)&#xff1a; 图像关于 y 轴对称。 对于任何 x&#xff0c;cos(-x) cos(x)&#xff0c;符合偶函数定义。 y sin x cos x (既…

安全帽头盔检测数据集 3类 12000张 安全帽数据集 voc yolo

安全帽头盔检测数据集 3类 12000张 安全帽数据集 voc yolo 安全帽头盔检测数据集介绍 数据集名称 安全帽头盔检测数据集 (Safety Helmet and Person Detection Dataset) 数据集概述 该数据集专为训练和评估基于YOLO系列目标检测模型&#xff08;包括YOLOv5、YOLOv6、YOLOv7…

LabVIEW机床加工监控系统

随着制造业的快速发展&#xff0c;机床加工的效率与稳定性成为企业核心竞争力的关键。传统的机床监控方式存在效率低、无法远程监控的问题。为了解决这些问题&#xff0c;开发了一种基于LabVIEW的机床加工监控系统&#xff0c;通过实时监控机床状态&#xff0c;改进生产流程&am…

Spring MVC__入门

目录 一、SpringMVC简介1、什么是MVC2、什么是SpringMVC 二、Spring MVC实现原理2.1核心组件2.2工作流程 三、helloworld1、开发环境2、创建maven工程3、配置web.xml4、创建请求控制器5、创建springMVC的配置文件6、测试HelloWorld7、总结 一、SpringMVC简介 1、什么是MVC MV…

html5 + css3(上)

目录 HTML初识基础认知web标准vscode的简介和使用注释 HTML标签学习排版标签标题和段落换行和水平线标签 文本格式化标签媒体标签图片标签图片-基本使用图片-属性 路径绝对路径相对路径 音频标签视频标签链接标签 HTML基础列表标签列表-无序和有序列表-自定义 表格标签表格-使用…

【JAVA开源】基于Vue和SpringBoot的周边产品销售网站

本文项目编号 T 061 &#xff0c;文末自助获取源码 \color{red}{T061&#xff0c;文末自助获取源码} T061&#xff0c;文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析 六、核心代码6.1 查…

Java网络通信—UDP

0.小记 1.udp通信不需要建立socket管道&#xff0c;一边只管发&#xff0c;一边只管收 2.客户端&#xff1a;将数据&#xff08;byte&#xff09;打包成包裹&#xff08;DatagramPacket&#xff09;&#xff0c;写上地址&#xff08;IP端口&#xff09;&#xff0c;通过快递站&…

【HTML并不简单】笔记1-常用rel总结:nofollow、noopener、opener、noreferrer,relList

文章目录 rel"nofollow"rel"noopener"与rel"opener"rel"noreferrer"relList对象 《HTML并不简单&#xff1a;Web前端开发精进秘籍》张鑫旭&#xff0c;一些摘要&#xff1a; HTML&#xff0c;这门语言的知识体系非常庞杂&#xff0c;涉…