手拉手RocketMQ基础

消息中间件的对比

消息中间件

ActiveMQ

RabbitMQ

RocketMQ

kafka

开发语言

java

erlang

java

scala

单击吞吐量

万级

万级

10万级

10万级

时效性

ms

us

ms

ms

可用性

高(主从架构)

高(主从架构)

非常高(主从架构)

非常高(主从架构)

消息中间件: activeMQ:java(jms协议),性能一般,吞吐量低。rabbitMQ:erlang(amqp协议),性能好,功能丰富,吞吐量一般。rocketMQ:java,性能好,吞吐量丰富,功能丰富。Kafka: scala,吞吐量最大,功能单一,大数据领域

RocketMQ安装配置

RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。

rocketmq.apache.org

Broker:经纪人(经理人)

Topic主题:消息区分,分类,虚拟结构

Queue:消息队列

Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

RocketMQ下载安装

解压rocketmq-all-4.9.4-bin-release.zip

unzip rocketmq-all-4.9.4-bin-release.zip

配置 环境变量

vim + /etc/profile

export NAMESRV_ADDR=localhost:9876

根据本地环境需求更改

vim runserver.sh

vim runbroker.sh

修改配置文件

namesrvAddr=localhost:9876

autoCreateTopicEnable=true #自动创建主题

brokerIP1=公网地址

启动

nohup sh mqnamesrv > /opt/rocketmq/namesrv.log &

nohup sh mqbroker -c ../conf/broker.conf > /opt/rocketmq/mqbroker.log &

RocketMQ Dashboard的下载与打包启动

pom.xml这个文件所在的目录下执行 Maven 打包命令

mvn clean package -Dmaven.test.skip=true

\target目录下中,运行jar包

java -jar rocketmq-dashboard-1.0.0.jar

nohup java -jar rocketmq-dashboard-1.0.0.jar --server.port=8888 --rocketmq.config.nameservAddr=127.0.0.1:9876 > dashboard.log &

浏览器访问

快速使用

生产者(Producer)

生产者:也称为消息发布者,负责生产并发送消息至RocketMQ。

消费者(Consumer)

消费者:也称为消息订阅者,负责从RocketMQ接收并消费消息。

消息(Message)

消息:生产或消费的数据,对于RocketMQ来说,消息就是字节数组。

主机(Broker)

RocketMQ的核心,用于暂存和传输消息。

Pom.xml加入依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>

创建生产者

@Test
void rocketmqProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("Topic","消息".getBytes());
SendResult send = producer.send(message);
System.out.println("发送状态"+send.getSendStatus());
//关闭生产者
producer.shutdown();
}

运行生产者

创建消费者

@Test
void rocketmqConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("Topic","*");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

RocketMQ的概念

分组(Group)

生产者:标识发送同一类消息的Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。主要作用用于事务消息,事务消息中如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其它producer,确认这条消息应该commit还是rollback。

消费者:标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。

消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。

主题(Topic)

标识一类消息的逻辑名字,消息的逻辑管理单位。无论消息生产还是消费,都需要指定Topic。

区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息。

标签(Tag)

RocketMQ支持给在发送的时候给topic打tag,同一个topic的消息虽然逻辑管理是一样的。但是消费topic的时候,如果你消费订阅的时候指定的是tagA,那么tagB的消息将不会投递。

消息队列(Message Queue)

简称Queue或Q。消息物理管理单位。一个Topic将有若干个Q。若一个Topic创建在不同的Broker,则不同的broker上都有若干Q,消息将物理地存储落在不同Broker结点上,具有水平扩展的能力。

无论生产者还是消费者,实际的生产和消费都是针对Q级别。例如Producer发送消息的时候,会预先选择(默认轮询)好该Topic下面的某一条Q发送;Consumer消费的时候也会负载均衡地分配若干个Q,只拉取对应Q的消息。

每一条message queue均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。

