Spring boot框架下的RocketMQ消息中间件

1. RocketMQ 基础概念

1.1 核心概念

以下是 RocketMQ 核心概念在 Spring Boot 的 Java 后端代码中的实际使用方式:

Producer(生产者)

定义:Producer 是负责发送消息到 RocketMQ 的组件。它可以将消息发送到指定的 Topic。

实际代码使用 RocketMQTemplate 提供的 API 发送消息。

@Service
public class DemoProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendMessage(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
        System.out.println("Message sent to topic: " + topic);
    }
}

Consumer(消费者)

定义:Consumer 是负责接收和处理从 RocketMQ 中接收到的消息的组件。

实际代码使用 @RocketMQMessageListener 注解标注的类来实现消费者功能。

@Service
@RocketMQMessageListener(topic = "demo-topic", consumerGroup = "demo-consumer-group")
public class DemoConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

Topic(主题)

定义:Topic 是消息分类的基本单元,Producer 将消息发送到 Topic,Consumer 从 Topic 中接收消息。

实际代码 Topic 是通过代码中指定的字符串定义的,例如 demo-topic

  • Producer 中指定 Topic:
    rocketMQTemplate.convertAndSend("demo-topic", "Hello RocketMQ!");
    
  • Consumer 中监听 Topic:
    @RocketMQMessageListener(topic = "demo-topic", consumerGroup = "demo-consumer-group")
    

Message(消息)

定义:消息是 Producer 发送到 Topic 的载体,包含消息体和属性。

实际代码示例

  • 发送简单文本消息:
    rocketMQTemplate.convertAndSend("demo-topic", "Hello RocketMQ!");
    
  • 发送带属性的消息:
    Message message = MessageBuilder.withPayload("Hello with properties")
                                    .setHeader("key", "value")
                                    .build();
    rocketMQTemplate.syncSend("demo-topic", message);
    

1.2 消息模型

点对点(Queue)模型

定义

  • 每条消息只能被一个消费者(Consumer)消费。

  • 消息会被存储在一个队列(Queue)中,消费者从队列中拉取消息。

  • 适合任务分发、负载均衡等场景。

特点

  • 消息独占性:一条消息只能被一个消费者消费。

  • 负载均衡:如果有多个消费者属于同一个消费者组(Consumer Group),消息会均匀分配给组内的消费者。

  • 消息顺序性:在 RocketMQ 中,队列(Queue)是 Topic 的底层实现,每个 Queue 内的消息是有序的。

实现

  • 在 RocketMQ 中,Queue 是 Topic 的底层实现,开发者无需直接配置 Queue,只需关注 Topic 和 Consumer Group。

  • 多个消费者属于同一个 Consumer Group 时,消息会按照负载均衡策略分配给组内的消费者。


 发布订阅(Topic)模型

定义

  • 消息会被广播给所有订阅了该 Topic 的消费者。

  • 每个消费者都会收到相同的消息。

  • 适合广播通知、事件分发等场景。

特点

  • 消息广播:一条消息会被发送给所有订阅了该 Topic 的消费者。

  • 消费者独立性:每个消费者独立消费消息,彼此之间没有影响。

  • 无负载均衡:所有消费者都会收到相同的消息,而不是分配不同的消息。

实现

  • 在 RocketMQ 中,可以通过设置 MessageModel.BROADCASTING 来实现广播消费。

  • 每个消费者都会收到 Topic 中的所有消息。

示例:在 Spring Boot 中实现广播消费

@RocketMQMessageListener(
    topic = "broadcast-topic",
    consumerGroup = "broadcast-group",
    messageModel = MessageModel.BROADCASTING
)
public class BroadcastConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Broadcast received: " + message);
    }
}

两者的核心区别

特性点对点(Queue)模型发布订阅(Topic)模型
消息消费方式一条消息只能被一个消费者消费一条消息会被所有消费者消费
负载均衡支持,消息会均匀分配给组内的消费者不支持,所有消费者收到相同消息
适用场景任务分发、订单处理等需要负载均衡的场景广播通知、事件分发等需要广播的场景
消费者组的作用组内消费者共享消息,实现负载均衡组内消费者各自独立消费所有消息
消息顺序性单个 Queue 内的消息是有序的无顺序性,所有消费者收到相同消息

2. 环境准备

