Netty通信中的粘包半包问题(五)

这期我们来分析下消息头+消息体的这种方式来实现完美的解决方案,当然这也是最复杂的一种实现,因为在大多数场景中,性能和复杂度始终不能兼得。代码中使用了MessagePack的第三方序列化,因为我们要传输的实体类对象在客户端和服务端之间要经过序列化和反序列化

        <dependency>
            <groupId>org.msgpack</groupId>
            <artifactId>msgpack</artifactId>
            <version>0.6.12</version>
        </dependency>

1.实体类

package serializable.msgpack;

import org.msgpack.annotation.Message;

@Message // MessagePack提供的注解,表明这是一个需要序列化的实体类
public class User {
    
    private String id;
    
    private String userName;
    
    private int age;
    
    private UserContact userContact;
    
    public User(String userName, int age, String id) {
        this.userName = userName;
        this.age = age;
        this.id = id;
    }
    
    public User() {
        
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public UserContact getUserContact() {
        return userContact;
    }

    public void setUserContact(UserContact userContact) {
        this.userContact = userContact;
    }

    @Override
    public String toString() {
        return "User{" +
                "id='" + id + '\'' +
                ", userName='" + userName + '\'' +
                ", age=" + age +
                ", userContact=" + userContact +
                '}';
    }
}

package serializable.msgpack;

import org.msgpack.annotation.Message;

@Message // MessagePack提供的注解,表明这是一个需要序列化的实体类
public class UserContact {
    
    private String mail;
    
    private String phone;
    
    public UserContact() {
        
    }
    
    public UserContact (String mail, String phone) {
        this.mail = mail;
        this.phone = phone;
    }

    public String getMail() {
        return mail;
    }

    public void setMail(String mail) {
        this.mail = mail;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    @Override
    public String toString() {
        return "UserContact{" +
                "mail='" + mail + '\'' +
                ", phone='" + phone + '\'' +
                '}';
    }
}


2.Server

package serializable.msgpack;

import constant.Constant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.net.InetSocketAddress;

public class ServerMsgPackEcho {

    public static void main(String[] args) throws InterruptedException {
        ServerMsgPackEcho serverMsgPackEcho = new ServerMsgPackEcho();
        System.out.println("服务器即将启动....");
        serverMsgPackEcho.start();
    }
    
    
    public void start() throws InterruptedException {
        MsgPackServerHandler serverHandler = new MsgPackServerHandler();
        // 线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 服务端启动必须
            ServerBootstrap b = new ServerBootstrap();
            b.group(group) // 将线程组传入
                    .channel(NioServerSocketChannel.class) // 指定使用NIO进行网络传输
                    .localAddress(new InetSocketAddress(Constant.DEFAULT_PORT)) // 指定服务器监听端口
                    // 服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel
                    // 所以下面这段代码地作用就是为这个子channel增加handler
                    .childHandler(new ChannelInitializerImp());
            // 异步绑定到服务器, sync()会阻塞直到成功
            ChannelFuture f = b.bind().sync();
            // 给绑定事件增加一个监听器
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("绑定端口成功.....");
                }
            });

            System.out.println("服务器启动完成,等待客户端的连接和数据....");
            // 阻塞直到服务器的channel关闭
            ChannelFuture closeFuture = f.channel().closeFuture().sync();
            // 给关闭事件增加一个监听器
            closeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("服务器已关闭");
                }
            });

        } finally {
            // 优雅地关闭线程组
            group.shutdownGracefully().sync();
        }
    }
    
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,
                    0,
                    2,
                    0,
                    2));
            
            ch.pipeline().addLast(new MsgPackDecoder());
            ch.pipeline().addLast(new MsgPackServerHandler());
        }
    }
}

package serializable.msgpack;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.atomic.AtomicInteger;

public class MsgPackServerHandler extends ChannelInboundHandlerAdapter {
    
    private AtomicInteger counter = new AtomicInteger(0);

    // 服务端读取到网络数据后的处理
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 将上一个handler生成的数据强制转型
        User user = (User)msg;
        System.out.println("Server Accept [" + user +
                "] and the counter is :" +counter.incrementAndGet());
        // 服务器的应答
        String resp = "I process user :" + user.getUserName() 
                + System.getProperty("line.separator");
        
        ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
        ctx.fireChannelRead(user);
                
    }

    // 发生异常后的处理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//        super.exceptionCaught(ctx, cause);
        cause.printStackTrace();
        ctx.close();
    }
}

