Netty(2)

Netty

文章目录

  • Netty
  • 4 Netty 模型
    • 4.1 Netty 模型介绍
    • 4.2 Netty demo
    • 4.3 Netty 异步模型
      • 4.3.1 基本介绍
      • 4.3.2 异步模型
      • 4.3.3 Future-Listener 机制
      • 4.4 Netty 任务队列 task

4 Netty 模型

4.1 Netty 模型介绍

Netty 线程模式:Netty 主要基于主从 Reactor 多线程模型做了一定的改进,其中主从Reactor 多线程模型有多Reactor
在这里插入图片描述
在这里插入图片描述

  1. Netty 抽象出两组线程池 BoosGroup 和 WorkGroup
  2. BoosGroup 专门负责接收客户端连接
  3. WorkGroup 专门负责网络的读写
  4. BoosGroup、WorkGroup 类型都是 NioEventLoopGroup
  5. NioEventLoopGroup 是一个事件循环组,可以是多个线程,组中包含多个事件循环,每个事件循环都是 NioEventLoop
  6. NioEventLoop 表示一个事件循环,不断循环的执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通讯
  7. Boos NioEventLoop 执行步骤
    • 轮询 accept 事件
    • 处理 select 事件,与 client 建立连接,生成 NioSocketChannnel,并将其注册到某个 work NioEventLoop 上的 selector
    • runAllTasks 处理任务队列
  8. Work NioEventLoop 执行步骤
    • 轮询 read、write 事件
    • 在对应的 NioSocketChannel 处理 read、write 事件
    • runAllTasks 处理任务队列
  9. 每个 work NioEventLoop 处理业务时,会使用 pipline,pipline 中包含 channel,即可以通过 pipline 获取到对应的 channel,并且 pipline 中也维护了很多处理器

4.2 Netty demo

maven

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.20.Final</version>
</dependency>

server

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;

public class NettyServer {

    public static void main(String[] args) {

        EventLoopGroup boosGroup = null;
        EventLoopGroup workGroup = null;

        try {

            // 创建 boosGroup 一直循环只处理连接请求,真正的业务交由 workGroup 处理
            boosGroup = new NioEventLoopGroup();

            // 创建 workGroup 处理 read write 事件
            workGroup = new NioEventLoopGroup();

            ServerBootstrap b = new ServerBootstrap();

            b.group(boosGroup, workGroup) // 配置boosGroup workGroup
                    .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG, 64) // 设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
                    .handler(new LoggingHandler(LogLevel.INFO)) // handler 在 BoosGroup 中生效
                    .childHandler(new ChannelInitializer<SocketChannel>() { // childHandler 在 WorkGroup 中生效
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            System.out.println("初始化server端channel对象...");

                            ChannelPipeline pipeline = ch.pipeline();

                            pipeline.addLast(new ChannelInboundHandlerAdapter() {
                                /**
                                 * 读取客户端发送的数据
                                 * @param ctx 上下文对象, 含有 管道pipeline , 通道channel, 地址 等
                                 * @param msg 客户端发送的数据 默认Object
                                 * @throws Exception
                                 */
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                                    System.out.println("server 端正在接收 client 端的数据......");

                                    ByteBuf buffer = (ByteBuf) msg;

                                    System.out.println("client:" + ctx.channel().remoteAddress() + " 发送过来的数据 msg = " + buffer.toString(CharsetUtil.UTF_8));

                                }

                                @Override
                                public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("server 数据读取完毕 .....");
                                    ctx.writeAndFlush(Unpooled.copiedBuffer("server 以读取 client 发送的数据...", CharsetUtil.UTF_8));
                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                    System.out.println("出现异常:" + cause.getMessage());
                                    // 关闭通道
                                    ctx.close();
                                }
                            });

                        }
                    });

            System.out.println("server 端 ready !!!!");

            ChannelFuture channelFuture = b.bind("127.0.0.1", 8090).sync();

            // 给 ChannelFuture 注册监听器
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) System.out.println("server 端监听 8090 端口 成功....");
                    else System.out.println("server 端监听 8090 端口 失败....");
                }
            });

            //关闭通道
            channelFuture.channel().closeFuture().sync();


        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {

            boosGroup.shutdownGracefully();
            workGroup.shutdownGracefully();

        }


    }

}


client

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;

public class NettyClient {

