全网最细RocketMQ源码二:Producer

入口

这里分析源码用的入口是:
org.apache.rocketmq.example.quickstart

package org.apache.rocketmq.example.quickstart;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        producer.setNamesrvAddr("127.0.0.1:9876");

        /*
         * Launch the instance.
         */
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * Shut down once the producer instance is not longer in use.
         */
        producer.shutdown();
    }
}

在发送消息的时候用的是DefaultMQProducer

DefaultMQProducer

这个就是我们业务层经常使用的对象,用来发送消息

public class DefaultMQProducer extends ClientConfig implements MQProducer {
    /**
     * Wrapping internal implementations for virtually all methods presented in this class.
     */
    // 生产者实现类对象
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

    private final InternalLogger log = ClientLogger.getLog();
    /**
     * Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
     * important when transactional messages are involved. </p>
     *
     * For non-transactional messages, it does not matter as long as it's unique per process. </p>
     *
     * See {@linktourl http://rocketmq.apache.org/docs/core-concept/} for more discussion.
     */
    // 生产者组(发送事务消息,broker端进行事务回查时,可以选择 当前 生产者组下的任意一个生产者 进行 事务回查)
    private String producerGroup;

    /**
     * Just for testing or demo program
     */
    // TBW102 :broker写死的主题队列信息,当发送消息指定的topic在 nm 未找到路由信息,则使用 该TBW102 作为 模板 去创建 主题发布信息。
    private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;

    /**
     * Number of queues to create per default topic.
     */
    // 默认broker每个主题 创建的 队列数量
    private volatile int defaultTopicQueueNums = 4;

    /**
     * Timeout for sending messages.
     */
    // 发送消息超时限制 默认:3s
    private int sendMsgTimeout = 3000;

    /**
     * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
     */
    // 压缩阈值,当msg body 超过 “4k” 后,选择使用压缩。
    private int compressMsgBodyOverHowmuch = 1024 * 4;

    /**
     * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
     *
     * This may potentially cause message duplication which is up to application developers to resolve.
     */
    // 同步发送失败之后,重试发送次数:2  再加上第一次发送 =》 3
    private int retryTimesWhenSendFailed = 2;

    /**
     * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
     *
     * This may potentially cause message duplication which is up to application developers to resolve.
     */
    // 异步发送失败之后,重试发送次数:2
    private int retryTimesWhenSendAsyncFailed = 2;

    /**
     * Indicate whether to retry another broker on sending failure internally.
     */
    // 消息未存储成功,是否选择其它broker节点进行消息重试? 一般需要设置为true。
    private boolean retryAnotherBrokerWhenNotStoreOK = false;

    /**
     * Maximum allowed message size in bytes.
     */
    // 消息体最大限制,默认值:4mb
    private int maxMessageSize = 1024 * 1024 * 4; // 4M

其中MQProducer是生产者接口,定义生产者发送消息的一些规范
在这里插入图片描述

ClientConfig是客户端配置类,继承ClientConfig说白了就是共享一些通用的客户端配置属性和功能
在这里插入图片描述

  • 构造方法
    在这里插入图片描述

  • start方法
    在这里插入图片描述
    最终还是调用的是defaultMQProducderImpl.start方法

  • send方法
    在这里插入图片描述
    最终调用的还是defaultMQProducderImpl.send方法

DefaultMQProducerImpl详解

public class DefaultMQProducerImpl implements MQProducerInner {
    private final InternalLogger log = ClientLogger.getLog();

    // 生成invokeID,无实际业务意义,打印日志使用
    private final Random random = new Random();

    // 生产者门面对象,在这里主要当做 config 使用。
    private final DefaultMQProducer defaultMQProducer;

    // 主题发布信息映射表
    // key:主题
    // value:主题的发布信息
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();

    // 发送消息的钩子,留给用户扩展框架使用的。
    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();

    // rpc Hook 最终会传递给 NettyRemotingClient。 留给用户扩展框架使用的。
    private final RPCHook rpcHook;

