SpringBoot学习小结之RocketMQ

文章目录

  • 前言
  • 一、架构设计
    • 1.1 架构图
    • 1.2 消息
    • 1.3 工作流程
  • 二、部署
    • 2.1 单机
    • 2.2 集群
  • 三、Springboot Producter
    • 3.1 准备
    • 3.2 pom依赖、yml 配置
    • 3.3 普通消息
    • 3.4 顺序、批量、延迟消息
    • 3.5 事务消息
  • 四、Springboot Consumer
    • 4.1 配置
    • 4.2 普通Push消费
    • 4.3 回复
    • 4.4 集群和广播
    • 4.5 并发和顺序
    • 4.6 消息过滤
    • 4.7 重试和死信
    • 4.8 设置消费组负载均衡策略
    • 4.9 设置offset
    • 4.10 Pull 消息
  • 五、总结
    • 5.1 优点
    • 5.2 缺点
  • 参考

前言

在当今互联网时代,随着数据规模和业务复杂度的不断增长,分布式消息中间件作为实现系统解耦、异步通信和削峰填谷的重要工具,扮演着越来越关键的角色。而在众多的消息中间件中,Apache RocketMQ 以其出色的性能、高可用性和可扩展性,成为了许多企业构建分布式系统的首选之一。

RocketMQ 是一种开源分布式消息队列系统, 由阿里巴巴集团开发并在2012年开源,现已成为 Apache 软件基金会的顶级项目之一。它具备高吞吐量、低延迟、高可靠性和强大的水平扩展能力等特性,被广泛应用于互联网、金融、电商、物联网等各个领域。

本文将带您深入了解 RocketMQ,探索其架构设计、部署以及在 Springboot 中使用,帮助您更好地理解和应用这一强大的消息中间件,提升系统的性能和可靠性,实现业务的快速发展和创新。

一、架构设计

1.1 架构图

RocketMQ 的组成部分

组件功能特点
Name Server负责维护集群中所有 Broker 的路由信息和消息队列的状态信息各个 NameServer 相互独立,没有信息转发
Broker存储消息,接收来自生产者的消息并将其提供给消费者也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息
Producer消息的生产者,负责将消息发送到 Broker通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递过程支持快速失败和重试
Consumer消息的消费者,从 Broker 中获取消息进行处理支持推(push)和拉(pull)两种模式对消息进行消费,支持集群方式和广播方式的消费,提供实时消息订阅机制,满足大多数用户的需求

消费模式可以大致分为两种,一种是推Push,一种是拉Pull。

  • Push 是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
  • Pull 是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

1.2 消息

一些和消息相关的核心概念

名词含义
Message消息系统所传输信息的物理载体,生产和消费数据的最小单位。一条消息必须有一个主题(Topic)。消息 body 默认最大 4M。
Topic一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题。
Tag为消息设置的标志,用于同一 Topic 下区分不同类型的消息,目前只支持每个消息设置一个, 一般在Topic后加 :Tag名。
ProducerGroup一组 Producer 的集合,这些 Producer 共同实现了某个业务逻辑,通常是发送相同类型或相关类型的消息到同一个 Topic。
ConsumerGroup一组 Consumer 的集合,ConsumerGroup下的消费者主要有两种负载均衡模式,即广播模式和集群模式。在集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费,在广播模式下,同一个 ConsumerGroup 中每个 Consumer 实例都需要处理全部的消息。
Message Queue一个 Topic 下的物理存储单元,用于存储发送到该 Topic 的消息。每个 Topic 可以有多个消息队列,消息会根据一定的规则被分配到这些消息队列中。
Offset消费者在消息队列中消费消息的位置信息。

消息发送返回的状态

