SpringBoot+RocketMQ集群(dledger)部署完整学习笔记

文章目录

  • 前言
  • 一、单台集群部署
  • 二、多台集群部署
    • 1.修改配置
    • 2.dashboard修改
  • 三、整合springboot
    • 1.引入pom和修改yml
    • 2.编写消费者
    • 3.编写生产者
    • 4.测试效果
  • 总结


前言


RocketMQ集群方式有好几种
官网地址 https://rocketmq.apache.org/zh/docs/4.x/deployment/01deploy

  • 2m-2s-async:2主2从异步刷盘(吞吐量较大,但是消息可能丢失
  • 2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全)
  • 2m-noslave :2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置
  • dledger:用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,
    其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。

dledger搭建参考文档 https://rocketmq.apache.org/zh/docs/4.x/bestPractice/02dledger

MQ安装部署请看这篇:https://blog.csdn.net/HBliucheng/article/details/135357998

搭建过程中踩过的坑也也会记录下来

一、单台集群部署

## 启动
nohup sh bin/dledger/fast-try.sh start
## 关闭
nohup sh bin/dledger/fast-try.sh stop

先启动 fast-try.sh start
启动时发现权限不足
nohup: 无法运行命令"bin/mqbroker": 权限不够
查看启动脚本

cat bin/dledger/fast-try.sh

在这里插入图片描述
那我们就修改下nohup 后面加上sh
修改后如下

function startNameserver() {
    export JAVA_OPT_EXT=" -Xms512m -Xmx512m  "
    nohup sh bin/mqnamesrv &
}

function startBroker() {
    export JAVA_OPT_EXT=" -Xms1g -Xmx1g  "
    conf_name=$1
    nohup sh bin/mqbroker -c $conf_name &
}

再次启动发现可以了
在这里插入图片描述
执行命令 查看集群情况 BID =0的是主节点

sh bin/mqadmin clusterList -n 127.0.0.1:9876

在这里插入图片描述
再看看dashboarb
启动之前请先开放6个端口 如果还有端口访问不了的请自行开放出来

firewall-cmd --zone=public --add-port=30909/tcp --permanent
firewall-cmd --zone=public --add-port=30911/tcp --permanent
firewall-cmd --zone=public --add-port=30919/tcp --permanent
firewall-cmd --zone=public --add-port=30921/tcp --permanent
firewall-cmd --zone=public --add-port=30929/tcp --permanent
firewall-cmd --zone=public --add-port=30931/tcp --permanent

### 如果不想一次次开放下面命令也可以
firewall-cmd --zone=public --add-port=30900-30930/tcp --permanent
## 重启防火墙
systemctl reload firewalld
## 查看开放的端口
firewall-cmd --list-ports
## 其它命令
### 关闭端口
firewall-cmd --zone=public --remove-port=30909/tcp --permanent

在这里插入图片描述

启动生产者和消费者再看 master消费一个
在这里插入图片描述
停止master

 lsof  -i:30911
 ## 找到pid杀死 我的是118276
 kill  118276

在这里插入图片描述
在这里插入图片描述
我们再启动 被杀死的broker

nohup sh  bin/mqbroker -c conf/dledger/broker-n0.conf &

在这里插入图片描述
在这里插入图片描述
发现30911作为slave回来了

二、多台集群部署

先准备三台机器
192.168.141.101
192.168.141.102
192.168.141.103

1.修改配置

192.168.141.101修改如下
profile不修改也可以

vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR=192.168.141.101:9876
source /etc/profile

修改 broker.conf 后面我们启动哪个就修改哪个 我是把 broker-n0.conf复制一份到broker.conf,也可以直接修改broker-n0.conf,启动时启动自己配置的conf文件就可以



cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf 
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16


192.168.141.102修改如下
profile不修改也可以

vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR=92.168.141.102:9876
source /etc/profile

cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf 
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16

192.168.141.103修改如下
profile不修改也可以

vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR=192.168.141.103:9876
cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf 
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfId=n2
sendMessageThreadPoolNums=16

开放端口
每台机器都要开放

firewall-cmd --zone=public --add-port=30911/tcp --permanent
firewall-cmd --zone=public --add-port=40911/tcp --permanent
systemctl reload firewalld

如果还有端口没开放,请自行开放

启动
每台机器都要启动

cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0
nohup sh bin/mqnamesrv &
nohup sh  bin/mqbroker -c conf/dledger/broker.conf &

