【消息队列开发】 实现BrokerServer类——本体服务器

文章目录

  • 🍃前言
  • 🎋创建 BrokerServer 类
  • 🎍启动与停止服务器
  • 🍀实现处理连接
  • 🎄实现 readRequest 与 writeResponse
  • 🌴实现处理请求
  • 🌲实现 clearClosedSession
  • ⭕总结

🍃前言

本次开发任务

  • 实现 BrokerServer 类,也就是咱们消息队列的本体服务器。

其实本质上就是一个 TCP 的服务器。
在这里插入图片描述

🎋创建 BrokerServer 类

创建 BrokerServer 类如下:

public class BrokerServer {
	// 当前程序只考虑⼀个虚拟主机的情况.
	private VirtualHost virtualHost = new VirtualHost("default-VirtualHost");
	// key 为 channelId, value 为 channel 对应的 socket 对象.
	private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>
	private ServerSocket serverSocket;
	private ExecutorService executorService;
	private volatile boolean runnable = true;
}
  • virtualHost表示服务器持有的虚拟主机.队列,交换机,绑定,消息都是通过虚拟主机管理.
  • sessions ⽤来管理所有的客⼾端的连接. 记录每个客户端的 socket.
  • serverSocket 是服务器自身的 socket
  • executorService 这个线程池用来处理响应
  • runnable 这个标志位用来控制服务器的运行停⽌

🎍启动与停止服务器

代码实现如下:

public BrokerServer(int port) throws IOException {
    serverSocket = new ServerSocket(port);
}

public void start() throws IOException {
    System.out.println("[BrokerServer] 启动!");
    executorService = Executors.newCachedThreadPool();
    try {
        while (runnable) {
            Socket clientSocket = serverSocket.accept();
            // 把处理连接的逻辑丢给这个线程池.
            executorService.submit(() -> {
                processConnection(clientSocket);
            });
        }
    } catch (SocketException e) {
        System.out.println("[BrokerServer] 服务器停止运行!");
        // e.printStackTrace();
    }
}

// 一般来说停止服务器, 就是直接 kill 掉对应进程就行了.
// 此处还是搞一个单独的停止方法. 主要是用于后续的单元测试.
public void stop() throws IOException {
    runnable = false;
    // 把线程池中的任务都放弃了. 让线程都销毁.
    executorService.shutdownNow();
    serverSocket.close();
}

🍀实现处理连接

通过这个方法, 来处理一个客户端的连接.

我们使用 InputStreamOutputStream,由于后面要按照特定格式来读取并解析.

此时就需要用到 DataInputStreamDataOutputStream

在这一个连接中, 可能会涉及到多个请求和响应,我们使用一个while(true)来进行实现

在此循环我们要做的事情有三件:

  1. 读取请求并解析
  2. 根据请求计算响应
  3. 把响应写回客户端

具体处理逻辑,我们后面再仔细实现,

那么我们怎么结束这个循环呢?

注意我们上面使用的是 DataInputStreamDataOutputStream,当没有数据进行读取的时候,就会进行抛出异常而结束循环

最后当连接处理完了, 就需要记得关闭 socket, 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.

代码实现如下:

