# 消息中间件 RocketMQ 高级功能和源码分析(十)

消息中间件 RocketMQ 高级功能和源码分析(十)

一、消息中间件 RocketMQ 源码分析: 消息消费概述

1、集群模式和广播模式

消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费者组可订阅多个主题,消费组之间有集群模式和广播模式两种消费模式。

  • 集群模式,主题下的同一条消息只允许被其中一个消费者消费。
  • 广播模式,主题下的同一条消息,将被集群内的所有消费者消费一次。

2、消息服务器与消费者之间的消息传递也有两种模式:推模式、拉模式。

所谓的拉模式,是消费端主动拉起拉消息请求,
而推模式是消息达到消息服务器后,推送给消息消费者。

RocketMQ 消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。

3、集群模式下,多个消费者如何对消息队列进行负载呢?消息队列负载机制遵循一个通用思想:一个消息队列同一个时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。

4、RocketMQ 支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。不支持消息全局顺序消费,如果要实现某一个主题的全局顺序消费,可以将该主题的队列数设置为1,牺牲高可用性。

二、消息中间件 RocketMQ 源码分析: 消息消费初探

1、消息推送模式

消息推送 示例图:

2、 消息消费重要方法


void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName):发送消息确认
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) :获取消费者对主题分配了那些消息队列
void registerMessageListener(final MessageListenerConcurrently messageListener):注册并发事件监听器
void registerMessageListener(final MessageListenerOrderly messageListener):注册顺序消息事件监听器
void subscribe(final String topic, final String subExpression):基于主题订阅消息,消息过滤使用表达式
void subscribe(final String topic, final String fullClassName,final String filterClassSource):基于主题订阅消息,消息过滤使用类模式
void subscribe(final String topic, final MessageSelector selector) :订阅消息,并指定队列选择器
void unsubscribe(final String topic):取消消息订阅

3、 DefaultMQPushConsumer

在这里插入图片描述


//消费者组
private String consumerGroup;	
//消息消费模式
private MessageModel messageModel = MessageModel.CLUSTERING;	
//指定消费开始偏移量(最大偏移量、最小偏移量、启动时间戳)开始消费
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
//集群模式下的消息队列负载策略
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
//订阅信息
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
//消息业务监听器
private MessageListener messageListener;
//消息消费进度存储器
private OffsetStore offsetStore;
//消费者最小线程数量
private int consumeThreadMin = 20;
//消费者最大线程数量
private int consumeThreadMax = 20;
//并发消息消费时处理队列最大跨度
private int consumeConcurrentlyMaxSpan = 2000;
//每1000次流控后打印流控日志
private int pullThresholdForQueue = 1000;
//推模式下任务间隔时间
private long pullInterval = 0;
//推模式下任务拉取的条数,默认32条
private int pullBatchSize = 32;
//每次传入MessageListener#consumerMessage中消息的数量
private int consumeMessageBatchMaxSize = 1;
//是否每次拉取消息都订阅消息
private boolean postSubscriptionWhenPull = false;
//消息重试次数,-1代表16次
private int maxReconsumeTimes = -1;
//消息消费超时时间
private long consumeTimeout = 15;

三、消息中间件 RocketMQ 源码分析: 消息消费启动流程

1、消息消费启动流程 示例图:

在这里插入图片描述

2、 代码:DefaultMQPushConsumerImpl#start


public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;
			//检查消息者是否合法
            this.checkConfig();
			//构建主题订阅信息
            this.copySubscription();
			//设置消费者客户端实例名称为进程ID
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
			//创建MQClient实例
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
			//构建rebalanceImpl
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactor
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookLis
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
           		switch (this.defaultMQPushConsumer.getMessageModel()) {
               
           	    case BROADCASTING:	 //消息消费广播模式,将消费进度保存在本地
           	        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
           	            break;
           	        case CLUSTERING:	//消息消费集群模式,将消费进度保存在远端Broker
           	            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
           	            break;
           	        default:
           	            break;
           	    }
           	    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
           	}
            this.offsetStore.load
            //创建顺序消息消费服务
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                //创建并发消息消费服务
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }
            //消息消费服务启动
            this.consumeMessageService.start();
            //注册消费者实例
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            //启动消费者客户端
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
            case RUNNING:
            case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();
}

