SpingBoot集成Rabbitmq及Docker部署

文章目录

    • 介绍
      • RabbitMQ的特点
      • Rabbitmq术语
      • 消息发布接收流程
    • Docker部署
      • 管理界面说明
        • Overview: 这个页面显示了RabbitMQ服务器的一般信息,例如集群节点的名字、状态、运行时间等。
        • Connections: 在这里,可以查看、管理和关闭当前所有的TCP连接。
        • Channels: 这个页面展示了所有当前打开的通道以及它们的详细信息。外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
        • Exchanges: 可以在这里查看、创建和删除交换机。
        • Queues: 这个页面展示了所有当前的队列以及它们的详细信息。
        • Admin: 在这里,可以查看系统中所有的操作用户。
      • 延时队列插件下载安装
      • rabbitmq.conf配置文件示例
        • 1.1 rabbitmq.conf
        • 1.2 advanced.config
        • 1.3 rabbitmq-env.conf
    • Java配置
      • Yml完整配置
      • RabbitMQ的六种工作模式
        • 消费者@RabbitListener注解下的配置内容
        • 1.simple简单模式(点对点模式)
        • 2.work工作模式(一对多)
        • 3.publish/subscribe发布订阅(共享资源)
        • 4.routing路由模式
        • 5.topic 主题模式(路由模式的一种)
        • 6.RPC (基于消息的远程过程调用)
      • 延时队列、循环队列、兜底机制、定时任务
        • 1.延时队列
          • 使用TTL+死信队列组合实现延迟队列的效果。
          • 使用RabbitMQ官方延迟插件,实现延时队列效果。
        • 2.循环队列
        • 3.兜底机制
        • 4.定时任务

介绍

RabbitMQ是由Erlang语言开发的AMQP的开源实现

AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ的特点

  • 可靠性(Reliablity):使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。
  • 灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。
  • 消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 高可用(Highly Avaliable Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
  • 多种协议(Multi-protocol):支持多种消息队列协议,如STOMP、MQTT等。
  • 多种语言客户端(Many Clients):几乎支持所有常用语言,比如Java、.NET、Ruby等。
  • 管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
  • 跟踪机制(Tracing):如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。
  • 插件机制(Plugin System):提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件

Rabbitmq术语

  • 消费者:订阅某个队列

  • 生产者::创建消息,然后发布到队列中(queue),最终将消息发送到监听的消费者。

  • Broker:标识消息队列服务器实体.

  • Virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。

  • Exchange:交换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

  • Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

  • Banding:绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换机理解成一个由绑定构成的路由表。

  • Channel:通道,多路复用连接中的一条独立的双向数据流通道。通道是建立在真实的TCP连接内的虚拟链接,AMQP命令都是通过通道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过通道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了通道的概念,以复用一条TCP连接。

  • Connection:网络连接,比如一个TCP连接。

  • Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。

  • Consumer:消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。

  • Message:消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。

消息发布接收流程

1.发送消息

  • 生产者和Broker建立TCP连接。
  • 生产者和Broker建立通道。
  • 生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
  • Exchange将消息转发到指定的Queue(队列)

2.接收消息

  • 消费者和Broker建立TCP连接 。
  • 消费者和Broker建立通道
  • 消费者监听指定的Queue(队列)
  • 当有消息到达Queue时Broker默认将消息推送给消费者。
  • 消费者接收到消息。

Docker部署

查询rabbitmq最新版本

docker search rabbitmq #查询镜像 已经集成了erlang,无需单独安装erlang
docker pull rabbitmq  #拉取镜像 最新版,或指定版本 docker pull rabbitmq:3.13-management 自带管理界面
docker images # 查看拉取的镜像
#启动容器 指定管理界面登陆账号和密码
# 15672 管理界面端口
# 5672 amqp协议端口,程序连接端口 
# -v /mnt/data/rabbitmq/conf:/etc/rabbitmq 配置文件目录
# -v /mnt/data/rabbitmq/data:/var/lib/rabbitmq 数据目录
# -v /mnt/data/rabbitmq/log:/var/log/rabbitmq 日志目录
# -e RABBITMQ_DEFAULT_USER=tlmroot 管理界面登陆账号
# -e RABBITMQ_DEFAULT_PASS=123456 管理界面登陆密码
# 最好限制容器内存 --memory 2g
docker run -d --hostname rabbitmq --name rabbitmq --restart=always -v /mnt/data/rabbitmq/conf:/etc/rabbitmq -v /mnt/data/rabbitmq/data:/var/lib/rabbitmq -v /mnt/data/rabbitmq/log:/var/log/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=tlmroot -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:latest
# docker ps 如果容器启动失败,需要提高日志挂载目录的访问权限后重启服务
chmod 777 /mnt/data/rabbitmq/log
# 进入容器内部安装管理界面插件
docker exec -it rabbitmq /bin/bash
# 容器内部创建管理界面插件 安装完成即可访问 服务器IP加15672,如无法访问 关闭防火墙 systemctl stop firewalld
rabbitmq-plugins enable rabbitmq_management
# 容器内部启用所有功能标志,也可以在管理界面操作(Admin/Feature Flags)
enable all feature flags
#至此创建完成 服务器需要开放15672和5672端口

在这里插入图片描述

管理界面说明

  • Overview: 这个页面显示了RabbitMQ服务器的一般信息,例如集群节点的名字、状态、运行时间等。

在这里插入图片描述

Totals 消息数,队列消息、连接数、通道数、交换机数、队列数、消费者数
Nodes:节点信息 进程数、磁盘数据、运行时间、等
Churn statistics: 流失率统计,最后一分钟连接操作、通道操作、队列操作
Ports and contexts: 端口信息及网络环境信息
Export definitions: 导出配置
Import definitions:导入配置
  • Connections: 在这里,可以查看、管理和关闭当前所有的TCP连接。

在这里插入图片描述

  Virtual host: 所属的虚拟主机。
  Name: 名称。
  User name: 使用的用户名。
  State: 当前的状态,running:运行中;idle:空闲。
  SSL/TLS: 是否使用ssl进行连接。
  Protocol: 使用的协议。
  Channels: 创建的channel的总数。
  From client: 每秒发出的数据包。
  To client: 每秒收到的数据包。
  • Channels: 这个页面展示了所有当前打开的通道以及它们的详细信息。外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

在这里插入图片描述

channel:名称。
Virtual host:所属的虚拟主机。
User name:使用的用户名。
Mode:渠道保证模式。 可以是以下之一,或者不是:C: confirm。T:transactional(事务)。
State :当前的状态,running:运行中;idle:空闲。
Unconfirmed:待confirm的消息总数。
Prefetch:设置的prefetch的个数。
Unacker:待ack的消息总数。
publish:生产端 pub消息的速率。
confirm:生产端确认消息的速率。
deliver/get:消费端获取消息的速率。
ack:消费端 ack消息的速率
  • Exchanges: 可以在这里查看、创建和删除交换机。

在这里插入图片描述

 Name:名称。
 Type:exchange type,具体的type可以查看RabbitMq系列之一:基础概念。
 Features:持久化,D:持久化 I:内部 AD:自动删除
 Message rate in:消息输入速率。
 Message rate out:消息输出速率
 Add a new exchange:
    Virtual host:属于哪个Virtual host,我这里只有一个所以不显示
    Name:名字,同一个Virtual host里面的Name不能重复。
    Durability: 是否持久化,Durable:持久化。Transient:不持久化。
    Auto delete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。
    Internal: 是否是内部专用exchange,是的话,就意味着我们不能往该exchange里面发消息。
    Arguments: 参数,是AMQP协议留给AMQP实现做扩展使用的
  • Queues: 这个页面展示了所有当前的队列以及它们的详细信息。

在这里插入图片描述

Virtual host: 所属的虚拟主机。
Name: 名称。
Features: 功能。(参数参考上述交换机页面)
State: 当前的状态,running:运行中;idle:空闲。
Ready: 待消费的消息总数。
Unacked: 待应答的消息总数。
Total: 总数 Ready+Unacked。
incoming: 消息进入的速率。
deliver/get: 消息获取的速率。
ack: 消息应答的速率。
Add a new queue:
    Virtual host:属于哪个Virtual host
    Name:名字,同一个Virtual host里面的Name不能重复。
    Durability: 是否持久化,Durable:持久化。Transient:不持久化。
    Auto delete:当最后一个绑定(队列或者exchange)被unbind之后,该 queue 自动被删除。
    Arguments: 参数,是AMQP协议留给AMQP实现做扩展使用的
  • Admin: 在这里,可以查看系统中所有的操作用户。

在这里插入图片描述

Name: 名称。
Tags: 角色标签,只能选取一个。
Can access virtual hosts: 允许进入的vhost。
Has password: 设置了密码。

Virtual Host:虚拟主机

虚拟主机(vhost)提供逻辑分组和资源分离。每一个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的connection、exchange、queue、binding等,拥有自己的权限。vhost之于RabbitMQ就像虚拟机于物理机一样,他们通过在各个实例间提供逻辑上分离,允许为不同的应用程序安全保密的运行数据。

Feature Flags:功能标志开关

Deprecated Features:已废特性

Policies:策略配置

策略分为“用户策略”和“系统策略”
策略使用的是正则表达匹配规则,按名称匹配一个或多个队列,并将其定义的一些规则(参数)到匹配队列中。换句话说,可以使用策略一次为多个队列配置参数。策略可以理解为给“队列”和“分发器”设置额外的“Arguments”参数。每个“分发器”和“队列”只能生效一个“策略”,并且是是立即生效的。
    参数:
    Apply to:指定策略是只匹配队列、还是只匹配交换,或两则两者都匹配。
    Priority:表示的是策略的优先级、值越大,优先级越高。
    Definition:才是真正的规则。有四大类,分别是HA(高可用性)、federation(联合)、Queues(队列)、Exchanges(备用分发器)
    HA(高可用性):表示将队列怎么镜像到节点的策略。
    ha-mode:选项有三个,分别是“all“(表示同步到所有节点),“exactly”,“nodes”。"exactly"和"nodes"需要结合ha-params才能决定同步策略
    ha-params:为数值、表示个数
    ha-sync-mode:(手动(manual)/自动(automatic)同步)

Limits 可以设置最大连接数

Cluster 集群 更改集群名称

延时队列插件下载安装

延时插件链接地址3.13.0,下载版本和rabbitmq版本要一致 我安装的是 3.13.0

# 下载插件到/home/目录,
curl -JL https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez  -o /home/rabbitmq_delayed_message_exchange-3.13.0.ez
# 可省略,进入容器查询插件目录,三方插件需要放在这里 (ez结尾的文件,/opt/rabbitmq/plugins)
rabbitmq-plugins directories -s
# 容器/opt/rabbitmq/plugins 为插件目录 延时队列插件需要复制到这里
docker cp /home/rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq:/opt/rabbitmq/plugins
# 进入容器内部查询插件,
docker exec -it rabbitmq /bin/bash
# rabbitmq-plugins list 如下图
rabbitmq-plugins list
# 安装插件命令同安装管理界面命令 rabbitmq-plugins enable 《plugin_name》
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 安装完成后 界面Exchanges(交换机),新增的时候就会出现x-delayed-message

在这里插入图片描述

在这里插入图片描述

rabbitmq.conf配置文件示例

容器运行后,默认没有配置文件,自带的配置足够使用,自行创建放在主机/mnt/data/rabbitmq/conf/目录,或是放在容器/etc/rabbitmq目录,创建容器时已映射

容器内部查询有效配置 
rabbitmqctl environment
RabbitMQ 常用的三种自定义服务器的通用方法:

    配置文件 rabbitmq.conf
    环境变量文件 rabbitmq-env.conf
    补充配置文件 advanced.config

rabbitmq.conf和rabbitmq-env.conf的位置

    在二进制安装中路径是在 :安装目录下的/etc/rabbitmq/
    rpm 安装: /etc/rabbitmq/

如果rabbitmq.conf和rabbitmq-env.conf 的两个文件不存在,那么我们可以创建该文件,然后我们可以通过环境变量
指定该文件的位置。

补充 :
    rabbitmqctl rabbitmqctl 是管理虚拟主机和用户权限的工具
    rabbitmq-plugins 是管理插件的工具
1.1 rabbitmq.conf
属性描述默认值
listeners要监听 AMQP 0-9-1 and AMQP 1.0 的端口listeners.tcp.default = 5672
num_acceptors.tcp接受tcp连接的erlang 进程数num_acceptors.tcp = 10
handshake_timeoutAMQP 0-9-1 超时时间,也就是最大的连接时间,单位毫秒handshake_timeout = 10000
listeners.ssl启用TLS的协议默认值为none
num_acceptors.ssl接受基于TLS协议的连接的erlang 进程数num_acceptors.ssl = 10
ssl_optionsTLS 配置ssl_options =none
ssl_handshake_timeoutTLS 连接超时时间 单位为毫秒ssl_handshake_timeout = 5000
vm_memory_high_watermark触发流量控制的内存阈值,可以为相对值(0.5),或者绝对值 vm_memory_high_watermark.relative = 0.6 ,vm_memory_high_watermark.absolute = 2GB默认vm_memory_high_watermark.relative = 0.4
vm_memory_calculation_strategy内存使用报告策略,assigned:使用Erlang内存分配器统计信息 rss:使用操作系统RSS内存报告。这使用特定于操作系统的方法,并可能启动短期子进程。legacy:使用遗留内存报告(运行时认为将使用多少内存)。这种策略相当不准确。erlang 与legacy一样 是为了向后兼容vm_memory_calculation_strategy = allocated
vm_memory_high_watermark_paging_ratio当内存的使用达到了50%后,队列开始将消息分页到磁盘vm_memory_high_watermark_paging_ratio = 0.5
total_memory_available_override_value该参数用于指定系统的可用内存总量,一般不使用,适用于在容器等一些获取内存实际值不精确的环境默认未设置
disk_free_limitRabbitmq存储数据的可用空间限制,当低于该值的时候,将触发流量限制,设置可参考vm_memory_high_watermark参数disk_free_limit.absolute = 50MB
log.file.level控制记录日志的等级,有info,error,warning,debuglog.file.level = info
channel_max最大通道数,但不包含协议中使用的特殊通道号0,设置为0表示无限制,不建议使用该值,容易出现channel泄漏channel_max = 2047
channel_operation_timeout通道操作超时,单位为毫秒channel_operation_timeout = 15000
heartbeat表示连接参数协商期间服务器建议的心跳超时的值。如果两端都设置为0,则禁用心跳,不建议禁用heartbeat = 60
default_vhostrabbitmq安装后启动创建的虚拟主机default_vhost = /
default_user默认创建的用户名default_user = guest
default_pass默认用户的密码default_pass = guest
default_user_tags默认用户的标签default_user_tags.administrator = true
default_permissions在创建默认用户是分配给默认用户的权限default_permissions.configure = .* default_permissions.read = .* default_permissions.write = .*
loopback_users允许通过回环地址连接到rabbitmq的用户列表,如果要允许guest用户远程连接(不安全)请将该值设置为none,如果要将一个用户设置为仅localhost连接的话,配置loopback_users.username =true(username要替换成用户名)loopback_users.guest = true(默认为只能本地连接)
cluster_formation.classic_config.nodes设置集群节点cluster_formation.classic_config.nodes.1 = rabbit@hostname1
cluster_formation.classic_config.nodes.2 = rabbit@hostname2默认为空,未设置
collect_statistics统计收集模式,none 不发出统计信息事件,coarse每个队列连接都发送统计一次,fine每发一条消息的统计数据collect_statistics = none
collect_statistics_interval统计信息收集间隔,以毫秒为单位collect_statistics_interval = 5000
delegate_count用于集群内通信的委托进程数。在多核的服务器上我们可以增加此值delegate_count = 16
tcp_listen_options默认的套接字选项tcp_listen_options.backlog = 128 …
hipe_compile设置为true以使用HiPE预编译RabbitMQ的部分,HiPE是Erlang的即时编译器,启用HiPE可以提高吞吐量两位数,但启动时会延迟几分钟。Erlang运行时必须包含HiPE支持。如果不是,启用此选项将不起作用。HiPE在某些平台上根本不可用,尤其是Windows。hipe_compile = false
cluster_keepalive_interval节点应该多长时间向其他节点发送keepalive消息(以毫秒为单位),keepalive的消息丢失不会被视为关闭cluster_keepalive_interval = 10000
queue_index_embed_msgs_below消息的字节大小,低于该大小,消息将直接嵌入队列索引中 bytesqueue_index_embed_msgs_below = 4096
mnesia_table_loading_retry_timeout等待集群中Mnesia表可用的超时时间,单位毫秒mnesia_table_loading_retry_timeout = 30000
mnesia_table_loading_retry_limit集群启动时等待Mnesia表的重试次数,不适用于Mnesia升级或节点删除。mnesia_table_loading_retry_limit = 10
mirroring_sync_batch_size要在队列镜像之间同步的消息的批处理大小mirroring_sync_batch_size = 4096
queue_master_locator队列主节点的策略,有三大策略 min-masters,client-local,randomqueue_master_locator = client-local
proxy_protocol如果设置为true ,则连接需要通过反向代理连接,不能直连接proxy_protocol = false
management.listener.portrabbitmq web管理界面使用的端口management.listener.port = 15672
1.2 advanced.config

某些配置设置不可用或难以使用sysctl格式进行配置。因此,可以使用Erlang术语格式的其他配置文件advanced.config
它将与rabbitmq.conf 文件中提供的配置合并。

属性描述默认值
msg_store_index_module设置队列索引使用的模块{rabbit,[ {msg_store_index_module,rabbit_msg_store_ets_index} ]}
backing_queue_module队列内容的实现模块。{rabbit,[ {backing_queue_module,rabbit_variable_queue} ]}
msg_store_file_size_limit消息储存的文件大小,现有的节点更改是危险的,可能导致数据丢失默认值16777216
trace_vhosts内部的tracer使用,不建议更改{rabbit,[ {trace_vhosts,[]} ]}
msg_store_credit_disc_bound设置消息储存库给队列进程的积分,默认一个队列进程被赋予4000个消息积分{rabbit, [{msg_store_credit_disc_bound, {4000, 800}}]}
queue_index_max_journal_entries队列的索引日志超过该阈值将刷新到磁盘{rabbit, [{queue_index_max_journal_entries, 32768}]}
lazy_queue_explicit_gc_run_operation_threshold在内存压力下为延迟队列设置的值,该值可以触发垃圾回收和减少内存使用,降低该值,会降低性能,提高该值,会导致更高的内存消耗{rabbit,[{lazy_queue_explicit_gc_run_operation_threshold, 1000}]}
queue_explicit_gc_run_operation_threshold在内存压力下,正常队列设置的值,该值可以触发垃圾回收和减少内存使用,降低该值,会降低性能,提高该值,会导致更高的内存消耗{rabbit, [{queue_explicit_gc_run_operation_threshold, 1000}]}
1.3 rabbitmq-env.conf

通过rabbitmq-env.conf 来定义环境变量
RABBITMQ_NODENAME 指定节点名称

属性描述默认值
RABBITMQ_NODE_IP_ADDRESS绑定的网络接口默认为空字符串表示绑定本机所有的网络接口
RABBITMQ_NODE_PORT端口默认为5672
RABBITMQ_DISTRIBUTION_BUFFER_SIZE节点之间通信连接的数据缓冲区大小默认为128000,该值建议不要使用低于64MB
RABBITMQ_IO_THREAD_POOL_SIZE运行时用于io的线程数建议不要低于32,linux默认为128 ,windows默认为64
RABBITMQ_NODENAMErabbitmq节点名称,集群中要注意节点名称唯一linux 默认节点名为 rabbit@$hostname
RABBITMQ_CONFIG_FILErabbitmq 的配置文件路径,注意不要加文件的后缀(.conf)默认 $RABBITMQ_HOME/etc/rabbitmq/rabbitmq(二进制安装) /etc/rabbitmq/rabbitmq(rpm 安装)
RABBITMQ_ADVANCED_CONFIG_FILEadvanced.config文件路径默认 $RABBITMQ_HOME/etc/rabbitmq/advanced(二进制安装) /etc/rabbitmq/advanced(rpm 安装)
RABBITMQ_CONF_ENV_FILE环境变量配置文件路径默认 $RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf(二进制安装) /etc/rabbitmq/rabbitmq-env.conf(rpm 安装)
RABBITMQ_SERVER_CODE_PATH在使用HiPE 模块时需要使用默认为空
RABBITMQ_LOGS指定日志文件位置默认为 $RABBITMQ_HOME/etc/var/log/rabbitmq/

RABBITMQ_DISTRIBUTION_BUFFER_SIZE 节点间通信缓冲区大小,默认值 128Mb,节点流量比较多的集群中,可以提升该值,建议该值不要低于64MB。

tcp 缓存区大小
下示例将AMQP 0-9-1连接的TCP缓冲区设置为192 KiB:

tcp_listen_options.backlog = 128
tcp_listen_options.nodelay = true
tcp_listen_options.linger.on = true
tcp_listen_options.linger.timeout = 0
tcp_listen_options.sndbuf = 196608
tcp_listen_options.recbuf = 196608

Java配置

<!-- Maven依赖,Springboot默认集成-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Yml完整配置

spring:
  rabbitmq:
    host: 127.0.0.1 #ip
    port: 5672      #端口
    username: tlmroot #账号
    password: 123456 #密码
    virtualHost:    #链接的虚拟主机 ,切换不同环境 dev\test\prod
    addresses: 127.0.0.1:5672     #多个以逗号分隔,与host功能一样。
    requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60s
    publisherConfirms: true  #发布确认机制是否启用
    publisherReturns: #发布返回是否启用
    connectionTimeout: #链接超时。单位ms。0表示无穷大不超时
    ### ssl相关
    ssl:
      enabled: #是否支持ssl
      keyStore: #指定持有SSL certificate的key store的路径
      keyStoreType: #key store类型 默认PKCS12
      keyStorePassword: #指定访问key store的密码
      trustStore: #指定持有SSL certificates的Trust store
      trustStoreType: #默认JKS
      trustStorePassword: #访问密码
      algorithm: #ssl使用的算法,例如,TLSv1.1
      verifyHostname: #是否开启hostname验证
    ### cache相关
    cache:
      channel: 
        size: #缓存中保持的channel数量
        checkoutTimeout: #当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
      connection:
        mode: #连接工厂缓存模式:CHANNEL 和 CONNECTION
        size: #缓存的连接数,只有是CONNECTION模式时生效
    ### listener
    listener:
       type: #两种类型,SIMPLE,DIRECT 默认simple
       ## simple类型 
       simple: # 一对一
         concurrency: #最小消费者数量
         maxConcurrency: #最大的消费者数量
         transactionSize: #指定一个事务处理的消息数量,最好是小于等于prefetch的数量
         missingQueuesFatal: #是否停止容器当容器中的队列不可用
         ## 与direct相同配置部分
         autoStartup: #是否自动启动容器
         acknowledgeMode: #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
         prefetch: #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量
         defaultRequeueRejected: #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
         idleEventInterval: #container events发布频率,单位ms
         ##重试机制
         retry: 
           stateless: #有无状态
           enabled:  #是否开启
           maxAttempts: #最大重试次数,默认3
           initialInterval: #重试间隔
           multiplier: #对于上一次重试的乘数
           maxInterval: #最大重试时间间隔
       direct: # 一对多
         consumersPerQueue: #每个队列消费者数量
         missingQueuesFatal:
         #...其余配置看上方公共配置
     ## template相关
     template:
       mandatory: #是否启用强制信息;默认false
       receiveTimeout: #`receive()`接收方法超时时间
       replyTimeout: #`sendAndReceive()`超时时间
       exchange: #默认的交换机
       routingKey: #默认的路由
       defaultReceiveQueue: #默认的接收队列
       ## retry重试相关
       retry: 
         enabled: true #是否重试功能 默认false
         maxAttempts: 3 #最大重试次数 默认为3
         initialInterval: 1000ms #重试间隔时间 可以使用ms、s、m、h、d
         multiplier: #重试乘数,默认为1,即每次重试间隔时间保持不变
         maxInterval: 10000ms #最大重试间隔时间 与乘数结合使用

配置文件

package com.tecloman.cloud.singleton.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Administrator
 */
@Configuration
@Slf4j
public class RabbitConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 序列化配置
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        rabbitTemplate.setMandatory(true);
        // 推送到server回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
                log.info("ConfirmCallback correlationData:{},ack:{},cause:{}",correlationData,ack,cause));
        // 消息返回给生产者, 路由不到队列时返回给发送者  先returnCallback,再 confirmCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("ReturnCallback message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",
                    message,replyCode,replyText,exchange,routingKey);
        });
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

