Netty
使用和常用组件
简述
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId
<version>4.1.42.Final </version>
<scope>compile</scope>
</dependency>
Netty
的优势
1
、
API
使用简单,开发门槛低;
2
、功能强大,预置了多种编解码功能,支持多种主流协议;
3
、定制能力强,可以通过
ChannelHandler
对通信框架进行灵活地扩展;
4
、性能高,通过与其他业界主流的
NIO
框架对比,
Netty
的综合性能最优;
5
、成熟、稳定,
Netty
修复了已经发现的所有
JDK NIO BUG
,业务开发人员不需要再为 NIO 的
BUG
而烦恼;
6
、社区活跃,版本迭代周期短,发现的
BUG
可以被及时修复,同时,更多的新功能会 加入;
7
、经历了大规模的商业应用考验,质量得到验证。
为什么不用
Netty5
Netty5
已经停止开发了。
为什么
Netty
使用
NIO
而不是
AIO
?
Netty
不看重
Windows
上的使用,在
Linux
系统上,
AIO
的底层实现仍使用
EPOLL
,没有 很好实现 AIO
,因此在性能上没有明显的优势,而且被
JDK
封装了一层不容易深度优化。 AIO 还有个缺点是接收数据需要预先分配缓存
,
而不是
NIO
那种需要接收时才需要分配 缓存,
所以对连接数量非常大但流量小的情况
,
内存浪费很多。 而且 Linux
上
AIO
不够成熟,处理回调结果速度跟不上处理需求。
第一个
Netty
程序
Bootstrap
、
EventLoop(Group)
、
Channel
Bootstrap
是
Netty
框架的启动类和主入口类,分为客户端类
Bootstrap
和服务器ServerBootstrap 两种。
Channel
是
Java NIO
的一个基本构造。 它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一 个或者多个不同的 I/O
操作的程序组件)的开放连接,如读操作和写操作 目前,可以把 Channel
看作是传入(入站)或者传出(出站)数据的载体。因此,它 可以被打开或者被关闭,连接或者断开连接。
EventLoop
暂时可以看成一个线程、
EventLoopGroup
自然就可以看成线程组。
事件和
ChannelHandler
、
ChannelPipeline
Netty
使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于 已经发生的事件来触发适当的动作。
Netty
事件是按照它们与入站或出站数据流的相关性进行分类的。 可能由入站数据或者相关的状态更改而触发的事件包括: 连接已被激活或者连接失活;
数据读取;用户事件;错误事件。
出站事件是未来将会触发的某个动作的操作结果,这些动作包括:打开或者关闭到远程 节点的连接;将数据写到或者冲刷到套接字。 每个事件都可以被分发给 ChannelHandler
类中的某个用户实现的方法,既然事件分为 入站和出站,用来处理事件的 ChannelHandler
也被分为可以处理入站事件的
Handler
和出站 事件的 Handler
,当然有些
Handler
既可以处理入站也可以处理出站。
Netty
提供了大量预定义的可以开箱即用的
ChannelHandler
实现,包括用于各种协议 (如 HTTP
和
SSL/TLS
)的
ChannelHandler
。 基于 Netty
的网络应用程序中根据业务需求会使用
Netty
已经提供的
ChannelHandler
或 者自行开发 ChannelHandler
,这些
ChannelHandler
都放在
ChannelPipeline
中统一管理,事件 就会在 ChannelPipeline
中流动,并被其中一个或者多个
ChannelHandler
处理。
ChannelFuture
Netty
中所有的
I/O
操作都是异步的,我们知道“异步的意思就是不需要主动等待结果 的返回,而是通过其他手段比如,状态通知,回调函数等”,那就是说至少我们需要一种获 得异步执行结果的手段。
JDK
预置了
interface java.util.concurrent.Future
,
Future
提供了一种在操作完成时通知 应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时 刻完成,并提供对其结果的访问。但是其所提供的实现,只允许手动检查对应的操作是否已 经完成,或者一直阻塞直到它完成。这是非常繁琐的,所以 Netty
提供了它自己的实现 ChannelFuture,用于在执行异步操作的时候使用。 一般来说,每个 Netty
的出站
I/O
操作都将返回一个
ChannelFuture
。
Netty 服务端
EchoServer
public class EchoServer {
private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws InterruptedException {
int port = 9999;
EchoServer echoServer = new EchoServer(port);
LOG.info("服务器即将启动");
echoServer.start();
LOG.info("服务器关闭");
}
public void start() throws InterruptedException {
/*线程组*/
EventLoopGroup group = new NioEventLoopGroup();
try {
/*服务端启动必备*/
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
});
/*异步绑定到服务器,sync()会阻塞到完成*/
ChannelFuture f = b.bind().sync();
LOG.info("服务器启动完成。");
/*阻塞当前线程,直到服务器的ServerChannel被关闭*/
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
服务端的业务Handler
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
System.out.println("server accept :" + in.toString(CharsetUtil.UTF_8));
ctx.writeAndFlush(in);
//ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接已建立");
super.channelActive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
基于Netty的客户端
public class EchoClient {
private final int port;
private final String host;
public EchoClient(int port, String host) {
this.port = port;
this.host = host;
}
public void start() throws InterruptedException {
/*线程组*/
EventLoopGroup group = new NioEventLoopGroup();
try {
/*客户端启动必备,和服务器的不同点*/
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)/*指定使用NIO的通信模式*/
/*指定服务器的IP地址和端口,和服务器的不同点*/
.remoteAddress(new InetSocketAddress(host,port))
/*和服务器的不同点*/
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
/*异步连接到服务器,sync()会阻塞到完成,和服务器的不同点*/
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();/*阻塞当前线程,直到客户端的Channel被关闭*/
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoClient(9999,"127.0.0.1").start();
}
}
客户端的业务Handler
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/*读取到网络数据后进行业务处理,并关闭连接*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("client Accept"+msg.toString(CharsetUtil.UTF_8));
//关闭连接
///ctx.close();
}
/*channel活跃后,做业务处理*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer(
"Hello,Netty",CharsetUtil.UTF_8));
// ctx.pipeline().write()
// ctx.channel().write()
ctx.alloc().buffer();
}
}
EventLoop
和
EventLoopGroup
回想一下我们在
NIO
中是如何处理我们关心的事件的?在一个
while
循环中
select
出事 件,然后依次处理每种事件。我们可以把它称为事件循环,这就是 EventLoop
。
interface io.netty.channel. EventLoop 定义了
Netty
的核心抽象,用于处理网络连接的生命周期中所发 生的事件。
io.netty.util.concurrent
包构建在
JDK
的
java.util.concurrent
包上。而
io.netty.channel
包 中类,为了与 Channel
的事件进行交互,扩展了这些接口
/
类。一个
EventLoop
将由一个 永远都不会改变的 Thread
驱动,同时任务(
Runnable
或者
Callable
)可以直接提交给 EventLoop 实现,以立即执行或者调度执行。
线程的分配
服务于 Channel 的 I/O 和事件的 EventLoop 包含在 EventLoopGroup 中。 异步传输实现只使用了少量的 EventLoop(以及和它们相关联的 Thread),而且在当前 的线程模型中,它们可能会被多个 Channel 所共享。这使得可以通过尽可能少量的 Thread 来 支撑大量的 Channel,而不是每个 Channel 分配一个 Thread。EventLoopGroup 负责为每个 新创建的 Channel 分配一个 EventLoop。在当前实现中,使用顺序循环(round-robin)的方 式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel。 一旦一个 Channel 被分配给一个 EventLoop,它将在它的整个生命周期中都使用这个 EventLoop(以及相关联的 Thread)。
需要注意,
EventLoop
的分配方式对
ThreadLocal
的使用的影响。因为一个
EventLoop
通 常会被用于支撑多个 Channel
,所以对于所有相关联的
Channel
来说,
ThreadLocal
都将是 一样的。这使得它对于实现状态追踪等功能来说是个糟糕的选择。然而,在一些无状态的上下文中,它仍然以被用于在多个
Channel
之间共享一些重度的或者代价昂贵的对象,甚 至是事件。
线程管理
在内部,当提交任务到如果
(
当前)调用线程正是支撑
EventLoop
的线程,那么所提交 的代码块将会被(直接)执行。否则,EventLoop
将调度该任务以便稍后执行,并将它放入 到内部队列中。当 EventLoop
下次处理它的事件时,它会执行队列中的那些任务
/
事件。
Channel
、
EventLoop(Group)
和
ChannelFuture
Netty
网络抽象的代表:
Channel—Socket
;
EventLoop—
控制流、多线程处理、并发;
ChannelFuture—
异步通知。
Channel
和
EventLoop
关系如图:
从图上我们可以看出
Channel
需要被注册到某个
EventLoop
上,在
Channel
整个生命周 期内都由这个EventLoop
处理
IO
事件,也就是说一个
Channel
和一个
EventLoop
进行了绑定, 但是一个EventLoop
可以同时被多个
Channel
绑定。
Channel
接口
基本的
I/O
操作(
bind()
、
connect()
、
read()
和
write()
)依赖于底层网络传输所提供的原 语。在基于 Java
的网络编程中,其基本的构造是类
Socket
。
Netty
的
Channel
接口所提供 的 API
,被用于所有的
I/O
操作。大大地降低了直接使用
Socket
类的复杂性。此外,
Channel 也是拥有许多预定义的、专门化实现的广泛类层次结构的根。 由于 Channel
是独一无二的,所以为了保证顺序将
Channel
声明为
java.lang.Comparable 的一个子接口。因此,如果两个不同的 Channel
实例都返回了相同的散列码,那么 AbstractChannel 中的
compareTo()
方法的实现将会抛出一个
Error
。
Channel
的生命周期状态
ChannelUnregistered
:
Channel
已经被创建,但还未注册到
EventLoop
ChannelRegistered
:
Channel
已经被注册到了
EventLoop
ChannelActive
:
Channel
处于活动状态(已经连接到它的远程节点)。它现在可以接 收和发送数据了
ChannelInactive
:
Channel
没有连接到远程节点 当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler
,其可以随后对它们做出响应。在我们的编程中,关注
ChannelActive
和 ChannelInactive 会更多一些。
重要
Channel
的方法
eventLoop
: 返回分配给
Channel
的
EventLoop
pipeline
: 返回
Channel
的
ChannelPipeline
,也就是说每个
Channel
都有自己的
ChannelPipeline
。
isActive
: 如果
Channel
是活动的,则返回
true
。活动的意义可能依赖于底层的传输。 例如,一个 Socket
传输一旦连接到了远程节点便是活动的,而一个
Datagram
传输一旦被 打开便是活动的。
localAddress
: 返回本地的
SokcetAddress
remoteAddress
: 返回远程的
SocketAddress
write
: 将数据写到远程节点,注意,这个写只是写往
Netty
内部的缓存,还没有真正 写往 socket
。
flush
: 将之前已写的数据冲刷到底层
socket
进行传输。
writeAndFlush
: 一个简便的方法,等同于调用
write()
并接着调用
flush()
ChannelPipeline 和 ChannelHandlerContext
ChannelPipeline
接口
当
Channel
被创建时,它将会被自动地分配一个新的
ChannelPipeline
,每个
Channel
都 有自己的 ChannelPipeline
。这项关联是永久性的。在
Netty
组件的生命周期中,这是一项固 定的操作,不需要开发人员的任何干预。
ChannelPipeline
提供了
ChannelHandler
链的容器,并定义了用于在该链上传播
入站(也
就是从网络到业务处理)
和
出站(也就是从业务处理到网络)
,各种事件流的
API
,我们 代码中的 ChannelHandler
都是放在
ChannelPipeline
中的。 使得事件流经 ChannelPipeline
是
ChannelHandler
的工作,它们是在应用程序的初始化 或者引导阶段被安装的。这些 ChannelHandler
对象接收事件、执行它们所实现的处理逻辑, 并将数据传递给链中的下一个 ChannelHandler
,而且
ChannelHandler
对象也完全可以拦截 事件不让事件继续传递。它们的执行顺序是由它们被添加的顺序所决定的
ChannelHandler
的生命周期
在
ChannelHandler
被添加到
ChannelPipeline
中或者被从
ChannelPipeline
中移除时会调 用下面这些方法。这些方法中的每一个都接受一个 ChannelHandlerContext
参数。
handlerAdded
当把
ChannelHandler
添加到
ChannelPipeline
中时被调用
handlerRemoved
当从
ChannelPipeline
中移除
ChannelHandler
时被调用
exceptionCaught
当处理过程中在
ChannelPipeline
中有错误产生时被调用
ChannelPipeline
中的
ChannelHandler
入站和出站
ChannelHandler
被安装到同一个
ChannelPipeline
中,
ChannelPipeline
以双 向链表的形式进行维护管理。比如下图,我们在网络上传递的数据,要求加密,但是加密后 密文比较大,需要压缩后再传输,而且按照业务要求,需要检查报文中携带的用户信息是否 合法,于是我们实现了 5
个
Handler
:解压(入)
Handler
、压缩(出)
handler
、解密(入) Handler、加密(出)
Handler
、授权(入)
Handler
。
如果一个消息或者任何其他的入站事件被读取,那么它会从
ChannelPipeline
的头部开 始流动,但是只被处理入站事件的 Handler
处理,也就是解压(入)
Handler
、解密(入)
Handler
、 授权(入) Handler
,最终,数据将会到达
ChannelPipeline
的尾端,届时,所有处理就都结
束了。
数据的出站运动(即正在被写的数据)在概念上也是一样的。在这种情况下,数据将从 链的尾端开始流动,但是只被处理出站事件的 Handler
处理,也就是加密(出)
Handler
、 压缩(出)handler
,直到它到达链的头部为止。在这之后,出站数据将会到达网络传输层, 也就是我们的 Socket
。
Netty
能区分入站事件的
Handler
和出站事件的
Handler
,并确保数据只会在具有相同定 向类型的两个 ChannelHandler
之间传递。
所以在我们编写
Netty
应用程序时要注意,分属出站和入站不同的
Handler
,
在业务没
特殊要求的情况下
是无所谓顺序的,正如我们下面的图所示,比如‘压缩(出)
handler
‘可 以放在‘解压(入)handler
‘和‘解密(入)
Handler
‘中间,也可以放在‘解密(入)
Handler ‘和‘授权(入) Handler
‘之间。 而同属一个方向的 Handler
则是有顺序的,因为上一个
Handler
处理的结果往往是下一 个 Handler
的要求的输入。比如入站处理,对于收到的数据,只有先解压才能得到密文,才 能解密,只有解密后才能拿到明文中的用户信息进行授权检查,所以解压->
解密
->
授权这个 三个入站 Handler
的顺序就不能乱。