偏移量(Offset)

RocketMQ中,有很多offset的概念。一般我们只关心暴露到客户端的offset。不指定的话,就是指Message Queue下面的offset。

Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是offset,Message queue中的max offset表示消息的最大offset。

所遇问题

错误: 无法验证 dlcdn.apache.org 的由 “/C=US/O=Let‘s Encrypt/CN=R3” 颁发的证书: 颁发的证书已经过期。要以不安全的方式连接至 dlcdn.apa

yum install -y ca-certificates

wget --no-check-certificate https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/446234.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

分享软件项目实施方案模板

本项目在实施过程中将遵守做到以下几个方面&#xff1a; 与建设单位共同完成整个系统软件、网络等设计,负责系统的开发、测试、调试、人员培训、系统的试运行和交付&#xff0c;并保证系统质量。负责系统的维护、应用软件的升级和更新。提出对系统硬件设备的相关技术要求。在项…

JDBC的学习记录

JDBC就是使用java语言操作关系型数据库的一套API。 JDBC&#xff08;Java Database Connectivity&#xff09;是Java语言中用于连接和操作数据库的一种标准接口。它提供了一组方法和类&#xff0c;使Java程序能够与各种不同类型的关系型数据库进行交互。 JDBC的主要功能包括建…

PythonWeb

例题一 from flask import Flask app Flask(__name__) app.route(/index) def index():return f<h1>这是首页&#xff01;</h1> def second():return f<h1>这是第二页&#xff01;</h1> if __name__ __name__:app.run(host"0.0.0.0",port…

图腾柱PFC工作原理:一张图

视屏链接&#xff1a; PFC工作原理

SpringCloud--Sentinel使用

一、快速开始 Sentinel 的使用可以分为两个部分&#xff1a; 核心库&#xff08;Java 客户端&#xff09;&#xff1a;不依赖任何框架/库&#xff0c;能够运行于 Java 8 及以上的版本的运行时环境&#xff0c;同时对 Dubbo / Spring Cloud 等框架也有较好的支持。控制台&…

OpenFeign相关配置(超时,重试,HttpClient5,日志)

1.超时控制 默认OpenFeign客户端等待60秒钟&#xff0c;但是服务端处理超过规定时间会导致Feign客户端返回报错。 yml文件对应配置: connectTimeout&#xff1a;连接超时时间 readTimeout:请求处理超时时间 1.yml配置 server:port: 80spring:application:name: cloud-consu…

F - Earn to Advance

解题思路 由于对于一点不知道后面得花费&#xff0c;所以无法决策当前是否要停下赚钱或要停下多久考虑一点&#xff0c;可以由其左上方的所有点到达所以从往前推&#xff0c;得出到的总花费然后考虑从之后不赚钱直接到最终所用次数和剩余钱若存在&#xff0c;在后面点赚钱更优…

面试经典150题 -- 图的广度优先遍历 (总结)

总的链接 面试经典 150 题 - 学习计划 - 力扣&#xff08;LeetCode&#xff09;全球极客挚爱的技术成长平台 909 . 蛇梯棋 链接 : . - 力扣&#xff08;LeetCode&#xff09; 题意 &#xff1a; 直接bfs就好了 &#xff0c; 题意难以理解 : class Solution:def snakesA…

mockjs学习

1.前言 最近面试发现之前团队协同合作的项目没有mock数据难以向面试官直接展示&#xff0c;所以迟到得来速学一下mockjs。 参考视频&#xff1a;mockJs 妈妈再也不用担心我没有后端接口啦_哔哩哔哩_bilibili 一开始查阅了一些资料&#xff0c;先是看了下EasyMock&#xff0c…

分享几个Google Chrome谷歌浏览器历史版本下载网站

