RocketMQ5.0Pop消费模式

前言

RocketMQ 5.0 消费者引入了一种新的消费模式:Pop 消费模式,目的是解决 Push 消费模式的一些痛点。
RocketMQ 4.x 之前,消费模式分为两种:

  • Pull:拉模式,消费者自行拉取消息、上报消费结果
  • Push:推模式,消费者主动拉取消息,看起来像是 Broker 主动推送消息,主动上报消费结果

Pop 消费模式可以看作是 Push 的升级版,Push 消费模式存在以下痛点:

  • 客户端负责队列重平衡、消息拉取、消费位点上报、消费失败重试等功能,逻辑太重了,不利于多语言客户端发展
  • 队列数/消费者数量变更会触发重平衡操作,期间消息无法消费,容易造成消息堆积
  • 队列和消费者强绑定,消费者数量超过队列数后,无法再水平扩容
  • 队列和消费者强绑定,消费者僵死状态下导致消息堆积

Pop 消费模式就是要解决这些痛点的,它的设计目标:

  • 消费者只管消息拉取、消息消费、上报 ACK,客户端 SDK 轻量级
  • 队列不再绑定消费者,消费者可以消费所有队列消息
  • 消费者可以很方便的水平扩容

要实现这个目标还是有不小的挑战,看看 RocketMQ 是如何做到的吧。

设计难点

Pop 消费模式是存在一些设计难点的。
Push 模式下队列为什么要和消费者绑定?也就是一个队列同一时间只允许被一个消费者消费,因为这样实现起来简单啊,主要体现在:

  • Broker 消息投递处理简单,根据消费者请求的拉取位点投递
  • Broker 消费位点管理简单,无需记录消息是否被消费,只需要记录消费位点,位点前的都消费了,位点后的都没消费

如果改成 Pop 模式,首先面临的挑战有:
1、Broker 消息投递时,要记录哪些消息已经投递了,哪些消息还没投递
Pop 模式下一个队列可以被多个消费者消费,假设现在队列里面有 1~10 号消息,消费者A 拉取了 1~3 号消息,消费者B 再拉取的时候,Broker 必须知道 1~3 号消息已经投递过在消费中了,不能再投递给消费者B了,得投递 4 号及之后的消息给消费者B才行。
image.png

2、Broker 要记录哪些消息消费成功了,哪些消息消费失败了,不能单纯记录消费位点了
因为队列可以被多个消费者消费,大家都在上报队列粒度的消费位点,Broker 没法管理了。

你看,为了可以让队列被多个消费者消费,Broker 已经不能再按队列维度去管理信息了,必须精确到消息粒度,这就需要额外的数据结构来支撑。

设计实现

取消客户端队列 Rebalance
客户端重平衡,不仅限制了消费者的消费能力,还增加了客户端复杂度,重平衡期间消息无法被消费,还容易造成消息堆积。Pop 消费模式下,消费者可以消费所有队列,也就不再需要客户端重平衡了。消费者查询给自己分配的队列时,Proxy 返回的是 Broker 维度且 queueId=-1 的逻辑队列,Broker 端会投递所有队列的消息。
image.png

invisibleTime
Pop 消费模式下,消息投递后会有一个invisibleTime的概念,即消息的不可见时间,默认是 60 秒。比如 M1 投递给消费者A后,在不可见时间段内,其它消费者是无法消费这条消息的。
image.png
另外 Broker 还提供了changeInvisibleTime()接口修改单条消息的不可见时间,比如消息消费失败后,会根据重试次数来设置新的不可见时间。
image.png

CK & ACK 消息
RocketMQ 为了记录消息是否被消费成功,引入了 CK 和 ACK 消息,以及一个专属的 Topic:rmq_sys_REVIVE_LOG_{clusterName}
消费者在拉取消息时,Broker 端会给拉取到的这一批消息发一个 CK 消息,CK 消息记录了各消息的偏移量可以定位到具体消息,同时用位图记录了各消息的 ACK 情况。Consumer 消费成功后会调用ackMessage接口,Broker 会发送一个对应的 ACK 消息;消费失败后会调用changeInvisibleTime接口,延长消息不可见时间,底层是先发一个旧消息的 ACK 消息,再发一个新消息的 CK 消息。
image.png
上述流程的运转是基于消费者正常处理消费结果的前提下的,如果消费者挂了,既不发 ack 也不发 nack,Broker 又该怎么处理这些消息呢?其实也能正常处理,因为 CK 和 ACK 消息均是延时消息,延迟的时间即消息的不可见时间,CK 消息会提前一秒消费,目的是匹配 ACK 消息。
RocketMQ 会启动八个线程消费 REVIVE Topic 对应的八个队列,匹配 CK 消息里还有哪些消息没被 ack,再将这些没被 ack 的消息发送到 RETRY Topic,消费者就可以重新消费了。

