NettyのEventLoopChannel

Netty的重要组件:EventLoop、Channel、Future & Promise、Handler & Pipeline、ByteBuf

本篇主要介绍Netty的EventLoop和Channel组件。

1、Netty入门案例

        服务器端的创建,主要分为以下步骤:

  • 创建serverBootstrap对象。
  • 配置服务器的线程模型。可以指定两个线程模型,parentGroup是专门负责接收连接的线程模型,childGroup是处理读写事件的工作线程模型。

      

  • 设置 Channel 类型,这里使用NioServerSocketChannel,是基于NIO实现的,还有其他的实现如下:

  • 配置 Channel 初始化器,使用ChannelInitializer初始化NioSocketChannel,在这里我们配置了处理字符串编码以及打印字符串。
  • 绑定端口。
  ServerBootstrap serverBootstrap = new ServerBootstrap();
        //接受连接的线程  or 工作线程
        serverBootstrap.group(new NioEventLoopGroup())
                //服务器ServerSocketChannel的实现 是NIO 还是 BIO
                .channel(NioServerSocketChannel.class)
                .childHandler(
                        //初始化与客户端进行数据读写的通道,并且添加别的handler 在连接建立后回调
                        new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel channel) throws Exception {
                                //处理编码(解码 )
                                channel.pipeline().addLast(new StringDecoder());
                                //自己的业务逻辑,比如打印字符串
                                channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        System.out.println(msg);
                                    }
                                });

                            }
                        }
                )
                .bind(8080);

        客户端的创建步骤与服务器端创建的步骤类似,但是在connect方法后需要加上.sync(),以及调用writeAndFlush()方法进行收发数据。(为什么会这样后面会说明)

 new Bootstrap()
                .group(new NioEventLoopGroup())
                //客户端SocketChannel的实现
                .channel(NioSocketChannel.class)
                //初始化与服务器进行数据读写的通道,并且添加别的handler 在连接建立后回调
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //字符输出编码
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost",8080))
                .sync()//阻塞方法 连接建立成功后才会执行
                .channel()
                .writeAndFlush("netty");//收发数据,会调用ch.pipeline().addLast(new StringEncoder());所添加的处理器

        启动程序看一下效果(先启动服务器,再启动客户端)

2、EventLoop

        下面我们开始介绍Netty中的第一个组件,配置服务器的线程模型时设置的EventLoop。

        EventLoop类关系图:

        我们首先看下EventLoop的NIO实现:NioEventLoopGroup的构造方法:

        可以看到它有多个重载的构造方法:

        在构造方法中可以指定线程的数量,如果使用的是无参构造方法,那么默认传递的线程数是0,但是会把0传递给其他的构造方法:

        最后调用父类的构造:

        在父类的构造中,会判断传入的线程参数是否为0。

        目前很显然条件成立,就会获取成员变量DEFAULT_EVENT_LOOP_THREADS 并且再次作为参数调用父类的构造。成员变量DEFAULT_EVENT_LOOP_THREADS 会在静态代码块中被赋值。

        如果能获取到"io.netty.eventLoopThreads" key对应的value,就以该值为准,否则线程数是当前cpu核心数*2

//NioEventLoopGroup构造方法指定线程数 如果不指定为CPU可运行核心数 * 2
NioEventLoopGroup loopGroup = new NioEventLoopGroup(2);

         一个 EventLoop由一个单独的线程驱动,它不断轮询 I/O 事件并执行相应的任务:

log.debug("{}",loopGroup.next());
log.debug("{}",loopGroup.next());
log.debug("{}",loopGroup.next());

        并且EventLoop的内部维护了一个任务队列,可以提交任务到这个队列中,由 EventLoop的线程顺序执行。

        向EventLoop提交任务:

//执行普通任务
loopGroup.next().submit(() -> log.debug("run..."));
//执行定时任务
loopGroup.next().scheduleAtFixedRate((Runnable) () -> log.debug("run with schedule..."),0,1, TimeUnit.SECONDS);

        下面通过一个案例来加深对EventLoop的理解:

        改造最初的入门案例中的代码,主要体现在.group方法,这次传递了两个参数,将负责连接的EventLoop和负责读写的EventLoop分离开,并且给负责读写的EventLoop设置了两个线程。

 //boss只负责接受连接 1个线程 worker负责读写 2个线程
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))

        服务端的代码和之前的相同,我们先启动一个服务端,再启动三个客户端:

        三个连接(channel)由两个EventLoop轮询处理,并且每个连接(channel)和EventLoop是绑定的,一个EventLoop可以负责多条消息的处理。

        如图所示:

        同时EventLoop也有不同的实现:

        案例进一步细化,我们可以再设置一个EventLoop 专门处理其他任务:

EventLoopGroup eventLoop = new DefaultEventLoopGroup();

         改造.childHandler:

  .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    /**
                     * 工序有多道,合在一起就是 pipeline,
                     * pipeline 负责发布事件(读、读取完成...)传播给每个 handler,
                     * handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
                     * @param ch
                     * @throws Exception
                     */
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast("handle1",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = ((ByteBuf) msg);
                                log.debug(byteBuf.toString(Charset.defaultCharset()));
                                //责任链模式,将消息传递个下一个处理器
                                ctx.fireChannelRead(msg);
                            }
                        }).addLast(eventLoop,"handle2",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = ((ByteBuf) msg);
                                log.debug(byteBuf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })

        启动一个客户端:

        相当于链式调用:

小结:

  • 在创建EventLoopGroup实例时,可以指定线程数,如果没有指定,默认使用cpu核心数*2。
  • 调用ServerBootstrap的.group时,可以将EventLoop细化成为专门处理连接以及负责读写的。
  • 可以利用pipeline()的.addLast,链式组合多个EventLoopGroup,实现不同的功能。
  • 每一个EventLoop可以轮询处理多个Channel事件,但是会和Channel绑定,线程间相互独立。

3、Channel

        Channel在Netty中代表一个可以执行I/O操作(如读、写、连接、绑定等)的对象。它可以是一个套接字连接、文件、管道等。

        Netty支持多种类型的Channel,包括NioSocketChannel(用于客户端连接)、NioServerSocketChannel(用于服务端绑定)以及其他专门用途的Channel如EmbeddedChannel等。

        Channel的I/O操作都是异步的,会返回一个ChannelFuture对象,用于表示操作的结果:

 ChannelFuture channelFuture = new Bootstrap()
                 // 。。。。。。
                .connect(new InetSocketAddress("localhost", 8080));

        并且每个Channel都有一个与之关联的ChannelPipeline。Pipeline中包含多个ChannelHandler,每个Handler负责特定的处理逻辑。

为什么在案例代码中,客户端写出数据之前必须要调用.sync()方法?

        因为Channel的I/O操作都是异步的,是主线程调用了.connect() 方法,但是建立连接是在NioSocketChannel所在线程。.sync()方法的作用就是让主线程在此处阻塞,等到NioSocketChannel所在线程建立完成连接,主线程才会继续向下执行。(如果不使用.sync()方法,主线程会在连接没有建立完成的时候继续执行后续代码,服务端无法正常接受消息。)

        与此类似的还有.close()方法,如果我们想在断开连接后执行一段自己的逻辑:

        客户端代码:

        //... new BootStrap...
        //接受控制台输入,q则断开连接
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while (true){
                String str = scanner.nextLine();
                if ("q".equals(str)){
                    //关闭channel连接
                    channel.close();
                    break;
                }
                channel.writeAndFlush(str);
            }
        },"input").start();

        可以将执行断开后逻辑的代码放在input 线程的channel.close();后,或者主线程中吗?

        答案是否定的:

  1. 如果放在主线程中,那么input 线程和主线程是并行执行的,无法控制先后顺序。
  2. 如果放在input 线程的channel.close(); 后,也是不行的。因为channel.close(); 方法也是异步调用,由NioSocketChannel所在线程负责关闭连接:

        解决该问题有两个方案:

        方案一的思路和解决.connect() 方法异步调用的类似,都是使用.sync()方法阻塞主线程,等待input 线程中的channel.close(); 执行完成后再由主线程处理连接关闭后释放资源

ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
log.debug("连接关闭后释放资源");

        第二种方案是通过调用ChannelFuture的.addListener 方法,添加一个监听器,由nioEventLoopGroup所在线程关闭连接后,处理连接关闭后释放资源(释放资源和处理后续都是nioEventLoopGroup同一线程。):

