【RocketMQ】顺序消息实现原理

全局有序
在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一般不使用。

局部有序
假设一个Topic分配了两个消息队列,生产者在发送消息的时候,可以对消息设置一个路由ID,比如想保证一个订单的相关消息有序,那么就使用订单ID当做路由ID,在发送消息的时候,通过订单ID对消息队列的个数取余,根据取余结果选择消息队列,这样同一个订单的数据就可以保证发送到一个消息队列中,消费者端使用MessageListenerOrderly处理有序消息,这就是RocketMQ的局部有序,保证消息在某个消息队列中有序。

接下来看RoceketMQ源码中提供的顺序消息例子(稍微做了一些修改):

生产者

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            // 创建生产者
            DefaultMQProducer producer = new DefaultMQProducer("生产者组");
            // 启动
            producer.start();
            // 创建TAG
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                // 生成订单ID
                int orderId = i % 10;
                // 创建消息
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 获取订单ID
                        Integer id = (Integer) arg;
                        // 对消息队列个数取余
                        int index = id % mqs.size();
                        // 根据取余结果选择消息要发送给哪个消息队列
                        return mqs.get(index);
                    }
                }, orderId); // 这里传入了订单ID
                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消费者

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        // 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消费者组");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 订阅主题
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        // 注册消息监听器,使用的是MessageListenerOrderly
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                // 打印消息
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

从例子中可以看出生产者在发送消息的时候,通过订单ID作为路由信息,将同一个订单ID的消息发送到了同一个消息队列中,保证同一个订单ID的消息有序,那么消费者端是如何保证消息的顺序读取呢?接下来就去看下源码。

顺序消息实现原理

在【RocketMQ】消息的拉取一文中讲到,消费者在启动时会调用DefaultMQPushConsumerImpl的start方法:

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    
    /**
     * 默认的消息推送实现类
     */
    protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    
    /**
     * 启动
     */
    @Override
    public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        // 启动消费者
        this.defaultMQPushConsumerImpl.start();
        // ...
    }
}

DefaultMQPushConsumerImpl的start方法中,对消息监听器类型进行了判断,如果类型是MessageListenerOrderly表示要进行顺序消费,此时使用ConsumeMessageOrderlyServiceConsumeMessageService进行实例化,然后调用它的start方法进行启动:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    // 消息消费service
    private ConsumeMessageService consumeMessageService;
  
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                // ...
            
                // 如果是顺序消费
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    // 设置顺序消费标记
                    this.consumeOrderly = true;
                    // 创建consumeMessageService,使用的是ConsumeMessageOrderlyService
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    // 并发消费使用ConsumeMessageConcurrentlyService
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                // 启动ConsumeMessageService
                this.consumeMessageService.start();

                // ...
                break;
          // ...
        }
        // ...
    }
}

加锁定时任务

进入到ConsumeMessageOrderlyService的start方法中,可以看到,如果是集群模式,会启动一个定时加锁的任务,周期性的对订阅的消息队列进行加锁,具体是通过调用RebalanceImpl的lockAll方法实现的:

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
    public void start() {
      
        // 如果是集群模式
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 周期性的执行加锁方法
                        ConsumeMessageOrderlyService.this.lockMQPeriodically();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
                    }
                }
            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }
  
    public synchronized void lockMQPeriodically() {
        if (!this.stopped) {
            // 进行加锁
            this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
        }
    }
}

为什么集群模式下需要加锁?
因为广播模式下,消息队列会分配给消费者下的每一个消费者,而在集群模式下,一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,所以在广播模式下不存在竞争关系,也就不需要对消息队列进行加锁,而在集群模式下,有可能因为负载均衡等原因将某一个消息队列分配到了另外一个消费者中,因此在集群模式下就要加锁,当某个消息队列被锁定时,其他的消费者不能进行消费。

消息队列加锁