2.1 安装与运行 RocketMQ

1. 下载 RocketMQ

  • 访问 RocketMQ 的官方 GitHub 仓库:Apache RocketMQ GitHub

  • 下载最新版本的 RocketMQ 二进制包(例如:rocketmq-all-x.x.x-bin-release.zip)。

  • 解压下载的文件

2. 启动 NameServer

  • NameServer 是 RocketMQ 的注册中心,负责管理 Broker 的路由信息。

  • 启动 NameServer:

    sh bin/mqnamesrv &
  • 检查 NameServer 是否启动成功:查看日志文件 logs/rocketmqlogs/namesrv.log,确认是否有以下内容:The Name Server boot success...

3. 启动 Broker

  • Broker 是 RocketMQ 的消息存储和转发服务器。

  • 启动 Broker,并指定 NameServer 地址:

    sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
    • -n localhost:9876:指定 NameServer 的地址。

    • autoCreateTopicEnable=true:允许自动创建 Topic。

  • 检查 Broker 是否启动成功:

    • 查看日志文件 logs/rocketmqlogs/broker.log,确认是否有以下内容:The broker boot success...

4. 验证 RocketMQ 运行状态

  • 使用 RocketMQ 提供的工具命令检查服务状态:

    sh bin/mqadmin clusterList -n localhost:9876
  • 如果看到 Broker 和 NameServer 的信息,说明 RocketMQ 已成功启动。

2.2 RocketMQ Dashboard

RocketMQ Dashboard 是一个可视化管理工具,用于监控和管理 RocketMQ 的 Topic、Producer、Consumer 等信息。

1. 下载 RocketMQ Dashboard

  • 访问 RocketMQ Dashboard 的官方 GitHub 仓库:RocketMQ Dashboard GitHub

  • 下载最新版本的 Dashboard 二进制包(例如:rocketmq-dashboard-x.x.x.jar)。

2. 部署 RocketMQ Dashboard

  • 使用以下命令启动 Dashboard:

    java -jar rocketmq-dashboard-x.x.x.jar
  • 默认情况下,Dashboard 会监听 8080 端口。

3. 访问 RocketMQ Dashboard

  • 打开浏览器,访问 http://localhost:8080

  • 在 Dashboard 页面中,配置 NameServer 地址(例如:localhost:9876),然后点击连接。

4. 监控和管理 RocketMQ

  • Topic 管理:查看所有 Topic 的详细信息,包括消息堆积情况、生产者、消费者等。

  • Producer 管理:查看生产者的运行状态和消息发送情况。

  • Consumer 管理:查看消费者的运行状态和消息消费情况。

  • 消息查询:支持按 Message ID、Topic、Key 等条件查询消息。

  • 集群状态:查看 NameServer 和 Broker 的运行状态。

3. Spring Boot 集成 RocketMQ 的依赖配置 

3.1 添加依赖

pom.xml 文件中添加 Spring Boot 提供的 RocketMQ 集成 Starter 依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version> <!-- 确保版本为你需要的最新稳定版 -->
</dependency>

3.2 配置文件

application.propertiesapplication.yml 中配置 RocketMQ 的相关属性。

# 指定 RocketMQ NameServer 地址
rocketmq.name-server=127.0.0.1:9876

# 配置生产者组名称
rocketmq.producer.group=demo_producer_group

# 配置消费者组名称
rocketmq.consumer.group=demo_consumer_group

配置项说明

配置项说明
rocketmq.name-serverRocketMQ NameServer 的地址,格式为 IP:Port,例如 127.0.0.1:9876

支持集群地址,多个地址用分号分隔,例如:127.0.0.1:9876;127.0.0.2:9876。

rocketmq.producer.group生产者组的名称,用于标识一组生产者。
rocketmq.consumer.group消费者组的名称,用于标识一组消费者。
rocketmq.access-key如果使用阿里云 RocketMQ,需要配置 Access Key。
rocketmq.secret-key如果使用阿里云 RocketMQ,需要配置 Secret Key。
rocketmq.send-message-timeout生产者发送消息的超时时间,默认值为 3000(单位:毫秒)。
rocketmq.consumer.consume-thread-nums

消费者消费消息的线程数,默认值为 20

rocketmq.consumer.maxReconsumeTimes自定义重试次数,消息消费失败会自动重试,默认最大重试次数为 16 次(包括第一次消费)

