5.2.2.Channel
Channel 的基本概念
在 Netty 中,Channel 是表示网络传输的开放连接的抽象。它提供了对不同种类网络传输的统一视图,比如 TCP 和 UDP。
Channel 的生命周期
Channel 的生命周期包括创建、激活、连接、读取、写入和关闭等阶段。Netty 中的 Channel 具有状态,根据不同的事件触发状态转换。
Channel channel = ...; // 获取 Channel 实例
// 检查 Channel 是否打开
if (channel.isOpen()) {
// 进行数据读取操作
channel.read();
}
// 关闭 Channel
channel.close();
Channel 的异步 I/O
Netty 中的 Channel 支持异步的 I/O 操作,这意味着可以在不阻塞线程的情况下进行网络通信。下面是一个简单的读取操作示例:
// 从 Channel 中读取数据
channel.read(new ChannelHandler() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 处理读取到的数据
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
buf.release(); // 释放资源
}
});
ChannelHandler 和 ChannelPipeline
ChannelHandler 用于处理入站和出站的事件,而 ChannelPipeline 是一系列 ChannelHandler 的链,负责处理 Channel 传递的事件。
// 创建一个 ChannelInitializer 用于初始化 ChannelPipeline
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 添加自定义的 ChannelHandler 到 ChannelPipeline 中
pipeline.addLast("handler", new MyChannelHandler());
}
};
// 在 ServerBootstrap 中应用 ChannelInitializer
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
channel的主要作用
- close():主要用来关闭channel
- **closeFuture():**用来处理channel的关闭
- sync方法作用是同步等待channel的关闭
- addListener方法是异步等待channel关闭
- **pipeline():**方法添加处理器
- **write():**方法是将数据写入
- **writeAndFlush():**方法是将数据写入并刷出
例如刚刚的客户端代码
// 1.创建启动器 try { new Bootstrap() // 2.指定线程模型 一个用于接收客户端连接,另一个用于处理客户端读写 .group(new NioEventLoopGroup()) // 3.选择客户端的Channel的实现 .channel(NioSocketChannel.class) // 4.添加处理器 .handler(new ChannelInitializer<NioSocketChannel>() { // 5.初始化处理器 @Override protected void initChannel(NioSocketChannel ch) throws Exception { // 6.添加具体的handler 客户端是需要一个编码器 ch.pipeline().addLast(new StringEncoder()); } }) // 7.连接到服务器 .connect(new InetSocketAddress("localhost", 8080)) .sync() // 阻塞方法 知道连接建立 .channel() // 代表客户端和服务端的连接 // 8.向服务器发送数据 .writeAndFlush("hello, world"); } catch (InterruptedException e) { throw new RuntimeException(e); }
5.2.2.1.连接问题sync
// 1.创建启动器
try {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
// 7.连接到服务器
// connect方法是异步的,返回一个ChannelFuture(异步调用 就是不关心结果,直接返回)
// main线程发起了调用,真正执行了connect是另外一个线程 nio线程
.connect(new InetSocketAddress("localhost", 8080));
// 7.1.同步等待连接成功 如果不调用sync()方法,main线程会继续往下执行,不会等待connect()方法的执行结果
channelFuture.sync();
// 7.2.获取连接对象 如果没有调用sync()方法,这里的channel此时还没有真正建立起连接
Channel channel = channelFuture.channel(); // 连接对象
logger.error("channel: {}", channel);
// 8.向服务器发送数据
channel.writeAndFlush("hello, world");
} catch (Exception e) {
throw new RuntimeException(e);
}
5.2.2.2.处理结果
带有Future Promise 的类型,都是和异步方法配套使用的,用来正确处理结果的
-
调用
channelFuture.sync()
处理同步结果,sync()
主要是阻塞当前线程,直到nio线程连接建立完毕 -
使用
addListener(new ChannelFutureListener() )
-
// 使用addListener(回调对象)方法,可以在ChannelFuture执行完成后,再执行一些操作 channelFuture.addListener(new ChannelFutureListener() { // 在NIO线程连接建立好后,会调用operationComplete方法 @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { // 7.2.获取连接对象 如果没有调用sync()方法,这里的channel就会是null Channel channel = channelFuture.channel(); // 连接对象 logger.error("channel: {}", channel); // 8.向服务器发送数据 channel.writeAndFlush("hello, world"); } else { // 7.3.连接失败 Throwable cause = channelFuture.cause(); logger.error("connect failed: {}", cause); } } });
-
5.2.2.3.处理关闭
小需求 : 客户端 不断接收用于输入的信息,然后发送给客户端,当用户端输入q 退出 关闭channel
/**
*
* @author 13723
* @version 1.0
* 2024/2/27 21:46
*/
public class CloseFutureClient {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
// 客户端 不断接收用于输入的信息,然后发送给客户端,当用户端输入q 退出
// 建立建立
Channel channel = channelFuture.sync().channel();
logger.error("channel: {} ",channel);
// 接收用户输入的需求
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true){
String s = scanner.nextLine();
if ("q".equals(s)){
// 退出 关闭channel
// 1s 后才真正的关闭
channel.close();
// 退出循环
logger.error("处理关闭之后的操作!");
break;
}
// 向服务器 发送数据
channel.writeAndFlush(s);
}
},"input").start();
}
}
-
解决
-
使用CloseFuture.sync()
// 关闭Channel // 获取closeFuture对象 1.同步受理关闭 2.异步处理关闭 ChannelFuture closeFuture = channel.closeFuture(); logger.error("wait close... "); closeFuture.sync(); logger.error("处理关闭之后的操作!");
-
使用addListener(new ChannelFutureListener())
closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { logger.error("处理关闭之后的操作!"); } });
-
`
此时关闭,会会发现客户端并没有结束,因为线程虽然结束,但是NioEventLoopGroup 里面可能还有线程,这是时关闭,需要调用
**shutdownGracefully()**方法
// 将NioEventLoopGroup提出来
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.........
// 然后在处理善后中调用
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
logger.error("处理关闭之后的操作!");
// 需要保证整个全部关闭
group.shutdownGracefully();
}
5.2.2.4.为什么使用异步
思考下面这样的场景,4个医生给人看病,每个病人花费20分钟,而且医生看病的过程中,是以病人为单位的,一个病人看完了,才能看下一个病人,假设病人源源不断来,可以计算一天4个医生工作8小时,处理病人总数
4 * 8 * 3 = 96
经研究 发现 看病可以分为 四个步骤 经拆分后每个步骤仅需要五分钟
因此 可以做如下优化,只有一开始, 医生 2 3 4 需要分别等待 5 10 15分钟开能开始执行工作,但是只要后续病人源源不断的来,他们就能满负荷工作,并且处理病人的能力提高 到了,
4 * 8 * 12
整个效率 是原先的 4 倍
(满负载情况下)第一个医生 只挂号,一个号五分钟,那么 一个小时 可以处理 12个,之前一个医生从头到尾只能看一个病人,那么一个小时只能看3个
- 单线程没法异步提高效率,必须配合多线程,多核心cpu才能发挥异步的优势
- 异步并没有缩短响应时间,反而有所增加(提高的是吞吐量,单位时间内能够处理请求的速度)
- 合理任务的拆分,也是利用异步的关键