查看日志
在这里插入图片描述
发现未创建文件夹,创建文件夹

 mkdir -p  /tmp/rmqstore/node00/commitlog
## 关掉再启动
sh bin/mqshutdown broker
## 启动broker
nohup sh  bin/mqbroker -c conf/dledger/broker.conf &

查看集群情况

sh bin/mqadmin clusterList -n 127.0.0.1:9876

在这里插入图片描述

踩坑 这个值不要随便写,这里从0开始递增 ,不然选举会有问题
在这里插入图片描述

2.dashboard修改

修改配置

## 根据自己的服务器地址修改,注意中间是分号不是逗号
rocketmq.config.namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876

启动访问
在这里插入图片描述
关闭master再查看集群情况,然后再重启,和前面的单机集群一样的,大家可自行测试

三、整合springboot

1.引入pom和修改yml

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>

rocketmq:
# 集群中间以分号隔开
  name-server: 192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
  producer:
    group: my_group_test

2.编写消费者

package com.study.config.rocketmq;

import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;

/**
 * @author: 
 * @time: 2024/1/5 10:00
 */
@Component
@RocketMQMessageListener(consumerGroup = "my_group_test",
        topic = "topic_test",
        selectorType = SelectorType.TAG,
        selectorExpression = "tagA")
@Slf4j
public class MQMsgListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        String msg = new String(message.getBody(), CharsetUtil.UTF_8);
        log.info("msgId={} msg={}",msgId,msg);
    }
}

@RocketMQMessageListener 注解参数如下:

  • topic: 消费者订阅的主题,即消费者将从这个主题中接收消息。
  • consumerGroup: 消费者组,多个消费者可以组成一个消费者组,共同从一个主题中接收消息。
  • consumeMode: 消费模式,指定消费者是以并发的方式接收消息还是以有序的方式接收消息。并发模式下,多个消费者可以同时接收消息;有序模式下,每个消费者按照消息的顺序依次接收消息。
  • messageModel: 消息模式,指定消息是以集群模式还是广播模式发送。集群模式下,消息将被发送到同一个主题的其中一个消费者;广播模式下,消息将被发送到主题的所有消费者。
  • selectorType: 过滤消息的方式,可以使用标签(Tag)或SQL92表达式(SQL92)来过滤消息。
  • selectorExpression: 过滤消息的表达式,可以使用标签(Tag)或SQL92表达式(SQL92)来指定过滤条件。
  • maxReconsumeTimes: 消息消费失败后,可被重复投递的最大次数。超过最大重试次数后,消息将被放入死信队列。
  • delayLevelWhenNextConsume: 并发模式的消息重试策略,指定消息消费失败后的重试延迟级别。设置为-1时,表示无需重试,直接将消息放入死信队列。

3.编写生产者

package com.study.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

/**
 * @author: 
 * @time: 2024/1/5 10:17
 */
@RestController
@RequestMapping("/mq")
@Slf4j
public class RocketMQProducerController {
    @Resource
    RocketMQTemplate rocketMQTemplate;

    @PostMapping("/sendMessage")
    @ResponseBody
    public void sendMessage(String msg){
      rocketMQTemplate.asyncSend("topic_test", "hello mq", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("msgId={}",sendResult.getMsgId());
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });

    }
}

同步会有一点小问题,第一次启动不会消费,直接写成异步

4.测试效果

发现没有主题
在这里插入图片描述
追踪源码发现主题和过滤消息的表达式按照冒号分割
topic取第一位,过滤表达式取第二位
在这里插入图片描述

修改再试下

  rocketMQTemplate.asyncSend("topic_test:tagA", "hello mq", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("msgId={}",sendResult.getMsgId());
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });

发现可以了
在这里插入图片描述
前面写了个java客户端的消费者,改下消费组发现也可以消费

java客户端代码

package com.bsoft;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

/**
 * @author: liucheng
 * @time: 2023/12/29 15:39
 */
public class MQConsumer {
    private final static String nameServer = "192.168.141.101:9876";

    private final static String consumerGroup = "my_group_test02";

    private final static String topic = "topic_test";
    public static void main(String[] args) throws MQClientException, IOException, InterruptedException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        // 设置NameServer的地址
        consumer.setNamesrvAddr(nameServer);
        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe(topic, "tagA");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                msgs.forEach((msg)->{
                    byte[] body = msg.getBody();
                    String s = new String(body, Charset.defaultCharset());
                    System.out.println("msg=================> " +s);

                });
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started......");
//        Thread.sleep(5000);
//        consumer.shutdown();
        System.in.read();
    }
}