// 通过这个方法, 来处理一个客户端的连接.
// 在这一个连接中, 可能会涉及到多个请求和响应.
private void processConnection(Socket clientSocket) {
    try (InputStream inputStream = clientSocket.getInputStream();
         OutputStream outputStream = clientSocket.getOutputStream()) {
        // 这里需要按照特定格式来读取并解析. 此时就需要用到 DataInputStream 和 DataOutputStream
        try (DataInputStream dataInputStream = new DataInputStream(inputStream);
             DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
            while (true) {
                // 1. 读取请求并解析.
                Request request = readRequest(dataInputStream);
                // 2. 根据请求计算响应
                Response response = process(request, clientSocket);
                // 3. 把响应写回给客户端
                writeResponse(dataOutputStream, response);
            }
        }
    } catch (EOFException | SocketException e) {
        // 对于这个代码, DataInputStream 如果读到 EOF , 就会抛出一个 EOFException 异常.
        // 需要借助这个异常来结束循环
        System.out.println("[BrokerServer] connection 关闭! 客户端的地址: " + clientSocket.getInetAddress().toString()
                + ":" + clientSocket.getPort());
    } catch (IOException | ClassNotFoundException | MqException e) {
        System.out.println("[BrokerServer] connection 出现异常!");
        e.printStackTrace();
    } finally {
        try {
            // 当连接处理完了, 就需要记得关闭 socket
            clientSocket.close();
            // 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.
            clearClosedSession(clientSocket);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

🎄实现 readRequest 与 writeResponse

在这里插入图片描述

关于读取请求,我们前面定义了一个类为 Request ,此时我们构造相应的对象,并对该对象相应属性进行填充即可。

代码实现如下:

private Request readRequest(DataInputStream dataInputStream) throws IOException {
    Request request = new Request();
    request.setType(dataInputStream.readInt());
    request.setLength(dataInputStream.readInt());
    byte[] payload = new byte[request.getLength()];
    int n = dataInputStream.read(payload);
    if (n != request.getLength()) {
        throw new IOException("读取请求格式出错!");
    }
    request.setPayload(payload);
    return request;
}

关于响应,实现相反,传入的 响应对象 相应的属性返回即可。

最后不要忘了刷新缓冲区

代码实现如下:

private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
    dataOutputStream.writeInt(response.getType());
    dataOutputStream.writeInt(response.getLength());
    dataOutputStream.write(response.getPayload());
    // 这个刷新缓冲区也是重要的操作!!
    dataOutputStream.flush();
}

🌴实现处理请求

先把请求转换成 BaseArguments , 获取到其中的 channelId 和 rid

再根据不同的 type, 分别处理不同的逻辑. (主要是调用virtualHost中不同的方法).
在这里插入图片描述

针对消息订阅操作,则需要在存在消息的时候通过回调,把响应结果写回给对应的客⼾端.

最后构造成统⼀的响应.

代码实现如下:

private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
    // 1. 把 request 中的 payload 做一个初步的解析.
    BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
    System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()
            + ", type=" + request.getType() + ", length=" + request.getLength());
    // 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥.
    boolean ok = true;
    if (request.getType() == 0x1) {
        // 创建 channel
        sessions.put(basicArguments.getChannelId(), clientSocket);
        System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + basicArguments.getChannelId());
    } else if (request.getType() == 0x2) {
        // 销毁 channel
        sessions.remove(basicArguments.getChannelId());
        System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + basicArguments.getChannelId());
    } else if (request.getType() == 0x3) {
        // 创建交换机. 此时 payload 就是 ExchangeDeclareArguments 对象了.
        ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
        ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),
                arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());
    } else if (request.getType() == 0x4) {
        ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
        ok = virtualHost.exchangeDelete(arguments.getExchangeName());
    } else if (request.getType() == 0x5) {
        QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
        ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),
                arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());
    } else if (request.getType() == 0x6) {
        QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
        ok = virtualHost.queueDelete((arguments.getQueueName()));
    } else if (request.getType() == 0x7) {
        QueueBindArguments arguments = (QueueBindArguments) basicArguments;
        ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());
    } else if (request.getType() == 0x8) {
        QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;
        ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());
    } else if (request.getType() == 0x9) {
        BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
        ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),
                arguments.getBasicProperties(), arguments.getBody());
    } else if (request.getType() == 0xa) {
        BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
        ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),
                new Consumer() {
                    // 这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端
                    @Override
                    public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                        // 先知道当前这个收到的消息, 要发给哪个客户端.
                        // 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的
                        // socket 对象了, 从而可以往里面发送数据了
                        // 1. 根据 channelId 找到 socket 对象
                        Socket clientSocket = sessions.get(consumerTag);
                        if (clientSocket == null || clientSocket.isClosed()) {
                            throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");
                        }
                        // 2. 构造响应数据
                        SubScribeReturns subScribeReturns = new SubScribeReturns();
                        subScribeReturns.setChannelId(consumerTag);
                        subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要.
                        subScribeReturns.setOk(true);
                        subScribeReturns.setConsumerTag(consumerTag);
                        subScribeReturns.setBasicProperties(basicProperties);
                        subScribeReturns.setBody(body);
                        byte[] payload = BinaryTool.toBytes(subScribeReturns);
                        Response response = new Response();
                        // 0xc 表示服务器给消费者客户端推送的消息数据.
                        response.setType(0xc);
                        // response 的 payload 就是一个 SubScribeReturns
                        response.setLength(payload.length);
                        response.setPayload(payload);
                        // 3. 把数据写回给客户端.
                        //    注意! 此处的 dataOutputStream 这个对象不能 close !!!
                        //    如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了.
                        //    此时就无法继续往 socket 中写入后续数据了.
                        DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
                        writeResponse(dataOutputStream, response);
                    }
                });
    } else if (request.getType() == 0xb) {
        // 调用 basicAck 确认消息.
        BasicAckArguments arguments = (BasicAckArguments) basicArguments;
        ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
    } else {
        // 当前的 type 是非法的.
        throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());
    }
    // 3. 构造响应
    BasicReturns basicReturns = new BasicReturns();
    basicReturns.setChannelId(basicArguments.getChannelId());
    basicReturns.setRid(basicArguments.getRid());
    basicReturns.setOk(ok);
    byte[] payload = BinaryTool.toBytes(basicReturns);
    Response response = new Response();
    response.setType(request.getType());
    response.setLength(payload.length);
    response.setPayload(payload);
    System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()
            + ", type=" + response.getType() + ", length=" + response.getLength());
    return response;
}

