spring boot 实现直播聊天室

spring boot 实现直播聊天室

技术方案:

  • spring boot
  • websocket
  • rabbitmq

使用 rabbitmq 提高系统吞吐量

引入依赖

<dependencies>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.42</version>
    </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.23</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
    </dependency>
</dependencies>

websocket 实现

MHttpSessionHandshakeInterceptor

参数拦截

/**
 * @Date: 2023/12/8 14:52
 * websocket 握手拦截
 * 1. 参数拦截(header或者 url 参数)
 * 2. token 校验
 */
@Slf4j
public class MHttpSessionHandshakeInterceptor extends HttpSessionHandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest servletRequest){
            //ws://127.0.0.1:8080/group/2?username=xxxx
            HttpServletRequest httpServletRequest = servletRequest.getServletRequest();
            String requestURI = httpServletRequest.getRequestURI();
            String groupId = requestURI.substring(requestURI.lastIndexOf("/") + 1);
            String username = httpServletRequest.getParameter("username");
            log.info(">>>>>>> beforeHandshake groupId: {} - username: {}", groupId, username);
            attributes.put("username", username);
            //解析占位符
            attributes.put("groupId", groupId);
        }
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }


}

GroupWebSocketHandler

消息发送

@Slf4j
public class GroupWebSocketHandler implements WebSocketHandler {

    //Map<room,List<map<session,username>>>
    private ConcurrentHashMap<String, Queue<WebSocketSession>> sessionMap = new ConcurrentHashMap<>();

    @Autowired
    private MessageClient messagingClient;


    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String username = (String) session.getAttributes().get("username");
        String groupId = (String) session.getAttributes().get("groupId");
        log.info("{} 用户上线房间 {}", username, groupId);
        TomcatWsSession wsSession = new TomcatWsSession(session.getId(),groupId, username, session);
        SessionRegistry.getInstance().addSession(wsSession);
    }

    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        String groupId = (String) session.getAttributes().get("groupId");
        String username = (String) session.getAttributes().get("username");
        if (message instanceof PingMessage){
            log.info("PING");
            return;
        }
        else if (message instanceof TextMessage textMessage) {
            MessageDto messageDto = new MessageDto();
            messageDto.setSessionId(session.getId());
            messageDto.setGroup(groupId);
            messageDto.setFromUser(username);
            messageDto.setContent(new String(textMessage.getPayload()));
            messagingClient.sendMessage(messageDto);
        }
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        String username = (String) session.getAttributes().get("username");
        String groupId = (String) session.getAttributes().get("groupId");
        log.info(">>> handleTransportError {} 用户上线房间 {}", username, groupId);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        String username = (String) session.getAttributes().get("username");
        String groupId = (String) session.getAttributes().get("groupId");
        log.info("{} 用户下线房间 {}", username, groupId);
        TomcatWsSession wsSession = new TomcatWsSession(session.getId(),groupId, username, session);
        SessionRegistry.getInstance().removeSession(wsSession);
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }


}
WebSocketConfig

websocket 配置

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {


    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/group/{groupId}")
            .addInterceptors(new MHttpSessionHandshakeInterceptor()).setAllowedOrigins("*");
    }

    @Bean
    public GroupWebSocketHandler myHandler() {
        return new GroupWebSocketHandler();
    }


    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192);  //文本消息最大缓存
        container.setMaxBinaryMessageBufferSize(8192);  //二进制消息大战缓存
        container.setMaxSessionIdleTimeout(3L * 60 * 1000); // 最大闲置时间,3分钟没动自动关闭连接
        container.setAsyncSendTimeout(10L * 1000); //异步发送超时时间
        return container;
    }

}

session 管理

将 websocketSession进行抽像,websocketsession可以由不同容器实现

WsSession
public interface  WsSession {

    /**
     * session 组
     * @return
     */
    String group();

    /**
     * session Id
     * @return
     */
    String getId();

    /**
     * 用户名或其他唯一标识
     * @return
     */
    String identity();

    /**
     * 发送文本消息
     * @param messageDto
     */

    void sendTextMessage(MessageDto messageDto);
}

public abstract class AbstractWsSession implements WsSession {

    private String id;
    private String group;

    private String identity;

    public AbstractWsSession(String id, String group, String identity) {
        this.id = id;
        this.group = group;
        this.identity = identity;
    }

