六. 自定义协议
1. 需求分析
在目前的RPC框架中,采用Vert.x的HttpServer作为服务提供者的Web服务器,底层使用HTTP协议进行网络传输。但HTTP协议只是RPC框架网络传输的一种可选方式,还有其它更好的选择。
RPC框架注重性能,但HTTP协议中的头部信息、请求响应等复杂且繁重,会影响网络传输性能。
查看任意一个网页,即可发现大量响应标头和请求头信息,
因此,通过自定义一套RPC协议,来实现性能更高且更安全的RPC框架。
2. 设计方案
(1)自定义网络传输
目标:选择一个能够高性能通信的网络协议和传输方式。
由于HTTP协议头部信息较重,会影响传输性能,并且其本身属于无状态协议,即每个HTTP请求相互独立,每次响应都需要重新建立和关闭连接,也会影响性能。此外,HTTP属于应用层协议,性能不如传输层的TCP协议高。
因此,采用TCP协议进行网络传输,已追求更高性能与灵活性。
(2)自定义消息结构
目标:用最少的空间传递需要的信息。
最少的空间:
选择轻量级类型,如byte字节类型,占用1个字节,8个bit。
而其它常用的数据类型,如整型int,占用4个字节,32个bit;长整型long,占用8个字节,64个bit;浮点型float,占用4个字节等。这些类型相对较重,占用字节数较多。
需要的信息:
分析HTTP请求结构,能够得到RPC消息所需的信息,
- 魔数:用来安全校验,防止服务器处理非框架消息(类似于HTTPS的安全证书)。
- 版本号:保证请求和响应的一致性(类似于HTTP协议的1.0/2.0版本)。
- 序列化方式:告诉服务端和客户端如何解析数据(类似于HTTP的Content—Type)。
- 类型:标记消息是Request或Response,或者是heartBeat(类似于HTTP的请求响应头)。
- 状态:如果是响应,记录响应的结果(类似于HTTP的200状态码)。
- 请求id:用于唯一标识请求,因为TCP是双向通信,需要有唯一标识来追踪每个请求。
- 请求体:即数据内容(类似于HTTP请求中发送的RpcRequest)。
- 请求体数据长度:保证完整地获取到请求体内容。
HTTP协议有专门的key/value结构,比较容易找到完整的请求体数据。但TCP协议本身存在半包和粘包问题,每次传输的数据可能是不完整的,因此需要在消息头中增加一个字段请求体数据长度,保证能够完整地获取到信息内容。
综上,自定义消息结构设计如下,
这种消息结构本质上就是拼接在一起的一个字节数组。通过这种方式,我们不需要额外记录头部信息,而通过读取某个或某段字节来获取到具体内容。比如读取第一个字节,得到魔数。
该协议设计参考了Dubbo的协议架构,
3. 具体实现
(1)消息结构
创建protocol包,将所有与自定义协议有关的代码都放在该包下。
创建协议消息类ProtocolMessage。
消息头封装为内部类,消息体使用泛型:
package com.khr.krpc.protocol;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 协议消息结构
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProtocolMessage<T> {
/**
* 消息头
*/
private Header header;
/**
* 消息体(请求或响应对象)
*/
private T body;
/**
* 协议消息头
*/
@Data
public static class Header {
/**
* 魔数,保证安全性
*/
private byte magic;
/**
* 版本号
*/
private byte version;
/**
* 序列化器
*/
private byte serializer;
/**
* 消息类型(请求/响应)
*/
private byte type;
/**
* 状态
*/
private byte status;
/**
* 请求 id
*/
private long requestId;
/**
* 消息体长度
*/
private int bodyLength;
}
}
创建协议常量类ProtocolConstant。
记录与自定义协议有关的关键信息,如消息头长度、魔数、版本号:
package com.khr.krpc.protocol;
/**
* 协议常量
*/
public interface ProtocolConstant {
/**
* 消息头长度
*/
int MESSAGE_HEADER_LENGTH = 17;
/**
* 协议魔数
*/
byte PROTOCOL_MAGIC = 0x1;
/**
* 协议版本号
*/
byte PROTOCOL_VERSION = 0x1;
}
创建消息字段的枚举类ProtocolMessageStatusEnum。
协议状态枚举,成功、请求失败、响应失败三类枚举值:
package com.khr.krpc.protocol;
import lombok.Getter;
/**
* 协议消息的状态枚举
*/
@Getter
public enum ProtocolMessageStatusEnum {
OK("ok", 20),
BAD_REQUEST("badRequest", 40),
BAD_RESPONSE("badResponse", 50);
private final String text;
private final int value;
ProtocolMessageStatusEnum(String text, int value){
this.text = text;
this.value = value;
}
/**
* 根据 value 获取枚举
*
* @param value
* @return
*/
public static ProtocolMessageStatusEnum getEnumByValue(int value){
for (ProtocolMessageStatusEnum anEnum : ProtocolMessageStatusEnum.values()){
if (anEnum.value == value){
return anEnum;
}
}
return null;
}
}
创建消息类型的枚举类ProtocolMessageTypeEnum。
协议消息类型枚举,请求、响应、心跳、其它四类枚举值:
package com.khr.krpc.protocol;
import lombok.Getter;
/**
* 协议消息的类型枚举
*/
@Getter
public enum ProtocolMessageTypeEnum {
REQUEST(0),
RESPONSE(1),
HEART_BEAT(2),
OTHERS(3);
private final int key;
ProtocolMessageTypeEnum(int key){
this.key = key;
}
/**
* 根据 key 获取枚举
*
* @param key
* @return
*/
public static ProtocolMessageTypeEnum getEnumByType(int key){
for (ProtocolMessageTypeEnum anEnum : ProtocolMessageTypeEnum.values()){
if (anEnum.key == key){
return anEnum;
}
}
return null;
}
}
创建序列化器的枚举类ProtocolMessageSerializerEnum。
序列化器枚举,对应之前设计好的序列化器:
package com.khr.krpc.protocol;
import cn.hutool.core.util.ObjectUtil;
import lombok.Getter;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* 协议消息的序列化器枚举
*/
@Getter
public enum ProtocolMessageSerializerEnum {
JDK(0, "jdk"),
JSON(1, "json"),
KRYO(2, "kryo"),
HESSIAN(3, "hessian");
private final int key;
private final String value;
ProtocolMessageSerializerEnum(int key, String value){
this.key = key;
this.value = value;
}
/**
* 获取值列表
*
* @return
*/
public static List<String> getValues(){
return Arrays.stream(values()).map(item -> item.value).collect(Collectors.toList());
}
/**
* 根据 key 获取枚举
*
* @param key
* @return
*/
public static ProtocolMessageSerializerEnum getEnumByKey(int key){
for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()){
if (anEnum.key == key){
return anEnum;
}
}
return null;
}
/**
* 根据 value 获取枚举
*
* @param value
* @return
*/
public static ProtocolMessageSerializerEnum getEnumByValue(String value){
if (ObjectUtil.isEmpty(value)){
return null;
}
for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()){
if (anEnum.value.equals(value)){
return anEnum;
}
}
return null;
}
}
(2)网络传输
之前采用了基于HTTP协议的Vert.x服务器。同样,Vert.x也支持TCP服务器,简单易用。
在server包下创建tcp包,将所有与TCP服务相关的代码都放在该包下。
TCP服务器实现。
创建VertxTcpServer类,与之前的VertxHttpServer逻辑类似,先创建服务器实例,然后定义处理请求的方法,最后启动服务器:
package com.khr.krpc.server.tcp;
import com.khr.krpc.server.HttpServer;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class VertxTcpServer implements HttpServer{
private byte[] handleRequest(byte[] requestData){
//在此处编写处理请求的逻辑,根据 requestData 构造响应数据并返回
String requestString = new String(requestData);
System.out.println("Received request from client: " + requestString);
//示例,实际逻辑根据具体业务需求实现
return "Hello, client!".getBytes();
}
@Override
public void doStart(int port){
//创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
//创建 TCP 服务器
NetServer server = vertx.createNetServer();
//处理请求
server.connectHandler(socket -> {
//处理连接
socket.handler(buffer -> {
//处理接收到的字节数组
byte[] requestData = buffer.getBytes();
//自定义字节数组处理逻辑,比如解析请求、调用服务、构造响应等
byte[] responseData =handleRequest(requestData);
//发送响应
socket.write(Buffer.buffer(responseData));
});
});
//启动 TCP 服务器并监听指定端口
server.listen(port, result ->{
if (result.succeeded()){
System.out.println("TCP server started on port "+ port);
}else {
System.out.println("Failed to start TCP server: "+ result.cause());
}
});
}
public static void main(String[] args){
new VertxTcpServer().doStart(8888);
}
}
其中,socket.write(Buffer.buffer(responseData)) 方法,就是在向连接到服务器的客服端发送数据。数据格式为Buffer,这是Vert.x提供的字节数组缓冲区实现。
TCP客户端实现。
创建VertxTcpClient类,先创建客户端实例,然后定义处理请求的方法,最后建立连接:
package com.khr.krpc.server.tcp;
import io.vertx.core.Vertx;
public void start(){
//创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
vertx.createNetClient().connect(8888,"localhost", result ->{
if (result.succeeded()){
System.out.println("Connected to TCP server");
io.vertx.core.net.NetSocket socket = result.result();
//发送数据
socket.write("Hello,server!");
//接收响应
socket.handler(buffer -> {
System.out.println("Received response from server: "+ buffer.toString());
});
} else {
System.out.println("Failed to connect to TCP server");
}
});
}
public static void main(String[] args){
new VertxTcpClient().start();
}
运行测试可以看到服务器与客户端相互打招呼。
(3)编码器和解码器
Vert.x的TCP服务器收发的消息是 Buffer 类型,不能直接写入一个对象。因此需要编码器和解码器将Java的消息对象和 Buffer 进行相互转换。
编码器先 new 一个空的 Buffer 缓冲区,然后按照顺序向缓冲区依次写入数据;解码器在读取时也按照顺序依次读取,还原出编码前的数据。
实现消息编码器。
在protocol包下创建ProtocolMessageEncoder类,核心流程是依次向 Buffer 缓冲区写入消息对象里的字段:
package com.khr.krpc.protocol;
import com.khr.krpc.serializer.Serializer;
import com.khr.krpc.serializer.SerializerFactory;
import io.vertx.core.buffer.Buffer;
import java.io.IOException;
public class ProtocolMessageEncoder {
/**
*编码
*
* @param protocolMessage
* @return
* @throws IOException
*/
public static Buffer encode(ProtocolMessage<?> protocolMessage) throws IOException{
if (protocolMessage == null || protocolMessage.getHeader() == null){
return Buffer.buffer();
}
ProtocolMessage.Header header = protocolMessage.getHeader();
//依次向缓冲区写入字节
Buffer buffer = Buffer.buffer();
buffer.appendByte(header.getMagic());
buffer.appendByte(header.getVersion());
buffer.appendByte(header.getSerializer());
buffer.appendByte(header.getType());
buffer.appendByte(header.getStatus());
buffer.appendLong(header.getRequestId());
//获取序列化器
ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if (serializerEnum == null){
throw new RuntimeException("序列化协议不存在");
}
Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());
//写入 body 长度和数据
buffer.appendInt(bodyBytes.length);
buffer.appendBytes(bodyBytes);
return buffer;
}
}
实现消息解码器。
在protocol包下创建ProtocolMessageDecoder类,核心流程是依次从 Buffer 缓冲区的指定位置读取字段,构造出完整的消息对象:
package com.khr.krpc.protocol;
import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.serializer.Serializer;
import com.khr.krpc.serializer.SerializerFactory;
import io.vertx.core.buffer.Buffer;
import java.io.IOException;
/**
* 协议消息解码器
*/
public class ProtocolMessageDecoder {
/**
* 解码
*
* @param buffer
* @return
* @throws IOException
*/
public static ProtocolMessage<?> decode(Buffer buffer) throws IOException{
//分别从指定位置读出 Buffer
ProtocolMessage.Header header = new ProtocolMessage.Header();
byte magic = buffer.getByte(0);
//校验魔数
if (magic != ProtocolConstant.PROTOCOL_MAGIC){
throw new RuntimeException("消息 magic 非法");
}
header.setMagic(magic);
header.setVersion(buffer.getByte(1));
header.setSerializer(buffer.getByte(2));
header.setType(buffer.getByte(3));
header.setStatus(buffer.getByte(4));
header.setRequestId(buffer.getLong(5));
header.setBodyLength(buffer.getInt(13));
//解决粘包问题,只读指定长度的数据
byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());
//解析消息体
ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if (serializerEnum == null) {
throw new RuntimeException("序列化消息的协议不存在");
}
Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByType(header.getType());
if (messageTypeEnum == null){
throw new RuntimeException("序列化消息的类型不存在");
}
switch (messageTypeEnum){
case REQUEST:
RpcRequest request = serializer.deserialize(bodyBytes,RpcRequest.class);
return new ProtocolMessage<>(header, request);
case RESPONSE:
RpcResponse response = serializer.deserialize(bodyBytes,RpcResponse.class);
return new ProtocolMessage<>(header, response);
case HEART_BEAT:
case OTHERS:
default:
throw new RuntimeException("暂不支持该消息类型");
}
}
}
创建单元测试类测试。
编码解码均能正常工作:
package com.khr.rpc.protocol;
import cn.hutool.core.util.IdUtil;
import com.khr.krpc.constant.RpcConstant;
import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.protocol.*;
import io.vertx.core.buffer.Buffer;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class ProtocolMessageTest {
@Test
public void testEncodeAndDecode() throws IOException{
//构造消息
ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();
ProtocolMessage.Header header = new ProtocolMessage.Header();
header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
header.setSerializer((byte) ProtocolMessageSerializerEnum.JDK.getKey());
header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
header.setStatus((byte) ProtocolMessageStatusEnum.OK.getValue());
header.setRequestId(IdUtil.getSnowflakeNextId());
header.setBodyLength(0);
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setServiceName("kService");
rpcRequest.setMethodName("kMethod");
rpcRequest.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
rpcRequest.setParameterTypes(new Class[]{System.class});
rpcRequest.setArgs(new Object[]{"aaa","bbb"});
protocolMessage.setHeader(header);
protocolMessage.setBody(rpcRequest);
Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
ProtocolMessage<?> message = ProtocolMessageDecoder.decode(encodeBuffer);
Assert.assertNotNull(message);
}
}
(4)请求处理器(服务提供者)
请求处理器的作用是接受请求,然后通过反射调用服务实现类。
类似于之前的HttpServerHandler,TCP服务器需要一个TcpServerHandler。通过实现Vert.x提供的Handler<NetSocket>接口,可以定义TCP请求处理器。
在server.tcp包下创建TcpServerHandler类:
package com.khr.krpc.server.tcp;
import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.protocol.ProtocolMessage;
import com.khr.krpc.protocol.ProtocolMessageEncoder;
import com.khr.krpc.protocol.ProtocolMessageDecoder;
import com.khr.krpc.protocol.ProtocolMessageTypeEnum;
import com.khr.krpc.registry.LocalRegistry;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import java.io.IOException;
import java.lang.reflect.Method;
public class TcpServerHandler implements Handler<NetSocket> {
/**
* 处理请求
*
* @param socket the event to handle
*/
@Override
public void handle(NetSocket socket){
//处理连接
netSocket.handler(buffer -> {
//接受请求,解码
ProtocolMessage<RpcRequest> protocolMessage;
try {
protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);
} catch (IOException e){
throw new RuntimeException("协议消息解码错误");
}
RpcRequest rpcRequest = protocolMessage.getBody();
//处理请求
//构造响应结果对象
RpcResponse rpcResponse = new RpcResponse();
try{
//获取要调用的服务实现类,通过反射调用
Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
Method method = implClass.getMethod(rpcRequest.getMethodName(),rpcRequest.getParameterTypes());
Object result = method.invoke(implClass.newInstance(),rpcRequest.getArgs());
//封装返回结果
rpcResponse.setData(result);
rpcResponse.setDataType(method.getReturnType());
rpcResponse.setMessage("ok");
} catch (Exception e){
e.printStackTrace();
rpcResponse.setMessage(e.getMessage());
rpcResponse.setException(e);
}
//发送响应,编码
ProtocolMessage.Header header = protocolMessage.getHeader();
header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey());
ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header,rpcResponse);
try {
Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);
socket.write(encode);
} catch (IOException e){
throw new RuntimeException("协议消息编码错误");
}
});
}
}
(5)请求发送(服务消费者)
调整消费者发送HTTP请求为TCP请求:
package com.khr.krpc.proxy;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.khr.krpc.RpcApplication;
import com.khr.krpc.config.RpcConfig;
import com.khr.krpc.constant.RpcConstant;
import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.model.ServiceMetaInfo;
import com.khr.krpc.protocol.*;
import com.khr.krpc.registry.Registry;
import com.khr.krpc.registry.RegistryFactory;
import com.khr.krpc.serializer.Serializer;
import com.khr.krpc.serializer.SerializerFactory;
import com.khr.krpc.server.tcp.VertxTcpClient;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetClient;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.SocketAddress;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
/**
* 服务代理(JDK动态代理)
*/
public class ServiceProxy implements InvocationHandler {
/**
* 调用代理
*
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy,Method method,Object[] args) throws Throwable{
//指定序列化器
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
//构造请求
String serviceName = method.getDeclaringClass().getName();
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(serviceName)
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(args)
.build();
try {
//序列化
byte[] bodyBytes = serializer.serialize(rpcRequest);
//从注册中心获取服务提供者请求地址
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
if (CollUtil.isEmpty(serviceMetaInfoList)) {
throw new RuntimeException("暂无服务地址");
}
//暂时先取第一个
ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);
//发送 TCP 请求
Vertx vertx = Vertx.vertx();
NetClient netClient = vertx.createNetClient();
CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
netClient.connect(selectedServiceMetaInfo.getServicePort(), serviceMetaInfo.getServiceHost(),
result -> {
if (result.succeeded()) {
System.out.println("Connect to TCP server");
io.vertx.core.net.NetSocket socket = result.result();
//发送数据
//构造消息
ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();
ProtocolMessage.Header header = new ProtocolMessage.Header();
header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey());
header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
//生成全局请求ID
header.setRequestId(IdUtil.getSnowflakeNextId());
protocolMessage.setHeader(header);
protocolMessage.setBody(rpcRequest);
//编码请求
try {
Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
socket.write(encodeBuffer);
} catch (IOException e) {
throw new RuntimeException("协议消息编码错误");
}
//接收响应
TcpBufferHandlerWarpper bufferHandlerWarpper = new TcpBufferHandlerWarpper(
buffer -> {
try {
ProtocolMessage<RpcResponse> rpcResponseProtocolMessage = (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);
responseFuture.complete(rpcResponseProtocolMessage.getBody());
} catch (IOException e) {
throw new RuntimeException("协议消息解码错误");
}
});
} else {
System.out.println("Failed to connect to TCP server");
}
});
RpcResponse rpcResponse = responseFuture.get();
//关闭连接
netClient.close();
return rpcResponse.getData();
} catch(IOException e){
e.printStackTrace();
}
return null;
}
}
重点关注发送TCP请求部分。由于Vert.x提供的请求处理器是异步、反应式的,为了更方便地获取结果,使用CompletableFuture转异步为同步。
CompleteableFuturn<RpcResponse> responseFuture = new CompletableFuture<>();
netClient.connet(XXX,
result -> {
// 完成响应
responseFuture.complete(rpcResponseProtocolMessage.getBody());
});
);
//阻塞,直到响应完成,才会继续向下执行
RpcResponse rpcResponse = responseFuture.get();
4. 测试
修改服务提供者ProviderExample代码,改为启动TCP服务器:
package com.khr.example.provider;
import com.khr.example.common.service.UserService;
import com.khr.krpc.RpcApplication;
import com.khr.krpc.config.RegistryConfig;
import com.khr.krpc.config.RpcConfig;
import com.khr.krpc.model.ServiceMetaInfo;
import com.khr.krpc.registry.LocalRegistry;
import com.khr.krpc.registry.Registry;
import com.khr.krpc.registry.RegistryFactory;
import com.khr.krpc.server.HttpServer;
import com.khr.krpc.server.VertxHttpServer;
import com.khr.krpc.server.tcp.VertxTcpServer;
import io.vertx.core.Vertx;
/**
* 服务提供者示例
*/
public class ProviderExample {
public static void main(String[] args){
//RPC框架初始化
RpcApplication.init();
//注册服务
String serviceName = UserService.class.getName();
LocalRegistry.registry(serviceName,UserServiceImpl.class);
//注册服务到注册中心
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceHost(rpcConfig.getServerHost());
serviceMetaInfo.setServicePort(Integer.valueOf(rpcConfig.getServerPort()));
try {
registry.register(serviceMetaInfo);
}catch (Exception e){
throw new RuntimeException(e);
}
//启动 TCP 服务
VertxTcpServer vertxTcpServer = new VertxTcpServer();
vertxTcpServer.doStart(8080);
//启动 Web 服务
//HttpServer httpServer = new VertxHttpServer();
//httpServer.doStart(Integer.parseInt(RpcApplication.getRpcConfig().getServerPort()));
}
}
ConsumerExample项目不动,先后启动后如果不能正常完成调用,说明出现了半包粘包问题。
5. 解决半包粘包问题
(1)什么是半包和粘包
粘包:连续给对端发送两个或两个以上的数据包,对端在一次收取时可能收到的数据包大于一个,即可能是一个包和另一个包一部分的结合,或者是两个完整的数据包头尾相连。
半包:一次收取到的数据只是其中一个包的一部分。
比如,客户端连续2次发送消息:
//第一次
Hello, server!Hello, server!Hello, server!Hello, server!
//第二次
Hello, server!Hello, server!Hello, server!Hello, server!
服务端接收到的是:
半包:
//第一次
Hello, server!Hello, server!
//第二次
Hello, server!Hello, server!Hello, server!
粘包:
//第三次
Hello, server!Hello, server!Hello, server!Hello, server!Hello, server!
(2)半包粘包问题演示
修改TCP客户端代码,连续发送1000次消息:
public class VertxTcpClient{
public void start(){
//创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
vertx.createNetClient().connect(8888,"localhost", result ->{
if (result.succeeded()){
System.out.println("Connected to TCP server");
io.vertx.core.net.NetSocket socket = result.result();
for (int i = 0; i < 1000; i++){
//发送数据
Buffer buffer = Buffer.buffer();
String str = "Hello, server!Hello, server!Hello, server!Hello, server!";
buffer.appendInt(0);
buffer.appendInt(str.getBytes().length);
buffer.appendBytes(str.getBytes());
socket.write(buffer);
}
//接收响应
socket.handler(buffer -> {
System.out.println("Received response from server: "+ buffer.toString());
});
} else {
System.out.println("Failed to connect to TCP server");
}
});
}
public static void main(String[] args){
new VertxTcpClient().start();
}
}
TCP服务端打印出每次收到的消息:
package com.khr.krpc.server.tcp;
import com.khr.krpc.server.HttpServer;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class VertxTcpServer implements HttpServer{
@Override
public void doStart(int port){
//创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
//创建 TCP 服务器
NetServer server = vertx.createNetServer();
//处理请求
server.connectHandler(socket -> {
socket.handler(buffer -> {
String testMessage = "Hello,server!Hello,server!Hello,server!"
int messageLength = testMessage.getBytes().length;
if (buffer.getBytes().length < messageLength){
System.out.println("半包,length = " + buffer.getBytes().length);
return;
}
if (buffer.getBytes().length > messageLength){
System.out.println("粘包,length = " + buffer.getBytes().length);
return;
}
String str = new String(buffer.getBytes(0, messageLength));
System.out.println(str);
if (testMessage.equals(str)){
System.out.println("good");
}
});
});
//启动 TCP 服务器并监听指定端口
server.listen(port, result ->{
if (result.succeeded()){
log.info("TCP server started on port "+ port);
}else {
log.info("Failed to start TCP server: "+ result.cause());
}
});
}
public static void main(String[] args){
new VertxTcpServer().doStart(8888);
}
}
运行后,再服务端会查看到类似这样的结果:
(3)Vert.x解决半包和粘包
解决半包的核心思路:在消息头中设置请求体的长度,服务端接收时,判断每次消息的长度是否符合预期,不完整就不读,留到下一次接收到消息时再读取。
if(buffer == null || buffer.length() == 0){
throw new RuntimeException("消息 buffer 为空");
}
if(buffer.getBytes().length < ProtocolConstant.MESSAGE_HEADER_LENGTH){
throw new RuntimeException("发生半包问题");
}
解决粘包的核心思路:每次只读取指定长度的数据,超过长度的留着下一次接收到消息时再读取。
//只读指定长度的数据
byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength())
Vert.x框架中,内置了RecordParser, 可以保证下次读取到特定长度的字符。
先修改TCP服务端代码进行测试,采用RecordParser读取固定长度的消息:
package com.khr.krpc.server.tcp;
import com.khr.krpc.server.HttpServer;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class VertxTcpServer implements HttpServer{
@Override
public void doStart(int port){
//创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
//创建 TCP 服务器
NetServer server = vertx.createNetServer();
//处理请求
server.connectHandler(socket -> {
String testMessage = "Hello,server!Hello,server!Hello,server!"
int messageLength = testMessage.getBytes().length;
//构造parser
RecordParser parser = RecordParser.newFixed(messageLength);//每次读取固定值长度的内容
parser.setOutput(new Handler<Buffer>() {
@Override
public void handle(Buffer buffer) {
String str = new String(buffer.getBytes());
System.out.println(str);
if (testMessage.equals(str)) {
System.out.println("good");
}
}
});
socket.handler(parser);
});
//启动 TCP 服务器并监听指定端口
server.listen(port, result ->{
if (result.succeeded()){
log.info("TCP server started on port "+ port);
}else {
log.info("Failed to start TCP server: "+ result.cause());
}
});
}
public static void main(String[] args){
new VertxTcpServer().doStart(8888);
}
}
测试后发现,输出类似结果,非常整齐,解决了半包和粘包:
但在实际应用中,消息体的长度是不固定的,所以要调整RecordParser的固定长度。
将读取完整的消息拆分为2次:
- 先完整读取请求头信息。由于请求头信息长度是固定的,可以使用RecordParser保证每次都完整读取。
- 再根据请求头长度信息更改RecordParser的固定长度,保证完整获取到请求体。
package com.khr.krpc.server.tcp;
import com.khr.krpc.server.HttpServer;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
import io.vertx.core.parsetools.RecordParser;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class VertxTcpServer implements HttpServer{
@Override
public void doStart(int port){
//创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
//创建 TCP 服务器
NetServer server = vertx.createNetServer();
//处理请求
server.connectHandler(socket -> {
RecordParser parser = RecordParser.newFixed(8);
parser.setOutput(new Handler<Buffer>() {
//初始化
int size = -1;
//一次完整的读取(头 + 体)
Buffer resultBuffer = Buffer.buffer();
@Override
public void handle(Buffer buffer) {
if (-1 == size){
//读取消息头长度
size = buffer.getInt(4);
parser.fixedSizeMode(size);
//写入头信息到结果
resultBuffer.appendBuffer(buffer);
} else {
//写入体信息到结果
resultBuffer.appendBuffer(buffer);
System.out.println(resultBuffer.toString());
//重置一轮
parser.fixedSizeMode(8);
size = -1;
resultBuffer = Buffer.buffer();
}
}
});
socket.handler(parser);
});
//启动 TCP 服务器并监听指定端口
server.listen(port, result ->{
if (result.succeeded()){
log.info("TCP server started on port "+ port);
}else {
log.info("Failed to start TCP server: "+ result.cause());
}
});
}
public static void main(String[] args){
new VertxTcpServer().doStart(8888);
}
}
将size变量初始化为-1,表示尚未读取头部消息,还没去确定消息体的长度。因此接下来读取的数据就是消息的头部信息。
读取到消息体的长度信息后,将size设置为读取到的长度值,然后设置解析器进入固定大小模式,以读取消息体。消息体内容读取完毕并写入缓冲区后,再重置解析器和状态,准备处理下一条消息。
在TCP客户端中构造一个变长、长度信息不再Buffer最开头的消息:
public class VertxTcpClient{
public void start(){
//创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
vertx.createNetClient().connect(8888,"localhost", result ->{
if (result.succeeded()){
System.out.println("Connected to TCP server");
io.vertx.core.net.NetSocket socket = result.result();
for (int i = 0; i < 1000; i++){
//发送数据
Buffer buffer = Buffer.buffer();
String str = "Hello, server!Hello, server!Hello, server!Hello, server!";
buffer.appendInt(0);//添加一个整数0,占4个字节,使长度信息不在Buffer最开头。
buffer.appendInt(str.getBytes().length);//真正的消息体长度
buffer.appendBytes(str.getBytes());//添加实际的消息体内容
socket.write(buffer);//发给服务器
}
//接收响应
socket.handler(buffer -> {
System.out.println("Received response from server: "+ buffer.toString());
});
} else {
System.out.println("Failed to connect to TCP server");
}
});
}
public static void main(String[] args){
new VertxTcpClient().start();
}
}
测试后发现能够正常读取到消息,不会出现半包粘包问题。
(4)封装半包粘包处理器
解决半包粘包问题有一定的代码量,并且由于ServiceProxy和请求Handler都需要接受Buffer,所以都要进行半包粘包处理。因此可以对代码进行封装复用。
采用装饰者模式,使用RecordParser对原有的Buffer处理器的能力进行增强。
在server.tcp包下新建TcpBufferHandlerWrapper类,实现并增强Handler<Buffer>接口:
package com.khr.krpc.server.tcp;
import com.khr.krpc.protocol.ProtocolConstant;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
/**
* 装饰者模式(使用 recordParser 对原有的 buffer 处理能力进行增强)
*/
public class TcpBufferHandlerWarpper implements Handler<Buffer>{
private final RecordParser recordParser;
public TcpBufferHandlerWarpper(Handler<Buffer> bufferHandler){
recordParser = initRecordParser(bufferHandler);
}
@Override
public void handle(Buffer buffer){
recordParser.handle(buffer);
}
private RecordParser initRecordParser(Handler<Buffer> bufferHandler){
//构造 parser
RecordParser parser = RecordParser.newFixed(ProtocolConstant.MESSAGE_HEADER_LENGTH);
parser.setOutput(new Handler<Buffer>() {
//初始化
int size = -1;
//一次完整的读取(头 + 体)
Buffer resultBuffer = Buffer.buffer();
@Override
public void handle(Buffer buffer) {
if (-1 == size){
//读取消息体长度
size = buffer.getInt(13);
parser.fixedSizeMode(size);
//写入头信息到结果
resultBuffer.appendBuffer(buffer);
} else {
//写入体信息到结果
resultBuffer.appendBuffer(buffer);
//已拼接为完整 Buffer,执行处理
bufferHandler.handle(resultBuffer);
//重置一轮
parser.fixedSizeMode(ProtocolConstant.MESSAGE_HEADER_LENGTH);
size = -1;
resultBuffer = Buffer.buffer();
}
}
});
return parser;
}
}
优化客户端调用代码。
修改TCP请求处理器,使用TcpBufferHandlerWrapper封装之前处理请求的部分:
package com.khr.krpc.server.tcp;
import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.protocol.ProtocolMessage;
import com.khr.krpc.protocol.ProtocolMessageEncoder;
import com.khr.krpc.protocol.ProtocolMessageDecoder;
import com.khr.krpc.protocol.ProtocolMessageTypeEnum;
import com.khr.krpc.registry.LocalRegistry;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import java.io.IOException;
import java.lang.reflect.Method;
public class TcpServerHandler implements Handler<NetSocket> {
/**
* 处理请求
*
* @param socket the event to handle
*/
@Override
public void handle(NetSocket socket){
TcpBufferHandlerWarpper bufferHandlerWarpper = new TcpBufferHandlerWarpper(buffer -> {
//处理请求逻辑不变
……
});
socket.handler(bufferHandlerWrapper);
}
}
修改客户端处理响应。之前是将所有发送请求、处理响应的代码都放到了ServiceProxy类中,使其变得复杂臃肿。因此将其优化,把所有的请求响应逻辑提出来,封装为单独的VertxTcpClient类,放在server.tcp包下。
修改后的VertxTcpClient:
package com.khr.krpc.server.tcp;
import cn.hutool.core.util.IdUtil;
import com.khr.krpc.RpcApplication;
import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.model.ServiceMetaInfo;
import com.khr.krpc.protocol.*;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* Vertx TCP 请求客户端
*/
public class VertxTcpClient {
/**
* 发送请求
*
* @param rpcRequest
* @param serviceMetaInfo
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
public static RpcResponse doRequest(RpcRequest rpcRequest, ServiceMetaInfo serviceMetaInfo) throws InterruptedException, ExecutionException {
//发送TCP请求
Vertx vertx = Vertx.vertx();
NetClient netClient = vertx.createNetClient();
CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
netClient.connect(serviceMetaInfo.getServicePort(), serviceMetaInfo.getServiceHost(),
result -> {
if (!result.succeeded()) {
System.err.println("Failed to connect to TCP server");
return;
}
NetSocket socket = result.result();
//发送数据
//构造消息
ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();
ProtocolMessage.Header header = new ProtocolMessage.Header();
header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey());
header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
//生成全局请求ID
header.setRequestId(IdUtil.getSnowflakeNextId());
protocolMessage.setHeader(header);
protocolMessage.setBody(rpcRequest);
//编码请求
try {
Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
socket.write(encodeBuffer);
} catch (IOException e) {
throw new RuntimeException("协议消息编码错误");
}
//接收响应
TcpBufferHandlerWarpper bufferHandlerWarpper = new TcpBufferHandlerWarpper(
buffer -> {
try {
ProtocolMessage<RpcResponse> rpcResponseProtocolMessage = (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);
responseFuture.complete(rpcResponseProtocolMessage.getBody());
} catch (IOException e) {
throw new RuntimeException("协议消息解码错误");
}
}
);
socket.handler(bufferHandlerWarpper);
});
RpcResponse rpcResponse = responseFuture.get();
//关闭连接
netClient.close();
return rpcResponse;
}
}
而在ServiceProxy中,直接调用VertxTcpClient即可:
package com.khr.krpc.proxy;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.khr.krpc.RpcApplication;
import com.khr.krpc.config.RpcConfig;
import com.khr.krpc.constant.RpcConstant;
import com.khr.krpc.model.RpcRequest;
import com.khr.krpc.model.RpcResponse;
import com.khr.krpc.model.ServiceMetaInfo;
import com.khr.krpc.protocol.*;
import com.khr.krpc.registry.Registry;
import com.khr.krpc.registry.RegistryFactory;
import com.khr.krpc.serializer.Serializer;
import com.khr.krpc.serializer.SerializerFactory;
import com.khr.krpc.server.tcp.VertxTcpClient;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetClient;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.SocketAddress;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
/**
* 服务代理(JDK动态代理)
*/
public class ServiceProxy implements InvocationHandler {
/**
* 调用代理
*
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy,Method method,Object[] args) throws Throwable{
//指定序列化器
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
//构造请求
String serviceName = method.getDeclaringClass().getName();
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(serviceName)
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(args)
.build();
try {
//序列化
byte[] bodyBytes = serializer.serialize(rpcRequest);
//从注册中心获取服务提供者请求地址
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
if (CollUtil.isEmpty(serviceMetaInfoList)) {
throw new RuntimeException("暂无服务地址");
}
ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);
//发送 TCP 请求
RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);
return rpcResponse.getData();
} catch (IOException e){
throw new RuntimeException("调用失败");
}
}
}
至此,扩展功能,自定义协议完成。