RabbitMq深度学习

什么是RabbitMq?

RabbitMQ是一个开源的消息队列中间件,它实现了高级消息队列协议(AMQP)。它被广泛用于分布式系统中的消息传递和异步通信。RabbitMQ提供了一种可靠的、可扩展的机制来传递消息,使不同的应用程序能够相互之间进行通信。它支持多种编程语言和平台,并且具有灵活的路由和队列配置选项。

同步调用 

同步调用的优点:

  • 时效性较强,可以立即得到结果

同步调用的问题:

  • 耦合度高

  • 性能和吞吐能力下降

  • 有额外的资源消耗

  • 有级联失败问题

异步调用

好处:

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速

  • 故障隔离:服务没有直接调用,不存在级联失败问题

  • 调用间没有阻塞,不会造成无效的资源占用

  • 耦合度极低,每个服务都可以灵活插拔,可替换

  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

缺点:

  • 架构复杂了,业务没有明显的流程线,不好管理

  • 需要依赖于Broker的可靠、安全、性能

MQ的种类 

 RabbitMq安装和使用 

 云服务器安装Rabbitmq。

 在docker 中拉去Ribbitmq镜像。

在docker 中运行ribbitmq。

docker run -d -p 5672:5672 -p 15672:15672 -p 25672:25672 --name rabbitmq rabbitmq

 查看rabbitmq的状态。

rabbitmqctl status

接着我们还可以将Rabbitmq的管理面板开启,这样就可以在浏览器上进行实时访问和监控了。 

我们需要先进入rabbitmq容器。

docker exec -it [在docker中对应的ID] [进入容器的路径] #路径一般为/bin/bash

开启rabbitmq的控制面板设置。

rabbitmq-plugins enable rabbitmq_management

打开rabbitmq的控制面板,就是对应的控制面板端口为15672。

账号和密码都是:guest

 消息队列模型

 SpringAMQP

 什么是springAMQP?

Spring AMQP 是一个基于 Spring 框架的 AMQP(高级消息队列协议)的开发框架。它提供了一种简化和抽象化的方式来使用 AMQP,使得在应用程序中使用消息队列变得更加容易。

springAMQP的使用

导入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

编写发送者

编写applcation.yml文件

spring:
  rabbitmq:
    host: 119.9.212.171 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: guest # 用户名
    password: guest # 密码

进行测试

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

@RunWith(SpringRunner.class) #如果不加此注解,spring容器无法自动注入RabbitTemplate
@SpringBootTest
public class PublisherTest {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void tess1() {
        String queueName = "queueName";
        String message = "hello, tolen";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

测试结果为下:

 可能会出现没有队列生成的情况,这是因为@Test无法自动一个 queue,我们手动创建一个即可。

编写消费者

编辑application.yml文件

spring:
  rabbitmq:
    host: 192.168.150.101 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: test # 用户名
    password: 123456 # 密码

创建消息监听者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqListener {
    @RabbitListener(queues = "queueName")
    public void getMessage(String message) {
        System.out.println("获取的消息是:" + message);
    }
}

直接配置即可,在后续的项目中消费者会监听对应的消息进行操作。

WorkQueue

我们可以对一个消息标签设置多个监听者,并且默认的设置是预取,也就是即使服务模块处理能力差的情况也会分配到相同个数的信息,不能达到能者多劳的效果,为了到达此效果,我们可以在application.yml中进行设置。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

发布与订阅

FanoutExchange的使用

在消费者模块编写:新建交换机,新建队列,交换机和队列绑定操作。

在配置类中完成上述操作

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfiguration {
    //声明交换机FanoutExchange
    @Bean
    public FanoutExchange fanoutExchange() {
//        设置交换机的名字
        return new FanoutExchange("tolen.fanout");
    }
//    创建一个信息队列1
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }
//    创建信息队列2
    @Bean
    public Queue fanoutQueue2() {
         return new Queue("fanout.queue2");
    }
    //将交换机和队列1进行绑定
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        //绑定队列给对应的交换机
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    //将交换机和队列2进行绑定
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

在消费者模块中创建两个队列的监听器

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqListener {
    @RabbitListener(queues = "fanout.queue1")
    public void getMessage1(String message) {
        System.out.println("消息队列1中获取的消息是:" + message);
    }
    @RabbitListener(queues = "fanout.queue2")
    public void getMessage2(String message) {
        System.out.println("消息队列2中获取的消息是:" + message);
    }

}

接下来不信消息发送模块,这里需要注意的是,此时我们是向对应的交换机发送消息,通过交换机发送消息给两个消息队列。

发送消息的代码为下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void tess1() {
        String queueName = "queueName";
        String message = "hello, tolen";
        rabbitTemplate.convertAndSend(queueName, message);
    }
    @Test
    public void fanoutTest() {
        String exchangeName = "tolen.fanout";
        String message = "hi, tolen!";
        //routingKey不进行设置
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
}

如果不设置routingKey的话,就会默认将消息发送到使用绑定的消息队列上。 

测试结果为下:

交换机状态

监听器接收到的消息 

