事件传播机制
当pipeline中有多个handler时,netty内部的事件是如何向后传递到每个handler中的?
结论:需要在当前handler中手动将当前事件传递下去
1,如果handler是直接实现的接口,使用ChannelHandlerContext的fireXXX方法
2,如果handler是继承Adapter,直接调用父类即可,比如:
super.channelRead(ctx, msg);
super.channelRead(ctx, msg);的本质也是调用了Context中的fireActive方法
@Skip
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
ChannelOutboundHandler 可以用来做什么?
1,响应数据包装,压缩,加密,…等通用功能
ChannelHandler 的执行顺序
BootStrap
1,Bootstrap是引导的意思,它的作用是配置网络参数并引导Netty程序启动,将各个组件都串起来
2,Netty中提供了2种类型的引导类,一种用于客户端(Bootstrap),而另一种(ServerBootstrap)用于服务器,它们之间的区别在于:
ServerBootstrap 需要指定绑定的端口,服务器开启监听接收连接,而 Bootstrap 则是想要连接到远程节点故需要对端的地址信息
引导客户端只需要一个EventLoopGroup,服务端一般需要两个
Channel
Netty中的Channel是与网络套接字相关的,底层对应了一个socket连接,它负责基本的IO操作,比如:
read(),write() 等,它的一些核心作用有:
获得当前网络连接的通道状态
获得网络连接的配置参数(如缓冲区大小等)
网络I/O操作
EventLoop和EventLoopGroup
1,Netty是基于事件驱动的,比如:连接就绪;数据读取;异常事件等,有了事件,就需要一个组件去监控事件的产生和事件的协调处理,这个组件就是EventLoop(事件循环)
2,Netty 中每个Channel 都会被分配到一个 EventLoop,一个 EventLoop 可以服务于多个 Channel的事件处理
3,每个 EventLoop 会占用一个 Thread,在这个Thread上处理相应的事件
4,EventLoopGroup 包含了一组EventLoop,可以理解成Netty的线程池
EventLoopGroup线程数有多大
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup();
// IO线程,处理注册其上Channel的I/O事件及其他Task
EventLoopGroup worker = new NioEventLoopGroup();
默认为当前CPU核心数*2
对于boss group,我们其实也只用到了其中的一个线程,因为服务端一般只会绑定一个端口启动
小结
Bootstrap的作用及类型
1,配置网络参数并引导netty程序启动,
2,ServerBootstrap作用于服务端,Bootstrap作用于客户端
Channel的作用
1,底层绑定的是一个网络连接socket,故可以获取网络通道状态
2,获取相关网络配置参数
3,处理相关操作,如IO
EventLoop和EventLoopGroup 的作用
1,EventLoop 检测并协调处理所注册Channel的相关事件,
2,EventLoop 绑定在一个线程上
3,EventLoopGroup 包含了一组EventLoop,可以理解成Netty的线程池
ChannelPipeline
1,ChannelHandler 是Netty的事件处理器,在其对应的事件回调方法中处理事件,是开发者介入的入口
2,ChannelHandlerContext 持有ChannelHandler并构成双向链表,handler基于它连接pipeline和channel
3,ChannelPipeline 提供了一个了容器,持有ChannelHandlerContext 构成的链表,并连接channel
在刚开始的时候,ChannelPipeline只有head和tail,当接收到了客户端的请求之后,准备初始化ChannelPipeline的时候会回调Handler方法,把Handler方法全都加入在管道当中。
channel中存放着这个套接字的各种信息。
ChannelHandler如何复用?
添加@Sharable注解
SimpleChannelInboundHandler
对于编写Netty数据入站处理器,可以选择继承ChannelInboundHandlerAdapter,也可以选择继承SimpleChannelInboundHandler,区别是什么?
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (this.acceptInboundMessage(msg)) {
this.channelRead0(ctx, msg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (this.autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
继承SimpleChannelInboundHandler需要重写channelRead0方法,且可以通过泛型指定msg类型
SimpleChannelInboundHandler在接收到数据后会自动release掉数据占用的Bytebuf资源
ByteBuf的内存模型
1,堆缓冲区(HeapByteBuf):内存分配在jvm堆,分配和回收速度比较快,可以被JVM自动回收,缺点是,如果进行socket的IO读写,需要额外做一次内存复制,将堆内存对应的缓冲区复制到内核Channel中,性能会有一定程度的下降。由于在堆上被 JVM 管理,在不被使用时可以快速释放。
2,直接缓冲区(DirectByteBuf):内存分配的是堆外内存(系统内存),相比堆内存,它的分配和回收速度会慢一些,但是将它的数据写入或从Socket Channel中读取数据到它时,由于减少了一次内存拷贝,速度比堆内存块(netty内部默认使用)
3,复合缓冲区(CompositeByteBuf):顾名思义就是将两个不同的缓冲区从逻辑上合并,让使用更加方便。
ByteBuf的分配
Netty提供了ByteBufAllocator接口,有两个重要实现:PooledByteBufAllocator 和 UnpooledByteBufAllocator
1,PooledByteBufAllocator:实现了 ByteBuf 内存的池化,提高性能减少并最大限度地减少内存碎片,池化思想通过预先申请一块专用内存地址作为内存池进行管理,从而不需要每次都进行分配和释放(netty内部默认使用该方式,Android除外)
2,UnpooledByteBufAllocator:没有实现内存的池化,每次都会为ByteBuf对象重新分配内存
ByteBuf资源释放
1,unpool模式下,ByteBuf如果采用的是堆缓冲区可以由GC回收,但是如果采用的是直接缓冲区,就不受GC的管理,就得手动释放否则会发生内存泄露
2,pool模式下,某个ByteBuf对象关联的内存何时可以被归还给pool
引用计数法:ByteBuf对象的引用计数为0时,它关联的内存要么直接释放要么归还到池,Netty提供了ReferenceCounted接口
ReferenceCountUtil.release(buf)
什么时候释放?
1,数据入站能自动释放的地方:TailContext(前提是事件能传递到这),继承SimpleChannelInboundHandler
2,数据出站能自动释放的地方:HeadContext 出站消息一般是由应用所申请,到达最后一站时,经过一轮复杂的调用,在flush完成后终将被release掉
1,对于入站消息:
随着事件向后传递,依然将原Bytebuf对象向后传,如果能到TailContext则会自动释放该ByteBuf
事件传递的过程中,向后传递的是新的ByteBuf对象,则有必要手动释放原ByteBuf
在可以的情况下选择继承SimpleChannelInboundHandler是个不错的选择
2,对于出站消息:
应用的ByteBuf对象能走到HeadContext,则在flush后会释放
事件传递的过程中,向后传递的是新的ByteBuf对象,则有必要手动释放原ByteBuf
通过Wrap操作可以快速转换或得到一个ByteBuf对象,Unpooled 工具类中提供了很多重载的wrappedBuffer方法
Netty Future
Netty 自己实现的 Future 继承了 JDK 的 Future,新增了 Listener 机制,任务结束会回调Listener
public void testNettyFuture2() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup(1);
Future<String> future = group.submit( () ->{
log.info("---异步线程执行任务开始----");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("---异步线程执行任务结束----");
return "hello netty future";
});
//基于future 添加监听
/*future.addListener(new GenericFutureListener<Future<? super String>>() {
@Override
public void operationComplete(Future<? super String> future) throws Exception {
future.get();
}
});*/
future.addListener( future1 -> {
log.info("---收到异步线程执行任务结果通知----执行结果是;{}",future1.get());
});
log.info("---主线程向后执行----");
TimeUnit.SECONDS.sleep(10);
}
Netty Promise
1,Netty的Future基于JDK Future增加了Listener机制,但是Listener的触发则是需要等到异步任务执行结束,而任务大都是一个Runnable或者Callable,也就是需要等到其run()方法或者call()方法返回后才会触发Listener的执行,整个过程无法人为干预。
2,Netty的 Promise接口扩展了Netty的Future接口,它可以设置异步执行的结果并触发Listener,比如在IO操作过程,不管是顺利完成、还是发生异常,都可以设置Promise的结果,并且通知Promise的
public void testNettyPromise() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
Promise promise = new DefaultPromise(group.next());// promise 绑定到 eventLoop上
group.submit(()->{
log.info("---异步线程执行任务开始----");
try {
// int i = 1/0;
TimeUnit.SECONDS.sleep(3);
log.info("first");
promise.setSuccess("hello netty promise");
TimeUnit.SECONDS.sleep(3);
log.info("two");
log.info("---异步线程执行任务结束----");
log.info("finish");
} catch (Throwable e) {
log.info("exception ={}",e.getMessage());
promise.setFailure(e);
}
});
promise.addListener( future -> {
if (future.isSuccess()) {
log.info("----异步任务执行结果:{}",future.get());
}else {
log.info("异步任务执行失败,exception={}",future.cause().getMessage());
}
});
log.info("---主线程----");
TimeUnit.SECONDS.sleep(10);
}