状态含义
SEND_OK消息发送成功,要注意的是消息发送成功也不意味着它是可靠的,要确保不会丢失任何消息,还应启用同步 Master 服务器或同步刷盘,即 SYNC_MASTER 或 SYNC_FLUSH。
FLUSH_DISK_TIMEOUT消息发送成功但是服务器刷盘超时,此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失,消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果 Broker 服务器设置了刷盘方式为同步刷盘,即 FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当 Broker 服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。
FLUSH_SLAVE_TIMEOUT消息发送成功,但是服务器同步到 Slave 时超时,此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失,如果 Broker 服务器的角色是同步 Master,即 SYNC_MASTER(默认是异步 Master 即 ASYNC_MASTER),并且从 Broker 服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到 Slave 服务器超时。
SLAVE_NOT_AVAILABLE消息发送成功,但是此时 Slave 不可用,如果 Broker 服务器的角色是同步 Master,即 SYNC_MASTER(默认是异步 Master 服务器即 ASYNC_MASTER),但没有配置 slave Broker 服务器,则将返回该状态——无 Slave 服务器可用。

Group 和 Cluster 区别

  • Group 和 Cluster 是两个不同的概念,前者是逻辑上,后者是物理上。
  • 以Producer举例,一个 Producer Cluster 可以包含多个 Producer Group,而一个 Producer Group 只属于一个 Producer Cluster。换句话说,一个生产者集群可以包含多个逻辑上不同的生产者组,每个生产者组都有其特定的 ProducerGroup 标识。

1.3 工作流程

  1. 启动 NameServer

    NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连接,相当于一个路由控制中心。

  2. 启动 Broker

    与所有 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟 Broker 的映射关系。

  3. 创建 Topic

    创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic , 但不建议。

  4. 生产者发送消息

    启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在于哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker建立长连接从而向 Broker 发消息。

  5. 消费者接受消息

    跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker上,然后直接跟 Broker 建立连接通道,然后开始消费消息。

二、部署

2.1 单机

mqnamesrv
mqbroker -n localhost:9876

mqnamesrv 默认端口9876,可通过添加配置文件 namesrv.conf 修改

listenPort = 9878
mqnamesrv -c ../conf/namesrv.conf
mqbroker -n localhost:9878

broker 默认端口10911,修改 broker 端口号, 在配置文件 broker.cnf 添加

listenPort = 11087

并在启动时指定配置文件

mqbroker -n localhost:9877 -c ../conf/broker.conf

2.2 集群

6台机器,配置两个 NameServer、4个 Broker(2主2从)

mqnamesrv -n 192.168.1.1:9876
mqnamesrv -n 192.168.1.2:9876

mqbroker -n 192.168.1.1:9876;192.161.2:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties
mqbroker -n 192.168.1.1:9876;192.161.2:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties
mqbroker -n 192.168.1.1:9876;192.161.2:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.propertie
mqbroker -n 192.168.1.1:9876;192.161.2:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties

同步双写配置,每个 Master 配置一个 Slave,有多对 Master-Slave ,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高。
  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

三、Springboot Producter

3.1 准备

mqnamesrv -c ../conf/namesrv.conf
mqbroker -n localhost:9878 -c ../conf/broker.conf

mqadmin updateTopic -b 127.0.0.1:11087 -t demo-topic

3.2 pom依赖、yml 配置

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>
rocketmq:
  name-server: localhost:9878
  producer:
    group: test-producter
demo:
  rocketmq:
    topic: demo-topic
spring:
  application:
    name: producter

3.3 普通消息

Apache RocketMQ 可用于以三种方式发送消息:同步、异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。

使用场景:

  • 同步:一些对消息可靠性要求较高的场景,如订单支付、账单通知等。
  • 异步:一些链路耗时较长,对响应时间较为敏感的业务的场景,如视频上传后通知启动转码服务,转码完成后通知推送转码结果 等。
  • 单向:一些不需要关心消息发送结果,只需简单地发送消息而不关心是否成功的场景,如日志记录等。
@Autowired
private RocketMQTemplate rocketMQTemplate;

@Value("${demo.rocketmq.topic}")
private String topic;

