Netty Review - Netty自动重连机制揭秘:原理与最佳实践

文章目录

  • 概述
  • Pre
  • 客户端自动重连
  • Code
    • Server
    • Client (重点)
  • 测试
    • 启动自动重连
    • 运行过程中断链后的自动重连

在这里插入图片描述


概述

在这里插入图片描述


Pre

Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析


客户端自动重连

自动重连是一个用于提高网络应用稳定性和可靠性的功能。当客户端与服务器之间的连接意外断开时,客户端可以自动尝试重新连接到服务器,以确保数据的正常传输。

自动重连是指在网络通信中,当客户端与服务器之间的连接由于某种原因断开时,客户端能够自动尝试重新建立连接的机制。这是一种用于提高网络应用稳定性和可靠性的功能。

具体来说,当客户端检测到与服务器的连接中断时,它会自动发起新的连接尝试,以确保数据的正常传输。这对于处理网络不稳定性、临时断开或服务器重新启动等情况非常重要,可以减少用户干预,提升应用的用户体验。


Code

Server

package com.artisan.reconnect;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class ArtisanNettyServer {

    public static void main(String[] args) throws Exception {
        // 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
        // bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            // 创建服务器端的启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 使用链式编程来配置参数
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    // 使用NioServerSocketChannel作为服务器的通道实现
                    .channel(NioServerSocketChannel.class)
                    // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
                    // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //对workerGroup的SocketChannel设置处理器
                            //ch.pipeline().addLast(new LifeCycleInBoundHandler());
                            ch.pipeline().addLast(new ArtisanNettyServerHandler());
                        }
                    });
            System.out.println("netty server start。。");
            // 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
            // 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
            ChannelFuture cf = bootstrap.bind(9000).sync();
            // 给cf注册监听器,监听我们关心的事件
            /*cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("监听端口9000成功");
                    } else {
                        System.out.println("监听端口9000失败");
                    }
                }
            });*/
            // 等待服务端监听端口关闭,closeFuture是异步操作
            // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  1. EventLoopGroup:Netty使用EventLoopGroup来处理事件循环和IO操作。这里创建了两个EventLoopGroup,一个用于处理连接请求(bossGroup),另一个用于处理实际的业务逻辑(workerGroup)。
  2. ServerBootstrap:这是Netty的另一个核心组件,用于配置和初始化服务器。
  3. ChannelFuture:这是一个异步结果对象,用于表示通道操作的结果。
  4. ChannelInitializer:这是一个用于初始化新连接的处理器。
  5. ArtisanNettyServerHandler:这应该是一个自定义的处理类,用于处理业务逻辑,下文给出。
  6. bind()closeFuture()bind()方法用于启动服务器并绑定端口,closeFuture()用于等待服务器通道关闭。
  7. finally块:这里确保在服务器启动失败或成功后,EventLoopGroup会被优雅地关闭,以释放资源。

package com.artisan.reconnect;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 * @Description: 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范)
 */
public class ArtisanNettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取客户端发送的数据
     *
     * @param ctx 上下文对象, 含有通道channel,管道pipeline
     * @param msg 就是客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器读取线程 " + Thread.currentThread().getName());
        //Channel channel = ctx.channel();
        //ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
        //将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 数据读取完毕处理方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }

    /**
     * 处理异常, 一般是需要关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

定义了一个自定义的Netty服务器处理器ArtisanNettyServerHandler,它继承自ChannelInboundHandlerAdapter。这个处理器包含了几个重要的方法来处理客户端的请求和响应:

  1. channelRead(ChannelHandlerContext ctx, Object msg):当服务器从客户端接收到数据时,这个方法会被调用。在这个方法中,你可以编写处理客户端发送的数据的逻辑。在这个例子中,它简单地打印了接收到的消息内容。
  2. channelReadComplete(ChannelHandlerContext ctx):这个方法在channelRead方法执行完成后被调用。在这个方法中,你可以发送响应给客户端。在这个例子中,它发送了一个简单的"HelloClient"消息给客户端。
  3. exceptionCaught(ChannelHandlerContext ctx, Throwable cause):这个方法在出现异常时被调用。在这个方法中,你可以编写异常处理的逻辑。在这个例子中,它简单地关闭了通道。

Client (重点)

这段代码是一个使用Netty框架的简单客户端示例,它实现了重连功能。以下是这段代码的中文注释解释:

package com.artisan.reconnect;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.TimeUnit;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 * @Description: 实现了重连的客户端
 */