四、消息中间件 RocketMQ 源码分析:消息拉取介绍

1、消息拉取 介绍

  • 消息消费模式有两种模式:广播模式与集群模式。广播模式比较简单,每一个消费者需要拉取订阅主题下所有队列的消息。

  • 在集群模式下,同一个消费者组内有多个消息消费者,同一个主题存在多个消费队列,消费者通过负载均衡的方式消费消息。

  • 消息队列负载均衡,通常的作法是一个消息队列在同一个时间只允许被一个消费消费者消费,一个消息消费者可以同时消费多个消息队列。

2、 PullMessageService 实现机制

从 MQClientInstance 的启动流程中可以看出,RocketMQ 使用一个单独的线程 PullMessageService 来负责消息的拉取。

在这里插入图片描述

3、 代码:PullMessageService#run


public void run() {
    log.info(this.getServiceName() + " service started");
	//循环拉取消息
    while (!this.isStopped()) {
        try {
            //从请求队列中获取拉取消息请求
            PullRequest pullRequest = this.pullRequestQueue.take();
            //拉取消息
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

4、 PullRequest

在这里插入图片描述


private String consumerGroup;	//消费者组
private MessageQueue messageQueue;	//待拉取消息队列
private ProcessQueue processQueue;	//消息处理队列
private long nextOffset;	//待拉取的MessageQueue偏移量
private boolean lockedFirst = false;	//是否被锁定

5、 代码:PullMessageService#pullMessage


private void pullMessage(final PullRequest pullRequest) {
    //获得消费者实例
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        //强转为推送模式消费者
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        //推送消息
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}

6、 ProcessQueue 实现机制

ProcessQueue 是 MessageQueue 在消费端的重现、快照。PullMessageService 从消息服务器默认每次拉取 32 条消息,按照消息的队列偏移量顺序存放在 ProcessQueue 中,PullMessageService 然后将消息提交到消费者消费线程池,消息成功消费后从 ProcessQueue 中移除。

在这里插入图片描述

7、 属性


//消息容器
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
//读写锁
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
//ProcessQueue总消息树
private final AtomicLong msgCount = new AtomicLong();
//ProcessQueue队列最大偏移量
private volatile long queueOffsetMax = 0L;
//当前ProcessQueue是否被丢弃
private volatile boolean dropped = false;
//上一次拉取时间戳
private volatile long lastPullTimestamp = System.currentTimeMillis();
//上一次消费时间戳
private volatile long lastConsumeTimestamp = System.currentTimeMillis();

8、 方法


//移除消费超时消息
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer)
//添加消息
public boolean putMessage(final List<MessageExt> msgs)
//获取消息最大间隔
public long getMaxSpan()
//移除消息
public long removeMessage(final List<MessageExt> msgs)
//将consumingMsgOrderlyTreeMap中消息重新放在msgTreeMap,并清空consumingMsgOrderlyTreeMap   
public void rollback() 
//将consumingMsgOrderlyTreeMap消息清除,表示成功处理该批消息
public long commit()
//重新处理该批消息
public void makeMessageToCosumeAgain(List<MessageExt> msgs) 
//从processQueue中取出batchSize条消息
public List<MessageExt> takeMessags(final int batchSize)

五、消息中间件 RocketMQ 源码分析: 客户端发起拉取消息请求

1、消息拉取基本流程 示例图:

在这里插入图片描述

2、客户端发起拉取请求

代码:DefaultMQPushConsumerImpl#pullMessage


public void pullMessage(final PullRequest pullRequest) {
    //从pullRequest获得ProcessQueue
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    //如果处理队列被丢弃,直接返回
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }
	//如果处理队列未被丢弃,更新时间戳
    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

    try {
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        return;
    }
	//如果处理队列被挂起,延迟1s后再执行
    if (this.isPause()) {
        log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }
	//获得最大待处理消息数量
	long cachedMessageCount = processQueue.getMsgCount().get();
    //获得最大待处理消息大小
	long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
	//从数量进行流控
	if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
	    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
	    if ((queueFlowControlTimes++ % 1000) == 0) {
	        log.warn(
	            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
	            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
	    }
	    return;
	}
	//从消息大小进行流控
	if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
	    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
	    if ((queueFlowControlTimes++ % 1000) == 0) {
	        log.warn(
	            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
	            this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
	    }
	    return;
    }
    	//获得订阅信息
		 final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    	if (null == subscriptionData) {
    	    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    	    log.warn("find the consumer's subscription failed, {}", pullRequest);
    	    return;
		//与服务端交互,获取消息
	    this.pullAPIWrapper.pullKernelImpl(
	    pullRequest.getMessageQueue(),
	    subExpression,
	    subscriptionData.getExpressionType(),
	    subscriptionData.getSubVersion(),
	    pullRequest.getNextOffset(),
	    this.defaultMQPushConsumer.getPullBatchSize(),
	    sysFlag,
	    commitOffsetValue,
	    BROKER_SUSPEND_MAX_TIME_MILLIS,
	    CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
	    CommunicationMode.ASYNC,
	    pullCallback
	);
            
}

六、消息中间件 RocketMQ 源码分析: Broker 组装消息

1、消息服务端 Broker 组装消息 示例图:

在这里插入图片描述

2、 代码:PullMessageProcessor#processRequest


//构建消息过滤器
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
    messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
        this.brokerController.getConsumerFilterManager());
} else {
    messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
        this.brokerController.getConsumerFilterManager());
}
//调用MessageStore.getMessage查找消息
final GetMessageResult getMessageResult =
    this.brokerController.getMessageStore().getMessage(
    				requestHeader.getConsumerGroup(), //消费组名称								
    				requestHeader.getTopic(),	//主题名称
        			requestHeader.getQueueId(), //队列ID
    				requestHeader.getQueueOffset(), 	//待拉取偏移量
    				requestHeader.getMaxMsgNums(), 	//最大拉取消息条数
    				messageFilter	//消息过滤器
    		);

