RocketMQ使用

说明:本文介绍RocketMQ的消费模式&消息类型,RocketMQ的安装参考及简单使用,参考:http://t.csdn.cn/BKFPj

消费模式

RocketMQ与RabbitMQ最大的区别在于,RocketMQ是根据消息的Topic锁定消费者的,Topic属性设置为相同的消费者,可以看做是一个消费者集群。消息模式分为以下三种:

(1)一对一

最简单的一种方式,消息的Topic只被一个消费者消费,如下:

(生产者)

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void simpleTest(){
        rocketMQTemplate.syncSend("simple","hello rocketmq!");
    }

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("s = " + s);
    }
}

执行结果

在这里插入图片描述

(2)一对多

当存在多个Topic相同的消费者时,这些消费者共同消费消息,如下:

(开启两个消费者,Topic相同)

在这里插入图片描述

(生产者)

    @Test
    public void oneToMany(){
        for (int i = 0; i < 10; i++) {
            rocketMQTemplate.syncSend("simple","one to many" + i);
        }
    }

执行结果,可以看到负载均衡策略是随机;

在这里插入图片描述

在这里插入图片描述

(3)多对多

参考一对多方式,发送多个Topic的消息,让多种Topic的消费者接收消息;

消息类型

根据消息的类型和对消息的处理,可以分为以下几种:

(1)同步消息

同步消息,消息发送到MQ,MQ保存成功后才会返回结果,在API中是以"sync"(synchronous,同步)开头的一些方法,可以看到这些方法都有返回值,可以通过返回结果判断是否发送成功;
在这里插入图片描述

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("接收到同步消息 = " + s);
    }
}

(生产者,可以通过返回结果判断发送是否成功)

	@Autowired
	private RocketMQTemplate rocketMQTemplate;
	
	@Test
	public void simpleTest1(){
	    SendResult sendResult = rocketMQTemplate.syncSend("simple", "这是一个异步消息");
	    System.out.println("sendResult.getSendStatus() = " + sendResult.getSendStatus());
	}

在这里插入图片描述

(2)异步消息

异步消息,消息发送给MQ后代码就会立即向下执行,在API中是以“asyn”(asynchronous,异步),可以手动设置发送消息成功与否执行的方法;

(生产者)

    @Test
    public void simpleTest2() throws InterruptedException {
        rocketMQTemplate.asyncSend("simple", "这是一个异步消息", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("成功信息" + sendResult.toString());
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("异常信息" + throwable.getMessage());
            }
        });
        TimeUnit.SECONDS.sleep(2);
    }

(发送消息成功,执行成功的方法)

在这里插入图片描述

需要注意,这里是指发送消息成功与否,与消费者是否成功消费无关;

(3)单向消息

单向消息,是指只管发送消息,不关系MQ是否成功接收,没有返回值;

    @Test
    public void simpleTest3() {
        rocketMQTemplate.sendOneWay("simple", "这是一个单向消息");
    }

(4)延迟消息

延迟消息,指给消息设置一个延迟级别,达到指定时间后,消费者才能收到这个消息,延迟级别如下:

# 延迟级别,从1开始
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

(生产者)

    @Test
    public void simpleTest4() {
        // 设置超时为1秒,延迟等级为3,即10秒
        rocketMQTemplate.syncSend("simple", MessageBuilder.withPayload("这是一个延迟消息").build(),1000,3);
    }

(消费者,10秒后才收到消息)

在这里插入图片描述

延迟消息相较于RabbitMQ,使用起来更方便,但是只能设置时间等级,不能设置准确时间,非常难受;

(5)批量消息

RocketMQ可以发送一个集合,如下:

(消费者)

    @Test
    public void simpleTest5(){

        ArrayList<Message> list = new ArrayList<>();
        list.add(MessageBuilder.withPayload("aaa").build());
        list.add(MessageBuilder.withPayload("bbb").build());
        list.add(MessageBuilder.withPayload("ccc").build());

        rocketMQTemplate.syncSend("simple", list, 3000);
    }

