RocketMQ-源码架构

源码环境搭建

1、主要功能模块

RocketMQ官方Git仓库地址:GitHub - apache/rocketmq: Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.

RocketMQ的官方网站下载:下载 | RocketMQ

重要模块:

  • broker: Broker 模块(broke 启动进程)

  • client :消息客户端,包含消息生产者、消息消费者相关类

  • example: RocketMQ 示例代码

  • namesrv:NameServer模块

  • store:消息存储模块

  • remoting:远程访问模块

2、源码启动服务

将源码导入IDEA后,需要先对源码进行编译。编译指令clean install -Dmaven.test.skip=true

编译完成后就可以开始调试代码:

调试时,先在项目目录下创建一个conf目录,并从distribution拷贝broker.conflogback_broker.xmllogback_namesrv.xml

window10执行上面的编译指令一直报错A required class was missing while executing org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process: org/apache/commons/collections/ExtendedProperties,最后在Linux执行才成功

unzip rocketmq-rocketmq-all-4.9.5.zip
cd rocketmq-rocketmq-all-4.9.5/
mvn clean install -Dmaven.test.skip=true
启动nameServer

虽然编译指令在windows执行报错,但是nameSrv还是可以正常启动的

如果启动时报错,并提示需要配置ROCKETMQ_HOME环境变量,可以在工程里面添加:

启动Broker

启动Broker之前,需要先修改之前复制的broker.conf文件

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

# 自动创建Topic
autoCreateTopicEnable=true
# nameServ地址
namesrvAddr=127.0.0.1:9876
# 存储路径
storePathRootDir=E:\\RocketMQ\\data\\rocketmq\\dataDir
# commitLog路径
storePathCommitLog=E:\\RocketMQ\\data\\rocketmq\\dataDir\\commitlog
# 消息队列存储路径
storePathConsumeQueue=E:\\RocketMQ\\data\\rocketmq\\dataDir\\consumequeue
# 消息索引存储路径
storePathIndex=E:\\RocketMQ\\data\\rocketmq\\dataDir\\index
# checkpoint文件路径
storeCheckpoint=E:\\RocketMQ\\data\\rocketmq\\dataDir\\checkpoint
# abort文件存储路径
abortFile=E:\\RocketMQ\\data\\rocketmq\\dataDir\\abort

启动Broker时,还需要配置一个-c 参数,指向broker.conf配置文件

发送消息

在源码的example模块下,提供了非常详细的测试代码。

启动example模块下的org.apache.rocketmq.example.quickstart.Producer类即可发送消息

在测试源码中,需要指定NameServer地址。这个NameServer地址有两种指定方式,一种是配置一个NAMESRV_ADDR的环境变量。另一种是在源码中指定。

// 源码中指定
producer.setNamesrvAddr("127.0.0.1:9876");
消费消息

启动example模块下的org.apache.rocketmq.example.quickstart.Consumer类来消费消息

consumer.setNamesrvAddr("192.168.232.128:9876");

3、读源码的方法

1、带着问题读源码。一定要自己思考!

2、小步快走。不要觉得一两遍就能读懂源码

3、分步总结。带上自己的理解,及时总结。对各种扩展功能,尝试验证

对于RocketMQ,试着去理解源码中的各种单元测试。

源码热身阶段

NameServer的启动过程

关注重点

RocketMQ集群中,实际上消息存储、推送等核心功能点是Broker。NameServer的作用,和微服务中的注册中心非常类似,只是提供了Broker端的服务注册与发现功能。

源码重点

NameServer的启动入口类是org.apache.rocketmq.namesrv.NamesrvStartup。其中的核心是构建并启动一个NamesrvController。这个Controller对象就跟MVC中的Controller是很类似的,都是响应客户端的请求。只不过,他响应的是基于Netty的客户端请求。

另外,他的实际启动过程,其实可以配合NameServer的启动脚本进行更深入的理解。

从NameServer启动和关闭这两个关键步骤,我们可以总结出NameServer的组件其实并不是很多,整个NameServer的结构是这样的:

这两个配置类就可以用来指导如何优化Nameserver的配置。比如,如何调整nameserver的端口?

可以看出,RocketMQ的整体源码风格就是典型的MVC思想。Controller响应请求,Service处理业务,各种Table保存消息

部分源码示例:

// NamesrvStartup#main
public static void main(String[] args) {
	main0(args);
}