@Test
void test1_sync() {
    SendResult sendResult = rocketMQTemplate.syncSend(topic, "同步发送信息");
    log.info("同步发送结果:{}", sendResult);
    rocketMQTemplate.convertAndSend(topic, "simple-send-topic simple hello");
}

@Test
void test2_async() {
    rocketMQTemplate.asyncSend(topic, "异步发送信息", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步发送结果:{}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.info("异步发送异常: {}", e.toString());
            }
        });
}

@Test
void test3_oneway() {
    rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload("one way message").build());
    rocketMQTemplate.convertAndSend(topic, "simple-send-topic simple hello");
    log.info("发送 oneway message");
}

@Test
void test4_tags() {
    Message message = new Message(topic, "hello tags message".getBytes());
    message.setTags("tagA");
    try {
        rocketMQTemplate.getProducer().send(message);
    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
        log.error("syncSend failed. message:{} ", message);
        throw new RuntimeException(e);
    }
    log.info("发送 tag message");
}

3.4 顺序、批量、延迟消息

  • 顺序消息是一种在消息发送和消费过程中要求严格按照特定顺序进行处理的消息。在 RocketMQ 中,顺序消息遵循先进先出(FIFO)的原则,确保按照消息发布的顺序进行消费

    在 RocketMQ 中,支持分区顺序消息,通过对消息进行分区,确保同一个分区键的消息会被分配到同一个队列中,并按照顺序进行消费。

    RocketMQ 的消息顺序性包括生产顺序性和消费顺序性。为了实现消息的顺序性,需要同时满足生产者和消费者两方面的条件:

    • 生产顺序性:确保单个生产者串行地发送消息,并按序存储和持久化。要满足生产顺序性,需要保证消息的发送是单一生产者、串行发送的。
    • 消费顺序性:消费者按照消息的顺序进行处理。RocketMQ 通过设置相同的分区键,将消息发送至同一队列中,从而保证消费顺序性。

    顺序消息适用于需要严格保持事件顺序的场景,如有序事件处理、撮合交易、数据实时增量同步等。例如,在订单处理场景中,需要确保订单生成、付款和发货等操作按照顺序执行,顺序消息能够满足这种需求。

  • 批量消息是在对吞吐率有一定要求的情况下,RocketMQ 可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少 API 和网络调用次数。

  • 延迟消息发送是指消息发送到 RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到 Consumer 进行消费。

    在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发。使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

    投递等级(delay level)延迟时间投递等级(delay level)延迟时间
    11s106min
    25s117min
    310s128min
    430s139min
    51min1410min
    62min1520min
    73min1630min
    84min171h
    95min182h
@Test
void test5_order() {
    for (int q = 0; q < 4; q++) {
        // send to 4 queues
        List<org.springframework.messaging.Message> msgs = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            int msgIndex = q * 10 + i;
            String msg = String.format("Hello RocketMQ Batch Msg#%d to queue: %d", msgIndex, q);
            msgs.add(MessageBuilder.withPayload(msg).
                     setHeader(RocketMQHeaders.KEYS, "KEY_" + msgIndex).build());
        }
        SendResult sr = rocketMQTemplate.syncSendOrderly(topic, msgs, q + "", 60000);
        System.out.println("--- Batch messages orderly to queue :" + sr.getMessageQueue().getQueueId() + " send result :" + sr);
    }
}

@Test
void test6_batch() {
    List<org.springframework.messaging.Message> msgs = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
                 setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
    }

    SendResult sr = rocketMQTemplate.syncSend(topic, msgs, 60000);
    log.info("--- Batch messages send result : {}",  sr);
}

@Test
void test7_lazy() {

    int totalMessagesToSend = 100;
    for (int i = 0; i < totalMessagesToSend; i++) {
        Message message = new Message(topic, ("Hello scheduled message " + i).getBytes());
        // This message will be delivered to consumer 10 seconds later.
        message.setDelayTimeLevel(3);

        try {
            rocketMQTemplate.getProducer().send(message);
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            log.error("error {}", e.toString());
            throw new RuntimeException(e);
        }
    }

    log.info("final send result");
}
}

