activeMq将mqtt发布订阅转成消息队列

1、activemq.xml置文件新增如下内容

2、mqttx测试发送:

主题(配置的模糊匹配,为了并发):VirtualTopic/device/sendData/12312

3、mqtt接收的结果

4、程序处理

package com

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;

import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.Message;
import java.nio.charset.Charset;
import java.util.Date;

@Component
public class MessageHandler {

    @JmsListener(destination = "deviceQueue.receiveDate", containerFactory = "queueListener", concurrency = "1-3")
    public void deviceMessage(ActiveMQBytesMessage message) {
        System.out.println(StrUtil.str(message.getContent().getData(), Charset.defaultCharset()));
        System.out.println("###################" + message + "###################");
    }
}

 控制台打印

 全量的:activemq.xml

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>
		
		<!-- 消息订阅 -->
		<destinationInterceptors>
			<virtualDestinationInterceptor>
				<virtualDestinations>
					<compositeTopic name="VirtualTopic.device.sendData.*">
						<forwardTo>
							<queue physicalName="deviceQueue.receiveDate" />
						</forwardTo>
					</compositeTopic>
				</virtualDestinations>
			</virtualDestinationInterceptor>
		</destinationInterceptors>
		
		<plugins>
			<simpleAuthenticationPlugin>
				<users>
					<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>
					<!--<authenticationUser username="user" password="password" groups="users"/>
					<authenticationUser username="guest" password="password" groups="guests"/> -->
				</users>
			</simpleAuthenticationPlugin>
		</plugins>
    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/403528.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python专业版破解激活(超详细)

python专业版破解激活 1.下载pycharm应用程序 这里我使用的版本是pycharm-professional-2023.3.2 下载pycharm程序的连接为&#xff1a; 百度网盘 请输入提取码 提取码为&#xff1a;nym0 2.安装 选择安装路径 下一步 这里全选 下一步 这里直接点击安装就可&#xff0c;其…

探索分布式强一致性奥秘:Paxos共识算法的精妙之旅

提到分布式算法&#xff0c;就不得不提 Paxos 算法&#xff0c;在过去几十年里&#xff0c;它基本上是分布式共识的代名词&#xff0c;因为当前一批常用的共识算法都是基于它改进的。比如&#xff0c;Fast Paxos 算法、Cheap Paxos、Raft 算法等。 由莱斯利兰伯特&#xff08;L…

R语言数据分析(五)

R语言数据分析&#xff08;五&#xff09; 文章目录 R语言数据分析&#xff08;五&#xff09;前言一、什么是整洁的数据二、延长数据2.1 列名中的数据值2.2 pivot_longer()的处理原理2.3 列名中包含许多变量的情况2.4 列名同时包含数据和变量 三、扩宽数据3.1 pivot_wider的处…

Electron实战之环境搭建

工欲善其事必先利其器&#xff0c;在进行实战开发的时候&#xff0c;我们最终的步骤是搞好一个舒服的开发环境&#xff0c;目前支持 Vue 的 Electron 工程化工具主要有 electron-vue、Vue CLI Plugin Electron Builder、electron-vite。 接下来我们将分别介绍基于 Vue CLI Plu…

查询数据库的编码集Oracle,MySQL

1、查询数据库的编码集Oracle,MySQL 1.1、oracle select * from v$nls_parameters where parameterNLS_CHARACTERSET; 查询版本&#xff1a;SELECT * FROM v$version 2、MySQL编码集 SELECT DEFAULT_CHARACTER_SET_NAME, DEFAULT_COLLATION_NAME FROM information_schema.SC…

隐私也要付费?Meta公司为收集用户数据再出“奇招”

Cybernews网站消息&#xff0c;有相关人士表示&#xff0c;如果欧洲数据保护委员会&#xff08;EDPB&#xff09;不明确指出Meta公司的“付费或同意”的模式违反了欧盟的隐私法规&#xff0c;那么这一模式很可能会被大规模复制&#xff0c;危及数百万欧洲公民的自由选择权。 自…

Jenkins2.426邮件通知配置

之前安装的jenkins出现问题了&#xff0c;重新装了jenkins&#xff0c;需要重新配置&#xff1a;Maven&#xff0c;JDK&#xff0c;Allure报告&#xff0c;邮件通知&#xff0c;Extended E-mail Notification等 配置Maven&#xff0c;JDK参考&#xff1a;CICD集合(四):Jenkins…

排序第三篇 直接插入排序

插入排序的基本思想是&#xff1a; 每次将一个待排序的记录按其关键字的大小插入到前面已排好序的文件中的适当位置&#xff0c; 直到全部记录插入完为止。 一 简介 插入排序可分为2类 本文介绍 直接插入排序 它的基本操作是&#xff1a; 假设待排充序的记录存储在数组 R[1……

