RocketMQ-快速实战

MQ简介

MQ:MessageQueue,消息队列。是在互联网中使用非常广泛的一系列服务中间件。

Message:消息。消息是在不同进程之间传递的数据。这些进程可以部署在同一台机器上,也可以分布在不同机器上。(数据形式:二进制压缩数据、RPC、http,都属于进程间通讯的机制)

Queue:队列。队列原意是指一种具有FIFO(先进先出)特性的数据结构,是用来缓存数据的。对于消息中间件产品来说,能不能保证FIFO特性,尚值得考量。但是,所有消息队列都是需要具备存储消息,让消息排队的能力。

作用:

  • 异步,提高系统的响应速度、吞吐量。

  • 解耦,减少服务之间的影响。提高系统整体的稳定性以及可扩展性。另外,解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消费,并且消费者的增加或者减少对生产者没有影响。

  • 消峰,以稳定的系统资源应对突发的流量冲击。

RocketMQ产品特点

RocketMQ介绍

RocketMQ是阿里巴巴开源的一个消息中间件,在阿里内部历经了双十一等很多高并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的一个顶级项目。

早期阿里使用ActiveMQ,但是,当消息开始逐渐增多后,ActiveMQ的IO性能很快达到了瓶颈。于是,阿里开始关注Kafka。但是Kafka是针对日志收集场景设计的,他的高级功能并不是很贴合阿里的业务场景。尤其当他的Topic过多时,由于Partition文件也会过多,这就会加大文件索引的耗时,会严重影响IO性能。于是阿里才决定自研中间件,最早叫做MetaQ,后来改名成为RocketMQ。最早他所希望解决的最大问题就是多Topic下的IO性能压力。但是产品在阿里内部的不断改进,RocketMQ开始体现出一些不一样的优势。

RocketMQ特点

当今互联网MQ产品众多,其中,影响力和使用范围最大的当数Apache Kafka、RabbitMQ、Apache RocketMQ以及Apache Plusar。这几大产品虽然都是典型的MQ产品,但是由于设计和实现上的一些差异,造成他们适合于不同的细分场景。

优点缺点适合场景
Apache Kafka吞吐量非常大,性能非常好,集群高可用。会有丢数据的可能,功能比较单一日志分析、大数据采集
RabbitMQ消息可靠性高,功能全面。erlang语言不好定制。吞吐量比较低。企业内部小规模服务调用
Apache Pulsar基于Bookeeper构建,消息可靠性非常高。周边生态还有差距,目前使用的公司比较少。企业内部大规模服务调用
Apache RocketMQ高吞吐、高性能、高可用。功能全面。客户端协议丰富。使用java语言开发,方便定制。服务加载比较慢。几乎全场景,特别适合金融场景

其中RocketMQ,孵化自阿里巴巴。历经阿里多年双十一的严格考验,RocketMQ可以说是从全世界最严苛的高并发场景中摸爬滚打出来的过硬产品,也是少数几个在金融场景比较适用的MQ产品。从横向对比来看,RocketMQ与Kafka和RabbitMQ相比。RocketMQ的消息吞吐量虽然和Kafka相比还是稍有差距,但是却比RabbitMQ高很多。在阿里内部,RocketMQ集群每天处理的请求数超过5万亿次,支持的核心应用超过3000个。而RocketMQ最大的优势就是他天生就为金融互联网而生。他的消息可靠性相比Kafka也有了很大的提升,而消息吞吐量相比RabbitMQ也有很大的提升。另外,RocketMQ的高级功能也越来越全面,广播消费、延迟队列、死信队列等等高级功能一应俱全,甚至某些业务功能比如事务消息,已经呈现出领先潮流的趋势。

RocketMQ快速实战

快速搭建RocketMQ服务

RocketMQ的官网地址: RocketMQ · 官方网站 | RocketMQ

下载页面地址:下载 | RocketMQ

当前最新的版本是5.x,这是一个着眼于云原生的新版本,给 RocketMQ 带来了非常多很亮眼的新特性。但是目前来看,企业中用得还比较少。因此,我们这里采用的还是更为稳定的4.9.5版本。

注:在2020年下半年,RocketMQ新推出了5.0的大版本,这对于RocketMQ来说,是一个里程碑式的大版本。在这个大版本中,RocketMQ对整体功能做了一次大的升级。增加了很多非常有用的新特性,也对已有功能重新做了升级。

