概览
我们知道NIO就是调用系统内核的的select/poll/epoll方法来实现,这些系统内核方法会扫描或监控IO,每次将所有的IO的状态返回给NIO线程。让NIO线程可以选择处理读取可读状态的IO流,也可以选择继续监控轮询监控IO的其它状态。
reactor模型也叫做Dispatcher模型,即分发模型,NIO中分发线程和处理线程策略不同而衍生出了四种网络编程模型:单Reactor单线程、单Reactor多线程、多Reactor多线程。
单Reactor单线程
这里无非就是整个NIO Server里都是单线程顺序执行,Reactor调用select()方法IO状态为ACCEPT就派发给Acceptor处理,IO状态为OP_READ或OP_WRITE则派发给handler进行处理。
上图Reactor获取到IO状态后,依旧是单线程按顺序进行处理。具体代码如下:
package com.longqi.boottest.io;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author LongQi
* @projectName boot-integration
* @description: TODO
* @date 2023/3/29 20:06
*/
public class SingleReactorSingleThread {
public static void main(String[] args) {
int port = 8080;
Server server = new Server(port);
server.start();
sendMessage(port);
}
static class Server{
private int port;
public Server(int port){
this.port=port;
}
public void start(){
try {
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
// 设置select监听的事件
server.register(selector, SelectionKey.OP_ACCEPT);
new Thread(new Reactor(selector)).start();
}catch (Exception e){
System.out.println("服务器启动失败:"+e.getMessage());
e.printStackTrace();
}
}
}
static class Reactor implements Runnable{
private Selector selector;
private Dispatch dispatch = new Dispatch();
public Reactor(Selector selector){
this.selector=selector;
}
@Override
public void run() {
try{
while (true){
int n = selector.select();
if(n > 0){
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
dispatch.dispatch(selector,key);
iterator.remove();
}
}
}
}catch (Exception e){
System.out.println("服务器运行异常:"+e.getMessage());
e.printStackTrace();
}
}
}
static class Dispatch{
private ReadHandler handler = new ReadHandler();
private Acceptor acceptor = new Acceptor();
public void dispatch(Selector selector,SelectionKey key){
if(key.isAcceptable()){
acceptor.hand(selector,key);
}
if(key.isReadable()){
handler.hand(key);
}
}
}
static class Acceptor{
public void hand(Selector selector,SelectionKey key){
try{
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector,SelectionKey.OP_READ);
}catch (Exception e){
System.out.println("处理请求异常:"+e.getMessage());
e.printStackTrace();
}
}
}
static class ReadHandler{
public void hand(SelectionKey key){
try{
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (channel.read(buffer)!=-1){
//复位,转化为读模式
buffer.flip();
while (buffer.hasRemaining()){
System.out.println("收到客户端"+channel.socket().getPort()+"的信息:"+ StandardCharsets.UTF_8.decode(buffer).toString());
}
//清空缓存区,转化为写模式
buffer.clear();
}
}catch (Exception e){
System.out.println("处理请求异常:"+e.getMessage());
e.printStackTrace();
}
}
}
public static void sendMessage(int port) {
try{
Thread.sleep(2000);
ThreadPoolExecutor clientPool = new ThreadPoolExecutor(200,300,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(800));
for(int i=0;i<300;i++){
clientPool.submit(new Runnable() {
@Override
public void run() {
try{
Socket client = new Socket();
client.connect(new InetSocketAddress("127.0.0.1",port));
client.getOutputStream().write(("hello world,send time:"+System.nanoTime()).getBytes());
// 注意TCP粘包,这里不调用close,服务器是收不到消息的
client.close();
}catch (Exception e){
System.out.println("客户端发送信息失败");
}
}
});
}
}catch (Exception e){
System.out.println("客户端发送信息失败");
}
}
}
单Reactor多线程
这里NIO Server里都是单个Reactor调用select()方法,然后派发给handler时使用线程池进行处理。大大提升处理速度。
上图Reactor获取到IO状态后,可读可写的channel由线程池开线程进行处理。具体代码如下:
package com.longqi.boottest.io;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author LongQi
* @projectName boot-integration
* @description: TODO
* @date 2023/3/29 20:06
*/
public class SingleReactorMultiThread{
public static void main(String[] args) {
int port = 8080;
Server server = new Server(port);
server.start();
sendMessage(port);
}
static class Server{
private int port;
public Server(int port){
this.port=port;
}
public void start(){
try {
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
// 设置select监听的事件
server.register(selector, SelectionKey.OP_ACCEPT);
new Thread(new Reactor(selector)).start();
}catch (Exception e){
System.out.println("服务器启动失败:"+e.getMessage());
e.printStackTrace();
}
}
}
static class Reactor implements Runnable{
private Selector selector;
private Dispatch dispatch = new Dispatch();
public Reactor(Selector selector){
this.selector=selector;
}
@Override
public void run() {
try{
while (true){
int n = selector.select();
if(n > 0){
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
dispatch.dispatch(selector,key);
iterator.remove();
}
}
}
}catch (Exception e){
System.out.println("服务器运行异常:"+e.getMessage());
e.printStackTrace();
}
}
}
static class Dispatch{
private Acceptor acceptor = new Acceptor();
private ThreadPoolExecutor handPool = new ThreadPoolExecutor(200,300,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(800));
public void dispatch(Selector selector,SelectionKey key){
if(key.isAcceptable()){
acceptor.hand(selector,key);
}
if(key.isReadable()){
handPool.submit(new ReadHandler(key));
}
}
}
static class Acceptor{
public void hand(Selector selector,SelectionKey key){
try{
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector,SelectionKey.OP_READ);
}catch (Exception e){
System.out.println("处理请求异常:"+e.getMessage());
e.printStackTrace();
}
}
}
static class ReadHandler implements Runnable{
private SelectionKey key;
public ReadHandler(SelectionKey key){
this.key = key;
}
@Override
public void run() {
try{
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (channel.read(buffer)!=-1){
//复位,转化为读模式
buffer.flip();
while (buffer.hasRemaining()){
System.out.println("收到客户端"+channel.socket().getPort()+"的信息:"+ StandardCharsets.UTF_8.decode(buffer).toString());
}
//清空缓存区,转化为写模式
buffer.clear();
}
}catch (Exception e){
System.out.println("处理请求异常:"+e.getMessage());
e.printStackTrace();
}
}
}
public static void sendMessage(int port) {
try{
Thread.sleep(2000);
ThreadPoolExecutor clientPool = new ThreadPoolExecutor(200,300,60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(800));
for(int i=0;i<300;i++){
clientPool.submit(new Runnable() {
@Override
public void run() {
try{
Socket client = new Socket();
client.connect(new InetSocketAddress("127.0.0.1",port));
client.getOutputStream().write(("hello world,send time:"+System.nanoTime()).getBytes());
// 注意TCP粘包,这里不调用close,服务器是收不到消息的
client.close();
}catch (Exception e){
System.out.println("客户端发送信息失败");
}
}
});
}
}catch (Exception e){
System.out.println("客户端发送信息失败");
}
}
}
多Reactor多线程
这里NIO Server里有两个Reactor,一个分发给acceptor,一个分发给处理线程。