文章目录
- 单线程Reactor模式
- 多线程Reactor模式
- Reactor模式中IO事件的处理流程
- Netty中的通道Channel
- Netty中的反应器Reactor
- Netty中的处理器Handler
- Netty中的通道Channel和处理器Handler的协作组件Pipeline
Reactor(反应器)模式是高性能网络编程在设计和架构方面的基础模式.Doug Lea大师在文章“Scalable IOin Java”中对Reactor模式的定义:
Reactor 模式是由Reactor线程、Handlers处理器两大角色组成,两大角色的职责分别如下:
(1) Reactor 线程的职责:负责响应IO事件,并且分发到Handlers处理器。
(2)Handlers处理器的职责:与IO事件(或者选择键)绑定,负责IO事件的处理,完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写到通道等。
我觉得Reactor模式类似事件驱动模式,当有事件触发时,事件源会将事件分发到Handler(处理器),由Handler负责事件处理。Reactor模式中的反应器角色类似于事件驱动模式中的事件分发器(Dispatcher)角色。
单线程Reactor模式
单线程Reactor模式是Reactor和Handlers 处于一个处于一个线程中执行。如图所示:
在单线程Reactor模式中,需要将attach和attachment结合使用:
在选择键注册完成之后调用attach()方法,将Handler实例绑定到选择键;当IO事件发生时调用attachment()方法,可以从选择键取出Handler实例,将事件分发到Handler处理器中完成业务处理。
单线程Reactor模式逻辑示例EchoServerReactor代码如下:
public class EchoServerReactor implements Runnable{
Selector selector;
ServerSocketChannel serverSocket;
public EchoServerReactor() throws IOException {
//Reactor初始化
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
//非阻塞
serverSocket.configureBlocking(false);
//分步处理,第一步,接收accept事件
SelectionKey sk =
serverSocket.register(selector,0,new AcceptorHandler());
// SelectionKey.OP_ACCEPT
serverSocket.socket().bind(address);
Logger.info("服务端已经开始监听:"+address);
sk.interestOps(SelectionKey.OP_ACCEPT);
}
//轮询和分发事件
@Override
public void run() {
try{
while (!Thread.interrupted()){
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()){
//反应器负责dispatch收到的事件
SelectionKey sk =it.next();
dispathch(sk);
}
selected.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
void dispathch(SelectionKey key){
Runnable r = (Runnable) key.attachment();
//调用之前绑定的选择键的处理器对象
if (r!=null){
r.run();
}
}
/**
* 新连接处理器,完成新连接的接收工作,为新连接创建一个负责数据传输的Handler
*/
class AcceptorHandler implements Runnable{
@Override
public void run() {
//接受新连接
try {
SocketChannel channel = (SocketChannel) serverSocket .accept();
//需要为新连接创建一个输入输出的Handler
if(channel!=null){
new EchoHandler(selector,channel);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new Thread(new EchoServerReactor()).start();
}
}
EchoHandler 代码如下:
public class EchoHandler implements Runnable{
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//处理器实例的状态:发送和接收,一个连接对应一个处理器实例
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
EchoHandler(Selector selector, java.nio.channels.SocketChannel c) throws IOException {
channel = c;
//设置为非阻塞模式
c.configureBlocking(false);
//仅仅取得选择键,绑定事件处理器
// 后设置感兴趣的IO事件
sk = channel.register(selector, 0);
//将Handler作为选择键的附件
sk.attach(this);
//第二步,注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
// 唤醒事件查询线程,在单线程模式下,这里没啥意义
selector.wakeup();
}
@Override
public void run() {
try{
if(state == SENDING){
//发送状态,把数据写入连接通道
channel.write(byteBuffer);
//byteBuffer切换成写模式,写完后,就准备开始从通道度
byteBuffer.clear();
//注册read就绪时间,开始接收客户端数据
sk.interestOps(SelectionKey.OP_READ);
//修改状态,进入接收状态
state= RECIEVING;
}else if (state == RECIEVING){
//接收状态,从通道读取数据
int length = 0;
while ((length = channel.read(byteBuffer))>0){
Logger.info(new String(byteBuffer.array(),0,length));
}
//读完后,翻转byteBuffer的读写模式
byteBuffer.flip();
//准备写数据到通道,注册write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//注册完成后,进入发送状态
state= SENDING;
}
//处理结束了,这里不能关闭select key ,需要重复使用功能,sk.cancel
} catch (IOException e) {
e.printStackTrace();
}
}
}
NioDemoConfig 代码如下:
public class NioDemoConfig extends ConfigProperties {
static ConfigProperties singleton
= new NioDemoConfig("system.properties");
private NioDemoConfig(String fileName)
{
super(fileName);
super.loadFromFile();
}
public static final String SOCKET_SERVER_IP
= singleton.getValue("socket.server.ip");
public static final int SOCKET_SERVER_PORT
= singleton.getIntValue("socket.server.port");
}
单线程Reactor模式是基于Java的NIO实现的,Reactor和Handler都在同一条线程中执行。这样,带来了一个问题:当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。在这种场景下,被阻塞的Handler不仅仅负责输入和输出处理的传输处理器,还包括负责新连接监听的AcceptorHandler处理器,可能导致服务器无响应。这是一个非常严重的缺陷,导致单线程反应器模型在生产场景中使用得比较少。
多线程Reactor模式
多线程版本的Reator模式如下:
(1)将负责数据传输处理的IOHandler处理器的执行放入独立的线程池中。这样,业务处理线程与负责新连接监听的反应器线程就能相互隔离,避免服务器的连接监听受到阻塞。
(2)如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程;同时,引入多个选择器,并且为每一个SubReactor引入一个线程,一个线程负责一个选择器的事件轮询。这样充分释放了系统资源的能力,也大大提升了反应器管理大量连接或者监听大量传输通道的能力。
多线程Reactor模式的逻辑模型如下:
核心代码MultiThreadEchoServerReactor 如下:
public class MultiThreadEchoServerReactor {
ServerSocketChannel serverSocket;
AtomicInteger next = new AtomicInteger(0);
Selector bossSelector = null;
Reactor bossReactor = null;
//selectors集合,引入多个selector选择器
Selector[] workSelectors = new Selector[2];
//引入多个子反应器
Reactor[] workReactors = null;
MultiThreadEchoServerReactor() throws IOException {
//初始化多个selector选择器
bossSelector = Selector.open();// 用于监听新连接事件
workSelectors[0] = Selector.open(); // 用于监听read、write事件
workSelectors[1] = Selector.open(); // 用于监听read、write事件
serverSocket = ServerSocketChannel.open();
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
serverSocket.socket().bind(address);
serverSocket.configureBlocking(false);//非阻塞
//bossSelector,负责监控新连接事件, 将 serverSocket注册到bossSelector
SelectionKey sk =
serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);
//绑定Handler:新连接监控handler绑定到SelectionKey(选择键)
sk.attach(new AcceptorHandler());
//bossReactor反应器,处理新连接的bossSelector
bossReactor = new Reactor(bossSelector);
//第一个子反应器,一子反应器负责一个worker选择器
Reactor workReactor1 = new Reactor(workSelectors[0]);
//第二个子反应器,一子反应器负责一个worker选择器
Reactor workReactor2 = new Reactor(workSelectors[1]);
workReactors = new Reactor[]{workReactor1, workReactor2};
}
private void startService() {
// 一子反应器对应一条线程
new Thread(bossReactor).start();
new Thread(workReactors[0]).start();
new Thread(workReactors[1]).start();
}
//反应器
class Reactor implements Runnable {
//每条线程负责一个选择器的查询
final Selector selector;
public Reactor(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
//单位为毫秒
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (null == selectedKeys || selectedKeys.size() == 0) {
continue;
}
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
//Reactor负责dispatch收到的事件
SelectionKey sk = it.next();
dispatch(sk);
}
selectedKeys.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKey sk) {
Runnable handler = (Runnable) sk.attachment();
//调用之前attach绑定到选择键的handler处理器对象
if (handler != null) {
handler.run();
}
}
}
// Handler:新连接处理器
class AcceptorHandler implements Runnable {
@Override
public void run() {
try {
SocketChannel channel = serverSocket.accept();
Logger.info("接收到一个新的连接");
if (channel != null) {
int index = next.get();
Logger.info("选择器的编号:" + index);
Selector selector = workSelectors[index];
new MultiThreadEchoHandler(selector, channel);
}
} catch (IOException e) {
e.printStackTrace();
}
if (next.incrementAndGet() == workSelectors.length) {
next.set(0);
}
}
}
public static void main(String[] args) throws IOException {
MultiThreadEchoServerReactor server =
new MultiThreadEchoServerReactor();
server.startService();
}
}
MultiThreadEchoHandler 代码如下
public class MultiThreadEchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
//引入线程池
static ExecutorService pool = Executors.newFixedThreadPool(4);
MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
//仅仅取得选择键,后设置感兴趣的IO事件
sk = channel.register(selector, 0);
//将本Handler作为sk选择键的附件,方便事件dispatch
sk.attach(this);
//向sk选择键注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//唤醒 查询线程,使得OP_READ生效
selector.wakeup();
Logger.info("新的连接 注册完成");
}
@Override
public void run() {
//异步任务,在独立的线程池中执行
//提交数据传输任务到线程池
//使得IO处理不在IO事件轮询线程中执行,在独立的线程池中执行
pool.execute(new AsyncTask());
}
//异步任务,不在Reactor线程中执行
//数据传输与业务处理任务,不在IO事件轮询线程中执行,在独立的线程池中执行
public synchronized void asyncRun() {
try {
if (state == SENDING) {
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写模式
byteBuffer.clear();
//写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {
//从通道读
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读模式
byteBuffer.flip();
//读完后,注册write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//读完后,进入发送的状态
state = SENDING;
}
//处理结束了, 这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
//异步任务的内部类
class AsyncTask implements Runnable {
@Override
public void run() {
MultiThreadEchoHandler.this.asyncRun();
}
}
}
Reactor模式中IO事件的处理流程
一个IO事件从操作系统底层产生后,在Reactor模式中的处理流程如下所示:
Reactor模式中IO事件的处理流程大致分为4步,具体如下:
step1:通道注册。IO事件源于通道(Channel),IO是和通道(对应于底层连接而言)强相关的。一个IO事件一定属于某个通道。如果要查询通道的事件,首先就要将通道注册到选择器。
step2:查询事件。在Reactor模式中,一个线程会负责一个反应器(或者SubReactor子反应器),不断地轮询,查询选择器中的IO事件(选择键)。
step3:事件分发。如果查询到IO事件,则分发给与IO事件有绑定关系的Handler业务处理器。
step4:完成真正的IO操作和业务处理,这一步由Handler业务处理器负责。
其中step1和step2是java NIO的功能。
Netty中的通道Channel
Reactor模式和通道紧密相关,反应器的查询和分发的IO事件都来自于Channel组件, 而Channel组件也是Netty中非常重要的组件,Netty中不直接使用java NIO的Channel组件,对Channel组件进行了自己封装,对于每一种通信连接协议,netty都实现了自己的通道,每一种协议基本上偶有NIO和OIO两个版本。
对于不同协议,Netty中常见的通道类型如下:
- NioSocketChannel:异步非阻塞TCP Socket传输通道。
- OioSocketChannel:同步阻塞式TCP Socket传输通道。
- NioServerSocketChannel:异步非阻塞TCPSocket服务端监听通道。
- OioServerSocketChannel:同步阻塞式TCPSocket服务端监听通道。
- NioDatagramChannel:异步非阻塞的UDP传输通道。
- OioDatagramChannel:同步阻塞式UDP传输通道。
- NioSctpChannel:异步非阻塞Sctp传输通道。
- OioSctpChannel:同步阻塞式Sctp传输通道。
- NioSctpServerChannel:异步非阻塞Sctp服务端监听通道。
- OioSctpServerChannel:同步阻塞式Sctp服务端监听通道。
在Netty的NioSocketChannel内部封装了一个Java NIO的SelectableChannel成员,通过对该内部的Java NIO通道的封装,对Netty的NioSocketChannel通道上的所有IO操作最终都会落地到Java NIO的SelectableChannel底层通道。NioSocketChannel的类结构图如下:
Netty中的反应器Reactor
Netty中的反应器组件有多个实现类,这些实现类与其通道类型相互匹配。对应于NioSocketChannel通道,Netty的反应器类为NioEventLoop(NIO事件轮询)。
NioEventLoop类有两个重要的成员属性:一个是Thread线程类的成员,一个是Java NIO选择器的成员属性。NioEventLoop的继承关系和主要成员属性如下图:
从上图可知:一个NioEventLoop拥有一个线程,负责一个Java NIO选择器的IO事件轮询。理论上来说,一个EventLoop反应器和NettyChannel通道是一对多的关系:一个反应器可以注册成千上万的通道,如下图所示
Netty 是通过使用EventLoopGroup 完成多线程版本的Reactor模式的,多个EventLoop线程放在一起,可以组成一个EventLoopGroup。
EventLoopGroup的构造函数有一个参数,用于指定内部的线程数。在构造器初始化时,会按照传入的线程数量在内部构造多个线程和多个EventLoop子反应器(一个线程对应一个EventLoop子反应器),进行多线程的IO事件查询和分发。如果使用EventLoopGroup的无参数构造函数,没有传入线程数量或者传入的数量为0,默认的EventLoopGroup内部线程数量为最大可用的CPU处理器数量的2倍。假设电脑使用的是4核的CPU,那么在内部会启动8个EventLoop线程,相当于8个子反应器实例。
Netty中的处理器Handler
Netty的Handler分为两大类:第一类是ChannelInboundHandler入站处理器;第二类是ChannelOutboundHandler出站处理器,二者都继承了ChannelHandler处理器接口。
以底层的Java NIO中的OP_READ输入事件为例剖析Netty入站处理流程:在通道中发生了OP_READ事件后,会被EventLoop查询到,然后分发给ChannelInboundHandler入站处理器,调用对应的入站处理的read()方法。在ChannelInboundHandler入站处理器内部的read()方法具体实现中,可以从通道中读取数据。Netty中的入站处理触发的方向为从通道触发,ChannelInboundHandler入站处理器负责接收(或者执行)。
Netty中的出站处理指的是从ChannelOutboundHandler出站处理器到通道的某次IO操作。无论是入站还是出站,Netty都提供了各自的默认适配器实现:ChannelInboundHandler的默认实现为ChannelInboundHandlerAdapter(入站处理适配器)。ChannelOutboundHandler的默认实现为ChannelOutBoundHandlerAdapter(出站处理适配器)。这两个默认的通道处理适配器分别实现了基本的入站操作和出站操作功能。如果要实现自己的业务处理器,不需要从零开始去实现处理器的接口,只需要继承通道处理适配器即可。
Netty中的通道Channel和处理器Handler的协作组件Pipeline
Netty设计了ChannelPipeline(通道流水线)组件。它像一条管道,将绑定到一个通道的多个Handler处理器实例串联在一起,形成一条流水线。ChannelPipeline的默认实现实际上被设计成一个双向链表。所有的Handler处理器实例被包装成双向链表的节点,被加入到ChannelPipeline中。
以入站处理为例,每一个来自通道的IO事件都会进入一次ChannelPipeline。在进入第一个Handler处理器后,这个IO事件将按照既定的从前往后次序,在流水线上不断地向后流动,流向下一个Handler处理器。
在向后流动的过程中,会出现3种情况:
(1)如果后面还有其他Handler入站处理器,那么IO事件可以交给下一个Handler处理器向后流动。
(2)如果后面没有其他的入站处理器,就意味着这个IO事件在此次流水线中的处理结束了。
(3)如果在中间需要终止流动,可以选择不将IO事件交给下一个Handler处理器,流水线的执行也被终止了。
Netty的通道流水线是双向的并规定:入站处理器的执行次序是从前到后,出站处理器的执行次序是从后到前。流水线上入站处理器和出站处理器的执行次序可以用下图表示:
其实netty在为了方便开发者进行开发,N提供了一系列辅助类,其中引导类把上面的三个组件快速组装起来完成一个Netty应用,服务端的引导类叫作ServerBootstrap类,客户端的引导类叫作Bootstrap类。我们将在一篇博文中介绍Bootstrap类。