(执行结果)

在这里插入图片描述

(6)消息过滤

消息过滤,是RocketMQ较与RabbitMQ独有的功能,指对发送的消息进行过滤,指接收限定条件的消息,对消息进行限制接收。有两种方式,如下:

a. 标签过滤

在发送消息时,指定topic的同时,加上一个标签,表示只发给有这个标签的消费者;

(生产者)

    @Test
    public void simpleTest6(){
        rocketMQTemplate.syncSend("simple:tag", "Tag Message");
    }

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple", selectorExpression = "tag1")
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("接收到标签过滤消息 = " + s);
    }
}

(执行结果)

在这里插入图片描述

b. SQL过滤

另一种是SQL过滤的方式,在消费者这边,写SQL语句对消息进行过滤消息;

(生产者,设置name = SQL)

    @Test
    public void simpleTest6(){
        // 标签方式
        rocketMQTemplate.syncSend("simple:tag", "Tag Message");

        // SQL语句方式
        rocketMQTemplate.syncSend("simple",
                MessageBuilder.withPayload("SQL Message")
                        .setHeader("name","SQL")
                        .build());
    }

(消费者,只接受name = SQL的消息)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple",
 selectorType = SelectorType.SQL92, selectorExpression = "name = 'SQL'")
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("接收到SQL语句过滤消息 = " + s);
    }
}

(执行结果)

在这里插入图片描述

(7)对象消息

RocketMQ当然也可以发送对象作为消息,该对象应该要实现Serializable接口,如下:

import java.io.Serializable;

public class User implements Serializable {

    private String username;

    private String password;

    public User() {
    }

    public User(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}

(生产者)

    @Test
    public void simpleTest7(){
        User user = new User();
        user.setUsername("zhangsan");
        user.setPassword("123456");
        rocketMQTemplate.syncSend("simple", user);
    }

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<User> {
    @Override
    public void onMessage(User user) {
        System.out.println("user = " + user);
    }
}

(执行结果)

在这里插入图片描述

(8)顺序消息

顺序消息,是指消息从发送到被消费,需要始终保持前后顺序。如下,发送15次消息,可以看到消费者那边的消费顺序并不是一直的;

    @Test
    public void simpleTest1() {
        for (int i = 0; i < 15; i++) {
            rocketMQTemplate.syncSend("simple", "这是一个同步消息===>" + i);
        }
    }

在这里插入图片描述

顺序消息,需要保证以下两方面:

  • 所有的消息存入到MQ中的同一个队列中,因为RocketMQ默认有四个队列,消息会被负载均衡存储在这些队列里;

  • 该队列只能被一个线程消费,因为一个队列的消息在消费时会有多个线程同时进行消费;

前者可以通过,XxxOrderly()方法实现消息在队列中的顺序存储,如下:

(生产者:给对象设置一个ID,让它们按照ID顺序存储在MQ中)

