SpringBoot集成RocketMQ

SpringBoot整合RocketMQ使用非常简单,下面是一个简单的例子,作为备忘:

完整项目代码: https://github.com/dccmmtop/springBootRocketMQ

项目目录结构

依赖

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.1.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.1.6.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <version>2.1.6.RELEASE</version>
    </dependency>
<dependencies>

配置类 ExtRocketMQTemplate

package com.roy.rocketmq.config;

import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

@ExtRocketMQTemplateConfiguration()
public class ExtRocketMQTemplate extends RocketMQTemplate {
}

消费者 SpringConsumer

package com.roy.rocketmq.basic;

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
// consumerGroup 相同角色的消费者需要具有完全相同的订阅和consumerGroup才能正确实现负载均衡。它是必需的并且需要是全球唯一的。
// consumeMode 控制消费模式,可以选择并发或顺序接收消息。
// 该注解还有很多其他属性,请详细查看源码
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode= ConsumeMode.CONCURRENTLY)
public class SpringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received message : "+ message);
    }
}

发送者示例 SpringProducer

package com.roy.rocketmq.basic;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class SpringProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    // 就是如此简单
    public void sendMessage(String topic,String msg){
        this.rocketMQTemplate.convertAndSend(topic,msg);
    }
}

更多的发消息示例

带有返回数据的消费者

package com.roy.rocketmq.basic;

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup1", topic = "TestTopicReplyString",consumeMode= ConsumeMode.CONCURRENTLY)
// 这里注意要使用与收到消息对应的类型 <String,String> 代表接收string 消息 返回的也是string类型的消息
public class SpringConsumerWithReply implements RocketMQReplyListener<String,String> {

    @Override
    public String onMessage(String message) {
        System.out.println("收到消息,并返回结果, " + , message);
        return "OK";
    }
}

接收 User 返回User

package com.roy.rocketmq.basic;

import com.roy.rocketmq.domain.User;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup2", topic = "TestTopicUser",consumeMode= ConsumeMode.CONCURRENTLY)
public class SpringConsumerReplyUser implements RocketMQReplyListener<User,User> {


    @Override
    public User onMessage(User user) {
        System.out.println("收到User: " + user.toString());
        return user;
    }
}

User

package com.roy.rocketmq.domain;

public class User {
    private String userName;
    private Byte userAge;

    public String getUserName() {
        return userName;
    }

    public User setUserName(String userName) {
        this.userName = userName;
        return this;
    }

    public Byte getUserAge() {
        return userAge;
    }

    public User setUserAge(Byte userAge) {
        this.userAge = userAge;
        return this;
    }

    @Override
    public String toString() {
        return "User{" +
                "userName='" + userName + '\'' +
                ", userAge=" + userAge +
                '}';
    }
}

OrderPaidEvent

package com.roy.rocketmq.domain;

import java.math.BigDecimal;

public class OrderPaidEvent {
    private String orderId;

    private BigDecimal paidMoney;

    public OrderPaidEvent() {
    }

    public OrderPaidEvent(String orderId, BigDecimal paidMoney) {
        this.orderId = orderId;
        this.paidMoney = paidMoney;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public BigDecimal getPaidMoney() {
        return paidMoney;
    }

    public void setPaidMoney(BigDecimal paidMoney) {
        this.paidMoney = paidMoney;
    }
}

发送

package com.roy.rocketmq;

import com.roy.rocketmq.domain.OrderPaidEvent;
import com.roy.rocketmq.domain.User;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQLocalRequestCallback;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.MimeTypeUtils;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringRocketTest {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void sendMessageTest(){
        String springTopic="TestTopic";
        String springTopicReplyString="TestTopicString";
        String springTopicUser="TestTopicUser";
        //发送字符消息
        SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "你好!");
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);

