Netty学习——源码篇6 Pipeline设计原理

1 Pipeline设计原理

        在Netty中每个Channel都有且仅有一个ChannelPipeline与之对应,它们的组成关系如下图:

        通过上图可以看到,一个Channel包含了一个ChannelPipeline,而ChannelPipeline中又维护了一个由ChannelHandlerContext组成的双向链表。这个链表的头是HeadContext,链表的尾是TailContext,并且每个ChannelHandlerContext又关联着一个ChannelHandler。

        通过分析代码,已经知道了一个Channel初始化的基本过程,下面在回顾一下。AbstractChannel构造器的代码如下:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

         AbstractChannel有一个pipeline属性,在构造器中会把它初始化为DefaultChannelPipeline的实例。这里的代码就印证了这一点:每个Channel都有一个ChannelPipeline。来看一下DefaultChannelPipeline的构造器,代码如下:

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

        在DefaultChannelPipeline构造器中,首先将与之关联的Channel保存到属性channel中。然后实例化两个ChannelHandlerContext:一个是HeadContext实例Head,另一个是TailContext实例Tail。接着将Head和Tail互相指向,构成一个双向链表。

        特别注意的是:在开始的示意图中,Head和Tail并没有包含ChannelHandler,这是因为HeadContext和TailContext继承于AbstractChannelHandlerContext的同时,也实现了ChannelHandler接口,所以它们有Context和Handler的双重属性。

2 ChannelPipeline初始化

        下面看一下ChannelPipeline的初始化具体做了哪些工作。先回顾一下,在实例化一个Channel时,会伴随着一个ChannelPipeline的实例化,并且此Channel会与这个ChannelPipeline相互关联,这一点可以通过NioEventLoop的父类AbstractChannel的构造器予以佐证,代码如下:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

        当实例化一个NioSocketChannel时,其pipeline属性就是新创建的DefaultChannelPipeline对象,再来回顾一下DefaultChannelPipeline的构造方法,代码如下:

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

        上面代码中的Head实现了ChannelInboundHandler接口,而Tail实现了ChannelOutboundHandler接口,因此可以说Head和Tail就是ChannelHandler,又是ChannelHandlerContext。

3 ChannelInitializer的添加

        前面分析过Channel的组成,最开始的时候ChannelPipeline中含有两个ChannelHandlerContext,但是此时的Pipeline并不能实现特定的功能,因为还没有添加自定义的ChannelHandler。通常来说,在初始化Bootstrap时,会添加自定义的ChannelHandler,下面就以具体的客户端启动代码片段举例:

Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            System.out.println("初始化channel:" + socketChannel);
                        }
                    });

        在调用Handler时,传入ChannelInitializer对象,它提供了一个initChannel方法来初始化ChannelHandler。通过代码跟踪,发现ChannelInitializer是在Bootstrap的init方法中添加到ChannelPipiline中的,代码如下:

    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast(new ChannelHandler[]{this.config.handler()});
        Map<ChannelOption<?>, Object> options = this.options0();
        synchronized(options) {
            Iterator i$ = options.entrySet().iterator();

            while(true) {
                if (!i$.hasNext()) {
                    break;
                }

                Entry e = (Entry)i$.next();

                try {
                    if (!channel.config().setOption((ChannelOption)e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable var10) {
                    logger.warn("Failed to set a channel option: " + channel, var10);
                }
            }
        }

        Map<AttributeKey<?>, Object> attrs = this.attrs0();
        synchronized(attrs) {
            Iterator i$ = attrs.entrySet().iterator();

            while(i$.hasNext()) {
                Entry<AttributeKey<?>, Object> e = (Entry)i$.next();
                channel.attr((AttributeKey)e.getKey()).set(e.getValue());
            }

        }
    }

        从上面的代码可见,将handler()方法返回的ChannelHandler添加到Pipeline中,而handler()方法返回的其实就是在初始化Bootstrap时通过handler方法设置的ChannelInitializer实例,因此这里就将ChannelInitializer插到了Pipieline的末端。此时Pipeline的结构如下图所示:

        这是会有一个疑问,明明插入的是 ChannelInitializer实例,为什么在ChannelPipeline的双向链表中的元素却是一个ChannelHandlerContext呢?继续看源码,在Bootstrap的init方法中会调用p.addList()方法,将ChannelInitializer插入链表的末端,代码如下:

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

        addList方法有很多重载的方法,只需要关注这个方法即可。上面的addList方法中,首先检查ChannelHandler的名字是否重复,如果不重复,则调用newContext方法为这个Handler创建一个对应的DefaultChannelHandlerContext实例,并与之关联起来。

        为了添加一个Handler到Pipeline中,必须把此Handler包装成ChannelHandlerContext。因此在上面的代码中,我们新实例化一个newCx对象,并将Handler作为参数传递到构造方法中。下面来看一下DefaultChannelHandlerContext的构造器。

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

        在DefaultChannelHandlerContext的构造器中,调用了isInbound()方法和isOutbound()方法,这两个方法的代码如下:

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }

        从上面代码中可以看到,当一个Handler实现了ChannelInboundHandler接口,则isInbound返回true;类似的,当一个Handler实现了ChannelOutboundHandler接口,则isOuntbound返回true。而这两个boolean类型变量会传递给父类AbstractChannelHandlerContext中,并初始化父类的两个属性:inbound和outbound。

        这里的ChannelInitializer所对应的DefaultChannelHandlerContext的inbound与outbound属性分别是什么呢?先来看ChannelInitializer的类层次结构图,如下图:

        可以看到,ChannelInitializer仅仅实现了ChannelInboundHandler接口,因此这里实例化的DefaultChannelHandlerContext的inbound是true,outbound是false。

        inbound和outbound这两个属性关系到Pipeline事件的流向与分类,因此十分关键。这里先记住一个结论:ChannelInitializer所对应的DefaultChannelHandlerContext的inbound=true,outbound=false。

        当创建好Context之后,就将这个Context插入Pipeline的双向链表中。