RebalanceImpllockAll方法中,首先从处理队列表中获取当前消费者订阅的所有消息队列MessageQueue信息,返回数据是一个MAP,key为broker名称,value为broker下的消息队列,接着对MAP进行遍历,处理每一个broker下的消息队列:

  1. 获取broker名称,根据broker名称查找broker的相关信息;
  2. 构建加锁请求,在请求中设置要加锁的消息队列,然后将请求发送给broker,表示要对这些消息队列进行加锁;
  3. 加锁请求返回的响应结果中包含了加锁成功的消息队列,此时遍历加锁成功的消息队列,将消息队列对应的ProcessQueue中的locked属性置为true表示该消息队列已加锁成功;
  4. 处理加锁失败的消息队列,如果响应中未包含某个消息队列的信息,表示此消息队列加锁失败,需要将其对应的ProcessQueue对象中的locked属性置为false表示加锁失败;
public abstract class RebalanceImpl {
    public void lockAll() {
        // 从处理队列表中获取broker对应的消息队列,key为broker名称,value为broker下的消息队列
        HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
        // 遍历订阅的消息队列
        Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, Set<MessageQueue>> entry = it.next();
            // broker名称
            final String brokerName = entry.getKey();
            // 获取消息队列
            final Set<MessageQueue> mqs = entry.getValue();
            if (mqs.isEmpty())
                continue;
            // 根据broker名称获取broker信息
            FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
            if (findBrokerResult != null) {
                // 构建加锁请求
                LockBatchRequestBody requestBody = new LockBatchRequestBody();
                // 设置消费者组
                requestBody.setConsumerGroup(this.consumerGroup);
                // 设置ID
                requestBody.setClientId(this.mQClientFactory.getClientId());
                // 设置要加锁的消息队列
                requestBody.setMqSet(mqs);

                try {
                    // 批量进行加锁,返回加锁成功的消息队列
                    Set<MessageQueue> lockOKMQSet =
                        this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                    // 遍历加锁成功的队列
                    for (MessageQueue mq : lockOKMQSet) {
                        // 从处理队列表中获取对应的处理队列对象
                        ProcessQueue processQueue = this.processQueueTable.get(mq);
                        // 如果不为空,设置locked为true表示加锁成功
                        if (processQueue != null) {
                            if (!processQueue.isLocked()) {
                                log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
                            }
                            // 设置加锁成功标记
                            processQueue.setLocked(true);
                            processQueue.setLastLockTimestamp(System.currentTimeMillis());
                        }
                    }
                    // 处理加锁失败的消息队列
                    for (MessageQueue mq : mqs) {
                        if (!lockOKMQSet.contains(mq)) {
                            ProcessQueue processQueue = this.processQueueTable.get(mq);
                            if (processQueue != null) {
                                // 设置加锁失败标记
                                processQueue.setLocked(false);
                                log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("lockBatchMQ exception, " + mqs, e);
                }
            }
        }
    }
}

在【RocketMQ】消息的拉取一文中讲到,消费者需要先向Broker发送拉取消息请求,从Broker中拉取消息,拉取消息请求构建在RebalanceImpl的updateProcessQueueTableInRebalance方法中,拉取消息的响应结果处理在PullCallback的onSuccess方法中,接下来看下顺序消费时在这两个过程中是如何处理的。

拉取消息

上面已经知道,在使用顺序消息时,会周期性的对订阅的消息队列进行加锁,不过由于负载均衡等原因,有可能给当前消费者分配新的消息队列,此时可能还未来得及通过定时任务加锁,所以消费者在构建消息拉取请求前会再次进行判断,如果processQueueTable中之前未包含某个消息队列,会先调用lock方法进行加锁,lock方法的实现逻辑与lockAll基本一致,如果加锁成功构建拉取请求进行消息拉取,如果加锁失败,则跳过继续处理下一个消息队列:

public abstract class RebalanceImpl {
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        // ...
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        // 遍历队列集合
        for (MessageQueue mq : mqSet) {
            // 如果processQueueTable之前不包含当前的消息队列
            if (!this.processQueueTable.containsKey(mq)) {
                // 如果是顺序消费,调用lock方法进行加锁,如果加锁失败不往下执行,继续处理下一个消息队列
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }
                // ... 
                // 如果偏移量大于等于0
                if (nextOffset >= 0) {
                    // 放入处理队列表中
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        // 如果之前不存在,构建PullRequest,之后对请求进行处理,进行消息拉取
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
        // 添加消息拉取请求
        this.dispatchPullRequest(pullRequestList);

        return changed;
    }
  
    public boolean lock(final MessageQueue mq) {
        // 获取broker信息
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
        if (findBrokerResult != null) {
            // 构建加锁请求
            LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            // 设置要加锁的消息队列
            requestBody.getMqSet().add(mq);

            try {
                // 发送加锁请求
                Set<MessageQueue> lockedMq =
                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                for (MessageQueue mmqq : lockedMq) {
                    ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                    // 如果加锁成功设置成功标记
                    if (processQueue != null) {
                        processQueue.setLocked(true);
                        processQueue.setLastLockTimestamp(System.currentTimeMillis());
                    }
                }
                boolean lockOK = lockedMq.contains(mq);
                log.info("the message queue lock {}, {} {}",
                    lockOK ? "OK" : "Failed",
                    this.consumerGroup,
                    mq);
                return lockOK;
            } catch (Exception e) {
                log.error("lockBatchMQ exception, " + mq, e);
            }
        }

        return false;
    }
}

顺序消息消费

PullCallbackonSuccess方法中可以看到,如果从Broker拉取到消息,会调用ConsumeMessageService的submitConsumeRequest方法将消息提交到ConsumeMessageService中进行消费:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public void pullMessage(final PullRequest pullRequest) {
        // ...
        // 拉取消息回调函数
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    // 处理拉取结果
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                            subscriptionData);
                    // 判断拉取结果
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            // ...
                            // 如果未拉取到消息
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                // 将拉取请求放入到阻塞队列中再进行一次拉取
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                // ...
                                // 如果拉取到消息,将消息提交到ConsumeMessageService中进行消费
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                        pullResult.getMsgFoundList(),
                                        processQueue,
                                        pullRequest.getMessageQueue(),
                                        dispatchToConsume);
                                // ...
                            }
                        // ...
                    }
                }
            }
        };
    }
}

