Netty入门

Netty入门

1. 概述

1.1 Netty是什么?

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

以上片段摘自官网,Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

1.2 Netty的优势

  • Netty vs NIO,工作量大,bug 多
    • 需要自己构建协议
    • 解决 TCP 传输问题,如粘包、半包
    • epoll 空轮询导致 CPU 100%
    • 对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer
  • Netty vs 其它网络应用框架
    • Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀
    • 久经考验,16年,Netty 版本
      • 2.x 2004
      • 3.x 2008
      • 4.x 2013
      • 5.x 已废弃(没有明显的性能提升,维护成本高)

2. Hello World

2.1 目标

开发一个简单的服务器端和客户端

  • 客户端向服务器端发送 hello, world
  • 服务器仅接收,不返回

添加依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>

2.2 服务端

public class NettyServer {
    public static void main(String[] args) {
        // 1. 启动器,负责组装netty组件,启动服务器
        new ServerBootstrap()
                // 2. BossEventLoop, WorkerEventLoop(selector,thread) ,group组
                .group(new NioEventLoopGroup())
                // 3. 选择服务器的ServerSocketChannel实现
                .channel(NioServerSocketChannel.class)
                // 4. boss负责处理连接, worker负责处理读写, 决定了worker能做哪些操作
                .childHandler(
                        // 5. ChannelInitializer 代表和客户端进行数据读写的通道初始化,本身也是一个handler,作用是负责添加别的handler
                        new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel ch) {
                                // 6. 添加具体的handler
                                ch.pipeline().addLast(new StringDecoder()); // 字符解码,将ByteBuf转换成字符串
                                ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ // 自定义handler
                                    @Override // 读事件
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                        System.out.println(msg);
                                    }
                                });
                            }
                        }
                )
                // 7. 绑定端口
                .bind(8080);
    }
}

2.3 客户端

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        // 1. 启动类
        new Bootstrap()
                // 2. 添加EventLoop
                .group(new NioEventLoopGroup())
                // 3. 选择客户端的channel实现
                .channel(NioSocketChannel.class)
                // 4. 添加处理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override // 在建立连接后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder()); // 字符编码,将字符串转换成ByteBuf
                    }
                })
                // 5. 连接到服务器
                .connect(new InetSocketAddress("localhost",8080))
                .sync()
                .channel()
                // 6. 想服务器发送数据
                .writeAndFlush("hello, world");
    }
}

2.4 执行流程

image-20230516132020193

提示

  • channel可以理解为数据通道
  • msg理解为流动的数据,最开始是ByteBuf,但经过pipeline(流水线)的加工,会变成其他类型的对象,最后输出又变成了ByteBuf
  • handler理解为数据的处理工序
    • 工序有多道,合在一起就是pipeline,pipeline负责发布时间(读、读取完成…)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应时间的处理方法)
    • handler分Inbound(入站)和Outbound(出站)
  • eventLoop理解为处理数据的工人
    • 工人可以管理多个channel的io操作,并且一旦工人负责了某个channel,就要负责到底
    • 工人既可以执行io操作,也可以进行任务处理,每位工人有任务队列,队列里可以对方多个channel的待处理任务,任务分为普通任务、定时任务
    • 工人按照pipeline顺序,依此按照handler的规划处理数据,可以为每道工序指定不同的工人

3. 组件

3.1 EventLoop

事件循环对象

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

它的继承关系比较复杂

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup

事件循环组

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop

构造方法

常用的EventLoopGroup有NioEventLoopGroupDefaultEventLoopGroup

NioEventLoopGroup:可以处理io/普通/定时任务

DefaultEventLoopGroup:可以处理普通/定时任务,相较于nio缺少了处理io任务的功能

public class TestEventLoop {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(2);
        DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup(); 
    }
}

构造方法中可以传入一个参数,指定EventLoopGroup中的线程数

如果是空参构造器,可以看到netty回读取系统配置文件中的io.netty.eventLoopThreads,如果没有的话则是cpu核数*2。并且,至少会有一个线程