    @Override
    public String group() {
        return this.group;
    }

    @Override
    public String getId() {
        return this.id;
    }

    @Override
    public String identity() {
        return this.identity;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        AbstractWsSession that = (AbstractWsSession) o;
        //简单比较 sessionId
        return Objects.equals(id, that.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, group, identity);
    }
}

TomcatWsSession

默认session实现

@Slf4j
public class TomcatWsSession extends AbstractWsSession {

    private WebSocketSession webSocketSession;

    public TomcatWsSession(String id, String group, String identity, WebSocketSession webSocketSession) {
        super(id, group, identity);
        this.webSocketSession = webSocketSession;
    }

    @Override
    public void sendTextMessage(MessageDto messageDto) {
        String content = messageDto.getFromUser() + " say: " + messageDto.getContent();
        try {
            webSocketSession.sendMessage(new TextMessage(content));
        } catch (IOException e) {
            log.error("TomcatWsSession sendTextMessage error: identity:{}-group:{}-msg: {}",
                    super.identity(), super.group(), JSON.toJSONString(messageDto));
        }

    }
}

SessionRegistry

websocket session管理

public class SessionRegistry {

    private static SessionRegistry instance;

    private SessionRegistry() {

    }

    public static SessionRegistry getInstance() {
        if (instance == null) {
            synchronized (SessionRegistry.class) {
                if (instance == null) {
                    instance = new SessionRegistry();
                }
            }
        }
        return instance;
    }


    //Map<group,List<Session>>
    private ConcurrentHashMap<String, Queue<WsSession>> sessionMap = new ConcurrentHashMap<>();


    /**
     * 添加 session
     * @param wsSession
     */
    public void addSession(WsSession wsSession) {
        sessionMap.computeIfAbsent(wsSession.group(),g -> new ConcurrentLinkedDeque<>()).add(wsSession);
    }

    /**
     * 移除 session
     * @param wsSession
     */
    public void removeSession(WsSession wsSession) {
        Queue<WsSession> wsSessions = sessionMap.get(wsSession.group());
        if (!CollectionUtils.isEmpty(wsSessions)){
            //重写 WsSession equals 和 hashCode 方法,不然会移除失败
            wsSessions.remove(wsSession);
            if (CollectionUtils.isEmpty(wsSessions)){
                sessionMap.remove(wsSession.group());
            }
        }
    }

    /**
     * 发送消息
     * @param messageDto
     */
    public void sendGroupTextMessage(MessageDto messageDto){
        Queue<WsSession> wsSessions = sessionMap.get(messageDto.getGroup());
        if (!CollectionUtils.isEmpty(wsSessions)){
            for (WsSession wsSession : wsSessions) {
                if (wsSession.getId().equals(messageDto.getSessionId())){
                    continue;
                }
                wsSession.sendTextMessage(messageDto);
            }
        }
    }


    /**
     * session 在线统计
     * @param groupId
     * @return
     */
    public Integer getSessionCount(String groupId) {
        if (StrUtil.isNotBlank(groupId)) {
            return sessionMap.get(groupId).size();
        }
        return sessionMap.values().stream().map(l -> l.size()).collect(Collectors.summingInt(a -> a));
    }
}

消息队列

这里使用 rabbitmq

MessageDto

消息体

@Data
public class MessageDto {

    /**
     * sessionId
     */
    private String sessionId;
    /**
     * 组
     */
    private String group;
    /**
     * 消息发送者
     */
    private String fromUser;
    /**
     * 发送内容
     */
    private String content;
}
MessageClient
@Component
@Slf4j
public class MessageClient {

    private String routeKey = "bws.key";
    private String exchange = "bws.exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void sendMessage(MessageDto messageDto) {
        try {
            rabbitTemplate.convertAndSend(exchange, routeKey, JSON.toJSONString(messageDto));
        } catch (AmqpException e) {
            log.error("MessageClient.sendMessage: {}", JSON.toJSONString(messageDto), e);
        }
    }
}
MessageListener
@Slf4j
@Component
public class MessageListener {

