2.13日学习打卡----初学RocketMQ(四)

2.13日学习打卡

目录:

  • 2.13日学习打卡
  • 一.RocketMQ之Java Class
    • DefaultMQProducer类
    • DefaultMQPushConsumer类
    • Message类
    • MessageExt类
  • 二.RocketMQ 消费幂
    • 消费过程幂等
    • 消费速度慢的处理方式
  • 三.RocketMQ 集群服务
    • 集群特点
    • 单master模式
    • 多master模式
    • 多master多Slave模式-异步复制
    • 多Master多Slave模式-同步双写
  • 四.RocketMQ消息消费
    • 集群消费
    • 广播消费
    • 消息消费时的权衡

一.RocketMQ之Java Class

DefaultMQProducer类

概述
DefaultMQProducer类是应用发送消息使用的基类,封装一些通用的方法方便开发者在更多场景中使用。属于线程安全类,在配置并启动后可在多个线程间共享此对象。
其可以通过无参构造方法快速创建一个生产者,通过getter/setter方法,调整发送者的参数。主要负责消息的发送,支持同步/异步/oneway的发送方式,这些发送方式均支持批量发送。

方法

属性内容
DefaultMQProducerImpl defaultMQProducerImpl;生产者内部默认实现类
String producerGroup;Producer组名, 默认值为DEFAULT_PRODUCER。多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。
String createTopicKey;自动创建测试的topic名称, 默认值为TBW102;在发送消息时,自动创建服务器不存在的topic,需要指定Key。broker必须开启isAutoCreateTopicEnable
int defaultTopicQueueNums;创建默认topic的queue数量。默认4
int sendMsgTimeout;发送消息超时时间,默认值10000,单位毫秒
int compressMsgBodyOverHowmuch;消息体压缩阈值,默认为4k(Consumer收到消息会自动解压缩)
int retryTimesWhenSendFailed;同步模式,返回发送消息失败前内部重试发送的最大次数。可能导致消息重复。默认2
int retryTimesWhenSendAsyncFailed;异步模式,返回发送消息失败前内部重试发送的最大次数。可能导致消息重复。默认2
boolean retryAnotherBrokerWhenNotStoreOK;声明发送失败时,下次是否投递给其他Broker,默认false
int maxMessageSize;最大消息大小。默认4M; 客户端限制的消息大小,超过报错,同时服务端也会限制
TraceDispatcher traceDispatcher消息追踪器,定义了异步传输数据接口。使用rcpHook来追踪消息

DefaultMQPushConsumer类

概述
DefaultMQPushConsumer类是rocketmq客户端消费者的实现,从名字上已经可以看出其消息获取方式为broker往消费端推送数据,其内部实现了流控,消费位置上报等等。DefaultMQPushConsumer是Push消费模式下的默认配置。

方法

字段内容
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;消费者实现类,所有的功能都委托给DefaultMQPushConsumerImpl来实现
String consumerGroup;消费者组名,必须设置,参数默认值是:DEFAULT_CONSUMER (需要注意的是,多个消费者如果具有同样的组名,那么这些消费者必须只消费同一个topic)
MessageModel messageModel;消费的方式,支持以下两种 1、集群消费 2、广播消费。BROADCASTING 广播模式,即所有的消费者可以消费同样的消息CLUSTERING 集群模式,即所有的消费者平均来消费一组消息
ConsumeFromWhere consumeFromWhere;消费者从那个位置消费,分别为:CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 ;CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费;CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费(以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始)
AllocateMessageQueueStrategy allocateMessageQueueStrategy;消息分配策略,用于集群模式下,消息平均分配给所有客户端;默认实现为AllocateMessageQueueAveragely
Map<String, String> subscription;topic对应的订阅tag
MessageListener messageListener;消息监听器 ,处理消息的业务就在监听里面。目前支持的监听模式包括:MessageListenerConcurrently,对应的处理逻辑类是MessageListener messageListener ;ConsumeMessageConcurrentlyService MessageListenerOrderly 对应的处理逻辑类是ConsumeMessageOrderlyService;两者使用不同的ACK机制。RocketMQ提供了ack机制,以保证消息能够被正常消费。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。上面两个不同的监听模式使用的ACK机制是不一样的。
OffsetStore offsetStore;offset存储实现,分为本地存储或远程存储 。集群消费:从远程Broker获取。广播消费:从本地文件获取。

