Websocket在Java中的实践——整合Rabbitmq和STOMP

大纲

  • Rabbitmq
    • 开启STOMP支持
  • 服务端
    • 依赖
    • 参数
    • 参数映射类
    • 配置类
    • 逻辑处理类
  • 测试
    • 测试页面
    • Controller
    • 测试案例

在《Websocket在Java中的实践——STOMP通信的最小Demo》一文中,我们使用enableSimpleBroker启用一个内置的内存级消息代理。本文我们将使用Rabbitmq作为消息代理,这样我们的服务就可以变成分布式部署。

Rabbitmq

开启STOMP支持

在Rabbitmq所在的机器上执行下面的命令:

sudo -H -u rabbitmq bash -c "/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_stomp"

在这里插入图片描述
然后启动Rabbitmq

sudo service rabbitmq-server start

服务端

依赖

spring-boot-starter-websocket用于Websocket服务。
spring-boot-starter-amqp和spring-rabbit-stream都是用于Rabbitmq操作。
reactor-netty用于Broker。

<dependency>
   <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-stream</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
    <version>1.1.20</version>
</dependency>

参数

src/main/resources/application.properties
需要注意的是,rabbitmq_stomp启动后会开启61613端口。

spring.rabbitmq.host=172.30.254.255
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=fangliang
spring.rabbitmq.stomp.port=61613

在这里插入图片描述
还有一点需要注意,很多文章上说使用guest用户登录。但是guest用户只能在Rabbitmq所在的机器上使用,如果跨机器使用会报下列错误。而且这和是否设置guest为全域无关。所以我们使用admin账户。

Received ERROR {message=[Bad CONNECT], content-type=[text/plain], version=[1.0,1.1,1.2], content-length=[26]} session=system text/plain payload=non-loopback access denied

spring.rabbitmq.stomp.port是一个自定义参数,它只是供Broker连接Rabbitmq使用。
spring.rabbitmq.port在当前本文例子中没有使用。

参数映射类

这个类主要是映射上述参数,方便后续使用。
src/main/java/com/nyctlc/stomprbmq/component/RabbitMQProperties.java

package com.nyctlc.stomprbmq.component;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQProperties {
    @Value("${spring.rabbitmq.password}")
    private String rabbitmqPassword;

    public String getRabbitmqPassword() {
        return rabbitmqPassword;
    }

    @Value("${spring.rabbitmq.username}")
    private String rabbitmqUsername;

    public String getRabbitmqUsername() {
        return rabbitmqUsername;
    }

    @Value("${spring.rabbitmq.host}")
    private String rabbitmqHost;

    public String getRabbitmqHost() {
        return rabbitmqHost;
    }

    @Value("${spring.rabbitmq.port}")
    private String rabbitmqPort;

    public String getRabbitmqPort() {
        return rabbitmqPort;
    }

    @Value("${spring.rabbitmq.stomp.port}")
    private String rabbitmqStompPort;

    public String getRabbitmqStompPort() {
        return rabbitmqStompPort;
    }
}

配置类

/handshake是STOMP和Websocket建立握手的接口。
enableStompBrokerRelay(“/topic”)会订阅Rabbitmq默认的交换器amq.topic的绑定关系中定义的队列。(所以我们看到很多文章订阅的前缀使用的是“topic”,而不用其他字段,这是有渊源的)
在这里插入图片描述
在这里插入图片描述

setRelayPort方法传递的是Rabbitmq的STOMP端口,即61613。
setClientLogin、setClientPasscode、setSystemLogin和setSystemPasscode都要设置为admin及其密码,否则会报错。

src/main/java/com/nyctlc/stomprbmq/config/WebSocketConfig.java

package com.nyctlc.stomprbmq.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