 DirectExchange

可以设置routingKey,交换机可以向指定的队列发送消息。

配置监听器

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqListener {
    //使用注解进行绑定, 不再需要configuration配置
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "directQueue1"),
            exchange = @Exchange(name = "direct"), //默认使用的交换机类型就是directExchange
            key = {"red", "blue"}
    ))
    public void directQueue1(String message) {
        System.out.println("directQueue2:" + message);
    }
    //使用注解进行绑定, 不再需要configuration配置
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "directQueue2"),
            exchange = @Exchange(name = "direct"), //默认使用的交换机类型就是directExchange
            key = {"red"}
    ))
    public void directQueue2(String message) {
        System.out.println("directQueue2:" + message);
    }
}

编写消息发布模块

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void fanoutTest() {
        String exchangeName = "direct";
        String message = "hi, tolen!";
        //设置routingKey
        rabbitTemplate.convertAndSend(exchangeName, "blue", message);
    }
}

测试结果为下:

此时就只有routingKey=blue的监听器才会接收到消息。

TopicExchage

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

修改编写监听器的配置

//使用注解进行绑定, 不再需要configuration配置
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "directQueue2"),
            exchange = @Exchange(name = "direct", type = ExchangeTypes.TOPIC), //默认使用的交换机类型就是directExchange
            key = {"#.new"}
    ))
    public void directQueue2(String message) {
        System.out.println("directQueue2:" + message);
    }

只要发送的消息中的routingKey中尾部为新闻的消息全部会被监听。(routingKey使用"."作间隔)

消息转换器

在springboot中默认使用JDK的序列化,为了提高使用性,我们可以使用json转换器。

在消费者和发送者中都导入对应的依赖。

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

在configuration中配置信息转换器。(消费者和发布者都需要配置)

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfiguration {

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

进行测试,在发送一个对象类型的消息。

对应的监听器

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Objects;

@Component
public class RabbitMqListener {
    //使用注解进行绑定, 不再需要configuration配置
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "directQueue2"),
            exchange = @Exchange(name = "direct"), //默认使用的交换机类型就是directExchange
            key = {"blue"}
    ))
    public void directQueue2(Map<String, String> message) {
        System.out.println("directQueue2:" + message);
    }
}

对应的发送代码

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.LinkedHashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void fanoutTest() {
        String exchangeName = "direct";
        Map<String, String> message = new LinkedHashMap<>();
        message.put("name", "tolen");
        message.put("age", "19");
        //设置routingKey
        rabbitTemplate.convertAndSend(exchangeName, "blue", message);
    }
}

测试效果为下:

接收到的数据 。

 消息队列中的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/96539.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

docker network

docker network create <network>docker network connect <network> <container>docker network inspect <network>使用这个地址作为host即可 TODO&#xff1a;添加docker-compose

S32K324芯片学习笔记-实时控制系统-ADC

