消息队列篇--通信协议篇--AMOP(交换机,队列绑定,消息确认,AMOP实现实例,AMOP报文,帧,AMOP消息传递模式等)

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一种开放的、跨平台的消息传递协议,旨在提供一种标准化的方式在不同的消息代理和客户端之间进行消息传递。AMQP不仅定义了消息格式和路由机制,还规定了如何建立连接、发送和接收消息等操作。它适用于多种编程语言和平台,并且支持复杂的路由机制。

特点:

  • 支持复杂的路由规则和消息传递模式。
  • 提供多种消息确认机制(如ACK)。
  • 支持消息持久化和高可用性。
  • 支持多语言客户端库。
  • 适用于企业级应用中的异步通信。

1、AMQP的核心概念

(1)、消息(Message)

消息是AMQP中传输的基本单位,通常包含两个部分:

  • 消息头(Header):包含一些元数据,如消息ID、优先级、时间戳等。
  • 消息体(Body):实际要传递的数据内容,可以是文本、字节流等形式。

(2)、交换机(Exchange)

交换机(Exchange)是AMQP中的一个关键组件,是消息进入消息队列的入口点,负责接收消息并将它们路由到一个或多个队列中。交换机根据消息的路由键(Routing Key)和绑定规则(Binding Rules)决定将消息发送到哪些队列。

常见的交换机类型包括:

  • Direct Exchange:根据精确匹配的路由键将消息发送到相应的队列。
  • Fanout Exchange:将消息广播到所有绑定的队列,不考虑路由键。
  • Topic Exchange:根据模式匹配的路由键将消息发送到相应的队列。
  • Headers Exchange:根据消息头中的属性进行路由。

(3)、队列(Queue)

队列是存储消息的地方,消费者从队列中获取消息并进行处理。每个队列都有一个唯一的名称,并且可以有多个消费者同时监听同一个队列。

(4)、绑定(Binding)

绑定是指将交换机与队列关联起来的过程。它决定了哪些消息应该被路由到哪些队列。绑定时需要指定一个路由键(Routing Key),用于确定哪些消息应该被发送到该队列。

(5)、连接(Connection)

连接是客户端与消息代理(Broker)之间的物理网络连接。通常使用TCP协议建立连接。

(6)、通道(Channel)

通道是客户端与消息代理之间的通信路径。通过通道,客户端可以在同一连接上执行多个操作(如发送和接收消息)。通道的好处是可以复用连接,减少资源开销。

(7)、确认(Acknowledgment)

确认机制确保消息已被成功处理。当消费者接收到消息后,可以选择手动或自动发送确认回执给消息代理。如果消息未被确认,消息代理会认为消息未被处理,并可能重新投递该消息。

(8)、生产者(Producer)

生产者是发送消息的应用程序或服务。它们将消息发送到交换器。

(9)、消费者(Consumer)

消费者是从队列中接收消息的应用程序或服务。它们负责处理消息内容。

2、AMQP的工作流程

原理示意图:
在这里插入图片描述

AMQP的典型工作流程包括以下几个步骤:
(1)、建立连接:客户端与消息代理(如:RabbitMQ)建立TCP连接。
(2)、创建通道:在连接上创建一个或多个通道,用于执行消息传递操作。
(3)、声明交换机和队列:声明交换机和队列,并设置相关的属性(如持久性、自动删除等)。
(4)、绑定交换机和队列:将交换机与队列绑定,并指定路由键。
(5)、发送消息:生产者通过交换机发送消息到队列。
(6)、接收消息:消费者从队列中接收消息并进行处理。
(7)、确认消息:消费者发送确认回执给消息代理,表示消息已被成功处理。

3、AMQP的优势

