【RocketMQ入门-安装部署与Java API测试】

【RocketMQ入门-安装部署与Java API测试】

    • 一、环境说明
    • 二、安装部署
    • 三、Java API 编写Producer和Consumer进行测试
    • 四、小结

一、环境说明

  1. 虚拟机VWMare:安装centos7.6操作系统
  2. 源码包:rocketmq-all-5.1.3-source-release.zip
  3. 单master部署,在一台虚拟机上安装部署name server和proxy以及broker
  4. 流程图:
    在这里插入图片描述

二、安装部署

  1. 源码包安装需要事先安装部署maven,下载apache-maven-3.6.3-bin.tar.gz安装包,然后解压并配置环境变量,如下命令:

    tar -zvxf apache-maven-3.6.3-bin.tar.gz -C /training/
    

    配置环境变量(此处是用root安装),编辑:vi ~/.bash_profile,在文件末尾添加如下内容:

    #maven
    export MVN_HOME=/training/apache-maven-3.6.3
    export PATH=$MVN_HOME/bin:$PATH
    

    执行:source ~/.bash_profile 使环境生效。

  2. 进入/training/apache-maven-3.6.3/conf目录下,配置maven的仓库为阿里云和华为云仓库,执行如下命令:

    cd /training/apache-maven-3.6.3/conf/
    mv settings.xml settings.xml.backup
    vi settings.xml
    

    在打开的settings.xml中,粘贴如下内容即可:

    <?xml version="1.0" encoding="utf-8"?>
    <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="         http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
        <mirrors>
    		<mirror>
    				<id>aliyunmaven</id>
    				<mirrorOf>*</mirrorOf>
    				<name>阿里云公共仓库</name>
    				<url>https://maven.aliyun.com/repository/public</url>
    		</mirror>
    		 <mirror>
    				<id>huaweicloud</id>
    				<mirrorOf>central</mirrorOf>
    				<name>huaweicloud maven</name>
    				<url>https://mirrors.huaweicloud.com/repository/maven/</url>
    		</mirror>
    
        </mirrors>
        <profiles>
            <profile>
    
    			<repositories>
    					<repository>
    					  <id>central</id>
    					  <url>https://maven.aliyun.com/repository/central</url>
    					  <releases>
    							<enabled>true</enabled>
    					  </releases>
    					  <snapshots>
    							<enabled>true</enabled>
    					  </snapshots>
    					</repository>
    			</repositories>
            </profile>
        </profiles>
    </settings>
    
  3. 由于CentOS7.6最小模式安装没有unzip命令,需要事先安装,执行如下命令安装:

    yum install unzip -y
    
  4. 解压源码包rocketmq-all-5.1.3-source-release.zip,进入到解压后的目录下,然后编译安装,执行如下命令:

    unzip rocketmq-all-5.1.3-source-release.zip
    cd rocketmq-all-5.1.3-source-release/
    mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
    
  5. 第5步骤正确后,进入到 /rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3目录下,然后启动NameServer,执行如下命令:

    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    nohup sh bin/mqnamesrv &
    
  6. 验证NameServer是否启动成功,执行如下命令:

    tail -f ~/logs/rocketmqlogs/namesrv.log
    

    会看到如下内容,说明已经正常启动了

    The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
    或者执行jps命令查看是否已经有了NameServer进程:NamesrvStartup,如有说明ok

  7. 第5、6步骤正确后,进入到 /rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3目录下,然后启动Broker和Proxy,执行如下命令:注意:NameServer成功启动后,我们启动Broker和Proxy,5.x 版本下我们建议使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。5.x 版本也支持 Broker 和 Proxy 分离部署以实现更灵活的集群能力。详情参考其他教程。

    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
    
  8. 验证NameServer是否启动成功,执行如下命令:

    tail -f ~/logs/rocketmqlogs/proxy.log
    

    会看到如下内容,说明已经正常启动了

    The broker[broker-a, 192.168.36.132:10911] boot success. serializeType=JSON and name server is localhost:9876
    或者执行jps命令查看是否已经有了:ProxyStartup 进程,如有说明ok

