文章目录
- 问题现象
- 解决
- 总结
问题现象
- 业务场景:雷达作为客户端,平台作为服务端,采用TCP/IP协议的socket连接,数据包采用字节的二进制数据传输
- 平台与雷达的通信和数据解析,在我接手时,已经开发完成,我接手负责维护和继续开发
- 由于日常工作忙于大数据分析和指标计算,未对使用正常的通信代码做什么改动,在客户现场使用后发现了一些问题(公司测试没问题)
- 问题1:雷达实时监测过车车辆,将数据通过socket上报给平台。但平台存入的过车,与现场视频相比,数量上明显偏少,怀疑存在数据丢失现象
- 问题2:程序刚启动时,产生以下报错,后续也有偶尔类似报错
2022-12-19 10:40:57.215 ERROR 1 --- [ntLoopGroup-3-9] c.n.s.listener.DefaultExceptionListener : Can't parse 115
java.lang.IllegalArgumentException: Can't parse 115
at com.newatc.socketio.protocol.PacketType.valueOfInner(PacketType.java:63)
at com.newatc.socketio.protocol.PacketDecoder.decodePackets(PacketDecoder.java:21)
at com.newatc.socketio.handler.InPacketHandler.channelRead0(InPacketHandler.java:66)
at com.newatc.socketio.handler.InPacketHandler.channelRead0(InPacketHandler.java:33)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at com.newatc.socketio.handler.AuthorizeHandler.channelRead(AuthorizeHandler.java:149)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)
解决
- 针对问题2,追踪业务代码,发现是解析到的数据包类型出错,不在规定范围内。由于我们的平台和雷达设备都在客户内网内(不通互联网),基本不会有第三方数据侵入,数据上报类型不应该超出规定范围,肯定是平台数据接收解析出了问题
- 针对问题1,用了另一个工具(雷达上位机软件)接收雷达数据,发现车辆数量正常,初步定位为平台处理socket上报信息时丢失了部分数据
- 针对问题1,是在客户现场实施时发现的问题,在公司测试环境接入雷达测试时没有问题。对比分析了下两个场景,发现客户的路口数量较多,雷达数量也较多,同时这些路口实时过车也较多。
- 雷达监测到过车就会触发数据上报,当数据量较大时,单个数据包超过最大长度就会进行拆包,分多个数据包上报
- 查看了平台已有代码,使用的是Netty,但在数据接收时,没有进行包头校验、长度校验。对于拆包后的数据包(前面几个字节不是包头),跳过包头长度直接进行包类型校验会失败报错(此字节位置可能是负载的实际数据内容,而不是包头包类型),出错就丢掉数据,就会导致上面2个问题
- 先看下优化前,有问题的代码,很粗糙,很明显,但当过车较少,单个数据包包含所有信息时没问题
// 数据接收经过的一个解码器 InPacketHandler 的接收处理方法
@Override
protected void channelRead0(ChannelHandlerContext ctx, PacketsMessage message) throws Exception {
ByteBuf content = message.getContent();
ClientHead client = message.getClient();
if (log.isTraceEnabled()) {
log.trace("In message: {} sessionId: {}", content.toString(CharsetUtil.UTF_8), client.getSessionId());
}
while (content.isReadable()) {
try {
Packet packet = decoder.decodePackets(content, client);
if(packet == null)
continue;
Namespace ns = namespacesHub.get(packet.getNsp());
NamespaceClient nClient = client.getChildClient(ns);
if (nClient == null) {
log.debug(
"Can't find namespace client in namespace: {}, sessionId: {} probably it was disconnected.",
ns.getName(),
client.getSessionId()
);
return;
}
packetListener.onPacket(packet, nClient);
} catch (Exception ex) {
String c = content.toString(CharsetUtil.UTF_8);
log.error("Error during data processing. Client sessionId: " + client.getSessionId() + ", data: " + c, ex);
throw ex;
}
}
// 读取完成后,释放ByteBuf,防止内存溢出LEAK
content.release();
}
// 上面channelRead0方法里对应的具体解码方法
public Packet decodePackets(ByteBuf buf, ClientHead client) {
// 前七个字节为 : 标志(4)-负载长度(2)-协议版本号(1),未校验,直接跳过了
buf.skipBytes(7);
Packet packet = null;
// 第8个字节是包类型
short pType = buf.readUnsignedByte();
PacketType packetSubType = PacketType.valueOfInner(pType);
// 第 9、10字节是校验位和预留位,也跳过了
buf.skipBytes(2);
// 这里也没进行负载长度校验,只校验了是否还有可读字节
if (buf.readableBytes() < 1){
// 直接返回了,已读的字节也没有回滚回去,这样后面数据包上报依然不完整,解析依然会有问题
return packet;
}
short oID = buf.readUnsignedByte();
ObjectId objectId = ObjectId.valueOf(oID);
switch (packetSubType) {
case QUERY_RESULT:
break;
case REPLY:
break;
case REPORT:
packet = new Packet(PacketType.MESSAGE);
packet.setSubType(packetSubType);
packet.setObjectId(objectId);
switch (objectId){
case REALTIME_DATA:
packet.setName(EventName.REALTIME_DATA);
break;
case PASSING_VEHICLE:
packet.setName(EventName.PASSING_VEHICLE);
break;
case TRAFFIC_STATUS:
packet.setName(EventName.TRAFFIC_STATUS);
break;
case TRAFFIC_STATS:
packet.setName(EventName.FLOW_STATS);
break;
case PERFORMANCE:
packet.setName(EventName.PERFORMANCE);
break;
case TRAFFIC_EVENT:
packet.setName(EventName.TRAFFIC_EVENT);
break;
case RADAR_FAULT:
packet.setName(EventName.RADAR_FAULT);
break;
}
// 解析数据包里负载的数据体
Object o = readReportObject(objectId, buf);
List<Object> args = new ArrayList<>();
args.add(o);
packet.setData(args);
break;
case HEART_BEAT:
packet = new Packet(PacketType.PING);
packet.setSubType(packetSubType);
packet.setObjectId(ObjectId.HEART_BEAT_FROM_RADAR);
break;
default:
break;
}
return packet;
}
- 优化后的代码,主要做了包头校验和负载长度校验
- 如果不是包头则舍弃,直到读到包头开始解析,这是为了防止程序启动时接收到的就是一个不完整包
- 读取完包头后,如果剩余字节长度小于负载长度,回滚游标到包头位置,返回,继续等待下一个数据包上报再一起解析
public Packet decodePackets(ByteBuf buf, ClientHead client) {
// 将起始位置记录下,后面如果不完整,读取失败,回复到此位置
int savedReaderIndex = buf.readerIndex();
// 前七个字节为 : 标志CYRC(4)-负载长度(2)-协议版本号(1),除了负载长度,其他没有用,直接跳过
// 读取前四个字节,如果不是包头,则继续往下读
byte head1 = buf.readByte();
if (head1 != 0x43) {
logger.info("第一个字符不是 C ,舍弃");
return null;
}
byte head2 = buf.readByte();
if (head2 != 0x59) {
logger.info("第二个字符不是 Y ,舍弃");
return null;
}
byte head3 = buf.readByte();
if (head3 != 0x52) {
logger.info("第三个字符不是 R ,舍弃");
return null;
}
byte head4 = buf.readByte();
if (head4 != 0x43) {
logger.info("第四个字符不是 C ,舍弃");
return null;
}
// 读取负载长度
int dataLength = buf.readUnsignedShort();
buf.skipBytes(1);
// 第8个字节为包类型
short pType = buf.readUnsignedByte();
PacketType packetSubType = PacketType.valueOfInner(pType);
// 第9/10字节,为校验位(1)-Reserve(1),暂未实际使用,直接跳过了
buf.skipBytes(2);
// 后续数据长度小于负载长度,则等待后面的数据上报再一起解析
if (buf.readableBytes() < dataLength) {
buf.readerIndex(savedReaderIndex);
logger.info("剩余消息长度,小于负载(消息内容)长度 ,重置游标返回,等待后面数据上报一起解析");
return null;
}
// 包头十个字节,后面为负载(消息内容),负载的第一个字节为对象标识,剩余为对象数据内容
Packet packet = null;
short oID = buf.readUnsignedByte();
ObjectId objectId = ObjectId.valueOf(oID);
switch (packetSubType) {
case QUERY_RESULT:
break;
case REPLY:
break;
case REPORT:
packet = new Packet(PacketType.MESSAGE);
packet.setSubType(packetSubType);
packet.setObjectId(objectId);
switch (objectId) {
case REALTIME_DATA:
packet.setName(EventName.REALTIME_DATA);
break;
case PASSING_VEHICLE:
packet.setName(EventName.PASSING_VEHICLE);
break;
case TRAFFIC_STATUS:
packet.setName(EventName.TRAFFIC_STATUS);
break;
case TRAFFIC_STATS:
packet.setName(EventName.FLOW_STATS);
break;
case PERFORMANCE:
packet.setName(EventName.PERFORMANCE);
break;
case TRAFFIC_EVENT:
packet.setName(EventName.TRAFFIC_EVENT);
break;
case RADAR_FAULT:
packet.setName(EventName.RADAR_FAULT);
break;
}
Object o = readReportObject(objectId, buf);
List<Object> args = new ArrayList<>();
args.add(o);
packet.setData(args);
break;
case HEART_BEAT:
packet = new Packet(PacketType.PING);
packet.setSubType(packetSubType);
packet.setObjectId(ObjectId.HEART_BEAT_FROM_RADAR);
break;
default:
break;
}
return packet;
}
总结
在使用Netty进行TCP数据传输时,由于TCP是一个面向流的协议,消息会被拆分成多个字节流进行发送,因此接收方收到消息时,可能会出现黏包和拆包现象。
黏包指的是接收方一次性收到了多个完整的消息,而拆包则是接收方收到了不完整的消息。这种现象的出现是由于TCP是面向流的、无边界的协议,不保留数据报的边界。
为了解决黏包和拆包问题,Netty提供了多种解决方法:
-
消息定长:即发送方发送的每个消息长度固定,接收方接收到固定长度的字节流后进行消息的解析,这个是一种简单有效的实现方法。但是存在一个问题,不同消息长度不同,如心跳消息和数据上报消息,消息定长则部分消息必须填充补偿,显得浪费带宽。
-
消息分隔符:发送方在每个消息后添加特殊的分隔符,接收方根据分隔符对消息进行解码和拆分。使用这种实现方式时,要确保分隔符是唯一的,不会和消息内容重复,造成错误分隔。
-
消息头部标识:发送方在消息头部加入固定长度的表示消息长度的字段,接收方根据消息长度字段对消息进行解码和拆分。这是一种比较通用的实现,消息头部可以加一些标识、负载长度、校验位等,就可以正确识别到包头位置和对数据完整性进行校验。
-
基于Netty编解码器:Netty提供了一系列编解码器,可以自定义编解码器来控制消息的长度和格式。我们按照自己的诗句情况进行参数配置后,就可以实现自动的黏包和拆包处理。
总的来说,Netty提供了多种方式来解决黏包和拆包问题,我们可以根据业务需求选择合适的方式进行实现。只要双方约定了通信协议且严格按照协议发送数据,并且代码已经处理了黏包拆包,数据解析应该就没有问题。