使用RabbitMQ实现微服务间的异步消息传递

使用RabbitMQ实现微服务间的异步消息传递

      • RabbitMQ简介
      • 安装RabbitMQ
        • 在Ubuntu上安装RabbitMQ
        • 在CentOS上安装RabbitMQ
      • 配置RabbitMQ
      • 创建微服务
        • 生产者服务
          • 安装依赖
          • 生产者代码
        • 消费者服务
          • 消费者代码
      • 运行微服务
      • 消息模式
        • 直接模式
          • 生产者代码
          • 消费者代码
        • 扇出模式
          • 生产者代码
          • 消费者代码
        • 主题模式
          • 生产者代码
          • 消费者代码
      • 高级特性
        • 持久化
          • 生产者代码
          • 消费者代码
        • 确认机制
          • 消费者代码
      • 监控和日志
        • 监控
        • 日志
      • 故障排除
      • 总结

在现代分布式系统中,微服务架构越来越受到欢迎。微服务之间需要进行高效、可靠的消息传递。RabbitMQ作为一个成熟的开源消息中间件,能够很好地满足这一需求。本文将详细介绍如何使用RabbitMQ实现微服务间的异步消息传递。

RabbitMQ简介

RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP(Advanced Message Queuing Protocol)协议。它支持多种消息模式,如发布/订阅、路由、主题等。

安装RabbitMQ

RabbitMQ可以在多种操作系统上安装,包括Linux、macOS和Windows。
在Ubuntu上安装RabbitMQ
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
在CentOS上安装RabbitMQ
sudo yum install epel-release
sudo yum install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

配置RabbitMQ

安装完成后,可以使用以下命令进行基本配置。
sudo rabbitmq-plugins enable rabbitmq_management
sudo systemctl restart rabbitmq-server
访问RabbitMQ管理界面:`http://localhost:15672`,默认用户名和密码都是`guest`。

创建微服务

我们将创建两个简单的微服务:生产者服务和消费者服务。
生产者服务
生产者服务负责发送消息到RabbitMQ。
安装依赖
pip install pika
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

message = 'Hello World!'
channel.basic_publish(exchange='', routing_key='hello', body=message)
print(f'Sent: {message}')
connection.close()
消费者服务
消费者服务负责从RabbitMQ接收消息。
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()

运行微服务

先启动消费者服务,然后启动生产者服务。
# 启动消费者服务
python consumer.py

# 启动生产者服务
python producer.py

消息模式

RabbitMQ支持多种消息模式,包括直接模式、扇出模式、主题模式和头部模式。
直接模式
直接模式是最简单的模式,消息会被发送到指定的队列。
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='direct_queue')

message = 'Direct message'
channel.basic_publish(exchange='', routing_key='direct_queue', body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='direct_queue')

channel.basic_consume(queue='direct_queue', auto_ack=True, on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()
扇出模式
扇出模式将消息广播到所有绑定的队列。
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')

message = 'Fanout message'
channel.basic_publish(exchange='fanout_exchange', routing_key='', body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='fanout_exchange', queue=queue_name)

channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()
主题模式
主题模式允许更复杂的路由规则。
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')

routing_key = 'kern.critical'
message = 'Critical kernel message'
channel.basic_publish(exchange='topic_exchange', routing_key=routing_key, body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

binding_keys = ['*.critical', 'kern.*']
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_exchange', queue=queue_name, routing_key=binding_key)

channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()

高级特性

RabbitMQ还支持许多高级特性,如持久化、确认机制、死信队列等。
持久化
可以配置消息和队列的持久化,以确保消息不会因为RabbitMQ服务器重启而丢失。
生产者代码
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='durable_queue', durable=True)

message = 'Persistent message'
channel.basic_publish(exchange='', routing_key='durable_queue', body=message, properties=pika.BasicProperties(delivery_mode=2))
print(f'Sent: {message}')
connection.close()
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='durable_queue', durable=True)

channel.basic_consume(queue='durable_queue', on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()
确认机制
可以配置消费者在处理完消息后发送确认,以确保消息不会被重复处理。
消费者代码
import pika

def on_message_received(ch, method, properties, body):
    print(f'Received: {body.decode()}')
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='ack_queue')

