RocketMQ5-03RocketMQ-Dashboard和Java客户端访问示例

接上篇02快速部署RocketMQ5.x(手动和容器部署)
已经完成 RocketMQ5.0 环境的部署,就需要对这个环境进行测试,查看集群、写入消息、读取消息等

本篇教你如何使用和查看部署的服务:

  • Docker部署 Dashboard
    • 获取镜像并下载
    • 部署服务
  • 客户端连接
    • pom文件
    • 生产者代码
    • 消费者代码
    • 接口测试
    • 问题: broker资源不足无法提供服务

Docker部署 Dashboard

以上通过可执行文件部署或者容器部署的形式,都需要有一个可以查看的集群的地方,对于官方自己配备的有 rocketmq-dashboard, 可以使用docker快速部署,便于测试

获取镜像并下载

docker search rocketmq-dashboard & docker pull apacherocketmq/rocketmq-dashboard

部署服务

docker run -d --name rmqdashboard -e "JAVA_OPTS=-Xmx256M -Xms256M -Xmn128M -Drocketmq.namesrv.addr=192.168.2.92:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8088:8080 apacherocketmq/rocketmq-dashboard

这边将端口映射到了8088,所以访问 localhost:8088,就可以查看到集群,如果有数据正在写入与读取,就能够大概看到数据量
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

客户端连接

手动创建 topic: sh bin/mqadmin updatetopic -n 192.168.2.92:9876 -t dataTopic2 -c DefaultCluster

pom文件

 <properties>
        <java.version>17</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>3.0.2</spring-boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.5</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>17</source>
                    <target>17</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.learning.springbootrmq5.SpringbootRmq5Application</mainClass>
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

生产者代码

 @GetMapping("/sendSync")
    public String sendSync() throws ClientException, IOException {
        String endpoint = "192.168.2.92:8081";
        String topic = "dataTopic2";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.enableSsl(true).build();
        Producer producer = provider.newProducerBuilder()
                                    .setTopics(topic)
                                    .setClientConfiguration(configuration)
                                    .build();
        Message message = provider.newMessageBuilder()
                                  .setTopic(topic)
                                  .setKeys("messageKey")
                                  .setTag("messageTag")
                                  .setBody("messageBodySync".getBytes())
                                  .build();
        try {
            SendReceipt sendReceipt = producer.send(message);
            log.info("Send sync message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
            log.error("Failed to send message", e);
        }
        producer.close();
        return "success";
    }

    @GetMapping("/sendAsync")
    public String sendAsync() throws ClientException, InterruptedException, IOException {
        String endpoint = "192.168.2.92:8081";
        String topic = "dataTopic2";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().enableSsl(true).setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        Producer producer = provider.newProducerBuilder()
                                    .setTopics(topic)
                                    .setClientConfiguration(configuration)
                                    .build();
        Message message = provider.newMessageBuilder()
                                  .setTopic(topic)
                                  .setKeys("messageKey")
                                  .setTag("messageTag")
                                  .setBody("messageBodyASync".getBytes())
                                  .build();
        producer.sendAsync(message);
        log.info("Send async message successfully, messageId");
        return "success";
    }

消费者代码

@Slf4j
@Component
public class MessageConsumerRunner implements CommandLineRunner {
    @Override
    public void run(final String... args) throws Exception {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        String endpoints = "192.168.2.92:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                                                                     .setEndpoints(endpoints)
                                                                     .build();
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "YourConsumerGroup";
        String topic = "dataTopic2";
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                                            .setClientConfiguration(clientConfiguration)
                                            .setConsumerGroup(consumerGroup)
                                            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                                            .setMessageListener(messageView -> {
                                                log.info("Consume message successfully, messageId={}", messageView.getMessageId());
                                                return ConsumeResult.SUCCESS;
                                            })
                                            .build();
        Thread.sleep(Long.MAX_VALUE);
    }
}

接口测试

请求接口 /msg/sendAsync
在这里插入图片描述

能够正常收发

问题: broker资源不足无法提供服务

可能出现的客户端报错为:

org.apache.rocketmq.client.java.exception.InternalErrorException: [request-id=e3f9dxxxx1aa872, response-code=50001] org.apache.rocketmq.proxy.common.ProxyException: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL:  0.96 CQ:  0.96 INDEX: -1.00], messages are put to the slave, message store has been shut down, etc.