前面知道顺序消费时使用的是ConsumeMessageOrderlyService,首先在ConsumeMessageOrderlyService的构造函数中可以看到
初始化了一个消息消费线程池,也就是说顺序消费时也是开启多线程进行消费的:

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
    public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerOrderly messageListener) {
        // ...
        // 设置消息消费线程池
        this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl(consumeThreadPrefix));
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    }
}

接下来看submitConsumeRequest方法,可以看到构建了ConsumeRequest对象,将拉取的消息提交到了消息消费线程池中进行消费:

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
   
    @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) {
            // 构建ConsumeRequest
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }
    
}

消费时的消息队列锁

ConsumeRequestConsumeMessageOrderlyService的内部类,它有两个成员变量,分别为MessageQueue消息队列和它对应的处理队列ProcessQueue对象。
在run方法中,对消息进行消费,处理逻辑如下:

  1. 判断ProcessQueue是否被删除,如果被删除终止处理;
  2. 调用messageQueueLock的ftchLockObject方法获取消息队列的对象锁,然后使用synchronized进行加锁,这里加锁的原因是因为顺序消费使用的是线程池,可以设置多个线程同时进行消费,所以某个线程在进行消息消费的时候要对消息队列加锁,防止其他线程并发消费,破坏消息的顺序性
  3. 如果是广播模式、或者当前的消息队列已经加锁成功(Locked置为true)并且加锁时间未过期,开始对拉取的消息进行遍历:
  • 如果是集群模式并且消息队列加锁失败,调用tryLockLaterAndReconsume稍后重新进行加锁;
  • 如果是集群模式并且消息队列加锁时间已经过期,调用tryLockLaterAndReconsume稍后重新进行加锁;
  • 如果当前时间距离开始处理的时间超过了最大消费时间,调用submitConsumeRequestLater稍后重新进行处理;
  • 获取批量消费消息个数,从ProcessQueue获取消息内容,如果消息获取不为空,添加消息消费锁,然后调用messageListener的consumeMessage方法进行消息消费;
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
 
   class ConsumeRequest implements Runnable {
        private final ProcessQueue processQueue; // 消息队列对应的处理队列
        private final MessageQueue messageQueue; // 消息队列

        public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }

        @Override
        public void run() {
            // 处理队列如果已经被置为删除状态,跳过不进行处理
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }
            // 获取消息队列的对象锁
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            // 对象消息队列的对象锁加锁
            synchronized (objLock) {
                // 如果是广播模式、或者当前的消息队列已经加锁成功并且加锁时间未过期
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                    final long beginTime = System.currentTimeMillis();
                    for (boolean continueConsume = true; continueConsume; ) {
                        // 判断processQueue是否删除
                        if (this.processQueue.isDropped()) {
                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            break;
                        }
                        // 如果是集群模式并且processQueue的加锁失败
                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && !this.processQueue.isLocked()) {
                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                            // 稍后进行加锁
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }
                        // 如果是集群模式并且消息队列加锁时间已经过期
                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && this.processQueue.isLockExpired()) {
                            log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                            // 稍后进行加锁
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }

                        long interval = System.currentTimeMillis() - beginTime;
                        // 如果当前时间距离开始处理的时间超过了最大消费时间
                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                            // 稍后重新进行处理
                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                            break;
                        }
                        // 批量消费消息个数
                        final int consumeBatchSize =
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                        // 获取消息内容
                        List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                        defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                        if (!msgs.isEmpty()) {
                            // ...
                            try {
                                // 加消费锁
                                this.processQueue.getConsumeLock().lock();
                                if (this.processQueue.isDropped()) {
                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }
                                // 消费消息
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                                log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
                                    RemotingHelper.exceptionSimpleDesc(e),
                                    ConsumeMessageOrderlyService.this.consumerGroup,
                                    msgs,
                                    messageQueue), e);
                                hasException = true;
                            } finally {
                                // 释放消息消费锁
                                this.processQueue.getConsumeLock().unlock();
                            }
                            // ...
                            ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                                .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        } else {
                            continueConsume = false;
                        }
                    }
                } else {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        return;
                    }

                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            }
        }

    }
}

