根据源码,模拟实现 RabbitMQ - 网络通讯设计,实现客户端Connection、Channel(完结)

目录

一、客户端代码实现

1.1、需求分析

1.2、具体实现

1)实现 ConnectionFactory

2)实现 Connection

3)实现 Channel

二、编写 Demo 

2.1、实例 

2.1、实例演示


一、客户端代码实现


1.1、需求分析

RabbitMQ 的客户端设定:一个客户端可以有多个模块,每个模块都可以和 broker server 之间建立 “逻辑上的连接” (channel),这几个模块的channel 彼此之间是互相不影响的,同时这几个 channel 又复用的同一个 TCP 连接,省去了频繁 建立/销毁 TCP 连接的开销(三次握手、四次挥手......).

这里,我们也按照这样的逻辑实现 消息队列 的客户端,主要涉及到以下三个核心类:

  1. ConnectionFactory:连接工厂,这个类持有服务器的地址,主要功能就是创建 Connection 对象.
  2. Connection:表示一个 TCP连接,持有 Socket 对象,用来 写入请求/读取响应,管理多个Channel 对象.
  3. Channel:表示一个逻辑上的连接,需要提供一系列的方法,去和服务器提供的核心 API 对应(客户端提供的这些方法的内部,就是写入了一个特定的请求,然后等待服务器响应).

1.2、具体实现

1)实现 ConnectionFactory

主要用来创建 Connection 对象.

public class ConnectionFactory {

    //broker server 的 ip 地址
    private String host;
    //broker server 的端口号
    private int port;

//    //访问 broker server 的哪个虚拟主机
//    //这里暂时先不涉及
//    private String virtualHostName;
//    private String username;
//    private String password;

    public Connection newConnection() throws IOException {
        Connection connection = new Connection(host, port);
        return connection;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }
}

2)实现 Connection

属性如下

    private Socket socket;
    //一个 socket 连接需要管理多个 channel
    private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
    private InputStream inputStream;
    private OutputStream outputStream;
    // DataXXX 主要用来 读取/写入 特定格式数据(例如 readInt())
    private DataInputStream dataInputStream;
    private DataOutputStream dataOutputStream;
    //用来处理 0xc 的回调,这里开销可能会很大,不希望把 Connection 阻塞住,因此使用 线程池 来处理
    private ExecutorService callbackPool;

构造如下

这里不光需要初始化属性,还需要创建一个扫描线程,由这个线程负责不停的从 socket 中读取响应数据,把这个响应数据再交给对应的 channel 负责处理

    public Connection(String host, int port) throws IOException {
        socket = new Socket(host, port);
        inputStream = socket.getInputStream();
        outputStream = socket.getOutputStream();
        dataInputStream = new DataInputStream(inputStream);
        dataOutputStream = new DataOutputStream(outputStream);

        callbackPool = Executors.newFixedThreadPool(4);

        //创建一个扫描线程,由这个线程负责不停的从 socket 中读取响应数据,把这个响应数据再交给对应的 channel 负责处理
        Thread t = new Thread(() -> {
            try {
                while(!socket.isClosed()) {
                    Response response = readResponse();
                    dispatchResponse(response);
                }
            } catch (SocketException e) {
                //连接正常断开的,此时这个异常可以忽略
                System.out.println("[Connection] 连接正常断开!");
            } catch(IOException | ClassNotFoundException | MqException e) {
                System.out.println("[Connection] 连接异常断开!");
                e.printStackTrace();
            }
        });
        t.start();
    }

