手搭手RocketMQ发送消息

消息中间件的对比

消息中间件

ActiveMQ

RabbitMQ

RocketMQ

kafka

开发语言

java

erlang

java

scala

单击吞吐量

万级

万级

10万级

10万级

时效性

ms

us

ms

ms

可用性

高(主从架构)

高(主从架构)

非常高(主从架构)

非常高(主从架构)

消息中间件: activeMQ:java(jms协议),性能一般,吞吐量低。rabbitMQ:erlang(amqp协议),性能好,功能丰富,吞吐量一般。rocketMQ:java,性能好,吞吐量丰富,功能丰富。Kafka: scala,吞吐量最大,功能单一,大数据领域

RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。


RocketMQ的作用:数据收集、限流削峰、异步解耦
数据收集
分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。

限流削峰
MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

异步解耦
上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。

rocketmq.apache.org

Broker:经纪人(经理人)

Topic主题:消息区分,分类,虚拟结构

Queue:消息队列

Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

发送消息

发送同步消息

同步消息发送后会用一个返回值,也就是MQ服务器接收到消息返回的一个确认,这种方式非常安全,但是性能就没那么高,而在MQ集群中,也是要等到所有的从机都复制了消息以后才会返回,这种方式适合重要消息的场景

@Test
void rocketmqProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("Topic","消息".getBytes());
SendResult send = producer.send(message);
System.out.println("发送状态"+send.getSendStatus());
//关闭生产者
producer.shutdown();
}

@Test
void rocketmqConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("Topic","*");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

发送异步消息

异步消息常用在对响应时间敏感的业务场景,发送端不能容忍长时间等待Broker的响应。发送完后会有一个异步消息通知

@Test
void aysncProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("aysncTopic","异步消息".getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功"+sendResult);
}

@Override
public void onException(Throwable throwable) {
System.err.println("发送失败"+throwable);
}
});
System.out.println("执行了");
//关闭生产者
//producer.shutdown();
//挂起当前jvm
System.in.read();
}

@Test
void aysncConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("aysncTopic","*");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

发送单向消息

单向消息发送这种方式不关心发送结果的场景,这种方式吞吐量大,但存在消息丢失的风险。使用案例:日志信息发送

@Test
void OnewayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("Oneway");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("Oneway","单向消息

".getBytes());
//发送消息
producer.sendOneway(message);
//关闭生产者
producer.shutdown();
}

@Test
void OnewayConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("Oneway","*");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

发送延时消息

发送延时消息,顾名思义。场景:比如淘宝商城下单后,并未支付,有30分钟未支付订单状态

@Test
void delayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("delayGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("delay","延迟消息".getBytes());
//设置延时 根据官方延时等级
message.setDelayTimeLevel(2);
//发送消息
producer.sendOneway(message);
//关闭生产者
producer.shutdown();
}

@Test
void delayConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("delay","*");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

发送批量消息

批量消息:可以一次性发送一组消息

@Test
void delayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("delayGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
//批量消息
List<Message> messages = Arrays.asList(
new Message("delay","批量消息1".getBytes()),
new Message("delay","批量消息2".getBytes()),
new Message("delay","批量消息3".getBytes())
);
//发送消息
producer.send(messages);
//关闭生产者
producer.shutdown();
}

发送带标签消息

RocketMQ提供消息过滤功能,可根据业务逻辑区分,带有A标签的被A消费,带有B标签的被B消费

@Test
void TagProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("TagGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
//批量消息
Message message1 = new Message("tagTopic", "tagA", "tag标签内容A".getBytes());
Message message2 = new Message("tagTopic", "tagB", "tag标签内容B".getBytes());
//发送消息
producer.send(message1);
producer.send(message2);
//关闭生产者
producer.shutdown();
}

@Test
void TagAConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("tagTopic","tagA");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

@Test
void TagBConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("tagTopic","tagB");

//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理

for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

 发送顺序消息

发送的消息要保证消息是一定有序的,顺序消息,发送到同一个队列

实体类

@Data
@AllArgsConstructor
public class MessageM {
private int userID;
private String desc;
}

顺序消息,发送到同一个队列

private List<MessageM> messageMs = Arrays.asList(
new MessageM(1,"下单"),
new MessageM(1,"付款"),
new MessageM(1,"配送"),
new MessageM(2,"下单"),
new MessageM(2,"付款"),
new MessageM(2,"配送")
);

@Test
void orderProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.211.131:9876");
//启动
producer.start();