image-20230516152905885

轮询

可以看到,在EventLoopGroup中,他每次使用的线程是通过类似于负载均衡的方式指定的

public static void main(String[] args) {
    NioEventLoopGroup group = new NioEventLoopGroup(2);

    System.out.println(group.next()); //io.netty.channel.nio.NioEventLoop@2a18f23c
    System.out.println(group.next()); //io.netty.channel.nio.NioEventLoop@d7b1517
    System.out.println(group.next()); //io.netty.channel.nio.NioEventLoop@2a18f23c
    System.out.println(group.next()); //io.netty.channel.nio.NioEventLoop@d7b1517
}

普通任务

submit()或者execute()都可以创建一个普通任务

@Slf4j
public class TestEventLoop {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(2); //可处理 io/普通/定时任务

        group.submit(() -> {
            log.debug("====1");
        });

        log.debug("====2");
    }
}

定时任务

@Slf4j
public class TestEventLoop {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(2); //可处理 io/普通/定时任务

        group.scheduleAtFixedRate(() -> {
            log.debug("====1");
        },0,1, TimeUnit.SECONDS);

        log.debug("====2");
    }
}

io任务

服务端代码

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
          new ServerBootstrap()
                  .group(new NioEventLoopGroup())
                  .channel(NioServerSocketChannel.class)
                  .childHandler(new ChannelInitializer<NioSocketChannel>() {
                      @Override
                      protected void initChannel(NioSocketChannel ch) throws Exception {
                          ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                              @Override
                              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                  ByteBuf buf = (ByteBuf) msg;
                                  log.debug("buf : {}" , buf.toString(Charset.defaultCharset()));
                              }
                          });
                      }
                  })
                  .bind(8080);
    }
}

客户端代码

public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        // 1. 启动类
        Channel channel = new Bootstrap()
                // 2. 添加EventLoop
                .group(new NioEventLoopGroup())
                // 3. 选择客户端的channel实现
                .channel(NioSocketChannel.class)
                // 4. 添加处理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override // 在建立连接后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder()); // 字符编码,将字符串转换成ByteBuf
                    }
                })
                // 5. 连接到服务器
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel();

        System.out.println(channel);
        System.out.println("");
    }
}

先启动服务端,再通过debug启动客户端,发送数据

image-20230516154628235

发现服务端没有收到数据,这是因为netty是异步发送,而idea默认情况下打断点,是阻塞所有的线程,所以要设置一下断点,只阻塞当前线程,重启服务

image-20230516154732109

启动多个客户端,发现每个客户端的信息,都是由同一个线程来处理的,这就是我们之前提到过的,每一个EventLoop会绑定channel

image-20230516155017457

细分

在之前学习NIO的时候,有一个专门的ServerSocketChannel来处理accpet事件,我们把它称为boss,其余的SocketChannel来处理读写事件,我们把它称为worker

在netty中,也可以这么设置,在创建ServerSocketChannelgroup()方法中,可以指定一个父EventLoopGroup,这个就是用来专门处理accpet事件的

image-20230516155421119

修改后的服务端代码如下

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
          new ServerBootstrap()
                  .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
                  .channel(NioServerSocketChannel.class)
                  .childHandler(new ChannelInitializer<NioSocketChannel>() {
                      @Override
                      protected void initChannel(NioSocketChannel ch) throws Exception {
                          ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                              @Override
                              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                  ByteBuf buf = (ByteBuf) msg;
                                  log.debug("buf : {}" , buf.toString(Charset.defaultCharset()));
                              }
                          });
                      }
                  })
                  .bind(8080);
    }
}

继续细分

一个EventLoop可能负责多个客户端,这时候假如有一个io任务,处理的非常慢,就会导致该EventLoop中其它的任务排队等待,这个是非常难受的。