DefaultChannelPipeline.java
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

        添加完ChannelInitializer的Pipeline内部如下图所示

4 自定义ChannelHandler的添加过程

        上面分析了ChannelInitializer是如何插入Pipeline中的,接下来探讨ChannelInitializer在哪里被调用、ChannelInitializer的作用以及自定义的ChannelHandler是如何插入Pipeline中的。

        自定义ChannelHandler的添加过程,发生在AbstractUnsafe的register方法中,在这个方法中调用了pipeline.fireChannelRegister()方法,代码如下:

    @Override
    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }

        再看AbstractChannelHandlerContext的invokeChannelRegister()方法。

    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

        很显然,这个代码将从Head开始遍历Pipeline的双向链表,然后找到第一个属性inbound为true的ChannelHandlerContext实例。在分析ChannelInitializer时,专门分析了inbound和outbound属性,现在这里就用上了。回想一下,ChannelInitializer实现了ChannelInboundHandler,因此它所对应的ChannelHandlerContext的inbound属性为true,因此这里返回的就是ChannelInitializer实例所对应的ChannelHandlerContext对象,如下图所示

        当获取inbound的Context后,就调用它的invokeChannelRegistered()方法。

    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }

         我们已经知道,每个ChannelHandler都和一个ChannelHandlerContext关联,可以通过ChannelHandlerContext获取对应的ChannelHandler。很明显,这里handler()方法返回的对象其实就是一开始实例化的ChannelInitializer对象,接着调用了ChannelInitializer的channelRegister()方法。ChannelInitializer的channelRegister()方法的代码如下:

    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (this.initChannel(ctx)) {
            ctx.pipeline().fireChannelRegistered();
        } else {
            ctx.fireChannelRegistered();
        }

    }
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (this.initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
            try {
                this.initChannel(ctx.channel());
            } catch (Throwable var6) {
                this.exceptionCaught(ctx, var6);
            } finally {
                this.remove(ctx);
            }

            return true;
        } else {
            return false;
        }
    

        initChannel()方法就是在初始化Bootstrap时,调用handler方法传入的匿名内部类所实现的方法。因此,在调用这个方法之后,自定义的ChannelHandler就插入到Pipeline中,此时Pipeline的状态如下图:

        当添加完自定义的ChannelHandler后,在finally代码块会删除自定义的ChannelHandler,也就是remove(ctx),最终调用ctx.pipeline().remove(this),因此最后Pipeline的状态如下图:

        到此,自定义ChannelHandler的添加过程也就分析完成了。 

5 给ChannelHandler命名

        pipeline.addXXX()都有一个重载方法,例如addList()有一个重载的版本,代码如下:

public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }

        第一个参数指定添加的是Handler的名字(更准确的说是ChannelHandlerContext的名字)。那么Handler的名字有什么用呢?如果不设置name,那么Handler默认的名字是怎么样的?上面的方法会调用重载的addLast()方法,代码如下:

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

        第一个参数设置为null,不用关心。第二个参数就是Handler的名字。有代码可知,再添加一个Handler之前,需要调用checkMultiplicity()方法来确定新添加的Handler名字是否与已添加的Handler名字重复。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/488793.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

云数据库认识

云数据库概述 说明云数据库厂商概述Amazon 云数据库产品Google 的云数据库产品Microsoft 的云数据库产品 云数据库系统架构UMP 系统概述UMP 系统架构MnesiaRabbitMQZooKeeperLVSController 服务器Proxy 服务器Agent 服务器日志分析服务器 UMP 系统功能容灾 读写分离分库分表资源…

PyCharm环境下Git与Gitee联动:本地与远程仓库操作实战及常见问题解决方案

写在前面&#xff1a;本博客仅作记录学习之用&#xff0c;部分图片来自网络&#xff0c;如需引用请注明出处&#xff0c;同时如有侵犯您的权益&#xff0c;请联系删除&#xff01; 文章目录 前言下载及安装GitGit的使用设置用户签名设置用户安全目录Git基本操作Git实操操作 Pyc…

设置远程访问 jupyter Notebook Lab

安装Anaconda / Miniconda 进入conda环境&#xff0c;安装jupyter https://jupyter.org/install 生成notebook config C:\Users\***>jupyter notebook --generate-config Writing default config to: C:\Users\***\.jupyter\jupyter_notebook_config.py创建密码 jupyter…

教你怎样根据空行分割TXT文本文档 TXT文本分割 文本拆分实例

比如有一些文本中间用多个空行隔开&#xff0c;需要把隔开的文本分别保存&#xff0c;比如我们要把隔2行或以上空行的文本分别保存成一个文档&#xff0c;如图&#xff1a; 实现方法&#xff1a; 1、先打开首助编辑高手软件&#xff0c;进入【文本批量操作】--【拆分文本】&am…

Gin中的gin.Context与Golang原生的context.Context区别与联系

一.gin中的context gin.Context 1.概念 在 Gin 中&#xff0c;Context 是一个非常重要的概念&#xff0c;它是Gin的核心结构体之一,用于处理 HTTP 请求和响应,在 Gin 的处理流程中&#xff0c;Context 贯穿整个处理过程&#xff0c;用于传递请求和响应的信息Gin 的 Context 是…

python进阶:装饰器

装饰器本质上是一个Python函数&#xff0c;它可以让其他函数在不需要做任何代码变动的前提下增加额外功能&#xff0c;装饰器的返回值也是一个函数对象。它经常用于有切面需求的场景&#xff0c;比如&#xff1a;插入日志、性能测试、事务处理、缓存、权限校验等场景。装饰器是…

【python】获取4K壁纸保存到本地文件夹【附源码】

图片信息丰富多彩&#xff0c;许多网站上都有大量精美的图片资源。有时候我们可能需要批量下载这些图片&#xff0c;而手动一个个下载显然效率太低。因此&#xff0c;编写一个简单的网站图片爬取程序可以帮助我们高效地获取所需的图片资源。 目标网站&#xff1a; 如果出现模…

软件开发困境

软件开发的困境开发者的困境人月神话 布鲁克斯的核心观点包括&#xff1a; EAI &#xff08;企业应用集成&#xff09; 通过EAI&#xff0c;企业能够实现以下功能&#xff1a;实例介绍 信息孤岛软件开发有没有“银弹”&#xff1f; 软件开发的困境 在软件开发过程中&#xff0…

FPGA时钟资源详解(3)——全局时钟资源

FPGA时钟系列文章总览&#xff1a;FPGA原理与结构&#xff08;14&#xff09;——时钟资源https://ztzhang.blog.csdn.net/article/details/132307564 一、概述 全局时钟是 FPGA 中的一种专用互连网络&#xff0c;旨在将时钟信号分配到 FPGA 内各种资源的时钟输入处。这种设计…

ARM IHI0069F GIC architecture specification (4)

1.3 支持的配置和兼容性 在 Armv8-A 中&#xff0c;EL2 和 EL3 是可选的&#xff0c;PE 可以支持一个、两个或都不支持这些异常级别。 然而&#xff1a; • PE 要求EL3 支持安全和非安全状态。 • PE 需要EL2 来支持虚拟化。 • 如果未实施EL3&#xff0c;则只有一个安全状态。…