(1)、跨平台兼容性
AMQP是一种开放标准,允许不同的消息代理实现相互兼容。这意味着你可以选择最适合你需求的消息代理,而不必担心供应商锁定问题。
(2)、可靠性
AMQP提供了多种机制来确保消息传递的可靠性,包括持久化消息、事务支持和确认机制。
(3)、灵活性
AMQP支持多种交换器类型和绑定规则,使得它可以灵活应对各种复杂的路由需求。
(4)、安全性
AMQP支持SSL/TLS加密,确保消息在网络传输过程中的安全性。

4、常见的AMQP实现

(1)、RabbitMQ
RabbitMQ是最流行的AMQP实现之一,提供了丰富的功能和良好的社区支持。它不仅支持AMQP,还支持其他协议如MQTT、STOMP等。
(2)、Qpid
Qpid是Apache基金会的一个项目,提供了AMQP的完整实现。它包括两个主要组件:Qpid Broker和Qpid Client。
(3)、ActiveMQ
ActiveMQ是另一个流行的消息代理,虽然它的默认协议是OpenWire,但它也支持AMQP。

5、AMQP的消息传递模型

AMQP支持多种消息传递模型,具体取决于使用的交换机类型。

(1)、点对点(P2P)模型

在P2P模型中,每条消息只能由一个消费者处理。消息被发送到一个队列,然后由某个消费者从队列中获取并处理。

示例:(python)

import pika

// 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

// 声明队列
channel.queue_declare(queue='task_queue', durable=True)

// 发送消息
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body='Hello, AMQP!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,   使消息持久化
                      ))

print(" [x] Sent 'Hello, AMQP!'")

// 关闭连接
connection.close()

(2)、发布/订阅(Pub/Sub)模型

在Pub/Sub模型中,消息可以被多个订阅者接收。消息被发送到一个交换机,然后广播到所有绑定的队列。

示例:(python)

import pika

// 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

// 声明交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')

// 发送消息
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body='Hello, AMQP!')
print(" [x] Sent 'Hello, AMQP!'")

// 关闭连接
connection.close()

(3)、主题(Topic)模型

在Topic模型中,消息根据路由键的模式匹配被发送到相应的队列。路由键可以包含通配符(表示一个单词, 表示零个或多个单词)。

示例:(python)

import pika

// 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

// 声明交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

// 绑定队列
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = ['.orange.', 'lazy.']
for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key
    )

// 接收消息
def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()} on {method.routing_key}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

print(' [] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()

适用场景:

  • 企业级应用中的异步通信。
  • 微服务架构中的消息传递。
  • 需要复杂路由规则和高可靠性保障的场景。

6、代码示例:(RabbitMQ示例)

(1)、添加依赖

<dependencies>
    <!-- Spring Boot Starter for AMQP -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- Spring Boot Starter Web (Optional, if you need a REST controller) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- RabbitMQ Java Client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
    </dependency>
</dependencies>

(2)、配置文件(yaml)

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

(3)、创建配置类

创建一个配置类来定义交换器、队列和绑定关系。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 定义队列名称
    public static final String QUEUE_NAME = "exampleQueue";
    public static final String EXCHANGE_NAME = "exampleExchange";
    public static final String ROUTING_KEY = "example.routing.key";

    // 声明队列
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true); // true 表示持久化队列
    }

    // 声明交换器
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    // 绑定队列到交换器
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}

(4)、创建消息生产者

创建一个服务类来发送消息。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        System.out.println("Sending message: " + message);
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message);
    }
}

(5)、创建消息消费者

