RabbitMQ之Plugins插件----AMQP对接MQTT

1.启用插件

rabbitmq-plugins enable rabbitmq_mqtt

2.检查是否启动成功,打开rabbitmq后台

3.概念:
AMQP是由交换器和queue队列组成的消息队列机制,MQTT是由订阅主题组成的消息机制

1.MQTT创建连接时会向rabbitmq创建一个自己的queue,默认使用amq.topic交换器,由mqtt-subscription-***-qos1组成queue名。其中qos概念自行了解。

2.AMQP连接消费也需要绑定到MQTT对应的交换器和队列中,其中routingKey则使用MQTT订阅主题。

上代码:

//****************************************************************** */
//MQTT

const mqtt = require("mqtt")
const client = mqtt.connect({
    url: "mqtt://127.0.0.1:1883",
    clientId: 'nickchen111', // 客户端ID,此ID可自定义。queue后缀
    username: 'guest',   // 用户名
    password: 'guest'
}) // 连接到mqtt服务器

var quque = "device_aaa"
var quque_rpc = "device_aaa_rpc";
client.subscribe(quque_rpc, { qos: 1 })

var status = false;
var count = 0;
var error = 0;
setInterval(() => {
    if (status) {
        count += 1;
        var msg = Math.ceil(Math.random() * 40)
        console.log("MQTT发送消息:" + msg)
        client.publish(quque, msg.toString(), { qos: 0, retain: true })
    }
}, 3000);

client.on("message", function (top, message) {
    if (top == "device_aaa") {
        error += 1;
    }
    console.log("MQTT接收消息:" + top + "  : " + message.toString() + "*******丢失返回消息共:[" + error + "]条")
    if (JSON.parse(message.toString()).command == "stop") {
        status = false;
    }
    if (JSON.parse(message.toString()).command == "start") {
        status = true;
    }
})


//****************************************************************** */
//AMQP
const amqp = require('amqplib');

async function consumer() {
    // 创建链接对象
    const connection = await amqp.connect('amqp://localhost:5672');
    // 获取通道
    const channel = await connection.createChannel();
    // 声明参数
    const exchangeName = 'amq.topic';
    const queueName = 'mqtt-subscription-nickchen111qos1';
    const routingKey = 'device_aaa';
    // // 声明一个交换机
    // await channel.assertExchange(exchangeName, 'amq.topic', { durable: true });//消息持久化
    // // 声明一个队列
    await channel.assertQueue(queueName, { autoDelete: true, durable: true });
    // 绑定关系(队列、交换机、路由键)
    await channel.bindQueue(queueName, exchangeName, routingKey);
    var data = await channel.publish(exchangeName, "device_aaa_rpc", Buffer.from(JSON.stringify({ command: "start" })))
    // 消费
    await channel.consume(queueName, msg => {
        console.log('Consumer:', msg.content.toString());
        channel.ack(msg);
    },{ noAck: false });//开启通知
    console.log('消费端启动成功!');
}
consumer();

        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/482839.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringCloud之网关组件Gateway学习