DefaultMQPushConsumer类

重要字段
							
	int consumeThreadMin = 20		线程池自动调整

	int consumeThreadMax = 64		线程池自动调整

	long adjustThreadPoolNumsThreshold = 100000 

	int consumeConcurrentlyMaxSpan = 2000
									单队列并行消费最大跨度,用于流控 

	int pullThresholdForQueue = 1000
									一个queue最大消费的消息个数,用于流控 

	long pullInterval = 0			检查拉取消息的间隔时间,由于是长轮询,所以为 0,但是如果应用为了流控,
									也可以设置大于 0 的值,单位毫秒,取值范围: [0, 65535]

	consumeMessageBatchMaxSize = 1	并发消费时,一次消费消息的数量,默认为1,
									假如修改为50,此时若有100条消息,那么会创建两个线程,每个线程分配50条消息。
									换句话说,批量消费最大消息条数,取值范围: [1, 1024]。默认是1
									
	pullBatchSize = 32				消费者去broker拉取消息时,一次拉取多少条。取值范围: [1, 1024]。默认是32 。可选配置

	boolean postSubscriptionWhenPull = false 

	boolean unitMode = false 

重要方法

	 subscribe(String topic, String subExpression) 
									订阅某个topic,subExpression传*为订阅该topic所有消息 

	registerMessageListener(MessageListenerConcurrently messageListener) 
									注册消息回调,如果需要顺序消费,需要注册MessageListenerOrderly的实现 
	
	start 							启动消息消费	

Message类

含义

	Producer发送的消息定义为Message类

位置

	org.apache.rocketmq.common.message

字段定义

	如图