比如在具体功能方面,在4.x版本中,对于定时消息,只能设定几个固定的延迟级别,而5.0版本中,已经可以指定具体的发送时间了。在客户端语言方面,4.x版本,RocketMQ原生只支持基于Netty框架的Java客户端。而在5.0版本中,增加了对Grpc协议的支持,这基本上就解除了对客户端语言的限制。在服务端架构方面,4.x版本只支持固定角色的普通集群和可以动态切换角色的Dledger集群,而在5.0版本中,增加了Dledger Controller混合集群模式,即可以混合使用Dledger的集群机制以及 Broker 本地的文件管理机制。

但是功能强大,同时也意味着问题会很多。所以目前来看,企业中直接用新版本的还比较少。小部分使用新版本的企业,也大都是使用内部的改造优化版本。

这里下载的是这个版本:

上传到服务器并解压:(unzip rocketmq-all-4.9.5-bin-release.zip

RocketMQ建议的运行环境需要至少12G的内存,这是生产环境比较理想的资源配置。但是我买的云服务器是2核4g,所以需要修改启动配置:(:set number临时显示行号)

注意:生产环境不建议修改上面两个配置。

RocketMQ是基于Java开发的,所以依赖Java开发环境,安装JDK步骤省略,建议采用1.8版本

RocketMQ的后端服务分为nameserver和broker两个服务:

# 第一步:启动nameserver服务,进入安装目录执行命令
nohup bin/mqnamesrv &
# 是否启动成功可以通过jps检查,启动成功或失败可以查看nohup.out文件

# 为了方便测试在conf/broker.conf文件添加配置:
autoCreateTopicEnable=true
# 注意:如果是云服务器,还需要额外添加一行配置
brokerIP1 = 你的公网IP

# 第二步:启动broker服务,进入安装目录执行命令
nohup bin/mqbroker &

注意:

1、在实际服务部署时,通常会将RocketMQ的部署地址添加到环境变量当中。例如使用vi ~/.bash_profile指令,添加以下内容

export ROCKETMQ_HOME=/home/rocket/rocketmq-all-4.9.5-bin-release // 修改为你的安装目录
PATH=$ROCKETMQ_HOME/bin:$PATH
export PATH

2、停止RocketMQ服务可以通过mqshutdown指令进行,停止服务有短暂延迟,不建议kill杀进程

mqshutdown namesrv # 关闭nameserver服务
mqshutdown broker # 关闭broker服务

快速实现消息收发

1、命令行快速实现消息收发

第一步:需要配置一个环境变量NAMESRV_ADDR,指向之前启动的nameserver服务。

通过vi ~/.bash_profile添加以下配置。然后使用source ~/.bash_profile让配置生效。

export NAMESRV_ADDR='localhost:9876'

修改后文件:

第二步:通过指令启动RocketMQ的消息生产者发送消息。默认往RocketMQ中发送1000条消息

tools.sh org.apache.rocketmq.example.quickstart.Producer 
    
...消息发送日志
SendResult [sendStatus=SEND_OK, msgId=7F0000018FBA1B6D358697CBE7FB03E7, offsetMsgId=C0A800DA00002A9F000000000005DA64, messageQueue=MessageQueue [topic=TopicTest, brokerName=hcss-ecs-3744, queueId=3], queueOffset=499]
11:25:22.820 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
11:25:22.825 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
11:25:22.825 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.0.218:10911] result: true

第三步:可以启动消息消费者接收之前发送的消息

tools.sh org.apache.rocketmq.example.quickstart.Consumer
    
...消息消费日志
ConsumeMessageThread_please_rename_unique_group_name_4_15 Receive New Messages: [MessageExt [brokerName=hcss-ecs-3744, queueId=2, storeSize=192, queueOffset=199, sysFlag=0, bornTimestamp=1701312827986, bornHost=/192.168.0.218:32850, storeTimestamp=1701312827987, storeHost=/192.168.0.218:10911, msgId=C0A800DA00002A9F00000000000256D2, commitLogOffset=153298, bodyCRC=748130833, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1701314617997, UNIQ_KEY=7F0000018E561B6D358697AEFE52031F, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 57, 57], transactionId='null'}]]

注意:这个Consumer消费者的指令并不会主动结束,他会继续挂起,等待消费新的消息。可以使用CTRL+C停止该进程。

2、搭建Maven客户端项目

第一步:创建一个标准的maven项目,在pom.xml中引入以下核心依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.5</version>
</dependency>

第二步:就可以直接创建一个简单的消息生产者