3.Client

package serializable.msgpack;

import constant.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.LineBasedFrameDecoder;

import java.net.InetSocketAddress;

public class ClientMsgPackEcho {
    
    private final String host;
    
    public ClientMsgPackEcho(String host) {
        this.host = host;
    }
    
    public void start() throws InterruptedException {
        // 线程组
        EventLoopGroup group = new NioEventLoopGroup();
        
        try {
            // 客户端启动必须
            Bootstrap b = new Bootstrap();
            b.group(group) // 将线程组传入
                    .channel(NioSocketChannel.class) // 指定使用NIO进行网络传输
                    // 配置需要连接的服务器的ip地址和端口
                    .remoteAddress(new InetSocketAddress(host, Constant.DEFAULT_PORT))
                    
                    .handler(new ChannelInitializerImp());

            ChannelFuture f = b.connect().sync();
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("operationComplete..... ");
                }
            });
            System.out.println("已连接到服务器");

            ChannelFuture closeFuture = f.channel().closeFuture().sync();
            closeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("已关闭....");
                }
            });
        } finally {
            group.shutdownGracefully().sync();
        }
    }
    
    
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            // 告诉Netty,计算一下报文的长度,然后作为报文头加在前面
            ch.pipeline().addLast(new LengthFieldPrepender(2));
            // 对服务器的应答也要解码,解决粘包半包
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            
            // 对我们要发送的数据做编码 -- 序列化
            ch.pipeline().addLast(new MsgPackEncode());
            ch.pipeline().addLast(new MsgPackClientHandler(5));
            
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new ClientMsgPackEcho(Constant.DEFAULT_SERVER_IP).start();
    }
    
}

package serializable.msgpack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

public class MsgPackClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
    private final int sendNumber;
    
    public MsgPackClientHandler(int sendNumber) {
        this.sendNumber = sendNumber;
    }
    
    private AtomicInteger counter = new AtomicInteger(0);
    
    // 客户端读取到网络数据后的处理
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept [" + msg.toString(CharsetUtil.UTF_8) +
                " ] and the counter is :" + counter.incrementAndGet());
    }

    // 客户端被通知channel活跃后,做事
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//        super.channelActive(ctx);
        User[] users = makeUsers();
        // 发送数据
        for (User user : users) {
            System.out.println("Send user :" + user);
            ctx.write(user);
        }
        
        ctx.flush();
    }

    // 发生异常后的处理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//        super.exceptionCaught(ctx, cause);
        cause.printStackTrace();
        ctx.close();
    }
    
    // 生成用户实体类的数组,以供发送
    private User[] makeUsers() {
        User[] users = new User[sendNumber];
        
        User user = null;
        for (int i = 0; i < sendNumber; i++) {
            user = new User();
            user.setAge(i);
            // 这里为了方便演示,我采用了一个随机数,用来表示这个报文不是定长的,而是可变的
            String userName = "ABCEDFG---->" + getSpecialSymbol() + i;
            user.setUserName(userName);
            user.setId("No:" + (sendNumber - i));
            user.setUserContact( new UserContact(userName + "@cover.com", "133"));
            users[i] = user;
        }
        
        return users;
    }
    
    private String getSpecialSymbol() {
        Random random = new Random();

        int count = random.nextInt(10);
        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < count; i++) {
            stringBuilder.append("*");
        }
        
        return stringBuilder.toString();
    }
}

4.编解码器

package serializable.msgpack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;
import org.msgpack.annotation.Message;

import java.util.List;

/**
 * 基于MessagePack的解码器,反序列化
 */
public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg,
                          List<Object> out) throws Exception {
        int length = msg.readableBytes();
        byte[] array = new byte[length];
        msg.getBytes(msg.readerIndex(), array, 0, length);
        
        MessagePack messagePack = new MessagePack();
        out.add(messagePack.read(array, User.class));
    }
}

package serializable.msgpack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;

/**
 * 基于MessagePack的编码,序列化
 */
public class MsgPackEncode extends MessageToByteEncoder<User> {
    @Override
    protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out) throws Exception {

        MessagePack messagePack = new MessagePack();
        byte[] raw = messagePack.write(msg);
        out.writeBytes(raw);
    }
}

