前言
最近公司搞了个小业务,需要使用TCP协议,我这边负责服务端。客户端是某个设备,客户端传参格式、包头包尾等都是固定的,不可改变,而且还有个蓝牙传感器,透传数据到这个设备,然后通过这个设备发送到服务端,数据格式也是不可变的。于是,相当于我这个TCP客户端会发送两种不同格式、不同长度的报文,且一种是ASCII 一种是HEX。
正常单发肯定是没问题的,但是,如果你业务卡顿,那么一定会有粘包、拆包的问题
请看:我在这里打个断点,模拟阻塞
然后一起发消息
放开断点
或者,睡个五秒
发现数据一起过来了,这就是粘包
还有种情况,如下
粘包了 但是下一次的数据包部分字节出现在了上次的数据包的尾部,把整个数据包给分开了,这种就是拆包(大概就是整个效果)
总结就是:
粘包,就是将多个小的包封装成一个大的包进行发送。(多次发送的数据到了服务端合并成了一个数据包)
拆包,即是将一个超过缓冲区可用大小的包拆分成多个包进行发送。(一个的数据包到了服务端变得不完整了,哪怕是粘包都没有完整的一段)
那么如何解决呢?
本篇就以netty来简单说下,第一次用,很多不足,希望各位大佬指点!
直接上代码:
先来个maven:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.68.Final</version>
</dependency>
1. NettyTcpServerConfig TCP服务配置类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PreDestroy;
/**
* @title: NettyTCPConfig
* @description:
* @date: 2024/10/14
* @author: zwh
* @copyright: Copyright (c) 2024
* @version: 1.0
*/
@Configuration
public class NettyTcpServerConfig {
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private ChannelFuture channelFuture;
@Bean
public ServerBootstrap serverBootstrap(NettyTcpServerHandler nettyTcpServerHandler) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 添加自定义解码器
ch.pipeline().addLast(new MyCustomDecoder());
// 自带的解码器 上面数据拆包分包就是用的这个自带的
// ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(nettyTcpServerHandler);
}
}).childOption(ChannelOption.TCP_NODELAY, true);
return bootstrap;
}
public void startServer(int port) throws Exception {
channelFuture = serverBootstrap(new NettyTcpServerHandler()).bind(port).sync();
System.out.println("服务器已启动,监听端口: " + port);
channelFuture.channel().closeFuture().sync();
}
@PreDestroy
public void shutdown() {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
2. NettyTcpServer TCP服务端启动入口
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/**
* @title: NettyTcpServer
* @description:
* @date: 2024/10/14
* @author: zwh
* @copyright: Copyright (c) 2024
* @version: 1.0
*/
@Component
public class NettyTcpServer implements CommandLineRunner {
// 默认10067 可配置
@Value("${nettyTcp.server.port:10067}")
private int nettyTcpServerPort;
private final NettyTcpServerConfig nettyTCPConfig;
public NettyTcpServer(NettyTcpServerConfig nettyTcpServerConfig) {
this.nettyTCPConfig = nettyTcpServerConfig;
}
@Override
public void run(String... args) throws Exception {
nettyTCPConfig.startServer(nettyTcpServerPort);
}
}
3. NettyTcpServerHandler 消息接收及回声(响应)处理
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/**
* @title: NettyTcpServerHandler
* @description:
* @date: 2024/10/14
* @author: zwh
* @copyright: Copyright (c) 2024
* @version: 1.0
*/
/**
* @ChannelHandler.Sharable注解表示一个ChannelHandler实例可以被添加到多个ChannelPipeline中,并且该实例是线程安全的。
* 这意味着,如果一个ChannelHandler被标记为@Sharable,那么它可以在不同的ChannelPipeline中被共享使用,
* 而不会出现竞争条件或线程安全问题。
*
*/
@ChannelHandler.Sharable
@Component
public class NettyTcpServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println("接收到消息: " + msg);
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = remoteAddress.getAddress().getHostAddress();
int clientPort = remoteAddress.getPort();
System.out.println("来自客户端 (" + clientIp + ":" + clientPort + ") 的消息: " + msg);
// 可根据需要发送响应
String response = "Message processed: " + msg;
ctx.writeAndFlush(response + "\r\n");
/*
System.out.println("来自客户端 (" + clientIp + ":" + clientPort + ") 的消息: " + msg);
if (msg.contains("重要")) {
String responseMessage = "接收到重要数据: " + msg;
ctx.writeAndFlush(responseMessage);
System.out.println("发送响应到客户端 (" + clientIp + ":" + clientPort + "): " + responseMessage);
} else {
System.out.println("收到不重要数据,未发送响应。");
}*/
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
4. MyCustomDecoder 自定义解码器
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* @title: MyMessageDecoder
* @description:
* @date: 2024/10/15
* @author: zwh
* @copyright: Copyright (c) 2024
* @version: 1.0
*/
@Component
public class MyCustomDecoder extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
while (in.isReadable()) {
// ASCII 消息处理(第一个字节是$)
if (in.getByte(in.readerIndex()) == '$') {
// 查找结束符 '*'
int endIndex = in.indexOf(in.readerIndex(), in.writerIndex(), (byte) '*');
if (endIndex == -1) {
// 还没有找到结束符,等待更多数据 这是有结束位的
break;
}
// 读取完整的 ASCII 消息
ByteBuf messageBuffer = in.readBytes(endIndex + 1 - in.readerIndex()); // 包括结束符
String message = messageBuffer.toString(StandardCharsets.US_ASCII);
out.add(message+decToHex(calculateBcc(message)));
}
// 十六进制消息处理(第一个字节是0x2B)
else if (in.getByte(0) == (byte) 0x2B) {
// 读取原始的十六进制数据
// 创建一个新的 ByteBuf 来保存 15 个字节
// 读取15个字节 另一种格式就是15个字节 然后读取后,原始的 ByteBuf 中的数据会被更新 将其标记为已读取下次就读不到了
ByteBuf byteBuf = in.readBytes(15);
// 将 ByteBuf 中的数据转换为十六进制字符串
StringBuilder hexMessage = new StringBuilder();
for (int i = 0; i < byteBuf.readableBytes(); i++) {
byte b = byteBuf.getByte(i);
hexMessage.append(String.format("%02X ", b));
}
// 将十六进制消息发送到下一个处理器
out.add(hexMessage.toString().trim());
// 释放 ByteBuf,避免内存泄漏
byteBuf.release();
}
else {
in.skipBytes(1); // 跳过无效字节
}
}
}
/**
* 计算给定数据的 BCC 校验值
*
* @param data 输入的字节数组
* @return BCC 校验值
*/
public static byte calculateBcc(byte[] data) {
byte bcc = 0;
for (byte b : data) {
bcc ^= b; // 使用异或运算计算 BCC
}
return bcc;
}
/**
* 计算给定字符串中 $ 和 * 之间的 BCC 校验值
*
* @param input 输入的字符串
* @return BCC 校验值
*/
public static byte calculateBcc(String input) {
byte bcc = 0;
// 找到 $ 和 * 的位置
int start = input.indexOf('$') + 1; // 从 $ 后开始
int end = input.indexOf('*');
// 确保找到 $ 和 * 的位置
if (start > 0 && end > start) {
String data = input.substring(start, end); // 提取 $ 和 * 之间的部分
// 计算 BCC
for (char c : data.toCharArray()) {
bcc ^= c; // 使用异或运算计算 BCC
}
}
return bcc;
}
/**
* 十进制转HEX(16进制)
* @date 2024/10/22 15:22
* @return {@link }
* @author zwh
*/
public static String decToHex(int dec) {
return Integer.toHexString(dec).toUpperCase();
}
/**
* 十六进制转HEX(10进制)
* @date 2024/10/22 15:22
* @return {@link }
* @author zwh
*/
public static Integer hexToDec(String hexValue) {
return Integer.parseInt(hexValue, 16);
}
public static void main(String[] args) {
decToHex(107);
}
}
这个自定义解码器就看需要处理的数据类型了。我这里是两种数据:
某蓝牙传感器:$TB,6300,D702,4700,,84C2E4DCEAAD,*6B ($ 固定头 * 结束符,ASCII)
6B是 BCC异或校验,取 $ 和 * 之间所有值 (包括逗号)的异或校验 参考BCC测试
I/O控制的器:2B 50 00 00 00 09 11 44 00 20 00 01 02 00 00 15个字节(HEX)
以这两种为示例来测试我们自定义的解码是否正确,如果需要别的数据自行修改开头和结尾以及长度啥的
Let's give it a try
就直接按照我们开头说的那种方法看看效果:
让我们换另外的格式:
两种格式混合:
没有再出现拆包、粘包现象,但是要注意一点发送的数据的格式一定要是我们预定好的。
比如:$TB,6300,D702,4700,,84C2E4DCEAAD,*6B 这个数据要是ASCII;
2B 50 00 00 00 09 11 44 00 20 00 01 02 00 00 这个数据要是HEX;不然我们自定义解码器的规则就对不上了。
如果要复现开头的,就很简单,换上内置的解码器就行
当然,以上问题 UDP不会出现!因为UDP是一种面向报文的协议,每个UDP段都是一条消息,所以不会出现粘包、拆包问题;TCP是面向流的,不知道数据的界限,会把构成整条消息的数据段排序完成后才呈现在内核缓冲区,容易造成拆包、粘包问题。
用哪种协议看自己的需求,实时性高的优先UDP,数据可靠性优先TCP
另外附上文中提到的网络调试助手(NetAssist)
网络调试助手
放资源了,0积分,但是不知道能不能审核过,如果被吞了 直接百度搜索就行能搜到
---------------
欢迎大佬指出问题!
end