RocketMQ基本概念与入门

文章目录

  • MQ基本结构
  • 依赖
    • 案例:
      • product
      • Consumer
    • 核心概念
      • 1.nameserver
      • 2.broker
      • 3.主题队列
      • 4.queue队列
      • 5. 生产者
      • 6.消费者分组和生产者分组
      • 7.消费点位

MQ基本结构

在这里插入图片描述

  • message: 消息数据对象
  • product: 程序代码,生成消息,发送消息到队列
  • consumer: 程序代码,监听(绑定)队列,获取消息,执行消费代码
  • queue: Rocketmq rabbitmq kafka这些消息队列中间件软件.

依赖

<dependency>
    <!--2.2.2底层rocketmq客户端4.9.1-->
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

案例:

product

public class MyProducer {
    /**
     * 向rocketmq发送第一条消息
     */
    @Test
    public void sendTest01() throws Exception {
    //1.准备一个生产者对象,开启长链接
        DefaultMQProducer producer=new DefaultMQProducer();
        //对当前producer设置分组
        producer.setProducerGroup("first-producer-group");
        //连接nameserver localhost:9876
        producer.setNamesrvAddr("localhost:9876");
        //开启长链接
        producer.start();
    //2.封装一个消息对象,我们想要发送的内容,只是消息的一部分
        //创建一个消息对象
        Message message=new Message();
        //消息携带的内容 body
        String msg="当前发送的第一条消息";
        message.setBody(msg.getBytes(StandardCharsets.UTF_8));
        //设置消息主题,分类,按业务分类
        message.setTopic("first-topic-a");
        //主题标签 和key标识 
    //3.调用api方法将消息发送,接收返回结果,查看发送的信息比如状态
        //分为异步发送,同步发送,异步发送性能速度更高,但是无法保证成功.
        //同步发送,性能速度没有异步快,但是可以接收反馈结果
        SendResult send = producer.send(message);
        //result解析获取发送相关的信息
        System.out.println("发送状态:"+send.getSendStatus());
        System.out.println("消息到达主题,队列,broker信息:"+send.getMessageQueue());
    }
}

Consumer

