spring boot 实现直播聊天室(二)

spring boot 实现直播聊天室(二)

技术方案:

  • spring boot
  • netty
  • rabbitmq

目录结构

在这里插入图片描述

引入依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.96.Final</version>
</dependency>

SimpleNettyWebsocketServer

netty server 启动类

@Slf4j
public class SimpleNettyWebsocketServer {

    private SimpleWsHandler simpleWsHandler;

    public SimpleNettyWebsocketServer(SimpleWsHandler simpleWsHandler) {
        this.simpleWsHandler = simpleWsHandler;
    }

    public void start(int port) throws InterruptedException {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup work = new NioEventLoopGroup(2);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, work).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    //HTTP协议编解码器,用于处理HTTP请求和响应的编码和解码。其主要作用是将HTTP请求和响应消息转换为Netty的ByteBuf对象,并将其传递到下一个处理器进行处理。
                    pipeline.addLast(new HttpServerCodec());
                    //用于HTTP服务端,将来自客户端的HTTP请求和响应消息聚合成一个完整的消息,以便后续的处理。
                    pipeline.addLast(new HttpObjectAggregator(65535));
                    pipeline.addLast(new IdleStateHandler(30,0,0));
                    //处理请求参数
                    pipeline.addLast(new SimpleWsHttpHandler());
                    pipeline.addLast(new WebSocketServerProtocolHandler("/n/ws"));
                    pipeline.addLast(simpleWsHandler);

                }
            });
            Channel channel = bootstrap.bind(port).sync().channel();
            log.info("server start at port: {}", port);
            channel.closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}

NettyUtil: 工具类

public class NettyUtil {

    public static AttributeKey<String> G_U = AttributeKey.valueOf("GU");

    /**
     * 设置上下文参数
     * @param channel
     * @param attributeKey
     * @param data
     * @param <T>
     */
    public static <T> void setAttr(Channel channel, AttributeKey<T> attributeKey, T data) {
        Attribute<T> attr = channel.attr(attributeKey);
        if (attr != null) {
            attr.set(data);
        }
    }


    /**
     * 获取上下文参数 
     * @param channel
     * @param attributeKey
     * @return
     * @param <T>
     */
    public static <T> T getAttr(Channel channel, AttributeKey<T> attributeKey) {
        return channel.attr(attributeKey).get();
    }


    /**
     * 根据 渠道获取 session
     * @param channel
     * @return
     */
    public static NettySimpleSession getSession(Channel channel) {
        String attr = channel.attr(G_U).get();
        if (StrUtil.isNotBlank(attr)){
            String[] split = attr.split(",");
            String groupId = split[0];
            String username = split[1];
            return new NettySimpleSession(channel.id().toString(),groupId,username,channel);
        }
        return null;
    }
}

处理handler

SimpleWsHttpHandler

处理 websocket 协议升级时地址请求参数 ws://127.0.0.1:8881/n/ws?groupId=1&username=tom, 解析groupId 和 username ,并设置这个属性到上下文

/**
 * @Date: 2023/12/13 9:53
 * 提取参数
 */
@Slf4j
public class SimpleWsHttpHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof  FullHttpRequest request){
            //ws://localhost:8080/n/ws?groupId=xx&username=tom
            String decode = URLDecoder.decode(request.uri(), StandardCharsets.UTF_8);
            log.info("raw request url: {}", decode);
            Map<String, String> queryMap = getParams(decode);
            String groupId = MapUtil.getStr(queryMap, "groupId", null);
            String username = MapUtil.getStr(queryMap, "username", null);
            if (StrUtil.isNotBlank(groupId) && StrUtil.isNotBlank(username)) {
                NettyUtil.setAttr(ctx.channel(), NettyUtil.G_U, groupId.concat(",").concat(username));
            }
            //去掉参数 ===>  ws://localhost:8080/n/ws
            request.setUri(request.uri().substring(0,request.uri().indexOf("?")));
            ctx.pipeline().remove(this);
            ctx.fireChannelRead(request);
        }else{
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * 解析 queryString
     * @param uri
     * @return
     */
    public static Map<String, String> getParams(String uri) {
        Map<String, String> params = new HashMap<>(10);
        int idx = uri.indexOf("?");
        if (idx != -1) {
            String[] paramsArr = uri.substring(idx + 1).split("&");
            for (String param : paramsArr) {
                idx = param.indexOf("=");
                params.put(param.substring(0, idx), param.substring(idx + 1));
            }
        }

        return params;
    }
}
SimpleWsHandler