源码

先通过流程图熟悉下 Pop 消费模式的大体流程:
image.png

ProcessQueue

PushConsumer 启动后,会定时向 Proxy 查询分配的队列:

protected void startUp() throws Exception {
	......
	scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {
        try {
            // 扫描分配的队列
            scanAssignments();
        } catch (Throwable t) {
            log.error("Exception raised while scanning the load assignments, clientId={}", clientId, t);
        }
    }, 1, 5, TimeUnit.SECONDS);
	......
}

Pop 消费模式下,客户端无须知道具体的队列数据,因为 Broker 会投递所有队列消息,所以 Proxy 只返回 Broker 维度且 queueId=-1 的一个逻辑队列即可。
消费者拿到分配的队列后,紧接着调用syncProcessQueue()方法处理队列,有两种情况:

  • 队列不再分配给自己,停止拉取消息,移除队列
  • 新分配的队列,立即拉取消息

对于新分配的队列,消费者会创建ProcessQueue对象开始拉取队列消息:

private void receiveMessageImmediately() {
    // Broker端点列表 gRPC负载均衡调用
    final Endpoints endpoints = mq.getBroker().getEndpoints();
    // 构建请求
    final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression);
	// 发请求
    final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
                consumer.getPushConsumerSettings().getLongPollingTimeout());
    Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
        @Override
        public void onSuccess(ReceiveMessageResult result) {
            // 处理消息
            onReceiveMessageResult(result);
        }
    }
}

PopMessageProcessor

Proxy 端会开启 gRPC 服务,GrpcMessagingApplication 用来处理客户端的请求,拉取消息对应的处理方法是receiveMessage,最终会交给 Broker 的 PopMessageProcessor 处理,主要步骤:

  • 校验请求参数、队列写权限等等
  • 构建消息过滤器 ExpressionMessageFilter
  • 按照 1/5 的概率先尝试从重试队列里拉取消息
  • 遍历所有队列拉取消息,直到拉取最大消息数
  • 如果普通队列拉取的消息数没达到最大消息数,再尝试拉取重试队列
  • 如果还有剩余消息,通知其它被长轮询挂起的请求继续拉取
  • 返回结果
private RemotingCommand processRequest(final Channel channel, RemotingCommand request)
        throws RemotingCommandException {
    ......// 参数、权限校验
	// Topic配置
	TopicConfig topicConfig =
        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
	// 消费组订阅配置
	SubscriptionGroupConfig subscriptionGroupConfig =
    	this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
	// 消息过滤
	ExpressionMessageFilter messageFilter = ......

	// POP模式下,消费者要遍历所有队列,随机一个下标开始读取,都从0开始读的话,存在热点队列,竞争锁
	int randomQ = random.nextInt(100);
	int reviveQid; // REVEIE队列ID 顺序消息999 普通消息:0~7轮询
	if (requestHeader.isOrder()) {
        reviveQid = KeyBuilder.POP_ORDER_REVIVE_QUEUE;
    } else {
        reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum());
    }

	int commercialSizePerMsg = this.brokerController.getBrokerConfig().getCommercialSizePerMsg();
	GetMessageResult getMessageResult = new GetMessageResult(commercialSizePerMsg);

	long restNum = 0; // 剩余消息数
	// 平均每5次请求,读取一下重试队列
	boolean needRetry = randomQ % 5 == 0;
	if (needRetry && !requestHeader.isOrder()) {
        restNum = popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid,
                        channel, popTime, messageFilter,
                        startOffsetInfo, msgOffsetInfo, orderCountInfo);
    }
	// POP模式下,请求的queueId=-1,读所有队列
	if (requestHeader.getQueueId() < 0) {
        for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
            int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
            restNum = popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter,
                    startOffsetInfo, msgOffsetInfo, orderCountInfo);
        }
    }
	// 拉取到的消息数量不满足消费者期望的数量,接着拉取重试队列:%RETRY%{consumerGroup}_{topic}
	......

	if (!getMessageResult.getMessageBufferList().isEmpty()) {
        // 拉取到消息,且队列里面还有消息,通知其它拉取请求
        if (restNum > 0) {
            notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
                    requestHeader.getQueueId());
        }
    } else {
        // 没有消息,进入长轮询状态 挂起
        int pollingResult = polling(channel, request, requestHeader);
    }
	......
}

通过队列获取消息的方法是popMsgFromQueue,虽然队列可以被多个消费者消费,但是同一个消费组下,针对同一个队列拉取消息的行为必须保证串行,所以 Broker 首先会构建一个topic@group@queueId格式的字符串作为 lockKey 保证加锁成功,然后再获取消息拉取位点 offset,调用 MessageStore 获取消息,最终给这批消息记录 CK 消息。