    // 异步发送消息,异步任务线程池使用的队列
    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;

    // 缺省的异步送法消息线程池
    private final ExecutorService defaultAsyncSenderExecutor;

    // 定时任务,执行:RequestFutureTable.scanExpiredRequest();
    private final Timer timer = new Timer("RequestHouseKeepingService", true);

    protected BlockingQueue<Runnable> checkRequestQueue;
    protected ExecutorService checkExecutor;

    // 状态
    private ServiceState serviceState = ServiceState.CREATE_JUST;

    // 客户端实例对象,生产者启动后 需要注册到该客户端对象内。(观察者模式)
    private MQClientInstance mQClientFactory;

    // 注意和 SendMessageHook 区别,它可以抛异常,控制消息 是否发送。
    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();

    // zip 压缩算法 压缩级别,默认:5
    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));

    // 选择队列容错策略
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();

    // 异步消息发送线程池,如果指定的话,就不在选择使用defaultAsyncSenderExecutor这个线程池了..
    private ExecutorService asyncSenderExecutor;
  • 构造方法
    在这里插入图片描述

  • start方法

 /**
     * 正常路径:startFactory => true
     */
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                // 修改状态为 启动失败,后面启动成功后会修改...
                this.serviceState = ServiceState.START_FAILED;

                // 判断生产者组名,不能是空,也不能是 “DEFAULT_PRODUCER”
                this.checkConfig();

                // 条件成立:说明当前生产者 不是 内部生产者 (什么是内部生产者?  处理消息回退这种情况使用的生产者 )
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    // 修改生产者实例名称为:当前进程的 PID
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                // todo 获取当前进程的RocketMQ 客户端实例对象
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                // 将生产者自己注册到 RocketMQ 客户端实例内 (观察者模式)
                // todo 将生产者自己注册到rockemt 客户端实例
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                // 添加一个主题发布信息
                // key:TBW102   value:空对象
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());


                if (startFactory) {
                    // 启动RocketMQ 客户端实例对象 入口:
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                // 设置生产者实例状态为:运行中
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        // 强制RocketMQ 客户端实例 向 已知的broker节点 发送一次心跳。(讲 客户端定时任务时 再聊..)
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        // request 发送的消息 需要 消费者 回执一条消息。
        // 怎么实现的呢?
        // 生产者 msg 加了一些信息  关联ID  客户端ID ,发送到 broker 之后
        // 消费者 从 broker 拿到 这条 消息,检查msg 类型 发现 是一个 需要回执的消息
        // 处理完消息 之后,根据 msg 关联ID 和 客户端ID  生成一条消息 (封装响应给 生产者的结果) 发送到 broker
        // Broker 拿到这条消息之后,它知道这是一条 回执消息,根据 客户端ID 找到 ch ,将消息 推送给 生产者。
        // 生产者 这边 拿到 回执消息之后,读取出来 关联ID 找到 对应的  RequestFuture ,将阻塞的线程 唤醒。
        // 类似于 生产者 和 消费者 之间 进行了 一次 RPC ,只不过 中间 由  broker 代理完成的。

        // 定时任务 处理 回执太慢的情况..
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    RequestFutureTable.scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
  • send方法
    在这里插入图片描述
 // 同步发送
    // 参数1:msg
    // 参数2:发送模式 (同步)
    // 参数3:回调  同步发送时,不需要提供这个参数。 只有异步时才需要!
    // 参数4:发送超时时间(默认 3秒 )
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 确定生产者状态 是 运行中,否则抛异常。
        this.makeSureStateOK();

        // 判断消息规格
        Validators.checkMessage(msg, this.defaultMQProducer);

        // 生成一个 调用ID,打日志使用
        final long invokeID = random.nextLong();

        // 发送的初始时间
        long beginTimestampFirst = System.currentTimeMillis();

        // 本轮发送开始时间
        long beginTimestampPrev = beginTimestampFirst;

        // 本轮发送结束时间
        long endTimestamp = beginTimestampFirst;

        // 获取当前消息 主题的发布信息,需要依赖它里面的 MessageQueues 信息,选择 一个队列 后面去发送消息使用。
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            // 是否超时
            boolean callTimeout = false;
            // 选中的队列
            MessageQueue mq = null;
            // 异常
            Exception exception = null;
            // 发送结果
            SendResult sendResult = null;
            // timesTotal 发送总尝试次数,同步模式发送时:1 + "2" = 3      ,异步情况 重试次数: 1
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            // 当前第几次发送
            int times = 0;
            // 下标值 代表 发送的第几次, 值 代表 这次发送 选择的 brokerName
            String[] brokersSent = new String[timesTotal];

            // 循环发送,什么时候跳出循环?  1. 发送成功   2. 发送尝试次数 达到上限
            for (; times < timesTotal; times++) {
                // 上次发送时的 brokerName ,第一次发送时 lastBrokerName 值为 null,其它情况 就是 上次发送时的 BrokerName
                String lastBrokerName = null == mq ? null : mq.getBrokerName();

                // 从 主题发布信息中 选择一个 队列
                // 参数1:主题发布信息
                // 参数:上次发送失败的BrokerName
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

                // 条件成立:说明已经选择出来一个 可以 发送的 MessageQueue
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();

                    try {
                        // 记录 本轮发送的开始时间
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        // 计算执行到这里时的耗时,如果已经超过 timeout 限制,则直接不发送消息了,跳出循环。
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        // 参数1:msg
                        // 参数2:mq,选择的队列
                        // 参数3:发送模式 (可选 同步 或者 异步 或者 单向)
                        // 参数4:异步发送时 需要传递一个 回调处理对象,同步 或者 单向时 这里为null
                        // 参数5:主题发布信息
                        // 参数6:计算出一个剩余的超时限制
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

                        // 本轮发送结束时间
                        endTimestamp = System.currentTimeMillis();

                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);


                        switch (communicationMode) {
                            // 异步 或者 单向,直接返回null
                            // 异步:返回值由sendCallback和回调线程处理。
                            // 单向:服务器不返回任何数据
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;

                            case SYNC:
                                // 条件成立:说明 服务端broker 存储失败..
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    // 消息未存储成功,是否选择其它broker节点进行消息重试? 一般需要设置为true。
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        // 重试
                                        continue;
                                    }
                                }
                                // 正常 从这里返回
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            // 执行到这里 会有几种情况? 3种情况
            // 1. for循环尝试发送了 几次 都没能发送成功
            // 2. sendKernelImpl方法异常,此时 sendResult 是null
            // 3. 发送超时


            if (sendResult != null) {// 1. for循环尝试发送了 几次 都没能发送成功
                return sendResult;
            }

            // 2. sendKernelImpl方法异常,此时 sendResult 是null
            // 3. 发送超时

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);

            if (callTimeout) {      // 3. 发送超时
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }

            // 2. sendKernelImpl方法异常,此时 sendResult 是null
            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        validateNameServerSetting();

        // 未找到当前主题的路由数据异常...没办法发送消息。
        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