三、Java API 编写Producer和Consumer进行测试

  1. 上述正常启动NameServer和Broker及Proxy后,首先需要创建名为TestTopic的Topic,执行如下命令:
    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
    
    查看新创建的Topic,验证是否已经创建好,执行:
    sh bin/mqadmin topicList -n localhost:9876
    
    结果如下:
    在这里插入图片描述
  2. 创建消费者组,执行如下命令:
    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    sh bin/sh mqadmin updateSubGroup -g testgroup -c DefaultCluster -n localhost:9876
    
    执行命令无任何错误即说明已经创建成功。
  3. 在Idea中创建Maven工程,添加rocketmq依赖,添加如下依赖到pom.xml中:
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <rocketmq-client-java-version>5.0.5</rocketmq-client-java-version>
        <slf4j.version>1.7.25</slf4j.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>${rocketmq-client-java-version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
    </dependencies>
    
  4. 编写ProducerTest生产者,代码如下:
    import org.apache.rocketmq.client.apis.ClientConfiguration;
    import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
    import org.apache.rocketmq.client.apis.ClientException;
    import org.apache.rocketmq.client.apis.ClientServiceProvider;
    import org.apache.rocketmq.client.apis.message.Message;
    import org.apache.rocketmq.client.apis.producer.Producer;
    import org.apache.rocketmq.client.apis.producer.SendReceipt;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    public class ProducerTest {
        private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);
    
        public static void main(String[] args) throws Exception {
            testMain();
        }
    
        public static void testMain() throws ClientException, IOException {
            // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
            String endpoint = "192.168.36.132:8081";
            // 消息发送的目标Topic名称,需要提前创建。
            // 执行:sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
            String topic = "TestTopic";
            ClientServiceProvider provider = ClientServiceProvider.loadService();
            ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
            ClientConfiguration configuration = builder.build();
            // 初始化Producer时需要设置通信配置以及预绑定的Topic。
            Producer producer = provider.newProducerBuilder()
                    .setTopics(topic)
                    .setClientConfiguration(configuration)
                    .build();
            int temp = 0;
            while (true) {
                String msg = "第 " + temp + " 条消息,我喜欢rocketmq!!";
                temp++;
                // 普通消息发送。
                Message message = provider.newMessageBuilder()
                        .setTopic(topic)
                        // 设置消息索引键,可根据关键字精确查找某条消息。
                        .setKeys("messageKey")
                        // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                        .setTag("messageTag")
                        // 消息体。
                        .setBody(msg.getBytes())
                        .build();
                try {
                    // 发送消息,需要关注发送结果,并捕获失败等异常。
                    SendReceipt sendReceipt = producer.send(message);
                    Thread.sleep(1000);
                    logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
                } catch (Exception e) {
                    logger.error("Failed to send message", e);
                }
            }
            // producer.close();
        }
    }
    
  5. 编写CommonUtils工具类,用于将ByteBuffer转成String,代码如下:
    import java.nio.ByteBuffer;
    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    
    public class CommonUtils {
    
        public static void main(String[] args) {
            System.out.println("Hello world!");
        }
    
        public static String decodeKey(ByteBuffer bytes) {
            Charset charset = StandardCharsets.UTF_8;
            return charset.decode(bytes).toString();
        }
    
        
        public static byte[] decodeValue(ByteBuffer bytes) {
            int len = bytes.limit() - bytes.position();
            byte[] bytes1 = new byte[len];
            bytes.get(bytes1);
            return bytes1;
        }
    
        
        public static ByteBuffer encodeKey(String key) {
            return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
        }
    
        public static ByteBuffer encodeValue(byte[] value) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(value.length);
            byteBuffer.clear();
            byteBuffer.get(value, 0, value.length);
            return byteBuffer;
        }
    }
    
  6. 编写ConsumerTest生产者,代码如下:
    import java.util.Collections;
    import org.apache.rocketmq.client.apis.ClientConfiguration;
    import org.apache.rocketmq.client.apis.ClientServiceProvider;
    import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
    import org.apache.rocketmq.client.apis.consumer.FilterExpression;
    import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
    import org.apache.rocketmq.client.apis.consumer.PushConsumer;
    import org.rocketmq.producer.CommonUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class PushConsumerTest {
        private static final Logger logger = LoggerFactory.getLogger(PushConsumerTest.class);
    
        private PushConsumerTest() {
        }
    
        public static void main(String[] args) throws Exception {
            final ClientServiceProvider provider = ClientServiceProvider.loadService();
            // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
            String endpoints = "192.168.36.132:8081";
            ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                    .setEndpoints(endpoints)
                    .build();
            // 订阅消息的过滤规则,表示订阅所有Tag的消息。
            String tag = "*";
            FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
            // 为消费者指定所属的消费者分组,Group需要提前创建。
            // 执行:sh bin/sh mqadmin updateSubGroup -g testgroup -c DefaultCluster -n localhost:9876
            String consumerGroup = "testgroup";
            // 指定需要订阅哪个目标Topic,Topic需要提前创建。
            String topic = "TestTopic";
            // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
            PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                    .setClientConfiguration(clientConfiguration)
                    // 设置消费者分组。
                    .setConsumerGroup(consumerGroup)
                    // 设置预绑定的订阅关系。
                    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                    // 设置消费监听器。
                    .setMessageListener(messageView -> {
                        // 处理消息并返回消费结果。
                        logger.info("Consume message successfully, messageId={},messageBody={}", messageView.getMessageId(), CommonUtils.decodeKey(messageView.getBody()));
                        return ConsumeResult.SUCCESS;
                    })
                    .build();
            Thread.sleep(Long.MAX_VALUE);
            // 如果不需要再使用 PushConsumer,可关闭该实例。
            // pushConsumer.close();
        }
    }
    
  7. 为了能查看到控制台日志输入,需要在resources目录下新建log4j.properties、log4j2.properties,具体内容如下:
    log4j.properties内容:
    log4j.rootLogger=INFO,console
    
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.out
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
    
    log4j2.properties内容:
    name = PropertiesConfig
    property.filename = target/logs
    
    #appenders = console, file
    #配置值是appender的类型,并不是具体appender实例的name
    appenders = rolling
    
    appender.rolling.type = RollingFile
    appender.rolling.name = RollingLogFile
    appender.rolling.fileName=${filename}/automationlogs.log
    appender.rolling.filePattern = ${filename}/automationlogs-%d{MM-dd-yy-HH-mm-ss}-%i.log
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern=[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 5
     
    rootLogger.level = INFO,console
    rootLogger.appenderRef.rolling.ref = RollingLogFile
    
  8. 到此,完成了所有准备工作了,整个工程如下所示:
    在这里插入图片描述
  9. 运行ProducerTest程序进行消息的发送,控制台中会看到如下内容:
    在这里插入图片描述
  10. 运行ConsumerTest程序接收消息,控制台中会看到如下内容:
    在这里插入图片描述

四、小结

至此,一个单节点副本的 RocketMQ 集群已经部署起来了,我们也通过编写Java程序进行简单的消息收发。如本文对您有帮助,麻烦您动动发财的手指点个赞~~~~~,谢谢您的阅读!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/72013.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何微调优化你的ChatGPT提示来提高对话质量

ChatGPT会话质量很大程度上取决于微调优化提示的艺术。本文旨在阐明微调提示的复杂性&#xff0c;以确保你可以充分发挥ChaGPT这一颠覆性工具的潜力。 与ChatGPT对话的关键部分是“提示”。即&#xff1a;你输入的问题或陈述&#xff0c;它决定了人工智能的响应。类似于引导对…

软件测试基础篇——Docker

1、docker技术概述 docker描述&#xff1a;docker是一项虚拟化的容器技术&#xff08;类似于虚拟机&#xff09;&#xff0c;docker技术给使用者提供一个平台&#xff0c;在该平台上可以利用提供的容器&#xff0c;对每一个应用程序进行单独的封装隔离&#xff0c;每一个应用程…

Blender如何给fbx模型添加材质贴图并导出带有材质贴图的模型

推荐&#xff1a;使用 NSDT场景编辑器快速助你搭建可二次编辑的3D应用场景 此教程适合新手用户&#xff0c;专业人士直接可直接绕路。 本教程中介绍了利用Blender建模软件&#xff0c;只需要简单几步就可以为模型添加材质贴&#xff0c;图&#xff0c;并且导出带有材质的模型文…

netty基础与原理

Netty线程模型和Reactor模式 简介&#xff1a;reactor模式 和 Netty线程模型 设计模式——Reactor模式&#xff08;反应器设计模式&#xff09;&#xff0c;是一种基于 事件驱动的设计模式&#xff0c;在事件驱动的应用中&#xff0c;将一个或多个客户的 服务请求分离&#x…

Verilog求log10和log2近似

Verilog求log10和log2近似 Verilog求10对数近似方法&#xff0c;整数部分用位置index代替&#xff0c;小数部分用查找表实现 参考&#xff1a; Verilog写一个对数计算模块Log2(x) FPGA实现对数log2和10*log10

【LangChain学习】基于PDF文档构建问答知识库(三)实战整合 LangChain、OpenAI、FAISS等

接下来&#xff0c;我们开始在web框架上整合 LangChain、OpenAI、FAISS等。 一、PDF库 因为项目是基于PDF文档的&#xff0c;所以需要一些操作PDF的库&#xff0c;我们这边使用的是PyPDF2 from PyPDF2 import PdfReader# 获取pdf文件内容 def get_pdf_text(pdf):text "…

SQL常见命令语句

1.连接数据库 mysql (-h IP) -u root -p 密码2.查看数据库 show databases3.使用数据库 use db_name4.查看表 show tables [from db_name]5.查看表结构 desc tb_name6.创建、删除、选择数据库 create database db_namedrop database db_nameuse db_name7.数据类型 参考链…

技术应用:Docker安全性的最佳实验|聊聊工程化Docker

&#x1f525; 技术相关&#xff1a;《技术应用》 ⛺️ I Love you, like a fire! 文章目录 首先&#xff0c;使用Docker Hub控制访问其次&#xff0c;保护密钥写在最后 不可否认&#xff0c;能生存在互联网上的软件都是相互关联的&#xff0c;当我们开发一款应用程序时&#x…

(二)结构型模式:1、适配器模式(Adapter Pattern)(C++实现示例)

目录 1、适配器模式&#xff08;Adapter Pattern&#xff09;含义 2、适配器模式应用场景 3、适配器模式的UML图学习 4、C实现适配器模式的示例 1、适配器模式&#xff08;Adapter Pattern&#xff09;含义 将一个接口转换为客户端所期待的接口&#xff0c;从而使两个接口…

JVM 查看配置 jinfo 及使用 jstat,查看堆栈jstack及GC

1. Jinfo 查看正在运行的Java应用程序的扩展参数: 包含 JVM 参数与 java 系统参数 命令&#xff1a; jinfo pid 2 jstat 查看堆内存使用情况及 GC 回收频率等&#xff1a; jstat [-命令选项] [vmid] [间隔时间(毫秒)] [查询次数] 2.1 jstat -gc pid 最常用&#xff0c;可…

MySQL数据库----------安装anaconda---------python与数据库的链接

作者前言 &#x1f382; ✨✨✨✨✨✨&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f382; ​&#x1f382; 作者介绍&#xff1a; &#x1f382;&#x1f382; &#x1f382; &#x1f389;&#x1f389;&#x1f389…

【Linux】进程间通信之管道

【Linux】进程间通信之管道 进程间通信进程间通信目的进程间通信的方式 管道&#xff08;内核维护的缓冲区&#xff09;匿名管道&#xff08;用于父子间进程间通信&#xff09;简单使用阻塞状态读写特征非阻塞状态读写特征 匿名管道特点命名管道 匿名管道与命名管道的区别 进程…

时序预测 | MATLAB实现基于CNN卷积神经网络的时间序列预测-递归预测未来(多指标评价)

时序预测 | MATLAB实现基于CNN卷积神经网络的时间序列预测-递归预测未来(多指标评价) 目录 时序预测 | MATLAB实现基于CNN卷积神经网络的时间序列预测-递归预测未来(多指标评价)预测结果基本介绍程序设计参考资料 预测结果 基本介绍 1.Matlab实现CNN卷积神经网络时间序列预测未…

中国首款量子计算机操作系统本源司南 PilotOS正式上线

中国安徽省量子计算工程研究中心近日宣布&#xff0c;中国国产量子计算机操作系统本源司南 PilotOS 客户端正式上线。 如果把量子芯片比喻成人的“心脏”&#xff0c;那么量子计算机操作系统就相当于人的“大脑”&#xff0c;量子计算应用软件则是人的“四肢”。 据安徽省量子…

C++入门篇7---string类

所谓的string类&#xff0c;其实就是我们所说的字符串&#xff0c;本质和c语言中的字符串数组一样&#xff0c;但是为了更符合C面向对象的特性&#xff0c;特地将它写成了一个单独的类&#xff0c;方便我们的使用 对其定义有兴趣的可以去看string类的文档介绍&#xff0c;这里…

运维监控学习笔记3

DELL的IPMI页面的登录&#xff1a; 风扇的状态&#xff1a; 电源温度&#xff1a;超过70度就告警&#xff1a; 日志信息&#xff1a; 可以看到更换过磁盘。 iDRAC的设置 虚拟控制台&#xff1a;启动远程控制台&#xff1a; 可以进行远程控制。 机房工程师帮我们接远程控制&…

opencv 基础54-利用形状场景算法比较轮廓-cv2.createShapeContextDistanceExtractor()

注意&#xff1a;新版本的opencv 4 已经没有这个函数 cv2.createShapeContextDistanceExtractor() 形状场景算法是一种用于比较轮廓或形状的方法。这种算法通常用于计算两个形状之间的相似性或差异性&#xff0c;以及找到最佳的匹配方式。 下面是一种基本的比较轮廓的流程&…

Dynamic CRM开发 - 实体介绍

实体简介 在CRM中,实体(Entity)是数据的基本载体,也是构建业务逻辑网络的基础节点。 实体可以理解为数据库中的一张表(实体中的字段对应数据库表的字段),比如创建一个实体存储客户信息,创建一个实体存储产品信息,产品实体里可以创建一个查找类型的字段(类似表的外键)…

Json简述(C++)

目录 1.介绍 2.格式 3.底层 3.1数据对象表示 3.2序列化接口 3.3反序列化接口 4.使用 1.介绍 Json&#xff08;JavaScript Object Notation&#xff09;是一种轻量级的数据交换格式&#xff0c;其最早是为JavaScript编程语言设计的格式。不过发发展至今&#xff0c;Jso…

C++——缺省参数

缺省参数的定义 缺省参数是声明或定义函数时为函数的参数指定一个缺省值。在调用该函数的时候&#xff0c;如果没有指定实参&#xff0c;则采用该形参的缺省值&#xff0c;否则使用指定的实参。 void Func(int a 0) {cout << a << endl; } int main() { Func()…