RabbitMQ的六种工作模式

消费者@RabbitListener注解下的配置内容
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(RabbitListeners.class)
public @interface RabbitListener {
    String id() default ""; // 用于为监听器指定一个唯一标识符,不指定则自动生成。

    String containerFactory() default "";// 指定要使用的消息监听容器工厂 bean 的名称。

    String[] queues() default {}; // 指定要监听的队列名称。可以是'队列名称','属性占位符键'或'表达式',队列必须存在,queues 属性与 bindings() 和   queuesToDeclare() 属性互斥,不能同时使用

    Queue[] queuesToDeclare() default {}; // 用于声明要监听的队列,可以通过 @Queue 注解定义队列的属性。与 bindings() 和 queues() 属性互斥,不能同时使用,允许动态声明队列。

    boolean exclusive() default false; // 指定是否为独占模式,即只有一个消费者可以消费该队列,为true时要求并发数=1。

    String priority() default ""; // 指定消息的优先级,越大优先级越高。默认为容器优先级,可以为负数。

    String admin() default ""; // 属性用于指定一个 RabbitAdmin bean 的引用。

    QueueBinding[] bindings() default {}; // 用于绑定队列和交换机,以便监听指定的交换机中的消息。与 queues() 和 queuesToDeclare() 属性互斥,不能同时使用。