MessageQueueLock中使用了ConcurrentHashMap存储每个消息队列对应的对象锁,对象锁实际上是一个Object类的对象,从Map中获取消息队列的对象锁时,如果对象锁不存在,则新建一个Object对象,并放入Map集合中:

public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable =
        new ConcurrentHashMap<MessageQueue, Object>();

    public Object fetchLockObject(final MessageQueue mq) {
        // 获取消息队列对应的对象锁,也就是一个Object类型的对象
        Object objLock = this.mqLockTable.get(mq);
        // 如果获取尾款
        if (null == objLock) {
            // 创建对象
            objLock = new Object();
            // 加入到Map中
            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
            if (prevLock != null) {
                objLock = prevLock;
            }
        }

        return objLock;
    }
}

消息消费锁

ProcessQueue中持有一个消息消费锁,消费者调用consumeMessage进行消息前,会添加消费锁,上面已经知道在处理拉取到的消息时就已经调用messageQueueLock的fetchLockObject方法获取消息队列的对象锁然后使用syncronized对其加锁,那么为什么在消费之前还要再加一个消费锁呢?

public class ProcessQueue {
    // 消息消费锁
    private final Lock consumeLock = new ReentrantLock();

    public Lock getConsumeLock() {
        return consumeLock;
    }
}