    public static void main(String[] args) {

        // 客户端需要一个事件循环组
        NioEventLoopGroup group = null;

        try {

            group = new NioEventLoopGroup();

            // 创建客户端启动对象
            Bootstrap b = new Bootstrap();

            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline pipeline = ch.pipeline();

                            pipeline.addLast(new ChannelInboundHandlerAdapter() {

                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("当通道就绪就会触发该方法.....");
                                    ctx.writeAndFlush(Unpooled.copiedBuffer("client 通道已就绪...", CharsetUtil.UTF_8));
                                }

                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                                    ByteBuf buffer = (ByteBuf) msg;

                                    System.out.println("client 读取 server 发送的数据 msg = " + buffer.toString(CharsetUtil.UTF_8));

                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                    System.out.println("出现异常,异常信息 cause = " + cause.getMessage());
                                    // 关闭通道
                                    ctx.close();
                                }
                            });

                        }
                    });

            System.out.println("client 端已 ok ...");

            // 启动客户端去连接服务器端
            ChannelFuture channelFuture = b.connect("127.0.0.1", 8090).sync();

            channelFuture.channel().closeFuture().sync();


        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }


    }

}

  1. Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作
  2. NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通道。
  3. NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责
  4. NioEventLoopGroup下包含多个 NioEventLoop
  5. 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
  6. 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
  7. 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
  8. 每个 NioChannel 都绑定有一个自己的 ChannelPipeline

4.3 Netty 异步模型

4.3.1 基本介绍

  1. 异步的概念和同步相对
  2. 当一个异步过程调用发出后,调用者不能立刻得到结果,实 际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者
  3. Netty中的I/O操作是异步的,包括Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture
  4. 调用者并不能立刻获得结果,而是通过Future-Listener 机制,用户可以方便的主动获 取或者通过通知机制获得IO操作结果
  5. Netty 的异步模型是建立在 future 和 callback 的之上的
  6. callback 回调
  7. Future 的核心思想是:假设一个方法fun,计算过程可能非常耗时,等待 fun 返回 显然不合适。那么可以在调用 fun 的时候,立马返回一个Future,后续可以通过 Future 去监控方法 fun 的处理过程(即:Future-Listener 机制)

4.3.2 异步模型

在这里插入图片描述

  1. 在使用 Netty 进行编程时,拦截操作和转换出入站数据只需要提供 callback 或利用 future 即可。这使得链式操作简单、高效,并有利于编写可重用的、通用的代码。
  2. Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来

4.3.3 Future-Listener 机制

当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。

常见有如下操作

isDone判断当前操作是否完成
isSuccess判断已完成的当前操作是否成功
getCause获取已完成的当前操作失败的原因
isCancelled判断已完成的当前操作是否被取消
addListener注册监听器 当操作已完成(isDone方法返回完成),将会通知 指定的监听器;如果Future对象已完成,则通知指定的监听器

相比传统阻塞I/O,执行I/O操作后线程会被阻塞住,直到操作完成
异步处理的好处是不会造成线程阻塞,线程在I/O操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量

4.4 Netty 任务队列 task

处理耗时操作

  • 用户程序自定义普通任务
    将任务提交 taskQueue中 但还是一个线程在执行
  • 定时提交任务
    将任务提交 scheduledTaskQueue 使用不同的线程
childHandler(new ChannelInitializer<SocketChannel>() { // childHandler 在 WorkGroup 中生效
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            System.out.println("初始化server端channel对象...");

                            System.out.println("ChannelInitializer thread name = " + Thread.currentThread().getName());

                            ChannelPipeline pipeline = ch.pipeline();

                            pipeline.addLast(new ChannelInboundHandlerAdapter() {
                                /**
                                 * 读取客户端发送的数据
                                 *
                                 * @param ctx 上下文对象, 含有 管道pipeline , 通道channel, 地址 等
                                 * @param msg 客户端发送的数据 默认Object
                                 * @throws Exception
                                 */
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                                    System.out.println("server 端正在接收 client 端的数据......");

                                    System.out.println("ChannelInitializer channelRead thread name = " + Thread.currentThread().getName());


                                    System.out.println("ChannelInitializer channelRead 普通handle thread name = " + Thread.currentThread().getName());
                                    ByteBuf buffer0 = (ByteBuf) msg;
                                    System.out.println("client:" + ctx.channel().remoteAddress() + " 普通handle 发送过来的数据 msg = " + buffer0.toString(CharsetUtil.UTF_8));


                                    ctx.channel().eventLoop().execute(() -> {
                                        try {
                                            TimeUnit.SECONDS.sleep(60);
                                        } catch (InterruptedException e) {
                                            e.printStackTrace();
                                        }
                                        System.out.println("ChannelInitializer channelRead taskQueue handle thread name = " + Thread.currentThread().getName());
                                        ByteBuf buffer1 = (ByteBuf) msg;
                                        System.out.println("client:" + ctx.channel().remoteAddress() + " 发送过来的数据 msg = " + buffer1.toString(CharsetUtil.UTF_8));
                                    });


                                    ctx.channel().eventLoop().schedule(() -> {
                                        System.out.println("ChannelInitializer channelRead scheduleQueue handle thread name = " + Thread.currentThread().getName());
                                        ByteBuf buffer2 = (ByteBuf) msg;
                                        System.out.println("client:" + ctx.channel().remoteAddress() + " 发送过来的数据 msg = " + buffer2.toString(CharsetUtil.UTF_8));
                                    }, 60, TimeUnit.SECONDS);

                                    System.out.println("server doing...");

                                }

                                @Override
                                public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("server 数据读取完毕 .....");
                                    ctx.writeAndFlush(Unpooled.copiedBuffer("server 以读取 client 发送的数据...", CharsetUtil.UTF_8));
                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                    System.out.println("出现异常:" + cause.getMessage());
                                    // 关闭通道
                                    ctx.close();
                                }
                            });

                        }
                    })

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/17263.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

