概要
- 把 channel 理解为数据的通道
- 把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过pipeline 的加工,会变成其它类型对象,最后输出又变成 ByteBuf
- 把 handler 理解为数据的处理工序
工序有多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成..) 传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法)。
handler分inbound 和 Outbound 两类。
- 把 eventLoop 理解为处理数据的工人
工人可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)。
工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个channel 的待处理任务,任务分为普通任务、定时任务。
工人按照 pipeline 顺序依次按照 handler的规划(代码)处理数据,可以为每道工序指定不同的工人。
EventLoop
EventLoop本质是一个单线程执行器(同时维护了一个selector),一个EventLoopGroup是一组EventLoop,channel调用EventLoopGroup的register方法绑定一个EventLoop,后续这个channel的IO事件,都由此EventLoop来处理。
- 一个EventLoopGroup 包含一个或多个EventLoop
- 一个EventLoop只和一个Thread绑定。
- 所有EventLoop的io事件只和绑定的Thread上处理
- 一个channel只注册给一个EventLoop
- 一个EventLoop可能注册一个或多个channel
普通任务&定时任务
public class EventLoop {
private static final Logger logger = LoggerFactory.getLogger(EventLoop.class);
public static void main(String[] args) {
//1、创建时事件环组
NioEventLoopGroup group = new NioEventLoopGroup(2); //io事件、普通任务、定时任务
//DefaultEventLoop group = new DefaultEventLoop(); //普通任务、定时任务
//2、获取下一个事件循环对象
io.netty.channel.EventLoop next = group.next();
//3、执行普通任务
group.next().submit(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("执行普通任务");
});
//4、执行定时任务
group.next().scheduleAtFixedRate(()->{
logger.info("执行定时任务");
},0,1, TimeUnit.SECONDS);
logger.info("main~");
}
}
控制台
IO任务
Server
public class IoEventLoopServer {
private static final Logger logger = LoggerFactory.getLogger(IoEventLoopServer.class);
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
client
public class IoEventLoopClient {
private static final Logger logger = LoggerFactory.getLogger(EventLoop.class);
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("127.0.0.1", 8080))
.sync()
.channel();
System.out.println(channel);
}
}
在channel处多次进行数据发送(server使用同一线程进行接收处理)。
验证可知:一个EventLoop只和一个Thread绑定,后续这个channel的IO事件,都由此EventLoop来处理。
分工细化
在上述server代码做调整,将NioEventLoopGoup职责细化(设置两个worker线程)。
启动三个client向server发送信息,发现一个EventLoop可能注册一个或多个channel。
public class IoEventLoopServer {
private static final Logger logger = LoggerFactory.getLogger(IoEventLoopServer.class);
public static void main(String[] args) {
new ServerBootstrap()
//.group(new NioEventLoopGroup())
//boss只负责ServerSocketChannel上的accept worker只负责ServerChannel的读写
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
进一步细化
场景:已知一个EventLoop可以绑定多个channel,假设某个EventLoop绑定了1000个channel,其中有一个channel要进行大规模的逻辑处理耗费时间长,势必会影响后续channel的处理。
方案:将耗时的channel事件绑定额外的EventLoopGroup组。
改进后Server代码
public class IoEventLoopServer {
private static final Logger logger = LoggerFactory.getLogger(IoEventLoopServer.class);
public static void main(String[] args) {
//创建一个独立的EventLoopGroup
DefaultEventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
//.group(new NioEventLoopGroup())
//boss只负责ServerSocketChannel上的accept worker只负责ServerChannel的读写
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handle1",new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg);//将消息传递给下一个handler
}
}).addLast(group,"handle2",new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
控制台输出