总结一下做了几件事:

  1. 获取当前消息 主题的发布信息
    在这里插入图片描述

  2. 从主题发布信息中,选择一个队列
    在这里插入图片描述
    选的时候,用的算法就是一个自增值%队列数目

  3. 调用sendKernelImpl

 // 参数1:msg
    // 参数2:mq,选择的队列
    // 参数3:发送模式 (可选 同步 或者 异步 或者 单向)
    // 参数4:异步发送时 需要传递一个 回调处理对象,同步 或者 单向时 这里为null
    // 参数5:主题发布信息
    // 参数6:计算出一个剩余的超时限制
    private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 开始时间
        long beginStartTime = System.currentTimeMillis();
        // 获取指定brokerName的主机地址 master 节点 addr
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());

        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        // 执行sendMessageHook 时 使用的 context
        SendMessageContext context = null;
        if (brokerAddr != null) {

            // 如果客户端开启了 走 VIP 通道的话,broker地址的 端口 将是 VIP 通道的端口号。
            // broker 启动的时候,会绑定两个服务器端口,一个是 普通端口  一个是 VIP 端口。(服务器端 根据 不同端口 创建 的NioSocketChannel 提供 线程
            // 资源 不是同一组。)
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            // 获取消息体
            byte[] prevBody = msg.getBody();
            try {

                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    /**
                     * msgId 由 前缀 + 内容 组成:
                     * 前缀
                     * ip 地址,进程号,classLoader 的 hashcode
                     * 内容
                     * 时间差(当前时间减去当月一日),计数器
                     */
                    // 给消息生成唯一ID,即在 msg.properties.put("UNIQ_KEY", "msgId")
                    // 服务器 broker 会给 消息 按照 UNIQ_KEY 建立一个 hash索引。
                    MessageClientIDSetter.setUniqID(msg);
                }

                boolean topicWithNamespace = false;
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }


                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                // 返回true 说明消息 已经压缩了
                if (this.tryToCompressMessage(msg)) {
                    // 设置标记位 ,表明此条消息 被压缩过。
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }


                // 用户扩展点,用户可以注册 CheckForbiddenHook 控制消息发送。
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                // 用户扩展点,执行 msgHook.before 方法。(比如实现 监控埋点...)
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    context.setNamespace(this.defaultMQProducer.getNamespace());
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }


                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                // 生产者组
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                // 消息主题
                requestHeader.setTopic(msg.getTopic());
                // 缺省主题:TBW102
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                // 生产者主题队列数:4
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                // 选中的消息队列 队列ID
                requestHeader.setQueueId(mq.getQueueId());
                // 系统标记变量
                requestHeader.setSysFlag(sysFlag);
                // 消息创建时间
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                // 消息 flag
                requestHeader.setFlag(msg.getFlag());
                // 消息properties
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);

                // 消息重试的逻辑...以后讲消费时 再说。
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }


                // 发送结果
                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        }

                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        // 单向 和 同步 都是走这里!

                        // 当前耗时,如果已经大于 timeout 限制,则抛异常,不再发送消息了...
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }

                        // 获取API对象,调用它的 发送方法,完成发送!
                        // 参数1:broker地址
                        // 参数2:brokerName
                        // 参数3:消息
                        // 参数4:SendMessageRequestHeader
                        // 参数5:剩余的超时限制
                        // 参数6:发送模式
                        // 参数7:context
                        // 参数8:生产者对象
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);

                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
            }
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