public class MyConsumer1 {
    @Test
    public void consumerTest01() throws Exception {
    //1.构建一个消费者对象,连接nameserver创建长链接
        // push pull的区别 push消费端,消费的消息是队列推送给他的
        // pull 消费端代码执行一次pull 拉取过来一条消息
        // 收邮件 推的, 抢红包 拉取的
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer();
        //设置nameserver地址
        consumer.setNamesrvAddr("localhost:9876");
        //消费者分组
        consumer.setConsumerGroup("first-consumer-group-a");
        //定义监听的主题,消费端代码会根据定义的主题寻找nameserver路由信息,找到主题的队列进行绑定
        //topic 主题名称,subExpression 定义过滤逻辑 *表示匹配所有
        consumer.subscribe("first-topic-a","*");
    //2.执行api开始监听主题,实现队列的消费
        //提供给consumer一个监听器
        consumer.setMessageListener(new MessageListenerConcurrently() {
            /**
             * 推送过来的消息,都会调用consumerMessage执行消费逻辑
             * @param list 消息数据 list表示可以批量处理的消息,不是批量消息,list元素只有1个
             * @param consumeConcurrentlyContext
             * @return
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> list,
                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //获取消息 由于不是批量发送只有list一个元素
                MessageExt messageExt = list.get(0);
                messageExt.getMsgId();//唯一的一个标识,每次消息组装的对象都会在发送时,生成一个msgId
                byte[] body = messageExt.getBody();
                //将消息转化
                String message=new String(body, StandardCharsets.UTF_8);
                System.out.println("消费端获取到消息:"+message);
                //context 控制返回确认信息 ackIndex顺序
                //返回消费状态 success 队列会将消息对应当前消费组,移动偏移量,记录消费完成
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //开启长连接
        consumer.start();
        while (true);
    }
}

核心概念

1.nameserver

NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。
主要包括两个功能:

  • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
  • 路由信息管理,每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。

NameServer通常会有多个实例部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,客户端仍然可以向其它NameServer获取路由信息。

  • 总之: nameserver作为协调器, 谁的信息被用到,就要到nameserver注册,谁要用注册信息,就要到nameserver同步抓取.
    broker要作为rocketmq容器被生产者和消费者代码使用.

2.broker

Broker主要负责消息的存储、投递和查询以及服务高可用保证。

  • NameServer几乎无状态节点,因此可集群部署,节点之间无任何信息同步。Broker部署相对复杂。
  • 在 Master-Slave 架构中,Broker 分为 Master 与 Slave。一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。Master 与 Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

3.主题队列

  • 简单理解,主题是一类消息的集合,每次我们发送消息必须指定消息绑定某一个主题.

  • 生产者发送的某一条消息,只能指向一个主题,多条消息可以指向同一个主题,同一个主题中有多个消息队列保存消息,消费端可以根据订阅的主题消费不同主题的消息.这样可以实现业务隔离.

  • 比如电商主题可以是order,主题也可以是cart,还可以是product相关的…

  • 一类消息,从数据的格式,携带body格式,都是完全一致的. 不会出现"第一条消息的"body是普通字符串,第二条消息是个对象Json,不可能第一条消息延迟消息(支付订单倒计时取消),第二条消息普通同步消息.
    在这里插入图片描述

4.queue队列

存储消息的物理实体(最小单位)。一个Topic中可以包含多个Queue(分布式体现的关键),每个Queue中存放的就是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的分区**(Partition**)。

注意:一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费(消费点位逻辑)。一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费。
在这里插入图片描述

5. 生产者

问题:通过上述概念的了解,生产者,nameserver,broker之间是如何交互的.

  • 启动 nameserver 保存broker路由信息
  • 主题一旦创建,保存broker里,同时生成队列,这些数据,作为路由信息保存nameserver
  • 生产者从nameserver拿到当前集群所注册信息(路由)
  • 发送消息的时候,连接具体的那个broker找具体的topic的具体queue实现消息发送,使用的具体信息,在返回的SendResult中体现

6.消费者分组和生产者分组

消息生产者,负责生产消息。本质上是程序中的一段代码.Producer投递消息到broker代理中.找到主题,负载均衡存放到队列中.
RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。生产者组是同一类生产者的集合,产生同一类型的消息,这类Producer发送相同Topic类型的消息。一个生产者组可以同时向多个主题发送消息。
在这里插入图片描述
RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息,对应同一类消息数据。消费者组使得在消息消费方面,实现负载均衡(将一个Topic中的不同的Queue平均分配给同一个Consumer Group的不同的Consumer,注意,并不是将消息负载均衡)和容错(一个Consmer挂了,该Consumer Group中的其它Consumer可以接着执行消费逻辑.

由于主题中有多个队列,一组消费者,最多有和队列一样数量的消费成员,再多,无法绑定队列消费消息了.

7.消费点位

在队列中记录了所有和偏移量有关的数据比如:

  • 最小偏移量:都是0
  • 最大偏移量:当前消息的个数

在消费者中也在记录偏移量

  • 当前组对应主题队列的消费最小偏移量,和队列的最大偏移量(通过这两个值,能够知道当前消费者消费到哪个消息,还有多少没消费)
  • 在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/47396.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

分布式锁:Redis、Zookeeper

1.基于Redis实现分布式锁&#xfeff; Redis分布式锁原理如上图所示&#xff0c;当有多个Set命令发送到Redis时&#xff0c;Redis会串行处理&#xff0c;最终只有一个Set命令执行成功&#xff0c;从而只有一个线程加锁成功 2.SetNx命令加锁 利用Redis的setNx命令在Redis数据库…

数据结构【绪论】

数据结构入门级 第一章绪论 什么是数据结构&#xff1f;什么是数据类型&#xff1f; 程序数据结构算法 一、基本概念&#xff1a; 数据&#xff1a;指所有能被计算机处理的&#xff0c;无论图、文字、符号等。数据元素&#xff1a;数据的基本单位&#xff0c;通常作为整体考…

Unity TMP (TextMeshPro) 创建字体材质

1 TMP 简介 完整名称&#xff1a;Text Mesh Pro &#xff0c;unity新一代主流字体插件 1.1 组件变化 内置的Text组件以及与内置Text组件绑定的Button、DropDown、InputField均被替换为使用TextMeshPro的版本 内置的Text组件以及与内置Text组件绑定的Button、DropDown、Input…

tinymce插件tinymce-powerpaste-plugin——将word中内容(文字图片等)直接粘贴至tinymce编辑器中

TinyMCE是一款易用、且功能强大的所见即所得的富文本编辑器。同类程序有&#xff1a;UEditor、Kindeditor、Simditor、CKEditor、wangEditor、Suneditor、froala等等。 TinyMCE的优势&#xff1a; 开源可商用&#xff0c;基于LGPL2.1 插件丰富&#xff0c;自带插件基本涵盖日常…

【项目设计】基于负载均衡的在线oj平台

目录 一、项目介绍 二、开发环境以及技术 三、概要设计 四、关键算法 五、项目演示 六、代码实现 一、项目介绍 该项目是基于负载均衡的在线oj&#xff0c;模拟平时刷题网站&#xff08;leetcode和牛客&#xff09;写的一个在线判题系统 项目主要分为五个模块&#xff…

OpenAI重磅官宣ChatGPT安卓版本周发布,现已开启下载预约,附详细预约教程

7月22号&#xff0c;OpenAI 突然宣布&#xff0c;安卓版 ChatGPT 将在下周发布&#xff01;换句话说&#xff0c;本周安卓版 ChatGPT正式上线&#xff01; 最早&#xff0c;ChatGPT仅有网页版。 今年5月&#xff0c;iOS版ChatGPT正式发布&#xff0c;当时OpenAI表示Android版将…

Docker—— consul的容器服务更新与发现

Docker—— consul的容器服务更新与发现 一、Consul概述1.什么是服务注册与发现2.什么是consul 二、consul 部署1.consul服务器①. 建立 Consul 服务②. 查看集群信息③. 通过 http api 获取集群信息 2.registrator服务器①. 安装 Gliderlabs/Registrator②. 测试服务发现功能是…

智能小说文本字幕生成器

分享一个免费的&#xff0c;智能小说文本字幕生成器 智能分句。短词。 链接&#xff1a;https://pan.baidu.com/s/15xGlQg01LmbHHuGFZbgaiw?pwd0gjv 提取码&#xff1a;0gjv

分类评估指标

文章目录 1. 混淆矩阵2. Precision(精准率)3. Recall(召回率)4. F1-score5. ROC曲线和AUC指标5.1 ROC 曲线5.2 绘制 ROC 曲线5.3 AUC 值6. API介绍6.1 **分类评估报告api**6.2 **AUC计算API**练习-电信客户流失预测1. 数据集介绍2. 处理流程3. 案例实现4. 小结1. 混淆矩阵 …

Windows上安装Docker Desktop

运行环境 Windows 10Docker Desktop 4.21.1 安装步骤 步骤1&#xff1a; 勾掉"Use WSL 2 instead of Hyper-V(recommended)"&#xff08;原因见小插曲2章节&#xff09; 步骤2&#xff1a; 安装完成 步骤3&#xff1a; 运行Docker Desktop 步骤4&#xff1a; …

【MATLAB】ILOSpsi制导率的代码解析

ILOSpsi制导率的代码解析 这里记录一下关于fossen的MMS工具箱中&#xff0c;关于ILOSpsi制导率的代码解析内容&#xff0c;结合fossen的marine carft hydrodynamics and motion control这本书来参考看 文章目录 ILOSpsi制导率的代码解析前言一、代码全文二、内容解析1.persist…

opencv-27 阈值处理 cv2.threshold()

怎么理解阈值处理? 阈值处理&#xff08;Thresholding&#xff09;是一种常用的图像处理技术&#xff0c;在机器学习和计算机视觉中经常被用于二值化图像或二分类任务。它基于设定一个阈值来将像素值进行分类&#xff0c;将像素值大于或小于阈值的部分分为两个不同的类别&…

Redis持久化机制 RDB、AOF、混合持久化详解!如何选择?| JavaGuide

本文已经收录进 JavaGuide(「Java学习+面试指南」一份涵盖大部分 Java 程序员所需要掌握的核心知识。) Redis 持久化机制属于后端面试超高频的面试知识点,老生常谈了,需要重点花时间掌握。即使不是准备面试,日常开发也是需要经常用到的。 最近抽空对之前写的 Redis 持久化…

【ES】---ES的聚合(aggregations)

目录 一、前言1、聚合分类2、聚合的实现方式二、RestAPI--bucket聚合案例11、按照类型分bucket2、按照(String)时间分bucket三、RestAPI-- metric聚合案例11、metric指标统计四、RestAPI-- pipeline聚合案例1一、前言 聚合是对文档数据的统计、分析、计算。 注意:参与聚合的字…

Java中I/O流是什么?输入/输出流又是什么?

在 Java中所有数据都是使用流读写的。流是一组有序的数据序列&#xff0c;将数据从一个地方带到另一个地方。根据数据流向的不同&#xff0c;可以分为输入&#xff08;Input&#xff09;流和输出&#xff08;Output&#xff09;流两种。 在学习输入和输出流之前&#xff0c;我们…

PDF怎么转成Excel?4个方法非常实用!

如何使用记灵在线工具将PDF转成Excel&#xff1f;在日常工作中&#xff0c;我们经常需要转换PDF文件为Excel文件以方便我们处理数据。虽然PDF格式对于文本和图片的可视化效果效果不错&#xff0c;但是在处理数据时&#xff0c;Excel表格更加便捷。当我们将PDF文件转换成Excel文…

JDBC的的使用

首先导入jar包。 https://downloads.mysql.com/archives/c-j/ package com.test.sql;import java.sql.*;public class StudySql {public static void init() throws SQLException {Statement stmt null;Connection conn null;ResultSet res null;PreparedStatement pstm…

LeetCode Top100 Liked 题单(序号1~17)

01Two Sum - LeetCode 我自己写的代码【193ms】 因为不知道怎么加cmp函数&#xff0c;就只能pair的first设为值了&#xff0c;但其实这也是瞎做&#xff0c;应该也是O(n&#xff09;吧 class Solution { public:vector<int> twoSum(vector<int>& nums, int …

【渗透测试】PNG图片隐藏部分恢复

1、图片原尺寸还原方法一 缺点就是有点慢&#xff0c;毕竟遍历的次数比较多 import binascii import struct import sysfilename sys.argv[1] crcbp open(filename, "rb").read() # 打开图片 crc32frombp int(crcbp[29:33].hex(), 16) # 读取图片中的CRC校验值 …

HarmonyOS学习路之方舟开发框架—学习ArkTS语言(状态管理 一)

状态管理概述 在前文的描述中&#xff0c;我们构建的页面多为静态界面。如果希望构建一个动态的、有交互的界面&#xff0c;就需要引入“状态”的概念。 图1 效果图 上面的示例中&#xff0c;用户与应用程序的交互触发了文本状态变更&#xff0c;状态变更引起了UI渲染&#x…