文章目录
- 前言
- 一、超时监测
- 二、IdleStateHandler类
- 三、ReadTimeoutHandler类
- 四、WriteTimeoutHandler类
- 五、实现心跳机制
- 5.1. 定义心跳处理器
- 5.2. 定义 ChannelInitializer
- 5.3. 编写服务器
- 5.4. 测试
- 结语
前言
回顾Netty系列文章:
- Netty 概述(一)
- Netty 架构设计(二)
- Netty Channel 概述(三)
- Netty ChannelHandler(四)
- ChannelPipeline源码分析(五)
- 字节缓冲区 ByteBuf (六)(上)
- 字节缓冲区 ByteBuf(七)(下)
- Netty 如何实现零拷贝(八)
- Netty 程序引导类(九)
- Reactor 模型(十)
- 工作原理详解(十一)
- Netty 解码器(十二)
- Netty 编码器(十三)
- Netty 编解码器(十四)
- 自定义解码器、编码器、编解码器(十五)
- Future 源码分析(十六)
- Promise 源码分析(十七)
- 一行简单的writeAndFlush都做了哪些事(十八)
一、超时监测
Netty 的超时类型 IdleState 主要分为以下3类:
- ALL_IDLE : 一段时间内没有数据接收或者发送。
- READER_IDLE : 一段时间内没有数据接收。
- WRITER_IDLE : 一段时间内没有数据发送。
针对上面的 3 类超时异常,Netty 提供了 3 类ChannelHandler来进行监测。
- IdleStateHandler : 当 Channel 一段时间未执行读取、写入或者两者都未执行时,触发 -IdleStateEvent 事件。
- ReadTimeoutHandler :在一定时间内未读取任何数据时,引发 ReadTimeoutEvent 事件。
- WriteTimeoutHandler :当写操作在一定时间内无法完成时,引发 WriteTimeoutEvent 事件。
二、IdleStateHandler类
IdleStateHandler 包括了读\写超时状态处理,观察以下 IdleStateHandler 类的构造函数源码。
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
this.writeListener = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
IdleStateHandler.this.lastWriteTime = IdleStateHandler.this.ticksInNanos();
IdleStateHandler.this.firstWriterIdleEvent = IdleStateHandler.this.firstAllIdleEvent = true;
}
};
this.firstReaderIdleEvent = true;
this.firstWriterIdleEvent = true;
this.firstAllIdleEvent = true;
ObjectUtil.checkNotNull(unit, "unit");
this.observeOutput = observeOutput;
if (readerIdleTime <= 0L) {
this.readerIdleTimeNanos = 0L;
} else {
this.readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
}
if (writerIdleTime <= 0L) {
this.writerIdleTimeNanos = 0L;
} else {
this.writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
}
if (allIdleTime <= 0L) {
this.allIdleTimeNanos = 0L;
} else {
this.allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
}
}
在上述源码中,构造函数可以接收以下参数:
-
readerIdleTimeSecond:指定读超时时间,指定 0 表明为禁用。
-
writerIdleTimeSecond:指定写超时时间,指定 0 表明为禁用。
-
allIdleTimeSecond:在指定读写超时时间,指定 0 表明为禁用。
IdleStateHandler 使用示例:
public class MyChannelInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast("idleStateHandler",new IdleStateHandler(60,30,0));
channel.pipeline().addLast("myHandler",new MyHandler());
}
}
public class MyHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent e = (IdleStateEvent) evt;
if(e.state() == IdleState.READER_IDLE){
ctx.close();
}else if(e.state() == IdleState.WRITER_IDLE){
ctx.writeAndFlush(new PingMessage());
}
}
}
}
在上述示例中,IdleStateHandler 设置了读超时时间为 60 秒,写超时时间为 30 秒。MyHandler 是针对超时事件 IdleStateEvent 的处理。
- 如果 30 秒内没有出站流量(写超时)时发送 ping 消息的示例。
- 如果 60 秒内没有入站流量(读超时)时,连接关闭。
三、ReadTimeoutHandler类
ReadTimeoutHandler 类包括了读超时状态处理。ReadTimeoutHandler 类的源码如下:
public class ReadTimeoutHandler extends IdleStateHandler {
private boolean closed;
public ReadTimeoutHandler(int timeoutSeconds) {
this((long)timeoutSeconds, TimeUnit.SECONDS);
}
public ReadTimeoutHandler(long timeout, TimeUnit unit) {
super(timeout, 0L, 0L, unit);//禁用了写超时、读写超时
}
protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
assert evt.state() == IdleState.READER_IDLE;//只处理读超时
this.readTimedOut(ctx);
}
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
if (!this.closed) {
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);//引发异常
ctx.close();
this.closed = true;
}
}
}
从上述源码可以看出,ReadTimeoutHandler 继承自 IdleStateHandler,并在构造函数中禁用了写超时、读写超时,而且在处理超时时,只会针对 READER_IDLE状态进行处理,并引发 ReadTimeoutException 异常。
ReadTimeoutHandler 的使用示例如下:
public class MyChannelInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(30));
channel.pipeline().addLast("myHandler",new MyHandler());
}
}
//处理器处理ReadTimeoutException
public class MyHandler extends ChannelDuplexHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if(cause instanceof ReadTimeoutException){
//...
}else {
super.exceptionCaught(ctx,cause);
}
}
}
在上述示例中,ReadTimeoutHandler 设置了读超时时间是 30 秒。
四、WriteTimeoutHandler类
WriteTimeoutHandler 类包括了写超时状态处理。WriteTimeoutHandler 类的源码如下:
public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
private static final long MIN_TIMEOUT_NANOS;
private final long timeoutNanos;
private WriteTimeoutHandler.WriteTimeoutTask lastTask;
private boolean closed;
public WriteTimeoutHandler(int timeoutSeconds) {
this((long)timeoutSeconds, TimeUnit.SECONDS);
}
public WriteTimeoutHandler(long timeout, TimeUnit unit) {
ObjectUtil.checkNotNull(unit, "unit");
if (timeout <= 0L) {
this.timeoutNanos = 0L;
} else {
this.timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
}
}
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (this.timeoutNanos > 0L) {
promise = promise.unvoid();
this.scheduleTimeout(ctx, promise);
}
ctx.write(msg, promise);
}
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
WriteTimeoutHandler.WriteTimeoutTask task = this.lastTask;
WriteTimeoutHandler.WriteTimeoutTask prev;
for(this.lastTask = null; task != null; task = prev) {
task.scheduledFuture.cancel(false);
prev = task.prev;
task.prev = null;
task.next = null;
}
}
private void scheduleTimeout(ChannelHandlerContext ctx, ChannelPromise promise) {
WriteTimeoutHandler.WriteTimeoutTask task = new WriteTimeoutHandler.WriteTimeoutTask(ctx, promise);
task.scheduledFuture = ctx.executor().schedule(task, this.timeoutNanos, TimeUnit.NANOSECONDS);
if (!task.scheduledFuture.isDone()) {
this.addWriteTimeoutTask(task);
promise.addListener(task);
}
}
private void addWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {
if (this.lastTask != null) {
this.lastTask.next = task;
task.prev = this.lastTask;
}
this.lastTask = task;
}
private void removeWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {
if (task == this.lastTask) {
assert task.next == null;
this.lastTask = this.lastTask.prev;
if (this.lastTask != null) {
this.lastTask.next = null;
}
} else {
if (task.prev == null && task.next == null) {
return;
}
if (task.prev == null) {
task.next.prev = null;
} else {
task.prev.next = task.next;
task.next.prev = task.prev;
}
}
task.prev = null;
task.next = null;
}
protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
if (!this.closed) {
ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
ctx.close();
this.closed = true;
}
}
//...
}
从上述源码可以看出,WriteTimeoutHandler 在处理超时时,引发了 WriteTimeoutException 异常。
WriteTimeoutHandler 的使用示例如下:
public class MyChannelInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast("writeTimeoutHandler",new WriteTimeoutHandler(30));
channel.pipeline().addLast("myHandler",new MyHandler());
}
}
//处理器处理ReadTimeoutException
public class MyHandler extends ChannelDuplexHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if(cause instanceof WriteTimeoutException ){
//...
}else {
super.exceptionCaught(ctx,cause);
}
}
}
在上述示例中,WriteTimeoutHandler 设置了写超时时间是 30 秒。
五、实现心跳机制
针对超时的解决方案——心跳机制。
在程序开发中,心跳机制是非常常见的。其原理是,当连接闲置时可以发送一个心跳来维持连接。一般而言,心跳就是一段小的通信。
5.1. 定义心跳处理器
public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
// (1)心跳内容
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
CharsetUtil.UTF_8));
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
// (2)判断超时类型
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
String type = "";
if (event.state() == IdleState.READER_IDLE) {
type = "read idle";
} else if (event.state() == IdleState.WRITER_IDLE) {
type = "write idle";
} else if (event.state() == IdleState.ALL_IDLE) {
type = "all idle";
}
// (3)发送心跳
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(
ChannelFutureListener.CLOSE_ON_FAILURE);
System.out.println( ctx.channel().remoteAddress()+"超时类型:" + type);
} else {
super.userEventTriggered(ctx, evt);
}
}
}
对上述代码说明:
-
定义了心跳时,要发送的内容。
-
判断是不是 IdleStateEvent 事件,是则处理。
-
将心跳内容发送给客户端。
5.2. 定义 ChannelInitializer
HeartbeatHandlerInitializer用于封装各类ChannelHandler,代码如下:
public class HeartbeatHandlerInitializer extends ChannelInitializer<Channel> {
private static final int READ_IDEL_TIME_OUT = 4; // 读超时
private static final int WRITE_IDEL_TIME_OUT = 5;// 写超时
private static final int ALL_IDEL_TIME_OUT = 7; // 所有超时
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,
WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS)); // (1)
pipeline.addLast(new HeartbeatServerHandler()); // (2)
}
}
对上述代码说明如下:
- 添加了一个IdleStateHandler到 ChannelPipeline,并分别设置了读、写超时的时间。为了方便演示,将超时时间设置的比较短。
- 添加了HeartbeatServerHandler,用来处理超时时,发送心跳。
5.3. 编写服务器
服务器代码比较简单,启动后侦听 8083 端口。
public final class HeartbeatServer {
static final int PORT = 8083;
public static void main(String[] args) throws Exception {
// 配置服务器
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HeartbeatHandlerInitializer());
// 启动
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
5.4. 测试
首先启动 HeartbeatServer,客户端用操作系统自带的 Telnet 程序即可:
telnet 127.0.0.1 8083
可以看到客户端与服务器的交互效果如下图。
结语
文章如果对你有帮助,看完记得点赞、关注、收藏。