5.结果展示

在这里插入图片描述
在这里插入图片描述
消息头+消息体这种机制相当于在客户端在往服务端发送数据时,在消息头中提前计算出消息体的报文大小,并将其嵌入到消息头中,当服务端收到这个报文时,会先解析消息头,如果产生了半包,服务端将会与消息头中的报文长度进行校验,并将其收到的半包数据暂存在缓冲区中,直到发送到完成的报文时才会进行处理,细心的小伙伴已经发现了,核心组件在于下图的位置,里面的参数也是极为复杂,稍有不慎可能就会发生奇奇怪怪的问题
在这里插入图片描述
对于LengthFieldBasedFrameDecoder这个类,注释就写了150多行,并且还参杂着图画,下一期我们来着重讲解这里面的具体参数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/333624.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

从matlab的fig图像文件中提取数据

这里用的是openfig&#xff08;&#xff09;函数打开的fig文件 →→→【matlab 中 fig 数据提取】 很简洁 →→→【MATLAB提取 .fig 文件中的数据】 这个给出了包含多个曲线的情况 →→→【提取matlab fig文件里的数据和legend】 chatgpt给出的方法 打开fig文件并保存数据 我的…

Git 基本命令与操作流

记录 Git 中的基本命令和创建仓库、提交文件、删除文件等方面的操作 Git 基本命令 git status&#xff1a;查看状态 nothing to commit, working directory clean&#xff1a;所有已跟踪文件在上次提交后都未被更改过&#xff0c;或者说当前目录下没有出现任何处于未跟踪状态…

Java 类与对象(对象的分配机制、对象的创建过程、匿名对象)

面向对象 面向对象编程&#xff08;Object-Oriented Programming 简称 OOP&#xff09;是一种程序设计思想和编码架构。 Java 是完全面向对象的&#xff0c; 必须熟悉 OOP 才能够编写 Java 程序。 面向对象的程序是由对象组成的&#xff0c;每个对象包含对用户公开的特定功能…

基于springboot+vue的蜗牛兼职网的设计与实现系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目背景…

力扣每日一练(24-1-18)

经验一&#xff1a;不要把问题想复杂 Python&#xff1a; min_price float(inf)max_profit 0for price in prices:min_price min(min_price, price)max_profit max(max_profit, price - min_price)return max_profit C#&#xff1a; public int MaxProfit(int[] prices) {i…

GNU Radio简介及流程图搭建

文章目录 前言一、GNU Radio 是什么&#xff1f;二、GNU Radio 安装三、搭建第一个流程图1、创建 GRC 文件2、添加块3、运行流程图 前言 欢迎来到无线通信的世界&#xff0c;初步接触 GNU Radio&#xff0c;对其学习进行一个记录。 一、GNU Radio 是什么&#xff1f; GNU Rad…

【重点!!!】【背包】【回溯】518.零钱兑换II

题目 跟39.组合总数、322.零钱兑换题目很类似。 法1&#xff1a;背包DP&#xff0c;最优解法 解释如下&#xff1a; 0 1 2 3 4 5(背包容量)1 0 0 0 0 0 没有硬币的时候&#xff09; 0 1 2 3 4 5(背包容量) 1 1 1 1 1 1 1 0 1 2 3 4 5(背包容量) 1 …

深入解析 Java 方法引用:Lambda 表达式的进化之路

前言 方法引用是 Java 8 提供的一种新特性&#xff0c;它允许我们更简洁地传递现有方法作为参数。这项特性实际上是对 Lambda 表达式的一种补充&#xff0c;通过方法引用&#xff0c;我们可以直接引用现有方法&#xff0c;而无需编写完整的Lambda表达式。最近在使用方法引用的…

Spring(19) ThreadPoolTaskExecutor 线程池的使用

目录 一、线程池简介1.1 为什么使用线程池1.2 线程池为什么需要使用队列1.3 线程池为什么要使用阻塞队列而不是用非阻塞队列1.4 如何配置线程池1.5 execute() 和 submit() 方法 二、ThreadPoolTaskExecutor 线程池简介2.1 简介2.2 核心参数配置2.3 ThreadPoolTaskExecutor 内部…

TortoiseSVN客户端如何安装配置并实现公网访问服务端提交文件到本地服务器

