Netty入门
1. 概述
1.1 Netty是什么?
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
以上片段摘自官网,Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
1.2 Netty的优势
- Netty vs NIO,工作量大,bug 多
- 需要自己构建协议
- 解决 TCP 传输问题,如粘包、半包
- epoll 空轮询导致 CPU 100%
- 对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer
- Netty vs 其它网络应用框架
- Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀
- 久经考验,16年,Netty 版本
- 2.x 2004
- 3.x 2008
- 4.x 2013
- 5.x 已废弃(没有明显的性能提升,维护成本高)
2. Hello World
2.1 目标
开发一个简单的服务器端和客户端
- 客户端向服务器端发送 hello, world
- 服务器仅接收,不返回
添加依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
2.2 服务端
public class NettyServer {
public static void main(String[] args) {
// 1. 启动器,负责组装netty组件,启动服务器
new ServerBootstrap()
// 2. BossEventLoop, WorkerEventLoop(selector,thread) ,group组
.group(new NioEventLoopGroup())
// 3. 选择服务器的ServerSocketChannel实现
.channel(NioServerSocketChannel.class)
// 4. boss负责处理连接, worker负责处理读写, 决定了worker能做哪些操作
.childHandler(
// 5. ChannelInitializer 代表和客户端进行数据读写的通道初始化,本身也是一个handler,作用是负责添加别的handler
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
// 6. 添加具体的handler
ch.pipeline().addLast(new StringDecoder()); // 字符解码,将ByteBuf转换成字符串
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ // 自定义handler
@Override // 读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(msg);
}
});
}
}
)
// 7. 绑定端口
.bind(8080);
}
}
2.3 客户端
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 1. 启动类
new Bootstrap()
// 2. 添加EventLoop
.group(new NioEventLoopGroup())
// 3. 选择客户端的channel实现
.channel(NioSocketChannel.class)
// 4. 添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在建立连接后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder()); // 字符编码,将字符串转换成ByteBuf
}
})
// 5. 连接到服务器
.connect(new InetSocketAddress("localhost",8080))
.sync()
.channel()
// 6. 想服务器发送数据
.writeAndFlush("hello, world");
}
}
2.4 执行流程
提示
- channel可以理解为数据通道
- msg理解为流动的数据,最开始是ByteBuf,但经过pipeline(流水线)的加工,会变成其他类型的对象,最后输出又变成了ByteBuf
- handler理解为数据的处理工序
- 工序有多道,合在一起就是pipeline,pipeline负责发布时间(读、读取完成…)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应时间的处理方法)
- handler分Inbound(入站)和Outbound(出站)
- eventLoop理解为处理数据的工人
- 工人可以管理多个channel的io操作,并且一旦工人负责了某个channel,就要负责到底
- 工人既可以执行io操作,也可以进行任务处理,每位工人有任务队列,队列里可以对方多个channel的待处理任务,任务分为普通任务、定时任务
- 工人按照pipeline顺序,依此按照handler的规划处理数据,可以为每道工序指定不同的工人
3. 组件
3.1 EventLoop
事件循环对象
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
- 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
- 另一条线是继承自 netty 自己的 OrderedEventExecutor,
- 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
- 提供了 parent 方法来看看自己属于哪个 EventLoopGroup
事件循环组
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
构造方法
常用的EventLoopGroup有NioEventLoopGroup
和DefaultEventLoopGroup
NioEventLoopGroup
:可以处理io/普通/定时任务
DefaultEventLoopGroup
:可以处理普通/定时任务,相较于nio缺少了处理io任务的功能
public class TestEventLoop {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup(2);
DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();
}
}
构造方法中可以传入一个参数,指定EventLoopGroup中的线程数
如果是空参构造器,可以看到netty回读取系统配置文件中的io.netty.eventLoopThreads
,如果没有的话则是cpu核数*2。并且,至少会有一个线程
轮询
可以看到,在EventLoopGroup中,他每次使用的线程是通过类似于负载均衡的方式指定的
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup(2);
System.out.println(group.next()); //io.netty.channel.nio.NioEventLoop@2a18f23c
System.out.println(group.next()); //io.netty.channel.nio.NioEventLoop@d7b1517
System.out.println(group.next()); //io.netty.channel.nio.NioEventLoop@2a18f23c
System.out.println(group.next()); //io.netty.channel.nio.NioEventLoop@d7b1517
}
普通任务
submit()
或者execute()
都可以创建一个普通任务
@Slf4j
public class TestEventLoop {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup(2); //可处理 io/普通/定时任务
group.submit(() -> {
log.debug("====1");
});
log.debug("====2");
}
}
定时任务
@Slf4j
public class TestEventLoop {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup(2); //可处理 io/普通/定时任务
group.scheduleAtFixedRate(() -> {
log.debug("====1");
},0,1, TimeUnit.SECONDS);
log.debug("====2");
}
}
io任务
服务端代码
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug("buf : {}" , buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
客户端代码
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
// 1. 启动类
Channel channel = new Bootstrap()
// 2. 添加EventLoop
.group(new NioEventLoopGroup())
// 3. 选择客户端的channel实现
.channel(NioSocketChannel.class)
// 4. 添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在建立连接后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder()); // 字符编码,将字符串转换成ByteBuf
}
})
// 5. 连接到服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
System.out.println(channel);
System.out.println("");
}
}
先启动服务端,再通过debug启动客户端,发送数据
发现服务端没有收到数据,这是因为netty是异步发送,而idea默认情况下打断点,是阻塞所有的线程,所以要设置一下断点,只阻塞当前线程,重启服务
启动多个客户端,发现每个客户端的信息,都是由同一个线程来处理的,这就是我们之前提到过的,每一个EventLoop会绑定channel
细分
在之前学习NIO的时候,有一个专门的ServerSocketChannel
来处理accpet
事件,我们把它称为boss
,其余的SocketChannel
来处理读写事件,我们把它称为worker
。
在netty中,也可以这么设置,在创建ServerSocketChannel
的group()
方法中,可以指定一个父EventLoopGroup,这个就是用来专门处理accpet
事件的
修改后的服务端代码如下
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug("buf : {}" , buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
继续细分
一个EventLoop可能负责多个客户端,这时候假如有一个io任务,处理的非常慢,就会导致该EventLoop中其它的任务排队等待,这个是非常难受的。
而在netty中,我们可以指定外部的DefaultEventLoop来处理请求
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();
new ServerBootstrap()
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
log.debug("buf : {}" , buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg); // 将消息传递给下一个handler
}
}).addLast(defaultEventLoopGroup,"handler2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
log.debug("buf : {}" , buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
通过控制台输出,可以发现消息被外部独立的defaultEventLoopGroup处理
handler 执行中如何换人?
关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
EventExecutor executor = next.executor();
// 是,直接调用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
// 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
- 如果两个 handler 绑定的是同一个线程,那么就直接调用
- 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用
3.2 Channel
channel 的主要作用
- close() 可以用来关闭 channel
- closeFuture() 用来处理 channel 的关闭
- sync 方法作用是同步等待 channel 关闭
- 而 addListener 方法是异步等待 channel 关闭
- pipeline() 方法添加处理器
- write() 方法将数据写入
- writeAndFlush() 方法将数据写入并刷出
ChannelFuture
来看以下代码,没有channelFuture.sync()
时,客户端运行之后,服务端并不能收到消息,
这是因为connect()
方法是异步非阻塞方法,实际上是交给另外一个线程去和服务端建立连接的
建立连接是一个非常耗时的工作,主线程并没有等待而是直接发送消息,服务端收不到消息是正常的。
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
//channelFuture.sync();
Channel channel = channelFuture.channel();
// [main] DEBUG com.yellowstar.netty01.eventLoop.EventLoopClient - =======[id: 0xb0d8e490]
//此时channel并没有建立连接,建立连接的channel会输出客户端地址和服务端地址
log.debug("======={}",channel);
channel.writeAndFlush("1");
}
}
除了sync()
方法,addListener()
也可以等待线程建立连接
在channelFuture建立连接之后,会触发operationComplete()
方法
通过输出可以看到此方法是在nioEventLoopGroup中的线程中运行的,是个异步方法
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = channelFuture.channel();
log.debug("======={}",channel);
channel.writeAndFlush("1");
}
});
}
}
//输出
//[nioEventLoopGroup-2-1] DEBUG com.yellowstar.netty01.eventLoop.EventLoopClient - =======[id: 0x7895948a, L:/127.0.0.1:63687 - R:localhost/127.0.0.1:8080]
CloseFuture
先做一个小功能,当用户输入时,发送消息到服务端,当用户输入q
时,结束发送
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
//LoggingHandler可以对netty操作日志进行打印
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
channelFuture.sync();
Channel channel = channelFuture.channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.next();
if ("q".equals(msg)) {
channel.close();
log.debug("通信结束处理...");
break;
}
channel.writeAndFlush(msg);
}
}, "input").start();
}
}
通过测试发现基本功能都可以实现,但是有一个小问题,channel.close()
也是一个异步操作,而通信结束之后的处理,必须在channel关闭之后,这就要使用到CloseFuture
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
channelFuture.sync();
Channel channel = channelFuture.channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.next();
if ("q".equals(msg)) {
channel.close();
break;
}
channel.writeAndFlush(msg);
}
}, "input").start();
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
log.debug("通信结束处理...");
}
}
当然这也有监听器的用法,可以异步处理结果,当closeFuture识别到channel已断开时就会异步触发operationComplete()
方法
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
log.debug("通信结束处理...");
}
});
最后一个小问题,当用户输入q
时,程序应该就结束了,而程序并没有结束,这是因为除了NioEventLoopGroup
中还有其余线程在运行
所以我们在channel.close()
执行后,还需要关闭NioEventLoopGroup
中的其他线程
可以使用group.shutdownGracefully()
来优雅的关闭线程
优雅关闭指的是不再接收新的任务,把原先已接收的任务处理完之后再关闭
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
channelFuture.sync();
Channel channel = channelFuture.channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.next();
if ("q".equals(msg)) {
channel.close();
break;
}
channel.writeAndFlush(msg);
}
}, "input").start();
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
log.debug("通信结束处理...");
group.shutdownGracefully();
}
}
Netty中的异步
从上述例子可以感觉到,netty中使用了很多多线程异步处理方案,小黄本来觉得多线程和异步是netty提升效率的关键所在,而并不是这样的。
看下面的例子
4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96
经研究发现,看病可以细分为四个步骤,经拆分后每个步骤需要 5 分钟,如下
因此可以做如下优化,只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12
效率几乎是原来的四倍
要点
- 单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势
- 异步并没有缩短响应时间,反而有所增加
- 合理进行任务拆分,也是利用异步的关键
3.3 Future & Promise
在异步处理时,经常用到这两个接口
首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
- jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
- netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
- netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener | - | 添加回调,异步接收结果 | - |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
JDK-Future
JDK的Future主打一个从其他线程获取结果
@Slf4j
public class TestJDKFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);
//添加一个异步任务
Future<Integer> future = threadPool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("正在计算");
Thread.sleep(1000);
return 10;
}
});
log.debug("等待中...");
log.debug("结果为: {}" ,future.get());
}
}
Netty-Future
Netty中的Future通过getNow()
方法在结果没有出来时,可以返回null,不会造成阻塞等待
@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("正在计算");
Thread.sleep(1000);
return 10;
}
});
log.debug("等待中...");
log.debug("结果为: {}" ,future.getNow());
}
}
Netty-promise
promise可以作为主体,脱离线程单独存在
@Slf4j
public class TestNettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
eventLoop.execute(() -> {
log.debug("处理中...");
try {
Thread.sleep(1000);
int i = 1/0;
promise.setSuccess(10);
} catch (InterruptedException e) {
promise.setFailure(e);
}
});
log.debug("等待中...");
log.debug("结果为: {}" ,promise.get());
}
}
3.4 Handler & Pipeline
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品
Inbound(入站)
先贴一份服务端的代码,客户端的代码就不贴了,只要能发送数据即可
@Slf4j
public class TestPipelineServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
}).addLast("h2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
}).addLast("h3",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3");
super.channelRead(ctx, msg);
}
});
}
})
.bind(8080);
}
}
主要关注pipeline.addLast()
方法,添加handler,调用此方法时,会生成一个head
和tail
的handler,把自定义的handler加在tail之前,这个链路在底层是使用双向链表实现的
客户端发送信息时,可以看到顺序
Outbound(出站)
在原有的基础上再添加几个出站handler
@Slf4j
public class TestPipelineServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
}).addLast("h2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
}).addLast("h3",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3");
super.channelRead(ctx, msg);
}
}).addLast("h4",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
}).addLast("h5",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
}).addLast("h6",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
}
整条链路现在是这样的
通过客户端发送数据时,发现并没有输出4,5,6
这是因为出站handler需要往channel里写出数据,才会触发
在h3时,模拟写出数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3");
ch.writeAndFlush("hello".getBytes(StandardCharsets.UTF_8));
super.channelRead(ctx, msg);
}
发现出站handler的顺序是6,5,4
是设置的倒序,这是因为出站时,他是从tail
节点往前面执行的。
另外多提一嘴,重写方法中的ctx
也可以写出数据,执行ctx.writeAndFlush
时,不是从tail
节点往前找,而是从当前handler往前找
3.5 ByteBuf
Netty中的ByteBuf和NIO中使用过的ByteBuffer功能类似,都是对字节数据的封装。
创建
可以通过有参构造器和无参构造器创建,默认Buffer大小为256字节
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(5);
下面方法可以显示buffer的具体信息
private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
直接内存 vs 堆内存
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
//堆内存
ByteBuf heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer();
//直接内存
ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer();
池化 vs 非池化
这就有点类似于从线程池中拿线程一样
池化的最大意义在于可以重用ByteBuf,优点有:
- 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加GC压力
- 有了池化,则可以重用ByteBuf实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的可能
- 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
- 4.1 之前,池化功能还不成熟,默认是非池化实现
可以看出,默认创建出来都是Pooled(池化),可以通过修改环境变量或者JVM参数来指定是否开启池化
-Dio.netty.allocator.type={unpooled|pooled}
组成
ByteBuf 由四部分组成
最开始读写指针都在 0 位置
有了读写指针,就不需要像ByteBuffer一样切换读写状态了。
写入
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{'a','b','c','d'});
log(buffer);
buffer.writeInt(1);
log(buffer);
}
输出如下,有很多方法可以写入,大家可以自行查看api
扩容
创建一个大小为10的ByteBuf,当插入内容超过容量时,会自动触发扩容
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{'a','b','c','d'});
log(buffer);
buffer.writeInt(1);
log(buffer);
buffer.writeInt(2);
log(buffer);
}
扩容机制:
- 如果写入后数据大小未超过512,则选择下一个16的整数倍,例如写入后大小为12,扩容后capacity是16
- 如果写入后数据大小超过512,则选择下一个2^n,例如写入后大小为513,则扩容后capacity为1024
- 扩容不能超过max capacity,否则会报错
读取
可以读取下一个字节,也可以读取下一个int字节,
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{'a','b','c','d'});
log(buffer);
buffer.writeInt(1);
log(buffer);
buffer.writeInt(2);
log(buffer);
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readInt());
log(buffer);
}
操作结果如下,这个操作会改变读指针的位置,被读取过的相当于就是废弃字节
如何重复读取1?
跟ByteBuffer类似,可以标记在跳回来
buffer.markReaderIndex(); //标记当前位置
System.out.println(buffer.readInt());
log(buffer);
buffer.resetReaderIndex(); //回到标记的位置
log(buffer);
在读指针为4的地方做了标记,即使读到了后面,还是可以通过resetReaderIndex()
方法将读指针修改为4
retain & release
由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
- UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
- UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
- PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
- 每个 ByteBuf 对象的初始计数为 1
- 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
- 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
- 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
在实际开发时,我们应该等到最后一个handler使用完ByteBuf之后,再由它来释放
一般来说,我们不需要手动进行释放,Netty有帮我们释放ByteBuf,例如现在是入站操作,从head
走到tail
,tail
会帮我们释放ByteBuf
TailContext也是一个入站Handler
他的read()
方法中,会通过计数器的工具类来释放ByteBuf
出站操作也是一样,出站的最后一个Handler是head
,他的write()
方法中也会释放ByteBuf
slice(零拷贝)
【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针
使用方法
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
ByteBuf buf1 = buffer.slice(0, 5);
ByteBuf buf2 = buffer.slice(5, 5);
log(buf1);
log(buf2);
}
可以看到buf1和buf2是拷贝的buffer中的内容
证明零拷贝
只需要证明用的实际上是同一地址即可,修改其中一块数据,看看原数据会不会改变
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
ByteBuf buf1 = buffer.slice(0, 5);
ByteBuf buf2 = buffer.slice(5, 5);
buf1.setByte(0,'x');
log(buf1);
log(buffer);
}
可以看到原数据也发生了改变
注意点
零拷贝出来的ByteBuf是不支持写入的,会报错
CompositeByteBuf
CompositeByteBuf也属于零拷贝,不过和slice恰恰相反,CompositeByteBuf可以将多个ByteBuf合在一起
public static void main(String[] args) {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(10);
buf1.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e'});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(10);
buf2.writeBytes(new byte[]{'f', 'g', 'h', 'i', 'j'});
CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer(20);
// true 表示增加新的 ByteBuf 自动递增 write index, 否则 write index 会始终为 0
buffer.addComponents(true,buf1,buf2);
log(buffer);
}
CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。
- 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
- 缺点,复杂了很多,多次操作会带来性能的损耗
ByteBuf vs ByteBuffer
- 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
- 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
- 可以自动扩容
- 支持链式调用,使用更流畅
- 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf
4. 双向通信
学了这么多,来做一个小小的功能吧,该需求实现客户端发送什么,服务器就回传给客户端什么,例如客户端发送1,同时也会收到服务器回传的1
服务器代码
@Slf4j
public class TwoWayServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.debug("服务器收到:{}",byteBuf.toString(Charset.defaultCharset()));
ctx.writeAndFlush(byteBuf);
}
});
}
})
.bind(8080);
}
}
客户端代码
@Slf4j
public class TwoWayClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture future = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder()); // 字符编码
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.debug("接收到服务器响应:{}",byteBuf.toString(Charset.defaultCharset()));
super.channelRead(ctx, msg);
}
});
}
})
.connect(new InetSocketAddress("localhost", 8080));
future.sync();
Channel channel = future.channel();
channel.closeFuture().addListener(a -> {
group.shutdownGracefully();
});
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}).start();
}
}
测试结果