channel.basic_consume(queue='ack_queue', on_message_callback=on_message_received)

print('Waiting for messages...')
channel.start_consuming()

监控和日志

RabbitMQ提供了丰富的监控和日志功能,可以用于监控和调试。

监控
可以通过RabbitMQ管理界面监控队列、交换机和连接等。

日志
可以通过配置文件调整日志级别和输出方式。

故障排除

如果RabbitMQ配置出现问题,可以使用以下命令进行故障排除。

sudo rabbitmqctl status
sudo journalctl -u rabbitmq-server

总结

通过本文,你已经学习了如何使用RabbitMQ实现微服务间的异步消息传递。我们介绍了RabbitMQ的基本概念、安装方法、配置RabbitMQ、创建微服务、消息模式(直接模式、扇出模式、主题模式)、高级特性(持久化、确认机制)、监控和日志、故障排除等内容。掌握了这些知识,将有助于你在实际工作中更好地利用RabbitMQ来构建高效、可靠的微服务架构。
RabbitMQ管理界面示例
RabbitMQ消息传递模式示例

使用RabbitMQ可以显著提高微服务间消息传递的可靠性和效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/905473.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

「实战应用」如何在 DHTMLX Scheduler 中实现动态主题切换?

创建响应式、直观的 UI 需要适应用户对应用程序各个方面的偏好。其中一项可显著提升用户体验的热门功能是能够在明暗主题之间切换。它在日程安排日历等综合组件中尤其有用。 本文将指导您在 DHTMLX Scheduler 中实现动态主题切换,使其适应用户设置的首选系统主题。…

Marin说PCB之电源的Surface Current Density知多少?

小编我是一位资深的国漫迷,像什么仙逆,斗破,斗罗,完美世界,遮天,凡人修仙传,少年歌行等,为了可以看这些视频小编我不惜花费了攒了很多年的私房钱去开了这个三个平台的会员啊&#xf…

【YApi】接口管理平台

一、简介 YApi 是一个用于前后端开发团队协作的 API 管理平台,帮助团队更加高效地进行 API 接口的设计、测试、文档管理和版本控制等工作。 YApi 主要功能: API 设计和管理:提供 API 设计和文档生成工具,使开发者能够轻松创建、…

【C/C++】字符/字符串函数(1)——由string.h提供

零.导言 什么是字符/字符串函数呢? 其实就是一类用于处理字符和字符串的函数。 而其中一部分函数包含在头文件 string.h 中,有 strlen strcpy strcat strcmp strncpy strncat strncmp strstr strtok strerror 等等 接下来我将逐个讲解这些函数。 一.str…

简单的kafkaredis学习之redis

简单的kafka&redis学习之redis 2. Redis 2.1 什么是Redis Redis是一种面向 “Key-Value” 数据类型的内存数据库,可以满足我们对海量数据的快速读写需求,Redis是一个 NoSQL 数据库,NoSQL的全称是not only sql,不仅仅是SQL&…

在 Visual Studio 中使用 Eigen 库

在 Visual Studio 中使用 Eigen 库 参考教程: 在 Visual Studio 中配置 Eigen库_vs调用eigen-CSDN博客 Eigen 是一个开源的 C 库,主要用来支持线性代数,矩阵和矢量运算,数值分析及其相关的算法。Eigen 除了需要 C 标准库以外&am…

认证鉴权框架之—sa-token

一、概述 Satoken 是一个 Java 实现的权限认证框架,它主要用于 Web 应用程序的权限控制。Satoken 提供了丰富的功能来简化权限管理的过程,使得开发者可以更加专注于业务逻辑的开发。 二、逻辑流程 1、登录认证 (1)、创建token …

MES(Manufacturing Execution System)制造执行系统解决方案 :高效协同, 实现数字化智能工厂

文章目录 引言I 常用功能模块车间实时数据设备维修证书管理II UI设计III 术语5M1Esee also引言 MES软件即制造企业生产过程执行管理软件,是一套面向制造企业车间执行层的生产信息化管理系统。 MES 可以为企业提供包括制造数据管理、计划排程管理、生产调度管理、库存管理、质…

Qt 实战(10)模型视图 | 10.5、代理