文章目录 Analog-to-Digital Converter (ADC)用于内部供应监控的ANAMUXBCTU接口硬件触发ADC多路模式通道功能框图特点功能描述时钟转换正常触发注入触发BCTU接口BCTU Trigger modeBCTU Control mode 配置ADC时钟分压器和采样时间设置预采样启用每个通道的预采样 模拟看门狗功能…

Redis笔记——(狂神说)

Nosql概述 为什么要用NoSql&#xff1f; 1、单机mysql的年代&#xff1a;90年代&#xff0c;网站访问量小&#xff0c;很多使用静态网页html写的&#xff0c;服务器没压力。 当时瓶颈是&#xff1a;1)数据量太大一个机器放不下。2)数据的索引(BTree)&#xff0c;一个机器内存也…

Qt 6.5.2连接MySQL及测试代码(附驱动文件)

版本&#xff1a;Windows_64位&#xff0c;Qt 6.5.2&#xff0c;MySQL 8.0 一、配置驱动文件&#xff08;3步搞定&#xff01;&#xff09; 现在Qt6版本不提供MySQL的驱动&#xff0c;而且也没有pro项目让你生成驱动的dll文件&#xff0c;所以只要你使用Qt6版本就需要手动去生…

【springboot】springboot定时任务:

文章目录 一、文档&#xff1a;二、案例&#xff1a; 一、文档&#xff1a; 【cron表达式在线生成器】https://cron.qqe2.com/ 二、案例&#xff1a; EnableScheduling //开启任务调度package com.sky.task;import com.sky.entity.Orders; import com.sky.mapper.OrderMapper; …

博客系统后端(项目系列2)

目录 前言 &#xff1a; 1.准备工作 1.1创建项目 1.2引入依赖 1.3创建必要的目录 2.数据库设计 2.1博客数据 2.2用户数据 3.封装数据库 3.1封装数据库的连接操作 3.2创建两个表对应的实体类 3.3封装一些必要的增删改查操作 4.前后端交互逻辑的实现 4.1博客列表页 …

使用实体解析和图形神经网络进行欺诈检测

图形神经网络的表示形式&#xff08;作者使用必应图像创建器生成的图像&#xff09; 一、说明 对于金融、电子商务和其他相关行业来说&#xff0c;在线欺诈是一个日益严重的问题。为了应对这种威胁&#xff0c;组织使用基于机器学习和行为分析的欺诈检测机制。这些技术能够实时…

macOS - DOSbox

文章目录 关于 DOSbox安装使用启动设置启动盘、查看文件 debug 关于 DOSbox 官网&#xff1a; https://www.dosbox.com/文档&#xff1a;https://www.dosbox.com/wiki/Basic_Setup_and_Installation_of_DosBox下载&#xff1a; https://www.dosbox.com/download.php https://s…

C语言程序设计——小学生计算机辅助教学系统

题目&#xff1a;小学生计算机辅助教学系统 编写一个程序&#xff0c;帮助小学生学习乘法。然后判断学生输入的答案对错与否&#xff0c;按下列任务要求以循序渐进的方式分别编写对应的程序并调试。 任务1 程序首先随机产生两个1—10之间的正整数&#xff0c;在屏幕上打印出问题…

flutter对数组中某个数据二次加工成单独的数组

如何将数据[2,1,2,2,2,1,2,2,3,2,2,2,2,3,2,2,2,2,2,3,2,4,2,2,1,2,3,2,4,2]加工成 [[2], 1, [2, 2, 2], 1, [2, 2], 3, [2, 2, 2, 2], 3, [2, 2, 2, 2, 2], 3, [2], 4, [2, 2], 1, [2], 3, [2], 4, [2]]。这是实际工作中遇到的问题&#xff0c;UI要求将某一类型数据&#xff…

ioctl、printk及多个此设备支持

一、ioctl操作实现 ioctl&#xff08;Input/Output Control&#xff09;是一个在 Unix-like 操作系统中的系统调用&#xff0c;用于控制设备或文件的各种操作。它允许用户空间程序与内核空间进行交互&#xff0c;执行一些特定的设备控制、状态查询或其他操作&#xff0c;而不必…

