系列五、Java操作RocketMQ简单消息之同步消息

一、概述

        同步消息的特征是消息发出后会有一个返回值,即RocketMQ服务器收到消息后的一个确认,这种方式非常安全,但是性能上却没有那么高,而且在集群模式下,也是要等到所有的从机都复制了消息以后才会返回,适用于重要的消息传递,例如:短信通知

二、案例代码

2.1、pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.star</groupId>
  <artifactId>rocketmq-example</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>rocketmq-example</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>

    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.5.0</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>2.0.25</version>
    </dependency>
    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-all</artifactId>
      <version>5.8.16</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-collections4</artifactId>
      <version>4.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.12.0</version>
    </dependency>

    <!-- 普通maven项目中使用Sl4j注解 -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.22</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.32</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.10</version>
    </dependency>

  </dependencies>

  <!-- 锁定Java编译的版本 -->
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

2.2、RocketMQConstant

package org.star.constants;

/**
 * @Author: 一叶浮萍归大海
 * @Date: 2023/7/27 16:42
 * @Description:
 */
public class RocketMQConstant {

    /**
     * 配置RocketMQ NameSrv的地址
     */
    public static final String NAME_SERVER_ADDR = "192.168.173.219:9876";

}

2.3、SimpleConsumer

package org.star.simple.consumer;

import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;

import java.util.List;

/**
 * @Author: 一叶浮萍归大海
 * @Date: 2023/8/25 10:20
 * @Description: 简单消息消费者
 */
@Slf4j
public class SimpleConsumer {

    public static void main(String[] args) throws Exception {
        /**
         * 消费消息分两种
         * (1)拉模式:消费者主动去Broker上拉消息
         * (2)推模式:消费者等待Broker把消息推送过来
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SimpleMessageGroup");
        consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
        consumer.subscribe("SimpleTopic", "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                if (CollectionUtils.isNotEmpty(list)) {
                    String body = StrUtil.utf8Str(list.get(0).getBody());
                    log.info("收到消息 body:{}",body);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer started!");
    }

}

2.4、SyncProducer

package org.star.simple.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.star.constants.RocketMQConstant;

import java.nio.charset.StandardCharsets;

/**
 * @Author: 一叶浮萍归大海
 * @Date: 2023/8/25 10:12
 * @Description: 同步发送:等待消息返回后再继续执行下面的操作
 */
@Slf4j
public class SyncProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducerGroup");
        producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
        producer.start();
        for (int i = 0; i < 3; i++) {
            Message message = new Message("SimpleTopic", ("我是第[" + i + "]个简单消息").getBytes(StandardCharsets.UTF_8));
            SendResult sendResult = producer.send(message);
            log.info("第[" + i + "]个简单消息发送成功 sendStatus:{},msgId:{},topic:{}", sendResult.getSendStatus(),sendResult.getMsgId(),sendResult.getMessageQueue().getTopic());
        }
        producer.shutdown();
    }

}

2.5、控制台打印结果

# 生产者端
12:14:32.339 [main] INFO org.star.simple.producer.SyncProducer - 第[0]个简单消息发送成功 sendStatus:SEND_OK,msgId:C0A81FB25D9418B4AAC207C6D9850000,topic:SimpleTopic
12:14:32.343 [main] INFO org.star.simple.producer.SyncProducer - 第[1]个简单消息发送成功 sendStatus:SEND_OK,msgId:C0A81FB25D9418B4AAC207C6D9940001,topic:SimpleTopic
12:14:32.348 [main] INFO org.star.simple.producer.SyncProducer - 第[2]个简单消息发送成功 sendStatus:SEND_OK,msgId:C0A81FB25D9418B4AAC207C6D9970002,topic:SimpleTopic


# 消费者端
12:14:32.337 [ConsumeMessageThread_10] INFO org.star.simple.consumer.SimpleConsumer - 收到消息 body:我是第[0]个简单消息
12:14:32.345 [ConsumeMessageThread_11] INFO org.star.simple.consumer.SimpleConsumer - 收到消息 body:我是第[1]个简单消息
12:14:32.349 [ConsumeMessageThread_12] INFO org.star.simple.consumer.SimpleConsumer - 收到消息 body:我是第[2]个简单消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/102222.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

