初识Netty网络编程

Netty网络编程

对于高并发的Reactor线程模型,Netty是如何支持的?

Netty线程模型是基于Reactor模型实现的,对Reeactor三种模式都有非常好的支持,并做了一定的改进,也非常的灵活,一般情况,在服务端会采用主从架构模型。

BossGroup负责和客户端建立连接(循环执行三个步骤):

<1> 轮询注册在其上的accept事件

<2> 处理accept事件与客户端建立连接,生成socketchannel,并将其注册到某个WorkerGroup的Selector上

<3> 循环处理任务队列中的其他任务

WorkerGroup负责处理连接上的读写循环执行三个步骤):

<1> 轮询注册在其上的socketchannel的读写事件

<2>在对应的socketchannel上处理读写事件

<3> 循环处理任务队列中的其他任务

注意:以上两个步骤<2>中,会用Pipeline,通过Pipeline可以拿到对应的Channel,Pipeline中维护了很多Handler(拦截、过滤、自定义处理器等)。

从代码编写上Netty如何使用不同的线程模型?

指定线程就好了

在客户端Netty使用什么线程模型?

只需要提供可用的线程池即可,开发者仍然只需编写事件处理器Handler,其他连接服务端,数据收发等均由netty实现

什么是ChannelPipeline和ChannelHandler ?

ChannelPipeline

  • ChannelPipeline 是一个有序的处理器链容器,每个 Channel (在网络编程中,Channel 表示连接的抽象)都有自己的管道。管道中的处理器按顺序排列,形成一个链式结构,用于处理或拦截进出 Channel 的各种事件和数据。
  • 当数据从网络流入或流出时,它会经过管道中的每一个处理器。处理器可以修改、过滤、转换数据或者处理特定类型的事件(比如连接建立、读取完成、异常等)。
  • ChannelPipeline 提供了一种灵活的方式来组织和管理这些处理器,允许开发者在运行时动态插入、删除或替换处理器,从而定制不同阶段的数据处理逻辑。

ChannelHandler

  • ChannelHandler 是处理网络事件和数据的接口或抽象类的实现,它是管道中的基本单元,负责处理具体的业务逻辑。
  • 根据处理方向的不同,ChannelHandler有两个主要的子接口:
  • ChannelInboundHandler:处理入站事件和数据,例如读取客户端发送过来的消息、连接激活、用户事件等。
  • ChannelOutboundHandler:处理出站事件和数据,例如客户端请求发送数据、关闭连接等。
  • 一个 ChannelHandler 可能同时实现这两个接口,以处理双向的事件和数据流。另外,Netty还提供了一些抽象适配器类(如 ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter),方便开发者扩展和实现自己的处理器逻辑。

  • inbound入站事件处理顺序(方向)是由链表的头到链表尾,

  • outbound事件的处理顺序是由链表尾到链表头。

  • inbound入站事件由netty内部触发,最终由netty外部的代码消费

  • 在Netty框架中,"inbound入站事件"指的是那些由网络底层(如操作系统内核)向Netty框架推送的数据到达事件,或者是由于网络连接状态变化(如连接建立、断开等)引发的事件。这类事件是由网络外部(相对于应用层代码)触发的,但最终需要由开发人员编写的Netty外部代码(即应用程序代码)来处理和消费。

  • outbound事件由netty外部的代码触发,最终由netty内部消费

  • "outbound出站事件"是指由应用程序代码发起的、意图影响或控制网络连接的行为事件。这些事件通常涉及到向外发送数据、关闭连接、改变连接状态等操作,它们是由开发者编写的Netty外部代码主动触发的。

实现一个简单的客户端和服务端通信的程序

服务端

客户端

两端ChannelHandler编写核心步骤

NettyServer

package com.hayaizo.netty.handler;

import com.hayaizo.netty.handler.handler.server.ServerInbound1Handler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.jbosslog.JBossLog;

import java.net.InetSocketAddress;

public class NettyServer {
    public static void main(String[] args) {
        NettyServer server=new NettyServer();
        server.start(9999);
    }

    public void start(int port){
            //创建线程池
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //配置服务端引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    //客户端的handler
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            ChannelPipeline pipeline=nioSocketChannel.pipeline();
                            pipeline.addLast(new ServerInbound1Handler());
                        }
                    });
            //绑定端口
            ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(port)).sync();

            //获取服务端的channel
            future.channel().closeFuture().sync();
            //释放资源
            //在finally中释放
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

NettyClient

package com.hayaizo.netty.handler;

