目录
1.概述
2.hello world
3.EventLoop
4.channel
4.1.同步
4.2.异步
4.3.调试
4.4.关闭
4.5.为什么要用异步
5.future
6.promise
7.pipeline
8.byteBuf
8.1.创建
8.2.内存模式和池化
8.2.1.内存模式
8.2.2.池化
8.3.组成
8.4.操作
8.4.1.读写
8.4.2.释放
8.5.零拷贝
8.5.1.slice
8.5.2.composite
8.6.工具类
9.双向通信
10.粘包半包
10.1.问题成因
10.2.解决办法
10.2.1.短连接
10.2.2.解码器
1.概述
2.定长解码器
3.行解码器
4.固定帧长的解码器
11.协议解析
11.1.Redis
11.2.Http
12.协议设计
12.1.概述
12.2.编码
1.概述
netty,说人话就是封装NIO做出来的一个JAVA高性能通信框架。在JAVA领域,有高性能网络通信需求的时候,绝大多数都会选择netty作为通信框架。
关于JAVA的通信,我猜想可能博主的另外两篇关于BIO和NIO的文章作为本文的导读会不错:
详解TCP-CSDN博客
详解JAVA Socket-CSDN博客
JAVA BIO_java的bio有哪些-CSDN博客
全网最清晰JAVA NIO,看一遍就会-CSDN博客
netty底层就是封装的NIO。如果自己使用NIO的话至少会有以下的不便:
-
需要自己构建协议。
-
需要自己解决TCP传输问题,如粘包、半包。
-
API过于底层,不便于使用。
netty其实就是封装了一下NIO,使得NIO更便于使用。
2.hello world
依赖:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency>
服务器:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class HelloServer {
public static void main(String[] args) {
//ServerBootstrap,启动器,负责组装netty组件
new ServerBootstrap()
//1.怎样去接收IO?
//事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件
.group(new NioEventLoopGroup())
//2.接收成什么?
//服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的
//除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel
.channel(NioServerSocketChannel.class)
//3.做什么处理?
//支持用责任链模式来对收到的IO进行链式处理
.childHandler(new ChannelInitializer<NioSocketChannel>() {
//连接建立后才会调用初始化方法
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定解码方式
nioSocketChannel.pipeline().addLast(new StringDecoder());
//ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
//4.绑定监听端口
.bind(8080);
}
}
客户端:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
public class HelloCleint {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
//用什么进行发送?
//可以是BIO,也可以是NIO,也可以是epoll
.channel(NioSocketChannel.class)
//处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定编码方式
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
//连接到服务器
.connect(new InetSocketAddress("localhost",8080))
//同步通信
.sync()
//代表连接对象
.channel()
//发送数据
.writeAndFlush("hello world");
}
}
3.EventLoop
eventLoop,事件循环对象,是一个单线程执行器,本质上就是一条线程+一个selector,用来单线程监听处理IO事件。
实际使用上很少直接使用EventLoop,而是使用EventLoopGroup,EventLoopGroup的构造方法中可以指定其中的EventLoop数量。
eventLoop除了继承Netty体系类的一些标准化接口外,还继承了JDK中的ScheduledExecutorService,使得其自身具备线程池一切的能力。既然是线程池,就可以用来执行任务。
eventLoop执行普通任务:
EventLoopGroup group =new NioEventLoopGroup(5);
group.next().submit(()->{
try {
Thread.sleep(10000);
System.out.println("success!");
} catch (Exception e) {
e.printStackTrace();
}
});
eventLoop执行IO任务:
一个EventGroupLoop其实就是一条线程,用来处理一条通信连接。
public static void main(String[] args) {
//ServerBootstrap,启动器,负责组装netty组件
new ServerBootstrap()
//1.怎样去接收IO?
//事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件
.group(new NioEventLoopGroup())
//2.接收成什么?
//服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的
//除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel
.channel(NioServerSocketChannel.class)
//3.做什么处理?
//支持用责任链模式来对收到的IO进行链式处理
.childHandler(new ChannelInitializer<NioSocketChannel>() {
//连接建立后才会调用初始化方法
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定解码方式
nioSocketChannel.pipeline().addLast(new StringDecoder());
//ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
//4.绑定监听端口
.bind(8080);
}
netty还给了一种更加细粒度的分层,就是让一部分EventLoop来选择IO,一部分EventLoop来处理IO,说白了就是一部分EventLoop出selector,一部分EventLoop出Thread。
public static void main(String[] args) {
//ServerBootstrap,启动器,负责组装netty组件
new ServerBootstrap()
//boss线程只负责accept事件,worker线程只负责io读写
.group(new NioEventLoopGroup(),new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringDecoder());
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
.bind(8080);
}
4.channel
channel,对NIO的channel的二次封装,内核段缓冲区的抽象。不管是服务端还是客户端,只要调用channel()方法都能获取当前工作的这条channel。channel无非要注意的点就是它的同步和异步。
在实际应用中我们要知道在读的时候同步和异步是没有意义的,不可能在读IO的时候还区分同步读或者异步读,只可能是准备好了就读。只有写IO的时候区分同步和异步才是意义。所以在netty体系里很少会去服务端操作channel的同步和异步,一般都是在客户端操作channel的同步和异步。
4.1.同步
服务端:
在服务端让建立连接的时候休眠3秒。
public static void main(String[] args) {
//ServerBootstrap,启动器,负责组装netty组件
new ServerBootstrap()
//1.怎样去接收IO?
//事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件
.group(new NioEventLoopGroup())
//2.接收成什么?
//服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的
//除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel
.channel(NioServerSocketChannel.class)
//3.做什么处理?
//支持用责任链模式来对收到的IO进行链式处理
.childHandler(new ChannelInitializer<NioSocketChannel>() {
//连接建立后才会调用初始化方法
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定解码方式
nioSocketChannel.pipeline().addLast(new StringDecoder());
//ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Thread.sleep(3000);
System.out.println(msg);
}
});
}
})
//4.绑定监听端口
.bind(8080);
}
客户端:
客户端使用channel的sync来进行同步通信,同步模式下在connect建立连接的时候,主线程会同步等待,连接建立后再向下执行。
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
public class HelloCleint {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
//用什么进行发送?
//可以是BIO,也可以是NIO,也可以是epoll
.channel(NioSocketChannel.class)
//处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定编码方式
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
//连接到服务器
.connect(new InetSocketAddress("localhost",8080))
//同步通信
.sync()
//代表连接对象
.channel()
//发送数据
.writeAndFlush("hello world");
}
}
4.2.异步
channel默认处于异步通信模式,connect建立连接的时候,不会同步等待,而是会继续向下执行,由于服务器端延迟了3秒来建立连接,所以客户端发送这条“hello server”发送时,连接并未建立完成,最终效果就是丢包,服务器收不到这条数据。
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
//用什么进行发送?
//可以是BIO,也可以是NIO,也可以是epoll
.channel(NioSocketChannel.class)
//处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定编码方式
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
//连接到服务器
.connect(new InetSocketAddress("localhost", 8080));
//异步
channelFuture.channel().writeAndFlush("hello world");
}
当然,在异步通信上,netty支持了监听器,建立连接完成后,用事件回调的方式触发监听器。利用监听器,可以使得异步通信不丢包:
//异步
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
channelFuture.channel().writeAndFlush("hello world");
}
});
用监听器发送数据后,在当前业务场景下,即使服务端延迟了三秒才建立连接,但是任然能收到“hello world”这条消息。
4.3.调试
EmbeddedChannel
是Netty中提供的一种特殊类型的Channel
实现,主要用于单元测试。它允许你在测试中模拟输入事件(例如读取数据、写入数据)并检查输出事件(例如读取到的数据)。使用EmbeddedChannel
可以在不启动真实的网络连接的情况下测试你的ChannelHandler
逻辑。
代码示例:
自定义一个handler:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class UpperCaseHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
String upperCaseMsg = msg.toUpperCase();
ctx.writeAndFlush(upperCaseMsg);
}
}
测试:
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class UpperCaseHandlerTest {
@Test
public void testUpperCaseHandler() {
// 创建EmbeddedChannel,并添加要测试的Handler
EmbeddedChannel channel = new EmbeddedChannel(new UpperCaseHandler());
// 写入一个字符串消息到Channel
channel.writeInbound("hello");
// 读取Channel的输出
String output = channel.readOutbound();
// 验证处理后的消息是否符合预期
assertEquals("HELLO", output);
// 关闭Channel
channel.finish();
}
}
4.4.关闭
由于channel的close方法是异步的,所以在关闭资源时会存在风险。比如代码顺序为:
-
close掉channel
-
close掉其它资源
有可能在close掉其它资源的时候,channel并没有close掉,也就可能出现,channel中还有数据没处理完,其它资源被关掉了,导致数据处理失败的问题。所以更为稳妥的方式是用同步的机制来关闭channel。netty中封装了CloseFuture来同步关闭channel。
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
//同步关闭
closeFuture.sync();
要注意的是channel停止后如果EventLoopGroup还有其它线程时,程序是不会中止的,想要中止程序,必须再close掉group,EventLoopGroup提供了优雅停机的API——shutdownGracefully,会先停止接收请求,驻留的请求处理完成后,关掉group。
4.5.为什么要用异步
我们可以看到channel里面大量的用到了异步,对一个channel的操作,connect是一条线程,write是一条线程,close也是一条线程......
用异步的方式来处理,不仅不会加快单个IO任务的速度,反而还会略微拉长一个IO的响应时间,但是异步能明显提高吞吐量。
举个例子,一个病人看病,分为挂号、看病、缴费。取药,同步的方式就是一个医生走完一个病人的所有流程:
而异步的方式就是医生分工合作,每个医生单独负责一个项目,这样一个时间段内虽然处理的任务综合是一样的,但是在峰值的吞吐量上,异步是同步的四倍:
5.future
JDK的future是表示一个任务,netty的future是对JDK的future做了二次封装。
同步:
public static void main(String[] args) throws Exception {
NioEventLoopGroup nioEventLoopGroup=new NioEventLoopGroup();
Future<String> future = nioEventLoopGroup.submit(new Callable<String>() {
public String call() throws Exception {
Thread.sleep(1000);
return "success!";
}
});
//future的get方法是同步的,同步等待线程返回返回值为止
System.out.println(future.get());
}
异步:
用监听器实现异步
public static void main(String[] args) throws Exception {
NioEventLoopGroup nioEventLoopGroup=new NioEventLoopGroup();
Future<String> future = nioEventLoopGroup.submit(new Callable<String>() {
public String call() throws Exception {
Thread.sleep(1000);
return "success!";
}
});
//用监听器来实现异步
future.addListener(new GenericFutureListener<Future<? super String>>() {
public void operationComplete(Future<? super String> future) throws Exception {
System.out.println(future.get());
}
});
}
6.promise
光是有future是不够的,因为future必须处理完了,才能拿到结果,有些时候需要提前拿到结果开始处理,就需要在两个线程间进行通信,通信就需要一个存放数据的地方,也就有了promise,其可以理解为一个数据容器,可以向该容器中手动的存放数据、拿数据。
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
public static void main(String[] args) {
EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
EventLoop eventLoop = eventLoopGroup.next();
final DefaultPromise<String> promise=new DefaultPromise<String>(eventLoop);
eventLoop.execute(new Runnable() {
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
promise.setSuccess("success!");
}
});
//默认是同步
try {
System.out.println(promise.get());
} catch (Exception e) {
e.printStackTrace();
}
//可以用监听器来实现异步
//promise.addListener(new GenericFutureListener<Future<? super String>>() {
//public void operationComplete(Future<? super String> future) throws Exception {
//System.out.println(promise.get());
//}
//});
}
promise支持向外抛异常:
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
public class PromiseDemo {
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
EventLoop eventLoop = eventLoopGroup.next();
final DefaultPromise<String> promise = new DefaultPromise<String>(eventLoop);
eventLoop.execute(new Runnable() {
public void run() {
try {
int i = 1 / 0;
} catch (Exception e) {
promise.setFailure(e);
}
}
});
try {
System.out.println(promise.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
7.pipeline
netty中使用了责任链来处理对channel的读写请求,链上每一个节点都是一个处理器,有两种处理器:
-
出站处理器,用来处理write操作。
-
入站处理器,用来处理read操作。
这里要注意,是出战、入站,不是出栈、入栈。Netty 的设计参考了这种网络协议栈的思想,所以出站(Outbound)和入站(Inbound)这两个概念是遵循网络协议栈的传统命名。
-
出站(Outbound):数据从应用程序流向网络的过程被称为“出站”,因为数据是从应用程序向外发送,穿越协议栈的各个层级,最终到达网络。
在 Netty 中,
ChannelOutboundHandlerAdapter
处理的是数据从应用程序到网络的过程,即数据从上层(应用层或业务层)向下层(传输层、网络层、数据链路层等)传递的过程。 -
入站(Inbound):数据从网络流向应用程序的过程被称为“入站”,因为数据是从外部网络进入应用程序,穿越协议栈的各个层级,最终到达应用程序。
在 Netty 中,
ChannelInboundHandlerAdapter
处理的是数据从网络到应用程序的过程,即数据从下层(传输层、网络层、数据链路层等)向上层(应用层或业务层)传递的过程。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Server {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//H1->H2->H3->h4->h5->h6
//入站处理器
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("H1");
//向下走
super.channelRead(ctx,msg);
}
});
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("H2");
//向下走
super.channelRead(ctx,msg);
}
});
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("H3");
//写操作,用来触发后面的出站处理器
nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Servers......".getBytes()));
}
});
//出站处理器
nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object o, ChannelPromise channelPromise) throws Exception {
System.out.println("h4");
super.write(ctx,o,channelPromise);
}
});
nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object o, ChannelPromise channelPromise) throws Exception {
System.out.println("h5");
super.write(ctx,o,channelPromise);
}
});
nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object o, ChannelPromise channelPromise) throws Exception {
System.out.println("h6");
super.write(ctx,o,channelPromise);
}
});
}
}).bind(8080);
}
}
client:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
public class Client {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel()
.writeAndFlush("hello world");
}
}
server端的输出结果:
入站处理器顺序执行,出栈处理器逆序执行。
8.byteBuf
8.1.创建
在Java NIO(New I/O)中,ByteBuffer
是一个用来处理字节数据的缓冲区类,是对NIO的byteBuffer的二次封装和扩展,可以直接理解为用户段内存的抽象。
开辟byteBuf:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
public class test {
public static void main(String[] args) {
//可以通过传参来指定大小
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
}
}
8.2.内存模式和池化
8.2.1.内存模式
根据所开辟的内存空间的位置的不同,byteBuf分为两类:
-
直接缓冲区
-
非直接缓冲区
直接缓冲区:
直接创建在物理机的缓冲区中,创建和销毁的代价昂贵,但是读写性能高。要注意的是直接内存不受GC的管理,需要注意手动释放内存,避免内存泄露。
创建池化的直接缓冲区:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
public class test {
public static void main(String[] args) {
ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer();
}
}
非直接缓冲区:
创建在JVM中的缓冲区,创建和销毁的代价相对没那么高,但是读写性能相对较低。
创建池化的非直接缓冲区:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
public class test {
public static void main(String[] args) {
ByteBuf directBuffer = ByteBufAllocator.DEFAULT.heapBuffer();
}
}
8.2.2.池化
ByteBuf
的池化是指将 ByteBuf
实例预先分配并存储在内存池中,以便在需要时进行重复使用。池化 ByteBuf
的主要目的是减少内存分配和垃圾回收的开销,从而提高性能。Netty 提供了池化 ByteBuf
的功能,它内置了两种 ByteBuf
池化的实现:PooledByteBufAllocator 和 UnpooledByteBufAllocator。
1.PooledByteBufAllocator(池化的内存分配器):
PooledByteBufAllocator
是 Netty 提供的默认的 ByteBuf
池化实现。它通过预先分配一些 ByteBuf
实例,并将它们存储在池中。当需要创建新的 ByteBuf
实例时,它会从池中获取已有的实例,而不是每次都重新分配内存。
使用 PooledByteBufAllocator
可以减少频繁的内存分配和释放操作,避免了堆内存的碎片化,提高了性能。
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
public class PooledByteBufExample {
public static void main(String[] args) {
// 使用 PooledByteBufAllocator 创建 ByteBuf
ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
ByteBuf pooledBuffer = allocator.buffer(1024); // 创建1KB的池化 ByteBuf
// 使用 pooledBuffer...
// 释放 ByteBuf,将其返回到池中
pooledBuffer.release();
}
}
2.UnpooledByteBufAllocator(非池化的内存分配器):
UnpooledByteBufAllocator
是 Netty 提供的非池化的 ByteBuf
实现。它每次都会分配新的内存,不会重用已有的 ByteBuf
实例。虽然不会涉及到池的管理,但在一些短期存活或者需要手动管理内存的场景下使用非池化内存分配器可能更合适。
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
public class UnpooledByteBufExample {
public static void main(String[] args) {
// 使用 UnpooledByteBufAllocator 创建 ByteBuf
ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;
ByteBuf unpooledBuffer = allocator.buffer(1024); // 创建1KB的非池化 ByteBuf
// 使用 unpooledBuffer...
// 释放 ByteBuf(注意:在非池化情况下,需要手动释放 ByteBuf)
unpooledBuffer.release();
}
}
在使用池化 ByteBuf
时,需要注意在不再使用 ByteBuf
时调用 release()
方法,将它返回到池中,以便被重用。这样可以避免内存泄漏和提高性能。
8.3.组成
bytebuf一开始有个初始化容量(capacity),可以手动指定,没有手动指定时也有个默认值。
bytebuf是自动扩容的,扩容的上限(max capacity)其实就是机器的物理内存。
读写指针一开始在0位,随着读写,读写指针向后移动。要注意,bytebuf的读写,只涉及指针的移动,不涉及内存的回收,也就是读过的区域(废弃字节)并不会被释放,除非调用特殊的API(discardReadBytes())。
netty的bytebuf相较于NIO的bytebuffer,有以下优势:
-
bytebuffer读写公用一个指针,所以,读之前要切换到读模式;写之前要切换到写模式。
-
bytebuf自动扩容,而bytebuffer不行。
8.4.操作
8.4.1.读写
写操作:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.nio.charset.StandardCharsets;
public class test {
public static void main(String[] args) {
//写入数字
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeInt(666);
// 写入字符串
String stringValue = "Hello, World!";
byte[] stringBytes = stringValue.getBytes(StandardCharsets.UTF_8);
buffer.writeBytes(stringBytes);
}
}
读操作:
// 读取整数
int readIntValue = buffer.readInt();
// 读取字符串
int readableBytes = buffer.readableBytes();
byte[] stringBytes = new byte[readableBytes];
buffer.readBytes(stringBytes);
String readStringValue = new String(stringBytes, StandardCharsets.UTF_8);
需要注意的是,在读取数据之前,你需要确保 ByteBuf
中有足够的可读字节数。可以使用 readableBytes()
方法来检查 ByteBuf
中的可读字节数。
此外,ByteBuf
还提供了其他的读写操作,比如 readableBytes()
用于获取可读字节数,writerIndex()
和 readerIndex()
用于获取写入和读取的索引位置等。在使用 ByteBuf
时,请确保在读写时不越界,并且注意释放 ByteBuf
以避免内存泄漏。在Netty中,通常会使用 ReferenceCountUtil.release(buffer)
来释放 ByteBuf
,确保资源得到正确释放。
读写指针:
8.4.2.释放
bytebuf要特别注意资源的释放,以避免内存泄漏。Netty使用引用计数(Reference Counting)来管理 ByteBuf
的生命周期,确保在不再需要使用时及时释放资源。
在Netty中,release()
和 retain()
是用于管理 ByteBuf
引用计数的方法。
release()
方法用于将 ByteBuf
的引用计数减少1。当引用计数减至0时,Netty会释放 ByteBuf
的内存(如果使用了池化的 ByteBuf
,则将它归还给池)。
ByteBuf buffer = //... 从某个地方获取ByteBuf实例 buffer.release(); // 引用计数减少1,如果引用计数为0,释放ByteBuf的内存
retain()
方法用于将 ByteBuf
的引用计数增加1。当你调用 retain()
方法时,你告诉Netty你对这个 ByteBuf
感兴趣,即使在你使用完后,其他代码也可能继续使用它。
ByteBuf buffer = //... 从某个地方获取ByteBuf实例 buffer.retain(); // 引用计数增加1,防止在使用完后被提前释放
8.5.零拷贝
零拷贝其实没有严格的定义,指的是减少IO过程中数据在内存中拷贝的次数这样一个大致目标。在netty的ByteBuf中也存在一些零拷贝机制,用来在多个ByteBuf之间进行数据传递。
8.5.1.slice
在 Netty 中,ByteBuf
的 slice()
方法用于创建一个与原始 ByteBuf
共享数据的新 ByteBuf
。换句话说,slice()
方法返回一个从原始 ByteBuf
中截取出来的视图,这个视图与原始 ByteBuf
共享底层数据,但拥有自己的独立读写指针。由于是直接通过读写指针指向同一块内存的,所以slice出来的bytebuf并没有发送数据拷贝,是0拷贝。
如何理解拥有自己的独立读写指针喃?因为slice出来的buf和元buf共享内存,为了避免slice出来的buf通过写指针来进行写,进而影响元buf,netty在设计时故意就禁止了slice动用写指针来向元buf中进行写。只能通过读指针来读。
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
public class test {
public static void main(String[] args) {
//开一个容量为10字节的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
//写入数据
buffer.writeBytes(new byte[]{1,2});
//slice
ByteBuf slice = buffer.slice(0,2);
//因为slice不与元buf共享读写指针,所以write会报错,因为write是用的读写指针来进行读写,但是set不会报错,因为set不是用的读写指针来进行读写的。
//slice.writeByte(1);
slice.setByte(0,2);
while(slice.isReadable()){
System.out.println(slice.readByte());
}
}
}
8.5.2.composite
slice是将一个大的bytebuf划分成多个小的bytebuff,composite是将多个小的bytebuf聚合成一个大的bytebuf。
在 Netty 中,CompositeByteBuf
是 ByteBuf
的一个特殊实现,它提供了一种能够组合多个 ByteBuf
实例的方式。CompositeByteBuf
允许将多个 ByteBuf
视为一个单一的逻辑缓冲区,而不需要将它们合并成一个实际的连续内存块。这种设计可以提高内存的利用率和降低内存拷贝的次数。
public static void main(String[] args) {
ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer();
buffer1.writeBytes(new byte[]{1,2,3,4,5});
ByteBuf buffer2 = ByteBufAllocator.DEFAULT.buffer();
buffer2.writeBytes(new byte[]{6,7,8,9,10});
CompositeByteBuf compositeBuffer = ByteBufAllocator.DEFAULT.compositeBuffer();
//可变参数,可以有多个
compositeBuffer.addComponents(buffer1,buffer2);
while (compositeBuffer.isReadable()){
System.out.println(compositeBuffer.readByte());
}
}
8.6.工具类
Unpooled
是 Netty 提供的一个工具类,用于创建不需要池化的 ByteBuf
实例。在 Netty 中,ByteBuf
是用来操作字节数据的缓冲区类。通常,Unpooled
类提供了一些静态方法,用于创建不同类型的 ByteBuf
实例,包括堆缓冲区(heap buffer)、直接缓冲区(direct buffer)、组合缓冲区(composite buffer)等。
也就是说可以用unpooled来开辟各类型的bytebuf。
9.双向通信
服务端:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class Server {
public static void main(String[] args) {
//ServerBootstrap,启动器,负责组装netty组件
new ServerBootstrap()
//1.怎样去接收IO?
//事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件
.group(new NioEventLoopGroup())
//2.接收成什么?
//服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的
//除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel
.channel(NioServerSocketChannel.class)
//3.做什么处理?
//支持用责任链模式来对收到的IO进行链式处理
.childHandler(new ChannelInitializer<NioSocketChannel>() {
//连接建立后才会调用初始化方法
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定解码方式
nioSocketChannel.pipeline().addLast(new StringDecoder());
//ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
ByteBuf response = ctx.alloc().buffer();
response.writeBytes(msg.toString().getBytes());
ctx.writeAndFlush(response);
}
});
}
})
//4.绑定监听端口
.bind(8080);
}
}
客户端:
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
public class Client {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
//用什么进行发送?
//可以是BIO,也可以是NIO,也可以是epoll
.channel(NioSocketChannel.class)
//处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定编码方式
nioSocketChannel.pipeline().addLast(new StringEncoder());
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes("hello".getBytes());
ctx.writeAndFlush(buffer);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg.toString());
}
});
}
})
//连接到服务器
.connect(new InetSocketAddress("localhost", 8080));
}
}
10.粘包半包
10.1.问题成因
粘包:发送abc def,接收到abcdef
半包:发送abcdef,接收到abc或者def
原因:
-
没有清晰的结束符,导致不知道收到何处才是一个完成的包。
-
IO缓冲区大小过大或者过小,导致收太多或者收不完。
解决粘包和半包问题通常需要在设计通信协议时采取一些策略。
10.2.解决办法
10.2.1.短连接
解决粘包半包问题的其中一个办法是——短连接。
所谓短连接就是当一次完整的报文返送完成后,客户端主动断开TCP连接。粘包半包的根本原因其实就是不知道一个完整的报文何时收完,通过客户端发送完一次完整的信息后主动断开连接,让服务器端感知到,一次完整的信息发送完成。
客户端:
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {
public static void main(String[] args) throws InterruptedException {
for (int i=0;i<10;i++){
send();
}
}
public static void send(){
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定编码方式
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10});
ctx.writeAndFlush(buffer);
ctx.channel().close();
}
});
}
}
);
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally {
worker.shutdownGracefully();
}
}
}
服务器:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server {
public static void main(String[] args) {
NioEventLoopGroup boss=new NioEventLoopGroup();
NioEventLoopGroup worker=new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap=new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss,worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
10.2.2.解码器
1.概述
解码器是netty自带的一类用来从请求报文中解析出数据的handler。其底层原理都是从指定位置开始,解析出定长的字节内容来。
2.定长解码器
FixedLengthFrameDecoder,定长解码器,用来在报文中获取出指定长度的字节。
server:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import java.nio.charset.StandardCharsets;
public class server {
public static void main(String[] args) throws InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 定长解码器,每个消息长度固定为10个字节
pipeline.addLast(new FixedLengthFrameDecoder(10));
// 业务处理器
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
String content = byteBuf.toString(StandardCharsets.UTF_8);
System.out.println("Received message: " + content);
byteBuf.release(); // 释放ByteBuf资源
}
});
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
}
}
3.行解码器
Netty的行处理器(LineBasedFrameDecoder)是一种用于处理以换行符(\n
)或回车换行符(\r\n
)为消息分隔符的情况。它会按照换行符或回车换行符将接收到的数据切分成消息,适用于处理文本协议中每行代表一个消息的场景。
server:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.LineBasedFrameDecoder;
public class LineBasedServerHandler extends ChannelInboundHandlerAdapter {
public LineBasedServerHandler() {
// 添加行处理器到ChannelPipeline中,使用换行符作为消息分隔符
ctx.pipeline().addLast(new LineBasedFrameDecoder(1024));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这里的msg是一个ByteBuf,表示一个完整的行消息
ByteBuf buf = (ByteBuf) msg;
String line = buf.toString(io.netty.util.CharsetUtil.UTF_8);
System.out.println("Received message: " + line);
buf.release(); // 释放ByteBuf资源
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
客户端:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class LineBasedClient {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 添加行处理器到ChannelPipeline中,使用换行符作为消息分隔符
pipeline.addLast(new LineBasedFrameDecoder(1024));
// 客户端的业务处理器
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 发送带换行符的消息
String message = "Hello, Netty!\n";
ctx.writeAndFlush(message);
}
});
}
});
bootstrap.connect("localhost", 8080).sync().channel().closeFuture().sync();
}
}
4.固定帧长的解码器
Netty中的LengthFieldBasedFrameDecoder
是一种用于解决粘包和半包问题的解码器。通信报文的结构说白了无非就是头部+身体,头部中记录关于消息长度等信息,身体中携带要传递的消息。LengthFieldBasedFrameDecoder
就是根据设置的参数来准确的切分消息的头部和身体,就能确保每个消息被正确地接收和处理。
构造方法如下:
public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip)
-
maxFrameLength
:指定消息的最大长度,超过这个长度的消息将被丢弃。 -
lengthFieldOffset
:指定长度字段在消息中的偏移量。 -
lengthFieldLength
:指定长度字段的长度,可以是1、2、3、4、8等字节。 -
lengthAdjustment
:指定长度字段的值需要进行调整的偏移量,通常为消息头的长度。 -
initialBytesToStrip
:指定解码时需要跳过的字节数,通常为长度字段的长度。
代码示例:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
public class LengthFieldServerHandler extends ChannelInboundHandlerAdapter {
public LengthFieldServerHandler() {
// 添加LengthFieldBasedFrameDecoder,指定各个参数
ctx.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这里的msg是一个ByteBuf,表示一个完整的消息
ByteBuf buf = (ByteBuf) msg;
String message = buf.toString(io.netty.util.CharsetUtil.UTF_8);
System.out.println("Received message: " + message);
buf.release(); // 释放ByteBuf资源
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
11.协议解析
11.1.Redis
11.2.Http
名字里带codec的,在业内基本都是编解码器。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class TestHttp {
public static void main(String[] args) {
NioEventLoopGroup boss=new NioEventLoopGroup();
NioEventLoopGroup worker=new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap=new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss,worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
//Http的解码器
socketChannel.pipeline().addLast(new HttpServerCodec());
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//经过http解码器解码后,请求会被解析为请求头实体或者请求体实体
if(msg instanceof HttpRequest){
//请求行、请求头
}else if(msg instanceof HttpContent){
//请求体
}
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
当然上面这种写法太繁琐了,netty提供了SimpleChannelInboundHandler,用泛型来指定处理请求头还是请求体:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class TestHttp {
public static void main(String[] args) {
NioEventLoopGroup boss=new NioEventLoopGroup();
NioEventLoopGroup worker=new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap=new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss,worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
//Http的解码器
socketChannel.pipeline().addLast(new HttpServerCodec());
socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
byte[] bytes = "<h1>hello world!</h1>".getBytes();
//响应头设置返回的消息的长度,否则浏览器不知道消息有多长,会一直刷新
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH,bytes.length);
//响应体设置返回的消息
response.content().writeBytes(bytes);
//写回响应
ctx.writeAndFlush(response);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
12.协议设计
12.1.概述
自定义协议要素:
-
魔术,用来判断数据包是否有效。
-
版本号,协议版本号,用来支持协议升级。
-
序列化算法,消息正文采用的序列化方式。
-
指令类型,是登录、注册、还是其他........
-
请求序号,用来支持双工通信,如TCP之类的。
-
正文长度
-
消息正文
12.2.编码
编解码,netty自带编解码器接口ByteToMessageCodec,允许开发者将数据报文转为自己想要的类型。
注意:想要转为的目标类型,必须是实现了序列化接口,可序列化的,不然会报错。
public class MyCodec extends ByteToMessageCodec<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
//4字节的魔数
out.writeBytes(new byte[]{1, 2,3,4});
//1字节的版本
out.writeByte(1);
//1字节的序列化方式jdk
out.writeByte(0);
//1字节的指令类型
out.writeByte(msg.getMessageType());
//4个字节序号
out.writeInt(msg.getSequenceId());
//填充字段
out.writeByte(0xff);
//消息内容
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);byte[] bytes = bos.toByteArray();
//消息长度
out.writeInt(bytes.length);
//写入内容
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//魔数
int magicNum = in.readInt();
//版本号
byte version = in.readByte();
//序列化类型
byte serializerType = in.readByte();
//指令类型
byte messageType = in.readByte();
//序号
int sequenceId = in.readInt();
//读填充字节
in.readByte();
//消息长度
int length = in.readInt();
//读消息
byte[] bytes = new byte[length];
in.readBytes(bytes, 0,length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
out.add(message);
}
}
测试:
public static void main(String[] args) throws Exception {
EmbeddedChannel channel=new EmbeddedChannel(
new LoggingHandler(),
new MyCodec()
);
Message message=new Message();
message.setData("hello".getBytes());
//出站会调用codec的encode()
channel.writeOutbound(message);
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MyCodec().encode( null,message, buf);
//入站会调用codec的decode()
channel.writeInbound(buf);
}
测试半包、粘包问题:
public static void main(String[] args) throws Exception {
EmbeddedChannel channel=new EmbeddedChannel(
new LengthFieldBasedFrameDecoder(1024,12,4,0,0),
new LoggingHandler(),
new MyCodec()
);
Message message=new Message();
message.setData("hello".getBytes());
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MyCodec().encode( null,message, buf);
ByteBuf s1=buf.slice(0,100);
ByteBuf s2=buf.slice(100,buf.readableBytes()-100);
//writeInbound后ByteBuf的引用计数会被-1,导致ByteBuf被释放掉,这里需要手动维持一下
s1.retain();
channel.writeInbound(s1);
channel.writeInbound(s2);
}