    @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "bws.exchange", type = "topic"), value =
    @Queue(value = "bws.queue", durable = "true"), key = "bws.key"))
    public void onMessage(Message message) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("<<<<<<<<< MessageListener.onMessage:{}", messageStr);
            MessageDto messageDto = JSON.parseObject(messageStr, MessageDto.class);
            if (!Objects.isNull(messageDto)) {
                SessionRegistry.getInstance().sendGroupTextMessage(messageDto);
            } else {
                log.info("<<<<<<<<< MessageListener.onMessage is null:{}", messageStr);
            }
        } catch (Exception e) {
            log.error("######### MessageListener.onMessage: {}-{}", messageStr, e);
        }
    }

}

application.properties配置


spring.rabbitmq.host=192.168.x.x
spring.rabbitmq.password=guest
spring.rabbitmq.port=27067
spring.rabbitmq.username=guest
spring.rabbitmq.virtual-host=my-cluster

测试

websoket链接: ws://127.0.0.1:8080/group/2?username=xxx, websocket客户端测试地址

在这里插入图片描述

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/249535.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

广州华锐互动:汽车电子线束加工VR仿真培训与实际生产场景相结合,提高培训效果

随着科技的不断发展&#xff0c;虚拟现实&#xff08;VR&#xff09;技术已经逐渐渗透到各个领域&#xff0c;为企业和个人带来了前所未有的便利。在汽车制造行业中&#xff0c;线束加工作为一项关键的生产工艺&#xff0c;其质量直接影响到汽车的性能和安全。因此&#xff0c;…

【UE】在蓝图中修改材质实例的参数的两种方式

目录 方式一、通过“在材质上设置标量/向量参数值”节点实现 方式二、通过“设置标量/向量参数值”节点实现 方式一、通过“在材质上设置标量/向量参数值”节点实现 1. 在材质中设置了两个参数 2. 创建材质实例 3. 创建一个蓝图&#xff0c;对静态网格体赋予材质实例 在事件…

基于虚拟机下的win7系统安装简记

文章目录 安装系统激活win7提示系统保留分区未分配驱动器问题使用win7 Active激活系统根据dns分配的ip地址将网络改为固定ip&#xff0c;然后关闭防火墙&#xff0c;即可完成虚拟机与宿主机互通 安装系统 在虚拟机中找到自己下载win7镜像文件&#xff0c;配置完成后一路next即…

MySQL数据库卸载-Windows

目录 1. 停止MySQL服务 2. 卸载MySQL相关组件 3. 删除MySQL安装目录 4. 删除MySQL数据目录 5. 再次打开服务&#xff0c;查看是否有MySQL卸载残留 1. 停止MySQL服务 winR 打开运行&#xff0c;输入 services.msc 点击 "确定" 调出系统服务。 2. 卸载MySQL相关组…

虚幻学习笔记13—C++静态和动态加载

一、前言 我们在蓝图中可以很方便的添加各种需要的组件&#xff0c;那么在C代码中要如何实现呢。在代码中分静态和动态加载&#xff0c;而无论静态和动态&#xff0c;加载的内容有资源和资源类&#xff0c;资源类通常为带资源的蓝图类。 二、实现 在实现静态或动态加载时&…

什么是回调函数

需求 A&#xff0c;B两个小组开发一个功能。B小组开发制作油条模块:make_youtiao。A小组需要调用B小组开发的模块&#xff0c;然后执行后续的操作&#xff1a;sell()如下图&#xff1a; 上面的方式A小组必须等待B小组开发的模块make_youtiao执行完成后才能执行sell()。 上图代…

力扣 | 98. 验证二叉搜索树

