【Netty】Netty中的超时处理与心跳机制(十九)

文章目录

  • 前言
  • 一、超时监测
  • 二、IdleStateHandler类
  • 三、ReadTimeoutHandler类
  • 四、WriteTimeoutHandler类
  • 五、实现心跳机制
    • 5.1. 定义心跳处理器
    • 5.2. 定义 ChannelInitializer
    • 5.3. 编写服务器
    • 5.4. 测试
  • 结语

前言

回顾Netty系列文章:

  • Netty 概述(一)
  • Netty 架构设计(二)
  • Netty Channel 概述(三)
  • Netty ChannelHandler(四)
  • ChannelPipeline源码分析(五)
  • 字节缓冲区 ByteBuf (六)(上)
  • 字节缓冲区 ByteBuf(七)(下)
  • Netty 如何实现零拷贝(八)
  • Netty 程序引导类(九)
  • Reactor 模型(十)
  • 工作原理详解(十一)
  • Netty 解码器(十二)
  • Netty 编码器(十三)
  • Netty 编解码器(十四)
  • 自定义解码器、编码器、编解码器(十五)
  • Future 源码分析(十六)
  • Promise 源码分析(十七)
  • 一行简单的writeAndFlush都做了哪些事(十八)

一、超时监测

Netty 的超时类型 IdleState 主要分为以下3类:

  • ALL_IDLE : 一段时间内没有数据接收或者发送。
  • READER_IDLE : 一段时间内没有数据接收。
  • WRITER_IDLE : 一段时间内没有数据发送。

针对上面的 3 类超时异常,Netty 提供了 3 类ChannelHandler来进行监测。

  • IdleStateHandler : 当 Channel 一段时间未执行读取、写入或者两者都未执行时,触发 -IdleStateEvent 事件。
  • ReadTimeoutHandler :在一定时间内未读取任何数据时,引发 ReadTimeoutEvent 事件。
  • WriteTimeoutHandler :当写操作在一定时间内无法完成时,引发 WriteTimeoutEvent 事件。

二、IdleStateHandler类

IdleStateHandler 包括了读\写超时状态处理,观察以下 IdleStateHandler 类的构造函数源码。

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    this.writeListener = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) throws Exception {
            IdleStateHandler.this.lastWriteTime = IdleStateHandler.this.ticksInNanos();
            IdleStateHandler.this.firstWriterIdleEvent = IdleStateHandler.this.firstAllIdleEvent = true;
        }
    };
    this.firstReaderIdleEvent = true;
    this.firstWriterIdleEvent = true;
    this.firstAllIdleEvent = true;
    ObjectUtil.checkNotNull(unit, "unit");
    this.observeOutput = observeOutput;
    if (readerIdleTime <= 0L) {
        this.readerIdleTimeNanos = 0L;
    } else {
        this.readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
    }

    if (writerIdleTime <= 0L) {
        this.writerIdleTimeNanos = 0L;
    } else {
        this.writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
    }

    if (allIdleTime <= 0L) {
        this.allIdleTimeNanos = 0L;
    } else {
        this.allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
    }

}

在上述源码中,构造函数可以接收以下参数:

  • readerIdleTimeSecond:指定读超时时间,指定 0 表明为禁用。

  • writerIdleTimeSecond:指定写超时时间,指定 0 表明为禁用。

  • allIdleTimeSecond:在指定读写超时时间,指定 0 表明为禁用。

IdleStateHandler 使用示例:

public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline().addLast("idleStateHandler",new IdleStateHandler(60,30,0));
        channel.pipeline().addLast("myHandler",new MyHandler());
    }
}

public class MyHandler extends ChannelDuplexHandler {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            IdleStateEvent e = (IdleStateEvent) evt;
            if(e.state() == IdleState.READER_IDLE){
                ctx.close();
            }else if(e.state() == IdleState.WRITER_IDLE){
                ctx.writeAndFlush(new PingMessage());
            }
        }
    }
}