    @Test
    public void simpleTest8(){
        ArrayList<User> users = new ArrayList<>();

        User user1 = new User("1","zhangsan","zs");
        User user2 = new User("2","lisi","ls");
        User user3 = new User("3","wangwu","ww");

        users.add(user1);
        users.add(user2);
        users.add(user3);


        for (User user : users) {
            rocketMQTemplate.syncSendOrderly("simple",user,user.getId());
        }
    }

后者,可以通过在消费者这边添加这个配置,保证消息被顺序消费,如下:

(消费者,设置消费模式 consumeMode = ConsumeMode.ORDERLY

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple",consumeMode = ConsumeMode.ORDERLY)
public class ConsumerListener implements RocketMQListener<User> {

    @Override
    public void onMessage(User user) {
        System.out.println(user);
    }
}

执行结果,可以看到消息时顺序进行的

在这里插入图片描述

总结

RocketMQ的内容还有很多,可参考 http://t.csdn.cn/QXQNZ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/65621.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

黑马机器学习day1

1.sklearn数据集 sklearn.datasets datasets.load_*() 获取小规模的数据集 datasets.fetch_*(data_homeNone) 获取大规模数据集 函数的第一个参数是data_home,标识数据集下载目录&#xff0c;默认/scikit_learn_data/ 1.1sklearn小数据集 sklearn.da…

【JAVA】类和对象

作者主页&#xff1a;paper jie的博客 本文作者&#xff1a;大家好&#xff0c;我是paper jie&#xff0c;感谢你阅读本文&#xff0c;欢迎一建三连哦。 本文录入于《JAVASE语法系列》专栏&#xff0c;本专栏是针对于大学生&#xff0c;编程小白精心打造的。笔者用重金(时间和精…

使用eXosip+ffmpeg、ffplay命令行实现sip客户端

文章目录 前言一、关键实现1、主要流程2、解决端口冲突&#xff08;1&#xff09;、出现原因&#xff08;2&#xff09;、解决方法 3、解析sdp&#xff08;1&#xff09;、定义实体&#xff08;2&#xff09;、解析视频&#xff08;3&#xff09;、解析音频 4、命令行推拉流&am…

Python中搭建IP代理池的妙招

在Python的爬虫世界里&#xff0c;你是否也想搭建一个功能强大的IP代理池&#xff0c;让你的爬虫无忧无虑地畅游各大网站&#xff1f;今天&#xff0c;我就来教你使用Scrapy框架搭建IP代理池&#xff0c;让你的爬虫更加智能、高效&#xff01;跟着我一步一步来&#xff0c;轻松…

(力扣)用两个队列实现栈---C语言

分享一首歌曲吧&#xff0c;希望在枯燥的刷题生活中带给你希望和勇气&#xff0c;加油&#xff01; 题目&#xff1a; 请你仅使用两个队列实现一个后入先出&#xff08;LIFO&#xff09;的栈&#xff0c;并支持普通栈的全部四种操作&#xff08;push、top、pop 和 empty&#…

【单片机】51单片机,TLC2543,驱动程序,读取adc

TLC2543 是一款 12 位精密模数转换器 (ADC)。 1~9、11、12——AIN0&#xff5e;AIN10为模拟输入端&#xff1b; 15——CS 为片选端&#xff1b; 17——DIN 为串行数据输入端&#xff1b;&#xff08;控制字输入端&#xff0c;用于选择转换及输出数据格式&#xff09; 16——…

大数据课程G2——Hbase的基本架构

文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 掌握Hbase的基本架构; ⚪ 掌握Hbase的读写流程; ⚪ 掌握Hbase的设计与优化; 一、基本架构 1. HRegion 1. 在HBase中,会将一个表从行键方向上进行切分,切分成1个或者多个HRegion。 …

C++入门(小白篇1—编译器安装-代码注释等)

前言&#xff1a; 最近想学一下一下C看了一些博客内容写的倒是很充实&#xff0c;但是&#xff0c;细节不到位&#xff0c;我是有Python基础的&#xff0c;所以学习来蛮快的&#xff0c;但是对于小白的话&#xff0c;有好多小细节大多数博客还是不够详细&#xff0c;由此我想写…

对话Sam Altman与Greg Brockman:初心和过去,信念和现在,责任和未来

导读 近日&#xff0c;硅谷著名投资人Reid Hoffman和Aria Finger联手对Sam Altman和Greg Brockman进行了一场访谈&#xff0c;访谈涉及到主题有&#xff1a;OpenAI的使命&#xff0c;人工智能对教育、医疗等行业的变革性影响&#xff0c;人工智能如何面对监管&#xff0c;OpenA…

十九、docker学习-Dockerfile

Dockerfile 官网地址 https://docs.docker.com/engine/reference/builder/Dockerfile其实就是我们用来构建Docker镜像的源码&#xff0c;当然这不是所谓的编程源码&#xff0c;而是一些命令的集合&#xff0c;只要理解它的逻辑和语法格式&#xff0c;就可以很容易的编写Docke…

Effective Java笔记(28)列表优于数组

数组与泛型相比&#xff0c;有两个重要的不同点 。 首先&#xff0c;数组是协变的&#xff08; covariant &#xff09; 。 这个词听起来有点吓人&#xff0c;其实只是表示如果 Sub 为 Super 的子类型&#xff0c;那么数组类型 Sub[ ]就是Super[ ]的子类型。 相反&#xff0c;泛…

【maven】构建项目前clean和不clean的区别

其实很简单&#xff0c;但是百度搜了一下&#xff0c;还是没人能简单说明白。 搬用之前做C项目时总结结论&#xff1a; 所以自己在IDE里一遍遍测试程序能否跑通的时候&#xff0c;不需要clean&#xff0c;因为反正还要改嘛。 但是这个项目测试好了&#xff0c;你要打成jar包给…

【Vue3】动态组件

动态组件的基本使用 动态组件&#xff08;Dynamic Components&#xff09;是一种在 Vue 中根据条件或用户输入来动态渲染不同组件的技术。 在 Vue 中使用动态组件&#xff0c;可以使用 元素&#xff0c;并通过 is 特性绑定一个组件的名称或组件对象。通过在父组件中改变 is 特…

AirServer2023最新Mac苹果电脑系统投屏软件

AirServer是一个Mac专用投屏工具&#xff0c;功能强大&#xff0c;并且可以通过网络和其他平台同步视频内容。可以使用多个设备进行投屏&#xff0c;快速查看同一局域网内的视频。支持的设备&#xff1a;苹果系统。支持 Windows、 Mac、 Android、 iOS、 windows平台。通过这款…

C++笔记之两个类的实例之间传递参数——通过构造函数传递类对象的方法详细探究

C笔记之两个类的实例之间传递参数——通过构造函数传递类对象的方法详细探究 code review! 文章目录 C笔记之两个类的实例之间传递参数——通过构造函数传递类对象的方法详细探究1.传递对象的const引用——ClassB的实例只能访问ClassA的实例&#xff0c;但不会修改ClassA的实…

js:Markdown编辑器Vue3版本md-editor-v3

文档 https://github.com/imzbf/md-editor-v3https://imzbf.github.io/md-editor-v3/zh-CN/index 安装 npm install md-editor-v3使用 <template><MdEditor v-model"text" /> </template><script setup> import { ref } from vue; impor…

EFLFK——ELK日志分析系统+kafka+filebeat架构(3)

zookeeperkafka分布式消息队列集群的部署 紧接上期&#xff0c;在ELFK的基础上&#xff0c;添加kafka做数据缓冲 附kafka消息队列 nginx服务器配置filebeat收集日志&#xff1a;192.168.116.40&#xff0c;修改配置将采集到的日志转发给kafka&#xff1b; kafka集群&#xff…

RabbitMQ在CentOS下的安装

RabbitMQ的版本是3.8.2 1.环境配置&#xff1a;CentOs 7.6以上版本&#xff0c;我的版本是7.9&#xff0c;不要对yum换源&#xff0c;否则可能会安装失败。 echo "export LC_ALLen_US.UTF-8" >> /etc/profile source /etc/profile 以上命令&#xff0c;是…

网络优化工程师,你到底了解多少?

5G网络优化工程师到底是什么&#xff1f; 5G&#xff0c;第五代移动通信技术&#xff08;5th Generation Mobile Communication Technology&#xff0c;简称5G&#xff09;是具有高速率、低时延和大连接特点的新一代宽带移动通信技术&#xff0c;5G通讯设施是实现人机物互联的…

【MySQL】sql字段约束

在MySQL中&#xff0c;我们需要存储的数据在特定的场景中需要不同的约束。当新插入的数据违背了该字段的约束字段&#xff0c;MySQL会直接禁止插入。 数据类型也是一种约束&#xff0c;但数据类型这个约束太过单一&#xff1b;比如我需要存储的是一个序号&#xff0c;那就不可…