private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
        PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,
        Channel channel, long popTime,
        ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
        StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
    String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
            requestHeader.getConsumerGroup()) : requestHeader.getTopic();
    // 给队列加锁 同一消费组下的队列串行读取 topic@group@queueId
    String lockKey =
            topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
    // 加锁
    if (!queueLockManager.tryLock(lockKey)) {
        restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
        return restNum;
    }
    // 拉取位点
    offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);
    // 获取消息
    GetMessageResult getMessageTmpResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup()
                    , topic, queueId, offset,
                    requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);
	if (!getMessageTmpResult.getMessageMapedList().isEmpty()) {
        // 追加CheckPoint
        appendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName());
    }
    ......
    queueLockManager.unLock(lockKey);
}

image.png
这里的锁实现很简单,通过 AtomicBoolean CAS 修改上锁标记位来判断是否加锁成功:

static class TimedLock {
    private final AtomicBoolean lock;
    private volatile long lockTime;
}

加锁成功后,Broker 得知道该从哪里开始给消费者投递消息,这就是拉取位点的获取:

  • 首先取已提交的消费位点
  • 如果 CK 缓冲区有已投递的 PopCheckPoint,则取缓冲区的拉取位点,避免已经投递过的消息重复投递
private long getPopOffset(String topic, PopMessageRequestHeader requestHeader, int queueId, boolean init,
    String lockKey) {
    // 已提交的消费位点
    long offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
        topic, queueId);
    if (offset < 0) {
        // 还没消费过 判断是从最旧还是最新的开始消费
        if (ConsumeInitMode.MIN == requestHeader.getInitMode()) {
            offset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
        } else {
            // pop last one,then commit offset.
            offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1;
            // max & no consumer offset
            if (offset < 0) {
                offset = 0;
            }
            if (init) {
                this.brokerController.getConsumerOffsetManager().commitOffset("getPopOffset",
                    requestHeader.getConsumerGroup(), topic,
                    queueId, offset);
            }
        }
    }
    // CK缓冲区的位点,这些是已投递、还没提交消费的位点
    long bufferOffset = this.popBufferMergeService.getLatestOffset(lockKey);
    if (bufferOffset < 0) {
        return offset;
    } else {
        return bufferOffset > offset ? bufferOffset : offset;
    }
}

有了拉取位点,接下来就是通过 MessageStore 查询消息了,底层会读取 consumequeue 和 commitlog 文件,这里不赘述。拉取到消息列表后,Broker 会给这一批消息记录一个 CK 消息,用于后续匹配 ACK 消息。

PopCheckPoint

CK 消息对应的类是 PopCheckPoint,主要记录了:消息拉取时间、消息偏移量、不可见时间、消息 ACK 位图等等。

public class PopCheckPoint {
    // 起始偏移量
    @JSONField(name = "so")
    private long startOffset;
    // 消息拉取时间
    @JSONField(name = "pt")
    private long popTime;
    @JSONField(name = "it")
    private long invisibleTime;
    // 位图 收到ACK消息则把对应位设为1
    @JSONField(name = "bm")
    private int bitMap;
    // 消息数量
    @JSONField(name = "n")
    private byte num;
    @JSONField(name = "q")
    private byte queueId;
    @JSONField(name = "t")
    private String topic;
    // 消费组
    @JSONField(name = "c")
    private String cid;
    @JSONField(name = "ro")
    private long reviveOffset;
    // 消息增量偏移量
    @JSONField(name = "d")
    private List<Integer> queueOffsetDiff;
    @JSONField(name = "bn")
    String brokerName;
}

构建好 PopCheckPoint 对象,Broker 会把它作为一个普通消息写入 commitlog 持久化,消息内容:

topic: rmq_sys_REVIVE_LOG_{clusterName}
queueId: 0~7轮询
tag: ck
body: json(PopCheckPoint)
deliverTimeMs: invisibleTime-1s

CK 消息存储完毕后,就可以正常返回消息了。需要注意的是,消息在返回给消费者前,Broker 会给消息设置一个很重要的属性:POP_CK。它是消息关联的 CK 消息的句柄字符串,消费者基于该属性来 ack 消息。

messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
    ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(key), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
        responseHeader.getReviveQid(), messageExt.getTopic(), messageQueue.getBrokerName(), messageExt.getQueueId(), msgQueueOffset)
);

POP_CK由以下八个属性构成,空格连接,通过该属性可以快速定位到 CK 消息。

{ckQueueOffset} {popTime} {invisibleTime} {reviveQid} {0/1} {brokerName} {queueId} {msgQueueOffset}

# 示例值
2 1698656741635 60000 0 0 broker-a 3 2

AckMessageProcessor

消费者消费完消息后,会调用eraseMessage()擦除消息,也就是根据消费结果判断是 ack 还是 nack。