在上述示例中,IdleStateHandler 设置了读超时时间为 60 秒,写超时时间为 30 秒。MyHandler 是针对超时事件 IdleStateEvent 的处理。

  • 如果 30 秒内没有出站流量(写超时)时发送 ping 消息的示例。
  • 如果 60 秒内没有入站流量(读超时)时,连接关闭。

三、ReadTimeoutHandler类

ReadTimeoutHandler 类包括了读超时状态处理。ReadTimeoutHandler 类的源码如下:

public class ReadTimeoutHandler extends IdleStateHandler {
    private boolean closed;

    public ReadTimeoutHandler(int timeoutSeconds) {
        this((long)timeoutSeconds, TimeUnit.SECONDS);
    }

    public ReadTimeoutHandler(long timeout, TimeUnit unit) {
        super(timeout, 0L, 0L, unit);//禁用了写超时、读写超时
    }

    protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        assert evt.state() == IdleState.READER_IDLE;//只处理读超时

        this.readTimedOut(ctx);
    }

    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!this.closed) {
            ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);//引发异常
            ctx.close();
            this.closed = true;
        }

    }
}

从上述源码可以看出,ReadTimeoutHandler 继承自 IdleStateHandler,并在构造函数中禁用了写超时、读写超时,而且在处理超时时,只会针对 READER_IDLE状态进行处理,并引发 ReadTimeoutException 异常。
ReadTimeoutHandler 的使用示例如下:

public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(30));
        channel.pipeline().addLast("myHandler",new MyHandler());
    }
}

//处理器处理ReadTimeoutException 
public class MyHandler extends ChannelDuplexHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if(cause instanceof ReadTimeoutException){
            //...
        }else {
            super.exceptionCaught(ctx,cause);
        }
    }
}

在上述示例中,ReadTimeoutHandler 设置了读超时时间是 30 秒。

四、WriteTimeoutHandler类

WriteTimeoutHandler 类包括了写超时状态处理。WriteTimeoutHandler 类的源码如下:

public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
    private static final long MIN_TIMEOUT_NANOS;
    private final long timeoutNanos;
    private WriteTimeoutHandler.WriteTimeoutTask lastTask;
    private boolean closed;

    public WriteTimeoutHandler(int timeoutSeconds) {
        this((long)timeoutSeconds, TimeUnit.SECONDS);
    }

    public WriteTimeoutHandler(long timeout, TimeUnit unit) {
        ObjectUtil.checkNotNull(unit, "unit");
        if (timeout <= 0L) {
            this.timeoutNanos = 0L;
        } else {
            this.timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
        }

    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (this.timeoutNanos > 0L) {
            promise = promise.unvoid();
            this.scheduleTimeout(ctx, promise);
        }

        ctx.write(msg, promise);
    }

    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        WriteTimeoutHandler.WriteTimeoutTask task = this.lastTask;

        WriteTimeoutHandler.WriteTimeoutTask prev;
        for(this.lastTask = null; task != null; task = prev) {
            task.scheduledFuture.cancel(false);
            prev = task.prev;
            task.prev = null;
            task.next = null;
        }

    }

    private void scheduleTimeout(ChannelHandlerContext ctx, ChannelPromise promise) {
        WriteTimeoutHandler.WriteTimeoutTask task = new WriteTimeoutHandler.WriteTimeoutTask(ctx, promise);
        task.scheduledFuture = ctx.executor().schedule(task, this.timeoutNanos, TimeUnit.NANOSECONDS);
        if (!task.scheduledFuture.isDone()) {
            this.addWriteTimeoutTask(task);
            promise.addListener(task);
        }

    }

    private void addWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {
        if (this.lastTask != null) {
            this.lastTask.next = task;
            task.prev = this.lastTask;
        }

        this.lastTask = task;
    }

    private void removeWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {
        if (task == this.lastTask) {
            assert task.next == null;

            this.lastTask = this.lastTask.prev;
            if (this.lastTask != null) {
                this.lastTask.next = null;
            }
        } else {
            if (task.prev == null && task.next == null) {
                return;
            }

            if (task.prev == null) {
                task.next.prev = null;
            } else {
                task.prev.next = task.next;
                task.next.prev = task.prev;
            }
        }

        task.prev = null;
        task.next = null;
    }

    protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!this.closed) {
            ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
            ctx.close();
            this.closed = true;
        }

    }

  //...
}