3.5 事务消息

  1. 生产者将半事务消息发送至 RocketMQ Broker

  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

  6. 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置

  7. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  8. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

@Test
void test8_transaction() {
    for (int i = 0; i < 10; i++) {
        try {
            org.springframework.messaging.Message msg = MessageBuilder.withPayload("rocketmq transactional message " + i).
                setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i).build();
            SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(null,
                                                                              topic, msg, null);
            log.info("------RocketMQTemplate send Transactional msg body = {}  , sendResult= {}",
                     msg.getPayload(), sendResult.getSendStatus());

            Thread.sleep(10);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message msg, Object arg) {
        String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        log.info("#### executeLocalTransaction is executed, msgTransactionId={}", transId);
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(transId, status);
        if (status == 0) {
            // Return local transaction with success(commit), in this case,
            // this message will not be checked in checkLocalTransaction()
            log.info("    # COMMIT # Simulating msg {} related local transaction exec succeeded! ###", msg.getPayload());
            return RocketMQLocalTransactionState.COMMIT;
        }

        if (status == 1) {
            // Return local transaction with failure(rollback) , in this case,
            // this message will not be checked in checkLocalTransaction()
            log.info("    # ROLLBACK # Simulating {} related local transaction exec failed! ", msg.getPayload());
            return RocketMQLocalTransactionState.ROLLBACK;
        }

        log.info("    # UNKNOW # Simulating %s related local transaction exec UNKNOWN!");
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message msg) {
        String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
        Integer status = localTrans.get(transId);
        if (null != status) {
            switch (status) {
                case 0:
                    retState = RocketMQLocalTransactionState.COMMIT;
                    break;
                case 1:
                    retState = RocketMQLocalTransactionState.ROLLBACK;
                    break;
                case 2:
                    retState = RocketMQLocalTransactionState.UNKNOWN;
                    break;
            }
        }
        log.info("------ !!! checkLocalTransaction is executed once," +
                 " msgTransactionId={}, TransactionState=%s status={} {}",
                 transId, retState, status);
        return retState;
    }


}

四、Springboot Consumer

4.1 配置

rocketmq:
  name-server: localhost:9878
  consumer:
    topic: demo-topic
    group: consumer-group 

demo:
  rocketmq:
    group: broadcast-group
    consumer:
      tag: tagA
spring:
  application:
    name: consumer

4.2 普通Push消费

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${rocketmq.consumer.topic}",
        consumerGroup = "${rocketmq.consumer.group}"
)
public class MessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("接收消息:{}", message);
    }
}

4.3 回复

@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${demo.rocketmq.bytesRequestConsumer}")
public class ConsumerWithReplyBytes implements RocketMQReplyListener<MessageExt, byte[]> {

    @Override
    public byte[] onMessage(MessageExt message) {
        System.out.printf("------- ConsumerWithReplyBytes received: %s \n", message);
        return "reply message content".getBytes();
    }
}

4.4 集群和广播

默认集群

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${demo.rocketmq.topic}",
        consumerGroup = "${demo.rocketmq.consumer.group}",
        messageModel = MessageModel.BROADCASTING
)
public class MessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("广播接收消息:{}", message);
    }
}

4.5 并发和顺序

默认并发

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${demo.rocketmq.topic}",
        consumerGroup = "${demo.rocketmq.consumer.group}",
        consumeMode = ConsumeMode.ORDERLY
)
public class OrderMessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("顺序接收消息:{}", message);
    }
}

4.6 消息过滤

消息过滤分为 Tag 过滤和 SQL 过滤,默认Tag过滤

在 SQL 语法中,Tag 的属性值为 TAGS,开启属性过滤首先要在 Broker 端设置配置enablePropertyFilter=true

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${rocketmq.consumer.topic}",
        consumerGroup = "${rocketmq.consumer.group}",
        selectorExpression = "${demo.rocketmq.consumer.tag}"
)
public class FilterMessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("过滤Tag接收消息:{}", message);
    }
}
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${rocketmq.consumer.topic}",
        consumerGroup = "${rocketmq.consumer.group}",
        selectorExpression = "TAGS is not null and TAGS in ('tagA', 'tagB')",
        selectorType = SelectorType.SQL92
)
public class SQLFilterMessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("SQL过滤Tag接收消息:{}", message);
    }
}

