RocketMQ学习笔记(一)

一、基本概念

  • 生产者(Producer):也称为消息发布者,是RocketMQ中用来构建并传输消息到服务端的运行实体,举例:发信者
  • 主题(Topic):Topic是RocketMQ中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息;Topic是一个逻辑概念,并不是实际的消息容器;
  • 消息队列(MessageQueue):队列是RocketMQ中消息存储和传输的实际容器,也是消息的最小存储单元,相当于是Topic的分区;用于并行发送和接收消息;
  • 消费者(Consumer):也称为消息订阅者,是RocketMQ中用来接收并处理消息的运行实体,举例:收信者
  • 消费者组(ConsumerGroup):消费者组是RocketMQ中承载多个消费行为一致的消费者负载均衡分组(不同的消费者组,可以消费同一个topic的消息)。和消费者不同,消费者组是一个逻辑概念。
  • NameServer:可以理解成注册中心,负责更新和发现Broker服务。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的,它是无状态的,举例:各个邮局的管理机构
  • Broker:可以理解为消息中转角色,负责消息的存储和转发,接收生产者产生的消息并持久化消息;当用户发送的消息被发送到Broker时,Broker会将消息转发到与之关联的Topic中,以便让更多的接收者进行处理,举例:邮局

1.1.消息模型

请添加图片描述

1.2.部署模型

请添加图片描述

1.3 RocketMQ下载地址

RocketMQ的官网地址:https://rocketmq.apache.org/
Github地址:https://github.com/apache/rocketmq
下载地址:https://rocketmq.apache.org/zh/download/

二、实战

我们需要先搭建一个基于Maven的springboot项目,只需要加入以下依赖:

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>5.1.0</version>
</dependency>

2.1 基本样例

生产者生产消息

消息生产者分别通过三种方式发送消息:

  • 同步发送:等待消息返回后再继续进行下面的操作。
  • 异步发送:不等待消息返回直接进入后续流程。broker将结果返回后调用callback函数,并使用CountDownLatch计数。
  • 单向发送:只负责发送,不管消息是否发送成功。
同步发送
package Simple;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
* 同步发送
* Created by BaiLi
*/
public class SyncProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("192.168.43.137:9876");
        producer.start();
        for (int i = 0; i < 2; i++) {
            Message msg = new Message("Simple", //主题
                                      "TagA",  //设置消息Tag,用于消费端根据指定Tag过滤消息。
                                      "Simple-Sync".getBytes(StandardCharsets.UTF_8) //消息体。
                                     );
            SendResult send = producer.send(msg);
            System.out.printf(i + ".发送消息成功:%s%n", send);
        }
        producer.shutdown();
    }
}
异步发送
package Simple;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* 异步发送
* Created by BaiLi
*/
public class AsyncProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
        producer.setNamesrvAddr("192.168.43.137:9876");
        producer.start();
        CountDownLatch countDownLatch = new CountDownLatch(100);//计数
        for (int i = 0; i < 100; i++) {
            Message message = new Message("Simple", "TagA", "Simple-Async".getBytes(StandardCharsets.UTF_8));
            final int index = i;
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    countDownLatch.countDown();
                    System.out.printf("%d 消息发送成功%s%n", index, sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    countDownLatch.countDown();
                    System.out.printf("%d 消息失败%s%n", index, throwable);
                    throwable.printStackTrace();
                }
            }
                         );
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}
单向发送
package Simple;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
* 单向发送
* Created by BaiLi
*/
public class OnewayProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
        producer.setNamesrvAddr("192.168.43.137:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message message = new Message("Simple","TagA", "Simple-Oneway".getBytes(StandardCharsets.UTF_8));
            producer.sendOneway(message);
            System.out.printf("%d 消息发送完成 %n" , i);
        }
        Thread.sleep(5000);
        producer.shutdown();
    }
}

消费者消费消息
消费者消费消息分两种:

  • 拉模式:消费者主动去Broker上拉取消息。
  • 推模式:消费者等待Broker把消息推送过来。
拉模式
package Simple;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.HashSet;
import java.util.Set;