而在netty中,我们可以指定外部的DefaultEventLoop来处理请求

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();
        new ServerBootstrap()
                  .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
                  .channel(NioServerSocketChannel.class)
                  .childHandler(new ChannelInitializer<NioSocketChannel>() {
                      @Override
                      protected void initChannel(NioSocketChannel ch) throws Exception {
                          ch.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter(){
                              @Override
                              public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                  ByteBuf buf = (ByteBuf) msg;
                                  log.debug("buf : {}" , buf.toString(Charset.defaultCharset()));
                                  ctx.fireChannelRead(msg); // 将消息传递给下一个handler
                              }
                          }).addLast(defaultEventLoopGroup,"handler2",new ChannelInboundHandlerAdapter(){
                              @Override
                              public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                  ByteBuf buf = (ByteBuf) msg;
                                  log.debug("buf : {}" , buf.toString(Charset.defaultCharset()));
                              }
                          });
                      }
                  })
                  .bind(8080);
    }
}

通过控制台输出,可以发现消息被外部独立的defaultEventLoopGroup处理

image-20230516161541625

handler 执行中如何换人?

关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
    EventExecutor executor = next.executor();
    
    // 是,直接调用
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } 
    // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
    else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
  • 如果两个 handler 绑定的是同一个线程,那么就直接调用
  • 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用

3.2 Channel

channel 的主要作用

  • close() 可以用来关闭 channel
  • closeFuture() 用来处理 channel 的关闭
    • sync 方法作用是同步等待 channel 关闭
    • 而 addListener 方法是异步等待 channel 关闭
  • pipeline() 方法添加处理器
  • write() 方法将数据写入
  • writeAndFlush() 方法将数据写入并刷出

ChannelFuture

来看以下代码,没有channelFuture.sync()时,客户端运行之后,服务端并不能收到消息,

这是因为connect()方法是异步非阻塞方法,实际上是交给另外一个线程去和服务端建立连接的

建立连接是一个非常耗时的工作,主线程并没有等待而是直接发送消息,服务端收不到消息是正常的。

@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));

        //channelFuture.sync();
        Channel channel = channelFuture.channel();
        // [main] DEBUG com.yellowstar.netty01.eventLoop.EventLoopClient - =======[id: 0xb0d8e490]
        //此时channel并没有建立连接,建立连接的channel会输出客户端地址和服务端地址
        log.debug("======={}",channel);
        channel.writeAndFlush("1");
    }
}

除了sync()方法,addListener()也可以等待线程建立连接

在channelFuture建立连接之后,会触发operationComplete()方法

通过输出可以看到此方法是在nioEventLoopGroup中的线程中运行的,是个异步方法

@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));

        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Channel channel = channelFuture.channel();
                log.debug("======={}",channel);
                channel.writeAndFlush("1");
            }
        });
    }
}

//输出
//[nioEventLoopGroup-2-1] DEBUG com.yellowstar.netty01.eventLoop.EventLoopClient - =======[id: 0x7895948a, L:/127.0.0.1:63687 - R:localhost/127.0.0.1:8080]

CloseFuture

先做一个小功能,当用户输入时,发送消息到服务端,当用户输入q时,结束发送

@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        //LoggingHandler可以对netty操作日志进行打印
                        ch.pipeline().addLast(new LoggingHandler());
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        channelFuture.sync();
        Channel channel = channelFuture.channel();
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String msg = scanner.next();
                if ("q".equals(msg)) {
                    channel.close();
                    log.debug("通信结束处理...");
                    break;
                }
                channel.writeAndFlush(msg);
            }
        }, "input").start();
    }
}

通过测试发现基本功能都可以实现,但是有一个小问题,channel.close()也是一个异步操作,而通信结束之后的处理,必须在channel关闭之后,这就要使用到CloseFuture

@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler());
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        channelFuture.sync();
        Channel channel = channelFuture.channel();
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String msg = scanner.next();
                if ("q".equals(msg)) {
                    channel.close();
                    break;
                }
                channel.writeAndFlush(msg);
            }
        }, "input").start();

        ChannelFuture closeFuture = channel.closeFuture();
        closeFuture.sync();
        log.debug("通信结束处理...");
    }
}

