目录
需求分析
RabbitMQ 客户端设定
ConnectionFactory(连接工厂)
Connection(连接)
Channel(通道)
针对 客户端 和 服务器 单元测试
需求分析
RabbitMQ 客户端设定
- 一个客户端可以有多个模块
- 每个模块均可以和 Broker Server 之间建立 "逻辑上的连接"(channel)
- 这几个模块的 channel 彼此之间是相互不影响的
- 同时这几个 channel 复用了同一个 TCP 连接
- 此处我们将仿照 RabbitMQ 客户端设定
ConnectionFactory(连接工厂)
- 这个类持有服务器的地址
- 该类用于创建 Connection 对象
具体代码编写:
import lombok.Getter; import lombok.Setter; import java.io.IOException; @Getter @Setter public class ConnectionFactory { // broker server 的 ip 地址 private String host; // broker server 的端口号 private int port; public Connection newConnection() throws IOException { Connection connection = new Connection(host,port); return connection; } // 访问 broker server 的哪个虚拟主机 // 下列几个属性暂时先不搞了 // private String virtualHostName; // private String username; // private String password; }
Connection(连接)
- 这个类表示一个 TCP 连接,持有 Socket 对象
- 该类用于写入请求/读取响应,管理多个 Channel 对象
具体代码编写:
- 编写成员变量
private Socket socket = null; // 需要管理多个 channel 使用一个 hash 表把若干个 channel 组织起来 private ConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>(); private InputStream inputStream; private OutputStream outputStream; private DataOutputStream dataOutputStream; private DataInputStream dataInputStream; //用来处理 0xc 的回调,这里开销可能会很大,不希望把 扫描线程 阻塞住,因此使用 线程池 来处理 private ExecutorService callbackPool = null;
- 编写构造方法
- 此处不仅需要初始化成员变量,还需创建一个扫描线程,不停的从 socket 中读取响应数据,并将读取到的响应交给 dispatchResponse 方法执行
public Connection(String host, int port) throws IOException { socket = new Socket(host,port); inputStream = socket.getInputStream(); outputStream = socket.getOutputStream(); dataInputStream = new DataInputStream(inputStream); dataOutputStream = new DataOutputStream(outputStream); callbackPool = Executors.newFixedThreadPool(4); // 创建一个 扫描线程,由这个线程负责不停的从 socket 中取响应数据 把这个响应数据再交给对应的 channel 负责处理 Thread t = new Thread(() -> { try { while (!socket.isClosed()) { Response response = readResponse(); dispatchResponse(response); } }catch (SocketException e) { // 连接正常断开,此时这个异常直接忽略 System.out.println("[Connection] 连接正常断开!"); }catch (IOException | ClassNotFoundException | MqException e) { System.out.println("[Connection] 连接异常断开!"); e.printStackTrace(); } }); t.start(); }
- 编写 dispatchResponse 方法
- 使用该方法来区分,当前响应是一个针对控制请求的响应,还是服务器推送过来的消息
- 如果是服务器推送过来的消息,type = 0xc,也就需要执行回调,通过线程池来执行
- 如果只是一个普通的响应,就将该结果放到 channel 的 哈希表中
- 随后 channel 的 putReturns 方法会唤醒所有阻塞等待的线程,让这些线程从 哈希表中拿与自己 rid 相等的返回结果
// 使用这个方法来分别处理,当前的响应是一个针对控制请求的响应,还是服务器推送的消息 private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException { if(response.getType() == 0xc) { // 服务器推送来的消息数据 SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload()); // 根据 channelId 找到对应的 channel 对象 Channel channel = channelMap.get(subScribeReturns.getChannelId()); if(channel == null) { throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在!channelId = " + channel.getChannelId()); } // 执行该 channel 对象内部的回调 // 此处我们直接将回调方法交给线程池来执行,而不是用扫描线程来执行 callbackPool.submit(() -> { try { channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(),subScribeReturns.getBasicProperties(), subScribeReturns.getBody()); } catch (MqException | IOException e) { e.printStackTrace(); } }); }else { // 当前响应是针对刚才控制请求的响应 BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload()); // 把这个结果放到对应的 channel 的 hash 表中 Channel channel = channelMap.get(basicReturns.getChannelId()); if(channel == null) { throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在!channelId = " + channel.getChannelId()); } channel.putReturns(basicReturns); } }
- 编写 发送请求 与 读取响应 的方法
// 发送请求 public void writeRequest(Request request) throws IOException { dataOutputStream.writeInt(request.getType()); dataOutputStream.writeInt(request.getLength()); dataOutputStream.write(request.getPayload()); dataOutputStream.flush(); System.out.println("[Connection] 发送请求! type = " + request.getType() + ",length = " + request.getLength()); } // 读取响应 public Response readResponse() throws IOException { Response response = new Response(); response.setType(dataInputStream.readInt()); response.setLength(dataInputStream.readInt()); byte[] payload = new byte[response.getLength()]; int n = dataInputStream.read(payload); if(n != payload.length) { throw new IOException("读取的响应数据不完整!"); } response.setPayload(payload); System.out.println("[Connection] 收到响应! type = " + response.getType() + ",length = " + response.getLength()); return response; }
- 编写创建 channel 的方法
注意:
- 我们的代码中使用了多次 UUID
- message 的 id,就是用 UUID 当时加了个 M- 前缀
- 现在 channel 的 id 也是使用 UUID 此时加个 C- 前缀
- rid 也使用 UUID 来生成,加个前缀 R-
// 通过这个方法,在 Connection 中能够创建出一个 Channel public Channel createChannel() throws IOException, InterruptedException { String channelId = "C-" + UUID.randomUUID().toString(); Channel channel = new Channel(channelId,this); // 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中 channelMap.put(channelId,channel); // 同时也需要把 创建 channel 的这个消息也告诉服务器 boolean ok = channel.createChannel(); if(!ok) { // 服务器这里创建失败了!!整个这次创建 channel 操作不顺利! // 把刚才已经加入 hash 表的键值对,再删了 channelMap.remove(channelId); return null; } return channel; }
- 编写释放 Connection 相关资源的方法
public void close() { // 关闭 Connection 释放上述资源 try { callbackPool.shutdownNow(); channelMap.clear(); inputStream.close(); outputStream.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } }
Channel(通道)
这个类表示一个逻辑上的连接
该类用于提供一系列的方法,去和服务器提供的核心 API 相对应
客户端提供的这些方法,其方法内部就是发送了一个特定的请求
具体代码编写:
- 编写成员变量 与 构造方法
private String channelId; // 当前这个 channel 属于哪个连接的 private Connection connection; // 用来存储后续客户端收到的服务器的响应 private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>(); // 如果当前 Channel 订阅了某个队列,就需要在此处记录下对应回调是啥,当该队列的消息返回回来的时候,调用回调 // 此处约定一个 Channel 中只能有一个回调 private Consumer consumer = null; public Channel(String channelId,Connection connection) { this.channelId = channelId; this.connection = connection; }
- 实现 type = 0x1,即创建 channel
- 构造请求发给服务器,随后阻塞等待,唤醒后从 basicReturnsMap 中尝试获取响应结果
- 其余 type (0xc 除外,因为 0xc 只有响应没有请求)类型的请求与 0x1 大差不差,对着所需参数,构造即可
// 在这个方法中,和服务器进行交互,告知服务器,此处客户端创建了新的 channel 了 public Boolean createChannel() throws IOException, InterruptedException { // 对于创建 Channel 来说, BasicArguments basicArguments = new BasicArguments(); basicArguments.setChannelId(channelId); basicArguments.setRid(generateRid()); byte[] payload = BinaryTool.toBytes(basicArguments); Request request = new Request(); request.setType(0x1); request.setLength(payload.length); request.setPayload(payload); // 构造出完整请求之后,就可以发送这个请求了 connection.writeRequest(request); // 等待服务器的响应 BasicReturns basicReturns = waitResult(basicArguments.getRid()); return basicReturns.isOk(); } private String generateRid() { return "R-" + UUID.randomUUID().toString(); } // 期望使用这个方法来阻塞等待服务器的响应 private BasicReturns waitResult(String rid) throws InterruptedException { BasicReturns basicReturns = null; while ((basicReturns = basicReturnsMap.get(rid)) == null) { // 如果查询结果为 null,说明响应还没回来 // 此时就需要阻塞等待 synchronized (this) { wait(); } } // 读取成功之后,还需要把这个消息从 哈希表中给删除掉 basicReturnsMap.remove(rid); System.out.println("[Channel] 获取到服务器响应!rid = " + rid); return basicReturns; } public void putReturns(BasicReturns basicReturns) { basicReturnsMap.put(basicReturns.getRid(),basicReturns); synchronized (this) { // 当前也不知道有多少个线程在等待上述的这个响应 // 把所有的等待的线程都唤醒 notifyAll(); } }
- 特别注意 type = 0xa ,即 订阅消息
- 其次值得注意的是 consumerTag 使用 channelId 来表示
// 订阅消息 public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException, InterruptedException { // 先设置回调 if(this.consumer != null) { throw new MqException("该 channel 已经设置过消费消息的回调了,不能重复设置!"); } this.consumer = consumer; BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments(); basicConsumeArguments.setRid(generateRid()); basicConsumeArguments.setChannelId(channelId); // 此处 ConsumerTag 也使用 channelId 来表示 basicConsumeArguments.setConsumerTag(channelId); basicConsumeArguments.setQueueName(queueName); basicConsumeArguments.setAutoAck(autoAck); byte[] payload = BinaryTool.toBytes(basicConsumeArguments); Request request = new Request(); request.setType(0xa); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid()); return basicReturns.isOk(); }
针对 客户端 和 服务器 单元测试
- 编写测试用例代码是十分重要的!
package com.example.demo; import com.example.demo.common.Consumer; import com.example.demo.common.MqException; import com.example.demo.mqclient.Channel; import com.example.demo.mqclient.Connection; import com.example.demo.mqclient.ConnectionFactory; import com.example.demo.mqserver.BrokerServer; import com.example.demo.mqserver.core.BasicProperties; import com.example.demo.mqserver.core.ExchangeType; import org.apache.tomcat.util.http.fileupload.FileUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.boot.SpringApplication; import java.io.File; import java.io.IOException; public class MqClientTests { private BrokerServer brokerServer = null; private ConnectionFactory connectionFactory = null; private Thread t = null; @BeforeEach public void setUp() throws IOException { // 1、先启动服务器 DemoApplication.context = SpringApplication.run(DemoApplication.class); brokerServer = new BrokerServer(9090); t = new Thread(() -> { // 这个 start 方法会进入一个死循环,使用一个新的线程来运行 start 即可! try { brokerServer.start(); } catch (IOException e) { e.printStackTrace(); } }); t.start(); // 2、配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(9090); } @AfterEach public void tearDown() throws IOException { // 停止服务器 brokerServer.stop(); // t.join(); DemoApplication.context.close(); // 删除必要的文件 File file = new File("./data"); FileUtils.deleteDirectory(file); connectionFactory = null; } @Test public void testConnection() throws IOException { Connection connection = connectionFactory.newConnection(); Assertions.assertNotNull(connection); } @Test public void testChannel() throws IOException, InterruptedException { Connection connection = connectionFactory.newConnection(); Assertions.assertNotNull(connection); Channel channel = connection.createChannel(); Assertions.assertNotNull(channel); } @Test public void testExchange() throws IOException, InterruptedException { Connection connection = connectionFactory.newConnection(); Assertions.assertNotNull(connection); Channel channel = connection.createChannel(); Assertions.assertNotNull(channel); boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null); Assertions.assertTrue(ok); ok = channel.exchangeDelete("testExchange"); Assertions.assertTrue(ok); // 此处稳妥起见,把该关闭的要进行关闭 channel.close(); connection.close(); } @Test public void testQueue() throws IOException, InterruptedException{ Connection connection = connectionFactory.newConnection(); Assertions.assertNotNull(connection); Channel channel = connection.createChannel(); Assertions.assertNotNull(channel); boolean ok = channel.queueDeclare("testQueue",true,false,false,null); Assertions.assertTrue(ok); ok = channel.queueDelete("testQueue"); Assertions.assertTrue(ok); channel.close(); connection.close(); } @Test public void testBinding() throws IOException, InterruptedException{ Connection connection = connectionFactory.newConnection(); Assertions.assertNotNull(connection); Channel channel = connection.createChannel(); Assertions.assertNotNull(channel); boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null); Assertions.assertTrue(ok); ok = channel.queueDeclare("testQueue",true,false,false,null); Assertions.assertTrue(ok); ok = channel.queueBind("testQueue","testExchange","testBindingKey"); Assertions.assertTrue(ok); ok = channel.queueUnbind("testQueue","testExchange"); Assertions.assertTrue(ok); channel.close(); connection.close(); } @Test public void testMessage() throws IOException, InterruptedException, MqException { Connection connection = connectionFactory.newConnection(); Assertions.assertNotNull(connection); Channel channel = connection.createChannel(); Assertions.assertNotNull(channel); boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null); Assertions.assertTrue(ok); ok = channel.queueDeclare("testQueue",true,false,false,null); Assertions.assertTrue(ok); byte[] requestBody = "hello".getBytes(); ok = channel.basicPublish("testExchange","testQueue",null,requestBody); Assertions.assertTrue(ok); channel.basicConsume("testQueue", true, new Consumer() { @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException { System.out.println("[消费数据] 开始!"); System.out.println("consumeTag = " + consumerTag); System.out.println("basicProperties = " + basicProperties); Assertions.assertArrayEquals(requestBody,body); System.out.println("[消费数据] 结束!"); } }); Assertions.assertTrue(ok); Thread.sleep(500); channel.close(); connection.close(); } }