public void eraseMessage(MessageViewImpl messageView, ConsumeResult consumeResult) {
    statsConsumptionResult(consumeResult);
    ListenableFuture<Void> future = ConsumeResult.SUCCESS.equals(consumeResult) ? ackMessage(messageView) :
        nackMessage(messageView);
    future.addListener(() -> evictCache(messageView), MoreExecutors.directExecutor());
}

如果消费成功,则调用ackMessage接口,Broker 的处理方式也很简单,就是构建一个 AckMsg 对象,然后把它作为消息体发一个 ACK 消息。

private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
        boolean brokerAllowSuspend) throws RemotingCommandException {
    ......
	AckMsg ackMsg = new AckMsg();
	ackMsg.setAckOffset(requestHeader.getOffset());
    ackMsg.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfo));
    ackMsg.setConsumerGroup(requestHeader.getConsumerGroup());
    ackMsg.setTopic(requestHeader.getTopic());
    ackMsg.setQueueId(requestHeader.getQueueId());
    ackMsg.setPopTime(ExtraInfoUtil.getPopTime(extraInfo));
    ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));

	if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
        顺序消息处理
    }
	// 构建消息存储
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
	msgInner.setTopic(reviveTopic);
    msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
    msgInner.setQueueId(rqId);
    msgInner.setTags(PopAckConstants.ACK_TAG);
    msgInner.setBornTimestamp(System.currentTimeMillis());
    msgInner.setBornHost(this.brokerController.getStoreHost());
    msgInner.setStoreHost(this.brokerController.getStoreHost());
    // 延时消息 拉取时间+不可见时间
    msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + ExtraInfoUtil.getInvisibleTime(extraInfo));
    msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
	return response;
}

ACK 消息内容如下:

topic: rmq_sys_REVIVE_LOG_{clusterName}
queueId: 0~7轮询
tag: ack
body: json(AckMsg)
deliverTimeMs: invisibleTime

注意:CK & ACK 消息同属一个Topic

ChangeInvisibleTimeProcessor

如果消费失败,会根据重试次数调用changeInvisibleDuration接口延长消息不可见时间。Broker 最终会由 ChangeInvisibleTimeProcessor 处理请求,因为消息是按照紧凑的方式顺序写入 commitlog,所以写入后就不支持修改了,所谓的修改消息不可见时间,其实是先发一个新的 CK 消息,再发一个旧消息的 ACK 消息。

private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
        boolean brokerAllowSuspend) throws RemotingCommandException {
	......
    // add new ck
    long now = System.currentTimeMillis();
    PutMessageResult ckResult = appendCheckPoint(requestHeader, ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(), requestHeader.getOffset(), now, ExtraInfoUtil.getBrokerName(extraInfo));

	// ack old msg
	ackOrigin(requestHeader, extraInfo);
}

所以,无论消费者是调用 ack 还是 nack,消息一定会被 ack 掉。另外还有一种场景就是消费者异常,没有上报消费结果,Broker 在消息不可见时间到期后会把这些没被 ack 掉的消息发到重试队列里,让其它消费者消费。

PopReviveService

CK & ACK 消息存储下来以后,由谁来处理呢?AckMessageProcessor 会开启八个线程,消费 REVIVE Topic 的八个队列,线程对应的服务是 PopReviveService。
PopReviveService 线程每秒执行一次,消费 REVIVE Topic 里的消息。如果是 CK 消息则重新构建 PopCheckPoint 对象;如果是 ACK 消息则更新 PopCheckPoint 里的位图,通过 tag 来区分消息类型。
最后处理 PopCheckPoint,已经被 ack 的消息不做处理,未被 ack 的消息会构建一条新消息重新发送到重试队列,Topic 规则是:%RETRY%{consumerGroup}_{topic}
image.png

public void run() {
    ......
	ConsumeReviveObj consumeReviveObj = new ConsumeReviveObj();
	// 消费Revive队列消息,构建PopCheckPoint放入map
	consumeReviveMessage(consumeReviveObj);
	// 处理PopCheckPoint
	mergeAndRevive(consumeReviveObj);
}
protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
    HashMap<String, PopCheckPoint> map = consumeReviveObj.map;
	// 消费位点
	long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(PopAckConstants.REVIVE_GROUP, reviveTopic, queueId);
	while (true) {
        // 查询REVIVE队列消息
        List<MessageExt> messageExts = getReviveMessage(offset, queueId);
        for (MessageExt messageExt : messageExts) {
            //ck消息
            if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) {
                String raw = new String(messageExt.getBody(), DataConverter.charset);
                PopCheckPoint point = JSON.parseObject(raw, PopCheckPoint.class);
                map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);
            } else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {
                // ack消息
                String raw = new String(messageExt.getBody(), DataConverter.charset);
                AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
                PopCheckPoint point = map.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime());
                // 设置CheckPoint ACK位图
                int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
                if (indexOfAck > -1) {
                    point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true));
                } else {
                    POP_LOGGER.error("invalid ack index, {}, {}", ackMsg, point);
                }
            }        
        }
    }
}