import com.hayaizo.netty.handler.handler.client.ClientInbound1Handler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
    public static void main(String[] args) {
        NettyClient client=new NettyClient();
        client.connect("127.0.0.1",9999);
    }

    public void connect(String host,int port){
        //创建线程池
        EventLoopGroup group=new NioEventLoopGroup();
        try {
            //创建客户端引导类
            Bootstrap bootstrap = new Bootstrap();
            //配置
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            //handler方法
                            ChannelPipeline pipeline= nioSocketChannel.pipeline();
                            pipeline.addLast(new ClientInbound1Handler());
                        }
                    });
            //连接对端
            ChannelFuture future = bootstrap.connect(host, port).sync();
            //连接上了之后发送消息
            //先获取到发送消息的管道
            Channel channel = future.channel();
            byte[] bytes = "hello netty server,I an netty client".getBytes();
            //向管道要一块缓冲区
            ByteBuf buf=channel.alloc().buffer(bytes.length);
            buf.writeBytes(bytes);
            channel.writeAndFlush(buf);
            //链接全都关闭了之后释放资源
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }
}

ClientInbound1Handler

package com.hayaizo.netty.handler.handler.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.Charset;

public class ClientInbound1Handler extends ChannelInboundHandlerAdapter {
    /**
     * 客户端channel 准备就绪 ,生效
     *  只回调一次
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("ClientInbound1Handler ----channelActive ");
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("ClientInbound1Handler ----channelInactive ");
        super.channelInactive(ctx);
    }

    /**
     * 从channel中读取到了数据之后,回调
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("ClientInbound1Handler ----channelRead ");
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);

        String message = new String(bytes, Charset.defaultCharset());
        System.out.println("客户端收到服务端发送的数据为:" + message);

        /*Channel channel = ctx.channel();
        // 向服务端端回写数据
        byte[] bytes1 = "hello netty client ,i am netty server ,hehe".getBytes(StandardCharsets.UTF_8);
        ByteBuf buf1 = ctx.alloc().buffer(bytes1.length);
        buf1.writeBytes(bytes1);
        channel.writeAndFlush(buf1);*/


