前言
在了解了Netty的基本功能和相关概念之后,本文基于Netty实现一个多人聊天的功能。
需求
1.服务端能够接收客户端的连接,并且接受用户信息的注册
2.服务端能够处理客户端发送的消息,并且根据消息类型进行私发或者广播发送消息
3.服务端能够私发消息和广播消息
代码实现
环境
java版本:JDK17
netty版本: 4.1.111.Final
服务端
import com.fftf.netty.handler.ChatServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class ChatRoomServer {
private final int port;
public ChatRoomServer(int port) {
this.port = port;
}
public void run() throws Exception{
//用于建立连接
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workerGroup=new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ChatServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
ChatRoomServer chatRoomServer=new ChatRoomServer(8080);
try {
chatRoomServer.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
服务端自定义handler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.*;
/**
 * Server-side chat handler.
 *
 * <p>The first non-blank line received on a connection is taken as the user name.
 * Every later line is either a private message ({@code /msg <username> <message>})
 * or a message broadcast to all registered users.
 */
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
    /**
     * Registered users keyed by their channel context. The map is static, so it is
     * shared by all handler instances; compound operations and iteration synchronize
     * on the map itself.
     */
    private static final Map<ChannelHandlerContext, String> clients = Collections.synchronizedMap(new HashMap<>());

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
    }

    /** Logs the new connection and prompts the client for a user name. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client connected: " + ctx.channel().remoteAddress());
        ctx.writeAndFlush("请输入你的用户名:\n");
    }

    /**
     * Unregisters the user bound to the closed connection and tells the remaining
     * users who left. Connections that never registered a name are dropped silently.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // remove() returns the previous value, so the name is still known after removal.
        String username = clients.remove(ctx);
        System.out.println("Client disconnected: " + username);
        if (username != null) {
            broadcastMessage("User left: " + username);
        }
    }

    /** Prints the stack trace and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Handles one inbound message: registration while the connection has no user name
     * yet, otherwise private or broadcast delivery.
     *
     * @param ctx context of the sending connection
     * @param msg decoded text; trailing line terminators are ignored
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        String username = clients.get(ctx);
        if (username == null) {
            registerUser(ctx, msg.trim());
            return;
        }
        // Drop the terminator typed by the client so it is not duplicated by the
        // "\n" appended on delivery.
        String text = stripLineEnd(msg);
        System.out.println("接收到消息 " + username + ": " + text);
        if (text.startsWith("/msg ")) {
            handlePrivateMessage(ctx, text.substring(5));
        } else {
            broadcastMessage(username + ": " + text);
        }
    }

    /**
     * Binds {@code username} to {@code ctx} unless it is empty or already taken, then
     * announces the new user to everyone.
     *
     * @param ctx      context of the registering connection
     * @param username requested name, already trimmed
     */
    private void registerUser(ChannelHandlerContext ctx, String username) {
        if (username.isEmpty()) {
            ctx.writeAndFlush("无效的用户名,请重新输入用户名:\n");
            return;
        }
        boolean taken;
        // Check and insert under one lock so two connections cannot claim the same name.
        synchronized (clients) {
            taken = clients.containsValue(username);
            if (!taken) {
                clients.put(ctx, username);
            }
        }
        if (taken) {
            ctx.writeAndFlush("用户名已经存在,请重新输入用户名:\n");
            return;
        }
        broadcastMessage("用户加入聊天室: " + username);
    }

    /**
     * Sends {@code msg}, followed by a newline, to every registered user.
     *
     * @param msg text to deliver
     */
    private void broadcastMessage(String msg) {
        // Iterating a synchronized map's view requires holding the map's lock.
        synchronized (clients) {
            for (ChannelHandlerContext client : clients.keySet()) {
                client.writeAndFlush(msg + "\n");
            }
        }
    }

    /**
     * Delivers a private message of the form {@code <username> <message>}.
     * The sender is notified when the format is invalid or the recipient is unknown.
     *
     * @param senderCtx context of the sending connection
     * @param msg       command payload after the {@code "/msg "} prefix
     */
    private void handlePrivateMessage(ChannelHandlerContext senderCtx, String msg) {
        String[] parts = msg.split(" ", 2);
        if (parts.length != 2) {
            senderCtx.writeAndFlush("无效的消息格式 使用 /msg <username> <message>\n");
            return;
        }
        String recipientUsername = parts[0];
        String message = parts[1];

        ChannelHandlerContext recipientCtx = null;
        String senderName;
        // Look up sender and recipient under the map's lock; other connections may
        // register or leave concurrently.
        synchronized (clients) {
            senderName = clients.get(senderCtx);
            for (Map.Entry<ChannelHandlerContext, String> entry : clients.entrySet()) {
                if (entry.getValue().equals(recipientUsername)) {
                    recipientCtx = entry.getKey();
                    break;
                }
            }
        }
        if (recipientCtx == null) {
            senderCtx.writeAndFlush("用户未找到: " + recipientUsername + "\n");
            return;
        }
        recipientCtx.writeAndFlush("[私发消息 来自于 " + senderName + "] " + message + "\n");
    }

    /** Returns {@code msg} without any trailing {@code '\r'} or {@code '\n'} characters. */
    private static String stripLineEnd(String msg) {
        int end = msg.length();
        while (end > 0 && (msg.charAt(end - 1) == '\n' || msg.charAt(end - 1) == '\r')) {
            end--;
        }
        return msg.substring(0, end);
    }
}
服务端自定义handler主要的功能:
- 维护客户端注册信息
- channel的注册
- channel的激活事件的处理
- channel注销事件的处理
- 消息的处理
  - 广播消息的转发
  - 私发消息的转发
客户端
import com.fftf.netty.handler.ChatClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class ChatRoomClient {
private final String host;
private final int port;
private Channel channel;
public ChatRoomClient(String host, int port) {
this.host = host;
this.port = port;
}
public Channel getChannel() {
return channel;
}
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ChatClientHandler(ChatRoomClient.this));
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
channel = future.channel();
// 启动一个线程用于接收用户的输入
Thread userInputThread = new Thread(() -> {
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
while (true) {
try {
String msg = in.readLine();
if (msg == null || "exit".equalsIgnoreCase(msg)) {
break;
}
channel.writeAndFlush(msg + "\n");
} catch (Exception e) {
e.printStackTrace();
}
}
channel.close();
});
userInputThread.start();
// 等待直到客户端连接关闭
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
String host = "localhost";
int port = 8080;
if (args.length == 2) {
host = args[0];
port = Integer.parseInt(args[1]);
}
new ChatRoomClient(host, port).run();
}
}
客户端自定义handler
import com.fftf.netty.client.ChatRoomClient;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Client-side chat handler: prints every message received from the server and closes
 * the connection when an error reaches the end of the pipeline.
 */
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
    private final ChatRoomClient client;

    /**
     * @param client the owning client, whose channel is closed on error
     */
    public ChatClientHandler(ChatRoomClient client) {
        this.client = client;
    }

    /** Prints the received message to standard output. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("收到消息:" + msg);
    }

    /** Prints the stack trace and closes both this context and the client's channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
        // The client publishes its channel only after connect() has returned, so an
        // early failure can arrive while getChannel() is still null.
        if (client.getChannel() != null) {
            client.getChannel().close();
        }
    }
}
客户端handler的功能比较简单,就是输出收到消息的内容
效果演示
启动一个server端,两个client端
服务端
客户端1:
客户端2:
完整代码
https://github.com/qyfftf/netty-chatroom