创建一个监听器类来接收消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumer {

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

7、AMOP报文

AMQP(Advanced Message Queuing Protocol)是一种二进制协议,用于在客户端和消息代理之间传递消息。与HTTP这种基于文本的协议不同,AMQP的消息是通过二进制格式进行编码的,因此直接查看原始报文并不直观。然而,我们可以从概念上理解AMQP报文的结构,并通过一些工具或库来解析和展示这些报文的内容。

(1)、AMQP报文结构

AMQP报文通常由多个帧组成,每个帧都包含一个帧头和有效载荷部分。
具体来说,在发送一条消息的过程中,常见的报文结构确实包括三个主要的帧:方法帧(Method Frame)、内容头帧(Content Header Frame)和内容体帧(Content Body Frame)。每个帧都需要在其前面拼接一个帧头。

1、帧头(Frame Header)

每个AMQP帧都以一个帧头开始,包含帧类型、通道编号和帧长度等信息。

  • 帧类型(Frame Type):标识帧的类型(如方法帧、内容头帧、内容体帧等)。
  • 通道编号(Channel Number):标识该帧所属的通道。
  • 帧长度(Frame Length):帧的有效载荷部分的长度(不包括帧头本身)。
2、帧的类型
(1)、方法帧(Method Frame)

用于表示特定的操作,如连接、打开通道、声明队列等。

(2)、内容头帧(Content Header Frame)

包含消息的元数据,如消息大小、属性等。

(3)、内容体帧(Content Body Frame)

包含实际的消息内容。

(2)、示例:完整的AMQP报文

假设我们要发送一条JSON消息到exampleQueue的队列中,以下是完整的AMQP报文结构及其二进制表示。

1、声明队列的方法帧

示例:

| Frame Type: 1       |  // 方法帧
| Channel: 1          |
| Payload Length: 17  |
| Class ID: 50        |  // Queue类
| Method ID: 10       |  // Declare方法
| Queue Name Length: 13 |
| Queue Name: "exampleQueue" |
| Durable: true       |

解释:
前三行是方帧的帧头(指定了帧的类型,通道编号和帧的长度),后面为方法帧的具体内容(队列相关的信息,包含队列的类id,方法id,名称,长度等)。

二进制表示:

01 00 01 00 00 00 11  // 帧头
00 32 00 0A           // Class ID 和 Method ID
00 00 00 0D           // 队列名称长度
65 78 61 6D 70 6C 65 51 75 65 75 65  // 队列名称 "exampleQueue"
01                    // Durable: true
2、发送消息的内容头帧

示例:

| Frame Type: 2       |  // 内容头帧
| Channel: 1          |
| Payload Length: 22  |
| Class ID: 60        |  // Basic 类
| Weight: 0           |
| Body Size: 24 bytes |
| Properties:         |
|   Content-Type: application/json |
|   Delivery-Mode: 2 (persistent) |

解释:
前三行是方帧的帧头(指定了帧的类型,通道编号和帧的长度),后面为内容头帧的具体内容。

二进制表示:

02 00 01 00 00 00 16  // 帧头
00 3C 00 00           // Class ID 和 Weight
00 00 00 18           // Body Size: 24 bytes
00 00 00 0E 61 70 70 6C 69 63 61 74 69 6F 6E 2F 6A 73 6F 6E  // Content-Type: application/json
02                    // Delivery-Mode: persistent
3、发送消息的内容体帧

示例:

| Frame Type: 3       |  // 内容体帧
| Channel: 1          |
| Payload Length: 24  |
| {"name": "John", "age": 30} |

解释:
前三行是方帧的帧头(指定了帧的类型,通道编号和帧的长度),后面为内容体帧的具体内容。

二进制表示:
03 00 01 00 00 00 18 // 帧头
7B 22 6E 61 6D 65 22 3A 20 22 4A 6F 68 6E 22 2C 20 22 61 67 65 22 3A 20 33 30 7D // JSON 消息

(3)、AMQP报文总结

在AMQP协议中,每种类型的帧(方法帧、内容头帧、内容体帧)都需要在其前面拼接一个帧头。帧头提供了关于帧的基本信息,使得接收方能够正确解析和处理这些帧。
具体步骤如下:
1、方法帧:用于执行操作(如声明队列)。
2、内容头帧:包含消息的元数据(如消息大小和属性)。
3、内容体帧:包含实际的消息内容。

AMQP本质是一种二进制协议,消息内容是无法通过文本查看的。理解AMQP报文能够让我们更好的了解其本质和原理

8、总结

AMQP是一种强大的消息传递协议,广泛应用于分布式系统中,确保不同组件之间的可靠通信。通过理解其核心概念和工作流程,开发者可以更好地利用AMQP构建高效、可扩展的应用程序。无论是微服务架构、事件驱动系统还是日志收集,AMQP都能提供坚实的通信基础。

乘风破浪!Dare to Be!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/959730.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

小利特惠源码/生活缴费/电话费/油卡燃气/等充值业务类源码附带承兑系统

全新首发小利特惠/生活缴费/电话费/油卡燃气/等充值业务类源码附带U商承兑系统 安装教程如下 图片:

HTML<hgroup>标签

例子&#xff1a; 使用hgroup元素标记标题和段落是相关的&#xff1a; <hgroup> <h2>Norway</h2> <p>The land with the midnight sun.</p> </hgroup> 定义和用法&#xff1a; 标签<hgroup>用于包围标题和一个或多个<p&g…

14-6-3C++STL的list

&#xff08;一&#xff09;list的插入 1.list.insert(pos,elem);//在pos位置插入一个elem元素的拷贝&#xff0c;返回新数据的位置 #include <iostream> #include <list> using namespace std; int main() { list<int> lst; lst.push_back(10); l…

【2024年终总结】深圳工作生活评测

距离上次写年终总结已经过了一年半了&#xff0c;这一年半中哪怕经历了很多的事情&#xff0c;但是感觉又没发生什么。想写一些骚话&#xff0c;却总觉得自己无法完全表达&#xff0c;便也就这样&#xff0c;静静地记录下这一段时光。 现在是2025年&#xff0c;春节前的时光&am…

前端jquery 实现文本框输入出现自动补全提示功能

git仓库&#xff1a;web_study/some-demos/inputAutoFit at main Cong0925/web_study (github.com) 压缩包&#xff1a;已绑定到指定资源 示例图&#xff1a; 实现说明: 1.首先&#xff0c;html部分设置好相关的定位标签如图&#xff1a; 2.主要函数 3.默认数据

(5)STM32 USB设备开发-USB键盘

讲解视频&#xff1a;2、USB键盘-下_哔哩哔哩_bilibili 例程&#xff1a;STM32USBdevice: 基于STM32的USB设备例子程序 - Gitee.com 本篇为使用使用STM32模拟USB键盘的例程&#xff0c;没有知识&#xff0c;全是实操&#xff0c;按照步骤就能获得一个STM32的USB键盘。本例子是…

java后端之登录认证

基础登录功能&#xff1a;根据提供的用户名和密码判断是否存在于数据库 LoginController.java RestController Slf4j public class LoginController {Autowiredprivate UserService userService;PostMapping("/login")public Result login(RequestBody User user) {…

Spring--Bean的生命周期和循环依赖

Bean的生命周期和循环依赖 Bean 的生命周期了解么?Spring中的循环引用什么是循环引用&#xff1f;三级缓存解决循环依赖总结构造方法出现了循环依赖怎么解决&#xff1f; Bean 的生命周期了解么? 整体上可以简单分为四步&#xff1a;实例化 —> 属性赋值 —> 初始化 —…

【云安全】云原生-Docker(五)容器逃逸之漏洞利用

漏洞利用逃逸 通过漏洞利用实现逃逸&#xff0c;主要分为以下两种方式&#xff1a; 1、操作系统层面的内核漏洞 这是利用宿主机操作系统内核中的安全漏洞&#xff0c;直接突破容器的隔离机制&#xff0c;获得宿主机的权限。 攻击原理&#xff1a;容器本质上是通过 Linux 的…

【Uniapp-Vue3】request各种不同类型的参数详解

一、参数携带 我们调用该接口的时候需要传入type参数。 第一种 路径名称?参数名1参数值1&参数名2参数值2 第二种 uni.request({ url:"请求路径", data:{ 参数名:参数值 } }) 二、请求方式 常用的有get&#xff0c;post和put 三种&#xff0c;默认是get请求。…

基于SpringBoot的软件产品展示销售系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

hedfs和hive数据迁移后校验脚本

先谈论校验方法&#xff0c;本人腾讯云大数据工程师。 1、hdfs的校验 这个通常就是distcp校验&#xff0c;hdfs通过distcp迁移到另一个集群&#xff0c;怎么校验你的对不对。 有人会说&#xff0c;默认会有校验CRC校验。我们关闭了&#xff0c;为什么关闭&#xff1f;全量迁…

mysql学习笔记-数据库的设计规范

1、范式简介 在关系型数据库中&#xff0c;关于数据表设计的基本原则、规则就称为范式。 1.1键和相关属性的概念 超键:能唯一标识元组的属性集叫做超键。 候选键:如果超键不包括多余的属性&#xff0c;那么这个超键就是候选键 主键:用户可以从候选键中选择一个作为主键。 外…

高并发问题的多维度解决之道

‍‌​​‌‌​‌​‍‌​​​‌‌​​‍‌​​​‌​‌​‍‌​​‌​​‌​‍‌‌​​‌​‌​‍‌​‌​‌‌​​‍‌​‌​‌​​​‍‌​‌​‌​‌​‍‌​‌‌​​‌​‍‌​‌‌​​​​‍‌‌​​‌‌‌‌‍‌‌​​‌​‌‌‍‌​​​‌‌​​‍‌​​‌‌‌​​‍‌…

Windows Defender添加排除项无权限的解决方法

目录 起因Windows Defender添加排除项无权限通过管理员终端添加排除项管理员身份运行打开PowerShell添加/移除排除项的命令 起因 博主在打软件补丁时&#xff0c;遇到 Windows Defender 一直拦截并删除文件&#xff0c;而在 Windows Defender 中无权限访问排除项。尝试通过管理…

数据结构——堆(C语言)

基本概念&#xff1a; 1、完全二叉树&#xff1a;若二叉树的深度为h&#xff0c;则除第h层外&#xff0c;其他层的结点全部达到最大值&#xff0c;且第h层的所有结点都集中在左子树。 2、满二叉树&#xff1a;满二叉树是一种特殊的的完全二叉树&#xff0c;所有层的结点都是最…

工业相机 SDK 二次开发-Halcon 插件

本文介绍了 Halcon 连接相机时插件的使用。通过本套插件可连接海康 的工业相机。 一. 环境配置 1. 拷贝动态库 在 用 户 安 装 MVS 目 录 下 按 照 如 下 路 径 Development\ThirdPartyPlatformAdapter 找到目录为 HalconHDevelop 的文 件夹&#xff0c;根据 Halcon 版本找到对…

Vue3 + TS 实现批量拖拽 文件夹和文件 组件封装

一、html 代码&#xff1a; 代码中的表格引入了 vxe-table 插件 <Tag /> 是自己封装的说明组件 表格列表这块我使用了插槽来增加扩展性&#xff0c;可根据自己需求&#xff0c;在组件外部做调整 <template><div class"dragUpload"><el-dial…

CF 339A.Helpful Maths(Java实现)

题目分析 输入一串式子&#xff0c;输出从小到大排列的式子 思路分析 如上所说核心思路&#xff0c;但是我要使用笨方法&#xff0c;输入一串式子用split分割开&#xff0c;但是此时需要用到转义字符&#xff0c;即函数内参数不能直接使用“”&#xff0c;而是“\\”。分割开后…

naivecv的设计与实现(3): NV12到RGB的转换

准备 NV12 图像 在 github 搜索关键字 “YUVViewer", 找到样例文件&#xff1a; https://github.com/LiuYinChina/YUVViewer/blob/master/Output/720X576-NV12.yuv 它是二进制文件&#xff0c;没有文件头信息&#xff0c;只有像素内容, 排布方式: 先 Y 平面&#xff0c…