这里讲一个小技巧,如果在查看源码的时候对某个方法有疑问,可以查看一下这个方法在哪里被调用了,结合调用处的代码处理逻辑进行猜测。
那么就来看下getConsumeLock在哪里被调用了,可以看到除了C的run方法中调用了之外,RebalancePushImpl中的removeUnnecessaryMessageQueue方法也调用了getConsumeLock方法:

removeUnnecessaryMessageQueue方法从名字上可以看出,是移除不需要的消息队列,RebalancePushImpl是与负载均衡相关的类,所以猜测有可能在负载均衡时,需要移除某个消息队列,那么消费者在进行消费的时候就要获取ProcessQueue的consumeLock进行加锁,防止正在消费的过程中,消费队列被移除:

public class RebalancePushImpl extends RebalanceImpl {
   @Override
    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
        this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
        this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
        // 如果是顺序消费并且是集模式
        if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
            && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
            try {
                // 进行加锁
                if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
                    try {
                        return this.unlockDelay(mq, pq);
                    } finally {
                        pq.getConsumeLock().unlock();
                    }
                } else {
                    log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
                        mq,
                        pq.getTryUnlockTimes());

                    pq.incTryUnlockTimes();
                }
            } catch (Exception e) {
                log.error("removeUnnecessaryMessageQueue Exception", e);
            }

            return false;
        }
        return true;
    }
}

不过在消费者在消费消息前已经对队列进行了加锁,负载均衡的时候为什么不使用队列锁而要使用消费锁?

这里应该是为了减小锁的粒度,因为消费者在对消息队列加锁后,还进行了一系列的判断,校验都成功之后从处理队列中获取消息内容,之后才开始消费消息,如果负载均衡使用消息队列锁就要等待整个过程完成才有可能加锁成功,这样显然会降低性能,而如果使用消息消费锁,就可以减少等待的时间,并且消费者在进行消息消费前也会判断ProcessQueue是否被移除,所以只要保证consumeMessage方法在执行的过程中,ProcessQueue不被移除即可。

总结

目前一共涉及了三把锁,它们分别对应不同的情况:

向Broker申请的消息队列锁

集群模式下一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,为了避免负载均衡等原因引起的变动,消费者会向Broker发送请求对消息队列进行加锁,如果加锁成功,记录到消息队列对应的ProcessQueue中的locked变量中,它是boolean类型的:

public class ProcessQueue {
    private volatile boolean locked = false;
}

消费者处理拉取消息时的消息队列锁

消费者在处理拉取到的消息时,由于可以开启多线程进行处理,所以处理消息前通过MessageQueueLock中的mqLockTable获取到了消息队列对应的锁,锁住要处理的消息队列,这里加消息队列锁主要是处理多线程之间的竞争:

public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable =
        new ConcurrentHashMap<MessageQueue, Object>();

消息消费锁

消费者在调用consumeMessage方法之前会加消费锁,主要是为了避免在消费消息时,由于负载均衡等原因,ProcessQueue被删除:


public class ProcessQueue {
    private final Lock consumeLock = new ReentrantLock();
}

参考
丁威、周继锋《RocketMQ技术内幕》

RocketMQ版本:4.9.3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/8219.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Web 攻防之业务安全:接口未授权访问/调用测试(敏感信息泄露)

Web 攻防之业务安全&#xff1a;接口未授权访问/调用测试 业务安全是指保护业务系统免受安全威胁的措施或手段。广义的业务安全应包括业务运行的软硬件平台&#xff08;操作系统、数据库&#xff0c;中间件等&#xff09;、业务系统自身&#xff08;软件或设备&#xff09;、业…

ViT/vit/VIT详解