public class ArtisanNettyClient {
    private String host;
    private int port;
    private Bootstrap bootstrap;
    private EventLoopGroup group;
    public static void main(String[] args) throws Exception {
        ArtisanNettyClient artisanNettyClient = new ArtisanNettyClient("localhost", 9000);
        artisanNettyClient.connect();
    }
    public ArtisanNettyClient(String host, int port) {
        this.host = host;
        this.port = port;
        init();
    }
    private void init() {
        // 客户端需要一个事件循环组
        group = new NioEventLoopGroup();
        // 创建客户端启动对象
        // bootstrap 可重用, 只需在NettyClient实例化的时候初始化即可.
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 加入处理器
                        ch.pipeline().addLast(new ArtisanNettyClientHandler(ArtisanNettyClient.this));
                    }
                });
    }
    public void connect() throws Exception {
        System.out.println("netty client start。。");
        // 启动客户端去连接服务器端
        ChannelFuture cf = bootstrap.connect(host, port);
        cf.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    // 重连交给后端线程执行
                    future.channel().eventLoop().schedule(() -> {
                        System.err.println("重连服务端...");
                        try {
                            connect();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }, 3000, TimeUnit.MILLISECONDS);
                } else {
                    System.out.println("服务端连接成功...");
                }
            }
        });
        // 对通道关闭进行监听
        cf.channel().closeFuture().sync();
    }
}

这段代码定义了一个名为ArtisanNettyClient的类,它包含了客户端的初始化和连接逻辑。

  • EventLoopGroup:Netty使用EventLoopGroup来处理事件循环和IO操作。这里创建了一个NioEventLoopGroup,用于处理客户端的IO操作。
  • Bootstrap:这是Netty的另一个核心组件,用于配置和初始化客户端。
  • ChannelFuture:这是一个异步结果对象,用于表示通道操作的结果。
  • connect()方法:这个方法用于启动客户端并连接到服务器。如果连接失败,它将使用schedule方法在3秒后重试连接。
  • ArtisanNettyClientHandler:这应该是一个自定义的处理类,用于处理业务逻辑,但在这段代码中没有给出具体实现。
  • init()方法:这个方法用于初始化客户端的BootstrapEventLoopGroup
  • operationComplete()方法:这是ChannelFutureListener的回调方法,用于处理连接操作的结果。如果连接失败,它将安排重试连接。如果连接成功,它将打印成功消息。
  • closeFuture().sync():这个方法用于等待客户端通道关闭,确保在客户端关闭之前完成所有必要的清理工作。

这个示例中,客户端将尝试连接到指定的服务器地址和端口,如果连接失败,它将自动重试连接。