/**
* 拉模式
* Created by BaiLi
*/
public class PullConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("SimplePullConsumer");
        pullConsumer.setNamesrvAddr("192.168.43.137:9876");//执行nameserver地址
        Set<String> topics = new HashSet<>();
        topics.add("Simple");//添加Topic
        topics.add("TopicTest");
        pullConsumer.setRegisterTopics(topics);
        pullConsumer.start();
        while (true) { //循环拉取消息
            pullConsumer.getRegisterTopics().forEach(n -> {
                try {
                    Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues(n);//获取主题中的Queue
                    messageQueues.forEach(l -> {
                        try {
                            //获取Queue中的偏移量
                            long offset = pullConsumer.getOffsetStore().readOffset(l, ReadOffsetType.READ_FROM_MEMORY);
                            if (offset < 0) {
                                offset = pullConsumer.getOffsetStore().readOffset(l, ReadOffsetType.READ_FROM_STORE);
                            }
                            if (offset < 0) {
                                offset = pullConsumer.maxOffset(l);
                            }
                            if (offset < 0) {
                                offset = 0;
                            }
                            //拉取Queue中的消息。每次获取32条
                            PullResult pullResult = pullConsumer.pull(l, "*", offset, 32);
                            System.out.printf("循环拉取消息ing %s%n",pullResult);
                            switch (pullResult.getPullStatus()) {
                                case FOUND:
                                    pullResult.getMsgFoundList().forEach(p -> {
                                        System.out.printf("拉取消息成功%s%n", p);
                                    });
                                    //更新偏移量
                                    pullConsumer.updateConsumeOffset(l, pullResult.getNextBeginOffset());
                            }
                        } catch (MQClientException e) {
                            e.printStackTrace();
                        } catch (RemotingException e) {
                            e.printStackTrace();
                        } catch (MQBrokerException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
                 } catch (MQClientException e) {
                    e.printStackTrace();
                 }
            });
        }
    }
}
推模式
package Simple;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
* 推模式
* Created by BaiLi
*/
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");
        pushConsumer.setNamesrvAddr("192.168.43.137:9876");
        pushConsumer.subscribe("Simple","*");
        pushConsumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                list.forEach( n->{
                    System.out.printf("收到消息: %s%n" , n);
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        pushConsumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/618576.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【全开源】Java知识付费教育付费资源付费平台公众号小程序源码

特色功能&#xff1a; 多样化的内容呈现&#xff1a;资源付费平台小程序支持图文、音视频、直播等多种形式的内容呈现&#xff0c;为用户提供了丰富的学习体验。直播课程&#xff1a;专家或讲师可以通过小程序进行在线授课&#xff0c;与用户实时互动&#xff0c;增强了学习的…

再有人说数字孪生大屏没有用,用这8条怼回去。

数字孪生大屏之所以受到欢迎&#xff0c;主要有以下几个原因&#xff1a; 实时数据可视化 数字孪生大屏可以将实时数据以直观的可视化形式展示出来&#xff0c;让用户能够一目了然地了解数据的状态和趋势。这样可以帮助用户更好地理解和分析数据&#xff0c;及时做出决策和调…

动态规划算法练习——计数问题

题目描述 给定两个整数 a 和 b&#xff0c;求 a 和 b 之间的所有数字中 0∼9 的出现次数。 例如&#xff0c;a1024&#xff0c;b1032&#xff0c;则 a 和 b 之间共有 9 个数如下&#xff1a; 1024 1025 1026 1027 1028 1029 1030 1031 1032 其中 0 出现 10 次&#xff0c;1 出现…

蓝桥杯-网络安全比赛(7)基础知识 HTTP、TTL、IP数据包、MSS、MTU、ARP、LLMNR、MDNS、NBNS。

1. IP中TTL值能够给我提供什么信息&#xff1f;2. IP头部中标志、13位偏移、32位源IP地址、目标IP、IP数据包格式&#xff0c;有多少字节3. IP头部中的16位标识是什么&#xff1f;4. MSS 和MTU分别有多大&#xff1f;5. 怎么获取路由IP信息&#xff1f;PING、NSLOOKUP、TRACERT…

day6Qt作业

人脸识别系统 #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <opencv2/opencv.hpp> #include <iostream> #include <math.h> #include<opencv2/face.hpp> #include <vector> #include <map> #include <QMessag…

创建一个react项目(router,store,axios,antd)最后有项目地址

第一步&#xff1a;使用cra脚手架 创建项目 文档地址&#xff1a;Create React App 中文文档 npx create-react-app 你的项目名称 第二步&#xff1a;整理项目结构和删除多余代码 目标效果图&#xff1a; 在src目录下分别新建apis,assets,components,pages,router,store,ut…

重学JavaScript核心知识点(二)—— 详解Js中的模块化

详解Js中的模块化 1. 模块化的背景2. 来看一个例子3. 优雅的做法 —— 创建模块对象4. 模块与类&#xff08;class&#xff09;5. 合并模块6. 动态加载模块 1. 模块化的背景 JavaScript 在诞生之初是体积很小的&#xff0c;早期&#xff0c;它们大多被用来执行独立的脚本任务&…

C++初学者,使用命令行编绎C文件

今天在家里&#xff0c;闲来无事。又开始学习制作Helloworld! VStudio 版本众多&#xff0c;用哪个好呢&#xff0c;真是不好选择。今天就使用网上的大神&#xff1a;theoractice&#xff0c;制造的版本来学习C&#xff0c;我喜欢2013这个版本&#xff0c;真是太好用了。不但C…

Qt自定义控件--提升为

为什么要自定义控件 1&#xff0c;有复合小控件需要组合为一个整体控件时&#xff1b; 2&#xff0c;一个复合控件需要重复使用时&#xff1b; 实现 自定义控件文件 新增三个文件 关联不同组的控件 关联之前的准备工作 1&#xff0c;在主控件选择和子控件所有控件所在控件…

【一键录音,轻松转换:用Python打造个性化音频记录工具】

在数字化时代,音频记录已成为日常学习、工作和娱乐不可或缺的一部分。想象一下,只需简单按下几个键,即可随时随地捕捉灵感,记录会议要点,或是珍藏孩子的童言稚语。本文将引领您步入Python编程的奇妙世界,展示如何借助几个强大的库,构建一个既简单又实用的音频录制及转换…

第十二届蓝桥杯省赛真题 Java A 组【原卷】

文章目录 发现宝藏【考生须知】试题 A: 相乘试题 B: 直线试题 C : \mathrm{C}: C: 货物摆放试题 D: 路径试题 E: 回路计数试题 F : \mathrm{F}: F: 最少砝码试题 G: 左孩子右兄弟试题 H : \mathrm{H}: H: 异或数列试题 I \mathbf{I} I 双向排序试题 J : \mathrm{J}: J: 分…

如何快速展示专业:掌握类的基本概念-类/方法/关键字/变量/数据类型/注释

在李笑来的《财富自由之路》中提到一种初学者快速入门的学习方法&#xff1a;快速掌握最小必要知识。 关于Java的类&#xff0c;最少必要知识就是本文提到的基本概念&#xff0c;掌握了这些基本概念&#xff0c;就对类有了基本的了解&#xff0c;为后续的深入学习和沟通奠定了基…

Go编程语言的调试器Delve | Goland远程连接Linux开发调试(go远程开发)

文章目录 Go编程语言的调试器一、什么是Delve二、delve 安装安装报错cgo: C compiler "gcc" not found: exec: "gcc": executable file not found in $PATH解决 三、delve命令行使用delve 常见的调试模式常用调试方法todo调试程序代码与动态库加载程序运行…

真要这么卷?某国产大模型定价下调90%,百万 tokens 只需 1 元!

就在刚刚&#xff0c;国内明星AI公司——智谱AI官宣重磅炸弹&#xff1a; 将能力对标GPT3.5-Turbo的GLM-3的大模型API调用价格最高下调90%&#xff0c;价格仅为原来的十分之一&#xff01; 废话不多说&#xff0c;直接上图&#xff1a; 官网地址&#xff1a;https://open.big…

【SRC实战】前端脱敏信息泄露

挖个洞先 https://mp.weixin.qq.com/s/xnCQQCAneT21vYH8Q3OCpw “ 以下漏洞均为实验靶场&#xff0c;如有雷同&#xff0c;纯属巧合 ” 01 — 漏洞证明 一、前端脱敏&#xff0c;请求包泄露明文 “ 前端脱敏处理&#xff0c;请求包是否存在泄露&#xff1f; ” 1、获取验…

公有云Linux模拟UDP端口并抓包

目录 写在前面操作步骤服务端开启UDP端口并监听客户端连接Wireshark抓包查看 写在前面 关于具体的操作&#xff0c;请参考我的上一篇文章 公有云Linux模拟TCP三次挥手与四次握手&#xff08;Wireshark抓包验证版&#xff09; 在本文&#xff0c;仅介绍与上一篇不同的地方。 操…

STL中的优先级队列

目录 1.引言 2.简介 3.基本操作 4.实现原理 5.自定义优先级比较 6.相关题目 7.能特点 8.总结 1.引言 在C标准库中&#xff0c;优先级队列是一种非常有用的数据结构&#xff0c;它允许我们根据元素的优先级来对其进行排序和访问。这种数据结构在多种应用场景中都发挥着重…

Linux提权--第三方软件MYSQL数据库提权(WEB+本地)

免责声明:本文仅做技术交流与学习,非法搞事后果自负... 目录 靶场镜像: 过程: 手工: 下载mysql udf poc 进行编译. 进入数据库进行UDF导出 下载(上传) 创建do_system函数调用 探针(./LinEnum.sh),查找suid权限. 配合使用find调用执行 工具: 过程: 外连不上? 隧道出…

云器Lakehouse:Multi-Cluster弹性架构如何实现湖上高并发低延迟分析

导读 在当今快速发展的大数据时代&#xff0c;数据平台的性能和效率对于企业来说至关重要。云器Lakehouse的Multi-Cluster弹性架构为我们提供了一种全新的视角&#xff0c;以应对数据湖上高并发和低延迟分析的挑战。本文将深入探讨云器Lakehouse如何通过其独特的技术理念和架构…

B端弹窗设计指南,3000字讲清楚,内附大量案例。

B端系统弹窗是指在企业级&#xff08;Business to Business&#xff09;系统中&#xff0c;弹出的窗口或对话框&#xff0c;用于向用户展示信息、提供操作选项或者收集用户输入。 一、B端系统弹窗的作用 作用如下&#xff1a; 提示和通知&#xff1a;弹窗可以用于向用户展示重…