        super.channelRead(ctx, msg);
    }

    /**
     * channel中数据读取完毕的回调
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("ClientInbound1Handler ----channelReadComplete ");
        // ctx.writeAndFlush()
        super.channelReadComplete(ctx);
    }

    /**
     * 出现异常的回调
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("ClientInbound1Handler ----exceptionCaught ");
        super.exceptionCaught(ctx, cause);
    }
}

ServerInbound1Handler

package com.hayaizo.netty.handler.handler.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class ServerInbound1Handler extends ChannelInboundHandlerAdapter {
    /**
     * 客户端channel准备就绪,只会调用一次
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("ServerInbound1Handler ----channelActive ");
        super.channelActive(ctx);
    }

    /**
     * 管道没了之后调用的
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("ServerInbound1Handler ----channelInactive ");
        super.channelInactive(ctx);
    }

    /**
     * 读到数据之后回调这个函数
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("ServerInbound1Handler ----channelRead ");
        ByteBuf buf=(ByteBuf) msg;
        byte[] bytes=new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        String message=new String(bytes, Charset.defaultCharset());
        System.out.println("服务端收到的数据为:"+message);

        //写回数据
        Channel channel = ctx.channel();
        byte[] bytes1 = "hello netty client ,i am netty server ,hahha".getBytes(StandardCharsets.UTF_8);
        ByteBuf buf1 = ctx.alloc().buffer(bytes1.length);
        buf1.writeBytes(bytes1);
        channel.writeAndFlush(buf1);

        super.channelRead(ctx, msg);
    }

    /**
     * channel中数据读取完毕的回调
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("ServerInbound1Handler ----channelReadComplete ");
        super.channelReadComplete(ctx);
    }


    /**
     * 出现异常的回调
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("ServerInbound1Handler ----exceptionCaught ");
        super.exceptionCaught(ctx, cause);
    }
}

测试收发成功。

在Netty框架中,当我们搭建网络服务器时,会用到Bootstrap类来配置和启动服务器端口监听新进来的连接。在Bootstrap中,我们可以设置一个ChannelInitializer

ChannelInitializer就像是一个“管道工”,它的工作并不直接处理网络通信数据,而是在新的网络连接(也就是Channel)被创建出来的时候,首先去“装修”这条通道——这里的“装修”指的是构建和配置与该Channel相关的ChannelPipeline

ChannelPipeline就像是一条流水线,里面包含了一系列处理器(Handler),每个处理器负责处理特定类型的网络事件或数据。我们在ChannelInitializer中预先设定好这条流水线上的各个处理器,比如解码器、编码器、业务逻辑处理器等。

所以,当我们在配置Bootstrap时,还未真正建立起任何一个Channel或者它们的Pipeline,但通过设置ChannelInitializer,我们就能够在每个新建立的Channel的初期就确定好它的Pipeline应该包含哪些处理器。这样一来,一旦有新的连接进来,Netty就会自动调用ChannelInitializer去初始化对应的ChannelPipeline,并按照我们的设定加载相应的处理器。

总结

  • 每个channel就是一个连接管道,它含有一个由一系列handler组成的pipeline来处理channel每一次的in/out操作,当然handler被包在context中使用。
  • 先在bootstrap中放一个ChannelInitializer的handler,存放在这里。这时真正的channel与pipeline还没有生成。ChannelInitializer的目的是在pipeline的处理器中先占个位置,等创建了channel之后,会把ChannelInitializer的handler一个个放到pipeline中。
  • 当bootstrap进行init一个channel的时候,会给这个channel的pipeline放上这个ChannelInitializer的handler。
  • 此时所有的主要对象与关系都组装都完成了。在nio中,Channel是要注册到selector中的,这个时候就是启动处理的时候。让pipleline中产生一个register事件,就由pipeline中的相关handler来自动处理了。
  • 相关的的handler会进行相关的处理,目前只有一个,它处理中会init一下自己所属的channel,并把自己移除,之后还会继续传播这个事件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/464161.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【BUG 弹药库】二分模板的优化

文章目录 1. 为什么要优化二分算法&#xff1f;2. 如何去优化原来的二分模板&#xff1f;3. 案例分析 1. 为什么要优化二分算法&#xff1f; ① 平常学习的二分整数的算法模板边界的问题很容易出错&#xff0c;不知道什么时候用 l mid&#xff0c;r mid - 1&#xff1b;或者是…

内网渗透小结

域产生原因 简单来说就是为了安全和方便控制域内主机 一个具有一定规模的企业&#xff0c;每天都可能面临员工入职和离职&#xff0c;因此网络管理部门经常需要对域成员主机进行格式化消除磁盘的文件&#xff0c;然后重装系统及软件&#xff0c;以提供给新员工使用&#xff1…

python--剑指offer--中等--07. 重建二叉树

输入某二叉树的前序遍历和中序遍历的结果&#xff0c;请重建该二叉树。假设输入的前序遍历和中序遍历的结果中都不含重复的数字。 例如&#xff0c;给出 前序遍历 preorder [3,9,20,15,7] 中序遍历 inorder [9,3,15,20,7] 返回如下的二叉树&#xff1a; 3/ 9 20 / 15 7 …

Linux 下使用 socket 实现 TCP 客户端

目录 示例代码板级验证更多内容 套接字&#xff08;socket&#xff09;是 Linux 下的一种进程间通信机制&#xff08;socket IPC&#xff09;&#xff0c;它不仅支持同一主机的不同进程间通信&#xff0c;还支持跨网络的不同主机的进程间通信。 socket 允许通过标准的文件描述…

HarmonyOS-鸿蒙系统概述

你了解鸿蒙系统吗&#xff1f; 你看好鸿蒙系统吗&#xff1f; 今年秋季即将推出的HarmonyOS Next 星河版热度空前&#xff0c;一起来了解一下吧。本文将从HarmonyOS 的应用场景、发展历程、架构、开发语言、开发工具、生态建设六个角度聊一聊个人的理解。 1、应用场景 鸿蒙…

C# 打开文件对话框(OpenFileDialog)

OpenFileDialog&#xff1a;可以打开指定后缀名的文件&#xff0c;既能单个打开文件也能批量打开文件 /// <summary>/// 批量打开文档/// 引用&#xff1a;System.Window.Fomrs.OpenFileDialog/// </summary>public void OpenFile(){OpenFileDialog dialog new Op…

Linux中文件和目录管理(创建删除移动复制)

目录 1——一次建立一个或多个目录&#xff1a;mkdir ​2——创建一个空文件&#xff1a;touch 3——移动和重命名&#xff1a;mv 4——复制文件和目录&#xff1a;cp 5—— 删除目录和文件&#xff1a;rmdir和rm 在学习文件与目录的管理的一些命令之前&#xff0c;我们先…

QT5.14.2对象树之魅力 -- 让Qt编程如行云流水

对象编程是现代编程语言中不可或缺的核心理念。在C等编程语言中&#xff0c;对象的生命周期管理一直是开发者头疼的难题。手动管理对象创建和销毁&#xff0c;而一不小心就有可能导致内存泄漏等严重问题。而Qt以其独有的对象树模型&#xff0c;为我们解决了这一烦恼&#xff0c…

并查集(详解+例题)

1、作用 将两个集合合并 询问两个元素是否在一个集合中 2、基本原理 每个集合用一颗树表示。树根的编号就是整个集合的编号。每个节点存储它的父节点&#xff0c;p[x]表示x的父节点。 3、实现 问题1&#xff1a;如何判断树根&#xff1a;if(p[x]x); 问题2&#xff1a;如何求…

WiFi7 MLO技术框架

在2019年7月份&#xff0c;关于WiFi7 MLO的开放式讨论已经基本完成了&#xff0c;关注点集中体现在band steering/balancing和multi band aggregation上面。 英特尔基于开放讨论的基础&#xff0c;提出了MLO的协议技术框架&#xff0c;尽量兼容已有的协议文本&#xff0c;并提…

大数据数据分析-scala、IDEA、jdk之间的搭配关系

Scala主要是一门面向对象编程语言和函数式编程语言。 一、大数据框架&#xff08;处理海量/流式数据&#xff09; - ---以HADOOP 2. x为系列的大数据生态系统处理框架 离线数据分析&#xff0c;分析的数据为N1天数据 -----MapReduce 并行计算框架&#xff0c;分而治之…

C语言基础数据结构——栈和队列

目录 1.栈 1.1栈的选型 1.2 实现代码 2.队列 2.1整体思路 2.2初始化和销毁 2.3出入队列 2.4取队列元素 2.5判断队列是否为空 2.6返回队列中元素个数 2.7 Test 1.栈 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。 进行数…

Docker入门二(应用部署、迁移与备份、DockerFile、docker私有仓库、Docker-Compose)

文章目录 一、应用部署1.MySQL部署2.Redis部署3.Nginx部署 二、迁移与备份1.容器做成镜像2.镜像备份和恢复(打包成压缩包&#xff09; 三、DockerFile0.镜像从哪里来&#xff1f;1.什么是DockerFile2.DockerFile 构建特征3.DockerFile命令描述4.构建一个带vim的centos镜像案例5…

Oracle Primavera Analytics 是什么,与P6的关系?

前言 Oracle Primavera P6 Analytics 是与P6有关的一个相对较新的模块&#xff0c;Primavera 用户社区在很大程度上尚未对其进行探索。 那么它到底有什么作用呢&#xff1f; 通过了解得知它旨在通过深入了解组织的项目组合绩效&#xff0c;帮助高级管理层对其项目组合做出更好…

【开源】SpringBoot框架开发就医保险管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 科室档案模块2.2 医生档案模块2.3 预约挂号模块2.4 我的挂号模块 三、系统展示四、核心代码4.1 用户查询全部医生4.2 新增医生4.3 查询科室4.4 新增号源4.5 预约号源 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVue…

MySQL | 表的约束

目录 1. 空属性 NULL 2. 默认值 DEFAULT 3. 列描述comment 4. zerofill 5. 主键 PRIMARY KEY 6. 自增长AUTO_INCREMENT 7. 唯一键UNIQUE 8. 外键 真正约束字段的是数据类型&#xff0c;但是数据类型约束很单一&#xff0c;需要有一些额外的约束&#xff0c;更好的保证数…

VS2019加QT5.14中Please assign a Qt installation in ‘Qt Project Settings‘.问题的解决

第一篇&#xff1a; 原文链接&#xff1a;https://blog.csdn.net/aoxuestudy/article/details/124312629 error:There’ no Qt version assigned to project mdi.vcxproj for configuration release/x64.Please assign a Qt installation in “Qt Project Settings”. 一、分…

AG32 MCU以太网应用实例demo

一. 前言 AGM32系列32位微控制器旨在为MCU用户提供新的自由度和丰富的兼容外设&#xff0c;以及兼容的引脚和功能。AG32F407系列产品具有卓越的品质&#xff0c;稳定性和卓越的价格价值。 AG32产品线支持其所有接口外设尽可能接近主流兼容性&#xff0c;并提供丰富的参考设计…

机器人路径规划:基于深度优先搜索(Depth-First-Search,DFS)算法的机器人路径规划(提供Python代码)

一、深度优先搜索算法介绍 深度优先搜索算法&#xff08;Depth-First-Search&#xff09;的基本思想是沿着树的深度遍历树的节点&#xff0c;尽可能深的搜索树的分支。当节点v的所有边都己被探寻过&#xff0c;搜索将回溯到发现节点v的那条边的起始节点。这一过程一直进行到已…

代码学习记录21--回溯算法第二天

随想录日记part21 t i m e &#xff1a; time&#xff1a; time&#xff1a; 2024.03.16 主要内容&#xff1a;今天主要是结合类型的题目加深对回溯算法的理解&#xff1a;1&#xff1a;组合总和&#xff1b;2&#xff1a;电话号码的字母组合 216.组合总和III17.电话号码的字母…