4.7 重试和死信

默认重试次数为 -1,即 异步为16,顺序为Interger.MAXVALUE

异步重试时间间隔

第几次重试与上次重试的间隔时间第几次重试与上次重试的间隔时间
110s97min
230s108min
31min119min
42min1210min
53min1320min
64min1430min
75min151h
86min162h
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${rocketmq.consumer.topic}",
        consumerGroup = "${rocketmq.consumer.group}",
        maxReconsumeTimes = 3
)
public class RetryMessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("接收消息:{}", message);
    }
}

当一条消息在初次消费时失败,RocketMQ会自动进行消息重试。若达到最大重试次数后仍然失败,则表明该消息在正常情况下无法被正确消费。

此时,该消息并非立即丢弃,而是会被发送到特殊队列,称为死信队列(Dead-Letter Queue),而这类消息则被称为死信消息(Dead-Letter Message)。

死信队列是死信Topic下唯一的单独队列,而死信Topic的名称通常为%DLQ%ConsumerGroupName,其中 ConsumerGroupName 为对应消费者组的名称。

通过RocketMQ Admin工具或 RocketMQ Dashboard,可以查询到这些死信消息的信息,但它们不会再被消费。

4.8 设置消费组负载均衡策略

例如一个 Topic 有8个队列,一个消费组中有3个消费者,那这三个消费者各自去消费哪些队列。RocketMQ 默认提供了如下负载均衡算法:

  • AllocateMessageQueueAveragely:平均连续分配算法。
  • AllocateMessageQueueAveragelyByCircle:平均轮流分配算法。
  • AllocateMachineRoomNearby:机房内优先就近分配。
  • AllocateMessageQueueByConfig:手动指定,这个通常需要配合配置中心,在消费者启动时,首先先创建 AllocateMessageQueueByConfig 对象,然后根据配置中心的配置,再根据当前的队列信息,进行分配,即该方法不具备队列的自动负载,在 Broker 端进行队列扩容时,无法自动感知,需要手动变更配置。
  • AllocateMessageQueueByMachineRoom:消费指定机房中的队列,该分配算法首先需要调用该策略的 setConsumeridcs(Set<String> consumerIdCs) 方法,用于设置需要消费的机房,将刷选出来的消息按平均连续分配算法进行队列负载。
  • AllocateMessageQueueConsistentHash 一致性 Hash 算法。
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${rocketmq.consumer.topic}",
        consumerGroup = "${demo.rocketmq.rebalancegroup}"
)
public class RebalanceMessageListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {


    @Override
    public void onMessage(String message) {
        log.info("rebalance: {}", message);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());
    }
}

4.9 设置offset

消费者将从上次消费的位置开始消费消息

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${rocketmq.consumer.topic}",
        consumerGroup = "${rocketmq.consumer.group}")
public class PushMessageListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(String message) {
        log.info("push {}", message);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    }
}

4.10 Pull 消息

// 配置
rocketMQTemplate.getConsumer().subscribe(topic, "*");
rocketMQTemplate.getConsumer().setPullBatchSize(20);
List<MessageExt> messageExts = rocketMQTemplate.getConsumer().poll(1000);
log.info("poll 拉取消息:{}", messageExts);

// 类似
List<String> receive = rocketMQTemplate.receive(String.class, 1000);
log.info("poll 拉取消息:{}", receive);

五、总结