    String group() default ""; // 指定消费者所属的分组。可以用于实现分组消费,确保同一组内的消费者共享消息。

    String returnExceptions() default ""; // 定义一个异常处理策略,用于处理消息发送失败时的异常情况。

    String errorHandler() default ""; // 指定消息监听容器的错误处理器,用于处理在消息处理过程中发生的错误。

    String concurrency() default ""; // 指定消费者的并发数量,表示同时处理消息的线程数或者并发消费者的数量。

    String autoStartup() default ""; // 指定容器是否自动启动,如果设置为 true,则容器会在启动时自动开始侦听消息。

    String executor() default ""; // 定义用于处理消息的执行器,可以指定一个线程池来处理消息的消费逻辑。

    String ackMode() default ""; // 指定消息确认模式,用于控制消息的确认方式,包括自动、手动、批量确认等。

    String replyPostProcessor() default ""; // 定义一个后处理器,用于在发送响应时对响应消息进行处理。

    String messageConverter() default ""; // 指定消息转换器,用于将消息从字节流转换为目标对象,或者将目标对象转换为字节流。

    String replyContentType() default ""; // 指定回复消息的内容类型。

    String converterWinsContentType() default "true"; // 指定转换器是否覆盖内容类型。
}

1.simple简单模式(点对点模式)
  • 消息的生产者将消息放入队列

  • 消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)

    	/**
         * 配置文件添加,简单模式队列
         * @return
         */
        @Bean
        public Queue simpleQueue(){
            //持久化 非独占 非自动删除
            return QueueBuilder.durable("simpleQueue").build();
        }
    
         /**
         * 简单模式生产者
         */
        @GetMapping("/simple")
        public R simple(@RequestParam String msg){
            Map<String, Object> map = createMsg(msg);
            // 预先要创建好队列
            rabbitTemplate.convertAndSend("simpleQueue",map);
            return R.ok();
        }
    
         /**
         * 简单模式的消费者
         *
         * @param message 消息属性
         * @param channel 通道
         * @param msg     消息内容
         * @throws IOException
         */
        //使用queuesToDeclare属性,如果不存在则会创建队列
        @RabbitListener(queuesToDeclare = @Queue(value = "simpleQueue"))
        public void simple(Message message, Channel channel, JSONObject msg) {
            try {
                MessageProperties properties = message.getMessageProperties();
                // 这个tag每次服务重启会清0
                long tag = properties.getDeliveryTag();
                log.info("简单模式的消费者收到:{}", msg);
                // 简单模式下,消息其实无需确认
                // 由于在yml设置手动回执,此处需要手动回执,false不批量签收,回执后才能处理下一批消息
                channel.basicAck(tag, false);
            } catch (IOException e) {
                log.error(this.getClass().getName());
            }
        }
    