public static NamesrvController main0(String[] args) {

	try {
		NamesrvController controller = createNamesrvController(args);
		start(controller);
		String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
		log.info(tip);
		System.out.printf("%s%n", tip);
		return controller;
	} catch (Throwable e) {
		e.printStackTrace();
		System.exit(-1);
	}

	return null;
}

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
	System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
	//PackageConflictDetect.detectFastjson();

	Options options = ServerUtil.buildCommandlineOptions(new Options());
	commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
	if (null == commandLine) {
		System.exit(-1);
		return null;
	}

	// 两个配置类在这里
	final NamesrvConfig namesrvConfig = new NamesrvConfig();
	final NettyServerConfig nettyServerConfig = new NettyServerConfig();
	// 端口号简单粗暴
	nettyServerConfig.setListenPort(9876);
	// -c指令指定配置文件,可以替换端口号。配置文件内容:listenPort=9876
	if (commandLine.hasOption('c')) {
		String file = commandLine.getOptionValue('c');
		if (file != null) {
			InputStream in = new BufferedInputStream(new FileInputStream(file));
			properties = new Properties();
			properties.load(in);
			MixAll.properties2Object(properties, namesrvConfig);
			MixAll.properties2Object(properties, nettyServerConfig);

			namesrvConfig.setConfigStorePath(file);

			System.out.printf("load config properties file OK, %s%n", file);
			in.close();
		}
	}

	if (commandLine.hasOption('p')) {
		InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
		MixAll.printObjectProperties(console, namesrvConfig);
		MixAll.printObjectProperties(console, nettyServerConfig);
		System.exit(0);
	}
    
	... ...
        
	final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

	// remember all configs to prevent discard
	controller.getConfiguration().registerConfig(properties);

	return controller;
}

// NamesrvController#NamesrvController
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
	this.namesrvConfig = namesrvConfig;
	this.nettyServerConfig = nettyServerConfig;
	this.kvConfigManager = new KVConfigManager(this);
	this.routeInfoManager = new RouteInfoManager();
	this.brokerHousekeepingService = new BrokerHousekeepingService(this);
	this.configuration = new Configuration(
		log,
		this.namesrvConfig, this.nettyServerConfig
	);
	this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

// NamesrvController#initialize
public boolean initialize() {

	this.kvConfigManager.load();

	this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

	... ...
}

Broker服务启动过程

关注重点

Broker是整个RocketMQ的业务核心。所有消息存储、转发这些重要的业务都是Broker进行处理。

重点梳理Broker有哪些内部服务。这些内部服务将是整理Broker核心业务流程的起点

源码重点

Broker启动的入口在BrokerStartup这个类,可以从他的main方法开始调试

启动过程关键点:重点也是围绕一个BrokerController对象,先创建,然后再启动

// BrokerStartup#main
public static void main(String[] args) {
	start(createBrokerController(args));
}

public static BrokerController createBrokerController(String[] args) {
	System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

	try {
		//PackageConflictDetect.detectFastjson();
		Options options = ServerUtil.buildCommandlineOptions(new Options());
		commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
			new PosixParser());
		if (null == commandLine) {
			System.exit(-1);
		}
		
		// Broker服务配置
		final BrokerConfig brokerConfig = new BrokerConfig();
		// Netty服务端占用了10911端口。同样也可以在配置文件中覆盖
		final NettyServerConfig nettyServerConfig = new NettyServerConfig();
		// Broker既要作为Netty服务端,向客户端提供核心业务能力,又要作为Netty客户端,向NameServer注册心跳
		final NettyClientConfig nettyClientConfig = new NettyClientConfig();

		nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
			String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
		nettyServerConfig.setListenPort(10911);
		// 消息存储配置。 这两个配置参数都可以在broker.conf文件中进行配置
		final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
		
		... ...
	} catch (Throwable e) {
		e.printStackTrace();
		System.exit(-1);
	}

	return null;
}

BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig 这几个配置是了解如何优化 RocketMQ 使用的关键

在BrokerController.start方法启动了一大堆Broker的核心服务,部分源码:

// BrokerController#start
public void start() throws Exception {
	if (this.messageStore != null) {
		//启动核心的消息存储组件
		this.messageStore.start();
	}

	if (this.remotingServer != null) {
		this.remotingServer.start();
	}

	if (this.fastRemotingServer != null) {
		//启动两个Netty服务
		this.fastRemotingServer.start();
	}

	if (this.brokerOuterAPI != null) {
		//启动客户端,往外发请求
		this.brokerOuterAPI.start();
	}

	... ...

	this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

		@Override
		public void run() {
			try {
				//向NameServer注册心跳
				BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
			} catch (Throwable e) {
				log.error("registerBrokerAll Exception", e);
			}
		}
	}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

	if (this.brokerStatsManager != null) {
		this.brokerStatsManager.start();
	}

	if (this.brokerFastFailure != null) {
		//负责具体业务的功能组件
		this.brokerFastFailure.start();
	}
}

抽象出Broker的一个整体结构:

实际上,在应用中,可以通过producer.setSendMessageWithVIPChannel(true),让少量比较重要的producer走VIP的通道。而在消费者端,也可以通过consumer.setVipChannelEnabled(true),让消费者支持VIP通道的数据。