使用selenium模块的时候&#xff0c;从官网下载的谷歌浏览器版本太高&#xff0c;驱动不支持&#xff0c;所以需要使用历史的谷歌浏览器版本 &#xff0c;这里备份一下以防找不到了。 驱动下载地址&#xff1a;https://registry.npmmirror.com/binary.html?pathchromedriver 文…

Windows C++ 实现远程虚拟打印机(远程共享打印机)

编译错误已经修改完后的工程修改后的下载地址 https://download.csdn.net/download/2403_83063732/88928550 1、下载clawpdf&#xff08;0.8.7版本&#xff09; https://github.com/clawsoftware/clawPDF 2、打开clawpdf工程开始编译C#工程&#xff0c;出现如下错误&#xf…

利用YOLOv5模型进行锥桶识别

目录 1. YOLOv5模型简介 2. 准备数据集 3. 训练模型 4. 模型评估 5. 模型部署与应用 6. 注意事项 在计算机视觉领域&#xff0c;目标检测是一项重要的任务&#xff0c;它可以帮助我们识别图像或视频中的特定物体并进行定位。而YOLOv5是一种高效的目标检测模型&#xff0c…

栈与队列的故事

​​​​​​​ 目录 ​编辑​​​​​​​ 一、栈 1.1栈的概念及结构 1.2栈的实现 1、实现支持动态增长的栈 2、初始化栈 3、入栈 4、出栈 5、获取栈顶元素 6、检测栈是否为空 7、获取栈中有效元素个数 8、销毁栈 9、测试 1.3源码 二、队列 2.1队列的概念及结构…

【AI辅助研发】-开端:未来的编程范式

编程的四种范式 面向机器编程范式 面向机器编程范式是最原始的编程方式&#xff0c;它直接针对计算机硬件进行操作。程序员需要了解计算机的内部结构、指令集和内存管理等细节。在这种范式下&#xff0c;编程的主要目标是编写能够直接控制计算机硬件运行的机器代码。面向机器…

Babel:现代JavaScript的桥梁

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

【Prometheus】PromQL

数据类型 即时向量&#xff08;instant vector&#xff09; node_cpu_seconds_total{instance"ahoj-dev-ubuntu-virtualbox",mode"idle"} 区间向量&#xff08;range vector&#xff09; node_cpu_seconds_total{instance"ahoj-dev-ubuntu-virtu…

java正则表达式概述及案例

前言&#xff1a; 学习了正则表达式&#xff0c;记录下使用心得。打好基础&#xff0c;daydayup! 正则表达式 什么是正则表达式 正则表达式由一些特定的字符组成&#xff0c;代表一个规则。 正则表达式的功能 1&#xff1a;用来校验数据格式是否合规 2&#xff1a;在一段文本…

针对娃哈哈和农夫山泉,AI是如何看待的

娃哈哈和农夫山泉事件是中国饮料行业的两个重要事件。娃哈哈和农夫山泉都是中国知名的饮料品牌&#xff0c;两者之间的竞争一直存在。以下是对这两个事件的介绍&#xff1a; 1. 娃哈哈事件&#xff1a;娃哈哈是中国最大的饮料生产企业之一&#xff0c;也是中国最具影响力的品牌…

.Net6使用JWT认证和授权

文章目录 目的实现案例一.项目所需包&#xff1a;二.配置项目 appsettings.json 文件&#xff1a;三.创建Model文件夹&#xff0c;添加AppConfig类和UserRole类1.AppConfig类获取appsettings.json文件中的值2.UserRole类用于区分用户信息和权限 四.主体代码案例&#xff1a;1.L…

C++的类与对象(三):构造函数、析构函数、对象的销毁顺序

目录 类的6个默认成员函数 构造函数 语法 特性 析构函数 特性 对象的销毁顺序​​​​​​​​​​​​​​ 类的6个默认成员函数 问题&#xff1a;一个什么成员都没的类叫做空类&#xff0c;空类中真的什么都没有吗&#xff1f; 基本概念&#xff1a;任何类在什么都不…