2.work工作模式(一对多)
  • 消息生产者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)

  • 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)

    在这里插入图片描述

    	/**
         * 配置文件添加,Work模式队列
           work队列 默认是轮询发到消息者,priority="10" 设置消费者优先级,优先级相同轮询
         * @return
         */
        @Bean
        public Queue workQueue(){
            //持久化 非独占 非自动删除
            return QueueBuilder.durable("workQueue").build();
        }
    
        /**
         * 生产者,一次性生产50条消费,消费者轮询消费,消费者可设置优先级priority="10",越大越优先
         */
        @GetMapping("/work")
        public R work(@RequestParam String msg) {
            for (int i = 0; i < 50; i++) {
                rabbitTemplate.convertAndSend("workQueue", createMsg(i), message -> {
                    MessageProperties messageProperties = message.getMessageProperties();
                    //默认消息持久化,设置消息不持久化
                    messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                    return message;
                });
            }
            return R.ok();
        }
    
        /**
         * 工作模式的消费者1,group分组属性不会生效
         *
         * @param message 消息属性
         * @param channel 通道
         * @param msg     消息内容
         * @throws IOException
         */
        //使用queuesToDeclare属性,如果不存在则会创建队列
        @RabbitListener(queuesToDeclare = @Queue(value = "workQueue"))
        public void work1(Message message, Channel channel, JSONObject msg) {
            try {
                MessageProperties properties = message.getMessageProperties();
                long tag = properties.getDeliveryTag();
                log.error("工作模式的消费者1收到:{}", msg);
                //手动回执,不批量签收,回执后才能处理下一批消息
                channel.basicAck(tag, false);
            } catch (IOException e) {
                log.error(this.getClass().getName());
            }
        }
    
        /**
         * 工作模式的消费者2
         *
         * @param message 消息属性
         * @param channel 通道
         * @param msg     消息内容
         * @throws IOException
         */
        //使用queuesToDeclare属性
        @RabbitListener(queuesToDeclare = @Queue(value = "workQueue"))
        public void work2(Message message, Channel channel, JSONObject msg) {
            try {
                MessageProperties properties = message.getMessageProperties();
                long tag = properties.getDeliveryTag();
                log.error("工作模式的消费者2收到:{}", msg);
                //手动回执,不批量签收,回执后才能处理下一批消息
                channel.basicAck(tag, false);
            } catch (IOException e) {
                log.error(this.getClass().getName());
            }
        }
    
3.publish/subscribe发布订阅(共享资源)
  • 生产者通过fanout扇出交换机群发消息给消费者,同一条消息每一个消费者都可以收到,消息生产者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费

  • 相关场景:邮件群发,群聊天,广播(广告)

在这里插入图片描述

//------------------方法1:生产者创建交换机,消费者创建队列与监听队列------------------  
     /**
     * 配置文件定义交换机
     *
     * @return
     */
    @Bean
    public Exchange fanout() {
        //持久化 非自动删除
        return ExchangeBuilder.fanoutExchange("fanout").build();
    }
    //创建初始化RabbitAdmin对象
    @Bean
    public RabbitAdmin fanoutRabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        // 声明交换机 fanout
        rabbitAdmin.declareExchange(fanout());
        return rabbitAdmin;
    }

    /*
    * 生产者 发送50条消息,消费者各自消费50条,
    */
    @GetMapping("/fanout")
    public R fanout(@RequestParam String msg){
        for (int i = 0; i < 50; i++) {
            Map<String, Object> map = createMsg(i);
            // 第二个参数为路由Key
            rabbitTemplate.convertAndSend("fanout",null,map);
        }
        return R.ok();
    }
   
    /**
     * 发布订阅模式方法1的消费者1,group分组属性不会生效
     *
     * @param message 消息属性
     * @param channel 通道
     * @param msg     消息内容
     */
    @RabbitListener(
            // 这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
            // declare = "false":生产者已定义交换机,此处不再声明交换
            bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "fanout", declare = "false")))
    public void fanout1(Message message, Channel channel, JSONObject msg) {
        try {
            MessageProperties properties = message.getMessageProperties();
            long tag = properties.getDeliveryTag();
            log.error("发布订阅模式方法1的消费者1收到:{}", msg);
            // 手动回执,不批量签收,回执后才能处理下一批消息
            channel.basicAck(tag, false);
        } catch (IOException e) {
            log.error(this.getClass().getName());
        }
    }

    @RabbitListener(
            // 这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
            // declare = "false":生产者已定义交换机,此处不再声明交换
            bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "fanout", declare = "false")))
    public void fanout2(Message message, Channel channel, JSONObject msg) {
        try {
            MessageProperties properties = message.getMessageProperties();
            long tag = properties.getDeliveryTag();
            log.error("发布订阅模式方法1的消费者2收到:{}", msg);
            // 手动回执,不批量签收,回执后才能处理下一批消息
            channel.basicAck(tag, false);
        } catch (IOException e) {
            log.error(this.getClass().getName());
        }
    }
