文章目录
- 1. 概述
- 2. 代码实例
- 2.1 服务端
- 2.2 客户端
- 2.3 运行截图
- 3. 整体结构
- 4. 重要组件
- 4.1 EventLoopGroup、EventLoop
- 4.2 Handler & Pipeline
- 4.3 ByteBuf
- 参考文献
1. 概述
Netty 是一款用于高效开发网络应用的 NIO 网络框架,它大大简化了网络应用的开发过程。
Netty 相比 JDK NIO 的优势:
● 易用性:Netty 在 NIO 基础上进行了更高层次的封装,屏蔽了 NIO 的复杂性,大大降低了开发者的上手难度;
● 稳定性:Netty 修复和完善了 JDK NIO 较多已知问题,例:select 空转导致 CPU 消耗 100%,TCP 断线重连,keep-alive 检测等;
● 可扩展性: 可定制化的线程模型,用户可以通过启动的配置参数选择 Reactor 线程模型;另一个是可扩展的事件驱动模型,将框架层和业务层的关注点分离,开发者只需要关注 ChannelHandler
Netty 比 JDK NIO 更低的资源消耗:
● 对象池复用技术。 Netty 通过复用对象,避免频繁创建和销毁带来的开销;
● 零拷贝技术。 除了操作系统级别的零拷贝技术外,Netty 提供了更多面向用户态的零拷贝技术,例如 Netty 在 I/O 读写时直接使用 DirectBuffer,从而避免了数据在堆内存和堆外内存之间的拷贝;
2. 代码实例
2.1 服务端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
/**
* Netty服务端示例01
*/
public class NettyServer01 {
/**
* 主函数,服务器的入口点
* @param args 命令行参数
*/
public static void main(String[] args) {
// 创建BossGroup和WorkerGroup,分别处理连接接受和数据读写
NioEventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(2);
NioEventLoopGroup workerEventLoopGroup = new NioEventLoopGroup(8);
new ServerBootstrap() // 初始化ServerBootstrap
.group(bossEventLoopGroup, workerEventLoopGroup) // 设置EventLoopGroup
.channel(NioServerSocketChannel.class) // 指定服务器通道类
.childHandler(new ChannelInitializer<NioSocketChannel>() { // 设置通道初始化器
/**
* 初始化通道,添加处理器到通道的管道中
* @param ch 当前初始化的通道
*/
protected void initChannel(NioSocketChannel ch) {
// 添加多个处理器,分别处理入站和出站事件
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
/**
* 处理入站数据
* @param ctx 通道上下文
* @param msg 接收到的消息对象
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = inbound((ByteBuf) msg, "1");
ctx.fireChannelRead(byteBuf);
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws CharacterCodingException {
ByteBuf byteBuf = inbound((ByteBuf) msg, "2");
ctx.fireChannelRead(byteBuf);
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
/**
* 处理入站数据,将处理后的数据写回通道
* @param ctx 通道上下文
* @param msg 接收到的消息对象
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = inbound((ByteBuf) msg, "3");
ctx.channel().write(byteBuf);
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
/**
* 处理出站数据,在数据写出前进行加工
* @param ctx 通道上下文
* @param msg 要写出的消息对象
* @param promise 写操作的承诺
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ByteBuf byteBuf = outbound((ByteBuf) msg, "4");
ctx.writeAndFlush(msg);
ctx.write(byteBuf, promise);
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ByteBuf byteBuf = outbound((ByteBuf) msg, "5");
ctx.write(byteBuf, promise);
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ByteBuf byteBuf = outbound((ByteBuf) msg, "6");
ctx.write(byteBuf, promise);
}
});
}
})
.bind(8080); // 绑定端口并启动服务器
}
/**
* 对出站数据进行处理
* @param msg 待处理的ByteBuf对象
* @param no 数据标识号
* @return 处理后的ByteBuf对象
*/
private static ByteBuf outbound(ByteBuf msg, String no) {
ByteBuf byteBuf = msg;
String output = byteBufToString(byteBuf);
System.out.printf("\n\noutbound%s output: %s", no, output);
stringWriteToByteBuf(byteBuf, String.format("\noutbound%s 已处理", no));
return byteBuf;
}
/**
* 对入站数据进行处理
* @param msg 待处理的ByteBuf对象
* @param no 数据标识号
* @return 处理后的ByteBuf对象
*/
private static ByteBuf inbound(ByteBuf msg, String no) {
String input = byteBufToString(msg);
System.out.printf("\n\ninbound%s input: %s\n", no, input);
stringWriteToByteBuf(msg, String.format("\ninbound%s 已处理", no));
return msg;
}
/**
* 将ByteBuf对象转换为字符串
* @param msg 待转换的ByteBuf对象
* @return 字符串表示的数据
*/
private static String byteBufToString(ByteBuf msg) {
return msg.toString(StandardCharsets.UTF_8);
}
/**
* 将字符串写入ByteBuf对象
* @param byteBuf 待写入的ByteBuf对象
* @param msg 要写入的字符串数据
*/
private static void stringWriteToByteBuf(ByteBuf byteBuf, String msg) {
byteBuf.writeBytes(msg.getBytes(StandardCharsets.UTF_8));
}
}
2.2 客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.StandardCharsets;
public class NettyClient01 {
public static void main(String[] args) {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(((ByteBuf) msg).toString(StandardCharsets.UTF_8));
}
});
}
})
.connect("127.0.0.1", 8080)
.addListener((ChannelFutureListener) future -> {
future.channel().writeAndFlush("hello,world");
});
}
}
2.3 运行截图
服务端截图:
客户端截图:
3. 整体结构
● Boss EventLoopGroup: 负责监听网络连接事件,把新网络连接的Channel 注册到 Worker EventLoopGroup
● Worker EventLoopGroup: 分配一个 EventLoop 负责处理该 Channel 的读写事件,每个 EventLoop 都是单线程的,所以该连接线程安全的,通过 Selector 进行事件循环
● 客户端发起 I/O 读写事件时,服务端 EventLoop 会进行数据的读取,然后通过 Pipeline 触发各种监听器进行数据的加工处理。客户端数据会被传递到 ChannelPipeline 的第一个 ChannelInboundHandler 中,数据处理完成后,将加工完成的数据传递给下一个 ChannelInboundHandler。当数据写回客户端时,会将处理结果在 ChannelPipeline 的 ChannelOutboundHandler 中传播,最后到达客户端。
4. 重要组件
4.1 EventLoopGroup、EventLoop
EventLoop本质上是一个单线程执行器,维护了一个 Selector,里面有run方法处理Channel上源源不断的io事件。EventLoop继承了ScheduledExecutorService、和自己的OrderedEventExecutor,OrderedEventExecutor 提供了inEventLoop(java.lang.Thread thread)、EventExecutorGroup parent()、EventExecutor next() 等方法。
EventLoopGroup 是一组 EventLoop,Channel一般会调用 EventLoopGroup 的 register 方法绑定其中一个EventLoop,后续这个Channel上 的io都由这个EventLoop处理,EventLoop又是单线程的,保证了单个Channel io 事件处理的线程安全性。
4.2 Handler & Pipeline
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
● 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
● 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表
4.3 ByteBuf
传送
参考文献
- 黑马 Netty教程
- 拉钩教育 Netty 核心原理剖析与 RPC 实践 若地老师