从上述源码可以看出,WriteTimeoutHandler 在处理超时时,引发了 WriteTimeoutException 异常。
WriteTimeoutHandler 的使用示例如下:

public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline().addLast("writeTimeoutHandler",new WriteTimeoutHandler(30));
        channel.pipeline().addLast("myHandler",new MyHandler());
    }
}

//处理器处理ReadTimeoutException 
public class MyHandler extends ChannelDuplexHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if(cause instanceof WriteTimeoutException ){
            //...
        }else {
            super.exceptionCaught(ctx,cause);
        }
    }
}

在上述示例中,WriteTimeoutHandler 设置了写超时时间是 30 秒。

五、实现心跳机制

针对超时的解决方案——心跳机制。
在程序开发中,心跳机制是非常常见的。其原理是,当连接闲置时可以发送一个心跳来维持连接。一般而言,心跳就是一段小的通信。

5.1. 定义心跳处理器

public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
	
	// (1)心跳内容
	private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
			.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
					CharsetUtil.UTF_8));  

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
			throws Exception {

		// (2)判断超时类型
		if (evt instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent) evt;
			String type = "";
			if (event.state() == IdleState.READER_IDLE) {
				type = "read idle";
			} else if (event.state() == IdleState.WRITER_IDLE) {
				type = "write idle";
			} else if (event.state() == IdleState.ALL_IDLE) {
				type = "all idle";
			}

			// (3)发送心跳
			ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(
					ChannelFutureListener.CLOSE_ON_FAILURE);
 
			System.out.println( ctx.channel().remoteAddress()+"超时类型:" + type);
		} else {
			super.userEventTriggered(ctx, evt);
		}
	}
}

对上述代码说明:

  • 定义了心跳时,要发送的内容。

  • 判断是不是 IdleStateEvent 事件,是则处理。

  • 将心跳内容发送给客户端。

5.2. 定义 ChannelInitializer

HeartbeatHandlerInitializer用于封装各类ChannelHandler,代码如下:

public class HeartbeatHandlerInitializer extends ChannelInitializer<Channel> {

	private static final int READ_IDEL_TIME_OUT = 4; // 读超时
	private static final int WRITE_IDEL_TIME_OUT = 5;// 写超时
	private static final int ALL_IDEL_TIME_OUT = 7; // 所有超时

	@Override
	protected void initChannel(Channel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		pipeline.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,
				WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS)); // (1)
		pipeline.addLast(new HeartbeatServerHandler()); // (2)
	}
}

对上述代码说明如下:

  • 添加了一个IdleStateHandler到 ChannelPipeline,并分别设置了读、写超时的时间。为了方便演示,将超时时间设置的比较短。
  • 添加了HeartbeatServerHandler,用来处理超时时,发送心跳。

5.3. 编写服务器

服务器代码比较简单,启动后侦听 8083 端口。

public final class HeartbeatServer {

    static final int PORT = 8083;

    public static void main(String[] args) throws Exception {

        // 配置服务器
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new HeartbeatHandlerInitializer());

