【stomp实战】搭建一套websocket推送平台

前面几个小节我们已经学习了stomp协议,了解了stomp客户端的用法,并且搭建了一个小的后台demo,前端页面通过html页面与后端服务建立WebSocket连接。发送消息给后端服务。后端服务将消息内容原样送回。通过这个demo我们学习了前端stomp客户端的用法,同时也学到了如何通过spring-boot来搭建一个websocket平台。
本节我们将继续深入,搭建一套可用于生产的websocket平台。

基本介绍

websocket连接推送服务包含两个服务,websocket-connector和websocket-gateway。

在这里插入图片描述
架构如上图
websocket-connector

  • 作为和客户端建立websocket连接的服务,负责消息的接收和推送

websocket-gateway

  • 作为后台服务,提供http接口给其他微服务,其他微服务可以通过http接口发送消息给指定的用户

使用说明

通过下面的步骤来进行调试

  1. 分别启动项目websocket-connector和websocket-gateway
  2. 访问下面接口,获取某个用户的token
    下面示例是获取用户1001的token
curl -X POST -H  "Accept:*/*" -H  "Content-Type:application/json" -d "{\"userId\":\"1001\"}" "http://localhost:8081/api/ws/token/get"
  1. 打开websocket-connector调试页面http://localhost:8080/index.html
    将上一个接口获取到的token作为参数,与服务器建立连接
    在这里插入图片描述

  2. 通过页面的send按钮,发送一条消息给服务器,同时服务器会将此消息回复给前端页面。参考上图

  3. 通过websocket-gateway的接口,发送用户单播消息

curl -X POST -H  "Accept:*/*" -H  "Content-Type:application/json" -d "{\"appCode\":\"test2\",\"messageData\":{\"body\":\"single message\",\"headers\":{}},\"topic\":\"/user/topic/single/hello\",\"userIds\":[1001]}" "http://localhost:8081/api/ws/message/single/send"

在这里插入图片描述
在这里插入图片描述

前端页面可以收到该消息的推送
6.通过websocket-gateway的接口,发送广播消息

curl -X POST -H  "Accept:*/*" -H  "Content-Type:application/json" -d "{\"appCode\":\"test1\",\"messageData\":{\"body\":\"hello board message1\",\"headers\":{}},\"topic\":\"/topic/boardCast/hello\"}" "http://localhost:8081/api/ws/message/boardCast/send"

在这里插入图片描述
在这里插入图片描述

主要代码分析

前端代码

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>STOMP</title>
</head>
<body onload="disconnect()">
<h1 id="tip">消息发布订阅测试页</h1>
请输入token:<input type="text" id="token" placeholder=""> <br>
<button onclick="connect()" id="connect">connect</button>
<button onclick="disconnect()" id="disconnect">disconnect</button>

<p>输入消息: <span id="msg"></span></p>
<input type="text" id="content" placeholder=""> <br>
<button onclick="send()">send</button>

<ul id="ul">
    回答消息<p id="answer"></p>
    单播消息<p id="single"></p>
    广播消息<p id="board"></p>
</ul>
<script src="sockjs.min.js"></script>
<script src="stomp.min.js"></script>
<script>
    var stompClient = null;
    var endpoint = "/ws";

    //断开连接
    function disconnect() {
        if (stompClient != null) {
            stompClient.disconnect();
        }
        setConnect(false);
        console.log("disconnected");
    }
    //建立连接
    function connect() {
        var token = document.getElementById("token").value;
        if (token === '' || token === undefined) {
            alert("请输入token");
            return;
        }
        //连接请求头里面,设置好我们提前获取好的token
        var headers = {
            token: token
        };
        var url = endpoint;
        var socket = new SockJS(url);
        stompClient = Stomp.over(socket);
        //建立连接
        stompClient.connect(headers, function (msg) {
            setConnect(true);
            console.log("connected:" + msg);
            //订阅了三个topic
            //订阅用户消息topic1
            stompClient.subscribe("/user/topic/answer", function (response) {
                createElement("answer", response.body);
            });
            //订阅用户消息topic2
            stompClient.subscribe("/user/topic/single/hello", function (response) {
                createElement("single", response.body);
            });
            //订阅广播消息topic
            stompClient.subscribe("/topic/boardCast/hello", function (response) {
                createElement("board", response.body);
            });
        });
    }
    //主动发送消息给服务器,对应的后端topic为/app/echo
    function send() {
        var value = document.getElementById("content").value;
        var msg = {
            msgType: 1,
            content: value
        };
        stompClient.send("/app/echo", {}, JSON.stringify(msg));
    }

    function createElement(eleId, msg) {
        var singleP = document.getElementById(eleId);
        var p = document.createElement("p");
        p.style.wordWrap = "break-word";
        p.appendChild(document.createTextNode(msg));
        singleP.appendChild(p);
    }

    function setConnect(connected) {
        document.getElementById("connect").disabled = connected;
        document.getElementById("disconnect").disabled = !connected;
    }
