文章目录
- 概述
- Pre
- 客户端自动重连
- Code
- Server
- Client (重点)
- 测试
- 启动自动重连
- 运行过程中断链后的自动重连
概述
Pre
Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析
客户端自动重连
自动重连是一个用于提高网络应用稳定性和可靠性的功能。当客户端与服务器之间的连接意外断开时,客户端可以自动尝试重新连接到服务器,以确保数据的正常传输。
自动重连是指在网络通信中,当客户端与服务器之间的连接由于某种原因断开时,客户端能够自动尝试重新建立连接的机制。这是一种用于提高网络应用稳定性和可靠性的功能。
具体来说,当客户端检测到与服务器的连接中断时,它会自动发起新的连接尝试,以确保数据的正常传输。这对于处理网络不稳定性、临时断开或服务器重新启动等情况非常重要,可以减少用户干预,提升应用的用户体验。
Code
Server
package com.artisan.reconnect;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class ArtisanNettyServer {
public static void main(String[] args) throws Exception {
// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用链式编程来配置参数
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
// 使用NioServerSocketChannel作为服务器的通道实现
.channel(NioServerSocketChannel.class)
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//对workerGroup的SocketChannel设置处理器
//ch.pipeline().addLast(new LifeCycleInBoundHandler());
ch.pipeline().addLast(new ArtisanNettyServerHandler());
}
});
System.out.println("netty server start。。");
// 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
// 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture cf = bootstrap.bind(9000).sync();
// 给cf注册监听器,监听我们关心的事件
/*cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口9000成功");
} else {
System.out.println("监听端口9000失败");
}
}
});*/
// 等待服务端监听端口关闭,closeFuture是异步操作
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
EventLoopGroup
:Netty使用EventLoopGroup
来处理事件循环和IO操作。这里创建了两个EventLoopGroup
,一个用于处理连接请求(bossGroup
),另一个用于处理实际的业务逻辑(workerGroup
)。ServerBootstrap
:这是Netty的另一个核心组件,用于配置和初始化服务器。ChannelFuture
:这是一个异步结果对象,用于表示通道操作的结果。ChannelInitializer
:这是一个用于初始化新连接的处理器。ArtisanNettyServerHandler
:这应该是一个自定义的处理类,用于处理业务逻辑,下文给出。bind()
和closeFuture()
:bind()
方法用于启动服务器并绑定端口,closeFuture()
用于等待服务器通道关闭。finally
块:这里确保在服务器启动失败或成功后,EventLoopGroup
会被优雅地关闭,以释放资源。
package com.artisan.reconnect;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
* @Description: 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范)
*/
public class ArtisanNettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象, 含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 " + Thread.currentThread().getName());
//Channel channel = ctx.channel();
//ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
//将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕处理方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
/**
* 处理异常, 一般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
定义了一个自定义的Netty服务器处理器ArtisanNettyServerHandler
,它继承自ChannelInboundHandlerAdapter
。这个处理器包含了几个重要的方法来处理客户端的请求和响应:
channelRead(ChannelHandlerContext ctx, Object msg)
:当服务器从客户端接收到数据时,这个方法会被调用。在这个方法中,你可以编写处理客户端发送的数据的逻辑。在这个例子中,它简单地打印了接收到的消息内容。channelReadComplete(ChannelHandlerContext ctx)
:这个方法在channelRead方法执行完成后被调用。在这个方法中,你可以发送响应给客户端。在这个例子中,它发送了一个简单的"HelloClient"消息给客户端。exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
:这个方法在出现异常时被调用。在这个方法中,你可以编写异常处理的逻辑。在这个例子中,它简单地关闭了通道。
Client (重点)
这段代码是一个使用Netty框架的简单客户端示例,它实现了重连功能。以下是这段代码的中文注释解释:
package com.artisan.reconnect;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.TimeUnit;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
* @Description: 实现了重连的客户端
*/
public class ArtisanNettyClient {
private String host;
private int port;
private Bootstrap bootstrap;
private EventLoopGroup group;
public static void main(String[] args) throws Exception {
ArtisanNettyClient artisanNettyClient = new ArtisanNettyClient("localhost", 9000);
artisanNettyClient.connect();
}
public ArtisanNettyClient(String host, int port) {
this.host = host;
this.port = port;
init();
}
private void init() {
// 客户端需要一个事件循环组
group = new NioEventLoopGroup();
// 创建客户端启动对象
// bootstrap 可重用, 只需在NettyClient实例化的时候初始化即可.
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 加入处理器
ch.pipeline().addLast(new ArtisanNettyClientHandler(ArtisanNettyClient.this));
}
});
}
public void connect() throws Exception {
System.out.println("netty client start。。");
// 启动客户端去连接服务器端
ChannelFuture cf = bootstrap.connect(host, port);
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// 重连交给后端线程执行
future.channel().eventLoop().schedule(() -> {
System.err.println("重连服务端...");
try {
connect();
} catch (Exception e) {
e.printStackTrace();
}
}, 3000, TimeUnit.MILLISECONDS);
} else {
System.out.println("服务端连接成功...");
}
}
});
// 对通道关闭进行监听
cf.channel().closeFuture().sync();
}
}
这段代码定义了一个名为ArtisanNettyClient
的类,它包含了客户端的初始化和连接逻辑。
EventLoopGroup
:Netty使用EventLoopGroup
来处理事件循环和IO操作。这里创建了一个NioEventLoopGroup
,用于处理客户端的IO操作。Bootstrap
:这是Netty的另一个核心组件,用于配置和初始化客户端。ChannelFuture
:这是一个异步结果对象,用于表示通道操作的结果。connect()
方法:这个方法用于启动客户端并连接到服务器。如果连接失败,它将使用schedule
方法在3秒后重试连接。ArtisanNettyClientHandler
:这应该是一个自定义的处理类,用于处理业务逻辑,但在这段代码中没有给出具体实现。init()
方法:这个方法用于初始化客户端的Bootstrap
和EventLoopGroup
。operationComplete()
方法:这是ChannelFutureListener
的回调方法,用于处理连接操作的结果。如果连接失败,它将安排重试连接。如果连接成功,它将打印成功消息。closeFuture().sync()
:这个方法用于等待客户端通道关闭,确保在客户端关闭之前完成所有必要的清理工作。
这个示例中,客户端将尝试连接到指定的服务器地址和端口,如果连接失败,它将自动重试连接。
package com.artisan.reconnect;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class ArtisanNettyClientHandler extends ChannelInboundHandlerAdapter {
private ArtisanNettyClient artisanNettyClient;
public ArtisanNettyClientHandler(ArtisanNettyClient artisanNettyClient) {
this.artisanNettyClient = artisanNettyClient;
}
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 创建一个ByteBuf,包含"HelloServer"的字符串
ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
// 向服务器发送消息
ctx.writeAndFlush(buf);
}
/**
* 当通道有读取事件时会触发,即服务端发送数据给客户端
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 获取消息内容
ByteBuf buf = (ByteBuf) msg;
// 打印服务端发送的消息
System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
// 打印服务端的地址
System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
}
/**
* 当通道处于不活动状态时调用
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 打印提示信息
System.err.println("运行中断开重连。。。");
// 调用客户端的connect方法进行重连
artisanNettyClient.connect();
}
/**
* 当捕获到异常时调用
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 打印异常堆栈信息
cause.printStackTrace();
// 关闭通道
ctx.close();
}
}
这个处理类包含了一些基本的方法来处理通道的激活、读取、不活动状态和异常。以下是每个方法的简要说明:
channelActive()
:当客户端成功连接到服务器时,这个方法会被调用,并向服务器发送一条消息。channelRead()
:当客户端从服务器接收到消息时,这个方法会被调用,并打印出接收到的消息内容和服务器的地址。channelInactive()
:当通道不再活跃时(例如,连接被断开),这个方法会被调用,并尝试重新连接服务器。exceptionCaught()
:当捕获到异常时,这个方法会被调用,并打印异常的堆栈跟踪信息,然后关闭通道。
这个处理类是客户端逻辑的一部分,它负责处理客户端与服务器之间的交互。
测试
启动自动重连
先启动客户端哈(务必) , 再启动服务端,来验证下 客户端的自动重连 。
起客户端,不起服务端
起服务端
运行过程中断链后的自动重连
系统运行过程中网络故障或服务端故障,导致客户端与服务端断开连接了也需要重连,可以在客户端处理数据的Handler的channelInactive
方法中进行重连。
运行过程中,请将服务端的连接断开,过一会儿再启动,验证客户端在运行过程中的自动重连
断开服务端
恢复服务端