closeFuture.addListener((ChannelFutureListener) future -> log.debug("连接关闭后释放资源"));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/718645.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Avalonia for VSCode

1、在VSCode中编辑AvaloniaUI界面&#xff0c;在VSCode中搜索Avalonia&#xff0c;并安装。如下图&#xff0c;可以发现Avalonia for VSCode还是预览版。 2、 创建一个Avalonia 项目。 选择项目类型 输入项目名称 选择项目所在文件夹 打开项目 3、项目架构如下图。 4、builde…

基于jeecgboot-vue3的Flowable流程-所有任务

因为这个项目license问题无法开源&#xff0c;更多技术支持与服务请加入我的知识星球。 这个部分主要讲所有任务的功能 1、主要列表界面如下&#xff1a; <template><div class"p-2"><!--查询区域--><div class"jeecg-basic-table-form-…

创建型模式--抽象工厂模式

产品族创建–抽象工厂模式 工厂方法模式通过引入工厂等级结构,解决了简单工厂模式中工厂类职责太重的问题。 但由于工厂方法模式中的每个工厂只生产一类产品,可能会导致系统中存在大量的工厂类,势必会增加系统的开销。此时,可以考虑将一些相关的产品组成一个“产品族”,…

什么是Vue开发技术

概述 Vue.js 是一个用于构建用户界面的渐进式框架&#xff0c;它设计得非常灵活&#xff0c;可以轻松地被集成到任何项目中。 vue是视图的发音&#xff0c;其目的是帮助开发者易于上手&#xff0c;提供强大的功能构建复杂的应用程序 示例 以下是vue基本的语法概述 声明式渲…

示例:WPF中TreeView自定义TreeNode泛型绑定对象来实现级联勾选

一、目的&#xff1a;在绑定TreeView的功能中经常会遇到需要在树节点前增加勾选CheckBox框&#xff0c;勾选本节点的同时也要同步显示父节点和子节点状态 二、实现 三、环境 VS2022 四、示例 定义如下节点类 public partial class TreeNodeBase<T> : SelectBindable<…

探秘提交任务到线程池后源码的执行流程

探秘提交任务到线程池后源码的执行流程 1、背景2、任务提交2、Worker线程获取任务执行流程3、Worker线程的退出时机1、背景 2、任务提交 线程池任务提交有两种方式,execute()和submit()。首先看一下execute方法的源码。我们发现它接收的入参是一个Runnable类型。我们按照代码…

常见的Redis使用问题及解决方案

目录 1. 缓存穿透 1.1 解决方案 2. 缓存击穿 2.1 解决方案 3. 缓存雪崩 3.1 概念图及问题描述 ​编辑3.2 解决方案 4. 分布式锁 4.1 概念 4.2 基于redis来实现分布式锁 4.3 用idea来操作一遍redis分布式锁 4.4 分布式上锁的情况下&#xff0c;锁释放了服务器b中的锁…

水滴式粉碎机:粉碎干性物料的理想选择

在工业生产中&#xff0c;干性物料的粉碎是一个重要的环节&#xff0c;其对于提升产品质量、优化生产流程和提高生产效率具有显著的影响。近年来&#xff0c;水滴式粉碎机因其粉碎原理和工作性能&#xff0c;逐渐在干性物料粉碎领域崭露头角&#xff0c;成为众多企业的理想选择…

《Java2实用教程》 期末考试整理

作用域 当前类 当前包 子类 其他包 public √ √ √ √ protected √ √ √ default √ √ private √ 三、问答题&#xff08;每小题4分&#xff0c;共8分&#xff09; 1.类与对象的关系 对象&#xff1a;对象是类的一个实例&#xff0c;有状…

JavaScript事件类型和事件处理程序

● 之前我们用过了很多此的点击事件&#xff0c;这次让我们来学习另一种事件类型 mouseenter “mouseenter” 是一个鼠标事件类型&#xff0c;它在鼠标指针进入指定元素时触发。 const h1 document.querySelector(h1); h1.addEventListener(mouseenter, function (e) {aler…