//------------------方法2:生产者创建队列与交换机,消费者监听队列------------------
    /**
     * 定义队列 持久 非排他 非自动删除
     * @return
     */
    @Bean
    public Queue fanoutQueue1(){
        return QueueBuilder.durable("fanout-queue1").build();
    }

    @Bean
    public Queue fanoutQueue2(){
        return QueueBuilder.durable("fanout-queue2").build();
    }

    /**
     * 定义扇出交换机 持久  非自动删除
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return ExchangeBuilder.fanoutExchange("fanout2").build();
    }

    /**
     * 将队列1与交换机绑定
     * @return
     */
    @Bean
    public Binding fanoutBinding1(){
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }

    @Bean
    public Binding fanoutBinding2(){
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }

    //创建初始化RabbitAdmin对象
    @Bean
    public RabbitAdmin fanoutRabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        // 声明交换机和队列
        rabbitAdmin.declareExchange(fanoutExchange());
        rabbitAdmin.declareQueue(fanoutQueue1());
        rabbitAdmin.declareQueue(fanoutQueue2());
        return rabbitAdmin;
    }

 // 不同消费者绑定在同一个交换机,队列相同,轮询消费,队列不同,各自消费
    /**
     * 发布订阅模式方法2的消费者1 ,队列不同,生产者发送50条消息,各自消费50条
     *
     * @param message 消息属性
     * @param channel 通道
     * @param msg     消息内容
     */
    //使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
    @RabbitListener(queuesToDeclare = @Queue(value = "fanout-queue1"))
    public void fanout1(Message message, Channel channel, JSONObject msg) {
        try {
            MessageProperties properties = message.getMessageProperties();
            long tag = properties.getDeliveryTag();
            log.error("发布订阅模式方法2的消费者1收到:{}", msg);
            //手动回执,不批量签收,回执后才能处理下一批消息
            channel.basicAck(tag, false);
        } catch (IOException e) {
            log.error(this.getClass().getName());
        }
    }

    @RabbitListener(queuesToDeclare = @Queue(value = "fanout-queue2"))
    public void fanout2(Message message, Channel channel, JSONObject msg) {
        try {
            MessageProperties properties = message.getMessageProperties();
            long tag = properties.getDeliveryTag();
            log.error("发布订阅模式方法2的消费者2收到:{}", msg);
            //手动回执,不批量签收,回执后才能处理下一批消息
            channel.basicAck(tag, false);
        } catch (IOException e) {
            log.error(this.getClass().getName());
        }
    }
4.routing路由模式
  • 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;

  • 根据业务功能定义路由字符串

  • 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中

    在这里插入图片描述

        
        /**
         * 定义直流交换机
         * @return
         */
        @Bean
        public Exchange routeExchange(){
            //持久化 非自动删除
            return ExchangeBuilder.directExchange("route").build();
        }
    
        // 创建初始化RabbitAdmin对象
        @Bean
        public RabbitAdmin RabbitAdminRoute(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
            rabbitAdmin.setAutoStartup(true);
            // 声明直流交换机
            rabbitAdmin.declareExchange(routeExchange());
            return rabbitAdmin;
        }
        // 消费者发送消息,key=dev,test,prod
        @GetMapping("/router")
        public R router(@RequestParam String msg,@RequestParam String routerKey){
            Map<String, Object> map = createMsg(msg);
            rabbitTemplate.convertAndSend("route",routerKey,map);
            return R.ok();
        }
    
        /**
         * 路由式消费者1
         *
         * @param message 消息属性
         * @param channel 通道
         * @param msg     消息内容
         */
        @RabbitListener(bindings = @QueueBinding(
                // declare = "false":生产者已定义交换机,此处不再声明交换机
                value = @Queue, exchange = @Exchange(name = "route", declare = "false"),
                key = {"prod"}//路由键
        ))
        public void route1(Message message, Channel channel, JSONObject msg) {
            try {
                MessageProperties properties = message.getMessageProperties();
                String routingKey = properties.getReceivedRoutingKey();
                log.error("路由模式方法1的消费者1收到:{},路由键:{}", msg, routingKey);
                //手动回执,不批量签收,回执后才能处理下一批消息
                long tag = properties.getDeliveryTag();
                channel.basicAck(tag, false);
            } catch (Exception e) {
                log.error(this.getClass().getName());
            }
        }
    
        /**
         * 路由式消费者2
         *
         * @param message 消息属性
         * @param channel 通道
         * @param msg     消息内容
         */
        @RabbitListener(bindings = @QueueBinding(
                // declare = "false":生产者已定义交换机,此处不再声明交换机
                value = @Queue, exchange = @Exchange(name = "route", declare = "false"),
                key = {"dev","test"}//路由键
        ))
        public void route2(Message message, Channel channel, JSONObject msg) {
            try {
                MessageProperties properties = message.getMessageProperties();
                String routingKey = properties.getReceivedRoutingKey();
                log.error("路由模式方法1的消费者2收到:{},路由键:{}", msg, routingKey);
                //手动回执,不批量签收,回执后才能处理下一批消息
                long tag = properties.getDeliveryTag();
                channel.basicAck(tag, false);
            } catch (Exception e) {
                log.error(this.getClass().getName());
            }
        }
    
5.topic 主题模式(路由模式的一种)
  • 路由功能添加模糊匹配

  • 消息生产者生产消息,把消息交给交换机

  • 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

  • topic必须是 星号或#.dev.星号,不能以 molo/pcs/q0/*/data_up 这样匹配不了

在这里插入图片描述

    /**
     * 定义主题交换机
     * @return
     */
    @Bean
    public Exchange themeExchange(){
        //持久化 非自动删除
        return ExchangeBuilder.topicExchange("topic").build();
    }

    //创建初始化RabbitAdmin对象
    @Bean
    public RabbitAdmin rabbitAdminTopic(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        rabbitAdmin.declareExchange(themeExchange());
        return rabbitAdmin;
    }
    // 生产者,routerKey,****.dev.***,*****.test.****,分别走消费者1,和消费者2
    // 通配符*,#不能和 / 一起
    @GetMapping("/topic")
    public R topic(@RequestParam String msg, @RequestParam String routerKey) {
        Map<String, Object> map = createMsg(msg);
        rabbitTemplate.convertAndSend("topic", routeKey, map);
        return R.ok();
    }
     /**
     * 主题方法1的消费者1
     *
     * @param message 消息属性
     * @param channel 通道
     * @param msg     消息内容
     */
    @RabbitListener(bindings = @QueueBinding(
            // declare = "false":生产者已定义交换机,此处不再声明交换机
            value = @Queue, exchange = @Exchange(name = "topic", type = ExchangeTypes.TOPIC),
            key = {"#.dev.*"}))
    public void topic1(Message message, Channel channel, JSONObject msg) {
        try {
            MessageProperties properties = message.getMessageProperties();
            String routingKey = properties.getReceivedRoutingKey();
            log.error("主题模式方法1的消费者1收到:{},路由键:{}", msg, routingKey);
            //手动回执,不批量签收,回执后才能处理下一批消息
            long tag = properties.getDeliveryTag();
            channel.basicAck(tag, false);
        } catch (Exception e) {
            log.error(this.getClass().getName());
        }
    }

    /**
     * 路由式方法1的消费者2
     * # 号匹配多个 .分隔
     * @param message 消息属性
     * @param channel 通道
     * @param msg     消息内容
     */
    @RabbitListener(bindings = @QueueBinding(
            // declare = "false":生产者已定义交换机,此处不再声明交换机
            value = @Queue, exchange = @Exchange(name = "topic", type = ExchangeTypes.TOPIC),
            key = {"#.molo.*"}))
    public void topic2(Message message, Channel channel, JSONObject msg) {
        try {
            MessageProperties properties = message.getMessageProperties();
            String routingKey = properties.getReceivedRoutingKey();
            log.error("主题模式方法1的消费者2收到:{},路由键:{}", msg, routingKey);
            //手动回执,不批量签收,回执后才能处理下一批消息
            long tag = properties.getDeliveryTag();
            channel.basicAck(tag, false);
        } catch (Exception e) {
            log.error(this.getClass().getName());
        }
    }