🌲实现 clearClosedSession

这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉

需要注意的是:

  • 我们在进行迭代的时候,不要直接删除,这样会影响集合类的结构

代码实现如下:

private void clearClosedSession(Socket clientSocket) {
    // 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉.
    List<String> toDeleteChannelId = new ArrayList<>();
    for (Map.Entry<String, Socket> entry : sessions.entrySet()) {
        if (entry.getValue() == clientSocket) {
            // 不能在这里直接删除!!!
            // 这属于使用集合类的一个大忌!!! 一边遍历, 一边删除!!!
            // sessions.remove(entry.getKey());
            toDeleteChannelId.add(entry.getKey());
        }
    }
    for (String channelId : toDeleteChannelId) {
        sessions.remove(channelId);
    }
    System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);
}

⭕总结

关于《【消息队列开发】 实现BrokerServer类——本体服务器》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/490512.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

java的ArrayList类

ArrayList<E>E是自定义数据类型 ArrayList类&#xff1a; 构造函数&#xff1a; 成员方法&#xff1a; public boolean add(E e)&#xff1a; 将指定元素加到集合末尾 Appends the specified element to the end of this list. public class Array {public static…

就业班 第二阶段 2401--3.26 day6 Shell初识 连接vscode

远程连接vs_code可能出现的问题 C:\Users\41703\.ssh 验证远程主机的身份&#xff0c;如果连不上vscode&#xff0c;可以尝试删除这里面的公钥代码。 重新安装那个扩展&#xff0c;排除扩展本身的问题 谁连过我&#xff0c;并操作了什么 curl https://gitea.beyourself.org.c…

【软件测试】功能测试/接口测试/自动化测试/性能测试/验收测试

软件测试的主要流程 一、测试主要的四个阶段 1.测试计划设计阶段&#xff1a;产品立项之后&#xff0c;进行需求分析&#xff0c;需求评审&#xff0c;业务需求评级&#xff0c;绘制业务流程图。确定测试负责人&#xff0c;开始制定测试计划&#xff1b; 2.测试准备阶段&…

C++11与thread相关使用(纯代码)

多线程创建 //多线程创建 void print(string s) {cout << "i am a new thread&#xff1a;" << s << endl; } int main() {//move将左值变成右值(右值引用过后的属性是左值)//thread t1(print, "t1");//thread t2(move(t1));//调用移动…

javaWeb校园二手平台项目

一、系统分析 1.1开发背景 随着全世界互联网技术的不断发展&#xff0c;各种基于互联网技术的网络应用不断涌现,网络技术正在不断的深入人们的生活。人们从Internet上获取信息、享受生活、交流感情、网上工作等。Internet正在迅速改变着人们的生活方式。 经过我国改革开放多年…

CI/CD实战-jenkins流水线 6

现最新版本没有该问题的出现 基于RBAC的身份授权&#xff1a; 安装插件&#xff1a; 新建测试用户 修改默认授权策略 新建的用户就没有任何权限 新建角色并授权 添加用户角色身份 pipeline 安装ssh agent插件 由于最新版的插件是有问题的&#xff0c;会有以下报错&#xff…

「媒体宣传」财经类媒体邀约资源有哪些?-51媒体

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 财经类媒体邀约资源包括但不限于以下几类&#xff1a; 商业杂志和报纸&#xff1a;可以邀请如《财经》、《新财富》、《经济观察报》等主流商业杂志和报纸。这些媒体通常具有较强的品牌影…

自信当众讲话:从紧张到自如的转变之路

自信当众讲话&#xff1a;从紧张到自如的转变之路 在人生的舞台上&#xff0c;当众讲话是每个人都可能面对的挑战。然而&#xff0c;对于许多人来说&#xff0c;站在众人面前讲话却是一件令人紧张甚至恐惧的事情。这种紧张感往往源于对自我能力的怀疑&#xff0c;对未知的恐惧…

【Node.js从基础到高级运用】十八、Node.js的安全性加固

引言 在Web开发中&#xff0c;安全性是一个不可忽视的话题。Node.js作为一个流行的后端平台&#xff0c;同样需要关注各种潜在的安全威胁&#xff0c;并采取措施加以防御。本文将介绍如何在Node.js应用中防御常见的Web攻击&#xff0c;以及如何使用安全相关的中间件来加固安全性…