import com.nyctlc.stomprbmq.component.RabbitMQProperties;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    
    @Autowired
    private RabbitMQProperties rabbitMQProperties;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/handshake");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/send");
        registry.enableStompBrokerRelay("/topic")
                .setRelayHost(rabbitMQProperties.getRabbitmqHost())
                .setRelayPort(Integer.parseInt(rabbitMQProperties.getRabbitmqStompPort()))
                .setClientLogin(rabbitMQProperties.getRabbitmqUsername())
                .setClientPasscode(rabbitMQProperties.getRabbitmqPassword())
                .setSystemLogin(rabbitMQProperties.getRabbitmqUsername())
                .setSystemPasscode(rabbitMQProperties.getRabbitmqPassword());
    }
}

逻辑处理类

这个类的handle方法会接受/send/msg-from-user端点发来的消息,然后转发给Rabbitmq的amp.topic交换器下msg-to-user路由键对应的队列。上述代码创建的Broker会持续监听这个队列,如果收到消息,则发送给客户端。

src/main/java/com/nyctlc/stomprbmq/controller/WebSocketController.java

package com.nyctlc.stomprbmq.controller;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;

@Controller
public class WebSocketController {
    @MessageMapping("/msg-from-user")
    @SendTo("/topic/msg-to-user")
    public String handle(String msg) {
        System.out.println("Received message: " + msg);
        return msg;
    }
}

测试

测试页面

src/main/resources/static/index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>STOMP over WebSocket Example with StompJs.Client</title>
    <script src="https://cdn.jsdelivr.net/npm/@stomp/stompjs"></script>
</head>
<body>
    <h2>STOMP over WebSocket Example with StompJs.Client</h2>
    <button id="connectButton">Connect</button>
    <form id="messageForm">
        <input type="text" id="messageInput" placeholder="Type a message..."/>
        <button type="submit">Send</button>
    </form>
    <div id="messages"></div>

    <script>
        var client = null;

        function connect() {
            client = new StompJs.Client({
                brokerURL: 'ws://localhost:8080/handshake', // WebSocket服务端点
                connectHeaders: {},
                debug: function (str) {
                    console.log(str);
                },
                reconnectDelay: 5000,
                heartbeatIncoming: 4000,
                heartbeatOutgoing: 4000,
            });

            client.onConnect = function(frame) {
                console.log('Connected: ' + frame);
                client.subscribe('/topic/msg-to-user', function(message) { // 订阅端点
                    showMessageOutput(JSON.parse(message.body).content);
                });
            };

            client.onStompError = function(frame) {
                console.error('Broker reported error: ' + frame.headers['message']);
                console.error('Additional details: ' + frame.body);
            };

            client.activate();
        }

        function sendMessage(event) {
            event.preventDefault(); // 阻止表单默认提交行为
            var messageContent = document.getElementById('messageInput').value.trim();
            if(messageContent && client && client.connected) {
                var chatMessage = { content: messageContent };
                client.publish({destination: "/send/msg-from-user", body: JSON.stringify(chatMessage)}); // 发送端点
                document.getElementById('messageInput').value = '';
            }
        }

        function showMessageOutput(message) {
            var messagesDiv = document.getElementById('messages');
            var messageElement = document.createElement('div');
            messageElement.appendChild(document.createTextNode(message));
            messagesDiv.appendChild(messageElement);
        }

        document.getElementById('messageForm').addEventListener('submit', sendMessage);

        document.getElementById('connectButton').addEventListener('click', connect);
    </script>
</body>
</html>

Controller

这个Controller主要是为了让上述HTML可以通过URL访问。
src/main/java/com/nyctlc/stomprbmq/controller/FileController.java

package com.nyctlc.stomprbmq.controller;

import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;

@Controller
public class FileController {

    @GetMapping("/")
    public String index() {
        return "index"; // 返回index.html
    }

    @RequestMapping(value = "/favicon.ico")
    @ResponseStatus(value = HttpStatus.NO_CONTENT)
    public void favicon() {
        // No operation. Just to avoid 404 error for favicon.ico
    }
}

测试案例

在这里插入图片描述
在这里插入图片描述
我们在管理后台直接给这个队列发送消息,前端页面也会收到。比如我们发送{“content”:“message from management”}
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/784694.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

计算机类期刊横纵向对比

