文章目录
- 一. VectorNettyApplication启动类配置
- 二.WebSocketServerBoot初始化服务端Netty
- 三. WebsocketServerChannelInitializer初始化服务端Netty读写处理器
- 四.initParamHandler处理器-去参websocket识别
- 五.MessageHandler核心业务处理类-采用工厂策略模式
- 5.1 策略上下文
- 六.统一响应
- 七.统一输出处理器
一. VectorNettyApplication启动类配置
初始化SpringBoot线程同时初始化Netty线程
/**
* @description: 通知启动类
* @Title: VectorNotification
* @Package com.vector.notification
* @Author YuanJie
* @Date 2023/3/2 12:57
*/
@EnableDiscoveryClient // 开启服务注册与发现
@SpringBootApplication(scanBasePackages = {"com.vector"},exclude = {DataSourceAutoConfiguration.class}) // 开启组件扫描和自动配置
public class VectorNettyApplication implements CommandLineRunner {
@Value("${netty.host}")
private String host;
@Value("${netty.port}")
private Integer port;
@Resource
private WebSocketServerBoot webSocketServerBoot;
public static void main(String[] args) {
SpringApplication.run(VectorNettyApplication.class, args);
}
// springboot启动后执行netty服务端启动
@Override
public void run(String... args) throws Exception {
ChannelFuture channelFuture = webSocketServerBoot.bind(host, port);
// 优雅关闭, jvm关闭时将netty服务端关闭
Runtime.getRuntime().addShutdownHook(new Thread(() -> webSocketServerBoot.destroy()));
// 阻塞 直到channel关闭
channelFuture.channel().closeFuture().syncUninterruptibly();
}
}
二.WebSocketServerBoot初始化服务端Netty
主要进行netty的基本配置
/**
* @author YuanJie
* @projectName vector-server
* @package com.vector.netty.accept
* @className com.vector.netty.accept.ServerBootstrap
* @copyright Copyright 2020 vector, Inc All rights reserved.
* @date 2023/6/9 18:34
*/
@Component
@Slf4j
public class WebSocketServerBoot {
private final EventLoopGroup parentGroup = new NioEventLoopGroup();
private final EventLoopGroup childGroup = new NioEventLoopGroup(2);
private Channel channel;
@Resource
private WebsocketServerChannelInitializer websocketServerChannelInitializer;
/**
* 初始化服务端
* sync():等待Future直到其完成,如果这个Future失败,则抛出失败原因;
* syncUninterruptibly():不会被中断的sync();
*/
public ChannelFuture bind(String host, Integer port) {
ChannelFuture channelFuture = null;
try {
channelFuture = new ServerBootstrap()
.group(parentGroup, childGroup) // 指定线程模型 一个用于接收客户端连接,一个用于处理客户端读写操作
.channel(NioServerSocketChannel.class) // 指定服务端的IO模型
.option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP缓冲区
.childOption(ChannelOption.SO_KEEPALIVE, true) // 保持连接 tcp底层心跳机制
.childHandler(websocketServerChannelInitializer) // 指定处理新连接数据的读写处理逻辑
.bind(host, port)
.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
log.info("服务端启动成功,监听端口:{}", port);
} else {
log.error("服务端启动失败,监听端口:{}", port);
bind(host, port + 1);
}
}
})
.syncUninterruptibly();// 绑定端口
channel = channelFuture.channel(); // 获取channel
} finally {
if (null == channelFuture) {
channel.close();
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
return channelFuture;
}
/**
* 销毁
*/
public void destroy() {
if (null == channel) return;
channel.close();
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
/**
* 获取通道
*
* @return
*/
public Channel getChannel() {
return channel;
}
}
三. WebsocketServerChannelInitializer初始化服务端Netty读写处理器
主要规划netty的读写处理器
/**
* @author YuanJie
* @projectName vector-server
* @package com.vector.netty.config
* @className com.vector.netty.server.ServerChannelInitializer
* @copyright Copyright 2020 vector, Inc All rights reserved.
* @date 2023/6/9 19:13
*/
@Component
public class WebsocketServerChannelInitializer extends ChannelInitializer<SocketChannel> {
// @Sharable
private final LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);
public final static String WEBSOCKET_PATH = "/ws";
@Resource
private InitParamHandler initParamHandler;
@Resource
private MessageHandler messageHandler;
@Resource
private OutBoundHandler outBoundHandler;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 日志打印
pipeline.addLast(loggingHandler);
// http报文解析器 线程不安全不能被共享
pipeline.addLast(new HttpServerCodec());
// // 添加对大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// // 消息聚合器 8192 8M
pipeline.addLast(new HttpObjectAggregator(1 << 13));
// 进行设置心跳检测
pipeline.addLast(new IdleStateHandler(60, 30, 60 * 30, TimeUnit.SECONDS));
// ================= 上述是用于支持http协议的 ==============
//websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址
// 处理uri参数 WebSocketServerProtocolHandler不允许带参数 顺序不可调换
pipeline.addLast(initParamHandler);
pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,null, true, 1<<16,true,true,5000));
pipeline.addLast(messageHandler);
// 自定义出栈处理器
pipeline.addLast(outBoundHandler);
}
}
四.initParamHandler处理器-去参websocket识别
主要为了去参,WebSocketServerProtocolHandler不允许带参数,同时初始化一些信道用户数据
/**
* URL参数处理程序,这时候连接还是个http请求,没有升级成webSocket协议,此处SimpleChannelInboundHandler泛型使用FullHttpRequest
*
* @author YuanJie
* @date 2023/5/7 15:07
*/
@Slf4j
@ChannelHandler.Sharable
@Component
public class InitParamHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
/**
* 存储已经登录用户的channel对象
*/
public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 存储用户id和用户的channelId绑定
*/
public static final Map<Long, ChannelId> userMap = new ConcurrentHashMap<>();
/**
* 用于存储群聊房间号和群聊成员的channel信息
*/
public static final Map<Long, ChannelGroup> groupMap = new ConcurrentHashMap<>();
@DubboReference
private MemberRemote memberRemote;
/**
* 此处进行url参数提取,重定向URL,访问webSocket的url不支持带参数的,带参数会抛异常,这里先提取参数,将参数放入通道中传递下去,重新设置一个不带参数的url
*
* @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler}
* belongs to
* @param request the message to handle
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (!this.acceptInboundMessage(request)) {
ctx.fireChannelRead(request.retain());
}
String uri = request.uri();
log.info("NettyWebSocketParamHandler.channelRead0 --> : 格式化URL... {}", uri);
Map<CharSequence, CharSequence> queryMap = UrlBuilder.ofHttp(uri).getQuery().getQueryMap();
//将参数放入通道中传递下去
String senderId = "senderId";
if (StringUtils.isBlank(queryMap.get(senderId))) {
log.info("NettyWebSocketParamHandler.channelRead0 --> : 参数缺失 senderId");
ctx.close();
}
// 验证token
// verifyToken(ctx,senderId);
// 初始化数据
// initData(ctx, Long.valueOf(queryMap.get(senderId).toString()));
// 获取?之前的路径
request.setUri(WebsocketServerChannelInitializer.WEBSOCKET_PATH);
ctx.fireChannelRead(request.retain());
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
//添加到channelGroup通道组
channelGroup.add(ctx.channel());
ctx.channel().id();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
// 移除channelGroup 通道组
channelGroup.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
log.error("NettyWebSocketParamHandler.exceptionCaught --> cause: ", cause);
ctx.close();
}
private void verifyToken(ChannelHandlerContext ctx, Long senderId) {
String userKey = CacheConstants.LOGIN_TOKEN_KEY + senderId;
RedissonCache redissonCache = SpringContextUtil.getBean(RedissonCache.class);
Boolean hasKey = redissonCache.hasKey(userKey);
if (!hasKey) {
log.info("NettyWebSocketParamHandler.channelRead0 --> : 用户未登录... {}", senderId);
ctx.close();
}
// token续期
redissonCache.expire(userKey, SystemConstants.TOKEN_EXPIRE_TIME, TimeUnit.MILLISECONDS);
}
/**
* 加入聊天室
*
* @param ctx
* @param senderId
* @throws ExecutionException
* @throws InterruptedException
*/
private void joinGroup(ChannelHandlerContext ctx, Long senderId) {
R r = null;
try {
CompletableFuture<R> result = memberRemote.getGroupListById(senderId);
r = result.get(3, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("messageHandler.joinGroup查询群聊列表失败 ===> {}", e.getMessage());
ctx.channel().write(WSMessageDTO.error("查询群聊列表失败"));
return;
}
if (r == null || r.getCode() != 200) {
log.error("查询群聊列表失败 ====> {}", r.getMsg());
ctx.channel().write(WSMessageDTO.error("查询群聊列表失败"));
return;
}
//查询成功
//获取群聊列表
String json = JacksonInstance.toJson(r.getData());
List<Long> groupIds = JacksonInstance.toObjectList(json, new TypeReference<List<Long>>() {
});
ChannelGroup group;
for (Long groupId : groupIds) {
group = groupMap.get(groupId);
if (group == null) {
//如果群聊信道不存在,则创建一个群聊信道
group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
groupMap.put(groupId, group);
}
//将当前用户加入到群聊信道中
group.add(ctx.channel());
}
}
/**
* 加入聊天信道
*/
private void joinChat(ChannelHandlerContext ctx, Long senderId) {
//将当前用户的channelId放入map中
userMap.put(senderId, ctx.channel().id());
}
private void initData(ChannelHandlerContext ctx, Long senderId) {
joinChat(ctx, senderId);
joinGroup(ctx, senderId);
}
}
五.MessageHandler核心业务处理类-采用工厂策略模式
使得业务和通信协议无关,无感知。具体业务可以增加策略
/**
* @author YuanJie
* @projectName vector-server
* @package com.vector.netty.handler
* @className com.vector.netty.handler.MessageTypeHandler
* @copyright Copyright 2020 vector, Inc All rights reserved.
* @date 2023/6/15 16:23
*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class MessageHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Resource
private MessageStrategyContext messageStrategyContext;
@Override
public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
// 获取客户端发送的数据
WSMessageDTO wsMessageDTO = JacksonInstance.toObject(msg.text(), new TypeReference<WSMessageDTO>() {
});
wsMessageDTO.setMessageId(SnowFlakeUtil.getNextId());
log.info("客户端收到服务器数据:{}", wsMessageDTO.getMessage());
verifyParams(ctx, wsMessageDTO);
// 根据消息类型获取对应的处理器 核心处理方法
messageStrategyContext.messageType(ctx, wsMessageDTO);
}
private void verifyParams(ChannelHandlerContext ctx, WSMessageDTO wsMessageDTO) {
StringBuilder sb = new StringBuilder();
if (wsMessageDTO.getSenderId() == null) {
sb.append("senderId不能为空");
}
if (!EnumBusiness.containsBusiness(wsMessageDTO.getBusinessType())) {
sb.append("businessType不能为空");
}
if (!EnumMessage.containsMessage(wsMessageDTO.getMessageType())) {
sb.append("messageType不能为空");
}
if (wsMessageDTO.getMessage() == null) {
sb.append("message不能为空");
}
if (sb.length() > 0) {
log.error("参数校验失败:{}", sb.toString());
ctx.channel().write(WSMessageDTO.error("参数校验失败:" + sb.toString()));
ctx.close();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
5.1 策略上下文
具体工厂策略可以详看我的策略模式文章
枚举维护前端参数和bean对象名
/**
* @author YuanJie
* @projectName vector-server
* @package com.vector.netty.enums
* @className com.vector.netty.enums.BusinessEnums
* @copyright Copyright 2020 vector, Inc All rights reserved.
* @date 2023/6/14 16:13
*/
public enum EnumBusiness {
/**
* 单聊
*/
chatMessage("chat", ChatMessageStrategy.class.getSimpleName()),
/**
* 群聊
*/
groupMessage("group", GroupMessageStrategy.class.getSimpleName()),
/**
* 在线人数
*/
onlineCount("onlineCount", OnlineCountStrategy.class.getSimpleName()),
TEST("test",TestStrategy.class.getSimpleName());
private final String businessType;
private final String beanName;
EnumBusiness(String businessType, String beanName) {
this.businessType = businessType;
this.beanName = StringUtils.isNotEmpty(beanName)?beanName.toLowerCase():null;
}
/**
* 根据code获取对应的枚举对象
*/
public static EnumBusiness getEnum(String businessType) {
EnumBusiness[] values = EnumBusiness.values(); // 获取枚举列表
if (null != businessType && values.length > 0) {
for (EnumBusiness value : values) {
if (value.businessType.equals(businessType)) {
return value; // 返回枚举对象
}
}
}
return null;
}
/**
* 该code在枚举列表code属性是否存在
*/
public static boolean containsBusiness(String businessType) {
EnumBusiness anEnum = getEnum(businessType); // 获取枚举对象
return anEnum != null;
}
/**
* 判断code与枚举中的code是否相同
*/
public static boolean equals(String businessType, EnumBusiness calendarSourceEnum) {
return calendarSourceEnum.businessType.equals(businessType);
}
public String getBusinessType() {
return businessType;
}
public String getBeanName() {
return beanName;
}
}
策略根据bean名获取实例对象
/**
* @author YuanJie
* @projectName vector-server
* @package com.vector.netty.service
* @className com.vector.netty.service.MessageContext
* @copyright Copyright 2020 vector, Inc All rights reserved.
* @date 2023/6/14 17:02
*/
@Component
@Slf4j
public class MessageStrategyContext {
/** 策略实例集合 */
private final ConcurrentHashMap<String, MessageStrategy> strategyConcurrentHashMap =
new ConcurrentHashMap<>(20);
/**
* 注入策略实例
* 如果使用的是构造器注入,可能会有多个参数注入进来。
*
* 如果使用的是field反射注入
*
* 如果使用的是setter方法注入,那么你将不能将属性设置为final。
*
* @param strategyMap
* 注意注入类型要是Map基础类型
* 注入接口,spring会自动注入他的所有被spring托管的实现类
*/
@Autowired
public MessageStrategyContext(Map<String, MessageStrategy> strategyMap) {
//清空集合数据
this.strategyConcurrentHashMap.clear();
if (!CollectionUtils.isEmpty(strategyMap)) {
strategyMap.forEach((beanName, messageStrategy) -> {
if (StringUtils.isEmpty(beanName) || messageStrategy == null) {
return;
}
this.strategyConcurrentHashMap.put(beanName.toLowerCase(), messageStrategy);
});
}
}
/**
* 选择业务方式
* 单聊,群聊,统计在线人数...
*
* @param msg 信息
*/
public void messageType(ChannelHandlerContext ctx, WSMessageDTO msg){
EnumBusiness enumerateInstances = EnumBusiness.getEnum(msg.getBusinessType());
if (CollectionUtils.isEmpty(strategyConcurrentHashMap)) {
log.info("策略实例集合初始化失败,请检查是否正确注入!");
}
MessageStrategy messageStrategy = strategyConcurrentHashMap.get(enumerateInstances.getBeanName());
messageStrategy.messageType(ctx, msg);
}
}
六.统一响应
注意使用该统一响应对象,所有入栈处理器必须使用即调用方必须是SimpleChannelInboundHandler,详细原因在下文 七.统一输出处理器中
/**
* @author YuanJie
* @projectName vector-server
* @package com.vector.netty.entity
* @className com.vector.netty.entity.SocketMessage
* @copyright Copyright 2020 vector, Inc All rights reserved.
* @date 2023/6/14 19:35
*/
@Data
public class WSMessageDTO {
/**
* 消息发送者
*/
private Long senderId;
/**
* 消息接收者/群聊id
*/
private Long chatId;
/**
* 消息类型 0文本 1图片 2文件 3视频 4语音 5位置 6名片 7链接 8系统消息
* @see com.vector.netty.enums.EnumMessage
*/
private byte messageType;
/**
* 业务类型 chat单聊 group群聊 onlineCount在线人数
* @see com.vector.netty.enums.EnumBusiness
*/
private String businessType;
/**
* 记录每条消息的id
*/
private Long messageId;
/**
* 消息内容
*/
private String message;
/**
* 消息发送时间
*/
private LocalDateTime sendTime;
/**
* 消息接收时间
*/
private LocalDateTime receiveTime;
/**
* 最后一条消息内容
*/
private String lastMessage;
/**
* 消息状态 0失败 1成功
*/
private byte code;
/**
* 封装统一返回格式
* @return
*/
public static TextWebSocketFrame ok(){
WSMessageDTO data = new WSMessageDTO();
data.setCode((byte) 1);
return new TextWebSocketFrame(JacksonInstance.toJson(data)).retain();
}
public static TextWebSocketFrame ok(WSMessageDTO data){
data.setCode((byte) 1);
return new TextWebSocketFrame(JacksonInstance.toJson(data)).retain();
}
public static TextWebSocketFrame error(String message){
WSMessageDTO data = new WSMessageDTO();
data.setCode((byte) 0);
data.setMessage(message);
return new TextWebSocketFrame(JacksonInstance.toJson(data)).retain();
}
}
七.统一输出处理器
- 若调用WSMessageDTO方法,必须注意内存泄露
- 即调用方必须是SimpleChannelInboundHandler<>
- 严禁使用ChannelInboundHandlerAdapter, 否则将造成严重内存泄露
- 相应地,必须使用此处的写出@param msg ,释放@param msg 引用
/**
* @author YuanJie
* @projectName vector-server
* @package com.vector.netty.handler
* @className com.vector.netty.handler.OutBoundHandler
* @copyright Copyright 2020 vector, Inc All rights reserved.
* @date 2023/7/24 22:38
*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class OutBoundHandler extends ChannelOutboundHandlerAdapter {
/**
* 若调用WSMessageDTO方法,必须注意内存泄露
* 即调用方必须是SimpleChannelInboundHandler<>
* 严禁使用ChannelInboundHandlerAdapter, 否则将造成严重内存泄露
* 相应地,必须使用此处的写出@param msg ,释放@param msg 引用
* @param ctx the {@link ChannelHandlerContext} for which the write operation is made
* @param msg the message to write
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof FullHttpMessage){
log.info("webSocket协议升级成功");
// 出栈必须得这样写,不能自定义通信消息,可能把websocket反馈的消息覆盖了。 也不能在最后处理器调ctx.fireChannelRead()
ctx.writeAndFlush(msg,promise);
return;
} else if (msg instanceof TextWebSocketFrame) {
log.info("我要给客户端发送消息了。。。。");
ctx.writeAndFlush(msg, promise);
return;
}
log.error("OutBoundHandler.write: 消息类型错误");
ctx.writeAndFlush(WSMessageDTO.error("服务器内部错误: OutBoundHandler.write()"),promise);
ctx.close();
}
}