当然这也有监听器的用法,可以异步处理结果,当closeFuture识别到channel已断开时就会异步触发operationComplete()方法

closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        log.debug("通信结束处理...");
    }
});

最后一个小问题,当用户输入q时,程序应该就结束了,而程序并没有结束,这是因为除了NioEventLoopGroup中还有其余线程在运行

image-20230517144303834

所以我们在channel.close()执行后,还需要关闭NioEventLoopGroup中的其他线程

可以使用group.shutdownGracefully()来优雅的关闭线程

优雅关闭指的是不再接收新的任务,把原先已接收的任务处理完之后再关闭

@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler());
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        channelFuture.sync();
        Channel channel = channelFuture.channel();
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String msg = scanner.next();
                if ("q".equals(msg)) {
                    channel.close();
                    break;
                }
                channel.writeAndFlush(msg);
            }
        }, "input").start();

        ChannelFuture closeFuture = channel.closeFuture();
        closeFuture.sync();
        log.debug("通信结束处理...");
        group.shutdownGracefully();
    }
}

Netty中的异步

从上述例子可以感觉到,netty中使用了很多多线程异步处理方案,小黄本来觉得多线程和异步是netty提升效率的关键所在,而并不是这样的。

看下面的例子

4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96

0044

经研究发现,看病可以细分为四个步骤,经拆分后每个步骤需要 5 分钟,如下

因此可以做如下优化,只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12 效率几乎是原来的四倍

0047

要点

  • 单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势
  • 异步并没有缩短响应时间,反而有所增加
  • 合理进行任务拆分,也是利用异步的关键

3.3 Future & Promise

在异步处理时,经常用到这两个接口

首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器

image-20230517164702149

功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
setSuccess--设置成功结果
setFailure--设置失败结果

JDK-Future

JDK的Future主打一个从其他线程获取结果

@Slf4j
public class TestJDKFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        //添加一个异步任务
        Future<Integer> future = threadPool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("正在计算");
                Thread.sleep(1000);
                return 10;
            }
        });

        log.debug("等待中...");
        log.debug("结果为: {}" ,future.get());
    }
}

Netty-Future

Netty中的Future通过getNow()方法在结果没有出来时,可以返回null,不会造成阻塞等待

@Slf4j
public class TestNettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        EventLoop eventLoop = group.next();

        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.debug("正在计算");
                Thread.sleep(1000);
                return 10;
            }
        });

        log.debug("等待中...");
        log.debug("结果为: {}" ,future.getNow());
    }
}

Netty-promise

promise可以作为主体,脱离线程单独存在

@Slf4j
public class TestNettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        eventLoop.execute(() -> {
            log.debug("处理中...");
            try {
                Thread.sleep(1000);
                int i = 1/0;
                promise.setSuccess(10);
            } catch (InterruptedException e) {
                promise.setFailure(e);
            }
        });

        log.debug("等待中...");
        log.debug("结果为: {}" ,promise.get());
    }
}

3.4 Handler & Pipeline

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工

打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品

Inbound(入站)

先贴一份服务端的代码,客户端的代码就不贴了,只要能发送数据即可

@Slf4j
public class TestPipelineServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("1");
                                super.channelRead(ctx, msg);
                            }
                        }).addLast("h2",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("2");
                                super.channelRead(ctx, msg);
                            }
                        }).addLast("h3",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("3");
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

主要关注pipeline.addLast()方法,添加handler,调用此方法时,会生成一个headtail的handler,把自定义的handler加在tail之前,这个链路在底层是使用双向链表实现的

head
h1
h2
h3
tail

客户端发送信息时,可以看到顺序

image-20230518104122408

Outbound(出站)

在原有的基础上再添加几个出站handler

@Slf4j
public class TestPipelineServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("1");
                                super.channelRead(ctx, msg);
                            }
                        }).addLast("h2",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("2");
                                super.channelRead(ctx, msg);
                            }
                        }).addLast("h3",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("3");
                                super.channelRead(ctx, msg);
                            }
                        }).addLast("h4",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("4");
                                super.write(ctx, msg, promise);
                            }
                        }).addLast("h5",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("5");
                                super.write(ctx, msg, promise);
                            }
                        }).addLast("h6",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("6");
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

整条链路现在是这样的

head
h1
h2
h3
h4
h5
h6
tail

通过客户端发送数据时,发现并没有输出4,5,6

image-20230518104846274

这是因为出站handler需要往channel里写出数据,才会触发

在h3时,模拟写出数据

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    log.debug("3");
    ch.writeAndFlush("hello".getBytes(StandardCharsets.UTF_8));
    super.channelRead(ctx, msg);
}