文章目录 一、代理1、简介2、自定义代理 前言: 在Qt的模型/视图(Model/View)框架中,代理(Delegate)是一个非常重要的概念。它充当了模型和视图之间的桥梁,负责数据的显示和编辑。代理可以自定义…

“北斗+实景三维”,助力全域社会治理

在国家治理体系和治理能力现代化的大背景下,全域社会治理成为提升国家治理效能的关键。“北斗实景三维”技术组合,为全域社会治理提供了新的技术支撑和解决方案。本文将探讨这一技术如何助力全域社会治理,以及其在实际应用中的潜力和挑战。 …

mysql8.0.32升级到8.0.40

上篇8.0.32库的准备:mysql: error while loading shared libraries: libncurses.so.5: cannot open shared object file: No suc-CSDN博客 此篇测试升级到8.0.40 MySQL :: Download MySQL Community Server rootjyc:~# mysql -u root -pabcd1234 mysql: [Warning]…

高阶数据结构--图(graph)

图(graph) 1.并查集1. 并查集原理2. 并查集实现3. 并查集应用 2.图的基本概念3. 图的存储结构3.1 邻接矩阵3.2 邻接矩阵的代码实现3.3 邻接表3.4 邻接表的代码实现 4. 图的遍历4.1 图的广度优先遍历4.2 广度优先遍历的代码 1.并查集 1. 并查集原理 在一…

go 聊天系统项目-1

1、登录界面 说明:这一节的内容采用 go mod 管理【GO111MODULE‘’】的模块,从第二节开始使用【GO111MODULE‘off’】GOPATH 管理模块。具体参见 go 包相关知识 1.1登录界面代码目录结构 代码所在目录/Users/zld/Go-project/day8/chatroom/ 1.2登录…

支持向量机背后的数学奥秘

一、基本概念与原理 1.1 支持向量机的定义 支持向量机是一种二分类模型,其核心思想是在样本空间中寻找一个超平面,将不同类别的样本分开。这个超平面被称为决策边界或分隔超平面。支持向量是距离决策边界最近的点,这些点决定了决策边界的位…

C语言指针和数组相关习题

目录 sizeof和一维int数组sizeof和一维char数组strlen()和一维char数组sizeof和字符串strlen()和字符串指针变量指向字符串字面常量易错点sizeof(a):sizeof是操作符 当心整型提升sizeof和二维数组复习一下相关知识点练习题 一个离谱的错误指针1指针2指针3指针4指针5指针6指针7指…

Centos安装ZooKeeper教程(单机版)

本章教程介绍,如何在Centos7中,安装ZooKeeper 3.9.3版本。 一、什么是ZooKeeper ? Apache ZooKeeper 是一个分布式协调服务,用于大型分布式系统中的管理和协调。它为分布式应用提供了一个高性能的通信框架,简化了开发人员在构建复杂分布式系统的任务。ZooKeeper 能够解决一…

出国工作——常用英语——网站注册

Please set your password for your new Qt Account. Password must be at least 8 characters in length. 请为您的新 Qt 账户设置密码。密码长度必须至少为 8 个字符。 Password Password strength: BadThis is similar to a commonly used password. TIP: Add another wor…

江协科技STM32学习- P25 UART串口协议

🚀write in front🚀 🔎大家好,我是黄桃罐头,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流 🎁欢迎各位→点赞👍 收藏⭐️ 留言📝​…

Servlet 3.0 新特性全解

文章目录 Servlet3.0新特性全解Servlet 3.0 新增特性Servlet3.0的注解Servlet3.0的Web模块支持servlet3.0提供的异步处理提供异步原因实现异步原理配置servlet类成为异步的servlet类具体实现异步监听器改进的ServletAPI(上传文件) Servlet3.0新特性全解 tomcat 7以上的版本都支…

mysql 通过GROUP BY 聚合并且拼接去重另个字段

我的需求: 我想知道同一个手机号出现几次,并且手机号出现在哪些地方。下面是要的效果。 源数据: CREATE TABLE bank (id bigint(20) unsigned NOT NULL AUTO_INCREMENT,user_id int(11) NOT NULL DEFAULT 0,tel varchar(255) COLLATE utf8mb4_unicode_…