处理消息

@Slf4j
@ChannelHandler.Sharable
public class SimpleWsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Autowired
    private PushService pushService;

    /**
     * 在新的 Channel 被添加到 ChannelPipeline 中时被调用。这通常发生在连接建立时,即 Channel 已经被成功绑定并注册到 EventLoop 中。
     * 在连接建立时被调用一次
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        NettySimpleSession session = NettyUtil.getSession(ctx.channel());
        if (session == null) {
            log.info("handlerAdded channel id: {}", ctx.channel().id());
        } else {
            log.info("handlerAdded channel group-username: {}-{}", session.group(), session.identity());
        }
    }

    /**
     * 连接断开时,Netty 会自动触发 channelInactive 事件,并将该事件交给事件处理器进行处理
     * 在 channelInactive 事件的处理过程中,会调用 handlerRemoved 方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        NettySimpleSession session = NettyUtil.getSession(ctx.channel());
        if (session!=null){
            log.info("handlerRemoved channel group-username: {}-{}", session.group(), session.identity());
        }
        offline(ctx.channel());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //todo msg 可以是json字符串,这里仅仅只是纯文本
        NettySimpleSession session = NettyUtil.getSession(ctx.channel());
        if (session!=null){
            MessageDto messageDto = new MessageDto();
            messageDto.setSessionId(session.getId());
            messageDto.setGroup(session.group());
            messageDto.setFromUser(session.identity());
            messageDto.setContent(msg.text());
            pushService.pushGroupMessage(messageDto);
        }else {
            log.info("channelRead0 session is null channel id: {}-{}", ctx.channel().id(),msg.text());
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("SimpleWsHandler 客户端异常断开 {}", cause.getMessage());
        //todo offline
        offline(ctx.channel());
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent idleStateEvent) {
            if (idleStateEvent.state().equals(IdleStateEvent.READER_IDLE_STATE_EVENT)) {
                log.info("SimpleWsIdleHandler channelIdle 5 秒未收到客户端消息,强制关闭: {}", ctx.channel().id());
                //todo offline
                offline(ctx.channel());
            }
        } else if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            String attr = NettyUtil.getAttr(ctx.channel(), NettyUtil.G_U);
            if (StrUtil.isBlank(attr)) {
                ctx.writeAndFlush("参数异常");
                offline(ctx.channel());
            } else {
                //todo 可以做用户认证等等
                //记录用户登陆session
                NettySimpleSession session = NettyUtil.getSession(ctx.channel());
                Assert.notNull(session, "session 不能为空");
                SessionRegistry.getInstance().addSession(session);
            }
        }
        super.userEventTriggered(ctx,evt);
    }

    /**
     * 用户下线,处理失效 session
     * @param channel
     */
    public void offline(Channel channel){
        NettySimpleSession session = NettyUtil.getSession(channel);
        if (session!=null){
            SessionRegistry.getInstance().removeSession(session);
        }
        channel.close();
    }

}

PushService

推送服务抽取

public interface PushService {

    /**
     * 组推送
     * @param messageDto
     */
    void pushGroupMessage(MessageDto messageDto);

}

@Service
public class PushServiceImpl implements PushService {

    @Autowired
    private MessageClient messagingClient;

    @Override
    public void pushGroupMessage(MessageDto messageDto) {
        messagingClient.sendMessage(messageDto);
    }
}

NettySimpleSession

netty session 封装

public class NettySimpleSession extends AbstractWsSession {

    private Channel channel;

    public NettySimpleSession(String id, String group, String identity, Channel channel) {
        super(id, group, identity);
        this.channel = channel;
    }

    @Override
    public void sendTextMessage(MessageDto messageDto) {
        String content = messageDto.getFromUser() + " say: " + messageDto.getContent();
        // 不能直接 write content, channel.writeAndFlush(content);
        // 要封装成 websocketFrame,不然不能编解码!!!
        channel.writeAndFlush(new TextWebSocketFrame(content));
    }
}

启动类

