需求:
1、编写一个NIO群聊系统,实现服务端和客户端之间数据简单通讯(非阻塞)
2、实现多人群聊
3、服务端:可以监测用户上线、离线、并实现消息转发功能。
4、客户端:通过channel可以无阻塞发送消息给其他所有用户,同时可以接受其他用户发送的消息。
5、目的:进一步理解NIO非阻塞网络编程机制
服务端代码:GroupChatServer.java
@Slf4j public class GroupChatServer { //选择器 private Selector selector; //监听器 private ServerSocketChannel serverSocketChannel; //端口号 private static final int PORT = 8000; //构造方法,初始化成员变量 public GroupChatServer(){ try { //1 创建监听器 serverSocketChannel = ServerSocketChannel.open(); //2 创建选择器 selector = Selector.open(); //3 绑定端口号 serverSocketChannel.socket().bind(new InetSocketAddress(PORT)); //4 设置非阻塞模式 serverSocketChannel.configureBlocking(false); //5 事件绑定 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); }catch (Exception e){ e.printStackTrace(); } } //监听 public void listen(){ log.info("监听的线程号是:{}",Thread.currentThread().getId()); try { //循环等待客户端连接 while(true){ int count = selector.select(); if(count > 0){ //表示有客户端连接 //遍历得到selectorKey Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); //判断事件监听类型 if(key.isAcceptable()){ /*//事件:连接 SocketChannel socketChannel = serverSocketChannel.accept(); //设置为非阻塞 socketChannel.configureBlocking(false); //把socketchannel的读取事件类型注册到选择器上 socketChannel.register(selector,SelectionKey.OP_READ); //提示用户上线 log.info("用户,{}",socketChannel.getRemoteAddress(),"已上线");*/ accept(serverSocketChannel); } if(key.isReadable()){ readClientMessage(key); } iterator.remove(); } }else{ log.info("等待客户端连接"); } } }catch (Exception e){ e.printStackTrace(); }finally { } } //客户端连接事件 public void accept(ServerSocketChannel serverSocketChannel) throws Exception{ //获取SocketChannel SocketChannel socketChannel = serverSocketChannel.accept(); //设置SocketChannel 为非阻塞模式 socketChannel.configureBlocking(false); //把读取事件绑定到选择器上 socketChannel.register(selector,SelectionKey.OP_READ); //提示用户上线 log.info("用户,{},已上线",socketChannel.getRemoteAddress()); } //读取客户端信息 public void readClientMessage(SelectionKey key){ SocketChannel socketChannel = null; try { //根据key获取SocketChannel socketChannel = (SocketChannel)key.channel(); //创建ByteBuffer ByteBuffer buffer = ByteBuffer.allocate(1024); //channel 读取 buffer int count = socketChannel.read(buffer); //根据count的值做处理 if(count > 0) { //把缓冲区的数据转换成字符串 String msg = new String(buffer.array()); //输出该消息 log.info("来自客户端:{}, 的消息是:{}",socketChannel.getRemoteAddress(),msg); //向其他客户端转发消息(排除自己) sendMessageToOtherClients(msg,socketChannel); } }catch (Exception e){ try { //提示离线 log.info("{}",socketChannel.getRemoteAddress(),":已下线"); //取消注册 key.cancel(); //关闭通道 socketChannel.close(); }catch (Exception e2){ e2.printStackTrace(); } } } public void sendMessageToOtherClients(String message,SocketChannel socketChannel) throws Exception{ log.info("消息转发中。。。"); log.info("服务器发送数据给客户端的线程是:{}",Thread.currentThread().getId()); //遍历所有注册到selector的SocketChannel,排除自己 for(SelectionKey key : selector.keys()){ //通过key取出对应的SocketChannel Channel targetChannel = key.channel(); //排除自己 if(targetChannel instanceof SocketChannel && targetChannel != socketChannel){ SocketChannel dest = (SocketChannel) targetChannel; //把消息存储到ByteBuffer ByteBuffer buffer = ByteBuffer.wrap(message.getBytes()); //把buffer的数据写入通道 dest.write(buffer); } } } public static void main(String[] args) { GroupChatServer server = new GroupChatServer(); server.listen(); } }
客户端 GroupChatClient.java
@Slf4j public class GroupChatClient { //服务器IP private static final String HOST = "127.0.0.1"; //服务器端口号 private static final int PORT = 8000; //选择器 private Selector selector; //SocketChannel private SocketChannel socketChannel; //用户名 private String username; //构造器 public GroupChatClient() throws Exception{ //获取selector selector = Selector.open(); //连接到服务器 socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT)); //设置为非阻塞 socketChannel.configureBlocking(false); //将channel的读事件注册到selector socketChannel.register(selector, SelectionKey.OP_READ); //初始化username username = socketChannel.getLocalAddress().toString().substring(1); log.info("{},客户端初始化完成",username); } //向服务器发送消息 public void sendMessage(String message){ message = username + " 说:" + message; try { socketChannel.write(ByteBuffer.wrap(message.getBytes())); //log.info("用户:{},说:{}",username,message); }catch (Exception e){ e.printStackTrace(); } } //读取从服务器获取的消息 public void readMessage(){ try { //获取socketChannel的通道数量 int count = selector.select(); //如果通道数量大于0,说明有可用的通道 if(count > 0){ //获取所有通道的迭代器 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); //循环判断 while(iterator.hasNext()){ //获取key SelectionKey key = iterator.next(); //如果事件类型是读取类型 if(key.isReadable()){ //获取相关通道 SocketChannel channel = (SocketChannel) key.channel(); //创建Bytebuffer ByteBuffer buffer = ByteBuffer.allocate(1024); //读取buffer channel.read(buffer); //把读取到缓冲区的数据转换成字符串,并输出 String message = new String(buffer.array()); log.info("{}",message); } } //删除当前的SelectionKey,方法重复注册 iterator.remove(); } }catch (Exception e){ e.printStackTrace(); } } public static void main(String[] args) throws Exception{ //启动客户端 GroupChatClient chatClient = new GroupChatClient(); //启动一个线程,每隔3秒,从服务器读取数据 new Thread(){ public void run(){ while(true){ chatClient.readMessage(); try { Thread.currentThread().sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } } } }.start(); //发送数据到服务端 Scanner scanner = new Scanner(System.in); while(scanner.hasNextLine()){ String s = scanner.nextLine(); chatClient.sendMessage(s); } } }
服务端运行结果:
客户端1运行结果:
客户端2运行结果:
客户端3运行结果: