RocketMQ常用基本操作

文章中的rabbitmq使用的是rocketmq-all-5.1.3-bin-release版本,需要安装包的可自行下载

RockerMQ启动停止命令

启动命令

nohup sh bin/mqnamesrv &

nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

查看日志

tail -f ~/logs/rocketmqlogs/namesrv.log

tail -f ~/logs/rocketmqlogs/proxy.log

停止命令

sh bin/mqshutdown broker

sh bin/mqshutdown namesrv

集群状态

sh mqadmin clusterList -n 127.0.0.1:9876

创建topic

sh mqadmin updateTopic -n 127.0.0.1:9876 rocket_test

查看所有topic信息

sh mqadmin topicList -n 127.0.0.1:9876

sh mqadmin topicList -n 127.0.0.1:9876 -c

查看 Topic 路由信息

sh mqadmin topicRoute -n 127.0.0.1:9876 -t TopicTest

发送测试消息

export NAMESRV_ADDR=localhost:9876

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

消费消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

Java代码收发消息

Producer

package com.rocket.demo;

import com.alibaba.fastjson.JSON;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.HashMap;

import java.util.Map;

public class RocketProducerDemo {

    private final static String nameServer = "127.0.0.1:9876";

    private final static String producerGroup = "my_group2";

    // debezium-mysql-source-topic topic-test

    private final static String topic = "TopicTest";

    public static void main(String[] args) {

        try {

            // 初始化一个producer并设置Producer group name

            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);

//            DefaultMQProducer producer = new DefaultMQProducer();

            // 设置NameServer地址

            producer.setNamesrvAddr(nameServer);

            // 启动producer

            producer.start();

            for (int i = 0; i < 100; i++) {

                Map<String, String> data = new HashMap();

                data.put("id", i+"");

                data.put("name", i+","+System.currentTimeMillis());

                // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤

                Message msg = new Message("TopicTest", "tagA", JSON.toJSONString(data).getBytes(RemotingHelper.DEFAULT_CHARSET));

                // 利用producer进行发送,并同步等待发送结果

                SendResult sendResult = producer.send(msg, 10000);

                System.out.println(sendResult);

            }

            // 一旦producer不再使用,关闭producer

            producer.shutdown();

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

Consumer

package com.rocket.demo;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class RocketConsumerDemo {

    public static void main(String[] args) throws Exception {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_group");

        consumer.setNamesrvAddr("localhost:9876");

        // debezium-mysql-source-topic  topic-test debezium-mysql-source db-history-debezium-topic debezium-mysql-source

        consumer.subscribe("TopicTest", "*"); // 订阅主题和标签,* 表示订阅所有标签

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {

                for (MessageExt message : messages) {

                    System.out.println("Received message: " + new String(message.getBody()));

                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }

        });

        consumer.start();

        System.out.println("Consumer started");

    }

}

常见问题

service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 0.96 CQ: 0.96 INDEX: 0.96], messages are put to the slave, message store has been shut down

错误原因:博主测试的服务器磁盘使用率到0.96了,rocketmq不允许磁盘超过0.9,清理下磁盘数据即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/759587.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

对话贾扬清:我创业这一年所看到的 AI

引言 在这次对话中&#xff0c;前阿里巴巴人工智能专家、现LIBRINAI创始人贾扬清分享了他在AI领域创业一年的见解和经历。作为一位从科学家转型为CEO的创业者&#xff0c;他探讨了AI计算、异构计算和云原生软件的结合带来的革命性变化&#xff0c;并讨论了LIBRINAI如何在激烈的…

EasyExcel数据导入

前言&#xff1a; 我先讲一种网上信息的获取方式把&#xff0c;虽然我感觉和后面的EasyExcel没有什么关系&#xff0c;可能是因为这个项目这个操作很难实现&#xff0c;不过也可以在此记录一下&#xff0c;如果需要再拆出来也行。 看上了网页信息&#xff0c;怎么抓到&#x…

浅谈区块链

区块链是一种分布式数据库技术&#xff0c;也被称为分布式账本技术。它的本质是一个去中心化的数据库&#xff0c;使用密码学相关联产生的数据块串连而成&#xff0c;用于验证其信息的有效性&#xff08;防伪&#xff09;和生成下一个区块。区块链具有“不可伪造”“全程留痕”…

【ajax实战02】数据管理网站—验证码登录

一&#xff1a;数据提交&#xff08;提交手机验证码&#xff09; 核心思路整理 利用form-serialize插件&#xff0c;收集对象形式的表单数据后&#xff0c;一并提交给服务器。后得到返回值&#xff0c;进一步操作 基地址&#xff1a; axios.defaults.baseURL http://geek.…

【简易版tinySTL】 哈希表与移动语义

基本概念 哈希表&#xff08;HashTable&#xff09;是一个重要的底层数据结构, 无序关联容器包括unordered_set, unordered_map内部都是基于哈希表实现。 哈希表是一种通过哈希函数将键映射到索引的数据结构&#xff0c;存储在内存空间中。哈希函数负责将任意大小的输入映射到…

垃圾回收与算法

目录 一、判断对象已经 "死亡" 1、引用计数法 2、可达性分析 二、垃圾收集算法 1、标记清楚算法 2、复制算法 3、标记整理算法 4、分代收集算法 4.1、新生代与复制算法 4.2老年代与标记复制算法 一、判断对象已经 "死亡" 1、引用计数法 在 Java 中&#…

3.ROS串口实例

