2.11日学习打卡----初学RocketMQ(二)

2.11日学习打卡

一. RocketMQ整合springboot

首先配置pom.xml文件

       <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>annotationProcessor</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

其次在配置application.yml文件

spring:
  application:
    name: RocketMq

rocketmq:
  name-server: 192.168.66.100:9876(自己的地址监听9876端口)
  producer:
    group: my-group

编写测试类

生产者

package com.jjy.rocketmq;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

//@RunWith(SpringRunner.class)
//@SpringBootTest(classes = {RocketMqApplication.class})
@SpringBootTest
class RocketMqApplicationTests {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Test
    void testSendMessage() {
        //向broker发送消息
        //第一个参数是主题(当时自己创建的主题)
        //第二个参数是消息内容
        rocketMQTemplate.convertAndSend("topicWarning","hello,RocketMQ");
    }

}

消费者(新建一个springboot项目)

pom.xml文件同producer工程

application.yml配置

spring:
  application:
    name: RocketMQConsumer(自己的项目名字)

rocketmq:
  name-server: 192.168.66.100:9876

消息监听器

package com.jjy.rocketmqconumers.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(topic="topicWarning",consumerGroup="myconsumer")
//consumerGroup(里面的参数随便写就行自己编个名字)
public class Consumer implements RocketMQListener<String> {


    @Override
    
    public void onMessage(String message) {
    //使用日志的形式写出来 也开始使用控制台
        log.info("Received messsge:" + message);
     //使用控制台
     System.out.println(message);
    }
}

在这里插入图片描述

二. RocketMQ架构

技术架构

在这里插入图片描述
RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式消费消息。
  • NameServer:管理Broker代理服务器。
  • BrokerServer:RocketMQ的核心,负责消息的接收和转发。

部署架构

在这里插入图片描述
RocketMQ 网络部署特点:

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无
    任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。
  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息。
  • 结合部署架构图,集群工作流程可作如下描述:
    • 启动NameServer,通过监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
    • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
    • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
    • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,包含Topic中所有队列列表然后选择一个队列,与队列所在的Broker建立长连接再向Broker发消息。
    • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

三.RocketMQ高级特性

消息存储

在这里插入图片描述

  • 存储介质
    • 关系型数据库
      ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障。
    • 文件
      目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘损坏,否则一般是不会出现无法持久化的故障问题
      同步的可靠性高 异步的效率高
      同步:就是假如你是老板让员工区送邮件在员工送完邮件向你报告的期间一直属于等待状态;
      异步: 在等待报告期间回去做其他事情
    • 性能对比
      文件系统 > 关系型数据库DB

负载均衡

在这里插入图片描述
RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡

Producer的负载均衡

在这里插入图片描述
如图所示,5 个队列可以部署在一台机器上,也可以分别部署在 5 台不同的机器上,发送消息通过轮询队列的方式 发送,x每个队列接收平均的消息量。通过增加机器,可以水平扩展队列容量。 另外也可以自定义方式选择发往哪个队列。

Consumer的负载均衡
在这里插入图片描述
如图所示,如果有 5 个队列,2 个 consumer,那么第一个Consumer 消费 3 个队列,第二consumer 消费 2 个队列。 这样即可达到平均消费的目的,可以水平扩展 Consumer 来提高消费能力。但是 Consumer 数量要小于等于队列数 量,如果 Consumer超过队列数量,那么多余的Consumer 将不能消费消息 。

事务消息

在这里插入图片描述
Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。
在这里插入图片描述
事务消息发送步骤如下:

  • 生产者将半事务消息发送至消息队列RocketMQ服务端。

  • 消息队列RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息。

  • 生产者开始执行本地事务逻辑。

  • 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端不会将该消息投递给消费者,并按照如下逻辑进行回查处理。
  • 事务消息回查步骤如下:

    • 在断网或者是生产者应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
    • 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
    • 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

顺序消息

消息有序指的是按照消息的发送顺序来消费(FIFO)。RocketMQ可以保证消息有序,消息有序分为部分有序和全局有序。全局有序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

如果要实现全局顺序消息,那么只能使用一个队列,一个生产者,这会严重影响性能。
在这里插入图片描述