Linux 可重入、异步信号安全和线程安全

可重入函数 当一个被捕获的信号被一个进程处理时&#xff0c;进程执行的普通的指令序列会被一个信号处理器暂时地中断。它首先执行该信号处理程序中的指令。如果从信号处理程序返回&#xff08;例如没有调用exit或longjmp&#xff09;&#xff0c;则继续执行在捕获到信号时进程…

移动隔断墙的用途和空间布局,设计合适的结构,包括固定方式

移动隔断墙的用途&#xff1a; 1. 划分空间&#xff1a;移动隔断墙可以在需要时将一个大空间划分为多个小空间&#xff0c;以满足不同的使用需求。 2. 提供隐私&#xff1a;移动隔断墙可以为需要隐私的区域提供屏障&#xff0c;例如办公室中的会议室或私人办公室。 3. 增加灵活…

一文速学-让神经网络不再神秘,一天速学神经网络基础-输出层(四)

前言 思索了很久到底要不要出深度学习内容&#xff0c;毕竟在数学建模专栏里边的机器学习内容还有一大半算法没有更新&#xff0c;很多坑都没有填满&#xff0c;而且现在深度学习的文章和学习课程都十分的多&#xff0c;我考虑了很久决定还是得出神经网络系列文章&#xff0c;不…

【SpringSecurity】七、SpringSecurity集成thymeleaf

文章目录 1、thymeleaf2、依赖部分3、定义Controller4、创建静态页面5、WebSecurityConfigurerAdapter6、权限相关7、当用户没有某权限时&#xff0c;页面不展示该按钮 1、thymeleaf 查了下读音&#xff0c;leaf/li:f/&#xff0c;叶子&#xff0c;前面的单词发音和时间time一…

RV64函数调用流程分析

RV64函数调用流程分析 1 RV64 函数调用实例2 对应代码的分析2.1 main函数及其对应的汇编程序2.1.1 main的C代码实现2.1.2 main函数对应汇编及其分析2.1.3 执行完成之后栈的存放情况 2.2 test_fun_a函数及其对应的汇编程序2.2.1 test_fun_a函数的C实现2.2.2 test_fun_a函数对应汇…

8_分类算法-k近邻算法(KNN)

文章目录 1 KNN算法1.1 KNN算法原理1.2 KNN过程1.3 KNN三要素1.4 KNN分类预测规则1.5 KNN回归预测规则1.6 KNN算法实现方式&#xff08;重点&#xff09;1.7 k近邻算法优缺点 2 KD-Tree2.1 KD Tree构建方式2.2 KD Tree查找最近邻2.3 KNN参数说明 1 KNN算法 定义&#xff1a;如…

司徒理财:8.30黄金原油今日最新行情分析及操作策略

黄金走势分析      从日线形态来看&#xff0c;昨晚经历了快速拉升&#xff0c;价格成功稳定在关键的1924压力位之上&#xff0c;最高甚至触及了1938的高点。这表明市场开启了新一轮走势的空间。在当天的日内交易中&#xff0c;我们应特别关注1925一线作为支撑&#xff0c;…

opencv 案例实战02-停车场车牌识别SVM模型训练及验证

1. 整个识别的流程图&#xff1a; 2. 车牌定位中分割流程图&#xff1a; 三、车牌识别中字符分割流程图&#xff1a; 1.准备数据集 下载车牌相关字符样本用于训练和测试&#xff0c;本文使用14个汉字样本和34个数字跟字母样本&#xff0c;每个字符样本数为40&#xff0c;样本尺…

基于Django的博客管理系统

1、克隆仓库https://gitee.com/lylinux/DjangoBlog.git 若失效&#xff1a;https://gitee.com/usutdzxy/DjangoBlog.git 2、环境安装 pip install -Ur requirements.txt3、修改djangoblog/setting.py 修改数据库配置&#xff0c;其他的步骤就按照官方文档。 DATABASES {def…