rocketmq5源码系列--(一)--搭建调试环境

说在前头:阿里的rocketmq的文档是真他妈的烂的1b,很多东西都不说,全靠自己看源码,摸索,草,真的要吐血了

rocketmq的版本5而不是版本4,版本5比版本4多了个proxy
rocketmq5 三个组件:namesrv、broker、proxy,所以要启动这三个组件,clientsdk是和proxy通信,proxy和broker通信,

0:rocketmq用的jdk1.8


1:配置环境变量,可以直接在idea里配也可以在windows上配,不过windows上配可能要重启才起作用:
(!!!路径一定要用双斜杠,比如 a\\b\\c不能是a\b\c,因为他会解析成转义而不是路径,坑了我好久)
    ROCKETMQ_HOME="D:\\work\\code\\rocketmq"   #namesrv和broker使用的,就是我们的源码路径
    RMQ_PROXY_HOME="D:\\work\\code\\rocketmq"  #proxy使用的,proxy是新增的,所以这个环境变量也是新增的

2:创建配置文件夹
rocketmq默认使用的是ROCKETMQ_HOME\\distribution目录下的配置文件,我们不用,而是新建一个并在启动的时候指定
    2.1:创建文件夹 ROCKETMQ_HOME\\conf
    2.2:创建配置文件 ROCKETMQ_HOME\\conf\\namesrv.conf,ROCKETMQ_HOME\\conf\\broker.conf,ROCKETMQ_HOME\\conf\\rmq-proxy.json
        broker.conf和namesrv.conf的命令可以随便取,内容也可以为空,直接用默认的,
        proxy我偷懒,直接用代码默认的,默认的要求是RMQ_PROXY_HOME\\conf\\rmq-proxy.json这个文件即指定目录下的指定文件
    rmq-proxy.json文件内容如下:
        {
          "rocketMQClusterName": "DefaultCluster",
          "namesrvAddr": "127.0.0.1:9876"  
        }
    broker.conf文件内容如下:
        brokerClusterName = DefaultCluster    #必须和rmq-proxy.json中的clusterName保持一致
        brokerName = broker-a
        namesrvAddr = 127.0.0.1:9876
        storePathRootDir=D:\\work\\code\\rocketmq\\conf\\brokerstore                #!!!我们手动创建就行,还有,路径名一定要双斜杠
        storePathCommitLog=D:\\work\\code\\rocketmq\\conf\\brokerstore\\commitlog   #!!!这个目录他会自动创建,还有,路径名一定要双斜杠
    namesrv.conf文件内容如下:
        listenPort=9876        #指定端口
        (当然也可以啥也不填)
  
3:修改日志文件,以便会在控制台打印日志
    只要修改每个子项目的resource目录下的rmq.xxx.logback.xml文件就行:
    <configuration>
        <root level="INFO">
            <appender-ref ref="STDOUT"/>          #!!!!只要把<root>标签内的ref的名字改成"STDOUT"就会输出到控制台了
        </root>
    </configuration>

4:启动namesrv/broker/proxy,命令行参数:
    broker:  -c D:\\work\\code\\rocketmq\\conf\\broker.conf  #!!!路径一定要双斜杠,否则会启动失败,这个小错误卡了我好久。。
    namesrv: -c D:\\work\\code\\rocketmq\\conf\\namesrv.conf
    proxy:    (空,可以不填,因为我们用的是默认的配置文件,只要配置RMQ_PROXY_HOME以及创建对应的rmq-proxy.json就行)
    proxy:    启动会超级慢,需要三四分钟。。。真的太夸张了,暂不知道为啥

笔记1:namesrv默认端口9876,broker默认端口10911,proxy默认端口8081,默认集群名 DefaultCluster

