文章目录
- 一.NIO
- 1.简介
- 2.缓冲区(Buffer)
- 3.通道(Channel)
- 4.选择器(Selector)
- 5.原理
- 6.SelectionKey
- 7.ServerSocketChannel 和 SocketChannel
- 8.Socket
- 二.线程模型
- 1.传统阻塞 I/O 服务模型
- 2.Reactor 模式
- 3.单 Reactor 单线程
- 4.单Reactor多线程
- 5.主从 Reactor 多线程
- 6.为什么用Netty
- 三.Netty模型
- 1.简单版
- 2.Netty工作原理
- 3.TCP服务
- 4.异步模型
- 5.HTTP服务开发
- 四.Netty常用组件
- 1.Bootstrap、ServerBootstrap
- 2.Future、ChannelFuture
- 3.Channel
- 4.Selector
- 5.ChannelHandler
- 6.Pipeline 和 ChannelPipeline
- 7.ChannelHandlerContext
- 8.ChannelOption
- 9.EventLoopGroup 和其实现类 NioEventLoopGroup
- 10.Unpooled类
- 11.常用参数
- 12.编码和解码
- 13.ChannelOutboundHandler和ChannelInboundHadnler
- 14.TCP 粘包和拆包
- 15.群聊
- 16.心跳
- 17.websocket长连接
- 五.实战
- 1.堆外内存泄露
一.NIO
1.简介
I/O 模型简单的理解:就是用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的性能,
Java共支持3种网络编程模型/IO模式:BIO、NIO、AIO
Java BIO
-
同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善。
BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解。 -
BIO编程简单流程
- 服务器端启动一个ServerSocket
- 客户端启动Socket对服务器进行通信,默认情况下服务器端需要对每个客户 建立一个线程与之通讯
- 客户端发出请求后, 先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝
- 如果有响应,客户端线程会等待请求结束后,在继续执行
public static void main(String[] args) throws Exception {
//创建一个线程池
ExecutorService executorService = Executors.newCachedThreadPool();
//创建一个serverSocket
ServerSocket serverSocket = new ServerSocket(7777);
System.out.println("服务器启动。。。。。。。");
while (true){
System.out.println("等待连接=========================");
//未连接前阻塞在此
Socket accept = serverSocket.accept();
System.out.println("一个客户端连接进来了。。。。。。。");
//创建线程,与之通讯
executorService.execute(new Runnable() {
@Override
public void run() {
//通讯方法
handler(accept);
}
});
}
}
public static void handler(Socket accept) {
try {
System.out.println("线程消息 id="+Thread.currentThread().getId()+" name= "+Thread.currentThread().getName());
byte[] bytes = new byte[1024];
//循环读取数据
while (true){
System.out.println("read......");
//获取输入流
InputStream inputStream = accept.getInputStream();
int read = inputStream.read(bytes);
if (read != -1){
System.out.println(new String(bytes,0,read));
} else {
break;
}
}
} catch (Exception e){
e.printStackTrace();
} finally {
System.out.println("关闭客户端的连接。。。");
try {
accept.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Java NIO : 同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4开始支持。
NIO 有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)
NIO是 面向缓冲区 ,或者面向 块 编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络
Java NIO的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。
NIO是可以做到用一个线程来处理多个操作的。假设有10000个请求过来,根据实际情况,可以分配50或者100个线程来处理。不像之前的阻塞IO那样,非得分配10000个。HTTP2.0使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比HTTP1.1大了好几个数量级。
Selector 、Channel 和 Buffer 的关系
- 每个channel 都会对应一个Buffer
- Selector 对应一个线程, 一个线程对应多个channel(连接)
- 该图反应了有三个channel 注册到 该selector //程序
- 程序切换到哪个channel 是有事件决定的, Event 就是一个重要的概念
- Selector 会根据不同的事件,在各个通道上切换
- Buffer 就是一个内存块 , 底层是有一个数组
- 数据的读取写入是通过Buffer, 这个和BIO , BIO 中要么是输入流,或者是输出流, 不能双向,但是NIO的Buffer 是可以读也可以写, 需要 flip 方法切换
- channel 是双向的, 可以返回底层操作系统的情况, 比如Linux , 底层的操作系统通道就是双向的
Java AIO(NIO.2) : 异步非阻塞,AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程。AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持
2.缓冲区(Buffer)
缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer
常用Buffer子类
- ByteBuffer,存储字节数据到缓冲区
- ShortBuffer,存储字符串数据到缓冲区
- CharBuffer,存储字符数据到缓冲区
- IntBuffer,存储整数数据到缓冲区
- LongBuffer,存储长整型数据到缓冲区
- DoubleBuffer,存储小数到缓冲区
- FloatBuffer,存储小数到缓冲区
Buffer属性
Buffer类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息:
Buffer API
public abstract class Buffer {
//JDK1.4时,引入的api
public final int capacity( )//返回此缓冲区的容量
public final int position( )//返回此缓冲区的位置
public final Buffer position (int newPositio)//设置此缓冲区的位置
public final int limit( )//返回此缓冲区的限制
public final Buffer limit (int newLimit)//设置此缓冲区的限制
public final Buffer mark( )//在此缓冲区的位置设置标记
public final Buffer reset( )//将此缓冲区的位置重置为以前标记的位置
public final Buffer clear( )//清除此缓冲区, 即将各个标记恢复到初始状态,但是数据并没有真正擦除, 后面操作会覆盖
public final Buffer flip( )//反转此缓冲区
public final Buffer rewind( )//重绕此缓冲区
public final int remaining( )//返回当前位置与限制之间的元素数
public final boolean hasRemaining( )//告知在当前位置和限制之间是否有元素
public abstract boolean isReadOnly( );//告知此缓冲区是否为只读缓冲区
//JDK1.6时引入的api
public abstract boolean hasArray();//告知此缓冲区是否具有可访问的底层实现数组
public abstract Object array();//返回此缓冲区的底层实现数组
public abstract int arrayOffset();//返回此缓冲区的底层实现数组中第一个缓冲区元素的偏移量
public abstract boolean isDirect();//告知此缓冲区是否为直接缓冲区
}
public abstract class ByteBuffer {
//缓冲区创建相关api
public static ByteBuffer allocateDirect(int capacity)//创建直接缓冲区
public static ByteBuffer allocate(int capacity)//设置缓冲区的初始容量
public static ByteBuffer wrap(byte[] array)//把一个数组放到缓冲区中使用
//构造初始化位置offset和上界length的缓冲区
public static ByteBuffer wrap(byte[] array,int offset, int length)
//缓存区存取相关API
public abstract byte get( );//从当前位置position上get,get之后,position会自动+1
public abstract byte get (int index);//从绝对位置get
public abstract ByteBuffer put (byte b);//从当前位置上添加,put之后,position会自动+1
public abstract ByteBuffer put (int index, byte b);//从绝对位置上put
}
3.通道(Channel)
-
NIO的通道类似于流,但有些区别如下:
• 通道可以同时进行读写,而流只能读或者只能写
• 通道可以实现异步读写数据
• 通道可以从缓冲读数据,也可以写数据到缓冲: -
BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道
(Channel)是双向的,可以读操作,也可以写操作。 -
Channel在NIO中是一个接口 public interface Channel extends Closeable{}
-
常用的 Channel 类有:FileChannel、DatagramChannel、ServerSocketChannel 和SocketChannel。【ServerSocketChanne 类似ServerSocket , SocketChannel 类似 Socket】
-
FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP的数据读写。
FileChannel类
FileChannel主要用来对本地文件进行 IO 操作,常见的方法有
- public int read(ByteBuffer dst) ,从通道读取数据并放到缓冲区中
- public int write(ByteBuffer src) ,把缓冲区的数据写到通道中
- public long transferFrom(ReadableByteChannel src, long position, long count),从目标通道
中复制数据到当前通道 - public long transferTo(long position, long count, WritableByteChannel target),把数据从当
前通道复制给目标通道
将 “hello,world” 写入到file01.txt 中
public static void main(String[] args) throws IOException {
//创建一个输出流
FileOutputStream fileOutputStream = new FileOutputStream("d:\\file02.txt");
//通过 fileOutputStream 获取 对应的 FileChannel
FileChannel channel = fileOutputStream.getChannel();
//创建buffer,并将字符串存入buffer
String str = "hello222";
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(str.getBytes());
byteBuffer.flip();
//将buffer中存放的数据写入channel
channel.write(byteBuffer);
fileOutputStream.close();
}
将 file01.txt 中的数据读入到程序,并显示在控制台屏幕
public static void main(String[] args) throws IOException {
//创建文件的输入流
File file = new File("d:\\file02.txt");
FileInputStream fileInputStream = new FileInputStream(file);
//通过fileInputStream 获取对应的FileChannel
FileChannel channel = fileInputStream.getChannel();
//创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate((int) file.length());
//将 通道的数据读入到Buffer
channel.read(buffer);
System.out.println(new String(buffer.array()));
fileInputStream.close();
}
示意图
使用 FileChannel(通道) 和 方法 read , write,完成文件的拷贝
public static void main(String[] args) throws IOException {
//创建输入流 用于读取源文件数据
FileInputStream fileInputStream = new FileInputStream("d:\\file02.txt");
FileChannel inputStreamChannel = fileInputStream.getChannel();
//创建输出流 用于将数据写入目标文件
FileOutputStream fileOutputStream = new FileOutputStream("d:\\file022.txt");
FileChannel outputStreamChannel = fileOutputStream.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(512);
while (true){
//清空buffer
buffer.clear();
//将inputStreamChannel中的存放的d:\file02.txt文件的数据写入buffer
int read = inputStreamChannel.read(buffer);
System.out.println("read="+read);
if (read==-1){
break;
}
//将buffer 中的数据写入到 outputStreamChannel --d:\file022.txt
buffer.flip();
outputStreamChannel.write(buffer);
}
fileInputStream.close();
fileOutputStream.close();
}
使用 FileChannel(通道) 和 方法 transferFrom ,完成文件的拷贝
public static void main(String[] args) throws IOException {
//创建输入流 用于读取源文件数据
FileInputStream fileInputStream = new FileInputStream("d:\\file02.txt");
FileChannel inputStreamChannel = fileInputStream.getChannel();
//创建输出流 用于将数据写入目标文件
FileOutputStream fileOutputStream = new FileOutputStream("d:\\file02-copy.txt");
FileChannel outputStreamChannel = fileOutputStream.getChannel();
//使用transferForm完成拷贝
outputStreamChannel.transferFrom(inputStreamChannel,0,inputStreamChannel.size());
inputStreamChannel.close();
outputStreamChannel.close();
fileInputStream.close();
fileOutputStream.close();
}
关于Buffer 和 Channel的注意事项和细节
- ByteBuffer 支持类型化的put 和 get, put 放入的是什么数据类型,get就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException 异常。
- 可以将一个普通Buffer 转成只读Buffer
- NIO 还提供了 MappedByteBuffer, 可以让文件直接在内存(堆外的内存)中进行修改, 而如何同步到文件由NIO 来完成.
- 前面我们讲的读写操作,都是通过一个Buffer 完成的,NIO 还支持 通过多个Buffer (即 Buffer 数组) 完成读写操作,即 Scattering 和 Gathering
4.选择器(Selector)
基本介绍
- Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到Selector(选择器)
- Selector 能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。【示意图】
- 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程
- 避免了多线程之间的上下文切换导致的开销
特点
- Netty 的 IO 线程 NioEventLoop 聚合了 Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客
户端连接。 - 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。
- 线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。
- 由于读写操作都是非阻塞的,这就可以充分提升 IO线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
- 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
Selector类相关方法
public abstract class Selector implements Closeable {
public static Selector open();//得到一个选择器对象
public int select(long timeout);//阻塞,监控所有注册的通道,当其中有 IO 操作可以进行时,将对应的 SelectionKey 加入到内部集合中并返回,参数用来设置超时时间
public Set<SelectionKey> selectedKeys();//从内部集合中得到所有的 SelectionKey
selector.wakeup();//唤醒selector
selector.selectNow();//不阻塞,立马返还
}
5.原理
- 当客户端连接时,会通过ServerSocketChannel 得到 SocketChannel
- Selector 进行监听 select 方法, 返回有事件发生的通道的个数.
- 将socketChannel注册到Selector上, register(Selector sel, int ops), 一个selector上可以注册多个SocketChannel
- 注册后返回一个 SelectionKey, 会和该Selector 关联(集合)
- 进一步得到各个 SelectionKey (有事件发生)
- 在通过 SelectionKey 反向获取SocketChannel , 方法 channel()
- 可以通过 得到的 channel , 完成业务处理
服务器端和客户端之间的数据简单通讯(非阻塞)
服务端
public static void main(String[] args) throws IOException {
//创建ServerSocketChannel , 并通过它获取ServerSocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//绑定一个端口6666, 在服务器端监听
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//得到一个Selecor对象
Selector selector = Selector.open();
//把 serverSocketChannel 注册到 selector 关心 事件为 OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("注册后的selectionkey 数量="+selector.keys().size());
//循环等待客户端连接
while (true){
//这里我们等待1秒,如果没有事件发生, 返回
if (selector.select(1000) == 0){
System.out.println("服务器等待1秒,无连接");
continue;
}
//如果返回的>0,表示已经获取到关注的事件, 可获取到相关的 selectionKey集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("selectionKeys 数量 = " + selectionKeys.size());
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
//获取到对应的SelectionKey
SelectionKey key = iterator.next();
//根据key 对应的通道发生的事件做相应处理
if (key.isAcceptable()){//如果是 OP_ACCEPT, 有新的客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode());
//将 SocketChannel 设置为非阻塞
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size());
}
//发生 OP_READ
if (key.isReadable()){
//通过key 反向获取到对应channel
SocketChannel channel = (SocketChannel) key.channel();
//获取到该channel关联的buffer
ByteBuffer buffer = (ByteBuffer)key.attachment();
channel.read(buffer);
System.out.println("客户端输出:"+new String(buffer.array()));
}
//手动从集合中移动当前的selectionKey, 防止重复操作
iterator.remove();
}
}
}
客户端
public static void main(String[] args) throws IOException {
//得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
//设置非阻塞
socketChannel.configureBlocking(false);
//服务器的ip和端口
InetSocketAddress address = new InetSocketAddress("127.0.0.1",6666);
//连接服务器
if (!socketChannel.connect(address)){
while (!socketChannel.finishConnect()){
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作..");
}
}
//如果连接成功,就发送数据,将 buffer 数据写入 channel
String str = "hello world";
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
socketChannel.read(buffer);
System.in.read();
}
6.SelectionKey
表示 Selector 和网络通道的注册关系, 共四种:
- int OP_ACCEPT:有新的网络连接可以 accept,值为 16
- int OP_CONNECT:代表连接已经建立,值为 8
- int OP_READ:代表读操作,值为 1
- int OP_WRITE:代表写操作,值为 4
SelectionKey相关方法
public abstract class SelectionKey {
public abstract Selector selector();//得到与之关联的Selector 对象
public abstract SelectableChannel channel();//得到与之关联的通道
public final Object attachment();//得到与之关联的共享数据
public abstract SelectionKey interestOps(int ops);//设置或改变监听事件
public final boolean isAcceptable();//是否可以 accept
public final boolean isReadable();//是否可以读
public final boolean isWritable();//是否可以写
}
7.ServerSocketChannel 和 SocketChannel
ServerSocketChannel 在服务器端监听新的客户端 Socket 连接
public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel{
public static ServerSocketChannel open(),得到一个 ServerSocketChannel 通道
public final ServerSocketChannel bind(SocketAddress local),设置服务器端端口号
public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
public SocketChannel accept(),接受一个连接,返回代表这个连接的通道对象
public final SelectionKey register(Selector sel, int ops),注册一个选择器并设置监听事件
}
SocketChannel,网络IO通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。
public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel{
public static SocketChannel open();//得到一个 SocketChannel 通道
public final SelectableChannel configureBlocking(boolean block);//设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
public boolean connect(SocketAddress remote);//连接服务器
public boolean finishConnect();//如果上面的方法连接失败,接下来就要通过该方法完成连接操作
public int write(ByteBuffer src);//往通道里写数据
public int read(ByteBuffer dst);//从通道里读数据
public final SelectionKey register(Selector sel, int ops, Object att);//注册一个选择器并设置监听事件,最后一个参数可以设置共享数据
public final void close();//关闭通道
}
8.Socket
套接字(socket)是一个抽象层,应用程序可以通过它发送或接收数据,可对其进行像对文件一样的打开、读写和关闭等操作。套接字允许应用程序将I/O插入到网络中,并与网络中的其他应用程序进行通信。网络套接字是IP地址与端口的组合,实现建立连接、发送数据以及接收数据的核心功能.
ServerSocket类具有accept()方法,此方法用于等待客户端发起通信,这样Socket对象就可用于进一步的数据传输。
EG:
服务端:
public class ServerDemo {
public static void main(String[] args) throws Exception {
//1.创建一个线程池,如果有客户端连接就创建一个线程, 与之通信
ExecutorService executorService = Executors.newCachedThreadPool();
//2.创建 ServerSocket 对象
ServerSocket serverSocket = new ServerSocket(9999);
System.out.println("服务器已启动");
while (true) {
//3.监听客户端
Socket socket = serverSocket.accept();
System.out.println("有客户端连接");
//4.开启新的线程处理
executorService.execute(new Runnable() {
@Override
public void run() {
handle(socket);
}
});
}
}
public static void handle(Socket socket) {
try {
System.out.println("线程ID:" + Thread.currentThread().getId()
+ " 线程名称:" + Thread.currentThread().getName());
//从连接中取出输入流来接收消息
InputStream is = socket.getInputStream();
byte[] b = new byte[1024];
int read = is.read(b);
System.out.println("客户端:" + new String(b, 0, read));
//连接中取出输出流并回话
OutputStream os = socket.getOutputStream();
os.write("没钱".getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//关闭连接
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
客户端:
public class ClientDemo {
public static void main(String[] args) throws Exception {
while (true) {
//1.创建 Socket 对象
Socket s = new Socket("127.0.0.1", 9999);
//2.从连接中取出输出流并发消息
OutputStream os = s.getOutputStream();
System.out.println("请输入:");
Scanner sc = new Scanner(System.in);
String msg = sc.nextLine();
os.write(msg.getBytes());
//3.从连接中取出输入流并接收回话
InputStream is = s.getInputStream();
byte[] b = new byte[1024];
int read = is.read(b);
System.out.println("老板说:" + new String(b, 0, read).trim());
//4.关闭
s.close();
}
}
}
参考文章
https://mp.weixin.qq.com/s/VdyXDBevE48Wtr95ug_aKw
二.线程模型
目前存在的线程模型有传统阻塞 I/O 服务模型,Reactor 模式
根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现
• 单 Reactor 单线程;
• 单 Reactor 多线程;
• 主从 Reactor 多线程
1.传统阻塞 I/O 服务模型
2.Reactor 模式
针对传统阻塞 I/O 服务模型的 2个缺点,解决方案:
基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等
待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应
用程序,线程从阻塞状态返回,开始进行业务处理
Reactor 对应的叫法: 1. 反应器模式 2. 分发者模式(Dispatcher) 3. 通知者模式(notifier)
基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
I/O 复用结合线程池,就是Reactor 模式基本设计思想:
- Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动)
- 务器端程序处理传入的多个请求, 并将它们同步分派到相应的处理线程, 因此Reactor模式也叫Dispatcher模式
- Reactor 模式使用IO复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键
Reactor 模式中 核心组成
- Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。 它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;
- Handlers:处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作
Reactor 模式分类:
- 单 Reactor 单线程
- 单 Reactor 多线程
- 主从 Reactor 多线程
3.单 Reactor 单线程
- Select 是前面 I/O 复用模型介绍的标准网络编程 API,可以实现应用程序通过一个阻塞对象监听多路连接请求
- Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发
- 如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个Handler 对象处理连接完成后的后续业务处理
- 如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响
优点:
- 模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
缺点:
- 性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。
- Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
- 可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
4.单Reactor多线程
- Reactor 对象通过select 监控客户端请求事件, 收到事件后,通过dispatch进行分发
- 如果建立连接请求, 则右Acceptor 通过accept 处理连接请求, 然后创建一个Handler对象处理完成连接后的各种事件
- 如果不是连接请求,则由reactor分发调用连接对应的handler 来处理
- handler 只负责响应事件,不做具体的业务处理, 通过read 读取数据后,会分发给后面的worker线程池的某个线程处理业务
- worker 线程池会分配独立线程完成真正的业务,并将结果返回给handler
- handler收到响应后,通过send 将结果返回给client
优点:
可以充分的利用多核cpu 的处理能力
缺点:
多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈
5.主从 Reactor 多线程
- Reactor主线程 MainReactor 对象通过select 监听连接事件, 收到事件后,通过Acceptor 处理连接事件
- 当 Acceptor 处理连接事件后,MainReactor 将连接分配给SubReactor
- subreactor 将连接加入到连接队列进行监听,并创建handler进行各种事件处理
- 当有新事件发生时, subreactor 就会调用对应的handler处理
- handler 通过read 读取数据,分发给后面的worker 线程处理
- worker 线程池分配独立的worker 线程进行业务处理,并返回结果
- 主Reactor可以对应多个子Reactor
优点:
- 父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
- 父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。
缺点:
- 编程复杂度较高
6.为什么用Netty
Netty 底层基于 JDK 的 NIO,我们为什么不直接基于 JDK 的 NIO 或者其他NIO框架?
- 使用 JDK 自带的 NIO 需要了解太多的概念,编程复杂。
- Netty 底层 IO 模型随意切换,而这一切只需要做微小的改动。
- Netty自带的拆包解包,异常检测等机制让我们从 NIO 的繁重细节中脱离出来,只需关心业务逻辑即可。
- Netty解决了JDK 的很多包括空轮询在内的 Bug。
- Netty底层对线程,Selector 做了很多细小的优化,精心设计的 Reactor 线程做到非常高效的并发处理。
- 自带各种协议栈,让我们处理任何一种通用协议都几乎不用亲自动手。
- Netty社区活跃,遇到问题随时邮件列表或者 issue。
- Netty已经历各大RPC框架(Dubbo),消息中间件(RocketMQ),大数据通信(Hadoop)框架的广泛的线上验证,健壮性无比强大。
三.Netty模型
1.简单版
Netty 主要基于主从 Reactors 多线程模型(如图)做了一定的改进,其中主从 Reactor 多线程模型有多个 Reacto
- BossGroup 线程维护Selector , 只关注Accecpt
- 当接收到Accept事件,获取到对应的SocketChannel, 封装成 NIOScoketChannel并注册到Worker 线程(事件循环), 并进行维护
- 当Worker线程监听到selector 中通道发生自己感兴趣的事件后,就进行处理(就由handler), 注意handler 已经加入到通道
2.Netty工作原理
Netty 主要基于主从Reactors 多线程模型(如图)做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor
工作原理:
- Netty抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写
- BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup
- NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 ,每一个事件循环是 NioEventLoop
- NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个NioEventLoop 都有一个selector , 用于监听绑定在其上的socket的网络通讯
- NioEventLoopGroup 可以有多个线程, 即可以含有多个NioEventLoop
- 每个Boss NioEventLoop 循环执行的步骤有3步
- 轮询accept 事件
- 处理accept 事件 , 与client建立连接 , 生成NioScocketChannel , 并将其注册到某个worker NIOEventLoop 上的 selector
- 处理任务队列的任务 , 即 runAllTasks
- 每个 Worker NIOEventLoop 循环执行的步骤
- 轮询read, write 事件
- 处理i/o事件, 即read , write 事件,在对应NioScocketChannel 处理
- 处理任务队列的任务 , 即 runAllTasks
- 每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道), pipeline
3.TCP服务
服务端
public class MyServer {
public static void main(String[] args) throws InterruptedException {
//创建两个线程组 可设置线程数
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//服务端配置
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器的通道实现
//.option(ChannelOption.SO_BACKLOG,128)//设置服务器 socket 的最大排队连接数,也就是 TCP 连接请求的队列大小。当服务器的处理能力不足以及连接请求过多时,新的连接请求就会被放在队列中等待处理。
//.childOption(ChannelOption.SO_KEEPALIVE,true)//Socket参数,连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。
.childHandler(new MyServerInitializer());//自定义一个通道初始化对象
//绑定一个端口并生成一个ChannelFuture对象
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
//给我们的workerGroup 的eventLoop对应的管道设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//给pipeline设置处理器
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new MyServerHandler());
}
}
//自定义handler
public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;
//处理异常 关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
//channelHandlerContext 上下文对象 含有管道pipeline 通道channel 地址等
//msg: 客户端发送的数据
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
//将buffer转成字符串
String message = new String(buffer, Charset.forName("utf-8"));
System.out.println("服务器接收到数据 "+message);
System.out.println("服务器接收到消息是="+(++this.count));
//服务端回送数据给客户端,回送一个随机id 创建一个新的 ByteBuf 对象。该方法的作用是将给定的数据复制到新的 ByteBuf 对象中,并返回该对象的引用。
ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", Charset.forName("utf-8"));
channelHandlerContext.writeAndFlush(responseByteBuf);
}
}
客户端
public class MyClient {
public static void main(String[] args) throws InterruptedException {
//客户端事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//客户端配置启动对象
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)//设置线程组
.channel(NioSocketChannel.class)//设置客户端通道的实现类
.handler(new MyClientInitializer());
//启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new MyClientHandler());
}
}
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用客户端发送10条数据 hello,server 编号
for(int i= 0; i< 10; ++i) {
ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));
ctx.writeAndFlush(buffer);
}
}
//当通道有读取事件时会触发该方法
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
String message = new String(buffer, Charset.forName("utf-8"));
System.out.println("客户端接收到消息=" + message);
System.out.println("客户端接收消息数量=" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
4.异步模型
- 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
- Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个ChannelFuture。
- 调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果
- Netty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过Future去监控方法 fun 的处理过程(即 : Future-Listener 机制)
Future 说明
- 表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,比如检索计算等
- ChannelFuture 是一个接口 : public interface ChannelFuture extends Future
我们可以添加监听器,当监听的事件发生时,就会通知到监听器.
工作原理示意图
- 在使用 Netty 进行编程时,拦截操作和转换出入站数据只需要您提供 callback 或利用future 即可。这使得链式操作简单、高效, 并有利于编写可重用的、通用的代码。
- Netty 框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来
Future-Listener 机制
- 当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。
- 通过 isDone 方法来判断当前操作是否完成;
- 通过 isSuccess 方法来判断已完成的当前操作是否成功;
- 通过 getCause 方法来获取已完成的当前操作失败的原因;
- 通过 isCancelled 方法来判断已完成的当前操作是否被取消;
- 通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则通知指定的监听器
监听绑定端口是异步操作,绑定操作完成后,将会调用相应的监听器处理逻辑
5.HTTP服务开发
public class TestServer {
public static void main(String[] args) throws InterruptedException {
//两个线程池 其中bossGroup负责接收客户端的连接请求,workerGroup 负责处理客户端发来的数据
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//启动服务器的类。
ServerBootstrap serverBootstrap = new ServerBootstrap();
//通过group方法将bossGroup和workerGroup设置给serverBootstrap,表示使用这两个线程池来处理服务器的请求。
//使用 channel 方法指定了服务器的通信模式,这里是基于 NIO 的 NioServerSocketChannel。
//每个 Channel 对象都包含了一个 ChannelPipeline 对象,用于管理和处理 Channel 对象的事件和消息
//使用 childHandler 方法为服务器的 ChannelPipeline 添加了一个TestServerInitializer,这个类是自定义的一个初始化器,用于在客户端连接建立后,添加一些自定义的业务处理器到ChannelPipeline中。
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();
channelFuture.channel().closeFuture().sync();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()){
System.out.println("监听端口6668成功");
} else {
System.out.println("监听端口6668失败");
}
}
});
} finally {
//关闭线程池
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
//在Channel被创建后,将一个或多个自定义的ChannelHandler添加到 ChannelPipeline 中,用于处理客户端的请求和响应。
//用于初始化每个新的 Channel 对象的 ChannelPipeline,它的作用类似于在服务器启动时为每个客户端连接创建一个独立的处理器。
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//向管道加入处理器
//得到管道
ChannelPipeline pipeline = socketChannel.pipeline();
//加入一个netty 提供的httpServerCodec codec =>[coder - decoder]
//HttpServerCodec 说明
//1. HttpServerCodec 是netty 提供的处理http的 编-解码器
pipeline.addLast("MyHttpServerCodec",new HttpServerCodec());
//2. 增加一个自定义的handler
pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
System.out.println("ok~~~~");
}
}
/*
说明
1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
2. HttpObject 客户端和服务器端相互通讯的数据被封装成 HttpObject
*/
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
//channelRead0 读取客户端数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
System.out.println("对应的channel=" + ctx.channel() + " pipeline=" + ctx
.pipeline() + " 通过pipeline获取channel" + ctx.pipeline().channel());
System.out.println("当前ctx的handler=" + ctx.handler());
//判断 msg 是不是 httprequest请求
if(msg instanceof HttpRequest) {
System.out.println("ctx 类型="+ctx.getClass());
System.out.println("pipeline hashcode" + ctx.pipeline().hashCode() + " TestHttpServerHandler hash=" + this.hashCode());
System.out.println("msg 类型=" + msg.getClass());
System.out.println("客户端地址" + ctx.channel().remoteAddress());
//获取到
HttpRequest httpRequest = (HttpRequest) msg;
//获取uri, 过滤指定的资源
URI uri = new URI(httpRequest.uri());
if("/favicon.ico".equals(uri.getPath())) {
System.out.println("请求了 favicon.ico, 不做响应");
return;
}
//回复信息给浏览器 [http协议]
ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器", CharsetUtil.UTF_8);
//构造一个http的相应,即 httpresponse
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
//将构建好 response返回
ctx.writeAndFlush(response);
}
}
}
四.Netty常用组件
1.Bootstrap、ServerBootstrap
一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类
常用方法
• public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup),该方法用于服务器端,用来设置两个EventLoop(事件循环组)
• public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop
• public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现
• public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置,如果是在客户端,因为是Bootstrap,只会有option而没有childOption,所以设置的是客户端Channel的选项。
• public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收到的通道添加配置
• public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的 handler)
• public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号
• public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连接服务器
2.Future、ChannelFuture
Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
常见的方法
• Channel channel(),返回当前正在进行 IO 操作的通道
• ChannelFuture sync(),等待异步操作执行完毕
3.Channel
Channel是连接ByteBuf和Event的桥梁,netty中的Channel提供了统一的API,通过这种统一的API,netty可以轻松的对接多种传输类型,如OIO,NIO等。
channel注册到eventLoop多路复用器上的,通过eventLoop()可以获取当前channel所注册的eventLoop;当然eventLoop除了处理IO操作还可以执行定时任务和自定义的NIOTask。
Channel通过ChannelPipeline中的多个Handler处理器,Channel使用它处理IO数据。
- Netty 网络通信的组件,能够用于执行网络 I/O 操作。
- 通过Channel 可获得当前网络连接的通道的状态
- 通过Channel 可获得 网络连接的配置参数 (例如接收缓冲区大小)
- Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成
- 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以I/O 操作成功、失败或取消时回调通知调用方
- 支持关联 I/O 操作与对应的处理程序
- 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,常用的Channel 类型:
4.Selector
- Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
- 当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel
5.ChannelHandler
- ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其ChannelPipeline(业务处理链)中的下一个处理程序。
- ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类
- ChannelHandler 及其实现类一览图
- 我们经常需要自定义一个 Handler 类去继承ChannelInboundHandlerAdapter,然后通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
public ChannelInboundHandlerAdapter() { }
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
//通道就绪事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
//通道读取数据事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
6.Pipeline 和 ChannelPipeline
-
ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者outbound 的事件和操作,相当于一个贯穿 Netty 的链。(也可以这样理解:ChannelPipeline 是 保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站事件和出站操作)
-
ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的ChannelHandler
-
在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:
-
常用方法
• ChannelPipeline addFirst(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的第一个位置
• ChannelPipeline addLast(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的最后一个位置
7.ChannelHandlerContext
-
保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象
-
即ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler ,同时ChannelHandlerContext 中也绑定了对应的 pipeline 和 Channel 的信息,方便对 ChannelHandler进行调用.
-
常用方法
• ChannelFuture close(),关闭通道
• ChannelOutboundInvoker flush(),刷新
• ChannelFuture writeAndFlush(Object msg) , 将 数 据 写 到 ChannelPipeline 中 当 前
• ChannelHandler 的下一个 ChannelHandler 开始处理(出站
8.ChannelOption
- Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。
- ChannelOption 参数如下
9.EventLoopGroup 和其实现类 NioEventLoopGroup
-
EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
-
EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个EventLoop来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup
-
通常一个服务端口即一个 ServerSocketChannel对应一个Selector 和一个EventLoop线程。BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给WorkerEventLoopGroup 来进行 IO 处理,如下图所示
-
常用方法
• public NioEventLoopGroup(),构造方法
• public Future<?> shutdownGracefully(),断开连接,关闭线程
10.Unpooled类
- Netty 提供一个专门用来操作缓冲区(即Netty的数据容器)的工具类
- 常用方法如下所示
//通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 但有区别)
public static ByteBuf copiedBuffer(CharSequence string, Charset charset)
11.常用参数
通用参数
CONNECT_TIMEOUT_MILLIS : Netty参数,连接超时毫秒数,默认值30000毫秒即30秒。
MAX_MESSAGES_PER_READ : Netty参数,一次Loop读取的最大消息数,对于ServerChannel或者NioByteChannel,默认值为16,其他Channel默认值为1。默认值这样设置,是因为:ServerChannel需要接受足够多的连接,保证大吞吐量,NioByteChannel可以减少不必要的系统调用select。
WRITE_SPIN_COUNT : Netty参数,一个Loop写操作执行的最大次数,默认值为16。也就是说,对于大数据量的写操作至多进行16次,如果16次仍没有全部写完数据,此时会提交一个新的写任务给EventLoop,任务将在下次调度继续执行。这样,其他的写请求才能被响应不会因为单个大数据量写请求而耽误。
ALLOCATOR : Netty参数,ByteBuf的分配器,默认值为ByteBufAllocator.DEFAULT,4.0版本为UnpooledByteBufAllocator,4.1版本为PooledByteBufAllocator。该值也可以使用系统参数io.netty.allocator.type配置,使用字符串值:"unpooled","pooled"。
RCVBUF_ALLOCATOR : Netty参数,用于Channel分配接受Buffer的分配器,默认值为AdaptiveRecvByteBufAllocator.DEFAULT,是一个自适应的接受缓冲区分配器,能根据接受到的数据自动调节大小。可选值为FixedRecvByteBufAllocator,固定大小的接受缓冲区分配器。
AUTO_READ : Netty参数,自动读取,默认值为True。Netty只在必要的时候才设置关心相应的I/O事件。对于读操作,需要调用channel.read()设置关心的I/O事件为OP_READ,这样若有数据到达才能读取以供用户处理。该值为True时,每次读操作完毕后会自动调用channel.read(),从而有数据到达便能读取;否则,需要用户手动调用channel.read()。需要注意的是:当调用config.setAutoRead(boolean)方法时,如果状态由false变为true,将会调用channel.read()方法读取数据;由true变为false,将调用config.autoReadCleared()方法终止数据读取。
WRITE_BUFFER_HIGH_WATER_MARK : Netty参数,写高水位标记,默认值64KB。如果Netty的写缓冲区中的字节超过该值,Channel的isWritable()返回False。
WRITE_BUFFER_LOW_WATER_MARK : Netty参数,写低水位标记,默认值32KB。当Netty的写缓冲区中的字节超过高水位之后若下降到低水位,则Channel的isWritable()返回True。写高低水位标记使用户可以控制写入数据速度,从而实现流量控制。推荐做法是:每次调用channl.write(msg)方法首先调用channel.isWritable()判断是否可写。
MESSAGE_SIZE_ESTIMATOR : Netty参数,消息大小估算器,默认为DefaultMessageSizeEstimator.DEFAULT。估算ByteBuf、ByteBufHolder和FileRegion的大小,其中ByteBuf和ByteBufHolder为实际大小,FileRegion估算值为0。该值估算的字节数在计算水位时使用,FileRegion为0可知FileRegion不影响高低水位。
SINGLE_EVENTEXECUTOR_PER_GROUP : Netty参数,单线程执行ChannelPipeline中的事件,默认值为True。该值控制执行ChannelPipeline中执行ChannelHandler的线程。如果为Trye,整个pipeline由一个线程执行,这样不需要进行线程切换以及线程同步,是Netty4的推荐做法;如果为False,ChannelHandler中的处理过程会由Group中的不同线程执行。
SocketChannel参数
SO_RCVBUF : Socket参数,TCP数据接收缓冲区大小。该缓冲区即TCP接收滑动窗口,linux操作系统可使用命令:cat /proc/sys/net/ipv4/tcp_rmem查询其大小。一般情况下,该值可由用户在任意时刻设置,但当设置值超过64KB时,需要在连接到远端之前设置。
SO_SNDBUF : Socket参数,TCP数据发送缓冲区大小。该缓冲区即TCP发送滑动窗口,linux操作系统可使用命令:cat /proc/sys/net/ipv4/tcp_smem查询其大小。
TCP_NODELAY : TCP参数,立即发送数据,默认值为Ture(Netty默认为True而操作系统默认为False)。该值设置Nagle算法的启用,改算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量,如果需要发送一些较小的报文,则需要禁用该算法。Netty默认禁用该算法,从而最小化报文传输延时。
SO_KEEPALIVE : Socket参数,连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。
SO_REUSEADDR : Socket参数,地址复用,默认值False。有四种情况可以使用:(1).当有一个有相同本地地址和端口的socket1处于TIME_WAIT状态时,而你希望启动的程序的socket2要占用该地址和端口,比如重启服务且保持先前端口。(2).有多块网卡或用IP Alias技术的机器在同一端口启动多个进程,但每个进程绑定的本地IP地址不能相同。(3).单个进程绑定相同的端口到多个socket上,但每个socket绑定的ip地址不同。(4).完全相同的地址和端口的重复绑定。但这只用于UDP的多播,不用于TCP。
SO_LINGER : Socket参数,关闭Socket的延迟时间,默认值为-1,表示禁用该功能。-1表示socket.close()方法立即返回,但OS底层会将发送缓冲区全部发送到对端。0表示socket.close()方法立即返回,OS放弃发送缓冲区的数据直接向对端发送RST包,对端收到复位错误。非0整数值表示调用socket.close()方法的线程被阻塞直到延迟时间到或发送缓冲区中的数据发送完毕,若超时,则对端会收到复位错误。
IP_TOS : IP参数,设置IP头部的Type-of-Service字段,用于描述IP包的优先级和QoS选项。
ALLOW_HALF_CLOSURE : Netty参数,一个连接的远端关闭时本地端是否关闭,默认值为False。值为False时,连接自动关闭;为True时,触发ChannelInboundHandler的userEventTriggered()方法,事件为ChannelInputShutdownEvent。
ServerSocketChannel参数
SO_RCVBUF : 已说明,需要注意的是:当设置值超过64KB时,需要在绑定到本地端口前设置。该值设置的是由ServerSocketChannel使用accept接受的SocketChannel的接收缓冲区。
SO_REUSEADDR : 已说明
SO_BACKLOG : Socket参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows为200,其他为128。
DatagramChannel参数
SO_BROADCAST: Socket参数,设置广播模式。
SO_RCVBUF: 已说明
SO_SNDBUF:已说明
SO_REUSEADDR:已说明
IP_MULTICAST_LOOP_DISABLED: 对应IP参数IP_MULTICAST_LOOP,设置本地回环接口的多播功能。由于IP_MULTICAST_LOOP返回True表示关闭,所以Netty加上后缀_DISABLED防止歧义。
IP_MULTICAST_ADDR: 对应IP参数IP_MULTICAST_IF,设置对应地址的网卡为多播模式。
IP_MULTICAST_IF: 对应IP参数IP_MULTICAST_IF2,同上但支持IPV6。
IP_MULTICAST_TTL: IP参数,多播数据报的time-to-live即存活跳数。
IP_TOS: 已说明
DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION: Netty参数,DatagramChannel注册的EventLoop即表示已激活。
12.编码和解码
编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码。codec(编解码器) 的组成部分有两个:decoder(解码器)和 encoder(编码器)。encoder 负责把业务数据转换成字节码数据,decoder 负责把字节码数据转换成业务数据
Netty 本身的编码解码的机制和问题分析
- Netty 自身提供了一些 codec(编解码器)
- Netty 提供的编码器
• StringEncoder,对字符串数据进行编码
• ObjectEncoder,对 Java 对象进行编码 - Netty 提供的解码器
• StringDecoder, 对字符串数据进行解码
• ObjectDecoder,对 Java 对象进行解码 - Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现 POJO 对象或各种业务对象
的编码和解码,底层使用的仍是 Java 序列化技术 , 而Java 序列化技术本身效率就不高,存在如下问题
• 无法跨语言
• 序列化后的体积太大,是二进制编码的 5 倍多。
• 序列化性能太低
当Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会
被解码:从字节转换为另一种格式(比如java对象);如果是出站消息,它会
被编码成字节。
Netty提供一系列实用的编解码器,他们都实现了ChannelInboundHadnler或者
ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了。
以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,
它将调用由解码器所提供的decode()方法进行解码,并将已经解码的字节转发
给ChannelPipeline中的下一个ChannelInboundHandler。
解码器-ByteToMessageDecoder
由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理
eg:
这个例子,每次入站从ByteBuf中读取4字节,将其解码为一个int,然后将它添加到下一个List中。当没有更多元素可以被添加到该List中时,它的内容将会被发送给下一个ChannelInboundHandler。int在被添加到List中时,会被自动装箱为Integer。在调用readInt()方法前必须验证所输入的ByteBuf是否具有足够的数据
解码器-ReplayingDecoder
- public abstract class ReplayingDecoder< S> extends ByteToMessageDecoder
- ReplayingDecoder扩展了ByteToMessageDecoder类,使用这个类,我们不必调用readableBytes()方法。参数S指定了用户状态管理的类型,其中Void代表不需要状态管理
- 应用实例:使用ReplayingDecoder 编写解码器,对前面的案例进行简化
- ReplayingDecoder使用方便,但它也有一些局限性:
• 并不是所有的 ByteBuf 操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException。
• ReplayingDecoder 在某些情况下可能稍慢于 ByteToMessageDecoder,例如网络缓慢并且消息格
式复杂时,消息会被拆成了多个碎片,速度变慢
13.ChannelOutboundHandler和ChannelInboundHadnler
ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器。例如,实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的
ChannelPipeline提供了ChannelHandler链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务端的数据会通过pipeline中的一系列
ChannelOutboundHandler,并被这些Handler处理,反之则称为入站的
不论解码器handler 还是 编码器handler 即接收的消息类型必须与待处理的消息类型一致,
否则该handler不会被执行
eg:
编码器
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
//编码方法
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("MyLongToByteEncoder encode 被调用"+ msg);
out.writeLong(msg);
}
}
解码器
public class MyByteToLongDecoder extends ByteToMessageDecoder {
/**
*
* decode 会根据接收的数据,被调用多次, 直到确定没有新的元素被添加到list
* , 或者是ByteBuf 没有更多的可读字节为止
* 如果list out 不为空,就会将list的内容传递给下一个 channelinboundhandler处理, 该处理器的方法也会被调用多次
*
* @param ctx 上下文对象
* @param in 入站的 ByteBuf
* @param out List 集合,将解码后的数据传给下一个handler
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder 被调用");
//因为 long 8个字节, 需要判断有8个字节,才能读取一个long
if(in.readableBytes() >= 8) {
out.add(in.readLong());
}
}
}
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder2 被调用");
//在 ReplayingDecoder 不需要判断数据是否足够读取,内部会进行处理判断
out.add(in.readLong());
}
}
服务端
public class MyServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();//一会下断点
//入站的handler进行解码 MyByteToLongDecoder
//pipeline.addLast(new MyByteToLongDecoder());
pipeline.addLast(new MyByteToLongDecoder2());
//出站的handler进行编码
pipeline.addLast(new MyLongToByteEncoder());
//自定义的handler 处理业务逻辑
pipeline.addLast(new MyInOutServerHandler());
System.out.println("xx");
}
});
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class MyInOutServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("server handlerAdded");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("从客户端" + ctx.channel().remoteAddress() + " 读取到long " + msg);
//给客户端发送一个long
ctx.writeAndFlush(98765L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端
public class MyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//加入一个出站的handler 对数据进行一个编码
pipeline.addLast(new MyLongToByteEncoder());
//这时一个入站的解码器(入站handler )
//pipeline.addLast(new MyByteToLongDecoder());
pipeline.addLast(new MyByteToLongDecoder2());
//加入一个自定义的handler , 处理业务
pipeline.addLast(new MyInOutClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
public class MyInOutClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服务器的ip=" + ctx.channel().remoteAddress());
System.out.println("收到服务器消息=" + msg);
}
//重写channelActive 发送数据
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler 发送数据");
//ctx.writeAndFlush(Unpooled.copiedBuffer(""))
ctx.writeAndFlush(123456L); //发送的是一个long
//分析
//1. "abcdabcdabcdabcd" 是 16个字节
//2. 该处理器的前一个handler 是 MyLongToByteEncoder
//3. MyLongToByteEncoder 父类 MessageToByteEncoder
//4. 父类 MessageToByteEncoder
/*
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) { //判断当前msg 是不是应该处理的类型,如果是就处理,不是就跳过encode
@SuppressWarnings("unchecked")
I cast = (I) msg;
buf = allocateBuffer(ctx, cast, preferDirect);
try {
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
}
4. 因此我们编写 Encoder 是要注意传入的数据类型和处理的数据类型一致
*/
// ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd",CharsetUtil.UTF_8));
}
}
14.TCP 粘包和拆包
-
TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)
都要有一一成对的socket,因此,发送端为了将多个发给接收端的包,更有效的发
给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合
并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于
分辨出完整的数据包了,因为面向流的通信是无消息保护边界的 -
由于TCP无消息保护边界, 需要在接收端处理消息边界问题,也就是我们所说的粘
包、拆包问题, 看一张图
假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:
- 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包
- 服务端一次接受到了两个数据包,D1和D2粘合在一起,称之为TCP粘包
- 服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这称之为TCP拆包
- 服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包。
TCP 粘包和拆包解决方案
- 使用自定义协议 + 编解码器 来解决
- 关键就是要解决 服务器端每次读取数据长度的问题, 这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的TCP 粘包、拆包 。
15.群聊
服务端
public class GroupChatServer {
private int port;//监听的端口
public GroupChatServer(int port){
this.port = port;
}
//编写run方法 处理客户端请求
public void run() throws Exception{
//创建两个线程组
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workGroup = new NioEventLoopGroup(1);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.option(ChannelOption.SO_BACKLOG,128);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("encoder",new StringEncoder());
pipeline.addLast("decoder",new StringDecoder());
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("服务器启动");
ChannelFuture future = serverBootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatServer(7002).run();
}
}
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//一个新的 ChannelHandler 被添加到 ChannelPipeline 中时被调用
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("handlerAdded [客户端]"+channel.remoteAddress()+"加入聊天 "+sdf.format(new Date())+"\n");
channelGroup.add(channel);
}
//ChannelHandler 从 ChannelPipeline 中被移除时被调用
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("handlerRemoved[客户端]"+channel.remoteAddress()+"离开聊天 "+sdf.format(new Date())+"\n");
System.out.println("channelGroup size " + channelGroup.size());
}
//在一个 Channel 被激活并且准备好进行数据传输时被调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "channelActive上线了~~~");
}
//在一个 Channel 失去连接并关闭时被调用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "channelInactive离线了~~~");
}
//在一个 Channel 接收到数据并读取完成后被调用
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
Channel channel = channelHandlerContext.channel();
channelGroup.forEach(ch -> {
if (channel!=ch){
ch.writeAndFlush("channelRead0【客户】"+channel.remoteAddress() +" 发送了消息"+ msg+"\n");
} else {
ch.writeAndFlush("channelRead0[自己]发送了消息"+msg+"\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端
public class GroupChatClient {
private final String host;
private final int port;
public GroupChatClient(String host,int port){
this.host = host;
this.port = port;
}
public void run() throws InterruptedException {
EventLoopGroup group =new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder",new StringDecoder());
pipeline.addLast("encoder",new StringEncoder());
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
System.out.println("----------------"+channel.localAddress()+"----------------");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()){
String msg = scanner.nextLine();
channel.writeAndFlush(msg + "\r\n");
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new GroupChatClient("127.0.0.1",7002).run();
}
}
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println(msg.trim());
}
}
16.心跳
心跳是在TCP长连接中,客户端和服务端定时向对方发送数据包通知对方自己还在线,保证连接的有效性的一种机制。在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle 状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互. 自然地, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性.
服务端
public class MyServer {
public static void main(String[] args) throws InterruptedException {
//创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//readerIdleTime:读空闲时间,即在这段时间内如果没有数据读入,则会触发 userEventTriggered 方法;
//writerIdleTime:写空闲时间,即在这段时间内如果没有数据发送,则会触发 userEventTriggered 方法;
//allIdleTime:读写空闲时间,即在这段时间内如果既没有数据读入也没有数据发送,则会触发 userEventTriggered 方法。
pipeline.addLast(new IdleStateHandler(10,20,30, TimeUnit.SECONDS));
//加入一个对空闲检测进一步处理的handler(自定义)
pipeline.addLast(new MyHeartServerHandler());
}
});
//启动服务器
ChannelFuture channelFuture = serverBootstrap.bind(7007).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class MyHeartServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("msg======="+msg);
}
/**
*
* @param ctx 上下文
* @param evt 事件
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent) {
//将 evt 向下转型 IdleStateEvent
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "--超时时间--" + eventType);
System.out.println("服务器做相应处理..");
//如果发生空闲,我们关闭通道
// ctx.channel().close();
}
}
}
客户端
public class TestHeartBeat {
public static void main(String[] args) throws InterruptedException {
//客户端事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//客户端配置启动对象
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)//设置线程组
.channel(NioSocketChannel.class)//设置客户端通道的实现类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new MyHeartClientHandler());
}
});
//启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect("localhost", 7007).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
public class MyHeartClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("=============== channelActive");
ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("=============== channelRead");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("=============== handlerAdded");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("=============== handlerRemoved");
}
}
17.websocket长连接
服务端
public class MyServer {
public static void main(String[] args) throws InterruptedException {
//创建两个线程组 可设置线程数
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();//8个
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//因为基于http协议,使用http的编码和解码器
pipeline.addLast(new HttpServerCodec());
//是已块方式写的 添加ChunkedWriteHandler
pipeline.addLast(new ChunkedWriteHandler());
/*
说明
1. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合
2. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/*
说明
1. 对应websocket ,它的数据是以 帧(frame) 形式传递
2. 可以看到WebSocketFrame 下面有六个子类
3. 浏览器请求时 ws://localhost:7000/hello 表示请求的uri
4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接
5. 是通过一个 状态码 101
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/hello2"));
//自定义handler 处理业务逻辑
pipeline.addLast(new MyTextWebSocketFrameHandler());
}
});
// 启动服务器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
//这里 TextWebSocketFrame 类型,表示一个文本帧(frame)
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
System.out.println("接收到服务器消息"+textWebSocketFrame.text());
//回复消息
channelHandlerContext.writeAndFlush(new TextWebSocketFrame("服务器时间:"+ LocalDateTime.now()+" "+textWebSocketFrame.text()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught被调用,发送异常:"+cause.getMessage());
}
//当客户端连接之后 触发该方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一
System.out.println("handlerAdded被调用"+ctx.channel().id().asLongText());
System.out.println("handlerAdded被调用22"+ctx.channel().id().asShortText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved被调用"+ctx.channel().id().asLongText());
}
}
web
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
var socket;
//判断当前浏览器是否支持websocket
if(window.WebSocket) {
//go on
socket = new WebSocket("ws://localhost:7000/hello2");
//相当于channelReado, ev 收到服务器端回送的消息
socket.onmessage = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
}
//相当于连接开启(感知到连接开启)
socket.onopen = function (ev) {
var rt = document.getElementById("responseText");
rt.value = "连接开启了.."
}
//相当于连接关闭(感知到连接关闭)
socket.onclose = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "连接关闭了.."
}
} else {
alert("当前浏览器不支持websocket")
}
//发送消息到服务器
function send(message) {
if(!window.socket) { //先判断socket是否创建好
return;
}
if(socket.readyState == WebSocket.OPEN) {
//通过socket 发送消息
socket.send(message)
} else {
alert("连接没有开启");
}
}
</script>
<form onsubmit="return false">
<textarea name="message" style="height: 300px; width: 300px"></textarea>
<input type="button" value="发生消息" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height: 300px; width: 300px"></textarea>
<input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
五.实战
1.堆外内存泄露
某天早上,突然收到告警,Nginx 服务端出现大量5xx。我们使用 Nginx 作为服务端 WebSocket 的七层负载,5xx的爆发通常表明服务端不可用。
- 排查part1:log4j2 狂打日志导致 Netty 的 NIO 线程阻塞
因为线程被大量阻塞,我们首先想到的是定位哪些线程被阻塞,最后查出来是 log4j2 狂打日志导致 Netty 的 NIO 线程阻塞。NIO 线程阻塞之后,因我们的服务器无法处理客户端的请求,所以对Nginx来说就是5xx。
查看了 log4j2 的配置文件。
我们发现打印到控制台的这个 Appender 忘记注释掉了,所以初步猜测:因为这个项目打印的日志过多,而 log4j2 打印到控制台是同步阻塞打印的,所以就导致了这个问题。那么接下来,我们把线上所有机器的这行注释掉,本以为会“大功告成”,但没想到仅仅过了几天,5xx告警又来“敲门”。看来,这个问题并没我们最初想象的那么简单。
- 排查part2:可疑日志浮现
在极短的时间内,狂打 failed to allocate 64(bytes) of direct memory
定位OOM源,PlatformDependent.incrementMemory()
原来,堆外内存是否够用,是 Netty 这边自己统计的,那么是不是可以找到统计代码,找到统计代码之后我们就可以看到 Netty 里面的堆外内存统计逻辑了?于是,接下来翻翻代码,找到这段逻辑,就在 PlatformDependent 这个类里面。
这个地方,是一个对已使用堆外内存计数的操作,计数器为 DIRECT_MEMORY_COUNTER,如果发现已使用内存大于堆外内存的上限(用户自行指定),就抛出一个自定义 OOM Error,异常里面的文本内容正是我们在日志里面看到的。
接下来,就验证一下这个方法是否是在堆外内存分配的时候被调用。
果然,在 Netty 每次分配堆外内存之前,都会计数。
- 排查part3:反射进行堆外内存监控
确认堆外内存已快超过上限,并且已经知道 Netty 底层是使用的哪个字段来统计。那么接下来要做的第一件事情,就是反射拿到这个字段,然后我们自己统计 Netty 使用堆外内存的情况。
堆外内存统计字段是 DIRECT_MEMORY_COUNTER,我们可以通过反射拿到这个字段,然后定期 Check 这个值,就可以监控 Netty 堆外内存的增长情况。
于是我们通过反射拿到这个字段,然后每隔一秒打印,为什么要这样做?
因为,通过我们前面的分析,在爆发大量 OOM 现象之前,没有任何可疑的现象。那么只有两种情况,一种是突然某个瞬间分配了大量的堆外内存导致OOM;一种是堆外内存缓慢增长,到达某个点之后,最后一根稻草将机器压垮。在这段代码加上去之后,我们打包上线。
- 排查part4:到底是缓慢增长还是瞬间飙升?
代码上线之后,初始内存为 16384k(16M),这是因为线上我们使用了池化堆外内存,默认一个 Chunk 为16M,这里不必过于纠结。
但是没过一会,内存就开始缓慢飙升,并且没有释放的迹象,二十几分钟之后,内存使用情况如下:
走到这里,我们猜测可能是前面提到的第二种情况,也就是内存缓慢增长造成的 OOM,由于内存实在增长太慢,于是调整机器负载权重为其他机器的两倍,但是仍然是以数K级别在持续增长。那天刚好是周五,索性就过一个周末再开看。
周末之后,我们到公司第一时间就连上了跳板机,登录线上机器,开始 tail -f 继续查看日志。在输完命令之后,怀着期待的心情重重的敲下了回车键:
果然不出所料,内存一直在缓慢增长,一个周末的时间,堆外内存已经飙到快一个 G 了。这个时候,我竟然想到了一句成语:“只要功夫深,铁杵磨成针”。虽然堆外内存以几个K的速度在缓慢增长,但是只要一直持续下去,总有把内存打爆的时候(线上堆外内存上限设置的是2G)。
此时,我们开始自问自答环节:内存为啥会缓慢增长,伴随着什么而增长?因为我们的应用是面向用户端的WebSocket,那么,会不会是每一次有用户进来,交互完之后离开,内存都会增长一些,然后不释放呢?带着这个疑问,我们开始了线下模拟过程。
- 排查part5:线下模拟
本地起好服务,把监控堆外内存的单位改为以B为单位(因为本地流量较小,打算一次一个客户端连接),另外,本地也使用非池化内存(内存数字较小,容易看出问题),在服务端启动之后,控制台打印信息如下:
在没有客户端接入的时候,堆外内存一直是0,在意料之中。接下来,怀着无比激动的心情,打开浏览器,然后输入网址,开始我们的模拟之旅。
我们的模拟流程是:新建一个客户端链接->断开链接->再新建一个客户端链接->再断开链接。
如上图所示,一次 Connect 和 Disconnect 为一次连接的建立与关闭,上图绿色框框的日志分别是两次连接的生命周期。我们可以看到,内存每次都是在连接被关闭的的时候暴涨 256B,然后也不释放。走到这里,问题进一步缩小,肯定是连接被关闭的时候,触发了框架的一个Bug,而且这个Bug在触发之前分配了 256B 的内存,随着Bug被触发,内存也没有释放。问题缩小之后,接下来开始“撸源码”,捉虫!
- 排查part6:线下排查
接下来,我们将本地服务重启,开始完整的线下排查过程。同时将目光定位到 Netty-Socketio 这个框架的 Disconnect 事件(客户端WebSocket连接关闭时会调用到这里),基本上可以确定,在 Disconnect 事件前后申请的内存并没有释放。
在使用 Idea Debug 时,要选择只挂起当前线程,这样我们在单步跟踪的时候,控制台仍然可以看到堆外内存统计线程在打印日志。
在客户端连接上之后然后关闭,断点进入到 onDisconnect 回调,我们特意在此多停留了一会,发现控制台内存并没有飙升(7B这个内存暂时没有去分析,只需要知道,客户端连接断开之后,我们断点hold住,内存还未开始涨)。接下来,神奇的一幕出现了,我们将断点放开,让程序跑完:
Debug 松掉之后,内存立马飙升了!!此时,我们已经知道,这只“臭虫”飞不了多远了。在 Debug 时,挂起的是当前线程,那么肯定是当前线程某个地方申请了堆外内存,然后没有释放,继续“快马加鞭“,深入源码。
其实,每一次单步调试,我们都会观察控制台的内存飙升的情况。很快,我们来到了这个地方:
在这一行没执行之前,控制台的内存依然是 263B。然后,当执行完该行之后,立刻从 263B涨到519B(涨了256B)。
于是,Bug 范围进一步缩小。我们将本次程序跑完,释然后客户端再来一次连接,断点打在 client.send() 这行, 然后关闭客户端连接,之后直接进入到这个方法,随后的过程有点长,因为与 Netty 的时间传播机制有关,这里就省略了。最后,我们跟踪到了如下代码,handleWebSocket:
在这个地方,我们看到一处非常可疑的地方,在上图的断点上一行,调用 encoder 分配了一段内存,调用完之后,我们的控制台立马就彪了 256B。所以,我们怀疑肯定是这里申请的内存没有释放,它这里接下来调用 encoder.encodePacket() 方法,猜想是把数据包的内容以二进制的方式写到这段 256B的内存。接下来,我们追踪到这段 Encode 代码,单步执行之后,就定位到这行代码:
这段代码是把 Packet 里面一个字段的值转换为一个 Char。然而,当我们使用 Idea 预执行的时候,却抛出类一个愤怒的 NPE!!也就是说,框架申请到一段内存之后,在 Encoder 的时候,自己 GG 了,还给自己挖了个NPE的深坑,最后导致内存无法释放(最外层有堆外内存释放逻辑,现在无法执行到了)。而且越攒越多,直到被“最后一根稻草”压垮,堆外内存就这样爆了。这里的源码,有兴趣的读者可以自己去分析一下,限于篇幅原因,这里就不再展开叙述了。
- 排查part7:Bug解决
既然 Bug 已经找到,接下来就要解决问题了。这里只需要解决这个NPE异常,就可以 Fix 掉。我们的目标就是,让这个 subType 字段不为空。于是我们先通过 Idea 的线程调用栈,定位到这个 Packet 是在哪个地方定义的:
我们找到 Idea 的 Debugger 面板,眼睛盯着 Packet 这个对象不放,然后上线移动光标,便光速定位到。原来,定义 Packet 对象这个地方在我们前面的代码其实已经出现过,我们查看了一下 subType 这个字段,果然是 Null。接下来,解决 Bug 就很容易了。
我们给这个字段赋值即可,由于这里是连接关闭事件,所以我们给他指定了一个名为 DISCONNECT 的字段(可以改天深入去研究 Socket.IO 的协议),反正这个 Bug 是在连接关闭的时候触发的,就粗暴一点了 !
解决这个 Bug 的过程是:将这个框架的源码下载到本地,然后加上这一行,最后重新 Build一下,Pom 里改了一下名字,推送到我们公司的仓库。这样,项目就可以直接进行使用了。