在这里插入图片描述
到此集群搭建完成,大家搭建过程中有遇到问题可以交流

总结

整个搭建过程不难就是有点繁琐,需要配置多台服务器

  • 其中配置brocker.conf时dLegerSelfId值这块要注意 ,dLegerSelfId是节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一。这个值从0开始递增
    在这里插入图片描述

  • 同一台服务器上启动时先启动 namesrv 再启动 broker

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/297240.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ATTCK视角下的信息收集:主机发现

目录 1、利用协议主动探测主机存活 利用ICMP发现主机 利用ARP发现主机 利用NetBIOS协议发现主机 利用TCP/UDP发现主机 利用DNS协议发现主机 利用PRC协议发现主机程序 2、被动主机存活检测 利用Browser主机探测存活主机 利用ip段探测主机存活 利用net命令探测主机存活…

unity C# 中通俗易懂LINQ使用案例

文章目录 1. 从数组或列表中查询元素**&#xff1a;2. **排序与分组**&#xff1a;3. **连接多个数据源**&#xff1a;4. **聚合操作**&#xff1a;5. **分页查询**&#xff1a;6. **多条件查询**&#xff1a;7. **转换和投影&#xff08;Select&#xff09;**&#xff1a;8. *…

jdbc源码研究

JDBC介绍 JDBC&#xff08;Java Data Base Connectivity,java数据库连接&#xff09;是一种用于执行SQL语句的Java API&#xff0c;可以为多种关系数据库提供统一访问&#xff0c;它由一组用Java语言编写的类和接口组成。 开发者不必为每家数据通信协议的不同而疲于奔命&#…

竞赛保研 基于深度学习的人脸专注度检测计算系统 - opencv python cnn

文章目录 1 前言2 相关技术2.1CNN简介2.2 人脸识别算法2.3专注检测原理2.4 OpenCV 3 功能介绍3.1人脸录入功能3.2 人脸识别3.3 人脸专注度检测3.4 识别记录 4 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于深度学习的人脸专注度…

Flask 会员列表展示

感谢编程浪子师傅的源码信息分享 web/controllers/member/Member.py # -*- coding: utf-8 -*- from flask import Blueprint,request,redirect,jsonify from common.libs.Helper import ops_render,iPagination,getCurrentDate,getDictFilterField,selectFilterObj from comm…

【uniapp】APP打包上架应用商-注意事项

初雪云-uniapp启动图自定义生成&#xff08;支持一键生成storyboard&#xff09; 一、修改App端上传图片/视频 uni.uploadFile let thatthis; uni.chooseImage({count: 1,sourceType: [camera,album],sizeType: [compressed, original],success: rey > {uni.showLoading({ t…

Linux操作系统——进程控制(一) 进程创建和进程终止

进程创建 fork函数 在linux中fork函数时非常重要的函数&#xff0c;它从已存在进程中创建一个新进程。新进程为子进程&#xff0c;而原进程为父进程。 #include <unistd.h> pid_t fork(void); 返回值&#xff1a;自进程中返回0&#xff0c;父进程返回子进程id&#xff…

BetaFlight开源代码之电流校准

BetaFlight开源代码之电流校准 1. 源由2. 分析2.1 常规逻辑2.2 数据流2.3 采样电路2.3.1 采样实现2.3.2 采样原理2.3.3 Layout参考2.3.4 INA169芯片2.3.5 INA169 Near-Zero Vsense 3. 原理4. 示例4.1 实测&转换数据4.2 线性拟合-小电流4.3 线性拟合-大电流4.4 大电流/小电流…

HDMI彩条显示实验与方块移动实验

一、HDMI接口简介 一种数字音视频接口标准&#xff0c;提供高质量的数字音视频传输&#xff0c;同时支持多通道音频、高分辨率视频和其他数据传输功能。提供更高的数据传输带宽&#xff08;带宽&#xff1a;1s内传输多少比特数据&#xff09; 数字传输&#xff1a; HDMI是一种全…

【VRTK】启用多种VR设备的Passthrough功能

