这期我们来分析下消息头+消息体的这种方式来实现完美的解决方案,当然这也是最复杂的一种实现,因为在大多数场景中,性能和复杂度始终不能兼得。代码中使用了MessagePack的第三方序列化,因为我们要传输的实体类对象在客户端和服务端之间要经过序列化和反序列化
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
1.实体类
package serializable.msgpack;
import org.msgpack.annotation.Message;
@Message // MessagePack提供的注解,表明这是一个需要序列化的实体类
public class User {
private String id;
private String userName;
private int age;
private UserContact userContact;
public User(String userName, int age, String id) {
this.userName = userName;
this.age = age;
this.id = id;
}
public User() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public UserContact getUserContact() {
return userContact;
}
public void setUserContact(UserContact userContact) {
this.userContact = userContact;
}
@Override
public String toString() {
return "User{" +
"id='" + id + '\'' +
", userName='" + userName + '\'' +
", age=" + age +
", userContact=" + userContact +
'}';
}
}
package serializable.msgpack;
import org.msgpack.annotation.Message;
@Message // MessagePack提供的注解,表明这是一个需要序列化的实体类
public class UserContact {
private String mail;
private String phone;
public UserContact() {
}
public UserContact (String mail, String phone) {
this.mail = mail;
this.phone = phone;
}
public String getMail() {
return mail;
}
public void setMail(String mail) {
this.mail = mail;
}
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
@Override
public String toString() {
return "UserContact{" +
"mail='" + mail + '\'' +
", phone='" + phone + '\'' +
'}';
}
}
2.Server
package serializable.msgpack;
import constant.Constant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.net.InetSocketAddress;
public class ServerMsgPackEcho {
public static void main(String[] args) throws InterruptedException {
ServerMsgPackEcho serverMsgPackEcho = new ServerMsgPackEcho();
System.out.println("服务器即将启动....");
serverMsgPackEcho.start();
}
public void start() throws InterruptedException {
MsgPackServerHandler serverHandler = new MsgPackServerHandler();
// 线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
// 服务端启动必须
ServerBootstrap b = new ServerBootstrap();
b.group(group) // 将线程组传入
.channel(NioServerSocketChannel.class) // 指定使用NIO进行网络传输
.localAddress(new InetSocketAddress(Constant.DEFAULT_PORT)) // 指定服务器监听端口
// 服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel
// 所以下面这段代码地作用就是为这个子channel增加handler
.childHandler(new ChannelInitializerImp());
// 异步绑定到服务器, sync()会阻塞直到成功
ChannelFuture f = b.bind().sync();
// 给绑定事件增加一个监听器
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("绑定端口成功.....");
}
});
System.out.println("服务器启动完成,等待客户端的连接和数据....");
// 阻塞直到服务器的channel关闭
ChannelFuture closeFuture = f.channel().closeFuture().sync();
// 给关闭事件增加一个监听器
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("服务器已关闭");
}
});
} finally {
// 优雅地关闭线程组
group.shutdownGracefully().sync();
}
}
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,
0,
2,
0,
2));
ch.pipeline().addLast(new MsgPackDecoder());
ch.pipeline().addLast(new MsgPackServerHandler());
}
}
}
package serializable.msgpack;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.atomic.AtomicInteger;
public class MsgPackServerHandler extends ChannelInboundHandlerAdapter {
private AtomicInteger counter = new AtomicInteger(0);
// 服务端读取到网络数据后的处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 将上一个handler生成的数据强制转型
User user = (User)msg;
System.out.println("Server Accept [" + user +
"] and the counter is :" +counter.incrementAndGet());
// 服务器的应答
String resp = "I process user :" + user.getUserName()
+ System.getProperty("line.separator");
ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
ctx.fireChannelRead(user);
}
// 发生异常后的处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
cause.printStackTrace();
ctx.close();
}
}
3.Client
package serializable.msgpack;
import constant.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.LineBasedFrameDecoder;
import java.net.InetSocketAddress;
public class ClientMsgPackEcho {
private final String host;
public ClientMsgPackEcho(String host) {
this.host = host;
}
public void start() throws InterruptedException {
// 线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
// 客户端启动必须
Bootstrap b = new Bootstrap();
b.group(group) // 将线程组传入
.channel(NioSocketChannel.class) // 指定使用NIO进行网络传输
// 配置需要连接的服务器的ip地址和端口
.remoteAddress(new InetSocketAddress(host, Constant.DEFAULT_PORT))
.handler(new ChannelInitializerImp());
ChannelFuture f = b.connect().sync();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("operationComplete..... ");
}
});
System.out.println("已连接到服务器");
ChannelFuture closeFuture = f.channel().closeFuture().sync();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("已关闭....");
}
});
} finally {
group.shutdownGracefully().sync();
}
}
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
// 告诉Netty,计算一下报文的长度,然后作为报文头加在前面
ch.pipeline().addLast(new LengthFieldPrepender(2));
// 对服务器的应答也要解码,解决粘包半包
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// 对我们要发送的数据做编码 -- 序列化
ch.pipeline().addLast(new MsgPackEncode());
ch.pipeline().addLast(new MsgPackClientHandler(5));
}
}
public static void main(String[] args) throws InterruptedException {
new ClientMsgPackEcho(Constant.DEFAULT_SERVER_IP).start();
}
}
package serializable.msgpack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
public class MsgPackClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final int sendNumber;
public MsgPackClientHandler(int sendNumber) {
this.sendNumber = sendNumber;
}
private AtomicInteger counter = new AtomicInteger(0);
// 客户端读取到网络数据后的处理
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("client Accept [" + msg.toString(CharsetUtil.UTF_8) +
" ] and the counter is :" + counter.incrementAndGet());
}
// 客户端被通知channel活跃后,做事
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// super.channelActive(ctx);
User[] users = makeUsers();
// 发送数据
for (User user : users) {
System.out.println("Send user :" + user);
ctx.write(user);
}
ctx.flush();
}
// 发生异常后的处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
cause.printStackTrace();
ctx.close();
}
// 生成用户实体类的数组,以供发送
private User[] makeUsers() {
User[] users = new User[sendNumber];
User user = null;
for (int i = 0; i < sendNumber; i++) {
user = new User();
user.setAge(i);
// 这里为了方便演示,我采用了一个随机数,用来表示这个报文不是定长的,而是可变的
String userName = "ABCEDFG---->" + getSpecialSymbol() + i;
user.setUserName(userName);
user.setId("No:" + (sendNumber - i));
user.setUserContact( new UserContact(userName + "@cover.com", "133"));
users[i] = user;
}
return users;
}
private String getSpecialSymbol() {
Random random = new Random();
int count = random.nextInt(10);
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < count; i++) {
stringBuilder.append("*");
}
return stringBuilder.toString();
}
}
4.编解码器
package serializable.msgpack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;
import org.msgpack.annotation.Message;
import java.util.List;
/**
* 基于MessagePack的解码器,反序列化
*/
public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg,
List<Object> out) throws Exception {
int length = msg.readableBytes();
byte[] array = new byte[length];
msg.getBytes(msg.readerIndex(), array, 0, length);
MessagePack messagePack = new MessagePack();
out.add(messagePack.read(array, User.class));
}
}
package serializable.msgpack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;
/**
* 基于MessagePack的编码,序列化
*/
public class MsgPackEncode extends MessageToByteEncoder<User> {
@Override
protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out) throws Exception {
MessagePack messagePack = new MessagePack();
byte[] raw = messagePack.write(msg);
out.writeBytes(raw);
}
}
5.结果展示
消息头+消息体这种机制相当于在客户端在往服务端发送数据时,在消息头中提前计算出消息体的报文大小,并将其嵌入到消息头中,当服务端收到这个报文时,会先解析消息头,如果产生了半包,服务端将会与消息头中的报文长度进行校验,并将其收到的半包数据暂存在缓冲区中,直到发送到完成的报文时才会进行处理,细心的小伙伴已经发现了,核心组件在于下图的位置,里面的参数也是极为复杂,稍有不慎可能就会发生奇奇怪怪的问题
对于LengthFieldBasedFrameDecoder
这个类,注释就写了150多行,并且还参杂着图画,下一期我们来着重讲解这里面的具体参数