Mysql数据库——数据备份与恢复

目录 一、数据备份的重要性 二、数据库备份的分类 1.从物理与逻辑的角度分类 2.从数据库的备份策略角度&#xff0c;备份可分为 2.1完全备份 2.2差异备份 2.3增量备份 2.4总结 三、常见的备份方法 四、Mysql数据库完全备份 1.完全备份定义 2.优缺点 3.数据库完全备…

FPGA时钟资源详解(4)——区域时钟资源

FPGA时钟系列文章总览&#xff1a;FPGA原理与结构&#xff08;14&#xff09;——时钟资源https://ztzhang.blog.csdn.net/article/details/132307564 目录 一、概述 二、Clock-Capable I/O 三、I/O 时钟缓冲器 —— BUFIO 3.1 I/O 时钟缓冲器 3.2 BUFIO原语 四、区域时钟…

【每日一题】2642. 设计可以求最短路径的图类-2024.3.26

题目&#xff1a; 2642. 设计可以求最短路径的图类 给你一个有 n 个节点的 有向带权 图&#xff0c;节点编号为 0 到 n - 1 。图中的初始边用数组 edges 表示&#xff0c;其中 edges[i] [fromi, toi, edgeCosti] 表示从 fromi 到 toi 有一条代价为 edgeCosti 的边。 请你实…

计算机网络——数据链路层(差错控制)

计算机网络——数据链路层&#xff08;差错控制&#xff09; 差错从何而来数据链路层的差错控制检错编码奇偶校验码循环冗余校验&#xff08;CRC&#xff09;FCS 纠错编码海明码海明距离纠错流程确定校验码的位数r确定校验码和数据位置 求出校验码的值检错并纠错 我们今年天来继…

搜维尔科技:「工业仿真」煤炭矿井模拟仿真救援项目实施

煤炭矿井模拟救援系统满足煤矿企业在紧急避险应急演练方面的实际需要&#xff0c;在不耽误井下正常生产的情况下&#xff0c;高效率、低成本地实现对本矿区入井人员进行避灾演练培训&#xff0c;并学会正确的避灾自救互救方法。并可在本系统中直观的看到人员定位系统、监控系统…

Java毕业设计 基于SSM网上二手书店系统

Java毕业设计 基于SSM网上二手书店系统 SSM jsp 网上二手书店系统 功能介绍 用户&#xff1a;首页 图片轮播 图书查询 图书分类显示 友情链接 登录 注册 图书信息 图片详情 评价信息 加入购物车 资讯信息 资讯详情 个人中心 个人信息 修改密码 意见信息 图书收藏 已经付款 邮…

JavaWeb项目——MVC架构框架

表现层&#xff08;UI&#xff09;&#xff1a;直接跟前端打交互&#xff08;一是接收前端ajax请求&#xff0c;二是返回json数据给前端&#xff09;业务逻辑层&#xff08;BLL&#xff09;&#xff1a;一是处理表现层转发过来的前端请求&#xff08;也就是具体业务&#xff09…

群晖NAS安装Video Station结合内网穿透实现公网访问本地影音文件

文章目录 1.使用环境要求&#xff1a;2.下载群晖videostation&#xff1a;3.公网访问本地群晖videostation中的电影&#xff1a;4.公网条件下使用电脑浏览器访问本地群晖video station5.公网条件下使用移动端&#xff08;搭载安卓&#xff0c;ios&#xff0c;ipados等系统的设备…

ASR-LLM-TTS 大模型对话实现案例;语音识别、大模型对话、声音生成

参考:https://blog.csdn.net/weixin_42357472/article/details/136305123(llm+tts) https://blog.csdn.net/weixin_42357472/article/details/136411769 (asr+vad) 这里LLM用的是chatglm;电脑声音播报用的playsound 代码: ##运行 python main.pymain.py from multipro…

Form 表单选择多个时间段 完成必填校验

一、案例效果 二、案例思路 在 Vue 中,你可以使用 v-for 指令动态渲染多个表单项,并使用 v-model 指令进行双向数据绑定。同时,你可以使用 Element UI 的 Form 组件进行表单验证。动态定义校验 prop和rules三、代码案例 html <mtd-form-itemv-for="(item, index) i…