Netty学习——源码篇7 Pipeline的事件传播机制1 备份

        上篇:Netty学习——源码篇6 Pipeline设计原理 已经知道AbstractChannelHandlerContext中有Inbound和Outbound两个boolean变量,分别用于识别Context所对应的Handler的类型。

        1、Inbound为true时,表示其对应的ChannelHandler是ChannelInboundHandler的子类。

        2、Outbound为true时,表示其对应的ChannelHandler是ChannelOutboundHandler的子类。

        这两个属性到底有什么作用呢?还要从ChannelPipeline的事件传播类型说起。Netty中的传播事件可以分为两种:Inbound事件和Outbound事件。以下是Netty官网针对这两个事件的说明。

         由上可以看出,Inbound和Outbound事件的流向是不一样的,Inbound事件的流向是从下至上的,而Outbound恰好相反,是从下到上。并且Inbound方法是通过调用相应的ChannelHandlerContext.fireIN_EVT()方法来传递的,而Outbound方法是通过ChannelHandlerContext的fireChannelRegister()调用会发送一个ChannelRegistered的Inbound给下一个ChannelHandlerContext,而ChannelHandlerContext的bind()方法调用时会发送一个bind的Outbound事件给下一个ChannelHandlerContext。

        Inbound事件传播方法代码如下:

public interface ChannelInboundHandler extends ChannelHandler {

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
     */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} is now active
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
     * end of lifetime.
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    /**
     * Invoked when the current {@link Channel} has read a message from the peer.
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * Invoked when the last message read by the current read operation has been consumed by
     * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
     * attempt to read an inbound data from the current {@link Channel} will be made until
     * {@link ChannelHandlerContext#read()} is called.
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if an user event was triggered.
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    /**
     * Gets called once the writable state of a {@link Channel} changed. You can check the state with
     * {@link Channel#isWritable()}.
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if a {@link Throwable} was thrown.
     */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

        Outbound事件传播方法的代码如下:

public interface ChannelOutboundHandler extends ChannelHandler {
    /**
     * Called once a bind operation is made.
     *
     * @param ctx           the {@link ChannelHandlerContext} for which the bind operation is made
     * @param localAddress  the {@link SocketAddress} to which it should bound
     * @param promise       the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception    thrown if an error accour
     */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a connect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the connect operation is made
     * @param remoteAddress     the {@link SocketAddress} to which it should connect
     * @param localAddress      the {@link SocketAddress} which is used as source on connect
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a disconnect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the disconnect operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a close operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a deregister operation is made from the current registered {@link EventLoop}.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Intercepts {@link ChannelHandlerContext#read()}.
     */
    void read(ChannelHandlerContext ctx) throws Exception;

    /**
    * Called once a write operation is made. The write operation will write the messages through the
     * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
     * {@link Channel#flush()} is called
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made
     * @param msg               the message to write
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    /**
     * Called once a flush operation is made. The flush operation will try to flush out all previous written messages
     * that are pending.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the flush operation is made
     * @throws Exception        thrown if an error accour
     */
    void flush(ChannelHandlerContext ctx) throws Exception;
}

        可以发现,Inbound类似于事件回调(响应请求的事件),而Outbound类似于主动触发(发起请求的事件)。注意,如果捕获了一个事件,并且想让这个事件继续传递下去,需要调用Context对应的fireXXX()方法。

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    
    @Override
    public void channelActive(ChannelHandlerContext ctx)  throws Exception{
        System.out.println("连接成功");
        ctx.fireChannelActive();
    }
    
}
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception{
        System.out.println("客户端关闭");
        ctx.close(promise);
    }
}

        如上面的代码所示,MyInboundHandler收到了一个channelActive事件,它在处理后,如果希望将事件继续传播下去,那么需要接着调用ctx.fireChannelActive()方法。

        下面用一个代码案例了解一下Pipeline的传播机制。分别编写InboundHandlerA、InboundHandlerB、InboundHandlerC和OutboundandlerA、OutboundandlerB、OutboundandlerC类。

public class InboundHandlerA extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        System.out.println("InboundHandlerA");
        ctx.fireChannelRead(msg);
    }
}
public class InboundHandlerB extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
        System.out.println("InboundHandlerB");
        ctx.fireChannelRead(msg);
    }
}
public class InboundHandlerC extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
        System.out.println("InboundHandlerC");
        ctx.fireChannelRead(msg);
    }
}

        以上三个类都调用了ctx.fireChannelRead()方法向下传播。

public class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
    
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
        System.out.println("OutboundHandlerA write");
        ctx.write(msg,promise);
    }
}

public class OutboundHandlerB extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
        System.out.println("OutboundHandlerB write");
        ctx.write(msg,promise);
    }

    @Override
    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception{
        ctx.executor().schedule(new Runnable() {
            @Override
            public void run() {
                ctx.channel().write("hello");
            }
        },3, TimeUnit.SECONDS);
    }
}

