Netty学习——源码篇6 Pipeline设计原理 备份

 1 Pipeline设计原理

        在Netty中每个Channel都有且仅有一个ChannelPipeline与之对应,它们的组成关系如下图:

        通过上图可以看到,一个Channel包含了一个ChannelPipeline,而ChannelPipeline中又维护了一个由ChannelHandlerContext组成的双向链表。这个链表的头是HeadContext,链表的尾是TailContext,并且每个ChannelHandlerContext又关联着一个ChannelHandler。

        通过分析代码,已经知道了一个Channel初始化的基本过程,下面在回顾一下。AbstractChannel构造器的代码如下:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

         AbstractChannel有一个pipeline属性,在构造器中会把它初始化为DefaultChannelPipeline的实例。这里的代码就印证了这一点:每个Channel都有一个ChannelPipeline。来看一下DefaultChannelPipeline的构造器,代码如下:

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

        在DefaultChannelPipeline构造器中,首先将与之关联的Channel保存到属性channel中。然后实例化两个ChannelHandlerContext:一个是HeadContext实例Head,另一个是TailContext实例Tail。接着将Head和Tail互相指向,构成一个双向链表。

        特别注意的是:在开始的示意图中,Head和Tail并没有包含ChannelHandler,这是因为HeadContext和TailContext继承于AbstractChannelHandlerContext的同时,也实现了ChannelHandler接口,所以它们有Context和Handler的双重属性。

2 ChannelPipeline初始化

        下面看一下ChannelPipeline的初始化具体做了哪些工作。先回顾一下,在实例化一个Channel时,会伴随着一个ChannelPipeline的实例化,并且此Channel会与这个ChannelPipeline相互关联,这一点可以通过NioEventLoop的父类AbstractChannel的构造器予以佐证,代码如下:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

        当实例化一个NioSocketChannel时,其pipeline属性就是新创建的DefaultChannelPipeline对象,再来回顾一下DefaultChannelPipeline的构造方法,代码如下:

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

        上面代码中的Head实现了ChannelInboundHandler接口,而Tail实现了ChannelOutboundHandler接口,因此可以说Head和Tail就是ChannelHandler,又是ChannelHandlerContext。

3 ChannelInitializer的添加

        前面分析过Channel的组成,最开始的时候ChannelPipeline中含有两个ChannelHandlerContext,但是此时的Pipeline并不能实现特定的功能,因为还没有添加自定义的ChannelHandler。通常来说,在初始化Bootstrap时,会添加自定义的ChannelHandler,下面就以具体的客户端启动代码片段举例:

Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            System.out.println("初始化channel:" + socketChannel);
                        }
                    });

        在调用Handler时,传入ChannelInitializer对象,它提供了一个initChannel方法来初始化ChannelHandler。通过代码跟踪,发现ChannelInitializer是在Bootstrap的init方法中添加到ChannelPipiline中的,代码如下:

    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast(new ChannelHandler[]{this.config.handler()});
        Map<ChannelOption<?>, Object> options = this.options0();
        synchronized(options) {
            Iterator i$ = options.entrySet().iterator();

            while(true) {
                if (!i$.hasNext()) {
                    break;
                }

                Entry e = (Entry)i$.next();

                try {
                    if (!channel.config().setOption((ChannelOption)e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable var10) {
                    logger.warn("Failed to set a channel option: " + channel, var10);
                }
            }
        }

        Map<AttributeKey<?>, Object> attrs = this.attrs0();
        synchronized(attrs) {
            Iterator i$ = attrs.entrySet().iterator();

            while(i$.hasNext()) {
                Entry<AttributeKey<?>, Object> e = (Entry)i$.next();
                channel.attr((AttributeKey)e.getKey()).set(e.getValue());
            }

        }
    }

        从上面的代码可见,将handler()方法返回的ChannelHandler添加到Pipeline中,而handler()方法返回的其实就是在初始化Bootstrap时通过handler方法设置的ChannelInitializer实例,因此这里就将ChannelInitializer插到了Pipieline的末端。此时Pipeline的结构如下图所示:

        这是会有一个疑问,明明插入的是 ChannelInitializer实例,为什么在ChannelPipeline的双向链表中的元素却是一个ChannelHandlerContext呢?继续看源码,在Bootstrap的init方法中会调用p.addList()方法,将ChannelInitializer插入链表的末端,代码如下:

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

        addList方法有很多重载的方法,只需要关注这个方法即可。上面的addList方法中,首先检查ChannelHandler的名字是否重复,如果不重复,则调用newContext方法为这个Handler创建一个对应的DefaultChannelHandlerContext实例,并与之关联起来。

        为了添加一个Handler到Pipeline中,必须把此Handler包装成ChannelHandlerContext。因此在上面的代码中,我们新实例化一个newCx对象,并将Handler作为参数传递到构造方法中。下面来看一下DefaultChannelHandlerContext的构造器。

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

        在DefaultChannelHandlerContext的构造器中,调用了isInbound()方法和isOutbound()方法,这两个方法的代码如下:

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }

        从上面代码中可以看到,当一个Handler实现了ChannelInboundHandler接口,则isInbound返回true;类似的,当一个Handler实现了ChannelOutboundHandler接口,则isOuntbound返回true。而这两个boolean类型变量会传递给父类AbstractChannelHandlerContext中,并初始化父类的两个属性:inbound和outbound。

        这里的ChannelInitializer所对应的DefaultChannelHandlerContext的inbound与outbound属性分别是什么呢?先来看ChannelInitializer的类层次结构图,如下图:

        可以看到,ChannelInitializer仅仅实现了ChannelInboundHandler接口,因此这里实例化的DefaultChannelHandlerContext的inbound是true,outbound是false。

        inbound和outbound这两个属性关系到Pipeline事件的流向与分类,因此十分关键。这里先记住一个结论:ChannelInitializer所对应的DefaultChannelHandlerContext的inbound=true,outbound=false。

        当创建好Context之后,就将这个Context插入Pipeline的双向链表中。