【背景】 透视可以让VR头盔展现AR能力,通过VRTK,可以快速实现多种设备平台可用的透视功能。包括主流的Oculus,Pico等。整个不成不需要自己写代码。 【操作】 针对WaveXR,点击场景中的CameraRigsWaveXR-》WaveRig-》Camera Offset-》Main Camera,追加一个新组件,名为Und…

QT自定义信号和槽

信号和槽 介绍实现创建文件对teacher的h和cpp文件进行处理对student的h和cpp文件进行处理对widget的h和cpp文件进行处理 介绍 Qt中的信号和槽是一种强大的机制&#xff0c;用于处理对象之间的通信。它们是Qt框架中实现事件驱动编程的核心部分。 信号&#xff08;Signal&#x…

vite4项目中,vant兼容750适配

一般非vite项目&#xff0c;使用postcss-px-to-viewport。在设计稿为750时候&#xff0c;可使用以下配置兼容vant。 在vite4项目中&#xff0c;以上配置不行。需要调整下&#xff0c;使用postcss-px-to-viewport-8-plugin&#xff0c;并修改viewportWidth&#xff0c;具体如下…

51单片机定时/计数器相关知识点

51单片机定时/计数器相关知识点 结构组成 51单片机的定时/计数器中有两个寄存器&#xff1a; T0&#xff1a;低位&#xff1a;TL0&#xff08;字节地址8AH&#xff09;高位&#xff1a;TH0&#xff08;字节地址8CH&#xff09;T1&#xff1a;低位&#xff1a;TL1&#xff08…

走向云原生 破局数字化

近年来&#xff0c;随着云计算概念和技术的普及&#xff0c;云原生一词也越来越热门&#xff0c;云原生成为云计算领域的新变量。行业内&#xff0c;华为、阿里巴巴、字节跳动等各个大厂都在“抢滩”云原生市场。行业外&#xff0c;云原生也逐渐出圈&#xff0c;出现在大众视野…

Visual Studio Code安装C#开发工具包并编写ASP.NET Core Web应用

前言 前段时间微软发布了适用于VS Code的C#开发工具包&#xff08;注意目前该包还属于预发布状态但是可以正常使用&#xff09;&#xff0c;因为之前看过网上的一些使用VS Code搭建.NET Core环境的教程看着还挺复杂的就一直没有尝试使用VS Code来编写.NET Core。不过听说C# 开发…

设计模式Java实战,彻底学会

​这是全网最强的Java设计模式实战教程。此教程用实际项目场景&#xff0c;结合SpringBoot&#xff0c;让你真正掌握设计模式。 网址是&#xff1a;Java设计模式实战专栏介绍 - 自学精灵&#xff08;也可以百度搜索“自学精灵”&#xff09;。 本设计模式专栏的威力 用Java实…

MySQL学习笔记2: MySQL的前置知识

目录 1. MySQL是什么?2. 什么是客户端&#xff0c;什么是服务器&#xff1f;3. 服务器的特点4. 安装mysql5. mysql 客户端6. mysql 服务器7. mysql的本体8. MySQL 使用什么来存储数据&#xff1f;9. 数据库的多种含义10. MySQL 存储数据的组织方式 1. MySQL是什么? MySQL 是…

算法每日一题: 被列覆盖的最多行数 | 二进制 - 状态压缩

大家好&#xff0c;我是星恒 今天的题目又是一道有关二进制的题目&#xff0c;有我们之前做的那道 参加考试的最大学生数的 感觉&#xff0c;哈哈&#xff0c;当然&#xff0c;比那道题简单多了&#xff0c;这道题感觉主要的考点就是二进制&#xff0c;大家可以好好总结一下这道…

使用 Maven 的 dependencyManagement 管理项目依赖项

使用 Maven 的 dependencyManagement 管理项目依赖项 介绍 在开发 Java 项目时&#xff0c;管理和协调依赖项的版本号是一项重要而繁琐的任务。 而 Maven 提供了 <dependencyManagement> 元素&#xff0c;用于定义项目中所有依赖项的版本。它允许您指定项目中每个依赖…

Java网络编程 、UDP、TCP、Socket通信

这个是第一篇&#xff0c;我先写udp&#xff0c; 首先我解释一下这个的特点是什么&#xff0c;他的特点主要是&#xff1a; 我发送消息之后就不管这个消息的任何情况&#xff0c;也就是&#xff0c;我只要把这个消息发送出去就不管了 这个是大白话的解释&#xff0c;具体的就…