public class OutboundHandlerC extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
        System.out.println("OutboundHandlerC write");
        ctx.write(msg,promise);
    }
}

        上面的三个类都调用了ctx.write()方法。下面编写测试代码,来了解其传播顺序。先编写服务端代码。PipelineServer类主要完成Pipeline的注册工作,代码如下:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class PipelineServer {
    public void start(int port) throws Exception{
        NioEventLoopGroup bossGroup= new NioEventLoopGroup();
        NioEventLoopGroup workerGroup= new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception{
                            //InboundHandler的执行顺序,应该是 A B C
                            ch.pipeline().addLast(new InboundHandlerA());
                            ch.pipeline().addLast(new InboundHandlerB());
                            ch.pipeline().addLast(new InboundHandlerC());
                            
                            //Outbound的执行顺序应该是C B A
                            ch.pipeline().addLast(new OutboundHandlerA());
                            ch.pipeline().addLast(new OutboundHandlerB());
                            ch.pipeline().addLast(new OutboundHandlerC());
                        }
                    }).option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true);
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
        
    }

    public static void main(String[] args) throws Exception{
        PipelineServer server = new PipelineServer();
        server.start(8080);
    }
}

        PipelineClient类,与服务端建立连接并向服务端发送数据,代码如下:

public class PipelineClient {
    public void connect(String host,int port) throws Exception{
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE,true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception{
                    ch.pipeline().addLast(new ClientInhandler());
                }
            });
            ChannelFuture f = b.connect(host,port).sync();
            f.channel().closeFuture().sync();
        }finally {
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception{
        PipelineClient client = new PipelineClient();
        client.connect("127.0.0.1",8080);
    }
}

        ClientHandler类,完成向服务端发送数据的动作,代码如下:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientInhandler extends ChannelInboundHandlerAdapter {

    //读取服务端的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
        System.out.println("clientInHandler.channelRead");
        ByteBuf result = (ByteBuf) msg;
        byte[] result1 = new byte[result.readableBytes()];
        result.readBytes(result1);
        result.release();
        ctx.close();
        System.out.println("server infomation:" + new String(result1));
    }

    //当连接建立的时候向服务端发送消息,channelActive 事件在连接建立的时候会被触发
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception{
        System.out.println("ClientHandler.channelActive");
        String msg = "are you ok";
        ByteBuf encoded = ctx.alloc().buffer(4 * msg.length());
        encoded.writeBytes(msg.getBytes());
        ctx.write(encoded);
        ctx.flush();
    }
}

        接下来运行测试代码,分别启动PipelineServer和PipelineClient,得到的运行结果如下图:

        从运行结果上看,Handler的传播顺序:从Inbound开始顺序执行,然后从Outbound逆序执行。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/492339.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

考了PMP证后工资大概是多少 ?

PMP自1999年引入国内以来&#xff0c;大家对这个证书的了解并不深&#xff0c;每年考试的人数也不多。但随着越来越多的企业认可PMP认证&#xff0c;目前考证的人数不断增加&#xff0c;几乎所有与项目管理相关的人都知道这个证书的重要性。这个证书在招聘要求中出现频率较高&a…

小红的炸砖块

题目描述 小红正在玩一个“炸砖块”游戏&#xff0c;游戏的规则如下&#xff1a; 初始有一个n∗m的砖块矩阵。小红会炸k次&#xff0c;每次会向一个位置投炸弹&#xff0c;如果这个位置有一个砖块&#xff0c;则砖块消失&#xff0c;上方的砖块向下落。 小红希望你画出最终砖块…

StringBuffer和大数值