DefaultChannelPipeline.java
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

        添加完ChannelInitializer的Pipeline内部如下图所示

4 自定义ChannelHandler的添加过程

        上面分析了ChannelInitializer是如何插入Pipeline中的,接下来探讨ChannelInitializer在哪里被调用、ChannelInitializer的作用以及自定义的ChannelHandler是如何插入Pipeline中的。

        自定义ChannelHandler的添加过程,发生在AbstractUnsafe的register方法中,在这个方法中调用了pipeline.fireChannelRegister()方法,代码如下:

    @Override
    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }

        再看AbstractChannelHandlerContext的invokeChannelRegister()方法。

    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

        很显然,这个代码将从Head开始遍历Pipeline的双向链表,然后找到第一个属性inbound为true的ChannelHandlerContext实例。在分析ChannelInitializer时,专门分析了inbound和outbound属性,现在这里就用上了。回想一下,ChannelInitializer实现了ChannelInboundHandler,因此它所对应的ChannelHandlerContext的inbound属性为true,因此这里返回的就是ChannelInitializer实例所对应的ChannelHandlerContext对象,如下图所示

        当获取inbound的Context后,就调用它的invokeChannelRegistered()方法。

    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }

         我们已经知道,每个ChannelHandler都和一个ChannelHandlerContext关联,可以通过ChannelHandlerContext获取对应的ChannelHandler。很明显,这里handler()方法返回的对象其实就是一开始实例化的ChannelInitializer对象,接着调用了ChannelInitializer的channelRegister()方法。ChannelInitializer的channelRegister()方法的代码如下:

    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (this.initChannel(ctx)) {
            ctx.pipeline().fireChannelRegistered();
        } else {
            ctx.fireChannelRegistered();
        }

    }
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (this.initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
            try {
                this.initChannel(ctx.channel());
            } catch (Throwable var6) {
                this.exceptionCaught(ctx, var6);
            } finally {
                this.remove(ctx);
            }

            return true;
        } else {
            return false;
        }
    

        initChannel()方法就是在初始化Bootstrap时,调用handler方法传入的匿名内部类所实现的方法。因此,在调用这个方法之后,自定义的ChannelHandler就插入到Pipeline中,此时Pipeline的状态如下图:

        当添加完自定义的ChannelHandler后,在finally代码块会删除自定义的ChannelHandler,也就是remove(ctx),最终调用ctx.pipeline().remove(this),因此最后Pipeline的状态如下图:

        到此,自定义ChannelHandler的添加过程也就分析完成了。 

5 给ChannelHandler命名

        pipeline.addXXX()都有一个重载方法,例如addList()有一个重载的版本,代码如下:

public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }

        第一个参数指定添加的是Handler的名字(更准确的说是ChannelHandlerContext的名字)。那么Handler的名字有什么用呢?如果不设置name,那么Handler默认的名字是怎么样的?上面的方法会调用重载的addLast()方法,代码如下:

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

        第一个参数设置为null,不用关心。第二个参数就是Handler的名字。有代码可知,再添加一个Handler之前,需要调用checkMultiplicity()方法来确定新添加的Handler名字是否与已添加的Handler名字重复。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/489319.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

零基础学python之高级编程(6)---Python中进程的Queue 和进程锁,以及进程池的创建 (包含详细注释代码)

Python中进程的Queue 和进程锁,以及进程池的创建 文章目录 Python中进程的Queue 和进程锁,以及进程池的创建前言一、进程间同步通信(Queue)二、进程锁&#xff08;Lock&#xff09;三、创建进程池Poorpool 类方法: End! 前言 大家好,上一篇文章,我们初步接触了进程的概念及其应…

【随笔】Git -- 常用命令(四)

&#x1f48c; 所属专栏&#xff1a;【Git】 &#x1f600; 作  者&#xff1a;我是夜阑的狗&#x1f436; &#x1f680; 个人简介&#xff1a;一个正在努力学技术的CV工程师&#xff0c;专注基础和实战分享 &#xff0c;欢迎咨询&#xff01; &#x1f496; 欢迎大…

第16篇:奇偶校验器

Q&#xff1a;本期我们将实现4位奇偶校验逻辑电路&#xff0c;即校验4位二进制代码中 “1” 的个数是奇数或偶数。 A&#xff1a;奇偶校验器的基本原理&#xff1a;采用异或运算对“1”的奇偶个数进行校验&#xff0c;从最高位依次往最低位进行连续异或运算。如果最后的异或运…

stm32控制电机--计算电角度以及电角度和机械角度的对应关系---以及foc的控制算法模型及过程(推荐)

1&#xff0c;电角度和机械角度的关系 如何区分电角度和机械角度&#xff1f; 2&#xff0c;foc模型工程&#xff08;推荐&#xff09; SimpleFOC移植STM32&#xff08;四&#xff09;—— 闭环控制 注意速度需要进行低通滤波

使用Docker Compose一键部署前后端分离项目(图文保姆级教程)

一、安装Docker和docker Compose 1.Docker安装 //下载containerd.io包 yum install https://download.docker.com/linux/fedora/30/x86_64/stable/Packages/containerd.io-1.2.6-3.3.fc30.x86_64.rpm //安装依赖项 yum install -y yum-utils device-mapper-persistent-data l…

消费电子回暖之际,手机回收厂商如何持续释放“绿色潜力”?

春天到来的暖意&#xff0c;正在消费电子产业链上下游蔓延。 仅就手机这一品类而言&#xff0c;可以看到&#xff0c;2023年手机厂商已经度过寒冬&#xff0c;中国信息通信研究院发布的数据显示&#xff0c;2023年1-12月&#xff0c;我国手机总体出货量累计2.89亿部&#xff0…

新生儿奶瓶怎么选择?5款口碑榜单奶瓶推荐

新生儿奶瓶是每个新手爸妈都要选择的喂养产品&#xff0c;除了喂养宝宝外&#xff0c;还能帮助宝宝渡过戒奶期。然而近年来&#xff0c;市面上出现一些低质量、劣质材料制成的奶瓶&#xff0c;频频被爆安全隐患&#xff0c;给消费者带来极大的不便和风险。那么你知道什么牌子的…

Linux的学习之路:1、发展史与编译环境的搭建

一、发展史 1991年10月5日&#xff0c;赫尔辛基大学的一名研究生Linus Benedict Torvalds在一个Usenet新闻组 &#xff08;comp.os.minix&#xff09;中宣布他编制出了一种类似UNIX的小操作系统&#xff0c;叫Linux。新的操作系统是受到另一个UNIX的小操作系统——Minix的启发…

Cisco firepower 2140 run ASA and config failover

1 背景 here we got 2 cisco firepower 2140 hardware appliance we’re planning to run ASA on it. and config failover for Primary Unit and Secondary Unit 现场2台Cisco firepower 2140防火墙&#xff0c; 运行ASA模式&#xff0c; 双机组HA&#xff0c;心跳线使用E1/1…