3、 代码:DefaultMessageStore#getMessage


GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;	//查找下一次队列偏移量
long minOffset = 0;		//当前消息队列最小偏移量
long maxOffset = 0;		//当前消息队列最大偏移量
GetMessageResult getResult = new GetMessageResult();
final long maxOffsetPy = this.commitLog.getMaxOffset();	//当前commitLog最大偏移量
//根据主题名称和队列编号获取消息消费队列
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);

...
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
//消息偏移量异常情况校对下一次拉取偏移量
if (maxOffset == 0) {	//表示当前消息队列中没有消息
    status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
    nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) {	//待拉取消息的偏移量小于队列的其实偏移量
    status = GetMessageStatus.OFFSET_TOO_SMALL;
    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) {	//待拉取偏移量为队列最大偏移量
    status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
    nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) {	//偏移量越界
    status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
    if (0 == minOffset) {
        nextBeginOffset = nextOffsetCorrection(offset, minOffset);
    } else {
        nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
    }
}
...
//根据偏移量从CommitLog中拉取32条消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);

4、 代码:PullMessageProcessor#processRequest


//根据拉取结果填充responseHeader
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

//判断如果存在主从同步慢,设置下一次拉取任务的ID为主节点
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
    case ASYNC_MASTER:
    case SYNC_MASTER:
        break;
    case SLAVE:
        if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }
        break;
}
...
//GetMessageResult与Response的Code转换
switch (getMessageResult.getStatus()) {
    case FOUND:			//成功
        response.setCode(ResponseCode.SUCCESS);
        break;
    case MESSAGE_WAS_REMOVING:	//消息存放在下一个commitLog中
        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);	//消息重试
        break;
    case NO_MATCHED_LOGIC_QUEUE:	//未找到队列
    case NO_MESSAGE_IN_QUEUE:	//队列中未包含消息
        if (0 != requestHeader.getQueueOffset()) {
            response.setCode(ResponseCode.PULL_OFFSET_MOVED);
            requestHeader.getQueueOffset(),
            getMessageResult.getNextBeginOffset(),
            requestHeader.getTopic(),
            requestHeader.getQueueId(),
            requestHeader.getConsumerGroup()
            );
        } else {
            response.setCode(ResponseCode.PULL_NOT_FOUND);
        }
        break;
    case NO_MATCHED_MESSAGE:	//未找到消息
        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
        break;
    case OFFSET_FOUND_NULL:	//消息物理偏移量为空
        response.setCode(ResponseCode.PULL_NOT_FOUND);
        break;
    case OFFSET_OVERFLOW_BADLY:	//offset越界
        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
        // XXX: warn and notify me
        log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
                requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
        break;
    case OFFSET_OVERFLOW_ONE:	//offset在队列中未找到
        response.setCode(ResponseCode.PULL_NOT_FOUND);
        break;
    case OFFSET_TOO_SMALL:	//offset未在队列中
        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
        requestHeader.getConsumerGroup(), 
        requestHeader.getTopic(), 
        requestHeader.getQueueOffset(),
        getMessageResult.getMinOffset(), channel.remoteAddress());
        break;
    default:
        assert false;
        break;
}
...
//如果CommitLog标记可用,并且当前Broker为主节点,则更新消息消费进度
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
    && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
    this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}