2.22 Qt day3 多界面跳转+qss登录界面优化+发布软件+对话框

思维导图&#xff1a; 完善对话框&#xff0c;点击登录对话框&#xff0c;如果账号和密码匹配&#xff0c;则弹出信息对话框&#xff0c;给出提示”登录成功“&#xff0c;提供一个Ok按钮&#xff0c;用户点击Ok后&#xff0c;关闭登录界面&#xff0c;跳转到其他界面 如果账号…

我们在SqlSugar开发框架中,用到的一些设计模式

我们在《SqlSugar开发框架》中&#xff0c;有时候都会根据一些需要引入一些设计模式&#xff0c;主要的目的是为了解决问题提供便利和代码重用等目的。而不是为用而用&#xff0c;我们的目的是解决问题&#xff0c;并在一定的场景下以水到渠成的方式处理。不过引入任何的设计模…

【教3妹学编程-算法题】匹配模式数组的子数组数目 I

3妹&#xff1a;2哥2哥&#xff0c;你有没有看到上海女老师出轨男学生的瓜啊。 2哥 : 看到 了&#xff0c;真的是太毁三观了&#xff01; 3妹&#xff1a;是啊&#xff0c; 老师本是教书育人的职业&#xff0c;明确规定不能和学生谈恋爱啊&#xff0c;更何况是出轨。 2哥 : 是啊…

HarmonyOS—LocalStorage:页面级UI状态存储

LocalStorage是页面级的UI状态存储&#xff0c;通过Entry装饰器接收的参数可以在页面内共享同一个LocalStorage实例。LocalStorage也可以在UIAbility实例内&#xff0c;在页面间共享状态。 本文仅介绍LocalStorage使用场景和相关的装饰器&#xff1a;LocalStorageProp和LocalS…

大保司保费贵,是否物有所值?

《大保司保费贵&#xff0c;是否物有所值》 这是罗师兄的原创文章 预计8-9分钟读完 作者&#xff1a;罗师兄 微信号&#xff1a;luoyun515 当我们想要买一份重疾险、储蓄险等长期险时&#xff0c; 我们会发现&#xff0c;同样的保障责任和保额&#xff0c; 不同保险公司的…

mac苹果电脑系统最好用的清理软件CleanMyMac2024功能介绍及如何激活解锁许可证

CleanMyMac X的界面设计简洁大气&#xff0c;为用户提供了直观且易于操作的使用体验。 布局清晰&#xff1a;界面布局非常明朗&#xff0c;左侧是功能栏&#xff0c;右侧则是信息界面。这种布局方式使得用户能够迅速找到所需的功能选项&#xff0c;提高了操作效率。色彩搭配&a…

Flutter常用命令,持续更新

目录 前言 Flutter 常用命令 Dart 常用命令 adb 常用命令&#xff08;用于 Android 开发&#xff09; 前言 当在开发Flutter项目时&#xff0c;熟悉一些常用的命令是非常重要的。这些命令可以帮助你执行各种任务&#xff0c;从构建应用程序到调试和测试。以下是一些Flutte…

亿道丨三防平板丨加固平板丨三防加固平板丨改善资产管理

库存资产管理中最重要的部分之一是准确性&#xff1b;过时的库存管理技术会增加运输过程中人为错误、物品丢失或纸张损坏的风险。如今随着三防平板电脑的广泛使用&#xff0c;库存管理也迎来了好帮手&#xff0c;通过使用三防平板电脑能够确保库存管理、数据存储和记录保存的准…

Hive【内部表、外部表、临时表、分区表、分桶表】【总结】

目录 Hive的物种表结构特性 一、内部表 建表 使用场景 二、外部表 建表:关键词【EXTERNAL】 场景&#xff1a; 外部表与内部表可互相转换 三、临时表 建表 临时表横向对比​编辑 四、分区表 建表&#xff1a;关键字【PARTITIONED BY】 场景&#xff1a; 五、分桶表 …

pip安装依赖环境出现的问题

一、error: subprocess-exited-with-error! 1、前期一直百度的错误如标题所示&#xff0c;得到的方案如下&#xff1a;&#xff08;但没解决问题&#xff09; &#xff08;1&#xff09;升级setuptools库&#xff0c;或者降低固定版本 //升级setuptools库&#xff0c;或者降低…

Spark之【基础介绍】

Spark最初是由美国伯克利大学AMP实验室在2009年开发&#xff0c;Spark时基于内存计算的大数据并行计算框架&#xff0c;可以用于构建大型的、低延迟的数据分析应用程序。 Spark是当今大数据领域最活跃、最热门、最高效的大数据通用计算平台之一。 Spark的特点 运行速度快 &am…