发现出站handler的顺序是6,5,4是设置的倒序,这是因为出站时,他是从tail节点往前面执行的。

image-20230518105036051

另外多提一嘴,重写方法中的ctx也可以写出数据,执行ctx.writeAndFlush时,不是从tail节点往前找,而是从当前handler往前找

3.5 ByteBuf

Netty中的ByteBuf和NIO中使用过的ByteBuffer功能类似,都是对字节数据的封装。

创建

可以通过有参构造器和无参构造器创建,默认Buffer大小为256字节

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(5);

下面方法可以显示buffer的具体信息

private static void log(ByteBuf buffer) {
    int length = buffer.readableBytes();
    int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
    StringBuilder buf = new StringBuilder(rows * 80 * 2)
            .append("read index:").append(buffer.readerIndex())
            .append(" write index:").append(buffer.writerIndex())
            .append(" capacity:").append(buffer.capacity())
            .append(NEWLINE);
    appendPrettyHexDump(buf, buffer);
    System.out.println(buf.toString());
}

直接内存 vs 堆内存

  • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
  • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
//堆内存
ByteBuf heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer();
//直接内存
ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer();

池化 vs 非池化

这就有点类似于从线程池中拿线程一样

池化的最大意义在于可以重用ByteBuf,优点有:

  • 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加GC压力
  • 有了池化,则可以重用ByteBuf实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能
  • 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
  • 4.1 之前,池化功能还不成熟,默认是非池化实现

可以看出,默认创建出来都是Pooled(池化),可以通过修改环境变量或者JVM参数来指定是否开启池化

-Dio.netty.allocator.type={unpooled|pooled}

image-20230518160851075

组成

ByteBuf 由四部分组成

最开始读写指针都在 0 位置

有了读写指针,就不需要像ByteBuffer一样切换读写状态了。

写入

public static void main(String[] args) {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
    buffer.writeBytes(new byte[]{'a','b','c','d'});
    log(buffer);
    buffer.writeInt(1);
    log(buffer);
}

输出如下,有很多方法可以写入,大家可以自行查看api

image-20230518161407560

扩容

创建一个大小为10的ByteBuf,当插入内容超过容量时,会自动触发扩容

public static void main(String[] args) {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
    buffer.writeBytes(new byte[]{'a','b','c','d'});
    log(buffer);
    buffer.writeInt(1);
    log(buffer);
    buffer.writeInt(2);
    log(buffer);
}

扩容机制:

  • 如果写入后数据大小未超过512,则选择下一个16的整数倍,例如写入后大小为12,扩容后capacity是16
  • 如果写入后数据大小超过512,则选择下一个2^n,例如写入后大小为513,则扩容后capacity为1024
  • 扩容不能超过max capacity,否则会报错

image-20230518161516078

读取

可以读取下一个字节,也可以读取下一个int字节,

public static void main(String[] args) {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
    buffer.writeBytes(new byte[]{'a','b','c','d'});
    log(buffer);
    buffer.writeInt(1);
    log(buffer);
    buffer.writeInt(2);
    log(buffer);

    System.out.println(buffer.readByte());
    System.out.println(buffer.readByte());
    System.out.println(buffer.readByte());
    System.out.println(buffer.readByte());

    System.out.println(buffer.readInt());
    log(buffer);
}