4. Spring Boot 集成 RocketMQ 的消息生产与消费

RocketMQTemplate 是否需要定义为 Bean?

  • 不需要手动定义:在 Spring Boot 项目中,RocketMQTemplate 已经由 rocketmq-spring-boot-starter 自动配置为 Bean,因此可以直接通过 @Autowired 注入使用。

  • 自动配置原理

    • rocketmq-spring-boot-starter 会在启动时自动创建 RocketMQTemplate 的实例。

    • 只要在 application.properties 或 application.yml 中正确配置了 RocketMQ 的相关参数(如 rocketmq.name-server),Spring Boot 就会自动完成初始化。

发送非 String 类型的消息

  • 支持任意类型RocketMQTemplate 支持发送任意类型的消息,包括 POJO(普通 Java 对象)。

  • 实现方式

    • RocketMQ 会自动将消息序列化为 JSON 格式(默认使用 Jackson)。

    • 消费者接收时,RocketMQ 会自动将 JSON 反序列化为对应的 Java 对象。

示例代码

发送 POJO 消息:

public class User {
    private String name;
    private int age;
    // 省略 getter 和 setter
}

// 生产者
public void sendUserMessage(String topic, User user) {
    rocketMQTemplate.convertAndSend(topic, user);
    System.out.println("Sent user message: " + user);
}

消费 POJO 消息:

@Service
@RocketMQMessageListener(
    topic = "user_topic",
    consumerGroup = "user_consumer_group"
)
public class UserConsumer implements RocketMQListener<User> {

    @Override
    public void onMessage(User user) {
        System.out.println("Received user message: " + user.getName() + ", age: " + user.getAge());
    }
}

4.1 消息生产(Producer)

一般会定义一个消息生产者类,然后其他类调用这个类包装好的消息发送方法。

发送普通消息

使用场景:发送简单的文本或对象消息。

实现方式使用 RocketMQTemplate.convertAndSend(topic, message) 方法发送消息。

示例代码

@Autowired
private RocketMQTemplate rocketMQTemplate;

public void sendSimpleMessage(String topic, String message) {
    rocketMQTemplate.convertAndSend(topic, message);
    System.out.println("Sent simple message: " + message);
}

发送带 Tag 的消息

使用场景:通过 Tag 对消息进行分类,消费者可以根据 Tag 过滤消息。

实现方式在 Topic 后添加 :Tag消息目的地的格式为:topic:tag

示例代码

public void sendMessageWithTag(String topic, String tag, String message) {
    String destination = topic + ":" + tag; // 格式:topic:tag
    rocketMQTemplate.convertAndSend(destination, message);
    System.out.println("Sent message with tag: " + message);
}

发送同步消息

使用场景:需要确保消息成功发送到 Broker 的场景。

实现方式使用 RocketMQTemplate.syncSend(destination, message) 方法。

示例代码

public void sendSyncMessage(String topic, String message) {
    SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
    System.out.println("Sync message sent, result: " + sendResult);
}

发送异步消息

使用场景:不需要立即确认消息是否发送成功,通过回调处理发送结果。

实现方式使用 RocketMQTemplate.asyncSend(destination, message, sendCallback) 方法。

示例代码

public void sendAsyncMessage(String topic, String message) {
    rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("Async message sent successfully: " + sendResult);
        }

        @Override
        public void onException(Throwable throwable) {
            System.err.println("Async message send failed: " + throwable.getMessage());
        }
    });
}

发送单向消息

使用场景:适用于不需要确认的日志、监控等场景。

实现方式
使用 RocketMQTemplate.sendOneWay(destination, message) 方法。

示例代码

public void sendOneWayMessage(String topic, String message) {
    rocketMQTemplate.sendOneWay(topic, message);
    System.out.println("One-way message sent: " + message);
}

4.2 消息消费(Consumer)

基础消费逻辑

实现方式

  • 实现 RocketMQListener 接口

    • 消费者类必须实现 RocketMQListener<T> 接口,并指定泛型类型(如 String 或自定义 POJO)。

  • 重写 onMessage 方法

    • 在 onMessage 方法中编写消息处理逻辑。

  • 使用 @RocketMQMessageListener 注解

    • 通过注解指定监听的 Topic 和 Consumer Group。