6.RPC (基于消息的远程过程调用)
  • RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
  • 客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
  • 服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。
  • 服务端将RPC方法 的结果发送到RPC响应队列。
  • 客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

延时队列、循环队列、兜底机制、定时任务

1.延时队列
使用TTL+死信队列组合实现延迟队列的效果。

​ TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这 条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的 TTL,那么较小的那个值将会被使用。TTL并不是延时发送的意思。

​ 死信队列(Dead Letter Queue)是 RabbitMQ 中的一种特殊队列,用于存储无法正常被消费的消息。当消息满足一定条件时,例如消息过期、被拒绝或达到最大重试次数等情况,会被发送到死信队列中,以便后续进行处理。

        // 消息设置TTL
        Map<String, Object> message = createMsg(msg);
        rabbitTemplate.convertAndSend("exchange","routingKey", message,i->{
            MessageProperties properties = i.getMessageProperties();
            properties.setExpiration("10000");
            return i;
        });
        // 队列设置TTL
        QueueBuilder.durable("delayedQueue").withArgument("x-message-ttl",10000).build();

​ 可以在队列指定TTL,但这样并不灵活,所以在生产者那指定TTL

    // 配置类
    public static final String YS_QUEUE ="ys_queue";
    public static final String YS_EXCHANGE ="ys_exchange";
    public static final String YS_ROUTING_KEY ="ys_routing_key";
    // 死信队列、交换机、路由KEY
    public static final String DLX_QUEUE="dlx_queue";
    public static final String DLX_EXCHANGE="dlx_exchange";
    public static final String DLX_ROUTING_KEY="dlx_routing_key";
 
    // 普通的交换机及队列
    @Bean
    public Queue normalQueue(){
        Map map = new HashMap();
        // message在该队列queue的存活时间最大为10秒
        //map.put("x-message-ttl", 10000);
        // x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        map.put("x-dead-letter-exchange", DLX_EXCHANGE);
        // x-dead-letter-routing-key参数是给这个DLX指定路由键
        map.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
        return new Queue(YS_QUEUE,true,false,false,map);
    }
    
    @Bean
    public DirectExchange normalDirectExchange(){
        return new DirectExchange(YS_EXCHANGE);
    }
 
    @Bean
    public Binding normalBinding(){
        return BindingBuilder.bind(normalQueue())
                .to(normalDirectExchange())
                .with(YS_ROUTING_KEY);
    }
 
    // 死信交换机及队列
    @Bean
    public Queue dlxQueue(){
        return QueueBuilder.durable(DLX_QUEUE).build();
    }
 
    @Bean
    public DirectExchange dlxDirectExchange(){
        return new DirectExchange(DLX_EXCHANGE);
    }
 
    @Bean
    public Binding dlxBinding(){
        return BindingBuilder.bind(dlxQueue())
                .to(dlxDirectExchange())
                .with(DLX_ROUTING_KEY);
    }
    // 生产者 设置setExpiration 消息存活时间,这样更灵活
    // 超过Time这个时间 就会走到死信队列里面,达到延时效果
    @GetMapping("/ysmsg")
    public R ysmsg(String time) {
        JSONObject msg = new JSONObject();
        msg.put("msg", "死信交换机 延时发送的消息");
        msg.put("time", System.currentTimeMillis());
        rabbitTemplate.convertAndSend(RabbitmqDLXConfig.YS_EXCHANGE,
                RabbitmqDLXConfig.YS_ROUTING_KEY,
                msg, i -> {
                    MessageProperties properties = i.getMessageProperties();
                    properties.setExpiration(time);
                    return i;
                });
        return R.ok();
    }
使用RabbitMQ官方延迟插件,实现延时队列效果。

​ docker部署的时候已经安装了插件,直接使用

​ 不需要死信交换机和死信队列,支持消息延迟投递,消息投递之后没有到达投递时间,是不会投递给队列

​ 而是存储在一个分布式表,当投递时间到达,才会投递到目标队列


    public static final String YS_QUEUE_NAME = "YS_queue";
    public static final String YS_EXCHANGE_NAME = "YS_exchange";
    public static final String YS_ROUTING_KEY = "YS_routingKey";

    @Bean
    public Queue delayedQueue(){
        return new Queue(YS_QUEUE_NAME);
    }

    /**
     * 自定义交换机 定义一个延迟交换机
     * @return
     */
    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> args = new HashMap<>(1);
        // 自定义交换机的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(YS_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingDelayedQueue(){
        return BindingBuilder.bind(delayedQueue())
                .to(delayedExchange()).with(YS_ROUTING_KEY)
                .noargs();
    }
    
    // 生产者  由setExpiration改为setDelay,延时,毫秒,消费者监听队列即可
    @GetMapping("/ysmsg")
    public R ysmsg(String time) {
        JSONObject msg = new JSONObject();
        msg.put("msg", "延时发送的消息");
        msg.put("time", System.currentTimeMillis());
        rabbitTemplate.convertAndSend(RabbitmqDLXConfig.YS_EXCHANGE_NAME, RabbitmqDLXConfig.YS_ROUTING_KEY, msg, i ->{
            i.getMessageProperties().setDelay(Integer.parseInt(time));
            return i;
        });
        return R.ok();
    }
2.循环队列

​ Rabbitmq里并没有循环队列的概念,多数都是通过消费者来判断是否重新入队或是转到其它队列

​ 也可以设置消息的重试次数。

​ 手动确认机制下,如果消费者一直不确认消息,RabbitMQ 将会将该消息重新投递给其他消费者或当前消费者。

3.兜底机制

消息重试: 将处理失败的消息重新投递给消费者或其他消费者进行重试。您可以使用 RabbitMQ 的重试机制(例如使用 channel.basicReject()或channel.basicNack())来将消息重新放回队列中,以供后续的处理尝试。

死信队列: 在消息处理失败时,将消息发送到一个专门的死信队列。死信队列是一个存储无法被消费者正常处理的消息的队列。您可以定义一个死信交换机和死信队列,并将处理失败的消息路由到该死信队列。然后,您可以根据需要对死信队列中的消息进行分析、转发或进一步处理。

4.定时任务

定时任务使用延时队列就可以办到

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/462963.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

安装snap再安装flutter再安装localsend@Ubuntu(FreeBSD下未成功)

Localsend介绍 localsend是一个跨平台的文件传送软件&#xff0c;可以在Windows、MacOS、Linux、Android和IOS下互相传送文件&#xff0c;只要在同一个局域网即可。 localsend官网&#xff1a;LocalSend 尝试安装localsend&#xff0c;发现需要使用flutter&#xff0c; 安装f…

旋转花键的制造工艺

旋转花键的制造工艺是一门精细的技术&#xff0c;涉及多个步骤和精细的操作&#xff0c;以确保最终产品的质量和性能&#xff0c;下面简单介绍下旋转花键的制造工艺。 1、原材料准备&#xff1a;制造旋转花键的核心是选择合适的材料&#xff0c;根据花键的规格和性能要求&#…

R语言实现中介分析(1)