操作结果如下,这个操作会改变读指针的位置,被读取过的相当于就是废弃字节

image-20230518162204370

如何重复读取1?

跟ByteBuffer类似,可以标记在跳回来

buffer.markReaderIndex(); //标记当前位置
System.out.println(buffer.readInt());
log(buffer);
buffer.resetReaderIndex(); //回到标记的位置
log(buffer);

在读指针为4的地方做了标记,即使读到了后面,还是可以通过resetReaderIndex()方法将读指针修改为4

image-20230518162530148

retain & release

由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

在实际开发时,我们应该等到最后一个handler使用完ByteBuf之后,再由它来释放

一般来说,我们不需要手动进行释放,Netty有帮我们释放ByteBuf,例如现在是入站操作,从head走到tailtail会帮我们释放ByteBuf

TailContext也是一个入站Handler

image-20230518163320748

他的read()方法中,会通过计数器的工具类来释放ByteBuf

image-20230518163508425

出站操作也是一样,出站的最后一个Handler是head,他的write()方法中也会释放ByteBuf

image-20230518163648331

slice(零拷贝)

【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针

使用方法

public static void main(String[] args) {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
    buffer.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
    ByteBuf buf1 = buffer.slice(0, 5);
    ByteBuf buf2 = buffer.slice(5, 5);
    log(buf1);
    log(buf2);
}

可以看到buf1和buf2是拷贝的buffer中的内容

image-20230518164109921

证明零拷贝

只需要证明用的实际上是同一地址即可,修改其中一块数据,看看原数据会不会改变

public static void main(String[] args) {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
    buffer.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
    ByteBuf buf1 = buffer.slice(0, 5);
    ByteBuf buf2 = buffer.slice(5, 5);
    buf1.setByte(0,'x');
    log(buf1);
    log(buffer);
}

可以看到原数据也发生了改变

image-20230518164253437

注意点

零拷贝出来的ByteBuf是不支持写入的,会报错

image-20230518164412519

CompositeByteBuf

CompositeByteBuf也属于零拷贝,不过和slice恰恰相反,CompositeByteBuf可以将多个ByteBuf合在一起

public static void main(String[] args) {
    ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(10);
    buf1.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e'});
    ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(10);
    buf2.writeBytes(new byte[]{'f', 'g', 'h', 'i', 'j'});
    CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer(20);
    // true 表示增加新的 ByteBuf 自动递增 write index, 否则 write index 会始终为 0
    buffer.addComponents(true,buf1,buf2);
    log(buffer);
}

CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。

  • 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
  • 缺点,复杂了很多,多次操作会带来性能的损耗

ByteBuf vs ByteBuffer

  • 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf

4. 双向通信

学了这么多,来做一个小小的功能吧,该需求实现客户端发送什么,服务器就回传给客户端什么,例如客户端发送1,同时也会收到服务器回传的1

服务器代码

@Slf4j
public class TwoWayServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                log.debug("服务器收到:{}",byteBuf.toString(Charset.defaultCharset()));
                                ctx.writeAndFlush(byteBuf);
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

客户端代码

@Slf4j
public class TwoWayClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture future = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder()); // 字符编码
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                log.debug("接收到服务器响应:{}",byteBuf.toString(Charset.defaultCharset()));
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        future.sync();
        Channel channel = future.channel();

        channel.closeFuture().addListener(a -> {
            group.shutdownGracefully();
        });

        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String line = scanner.nextLine();
                if ("q".equals(line)) {
                    channel.close();
                    break;
                }
                channel.writeAndFlush(line);
            }
        }).start();
    }
}

测试结果

image-20230518171409100

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/21244.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

三十六、链路追踪、配置中心

1、链路追踪 在一次调用链路中&#xff0c;可能设计到多个微服务&#xff0c;如果在线上&#xff0c;某个微服务出现故障&#xff0c;如何快速定位故障所在额微服务呢。 可以使用链路追踪技术 1.1链路追踪介绍 在大型系统的微服务化构建中&#xff0c;一个系统被拆分成了许多微…