示例代码

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
    topic = "demo_topic",          // 监听的 Topic
    consumerGroup = "demo_consumer_group"  // 消费者组名称
)
public class DemoConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

消费消息的确认机制

自动确认
RocketMQ 默认会自动确认消息(ACK),消费者成功消费消息后,Broker 会认为消息已处理。

手动确认
如果需要手动确认消息,可以使用 RocketMQListener<MessageExt> 并调用 ack 方法。


消费失败的处理

重试机制

  • 如果消费者消费消息失败,RocketMQ 会尝试重新投递消息。

  • 默认重试次数为 16 次,重试间隔逐渐增加。

死信队列

  • 如果消息重试多次后仍然失败,会被投递到死信队列(DLQ,Dead Letter Queue)。

  • 死信队列的命名规则为:%DLQ% + Consumer Group

  • 自动创建

    • RocketMQ 会自动为每个 Consumer Group 创建一个死信队列,无需手动配置。

    • 例如,如果 Consumer Group 为 demo_consumer_group,则死信队列的topic为 %DLQ%demo_consumer_group

  • 手动消费死信队列

    • 可以创建一个消费者,专门消费死信队列中的消息。

@Service
@RocketMQMessageListener(
    topic = "%DLQ%demo_consumer_group",  // 死信队列名称
    consumerGroup = "dlq_consumer_group"  // 死信队列的消费者组
)
public class DLQConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received DLQ message: " + message);
        // 处理死信消息,例如记录日志或人工干预
    }
}

消费带 Tag 的消息

实现方式
在 @RocketMQMessageListener 中指定 selectorExpression 来过滤 Tag。

示例代码

@Service
@RocketMQMessageListener(
    topic = "demo_topic",
    consumerGroup = "demo_consumer_group",
    selectorExpression = "tagA || tagB"  // 过滤 Tag
)
public class TagConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received tagged message: " + message);
    }
}

顺序消费

  • RocketMQ 默认是并发消费,不保证消息的顺序性。

  • 顺序消费是指消息按照发送顺序被消费,适用于对消息顺序有严格要求的场景。

  • 实现顺序消费需要:

    1. 生产者发送消息时指定相同的消息组。

    2. 消费者设置 consumeMode = ConsumeMode.ORDERLY

  • 顺序消费的实现依赖于队列的顺序性和消费者的单线程消费。

示例代码

@Service
@RocketMQMessageListener(
    topic = "demo_topic",
    consumerGroup = "demo_consumer_group",
    consumeMode = ConsumeMode.ORDERLY  // 顺序消费
)
public class OrderlyConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received orderly message: " + message);
    }
}

5. Spring Boot 集成 RocketMQ 的高级功能

5.1 延时消息

延时消息是指消息发送后,不会立即被消费者消费,而是在指定的延迟时间后才会被投递。RocketMQ 提供了 18 个固定的延迟级别,每个级别对应不同的延迟时间。

延迟级别与时间对应关系

延迟级别

延迟时间

1

1 秒

2

5 秒

3

10 秒

4

30 秒

5

1 分钟

6

2 分钟

7

3 分钟

8

4 分钟

9

5 分钟

10

6 分钟

11

7 分钟

12

8 分钟

13

9 分钟

14

10 分钟

15

20 分钟

16

30 分钟

17

1 小时

18

2 小时

使用场景

  • 订单超时未支付取消。

  • 定时任务触发。

  • 延迟通知。

实现步骤

  • 构建消息:使用 MessageBuilder 构建消息,并设置 MessageConst.PROPERTY_DELAY_TIME_LEVEL 属性与等级值。

  • 发送消息:使用 RocketMQTemplate.syncSend 发送消息。

示例代码

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class DelayedMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送延迟消息
     *
     * @param topic       主题
     * @param message     消息内容
     * @param delayLevel 延迟级别(1-18)
     */
    public void sendDelayedMessage(String topic, String message, int delayLevel) {
        // 构建消息,设置延迟级别
        Message<String> msg = MessageBuilder.withPayload(message)
                .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(delayLevel))
                .build();

        // 发送延迟消息
        rocketMQTemplate.syncSend(topic, msg);
        System.out.println("Sent delayed message: " + message + ", delay level: " + delayLevel);
    }
}