QT(8.30)常用类与组件,实现登录界面

1.作业&#xff1a; 完成一个登录界面(图片未附带): 头文件: #ifndef WIDGET_H #define WIDGET_H#include <QWidget>#include <QLineEdit>//行编辑器#include<QIcon>//图标#include<QLabel>//标签#include<QPushButton>//按钮#include<QIc…

GA遗传算法

储备知识 GA算法主要解决数学模型中最优化的搜索算法&#xff0c;是进化算法中的一种&#xff0c;基因算法借鉴了自然界基因的遗传的主要现象&#xff0c;分别为遗传&#xff0c;变异&#xff0c;自然选择&#xff0c;杂交等。 GA算法参数 GA算法的参数如下所示。 种群规模…

JAVA 求最小公因数

JAVA 求最小公因数 文章目录 JAVA 求最小公因数方法一&#xff1a;枚举法的第一种方法一&#xff1a;枚举法的第二种方法二&#xff1a;展转相除法(欧几里德算法)方法三&#xff1a;递归拓展 求最小公倍数公式为综合 辗转相除法递归 求n个数的最大公约数和最小公倍数 题目&…

一百六十九、Hadoop——Hadoop退出NameNode安全模式与查看磁盘空间详情(踩坑,附截图)

一、目的 在海豚跑定时跑kettle的从Kafka到HDFS的任务时&#xff0c;由于Linux服务器的某个文件磁盘空间满了&#xff0c;导致Hadoop的NodeName进入安全模式&#xff0c;此时光执行hdfs dfsadmin -safemode leave命令语句没有效果&#xff08;虽然显示Safe mode is OFF&#x…

EFLK日志平台(filebeat-->kafka-->logstash-->es-->kiabana)

ELK平台是一套完整的日志集中处理解决方案&#xff0c;将 ElasticSearch、Logstash 和 Kiabana 三个开源工具配合使用&#xff0c; 完成更强大的用户对日志的查询、排序、统计需求。 安装顺序 1.安装es 7.17.12 2.安装kibana 7.17.12 3.安装x-pack 保证以上调试成功后开始下面…

C语言:动态内存(一篇拿捏动态内存!)

目录 学习目标&#xff1a; 为什么存在动态内存分配 动态内存函数&#xff1a; 1. malloc 和 free 2. calloc 3. realloc 常见的动态内存错误&#xff1a; 1. 对NULL指针的解引用操作 2. 对动态开辟空间的越界访问 3. 对非动态开辟内存使用free释放 4. 使用free释…

汽车自适应巡航系统控制策略研究

目 录 第一章 绪论 .............................................................................................................................. 1 1.1 研究背景及意义 ..........................................................................................…

Spring Boot 中 Nacos 配置中心使用实战

官方参考文档 https://nacos.io/zh-cn/docs/quick-start-spring-boot.html 本人实践 1、新建一个spring boot项目 我的spirngboot版本为2.5.6 2、添加一下依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-…

【Locomotor运动模块】瞬移

文章目录 一、原理二、两种类型1、Instant(立刻)2、Dash&#xff08;猛冲&#xff09; 三、瞬移区域、瞬移点1、瞬移区域2、瞬移点 一、原理 抛物线指针选择好目标位置&#xff0c;然后告诉瞬移预设体&#xff1a;你想法把游戏区域弄到目标位置来 解释&#xff1a;抛物线指针选…

uniapp 微信小程序仿抖音评论区功能,支持展开收起

最近需要写一个评论区功能&#xff0c;所以打算仿照抖音做一个评论功能&#xff0c;支持展开和收起&#xff0c; 首先我们需要对功能做一个拆解&#xff0c;评论区功能&#xff0c;两个模块&#xff0c;一个是发表评论模块&#xff0c;一个是评论展示区。接下来对这两个模块进行…

最新ChatGPT程序源码+AI系统+详细图文部署教程/支持GPT4.0/支持Midjourney绘画/Prompt知识库