小试牛刀阶段

开始理解一些比较简单的业务逻辑

Netty服务注册框架

关注重点

网络通信服务是构建分布式应用的基础,也是理解RocketMQ底层业务的基础。

重点梳理RocketMQ的这个服务注册框架,理解各个业务进程之间是如何进行RPC远程通信的

Netty的所有远程通信功能都由remoting模块实现。RemotingServer模块里包含了RPC的服务端RemotingServer以及客户端RemotingClient。在RocketMQ中,NameServer主要是RPC的服务端RemotingServer,Broker对于客户端来说,是RPC的服务端RemotingServer,而对于NameServer来说,又是RPC的客户端。各种Client是RPC的客户端RemotingClient。

RocketMQ基于Netty保持客户端与服务端的长连接Channel。RemotingServer和RemotingClient都需要注册自己的服务。

源码重点

1、NameServer需要NettyServer。客户端,Producer和Consumer,需要NettyClient。

2、所有的RPC请求数据都封装成RemotingCommand对象。每个处理消息的服务逻辑,都会封装成一个NettyRequestProcessor对象。

3、服务端和客户端都维护一个processorTable,这是个HashMap。key是服务码requestCode,value是对应的运行单元 Pair<NettyRequestProcessor,ExecutorService>类型,包含了处理Processor和执行线程的线程池。具体的Processor,由业务系统自行注册。Broker服务注册:BrokerController.registerProcessor(),客户端的服务注册:MQClientAPIImpl。NameServer则会注册一个大的DefaultRequestProcessor,统一处理所有服务。

4、请求类型分为REQUEST和RESPONSE。这是为了支持异步的RPC调用。NettyServer处理完请求后,可以先缓存到responseTable中,等NettyClient下次来获取,这样就不用阻塞Channel了,可以提升请求吞吐量。

5、重点理解remoting包中是如何实现全流程异步化。

整体RPC框架流程:

RocketMQ使用Netty框架提供了一套基于服务码的服务注册机制,让各种不同的组件都可以按照自己的需求,注册自己的服务方法。RocketMQ的这一套服务注册机制,是非常简洁实用的。要开发一个大型的IM项目,要加减好友、发送文本,图片,甚至红包、维护群聊信息等等各种各样的请求,这些请求如何封装,就可以很好的参考这个框架。

关于RocketMQ的同步结果推送与异步结果推送

RocketMQ的RemotingServer服务端,会维护一个responseTable,这是一个线程同步的Map结构。 key为请求的ID,value是异步的消息结果。ConcurrentMap<Integer /* opaque */, ResponseFuture>

处理同步请求(NettyRemotingAbstract#invokeSyncImpl)时,处理的结果会存入responseTable,通过ResponseFuture提供一定的服务端异步处理支持,提升服务端的吞吐量。 请求返回后,立即从responseTable中移除请求记录。

//NettyRemotingAbstract#invokeSyncImpl
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
	final long timeoutMillis)
	throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
	final int opaque = request.getOpaque();

	try {
		final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
		this.responseTable.put(opaque, responseFuture);
		final SocketAddress addr = channel.remoteAddress();
		channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
			@Override
			public void operationComplete(ChannelFuture f) throws Exception {
				if (f.isSuccess()) {
					responseFuture.setSendRequestOK(true);
					return;
				} else {
					responseFuture.setSendRequestOK(false);
				}

				responseTable.remove(opaque);
				responseFuture.setCause(f.cause());
				responseFuture.putResponse(null);
				log.warn("send a request command to channel <" + addr + "> failed.");
			}
		});

		RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
		if (null == responseCommand) {
			if (responseFuture.isSendRequestOK()) {
				throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
					responseFuture.getCause());
			} else {
				throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
			}
		}

		return responseCommand;
	} finally {
		this.responseTable.remove(opaque);
	}
}

//ResponseFuture#waitResponse
//实际上,同步也是通过异步实现的
//发送消息后,通过countDownLatch阻塞当前线程,造成同步等待的效果
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
	this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
	return this.responseCommand;
}

//ResponseFuture#putResponse
//等待异步获取到消息后,再通过countDownLatch释放当前线程
public void putResponse(final RemotingCommand responseCommand) {
	this.responseCommand = responseCommand;
	this.countDownLatch.countDown();
}