调用消息生产者发送消息

delayedMessageProducer.sendDelayedMessage("my-topic", "Delayed Message", 3); // 延迟 10 秒

5.2 事务消息

事务消息用于实现分布式事务,确保业务操作和消息发送的一致性。RocketMQ 的事务消息分为两个阶段:

  • 发送半消息:消息发送到 Broker,但暂时对消费者不可见。

  • 提交或回滚:根据本地事务的执行结果,提交或回滚消息。

使用场景

  • 分布式事务场景,例如订单创建和库存扣减。

  • 需要保证业务操作和消息发送一致性的场景。

实现步骤

  • 定义事务监听器:实现 RocketMQLocalTransactionListener 接口,定义本地事务逻辑和回查逻辑。

  • 使用注解@RocketMQTransactionListener:通过 txProducerGroup 属性指定事务组的名称。事务组的名称必须与发送事务消息时指定的 txProducerGroup 一致。txProducerGroup 是事务组的唯一标识,用于关联生产者和事务监听器。

  • 发送事务消息:使用 rocketMQTemplate.sendMessageInTransaction 方法发送事务消息。

示例代码

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@RocketMQTransactionListener(txProducerGroup = "my-transaction-group")
public class MyTransactionListener implements RocketMQLocalTransactionListener {

    /**
     * 执行本地事务
     *
     * @param msg 消息
     * @param arg 附加参数
     * @return 事务状态
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地事务
            System.out.println("Executing local transaction: " + msg.getPayload());
            // 模拟业务操作成功
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // 本地事务执行失败,回滚消息
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 回查本地事务状态
     *
     * @param msg 消息
     * @return 事务状态
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 回查本地事务状态
        System.out.println("Checking local transaction: " + msg.getPayload());
        return RocketMQLocalTransactionState.COMMIT;
    }
}

checkLocalTransaction 是 RocketMQLocalTransactionListener 接口中的一个方法,用于 回查本地事务的状态。它的作用是:

事务状态回查

  • 如果生产者发送事务消息后,未及时返回本地事务的执行结果,RocketMQ 会调用 checkLocalTransaction 方法回查事务状态。

返回事务状态:根据回查结果,返回 RocketMQLocalTransactionState 枚举值:

  • COMMIT:提交事务,消息对消费者可见。

  • ROLLBACK:回滚事务,消息被丢弃。

  • UNKNOWN:事务状态未知,RocketMQ 会继续回查。

定义消息生产者发送事务消息 

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class TransactionMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送事务消息
     *
     * @param topic   主题
     * @param message 消息内容
     */
    public void sendTransactionMessage(String topic, String message) {
        // 构建消息
        Message<String> msg = MessageBuilder.withPayload(message).build();

        // 发送事务消息
        rocketMQTemplate.sendMessageInTransaction("my-transaction-group", topic, msg, null);
        System.out.println("Sent transaction message: " + message);
    }
}
  • withPayload(message):设置消息的有效负载(Payload),即消息的内容。

  • message 可以是任意类型的对象,RocketMQ 会自动将其序列化为字节数组。

调用消息生产者发送消息

transactionMessageProducer.sendTransactionMessage("my-topic", "Transaction Message");

5.3 消息过滤

RocketMQ 支持通过 Tag 或 SQL92 表达式 过滤消息,消费者可以只接收符合条件的消息。

使用 Tag 过滤

  • Tag 是消息的标签,用于对消息进行分类。

  • 消费者可以订阅指定 Tag 的消息。

示例代码

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
    topic = "my-topic",
    consumerGroup = "my-consumer-group",
    selectorExpression = "tagA || tagB"  // 过滤 Tag
)
public class TagFilterConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received filtered message: " + message);
    }
}

生产者发送带 Tag 的消息

public void sendMessageWithTag(String topic, String tag, String message) {
    String destination = topic + ":" + tag; // 格式:topic:tag
    rocketMQTemplate.syncSend(destination, message);
    System.out.println("Sent message with tag: " + message);
}

使用 SQL92 表达式过滤

  • SQL92 表达式 可以根据消息的属性进行更复杂的过滤。

  • 需要在 Broker 配置中启用 SQL92 过滤功能。