messageMs.forEach(messageM -> {
//创建消息
Message message = new Message("orderMsg",messageM.toString().getBytes());
//发送顺序消息,发送到同一个队列
try {
//相同的userID去相同的队列
producer.send(
message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//选择队列
int hashCode = o.toString().hashCode();
int i = hashCode % list.size();
return list.get(i);
}
},
messageM.getUserID());
} catch (MQClientException e) {
throw new RuntimeException(e);
} catch (RemotingException e) {
throw new RuntimeException(e);
} catch (MQBrokerException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
//关闭生产者
producer.shutdown();
}

@Test
void orderConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.211.131:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("orderMsg","*");

//设置监听器(一直,异步回调方式)
//MessageListenerConcurrently 并发模式,多线程
//MessageListenerOrderly 顺序模式,单线程
consumer.registerMessageListener(new MessageListenerOrderly() {
//消费方法
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
System.out.println("当前线程ID"+Thread.currentThread().getId());
return ConsumeOrderlyStatus.SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/454260.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

flutter入门

本文真对 Flutter 的技术特性&#xff0c;做了一些略全面的入门级的介绍&#xff0c;如果你听说过Flutter&#xff0c;想去了解它&#xff0c;但是又不想去翻厚厚的API&#xff0c;那么本文就是为你准备的。 随着纯客户端到Hybrid技术&#xff0c;到RN&Weex&#xff0c;再…

Vue2(五):收集表单数据、过滤器、内置指令和自定义指令

一、回顾 总结Vue监视数据 1.Vue监视数据的原理&#xff1a; 1.vue会监视data中所有层次的数据。 2.如何监测对象中的数据?通过setter实现监视&#xff0c;且要在new Vue时就传入要监测的数据。(1&#xff09;.对象中后追加的属性&#xff0c;Vue默认不做响应式处理(2&#…

苍穹外卖学习-----2024/03/010---redis,店铺营业状态设置

1.Redis入门 2.在Java中操作Redis 3.店铺营业状态设置 BUG!!! 今天在启动项目时&#xff0c;用到了Redis缓存数据库&#xff0c;但是却出现了报错信息&#xff1a; ERR Client sent AUTH, but no password is set。Caused by: io.lettuce.core.RedisCommandExecutionException…

Codeforces Round 933 (Div. 3) A~D

比赛链接 : codeforces.com/contest/1941 A . Rudolf and the Ticket 直接暴力即可 ; #include<bits/stdc.h> #define IOS ios::sync_with_stdio(0);cin.tie(0);cout.tie(0); #define endl \n #define lowbit(x) (x&(-x)) #define sz(a) (int)a.size() #define p…

【阿里云系列】-基于云效构建部署Springboot项目到ACK

介绍 为了提高项目迭代的速度加速交付产品给客户&#xff0c;我们通常会选择CICD工具来减少人力投入产生的成本&#xff0c;开源的工具比如有成熟的Jenkins&#xff0c;但是本文讲的是阿里云提高的解决方案云效平台&#xff0c;通过配置流水线的形式实现项目的快速部署到服务器…

LeetCode101题:对称二叉树(python3)

对称二叉树定义&#xff1a; 对于树中 任意两个对称节点 L 和 R &#xff0c;一定有&#xff1a; L.val R.val &#xff1a;即此两对称节点值相等。 L.left.val R.right.val &#xff1a;即 L的 左子节点 和 R 的 右子节点 对称。 L.right.val R.left.val &#xff1a;即 L…

arcgis在GIS滑坡易发性分析中的应用技术研究

我国是地质灾害多发国家&#xff0c;地质灾害的发生无论是对于地质环境还是人类生命财产的安全都会带来较大的威胁&#xff0c;因此需要开展地质灾害风险普查。利用遥感&#xff08;RS&#xff09;技术进行地质灾害调查工作具有宏观、快速、准确的特点&#xff0c;能反映出地质…

优先级队列(堆)(1)

目录 一. 优先级队列 1.1 概念 二. 优先级队列的模拟实现 2.1 堆的概念 2.2 堆的存储方式 2.3 堆的创建 2.3.1 堆向下调整 2.3.2 堆的创建 2.3.3 建堆的时间复杂度 2.4 堆的插入与删除 2.4.1 堆的插入 2.4.2 堆的删除 常见习题&#xff1a; 一. 优先级队列 1.1 概…

力扣:数组篇

1、数组理论基础 数组是存放在连续内存空间上的相同类型数据的集合。 需要两点注意的是 数组下标都是从0开始的。数组内存空间的地址是连续的 因为数组的在内存空间的地址是连续的&#xff0c;所以我们在删除或者增添元素的时候&#xff0c;就难免要移动其他元素的地址。 …

【基础CSS】

本文章属于学习笔记&#xff0c;在https://www.freecodecamp.org/chinese/learn/2022/responsive-web-design/中练习 二、 CSS 样式&#xff0c;新建一个文件.css&#xff0c;该文件不含有style标签 <style>. h1&#xff0c;h2&#xff0c;p{ text-align&#xff1a;ce…

03-自媒体文章发布-黑马头条

自媒体文章发布 1)自媒体前后端搭建 1.1)后台搭建 ①&#xff1a;资料中找到heima-leadnews-wemedia.zip解压 拷贝到heima-leadnews-service工程下&#xff0c;并指定子模块 执行leadnews-wemedia.sql脚本 添加对应的nacos配置 spring:datasource:driver-class-name: com…

