Netty网络编程
对于高并发的Reactor线程模型,Netty是如何支持的?
Netty线程模型是基于Reactor模型实现的,对Reeactor三种模式都有非常好的支持,并做了一定的改进,也非常的灵活,一般情况,在服务端会采用主从架构模型。
BossGroup负责和客户端建立连接(循环执行三个步骤):
<1> 轮询注册在其上的accept事件
<2> 处理accept事件与客户端建立连接,生成socketchannel,并将其注册到某个WorkerGroup的Selector上
<3> 循环处理任务队列中的其他任务
WorkerGroup负责处理连接上的读写循环执行三个步骤):
<1> 轮询注册在其上的socketchannel的读写事件
<2>在对应的socketchannel上处理读写事件
<3> 循环处理任务队列中的其他任务
注意:以上两个步骤<2>中,会用Pipeline,通过Pipeline可以拿到对应的Channel,Pipeline中维护了很多Handler(拦截、过滤、自定义处理器等)。
从代码编写上Netty如何使用不同的线程模型?
指定线程就好了
在客户端Netty使用什么线程模型?
只需要提供可用的线程池即可,开发者仍然只需编写事件处理器Handler,其他连接服务端,数据收发等均由netty实现
什么是ChannelPipeline和ChannelHandler ?
ChannelPipeline
- ChannelPipeline 是一个有序的处理器链容器,每个
Channel
(在网络编程中,Channel
表示连接的抽象)都有自己的管道。管道中的处理器按顺序排列,形成一个链式结构,用于处理或拦截进出Channel
的各种事件和数据。- 当数据从网络流入或流出时,它会经过管道中的每一个处理器。处理器可以修改、过滤、转换数据或者处理特定类型的事件(比如连接建立、读取完成、异常等)。
ChannelPipeline
提供了一种灵活的方式来组织和管理这些处理器,允许开发者在运行时动态插入、删除或替换处理器,从而定制不同阶段的数据处理逻辑。ChannelHandler
- ChannelHandler 是处理网络事件和数据的接口或抽象类的实现,它是管道中的基本单元,负责处理具体的业务逻辑。
- 根据处理方向的不同,ChannelHandler有两个主要的子接口:
- ChannelInboundHandler:处理入站事件和数据,例如读取客户端发送过来的消息、连接激活、用户事件等。
- ChannelOutboundHandler:处理出站事件和数据,例如客户端请求发送数据、关闭连接等。
- 一个
ChannelHandler
可能同时实现这两个接口,以处理双向的事件和数据流。另外,Netty还提供了一些抽象适配器类(如ChannelInboundHandlerAdapter
和ChannelOutboundHandlerAdapter
),方便开发者扩展和实现自己的处理器逻辑。
inbound入站事件处理顺序(方向)是由链表的头到链表尾,
outbound事件的处理顺序是由链表尾到链表头。
inbound入站事件由netty内部触发,最终由netty外部的代码消费
在Netty框架中,"inbound入站事件"指的是那些由网络底层(如操作系统内核)向Netty框架推送的数据到达事件,或者是由于网络连接状态变化(如连接建立、断开等)引发的事件。这类事件是由网络外部(相对于应用层代码)触发的,但最终需要由开发人员编写的Netty外部代码(即应用程序代码)来处理和消费。
outbound事件由netty外部的代码触发,最终由netty内部消费
"outbound出站事件"是指由应用程序代码发起的、意图影响或控制网络连接的行为事件。这些事件通常涉及到向外发送数据、关闭连接、改变连接状态等操作,它们是由开发者编写的Netty外部代码主动触发的。
实现一个简单的客户端和服务端通信的程序
服务端
客户端
两端ChannelHandler编写核心步骤
NettyServer
package com.hayaizo.netty.handler;
import com.hayaizo.netty.handler.handler.server.ServerInbound1Handler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.jbosslog.JBossLog;
import java.net.InetSocketAddress;
public class NettyServer {
public static void main(String[] args) {
NettyServer server=new NettyServer();
server.start(9999);
}
public void start(int port){
//创建线程池
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//配置服务端引导类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
//客户端的handler
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
ChannelPipeline pipeline=nioSocketChannel.pipeline();
pipeline.addLast(new ServerInbound1Handler());
}
});
//绑定端口
ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(port)).sync();
//获取服务端的channel
future.channel().closeFuture().sync();
//释放资源
//在finally中释放
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
NettyClient
package com.hayaizo.netty.handler;
import com.hayaizo.netty.handler.handler.client.ClientInbound1Handler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) {
NettyClient client=new NettyClient();
client.connect("127.0.0.1",9999);
}
public void connect(String host,int port){
//创建线程池
EventLoopGroup group=new NioEventLoopGroup();
try {
//创建客户端引导类
Bootstrap bootstrap = new Bootstrap();
//配置
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//handler方法
ChannelPipeline pipeline= nioSocketChannel.pipeline();
pipeline.addLast(new ClientInbound1Handler());
}
});
//连接对端
ChannelFuture future = bootstrap.connect(host, port).sync();
//连接上了之后发送消息
//先获取到发送消息的管道
Channel channel = future.channel();
byte[] bytes = "hello netty server,I an netty client".getBytes();
//向管道要一块缓冲区
ByteBuf buf=channel.alloc().buffer(bytes.length);
buf.writeBytes(bytes);
channel.writeAndFlush(buf);
//链接全都关闭了之后释放资源
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
}
ClientInbound1Handler
package com.hayaizo.netty.handler.handler.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
public class ClientInbound1Handler extends ChannelInboundHandlerAdapter {
/**
* 客户端channel 准备就绪 ,生效
* 只回调一次
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ClientInbound1Handler ----channelActive ");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ClientInbound1Handler ----channelInactive ");
super.channelInactive(ctx);
}
/**
* 从channel中读取到了数据之后,回调
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("ClientInbound1Handler ----channelRead ");
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
String message = new String(bytes, Charset.defaultCharset());
System.out.println("客户端收到服务端发送的数据为:" + message);
/*Channel channel = ctx.channel();
// 向服务端端回写数据
byte[] bytes1 = "hello netty client ,i am netty server ,hehe".getBytes(StandardCharsets.UTF_8);
ByteBuf buf1 = ctx.alloc().buffer(bytes1.length);
buf1.writeBytes(bytes1);
channel.writeAndFlush(buf1);*/
super.channelRead(ctx, msg);
}
/**
* channel中数据读取完毕的回调
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("ClientInbound1Handler ----channelReadComplete ");
// ctx.writeAndFlush()
super.channelReadComplete(ctx);
}
/**
* 出现异常的回调
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("ClientInbound1Handler ----exceptionCaught ");
super.exceptionCaught(ctx, cause);
}
}
ServerInbound1Handler
package com.hayaizo.netty.handler.handler.server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
public class ServerInbound1Handler extends ChannelInboundHandlerAdapter {
/**
* 客户端channel准备就绪,只会调用一次
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ServerInbound1Handler ----channelActive ");
super.channelActive(ctx);
}
/**
* 管道没了之后调用的
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ServerInbound1Handler ----channelInactive ");
super.channelInactive(ctx);
}
/**
* 读到数据之后回调这个函数
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("ServerInbound1Handler ----channelRead ");
ByteBuf buf=(ByteBuf) msg;
byte[] bytes=new byte[buf.readableBytes()];
buf.readBytes(bytes);
String message=new String(bytes, Charset.defaultCharset());
System.out.println("服务端收到的数据为:"+message);
//写回数据
Channel channel = ctx.channel();
byte[] bytes1 = "hello netty client ,i am netty server ,hahha".getBytes(StandardCharsets.UTF_8);
ByteBuf buf1 = ctx.alloc().buffer(bytes1.length);
buf1.writeBytes(bytes1);
channel.writeAndFlush(buf1);
super.channelRead(ctx, msg);
}
/**
* channel中数据读取完毕的回调
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("ServerInbound1Handler ----channelReadComplete ");
super.channelReadComplete(ctx);
}
/**
* 出现异常的回调
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("ServerInbound1Handler ----exceptionCaught ");
super.exceptionCaught(ctx, cause);
}
}
测试收发成功。
在Netty框架中,当我们搭建网络服务器时,会用到
Bootstrap
类来配置和启动服务器端口监听新进来的连接。在Bootstrap中,我们可以设置一个ChannelInitializer
。
ChannelInitializer
就像是一个“管道工”,它的工作并不直接处理网络通信数据,而是在新的网络连接(也就是Channel
)被创建出来的时候,首先去“装修”这条通道——这里的“装修”指的是构建和配置与该Channel相关的ChannelPipeline
。
ChannelPipeline
就像是一条流水线,里面包含了一系列处理器(Handler),每个处理器负责处理特定类型的网络事件或数据。我们在ChannelInitializer
中预先设定好这条流水线上的各个处理器,比如解码器、编码器、业务逻辑处理器等。所以,当我们在配置Bootstrap时,还未真正建立起任何一个Channel或者它们的Pipeline,但通过设置ChannelInitializer,我们就能够在每个新建立的Channel的初期就确定好它的Pipeline应该包含哪些处理器。这样一来,一旦有新的连接进来,Netty就会自动调用ChannelInitializer去初始化对应的ChannelPipeline,并按照我们的设定加载相应的处理器。
总结
- 每个channel就是一个连接管道,它含有一个由一系列handler组成的pipeline来处理channel每一次的in/out操作,当然handler被包在context中使用。
- 先在bootstrap中放一个ChannelInitializer的handler,存放在这里。这时真正的channel与pipeline还没有生成。ChannelInitializer的目的是在pipeline的处理器中先占个位置,等创建了channel之后,会把ChannelInitializer的handler一个个放到pipeline中。
- 当bootstrap进行init一个channel的时候,会给这个channel的pipeline放上这个ChannelInitializer的handler。
- 此时所有的主要对象与关系都组装都完成了。在nio中,Channel是要注册到selector中的,这个时候就是启动处理的时候。让pipleline中产生一个register事件,就由pipeline中的相关handler来自动处理了。
- 相关的的handler会进行相关的处理,目前只有一个,它处理中会init一下自己所属的channel,并把自己移除,之后还会继续传播这个事件。