因此,我们常说的顺序消息通常指部分顺序消息。
在这里插入图片描述

顺序消费的原理解析,在默认的情况下消息发送会采取轮询方式把消息发送到不同的分区队列;而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue
只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每queue,消息都是有序的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/385084.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

服装效果图为何要用云渲染100?渲染100邀请码1a12

服装行业是充满创意和竞争的领域&#xff0c;而服装效果图是其中重要一环&#xff0c;以前效果图都是本地渲染&#xff0c;现在越来越多的设计师转向云渲染&#xff0c;以国内最专业的平台渲染100为例&#xff0c;云渲染有以下好处&#xff1a; 1、提高工作效率 设计师可以利用…

Netty源码系列 之 FastThreadLocal源码

目录 Netty优化方案之 FastThreadLocal 前言 ThreadLocal ThreadLocal是干什么的&#xff1f; 为什么要使用ThreadLocal工具类去操控存取目标数据到Thread线程 &#xff1f; ThreadLocal的使用场景 目标数据存储到Thread线程对象的哪里&#xff1f; 怎么样把一个目标数据…

JavaWeb:SpingBoot原理 --黑马笔记

1. 配置优先级 在我们前面的课程当中&#xff0c;我们已经讲解了SpringBoot项目当中支持的三类配置文件&#xff1a; application.properties application.yml application.yaml 在SpringBoot项目当中&#xff0c;我们要想配置一个属性&#xff0c;可以通过这三种方式当中…

Ubuntu Desktop - Terminal 输出全部选中 + 复制

Ubuntu Desktop - Terminal 输出全部选中 复制 1. Terminal2. Terminal 最大化3. Edit -> Select All4. Copy & PasteReferences 1. Terminal 2. Terminal 最大化 3. Edit -> Select All 4. Copy & Paste Edit -> Copy or Shift Ctrl C Edit -> Paste…

【Python网络编程之TCP三次握手】

&#x1f680; 作者 &#xff1a;“码上有前” &#x1f680; 文章简介 &#xff1a;Python开发技术 &#x1f680; 欢迎小伙伴们 点赞&#x1f44d;、收藏⭐、留言&#x1f4ac; Python网络编程之[TCP三次握手] 代码见资源&#xff0c;效果图如下一、实验要求二、协议原理2.…

Microsoft Excel 加载数据分析工具

Microsoft Excel 加载数据分析工具 1. 打开 Excel&#xff0c;文件 -> 选项2. 加载项 -> 转到…3. 分析工具库、分析工具库 - VBA4. 打开 Excel&#xff0c;数据 -> 数据分析References 1. 打开 Excel&#xff0c;文件 -> 选项 2. 加载项 -> 转到… ​​​ 3…

Win32 控制台绘图2

之前已经了解在控制台可以调用Win32 api绘图&#xff1b;下面继续加深一下此概念&#xff1b; #include <stdio.h> #include <stdlib.h> #include <windows.h>HWND WINAPI GetConsoleWindow();int main(int argc, char *argv[]) {HWND hwnd; HDC hdc; HPE…

【MySQL探索之旅】MySQL数据库下载及安装教程

&#x1f4da;博客主页&#xff1a;爱敲代码的小杨. ✨专栏&#xff1a;《Java SE语法》 | 《数据结构与算法》 | 《C生万物》 ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更新的动力❤️ &#x1f64f;小杨水平有…

c++ 内存

c 内存 内存分区 1.代码区&#xff1a;程序的机器指令&#xff0c;可以被机器直接执行。 属性&#xff1a;只读和共享 代码区包含什么&#xff1a; 在程序编译时就已经被分配好了地址&#xff0c;并保存在可执行文件的代码段中。当程序运行时&#xff0c;操作系统会将代码段的…

【C++】类的隐式类型转换

文章目录 前言一、隐式类型转换二、explicit关键字总结 前言 一、隐式类型转换 C 类的隐式类型转换是指当一个类定义了适当的构造函数或转换函数时&#xff0c;可以在需要时自动进行类型转换&#xff0c;而无需显式调用转换函数或构造函数。这使得代码更具灵活性和简洁性。下面…

一种简单的车辆过减速带识别方法