处理异步请求(NettyRemotingAbstract#invokeAsyncImpl)时,处理的结果依然会存入responsTable,等待客户端后续再来请求结果。但是他保存的依然是一个ResponseFuture,也就是在客户端请求结果时再去获取真正的结果。 另外,在RemotingServer启动时,会启动一个定时的线程任务,不断扫描responseTable,将其中过期的response清除掉。

//NettyRemotingAbstract#invokeAsyncImpl
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
	final InvokeCallback invokeCallback)
	throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
	long beginStartTime = System.currentTimeMillis();
	final int opaque = request.getOpaque();
	boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
	if (acquired) {
		final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
		long costTime = System.currentTimeMillis() - beginStartTime;
		if (timeoutMillis < costTime) {
			once.release();
			throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
		}

		final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
		this.responseTable.put(opaque, responseFuture);
		try {
			channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
				@Override
				public void operationComplete(ChannelFuture f) throws Exception {
					if (f.isSuccess()) {
						responseFuture.setSendRequestOK(true);
						return;
					}
					requestFail(opaque);
					log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
				}
			});
		} catch (Exception e) {
			responseFuture.release();
			log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
			throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
		}
	} else {
		if (timeoutMillis <= 0) {
			throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
		} else {
			String info =
				String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
					timeoutMillis,
					this.semaphoreAsync.getQueueLength(),
					this.semaphoreAsync.availablePermits()
				);
			log.warn(info);
			throw new RemotingTimeoutException(info);
		}
	}
}

//org.apache.rocketmq.remoting.netty.NettyRemotingServer
this.timer.scheduleAtFixedRate(new TimerTask() {

	@Override
	public void run() {
		try {
			NettyRemotingServer.this.scanResponseTable();
		} catch (Throwable e) {
			log.error("scanResponseTable exception", e);
		}
	}
}, 1000 * 3, 1000);

Broker心跳注册管理

关注重点

Broker会在启动时向所有NameServer注册自己的服务信息,并且会定时往NameServer发送心跳信息。而NameServer会维护Broker的路由列表,并对路由表进行实时更新。

源码重点

Broker启动后会立即发起向NameServer注册心跳。方法入口:BrokerController.this.registerBrokerAll。 然后启动一个定时任务,以10秒延迟,默认30秒的间隔持续向NameServer发送心跳。

NameServer内部会通过RouteInfoManager组件及时维护Broker信息。同时在NameServer启动时,会启动定时任务,扫描不活动的Broker。方法入口:NamesrvController.initialize方法。

极简化的服务注册发现流程

为什么RocketMQ要自己实现一个NameServer,而不用Zookeeper、Nacos这样现成的注册中心?

首先,依赖外部组件会对产品的独立性形成侵入,不利于自己的版本演进。Kafka要抛弃Zookeeper就是一个先例。

另外,其实更重要的还是对业务的合理设计。NameServer之间不进行信息同步,而是依赖Broker端向所有NameServer同时发起注册。这让NameServer的服务可以非常轻量。

但是,要知道,这种极简的设计,其实是以牺牲数据一致性为代价的。Broker往多个NameServer同时发起注册,有可能部分NameServer注册成功,而部分NameServer注册失败了。这样,多个NameServer之间的数据是不一致的。作为注册中心,这是不可接受的。但是对于RocketMQ,这又变得可以接受了。因为客户端从NameServer上获得的,只要有一个正常运行的Broker就可以了,并不需要完整的Broker列表。

Producer发送消息过程

关注重点

回顾Producer使用案例

Producer有两种:

  • 一种是普通发送者:DefaultMQProducer。只负责发送消息,发送完消息,就可以停止了。

  • 另一种是事务消息发送者: TransactionMQProducer。支持事务消息机制。需要在事务消息过程中提供事务状态确认的服务,这就要求事务消息发送者虽然是一个客户端,但是也要完成整个事务消息的确认机制后才能退出。

事务消息机制后面将结合Broker进行整理分析。这一步暂不关注。这里只关注DefaultMQProducer的消息发送过程。

整个Producer的使用流程,大致分为两个步骤:一是调用start方法,进行一大堆的准备工作。 二是各种send方法,进行消息发送。

重点关注以下几个问题:

1、Producer启动过程中启动了哪些服务

2、Producer如何管理broker路由信息。 可以设想一下,如果Producer启动了之后,NameServer挂了,那么Producer还能不能发送消息?

3、关于Producer的负载均衡。也就是Producer到底将消息发到哪个MessageQueue中。这里可以结合顺序消息机制来理解一下。消息中那个莫名奇妙的MessageSelector到底是如何工作的。

源码重点
  • Producer的核心启动流程

所有Producer的启动过程,最终都会调用到DefaultMQProducerImpl#start方法。在start方法中的通过一个mQClientFactory对象,启动生产者的一大堆重要服务。

这里其实就是一种设计模式,虽然有很多种不同的客户端,但是这些客户端的启动流程最终都是统一的,全是交由MQClientFactory对象来启动。而不同之处在于这些客户端在启动过程中,按照服务端的要求注册不同的信息。例如生产者注册到producerTable,消费者注册到consumerTable,管理控制端注册到adminExtTable

  • 发送消息的核心流程