CK 消息到期后,会触发reviveMsgFromCk恢复没有被 ack 的消息,处理方式是构建新消息发到重试队列。

private void reviveMsgFromCk(PopCheckPoint popCheckPoint) throws Throwable {
    for (int j = 0; j < popCheckPoint.getNum(); j++) {
        if (DataConverter.getBit(popCheckPoint.getBitMap(), j)) {
            // 已经ack了,跳过
            continue;
        }
        // 查询实际消息
        long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);
        MessageExt messageExt = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName());
        if (messageExt == null) {
            POP_LOGGER.warn("reviveQueueId={},can not get biz msg topic is {}, offset is {} , then continue ",
                queueId, popCheckPoint.getTopic(), msgOffset);
            continue;
        }
        //skip ck from last epoch
        if (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {
            POP_LOGGER.warn("reviveQueueId={},skip ck from last epoch {}", queueId, popCheckPoint);
            continue;
        }
        // 重试 构建一条消息重新Put到commitlog topic:%RETRY%{consumerGroup}_{topic}
        reviveRetry(popCheckPoint, messageExt);
    }
}

PopBufferMergeService

为了记录消息的 ack 状态,RocketMQ 需要存储额外的 CK & ACK 消息,开销还是比较大的。而消息存储的目的是为了匹配消息的 ack 状态,把未被 ack 的消息发送到重试队列里。
消息消费一般是很快的,意味着正常情况下,很快就能收到消费者发过来的 ack 和 nack 请求,那为什么不直接在内存里完成匹配呢?这样做可以大幅提升性能,消息落盘只作为一个兜底方案。
RocketMQ 是支持优先在内存里匹配 ack 消息的,默认是关闭状态,需要手动把enablePopBufferMerge打开。
开启内存匹配后,PopCheckPoint 会优先只追加到缓冲区,只有当缓冲区添加失败才会落地磁盘。

public boolean addCk(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {
    // key: point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt()
    if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {
        // 启用内存匹配
        return false;
    }
    if (!serving) {
        return false;
    }

    long now = System.currentTimeMillis();
    if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {
        if (brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.warn("[PopBuffer]add ck, timeout, {}, {}", point, now);
        }
        return false;
    }

    if (this.counter.get() > brokerController.getBrokerConfig().getPopCkMaxBufferSize()) {
        POP_LOGGER.warn("[PopBuffer]add ck, max size, {}, {}", point, this.counter.get());
        return false;
    }

    PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset);
    if (!checkQueueOk(pointWrapper)) {// 队列默认不超过20000个
        return false;
    }
    // 添加到commitOffsets
    putOffsetQueue(pointWrapper);
    // 添加到缓冲区
    this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);
    this.counter.incrementAndGet();
    if (brokerController.getBrokerConfig().isEnablePopLog()) {
        POP_LOGGER.info("[PopBuffer]add ck, {}", pointWrapper);
    }
    return true;
}

同样的,Broker 在接收到消息 ack 请求时,也会优先只改缓冲区的 PopCheckPoint ack 位图,无需存储 ACK 消息。

 public boolean addAk(int reviveQid, AckMsg ackMsg) {
        if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {
            return false;
        }
        if (!serving) {
            return false;
        }
        try {
            PopCheckPointWrapper pointWrapper = this.buffer.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName());
            if (pointWrapper == null) {
                if (brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, no ck, {}", reviveQid, ackMsg);
                }
                return false;
            }

            if (pointWrapper.isJustOffset()) {
                return false;
            }
            PopCheckPoint point = pointWrapper.getCk();
            long now = System.currentTimeMillis();
            if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {
                if (brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);
                }
                return false;
            }
            if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() - 1500) {
                if (brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, stay too long, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);
                }
                return false;
            }
            // 直接更改内存里的位图
            int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
            if (indexOfAck > -1) {
                markBitCAS(pointWrapper.getBits(), indexOfAck);
            } else {
                POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point);
                return true;
            }
            if (brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.info("[PopBuffer]add ack, rqId={}, {}, {}", reviveQid, pointWrapper, ackMsg);
            }
            return true;
        } catch (Throwable e) {
            POP_LOGGER.error("[PopBuffer]add ack error, rqId=" + reviveQid + ", " + ackMsg, e);
        }
        return false;
    }

内存资源有限,一直往里堆 PopCheckPoint 也不行啊,这时候就需要做两件事:

  • 把内存里已经匹配完 ack 的 PopCheckPoint 移除掉
  • 隔太久还没 ack 掉的 PopCheckPoint,必须要落盘存储了

PopBufferMergeService 本身也是个线程,会每隔 5ms 扫描一次缓冲区,执行上述操作。
image.png
内存里的 PopCheckPoint 移除前需要满足两个条件:

  • PopCheckPoint 消息已经持久化了
  • PopCheckPoint 在内存里就已经全被 ack 掉了