5.1 优点

  1. 稳定性高:RocketMQ 在阿里巴巴内部被广泛应用,经过多年的生产环境验证,稳定性高。
  2. 高并发:支持高并发的消息处理,可以满足大量的消息生产和消费需求。
  3. 适应性广:支持多种消息协议,如 JMS、OpenMessaging 等,并且可以很容易地与不同的系统进行集成。
  4. 高可用性:RocketMQ 支持主从和分布式部署,可以保证在任何节点宕机的情况下服务仍然可用。
  5. 高可靠性:提供了三种级别的消息传递保证,并且支持事务消息,可以保证消息的可靠传递。
  6. 支持集群:提供了完善的集群机制,可以实现高可用和负载均衡。

5.2 缺点

  1. 学习曲线较陡峭:RocketMQ 的配置和使用较为复杂,需要一定时间来学习。
  2. 运维要求较高:RocketMQ 的运维工作较为复杂,需要有专业的团队来维护。
  3. 不适合大数据处理: 相对于一些专注于大数据处理的消息中间件,如Kafka,RocketMQ在大数据处理方面的性能可能不如人意。

参考

  1. https://rocketmq.apache.org/zh/docs/4.x/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/647727.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

兆原数通基于Apache SeaTunnel的探索实践

随着大数据技术的不断发展&#xff0c;数据同步工具在企业中的应用变得愈发重要。为了满足复杂多样的业务需求&#xff0c;找到一款高效、灵活的数据同步工具变得尤为关键。 在这篇文章中&#xff0c;我们将分享兆原数通研发经理李洪军对Apache SeaTunnel的选择、应用及经验。这…

蓝桥杯物联网竞赛_STM32L071KBU6_关于size of函数产生的BUG

首先现象是我在用LORA发送信息的时候&#xff0c;左边显示长度是8而右边接收到的数据长度却是4 我以为是OLED显示屏坏了&#xff0c;又或者是我想搞创新用了const char* 类型强制转换数据的原因&#xff0c;结果发现都不是 void Function_SendMsg( unsigned char* data){unsi…

【代码随想录】动态规划经典题

前言 更详细的在大佬的代码随想录 (programmercarl.com) 本系列仅是简洁版笔记&#xff0c;为了之后方便观看 做题步骤 含义公式初始化顺序检查 确定dp数组以及下标的含义递推公式dp数组如何初始化遍历顺序打印dp数组&#xff08;看哪里有问题&#xff09; 斐波那契数 c…

高性能推理框架漫谈

传统模型分布式推理框架 Tensorflow servingPytorch ServingTriton Server 大语言模型的推理框架 其中&#xff0c; VLLM 后端接入了Ray 框架&#xff0c; 作为调度请求的分发处理&#xff1b;除此之外&#xff0c;还包括Nvidia 最新推出的TensorRT-LLM&#xff0c; 增加了对…

若依 ruoyi-vue 用户账号前后端参数校验密码 手机号 邮箱

前端 <el-dialog :title"title" :visible.sync"open" width"800px" append-to-body><el-form ref"form" :model"form" :rules"rules" label-width"120px"><el-row><el-col :span…

IOT技术怎么落地?以宝马,施耐德为例

物联网技术 物联网&#xff08;IoT&#xff09;技术正逐渐成为数字化工厂转型的核心驱动力。本文将通过实际案例&#xff0c;探讨IoT技术如何促进制造业的数字化转型&#xff0c;提高生产效率&#xff0c;降低成本&#xff0c;并提升产品质量。 1. 物联网技术简介 物联网技术通…

记录一次Netty的WSS异常

概述 业务场景 应用通过 WSS 客户端连接三方接口。在高并发压测时&#xff0c;出现了请求服务器写入失败的异常&#xff0c;该异常是偶发&#xff0c;出现的概率不到千分之一&#xff0c;异常如下图所示。 问题概述 注意&#xff1a; 因为握手是通过 http 协议进行的。所以…

SpringBoot整合WebSocket实现聊天室

1.简单的实现了聊天室功能&#xff0c;注意页面刷新后聊天记录不会保存&#xff0c;后端没有做消息的持久化 2.后端用户的识别只简单使用Session用户的身份 0.依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-…

firewalld 防火墙