SpringCloud之网关组件Gateway学习 GateWay简介 Spring Cloud Gateway是Spring Cloud的⼀个全新项目,目标是取代Netflix Zuul,它基于Spring5.0SpringBoot2.0WebFlux(基于高性能的Reactor模式响应式通信框架Netty,异步⾮阻塞模型…

基于python+vue超市管理系统flask-django-php-nodejs

课题主要分为二大模块:即管理员模块和员工模块,主要功能包括:个人信息修改、员工信息、商品信息、商品进货、商品出库、商品销量等; 目录 摘 要 I Abstrac II 目录 III 1绪论 1 1.1 研究背景 3 1.1.1国内研究现状 3 1.1.2国外研究…

是德科技keysight DSOX3024T示波器

181/2461/8938产品概述: DSOX3024T 示波器 要特性与技术指标 使用电容触摸屏进行简洁的触控操作: •提高调试效率 •触控设计可以简化文档记录 •使用起来就像您喜欢的智能手机或平板电脑一样简单 使用 MegaZoom IV 技术揭示偶发异常: •超快…

图像处理ASIC设计方法 笔记12 图像旋转ASIC中心控制器状态机

P109 1 流水线图像旋转ASIC整体架构 中心控制器负责各个模块的状态控制和数据调度,接收到外部启动信号后,进人芯片初始化阶段,片上FIFO接收外部输入的图像旋转参数、接收完毕后,再利用接收到的旋转角度到查找表中找到对应的正弦和正切值。 中心控制器将接收到的行列信息…

鸿蒙Harmony应用开发—ArkTS-LazyForEach:数据懒加载

LazyForEach从提供的数据源中按需迭代数据,并在每次迭代过程中创建相应的组件。当在滚动容器中使用了LazyForEach,框架会根据滚动容器可视区域按需创建组件,当组件滑出可视区域外时,框架会进行组件销毁回收以降低内存占用。 接口…

Pink老师Echarts教学笔记

可视化面板介绍 ​ 应对现在数据可视化的趋势,越来越多企业需要在很多场景(营销数据,生产数据,用户数据)下使用,可视化图表来展示体现数据,让数据更加直观,数据特点更加突出。 01-使用技术 完成该项目需…

是德科技keysight 53230A频率计数器

181/2461/8938产品概述: Keysight(原Agilent) 53230A 通用频率计数器/计时器可满足您所有的频率和时间间隔测量需求。除了典型的频率和时间间隔测量,它还可执行连续/无间隙测量,以进行基本调制域分析。 53230A 配有可选的猝发测量软件。它可…

腾讯云GPU服务器介绍_GPU实例规格价格_AI_深度学习

腾讯云GPU服务器是提供GPU算力的弹性计算服务,腾讯云GPU服务器具有超强的并行计算能力,可用于深度学习训练、科学计算、图形图像处理、视频编解码等场景,腾讯云百科txybk.com整理腾讯云GPU服务器租用价格表、GPU实例优势、GPU解决方案、GPU软…

Linux进程地址空间详解

文章目录 前言一、程序地址空间二、感受虚拟地址的存在三、进程地址空间四、程序从磁盘加载到内存的过程4.1 物理地址和虚拟地址的区别 五、写时拷贝5.1 解释fork()函数有两个返回值 前言 我们在学习C/C的时候用到的地址是什么地址呢?虚拟地址?物理地址&…

全球首位AI程序员诞生,对程序员的影响分析

全球首位AI程序员诞生,对程序员的影响分析 《全球首位AI程序员诞生,对程序员的影响分析》方向一:AI程序员的优势分析方向二:AI程序员的局限性方向三:对程序员职业的影响方向四:未来展望 博主 默语带您 Go t…

Swagger常用注解

Tag 标注位置controller类 表示Controller类作用 Schema modeal层javaBean 描述模型及属性 Operation 描述方法作用

Docker入门到实践之环境配置

Docker入门到实践之环境配置 docker 环境安装 Ubuntu/Debian: sudo apt update sudo apt install docker.ioCentOS/RHEL: sudo yum install dockerArch Linux: sudo pacman -S docker如果未安装成功,或者env的path未设置成功,运行时会报错 Bash: Do…

Linux详细介绍

Linux操作系统介绍 Linux 是一种开源的类 Unix 操作系统,最初由 Linus Torvalds 在 1991 年创建。与其他操作系统不同,Linux 是一个基于内核的操作系统,其核心是 Linux 内核。Linux 内核是由程序员社区不断开发和改进的,它提供了…

Docker - 哲学 默认网络和 自定义网络 与 linux 网络类型 和 overlay2

默认网络:不指定 --nerwork 不指定 网络 run 一个容器时,会直接使用默认的网络桥接器 (docker0) 自定义网络:指定 --nerwork 让这两台容器互相通信 的前提 - 共享同一个网络 关于 ip addr 显示 ens160 储存驱动 ov…

基于Springboot的药品管理系统(有报告)。Javaee项目,springboot项目。

演示视频: 基于Springboot的药品管理系统(有报告)。Javaee项目,springboot项目。 项目介绍: 采用M(model)V(view)C(controller)三层体系结构&…

深度学习 线性神经网络(线性回归 从零开始实现)

介绍: 在线性神经网络中,线性回归是一种常见的任务,用于预测一个连续的数值输出。其目标是根据输入特征来拟合一个线性函数,使得预测值与真实值之间的误差最小化。 线性回归的数学表达式为: y w1x1 w2x2 ... wnxn …

stm32使用定时器实现PWM与呼吸灯

PWM介绍 STM32F103C8T6 PWM 资源: 高级定时器( TIM1 ): 7 路 通用定时器( TIM2~TIM4 ):各 4 路 例如定时器2 PWM 输出模式: PWM 模式 1 :在 向上计数 时&#xff0…

网络安全顶会——NDSS '24 论文清单、摘要(上)

1、50 Shades of Support: A Device-Centric Analysis of Android Security Updates Android是迄今为止最受欢迎的操作系统,拥有超过30亿活跃移动设备。与任何软件一样,在Android设备上发现漏洞并及时应用补丁都至关重要。Android开源项目已经开始努力通…

<REAL-TIME TRAFFIC OBJECT DETCTION FOR AUTONOMOUS DRIVING>论文阅读

Abstract 随着计算机视觉的最新进展,自动驾驶迟早成为现代社会的一部分,然而,仍有大量的问题需要解决。尽管现代计算机视觉技术展现了优越的性能,他们倾向于将精度优先于效率,这是实时应用的一个重要方面。大型目标检测…

modelsim与quartus联合仿真ROM读不出数据

modelsim与quartus联合仿真ROM没有数据被读出,很是纳闷。 原因:hex或者mif文件放的不对,放在与db放在同一个文件夹下。modelsim在这个目录查找mif文件或hex。 这是我遇到的问题。当然可能还有其他的问题: 1、mif文件的格式不对&a…