还没有完全被 ack 掉的 PopCheckPoint 移除前需要做两件事:

  • PopCheckPoint 消息持久化
  • 已经被 ack 掉的消息也要持久化
private void scan() {
    long startTime = System.currentTimeMillis();
    int count = 0, countCk = 0;
    // 迭代缓冲区的PopCheckPoint
    Iterator<Map.Entry<String, PopCheckPointWrapper>> iterator = buffer.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<String, PopCheckPointWrapper> entry = iterator.next();
        PopCheckPointWrapper pointWrapper = entry.getValue();
        // CheckPoint已持久化,或已被ACK,从缓冲区删除
        // just process offset(already stored at pull thread), or buffer ck(not stored and ack finish)
        if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper)
            || isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {
            if (brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.info("[PopBuffer]ck done, {}", pointWrapper);
            }
            iterator.remove();
            counter.decrementAndGet();
            continue;
        }
        // 把超时或停留时间超10s的CheckPoint从缓冲区删除,删除前要先持久化
        PopCheckPoint point = pointWrapper.getCk();
        long now = System.currentTimeMillis();

        boolean removeCk = !this.serving;
        // ck will be timeout
        if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {
            removeCk = true;
        }

        // 内存停留时间超过10秒,也要移除掉
        if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) {
            removeCk = true;
        }

        if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() * 2L) {
            POP_LOGGER.warn("[PopBuffer]ck finish fail, stay too long, {}", pointWrapper);
        }

        // double check
        if (isCkDone(pointWrapper)) {
            continue;
        } else if (pointWrapper.isJustOffset()) {
            // just offset should be in store.
            if (pointWrapper.getReviveQueueOffset() < 0) {
                // reviveQueueOffset<0代表还没存储,要先存储
                putCkToStore(pointWrapper, false);
                countCk++;
            }
            continue;
        } else if (removeCk) {
            // put buffer ak to store
            if (pointWrapper.getReviveQueueOffset() < 0) {
                putCkToStore(pointWrapper, false);
                countCk++;
            }

            if (!pointWrapper.isCkStored()) {
                continue;
            }

            for (byte i = 0; i < point.getNum(); i++) {
                // reput buffer ak to store
                // 存储ack消息
                if (DataConverter.getBit(pointWrapper.getBits().get(), i)
                    && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
                    if (putAckToStore(pointWrapper, i)) {
                        count++;
                        markBitCAS(pointWrapper.getToStoreBits(), i);
                    }
                }
            }

            if (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {
                if (brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.info("[PopBuffer]ck finish, {}", pointWrapper);
                }
                iterator.remove();
                counter.decrementAndGet();
                continue;
            }
        }
    }
    // 扫描commitOffsets,提交消费位点
    int offsetBufferSize = scanCommitOffset();

    long eclipse = System.currentTimeMillis() - startTime;
    if (eclipse > brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() - 1000) {
        POP_LOGGER.warn("[PopBuffer]scan stop, because eclipse too long, PopBufferEclipse={}, " +
                "PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",
            eclipse, count, countCk, counter.get(), offsetBufferSize);
        this.serving = false;
    } else {
        if (scanTimes % countOfSecond1 == 0) {
            POP_LOGGER.info("[PopBuffer]scan, PopBufferEclipse={}, " +
                    "PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",
                eclipse, count, countCk, counter.get(), offsetBufferSize);
        }
    }
    scanTimes++;

    if (scanTimes >= countOfMinute1) {
        counter.set(this.buffer.size());
        scanTimes = 0;
    }
}

内存里的 PopCheckPoint 除了会根据唯一键,构建一个 Map,还会根据topic@cid@queueId构建一个 QueueWithTime 队列连接起来。

ConcurrentHashMap<String/*topic@cid@queueId*/, QueueWithTime<PopCheckPointWrapper>> commitOffsets =
        new ConcurrentHashMap<>();

QueueWithTime 队列的用途有两个:

  • 内存里的 PopCheckPoint 处理完毕后,提交消费位点
  • 消息拉取时,获取拉取位点,避免已经投递的消息重复投递

内存里的 PopCheckPoint 只要持久化了或者全被 ack 掉了,就可以提交消费位点了。因为这些消息要么被成功消费了,要么后续在处理 CK 消息时也会被发送到重试队列里。提交消费位点的方法是commitOffset()

