Nodejs 第七十六章(MQ进阶)

MQ介绍和基本使用在上一章介绍过了,不再重复

  1. 消息:在RabbitMQ中,消息是传递的基本单元。它由消息体和可选的属性组成

  2. 生产者Producer:生产者是消息的发送方,它将消息发送到RabbitMQ的交换器(Exchange)中

  3. 交换器Exchange:交换器接收从生产者发送的消息,并根据特定的规则将消息路由到一个或多个队列中

  4. 队列Queue:队列是消息的接收方,它存储了待处理的消息。消费者可以从队列中获取消息并进行处理

  5. 消费者Consumer:消费者是消息的接收方,它从队列中获取消息并进行处理

MQ进阶用法

发布订阅

发布订阅,消息的发送者称为发布者(Publisher),而接收消息的一个或多个实体称为订阅者(Subscriber

回顾上一篇,点对点通讯生产者发送一条消息通过路由投递到Queue,只有一个消费者能消费到 也就是一对一发送

请添加图片描述

回归主题 发布订阅就是生产者的消息通过交换机写到多个队列,不同的订阅者消费不同的队列,也就是实现了一对多

发布订阅的模式分为四种

  1. Direct(直连)模式:把消息放到交换机指定key的队列里面。
  2. Topic(主题)模式: 把消息放到交换机指定key的队列里面,额外增加使用"*“匹配一个单词或使用”#"匹配多个单词
  3. Headers(头部)模式:把消息放到交换机头部属性去匹配队列
  4. Fanout(广播)模式:把消息放入交换机所有的队列,实现广播

发布订阅-代码编写

1. direct模式编写

主要就是通过 routingKey 匹配实现路由 这里的zs就是routingKey

生产者

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel() 
//声明一个交换机
/**
 * @param {String} exchange 交换机的名称
 * @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式
 * @param {Object} options {durable: true} //开启消息持久化
 */
await channel.assertExchange('logs', 'direct', {
    durable: true
})
//发送消息
/**
 * @param {String} exchange 交换机的名称
 * @param {String} routingKey 路由键
 * @param {Buffer} content 消息内容
 */
 //这里的zs就是routingKey
channel.publish('logs', 'zs', Buffer.from('小满direct模式发送的消息'))

//断开
await channel.close()
await connection.close()
process.exit(0)

消费者(我们编写多个方便测试)

consume.js

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道

await channel.assertExchange('logs', 'direct', {
    durable: true
})

//添加一个队列
const { queue } = await channel.assertQueue('queue1', {
    durable: true
})
//绑定交换机
/**
 * @param {String} queue 队列名称
 * @param {String} exchange 交换机名称
 * @param {String} routingKey 路由键
 */
//匹配对应的zs值才能收到
await channel.bindQueue(queue, 'logs', 'zs')
//接收消息
channel.consume('queue1', (msg) => {
    console.log(msg.content.toString());
}, {
    noAck: true //自动确认消息被消费
})

consume2.js

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道

await channel.assertExchange('logs', 'direct', {
    durable: true
})

//添加一个队列
const { queue } = await channel.assertQueue('queue2', {
    durable: true
})
//绑定交换机
/**
 * @param {String} queue 队列名称
 * @param {String} exchange 交换机名称
 * @param {String} routingKey 路由键
 */
 //匹配对应的zs值才能收到
await channel.bindQueue(queue, 'logs', 'zs')
//接收消息
channel.consume('queue2', (msg) => {
    console.log(msg.content.toString());
}, {
    noAck: true //自动确认消息被消费
})
2. Topic模式编写

我们把模式切换成了Topic 并且publish 发布的时候 routingKey 换成了 xm.xxxxxxxx

生产者

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel() 
//声明一个交换机
/**
 * @param {String} exchange 交换机的名称
 * @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式
 * @param {Object} options {durable: true} //开启消息持久化
 */
await channel.assertExchange('topic', 'topic', {
    durable: true
})
//发送消息
/**
 * @param {String} exchange 交换机的名称
 * @param {String} routingKey 路由键
 * @param {Buffer} content 消息内容
 */
 //注意这儿匹配规则换了 换成xm.xxxxxxxxxxxxxxxxxxxxx
channel.publish('logs', 'xm.sadsdsdasdasdasdsda', Buffer.from('小满topic模式发送的消息'))

//断开
await channel.close()
await connection.close()
process.exit(0)

消费者匹配(注意这里匹配规则xm.*'使用了* 就是模糊匹配的意思)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道

await channel.assertExchange('topic', 'topic', {
    durable: true
})

//添加一个队列
const { queue } = await channel.assertQueue('queue1', {
    durable: true
})
//绑定交换机
/**
 * @param {String} queue 队列名称
 * @param {String} exchange 交换机名称
 * @param {String} routingKey 路由键 *匹配一个单词 #匹配多个单词
 */
 //这儿变化了
await channel.bindQueue(queue, 'topic', 'xm.*')
//接收消息
channel.consume('queue1', (msg) => {
    console.log(msg.content.toString());
}, {
    noAck: true //自动确认消息被消费
})
3. Headers模式

生产者(注意 publish 增加第四个参数开启了header 添加了data参数)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel() 
   //声明一个交换机
   /**
    * @param {String} exchange 交换机的名称
    * @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式
    * @param {Object} options {durable: true} //开启消息持久化
    */
   await channel.assertExchange('headers', 'headers', {
       durable: true
   })
   //发送消息
   /**
    * @param {String} exchange 交换机的名称
    * @param {String} routingKey 路由键
    * @param {Buffer} content 消息内容
    * @param {Object} options {headers: {'key': 'value'}} //定义匹配规则
    */
    //嘿 这儿变了
   channel.publish('headers', '', Buffer.from('小满headers模式发送的消息'),{
       headers: {
           data:'xmzs'
       }
   })

   //断开
   await channel.close()
   await connection.close()
   process.exit(0)

消费者(bindQueue 增加一个对象 属性跟生产者对应即可)

   import amqplib from 'amqplib'
   const connection = await amqplib.connect('amqp://localhost:5672')
   const channel = await connection.createChannel() //创建一个频道
   await channel.assertExchange('headers', 'headers')

   //添加一个队列
   const { queue } = await channel.assertQueue('queue1')
   //绑定交换机
   /**
    * @param {String} queue 队列名称
    * @param {String} exchange 交换机名称
    * @param {String} routingKey 路由键 *匹配一个单词 #匹配多个单词
    */
   await channel.bindQueue(queue, 'headers', '',{
       data:'xmzs' //注意这儿不加headers 直接放值即可
   })
   //接收消息
   channel.consume(queue, (msg) => {
       console.log(msg.content.toString());
   }, {
       noAck: true //自动确认消息被消费
   })
4. Fanout模式

生产者(其实也就是routingKey 变成一个空值实现全体广播)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel()
//声明一个交换机
/**
* @param {String} exchange 交换机的名称
* @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式
* @param {Object} options {durable: true} //开启消息持久化
*/
await channel.assertExchange('fanout', 'fanout')
//发送消息
/**
* @param {String} exchange 交换机的名称
* @param {String} routingKey 路由键
* @param {Buffer} content 消息内容
*/
channel.publish('fanout', '', Buffer.from('小满fanout模式发送的消息'))

//断开
await channel.close()
await connection.close()
process.exit(0)

消费者(routingKey接受空值即可 就算有值也会被忽略)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道
await channel.assertExchange('fanout', 'fanout')

//添加一个队列
const { queue } = await channel.assertQueue('queue1')
//绑定交换机
/**
* @param {String} queue 队列名称
* @param {String} exchange 交换机名称
* @param {String} routingKey 路由键 *匹配一个单词 #匹配多个单词
*/
await channel.bindQueue(queue, 'fanout', '')
//接收消息
channel.consume(queue, (msg) => {
console.log(msg.content.toString());
}, {
noAck: true //自动确认消息被消费
})

总结

通过使用RabbitMQ作为缓冲,避免数据库服务崩溃的风险。生产者将消息放入队列,消费者从队列中读取消息并进行处理,随后确认消息已被处理。在应用之间存在一对多的关系时,可以使用Exchange交换机根据不同的规则将消息转发到相应的队列:

  1. 直连交换机(direct exchange):根据消息的路由键(routing key)将消息直接转发到特定队列。
  2. 主题交换机(topic exchange):根据消息的路由键进行模糊匹配,将消息转发到符合条件的队列。
  3. 头部交换机(headers exchange):根据消息的头部信息进行转发。
  4. 广播交换机(fanout exchange):将消息广播到交换机下的所有队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/697979.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

新概念英语视频百度云,新概念英语视频百度网盘,新概念1-4册

在现今数字化时代,英语学习资源丰富多样,其中新概念英语视频因其深入浅出的教学风格和丰富多样的学习内容,备受广大英语学习者的青睐。本文旨在为广大英语学习者提供一份详尽的新概念英语视频下载指南,帮助大家轻松获取优质学习资…

vscode 中 eslint 无效?npm init 是什么?

vscode 中 eslint 无效 我想要给一个项目添加 eslint,按照 eslint 官方指南操作: npm init eslint/configlatest自动安装了相关依赖并创建配置文件 eslint.config.mjs。 按理说,此刻项目应该已经配置好 eslint 了。但是我的编辑器 vscode …

冯喜运:6.11最新黄金原油趋势解析及独家多空操作建议

【黄金消息面分析】:周二(6月11日)亚市早盘,现货黄金窄幅震荡,目前交投于2310.15美元/盎司附近。黄金价格在上一交易日创下三年半来最大单日跌幅后于周一反弹,收报2310.71美元/盎司附近,投资者在…

java复习知识点

1.get,set: java 中当定义了一个私有的成员变量的时候,如果需要访问或者获取这个变量的时候,就可以编写set或者get方法去调用,set是给属性赋值的,get是取得属性值的,被设置和存取的属性一般是私有&#xf…

今日科普:生命杀手——“脑出血”

在我们的日常生活中,有一种被称为“脑出血”的疾病,它像是一位潜伏的杀手,无声无息地威胁着我们的生命。脑出血,简单来说,就是脑部血管破裂,导致血液流入脑组织,形成血肿,压迫和破坏…

SpringTask-Timer实现定时任务

1、Timer 实现定时任务 1.1、JDK1.3 开始推出定时任务实现工具。 1.2、API 执行代码 public static void main(String[] args) throws ParseException {Timer timer new Timer();String str"2024-06-10 23:24:00";Date date new SimpleDateFormat("yyyy-MM…

文本省略实现展开和收起功能(Taro)

目录 前言 思路 代码 CSS 效果 前言 在写项目的过程中很容易有说明性文本溢出需要出现省略号的功能,并且可以展开查看所有信息,并能够收起。我在写项目的过程中就遇到了这个问题,本来是想要使用组件库中的组件进行功能的实现,…

log4j日志打印导致OOM问题

一、背景 某天压测,QPS压到一定值后机器就开始重启,出现OOM,好在线上机器配置了启动参数-XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPath/**/**heapdump.hprof。将dump文件下载到本地,打开Java sdk bin目录下的jvisualvm工具&a…

IDEA | 安装通义灵码插件,开启智能编码旅程

安装步骤 从插件市场安装,点击导航-插件,打开应用市场,搜索通义灵码(TONGYI Lingma),找到通义灵码后点击安装。 https://tongyi.aliyun.com/lingma/download 使用方式 https://help.aliyun.com/documen…

YOLO-World:开启实时开放词汇目标检测的新篇章

目标检测作为计算机视觉领域的基石之一,其发展一直备受学术界和工业界的关注。传统的目标检测方法通常受限于固定词汇表的约束,即只能在预定义的类别集合中进行检测。然而,现实世界中的对象种类繁多,远远超出了任何固定词汇表的覆…

机器学习算法 —— 贝叶斯分类之模拟离散数据集

🌟欢迎来到 我的博客 —— 探索技术的无限可能! 🌟博客的简介(文章目录) 目录 实战(贝叶斯分类)莺尾花数据模拟离散数据集库函数导入数据导入和分析模型训练和预测 总结 实战(贝叶斯…

一道Delphi的For循环题目

起因 事情是这样的: 俺在一个Delphi交流QQ群,有点冷场,俺想热一下场子就发了下面这个段子。其实这是之前俺带新人时的一道题目。 第一个回答 第一个网友给的答案是 i:i-1; 俺说这个答案是不对的,因为 Delphi在编译时是不允许…

【教学类-64-03】20240611色块眼力挑战(三)-2-10宫格色差10-50(10倍)适合中班幼儿园(星火讯飞)

背景需求: 【教学类-64-02】20240610色块眼力挑战(二)-2-25宫格&色差10-100(10倍)(星火讯飞)-CSDN博客文章浏览阅读360次,点赞17次,收藏13次。【教学类-64-02】2024…

CTFHUB-SQL注入-时间盲注

本题用到sqlmap工具,没有sqlmap工具点击🚀🚀🚀直达下载安装使用教程 理论简述 时间盲注概述 时间盲注是一种SQL注入技术的变种,它依赖于页面响应时间的不同来确定SQL注入攻击的成功与否。在某些情况下,攻…

Java学习-MyBatis学习(一)

MyBatis MyBatis历史 MyBatis本是apache的一个开源项目iBatis,2010年这个项目由apache software foundation迁移到了google code,并且改名为MyBatis。2013年11月迁移到Github。iBATIS一词来源于“internet”和“abatis”的组合,是一个基于J…

三高系统的架构设计方案:高并发、高可用、高性能

文章目录 一、互联网系统三高概述1、互联网的三高2、高并发3、高可用4、高性能 二、高并发、高性能技术解决方案1、多高的并发才算高并发?2、水平扩展3、负载均衡思想4、缓存思想5、池化复用思想6、异步思想7、预处理-惰性更新思想8、分而治之思想 三、高可用技术解…

【Vue】 路由配置 - 一级路由

但凡是单个页面,独立展示的,都是一级路由 路由设计: 登录页首页架子 首页 - 二级分类页 - 二级购物车 - 二级我的 - 二级 搜索页搜索列表页商品详情页结算支付页我的订单页 由于每一个一级路由他会封装一些属于它自己模块的组件&#xff0c…

STM32 Customer BootLoader 刷新项目 (一) STM32CubeMX UART串口通信工程搭建

STM32 Customer BootLoader 刷新项目 (一) STM32CubeMX UART串口通信工程搭建 文章目录 STM32 Customer BootLoader 刷新项目 (一) STM32CubeMX UART串口通信工程搭建功能与作用典型工作流程 1. 硬件原理图介绍2. STM32 CubeMX工程搭建2.1 创建工程2.2 系统配置2.3 USART串口配…

头部外伤怎么办?别大意,科学处理是关键

头部外伤是一种常见的伤害,它可能由跌倒、撞击或其他事故造成。虽然许多头部外伤看似轻微,但如果不妥善处理,可能会带来严重的后果。因此,了解头部外伤的处理方法至关重要。 一、初步判断伤势 头部外伤后,首先要观察伤…

国资e学快速学习实战教程

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…