#include <iostream> #include <ros/ros.h> #include <serial/serial.h> #include<geometry_msgs/Twist.h> using namespace std;//运行打开速度控制插件&#xff1a; rosrun rqt_robot_steering rqt_robot_steering //若串口访问权限不够&#xff1a…

PTA:7-12 斐波那契数列

斐波那契数列 (FibonacciSequence)&#xff0c;又称黄金分割数列&#xff0c;因数学家莱昂纳多斐波那契 (LeonardoFibonacci) 以兔子繁殖为例子而引入&#xff0c;故又称为“兔子数列”&#xff0c;指的是这样一个数列&#xff1a;1,1,2,3,5,8,13,21,⋯ 在数学上&#xff0c;斐…

常用字符串方法<python>

导言 在python中内置了许多的字符串方法&#xff0c;使用字符串方法可以方便快捷解决很多问题&#xff0c;所以本文将要介绍一些常用的字符串方法。 目录 导言 string.center(width[,fillchar]) string.capitalize() string.count(sub[,start[,end]]) string.join(iterabl…

2.linux操作系统CPU使用率和平均负载区别

目录 概述cpu使用率区别 结束 概述 linux操作系统CPU 使用率 和 平均负载 区别 负载高并不一定使用率高&#xff0c;有可能 cpu 被占用&#xff0c;但不干活。 cpu使用率 cpu使用率&#xff1a;cpu非空闲态运行的时间占比&#xff0c;反映cpu的繁忙程度&#xff0c;和平均负载…

大模型上下文长度扩展中的检索增强技术简述

基于Transformer的语言模型在众多自然语言处理任务上都取得了十分优异的成绩&#xff0c;在一些任务上已经达到SOTA的效果。但是&#xff0c;经过预训练后&#xff0c;模型能够较好处理的序列长度就固定下来。而当前的众多场景往往需要处理很长的上下文&#xff08;如&#xff…

如何安装多版本CUDA?

在这篇文章中&#xff0c;我们不仅要安装好CUDA&#xff0c;还有安装多版本的CUDA 首先聊一个题外话&#xff1a;前几天在csdn上看到的一个话题”安装pytorch一定要去nvidia官网下载安装cuda和cudnn吗&#xff1f;“ 我相信任何一个刚开始接触或者从事深度学习的炼丹者都会从安…

java中break和continue的标签使用

break标签的使用 break label是退出label对应的循环 //BreakDetail.java //2024.06.29 public class BreakDetail{public static void main(String[] args) {label1:for(int j 0; j < 4; j){label2:for(int i 0; i < 10; i){if(i 2){//break; //情况1//break label2…

五、Pentium 微处理器保护模式存储管理,《微机系统》第一版,赵宏伟

一、分段存储管理 Pentium支持分段存储管理、分页存储管理和段页式存储管理。 1.1 分段存储管理的基本思想 一个程序由多个模块组成。 每一个模块都是一个特定功能的独立的程序段。 段式管理&#xff1a;把主存按段分配的存储管理方式。 程序模块→段→段描述符→段描述符…

热题系列章节7

剑指 Offer 04. 二维数组中的查找 题目描述&#xff1a; 在一个二维数组中&#xff08;每个一维数组的长度相同&#xff09;&#xff0c;每一行都按照从左到右递增的顺序排序&#xff0c;每一列都按照从上到下递增的顺序排序。请完成一个函数&#xff0c;输入这样的一个二维数…

Chrome浏览器web调试(js调试、css调试、篡改前置)

目录 1. 打开开发者工具(Dev Tool) 2. 打开命令菜单 截图 3. 面板介绍 4. CSS调试 右键检查快速到达元素处 查找DOM数 利用面板Console查找DOM节点 内置函数查找上一个选择点击的元素 5. 调试JS代码(Javascript调试) 日志调试 选择查看日志等级 眼睛观测变量 …

创新前沿:Web3如何颠覆传统计算机模式

随着Web3技术的快速发展&#xff0c;传统的计算机模式正面临着前所未有的挑战和改变。本文将深入探讨Web3技术的定义、原理以及它如何颠覆传统计算机模式&#xff0c;以及对全球科技发展的潜在影响。 1. 引言&#xff1a;Web3技术的兴起与背景 Web3不仅仅是技术创新的一种&…

可编程定时计数器8253/8254 - 8253入门

时钟-给设备打拍子 概述 在计算机系统中&#xff0c;为了使所有设备之间的通信井然有序&#xff0c;各通信设备间必须有统一的节奏&#xff0c;不能各干各的&#xff0c;这个节奏就被称为定时或时钟 时钟并不是计算机处理速度的衡量&#xff0c;而是一种使设备间相互配合而避…

2024 Parallels Desktop for Mac 功能介绍

Parallels Desktop的简介 Parallels Desktop是一款由Parallels公司开发的桌面虚拟化软件&#xff0c;它允许用户在Mac上运行Windows和其他操作系统。通过强大的技术支持&#xff0c;用户无需重新启动电脑即可在Mac上运行Windows应用程序&#xff0c;实现了真正的无缝切换。 二…

动手学深度学习(Pytorch版)代码实践 -计算机视觉-48全连接卷积神经网络(FCN)

48全连接卷积神经网络&#xff08;FCN&#xff09; 1.构造函数 import torch import torchvision from torch import nn from torch.nn import functional as F import matplotlib.pyplot as plt import liliPytorch as lp from d2l import torch as d2l# 构造模型 pretrained…