释放 Connection 相关资源

    public void close() {
        try {
            callbackPool.shutdown();
            channelMap.clear();
            inputStream.close();
            outputStream.close();
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

使用这个方法来区别,当前的响应是一个针对控制请求的响应,还是服务器推送过来的消息.

如果是服务器推送过来的消息,就响应表明是 0xc,也就是一个回调,通过线程池来进行处理;

如果只是一个普通的响应,就把这个结果放到 channel 的 哈希表中(随后 channel 会唤醒所有阻塞等待响应的线程,去 map 中拿数据).

    public void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
        if(response.getType() == 0xc) {
            //服务器推送过来的消息数据
            SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
            //根据 channelId 找到对应的 channel 对象
            Channel channel = channelMap.get(subScribeReturns.getChannelId());
            if(channel == null) {
                throw new MqException("[Connection] 该消息对应的 channel 再客户端中不存在!channelId=" + channel.getChannelId());
            }
            //执行该 channel 对象内部的回调(这里的开销未知,有可能很大,同时不希望把这里阻塞住,所以使用线程池来执行)
            callbackPool.submit(() -> {
                try {
                    channel.getConsumer().handlerDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),
                            subScribeReturns.getBody());
                } catch(MqException | IOException e) {
                    e.printStackTrace();
                }
            });
        } else {
            //当前响应是针对刚才的控制请求的响应
            BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
            //把这个结果放到 channel 的 哈希表中
            Channel channel = channelMap.get(basicReturns.getChannelId());
            if(channel == null) {
                throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在!channelId=" + channel.getChannelId());
            }
            channel.putReturns(basicReturns);
        }
    }

发送请求和读取响应

    /**
     * 发送请求
     * @param request
     * @throws IOException
     */
    public void writeRequest(Request request) throws IOException {
        dataOutputStream.writeInt(request.getType());
        dataOutputStream.writeInt(request.getLength());
        dataOutputStream.write(request.getPayload());
        dataOutputStream.flush();
        System.out.println("[Connection] 发送请求!type=" + request.getType() + ", length=" + request.getLength());
    }

    /**
     * 读取响应
     */
    public Response readResponse() throws IOException {
        Response response = new Response();
        response.setType(dataInputStream.readInt());
        response.setLength(dataInputStream.readInt());
        byte[] payload = new byte[response.getLength()];
        int n = dataInputStream.read(payload);
        if(n != response.getLength()) {
            throw new IOException("读取的响应格式不完整! n=" + n + ", responseLen=" + response.getLength());
        }
        response.setPayload(payload);
        System.out.println("[Connection] 收到响应!type=" + response.getType() + ", length=" + response.getLength());
        return response;
    }

在 Connection 中提供创建 Channel 的方法

    public Channel createChannel() throws IOException {
        String channelId = "C-" + UUID.randomUUID().toString();
        Channel channel = new Channel(channelId, this);
        //放到 Connection 管理的 channel 的 Map 集合中
        channelMap.put(channelId, channel);
        //同时也需要把 “创建channel” 这个消息告诉服务器
        boolean ok = channel.createChannel();
        if(!ok) {
            //如果创建失败,就说明这次创建 channel 操作不顺利
            //把刚才加入 hash 表的键值对再删了
            channelMap.remove(channelId);
            return null;
        }
        return channel;
    }

Ps:代码中使用了很多次 UUID ,这里我们和之前一样,使用加前缀的方式来进行区分.

3)实现 Channel

