文章目录
- 1 Netty
- 1.1 概要设计
- 1.1.1 技术选型
- 1.1.2 数据库设计
- 1.1.3 通信设计
- 1.1.3.1 报文协议格式
- 1.1.3.2 报文交互场景
- 1.2 Netty简单示例
- 1.2.1 pom.xml
- 1.2.2 发送和接收
- 1.2.3 示例说明
- 1.2.3.1 线程阻塞问题
- 1.2.3.2 服务端和接收端 EventLoopGroup
- 1.3 Netty中handler概述
- 1.4 聊天服务端
- 1.4.1 示例
- 1.4.2 示例代码解释
- 1.4.3 各种出入站handler详解
- 1.4.3.1 IdleStateHandler
- 1.4.3.2 HeartBeatHandler
- 1.4.3.3 StringLengthFieldDecoder
- 1.4.3.4 StringDecoder
- 1.4.3.5 JsonDecoder
- 1.4.3.6 BussMessageHandler
- 1.4.3.7 JsonEncoder
- 1.4.3.8 SessionManager
- 1.5 聊天客户端
- 1.5.1 示例
- 1.5.2 ServerBootstrap和Bootstrap区别
- 1.5.3 示例代码解释
- 1.5.4 BussMessageHandler
1 Netty
学习此篇 可以先学习下 IO流之IO,NIO和AIO讲解
1.1 概要设计
1.1.1 技术选型
聊天服务端
聊天服务器与客户端通过TCP
协议进行通信,使用长连接、全双工通信模式,基于经典通信框架Netty
实现。
那么什么是长连接
顾名思义,客户端和服务器连上后,会在这条连接上面
反复收发消息,连接不会断开
。与长连接对应的当然就是短连接了,短连接每次发消息之前都需要先建立连接,然后发消息,最后断开连接。显然,即时聊天适合使用长连接。
那么什么又是全双工
当长连接建立起来后,在这条连接上
既有上行的数据,又有下行的数据
,这就叫全双工
Web管理控制台
Web管理端使用SpringBoot脚手架,前端使用Layuimini,后端使用SpringMVC+Jpa+Shiro
聊天客户端
使用SpringBoot+JavaFX
,做了一个极其简陋的客户端,JavaFX
是一个开发Java
桌面程序的框架
1.1.2 数据库设计
我们只简单用到一张用户表:
CREATE TABLE `sys_user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`user_name` varchar(64) DEFAULT NULL COMMENT '用户名:登陆账号',
`pass_word` varchar(128) DEFAULT NULL COMMENT '密码',
`name` varchar(16) DEFAULT NULL COMMENT '昵称',
`sex` char(1) DEFAULT NULL COMMENT '性别:1-男,2女',
`status` bit(1) DEFAULT NULL COMMENT '用户状态:1-有效,0-无效',
`online` bit(1) DEFAULT NULL COMMENT '在线状态:1-在线,0-离线',
`salt` varchar(128) DEFAULT NULL COMMENT '密码盐值',
`admin` bit(1) DEFAULT NULL COMMENT '是否管理员(只有管理员才能登录Web端):1-是,0-否',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
这张表都在什么时候用到?
- Web管理端登陆的时候;
- 聊天客户端将登陆请求发送到聊天服务端时,聊天服务端进行用户认证;
- 聊天客户端的好友列表加载。
1.1.3 通信设计
1.1.3.1 报文协议格式
报文格式设计目的:
粘包问题
,TCP
长连接中,粘包
是第一个需要解决的问题。通俗的讲,粘包的意思是消息接收方往往收到的不是整个
报文,有时候比整个
多一点,有时候比整个
少一点,这样就导致接收方无法解析这个报文。那么上图中的头8个字节就为了解决这个问题,接收方根据头8个字节标识的长度来获取到“整个”报文,从而进行正常的业务处理;2字节报文类型
,为了方便解析报文而设计。根据这两个字节将后面的json
转成相应的实体以便进行后续处理;- 变长报文体实际上就是
json
格式的串,当然,也可以自己设计报文格式 - 还可以把报文设计的更复杂、更专业,比如加密、加签名等。
1.1.3.2 报文交互场景
登陆
发送消息-成功
发送消息-目标客户端不在线
发送消息-目标客户端在线,但消息转发失败
1.2 Netty简单示例
1.2.1 pom.xml
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.70.Final</version> <!-- 或者是最新版本 -->
</dependency>
1.2.2 发送和接收
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class TempNetty {
private int port = 8011;
public void server() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, port)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(8011).sync();
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
private class ServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("Server received: " + msg);
ctx.writeAndFlush("Server response: " + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
public void client() {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ClientHandler());
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", port).sync();
future.channel().writeAndFlush("Hello, Server!");
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
private class ClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("Client received: " + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client active, sending message...");
super.channelActive(ctx);
}
}
public static void main(String[] args) throws Exception{
TempNetty temp = new TempNetty();
new Thread(temp::server).start(); // 启动服务器
new Thread(temp::client).start(); // 启动客户端
}
}
1.2.3 示例说明
1.2.3.1 线程阻塞问题
由于 上面示例是 把 server
和 client
放到一个 类中 会导致 接收不到信息会导致线程问题
线程阻塞
:确保服务器和客户端的启动是在不同的线程中进行的,以避免线程阻塞。如果在同一个线程中启动服务器和客户端,可能会导致服务器在接收消息时被阻塞,无法同时发送消息。因此需要用多线程开启异步
1.2.3.2 服务端和接收端 EventLoopGroup
在 Netty
中,EventLoopGroup
是一个处理 I/O
操作的多线程事件循环,用于接收进来的连接、接受数据、写数据等。当使用 Netty 创建服务器时,通常会创建两个 EventLoopGroup
,一个用于接收客户端
的连接(通常称为 boss group
),另一个用于处理已被接受的连接
上的 I/O 操作(通常称为 worker group
)。
bossGroup
:主要负责接收客户端的连接
当一个新的连接到达时,boss thread
(boss group
中的线程)会处理这个连接,完成TCP
的三次握手,然后这个连接就被注册到workerGroup
中的一个EventLoop
。
boss thread
之后不再参与这个连接上的任何 I/O 操作;它只负责接收新的连接。workerGroup
:负责处理所有已被bossGroup
接受的新连接的I/O
操作。
一旦连接被接受并注册到workerGroup
的某个EventLoop
,该EventLoop
就负责该连接上的所有I/O
操作,包括读取数据和写入数据。
ServerBootstrap
,它是 Netty
中用于设置服务器端的引导类。使用 ServerBootstrap
,可以配置服务器端的参数,比如 EventLoopGroup、Channel
类型、ChannelHandler
等。客户端通常使用的是 Bootstrap
类来配置客户端端的参数
客户端的 Bootstrap
用于初始化客户端连接。客户端通过它配置连接服务器的参数,如要连接的服务器地址和端口、使用的 EventLoopGroup
、Channel
类型以及 ChannelHandler
等。
在客户端,通常只需要一个 EventLoopGroup
来处理所有的 I/O
操作,包括连接远程服务器、发送和接收数据
等。这个 EventLoopGrou
p 中的 EventLoop
负责处理所有的事件,包括连接、读取、写入等。客户端通常只需要单个 EventLoopGroup
来管理所有的连接和操作,因为客户端的连接数量通常较少,而且对连接的管理相对简单
1.3 Netty中handler概述
点击了解 IO流之IO,NIO和AIO讲解
Netty
是一个相当优秀的通信框架,大多数的顶级开源框架中都有Netty
的身影。
应用过程中,它最核心的东西叫handler
,我们可以简单理解它为消息处理器。收到的消息和出去的消息都会经过一系列的handler
加工处理。收到的消息我们叫它入站消息
,发出去的消息我们叫它出站消息
,因此handle
r又分为出站handler
和入站handler
。收到的消息只会被入站handler处理,发出去的消息只会被出站handler处理。
举个例子,我们从网络上收到的消息是二进制的字节码,我们的目标是将消息转换成java bean
,这样方便我们程序处理,针对这个场景设计这么几个入handler:
- 将字节转换成
String的handler
; - 将
String
转成java bean
的handler
; - 对
java bean
进行业务处理的handler
。
发出去的消息呢,设计这么几个出站handler:
- java bean 转成String的handler;
- String转成byte的handler。
接下来再说一下Netty
的异步。异步的意思是当做完一个操作后,不会立马得到操作结果,而是有结果后Netty
会通知你。通过下面的一段代码来说明:
channel.writeAndFlush(sendMsgRequest).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()){
logger.info("消息发送成功:{}",sendMsgRequest);
}else {
logger.info("消息发送失败:{}",sendMsgRequest);
}
}
});
上面的writeAndFlush
操作无法立即返回结果,如果关注结果,那么添加一个listener
,有结果后会在listener
中响应。
1.4 聊天服务端
1.4.1 示例
首先看主入口的代码
public void start(){
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//心跳
ch.pipeline().addLast(new IdleStateHandler(25, 20, 0, TimeUnit.SECONDS));
//收整包
ch.pipeline().addLast(new StringLengthFieldDecoder());
//转字符串
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
//json转对象
ch.pipeline().addLast(new JsonDecoder());
//心跳
ch.pipeline().addLast(new HeartBeatHandler());
//实体转json
ch.pipeline().addLast(new JsonEncoder());
//消息处理
ch.pipeline().addLast(bussMessageHandler);
}
});
try {
ChannelFuture f = serverBootstrap.bind(port).sync();
f.channel().closeFuture().sync();
}catch (InterruptedException e) {
logger.error("服务启动失败:{}", ExceptionUtils.getStackTrace(e));
}finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
1.4.2 示例代码解释
EventLoopGroup
和ServerBootstrap
是Netty
框架中的两个关键类,用于构建和管理网络应用程序的事件处理和服务启动。
EventLoopGroup
:
EventLoopGroup
是一个事件循环组
,用于管理和调度事件循环(EventLoop
)。
在Netty
中,事件循环是一个处理I/O
操作和触发事件的单线程执行器,它负责处理连接、读取、写入等事件。
EventLoopGroup
可以包含一个或多个事件循环,通常用于处理不同类型的事件,例如接受连接的事件循环、处理读写事件的事件循环等。
通过EventLoopGroup
,Netty
可以实现高效的事件处理和多路复用。ServerBootstrap
:
ServerBootstrap
是Netty
中用于创建和配置服务器的启动类。
它提供了一组方法来配置服务器的各种参数,例如指定EventLoopGroup
、设置服务器通道类型、添加处理器(Handler
)等。
使用ServerBootstrap
可以方便地创建一个服务器,并配置其各种属性,例如绑定端口、设置TCP参数、添加SSL支持等。
通过ServerBootstrap
,可以将各种网络组件(如Channel
、EventLoopGroup
等)组合在一起,从而构建出一个完整的网络应用程序服务器。- 如果 nio 与
Spring Boot
整合还需要 ServerBootstrap 吗
如果使用Spring Boot
来构建应用程序,Spring Boot
通常会封装对Netty
或其他服务器框架的集成,使得在创建和配置服务器时不需要直接使用ServerBootstrap
类。
通常情况下,在将Spring Boot
与Netty
进行整合时,不需要直接使用ServerBootstrap
类。相反,可以利用Spring Boot
提供的自动配置和集成功能来创建和配置服务器。Spring Boot
为Netty
提供了一些自定义的starter,例如spring-boot-starter-webflux
,它使用了Netty作为默认的服务器引擎。
1.4.3 各种出入站handler详解
下面我们着重看initChannel方
法里面的代码。这里面就是上面讲到的各种handler
,我们下面挨个讲这些handler
都是干啥的。
1.4.3.1 IdleStateHandler
IdleStateHandler
:这个是Netty
内置的一个handler
,既是出站handler
又是入站handler
。它的作用一般是用来实现心跳监测。所谓心跳,就是客户端和服务端建立连接后,服务端要实时监控客户端的健康状态,如果客户端挂了,服务端及时释放相应的资源,以及做出其他处理比如通知运维。所以在我们的场景中,客户端需要定时上报自己的心跳,如果服务端检测到一段时间内没收到客户端上报的心跳,那么及时做出处理,我们这里就是简单的将其连接断开,并修改数据库中相应账户的在线状态。
第一个参数叫读超时时间
,第二个参数叫写超时时间
,第三个参数叫读写超时时间
,第四个参数是时间单位秒
。
这个handler
表达的意思是当25秒内没读到客户端的消息,或者20秒内没往客户端发消息,就会产生一个超时事件。那么这个超时事件我们该对他做什么处理呢,请看下一条。
1.4.3.2 HeartBeatHandler
HeartBeatHandler
:当发生超时事件时,HeartBeatHandler
会收到这个事件,并对它做出处理:第一将链接断开;第二讲数据库中相应的账户更新为不在线状态。
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
private static Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state() == IdleState.READER_IDLE) {
//读超时,应将连接断掉
InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
String ip = socketAddress.getAddress().getHostAddress();
ctx.channel().disconnect();
logger.info("【{}】连接超时,断开",ip);
String userName = SessionManager.removeSession(ctx.channel());
SpringContextUtil.getBean(UserService.class).updateOnlineStatus(userName,Boolean.FALSE);
}else {
super.userEventTriggered(ctx, evt);
}
}else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HeartBeat){
//收到心跳包,不处理
logger.info("server收到心跳包:{}",msg);
return;
}
super.channelRead(ctx, msg);
}
}
1.4.3.3 StringLengthFieldDecoder
StringLengthFieldDecoder
:这是个入站handler
,作用就是解决上面提到的粘包
问题:
public class StringLengthFieldDecoder extends LengthFieldBasedFrameDecoder {
public StringLengthFieldDecoder() {
super(10*1024*1024,0,8,0,8);
}
@Override
protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
buf = buf.order(order);
byte[] lenByte = new byte[length];
buf.getBytes(offset, lenByte);
String lenStr = new String(lenByte);
Long len = Long.valueOf(lenStr);
return len;
}
}
只需要集成Netty
提供的LengthFieldBasedFrameDecoder
类,并重写getUnadjustedFrameLength
方法即可。
首先看构造方法中的5个参数。
第一个表示能处理的包的最大长度
;第二三个参数应该结合起来理解,表示长度字段从第几位开始,长度的长度是多少,也就是上面报文格式协议中的头8个字节;
第四个参数表示长度是否需要校正,举例理解,比如头8个字节解析出来的长度=包体长度+头8个字节的长度,那么这里就需要校正8个字节,我们的协议中长度只包含报文体,因此这个参数填0;
最后一个参数,表示接收到的报文是否要跳过一些字节,本例中设置为8,表示跳过头8个字节,因此经过这个handler
后,我们收到的数据就只有报文本身了,不再包含8个长度字节了。
再看getUnadjustedFrameLength
方法,其实就是将头8个字符串型的长度为转换成long型。重写完这个方法后,Netty
就知道如何收一个完整
的数据包了。
1.4.3.4 StringDecoder
StringDecoder
:这个是Netty
自带的入站handler
,会将字节流以指定的编码解析成String
1.4.3.5 JsonDecoder
JsonDecoder
:是自定义的一个入站handler
,目的是将json String
转换成java bean
,以方便后续处理:
public class JsonDecoder extends MessageToMessageDecoder<String> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, String o, List<Object> list) throws Exception {
Message msg = MessageEnDeCoder.decode(o);
list.add(msg);
}
}
这里会调用我们自定义的一个编解码帮助类进行转换:
public static Message decode(String message){
if (StringUtils.isEmpty(message) || message.length() < 2){
return null;
}
String type = message.substring(0,2);
message = message.substring(2);
if (type.equals(LoginRequest)){
return JsonUtil.jsonToObject(message,LoginRequest.class);
}else if (type.equals(LoginResponse)){
return JsonUtil.jsonToObject(message,LoginResponse.class);
}else if (type.equals(LogoutRequest)){
return JsonUtil.jsonToObject(message,LogoutRequest.class);
}else if (type.equals(LogoutResponse)){
return JsonUtil.jsonToObject(message,LogoutResponse.class);
}else if (type.equals(SendMsgRequest)){
return JsonUtil.jsonToObject(message,SendMsgRequest.class);
}else if (type.equals(SendMsgResponse)){
return JsonUtil.jsonToObject(message,SendMsgResponse.class);
}else if (type.equals(HeartBeat)){
return JsonUtil.jsonToObject(message,HeartBeat.class);
}
return null;
}
1.4.3.6 BussMessageHandler
BussMessageHandler
:先看这个入站handler
,是我们的一个业务处理主入口,主要工作就是将消息分发给线程池去处理,另外还负责一个小场景,当客户端主动断开时,需要将相应的账户数据库中状态更新为不在线。
public class BussMessageHandler extends ChannelInboundHandlerAdapter {
private static Logger logger = LoggerFactory.getLogger(BussMessageHandler.class);
@Autowired
private TaskDispatcher taskDispatcher;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("收到消息:{}",msg);
if (msg instanceof Message){
taskDispatcher.submit(ctx.channel(),(Message)msg);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//客户端连接断开
InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
ctx.channel().disconnect();
String ip = socketAddress.getAddress().getHostAddress();
logger.info("客户端断开:{}",ip);
String userName = SessionManager.removeSession(ctx.channel());
SpringContextUtil.getBean(UserService.class).updateOnlineStatus(userName,Boolean.FALSE);
super.channelInactive(ctx);
}
}
接下来还差线程池的处理逻辑,也非常简单,就是将任务封装成executor
然后交给线程池处理:
public class TaskDispatcher {
private ThreadPoolExecutor threadPool;
public TaskDispatcher(){
int corePoolSize = 15;
int maxPoolSize = 50;
int keepAliveSeconds = 30;
int queueCapacity = 1024;
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueCapacity);
this.threadPool = new ThreadPoolExecutor(
corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS,
queue);
}
public void submit(Channel channel, Message msg){
ExecutorBase executor = null;
String messageType = msg.getMessageType();
if (messageType.equals(MessageEnDeCoder.LoginRequest)){
executor = new LoginExecutor(channel,msg);
}
if (messageType.equalsIgnoreCase(MessageEnDeCoder.SendMsgRequest)){
executor = new SendMsgExecutor(channel,msg);
}
if (executor != null){
this.threadPool.submit(executor);
}
}
}
接下来看一下消息转发executor
是怎么做的:
public class SendMsgExecutor extends ExecutorBase {
private static Logger logger = LoggerFactory.getLogger(SendMsgExecutor.class);
public SendMsgExecutor(Channel channel, Message message) {
super(channel, message);
}
@Override
public void run() {
SendMsgResponse response = new SendMsgResponse();
response.setMessageType(MessageEnDeCoder.SendMsgResponse);
response.setTime(new Date());
SendMsgRequest request = (SendMsgRequest)message;
String recvUserName = request.getRecvUserName();
String sendContent = request.getSendMessage();
Channel recvChannel = SessionManager.getSession(recvUserName);
if (recvChannel != null){
SendMsgRequest sendMsgRequest = new SendMsgRequest();
sendMsgRequest.setTime(new Date());
sendMsgRequest.setMessageType(MessageEnDeCoder.SendMsgRequest);
sendMsgRequest.setRecvUserName(recvUserName);
sendMsgRequest.setSendMessage(sendContent);
sendMsgRequest.setSendUserName(request.getSendUserName());
recvChannel.writeAndFlush(sendMsgRequest).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()){
logger.info("消息转发成功:{}",sendMsgRequest);
response.setResultCode("0000");
response.setResultMessage(String.format("发给用户[%s]消息成功",recvUserName));
channel.writeAndFlush(response);
}else {
logger.error(ExceptionUtils.getStackTrace(future.cause()));
logger.info("消息转发失败:{}",sendMsgRequest);
response.setResultCode("9999");
response.setResultMessage(String.format("发给用户[%s]消息失败",recvUserName));
channel.writeAndFlush(response);
}
}
});
}else {
logger.info("用户{}不在线,消息转发失败",recvUserName);
response.setResultCode("9999");
response.setResultMessage(String.format("用户[%s]不在线",recvUserName));
channel.writeAndFlush(response);
}
}
}
整体逻辑:一获取要把消息发给那个账号;二获取该账号对应的连接;三在此连接上发送消息;四获取消息发送结果,将结果发给消息“发起者”。
下面是登陆处理的executor:
public class LoginExecutor extends ExecutorBase {
private static Logger logger = LoggerFactory.getLogger(LoginExecutor.class);
public LoginExecutor(Channel channel, Message message) {
super(channel, message);
}
@Override
public void run() {
LoginRequest request = (LoginRequest)message;
String userName = request.getUserName();
String password = request.getPassword();
UserService userService = SpringContextUtil.getBean(UserService.class);
boolean check = userService.checkLogin(userName,password);
LoginResponse response = new LoginResponse();
response.setUserName(userName);
response.setMessageType(MessageEnDeCoder.LoginResponse);
response.setTime(new Date());
response.setResultCode(check?"0000":"9999");
response.setResultMessage(check?"登陆成功":"登陆失败,用户名或密码错");
if (check){
userService.updateOnlineStatus(userName,Boolean.TRUE);
SessionManager.addSession(userName,channel);
}
channel.writeAndFlush(response).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
//登陆失败,断开连接
if (!check){
logger.info("用户{}登陆失败,断开连接",((LoginRequest) message).getUserName());
channel.disconnect();
}
}
});
}
}
登陆逻辑也不复杂,登陆成功则更新用户在线状态,并且无论登陆成功还是失败,都会返一个登陆应答。同时,如果登陆校验失败,在返回应答成功后,需要将链接断开。
1.4.3.7 JsonEncoder
JsonEncoder
:最后看这个唯一的出站handler
,服务端发出去的消息都会被出站handler
处理,职责就是将java bean转成我们之前定义的报文协议格式:
public class JsonEncoder extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
String msgStr = MessageEnDeCoder.encode(message);
int length = msgStr.getBytes(Charset.forName("UTF-8")).length;
String str = String.valueOf(length);
String lenStr = StringUtils.leftPad(str,8,'0');
msgStr = lenStr + msgStr;
byteBuf.writeBytes(msgStr.getBytes("UTF-8"));
}
}
1.4.3.8 SessionManager
SessionManager
:剩下最后一个东西没说,这个是用来保存每个登陆成功账户的链接的,底层是个map,key为用户账户,value为链接:
public class SessionManager {
private static ConcurrentHashMap<String,Channel> sessionMap = new ConcurrentHashMap<>();
public static void addSession(String userName,Channel channel){
sessionMap.put(userName,channel);
}
public static String removeSession(String userName){
sessionMap.remove(userName);
return userName;
}
public static String removeSession(Channel channel){
for (String key:sessionMap.keySet()){
if (channel.id().asLongText().equalsIgnoreCase(sessionMap.get(key).id().asLongText())){
sessionMap.remove(key);
return key;
}
}
return null;
}
public static Channel getSession(String userName){
return sessionMap.get(userName);
}
}
1.5 聊天客户端
客户端中界面相关的东西是基于JavaFX
框架做的
1.5.1 示例
public void login(String userName,String password) throws Exception {
Bootstrap clientBootstrap = new Bootstrap();
EventLoopGroup clientGroup = new NioEventLoopGroup();
try {
clientBootstrap.group(clientGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000);
clientBootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(20, 15, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new StringLengthFieldDecoder());
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new JsonDecoder());
ch.pipeline().addLast(new JsonEncoder());
ch.pipeline().addLast(bussMessageHandler);
ch.pipeline().addLast(new HeartBeatHandler());
}
});
ChannelFuture future = clientBootstrap.connect(server,port).sync();
if (future.isSuccess()){
channel = (SocketChannel)future.channel();
LoginRequest request = new LoginRequest();
request.setTime(new Date());
request.setUserName(userName);
request.setPassword(password);
request.setMessageType(MessageEnDeCoder.LoginRequest);
channel.writeAndFlush(request).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()){
logger.info("登陆消息发送成功");
}else {
logger.info("登陆消息发送失败:{}", ExceptionUtils.getStackTrace(future.cause()));
Platform.runLater(new Runnable() {
@Override
public void run() {
LoginController.setLoginResult("网络错误,登陆消息发送失败");
}
});
}
}
});
}else {
clientGroup.shutdownGracefully();
throw new RuntimeException("网络错误");
}
}catch (Exception e){
clientGroup.shutdownGracefully();
throw new RuntimeException("网络错误");
}
}
1.5.2 ServerBootstrap和Bootstrap区别
在网络编程中,特别是在基于Netty
等框架的应用中,ServerBootstrap
和Bootstrap
是两个关键类,它们分别用于服务端和客户端的引导启动。
下面是它们的区别:
ServerBootstrap
:
ServerBootstrap
是Netty
中用于创建和配置服务器
的启动类。
它专门用于服务端应用程序,在服务端启动时使用。
ServerBootstrap
负责创建并配置用于接受客户端连接的服务器。
通过ServerBootstrap
可以设置服务器的各种参数,例如绑定端口、设置TCP参数、添加SSL支持等。Bootstrap
:
Bootstrap
是Netty
中用于创建和配置客户端
的启动类。
它专门用于客户端应用程序,在客户端启动时使用。
Bootstrap
负责创建并配置用于连接到服务器的客户端。
通过Bootstrap
可以设置客户端的各种参数,例如远程主机地址、远程主机端口、连接超时等。
总之,ServerBootstrap
用于创建和配置服务器端,而Bootstrap
用于创建和配置客户端。它们分别针对服务端和客户端的启动进行了专门的设计和实现。
1.5.3 示例代码解释
对这段代码,我们主要关注这几点:一所有handler的初始化;二connect服务端。
所有handler
中,除了bussMessageHandler
是客户端特有的外,其他的handler
在服务端章节已经讲过了
先看连接服务端的操作。
首先发起连接,连接成功后发送登陆报文。发起连接需要对成功和失败进行处理。发送登陆报文也需要对成功和失败进行处理。注意,这里的成功失败只是代表当前操作的网络层面
的成功失败,这时候并不能获取服务端
返回的应答中的业务层面的成功失败
1.5.4 BussMessageHandler
BussMessageHandler
:整体流程还是跟服务端一样,将受到的消息扔给线程池处理,我们直接看处理消息的各个executor
先看客户端发出登陆请求后,收到登陆应答消息后是怎么处理的:
public class LoginRespExecutor extends ExecutorBase {
private static Logger logger = LoggerFactory.getLogger(LoginRespExecutor.class);
public LoginRespExecutor(Channel channel, Message message) {
super(channel, message);
}
@Override
public void run() {
LoginResponse response = (LoginResponse)message;
logger.info("登陆结果:{}->{}",response.getResultCode(),response.getResultMessage());
if (!response.getResultCode().equals("0000")){
Platform.runLater(new Runnable() {
@Override
public void run() {
LoginController.setLoginResult("登陆失败,用户名或密码错误");
}
});
}else {
LoginController.setCurUserName(response.getUserName());
ClientApplication.getScene().setRoot(SpringContextUtil.getBean(MainView.class).getView());
}
}
}
接下来看客户端是怎么发聊天信息的:
public void sendMessage(Message message) {
channel.writeAndFlush(message).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
SendMsgRequest send = (SendMsgRequest)message;
if (future.isSuccess()){
Platform.runLater(new Runnable() {
@Override
public void run() {
MainController.setMessageHistory(String.format("[我]在[%s]发给[%s]的消息[%s],发送成功",
DateFormatUtils.format(send.getTime(),"yyyy-MM-dd HH:mm:ss"),send.getRecvUserName(),send.getSendMessage()));
}
});
}else {
Platform.runLater(new Runnable() {
@Override
public void run() {
MainController.setMessageHistory(String.format("[我]在[%s]发给[%s]的消息[%s],发送失败",
DateFormatUtils.format(send.getTime(),"yyyy-MM-dd HH:mm:ss"),send.getRecvUserName(),send.getSendMessage()));
}
});
}
}
});
}
参考连接:https://mp.weixin.qq.com/s/zlyexr2ix3PaK_Qm6yezHQ