package com.artisan.reconnect;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class ArtisanNettyClientHandler extends ChannelInboundHandlerAdapter {
    private ArtisanNettyClient artisanNettyClient;
    public ArtisanNettyClientHandler(ArtisanNettyClient artisanNettyClient) {
        this.artisanNettyClient = artisanNettyClient;
    }
    /**
     * 当客户端连接服务器完成就会触发该方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 创建一个ByteBuf,包含"HelloServer"的字符串
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
        // 向服务器发送消息
        ctx.writeAndFlush(buf);
    }
    /**
     * 当通道有读取事件时会触发,即服务端发送数据给客户端
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 获取消息内容
        ByteBuf buf = (ByteBuf) msg;
        // 打印服务端发送的消息
        System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
        // 打印服务端的地址
        System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
    }
    /**
     * 当通道处于不活动状态时调用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 打印提示信息
        System.err.println("运行中断开重连。。。");
        // 调用客户端的connect方法进行重连
        artisanNettyClient.connect();
    }
    /**
     * 当捕获到异常时调用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 打印异常堆栈信息
        cause.printStackTrace();
        // 关闭通道
        ctx.close();
    }
}

这个处理类包含了一些基本的方法来处理通道的激活、读取、不活动状态和异常。以下是每个方法的简要说明:

  • channelActive():当客户端成功连接到服务器时,这个方法会被调用,并向服务器发送一条消息。
  • channelRead():当客户端从服务器接收到消息时,这个方法会被调用,并打印出接收到的消息内容和服务器的地址。
  • channelInactive()当通道不再活跃时(例如,连接被断开),这个方法会被调用,并尝试重新连接服务器。
  • exceptionCaught():当捕获到异常时,这个方法会被调用,并打印异常的堆栈跟踪信息,然后关闭通道。

这个处理类是客户端逻辑的一部分,它负责处理客户端与服务器之间的交互。


测试

启动自动重连

先启动客户端哈(务必) , 再启动服务端,来验证下 客户端的自动重连 。

起客户端,不起服务端

在这里插入图片描述

起服务端

在这里插入图片描述


运行过程中断链后的自动重连

系统运行过程中网络故障或服务端故障,导致客户端与服务端断开连接了也需要重连,可以在客户端处理数据的Handler的channelInactive方法中进行重连。

运行过程中,请将服务端的连接断开,过一会儿再启动,验证客户端在运行过程中的自动重连

断开服务端

在这里插入图片描述

恢复服务端

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/267167.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

OpenSource - SCM服务管理平台

文章目录 官方网址文档下载版本功能解决了哪些问题使用对象优势Linxu版本scm-dev deb服务列表 Windows版本scm-dev 服务列表scm-all 服务列表scm-jdk 服务列表scm-springboot 精简版本服务列表scm-springboot 服务列表scm-tomcat 服务列表 SCM 截图 官方网址 https://scm.chus…

如何更好的去理解源码

前言 这篇文章我准备来聊一聊如何去阅读开源项目的源码。 在聊如何去阅读源码之前&#xff0c;先来简单说一下为什么要去阅读源码&#xff0c;大致可分为以下几点原因&#xff1a; 最直接的原因&#xff0c;就是面试需要&#xff0c;面试喜欢问源码&#xff0c;读完源码才可以…

代码随想录-刷题第三十六天

435. 无重叠区间 题目链接&#xff1a;435. 无重叠区间 思路&#xff1a;本题与452. 用最少数量的箭引爆气球非常像&#xff0c;弓箭的数量就相当于是非交叉区间的数量&#xff0c;只要把弓箭那道题目代码里射爆气球的判断条件加个等号&#xff08;认为[0&#xff0c;1][1&am…

Kafka集群架构服务端核心概念

目录 Kafka集群选举 controller选举机制 Leader partition选举 leader partition自平衡 partition故障恢复机制 follower故障 leader故障 HW一致性保障 HW同步过程 Epoch Kafka集群选举 1. 在多个broker中, 需要选举出一个broker, 担任controller. 由controller来管理…

深入理解 Git 分支管理:提升团队协作与开发效率

目录 前言1 什么是分支2 分支的好处2.1 并行开发的支持2.2 独立性与隔离性2.3 灵活的版本控制2.4 提高安全性和代码质量2.5 项目历史的清晰记录 3 Git 分支操作命令3.1 git branch -v3.2 git branch 分支名称3.3 git checkout 分支名称3.4 git merge 分支名称3.5 git rebase 分…

RabbitMQ的概念与使用

什么是MQ&#xff1f; MQ 是消息队列&#xff08;Message Queue&#xff09;的简称。消息队列是一种应用程序间通信的方式&#xff0c;用于在不同的应用程序之间传递消息。它通过解耦发送者和接收者之间的直接依赖关系&#xff0c;提供了一种异步、可靠的消息传递机制。 什么是…

爬虫是什么?起什么作用?

【爬虫】 如果把互联网比作一张大的蜘蛛网&#xff0c;数据便是放于蜘蛛网的各个节点&#xff0c;而爬虫就是一只小蜘蛛&#xff0c;沿着网络抓取自己得猎物&#xff08;数据&#xff09;。这种解释可能更容易理解&#xff0c;官网的&#xff0c;就是下面这个。 爬虫是一种自动…

红日靶场-2

目录 前言 外网渗透 外网渗透打点 1、arp探测 2、nmap探测 3、nikto探测 4、gobuster目录探测 WebLogic 10.3.6.0 1、版本信息 2、WeblogicScan扫描 3、漏洞利用 4、哥斯拉连接 内网渗透 MSF上线 1、反弹连接 2、内网扫描 3、frpc内网穿透 4、ms17-010 5、ge…

第十三章 常用类(包装类和 String 相关类)

一、包装类 1. 包装类的分类 &#xff08;1&#xff09;针对八种基本数据类型相应的引用类型—包装类 &#xff08;2&#xff09;有了类的特点&#xff0c;就可以调用类中的方法。 2. 包装类和基本数据类型的转换 &#xff08;1&#xff09;jdk5 前的手动装箱和拆箱方式 publ…

Unity预设体

目录 预设体是什么&#xff1f; 如何创建预设体&#xff1f; 如何修改预设体&#xff1f; 如何删除预设体&#xff1f; 预设体是什么&#xff1f; Unity中的预设体&#xff08;Prefab&#xff09;是一种可重复使用的游戏对象模板。它允许开发者创建一个或多个游戏对象&…

模型评估系列:回归模型的评估指标介绍和代码实践

文章目录 1. 简介2. 回归评估指标2.1 平均绝对误差&#xff08;MAE&#xff09;2.2 均方误差&#xff08;MSE&#xff09;2.3 均方根误差&#xff08;RMSE&#xff09;2.4 R平方&#xff08;决定系数&#xff09;2.5 调整后的R平方2.6 交叉验证的R22.7 回归评估指标 - 结论 3 设…

OpenCV-10mat的深浅拷贝

一.Mat介绍 mat是OpenCV是在C语言用来表达图像数据的一种数据结构&#xff0c;在Python转换为numpy的ndarray. mat是由header和date组成&#xff0c;header中记录了图片的维数、大小、数据类型等信息. 例如&#xff1a;cv2.imshow&#xff08;winname&#xff0c; mat&#…

基于Boosting的力扣题目建模分析

基于Boosting的力扣题目建模分析 1 背景介绍2 数据说明3 描述性分析3.1 分类问题描述性分析3.2 回归问题描述性分析 4 建模分析4.1 Boosting概述4.2 AdaBoost算法4.2.1 算法概述4.2.2 算法实现 4.3 提升树算法4.3.1 算法概述4.3.2 算法实现 5 总结 1 背景介绍 随着大数据、人工…

2024 年全球顶级的 4 款 PDF 转换器软件

PDF 是一种广泛使用的共享文档和文件的格式。但是&#xff0c;有时您可能需要将 PDF 文件转换为其他格式&#xff08;例如 Word 或 Excel&#xff09;&#xff0c;以便编辑或操作内容。这就是 PDF 转换器软件派上用场的地方。 有许多 PDF 转换器软件可供选择&#xff0c;有免费…

day06

文章目录 一、流程控制1. 作用2. 分类1&#xff09;顺序结构2&#xff09;选择结构1. if语句2. switch语句 3&#xff09;循环结构 二、函数1. 作用2. 语法3. 使用4. 匿名函数5. 作用域 一、流程控制 1. 作用 控制代码的执行顺序 2. 分类 1&#xff09;顺序结构 从上到下依…

openGauss学习笔记-170 openGauss 数据库运维-备份与恢复-导入数据-更新表中数据-使用合并方式更新和插入数据

文章目录 openGauss学习笔记-170 openGauss 数据库运维-备份与恢复-导入数据-更新表中数据-使用合并方式更新和插入数据170.1 前提条件170.2 操作步骤 openGauss学习笔记-170 openGauss 数据库运维-备份与恢复-导入数据-更新表中数据-使用合并方式更新和插入数据 在用户需要将…

T-Dongle-S3开发板信息

相关学习网站 ESP32保姆级教程开始学习ESP32_哔哩哔哩_bilibili Wokwi - Online ESP32, STM32, Arduino Simulator T-Dongle-S3 资料:https://spotpear.cn/index/study/detail/id/1069.html 主控芯片&#xff1a; ESP32-S3 Xtensa 芯片集成了 Xtensa 32 位 LX7 双核处理器…

数据校园服务管理系统,教育平台可视化界面(教育资源信息化PS文件)

大屏组件可以让UI设计师的工作更加便捷&#xff0c;使其更高效快速的完成设计任务。现分享大数据校园服务管理系统、科技教育平台大数据可视化界面、教育资源信息化大数据分析等Photoshop源文件&#xff0c;文末提供完整资料&#xff0c;供UI设计师们工作使用。 若需其他 大屏…

手机无人直播的兴起

近年来&#xff0c;随着科技的不断进步和智能手机的普及&#xff0c;手机无人直播成为了一种新兴的传媒方式。手持手机&#xff0c;不经过镜头操作人员的干预&#xff0c;通过直播平台实时分享自己的所见所闻&#xff0c;成为了越来越多人的选择。手机无人直播的盛行离不开以下…

dubbo-admin连接虚拟机中的zookeeper报错zookeeper not connected

目录 前言 解决过程 总结 前言 可以优先查看总结看能否解决大家的问题&#xff0c;如果不能解决不需要查看解决过程浪费时间了。 解决过程 该问题卡住我很久&#xff0c;网上大多数文章都是修改配置文件中的连接超时时间&#xff0c;即修改如下内容 dubbo.registry.tim…