</script>
</body>
</html>

websocket-connector端的代码

会话的鉴权及连接建立

@Slf4j
@Component
public class WebSocketInboundInterceptor implements ChannelInterceptor {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (accessor == null) {
            return message;
        }

        //建立连接
        if (Objects.equals(accessor.getCommand(), StompCommand.CONNECT)) {
            connect(message, accessor);
        }
        return message;
    }

    /**
     * 建立会话
     *
     * @param message
     * @param accessor
     */
    private void connect(Message<?> message, StompHeaderAccessor accessor) {
        String token = accessor.getFirstNativeHeader(WsConstants.TOKEN_HEADER);
        if (StringUtils.isEmpty(token)) {
            throw new MessageDeliveryException("token missing!");
        }
        String userId = TokenUtil.parseToken(token);
        if (StringUtils.isEmpty(userId)) {
            throw new MessageDeliveryException("userId missing!");
        }
        String simpleSessionId = (String) message.getHeaders().get(SimpMessageHeaderAccessor.SESSION_ID_HEADER);

        UserSession userSession = new UserSession();
        userSession.setSimpleSessionId(simpleSessionId);
        userSession.setUserId(userId);
        userSession.setCreateTime(LocalDateTime.now());
        //关联用户的会话。通过msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg); 此方法,可以发送给用户消息
        accessor.setUser(new UserSessionPrincipal(userSession));
    }
}

如何接收客户端发送的消息

@Slf4j
@Controller
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class StompController {
    private final SimpMessageSendingOperations msgOperations;
    private final SimpUserRegistry simpUserRegistry;

    /**
     * 回音消息,将用户发来的消息内容加上 Echo 前缀后推送回客户端
     */
    @MessageMapping("/echo")
    public void echo(Principal principal, Msg msg) {
        String username = principal.getName();
        msg.setContent("Echo: " + msg.getContent());
        msgOperations.convertAndSendToUser(username, "/topic/answer", msg);
        int userCount = simpUserRegistry.getUserCount();
        int sessionCount = simpUserRegistry.getUser(username).getSessions().size();
        log.info("当前本系统总在线人数: {}, 当前用户: {}, 该用户的客户端连接数: {}", userCount, username, sessionCount);
    }
}

消费rabbitMQ推送的单播和广播消息

@Component
@Slf4j
public class MessageReceiveConsumer {

    private final Gson gson;
    private final ReceivedMessageHandler receivedMessageHandler;

    public MessageReceiveConsumer(Gson gson, ReceivedMessageHandler receivedMessageHandler) {
        this.gson = gson;
        this.receivedMessageHandler = receivedMessageHandler;
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(),
            exchange = @Exchange(value = WsConstants.SINGLE_EXCHANGE, type = ExchangeTypes.FANOUT)))
    public void handleSingleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        SingleMessage singleMessage = gson.fromJson(body, SingleMessage.class);
        receivedMessageHandler.handleSingleMessage(singleMessage);
        channel.basicAck(tag, false);
    }


    @RabbitListener(bindings = @QueueBinding(value = @Queue(),
            exchange = @Exchange(value = WsConstants.BOARD_CAST_EXCHANGE, type = ExchangeTypes.FANOUT)))
    public void handleBoardCastMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        BoardCastMessage boardCastMessage = gson.fromJson(body, BoardCastMessage.class);
        receivedMessageHandler.handleBoardCastMessage(boardCastMessage);
        channel.basicAck(tag, false);
    }
}