Windows 频繁失去焦点分析

原文&#xff1a;https://blog.iyatt.com/?p14383 1 前言 刚才在打字的时候发现会随机失去焦点&#xff0c;然后又要用鼠标点一下正在输入的位置才能继续输入&#xff0c;特别烦。开始我怀疑是手碰到触摸板导致失去焦点&#xff0c;但是我用了差不多十年带触摸板的笔记本电脑…

[ C++ ] STL---仿函数与priority_queue

目录 仿函数 示例一&#xff1a; 示例二 : 常见的仿函数 priority_queue简介 priority_queue的常用接口 priority_queue的模拟实现 基础接口 push() 堆的向上调整算法 堆的插入 pop() 堆的向下调整算法 堆的删除 priority_queue最终实现 仿函数 仿函数&#xff…

基于stm32与TJC3224T124_011串口屏的PID调参器(附完整工程)

电赛在即&#xff0c;每次比赛调PID都是一件比较繁琐的事。每次都要在程序中改完再烧录到板子上&#xff0c;特别耗时。正好最近发现实验室的一块串口屏比较好玩。 于是就做了这个调PID的东西。它可以通过串口直接修改PID的值&#xff0c;从而达到快速调PID的目的。下面我将完整…

【Python】学习率调整策略详解和示例

学习率调整得当将有助于算法快速收敛和获取全局最优&#xff0c;以获得更好的性能。本文对学习率调度器进行示例介绍。 学习率调整的意义基础示例无学习率调整方法学习率调整方法一多因子调度器余弦调度器 结论 学习率调整的意义 首先&#xff0c;学习率的大小很重要。如果它…

音乐制作利器 :FL Studio21中文编曲音乐制作软件免费下载

一、引言 在音乐的世界里&#xff0c;每个人都有自己独特的音色和表达方式。而今天&#xff0c;我们要为你推荐一款能让您的音乐创作更上一层楼的神器——FL Studio21中文编曲音乐制作软件。这款功能强大的音乐制作软件&#xff0c;不仅拥有丰富的音色库和高效的编辑功能&#…

Quartz

Quartz 1.核心概念1.1 核心概念图1.2 demo 2.Job2.1为什么设计成JobDetailJob, 而不直接使用Job2.2 间隔执行时, 每次都会创建新的Job实例2.3 定时任务默认都是并发执行的&#xff0c;不会等待上一次任务执行完毕2.3.1 不允许并发执行 2.4 在运行时, 通过JobDataMap向Job传递数…

Python自动化测试环境搭建

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 关注公众号&#xff1a;互联网杂货铺&#xff0c;回复1 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 请事先自行安装好​​Pycharm​​​软件哦&#xff0c;我…

深度学习模型部署(十二)CUDA编程-绪

CUDA 运行时 API 与 CUDA 驱动 API 速度没有差别&#xff0c;实际中使用运行时 API 较多&#xff0c;运行时 API 是在驱动 API 上的一层封装。​ CUDA 是什么&#xff1f;​ CUDA(Compute Unified Device Architecture) 是 nvidia 推出的一个通用并行技术架构&#xff0c;用它…

基于冠豪猪优化器(CPO)的无人机路径规划

该优化算法是2024年新发表的一篇SCI一区top论文具有良好的实际应用和改进意义。一键运行main函数代码自动保存高质量图片 1、冠豪猪优化器 摘要&#xff1a;受冠豪猪(crest Porcupine, CP)的各种防御行为启发&#xff0c;提出了一种新的基于自然启发的元启发式算法——冠豪猪…

视觉轮速滤波融合1讲:理论推导

视觉轮速滤波融合理论推导 文章目录 视觉轮速滤波融合理论推导1 坐标系2 轮速计2.1 运动学模型2.2 外参 3 状态和协方差矩阵3.1 状态3.2 协方差矩阵 4 Wheel Propagation4.1 连续运动学4.2 离散积分4.2.1 状态均值递推4.2.2 协方差递推 5 Visual update5.1 视觉残差与雅可比5.2…

蓝桥杯2023年第十四届省赛真题-买瓜|DFS+剪枝

题目链接&#xff1a; 0买瓜 - 蓝桥云课 (lanqiao.cn) 蓝桥杯2023年第十四届省赛真题-买瓜 - C语言网 (dotcpp.com) &#xff08;蓝桥官网的数据要求会高一些&#xff09; 说明&#xff1a; 这道题可以分析出&#xff1a;对一个瓜有三种选择&#xff1a; 不拿&#xff0c…