chatgpt赋能Python-python3_排序

Python3 排序指南&#xff1a;介绍、说明和实践 Python3是当今最受欢迎的编程语言之一&#xff0c;拥有许多可用于各种任务的库和框架。其中之一是它自带的排序函数&#xff0c;在数据分析和机器学习等领域中非常有用。 在本篇文章中&#xff0c;我们将简要介绍Python3的排序和…

基于AT89C51单片机的贪吃蛇游戏设计

点击链接获取Keil源码与Project Backups仿真图: https://download.csdn.net/download/qq_64505944/87778030 源码获取 主要内容: 设计一个贪吃蛇游戏,使其具有以下游戏规则:①当没有改变方向时,贪吃蛇沿原来路径一直前进②贪吃蛇无法回头,只能异于当前方向改变行动③蛇…

第7章链接:如何动态连接共享库、从应用程序中加载和链接共享库

文章目录 7.10 动态链接共享库静态库的缺点何为共享库共享库的"共享"的含义动态链接过程 7.11 从应用程序中加载和链接共享库运行时动态加载和连接共享库的接口 dlopen函数 dlsym函数 dlclose函数 dlerror动态加载和链接共享库的应用程序示例 7.12 *与位置无关的代码…

强大,Midjourney Imagine API接口,AI画画的福音!

前几天跟大家分享过一篇 ”让chatGPT教你AI绘画|如何将chatGPT与Midjourney结合使用&#xff1f;“&#xff0c;但是由于许多小伙伴们使用Midjourney还有许多困难&#xff0c;又要上网&#xff0c;还要注册Discord&#xff0c;MJ的使用成本很高&#xff0c;让大家望而却步&…

链表题目强化练

目录 前言&#xff08;非题目&#xff09; 两数相加 删除链表的倒数第N个结点 环形链表 相交链表 合并 K 个升序链表 复制带随机指针的链表 前言&#xff08;非题目&#xff09; 初学者在做链表的题目时有一个特点&#xff0c;就是每看一个链表的题都觉得很简单&#x…

Python程序员职业现状分析,想提高竞争力,就要做到这六点

现今程序员群体数量已经高达几百万&#xff0c;学历和收入双高&#xff0c;月薪普遍过万。今天&#xff0c;我们就围绕90后程序员人群分析、职业现状、Python程序员分析等&#xff0c;进行较为全面的报告分析和观点论述。 一、程序员人群分析 人数规模上&#xff1a;截当前程…

【设计原则与思想:总结课】38 | 总结回顾面向对象、设计原则、编程规范、重构技巧等知识点

到今天为止&#xff0c;设计原则和思想已经全部讲完了&#xff0c;其中包括&#xff1a;面向对象、设计原则、规范与重构三个模块的内容。除此之外&#xff0c;我们还学习了贯穿整个专栏的代码质量评判标准。专栏的进度已经接近一半&#xff0c;马上就要进入设计模式内容的学习…

类似于ChatGPT的优秀应用notion

notion 是一款流行的笔记应用。不过功能实际远超笔记&#xff0c;官方自己定义是&#xff1a;“将笔记、知识库和任务管理无缝整合的协作平台”。其独特的 block 概念&#xff0c;极大的扩展了笔记文档的作用&#xff0c;一个 block 可以是个数据库、多媒体、超链接、公式等等。…

怎么用问卷工具做市场调研?

对于希望开发新产品或服务、拓展新市场或确定潜在客户的公司来说&#xff0c;市场调查是一个至关重要的过程。然而&#xff0c;进行市场调查可能既耗时又昂贵&#xff0c;特别是在涉及对大量人群进行调查的情况下。今天&#xff0c;小编将来聊一聊调查问卷工具如何帮助企业进行…

Rasa 3.x 学习系列-Rasa [3.5.8] -2023-05-12新版本发布