        sendResult = rocketMQTemplate.syncSend(springTopic, new User().setUserAge((byte) 18).setUserName("Kitty"));
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);

        sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload(
                new User().setUserAge((byte) 21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);

        //发送对象消息
        rocketMQTemplate.asyncSend(springTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                System.out.printf("async onSucess SendResult=%s %n", var1);
            }

            @Override
            public void onException(Throwable var1) {
                System.out.printf("async onException Throwable=%s %n", var1);
            }
        });

        //发送指定TAG的消息
        // 注意这里指定tag的方式,topic + : + tag
        rocketMQTemplate.convertAndSend(springTopic + ":tag0", "I'm from tag0");  // tag0 will not be consumer-selected
        System.out.printf("syncSend topic %s tag %s %n", springTopic, "tag0");
        rocketMQTemplate.convertAndSend(springTopic + ":tag1", "I'm from tag1");
        System.out.printf("syncSend topic %s tag %s %n", springTopic, "tag1");

        //同步发送消息并且返回一个String类型的结果。
        String replyString = rocketMQTemplate.sendAndReceive(springTopicReplyString, MessageBuilder.withPayload("同步发送测试").build(), String.class);
        System.out.printf("send %s and receive %s %n", "request string", replyString);

        //同步发送消息并且返回一个Byte数组类型的结果。
        byte[] replyBytes = rocketMQTemplate.sendAndReceive(springTopicReplyString, MessageBuilder.withPayload("request byte[]").build(), byte[].class, 3000);
        System.out.printf("send %s and receive %s %n", "request byte[]", new String(replyBytes));

        // 同步发送一个带hash参数的请求(排序消息),并返回一个User类型的结果
        User requestUser = new User().setUserAge((byte) 9).setUserName("requestUserName");
        User replyUser = rocketMQTemplate.sendAndReceive(springTopicUser, requestUser, User.class, "order-id");
        System.out.printf("send %s and receive %s %n", requestUser, replyUser);

        //同步发送一个带延迟级别的消息(延迟消息)
        String replyGenericObject = rocketMQTemplate.sendAndReceive(springTopicReplyString, "request generic",
                String.class, 30000, 2);
        System.out.printf("send %s and receive %s %n", "request generic", replyGenericObject);


        //异步发送消息,返回String类型结果。
        rocketMQTemplate.sendAndReceive(springTopicReplyString, "request string", new RocketMQLocalRequestCallback<String>() {
            @Override public void onSuccess(String message) {
                System.out.printf("send %s and receive %s %n", "request string", message);
            }

            @Override public void onException(Throwable e) {
                e.printStackTrace();
            }
        });
        //异步发送消息,并返回一个User类型的结果。
        rocketMQTemplate.sendAndReceive(springTopicUser, new User().setUserAge((byte) 9).setUserName("requestUserName"), new RocketMQLocalRequestCallback<User>() {
            @Override public void onSuccess(User message) {
                System.out.printf("send user object and receive %s %n", message.toString());
            }

            @Override public void onException(Throwable e) {
                e.printStackTrace();
            }
        }, 5000);
        //发送批量消息
        List<Message> msgs = new ArrayList<Message>();
        for (int i = 0; i < 10; i++) {
            msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
                    setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
        }

        SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);

        System.out.printf("--- Batch messages send result :" + sr);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/50474.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

18.Netty源码之ByteBuf 详解

highlight: arduino-light ByteBuf 是 Netty 的数据容器&#xff0c;所有网络通信中字节流的传输都是通过 ByteBuf 完成的。 然而 JDK NIO 包中已经提供了类似的 ByteBuffer 类&#xff0c;为什么 Netty 还要去重复造轮子呢&#xff1f;本节课我会详细地讲解 ByteBuf。 JDK NIO…

VMware搭建Hadoop集群 for Windows(完整详细,实测可用)

目录 一、VMware 虚拟机安装 &#xff08;1&#xff09;虚拟机创建及配置 &#xff08;2&#xff09;创建工作文件夹 二、克隆虚拟机 三、配置虚拟机的网络 &#xff08;1&#xff09;虚拟网络配置 &#xff08;2&#xff09;配置虚拟机 主机名 &#xff08;3&#xf…

【LLM】大语言模型学习之LLAMA 2:Open Foundation and Fine-Tuned Chat Model

大语言模型学习之LLAMA 2:Open Foundation and Fine-Tuned Chat Model 快速了解预训练预训练模型评估微调有监督微调(SFT)人类反馈的强化学习(RLHF)RLHF结果局限性安全性预训练的安全性安全微调上手就干使用登记代码下载获取模型转换模型搭建Text-Generation-WebUI分发模型…

高效率,38V最大输入单电感同步升/降稳压器SYV939C

SYV939是一种高压同步降压-升压转换器。该器件工作在4V至28V的宽输入电压范围内&#xff0c;具有10max平均电感电流能力。四个集成的低RDS(ON)开关最大限度地减少了传导损耗。 SYV939c包括完整的保护功能&#xff0c;如输出过流/短路保护&#xff0c;过压保护和热停机&#xff…

了解Unity编辑器之组件篇Playables和Rendering(十)

Playables 一、Playable Director&#xff1a;是一种用于控制和管理剧情、动画和音频的工具。它作为一个中央控制器&#xff0c;可以管理播放动画剧情、视频剧情和音频剧情&#xff0c;以及它们之间的时间、顺序和交互。 Playable Director组件具有以下作用&#xff1a; 剧情控…

【MATLAB第61期】基于MATLAB的GMM高斯混合模型回归数据预测

【MATLAB第61期】基于MATLAB的GMM高斯混合模型回归数据预测 高斯混合模型GMM广泛应用于数据挖掘、模式识别、机器学习和统计分析。其中&#xff0c;它们的参数通常由最大似然和EM算法确定。关键思想是使用高斯混合模型对数据&#xff08;包括输入和输出&#xff09;的联合概率…

最新版Onenet云平台HTTP协议接入上传数据

2023年最新版Onenet更新后&#xff0c;原来的多协议接口已经找不到&#xff0c;由于需要用HTTP接入&#xff0c;就研究了一下新版Onenet云平台&#xff0c;搞清楚Onenet云平台的鉴权信息&#xff0c;就知道怎么上传数据了&#xff0c;包括后续上传实际数据&#xff0c;其实只需…

JVM简述