public class Producer
{
    public static void main(String[] args)
        throws MQClientException, InterruptedException
    {
        // 初始化一个消息生产者
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 指定nameserver地址
        producer.setNamesrvAddr("192.168.232.128:9876");
        // 启动消息生产者服务
        producer.start();
        for (int i = 0; i < 2; i++)
        {
            try
            {
                // 创建消息。消息由Topic,Tag和body三个属性组成,其中Body就是消息内容
                Message msg =
                    new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送消息,获取发送结果
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            catch (Exception e)
            {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        // 消息发送完后,停止消息生产者服务。
        producer.shutdown();
    }
}

注意:对于生产者,需要指定对应的nameserver服务的地址,这个地址需要指向你自己的服务器。

第三步:创建一个消息消费者接收RocketMQ中的消息。

public class Consumer
{
    public static void main(String[] args)
        throws InterruptedException, MQClientException
    {
        // 构建一个消息消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        // 指定nameserver地址
        consumer.setNamesrvAddr("192.168.232.128:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 订阅一个感兴趣的话题,这个话题需要与消息的topic一致
        consumer.subscribe("TopicTest", "*");
        // 注册一个消息回调函数,消费到消息后就会触发回调。
        consumer.registerMessageListener(new MessageListenerConcurrently()
        {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
            {
                msgs.forEach(messageExt -> {
                    try
                    {
                        System.out.println("收到消息:" + new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    }
                    catch (UnsupportedEncodingException e)
                    {
                    }
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者服务
        consumer.start();
        System.out.print("Consumer Started");
    }
}

注意:对于消费者,同样需要指定nameserver的地址,另外消费者需要在RocketMQ中订阅具体的Topic,只有发送到这个Topic上的消息才会被这个消费者接收到

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/203993.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2023工作中遇到问题一

1、vue中模拟鼠标点击下拉框 <xxx-select ref"aaa" :value"form.serviceCharge" placeholder"请输入" add"(value) > getBasGoods(value)" :configInfo"configInfo" />vue中模拟鼠标点击 this.$refs.aaa.$el.cli…

经典的回溯算法题leetcode全排列问题思路代码详解

目录 全排列问题 leetcode46题.全排列 leetcode47题.全排列II 对回溯算法感兴趣的朋友也可以多多支持一下我的其他文章。 回溯算法详解-CSDN博客 经典的回溯算法题leetcode组合问题整理及思路代码详解-CSDN博客 经典的回溯算法题leetcode子集问题思路代码详解-CSDN博客 …

AI文章生成器-免费批量原创文章生成的工具

在科技的大潮中&#xff0c;AI技术愈发成熟&#xff0c;文言一心文章生成器悄然崭露头角。这一创新性工具的出现&#xff0c;为广大用户提供了快速、高效的文章生成方式。147SEO的批量原创功能更是锦上添花&#xff0c;让文章创作变得更为轻松。正是在这背后&#xff0c;我们看…

iPhone苹果手机如何将词令网页添加到苹果iPhone手机桌面快捷打开?

iPhone苹果手机如何将词令网页添加到苹果iPhone手机桌面快捷打开&#xff1f; 1、在iPhone苹果手机上找到「Safari浏览器」,并点击打开&#xff1b; 2、打开Safari浏览器后&#xff0c;输入词令官方网站地址&#xff1a;ciling.cn ; 3、打开词令官网后&#xff0c;点击Safari…

数字图像处理(实践篇)十二 基于小波变换的图像降噪

目录 一 基于小波变换的图像降噪 &#xff08;1&#xff09;小波变换基本理论 &#xff08;2&#xff09;小波分析在图像处理中的应用 &#xff08;3&#xff09;小波变换原理 &#xff08;4&#xff09;小波降噪原理 &#xff08;5&#xff09;小波降噪算法的实现 &…

Windows系统IIS服务配置与网站搭建,结合内网穿透实现公网访问

文章目录 1.前言2.Windows网页设置2.1 Windows IIS功能设置2.2 IIS网页访问测试 3. Cpolar内网穿透3.1 下载安装Cpolar内网穿透3.2 Cpolar云端设置3.3 Cpolar本地设置 4.公网访问测试5.结语 1.前言 在网上各种教程和介绍中&#xff0c;搭建网页都会借助各种软件的帮助&#xf…

python文件路径读取提示can‘t open/read file: check file path/integrity

它的意思是“不能打开/读取文件”&#xff0c;让我们检查路径&#xff0c;在查看下路径前是否忘记加r了 python文件路径读取&#xff0c;如果不加r&#xff0c;上述文件路径在代码运行时会报错&#xff0c;因为其会先将双引号”“去掉&#xff0c;然后系统看到了文件路径中有\r…

应用场景丨智慧社区怎么有效预警内涝积水灾害

在繁华的社区中&#xff0c;一场突如其来的暴雨可能会让整个社区陷入“水深火热”。面对这样的困境&#xff0c;社区内涝积水监测系统应运而生&#xff0c;成为社区生命线的重要守护者。 社区内涝积水监测系统是一套高科技预警机制&#xff0c;它运用传感器、数据采集器、通信网…

Nginx(无法解析PHP网页如何解决?FPM解决你的烦恼!)

♥️作者&#xff1a;小刘在C站 ♥️个人主页&#xff1a; 小刘主页 ♥️不能因为人生的道路坎坷,就使自己的身躯变得弯曲;不能因为生活的历程漫长,就使求索的 脚步迟缓。 ♥️学习两年总结出的运维经验&#xff0c;以及思科模拟器全套网络实验教程。专栏&#xff1a;云计算技…

卓越进行时 | 市人大常委组织深入赛宁网安考察调研

11月28日&#xff0c;市人大常委会党组书记、主任龙翔来到基层一线&#xff0c;督导江宁区主题教育工作。市委第五巡回督导组组长鲍陈&#xff0c;区领导赵洪斌、任宁等参加。 督导期间&#xff0c;龙翔在网络安全卓越中心听取赛宁网安研发情况汇报&#xff0c;了解公司产品在…

常见的回归测试策略有哪几种?

&#x1f4e2;专注于分享软件测试干货内容&#xff0c;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;交流讨论&#xff1a;欢迎加入我们一起学习&#xff01;&#x1f4e2;资源分享&#xff1a;耗时200小时精选的「软件测试」资…

NVM管理多个nodejs安装步骤

文章目录 NVM安装步骤**二、nvm下载****三、nvm安装**版本安装与切换命令使用 NVM安装步骤 一、nvm说明 nvm 主要是用来管理 nodejs 和 npm 版本的工具&#xff0c;可以用来切换不同版本的 nodejs。 安装nvm之前先卸载node 二、nvm下载 https://github.com/coreybutler/nvm-w…

关于前端学习的思考-内边距、边框和外边距

从最简单的盒子开始思考 先把实际应用摆出来&#xff1a; margin&#xff1a;居中&#xff0c;控制边距。 padding&#xff1a;控制边距。 border&#xff1a;制作三角形。 盒子分为内容盒子&#xff0c;内边距盒子&#xff0c;边框和外边距。 如果想让块级元素居中&#…

MySQL图书管理系统(49-94)源码

-- 九、 子查询 -- 无关子查询 -- 比较子查询&#xff1a;能确切知道子查询返回的是单值时&#xff0c;可以用>&#xff0c;<&#xff0c;&#xff0c;>&#xff0c;<&#xff0c;!或<>等比较运算符。 -- 49、 查询与“俞心怡”在同一个部门的读者的借…

【Docker】安装RabbitMQ

1.拉取镜像 docker pull rabbitmq 2.运行容器 docker run \-e RABBITMQ_DEFAULT_USERitcast \-e RABBITMQ_DEFAULT_PASS123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq 3.安装管理页面的插件 进入容器内部 dock…

mac安装homebrew/brew遇到443报错

文章目录 问题描述解决方法方法一方法二 参考文献 问题描述 brew 全称Homebrew 是Mac OSX上的软件包管理工具 想在mac终端安装&#xff0c;运行网上提供的指令 /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)&quo…

【Android Jetpack】Lifecycle 感知生命周期

文章目录 背景示例LifeCycle的原理LifecycleOwner自定义LifecycleOwnerLifecycleObserver 示例改进使用LifecycleService解耦Service与组件整个应用进程的生命周期ProcessLifecycleOwner 背景 在Android应用程序开发中&#xff0c;解耦很大程度上表现为系统组件的生命周期与普…

Linux学习笔记09、Shell命令之历史命令和自动补全

上一篇&#xff1a;Linux学习笔记08、Shell命令之常用命令缩写及全称 目录 1、历史命令&#xff1a; 1.1、查看所有历史命令列表&#xff1a; 1.2、查看指定历史命令&#xff1a; 1.3、清除历史命令&#xff1a; 2、自动补全 2.1、当字符串唯一时&#xff1a; 2.2、当有多个…

在编程中遇到的问题总结

IDEA空包粘黏问题 创建好目录以后会发现idea自动将空包合并在一起了&#xff0c;而且点击设置里面也没有Compact Middle Package Compact Middle Package如果不在设置的主面板上&#xff0c;则点击Tree Appearance&#xff0c;会发现Compact Middle Package在Tree Appearance里…

【JavaEE初阶】volatile 关键字、wait 和 notify

目录 一、volatile 关键字 1、volatile 能保证内存可见性 2、volatile 不保证原子性 二、wait 和 notify 1、wait()方法 2、notify()方法 3、notifyAll()方法 4、wait 和 sleep 的对比 一、volatile 关键字 1、volatile 能保证内存可见性 我们前面的线程安全文章中&…