一、AI系统 如何搭建部署人工智能源码、AI创作系统、ChatGPT系统呢&#xff1f;小编这里写一个详细图文教程吧&#xff01;SparkAi使用Nestjs和Vue3框架技术&#xff0c;持续集成AI能力到AIGC系统&#xff01; 1.1 程序核心功能 程序已支持ChatGPT3.5/GPT-4提问、AI绘画、Mi…

Kubernetes技术--使用kubeadm搭建高可用的K8s集群(贴近实际环境)

1.高可用k8s集群架构(多master) 2.安装硬件要求 一台或多台机器,操作系统 CentOS7.x-86_x64 硬件配置:2GB或更多RAM,2个CPU或更多CPU,硬盘30GB或更多 注: 这里属于教学环境,所以使用三台虚拟机模拟实现。 3.部署规划 4.部署前准备 (1).关闭防火墙 systemctl stop fi…

数据结构|栈和队列以及实现

栈和队列 一、栈1.1栈的概念及结构1.2栈的实现 二、队列2.1队列的概念及结构2.2队列的实现 一、栈 1.1栈的概念及结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。进行数据插入和数据删除的一端称为栈顶&#xff0c;另一端称为栈…

深度学习入门(Python)学习笔记1

第1章 Python入门 1.1python是什么 Python是一个简单、易读、易记的编程语言&#xff0c;而且是开源的&#xff0c;可以免费地自由使用。 使用Python不仅可以写出可读性高的代码&#xff0c;还可以写出性能高&#xff08;处理速度快&#xff09;的代码。 再者&#xff0c;在…

7.2 项目2 学生通讯录管理:文本文件增删改查(C 版本)(自顶向下设计+断点调试) (A)

C自学精简教程 目录(必读) 该作业是 作业 学生通讯录管理&#xff1a;文本文件增删改查&#xff08;C版本&#xff09; 的C 语言版本。 具体的作业题目描述&#xff0c;要求&#xff0c;可以参考 学生通讯录管理&#xff1a;文本文件增删改查&#xff08;C版本&#xff09;。…

Windows下Redis的安装和配置

文章目录 一,Redis介绍二,Redis下载三,Redis安装-解压四,Redis配置五,Redis启动和关闭(通过terminal操作)六,Redis连接七,Redis使用 一,Redis介绍 远程字典服务,一个开源的,键值对形式的在线服务框架,值支持多数据结构,本文介绍windows下Redis的安装,配置相关,官网默认下载的是…

iTOP-RK3588开发板Android12 设置系统默认不休眠

修改文件&#xff1a; device/rockchip/rk3588/overlay/frameworks/base/packages/SettingsProvider/res/values/defaults. xml 文件&#xff0c;如下图所示&#xff1a; - <integer name"def_screen_off_timeout">60000</integer> <integer name&q…

卡特兰数和算法

在组合数学中&#xff0c;卡特兰数是一系列自然数&#xff0c;出现在各种组合计数问题中&#xff0c;通常涉及递归定义的对象。它们以比利时数学家尤金查尔斯卡特兰&#xff08;Eugne Charles Catalan&#xff09;的名字命名。 卡特兰数序列是1, 1, 2, 5, 14, 42......&#xf…

传承精神 缅怀伟人——湖南多链优品科技有限公司赴韶山开展红色主题活动

8月27日上午&#xff0c; 湖南多链优品科技有限公司全体员工怀着崇敬之情&#xff0c;以红色文化为引领&#xff0c;参加了毛泽东同志诞辰130周年的纪念活动。以董事长程小明为核心的公司班子成员以及全国优秀代表近70人一行专赴韶山&#xff0c;缅怀伟人毛泽东同志的丰功伟绩。…

230902-部署Gradio到已有FastAPI及服务器中

1. 官方例子 run.py from fastapi import FastAPI import gradio as grCUSTOM_PATH "/gradio"app FastAPI()app.get("/") def read_main():return {"message": "This is your main app"}io gr.Interface(lambda x: "Hello, …