属性和构造如下

    private String channelId;
    // 当前这个 channel 是属于哪一个连接
    private Connection connection;
    //用来存储后续客户端收到的服务器响应,已经辨别是哪个响应(要对的上号) key 是 rid
    private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
    //如果当前 Channel 订阅了某个队列,就需要记录对应的回调是什么,当该队列消息返回回来的时候,调用回调
    //此处约定一个 Channel 只能有一个回调
    private Consumer consumer;

    public Channel(String channelId, Connection connection) {
        this.channelId = channelId;
        this.connection = connection;
    }

    public String getChannelId() {
        return channelId;
    }

    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public ConcurrentHashMap<String, BasicReturns> getBasicReturnsMap() {
        return basicReturnsMap;
    }

    public void setBasicReturnsMap(ConcurrentHashMap<String, BasicReturns> basicReturnsMap) {
        this.basicReturnsMap = basicReturnsMap;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;

实现 0x1 创建 channel

主要就是构造构造出 request,然后发送请求到 BrokerServer 服务器,阻塞等待服务器响应.

    /**
     * 0x1
     * 和服务器进行交互,告诉服务器,此处客户端已经创建了新的 channel 了
     * @return
     */
    public boolean createChannel() throws IOException {
        //构造 payload
        BasicArguments arguments = new BasicArguments();
        arguments.setChannelId(channelId);
        arguments.setRid(generateRid());
        byte[] payload = BinaryTool.toBytes(arguments);
        //发送请求
        Request request = new Request();
        request.setType(0x1);
        request.setLength(payload.length);
        request.setPayload(payload);
        connection.writeRequest(request);

        //等待服务器响应
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

    /**
     * 生成 rid
     * @return
     */
    public String generateRid() {
        return "R-" + UUID.randomUUID().toString();
    }


    /**
     * 阻塞等待服务器响应
     * @param rid
     * @return
     */
    private BasicReturns waitResult(String rid) {
        BasicReturns basicReturns = null;
        while((basicReturns = basicReturnsMap.get(rid)) == null) {
            //查询结果为空,就说明咱们去菜鸟驿站要取的包裹还没到
            //此时就需要阻塞等待
            synchronized (this) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        basicReturnsMap.remove(rid);
        return basicReturns;
    }


    /**
     * 由 Connection 中的方法调用,区分为普通响应之后触发
     * 将响应放回到 channel 管理的 map 中,并唤醒所有线程
     * @param basicReturns
     */
    public void putReturns(BasicReturns basicReturns) {
        basicReturnsMap.put(basicReturns.getRid(), basicReturns);
        synchronized (this) {
            //当前也不知道有多少线程再等待上述的这个响应
            //因此就把所有等待的线程唤醒
            notifyAll();
        }
    }

Ps:其他的 请求操作也和 0x1 的方式几乎一样,这里不一一展示了,主要说一下 0xa

0xa 消费者订阅队列消息,这里要先设置好回调到属性中,方便 Connection 通过这个属性来 处理回调

值得注意的一点, 我们约定 channelId 就是 consumerTag

    public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws IOException, MqException {
        //先设置回调
        if(this.consumer != null) {
            throw new MqException("该 channel 已经设置过消费消息回调了,不能重复!");
        }
        this.consumer = consumer;
        BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();
        basicConsumeArguments.setRid(generateRid());
        basicConsumeArguments.setChannelId(channelId);
        basicConsumeArguments.setConsumerTag(channelId); // 注意:此处的 consumerTag 使用 channelId 来表示
        basicConsumeArguments.setQueueName(queueName);
        basicConsumeArguments.setAutoAck(autoAck);
        byte[] payload = BinaryTool.toBytes(basicConsumeArguments);

        Request request = new Request();
        request.setType(0xa);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());
        return basicReturns.isOk();
    }

二、编写 Demo 


2.1、实例 

到了这里基本就实现完成了一个 跨主机/服务器 之间的生产者消费者模型了(功能上可以满足日常开发对消息队列的使用),但是还具有很强的扩展性,可以继续参考 RabbitMQ,如果有想法的,或者是遇到不会的问题,可以私信我~

以下我来我来编写一个 demo,模拟 跨主机/服务器 之间的生产者消费者模型(这里为了方便,就在本机演示).

首先再 spring boot 项目的启动类中 创建 BrokerServer ,绑定端口号,然后启动

@SpringBootApplication
public class RabbitmqProjectApplication {
    public static ConfigurableApplicationContext context;
    public static void main(String[] args) throws IOException {
        context = SpringApplication.run(RabbitmqProjectApplication.class, args);
        BrokerServer brokerServer = new BrokerServer(9090);
        brokerServer.start();
    }

}

编写消费者

public class DemoConsumer {

    public static void main(String[] args) throws IOException, MqException, InterruptedException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //创建交换机和队列(这里和生产者创建交换机和队列不冲突,谁先启动,就按照谁的创建,即使已经存在交换机和队列,再创建也不会有什么副作用)
        channel.exchangeDeclare("demoExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("demoQueue", true, false, false, null);

        //消费者消费消息
        channel.basicConsume("demoQueue", true, new Consumer() {
            @Override
            public void handlerDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("开销消费");
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("body=" + new String(body));
                System.out.println("消费完毕");
            }
        });

        //由于消费者不知道生产者要生产多少,就在这里通过循环模拟一直等待
        while(true) {
            Thread.sleep(500);
        }
    }

}

编写生产者

public class DemoProducer {

    public static void main(String[] args) throws IOException, InterruptedException {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //创建交换机和队列(这里和消费者创建交换机和队列不冲突,谁先启动,就按照谁的创建,即使已经存在交换机和队列,再创建也不会有什么副作用)
        channel.exchangeDeclare("demoExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("demoQueue", true, false, false, null);

        //生产消息
        byte[] body1 = "Im cyk1 !".getBytes();
        channel.basicPublish("demoExchange", "demoQueue", null, body1);

        Thread.sleep(500);

        //关闭连接
        channel.close();
        connection.close();
    }

}

2.1、实例演示

启动 spring boot 项目(启动 BrokerServer)

运行消费者(消费者和生产者谁先后运行都可以)

 

运行生产者

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/91780.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Git想远程仓库与推送以及拉取远程仓库

理解分布式版本控制系统 1.中央服务器 我们⽬前所说的所有内容&#xff08;⼯作区&#xff0c;暂存区&#xff0c;版本库等等&#xff09;&#xff0c;都是在本地也就是在你的笔记本或者计算机上。⽽我们的 Git 其实是分布式版本控制系统&#xff01;什么意思呢? 那我们多人…

【Linux】进程状态|僵尸进程|孤儿进程

前言 本文继续深入讲解进程内容——进程状态。 一个进程包含有多种状态&#xff0c;有运行状态&#xff0c;阻塞状态&#xff0c;挂起状态&#xff0c;僵尸状态&#xff0c;死亡状态等等&#xff0c;其中&#xff0c;阻塞状态还包含深度睡眠和浅度睡眠状态。 个人主页&#xff…

sin(A)的意义

若存在矩阵A&#xff0c;则sin(A)表示对于矩阵A的每一个元素&#xff0c;进行对应的函数运算。 如:

[系统] 电脑突然变卡 / 电脑突然** / 各种突发情况解决思路

今天来公司办公&#xff0c;开机之后发现电脑出现各种问题&#xff0c;死机、卡顿、点什么都加载&#xff0c;甚至开一个文件夹要1分钟才能打开&#xff0c;花了2个小时才解决&#xff0c;走了很多弯路&#xff0c;其实早点想通&#xff0c;5分钟就能解决问题&#xff0c;所以打…

【ArcGIS微课1000例】0074:ArcGIS热点分析(Getis-Ord Gi*)---犯罪率热点图

严重声明:本文来自专栏《ArcGIS微课1000例:从点滴到精通》,为CSDN博客专家刘一哥GIS原创,原文及专栏地址为:(https://blog.csdn.net/lucky51222/category_11121281.html),谢绝转载或爬取!!! 文章目录 一、热点分析工具介绍二、ArcGIS热点分析案例1. 普通热点分析2. 加…

开源软件的可访问性:让技术更加包容

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

【⑮MySQL | 视图】概述 | 创建 | 查看 | 更新 | 修改 | 删除

前言 ✨欢迎来到小K的MySQL专栏&#xff0c;本节将为大家带来MySQL视图概述 | 创建 | 查看 | 更新 | 修改 | 删除的分享✨ 目录 前言1.视图概述2.创建视图3.查看视图4.更新视图数据5.修改视图6.删除视图总结 1.视图概述 1.1 为什么使用视图&#xff1f; 视图一方面可以帮我们使…

KMP算法开荒

文章目录 一 、前言二、 暴力解法三、KMP算法原理3.1 自动子串的指针3.2 跳过多少个字符3.3 next数组 - 暴力3.4 next数组 - 求解 四 KMP实现 一 、前言 字符串匹配 import re print(re.search(www, www.runoob.com).span()) # 在起始位置匹配 print(re.search(com, www.run…

研磨设计模式day13组合模式

目录 场景 不用模式实现 代码实现 有何问题 解决方案 代码改造 组合模式优缺点 思考 何时选用 场景 不用模式实现 代码实现 叶子对象 package day14组合模式;/*** 叶子对象*/ public class Leaf {/*** 叶子对象的名字*/private String name "";/**…

Linux驱动开发一、RK3568把hello编译到Linux内核中运行。‘rk_vendor_read’未定义的引用

1、在字符设备目录下建立hello目录 ~/Linux/rk356x_linux/kernel/drivers/char/hello 2、进入hello目录&#xff0c;新建hello.c、Makefile、Kconfig三个文件 3、Kconfig是打开make menuconfig配置界面是后的选项&#xff0c;这Kconfig是在字符设备下的。 config HELLOtrist…

VX小程序 实现区域转图片预览

图例 方法一 1、安装插件 wxml2canvas npm install --save wxml2canvas git地址参照&#xff1a;https://github.com/wg-front/wxml2canvas 2、类型 // 小程序页面 let data{list:[{type:wxml,class:.test_center .draw_canvas,limit:.test_center,x:0,y:0}] } 3、数据结…

熊猫:完整的初学者指南

pandas&#xff1a;完整的初学者指南 一、说明 在你的Python开发人员或数据科学之旅中&#xff0c;你可能已经多次遇到“熊猫”这个词&#xff0c;但仍然需要弄清楚它的作用。以及数据和熊猫之间的关系。所以让我向你解释一下。 根据最新估计&#xff0c;每天创建 328.77 亿 TB…

36k字从Attention讲解Transformer及其在Vision中的应用(pytorch版)

文章目录 0.卷积操作1.注意力1.1 注意力概述(Attention)1.1.1 Encoder-Decoder1.1.2 查询、键和值1.1.3 注意力汇聚: Nadaraya-Watson 核回归1.2 注意力评分函数1.2.1 加性注意力1.2.2 缩放点积注意力1.3 自注意力(Self-Attention)1.3.1 自注意力的定义和计算1.3.2 自注意…

python爬虫11:实战3

python爬虫11&#xff1a;实战3 前言 ​ python实现网络爬虫非常简单&#xff0c;只需要掌握一定的基础知识和一定的库使用技巧即可。本系列目标旨在梳理相关知识点&#xff0c;方便以后复习。 申明 ​ 本系列所涉及的代码仅用于个人研究与讨论&#xff0c;并不会对网站产生不好…

nonlocal关键字声明

nonlocal关键字声明 作用 使得内层函数可以使用/修改外层函数的变量 值得注意的是&#xff0c;在未使用nonlocal声明时 对于外层函数中的可变对象&#xff0c;内层函数即可访问&#xff0c;也可以修改 def outer():x, y [1], [2]def inner(z):x.append(1)print(x)print(z)r…

历史最佳二季度表现后,爱奇艺想为用户提供更多价值

以爱奇艺为首&#xff0c;随着长视频平台相继转变运营思路&#xff0c;走向盈利目标&#xff0c;最早完成蜕变的爱奇艺&#xff0c;已开始迈向下一阶段。 近日&#xff0c;爱奇艺发布了截至6月30日的2023年第二季度财报。除了依然亮眼的内容表现、业绩成果外&#xff0c;爱奇艺…

1.Flink源码编译

目录 1.环境版本 1.1 jdk 1.2.maven 1.3.node 1.4.scala 2.下载flink源码 3.编译源码 4.idea打开flink源码 5.运行wordcount 1.环境版本 软件地址 链接&#xff1a;https://pan.baidu.com/s/1ZxYydR8rBfpLCcIdaOzxVg 提取码&#xff1a;12xq 1.1 jdk 1.2 maven 1.…

Bean 作用域和生命周期

前言&#xff1a; &#x1f4d5;作者简介&#xff1a;热爱编程的小七&#xff0c;致力于C、Java、Python等多编程语言&#xff0c;热爱编程和长板的运动少年&#xff01; &#x1f4d8;相关专栏Java基础语法&#xff0c;JavaEE初阶&#xff0c;数据库&#xff0c;数据结构和算法…

GPT4模型架构的泄漏与分析

迄今为止&#xff0c;GPT4 模型是突破性的模型&#xff0c;可以免费或通过其商业门户&#xff08;供公开测试版使用&#xff09;向公众提供。它为许多企业家激发了新的项目想法和用例&#xff0c;但对参数数量和模型的保密却扼杀了所有押注于第一个 1 万亿参数模型到 100 万亿参…

【Mac】编译Spring 源码和Idea导入

今天我们开始Spring源码的阅读之旅。阅读Spring的源码的第一步当然是编译Spring源码。首先我们要去GitHub上将spring源码给clone下来。 笔者编译环境如下&#xff1a; Spring版本&#xff1a;5.28 https://github.com/spring-projects/spring-framework/tree/v5.2.8.RELEASE …