这里做了几件事:

  1. 获取指定brokerName主机地址master节点地址,在MQClientInstance里面存放了broker 映射表,具体内容如下:在这里插入图片描述
    通过brokerName就知道了broker节点地址

  2. 创建了requestHeader,带了很多发送message的元数据

  3. 调用mQClientFactory.getMQClientAPIImpl().sendMessage()发送消息

MQClientInstance 详解

在sendMessage的过程中一直用到这个类,我们来讲讲这个类是做什么用

public class MQClientInstance {
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final InternalLogger log = ClientLogger.getLog();
    // 客户端配置
    private final ClientConfig clientConfig;
    // 索引值,一般是0,因为 客户端实例 一般都是一个进程 只有一个。
    private final int instanceIndex;
    // 客户端ID,ip@pid
    private final String clientId;
    // 客户端启动时间
    private final long bootTimestamp = System.currentTimeMillis();

    // 生产者 消费者 映射表,key:组名  value:生产者 或者 消费者
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();

    // 客户端网络层(netty)配置
    private final NettyClientConfig nettyClientConfig;

    // 核心的一个API实现,它几乎包含了 所有 服务端的API,它的作用就是将 MQ业务层的数据 转换为 网络层 RemotingCommand 对象,
    // 然后使用内部的 NettyRemotingClient 的invoke 系列方法 完成 网络IO。
    private final MQClientAPIImpl mQClientAPIImpl;
    private final MQAdminImpl mQAdminImpl;


    // 客户端本地路由数据
    // key:主题名称  value:主题路由数据
    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();