private boolean commitOffset(final PopCheckPointWrapper wrapper) {
    if (wrapper.getNextBeginOffset() < 0) {
        return true;
    }
    final PopCheckPoint popCheckPoint = wrapper.getCk();
    final String lockKey = wrapper.getLockKey();
    // 加锁
    if (!queueLockManager.tryLock(lockKey)) {
        return false;
    }
    try {
        // 旧的消费位点
        final long offset = brokerController.getConsumerOffsetManager().queryOffset(popCheckPoint.getCId(), popCheckPoint.getTopic(), popCheckPoint.getQueueId());
        if (wrapper.getNextBeginOffset() > offset) {
            if (brokerController.getBrokerConfig().isEnablePopLog()) {
                POP_LOGGER.info("Commit offset, {}, {}", wrapper, offset);
            }
        } else {
            // maybe store offset is not correct.
            POP_LOGGER.warn("Commit offset, consumer offset less than store, {}, {}", wrapper, offset);
        }
        // 提交消费位点
        brokerController.getConsumerOffsetManager().commitOffset(getServiceName(),
            popCheckPoint.getCId(), popCheckPoint.getTopic(), popCheckPoint.getQueueId(), wrapper.getNextBeginOffset());
    } finally {
        queueLockManager.unLock(lockKey);
    }
    return true;
}

尾巴

RocketMQ 5.0 的 Pop 消费模式是 Push 模式的升级版,它解决了原先 Push 模式下队列只能由一个消费者消费的问题、去除了客户端繁重的重平衡逻辑、降低了消息堆积的风险。核心逻辑是 Broker 给每次拉取的一批消息发一个 CK 延时消息,客户端 ack 时再发一个 ACK 延时消息,消息到期后对 CK 消息做 ACK 匹配,把未被 ack 掉的消息发到重试独立里。这种模式下,不可避免的会带来额外开销,所以 RocketMQ 也支持优先在内存里完成匹配,CK 和 ACK 消息就不用存储了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/288341.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

探索Allure Report:提升自动化测试效率的秘密武器

亲爱的小伙伴们&#xff0c;由于微信公众号改版&#xff0c;打乱了发布时间&#xff0c;为了保证大家可以及时收到文章的推送&#xff0c;可以点击上方蓝字关注测试工程师成长之路&#xff0c;并设为星标就可以第一时间收到推送哦&#xff01; 一.使用 Allure2 运行方式-Python…

【操作系统xv6】学习记录4 -CPU上下文:进程上下文、线程上下文、中断上下文

什么是cpu上下文 CPU 寄存器和程序计数器就是 CPU 上下文&#xff0c;因为它们都是 CPU 在运行任何任务前&#xff0c;必须的依赖环境。 什么是 CPU 上下文切换 先把前一个任务的 CPU 上下文&#xff08;也就是 CPU 寄存器和程序计数器&#xff09;保存起来&#xff0c;然后…

equals()比较字符串和MySQL中=比较结果不一致

问题&#xff1a; 普通车辆入园统计结果数量和普通车辆统计列表数量不一致&#xff1f; 列子&#xff1a;数量:967&#xff0c;列表:974 解决问题步骤 对比统计数量和统计列表的统计方法 统计数量代码实现 一&#xff1a;查询出车辆滞留表数据List 二&#xff1a;查询出…

112. 雷达设备(贪心/逆向思考)

题目&#xff1a; 112. 雷达设备 - AcWing题库 输入样例&#xff1a; 3 2 1 2 -3 1 2 1输出样例&#xff1a; 2 思路&#xff1a; 代码&#xff1a; #include <cstdio> #include <cstring> #include <iostream> #include <algorithm> #include<…

海外住宅IP代理的工作原理和应用场景分析,新手必看

海外住宅IP代理作为一种技术解决方案&#xff0c;为用户提供了访问全球网络资源和维护隐私安全的方法。本文将介绍海外住宅IP代理的工作原理和应用场景&#xff0c;帮助读者更好地理解和利用这一技术。 一、工作原理 海外住宅IP代理的工作原理基于代理服务器和IP地址的转发。它…

【springboot配置文件加载源码分析】

在Spring Boot的源码中&#xff0c;配置文件的加载是在应用程序启动的早期阶段进行的。具体来说&#xff0c;配置文件加载的主要步骤发生在SpringApplication类的run()方法中的prepareEnvironment方法中&#xff0c;真正读取我们的配置文件还是PropertySourceLoader。 本篇博客…

Docker安装Flarum(开源论坛)

Flarum介绍安装命令 #---------------------------------------------------------- mkdir -p /opt/flarum && cd /opt/flarum #---------------------------------------------------------- docker run -p 8888:8888 --name flarum \ --restartalways \ -v /opt/flar…

靠着这份年终总结,我涨薪8K,成为领导眼中最闪亮的星~

2023 年即将接近尾声&#xff0c;各大公司的“测试媛/猿”们又到了提交年终总结报告的时候了。 每年到这个时候都是抓耳挠腮、冥思苦想的时候&#xff0c;猛然一想&#xff0c;今年跟去年做的事情好像差不多&#xff0c;那么年终总结可以敷衍了事么&#xff1f; 当然是不可以…

chatGPT带你学习设计模式 (二)抽象工厂模式(创建型模式) GURU