98. 验证二叉搜索树 中序遍历 (边遍历边验证顺序性) private TreeNode prev null;private boolean isBST true;public boolean isValidBST(TreeNode root) {inorder(root);return isBST;}private void inorder(TreeNode node) {if (node null) return;inorder(node.left);…

什么是第一方数据,如何使用它?

多年来&#xff0c;第一方数据一直是营销行业的话题。 随着用户数据隐私法律法规的不断收紧&#xff0c;营销人员必须接受一个几乎没有数据 cookie 的世界。 我们必须在如何合法和合乎道德地获取客户信息方面更具创造性。 不确定什么是第一方数据&#xff1f;或者不太确定从…

关于嵌入式开发的一些信息汇总:C标准、芯片架构、编译器、MISRA-C

关于嵌入式开发的一些信息汇总&#xff1a;C标准、芯片架构、编译器、MISRA-C 关于C标准芯片架构是什么&#xff1f;架构对芯片有什么作用&#xff1f;arm架构X86架构mips架构小结 编译器LLVM是什么&#xff1f;前端在干什么&#xff1f;后端在干什么&#xff1f; MISRA C的诞生…

基于JSP+Servlet+Mysql的建设工程监管信息

基于JSPServletMysql的建设工程监管信息 一、系统介绍二、功能展示1.企业信息列表2.录入项目信息3.项目信息列表 四、其它1.其他系统实现五.获取源码 一、系统介绍 项目名称&#xff1a;基于JSPServlet的建设工程监管信息 项目架构&#xff1a;B/S架构 开发语言&#xff1a;…

持续集成交付CICD:Jenkins使用CD流水线下载Nexus制品

目录 一、实验 1.Jenkins使用CD流水线下载Nexus制品 一、实验 1.Jenkins使用CD流水线下载Nexus制品 &#xff08;1&#xff09;Jenkins新建CD流水线 &#xff08;2&#xff09;新建视图 &#xff08;3&#xff09;查看视图 &#xff08;4&#xff09;添加字符参数 &#xf…

「Verilog学习笔记」 Johnson Counter

专栏前言 本专栏的内容主要是记录本人学习Verilog过程中的一些知识点&#xff0c;刷题网站用的是牛客网 timescale 1ns/1nsmodule JC_counter(input clk ,input rst_n,output reg [3:0] Q );always (posedge clk or negedge rst_n) begin…

Github详细使用教程

1. 什么是 Github? github是一个基于git的代码托管平台&#xff0c;付费用户可以建私人仓库&#xff0c;我们一般的免费用户只能使用公共仓库&#xff0c;也就是代码要公开。 Github 由Chris Wanstrath, PJ Hyett 与Tom Preston-Werner三位开发者在2008年4月创办。迄今拥有5…

Mybatis-plus介绍与入门

前言 MyBatis-Plus是在MyBatis基础上的一个增强工具库&#xff0c;旨在简化开发者的工作&#xff0c;提高开发效率&#xff0c;同时保留MyBatis的灵活性。使用 MyBatis-Plus 可以减少重复性的代码&#xff0c;简化常见的数据库操作 官方学习文档&#xff1a;MyBatis-Plus (bao…

深度学习 Day16——P5运动鞋识别

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制 文章目录 前言1 我的环境2 代码实现与执行结果2.1 前期准备2.1.1 引入库2.1.2 设置GPU&#xff08;如果设备上支持GPU就使用GPU,否则使用C…

【k8s】使用Finalizers控制k8s资源删除

文章目录 词汇表基本删除操作Finalizers是什么&#xff1f;Owner References又是什么&#xff1f;强制删除命名空间参考 你有没有在使用k8s过程中遇到过这种情况: 通过kubectl delete指令删除一些资源时&#xff0c;一直处于Terminating状态。 这是为什么呢&#xff1f; 本文将…

『OPEN3D』1.5.2 动手实现点云栅格/体素最近邻

本专栏地址: https://blog.csdn.net/qq_41366026/category_12186023.html?spm=1001.2014.3001.5482 NEARBY6实现的voxel可视化 一种NEARBY14实现的可视化voxel

lwIP 细节之五:accept 回调函数是何时调用的

使用 lwIP 协议栈进行 TCP 裸机编程&#xff0c;其本质就是编写协议栈指定的各种回调函数。将你的应用逻辑封装成函数&#xff0c;注册到协议栈&#xff0c;在适当的时候&#xff0c;由协议栈自动调用&#xff0c;所以称为回调。 注&#xff1a;除非特别说明&#xff0c;以下内…

多维时序 | Matlab实现GA-LSTM-Attention遗传算法优化长短期记忆神经网络融合注意力机制多变量时间序列预测

多维时序 | MATLAB实现BWO-CNN-BiGRU-Multihead-Attention多头注意力机制多变量时间序列预测 目录 多维时序 | MATLAB实现BWO-CNN-BiGRU-Multihead-Attention多头注意力机制多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 多维时序 | Matlab实…

C#上位机与欧姆龙PLC的通信01----项目背景

最近&#xff0c;【西门庆】作为项目经理负责一个70万的北京项目&#xff0c;需要在工控系统集成软件开发中和欧 姆龙PLC对接&#xff0c;考虑项目现场情况优先想到了采用FinsTCP通讯协议&#xff0c;接下来就是记录如何一步步实现这些通讯过程的&#xff0c;希望给电气工程师&…