网络编程中消息的长度是不太确定的,read方法读取字节数据到ByteBuffer中,ByteBuffer会有一个固定容量,单次超出容量的部分字节数据将会在下一次的ByteBuffer中,这样消息就会按照字节截断,出现消息边界问题。
Http 2.0 是LTV格式
Type类型、Length长度、Value数据。在类型和长度已知的情况下,就可以方便的获取消息大小,分配合适的buffer,缺点是buffer需要提前分配,如果内容过大,则影响server的吞吐量。
/**
* 服务端
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
//##创建一个Selector,管理多个Channel(比如:ServerSocketChannel,SocketChannel)
Selector selector = Selector.open();
//创建一个服务器
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//****ServerSocketChannel配置为非阻塞-默认是阻塞true,可以让accept方法变成非阻塞
serverSocketChannel.configureBlocking(false);
//## 将ServerSocketChannel注册到Selector
//## 返回值selectionKey,将来事件发生后,可以知道事件和发生事件的channel
//## 事件有:accept,connect, read, write,
//## ServerSocketChannel -> accept事件 在服务端,会在客户端发起连接请求时触发
//## connect事件,在客户端,当客户端与服务端建立连接以后,客户端触发
//## SocketChannel -> read事件,可读事件,当客户端发送数据到Channel,服务端可以从channel读取数据
//## write事件,可写事件
//## 第二个参数0表示,不关注任何事件
SelectionKey serverSocketChannelSelectionKey = serverSocketChannel.register(selector, 0, null);
log.info("register key={}", serverSocketChannelSelectionKey);
//## 只关注accept事件
serverSocketChannelSelectionKey.interestOps(SelectionKey.OP_ACCEPT);
//给服务器绑定一个端口8000,让客户端来连接
serverSocketChannel.bind(new InetSocketAddress(8000));
while(true) { //保证可以多个客户端连接
//## select方法,没有事件发生时线程阻塞,有事件时候线程才能恢复运行
// select在事件未处理时,它不会阻塞。(下面不调用SocketChannel的accept方法,线程就不会阻塞)
// select在事件处理了(accept)或者取消(cancel)了才会阻塞。不能对事件置之不理,就会一直空转循环
selector.select();
//## 处理事件
//## 获取到内部包含了所有发生的事件
//## 这里的selectionKeys就包含了上面accept事件注册到Selector上返回的selectKey
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//因为要在遍历的时候会删除元素,所以采用迭代器遍历iterator
Iterator<SelectionKey> iterator = selectionKeys.iterator();
log.info("开始遍历所有的selectionKey");
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//拿到的selectionKey一定从selectionKeys集合中删除,否则就可能会出问题
//遍历的selectionKey处理完一定要删除
iterator.remove();
log.info("事件selectKey={}", selectionKey);
//判断事件类型
if(selectionKey.isAcceptable()) {//服务端accept事件
ServerSocketChannel channel = (ServerSocketChannel)selectionKey.channel();
log.info("事件关联的channel={}", channel);
//接受客户端的连接,事件处理accept
//如果上面不删除selectionKeys集合中的selectionKey,下次循环并没有客户端发起连接,accept方法返回null
SocketChannel accept = channel.accept();
log.info("accept={}", accept);
//将SocketChannel配置为非阻塞模式
accept.configureBlocking(false);
//将socketChannel注册到selector
//第三个参数attachment附件,专属于每个channel的ByteBuffer
//将ByteBuffer关联到selectionKey上
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(16);
SelectionKey acceptSelectKey = accept.register(selector, 0, byteBuffer);
//只关注read事件
acceptSelectKey.interestOps(SelectionKey.OP_READ);
}else if(selectionKey.isReadable()) {//read事件
try{
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
//获取channel专属的attachment->ByteBuffer,根据SelectionKey
ByteBuffer byteBuffer = (ByteBuffer)selectionKey.attachment();
int read = socketChannel.read(byteBuffer);
//-1表示客户端正常断开,客户端SocketChannel调用了close方法
if(read == -1) {
selectionKey.cancel();
}else {
// byteBuffer.flip();//读模式
// String resStr = StandardCharsets.UTF_8.decode(byteBuffer).toString();
// log.info("resStr={}", resStr);
split(byteBuffer);//这里会将原byteBuffer变成写模式
//需要扩容
if(byteBuffer.position() == byteBuffer.limit()) {
//创建一个新的byteBuffer其容量是原来的2倍
ByteBuffer increaseByteBuffer = ByteBuffer.allocateDirect(byteBuffer.capacity() * 2);
byteBuffer.flip();//切换为读模式,才能将byteBuffer中的数据读到
//将原来的byteBuffer的数据拷贝到新的ByteBuffer中
increaseByteBuffer.put(byteBuffer);
//用新的ByteBuffer替换原来的ByteBuffer
selectionKey.attach(increaseByteBuffer);
}
}
}catch (Exception e) {
e.printStackTrace();
//因为客户端断开了,会抛出Exception in thread "main" java.io.IOException: 远程主机强迫关闭了一个现有的连接。
//因此需要将key取消
//从selector的key集合中真正的删除key
selectionKey.cancel();
}
}
}
}
}
private static void split(ByteBuffer source) {
source.flip();//切换读模式
for (int i = 0; i < source.limit(); i++) {
if(source.get(i) == '\n') {
int length = i + 1 - source.position();
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(length);
for (int j = 0; j < length; j++) {
byte b = source.get();
byteBuffer.put(b);
}
byteBuffer.flip();
String str = StandardCharsets.UTF_8.decode(byteBuffer).toString();
log.info("str={}", str);
}
}
source.compact();//切换写模式,接着上次读的位置接着写
}
/**
* 客户端
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
//连接服务端,地址localhost:8000
socketChannel.connect(new InetSocketAddress("localhost", 8000));
//将hello字符串->byte[]->ByteBuffer->socketChannel
socketChannel.write(StandardCharsets.UTF_8.encode("Hello world\nI'm zhangsan\nHow are you?\n"));
System.out.println("waiting...");
System.in.read();
}