读取 import java.util.Scanner;public class index{public static void main(String[] args){Scanner in new Scanner(System.in);System.out.println("Whats your name?");String name in.nextLine();Scanner inage new Scanner(System.in);System.out.printl…

Linux虚拟机环境搭建spark

Linux环境搭建Spark分为两个版本&#xff0c;分别是Scala版本和Python版本。 一、 安装Pyspark 本环境以 Python 环境为例。 1、下载spark 下载网址&#xff1a;https://archive.apache.org/dist/spark 下载安装包&#xff1a;根据自己环境选择合适版本&#xff0c;本环境…

详细分析Linux中的core dump异常(附 Demo排查)

目录 1. 基本知识2. 进阶知识3. Demo4. 彩蛋 1. 基本知识 Core dump 是指在程序异常终止时&#xff0c;操作系统将程序的内存映像保存到磁盘上的一种机制。 在 Linux 系统中&#xff0c;core dump 提供了一种调试程序错误的重要方式&#xff0c;它记录了程序在崩溃时的内存状态…

同城双活:交易链路的稳定性与可靠性探索

知易行难&#xff0c;双活过程中遇到了非常多的问题&#xff0c;但是回过头看很难完美的表述出来&#xff0c;之所以这么久才行文也是这个原因&#xff0c;总是希望可以尽可能的复现当时的思考、问题细节及解决方案&#xff0c;但是写出来才发现能给出的都是多次打磨、摸索之后…

大数据开发(日志离线分析项目)

大数据开发&#xff08;日志离线分析项目&#xff09; 一、项目需求1、使用jqueryecharts的方式调用程序后台提供的rest api接口&#xff0c;获取json数据&#xff0c;然后通过jquerycss的方式进行数据展示。工作流程如下&#xff1a;2、七大角度1、用户基本信息分析模块2、浏览…

秋招刷题2

1.字符串分割 public static void main(String[] args) {Scanner scnew Scanner(System.in);while(sc.hasNext()){String strsc.nextLine();StringBuilder sbnew StringBuilder();sb.append(str);int sizestr.length();int addZero8-size%8;while((addZero>0&&(addZ…

【数据结构】受限制的线性表——队列

&#x1f9e7;&#x1f9e7;&#x1f9e7;&#x1f9e7;&#x1f9e7;个人主页&#x1f388;&#x1f388;&#x1f388;&#x1f388;&#x1f388; &#x1f9e7;&#x1f9e7;&#x1f9e7;&#x1f9e7;&#x1f9e7;数据结构专栏&#x1f388;&#x1f388;&#x1f388;&…

机器学习——神经网络简单了解

一、神经网络基本概念 神经网络可以分为生物神经网络和人工神经网络 (1)生物神经网络,指的是生物脑内的神经元、突触等构成的神经网络&#xff0c;可以使生物体产生意识&#xff0c;并协助生物体思考、行动和管理各机体活动。 (2)人工神经网络,是目前热门的深度学习的研究…

C++初阶:反向迭代器模板,dequeue与模板进阶

目录 1. 反向迭代器的实现2. 容器deque的数据结构&#xff08;双端队列&#xff09;3. 模板的进阶知识与使用3.1 非类型模板参数3.2 模板特化3.2.1 全特化3.2.2 偏特化&#xff08;半特化&#xff09; 3.3 模板的分离编译 1. 反向迭代器的实现 反向迭代器与正向迭代器的行为与定…

【C++中的STL(未完成)】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 例如&#xff1a;…

java 8 stream api将List<T>转换成树形结构

1、新建实体类 package com.example.springboot3.entity;import lombok.Builder; import lombok.Data;import java.util.List;Data Builder public class Menu {/*** id*/public Integer id;/*** 名称*/public String name;/*** 父id &#xff0c;根节点为0*/public Integer p…

vue3+threejs新手从零开发卡牌游戏(十七):模拟对方手牌上场

写一个模拟对方手牌上场的事件&#xff0c;其中注意上场后卡牌需要翻转下&#xff0c;同时调整攻击力文字位置&#xff0c;主要代码如下&#xff1a; utils/common.ts&#xff1a; import { nextTick } from vue; import * as THREE from three; import * as TWEEN from tween…

【Java程序设计】【C00377】基于(JavaWeb)Springboot的社区医疗服务系统(有论文)

【C00377】基于&#xff08;JavaWeb&#xff09;Springboot的社区医疗服务系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 博主介绍&#xff1a;java高级开发&#xff0c;从事互联网行业六年&#xff0c;已经做了六年的毕业设计程序开发&#x…

javaSSM公司招聘管理系统IDEA开发mysql数据库web结构计算机java编程maven项目

一、源码特点 IDEA开发SSM公司招聘管理系统是一套完善的完整企业内部系统&#xff0c;结合SSM框架和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用SSM框架&#xff08;MVC模式开发&#xff09;MAVEN方式加 载&#xff0c;系统具有完整的源代码和…

极端道路天气数据集 雨天 雾天 道路晴朗

极端道路天气数据集 是一系列专为自动驾驶、智能交通系统研发以及计算机视觉算法测试而设计的真实世界或模拟的道路环境图像和视频集合。这些数据集包含了在各类极端天气条件下捕捉到的道路场景&#xff0c;例如大雾、暴雨、暴雪、冰雹、雾霾、道路结冰等&#xff0c;这些都是…

Vue3 + Vite + TS + Element-Plus + Pinia创建新项目(1)

1、cmd进入命令行后&#xff0c;输入npm create vite 2、使用vs code打开文件夹 3、在VS Code的终端里面输入命令&#xff1a;npm i 安装依赖 4、安装依赖库 npm i vue-router 路由安装 npm i pinia 全局状态管理 npm i axios 请求库 npm i element-p…

30---SDRAM电路设计

视频链接 SDRAM电路设计01_哔哩哔哩_bilibili SDRAM电路设计 1、SDRAM简介 SDRAM&#xff1a;Synchronous Dynamic Random Access Memory&#xff0c;同步动态随机存储器。 同步是指其时钟频率和CPU前端总线的系统时钟相同&#xff0c;并且内部命令的发送与数据的传输都以…

国内ip切换app,让切换ip变得简单

在数字化快速发展的今天&#xff0c;互联网已经成为我们生活中不可或缺的一部分。然而&#xff0c;随着网络应用的深入&#xff0c;用户对于网络环境的需求也日益多样化。其中&#xff0c;IP地址作为网络中的关键标识&#xff0c;其切换与管理显得尤为重要。为了满足用户对于IP…