java.util.concurrent.RejectedExecutionException: Task org.apache.rocketmq.shaded.io.grpc.internal.DelayedStream$4@72ba34c2 rejected from java.util.concurrent.ThreadPoolExecutor@7deb0119[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 13]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365) ~[na:na]

以上大体就是描述资源不足无法进行接入、服务不可达等,通常就是因为环境的资源不足,可能是内存、可能是硬盘

.../broker/logs/rocketmqlogs/store.log 中可以看出端倪,是磁盘存储不够了

2024-01-08 13:34:24 ERROR StoreScheduledThread1 - physic disk maybe full soon 0.95, so mark disk full, storePathPhysic=/home/rocketmq/store/commitlog

可以通过清除以下数据暂时缓解 .../broker/store/commitlog,可以发现没怎么用也有好多G。不过确实需要使用的话尽早考虑扩容啊

扩大存储增加可用磁盘空间,就能够正常使用连接了

如果这篇文章对你有用的话,帮忙留个关注吧~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/302241.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

静态网页设计——旅游网(HTML+CSS+JavaScript)

前言 声明&#xff1a;该文章只是做技术分享&#xff0c;若侵权请联系我删除。&#xff01;&#xff01; 感谢大佬的视频&#xff1a; https://www.bilibili.com/video/BV1KN4y1v7jx/?vd_source5f425e0074a7f92921f53ab87712357b 使用技术&#xff1a;HTMLCSSJS&#xff08;…

springboot集成cas客户端

Background 单点登录SSO(Single Sign ON)&#xff0c;指在多个应用系统中&#xff0c;只需登录一次&#xff0c;即可在多个应用系统之间共享登录。统一身份认证CAS&#xff08;Central Authentication Service&#xff09;是SSO的开源实现&#xff0c;利用CAS实现SSO可以很大程…

TrueNAS数据共享——开启SSH

目录 1、开启ssh 2、开启root登录 1、开启ssh 服务--找到SSH 启动 勾选自动启动 点击动作 2、开启root登录 勾选使用密码以root身份登录

物理机与vm文件共享与传输的设置方法

今天跟各位小伙伴&#xff0c;分享一下物理机与vm虚拟机文件共享与传输的设置方法&#xff0c;以供大家参考&#xff01; 一、物理机与虚拟机文件共享设置方法 第一步&#xff1a;先关闭虚拟机&#xff08;客户机&#xff09; 第二步&#xff1a;选择编辑虚拟机设置 第三步&am…

01.Typora1.7.6安装以及更换主题方法

重所周知&#xff0c;程序员的笔记资料有多么重要&#xff0c;不仅是自我提升也是加强记忆 那么一定少不了Typora这个软件 今天就来感受一下它应该如何打开正确的安装的方法。 双击文件夹以后&#xff0c;里面会有这样两个文件 我们双击后缀为.exe的文件 更改安装位置&#…

c语言:求1-100的奇数和|练习题

一、题目 求1-100以内的奇数和 二、代码截图【带注释】 三、源代码【带注释】 #include <stdio.h> //思路分析 //1、一个除以2&#xff0c;除不尽的&#xff0c;便是奇数 //设计常量N为100&#xff0c;常量随时可以变动 #define N 100 int main() { int sum0;//设…

Zookeeper的基础介绍和安装教程

1、 Zookeeper入门 1.1 概述 Zookeeper是一个开源的分布式的&#xff0c;为分布式应用提供协调服务的Apache项目。 1.2 特点 1.3 数据结构 1.4 应用场景 提供的服务包括&#xff1a;统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。 统一…

年销5万的岚图没有爆款

作者 | 辰纹 来源 | 洞见新研社 3款车一年卖了5万台&#xff0c;这个销量不算多&#xff0c;可对于岚图来说&#xff0c;却很不容易&#xff0c;CEO卢放称这是“一场翻身仗”&#xff0c;在写给全体员工的“家信”中表达谢意&#xff0c;称是“大家的团结奋斗&#xff0c;驱动…