七、消息中间件 RocketMQ 源码分析: 消息拉取客户端处理服务端响应

1、消息拉取客户端处理消息 示例图:

在这里插入图片描述

2、 代码:MQClientAPIImpl#processPullResponse


private PullResult processPullResponse(
    final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
    PullStatus pullStatus = PullStatus.NO_NEW_MSG;
   	//判断响应结果
    switch (response.getCode()) {
        case ResponseCode.SUCCESS:
            pullStatus = PullStatus.FOUND;
            break;
        case ResponseCode.PULL_NOT_FOUND:
            pullStatus = PullStatus.NO_NEW_MSG;
            break;
        case ResponseCode.PULL_RETRY_IMMEDIATELY:
            pullStatus = PullStatus.NO_MATCHED_MSG;
            break;
        case ResponseCode.PULL_OFFSET_MOVED:
            pullStatus = PullStatus.OFFSET_ILLEGAL;
            break;

        default:
            throw new MQBrokerException(response.getCode(), response.getRemark());
    }
	//解码响应头
    PullMessageResponseHeader responseHeader =
        (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
	//封装PullResultExt返回
    return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
        responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}

3、 PullResult 类


private final PullStatus pullStatus;	//拉取结果
private final long nextBeginOffset;	//下次拉取偏移量
private final long minOffset;	//消息队列最小偏移量
private final long maxOffset;	//消息队列最大偏移量
private List<MessageExt> msgFoundList;	//拉取的消息列表

在这里插入图片描述

4、 代码:DefaultMQPushConsumerImpl$PullCallback#OnSuccess


//将拉取到的消息存入processQueue
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//将processQueue提交到consumeMessageService中供消费者消费
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
//如果pullInterval大于0,则等待pullInterval毫秒后将pullRequest对象放入到PullMessageService中的pullRequestQueue队列中
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}

八、消息中间件 RocketMQ 源码分析: 拉取消息的流程小结

### 1、消息拉取流程总结

上一节关联链接请点击:
# 消息中间件 RocketMQ 高级功能和源码分析(九)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/732080.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

萨科微slkor宋仕强论道华强北假货之六