@Slf4j
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }


    @Bean
    public SimpleWsHandler simpleWsHandler(){
        return new SimpleWsHandler();
    }

    @PostConstruct
    public void init() {
        new Thread(() -> {
            log.info(">>>>>>>> start netty ws server....");
            try {
                new SimpleNettyWebsocketServer(simpleWsHandler()).start(8881);
            } catch (InterruptedException e) {
                log.info(">>>>>>>> SimpleNettyWebsocketServer start error", e);
            }
        }).start();
    }

}

其他代码参考 spring boot 实现直播聊天室

测试

websocket 地址 ws://127.0.0.1:8881/n/ws?groupId=1&username=tom

在这里插入图片描述

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/243803.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用YOLOv8训练图集详细教程

准备自己的数据集 训练YOLOv8时&#xff0c;选择的数据格式是VOC&#xff0c;因此下面将介绍如何将自己的数据集转换成可以直接让YOLOv8进行使用。 1、创建数据集 我的数据集都在保存在mydata文件夹&#xff08;名字可以自定义&#xff09;&#xff0c;目录结构如下&#xf…

万界星空科技MES---制造企业的加工生产模式

在现代制造业中&#xff0c;加工生产模式是制造企业组织和管理生产过程的重要方面。不同的加工模式适用于不同的生产需求和产品类型。其中流水型、离散型和混合型是三种常见的加工生产模式。1. 流水型加工模式 流水型加工模式是一种高度自动化的生产方式&#xff0c;适用于…

羊大师解答,鲜羊奶应该怎样煮才好喝?

羊大师解答&#xff0c;鲜羊奶应该怎样煮才好喝&#xff1f; 你是否对如何煮鲜羊奶感到困惑&#xff1f;继续阅读本文&#xff0c;小编羊大师将为大家揭秘鲜羊奶的烹饪方法。不管是作为配料还是单独享用&#xff0c;了解如何煮鲜羊奶将会让您获得更加美味又营养丰富的食物。接…

mysql8 windows下修改my.ini配置 this is incompatible with sql_mode=only_full_group_by

1、找到安装路径 show variables like %sql_mode;SHOW VARIABLES LIKE config_file;SHOW VARIABLES LIKE %datadir%;SHOW VARIABLES; 2、修改 sql_modeSTRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION

第二证券:防御性板块逆势活跃 A股结构性机会轮动

昨日商场慎重张望心境升温&#xff0c;个股跌多涨少。防御性板块中的医药、燃气板块涨幅居前。医药板块中&#xff0c;拓新药业、森萱医药涨超19%&#xff0c;百利天恒、亨迪药业、新赣江等多股涨超10%。 据中国气候网消息&#xff0c;从12月12日夜间初步&#xff0c;新一轮寒…

注塑模具ERP有哪些功能?可以帮助企业解决什么难题

不同的注塑模具有不同的业务流程和生产环节&#xff0c;有些生产企业在订单、物料需求计划、车间、班组负荷评估、项目成本核算、边角料统计分析等方面还存在不少问题。 与此同时&#xff0c;也有部分注塑模具企业通过ERP软件科学制定注塑生产排产&#xff0c;智能核算注塑物料…

算法通关村第十八关-黄金挑战回溯困难问题

大家好我是苏麟 , 今天带来几道回溯比较困难的题 . 回溯有很多比较难的问题&#xff0c;这里我们看两个&#xff0c;整体来说这两个只是处理略复杂&#xff0c;还不是最难的问题 . 大纲 IP问题 IP问题 描述 : 有效 IP 地址 正好由四个整数&#xff08;每个整数位于 0 到 255 …

SAP报错 Exception condition “CNTL_ERROR“ triggered

报错背景&#xff0c;我写了个function alv跳转屏幕&#xff0c;而且有修改事件的程序&#xff0c;但是在我反复跳转修改操作&#xff0c;点创建单据的时候&#xff0c;我的程序直接dump啦 报错如下&#xff1a; 通过查询SAPQ&A查询到对应的解决方案。 机器翻译&#xff…

processon使用及流程图和泳道图的绘画(登录界面流程图,门诊流程图绘制门诊泳道图,住院泳道图,OA会议泳道图),Axure自定义元件

目录 一.processon图形的使用场景介绍 二.流程图绘画 三.泳道图的绘画 1.绘制门诊流程图绘制门诊泳道图 2. 绘制住院泳道图​编辑 3.绘制药库采购入库流程图 4.绘制OA会议泳道图 四.Axure自定义元件 1.Axure载入元件库 一.processon图形的使用场景介绍 二.流程图绘画 示例&…