【将xml文件转yolov5训练数据txt标签文件】连classes.txt都可以生成

将xml文件转yolov5训练数据txt标签文件 前言一、代码解析 二、使用方法总结 前言 找遍全网&#xff0c;我觉得写得最详细的就是这个博文⇨将xml文件转yolov5训练数据txt标签文件 虽然我还是没有跑成功。那个正则表达式我不会改QWQ&#xff0c;但是不妨碍我会训练ai。 最终成功…

关于从大平台跳转各个应用,更新应用前端包后,显示的仍是旧的内容,刷新应用页面后方才显示新的内容的问题的排查和解决

我们从绿洲物联平台跳转智能锁应用&#xff0c; 如下&#xff0c;我们可以看到&#xff0c;我们是通过a标签去跳转应用的。但是我们打开控制台的话&#xff0c;因为a标签是另外新开一个页面&#xff0c;我们看不到新页面的html文档的加载情况。 我们可以临时把_blank改成_sel…

WPF 上位机 Modbus 入门必备的信息 C# 开发对接

关于Modbus协议 Modbus协议是MODICON(莫迪康)(现施耐德品牌)在1979年开发的&#xff0c;是全球第一个真正用于现场的总线协议; Modbus协议是应用于电子控制器上的一种通用语言。通过此协议&#xff0c;可以实现控制器相互之间、控制器经由网络和其实设备之间的通 信。 Modbus特…

推荐 3 款小巧的文件压缩、投屏和快速启动软件,请收藏,避免找不到

Maya Maya是一款由博主25H开发的体积小巧、简单易用的快速启动工具。它的操作逻辑和界面设计几乎复刻了Rolan早期版本&#xff0c;功能上与Rolan几乎别无二致。Maya支持多文件拖拽添加启动、快捷键呼出、自动多列显示等功能。此外&#xff0c;Maya还具备lnk文件解析功能。 May…

分类模型:MATLAB判别分析

1. 判别分析简介 判别分析&#xff08;Discriminant Analysis&#xff09; 是一种统计方法&#xff0c;用于在已知分类的样本中构建分类器&#xff0c;并根据特征变量对未知类别的样本进行分类。常见的判别分析方法包括线性判别分析&#xff08;Linear Discriminant Analysis, …

在mybatis 中如何防止 IN里面的参数过多?

代码示例&#xff1a; select xsid from zhxg_gy_ssfp_cwfp where xsid in <foreach collection"list" item"item" open"(" close")" separator" " index"index"> <if test"(index % 999) 998&quo…

【kyuubi k8s】kyuubi发布k8s执行spark sql

背景 依据上一篇kyuubi与spark集成&#xff0c;并发布spark sql到k8s集群&#xff0c;上一篇的将kyuubi和spark环境放在本地某台服务器上的&#xff0c;为了高可用&#xff0c;本篇将其打包镜像&#xff0c;并发布到k8s。 其实就是将本地的kyuubi&#xff0c;spark&#xff0…

Nginx + KeepAlived高可用负载均衡集群

目录 一、Keepealived脑裂现象 1.现象 2.原因 3.解决 4.预防 二、实验部署 1.两台nginx做初始化操作并安装nginx 2.四层反向代理配置 3.配置高可用 4.准备检查nginx运行状态脚本 5.开启keepalived服务并测试 一、Keepealived脑裂现象 1.现象 主服务器和备服务器都同…

记录第一次edusrc挖掘

文章目录 一、前言二、漏洞说明截止目前已修复 一、前言 edusrc平台介绍 我们可以在关于页面看到edusrc的收录规则 现阶段&#xff0c;教育行业漏洞报告平台接收如下类别单位漏洞&#xff1a; 教育部 各省、自治区教育厅、直辖市教委、各级教育局 学校 教育相关软件 可以看到…

拉依达的嵌入式学习和秋招经验

拉依达的嵌入式学习和秋招经验 你好&#xff0c;我是拉依达。目前我已经结束了自己的学生生涯&#xff0c;开启了人生的下一个阶段。 从研二准备秋招开始&#xff0c;我就逐渐将自己的学习笔记陆续整理并到CSDN上发布。起初只是作为自己学习的备份记录&#xff0c;后续得到了越…