开放式基金净值估算数据 API 数据接口

开放式基金净值估算数据 API 数据接口 全量基金数据&#xff0c;实时数据&#xff0c;所有基金数据。 1. 产品功能 返回实时开放式基金净值估值可定义所有基金估值数据&#xff1b;多个基金属性值返回&#xff1b;多维指标&#xff0c;一次查询毫秒级返回&#xff1b;数据持续…

全球5G市场最新进展及未来展望

从智慧医疗到万物互联&#xff0c;从无人驾驶到关乎我国未来发展的“新基建”&#xff0c;自2019年全球5G商用启动后&#xff0c;5G就步入了发展“快车道”;2022年继续保持快速稳定的增长态势&#xff0c;在网络建设、人口覆盖、终端形态等方面发展势头强劲&#xff0c;在技术标…

【致敬未来的攻城狮计划】— 连续打卡第二十三天:RA2E1的存储器基础知识

系列文章目录 1.连续打卡第一天&#xff1a;提前对CPK_RA2E1是瑞萨RA系列开发板的初体验&#xff0c;了解一下 2.开发环境的选择和调试&#xff08;从零开始&#xff0c;加油&#xff09; 3.欲速则不达&#xff0c;今天是对RA2E1 基础知识的补充学习。 4.e2 studio 使用教程 5.…

每天一道算法练习题--Day18 第一章 --算法专题 --- ----------前缀树

前缀树 字典树也叫前缀树、Trie。它本身就是一个树型结构&#xff0c;也就是一颗多叉树&#xff0c;学过树的朋友应该非常容易理解&#xff0c;它的核心操作是插入&#xff0c;查找。删除很少使用&#xff0c;因此这个讲义不包含删除操作。 截止目前&#xff08;2020-02-04&a…

基于R语言APSIM模型应用

随着数字农业和智慧农业的发展&#xff0c;基于过程的农业生产系统模型在模拟作物对气候变化的响应与适应、农田管理优化、作物品种和株型筛选、农田固碳和温室气体排放等领域扮演着越来越重要的作用。APSIM (Agricultural Production Systems sIMulator)模型是世界知名的作物生…

净利润下滑13%,帅丰电器已掉队?

近年来&#xff0c;随着市场竞争加剧&#xff0c;厨电行业加速洗牌&#xff0c;超60%杂牌或被淘汰出局&#xff0c;三类品牌全部被清退。而作为毛利最高的厨电细分市场&#xff0c;集成灶行业吸引了大批企业涌入&#xff0c;市场渗透率快速提升&#xff0c;已经超过30%&#xf…

华为MPLS跨域——后门链路实验配置

目录 配置PE与CE设备对接命令&#xff08;通过OSPF对接&#xff09; 配置后门链路 可以使用任意方式来跑跨域MPLS&#xff08;A、B、C1、C2都可以&#xff09;&#xff0c;不过关于传递Vpnv4路由的配置此处不做介绍&#xff1b;此处只介绍关于PE和CE对接的配置和关于后门链路…

数据存储系统概要

可靠、可扩展与可维护性 现在有很多都属于数据密集型&#xff0c;而不是计算密集型。对于这些类型应用&#xff0c;CPU的处理能力往往不是第一限制性因素&#xff0c;关键在于数据量、数据的复杂度及数据的快速多边形。 数据密集型应用模块&#xff1a; 数据库&#xff1a;存…

对标世界一流|从Just in time到Just in case ——汽车行业供应链管理经验借鉴

01 丰田汽车精益生产 作为最复杂和最成熟的供应链之一&#xff0c;汽车行业供应链无疑是供应链领域集大成者&#xff0c;而提起汽车行业供应链&#xff0c;就不得不提到丰田汽车&#xff1b;提到丰田汽车&#xff0c;就肯定离不开大名鼎鼎的精益生产以及JIT模式。 JIT模式由丰…