萨科微slkor宋仕强论道华强北假货之六&#xff0c;华强北的假货这么多&#xff0c;搞得客户害怕、同行焦虑&#xff0c;话说“在华强北没有被坑过的&#xff0c;就不是华强北人”。我们金航标Kinghelm&#xff08;www.kinghelm.com.cn&#xff09;公司以前有一个贸易部&#xf…

【单片机】MSP430G2553单片机 Could not find MSP-FET430UIF on specified COM port 解决方案

文章目录 MSP430G2553开发板基础知识解决办法如何实施解决办法4步骤一步骤二步骤三 MSP430G2553开发板基础知识 MSP430G2553开发板如下图&#xff0c;上半部分就是UIF程序下载调试区域的硬件。个人觉得MSP430G2553开发板的这个部分没有做好硬件设计&#xff0c;导致很多系统兼…

三相光伏逆变并网电流电压双闭环仿真

三相并网发电系统的拓扑结构图展示了系统的基本构成和连接方式。图中&#xff0c;&#x1d456;&#x1d451;&#x1d450;1为直流输入电源&#xff0c;&#x1d436;1为输入直流母线滤波电容&#xff0c;&#x1d447;1~&#x1d447;6为三相逆变桥的6个IGBT开关管。这些开关…

MyBatis系列六: 映射关系多对一

动态SQL语句-更复杂的查询业务需求 官方文档基本介绍映射方式配置Mapper.xml的方式-应用实例注解的方式实现-应用实例课后练习 官方文档 文档地址: https://mybatis.org/mybatis-3/zh_CN/sqlmap-xml.html 基本介绍 ●基本介绍 1.项目中多对1的关系是一个基本的映射关系, 也可…

搜索python包的说明

当我发现bug时&#xff0c;就怀疑是sns包的版本问题了&#xff08;原代码是原作者以前成功运行的代码&#xff09;&#xff0c;于是直接到网上搜&#xff0c;找到对应的说明文档 根据该示例代码进行改写&#xff1a; 达成目的。

Harbor本地仓库搭建003_Harbor常见错误解决_以及各功能使用介绍_镜像推送和拉取---分布式云原生部署架构搭建003

首先我们去登录一下harbor,但是可以看到,用户名密码没有错,但是登录不上去 是因为,我们用了负债均衡,nginx会把,负载均衡进行,随机分配,访问的 是harbora,还是harborb机器. loadbalancer中 解决方案,去loadbalance那个机器中,然后 这里就是25机器,我们登录25机器 然后去配置…

【尚庭公寓SpringBoot + Vue 项目实战】预约看房与租约管理(完结)

【尚庭公寓SpringBoot Vue 项目实战】预约看房与租约管理&#xff08;完结&#xff09; 文章目录 【尚庭公寓SpringBoot Vue 项目实战】预约看房与租约管理&#xff08;完结&#xff09;1、业务说明2、接口开发2.1、预约看房管理2.1.1.保存或更新看房预约2.1.2. 查询个人预约…

首个AI高考全卷评测结果出分,大模型“考生”表现如何?

内容提要 大部分大模型“考生”语文、英语科目表现良好&#xff0c;但在数学方面还有待加强。阅卷老师点评&#xff0c;在语文科目上&#xff0c;对于语言中的一些“潜台词”&#xff0c;大模型尚无法完全理解。在数学科目上&#xff0c;大模型的主观题回答相对凌乱&#xff0…

2005年上半年软件设计师【下午题】试题及答案

文章目录 2005年上半年软件设计师下午题--试题2005年上半年软件设计师下午题--答案2005年上半年软件设计师下午题–试题

力扣每日一题 6/22 字符串/贪心

博客主页&#xff1a;誓则盟约系列专栏&#xff1a;IT竞赛 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ 2663.字典序最小的美丽字符串【困难】 题目&#xff1a; 如果一个字符串满…

NLP大语言模型的缩放定律

一、简述 ​论文《神经语言模型的缩放定律》包含对交叉熵损失的语言模型性能的经验缩放定律的研究&#xff0c;重点关注Transformer架构。 https://arxiv.org/pdf/2001.08361.pdfhttps://arxiv.org/pdf/2001.08361.pdf 实验表明&#xff0c;测试损失与模型大小、数据集…