5:用mqadmin源码来创建topic
    5.1:修改代码。运行mqadmin创建topic前必须先修改源码中的timeout,
        否则因为它连接需要耗时很长但是超时时间只有5s导致原本可以连接却因为超时而中断而topic创建失败
    源码修改如下:
        public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
            @Override
            public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
                throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
                long beginStartTime = System.currentTimeMillis();
                final Channel channel = this.getAndCreateChannel(addr);
                String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
                //TODO 恢复
                timeoutMillis=25000;        #!!!!!!在进入循环前设置timeoutMillis为25s,这样就不会超时了,只要改这里就可以了
                if (channel != null && channel.isActive()) {
                    long left = timeoutMillis;
                    try {
                        long costTime = System.currentTimeMillis() - beginStartTime;
    5.2: 运行mqadmin来创建topic,不要用mqadmin.cmd,这个创建不了,老是连不上,
         而且broker设置autoTopicCreateEnable=true不起作用必须手动创建
         mqadmin启动命令如下:
            mqadmin对应的class为:org.apache.rocketmq.tools.command.MQAdminStartup
            mqadmin:  updateTopic -b 127.0.0.1:10911 -t testx -n 127.0.0.1:9876    #创建topic testx

client测试程序:

pom.xml

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-tools</artifactId>
            <version>4.9.7</version>
        </dependency>
    </dependencies>

Producer代码:

package producer;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.io.IOException;
import java.time.Duration;

public class ProducerExample {


    public static void main(String[] args) throws ClientException {
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
        String endpoint = "127.0.0.1:8081";
        // 消息发送的目标Topic名称,需要提前创建。
        String topic = "testx";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint)
                .setRequestTimeout(Duration.ofSeconds(25));
        ClientConfiguration configuration = builder.build();
        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)

                .build();
        // 普通消息发送。
        String messageBody = "hello world";
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // 设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                // 消息体。
                .setBody(messageBody.getBytes())
                .build();
        try {
            // 发送消息,需要关注发送结果,并捕获失败等异常。
            for (int i = 0; i < 10000; i++) {
                SendReceipt sendReceipt = producer.send(message);
                System.out.println("Send message= {" + messageBody + "} successfully, messageId={" + sendReceipt.getMessageId() + "}");
                int ch = System.in.read();
            }
        } catch (ClientException e) {
            System.out.println("Failed to send message" + e.toString());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        // producer.close();
    }
}

consumer程序:

package consumer;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoints = "localhost:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .setRequestTimeout(Duration.ofSeconds(25)) //!!!!这个超时时间一定要设置长一点,不然会导致连不上而报错
                                                           //!!!!搞了一天,真的吐血,还好今天搞完了,虽然搞完了,但还是得骂两句
                                                           //2024/11/18 22:48,又是加班暂调休的一天。。。。。。
                .build();
        // 订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // 为消费者指定所属的消费者分组,Group需要提前创建。
        String consumerGroup = "myconsumer";
        // 指定需要订阅哪个目标Topic,Topic需要提前创建。
        String topic = "testx";
        // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 设置消费者分组。
                .setConsumerGroup(consumerGroup)
                // 设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 设置消费监听器。
                .setMessageListener(messageView -> {
                    // 处理消息并返回消费结果。
                    System.out.println("Consume message={" + messageView.getBody().toString() + "} successfully, messageId={" + messageView.getMessageId() + "}");
                    return ConsumeResult.SUCCESS;
                })
                .build();

        Thread.sleep(Long.MAX_VALUE);
        // 如果不需要再使用 PushConsumer,可关闭该实例。
        // pushConsumer.close();
    }
}

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/918632.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【网页设计】CSS3 进阶(动画篇)

1. CSS3 2D 转换 转换&#xff08;transform&#xff09;是CSS3中具有颠覆性的特征之一&#xff0c;可以实现元素的位移、旋转、缩放等效果 转换&#xff08;transform&#xff09;你可以简单理解为变形 移动&#xff1a;translate旋转&#xff1a;rotate缩放&#xf…

django安装与项目创建

一、安装 在终端输入 pip install django //或者(&#xff09;指定安装版本 pip install django2.2 二、创建项目 2.1创建项目 django-admin startproject 项目名 2.2Django 项目中的关键文件 _init_.py:将目录标识为python包setting.py:核心配置文件&#xff0c;定义项目…

【redis】—— 初识redis(redis基本特征、应用场景、以及重大版本说明)

序言 本文将引导读者探索Redis的世界&#xff0c;深入了解其发展历程、丰富特性、常见应用场景、使用技巧等&#xff0c;最后会对Redis演进过程中具有里程碑意义的版本进行详细解读。 目录 &#xff08;一&#xff09;初始redis &#xff08;二&#xff09;redis特性 &#…

SpringBoot学习记录(三)之多表查询

SpringBoot学习记录&#xff08;三&#xff09;之多表查询 一、多表查询概述1、数据准备2、介绍3、分类 二、内连接三、外连接四、子查询1、标量子查询2、列子查询3、行子查询4、表子查询 三、案例1、准备环境2、需求实现3、&#xff08;附&#xff09;数据准备 一、多表查询概…

泰矽微重磅发布超高集成度车规触控芯片TCAE10

市场背景 智能按键和智能表面作为汽车智能化的重要部分&#xff0c;目前正处于快速发展阶段&#xff0c;电容式触摸按键凭借其操作便利性与小体积的优势&#xff0c;在汽车内饰表面的应用越来越广泛。对于空调控制面板、档位控制器、座椅扶手、门饰板、车顶控制器等多路开关的…

HarmonyOs学习笔记-布局单位

鸿蒙开发中布局存在很多单位 鸿蒙的默认单位是vp 下方先展示一下在RrkTsUI中我们应该怎么书写&#xff0c;然后讲一下各大单位具体的含义。 Text("这是一个文本, 用默认单位进行展示&#xff0c;也就是vp") .width(100) .height(100);//此段代码与上方代码是一样的…