字段详解

	topic

		Message都有Topic这一属性,Producer发送指定Topic的消息,Consumer订阅Topic下的
		消息。
		通过Topic字段,Producer会获取消息投递的路由信息,决定发送给哪个Broker。

	flag

		网络通信层标记。
		
	body

		Producer要发送的实际消息内容,以字节数组形式进行存储。Message消息有一定大小限制。

	transactionId
	
		RocketMQ 4.3.0引入的事务消息相关的事务编号。

	properties

		该字段为一个HashMap,存储了Message其余各项参数,比如tag、key等关键的消息属性。
		RocketMQ预定义了一组内置属性,除了内置属性之外,
		还可以设置任意自定义属性。当然属性的数量也是有限的,
		消息序列化之后的大小不能超过预设的最大消息大小。
		系统内置属性定义于org.apache.rocketmq.common.message.MessageConst	(如图)

		对于一些关键属性,Message类提供了一组set接口来进行设置,

		class Message {
		    public void setTags(String tags) {...}
		    public void setKeys(Collection<String> keys) {...}
		    public void setDelayTimeLevel(int level) {...}
		    public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {...}
		    public void setBuyerId(String buyerId) {...}
		}

		这几个set接口对应的作用分别为为,

		属性								接口				用途
		MessageConst.PROPERTY_TAGS		setTags			在消费消息时可以通过tag进行消
														息过滤判定
														
		MessageConst.PROPERTY_KEYS		setKeys			可以设置业务相关标识,用于消费
														处理判定,或消息追踪查询
														
		MessageConst.PROPERTY_DELAY_TIME_LEVEL	setDelayTimeLevel	
														消息延迟处理级别,不同级别对应不同延迟时间
														
		MessageConst.PROPERTY_WAIT_STORE_MSG_OK	setWaitStoreMsgOK	
														在同步刷盘情况下是否需要等待数
														据落地才认为消息发送成功
		
		`MessageConst.PROPERTY_BUYER_ID	setBuyerId		没有在代码中找到使用的地方,所
														以暂不明白其用处	

		这几个字段为什么用属性定义,而不是单独用一个字段进行表示?方便之处可能在于消息数据存
		盘结构早早定义,一些后期添加上的字段功能为了适应之前的存储结构,以属性形式存储在一个
		动态字段更为方便,自然兼容。

MessageExt类

含义

	对于发送方来说,上述Message的定义以足够。但对于RocketMQ的整个处理流程来说,
	还需要更多的字段信息用以记录一些必要内容,比如消息的id、创建时间、存储时间等
	等。在同package下可以找到与之相关的其余类定义。首先就是MessageExt,

字段

	字段							用途
	
	queueId						记录MessageQueue编号,消息会被发送到Topic下的MessageQueue

	storeSize					记录消息在Broker存盘大小

	queueOffset					记录在ConsumeQueue中的偏移

	sysFlag						记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识

	bornTimestamp				消息创建时间,在Producer发送消息时设置

	storeHost					记录存储该消息的Broker地址

	msgId						消息Id

	commitLogOffset				记录在Broker中存储便宜

	bodyCRC						消息内容CRC校验值

	reconsumeTimes				消息重试消费次数

	preparedTransactionOffset	事务详细相关字段	

注意

	Message还有一个名为MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX的属性,
	在消息发送时由Producer生成创建。
	上面的msgId则是消息在Broker端进行存储时通过MessageDecoder.createMessageId方法生成
	的,其构成为(如图)

	这个MsgId是在Broker生成的,Producer在发送消息时没有该信息,Consumer在消费消息时则能
	获取到该值。RocketMQ也提供了相关命令,

	命令				实现类					描述
	
	queryMsgById	QueryMsgByIdSubCommand	根据MsgId查询消息		

二.RocketMQ 消费幂

在这里插入图片描述

消费过程幂等

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。

消费速度慢的处理方式

提高消费并行度

绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:

同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。

提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。

批量方式消费

批量方式消费可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 参数值,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。

跳过非重要消息

发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下:

    public ConsumeConcurrentlyStatus consumeMessage(
      List<MessageExt> msgs,
      ConsumeConcurrentlyContext context) {
      //队列偏移量
    long offset = msgs.get(0).getQueueOffset();
    //最大偏移量
    String maxOffset = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
    long diff = Long.parseLong(maxOffset) - offset;
    if (diff > 100000) {
      // TODO 消息堆积情况的特殊处理
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
    // TODO 正常消费过程
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   } 

优化每条消息消费过程
举例如下,原来某条消息的消费过程如图中左侧的流程,优化后变成右侧
在这里插入图片描述
这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。

消费打印日志

如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题

public ConsumeConcurrentlyStatus consumeMessage(
      List<MessageExt> msgs,
      ConsumeConcurrentlyContext context) {
    log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
    // TODO 正常消费过程
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   } 

如果能打印每条消息消费耗时,那么在排查消费慢等线上问题时,会更方便。

三.RocketMQ 集群服务

在这里插入图片描述

集群特点

NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。 每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有 NameServer。

Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

单master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
启动 NameServer

### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

启动 Broker

### 启动Broker
$ nohup sh bin/mqbroker -n localhost:9876 &

### 验证Broker是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a
$ tail -f ~/logs/rocketmqlogs/broker.log 
The broker[broker-a, 192.169.1.2:10911] boot success...

多master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:
优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
启动NameServer
NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer,各节点的启动命令相同,如下:

 ### 首先启动Name Server
 $ nohup sh mqnamesrv &

 ### 验证Name Server 是否启动成功
 $ tail -f ~/logs/rocketmqlogs/namesrv.log
 The Name Server boot success...

启动Broker集群

 ### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
 $ nohup sh mqbroker -n 192.168.1.1:9876 -c
 $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &

 ### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
 $ nohup sh mqbroker -n 192.168.1.1:9876 -c
 $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &

 ...

多master多Slave模式-异步复制

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样

缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

启动NameServer

### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

启动Broker集群

### 在机器A,启动第一个Master,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
### 在机器B,启动第二个Master,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
### 在机器C,启动第一个Slave,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
### 在机器D,启动第二个Slave,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &

多Master多Slave模式-同步双写

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;

缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

启动NameServer

### 首先启动Name Server
 $ nohup sh mqnamesrv &

 ### 验证Name Server 是否启动成功
  $ tail -f ~/logs/rocketmqlogs/namesrv.log
 The Name Server boot success...

启动Broker集群

### 在机器A,启动第一个Master,例如NameServer的IP
为:192.168.1.1
 $ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &

 ### 在机器B,启动第二个Master,例如NameServer的IP
为:192.168.1.1
 $ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &

 ### 在机器C,启动第一个Slave,例如NameServer的IP
为:192.168.1.1
 $ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &

 ### 在机器D,启动第二个Slave,例如NameServer的IP
为:192.168.1.1
 $ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

以上Broker与Slave配对是通过指定相同的BrokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数。另外一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分。$ROCKETMQ_HOME指的RocketMQ安装目录,需要用户自己设置此环境变量

四.RocketMQ消息消费

集群消费

在这里插入图片描述
消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。

实际上,每个Consumer是平均分摊Message Queue的去做拉取消费。例如某个Topic有3条Q,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的1条Q。

而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。

这种模式下,消费进度(Consumer Offset)的存储会持久化到Broker。


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import static com.morris.rocketmq.util.Contant.CONSUMER_GROUP;
import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;

/**
 * 集群消费消息(默认)
 * ClusterConsumer 类,用于从 RocketMQ 集群消费消息
 */
public class ClusterConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 使用指定的消费者组名来实例化 DefaultMQPushConsumer。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
         
        // 指定 NameServer 地址。
        consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        consumer.setMessageModel(MessageModel.CLUSTERING); // 设置消息模式为集群消费模式(默认)。
        
        // 订阅一个或多个要消费的主题。
        consumer.subscribe("TopicTest", "*");
        
        // 注册回调函数,以便在从代理获取的消息到达时执行。
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 标记消息已成功消费。
        });

        // 启动消费者实例。
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

  • CONSUMER_GROUP 和 NAME_SERVER_ADDRESS 是从 Contant 类中获取的常量,用于指定消费者组和 NameServer 地址。
  • MessageModel.CLUSTERING 设置消费模式为集群消费模式,默认情况下 RocketMQ 使用集群消费模式。
  • consumer.subscribe(“TopicTest”, ““) 订阅了名为 “TopicTest” 的主题,”” 表示消费该主题下的所有消息。
  • consumer.registerMessageListener(…) 注册了一个消息监听器,用于接收从代理获取的消息,并处理这些消息。
  • consumer.start() 启动了消费者实例,开始消费消息。

广播消费

在这里插入图片描述
消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。

实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。

这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import static com.morris.rocketmq.util.Contant.CONSUMER_GROUP;
import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;

/**
 * 广播消费消息(默认)
 * BroadcastingConsumer 类,用于从 RocketMQ 广播消费消息
 */
public class BroadcastingConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 使用指定的消费者组名来实例化 DefaultMQPushConsumer。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
         
        // 指定 NameServer 地址。
        consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        consumer.setMessageModel(MessageModel.BROADCASTING); // 设置消息模式为广播消费模式(默认)。
        
        // 订阅一个或多个要消费的主题。
        consumer.subscribe("TopicTest", "*");
        
        // 注册回调函数,以便在从代理获取的消息到达时执行。
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 标记消息已成功消费。
        });

        // 启动消费者实例。
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

  • CONSUMER_GROUP 和 NAME_SERVER_ADDRESS 是从 Contant 类中获取的常量,用于指定消费者组和 NameServer 地址。
  • MessageModel.BROADCASTING 设置消费模式为广播消费模式,默认情况下 RocketMQ 使用广播消费模式。
  • consumer.subscribe(“TopicTest”, ““) 订阅了名为 “TopicTest” 的主题,”” 表示消费该主题下的所有消息。
  • consumer.registerMessageListener(…) 注册了一个消息监听器,用于接收从代理获取的消息,并处理这些消息。
  • consumer.start() 启动了消费者实例,开始消费消息。

这种模式下消费者只能收到启动后发送MQ中的消息。

消息消费时的权衡

集群模式

  • 消费端集群化部署,每条消息只需要被处理一次。
  • 由于消费进度在服务端维护,可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

广播模式

  • 广播消费模式下不支持顺序消息。
  • 广播消费模式下不支持重置消费位点。
  • 每条消息都需要被相同逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复的概率稍大于集群模式。

广播模式下,消息队列RocketMQ保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。

广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。

广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。

目前仅Java客户端支持广播模式。广播模式下服务端不维护消费进度,所以消息队列RocketMQ控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/390389.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【研究生复试】计算机软件工程人工智能研究生复试——资料整理(速记版)——数据库

1、JAVA 2、计算机网络 3、计算机体系结构 4、数据库 5、计算机租场原理 6、软件工程 7、大数据 8、英文 自我介绍 4. 数据库 1. B树相对于B树的区别及优势 B树中有重复元素&#xff0c;B树没有重复元素B树种每个节点都存储了key和data&#xff0c;B树内节点去掉了其中指向数…

数学实验第三版(主编:李继成 赵小艳)课后练习答案(十一)(1)(2)(3)

目录 实验十一&#xff1a;非线性方程&#xff08;组&#xff09;求解 练习一 练习二 练习三 实验十一&#xff1a;非线性方程&#xff08;组&#xff09;求解 练习一 1.求莱昂纳多方程 的解 clc;clear; p[1,2,10,-20]; roots(p)ans -1.6844 3.4313i -1.6844 - 3.4313i…

数据结构与算法:二叉树(寻找最近公共祖先、寻找后继节点、序列化和反序列化、折纸问题的板子和相关力扣题目)

最近公共祖先 第一版&#xff08;前提&#xff1a;p和q默认存在于这棵树中&#xff09; 可以层序遍历每个节点时用个HashMap存储该结点和其直接父节点的信息。然后从p开始溯源&#xff0c;将所有的父节点都添加到一个HashSet集合里。然后从q开始溯源&#xff0c;每溯源一步看…

题目:3.神奇的数组(蓝桥OJ 3000)

问题描述&#xff1a; 解题思路&#xff1a; 官方&#xff1a; 我的总结&#xff1a; 利用双指针遍历每个区间并判断是否符合条件&#xff1a;若一个区间符合条件则该区间在其左端点不变的情况下的每一个子区间都符合条件&#xff0c;相反若一个区间内左端点固定情况下有一个以…

ssm的网上招聘系统(有报告)。Javaee项目。ssm项目。

演示视频&#xff1a; ssm的网上招聘系统&#xff08;有报告&#xff09;。Javaee项目。ssm项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spring SpringMv…

什么是位段?位段的作用是什么?他与结构体有什么关系?

目录 1.什么是位段&#xff1f; 2.位段的内存分配 判断当前机器位段的内存分配形式 1.什么是位段&#xff1f; 位段的声明和结构是类似的&#xff0c;有两个不同&#xff1a; 1.位段的成员必须是 int、unsigned int 或signed int或char 。 2.位段的成员名后边有一个冒号和…

Leetcode3010. 将数组分成最小总代价的子数组 I

Every day a Leetcode 题目来源&#xff1a;3010. 将数组分成最小总代价的子数组 I 题目描述&#xff1a; 给你一个长度为 n 的整数数组 nums 。 一个数组的代价是它的第一个元素。比方说&#xff0c;[1,2,3] 的代价是 1 &#xff0c;[3,4,1] 的代价是 3 。 你需要将 num…

C语言---指针进阶

1.字符指针 int main() {char str1[] "hello world";char str2[] "hello world";const char* str3 "hello world.";const char* str4 "hello world.";if (str3 str4){//常量字符串在内存里面是无法修改的&#xff0c;所以没必要…

“分布式透明化”在杭州银行核心系统上线之思考

导读 随着金融行业数字化转型的需求&#xff0c;银行核心系统的升级改造成为重要议题。杭州银行成功上线以 TiDB 为底层数据库的新一代核心业务系统&#xff0c;该实践采用应用与基础设施解耦、分布式透明化的设计开发理念&#xff0c;推动银行核心系统的整体升级。 本文聚焦…

【Effective Objective - C 2.0】——读书笔记(五)

文章目录 二十九、理解引用计数三十、以ARC简化引用计数三十一、在dealloc方法中只释放引用并解除监听三十二、编写异常安全代码时留意内存管理问题三十三、以弱引用避免保留环三十四、以”自动释放池块“降低内存峰值三十五、用"僵尸对象"调试内存管理问题三十六、不…

【C语言】实现队列

目录 &#xff08;一&#xff09;队列 &#xff08;二&#xff09;头文件 &#xff08;三&#xff09; 功能实现 &#xff08;1&#xff09;初始化 &#xff08;2&#xff09; 销毁队列 &#xff08;3&#xff09; 入队 &#xff08;4&#xff09;出队 &#xff08;5&a…

论文阅读:四足机器人对抗运动先验学习稳健和敏捷的行走

论文&#xff1a;Learning Robust and Agile Legged Locomotion Using Adversarial Motion Priors 进一步学习&#xff1a;AMP&#xff0c;baseline方法&#xff0c;TO 摘要&#xff1a; 介绍了一种新颖的系统&#xff0c;通过使用对抗性运动先验 (AMP) 使四足机器人在复杂地…

luigi,一个好用的 Python 数据管道库!

🏷️个人主页:鼠鼠我捏,要死了捏的主页 🏷️付费专栏:Python专栏 🏷️个人学习笔记,若有缺误,欢迎评论区指正 前言 大家好,今天为大家分享一个超级厉害的 Python 库 - luigi。 Github地址:https://github.com/spotify/luigi 在大数据时代,处理海量数据已经成…

NVIDIA 刚刚揭秘了他们的最新大作——Eos,一台跻身全球十强的超级计算机

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

android获取sha1

1.cmd在控制台获取 切换到Android Studio\jre\bin目录下执行keytool -list -v -keystore 签名文件路径例如&#xff1a; 2.也可以在android studio中获取 在Terminal中输入命令&#xff1a;keytool -list -v -keystore 签名文件路径获取 获取到的sha1如下&#xff1a;

ubuntu屏幕小的解决办法

1. 安装vmware tools , 再点自适应客户机 执行里面的vmware-install.pl这个文件 &#xff1a;sudo ./vmware-install.pl 执行不了可以放到家目录&#xff0c;我放在了/home/book 里面 最后点这个自适应客户机 然后我这里点不了是因为我点了控制台视图和拉伸客户机&#xff0c…

ClickHouse--07--Integration 系列表引擎

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 Integration 系列表引擎1 HDFS1.1 语法1.2 示例&#xff1a; 2 MySQL2.1 语法2.2 示例&#xff1a; 3 Kafka3.1 语法3.2 示例&#xff1a;3.3 数据持久化方法 Integ…

Leetcode 94.二叉树的中序遍历

题目 给定一个二叉树的根节点 root &#xff0c;返回 它的中序 遍历 。 输入&#xff1a;root [1,null,2,3] 输出&#xff1a;[1,3,2] 进阶: 递归算法很简单&#xff0c;你可以通过迭代算法完成吗&#xff1f; 递归遍历 递归遍历就是一个很简单的中序遍历 /*** Definitio…

LEETCODE 69. x 的平方根

class Solution { public:int mySqrt(int x) {int left0;int rightx;int midleft(right-left)/2;int ans-1;while(left<right){midleft(right-left)/2;if((long long)mid*mid<x){ansmid;leftmid1;}else{rightmid-1;}}return ans;} };*(long long)

GPT SOVITS项目 一分钟克隆 (文字输出)

步骤流程&#xff1a;&#xff08;首先使用UVR 提取人声文件&#xff0c;然后按下面步骤进行&#xff09; 注意这里提交的音频是参考的音频