识别方法参考以下图片上的这篇论文第三章&#xff0c;有需要的自行知网下载。 一、离散冲击路面建立 我们之前已经搭建了C级路面&#xff0c;直接在C级路面中间某一段加上减速带就可以。这里我加的减速带&#xff0c;如下图所示&#xff0c;高30mm&#xff0c;凸台宽约20mm&am…

刘谦春晚魔术的数学原理

刘谦春晚魔术的数学原理 文章目录 前言魔术介绍魔术揭秘STEP 1STEP 2STEP 3STEP 4STEP 5STEP 6STEP 7 总结 前言 2024 春晚刘谦的第二个魔术很多人跟着一起做了&#xff0c;都觉得非常神奇。我也跟着操作了一遍&#xff0c;结果一眼就让我看出了背后的数学原理。下面给大家介…

自己搭建的幻兽帕鲁服务器怎么一键配置游戏参数?可视化面板调整参数

单击面板内的【调整参数】按钮&#xff0c;即可在下方表格中开启编辑模式。找到“死亡惩罚”配置项&#xff0c;并将它的值修改为&#xff1a;无丢失。 点击【保存】按钮&#xff0c;此时将弹出气泡&#xff0c;提示你当前操作需要在游戏服务重启后才可生效&#xff08;不会…

数据结构哈希表

这里个大家用数组来模拟哈希表 法一&#xff1a;拉链法 法二&#xff1a;开放寻址法 /** Project: 11_哈希表* File Created:Sunday, January 17th 2021, 2:11:23 pm* Author: Bug-Free* Problem:AcWing 840. 模拟散列表 拉链法*/ #include <cstring> #include <iostr…

AI:126-基于深度学习的人体情绪识别与分析

🚀点击这里跳转到本专栏,可查阅专栏顶置最新的指南宝典~ 🎉🎊🎉 你的技术旅程将在这里启航! 从基础到实践,深入学习。无论你是初学者还是经验丰富的老手,对于本专栏案例和项目实践都有参考学习意义。 ✨✨✨ 每一个案例都附带有在本地跑过的关键代码,详细讲解供…

【Linux】进程概念(冯诺依曼体系结构、操作系统、进程)-- 详解

一、冯诺依曼体系结构 1、概念 &#xff08;1&#xff09;什么是冯诺伊曼体系结构&#xff1f; 数学家冯诺伊曼于 1946 年提出存储程序原理&#xff0c;把程序本身当作数据来对待&#xff0c;程序和该程序处理的数据用同样的方式储存。 冯诺伊曼理论的要点是&#xff1a;计算…

复旦TravelPlanner让大语言模型挑战旅程规划

引言&#xff1a;探索语言智能的新疆界——旅行规划 在人工智能的发展历程中&#xff0c;规划一直是核心追求之一。然而&#xff0c;由于缺乏人类水平规划所需的多种认知基础&#xff0c;早期的AI代理主要集中在受限的环境中。随着大语言模型&#xff08;LLMs&#xff09;的出…

【设计模式】springboot3项目整合模板方法深入理解设计模式之模板方法(Template Method)

&#x1f389;&#x1f389;欢迎光临&#x1f389;&#x1f389; &#x1f3c5;我是苏泽&#xff0c;一位对技术充满热情的探索者和分享者。&#x1f680;&#x1f680; &#x1f31f;特别推荐给大家我的最新专栏《Spring 狂野之旅&#xff1a;底层原理高级进阶》 &#x1f680…

GPT4:你是故意的吧!

请问下面选项中哪个是中文&#xff1f; A.Chinese B.英文 这是一个关于语言识别的问题。我们需要分析并确定所给选项中哪个表示中文。 对于选项A.Chinese&#xff1a;这个词本身表示“中文”或“中国的”。在多种语境中&#xff0c;它经常被用来指代中国的语言&#xff0c;即中…

(三十五)大数据实战——Superset可视化平台搭建

前言 本节内容是关于Apache Superset可视化平台的搭建&#xff0c;Apache Superset是一个现代的数据探索和可视化平台 。它功能强大且十分易用&#xff0c;可对接各种数据源&#xff0c;包括很多现代的大数据分析引擎&#xff0c;拥有丰富的图表展示形式&#xff0c;并且支持自…