【RocketMQ入门-安装部署与Java API测试】
- 一、环境说明
- 二、安装部署
- 三、Java API 编写Producer和Consumer进行测试
- 四、小结
一、环境说明
- 虚拟机VWMare:安装centos7.6操作系统
- 源码包:rocketmq-all-5.1.3-source-release.zip
- 单master部署,在一台虚拟机上安装部署name server和proxy以及broker
- 流程图:
二、安装部署
-
源码包安装需要事先安装部署maven,下载apache-maven-3.6.3-bin.tar.gz安装包,然后解压并配置环境变量,如下命令:
tar -zvxf apache-maven-3.6.3-bin.tar.gz -C /training/
配置环境变量(此处是用root安装),编辑:
vi ~/.bash_profile
,在文件末尾添加如下内容:#maven export MVN_HOME=/training/apache-maven-3.6.3 export PATH=$MVN_HOME/bin:$PATH
执行:
source ~/.bash_profile
使环境生效。 -
进入/training/apache-maven-3.6.3/conf目录下,配置maven的仓库为阿里云和华为云仓库,执行如下命令:
cd /training/apache-maven-3.6.3/conf/ mv settings.xml settings.xml.backup vi settings.xml
在打开的settings.xml中,粘贴如下内容即可:
<?xml version="1.0" encoding="utf-8"?> <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd"> <mirrors> <mirror> <id>aliyunmaven</id> <mirrorOf>*</mirrorOf> <name>阿里云公共仓库</name> <url>https://maven.aliyun.com/repository/public</url> </mirror> <mirror> <id>huaweicloud</id> <mirrorOf>central</mirrorOf> <name>huaweicloud maven</name> <url>https://mirrors.huaweicloud.com/repository/maven/</url> </mirror> </mirrors> <profiles> <profile> <repositories> <repository> <id>central</id> <url>https://maven.aliyun.com/repository/central</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> </profile> </profiles> </settings>
-
由于CentOS7.6最小模式安装没有unzip命令,需要事先安装,执行如下命令安装:
yum install unzip -y
-
解压源码包rocketmq-all-5.1.3-source-release.zip,进入到解压后的目录下,然后编译安装,执行如下命令:
unzip rocketmq-all-5.1.3-source-release.zip cd rocketmq-all-5.1.3-source-release/ mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
-
第5步骤正确后,进入到 /rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3目录下,然后启动NameServer,执行如下命令:
cd /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3 nohup sh bin/mqnamesrv &
-
验证NameServer是否启动成功,执行如下命令:
tail -f ~/logs/rocketmqlogs/namesrv.log
会看到如下内容,说明已经正常启动了
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
或者执行jps命令查看是否已经有了NameServer进程:NamesrvStartup
,如有说明ok -
第5、6步骤正确后,进入到 /rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3目录下,然后启动Broker和Proxy,执行如下命令:
注意:
NameServer成功启动后,我们启动Broker和Proxy,5.x 版本下我们建议使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。5.x 版本也支持 Broker 和 Proxy 分离部署以实现更灵活的集群能力。详情参考其他教程。cd /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3 nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
-
验证NameServer是否启动成功,执行如下命令:
tail -f ~/logs/rocketmqlogs/proxy.log
会看到如下内容,说明已经正常启动了
The broker[broker-a, 192.168.36.132:10911] boot success. serializeType=JSON and name server is localhost:9876
或者执行jps命令查看是否已经有了:ProxyStartup
进程,如有说明ok
三、Java API 编写Producer和Consumer进行测试
- 上述正常启动NameServer和Broker及Proxy后,首先需要创建名为
TestTopic
的Topic,执行如下命令:
查看新创建的Topic,验证是否已经创建好,执行:cd /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3 sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
结果如下:sh bin/mqadmin topicList -n localhost:9876
- 创建消费者组,执行如下命令:
执行命令无任何错误即说明已经创建成功。cd /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3 sh bin/sh mqadmin updateSubGroup -g testgroup -c DefaultCluster -n localhost:9876
- 在Idea中创建Maven工程,添加rocketmq依赖,添加如下依赖到pom.xml中:
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <rocketmq-client-java-version>5.0.5</rocketmq-client-java-version> <slf4j.version>1.7.25</slf4j.version> </properties> <dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>${rocketmq-client-java-version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> </dependency> </dependencies>
- 编写ProducerTest生产者,代码如下:
import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientConfigurationBuilder; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; public class ProducerTest { private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class); public static void main(String[] args) throws Exception { testMain(); } public static void testMain() throws ClientException, IOException { // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。 String endpoint = "192.168.36.132:8081"; // 消息发送的目标Topic名称,需要提前创建。 // 执行:sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster String topic = "TestTopic"; ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint); ClientConfiguration configuration = builder.build(); // 初始化Producer时需要设置通信配置以及预绑定的Topic。 Producer producer = provider.newProducerBuilder() .setTopics(topic) .setClientConfiguration(configuration) .build(); int temp = 0; while (true) { String msg = "第 " + temp + " 条消息,我喜欢rocketmq!!"; temp++; // 普通消息发送。 Message message = provider.newMessageBuilder() .setTopic(topic) // 设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") // 设置消息Tag,用于消费端根据指定Tag过滤消息。 .setTag("messageTag") // 消息体。 .setBody(msg.getBytes()) .build(); try { // 发送消息,需要关注发送结果,并捕获失败等异常。 SendReceipt sendReceipt = producer.send(message); Thread.sleep(1000); logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Exception e) { logger.error("Failed to send message", e); } } // producer.close(); } }
- 编写CommonUtils工具类,用于将ByteBuffer转成String,代码如下:
import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; public class CommonUtils { public static void main(String[] args) { System.out.println("Hello world!"); } public static String decodeKey(ByteBuffer bytes) { Charset charset = StandardCharsets.UTF_8; return charset.decode(bytes).toString(); } public static byte[] decodeValue(ByteBuffer bytes) { int len = bytes.limit() - bytes.position(); byte[] bytes1 = new byte[len]; bytes.get(bytes1); return bytes1; } public static ByteBuffer encodeKey(String key) { return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)); } public static ByteBuffer encodeValue(byte[] value) { ByteBuffer byteBuffer = ByteBuffer.allocate(value.length); byteBuffer.clear(); byteBuffer.get(value, 0, value.length); return byteBuffer; } }
- 编写ConsumerTest生产者,代码如下:
import java.util.Collections; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.rocketmq.producer.CommonUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PushConsumerTest { private static final Logger logger = LoggerFactory.getLogger(PushConsumerTest.class); private PushConsumerTest() { } public static void main(String[] args) throws Exception { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。 String endpoints = "192.168.36.132:8081"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .build(); // 订阅消息的过滤规则,表示订阅所有Tag的消息。 String tag = "*"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); // 为消费者指定所属的消费者分组,Group需要提前创建。 // 执行:sh bin/sh mqadmin updateSubGroup -g testgroup -c DefaultCluster -n localhost:9876 String consumerGroup = "testgroup"; // 指定需要订阅哪个目标Topic,Topic需要提前创建。 String topic = "TestTopic"; // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。 PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // 设置消费者分组。 .setConsumerGroup(consumerGroup) // 设置预绑定的订阅关系。 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) // 设置消费监听器。 .setMessageListener(messageView -> { // 处理消息并返回消费结果。 logger.info("Consume message successfully, messageId={},messageBody={}", messageView.getMessageId(), CommonUtils.decodeKey(messageView.getBody())); return ConsumeResult.SUCCESS; }) .build(); Thread.sleep(Long.MAX_VALUE); // 如果不需要再使用 PushConsumer,可关闭该实例。 // pushConsumer.close(); } }
- 为了能查看到控制台日志输入,需要在resources目录下新建log4j.properties、log4j2.properties,具体内容如下:
log4j.properties
内容:log4j.rootLogger=INFO,console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
log4j2.properties
内容:name = PropertiesConfig property.filename = target/logs #appenders = console, file #配置值是appender的类型,并不是具体appender实例的name appenders = rolling appender.rolling.type = RollingFile appender.rolling.name = RollingLogFile appender.rolling.fileName=${filename}/automationlogs.log appender.rolling.filePattern = ${filename}/automationlogs-%d{MM-dd-yy-HH-mm-ss}-%i.log appender.rolling.layout.type = PatternLayout appender.rolling.layout.pattern=[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n appender.rolling.policies.type = Policies appender.rolling.policies.size.type = SizeBasedTriggeringPolicy appender.rolling.policies.size.size=100MB appender.rolling.strategy.type = DefaultRolloverStrategy appender.rolling.strategy.max = 5 rootLogger.level = INFO,console rootLogger.appenderRef.rolling.ref = RollingLogFile
- 到此,完成了所有准备工作了,整个工程如下所示:
- 运行ProducerTest程序进行消息的发送,控制台中会看到如下内容:
- 运行ConsumerTest程序接收消息,控制台中会看到如下内容:
四、小结
至此,一个单节点副本的 RocketMQ 集群已经部署起来了,我们也通过编写Java程序进行简单的消息收发。如本文对您有帮助,麻烦您动动发财的手指点个赞~~~~~,谢谢您的阅读!!!