1、发送消息时,会维护一个本地的topicPublishInfoTable缓存,DefaultMQProducer会尽量保证这个缓存数据是最新的。但是,如果NameServer挂了,那么DefaultMQProducer还是会基于这个本地缓存去找Broker。只要能找到Broker,还是可以正常发送消息到Broker的。

2、生产者如何找MessageQueue: 默认情况下,生产者是按照轮训的方式,依次轮训各个MessageQueue。但是如果某一次往一个Broker发送请求失败后,下一次就会跳过这个Broker。

//org.apache.rocketmq.client.impl.producer.TopicPublishInfo
//如果进到这里lastBrokerName不为空,那么表示上一次向这个Broker发送消息是失败的,这时就尽量不要再往这个Broker发送消息了。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
	if (lastBrokerName == null) {
		return selectOneMessageQueue();
	} else {
		for (int i = 0; i < this.messageQueueList.size(); i++) {
			int index = this.sendWhichQueue.incrementAndGet();
			int pos = Math.abs(index) % this.messageQueueList.size();
			if (pos < 0)
				pos = 0;
			MessageQueue mq = this.messageQueueList.get(pos);
			if (!mq.getBrokerName().equals(lastBrokerName)) {
				return mq;
			}
		}
		return selectOneMessageQueue();
	}
}

3、如果在发送消息时传了Selector,那么Producer就不会走这个负载均衡的逻辑,而是会使用Selector去寻找一个队列。 具体参见org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl 方法

