概述
Netty是什么
Netty的地位
Netty的优势
HelloWorld
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
// 1. 启动类
new Bootstrap()
// 2. 添加 EventLoop
.group(new NioEventLoopGroup())
// 3. 选择客户端 channel 实现
.channel(NioSocketChannel.class)
// 4. 添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
// 5. 连接到服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel()
// 6. 向服务器发送数据
.writeAndFlush("hello, world");
}
}
public class HelloServer {
public static void main(String[] args) {
// 1. 启动器,负责组装 netty 组件,启动服务器
new ServerBootstrap()
// 2. BossEventLoop, WorkerEventLoop(selector,thread), group 组
.group(new NioEventLoopGroup())
// 3. 选择 服务器的 ServerSocketChannel 实现
.channel(NioServerSocketChannel.class) // OIO BIO
// 4. boss 负责处理连接 worker(child) 负责处理读写,决定了 worker(child) 能执行哪些操作(handler)
.childHandler(
// 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 6. 添加具体 handler
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new StringDecoder()); // 将 ByteBuf 转换为字符串
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 自定义 handler
@Override // 读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg); // 打印上一步转换好的字符串
}
});
}
})
// 7. 绑定监听端口
.bind(8080);
}
}
流程分析
正确理解
正确理解 Netty中各个组件的功能和职责
组件
EventLoop
普通任务和定时任务
@Slf4j
public class TestEventLoop {
public static void main(String[] args) {
// 1. 创建事件循环组
EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
// EventLoopGroup group = new DefaultEventLoopGroup(); // 普通任务,定时任务
// 2. 获取下一个事件循环对象
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
// 3. 执行普通任务
group.next().execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("ok");
});
// 4. 执行定时任务
group.next().scheduleAtFixedRate(() -> {
log.debug("ojbk");
}, 0, 1, TimeUnit.SECONDS);
log.debug("main");
}
}
IO任务
Netty客户端是多线程程序,idea debug 默认
断点模式为ALL
,即会停止主线程以及守护线程,所以当客户端断点自定义Evaluate发送数据时,守护线程的发送数据Channel也被断点停止,所以无法发送数据
选择Thread
只停止当前线程,守护线程仍然可以运行
一个客户端的NIO线程跟Channel建立链接就会建立一个绑定关系,后续客户端的Channel上的IO事件都由一个EventLoop处理,
客户端-Channel-EventLoop
绑定关系
EventLoop的分工细化
第一次细分,Netty建议将EventLoop
职责细分,分为boss和worker
group中传入两个EventLoop,那么boss只负责accept事件,worker负责read事件
上诉优化,worker中的NIOEventLoopGroup除了要负责SocketChannel的NIO连接操作还要负责连接后的读写操作,如果读写较长较重,那么会阻塞影响到worker其他的连接或读写操作,所以,
再次细分,EventLoop有两种实现,NIOEventLoopGroup
能处理IO事件普通任务和定时任务,DefaultEventLoopGroup
只能处理普通任务和定时任务,将读写操作交给它去处理耗时较长的读写操作。
作为对比,第一个没有指定group,默认使用了worker的NIOEventLoopGroup来处理读写操作,而第二则使用了DefaultEventLoop来处理读写操作
切换线程
Channel
正确的链接建立:ChannelFuture
处理异步连接
由于连接的建立是耗时的,所以Channel必须等到连接建立完成再执行获取,否则是无效的
connect方法返回的ChannelFuture若没有阻塞等待连接,那么接下来获取到的Channel是没有建立好连接的Channel
如上两种方法异步等待NIO线程建立完毕
谁发起的调用谁等待链接结果
正确的链接关闭:CloseFuture的关闭
不能直接在主线程或其他线程中直接处理关闭操作,因为nioEventLoopGroup-2-1
属于异步线程,此处close
方法非阻塞,有可能在关闭操作还未完成就执行了关闭后操作
解决方法:
使用阻塞关闭方法,只有当channel真的关闭了才执行后面的方法
优雅的关闭:等待还未执行完的操作执行完后再关闭
为什么Netty是异步设计
Future & Promise
概述
Future
jdk中的Future
Future就是在线程之间传递结果的一个容器,是被动的获取结果,由执行完任务的线程给予的结果,没有暴露主动赋予结果的方法
@Slf4j
public class TestJdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.创建线程池
ExecutorService service = Executors.newFixedThreadPool(2);
//2.提交任务
Future<Object> future = service.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
log.debug("执行计算");
Thread.sleep(1000);
return 50;
}
});
//3.祝线程通过future获取结果,get是阻塞等待方法
log.debug("等待结果");
log.debug("结果{}",future.get());
}
}
Netty 中的 Future
与jdk中的差不多,继承至jdk的Future,做了增强
@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
//提交任务
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算");
System.out.println("执行计算");
Thread.sleep(1000);
return 50;
}
});
//通过future获取结果,get是阻塞等待方法
log.debug("等待结果");
log.debug("结果{}",future.get());
//异步方式获取结果
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
//getNow非阻塞等待 立即获取结果
log.debug("结果{}",future.getNow());
}
});
}
}
Promise
Promise又继承至Netty的Future,功能更强大,可以主动填充结果,对于网络通信非常有用
@Slf4j
public class TestNettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 准备 EventLoop 对象
EventLoop eventLoop = new NioEventLoopGroup().next();
// 2. 可以主动创建 promise, 结果容器
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(() -> {
// 3. 任意一个线程执行计算,计算完毕后向 promise 填充结果
log.debug("开始计算...");
try {
int i = 1 / 0;
Thread.sleep(1000);
promise.setSuccess(80);
} catch (Exception e) {
e.printStackTrace();
promise.setFailure(e);
}
}).start();
// 4. 接收结果的线程
log.debug("等待结果...");
log.debug("结果是: {}", promise.get());
}
}
Handler & Pipeline
Pipeline
Inbound
入栈是按入栈顺序出,出栈是按入栈顺序返出
channelRead 是一个调用链,如果中间没调用,那么后面的handler 则调用不到
Outbound
注意ctx.writeAndFlush
和 ch.writeAndFlush
ctx.writeAndFlush
是从当前调用的 handler 往后寻找 OutboundHandler
,若之前没有执行到OutboundHandler
那么找不到OutboundHandler
执行
而ch.writeAndFlush
则是从整个调用链的最前端 tail 处理开始往后寻找OutboundHandler
而且先执行调用链中的InboundHandler
输入,中间的 OutboundHandler
被跳过不影响正常输入执行
如图ch.writeAndFlush
的调用执行流程
ByteBuffer
netty
中 ByteBuf
容量动态扩容,netty
中 ByteBuffer
固定容量
netty 中 ByteBuffer 默认使用直接内存(系统内存、内存条)
例如 扩容 2 的整数倍 2^9=512
扩容至 2^10 =1024
tail 只能处理原始 ByteBuf
如果中途 ByteBuf
被转换成其他数据类型,则 tail 无法自动release
零拷贝 slice
slice 是 netty 中对于零拷贝的体现之一
切片后生成的对象,实际上还是操作原始bytebuf
的内容
使用习惯,切片自己增加引用计数,避免被其他调用者释放
component 组合零拷贝
writeBytes
会发生真正的数据复制,每次writeBytes
都会发生数据复制
addComponents
是使逻辑上连续,没有发生复制
双向通信
实现一个 echo server
编写 server
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buffer = (ByteBuf) msg;
System.out.println(buffer.toString(Charset.defaultCharset()));
// 建议使用 ctx.alloc() 创建 ByteBuf
ByteBuf response = ctx.alloc().buffer();
response.writeBytes(buffer);
ctx.writeAndFlush(response);
// 思考:需要释放 buffer 吗
// 思考:需要释放 response 吗
}
});
}
}).bind(8080);
编写 client
NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buffer = (ByteBuf) msg;
System.out.println(buffer.toString(Charset.defaultCharset()));
// 思考:需要释放 buffer 吗
}
});
}
}).connect("127.0.0.1", 8080).sync().channel();
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}).start();
💡 读和写的误解
我最初在认识上有这样的误区,认为只有在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互阻塞,才可以实现高效的双向通信,但实际上,Java Socket 是全双工的:在任意时刻,线路上存在A 到 B
和 B 到 A
的双向信号传输。即使是阻塞 IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读
例如
public class TestServer {
public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket(8888);
Socket s = ss.accept();
new Thread(() -> {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
while (true) {
System.out.println(reader.readLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
// 例如在这个位置加入 thread 级别断点,可以发现即使不写入数据,也不妨碍前面线程读取客户端数据
for (int i = 0; i < 100; i++) {
writer.write(String.valueOf(i));
writer.newLine();
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
客户端
public class TestClient {
public static void main(String[] args) throws IOException {
Socket s = new Socket("localhost", 8888);
new Thread(() -> {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
while (true) {
System.out.println(reader.readLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
for (int i = 0; i < 100; i++) {
writer.write(String.valueOf(i));
writer.newLine();
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}