算法复习——6种排序方法的简单回顾

算法复习——6种排序方法的简单回顾 常见排序方法&#xff1a;冒泡排序、选择排序、插入排序、堆排序、归并排序、快速排序的简单回顾 冒泡排序 重复“从序列右边开始比较相邻两个数字的大小,再根据结果交换两个数字的位置” 在冒泡排序中&#xff0c;第 1 轮需要比较 n - 1…

DSP外部中断笔记

中断原理 三部分 注意 &#xff0c;外部中断使能&#xff0c;PIE使能&#xff0c;CPU中断使能 外部中断有7个&#xff0c;PIE有12组&#xff0c;一个组有8个中断复用。只有一个CPU中断可执行。 外部中断原理 1、外部中断概述 外部中断结构图 外部中断XINT1对应的是0到31GPIO…

[Geek Challenge 2023] klf_2详解

考点 SSTI、join拼接绕过 fuzz测试后发现过滤了很多关键字 我们先试试构造__class__ {% set podict(po1,p2)|join()%} //构造pop {% set alipsum|string|list|attr(po)(18)%} //构造_ {% set cl(a,a,dict(claa,ssa)|join,a,a)|join()%} //构造__class__ {% set …

fl studio2024版本内置破解补丁和汉化文件,可以完美激活软件

fl studio是一款功能强大的编曲软件&#xff0c;怎么破解呢&#xff1f;今天小编就为大家带来了详细的安装破解教程&#xff0c;需要的朋友一起看看吧&#xff01; fl studio20.8是一款功能强大的编曲软件&#xff0c;也就是众所熟知的水果软件。它可以编曲、剪辑、录音、混音…

来聊聊final关键字

final关键字作用 final关键字可用于修饰类、方法、变量&#xff0c;通过final修饰后可以使类不可被继承&#xff0c;方法不可被重写&#xff0c;变量不可被修改。 正是因为这样使得final关键字修饰的东西天生自带线程安全属性&#xff0c;而且也没有额外的开销。 final使用注…

特征驱动开发

FDD 方法来自于一个大型的新加坡银行项目。FDD 的创立者 Jeff De Luca 和 Peter Coad 分别是这个项目的项目经理和首席架构设计师。在 Jeff 和 Peter 接手项目时&#xff0c;客户已经经历了一次项目的失败&#xff0c;从用户到高层都对这个项目持怀疑的态度&#xff0c;项目组士…

C++中string类的使用

目录 一.string类 1.1为什么学习string类&#xff1f; 1.2.标准库中的string类 二.string对象的元素访问 2.1.1使用operator[]与at实现访问 2.1.2正向迭代器访问 2.1.3反向迭代器访问 2.1.4const正向迭代器&#xff08;不能修改&#xff09; 2.1.5const反向迭代器&#…

智能优化算法应用:基于纵横交叉算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于纵横交叉算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于纵横交叉算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.纵横交叉算法4.实验参数设定5.算法结果6.…

2024 年 8 款最佳数据恢复软件深度评测(Windows 和 Mac)

由于意外删除、格式化或损坏而立即丢失重要数据是一场噩梦。当您开始寻找 2024 年最好的数据恢复软件时&#xff0c;由于选项太多&#xff0c;您可能会不知所措。 2024 年 8 款最佳数据恢复软件深度评测 有些工具适用于 Windows&#xff0c;其他工具适用于 Mac&#xff0c;但并…

以太网协议与DNS

以太网协议 以太网协议DNS 以太网协议 以太网用于在计算机和其他网络设备之间传输数据,以太网既包含了数据链路层的内容,也包含了物理层的内容. 以太网数据报: 其中目的IP和源IP不是网络层的目的IP和源IP,而是mac地址.网络层的主要负责是整体的转发过程,数据链路层负责的是局…

【STM32】ADC模数转换器

1 ADC简介 ADC&#xff08;Analog-Digital Converter&#xff09;模拟-数字转换器 ADC可以将引脚上连续变化的模拟电压转换为内存中存储的数字变量&#xff0c;建立模拟电路到数字电路的桥梁 STM32是数字电路&#xff0c;只有高低电平&#xff0c;没有几V电压的概念&#xff…