建立了两个exchange分别来接收消息。这里 @QueueBinding(value = @Queue(),通过这种方式建立的队列,队列名是spring取的一个随机名称,如下图所示
在这里插入图片描述

调用客户端工具类,发送消息给客户端

@Slf4j
@Component
public class ReceivedMessageHandler {
    private final WsMessageClient wsMessageClient;

    public ReceivedMessageHandler(WsMessageClient wsMessageClient) {
        this.wsMessageClient = wsMessageClient;
    }

    public void handleSingleMessage(SingleMessage singleMessage) {
        wsMessageClient.sendToUsers(singleMessage.getUserIds(), singleMessage);
    }


    public void handleBoardCastMessage(BoardCastMessage boardCastMessage) {
        wsMessageClient.boardCast(boardCastMessage);
    }
}

websocket-gateway 申请token接口

通过该接口,生成用户的jwtToken,在客户端建立连接时需要此token,不然无法建立连接

public class TokenController {
    @PostMapping("/get")
    public String getToken(@RequestBody @Validated TokenRequest tokenRequest) {
        return TokenUtil.generateToken(tokenRequest.getUserId());
    }
}

websocket-gateway 发送消息的接口

websocket-gateway暴露发送消息的接口给业务服务

public class MessageController {
    private final MessageSendService messageSendService;

    public MessageController(MessageSendService messageSendService) {
        this.messageSendService = messageSendService;
    }

    @PostMapping("/single/send")
    public Boolean singleSend(@RequestBody SingleMessage singleMessage) {
        return messageSendService.singleSend(singleMessage);
    }

    @PostMapping("/boardCast/send")
    public Boolean boardCastSend(@RequestBody BoardCastMessage boardCastMessage) {
        return messageSendService.boardCastSend(boardCastMessage);
    }

}

通过该http接口,最终会调用rabbitmq,发送消息给websocket-connector服务

项目地址

更多项目代码直接看一下源码吧
https://gitee.com/syk1234/websocket-services

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/573531.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

BBEdit for Mac v15.0.3激活版 支持多种类型的代码编辑器

BBEdit包含了很多一流的功能&#xff0c;包括GREP图样匹配&#xff0c;搜索和替换多个文件&#xff08;即使未开启的远程服务器上的文件&#xff09;&#xff0c;项目定义的工具&#xff0c;功能导航和众多的源代码语言的语法着色&#xff0c;代码折叠&#xff0c;FTP和SFTP打开…

视频质量度量VQM算法详细介绍

视频质量评价 视频质量评价(Video Quality Assessment,VQA)是指通过主观、客观的方式对视频图像的内容、画质等,进行感知、衡量与评价。 ITU definations subjective assessment: the determination of the quality or impairment of programme-like pictures presented…

后端程序员利用 AI 给网站制作专业 favicon

看看你的 Chrome 浏览器顶部的标签页&#xff0c;每个标签页前面有一个小小的图标&#xff0c;这个就是 favicon&#xff0c;如果你将网页保存到收藏夹&#xff0c;前面也会是这个小图标。这个图标有时候就是网站的 Logo&#xff0c;有时候也不太一样。 上面截图中&#xff0c…

C++学习随笔(10)——string

本章我们来了解一下string类。 目录 1. string类是什么&#xff1f; 1.1 C语言中的字符串 1.2 string类本质 2. 标准库中的string类 2.1 string类 2.2 string类的常用接口说明 1. string类对象的常见构造 2. string类对象的容量操作 3. string类对象的访问及遍历操作…

static和extern关键字详解

目录 创作不易&#xff0c;如对您有帮助&#xff0c;还望一键三连&#xff0c;谢谢&#xff01;&#xff01;&#xff01; 回顾 1.作用域和声明周期 1.1作用域 1.2生命周期 2.static和extern 2.1extern 2.2static 2.2-1static修饰局部变量 2.2-2static修饰全局变量 创…

vue flvjs 播放视频

写在前面&#xff1a; 之前使用过vodiejs插件播放过mp4视频格式的视频&#xff1b; 此次需要使用flvjs插件播放rtsp视频格式的视频&#xff1b; 因为视频的数据格式不同&#xff0c;所以对应的插件不同。 思维导图&#xff1a; 参考链接&#xff1a;rtmp、rtsp、flv、m3u8、 …

手把手教会你做属于自己的网站《保姆级教程》

手把手教会你做属于自己的网站《保姆级教程》 前言开始教程特别说明下期内容预报 前言 什么是个人网站&#xff1f; 个人网站是指因特网上一块固定的面向全世界发布消息的地方&#xff0c;通常由域名&#xff08;也就是网站地址&#xff09;、程序和网站空间构成&#xff0c;并…

麒麟 Kylin V10 一键安装 Oracle 11GR2 单机 ASM(231017)

前言 Oracle 一键安装脚本&#xff0c;演示麒麟 Kylin V10 一键安装 Oracle 11GR2 单机 ASM&#xff08;231017&#xff09;过程&#xff08;全程无需人工干预&#xff09;&#xff1a;&#xff08;脚本包括 ORALCE PSU/OJVM 等补丁自动安装&#xff09; ⭐️ 脚本下载地址&a…

(八)小案例银行家应用程序-排序-数组排序

排序一直有很多的算法&#xff0c;今天我们仅仅来说JavaScript内置的排序方法 ● 字符串 const owners [Jonas, Zach, Adam, Martha]; console.log(owners.sort()); console.log(owners);但是注意&#xff0c;这个方法会改变原有的数组&#xff1b; ● 我们在试试数字 cons…

用java实现PDF的下载

1.下载PDF模版 2.导入依赖 <dependency><groupId>com.itextpdf</groupId><artifactId>itext7-core</artifactId><version>7.2.5</version><type>pom</type></dependency> 3.完整代码 package com.by.controller…

JAVASE8中基本数据类型

本篇文章基于有过一部分的C语言基础的&#xff0c;还望大家理解 在进入到学习之前我们必须要清楚的是在JAVASE中数据类型与C语言中的数据类型基本上是相同的,接下来我们先来对8中数据类型进行简要介绍&#xff0c;他们分别是&#xff1a; 如果大家之前了解过C语言那么对于基本数…

常见的工业路由器访问问题

A&#xff1a;工业路由器已经设置了pptp怎么访问路由下面的电脑 1. 确认PPTP VPN设置&#xff1a;首先&#xff0c;确保PPTP VPN服务器在工业路由器上已正确设置&#xff0c;并且处于活动状态。这包括确保VPN服务器的IP地址、端口、用户名和密码等设置正确无误。 2. 连接到VP…

Apple公司面试题之Apple-Orange

1. 引言 你幻想过在Apple公司工作吗&#xff1f; 如果心动过&#xff0c;那这个逻辑推理面试题就是给你准备的&#xff01;这是一道有趣的面试题&#xff0c;如下所示&#xff1a; 看到这里的同学&#xff0c;我建议你暂停文章&#xff0c;拿起笔和纸&#xff0c;试一试。准…

KBL410-ASEMI新能源专用整流桥KBL410

编辑&#xff1a;ll KBL410-ASEMI新能源专用整流桥KBL410 型号&#xff1a;KBL410 品牌&#xff1a;ASEMI 封装&#xff1a;KBL-4 最大重复峰值反向电压&#xff1a;1000V 最大正向平均整流电流(Vdss)&#xff1a;4A 功率(Pd)&#xff1a;小功率 芯片个数&#xff1a;4…

Linux实现文件共享

#nfs-utils、rpcbind 软件包来提供 NFS 共享服务 #客户端创建共享文件夹&#xff1a; nmcli c reload nmcli c up ens160 systemctl stop firewalld systemctl disable firewalld rpm -q nfs-utils rpcbind #查看是否安装 systemctl enable rpcbind systemctl enable nfs…

Skill Check: Fundamentals of Large Language Models

Skill Check: Fundamentals of Large Language Models 完结&#xff01;

CUDA的开发框架

CUDA的开发框架主要提供了一系列工具和库&#xff0c;使得开发者可以充分利用NVIDIA GPU进行高效的并行计算。以下是CUDA开发框架的一些关键组成部分。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 1.CUDA核心库&#xff1a;这些是构…

带你走进不一样的策略模式

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 带你走进不一样的策略模式 前言策略模式简介概念解释 策略模式的结构策略模式优点项目实践之bean策略构思业务策略实现策略接口实现策略上下文业务实现 前言 在编程的世界里&#xff0c;每一次按键都…

【办公类-26-02】20240423 UIBOT学分自动评价(自动登录、评价和退出,全自动)

背景需求&#xff1a; 我想用UIBOT自动模拟鼠标&#xff0c;登录每位老师的账户&#xff0c;进入评价区域&#xff0c;自动选择7次“满意”&#xff0c;输入1次“无”&#xff0c;然后提交。 C Dim objExcelWorkBook,arrayRet,iRet,temp,iPID,hWeb,dictRet,XobjExcelWorkBook …

《QT实用小工具·四十一》无边框窗口

1、概述 源码放在文章末尾 该项目实现了无边框窗口效果&#xff0c;项目demo如下所示&#xff1a; 项目代码如下所示&#xff1a; #include "framelesswindow.h" #include <QGuiApplication> #include <QScreen>#ifdef Q_OS_WIN #include <window…