备注&#xff1a;综合影响因子更具针对性&#xff0c;将科技类期刊和人文社科期刊的影响力考虑&#xff0c;更加聚焦于某一特定科学领域&#xff1b;复合影响因子是基于期刊、学位论文、以及会议论文等多个类型的文献作为计算基础。 两者都是通过前两年发表的可被引文献在统计年…

pandas数据分析(8)

描述性统计量和数据聚合 描述性统计量 描述性统计量通过量化数据来概括数据集。DataFrame和Series可以通过sum、mean、count等方法来获取各种描述性统计量。在默认情况下会按照axis0返回一个Series&#xff0c;也就是说会得到一个有关列的统计量&#xff1a; 如果要计算行的统…

鼠标宏怎么设置?6款鼠标自动点击器强推,游戏玩家专用!(2024全)

随着电子游戏和日常应用的不断发展&#xff0c;我们经常会遇到一些重复性的任务或操作。而在这种情况下&#xff0c;鼠标宏以其自动化的特点成为了许多玩家和使用者的利器之一。如果你正在寻找如何设置鼠标宏来简化操作并提高效率&#xff0c;那么你来对地方了。在本文中&#…

理解算法复杂度:空间复杂度详解

引言 在计算机科学中&#xff0c;算法复杂度是衡量算法效率的重要指标。时间复杂度和空间复杂度是算法复杂度的两个主要方面。在这篇博客中&#xff0c;我们将深入探讨空间复杂度&#xff0c;了解其定义、常见类型以及如何进行分析。空间复杂度是衡量算法在执行过程中所需内存…

利用canvas压缩图片

前情提要 页面打印导出pdf文件的时候&#xff0c;图片大小会影响pdf文件大小。 为了减小pdf文件大小&#xff0c;需要将图片压缩一下。在只有图片地址的情况下&#xff0c;将图片压缩后显示&#xff0c;一开始用的browser-image-compression插件&#xff0c;这是js压缩&#x…

硬件产品设计过程:结构及硬件设计

目录 简介 设计管理问题 简介 之前也多次谈到硬件产品的设计分为多个过程,每个过程所涉及的内容也是完全不同的。 比如说: 后台、应用app层的开发;电子硬件设计;结构、ID设计;营销侧;生产管理侧;供应链管理侧等等。接下来就谈谈最近公司开发上的一些问题。 以往由于公…

docker nginx mysql redis

启动没有数据卷的nginx docker run -d -p 86:80 --name my-nginx nginx把/etc/nginx中的配置复制到宿主机 docker cp my-nginx:/etc/nginx /home/nginxlkl把/html 中的文件复制到宿主机 docker cp my-nginx:/etc/nginx /home/nginxlkl删除当前镜像 docker rm -f my-nginx重新起…

理解算法复杂度:时间复杂度详解

引言 在计算机科学中&#xff0c;算法复杂度是衡量算法效率的重要指标。时间复杂度和空间复杂度是算法复杂度的两个主要方面。在这篇博客中&#xff0c;我们将深入探讨时间复杂度&#xff0c;了解其定义、常见类型以及如何进行分析。 什么是时间复杂度&#xff1f; 时间复杂度…

【多语言独立站】什么是跨境电商独立站?||如何完成完善电商系统搭建

随着国际贸易的发展和互联网技术的不断提升&#xff0c;在跨境电商业务中&#xff0c;独立站是一个非常重要的组成部分。我们经常会听到的词语就是&#xff1a;「跨境电商独立站」、「外贸独立站」、「跨境独立站」、「电商独立站」等等。因此&#xff0c;我们可以发现独立站和…

【web前端HTML+CSS+JS】--- JS学习笔记03

一、JS介绍 可以在前端页面上进行逻辑处理&#xff0c;来解决表单的验证等问题&#xff0c;提升效率&#xff0c;直接在前端提示问题&#xff0c;减少服务器压力 应用1&#xff1a;可以做静态验证和动态验证&#xff08;进行异步请求&#xff09; 应用2&#xff1a;可以解析后…

Splunk Enterprise 任意文件读取漏洞(CVE-2024-36991)