Linux:导出环境变量命令export

相关阅读 Linuxhttps://blog.csdn.net/weixin_45791458/category_12234591.html?spm1001.2014.3001.5482 Linux中的内建命令export命令用于创建一个环境变量&#xff0c;或将一个普通变量导出为环境变量&#xff0c;并且在这个过程中&#xff0c;可以给该环境变量赋值。 下面…

Java后端八股文之java基础

文章目录 0.Java 中有 8 种基本数据类型1. 为什么浮点数运算会丢失精度&#xff1f;如何解决&#xff1f;2. 面向对象的三大特征2.1 封装2.2 继承2.3 多态 3. 深拷贝和浅拷贝的区别&#xff1f;什么是引用拷贝&#xff1f;4. equals方法与“”方法4.1 4.2 equals方法 5.hashcod…

计算机组成原理实验报告1 | 实验1.1 运算器实验(键盘方式)

本文整理自博主大学本科《计算机组成原理》课程自己完成的实验报告。 —— *实验环境为学校机房实验箱。 目录 一、实验目的 二、实验内容 三、实验步骤及实验结果 Ⅰ、单片机键盘操作方式实验 1、实验连线&#xff08;键盘实验&#xff09; 2、实验过程 四、实验结果的…

TortoiseSVN 报错:The server unexpectedly closed the connetion

前言 CentOS7Linux 安装subversionmod_dav_svn&#xff0c;搭建subversion(svn)服务器 The server unexpectedly closed the connetion 解决办法 重启Apache服务 shell> systemctl restart httpd

12 list的使用

文档介绍 文档介绍 1.list是可以在常数范围内的任意位置进行插入和删除的序列式容器&#xff0c;并且该容器可以前后双向迭代 2.list的底层是带头双向链表循环结构&#xff0c;双向链表中每个元素存储在互不相关的独立节点中&#xff0c;在节点中通过指针指向其前一个元素和…

【JavaScript】数据类型转换 ① ( 隐式转换 和 显式转换 | 常用的 数据类型转换 | 转为 字符串类型 方法 )

文章目录 一、 JavaScript 数据类型转换1、数据类型转换2、隐式转换 和 显式转换3、常用的 数据类型转换4、转为 字符串类型 方法 一、 JavaScript 数据类型转换 1、数据类型转换 在 网页端 使用 HTML 表单 和 浏览器输入框 prompt 函数 , 接收的数据 是 字符串类型 变量 , 该…

docker容器镜像管理+compose容器编排(持续更新中)

目录 一、 Docker的基本组成 二、 容器和镜像的关系 2.1 面向对象角度 2.2 从镜像容器角度 三、 容器命令 3.1 使用Ubuntu 3.1.1 下载镜像 3.1.2 新建和启动容器 run 3.1.3交互式 compose编排与部署 1. docker-compose部署 2. docker-compose.yml模板 …

社区维修平台|基于SpringBoot+ Mysql+Java+JSP技术的社区维修平台设计与实现(可运行源码+数据库+设计文档+部署说明+视频演示)

推荐阅读100套最新项目 最新ssmjava项目文档视频演示可运行源码分享 最新jspjava项目文档视频演示可运行源码分享 最新Spring Boot项目文档视频演示可运行源码分享 目录 前台功能效果图 住户后台功能 维修员前台功能 维修员后台功能 管理员功能登录 系统功能设计 数据库E…

数据结构:哈希表

1.散列表的概念: 根据要存储的数据记录的关键字值计算出应该存储的位置 基本思想:记录的存储位置与关键字之间存在对应关系 Loc(i)H(keyi)-----等号右边就称之为hash函数.等号左边就是对应的存储位置; 2.哈希表的优缺点 这个就是散列表的特点:查找效率高,空间利用率低;&am…