初探RocketMQ

初探RocketMQ

1、引言

  Message Queue(消息队列),从字面上理解:首先它是一个队列。FIFO先进先出的数据结构-队列。消息队列就是所谓的存放消息的队列。

  消息队列解决的不是消息的队列的目的,解决的是通信问题。

image-20240428142918760

  比如以电商订单系统为例,如果各服务之间使用同步通信,不仅耗时较久,且过程中受到网络波动的影响,不能保证高成功率。因此,使用异步的通信方式对架构进行改造。

image-20240428143611772

  使用异步的通信方式对模块间的调用进行解耦,可以快速的提升系统的吞吐量。上游执行完消息的发送业务后立即获得结果,下游多个服务订阅到消息后各自消费。通过消息队列,屏蔽底层的通信协议,使得解耦和并行消费得以实现。

2、RocketMQ介绍

  随着使⽤中队列和虚拟主题的增加,阿⾥巴巴团队使⽤的ActiveMQ IO 模块达到了瓶颈。为了尽⼒通过节流、断路器或降级来解决这个问题,但效果不佳。所以开始关注当时流⾏的消息传递解决⽅案Kafka。不幸的是, Kafk⽆法满⾜要求,尤其是在低延迟和⾼可靠性⽅⾯。在这种情况下,决定发明⼀种新的消息传递引擎来处理更⼴泛的⽤例,从传统的发布/订阅场景到⼤容量实时零丢失交易系统。⽬前RocketMQ已经开源给Apache基⾦会。如今,已有 100 多家公司在其业务中使⽤开源版本的 RocketMQ。

消息产品客户端SDK协议和规范订购 信息预定消息批量消息广播消息消息过滤器服务器触发的重新交付消息存储消息追溯消息优先级高可用性和故障转移消息跟踪配置管理和运
ActiveMQJava、.Net、C++等推送模型,支持OpenWire、STOMP、AMQP、MQTT、JMSExclusive Consumer或Exclusive Queues可以保证排序支持不支持支持支持不支持使用 JDBC可高性能日志支持非常快速的持久化,例如levelDB、kahaDB支持支持支持,取决于存储,如果使用levelDB则需要ZooKeeper服务器不支持默认配置为低级,用户需优化配置参数支持
kafkaJava、Scala等拉取模型,支持TCP确保分区内消息的排序不支持支持,带有异步生产者不支持支持,可以使用kafka Streams过滤消息不支持高性能文件存储支持,偏移量指示不支持支持,需要ZooKeeper服务器不支持kafka使用键值对格式进行配置。这些值可以从文件或以编程方式提供。支持,使用终端命令公开核心指标
RocketMQJava、C++、Go拉取模型,支持TCP、JMS、OpenMessaging确保消息的严格排序,并且可以优雅地横向扩展支持支持,具有同步模式以避免消息丢失支持支持基于SQL92的属性过滤器表达式支持高性能和低延迟的文件存储支持,时间戳和偏移量两种表示不支持支持的主从模型,无需其他套件支持开箱即用,用户只需注意一些配置支持,丰富的Web和终端命令以公开核心指标

3、RocketMQ的基本概念

3.1 技术架构

image-20240428145803835

  RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的ZooKeeper,支持Broker的动态注册于发现。主要包括两个功能:Broker管理,NameServer接收Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息。Producer,Consumer仍然可以动态感知Broker的路由信息。

  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块:

    • Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
    • Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
    • Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
    • HA Service:高可用服务,提供Master Broker和Slave Broker之间的数据同步功能。
    • Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

    image-20240428152009844

3.2 部署架构

image-20240428152059537

RocketMQ网络部署特点:

  • NameServer是⼀个⼏乎⽆状态节点,可集群部署,节点之间⽆任何信息同步。
  • Broker部署相对复杂, Broker分为Master与Slave,⼀个Master可以对应多个Slave,但是⼀个Slave只能对应⼀个Master, Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义, BrokerId为0表示Master,⾮0表示Slave。 Master也可以部署多个。每个Broker与NameServer集群中的所有节点建⽴⻓连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上⽀持⼀Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
  • Producer与NameServer集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建⽴⻓连接,且定时向Master发送⼼跳。 Producer完全⽆状态,可集群部署。
  • Consumer与NameServer集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、 Slave建⽴⻓连接,且定时向Master、 Slave发送⼼跳。 Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时, Master服务器会根据拉取偏移量与最⼤偏移量的距离(判断是否读⽼消息,产⽣读I/O),以及从服务器是否可读等因素建议下⼀次是从Master还是Slave拉取。