//DefaultMQProducerImpl#sendSelectImpl
private SendResult sendSelectImpl(
	Message msg,
	MessageQueueSelector selector,
	Object arg,
	final CommunicationMode communicationMode,
	final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
	long beginStartTime = System.currentTimeMillis();
	this.makeSureStateOK();
	Validators.checkMessage(msg, this.defaultMQProducer);

	TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
	if (topicPublishInfo != null && topicPublishInfo.ok()) {
		MessageQueue mq = null;
		try {
			List<MessageQueue> messageQueueList =
				mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
			Message userMessage = MessageAccessor.cloneMessage(msg);
			String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
			userMessage.setTopic(userTopic);

			mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
		} catch (Throwable e) {
			throw new MQClientException("select message queue threw exception.", e);
		}

		long costTime = System.currentTimeMillis() - beginStartTime;
		if (timeout < costTime) {
			throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
		}
		if (mq != null) {
			return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
		} else {
			throw new MQClientException("select message queue return null.", null);
		}
	}

	validateNameServerSetting();
	throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

Consumer拉取消息过程

关注重点

回顾消费者的几个重点问题:

  • 消费者也是有两种,推模式消费者和拉模式消费者。优秀的MQ产品都会有一个高级的目标,就是要提升整个消息处理的性能。而要提升性能,服务端的优化手段往往不够直接,最为直接的优化手段就是对消费者进行优化。所以在RocketMQ中,整个消费者的业务逻辑是非常复杂的,甚至某种程度上来说,比服务端更复杂,所以,在这里重点关注用得最多的推模式的消费者。

  • 消费者组之间有集群模式和广播模式两种消费模式。就要了解下这两种集群模式是如何做的逻辑封装。

  • 然后关注消费者端的负载均衡的原理。即消费者是如何绑定消费队列的,那些消费策略到底是如何落地的。

  • 最后关注在推模式的消费者中,MessageListenerConcurrently 和MessageListenerOrderly这两种消息监听器的处理逻辑到底有什么不同,为什么后者能保持消息顺序。

源码重点

Consumer的核心启动过程和Producer是一样的, 最终都是通过MQClientFactory对象启动。不过之间添加了一些注册信息。整体启动过程:

广播模式与集群模式的Offset处理

在DefaultMQPushConsumerImpl的start方法中,启动了非常多的核心服务。 比如,对于广播模式与集群模式的Offset处理

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
	this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
	switch (this.defaultMQPushConsumer.getMessageModel()) {
		case BROADCASTING:
			this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
			break;
		case CLUSTERING:
			this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
			break;
		default:
			break;
	}
	this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();

广播模式是使用LocalFileOffsetStore,在Consumer本地保存Offset,而集群模式是使用RemoteBrokerOffsetStore,在Broker端远程保存offset。而这两种Offset的存储方式,最终都是通过维护本地的offsetTable缓存来管理Offset。

Consumer与MessageQueue建立绑定关系

start方法中还一个比较重要的东西是给rebalanceImpl设定了一个AllocateMessageQueueStrategy,用来给Consumer分配MessageQueue的。

this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
//Consumer负载均衡策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());

AllocateMessageQueueStrategy就是用来给Consumer和MessageQueue之间建立一种对应关系的。也就是说,只要Topic当中的MessageQueue以及同一个ConsumerGroup中的Consumer实例都没有变动,那么某一个Consumer实例只是消费固定的一个或多个MessageQueue上的消息,其他Consumer不会来抢这个Consumer对应的MessageQueue。

为什么要让一个MessageQueue只能由同一个ConsumerGroup中的一个Consumer实例来消费?

因为Broker需要按照ConsumerGroup管理每个MessageQueue上的Offset,如果一个MessageQueue上有多个同属一个ConsumerGroup的Consumer实例,他们的处理进度就会不一样。这样的话,Offset就乱套了。

顺序消费与并发消费

在start方法中,启动了consumerMessageService线程,进行消息拉取。

//Consumer中自行指定的回调函数
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
	this.consumeOrderly = true;
	this.consumeMessageService =
		new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
	this.consumeOrderly = false;
	this.consumeMessageService =
		new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

this.consumeMessageService.start();

Consumer通过registerMessageListener方法指定的回调函数,都被封装成了ConsumerMessageService的子实现类。

而对于这两个服务实现类的调用,会延续到DefaultMQPushConsumerImpl的pullCallback对象中。也就是Consumer每拉过来一批消息后,就向Broker提交下一个拉取消息的的请求。

这里也可以印证一个点,就是顺序消息,只对异步消费也就是推模式有效。同步消费的拉模式是无法进行顺序消费的。因为这个pullCallback对象,在拉模式的同步消费时,根本就没有往下传。

当然,这并不是说拉模式不能锁定队列进行顺序消费,拉模式在Consumer端应用就可以指定从哪个队列上拿消息。

PullCallback pullCallback = new PullCallback() {
	@Override
	public void onSuccess(PullResult pullResult) {
		if (pullResult != null) {
			pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
				subscriptionData);

			switch (pullResult.getPullStatus()) {
				case FOUND:
					... ...
					DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
						pullResult.getMsgFoundList(),
						processQueue,
						pullRequest.getMessageQueue(),
						dispatchToConsume);

这里提交的,实际上是一个ConsumeRequest线程。而提交的这个ConsumeRequest线程,在两个不同的ConsumerService中有不同的实现。

这其中,两者最为核心的区别在于ConsumerMessageOrderlyService是锁定了一个队列,处理完了之后,再消费下一个队列。

@Override
public void run() {
	... ...

	final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
	synchronized (objLock) {
		... ...
	}
}

为什么给队列加个锁,就能保证顺序消费呢?

从源码中可以看到,Consumer提交请求时,都是往线程池里异步提交的请求。如果不加队列锁,那么就算Consumer提交针对同一个MessageQueue的拉取消息请求,这些请求都是异步执行,他们的返回顺序是乱的,无法进行控制。给队列加个锁之后,就保证了针对同一个队列的第二个请求,必须等第一个请求处理完了之后,释放了锁,才可以提交。这也是在异步情况下保证顺序的基础思路。

实际拉取消息还是通过PullMessageService完成的

start方法中,相当于对很多消费者的服务进行初始化,包括指定一些服务的实现类,以及启动一些定时的任务线程,比如清理过期的请求缓存等。最后,会随着mQClientFactory组件的启动,启动一个PullMessageService。实际的消息拉取都交由PullMesasgeService进行。

所谓消息推模式,其实还是通过Consumer拉消息实现的。

//org.apache.rocketmq.client.impl.consumer.PullMessageService
private void pullMessage(final PullRequest pullRequest) {
	final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
	if (consumer != null) {
		DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
		impl.pullMessage(pullRequest);
	} else {
		log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
	}
}

客户端负载均衡管理总结

Producer负载均衡

Producer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的目的。而由于MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的broker。

Producer轮训时,如果发现往某一个Broker上发送消息失败了,那么下一次会尽量避免再往同一个Broker上发送消息。但是,如果你的应用场景允许发送消息长延迟,也可以给Producer设定setSendLatencyFaultEnable(true)。这样对于某些Broker集群的网络不是很好的环境,可以提高消息发送成功的几率。

同时生产者在发送消息时,可以指定一个MessageQueueSelector。通过这个对象来将消息发送到自己指定的MessageQueue上。这样可以保证消息局部有序。

Consumer负载均衡

Consumer也是以MessageQueue为单位来进行负载均衡。分为集群模式和广播模式。

1、集群模式

在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。

而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。

每次分配时,都会将MessageQueue和消费者ID进行排序后,再用不同的分配算法进行分配。内置的分配的算法共有六种,分别对应AllocateMessageQueueStrategy下的六种实现类,可以在consumer中直接set来指定。默认情况下使用的是最简单的平均分配策略。

  • AllocateMachineRoomNearby: 将同机房的Consumer和Broker优先分配在一起。

这个策略可以通过一个machineRoomResolver对象来定制Consumer和Broker的机房解析规则。然后还需要引入另外一个分配策略来对同机房的Broker和Consumer进行分配。一般也就用简单的平均分配策略或者轮询分配策略。

源码中有测试代码AllocateMachineRoomNearByTest。

  • AllocateMessageQueueAveragely:平均分配。将所有MessageQueue平均分给每一个消费者

  • AllocateMessageQueueAveragelyByCircle: 轮询分配。轮流的给一个消费者分配一个MessageQueue。

  • AllocateMessageQueueByConfig: 不分配,直接指定一个messageQueue列表。类似于广播模式,直接指定所有队列。

  • AllocateMessageQueueByMachineRoom:按逻辑机房的概念进行分配。又是对BrokerName和ConsumerIdc有定制化的配置。

  • AllocateMessageQueueConsistentHash。源码中有测试代码AllocateMessageQueueConsitentHashTest。这个一致性哈希策略只需要指定一个虚拟节点数,是用的一个哈希环的算法,虚拟节点是为了让Hash数据在环上分布更为均匀。

最常用的就是平均分配和轮训分配了。

2、广播模式

广播模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说。而在实现上,就是在Consumer分配Queue时,所有Consumer都分到所有的Queue。

广播模式实现的关键是将消费者的消费偏移量不再保存到broker当中,而是保存到客户端当中,由客户端自行维护自己的消费偏移量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/231577.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

001 LLM大模型之Transformer 模型

参考《大规模语言模型--从理论到实践》 目录 一、综述 二、Transformer 模型 三、 嵌入表示层&#xff08;位置编码代码&#xff09; 一、综述 语言模型目标是建模自然语言的概率分布&#xff0c;在自然语言处理研究中具有重要的作用&#xff0c;是自然 语言处理基础任务之一…

【软考中级——软件设计师】备战经验 笔记总结分享

考试成绩 我第一次备考是在2022 然后那时候取消了这次是第二次 靠前我一个月复习的看了以前的笔记 然后刷了七八道历年题目学习资料推荐 &#xff1a;zst——2021 b站链接自荐一下我的笔记 &#xff1a; 软考笔记专栏 视频确实很长 &#xff0c; 我的建议就是先看笔记 然后不会…

深度学习之全面了解网络架构

在这篇文章中&#xff0c;我们将和大家探讨“深度学习中的网络架构”这个主题&#xff0c;解释相关背景知识&#xff0c;并就一些问题进行解答。 我选择的问题反映的是常见用法&#xff0c;而不是学术用例。我将概括介绍该主题&#xff0c;然后探讨以下四个问题&#xff1a; …

泽攸科技二维材料转移台的应用场景及优势

随着二维材料的广泛研究和各种潜在应用的开发&#xff0c;对于二维材料样品的精密操控与转移的需求日益增加。特别是一些新型二维材料的制备和器件集成制备中&#xff0c;需要在显微镜下对样品进行观察与定位&#xff0c;并能够在微米甚至纳米量级上精确移動和转移样品。 传统…

做数据分析为何要学统计学(6)——什么问题适合使用方差分析?

方差分析&#xff08;ANOVA&#xff0c;也称变异数分析&#xff09;是英国统计学家Fisher&#xff08;1890.2.17&#xff0d;1962.7.29&#xff09;提出的对两个或以上样本总体均值进行差异显著性检验的方法。 它的基本思想是将测量数据的总变异&#xff08;即总方差&#xff…

Go语言初始化数组的六种方式

介绍 在Go语言中&#xff0c;有多种方式可以初始化数组&#xff0c;本文将介绍初始化数组的六种方法。 方式1&#xff1a;指定数组大小并初始化 var array [3]int [3]int{1, 2, 3}指定数组的大小为3&#xff0c;并初始化为指定的值1, 2, 3。 方式2&#xff1a;根据初始化值…

Linux 中用户与权限

目录 1.添加用户 useradd 2.修改用户属性 usermod 3.用户组管理 3.1新建组 3.2 组和用户的关系 3.3 修改组名 3.4 删除组 4.权限管理 4.1.文件类型 4.2 权限 4.3 修改文件权限 1.添加用户 useradd 1&#xff09;创建用户 useradd 用户名 2&#xff09;设置用户密码…

Vue3问题:如何在页面上添加水印?

前端功能问题系列文章&#xff0c;点击上方合集↑ 序言 大家好&#xff0c;我是大澈&#xff01; 本文约3100字&#xff0c;整篇阅读大约需要5分钟。 本文主要内容分三部分&#xff0c;如果您只需要解决问题&#xff0c;请阅读第一、二部分即可。如果您有更多时间&#xff…

【npm | npm常用命令及镜像设置】

npm常用命令及镜像设置 概述常用命令对比本地安装全局安装--save &#xff08;或 -S&#xff09;--save-dev &#xff08;或 -D&#xff09; 镜像设置设置镜像方法切换回npm官方镜像选择镜像源 主页传送门&#xff1a;&#x1f4c0; 传送 概述 npm致力于让 JavaScript 开发变得…

MyBatis进阶之结果集映射注解版

文章目录 注解实现结果集映射注解实现关系映射常用功能注解汇总 注解实现结果集映射 注意 配置结果集映射&#xff0c;只用看 SQL 执行结果&#xff0c;不看 SQL 语句&#xff01; 注意 由于注解在映射结果集上没有实现 <resultMap> 的 100% 功能&#xff0c;因此&#x…

在AWS Lambda上部署标准FFmpeg工具——Docker方案

大纲 1 确定Lambda运行时环境1.1 Lambda系统、镜像、内核版本1.2 运行时1.2.1 Python1.2.2 Java 2 启动EC23 编写调用FFmpeg的代码4 生成docker镜像4.1 安装和启动Docker服务4.2 编写Dockerfile脚本4.3 生成镜像 5 推送镜像5.1 创建存储库5.2 给EC2赋予角色5.2.1 创建策略5.2.2…

Swagger2的使用

手写Api文档的几个痛点&#xff1a; 文档需要更新的时候&#xff0c;需要再次发送一份给前端&#xff0c;也就是文档更新交流不及时。 接口返回结果不明确 不能直接在线测试接口&#xff0c;通常需要使用工具&#xff0c;比如postman 接口文档太多&#xff0c;不好管理 Sw…

kafka学习笔记--生产者消息发送及原理

本文内容来自尚硅谷B站公开教学视频&#xff0c;仅做个人总结、学习、复习使用&#xff0c;任何对此文章的引用&#xff0c;应当说明源出处为尚硅谷&#xff0c;不得用于商业用途。 如有侵权、联系速删 视频教程链接&#xff1a;【尚硅谷】Kafka3.x教程&#xff08;从入门到调优…

HNU计算机视觉作业二

前言 选修的是蔡mj老师的计算机视觉&#xff0c;上课还是不错的&#xff0c;但是OpenCV可能需要自己学才能完整把作业写出来。由于没有认真学&#xff0c;这门课最后混了80多分&#xff0c;所以下面作业解题过程均为自己写的&#xff0c;并不是标准答案&#xff0c;仅供参考 …

【改进YOLOv8】融合可扩张残差(DWR)注意力模块的小麦病害检测系统

1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 研究背景与意义&#xff1a; 随着计算机视觉技术的快速发展&#xff0c;深度学习在图像识别和目标检测领域取得了巨大的突破。其中&#xff0c;YOLO&#xff08;You Only Look O…

《opencv实用探索·八》图像模糊之均值滤波、高斯滤波的简单理解

1、前言 什么是噪声&#xff1f; 该像素与周围像素的差别非常大&#xff0c;导致从视觉上就能看出该像素无法与周围像素组成可识别的图像信息&#xff0c;降低了整个图像的质量。这种“格格不入”的像素就被称为图像的噪声。如果图像中的噪声都是随机的纯黑像素或者纯白像素&am…

短剧分销平台搭建:短剧变现新模式

短剧作为今年大热的行业&#xff0c;深受大众追捧&#xff01;短剧剧情紧凑&#xff0c;几乎每一集都有高潮剧情&#xff0c;精准击中了当下网友的碎片化时间。 短剧的形式较为灵活&#xff0c;可以轻松融入各种的元素&#xff0c;比如喜剧、悬疑、爱情等&#xff0c;可以满足…

工业 4.0 | 数字孪生入门指南

工业 4.0 在多年热议后悄然落地&#xff0c;如今&#xff0c;制造、能源和运输企业正在越来越多地从中受益。 仿真未来场景 公司可以使用数字孪生仿真未来场景&#xff0c;以了解天气、车队规模或工况差异等因素对性能的影响。该方法可为维护计划提供决策支撑&#xff0c;并可…

[陇剑杯 2021]简单日志分析

[陇剑杯 2021]简单日志分析 题目做法及思路解析&#xff08;个人分享&#xff09; 问一&#xff1a;某应用程序被攻击&#xff0c;请分析日志后作答&#xff1a; 黑客攻击的参数是______。&#xff08;如有字母请全部使用小写&#xff09;。 题目思路&#xff1a; 分析…

探索Python中封装的概念与实践

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 封装是面向对象编程中的核心概念&#xff0c;它能够帮助程序员隐藏类的内部细节&#xff0c;并限制对类成员的直接访问。本文将深入探讨Python中封装的机制&#xff0c;介绍封装的类型和优势&#xff0c;并提供详…