云服务器部署python项目

前言&#xff1a;相信看到这篇文章的小伙伴都或多或少有一些编程基础&#xff0c;懂得一些linux的基本命令了吧&#xff0c;本篇文章将带领大家服务器如何部署一个使用django框架开发的一个网站进行云服务器端的部署。 文章使用到的的工具 Python&#xff1a;一种编程语言&…

SpringBoot+Shiro+Jwt+Vue+elementUI实现前后端分离单体系统Demo

记录一下使用SpringBoot集成Shiro框架和Jwt框架实现前后端分离Web项目的过程&#xff0c;后端使用SpringBoot整合ShiroJwt(auth0)&#xff0c;前端使用vueelementUI框架&#xff0c;前后端的交互使用的是jwt的token&#xff0c;shiro的会话关闭&#xff0c;后端只需要使用Shiro…

在Linux上搭建gitlab以及自动化编译部署的完整流程

一、安装gitlab 首先下载gitlab的安装包&#xff0c;地址如下&#xff1a; https://mirrors.tuna.tsinghua.edu.cn/gitlab-ce/ubuntu/pool/bionic/main/g/gitlab-ce/ 然后安装下载的包即可&#xff0c;一般还需要安装openssh-server等依赖包&#xff0c;在安装gitlab包之前可以…

睡眠经济2.0时代来了,老巨头们有护城河吗?

在第23个世界睡眠日&#xff0c;中国睡眠研究会等机构发布了《中国睡眠研究报告2023》&#xff0c;近半数人每晚平均睡眠时长不足8小时&#xff0c;“失眠”已成为了当代人的生活常态。 越是睡不好&#xff0c;越要想办法。年轻人纷纷求助于好的寝具、好的助眠产品乃至保健品&…

【Halcon】找到设备上的 标识牌

如图&#xff0c;找到设备上的 标识牌 。 标识牌最明显的特征是比其他区域亮&#xff0c; 二值化选择出亮区域&#xff0c;再通过面积选择出目标区域。 先显示图片 *获取图片的大小 get_image_size(Image,Width,Height)*关闭窗口 dev_close_window()*打开窗口 dev_open_win…

java错题总结(19-21页)

链接&#xff1a;关于Java中的ClassLoader下面的哪些描述是错误的_用友笔试题_牛客网 来源&#xff1a;牛客网 B&#xff1a;先讲一下双亲委派机制&#xff0c;简单来说&#xff0c;就是加载一个类的时候&#xff0c;会往上找他的父类加载器&#xff0c;父类加载器找它的父类加…

Centos系统安装RabbitMQ消息中间件

记录一下在centos7.x下面安装RabbitMQ消息中间件 RabbitMQ是一个开源而且遵循 AMQP协议实现的基于 Erlang语言编写&#xff0c;因此安装RabbitMQ之前是需要部署安装Erlang环境的 先安装Erlang https://packagecloud.io/rabbitmq/ 点进去可以看到 因为使用的centos是7.x版本的…

架构设计-数据库篇

大家好&#xff0c;我是易安&#xff01; 之前我们讲过架构设计的一些原则&#xff0c;和架构设计的方法论&#xff0c;今天我们谈谈高性能数据库集群的设计与应用。 读写分离原理 读写分离的基本原理是将数据库读写操作分散到不同的节点上&#xff0c;下面是其基本架构图。 读…

【Python系列】一个简单的抽奖小程序

序言 很开心你能在万千博文中打开这一篇&#xff0c;希望能给你带来一定的帮助&#xff01;&#x1f44d;&#x1f3fb; 如果有什么问题&#xff0c;都可以添加下方我的联系方式&#xff0c;联系我噢~&#x1f601; ⭐️⭐️⭐️⭐️⭐️沟通交流&#xff0c;一起成为技术达人&…

电视机顶盒哪个牌子好?数码小编盘点电视机顶盒排行榜

电视机顶盒哪个牌子好&#xff1f;这是困扰新手们的一大难题&#xff0c;部分产品被爆出虚标高配、偷工减料&#xff0c;面对众多的机顶盒品牌和型号&#xff0c;怎么选择才好&#xff1f;小编以销量和用户评价为标准&#xff0c;盘点了电视机顶盒排行榜&#xff0c;跟着我一起…

【Linux】进程学习(1)---理解进程概念

文章目录 冯诺依曼体系结构理解冯诺依曼体系结构 操作系统概念与定位概念计算机管理模型计算机的软硬件体系结构图系统调用和库函数概念 进程基本概念描述进程--PCBtask_struct内容分类组织进程 冯诺依曼体系结构 数学家冯诺依曼提出了计算机制造的三个基本原则&#xff0c;即采…