参考&#xff1a; Vision Transformer详解: https://blog.csdn.net/qq_37541097/article/details/118242600 目录&#xff1a; x.1 (论文中)模型理解x.2 代码理解 建议阅读时间&#xff1a;10min x.1 模型理解 ViT是发表在ICLR2021上的一篇文章&#xff0c;通过将图片分割…

Java并发控制 学习笔记1

一、并发控制的方法 1、悲观锁&#xff1a;常用的互斥锁都属于悲观锁&#xff0c;一个线程访问共享资源时其他线程不能访问。 2、乐观锁&#xff1a;允许同时访问共享数据&#xff0c;只有在提交时利用如版本号检查是否有冲突&#xff0c;应用github。 3、什么时候用乐观锁、什…

携程平台增长部总经理王绩强:原生互联网企业正在经历一场数字升级丨数据猿专访...

‍数据智能产业创新服务媒体——聚焦数智 改变商业以大数据和人工智能为核心&#xff0c;众多新兴技术开始赋能数字营销。于是&#xff0c;智能营销已然从工具化走向了业务化。如今&#xff0c;数字化营销已经成为了企业数字化转型中的重要一环。相较于传统营销逻辑&#xff0…

新版新款影视直播粉红色UI的麻豆CMS源码/带教程/支付已接

基于苹果CMS v10影视系统框架开发的前端模板&#xff0c;带会员中心&#xff0c;可设置试看付费观看等功能。 经过测试及修复&#xff0c;这套源码功能还是很强大的&#xff0c;可以设置一键采集&#xff0c;并且支付我们给他接到了易支付&#xff0c;拓展性强&#xff0c;基本…

【压测】通过Jemeter进行压力测试(超详细)

文章目录背景一、前言二、关于JMeter三、准备工作四、创建测试4.1、创建线程组4.2、配置元件4.3、构造HTTP请求4.4、添加HTTP请求头4.5、添加断言4.6、添加察看结果树4.7、添加Summary Report4.8、测试计划创建完成五、执行测试计划总结背景 通过SpringCloudGateway整合Nacos进…

如何下载ChatGPT-ChatGPT如何写作

CHATGPT能否改一下文章 ChatGPT 作为一种自然语言处理技术&#xff0c;生成的文章可能存在表达不够准确或文风不符合要求等问题。在这种情况下&#xff0c;可以使用编辑和修改来改变输出的文章&#xff0c;使其符合特定的要求和期望。 具体来说&#xff0c;可以采用以下步骤对…

超越竞争对手:利用Facebook A/B测试优化广告效果!

随着社交媒体广告的普及&#xff0c;Facebook已经成为了许多公司推广业务的重要平台。但是&#xff0c;在Facebook上发布广告并不意味着成功&#xff0c;这也让许多公司开始关注如何优化广告效果。 在这篇文章中&#xff0c;我将介绍如何使用A/B测试来优化Facebook广告&#x…

纳米软件关于集成电路测试的分类介绍

集成电路测试可以按照测试目的、测试内容、按照器件开发和制造阶段分类。参照需要达到的测试目的对集成电路测试进行分类&#xff0c;可以分为:验证测试、制造测试、老化测试、入厂测试等。按照测试所涉及内容&#xff0c;集成电路测试可分为:参数测试、功能测试、结构测试等。…

2023/4/4总结

题解&#xff1a; Problem ​​​​​​ A - Codeforces 1.这道题目我们需要判断。 2.如果是奇数&#xff0c;亦或出来的总值不为0&#xff0c;那么每一个数字再去亦或任何一个数字&#xff0c;都不会为0。 3.如果是偶数并且亦或总值为0&#xff0c;那么我们亦或的总值不满…

记录重启csdn

有太多收藏的链接落灰了&#xff0c;在此重启&#xff5e; 1、社会 https://mp.weixin.qq.com/s/Uq0koAbMUk8OFZg2nCg_fg https://mp.weixin.qq.com/s/yCtLdEWSKVVAKhvLHxjeig https://zhuanlan.zhihu.com/p/569162335?utm_mediumsocial&utm_oi938179755602853888&ut…