基于STM8系列单片机驱动74HC595驱动两个3位一体的数码管

1&#xff09;单片机/ARM硬件设计小知识&#xff0c;分享给将要学习或者正在学习单片机/ARM开发的同学。 2&#xff09;内容属于原创&#xff0c;若转载&#xff0c;请说明出处。 3&#xff09;提供相关问题有偿答疑和支持。 为了节省单片机MCU的IO口资源驱动6个数码管&…

STM32单片机USART串口打印和收发数据

文章目录 1. 串口通信 1.1 串口初始化 1.2 库函数 2. 串口打印 2.1 Serial.c 2.2 Serial.h 2.3 main.c 3. 串口收发数据 3.1 Serial.c 3.2 Serial.h 3.3 main.c 1. 串口通信 对于串口通信的详细解析可以看下面这篇文章 STM32单片机USART串口详解-CSDN博客 STM32单片…

基于java+springboot+vue实现的智慧生活商城系统(文末源码+Lw)244

摘 要 计算机网络发展到现在已经好几十年了&#xff0c;在理论上面已经有了很丰富的基础&#xff0c;并且在现实生活中也到处都在使用&#xff0c;可以说&#xff0c;经过几十年的发展&#xff0c;互联网技术已经把地域信息的隔阂给消除了&#xff0c;让整个世界都可以即时通…

数据中心:AI范式下的内存挑战与机遇

在过去的十年里&#xff0c;数据中心和服务器行业经历了前所未有的扩张&#xff0c;这一进程伴随着CPU核心数量、内存带宽(BW)&#xff0c;以及存储容量的显著增长。这种超大规模数据中心的扩张不仅带来了对计算能力的急剧需求&#xff0c;也带来了前所未有的内存功率密度挑战&…

BigDataCloud 反向地理编码

在当今数字化飞速发展的时代&#xff0c;地理信息的精确获取和游戏数据的深入分析成为众多领域的关键需求。2024 年的今天&#xff0c;技术的创新为我们带来了更为出色的 API 服务。BigDataCloud 反向地理编码服务&#xff0c;能够将经纬度迅速而准确地转换为详细位置信息&…

iOS 中,autoreleasepool 的底层实现

在 iOS 中&#xff0c;autoreleasepool 的底层实现基于 Objective-C 运行时&#xff08;runtime&#xff09;和内存管理机制。 图解说明 Objective-C Runtime 和 Autoreleasepool 的创建 在 Objective-C 中&#xff0c;每次进入一个 autoreleasepool 块时&#xff0c;都会创建…

MySQL之复制(十)

复制 改变主库 确定期望的日志位置 如果有备库和新主库的位置不相同&#xff0c;则需要找到该备库最后一条执行的时间在新主库的二进制日志中相应的位置&#xff0c;然后再执行CHANGE MASTER TO.可以通过mysqlbinlog工具来找到备库执行的最后一条查询&#xff0c;然后在主库上…

宇宙星空星辰美景素材哪里找?高清无水印分享

宇宙星空的美丽总能激发人们的无限遐想和灵感&#xff0c;不仅在科学教育领域&#xff0c;更在电影制作和视觉艺术中占有一席之地。为了帮助您找到高质量的宇宙星空视频素材&#xff0c;以下平台将成为您获取令人难忘天体视频素材的首选。 蛙学府 蛙学府作为新媒体创作者的宝库…

FEP容量瓶生产厂商半导体行业耐强酸强碱耐高低温

FEP容量瓶&#xff0c;氟四六容量瓶&#xff0c;特氟龙容量瓶&#xff0c;耐腐蚀耐高温。广泛应用于ICP-MS、ICP-OES等痕量分析以及同位素分析等实验。地质、电子化学品、半导体分析测试、疾控中心、制药厂、环境检测中心等一些机构定容用。 规格参考&#xff1a;10ml、25ml、5…