服务器基础知识(物理服务器云服务器)

今天我们来介绍一下服务器的基础知识 一、服务器硬件基础知识 组件说明中央处理器&#xff08;CPU&#xff09;CPU是服务器的大脑&#xff0c;负责执行计算任务和指令。服务器通常配备多个CPU核心&#xff0c;以支持并行处理和提高性能。关键的CPU性能指标包括时钟频率、核心数…

sonarqube使用指北(三)-编写代码进行自动化扫描

一、引言 上一篇文章之后 我们应该已经成功完成的配置了扫描环境并执行了一次基本的本地扫描,但是之前的手动扫描需要我们每一次都手动切换到代码目标并手动执行扫描命令,效率很低。在代码库较大的情况下会占用大量的时间。这一章我们会通过编写python代码的形式来实现自动化…

Python学习:循环语句

Python循环语句 概念 循环语句是编程中常用的结构&#xff0c;用于多次执行相同或类似的代码块。Python中有两种主要的循环语句&#xff1a;for循环和while循环。 for循环&#xff1a; for循环用于遍历一个序列&#xff08;如列表、元组、字符串等&#xff09;中的元素&#x…

二十二、软考-系统架构设计师笔记-真题解析-2018年真题

软考-系统架构设计师-2018年上午选择题真题 考试时间 8:30 ~ 11:00 150分钟 1.在磁盘调度管理中&#xff0c;应先进行移臂调度&#xff0c;再进行旋转调度。假设磁盘移动臂位于21号柱面上&#xff0c;进程的请求序列如下表所示。如果采用最短移臂调度算法&#xff0c;那么系统…

PyPy为什么能让Python比C还快?一文了解内在机制

「如果想让代码运行得更快&#xff0c;您应该使用 PyPy。」—— Python 之父 Guido van Rossum 对于研究人员来说&#xff0c;迅速把想法代码化并查看其是否行得通至关重要。Python 是能够实现这一目标的出色语言&#xff0c;它能够让人们专注于想法本身&#xff0c;而不必过度…

万亿功能性食品市场爆火,北美膳食健康品牌GNITE如何抓住“朋克养生”年轻人!

近几年&#xff0c;年轻人的养生意识不断提升&#xff0c;“吃出健康”理念盛行&#xff0c;在中国年轻人独有的“懒养生”理念加持下&#xff0c;功能性软糖精准击中年轻人的健康焦虑&#xff0c;助眠、美白、护眼、补铁、减脂……等产品在新消费领域兴起&#xff0c;消费热度…

邮件接口与第三方平台的集成的方式有哪些?

邮件接口如何实现高效通信&#xff1f;怎么有效地利用邮件接口&#xff1f; 邮件接口与第三方平台的集成已经成为了企业提升工作效率、优化用户体验的关键环节。那么&#xff0c;邮件接口与第三方平台的集成方式究竟有哪些呢&#xff1f;接下来&#xff0c;AokSend就来探讨一下…

蓝桥杯练习题——博弈论

1.必胜态后继至少存在一个必败态 2.必败态后继均为必胜态 Nim游戏 思路 2 3&#xff0c;先手必赢&#xff0c;先拿 1&#xff0c;然后变成 2 2&#xff0c;不管后手怎么拿&#xff0c;先手同样操作&#xff0c;后手一定先遇到 0 0 a1 ^ a2 ^ a3 … ^ an 0&#xff0c;先…

STL----vector的模拟实现

1. vector的介绍与使用 1.1 vector的介绍 1. vector是表示可变大小数组的序列容器。 2. 就像数组一样&#xff0c;vector也采用的连续存储空间来存储元素。也就是意味着可以采用下标对vector的元素进行访问&#xff0c;和数组一样高效。但是又不像数组&#xff0c;它的大小是可…

Vscode + PlatformIO + Arduino 搭建EPS32开发环境

Vscode PlatformIO Arduino 搭建EPS32开发环境 文章目录 Vscode PlatformIO Arduino 搭建EPS32开发环境1. Vscode插件安装2. 使用PlatformIO新建工程3.工程文件的基本结构4.一个基本的测试用例Reference 1. Vscode插件安装 如何下载vscode这里不再赘述&#xff0c;完成基本…