C++模板——(2)函数模板的声明和使用

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 勤奋&#xff0c;机会&#xff0c;&am…

【书生大模型00--开源体系介绍】

书生大模型开源体系介绍 0 通用人工智能1 InternLM性能及模型2 从模型到应用 大模型成为目前很热的关键词&#xff0c;最热门的研究方向&#xff0c;热门的应用&#xff1b;ChatGPT的横空出世所引爆&#xff0c;快速被人们上手应用到各领域&#xff1b; 0 通用人工智能 相信使…

物理机搭建hive

一、修改Hadoop配置 修改core-site.xml 配置yarn-site.xml 分发文件&#xff0c;然后重启集群 二、 Hive解压安装 上传文件 添加hive环境便量&#xff0c;source生效 启动高可用集群&#xff0c;启动hive 三、配置mysql元数据库 检查当前系统是否安装过Mysql&#xf…

Python采集微博评论做词云图

嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! python更多源码/资料/解答/教程等 点击此处跳转文末名片免费获取 环境使用: Python 3.10 Pycharm 第三方模块使用: import requests >>> pip install requests import wordcloud >>> pip install wordclou…

软件工程造价师证书有用吗?难不难考?

&#x1f3af;软件工程造价师证书是有用的&#xff0c;它证明了持有人具备评估和估算软件开发cheng本、进度和资源规划的能力。✔️在IT行业中&#xff0c;受高度重视&#xff0c;特别是在软件开发和项目管理领域。 &#x1f469;软件工程造价师考试难易程度因人而异。该证书需…

Java技术专题:「入门到精通系列」深入探索常用的六种加密技术和实现

文章目录 1. 引言2. 对称加密3. 非对称加密4. 哈希算法5. 消息摘要6. 数字签名7. 数字证书8. 拓展功能与未来展望 &#x1f389;欢迎来到Java学习路线专栏~探索Java中的静态变量与实例变量 ☆* o(≧▽≦)o *☆嗨~我是IT陈寒&#x1f379;✨博客主页&#xff1a;IT陈寒的博客&am…

【Python学习】Python学习5-条件语句

目录 【Python学习】Python学习5-条件语句 前言if语句if语句判断条件简单的语句组参考 文章所属专区 Python学习 前言 本章节主要说明Python的条件语句&#xff0c;Python条件语句是通过一条或多条语句的执行结果&#xff08;True或者False&#xff09;来决定执行的代码块。 …

这些专利知识你知道吗?

专利作为一种重要的知识产权保护形式。专利不仅成为了企业核心竞争力的重要组成部分&#xff0c;也成为了国家创新发展的重要支撑。 专利是指国家专利主管机关授予发明创造申请人的一种专有权&#xff0c;这种专有权具有独占性、排他性和法律强制性&#xff0c;能够为持有者带来…

Rough.js:创建手绘、草图外观的图形

Rough.js 是一个小型的(<9kB gzipped)图形库&#xff0c;它可以让你以草图、手绘风格进行绘制。 该库定义了绘制直线、曲线、圆弧、多边形、圆和椭圆的基元。它还支持绘制 SVG 路径。 Rough.js 可以同时处理 Canvas 和 SVG。 安装 从npm安装&#xff1a; npm install --s…

Linux文件系统和日志分析

一、inode表结构 1. inode表 inode号在同一个设备上是唯一的。 inode号是有限资源&#xff0c;它的大小和磁盘大小有关。 访问文件的基本流程 根据文件夹的文件名和inode号的关系找到对应的inode表&#xff0c;再根据inode表&#xff08;属主 属组&#xff09;当中的指针找到磁…

基于SSM的校内信息服务发布系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

NX/UG二次开发—C\C++开发单个DLL支持多版本NX一种方法

1、去除附加包含目录下的NX相关的lib文件&#xff1a; 2、从对应的dll导出ufun函数和NXopen函数&#xff1a; libufun.dll; libufun_cam.dll; libufun_cae.dll; libufun_die.dll; libufun_vdac.dll; libufun_weld.dll; libugopenint.dll; libugopenint_cae.dll; libugopenint_…