JDK&JRE&JVMJVM运行时内存结构图方法区堆区栈区程序计数器本地方法栈 JVM 的主要组成部分及其作用 JDK&JRE&JVM JVM就是java虚拟机&#xff0c;一台虚拟的机器&#xff0c;用来运行java代码 但并不是只有这台机器就可以的&#xff0c;java程序在运行时需要依赖…

【图论】kruskal算法

一.介绍 Kruskal&#xff08;克鲁斯卡尔&#xff09;算法是一种用于解决最小生成树问题的贪心算法。最小生成树是指在一个连通无向图中&#xff0c;选择一棵包含所有顶点且边权重之和最小的树。 下面是Kruskal算法的基本步骤&#xff1a; 将图中的所有边按照权重从小到大进行…

C# 快速写入日志 不卡线程 生产者 消费者模式

有这样一种场景需求&#xff0c;就是某个方法&#xff0c;对耗时要求很高&#xff0c;但是又要记录日志到数据库便于分析&#xff0c;由于访问数据库基本都要几十毫秒&#xff0c;可在方法里写入BlockingCollection&#xff0c;由另外的线程写入数据库。 可以看到&#xff0c;在…

2023河南萌新联赛第(三)场:郑州大学 A - 发工资咯

2023河南萌新联赛第&#xff08;三&#xff09;场&#xff1a;郑州大学 A - 发工资咯 时间限制&#xff1a;C/C 2秒&#xff0c;其他语言4秒 空间限制&#xff1a;C/C 262144K&#xff0c;其他语言524288K 64bit IO Format: %lld 题目描述 一个公司有n个人&#xff0c;每个月都…

TCP如何保证服务的可靠性

TCP如何保证服务的可靠性 确认应答超时重传流量控制滑动窗口机制概述发送窗口和接收窗口的工作原理几种滑动窗口协议1比特滑动窗口协议&#xff08;停等协议&#xff09;后退n协议选择重传协议 采用滑动窗口的问题&#xff08;死锁可能&#xff0c;糊涂窗口综合征&#xff09;死…

iostat工具使用

文章目录 iostat命令简介iostat命令参数 iostat输出信息CPU利用率输出信息磁盘利用率输出信息更详细的磁盘利用率输出信息 iostat命令使用示例iostat -kdx 1 iostat数据来源相关参考 iostat命令简介 iostat工具可用于CPU使用统计信息和设备的输入输出统计信息。iostat能支持显…

数据结构—数组和广义表

4.2数组 数组&#xff1a;按一定格式排列起来的&#xff0c;具有相同类型的数据元素的集合。 **一维数组&#xff1a;**若线性表中的数据元素为非结果的简单元素&#xff0c;则称为一维数组。 **一维数组的逻辑结构&#xff1a;**线性结构&#xff0c;定长的线性表。 **声明…

Vue通过指令 命令将打包好的dist静态文件上传到腾讯云存储桶 (保存原有存储目录结构)

1、在项目根目录创建uploadToCOS.js文件 (建议起简单的名字 方便以后上传输入命令方便) 2、uploadToCOS.js文件代码编写 const path = require(path); const fs = require(fs); const COS = require(cos-nodejs-sdk-v5);// 配置腾讯云COS参数 const cos = new COS({SecretI…

基于Docker-compose创建LNMP环境并运行Wordpress网站平台

基于Docker-compose创建LNMP环境并运行Wordpress网站平台 1.Docker-Compose概述2.YAML文件格式及编写注意事项3.Docker-Compose配置常用字段4.Docker Compose常用命令5.使用Docker-compose创建LNMP环境&#xff0c;并运行Wordpress网站平台1. Docker Compose 环境安装下载安装查…

《入门级-Cocos2d 4.0塔防游戏开发》---第二课:游戏加载界面开发

目录 一、开发环境介绍 二、开发内容 2.1 修改窗口的大小。 2.2 添加加载场景相关代码 2.3 添加资源 三、显示效果 四、知识点 4.1 Sprite 4.2 定时器 一、开发环境介绍 操作系统&#xff1a;UOS1060专业版本。 cocos2dx:版本 环境搭建教程&#xff1a; 统信UOS下配…

cURL error 1: Protocol “https“ not supported or disabled in libcurl

1、php项目composer update报错 2、curl -V检查 发现curl已经支持了https了 3、php版本检查 4、php插件检查 插件也已经含有openssl组件了 5、phpinfo检查 curl是否开启ssl 定位到问题所在&#xff0c;php7.4的 curl扩展不支持 https 需要重装 php7.4的curl扩展 6、curl下载 下…

大学生活题解

样例输入&#xff1a; 3 .xA ... Bx.样例输出&#xff1a; 6思路分析&#xff1a; 这道题只需要在正常的广搜模板上多维护一个— —方向&#xff0c;如果当前改变方向&#xff0c;就坐标不变&#xff0c;方向变&#xff0c;步数加一&#xff1b;否则坐标变&#xff0c;方向不…

微信小程序radio单选按钮选中与取消

wxml <view bindtapcheckedTap><radio checked"{{checked}}">设为默认</radio> </view> wxss <style lang"less" > radio .wx-radio-input {border-radius: 50%; /* 圆角 */width: 24rpx;border: 2rpx solid #5e5e5f;hei…