firewalld概述 Linux系统防火墙从CentOS7开始的默认防火墙工作在网络层&#xff0c;属于包过滤防火墙 Firewalld和iptables的关系 netfilter 位于Linux内核中的包过滤功能体系称为Linux防火墙的“内核态” firewalld Centos默认的管理防火墙规则的工具称为防火墙的“用…

高中数学:平面向量-题型总结及解题思路梳理

一、知识点及解题思路梳理 高中&#xff0c;2/3的向量题目是坐标向量题&#xff0c;1/3是几何向量题。但是&#xff0c;这1/3的几何向量题可以转换成坐标向量题。 二、练习 例题1 几何型向量题 例题2

QML的Image 路径问题(source)

四种路径格式 在 QML 中&#xff0c;当你使用 Image 元素的 source 属性来指定一个图片的路径时&#xff0c;有几种不同的方式可以指定这个路径&#xff0c;每种方式都有其特定的用途和上下文。 相对路径&#xff1a; QML 文件和一个名为 close.png 的图片在同一目录下&#x…

比较两列数据

点其中一个数据 删掉S&#xff0c;回车 大的标红

基于SpringBoot+Vue+Mysql的实验室低值易耗品管理系统

博主介绍&#xff1a; 大家好&#xff0c;本人精通Java、Python、C#、C、C编程语言&#xff0c;同时也熟练掌握微信小程序、Php和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我有丰富的成品Java、Python、C#毕设项目经验&#xff0c;能够为学生提供各类…

基于springboot的毕业设计系统的开发源码

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的毕业设计系统的开发。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 毕业设计系统能够实现…

Git Core Lecture

1、Git 简介 官方介绍&#xff1a;Git is a fast distributed revision control system (Git 是一个快速的分布式版本控制系统) 2、Git Core Command 2.1 git init git 工程初始化&#xff0c;会在工作区 (working directory) 根目录中创建.git 目录 # 创建目录 $ mkdir git-i…

智能合约语言(eDSL)—— 并行化方案 2

这个并行算法最初其实是在aptos上实现的&#xff0c;aptos上使用的是move虚拟机&#xff0c;后来我把它移植到我们链上了&#xff0c;但是wasm虚拟机。还是费了不少事情。 目前evm并行也比较火&#xff0c;像monad&#xff0c;sei等。经过调研发现&#xff0c;其实evm的并行&am…

Python 获取当前IP地址(爬虫代理)

Python 获取当前IP地址&#xff08;爬虫代理&#xff09; 在Python中&#xff0c;获取当前的公网IP地址通常涉及到发送一个请求到外部服务&#xff0c;因为本地IP地址通常只在你的私有网络内部是可见的&#xff0c;而公网IP地址是由你的ISP&#xff08;互联网服务提供商&#x…

如何查看哪些组策略应用于你的电脑和用户帐户?这里有详细步骤

如果你希望在电脑上查看所有有效的组策略设置,以下是操作方法。 什么是Windows中的组策略 在Windows世界中,组策略为网络管理员提供了一种将特定设置分配给用户组或计算机组的方法。然后,无论何时组中的用户登录到联网的PC,或无论何时启动组中的PC,都会应用这些设置。 …

牛客NC222 插入区间【中等 数组,区间合并问题 Java/Go/PHP/C++】lintcode30 插入区间

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/1d784b5472ab4dde88ea2331d16ee909 https://www.lintcode.com/problem/30/solution/56586 思路 Java代码 import java.util.*;/** public class Interval {* int start;* int end;* public Interval(int …

python web自动化(分布式测试Grid)

Grid介绍 Selenium Grid 是 Selenium 提供的⼀个⼯具&#xff0c;⽤于⽀持在多台计算机上并⾏运⾏测试。 它允许将测试分发到不同的机器和浏览器组合上&#xff0c;同时收集结果。 1.并⾏执⾏测试⽤例&#xff1a;在不同的机器上并⾏执⾏测试⽤例&#xff0c;从⽽加速整个测试过…