示例代码

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
    topic = "my-topic",
    consumerGroup = "my-consumer-group",
    selectorType = SelectorType.SQL92,  // 使用 SQL92 过滤
    selectorExpression = "age > 18"     // 过滤条件
)
public class SqlFilterConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received SQL filtered message: " + message);
    }
}

生产者发送消息时设置属性

public void sendMessageWithProperties(String topic, String message) {
    Message<String> msg = MessageBuilder.withPayload(message)
            .setHeader("age", 20)  // 设置消息属性
            .build();
    rocketMQTemplate.syncSend(topic, msg);
    System.out.println("Sent message with properties: " + message);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/955825.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

BGP边界网关协议(Border Gateway Protocol)路由引入、路由反射器

一、路由引入背景 BGP协议本身不发现路由&#xff0c;因此需要将其他协议路由&#xff08;如IGP路由等&#xff09;引入到BGP路由表中&#xff0c;从而将这些路由在AS之内和AS之间传播。 BGP协议支持通过以下两种方式引入路由&#xff1a; Import方式&#xff1a;按协议类型将…

Solidity03 Solidity变量简述

文章目录 一、变量简述1.1 状态变量1.2 局部变量1.3 全局变量1.4 注意问题 二、变量可见性2.1 public2.2 private2.3 internal2.4 默认可见性2.5 可见性的用处 三、变量初始值3.1 值类型初始值 一、变量简述 变量是指可以保存数据的内部存储单元&#xff0c;里面的数据可以在程…

数据结构---并查集

目录 一、并查集的概念 二、并查集的实现 三、并查集的应用 一、并查集的概念 在一些实际问题中&#xff0c;需要将n个不同的元素划分成一些不相交的集合。开始时&#xff0c;每个元素自成一个单元素集合&#xff0c;然后按一定的规律将归于同一组元素的集合…

STM32 FreeRTOS内存管理简介

在使用 FreeRTOS 创建任务、队列、信号量等对象时&#xff0c;通常都有动态创建和静态创建的方式。动态方式提供了更灵活的内存管理&#xff0c;而静态方式则更注重内存的静态分配和控制。 如果是1的&#xff0c;那么标准 C 库 malloc() 和 free() 函数有时可用于此目的&#…

构建core模块

文章目录 1.环境搭建1.sunrays-common下新建core模块2.引入依赖&#xff0c;并设置打包常规配置 2.测试使用1.启动&#xff01;1.创建模块2.引入依赖3.application.yml 配置MySQL和Minio4.创建启动类5.启动测试 2.common-web-starter1.目录2.WebController.java3.结果 3.common…

【Flink系列】6. Flink中的时间和窗口

6. Flink中的时间和窗口 在批处理统计中&#xff0c;我们可以等待一批数据都到齐后&#xff0c;统一处理。但是在实时处理统计中&#xff0c;我们是来一条就得处理一条&#xff0c;那么我们怎么统计最近一段时间内的数据呢&#xff1f;引入“窗口”。 所谓的“窗口”&#xff…

AIGC与劳动力市场:技术进步与就业结构的重塑

随着人工智能&#xff08;AI&#xff09;技术的迅猛发展&#xff0c;尤其是生成式AI&#xff08;AIGC&#xff09;&#xff0c;劳动力市场正经历前所未有的变革。从内容创作到自动化生产线&#xff0c;几乎每个行业都在经历一场技术的洗礼。然而&#xff0c;这场革命并不是全然…

废品回收小程序,数字化回收时代

随着科技的不断创新发展&#xff0c;废品回收在各种技术的支持下也在不断地创新&#xff0c;提高了市场的发展速度&#xff0c;不仅能够让回收效率更加高效&#xff0c;还能够让居民更加便捷地进行回收&#xff0c;推动废品回收行业的发展。 回收市场机遇 目前&#xff0c;废…

题解 CodeForces 430B Balls Game 栈 C/C++

题目传送门&#xff1a; Problem - B - Codeforceshttps://mirror.codeforces.com/contest/430/problem/B翻译&#xff1a; Iahub正在为国际信息学奥林匹克竞赛&#xff08;IOI&#xff09;做准备。有什么比玩一个类似祖玛的游戏更好的训练方法呢&#xff1f; 一排中有n个球…

【Linux】线程全解:概念、操作、互斥与同步机制、线程池实现

&#x1f3ac; 个人主页&#xff1a;谁在夜里看海. &#x1f4d6; 个人专栏&#xff1a;《C系列》《Linux系列》《算法系列》 ⛰️ 道阻且长&#xff0c;行则将至 目录 &#x1f4da;一、线程概念 &#x1f4d6; 回顾进程 &#x1f4d6; 引入线程 &#x1f4d6; 总结 &a…

PDF文件提取开源工具调研总结

概述 PDF是一种日常工作中广泛使用的跨平台文档格式&#xff0c;常常包含丰富的内容&#xff1a;包括文本、图表、表格、公式、图像。在现代信息处理工作流中发挥了重要的作用&#xff0c;尤其是RAG项目中&#xff0c;通过将非结构化数据转化为结构化和可访问的信息&#xff0…

简历_使用优化的Redis自增ID策略生成分布式环境下全局唯一ID,用于用户上传数据的命名以及多种ID的生成

系列博客目录 文章目录 系列博客目录WhyRedis自增ID策略 Why 我们需要设置全局唯一ID。原因&#xff1a;当用户抢购时&#xff0c;就会生成订单并保存到tb_voucher_order这张表中&#xff0c;而订单表如果使用数据库自增ID就存在一些问题。 问题&#xff1a;id的规律性太明显、…

跨境电商使用云手机用来做什么呢?

随着跨境电商的发展&#xff0c;越来越多的卖家开始尝试使用云手机来协助他们的业务&#xff0c;这是因为云手机具有许多优势。那么&#xff0c;具体来说&#xff0c;跨境电商使用云手机可以做哪些事情呢&#xff1f; &#xff08;一&#xff09;实现多账号登录和管理 跨境电商…

计算机网络 (47)应用进程跨越网络的通信

前言 计算机网络应用进程跨越网络的通信是一个复杂而关键的过程&#xff0c;它涉及多个层面和组件的协同工作。 一、通信概述 计算机网络中的通信&#xff0c;本质上是不同主机中的应用进程之间的数据交换。为了实现这种通信&#xff0c;需要借助网络协议栈中的各层协议&#x…

Open3D 计算每个点的协方差矩阵【2025最新版】

目录 一、算法原理1、计算公式2、主要函数3、函数源码二、代码实现三、结果展示博客长期更新,本文最近更新时间为:2025年1月18日。 一、算法原理 1、计算公式 对于点云数据中的任意一点 p p p,根据其邻域内点的坐标计算其协方差矩阵。计算公式如下:

e2studio开发RA0E1(16)----配置RTC时钟及显示时间

e2studio开发RA0E1.16--配置RTC时钟及显示时间 概述视频教学样品申请完整代码下载硬件准备参考程序新建工程工程模板保存工程路径芯片配置工程模板选择时钟设置UART配置UART属性配置设置e2studio堆栈e2studio的重定向printf设置R_UARTA_Open()函数原型回调函数user_uart_callba…

Go语言strings包与字符串操作:从基础到高级的全面解析

Go语言strings包与字符串操作:从基础到高级的全面解析 引言 Go语言以其简洁、高效和强大的标准库而闻名,其中strings包是处理字符串操作的核心工具。本文将深入探讨Go语言中strings包的功能及其在实际开发中的应用,帮助开发者更好地理解和使用这一工具。 1. strings包概述…

微服务学习-快速搭建

1. 速通版 1.1. git clone 拉取项目代码&#xff0c;导入 idea 中 git clone icoolkj-microservices-code: 致力于搭建微服务架构平台 1.2. git checkout v1.0.1版本 链接地址&#xff1a;icoolkj-microservices-code 标签 - Gitee.com 2. 项目服务结构 3. 实现重点步骤 …

加密货币的基本交易技术指标

是币安交易市场的基本版视图,trading View是有更复杂的参数追踪。币安的交易的技术指标有主图和副图。有很多指标&#xff0c;让ai解释一下相关概念和意义。加密货币交易中可能遇到的主图指标及其含义&#xff1a; 1. MA&#xff08;移动平均线&#xff0c;Moving Average&…

简单介绍JSONStream的使用

地址 作用 这个模块是根据需要筛选出json数据中自己所需要的数据 使用 var JSONStream require("JSONStream"); var parse require("fast-json-parse"); var fs require("fs");fs.createReadStream("./time.json").pipe(JSONSt…