    private final Lock lockNamesrv = new ReentrantLock();
    private final Lock lockHeartbeat = new ReentrantLock();

    // broker 物理节点映射表
    // key: brokerName 逻辑层面的东西
    // value: map<long /* brokerId 0 的节点是 master,其它的是slave*/, String /* addr ip:port*/>
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();

    // broker 物理节点版本映射表
    // key:brokerName 逻辑层面的东西
    // value:map<String /* addr 物理节点地址*/, Integer /* 版本号 */>
    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
        new ConcurrentHashMap<String, HashMap<String, Integer>>();

    // 单线程的调度线程池,用于执行定时任务
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });

    // 客户端协议处理器,用于处理 IO 事件
    private final ClientRemotingProcessor clientRemotingProcessor;

    // 拉消息服务
    private final PullMessageService pullMessageService;
    // 消费者负载均衡服务
    // todo 消费者负载均衡 源头
    //  1、内部线程调用rebalanceService 2、查询 consumerTable中所有的消费者,然后调用消费者的doReblance 3、最后调用消费者的rebalanceImpl.doRebalance
    private final RebalanceService rebalanceService;

    // 内部生产者实例,用于处理 消费端 消息回退。
    private final DefaultMQProducer defaultMQProducer;
    private final ConsumerStatsManager consumerStatsManager;

    // 心跳次数统计
    private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);

    // RocketMQ 客户端状态
    private ServiceState serviceState = ServiceState.CREATE_JUST;

通过属性其实也看的出来,这个类做的几件事:

  1. 缓存生产者、消费者等数据:
    在这里插入图片描述
    在这里插入图片描述

其中DefaultMQProducerImpl就是MQProducerInner的实现类
3. 缓存客户端路由数据:ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable
在这里插入图片描述

  1. 缓存borker元数据:ConcurrentMap<String/* Broker Name /, HashMap<Long/ brokerId /, String/ address */>> brokerAddrTable
    在这里插入图片描述

  2. 客户端统一rpc:MQClientAPIImpl mQClientAPIImpl,使用他来发送消息的

MQClientAPIImpl

客户端与server通信的实现,通过名字就能看得出来

public class MQClientAPIImpl {

    private final static InternalLogger log = ClientLogger.getLog();
    private static boolean sendSmartMsg =
        Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));

    static {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    }
    // 客户端网络层对象,管理 客户端 与服务器 之间 连接 NioSocketChannel 对象。
    // 通过  它 提供的 invoke 系列方法,客户端可以与服务端进行远程调用。
    // 服务器 也可以 直接 调用 客户端
    private final RemotingClient remotingClient;

    private final TopAddressing topAddressing;

    private final ClientRemotingProcessor clientRemotingProcessor;

    private String nameSrvAddr = null;

还是继续之前的sendMessage流程:

  public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
        boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
        if (isReply) {
            if (sendSmartMsg) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
            } else {
            // 在封装RemotingCommand, 很有意思的东西,后面学习网络通信的可以再研究
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
            }
        } else {
            if (sendSmartMsg || msg instanceof MessageBatch) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
            }
        }
        // 将消息的消息体 放到 网络传输层的body中。
        request.setBody(msg.getBody());

        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                // 当前耗时,如果已经超过 超时限制,则异常
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                // 进行同步调用,将消息传递到 broker ,broker完成存储后 或者其他 情况 都会返回。
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }

        return null;
    }

在这里插入图片描述
这里会使用remotingClient.invokeSync(客户端网络通信)

在这里插入图片描述
在这里插入图片描述
最终会使用netty 将request writeAndFlush

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/312009.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

有效的回文

常用方法就是双指针。使用两个指针从字符串的两端向中间移动&#xff0c;同时比较对应位置的字符&#xff0c;直到两个指针相遇。由于题目忽略非字母和非数字的字符且忽略大小写&#xff0c;所以跳过那些字符&#xff0c;并将字母转换为小写&#xff08;或大写&#xff09;进行…

java基础day04 -- 命令行运行java文件