结合部署架构图,描述集群工作流程:

  • 启动NameServer, NameServer起来后监听端⼝,等待Broker、 Producer、 Consumer连上来,相当于⼀个路由控制中⼼。
  • Broker启动,跟所有的NameServer保持⻓连接,定时发送⼼跳包。⼼跳包中包含当前Broker信息(IP+端⼝等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时⾃动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中⼀台建⽴⻓连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择⼀个队列,然后与队列所在的Broker建⽴⻓连接从⽽向Broker发消息。
  • Consumer跟Producer类似,跟其中⼀台NameServer建⽴⻓连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建⽴连接通道,开始消费消息。

4、快速开始

4.1 下载RocketMQ

这里使用的是RocketMQ4.7.1版本,去官方下载即可:

image-20240428152515701

4.2 安装RocketMQ

  • 准备一台装有Linux系统的虚拟机,这里我用的是Centos7版本
  • 安装JDK,并配置环境变量
  • 安装RocketMQ,上传RocketMQ安装包并使用unzip命令解压缩再/usr/local/rocketmq目录下。
  • 配置JDK和RocketMQ的环境变量。

image-20240428152745250

  注意,RocketMQ的环境变量用来加载ROCKETMQ_HOME/conf下的配置文件,如果不配置则无法启动NameServer和Broker。

  完成后执行命令,让环境变量生效:

source /etc/profile

  修改bin/runserver.sh文件,由于RocketMQ默认设置的JVM内存为4G,但虚拟机一般没有这么大内存,因此调整为512MB。

vim runserver.sh

image-20240428153047902

4.3 启动NameServer

  启动RocketMQ服务需要先启动NameServer。

  在bin目录内使用静默方式启动:

nohup ./mqnamesrv -n 192.168.159.33:9876 &

image-20240428153218510

4.4 启动Broker

  • 修改broker的JVM参数配置,将默认8G内存修改为512MB。
vim runbroker.sh

image-20240428153414208

  • conf/broker.conf文件中加入如下配置,开启自动创建Topic功能。
autoCreateTopicEnable=true

image-20240428153534551

  • 以静默方法启动Broker

    nohup ./mqbroker -n 192.168.159.33:9876 &
    

image-20240428153607010

4.5 使用发送和接收消息验证MQ

  • 配置NameServer的环境变量

  在发送/接收消息之前,需要告诉客户端NameServer的位置。配置环境变量NAMESRV_ADDR

export NAMESRV_ADDR=192.168.159.33:9876
  • 使用bin/tools.sh工具验证消息的发送,默认会发1000条消息
 ./tools.sh org.apache.rocketmq.example.quickstart.Producer

  执行上述命令后,发送消息的日志如下:

image-20240428153928637

  • 使用bin/tools.sh工具验证消息的接收
./tools.sh org.apache.rocketmq.example.quickstart.Consumer

  执行上述命令后,可以看到接收的消息

image-20240428154025738

4.6 关闭服务器

  • 关闭Broker:

    ./mqshutdown broker
    

    image-20240428154129263

  • 关闭NameServer:

    ./mqshutdown namesrv
    

    image-20240428154208460

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/698153.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于51单片机的MQ-2烟雾报警设计

随着现代家庭用火、用电量的增加,家庭烟雾发生的频率越来越高。烟雾报警器也随之被广泛应用于各种场合。本课题所研究的无线多功能烟雾报警器采用STC89C51为核心控制器,利用气体传感器MQ-2、ADC0832模数转换器、DS18B20温度传感器等实现基本功能。通过这些传感器和芯片,当环…

代码随想录——把二叉搜索树转换为累加树(Leetcode538)

题目链接 /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode() {}* TreeNode(int val) { this.val val; }* TreeNode(int val, TreeNode left, TreeNode right) {* …

mac怎么录制屏幕?这2个方法你值得拥有

在数字化时代,屏幕录制已经成为一种常见且重要的工具,无论是教学演示、游戏直播还是会议记录,屏幕录制都发挥着不可或缺的作用。对于Mac用户而言,如何高效、便捷地进行屏幕录制,是一个值得探讨的话题,可是很…

halcon算子register_object_model_3d_pair详解

搜索两个3D对象模型的之间的最优变换 搜索两个3D对象模型之间的转换。 register_object_model_3d_pair搜索具有最佳对齐的两个3D对象模型之间的转换。这个过程称为注册。Pose中返回的转换可用于将ObjectModel3D1转换为第二个对象ObjectModel3D2的参考框架。Score返回两个3D对象…

解密Prompt系列31. LLM Agent之从经验中不断学习的智能体

前言 Agent智能体的工作流可以简单分成两种:一种是固定的静态工作流,一种是智能体自主决策的动态工作流。 静态流程的Agent举几个例子,例如新闻热点追踪推送Agent,每日新论文摘要总结Agent,它们的优点是可控&#xf…

【智算101】为什么用好大模型,离不开“向量数据库“呢

关注【云原生AI百宝箱】公众号,获取更多云原生AI消息 大模型离不开向量数据库回答这个问题之前,我们先来理解一下什么是向量。 这是一个苹果,但在发明苹果这个词之前,人们怎么描述它呢? 颜色、大小、形状、纹理&#x…

海水那么咸,海鲜那么甜,我们那么馋

点击文末“阅读原文”即可参与节目互动 剪辑、音频 / 卷圈 运营 / SandLiu 卷圈 监制 / 姝琦 封面 / 姝琦Midjourney 产品统筹 / bobo 场地支持 / (新)声湃轩北京录音间 海鲜是大海给予人类的宝贵礼物,腥气又甜美,细腻又霸气…

认识和使用 Vite 环境变量配置,优化定制化开发体验

Vite 官方中文文档:https://cn.vitejs.dev/ 环境变量 Vite 内置的环境变量如下: {"MODE": "development", // 应用的运行环境"BASE_URL": "/", // 部署应用时使用的 URL 前缀"PROD": false, //应用…

APP需要做等保吗?

在数字化时代,APP已成为我们生活中不可或缺的一部分,它们如同无形的桥梁,连接着现实世界与虚拟世界,为我们提供了前所未有的便利。然而,随着APP的普及,其背后潜藏的安全风险也日益凸显。近年来,…

stm32MP135裸机编程:启动流程分析

0 参考资料 轻松使用STM32MP13x - 如MCU般在cortex A核上裸跑应用程序.pdf STM32MP135AD数据手册.pdf1 stm32MP135裸机启动流程分析 1.1 启动方式 stm32MP135支持8种启动方式: 注: UART和USB启动并不是指通过UART/USB加载程序,而是通过UA…

前端已学习内容

一、HTMLCSS 1、黑马B站视频-27小时 地址:基础班导学-精讲与实战_哔哩哔哩_bilibili 说明:讲义已下载。两个小项目还没学没练。 2、菜鸟教程 地址:HTML 简介 | 菜鸟教程 二、JavaScript 1、菜鸟教程 网址:JavaScript 教程 …

状态管理Vuex

官网:Vuex 是什么? | Vuex (vuejs.org)https://v3.vuex.vuejs.org/zh/ 创建一个vue2的新项目名为vuex-demo,安装命令 npm install vuex3 新建index.js import Vue from vue import Vuex from vuexVue.use(Vuex)const store new Vuex.Store(…

PYQT + flask httpserver 服务器提供简单的MES服务

main.py import sys # 导入创建的文件模块 import test import dcservice from PyQt5.QtWidgets import QApplication, QMainWindowif __name__ __main__:app QApplication(sys.argv)MainWindow QMainWindow()ui test.Ui_MainWindow()ui.setupUi(MainWindow)MainWindow.sho…

vue中插槽的本质

定义slotCompoent.vue 组件 <template><slot></slot><slot nameslot1></slot><slot name"slot2" msg"hello"></slot> </template>使用组件&#xff1a; <slotComponent><p>默认的</p>…

用Python代码锁定Excel单元格以及行和列

Excel能够帮助用户高效地组织数据&#xff0c;还支持复杂的公式计算和数据分析。而随着团队协作的日益频繁&#xff0c;保护数据的准确性和完整性变得尤为重要。在Excel表格中&#xff0c;我们可以通过锁定特定的单元格或区域&#xff0c;防止对单元格内容进行随意修改&#xf…

Golang:使用时会遇到的错误及解决方法详解

Go语言使用时常常会遇到的一些错误及解决方法&#xff0c;文中的示例代码讲解详细&#xff0c;感兴趣的小伙伴可以了解一下 1、go: go.mod file not found in current directory or any parent directory go mod init name 2、Failed to build the application: main.go:4:2:…

Java网络通信实现

UDP UDPServer import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket;public class UDPServer {public static void main(String[] args) throws IOException {System.out.println("UdpServer启动");// 创建upd套接字Data…

Java进阶_接口

接口的概念 在JAVA编程语言中是一个抽象类型&#xff0c;是抽象方法的集合&#xff0c;接口通常以interface来声明。一个类通过继承接口的方式&#xff0c;从而来继承接口的抽象方法。 接口并不是类&#xff0c;编写接口的方式和类很相似&#xff0c;但是它们属于不同的概念。类…

Bankless:为什么 AI 需要 Crypto 的技术?

原文标题&#xff1a;《Why AI Needs Crypto’s Values》 撰文&#xff1a;Arjun Chand&#xff0c;Bankless 编译&#xff1a;Chris&#xff0c;Techub News 原文来自香港Web3媒体&#xff1a;Techub News 人工智能革命的梦想一直是一把双刃剑。 释放人工智能的潜力可以解…

每日算法——归并排序

什么是归并排序 归并排序是一种分治算法。它将数组不断地分成两半&#xff0c;对每一半进行排序&#xff0c;然后再将排序好的两半合并起来。通过不断重复这个过程&#xff0c;最终得到完全排序的数组。 归并排序的注意点&#xff1a; 空间复杂度&#xff1a;归并排序需要额…