使用npm包,全局共享数据,分包

使用 npm 包 1、Vant Weapp 1.1、什么是 Vant Weapp Vant Weapp 是有赞前端团队开源的一套小程序 UI 组件库&#xff0c;助力开发者快速搭建小程序应用。它所使用的是MIT 开源许可协议&#xff0c;对商业使用比较友好。 官方文档地址 https://youzan.github.io/vant-weapp …

Huggingface微调BART的代码示例:WMT16数据集训练新的标记进行翻译

BART模型是用来预训练seq-to-seq模型的降噪自动编码器&#xff08;autoencoder&#xff09;。它是一个序列到序列的模型&#xff0c;具有对损坏文本的双向编码器和一个从左到右的自回归解码器&#xff0c;所以它可以完美的执行翻译任务。 如果你想在翻译任务上测试一个新的体系…

游戏运营专员的职责有哪些?提高游戏收入的关键是什么?

游戏运营是将一款游戏平台推入市场&#xff0c;通过对平台的运作&#xff0c;使用户从接触、认识、再到了解实际线上的一种操作、最终成为这款游戏平台的忠实玩家的这一过程。同时通过一系列的营销手段达到提高线上人数&#xff0c;刺激消费增长利润的目的。 游戏运营专员的职…

Go 连接池的设计与实现

为什么需要连接池 如果不用连接池&#xff0c;而是每次请求都创建一个连接是比较昂贵的&#xff0c;因此需要完成3次tcp握手 同时在高并发场景下&#xff0c;由于没有连接池的最大连接数限制&#xff0c;可以创建无数个连接&#xff0c;耗尽文件描述符 连接池就是为了复用这…

高效的实现金蝶云星空ERP与自研MES系统数据集成

一、项目背景 随着企业数字化转型的不断深入&#xff0c;数据集成变得愈发重要。金蝶云星空ERP与自研MES系统之间的数据集成是企业提高管理效率、降低运营成本的关键。为了实现这一目标&#xff0c;企业选择了轻易云数据集成平台进行数据集成。 二、项目实施过程 低耦合、高内…

二叉树的前序遍历(力扣144)

目录 题目描述&#xff1a; 解法一&#xff1a;递归法 解法二&#xff1a;迭代法 解法三&#xff1a;Morris 遍历 二叉树的前序遍历 题目描述&#xff1a; 给你二叉树的根节点 root &#xff0c;返回它节点值的 前序 遍历。 示例 1&#xff1a; 输入&#xff1a;root […

Unity反编译:AssetStudio资源浏览器及代码查看器

前言 假如你手上有Unity发布出来的exe文件、apk文件或者webGL文件&#xff0c;但就是没有工程源文件&#xff0c;那么&#xff0c;如何从这些文件里面一窥究竟呢&#xff1f;这就需要资源提取工具以及代码反编译工具&#xff01; 本文所涉软件【文中附有下载链接】&#xff1…

【接口测试工具】Eolink Apikit 快速入门教程

Eolink Apikit 下载安装【官方版】&#xff1a;https://www.eolink.com/apikit 发起 API 测试 进入 API 文档详情页&#xff0c;点击上方 测试 标签&#xff0c;进入 API 测试页&#xff0c;系统会根据 API 文档自动生成测试界面并且填充测试数据。 填写请求参数 首先填写好请…

【创作赢红包】python学习——【第七弹】

前言 上一篇文章 python学习——【第六弹】中介绍了 python中的字典操作&#xff0c;这篇文章接着学习python中的可变序列 集合 集合 1&#xff1a; 集合是python语言提供的内置数据结构&#xff0c;具有无序性&#xff08;集合中的元素无法通过索引下标访问&#xff0c;并且…