package com.exmaple;/*** 命令行参数*/ public class ArgsOfMain {public static void main(String[] args) {//增强for循环for(String arg : args){System.out.println(arg);}} }当我打开idea终端运行javac命令完成后&#xff08;需要配置java环境变量&#xff0c;注意idea使…

FineBI实战项目一(16):下订单总用户数分析开发

点击新建组件&#xff0c;创建下订单总用户数组件。 选择自定义图表&#xff0c;选择文本&#xff0c;拖拽要分析的字段到文本中。 修改指针值名称。 将指针值名称修改为用户数。 进入仪表板&#xff0c;拖拽刚刚的组件进入仪表板&#xff0c;然后在再编辑标题。 效果如下&…

【排序】归并排序(C语言实现)

文章目录 1. 递归版的归并排序1.1 归并排序的思想2. 递归版的归并排序的实现 2. 非递归版的归并排序 1. 递归版的归并排序 1.1 归并排序的思想 归并排序&#xff08;MERGE - SORT&#xff09;是建立在归并操作上的一种有效的排序算法, 该算法是采用分治法&#xff08;Divide a…

快速了解VR全景拍摄技术运用在旅游景区的优势

豆腐脑加了糖、烤红薯加了勺&#xff0c;就连索菲亚大教堂前都有了“人造月亮”&#xff0c;在这个冬季&#xff0c;“尔滨”把各地游客宠上了天。面对更多的游客无法实地游玩&#xff0c;哈尔滨冰雪世界再添新玩法&#xff0c;借助VR全景拍摄技术对冬季经典冰雪体验项目进行全…

vue上传文件加进度条,fake-progress一起使用

el-upload上传过程中加进度条&#xff0c;进度条el-progress配合fake-progress一起使用&#xff0c;效果如下&#xff1a; 安装 npm install fake-progress 在用到的文件里面引用 import Fakeprogress from "fake-progress"; 这个进度条主要是假的进度条&#xff…

129基于matlab的粒子群算法、遗传算法、鲸鱼算法、改进鲸鱼算法优化最小二乘支持向量机(lssvm)的gam正则化参数和sig2RBF函数的参数

基于matlab的粒子群算法、遗传算法、鲸鱼算法、改进鲸鱼算法优化最小二乘支持向量机&#xff08;lssvm&#xff09;的gam正则化参数和sig2RBF函数的参数。输出适应度曲线&#xff0c;测试机和训练集准确率。程序已调通&#xff0c;可直接运行。 129 matlabLSSVM优化算法 (xiaoh…

14.kubernetes部署Dashboard

Dashboard 是基于网页的 Kubernetes 用户界面。 你可以使用 Dashboard 将容器应用部署到 Kubernetes 集群中,也可以对容器应用排错,还能管理集群资源。 你可以使用 Dashboard 获取运行在集群中的应用的概览信息,也可以创建或者修改 Kubernetes 资源 (如 Deployment,Job,D…

云服务器租用价格表,阿里云腾讯云华为云2024年优惠对比

作为多年站长使市面上大多数的云厂商的云服务器都使用过&#xff0c;很多特价云服务器都是新用户专享的&#xff0c;本文有老用户特价云服务器&#xff0c;阿腾云atengyun.com有多个网站、小程序等&#xff0c;国内头部云厂商阿里云、腾讯云、华为云、UCloud、京东云都有用过&a…

基础篇_开发命令行程序(输入输出,类型、变量、运算符,条件语句,循环语句,方法,package与jar)

文章目录 一. 输入输出1. System.out2. System.in3. Scanner4. 变量名5. 关键字 二. 类型、变量、运算符1. 字符与字符串字符值与字符串值转义字符文本块 2. 类型何为类型数字类型字符类型 3. 变量与运算符变量运算符 4. 练习 - 房贷计算器Math.pow()数字格式化查阅 Javadoc 三…

虽然是个去年的旧新闻,但这透露了IBM的新去向

引言&#xff1a;老树盘根发新芽&#xff0c;只为云数添新彩。 【科技明说 &#xff5c; 科技热点关注】 就在2023年12月25日左右&#xff0c;外媒有消息被传入国内&#xff0c;IBM正在斥资21.3亿欧元收购德国企业软件公司Software AG旗下的两个iPaaS企业技术平台。 具体包括&…

YOLOv5 损失函数改进 | 引入 Shape-IoU 考虑边框形状与尺度的度量

🗝️改进YOLOv8注意力系列一:结合ACmix、Biformer、BAM注意力机制 论文讲解加入代码本文提供了改进 YOLOv8注意力系列包含不同的注意力机制以及多种加入方式,在本文中具有完整的代码和包含多种更有效加入YOLOv8中的yaml结构,读者可以获取到注意力加入的代码和使用经验,总…

关于网络安全,你了解多少?

随着互联网技术的飞速发展&#xff0c;网络安全问题日益凸显。国际标准化组织&#xff08;ISO&#xff09;对计算机网络安全做出了明确定义&#xff1a;为保护数据处理系统而采取的技术和管理的安全措施&#xff0c;旨在保护计算机硬件、软件和数据免受偶然和故意原因造成的破坏…

使用Pygame显示文字的示例代码

import pygame import syspygame.init()# 设置窗口尺寸 win_size (800, 600) screen pygame.display.set_mode(win_size) pygame.display.set_caption("文字显示示例")# 设置字体和文字内容 font pygame.font.SysFont(None, 48) # 使用系统默认字体&#xff0c;字…

全网最好的Java集合总结

1.List &#xff08;有序、可重复&#xff09; 1.1 ArrayList 底层是一个Object[]&#xff0c;在不指定容量的时候&#xff0c;会进行懒加载&#xff0c;创建一个{}对象&#xff0c;然后在add的时候创建一个容量为10的Object[],当数组容量不够的时候会扩容&#xff0c;每次扩容…

实现线程同步的几种方式

线程同步 1. 线程同步概念 线程同步是指多个线程协调它们的执行顺序&#xff0c;以确保它们正确、安全地访问共享资源。在并发编程中&#xff0c;当多个线程同时访问共享数据或资源时&#xff0c;可能会导致竞争条件&#xff08;Race Condition&#xff09;和其他并发问题 所…

【Python基础】一文搞懂:Python 中 csv 文件的写入与读取

文章目录 1 引言2 CSV 文件简介3 Python 中的 csv 模块4 写入 CSV 文件4.1 基本用法4.2 高级用法 5 读取 CSV 文件5.1 基本用法5.2 高级用法 6 实例演示7 注意事项8 总结 1 引言 在数据处理和数据分析领域&#xff0c;CSV (逗号分隔值) 文件是一种常见的文件格式&#xff0c;用…

Day31 贪心算法 part01 理论基础 455.分发饼干 376.摆动序列 53.最大子序和

贪心算法 part01 理论基础 455.分发饼干 376.摆动序列 53.最大子序和 理论基础&#xff08;转载自代码随想录&#xff09; 什么是贪心 贪心的本质是选择每一阶段的局部最优&#xff0c;从而达到全局最优。 这么说有点抽象&#xff0c;来举一个例子&#xff1a; 例如&#…

Qt QProgressBar进度条控件

文章目录 1 属性和方法1.1 值1.2 方向1.3 外观1.4 信号和槽 2 实例2.1 布局2.2 代码实现 QProgressBar是进度条控件&#xff0c;进度条用来指示任务的完成情况 1 属性和方法 QProgressBar有很多属性&#xff0c;完整的可查看帮助文档。这里以QProgressBar为例&#xff0c;列出…

小马识途:十个营销故事 启发营销思路

在营销过程中&#xff0c;优势是相对的&#xff0c;只有凭借着客观的营销环境创造优势才能够取胜市场。在企业营销中&#xff0c;狗猛酒酸的案例比比皆是。接下来&#xff0c;就与小马识途一起来看看十个经典的营销故事吧&#xff01; 一、摩托车公司 有家德国摩托车公司&…