文章目录 前言漏洞描述影响版本漏洞复现POC批量检测-nuclei脚本 修复建议 前言 Splunk Enterprise 是一款强大的机器数据管理和分析平台&#xff0c;能够实时收集、索引、搜索、分析和可视化来自各种数据源的日志和数据&#xff0c;帮助企业提升运营效率、增强安全性和优化业务…

【可视化还能免费做?!】数据安全不用愁,快来用这款免费可视化工具做智慧港口管理平台

在智慧港口的建设中&#xff0c;实现港口的统一调度是一项关键任务。山海鲸可视化&#xff0c;这款免费可视化工具&#xff0c;通过其卓越的功能和特色&#xff0c;为智慧港口的建设提供了强大的支持。从智慧港口的需求出发&#xff0c;结合船舶调度和货物转运的需求&#xff0…

「API取数」FDL获取金蝶云星空的单据数据

很多企业的ERP系统都在用金蝶云星空&#xff0c;金蝶云星空API是IT人员获取数据的重要来源&#xff0c; 常常用来生成定制化报表&#xff0c;进行数据分析&#xff0c;或是将金蝶云的数据与OA系统、BI工具集成。 通常情况下&#xff0c;IT人员需要使用Python、Java等语言编写脚…

Failed to get D-Bus connection: Operation not permitted

最近使用wsl安装了centOS7镜像&#xff0c;在系统中安装了docker服务&#xff0c;但是在执行systemctl start docker的时候遇到了&#xff1a;Failed to get D-Bus connection: Operation not permitted问题&#xff0c;查阅了很多资料都没有效果&#xff0c;最终找到了一种解决…

理解JS与多线程

理解JS与多线程 什么是四核四线程&#xff1f; 一个CPU有几个核它就可以跑多少个线程&#xff0c;四核四线程就说明这个CPU同一时间最多能够运行四个线程&#xff0c;四核八线程是使用了超线程技术&#xff0c;使得单个核像有两个核一样&#xff0c;速度比四核四线程有多提升。…

Q-Learning实战——找房间

介绍 样例来自A Painless Q-learning Tutorial (一个 Q-learning 算法的简明教程) 简单来说就是从某个房间开始&#xff0c;找到去目标房间的路径。 代码实现 import numpy as np from tqdm import tqdm, trangeroom_num 6 room_paths [(0, 4), (3, 4), (3, 1), (1, 5)…

exel带单位求和,统计元素个数

如果exel表格中&#xff0c;如果数据有单位&#xff0c;无法直接用 自动求和 直接求和。如下图所示&#xff0c;求和结果为0&#xff0c;显然不是我们想要的。 用下面的公式求和&#xff0c;单位不是“个”的时候记得替换单位。统计范围不是“C1:C7”也记得换一下啊&#xff01…

19_谷歌GoogLeNet(InceptionV1)深度学习图像分类算法

1.1 简介 GoogLeNet&#xff08;有时也称为GoogleNet或Inception Net&#xff09;是一种深度学习架构&#xff0c;由Google的研究团队在2014年提出&#xff0c;主要设计者为Christian Szegedy等人。这个模型是在当年的ImageNet大规模视觉识别挑战赛&#xff08;ILSVRC&#xf…

实用性提升百分之一百!!!【ONLYOFFICE 8.1版本】全方位深度性能测评

目录 【ONLYOFFICE 8.1 版本】全方位深度性能测评 一、界面与用户体验 二、文字处理功能 表格处理功能 演示文稿功能 协作与共享功能 性能与稳定性 总结 【ONLYOFFICE 8.1 版本】全方位深度性能测评 在当今数字化办公的时代&#xff0c;办公软件的选择对于提高工作效率和…

【HTML入门】第四课 - 换行、分割横线和html的注释

这一小节&#xff0c;我们继续说HTML的入门知识&#xff0c;包括换行、横线分割以及注释&#xff08;html的注释&#xff09;。 目录 1 换行 2 分割横线 3 html注释 1 换行 html中分为块元素和行内元素。这一小节呢&#xff0c;先不说这些元素们&#xff0c;我们先说一下换…