文章目录 前言1. TortoiseSVN 客户端下载安装2. 创建检出文件夹3. 创建与提交文件4. 公网访问测试 前言 TortoiseSVN是一个开源的版本控制系统&#xff0c;它与Apache Subversion&#xff08;SVN&#xff09;集成在一起&#xff0c;提供了一个用户友好的界面&#xff0c;方便用…

CSV文件中json列的处理2

如上所示&#xff0c;csv文件中包含以中括号{}包含的json字段&#xff0c;可用如下方法提取&#xff1a; import pandas as pd from datetime import date todaystr(date.today()) import jsonfilepath/Users/kangyongqing/Documents/kangyq/202401/调课功能使用统计/ file104…

顶顶通呼叫中心中间件如何实现自己呼叫自己并且放音:一步步配置(mod_cti基于FreeSWITCH)

介绍 顶顶通呼叫中心中间件如何实现自己呼叫自己并且放音&#xff1a;一步步配置 一、配置acl.conf 打开ccadmin-》点击配置文件并且打开acl.conf-》配置好了点击提交XML。 注意&#xff1a;acl.conf的服务器IP必须是内网IP 添加了之后在运维调试输入reloadacl 在运维调试执…

对象存储, 开源MinIO docker-compose.yml 文件

文章目录 python SDK 文档地址&#xff1a;docker-compose.yml 文件控制台使用&#xff1a;应用服务中使用样例&#xff1a; python SDK 文档地址&#xff1a; https://min.io/docs/minio/linux/developers/python/API.html docker-compose.yml 文件 version: 3services:min…

优化微信小程序更新体验:异步更新与强制更新方案解析

在微信小程序的开发和迭代过程中&#xff0c;新版本覆盖率的问题一直备受关注。由于小程序采用异步更新机制&#xff0c;在用户首次打开或冷启动时才会检查并下载新版本&#xff0c;导致部分用户无法及时应用上最新版本。为了解决这一问题&#xff0c;微信团队经过深入研究和讨…

Prometheus 监控容器

容器监控&#xff1a;cAdvisor Docker是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的容器中&#xff0c;然后发布到任何流行的Linux/Windows/Mac机器上。容器镜像正成为一个新的标准化软件交付方式。 例如&#xff0c;可以通过以下…

NebulaGraph7 种查询(关键词、向量、混合检索),Graph RAG 探索知识图谱

NebulaGraph7 种查询&#xff08;关键词、向量、混合检索&#xff09;&#xff0c;Graph RAG 探索知识图谱 1.架构思路 如果你熟悉知识图谱和图数据库 NebulaGraph&#xff0c;可以直接跳到 “RAG 具体实现” 章节。如果你不熟悉 NebulaGraph&#xff0c;请继续往下读。 什么…

INS-06003错误处理

在麒麟V10操作系统上安装Oracle RAC 19C&#xff0c;安装GI的建立互信步骤中&#xff0c;遇到INS-06003错误&#xff1a; [INS-06003] Failed to setup password SSH connectivity with following node(s) 查看详细信息&#xff1a; PRVG-11001: PRCZ-2136: PRCZ-2006: 此时在操…

链动2+1模式:月流水6000万是怎么做到的?

一个好的企业往往只需要最简单的营销方式。当我们面对当今的商业市场&#xff0c;琳琅满目的商业模式&#xff0c;应接不暇的营销方案&#xff0c;我们一定会举足无措的不知道怎么选择。因为一个好的公司或企业&#xff0c;一定要有一个十分经得起推敲的模式来面对消费者。 那么…

基于Java开发的智慧养老管理系统详细设计和实现【附源码】

基于Java开发的智慧养老管理系统详细设计和实现【附源码】 &#x1f345; 作者主页 央顺技术团队 &#x1f345; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; &#x1f345; 文末获取源码联系方式 &#x1f4dd; &#x1f345; 查看下方微信号获取联系方式 承接各种定制…

C++系列-第1章顺序结构-9-字符类型char

在线练习&#xff1a; http://noi.openjudge.cn/ https://www.luogu.com.cn/ 总结 本文是C系列博客&#xff0c;主要讲述字符类型char 字符类型char 在C编程语言中&#xff0c;char是一种基本的数据类型&#xff0c;它用于存储单个字符。字符可以是字母、数字、标点符号或者…