中介分析&#xff0c;也称为介导分析&#xff0c;是统计学中的一种方法&#xff0c;它用于评估一个或多个中介变量&#xff08;也称为中间变量&#xff09;在自变量和因变量之间关系中所起的作用。换句话说&#xff0c;中介分析用于探索自变量如何通过中介变量影响因变量的机制…

相机拍照与摄影学基础

1.相机拍照 相机可能形状和大小不同&#xff0c;但基本功能相同&#xff0c;包括快门速度、光圈和感光度&#xff0c;这些是摄影的通用概念。即使是一次性相机也是基于这三个理念工作的。不同类型相机在这三个概念上的唯一区别是你可以控制这些功能的程度。这三个参数被称为相…

python入门(二)

python的安装很方便&#xff0c;我们这里就不再进行讲解&#xff0c;大家可以自己去搜索视频。下面分享一下Python的入门知识点。 执行命令的方式 在安装好python后&#xff0c;有两种方式可以执行命令&#xff1a; 命令行程序文件&#xff0c;后缀名为.py 对于命令行&…

设计模式在芯片验证中的应用——装饰器

一、装饰器模式 装饰器模式(Decorator)是一种结构化软件设计模式&#xff0c;它提供了一种通过向类对象添加行为来修改类对象的方法&#xff0c;而不会影响同一类的其它对象行为。该模式允许在不修改抽象类的情况下添加类功能。它从本质上允许基类代码对不可预见的修改具有前瞻…

大衍数列-蓝桥杯?-Lua 中文代码解题第2题

大衍数列-蓝桥杯&#xff1f;-Lua 中文代码解题第2题 中国古代文献中&#xff0c;曾记载过“大衍数列”, 主要用于解释中国传统文化中的太极衍生原理。 它的前几项是&#xff1a;0、2、4、8、12、18、24、32、40、50 … 其规律是&#xff1a;对偶数项&#xff0c;是序号平方再除…

如何重置iPhone的网络设置?这里提供详细步骤

前言 本文介绍如何重置iPhone上的网络设置。该信息适用于iPhone 12到iPhone 6以及iOS 14到iOS 8。 如何在iPhone上重置网络设置 采取以下步骤重置iPhone上的网络设置&#xff1a; 1、在iPhone上&#xff0c;打开设置应用程序。 2、单击通用。 3、滚动到屏幕底部&#xff…

SQLiteC/C++接口详细介绍之sqlite3类(十五)

返回目录&#xff1a;SQLite—免费开源数据库系列文章目录 上一篇&#xff1a;SQLiteC/C接口详细介绍之sqlite3类&#xff08;十四&#xff09; 下一篇&#xff1a;SQLiteC/C接口详细介绍之sqlite3类&#xff08;十六&#xff09; 47.sqlite3_set_authorizer 用法&#xff…

html5使用Websocket

html5使用Websocket 前言1、html5中的websocket2、创建一个 WebSocket 对象3、监听 WebSocket 连接事件4、监听 WebSocket 收到消息事件5、监听 WebSocket 关闭事件6、 监听 WebSocket 出错事件7、发送消息8、整体代码 前言 在即时通讯的交互方式中websocket是一个很使用的方式…

初出茅庐的小李博客之串口屏开发一个音乐控制器UI

串口屏介绍 串口屏通常指的是一种带有串口接口的显示屏&#xff0c;可以通过串口与其他设备进行通信和控制。这种屏幕通常具有独立的控制器和显示功能&#xff0c;可以直接接入主控系统&#xff0c;实现信息的显示和交互。 开发步骤 准备UI素材 准备了100张音量的图标&#x…

麒麟系统Redis7.2哨兵集群部署

redis哨兵集群部署 1、原理 Redis 哨兵模式是指在 Redis 集群中,有一组专门的进程(即哨兵进程)负责监控主节点和从节点的状态,并在发现故障时自动进行故障转移,以保证 Redis 集群的高可用性。 Redis 提供了哨兵的命令,哨兵命令是一个独立的进程,哨兵进程会周期性地向主…

数据结构与算法第八套试卷

1.建立一个长度为n的有序单链表的时间复杂度 0(n^2) 2.哈希算法 key%p&#xff1a;p最好为质数 如果两个关键字的值不等但哈希函数值相等&#xff0c;则称这两个关键字为同义词&#xff08;正确&#xff09;&#xff1b; 3.二分查找 注意&#xff1a; 二分查找是向下查询…

【蓝桥杯单片机】十四届省赛“重难点”解析(附源码)

【蓝桥杯单片机】十四届省赛“重难点”解析 一、题目难点解析二、易出错点提示三、完整代码链接 笔记包括&#xff1a;①题目难点解析、②易出错点提示、③完整代码链接 注&#xff1a;本文提供的所有代码都是使用第十四届竞赛包完成 ⭐----------系列文章链接----------⭐ 【蓝…

C# 当录入错误的时候,右下角弹窗提示错误信息

做一个textbox录入数字的判断&#xff0c;当录入不是数字的时候右下角弹窗提示 右下角弹窗提示 主要代码如下&#xff1a;判断是否为数字的代码&#xff1a; private void textBox1_KeyPress(object sender, KeyPressEventArgs e) { if(e.KeyChar13) …

计算机网络——物理层(编码与调制)

计算机网络——编码与调制 基带信号和宽带信号编码与调制数字数据编码为数字信号非归零编码归零编码反向不归零编码曼彻斯特编码差分曼彻斯特编码4B/5B编码 数字数据调制为模拟信号模拟数据编码为数字信号模拟数据调制为模拟信号 我们之前讲了物理层的一些基础知识和两个准则&a…

音频的录制及播放

在终端安装好pip install pyaudio&#xff0c;在pycharm中敲入录音的代码&#xff0c;然后点击运行可以在10s内进行录音&#xff0c;录音后的音频会保存在与录音代码同一路径项目中&#xff0c;然后再新建项目敲入播放的代码&#xff0c;点击运行&#xff0c;会把录入的录音进行…

关于UE的相机震动CameraShake

创建CameraShake资源 CameraShake配置是个蓝图类&#xff0c;我们选择创建BlueprintClass&#xff0c;父类选择CameraShakeBase即可。 参数调整 目前主要用到了 LocationAmplitudeMultiplier 1 LocationFrequencyMultiplier 10 RotationAmplitudeMultiplier 1 Rotation…

嵌入式系统和物联网常见的开发板介绍

嵌入式系统和物联网&#xff08;IoT&#xff09;领域&#xff0c;开发板是工程师和开发者进行原型设计和项目开发的重要工具。开发板通常集成了微控制器或处理器、内存、输入/输出接口和外设&#xff0c;以便于快速实现功能验证和产品原型。在本教程中&#xff0c;我们将讨论一…

Java设计模式 | 设计模式概述和分类

独孤求败五重境界 利剑&#xff08;“凌厉刚猛&#xff0c;无坚不摧&#xff0c;弱冠前以之与河朔群雄争锋。”&#xff09;软剑&#xff08;“紫薇软剑&#xff0c;三十岁前所用&#xff0c;误伤义士不祥&#xff0c;乃弃之深谷。”&#xff09;重剑&#xff08;“重剑无锋&a…