            // 启动
            ChannelFuture f = b.bind(PORT).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

5.4. 测试

首先启动 HeartbeatServer,客户端用操作系统自带的 Telnet 程序即可:

telnet 127.0.0.1 8083

可以看到客户端与服务器的交互效果如下图。
在这里插入图片描述

结语

文章如果对你有帮助,看完记得点赞、关注、收藏。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/25313.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

内存栈与CPU栈机制

1. 内存栈: 先入后出,LIFO(LAST IN FIRST OUT) 入栈:将一个新的元素放到栈顶 出栈:从栈顶取出一个元素 栈顶元素总是最后一个入栈,需要时出栈. 2.CPU栈机制 8086CPU提供相关指令以栈方式来访问内存空间.相当于将一段内存当做栈来使用 8086CPU提供的入栈指令为:PUSH ,出栈指令为…

JointJS+ v3.7 Crack

JointJS v3.7 改进了对 SVG 上下文中的外部对象的支持。 2023 年 5 月 30 日 - 16:00 新版本 特征 改进了对外部对象 (HTML) 的支持- 外部对象已成为 Web 开发的标准&#xff0c;JointJS 现在已经在 SVG 上下文中引入了对外部对象的全面且功能齐全的支持。这意味着您现在可以在…

Elasticsearch 8.8.0 发布

Elasticsearch 是一个基于 Lucene 库的搜索引擎。它提供了一个分布式、支持多租户的全文搜索引擎&#xff0c;具有 HTTP Web 接口和无模式 JSON 文档。Elasticsearch 基于 Java 开发&#xff0c;并在 SSPL Elastic License 双重授权许可下作为开源软件发布。 Elasticsearch 8…

win10系统如何设置虚拟回环

在日常生活中&#xff0c;人们(特别是IT行业者)通常需要在一台机上进行软件测试&#xff0c;而同一台计算上通常只能使用一个地址&#xff0c;而在需要同时使用两个地址进行测试的时候就显得捉襟见肘。此方法通过配置window10自带的环回适配器&#xff0c;达到上述目的。 win1…

如何使用Kali进行信息收集?

渗透测试即模拟黑客入侵的手段对目标网络进修安全测试&#xff0c;从而发现目标网络的漏洞&#xff0c;对目标网络进行安全加固与漏洞修复。 Kali 是一个基于 debian 的渗透测试平台&#xff0c;其中集成了很多常见的和不常见的渗透测试工具&#xff0c;如下图&#xff1a; 工…

基于SSM的服装设计供需系统设计与实现

摘 要&#xff1a;作为服装设计的重要形式之一&#xff0c;服装具有显著的审美性&#xff0c;是人类情感表达不可忽视的代表形态。但在新时期背景下&#xff0c;随着服装设计的进一步优化&#xff0c;服装设计创新融合强度也随之增强。本文就服装设计供需系统进行深入探究。 服…

Linux之命令搜索

目录 Linux之命令搜索 Whereis命令 定义 基本信息 举例 which命令 定义 与whereis命令的区别 基本信息 举例 locate 命令 定义 优点 缺点 基本信息 案例 Linux之命令搜索 Whereis命令 定义 whereis --- 搜索系统命令的命令&#xff08;像绕口令一样&#xff09…

数据库新闻速递 明白3中主流的数据迁移方法 (译)

头还是介绍一下群&#xff0c;如果感兴趣polardb ,mongodb ,mysql ,postgresql ,redis 等有问题&#xff0c;有需求都可以加群群内有各大数据库行业大咖&#xff0c;CTO&#xff0c;可以解决你的问题。加群请联系 liuaustin3 &#xff0c;在新加的朋友会分到2群&#xff08;共8…

ShardingSphere笔记(三):自定义分片算法 — 按月分表·真·自动建表

ShardingSphere笔记&#xff08;二&#xff09;&#xff1a;自定义分片算法 — 按月分表真自动建表 文章目录 ShardingSphere笔记&#xff08;二&#xff09;&#xff1a;自定义分片算法 — 按月分表真自动建表一、 前言二、 Springboot 的动态数据库三、 实现我们自己的动态数…

MySQL查询当前数据和上一行数据比较、业务数据的趋势分析、数据变动的监控和报警

标题: 使用MySQL查询当前数据和上一行数据比较的场景 在MySQL中&#xff0c;我们经常需要对数据进行比较和分析。其中一种常见的需求是查询数据列表并与前一行的数据进行比较。这种场景可以通过使用窗口函数或连接来实现。本文将介绍使用MySQL查询比较数据和上一行数据的场景&a…

计算机组成原理-指令系统-指令格式及寻址方式

目录 一、指令的定义 1.1 扩展操作码指令格式 二、指令寻址方式 2.1 顺序寻址 2.2 跳跃寻址 三、 数据寻址 3.1 直接寻址 3.2 间接寻址 3.3 寄存器寻址 ​ 3.4 寄存器间接寻址 3.5 隐含寻址 3.6 立即寻址 3.7 偏移地址 3.7.1 基址寻址 3.7.2 变址寻址 3.7.3 相对寻址…

【C++】右值引用和移动语义(详细解析)

文章目录 1.左值引用和右值引用左值引用右值引用 2.左值引用和右值引用的比较左值引用总结右值引用总结 3.右值引用的使用场景和意义知识点1知识点2知识点3知识点4总结 4.完美转发万能引用见识完美转发的使用完美转发的使用场景 1.左值引用和右值引用 传统的C语法中就有引用的…

【SpringCloud】SpringAMQP总结

文章目录 1、AMQP2、基本消息模型队列3、WorkQueue模型4、发布订阅模型5、发布订阅-Fanout Exchange6、发布订阅-DirectExchange7、发布订阅-TopicExchange8、消息转换器 1、AMQP Advanced Message Queuing Protocol&#xff0c;高级消息队列协议。是用于在应用程序之间传递业务…

Java设计模式(三)

系列文章目录 迪米特法则 合成复用原则 设计原则核心思想 文章目录 系列文章目录前言一、迪米特法则1.迪米特法则基本介绍2.迪米特法则注意事项和细节 二、合成复用原则1.合成复用原则基本介绍 三、设计原则核心思想总结 前言 大家好呀&#xff0c;欢迎来到柚子的博客~让我们…

CAPL(vTESTStudio) - CAPL、CANoe、Panel联动

目录 一、变量设置 ① dbc文件中的Environment variables变量

图灵完备游戏:信号计数 解法记录

使用1个全加器 2个半加器完成。这关的思想主旨在于如何把输出4&#xff0c;输出2&#xff0c;输出1的情况统一在一根导线上。 首先用一个全加器来完成输入2-4这三个引脚的计数&#xff0c;因为全加器输出范围二进制是00 - 11&#xff0c;而输入正好有两个引脚数位是2和1&…

高压放大器在大学教研领域的实际应用

在大学教研领域中&#xff0c;高压放大器可以用于多种实际应用。下面将介绍其中几个典型的应用场景。 1、激光切割 适用高校学院&#xff1a;机械学院 应用场景&#xff1a;机械制造、各类材料的切割 2、超声雾化 适用高校学院&#xff1a;医学院、机械学院、物理学院 应用场景…

《Spring Guides系列学习》guide31 - guide34 及中期简单回顾

要想全面快速学习Spring的内容&#xff0c;最好的方法肯定是先去Spring官网去查阅文档&#xff0c;在Spring官网中找到了适合新手了解的官网Guides&#xff0c;一共68篇&#xff0c;打算全部过一遍&#xff0c;能尽量全面的了解Spring框架的每个特性和功能。 接着上篇看过的gu…

使用 GitHub Actions 自动部署 Hexo 个人博客

文章目录 申请 GitHub Token源码仓库配置 Github Action重新设置远程仓库和分支查看部署 每次部署 Hexo 都需要运行 hexo cl & hexo g & hexo d 指令三件套完成推送到远程仓库&#xff0c;随着文章越来越多&#xff0c;编译的时间也会越来越长&#xff0c;通过 Github …

chatgpt赋能python:Python创建venv的完全指南

Python创建venv的完全指南 在Python开发中&#xff0c;虚拟环境是一个非常有用的工具。它可以让我们在同一台计算机上拥有多个Python环境&#xff0c;而不会互相干扰。在本文中&#xff0c;我们将介绍如何使用Python创建venv&#xff08;虚拟环境&#xff09;。 什么是venv&a…