操作系统实验 C++实现生产者-消费者问题

实验目的 1、进一步加深理解进程同步的概念 2、加深对进程通信的理解 3、了解Linux下共享内存的使用方法 实验内容 1、按照下面要求&#xff0c;写两个c程序&#xff0c;分别是生产者producer.c以及customer.c 2、一组生产者和一组消费者进程共享一块环形缓冲区 使用共…

Easyexcel(1-注解使用)

文章链接&#xff1a; Easyexcel&#xff08;1-注解使用&#xff09; 版本依赖 <dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifactId><version>3.3.3</version> </dependency>ExcelProperty 指定…

最新版xAI LLM 模型Grok-2 上线

xAI&#xff01;Grok-2 最新版开启公测&#xff01;”。这是我注册成功的截图&#xff0c;使用国内的邮箱就可以注册使用了&#xff01; Grok API公测与免费体验: Grok API开启公测&#xff0c;提供免费体验128k上下文支持&#xff0c;。Grok-Beta与马斯克: 马斯克庆祝特朗普当…

css数据不固定情况下,循环加不同背景颜色

<template><div><p v-for"(item, index) in items" :key"index" :class"getBackgroundClass(index)">{{ item }}</p></div> </template><script> export default {data() {return {items: [学不会1, …

MySQL的聚簇索引和二级索引

索引按照物理实现方式&#xff0c;索引可以分为 2 种&#xff1a;聚簇&#xff08;聚集&#xff09;和非聚簇&#xff08;非聚集&#xff09;索引。也可以把非聚集索引称为二级索引或者辅助索引。 一.聚簇索引 聚簇索引并不是一种单独的索引类型&#xff0c;而是一种数据存储方…

【Pytorch】torch.nn.functional模块中的非线性激活函数

在使用torch.nn.functional模块时&#xff0c;需要导入包&#xff1a; from torch.nn import functional 以下是常见激活函数的介绍以及对应的代码示例&#xff1a; tanh (双曲正切) 输出范围&#xff1a;(-1, 1) 特点&#xff1a;中心对称&#xff0c;适合处理归一化后的数据…

神经网络11-TFT模型的简单示例

Temporal Fusion Transformer (TFT) 是一种用于时间序列预测的深度学习模型&#xff0c;它结合了Transformer架构的优点和专门为时间序列设计的一些优化技术。TFT尤其擅长处理多变量时间序列数据&#xff0c;并且能够捕捉到长期依赖关系&#xff0c;同时通过自注意力机制有效地…

学习threejs,使用TWEEN插件实现动画

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;threejs gis工程师 文章目录 一、&#x1f340;前言1.1 ☘️THREE.PLYLoader PLY模型加…

世界坐标系、相机坐标系、图像物理坐标系、像素平面坐标系

坐标系及其转换在计算机视觉领域占据核心地位。理解如何从一个坐标系转换到另一个坐标系&#xff0c;不仅是理论上的需要&#xff0c;也是实际应用中不可或缺的技能。 一、世界坐标系的定义 世界坐标系是一个全局的坐标系统&#xff0c;用于定义场景中物体的位置。在这个坐标…

03-axios常用的请求方法、axios错误处理

欢迎来到“雪碧聊技术”CSDN博客&#xff01; 在这里&#xff0c;您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者&#xff0c;还是具有一定经验的开发者&#xff0c;相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导&#xff0c;我将…

Redis/Codis性能瓶颈揭秘:网卡软中断的影响与优化

目录 现象回顾 问题剖析 现场分析 解决方案 总结与反思 1.调整中断亲和性&#xff08;IRQ Affinity&#xff09;&#xff1a; 2.RPS&#xff08;Receive Packet Steering&#xff09;和 RFS&#xff08;Receive Flow Steering&#xff09;&#xff1a; 近期&#xff0c;…

openwebui使用

文章目录 1、feature2、安装使用2.1 安装过程2.2 安装好后 1、feature 可以加载多个大模型 同时回复 模型问答: 使用vLLM框架部署模型&#xff0c;再使用Open WebUI直接进行模型问答 多模型支持: 多模型回复比对&#xff08;Qwen2-72B-Instruct, llama3-70b-8192, mixtral-8x7…

汽车资讯新引擎:Spring Boot技术领航

3系统分析 3.1可行性分析 通过对本汽车资讯网站实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本汽车资讯网站采用SSM框架&#xff0c;JAVA作为开发语言&#…

应用系统开发(12) Zync中实现数字相敏检波

在 Xilinx Zynq 系列(如 Zynq-7000 或 Zynq UltraScale+)中实现数字相敏检波(DSP,Digital Synchronous Detection)可以通过硬件(PL部分,FPGA逻辑)和软件(PS部分,ARM Cortex-A 处理器)的协同工作来实现。以下是一个详细的设计方法,包括基本原理和 Zynq 的实现步骤。…