Rasa 3.x 学习系列-Rasa [3.5.8] -2023-05-12新版本发布 当自定义动作设置的值与槽的现有值相同时&#xff0c;将触发SlotSet事件。修复了这个问题&#xff0c;使AugmentedMemoizationPolicy能够正确地处理截断的跟踪器。 为了恢复以前的行为&#xff0c;自定义操作只有在槽值…

【C++进阶】继承详解

文章目录 前言一、继承的概念及定义1.概念2.继承定义定义格式继承关系和访问限定继承基类成员访问方式的变化 二、基类和派生类对象赋值转换三、继承中的作用域四、派生类的默认成员函数五、继承与友元六、继承与静态成员七、复杂的菱形继承及菱形虚拟继承1.单继承与多继承2.菱…

软件 工程

目录 第十章、软件工程1、瀑布模型&#xff08;SDLC&#xff09;2、快速原型模型3、增量模型4、螺旋模型5、Ⅴ模型6、喷泉模型7、构建组装模型&#xff08;CBSD&#xff09;8、统一过程&#xff08;RUP&#xff09;9、敏捷开发方法10、信息系统开发方法11、需求开发12、结构化设…

收藏|必读10本pcb设计书籍推荐

1."High-Speed Digital Design: A Handbook of Black Magic"。 作者是Howard Johnson和Martin Graham。这是一本关于高速数字电路设计的优秀教材&#xff0c;适合那些需要设计高速电路的工程师。 作为比较早出来的信号完整性参考书&#xff0c;对国内的信号完整性研…

H.265/HEVC编码原理及其处理流程的分析

H.265/HEVC编码原理及其处理流程的分析 H.265/HEVC编码的框架图&#xff0c;查了很多资料都没搞明白&#xff0c;各个模块的处理的分析网上有很多&#xff0c;很少有把这个流程串起来的。本文的主要目的是讲清楚H.265/HEVC视频编码的处理流程&#xff0c;不涉及复杂的计算过程。…

第3天学习Docker-Docker部署常见应用(MySQL、Tomcat、Nginx、Redis、Centos)

前提须知&#xff1a; &#xff08;1&#xff09;搜索镜像命令 格式&#xff1a;docker search 镜像名 &#xff08;2&#xff09;设置Docker镜像加速器 详见文章&#xff1a;Docker设置ustc的镜像源&#xff08;镜像加速器&#xff09; 1、部署MySQL 拉取镜像&#xff08;这…

从0到1无比流畅的React入门教程

无比流畅的React入门教程TOC React 是什么 简介 用于构建 Web 和原生交互界面的库React 用组件创建用户界面通俗来讲&#xff1a;是一个将数据渲染为HTML视图的开源JS库 其他信息 Facebook 开发&#xff0c;并且开源 为什么使用React? 原生JS使用DOM-API修改UI代码很繁…

4年外包出来人废了,5次面试全挂....

我的情况 大概介绍一下个人情况&#xff0c;男&#xff0c;毕业于普通二本院校非计算机专业&#xff0c;18年跨专业入行测试&#xff0c;第一份工作在湖南某软件公司&#xff0c;做了接近4年的外包测试工程师&#xff0c;今年年初&#xff0c;感觉自己不能够再这样下去了&…

软件设计模式介绍与入门

目录 1、软件设计模式的起源 2、什么是设计模式&#xff1f; 2.1、设计模式的设计意图 2.2、设计模式的分类准则 3、为什么要学习设计模式 4、如何学习设计模式 5、最后 VC常用功能开发汇总&#xff08;专栏文章列表&#xff0c;欢迎订阅&#xff0c;持续更新...&#x…

Redis--弱口令未授权访问漏洞

Redis--弱口令未授权访问漏洞 一、漏洞简介二、危险等级三、漏洞影响四、入侵事件五、漏洞复现--Redis CrackIT入侵事件5.1、以root启动的redis&#xff0c;可以远程登入到redis console--------A主机5.2、生成公钥5.3、执行: redis-cli flushall 清空redis(非常暴力&#xff0…