深入理解抽象工厂模式 引言 在面向对象编程中&#xff0c;对象的创建是一个常见且关键的挑战。尤其在需要管理一系列相关对象的创建时&#xff0c;传统的对象创建方法&#xff08;如直接使用 new 关键字&#xff09;可能导致代码的高耦合和低灵活性。这时&#xff0c;抽象工厂…

rime中州韵小狼毫 中英互绎 滤镜

英文在日常生活中已经随处可见&#xff0c;我们一般中英互译需要使用专业的翻译软件来实现。但如果我们在输入法中&#xff0c;在输入中文的时候&#xff0c;可以顺便瞟一眼对应的英文词汇&#xff0c;或者在输入英文的时候可以顺便了解对应的中文词汇&#xff0c;那将为我们的…

【Qt第三方库】QXlsx库——对 Excel 文件进行相关操作

0 前言 关键词&#xff1a;Qt&#xff1b;Excel&#xff1b;QXlsx&#xff1b;QInt 简介&#xff1a; QXlsx 是第三方开源的库&#xff0c;能够对 Excel 文件进行相关操作&#xff08;读写等&#xff09; 地址&#xff1a; QXlsx官网 QXlsx的Github主页 1 快速上手 对于第一次…

设置代理IP地址对网络有什么影响?爬虫代理IP主要有哪些作用?

在互联网的广泛应用下&#xff0c;代理IP地址成为了一种常见的网络技术。代理IP地址可以改变用户的上网行为&#xff0c;进而影响网络访问的速度和安全性。本篇文章将探讨设置代理IP地址对网络的影响&#xff0c;以及爬虫代理IP的主要作用。 首先&#xff0c;让我们来了解一下代…

【Java】实验三 抽象类与接口

实验名称 实验三 抽象类与接口 实验目的 1. 深刻理解抽象类、接口的意义。 2. 熟练掌握抽象类和接口的定义、继承抽象类以及实现接口的方法。 3. 理解和掌握多态。 实验内容 &#xff08;一&#xff09;抽象类实验&#xff1a;项目源码中新建一个ahpu.shape的包&a…

Transformer从菜鸟到新手(一)

引言 这是从Transformer到LLM(大语言模型)系列的第一篇文章&#xff0c;几乎所有的大语言模型都是基于Transformer结构&#xff0c;因此本文回顾一下Transformer的原理与实现细节&#xff0c;包括分词算法BPE的实现。最终利用从零实现的Transformer模型进行英中翻译。 本文主…

IOS:Safari无法播放MP4(H.264编码)

一、问题描述 MP4使用H.264编码通常具有良好的兼容性&#xff0c;因为H.264是一种广泛支持的视频编码标准。它可以在许多设备和平台上播放&#xff0c;包括电脑、移动设备和流媒体设备。 使用caniuse查询H.264兼容性&#xff0c;看似确实具有良好的兼容性&#xff1a; 然而…

Windows系统镜像检测修复建议

当通过镜像检测功能检测出Windows操作系统磁盘上有残留驱动项、系统中存在残留Xen驱动或者存在禁止安装驱动属性设置等异常检测项时&#xff0c;您可以参考本文的操作指导进行修复。 清理注册表残留驱动 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control注册表树包含了控…

c++基础(对c的扩展)

文章目录 命令空间引用基本本质引用作为参数引用的使用场景 内联函数引出基本概念 函数补充默认参数函数重载c中函数重载定义条件函数重载的原理 命令空间 定义 namespace是单独的作用域 两者不会相互干涉 namespace 名字 { //变量 函数 等等 }eg namespace nameA {int num;v…

TextView ClickableSpan 事件分发的坑

TextView 的 ClickableSpan 有两个坑&#xff1a; 默认情况下&#xff0c;点击 ClickableSpan 的文本时会同时触发绑定在 TextView 的监听事件&#xff1b;默认情况下&#xff0c;点击 ClickableSpan 的文本之外的文本时&#xff0c;TextView 会消费该事件&#xff0c;而不会传…

MySQL运维实战(2.2)忘记密码如何处理

作者&#xff1a;俊达 引言 当你突然忘记了一个普通用户的密码&#xff0c;而又想着通过管理员账号去改密码时&#xff0c;却猛的发现所有管理员账号的密码都离谱地被你忘了。嗨呀&#xff0c;这可真是个尴尬的大麻烦&#xff01;root账户通常是MySQL中的大boss&#xff0c;你…

Redis(一)

1、redis Redis是一个完全开源免费的高性能&#xff08;NOSQL&#xff09;的key-value数据库。它遵守BSD协议&#xff0c;使用ANSI C语言编写&#xff0c;并支持网络和持久化。Redis拥有极高的性能&#xff0c;每秒可以进行11万次的读取操作和8.1万次的写入操作。它支持丰富的数…