文章目录
- 🍃前言
- 🎋创建 BrokerServer 类
- 🎍启动与停止服务器
- 🍀实现处理连接
- 🎄实现 readRequest 与 writeResponse
- 🌴实现处理请求
- 🌲实现 clearClosedSession
- ⭕总结
🍃前言
本次开发任务
- 实现 BrokerServer 类,也就是咱们消息队列的本体服务器。
其实本质上就是一个 TCP 的服务器。
🎋创建 BrokerServer 类
创建 BrokerServer 类如下:
public class BrokerServer {
// 当前程序只考虑⼀个虚拟主机的情况.
private VirtualHost virtualHost = new VirtualHost("default-VirtualHost");
// key 为 channelId, value 为 channel 对应的 socket 对象.
private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>
private ServerSocket serverSocket;
private ExecutorService executorService;
private volatile boolean runnable = true;
}
- virtualHost表示服务器持有的虚拟主机.队列,交换机,绑定,消息都是通过虚拟主机管理.
- sessions ⽤来管理所有的客⼾端的连接. 记录每个客户端的 socket.
- serverSocket 是服务器自身的 socket
- executorService 这个线程池用来处理响应
- runnable 这个标志位用来控制服务器的运行停⽌
🎍启动与停止服务器
代码实现如下:
public BrokerServer(int port) throws IOException {
serverSocket = new ServerSocket(port);
}
public void start() throws IOException {
System.out.println("[BrokerServer] 启动!");
executorService = Executors.newCachedThreadPool();
try {
while (runnable) {
Socket clientSocket = serverSocket.accept();
// 把处理连接的逻辑丢给这个线程池.
executorService.submit(() -> {
processConnection(clientSocket);
});
}
} catch (SocketException e) {
System.out.println("[BrokerServer] 服务器停止运行!");
// e.printStackTrace();
}
}
// 一般来说停止服务器, 就是直接 kill 掉对应进程就行了.
// 此处还是搞一个单独的停止方法. 主要是用于后续的单元测试.
public void stop() throws IOException {
runnable = false;
// 把线程池中的任务都放弃了. 让线程都销毁.
executorService.shutdownNow();
serverSocket.close();
}
🍀实现处理连接
通过这个方法, 来处理一个客户端的连接.
我们使用 InputStream
与 OutputStream
,由于后面要按照特定格式来读取并解析.
此时就需要用到 DataInputStream
和 DataOutputStream
在这一个连接中, 可能会涉及到多个请求和响应,我们使用一个while(true)
来进行实现
在此循环我们要做的事情有三件:
- 读取请求并解析
- 根据请求计算响应
- 把响应写回客户端
具体处理逻辑,我们后面再仔细实现,
那么我们怎么结束这个循环呢?
注意我们上面使用的是 DataInputStream
和 DataOutputStream
,当没有数据进行读取的时候,就会进行抛出异常而结束循环
最后当连接处理完了, 就需要记得关闭 socket, 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.
代码实现如下:
// 通过这个方法, 来处理一个客户端的连接.
// 在这一个连接中, 可能会涉及到多个请求和响应.
private void processConnection(Socket clientSocket) {
try (InputStream inputStream = clientSocket.getInputStream();
OutputStream outputStream = clientSocket.getOutputStream()) {
// 这里需要按照特定格式来读取并解析. 此时就需要用到 DataInputStream 和 DataOutputStream
try (DataInputStream dataInputStream = new DataInputStream(inputStream);
DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
while (true) {
// 1. 读取请求并解析.
Request request = readRequest(dataInputStream);
// 2. 根据请求计算响应
Response response = process(request, clientSocket);
// 3. 把响应写回给客户端
writeResponse(dataOutputStream, response);
}
}
} catch (EOFException | SocketException e) {
// 对于这个代码, DataInputStream 如果读到 EOF , 就会抛出一个 EOFException 异常.
// 需要借助这个异常来结束循环
System.out.println("[BrokerServer] connection 关闭! 客户端的地址: " + clientSocket.getInetAddress().toString()
+ ":" + clientSocket.getPort());
} catch (IOException | ClassNotFoundException | MqException e) {
System.out.println("[BrokerServer] connection 出现异常!");
e.printStackTrace();
} finally {
try {
// 当连接处理完了, 就需要记得关闭 socket
clientSocket.close();
// 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.
clearClosedSession(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
}
}
🎄实现 readRequest 与 writeResponse
关于读取请求,我们前面定义了一个类为 Request ,此时我们构造相应的对象,并对该对象相应属性进行填充即可。
代码实现如下:
private Request readRequest(DataInputStream dataInputStream) throws IOException {
Request request = new Request();
request.setType(dataInputStream.readInt());
request.setLength(dataInputStream.readInt());
byte[] payload = new byte[request.getLength()];
int n = dataInputStream.read(payload);
if (n != request.getLength()) {
throw new IOException("读取请求格式出错!");
}
request.setPayload(payload);
return request;
}
关于响应,实现相反,传入的 响应对象 相应的属性返回即可。
最后不要忘了刷新缓冲区
代码实现如下:
private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
dataOutputStream.writeInt(response.getType());
dataOutputStream.writeInt(response.getLength());
dataOutputStream.write(response.getPayload());
// 这个刷新缓冲区也是重要的操作!!
dataOutputStream.flush();
}
🌴实现处理请求
先把请求转换成 BaseArguments , 获取到其中的 channelId 和 rid
再根据不同的 type, 分别处理不同的逻辑. (主要是调用virtualHost中不同的方法).
针对消息订阅操作,则需要在存在消息的时候通过回调,把响应结果写回给对应的客⼾端.
最后构造成统⼀的响应.
代码实现如下:
private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
// 1. 把 request 中的 payload 做一个初步的解析.
BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()
+ ", type=" + request.getType() + ", length=" + request.getLength());
// 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥.
boolean ok = true;
if (request.getType() == 0x1) {
// 创建 channel
sessions.put(basicArguments.getChannelId(), clientSocket);
System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + basicArguments.getChannelId());
} else if (request.getType() == 0x2) {
// 销毁 channel
sessions.remove(basicArguments.getChannelId());
System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + basicArguments.getChannelId());
} else if (request.getType() == 0x3) {
// 创建交换机. 此时 payload 就是 ExchangeDeclareArguments 对象了.
ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),
arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());
} else if (request.getType() == 0x4) {
ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
ok = virtualHost.exchangeDelete(arguments.getExchangeName());
} else if (request.getType() == 0x5) {
QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),
arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());
} else if (request.getType() == 0x6) {
QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
ok = virtualHost.queueDelete((arguments.getQueueName()));
} else if (request.getType() == 0x7) {
QueueBindArguments arguments = (QueueBindArguments) basicArguments;
ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());
} else if (request.getType() == 0x8) {
QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;
ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());
} else if (request.getType() == 0x9) {
BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),
arguments.getBasicProperties(), arguments.getBody());
} else if (request.getType() == 0xa) {
BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),
new Consumer() {
// 这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
// 先知道当前这个收到的消息, 要发给哪个客户端.
// 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的
// socket 对象了, 从而可以往里面发送数据了
// 1. 根据 channelId 找到 socket 对象
Socket clientSocket = sessions.get(consumerTag);
if (clientSocket == null || clientSocket.isClosed()) {
throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");
}
// 2. 构造响应数据
SubScribeReturns subScribeReturns = new SubScribeReturns();
subScribeReturns.setChannelId(consumerTag);
subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要.
subScribeReturns.setOk(true);
subScribeReturns.setConsumerTag(consumerTag);
subScribeReturns.setBasicProperties(basicProperties);
subScribeReturns.setBody(body);
byte[] payload = BinaryTool.toBytes(subScribeReturns);
Response response = new Response();
// 0xc 表示服务器给消费者客户端推送的消息数据.
response.setType(0xc);
// response 的 payload 就是一个 SubScribeReturns
response.setLength(payload.length);
response.setPayload(payload);
// 3. 把数据写回给客户端.
// 注意! 此处的 dataOutputStream 这个对象不能 close !!!
// 如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了.
// 此时就无法继续往 socket 中写入后续数据了.
DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
writeResponse(dataOutputStream, response);
}
});
} else if (request.getType() == 0xb) {
// 调用 basicAck 确认消息.
BasicAckArguments arguments = (BasicAckArguments) basicArguments;
ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
} else {
// 当前的 type 是非法的.
throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());
}
// 3. 构造响应
BasicReturns basicReturns = new BasicReturns();
basicReturns.setChannelId(basicArguments.getChannelId());
basicReturns.setRid(basicArguments.getRid());
basicReturns.setOk(ok);
byte[] payload = BinaryTool.toBytes(basicReturns);
Response response = new Response();
response.setType(request.getType());
response.setLength(payload.length);
response.setPayload(payload);
System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()
+ ", type=" + response.getType() + ", length=" + response.getLength());
return response;
}
🌲实现 clearClosedSession
这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉
需要注意的是:
- 我们在进行迭代的时候,不要直接删除,这样会影响集合类的结构
代码实现如下:
private void clearClosedSession(Socket clientSocket) {
// 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉.
List<String> toDeleteChannelId = new ArrayList<>();
for (Map.Entry<String, Socket> entry : sessions.entrySet()) {
if (entry.getValue() == clientSocket) {
// 不能在这里直接删除!!!
// 这属于使用集合类的一个大忌!!! 一边遍历, 一边删除!!!
// sessions.remove(entry.getKey());
toDeleteChannelId.add(entry.getKey());
}
}
for (String channelId : toDeleteChannelId) {
sessions.remove(channelId);
}
System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);
}
⭕总结
关于《【消息队列开发】 实现BrokerServer类——本体服务器》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下