消息队列实战应用

适用场景

耗时长,非核心业务,生产者不会用到消息处理结果的情况下,可以将消息交给异步服务去缓存与消费

部署MQ服务

version: "3.0"
services:
  rabbitmq:
    container_name: rabbitmq-15672-1
    image: rabbitmq:3-management
    ports:
      - "15672:15672"
      - "5672:5672"
    environment:
      RABBITMQ_DEFAULT_USER: root
      RABBITMQ_DEFAULT_PASS: root123

15672是rabbitmq server的图形化界面端口
5672是向rabbitmq server发送消息的端口

架构

exchange交换机维护生产者列表和队列列表,queue消息队列维护消费者列表
生产者只需要把消息交给交换机,由交换机决定将消息转发给哪一个队列,最后再由队列根据消费者列表,转发消息

在这里插入图片描述
类似于数据库或者容器,不同的项目或者服务独占一组交换机和队列,为了避免各组交换机和队列相互影响,采用虚拟主机进行隔离
一个管理员用户对应一个虚拟主机,每个管理员只能操作自己对应的那个虚拟主机中的交换机和队列
消息只能缓存在队列中,如果消息无法到达队列,就会出现消息丢失的问题

客户端

rabbitMQ提供了多种语言的代码客户端,这些客户端通过网络与rabbitMQ服务端进行交互

SpringAMQP

Spring封装了官方提供的API
AMQP:advanced MQ protocol
是一种有关消息队列的高级网络协议,规定了消息如何从生产者出发,经过交换机和队列,最终到达消费者的过程
换句话说,只要遵循了AMQP规范,就能用任何语言,实现消息队列的功能
在这里插入图片描述

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>3.2.4</version>
</dependency>

配置文件

RabbitMQ暴露的用于发消息的端口是5672

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /v1
    username: u1
    password: u1

RabbitTemplate

Spring将功能封装在模板类RabbitTemplate中

生产者

convertAndSend方法实现消息发送到RabbitMQ服务器,并且缓存起来
这个方法需要指定服务端的一个消息队列,以及消息内容

String queue = "test.queue1";
        String message = "支付成功3";

        rabbitTemplate.convertAndSend(queue, message);

底层是单独开一个异步线程用来发送消息

	/**
	 * 将指定消息发送到交换机
	 *
	 * @param channel 单个网络连接中的一个缓存区
	 * @param exchangeArg 目标交换机名称
	 * @param routingKeyArg 路由键值
	 * @param message 待发送的消息
	 * @param mandatory 标记字段
	 * @param correlationData 数据
	 */
	public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
			boolean mandatory, @Nullable CorrelationData correlationData) {

		String exch = nullSafeExchange(exchangeArg);
		String rKey = nullSafeRoutingKey(routingKeyArg);

		Message messageToUse = message;
		MessageProperties messageProperties = messageToUse.getMessageProperties();
		if (mandatory) {
			messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid);
		}
		if (this.beforePublishPostProcessors != null) {
			for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
				messageToUse = processor.postProcessMessage(messageToUse, correlationData, exch, rKey);
			}
		}
		setupConfirm(channel, messageToUse, correlationData);
		if (this.userIdExpression != null && messageProperties.getUserId() == null) {
			String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);
			if (userId != null) {
				messageProperties.setUserId(userId);
			}
		}
		if (logger.isDebugEnabled()) {
			logger.debug("Publishing message [" + messageToUse
					+ "] on exchange [" + exch + "], routingKey = [" + rKey + "]");
		}
		observeTheSend(channel, messageToUse, mandatory, exch, rKey);
		// Check if commit needed
		if (isChannelLocallyTransacted(channel)) {
			// Transacted channel created by this template -> commit.
			RabbitUtils.commitIfNecessary(channel);
		}
	}

消费者

在IoC容器中注册一个监听器bean,绑定消息队列,当消息队列接收到消息时,会通知Spring,由Spring将对应的消息交给监听器处理

/**
* 消息队列监听器bean
* */
@Component
@Slf4j
public class SpringRabbitListener {

    @RabbitListener(queues = {"test.queue1"})
    public void listenOnTestQueue1(String message){

        log.info("接收到test.queue1队列的消息:{}",message);
    }

}

消息分配策略

RabbitMq默认采用轮询的机制向同一队列的多个消费者分配消息,属于平均分配消息,但是,实际生产环境中,各个消费者的处理速度并不一样,那么,最优性能的做法,应该是让处理速度快的消费者处理更多的消息,而不是平均分配
需要设置预分配消息数量,消息队列默认一次性将消息全部平均分配给消费者,这样会导致消费者一次接受过多的消息而处理不过来,而且也不是一种最优的消费方式
可以限制发送给消费者的消息数量,这本质上是一种限流策略
底层是交给channel的basicQos实现限流


void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
  • 请求特定的“服务质量”设置。这些设置对服务器在要求确认之前将交付给使用者的数据量施加了限制。因此,它们提供了一种由消费者发起的流量控制手段。请注意,预取计数必须介于
    0 和 65535 之间(AMQP 0-9-1 中的无符号短行)。Params: prefetchSize –
    服务器将交付的最大内容量(以八位字节为单位),0 if unlimited prefetchCount – 服务器将传递的最大消息数,0
    if unlimited global – true 如果设置应应用于整个通道而不是每个使用者 Throws: IOException

队列

一个队列负责绑定同一类服务的消费者,执行同一个功能
一个交换机可以给多个队列发送消息,也就是可以通知多个不同类型的服务

代码创建队列

SpringAMQP提供了一套用于创建队列的Api


/**
* 创建交换机以及绑定交换机关系
* */
@Configuration
public class FanoutConfiguration {

    @Bean
    public FanoutExchange fanoutExchange(){

        return ExchangeBuilder.fanoutExchange("v1.fanout2").build();
    }

    @Bean
    public Queue fanoutQueue3(){

        return QueueBuilder.durable("fanout.queue1").build();
    }
    
    @Bean
    public Binding fanoutBinding1(Queue queue,FanoutExchange exchange){
        
        return BindingBuilder.bind(queue).to(exchange);
    }

}

也可以适用注解指定交换机和队列的定义信息和绑定信息

@RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "direct.queue1",durable = "true"),
                            exchange = @Exchange(name = "v1.direct",type = ExchangeTypes.DIRECT),
                            key = {"china.#","japan.#"}
                    ),
                    @QueueBinding(
                            value = @Queue(name = "direct.queue2",durable = "true"),
                            exchange = @Exchange(name = "v1.direct2",type = ExchangeTypes.DIRECT),
                            key = {"china.#","japan.#"}
                    ),
                    @QueueBinding(
                            value = @Queue(name = "direct.queue3",durable = "true"),
                            exchange = @Exchange(name = "v1.direct3",type = ExchangeTypes.DIRECT),
                            key = {"china.#","japan.#"}
                    )
            }
    )

交换机

交换机只负责将同样的消息发给多个队列,并且不会缓存消息
根据交换机转发消息的策略的不同,可以将交换机分类
RabbitMQ也提供了convertAndSend的重载方法,用于指定交换机

Fanout广播交换机

广播:交换机会将同样的消息发送给所有与它绑定的消费者

public void publish() throws Exception{

        String exchange = "v1.fanout";

        for (int i = 0; i < 50; i++) {
            String message = "消费者消息:"+i;
            rabbitTemplate.convertAndSend(exchange,"", message);
            Thread.sleep(20);
        }

    }

Direct定向交换机

通过进一步划分消息队列,实现消息的精准投递,划分的关键是RoutingKey和BindingKey的配对,只有RoutingKey和BindingKey匹配时,交换机才会向该队列发送消息
生产者发送的消息都带上一个RoutingKey,那么,这条消息只能发送给拥有配对的BindingKey的队列
类似于,客户端向某个端口发送消息,服务端监听某个端口

Topic交换机

与Direct交换机的投递机制相同,只不过,Topic交换机的key可以是以点分隔的多个单词,并且,支持通配符匹配
*匹配一个单词
#匹配0个或多个单词

序列化和反序列化

消息通过网络传输,以二进制数据形式传输,所以,需要先将消息序列化成字节数组
RabbitMQ客户端默认先将消息转成message对象,再进行序列化,适用默认的SimpleMessageConverter

	/**
	 * 根据提供的消息对象创建message对象
	 */
	@Override
	protected Message createMessage(Object object, MessageProperties messageProperties)
			throws MessageConversionException {

		byte[] bytes = null;
		if (object instanceof byte[]) {
			bytes = (byte[]) object;
			messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
		}
		else if (object instanceof String) {
			try {
				bytes = ((String) object).getBytes(this.defaultCharset);
			}
			catch (UnsupportedEncodingException e) {
				throw new MessageConversionException("failed to convert to Message content", e);
			}
			messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
			messageProperties.setContentEncoding(this.defaultCharset);
		}
		else if (object instanceof Serializable) {
			try {
				bytes = SerializationUtils.serialize(object);
			}
			catch (IllegalArgumentException e) {
				throw new MessageConversionException("failed to convert to serialized Message content", e);
			}
			messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
		}
		if (bytes != null) {
			messageProperties.setContentLength(bytes.length);
			return new Message(bytes, messageProperties);
		}
		throw new IllegalArgumentException(getClass().getSimpleName()
				+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
	}

核心序列化代码:

new ObjectOutputStream(stream).writeObject(object);

可以看出rabbitMQ默认使用jdk的序列化流将对象转化成二进制数据,底层实际是通过字节码进行计算得到一串二进制数
jdk序列化缺点太多,在很多业务中都不适用,缺点有以下:
篡改序列化后的数据,将导致无法通过反序列化还原数据
序列化后的数据体积会增大很多倍
序列化后的数据可读性差

最优的解决方案是选择将对象序列化成Json字符串
通过配置类,来替换rabbitMQ默认的序列化器

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/654594.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

短视频再度重逢:四川京之华锦信息技术公司

短视频再度重逢 在数字化时代的浪潮中&#xff0c;短视频以其独特的魅力迅速崛起&#xff0c;成为现代人生活中不可或缺的一部分。而当我们谈论起短视频&#xff0c;我们不仅仅是在谈论一种娱乐方式&#xff0c;更是在谈论一种情感的载体&#xff0c;一种回忆的媒介。今天&…

【ai】LiveKit Agent 的example及python本地开发模式工程实例

title: ‘LiveKit Agent Playground’ playgroundLiveKit Community playground的环境变量&#xff1a;LiveKit API # LiveKit API Configuration LIVEKIT_API_KEYYOUR_API_KEY LIVEKIT_API_SECRETYOUR_API_SECRET# Public configuration NEXT_PUBLIC_LIVEKIT_URLwss://YOUR_…

pytorch比较操作

文章目录 常用的比较操作1.torch.allclose()2.torch.argsort()3.torch.eq()4.torch.equal()5.torch.greater_equal()6.torch.gt()7.torch.isclose()8.torch.isfinite()9.torch.isif()10.torch.isposinf()11.torch.isneginf()12.torch.isnan()13.torch.kthvalue()14.torch.less_…

【从零开始学习RabbitMQ | 第二篇】如何确保MQ的可靠性和消费者可靠性

目录 前言&#xff1a; MQ可靠性&#xff1a; 数据持久化&#xff1a; Lazy Queue&#xff1a; 消费者可靠性&#xff1a; 消费者确认机制&#xff1a; 消费失败处理&#xff1a; MQ保证幂等性&#xff1a; 方法一&#xff1a; 总结&#xff1a; 前言&#xff1a; …

以梦为马,不负韶华(3)-AGI在企业服务的应用

AGI在企业服务中&#xff0c;各应⽤已覆盖企业全流程&#xff0c;包含⼈⼒、法务、财税、流程⾃动化、知识管理和软件开发各领域。 由于⼤语⾔模型对⽂本处理类场景有着天然且直接的适配性&#xff0c;⽂本总结、⽂本内容⽣成、服务指引等发展起步早且应⽤成熟度更⾼。 在数据…

Captura完全免费的电脑录屏软件

一、简介 1、Captura 是一款免费开源的电脑录屏软件&#xff0c;允许用户捕捉电脑屏幕上的任意区域、窗口、甚至是全屏画面&#xff0c;并将这些画面录制为视频文件。这款软件具有多种功能&#xff0c;例如可以设置是否显示鼠标、记录鼠标点击、键盘按键、计时器以及声音等。此…

LeetCode题练习与总结:有序链表转换二叉搜索树--109

一、题目描述 给定一个单链表的头节点 head &#xff0c;其中的元素 按升序排序 &#xff0c;将其转换为平衡二叉搜索树。 示例 1: 输入: head [-10,-3,0,5,9] 输出: [0,-3,9,-10,null,5] 解释: 一个可能的答案是[0&#xff0c;-3,9&#xff0c;-10,null,5]&#xff0c;它表…

医疗图像处理2023:Transformers in medical imaging: A survey

医学成像中的transformer:综述 目录 一、介绍 贡献与安排 二、CNN和Transformer 1.CNN 2.ViT 三、Transformer应用于各个领域 1.图像分割 1&#xff09;器官特异性 ①2D ②3D 2&#xff09;多器官类别 ①纯transformer ②混合架构 单尺度 多尺度 3&#xff09;…

Kubernetes——监听机制与调度约束

目录 前言 一、监听机制 1.Pod启动创建过程 2.调度过程 1.指定调度节点 1.1强制匹配 1.2强制约束 二、硬策略和软策略 1.键值运算关系 1.硬策略——requiredDuringSchedulingIgnoredDuringExecution 2.软策略——preferredDuringSchedulingIgnoredDuringExecution …

Varjo XR-4功能详解:由凝视驱动的XR自动对焦相机系统

Varjo是XR市场中拥有领先技术的虚拟现实设备供应商&#xff0c;其将可变焦距摄像机直通系统带入到虚拟和混合现实场景中。在本篇文章中&#xff0c;Varjo的技术工程师维尔蒂莫宁详细介绍了这项在Varjo XR-4焦点版中投入应用的技术。 对可变焦距光学系统的需求 目前所有其他XR头…

基于STM32实现智能饮水机控制系统

目录 引言环境准备智能饮水机控制系统基础代码示例&#xff1a;实现智能饮水机控制系统 温度传感器数据读取水泵和加热器控制水位传感器数据读取用户界面与显示应用场景&#xff1a;家庭和办公室的智能饮水管理问题解决方案与优化收尾与总结 1. 引言 本教程将详细介绍如何在S…

自适应感兴趣区域的级联多尺度残差注意力CNN用于自动脑肿瘤分割| 文献速递-深度学习肿瘤自动分割

Title 题目 Cascade multiscale residual attention CNNs with adaptive ROI for automatic brain tumor segmentation 自适应感兴趣区域的级联多尺度残差注意力CNN用于自动脑肿瘤分割 01 文献速递介绍 脑肿瘤是大脑细胞异常和不受控制的增长&#xff0c;被认为是神经系统…

第二证券炒股知识:股票破发后怎么办?

当一只新股的价格跌破其发行价时&#xff0c;往往会受到商场出资者的关注。关于股票破发后怎么办&#xff0c;第二证券下面就为我们具体介绍一下。 股票破发是指股票的商场价格低于其发行价格或最近一次增发价格&#xff0c;股票破发往往是由于多种要素共同作用的结果&#xf…

强化学习——学习笔记2

在上一篇文章中对强化学习进行了基本的概述&#xff0c;在此篇文章中将继续深入强化学习的相关知识。 一、什么是DP、MC、TD&#xff1f; 动态规划法&#xff08;DP&#xff09;&#xff1a;动态规划法离不开一个关键词&#xff0c;拆分 &#xff0c;就是把求解的问题分解成若…

亡羊补牢,一文讲清各种场景下GIT如何回退

系列文章目录 手把手教你安装Git&#xff0c;萌新迈向专业的必备一步 GIT命令只会抄却不理解&#xff1f;看完原理才能事半功倍&#xff01; 常用GIT命令详解&#xff0c;手把手让你登堂入室 GIT实战篇&#xff0c;教你如何使用GIT可视化工具 GIT使用需知&#xff0c;哪些操作…

Meta 推出新型多模态 AI 模型“变色龙”(Chameleon),挑战 GPT-4o,引领多模态革命

在人工智能领域&#xff0c;Meta 近日发布了一款名为“变色龙”&#xff08;Chameleon&#xff09;的新型多模态 AI 模型&#xff0c;旨在挑战 OpenAI 的 GPT-4o&#xff0c;并刷新了当前的技术标准&#xff08;SOTA&#xff09;。这款拥有 34B 参数的模型通过 10 万亿 token 的…

2-EMMC启动及各分区文件生成过程

EMMC的使用比nand flash还是复杂一些&#xff0c;有其特有的分区和电器性能 1、启动过程介绍 跟普通nand或spi flash不同&#xff0c;uboot前面还有好几级 在vendor某些厂商的设计中&#xff0c;ATF并不是BOOTROM加载后的第一个启动镜像&#xff0c;可能是这样的&#xff1a; …

微信小程序多端应用Donut Android生成签名

一、生成签名的作用 确保应用的完整性&#xff1a;签名可以确保应用在发布后没有被修改。如果应用被修改&#xff0c;签名就会改变&#xff0c;Android系统就会拒绝安装。确定应用的唯一身份&#xff1a;签名是应用的唯一标识&#xff0c;Android系统通过签名来区分不同的应用…

【Postman接口测试】第二节.Postman界面功能介绍(上)

文章目录 前言一、Postman前言介绍二、Postman界面导航说明三、使用Postman发送第一个请求四、Postman 基础功能介绍 4.1 常见类型的接口请求 4.1.1 查询参数的接口请求 4.1.2 表单类型的接口请求 4.1.3 上传文件的表单请求 4.1.4 JSON 类…

Linux软硬链接详解

软链接&#xff1a; ln -s file1 file2//file1为目标文件&#xff0c;file2为软链接文件 演示&#xff1a; 从上图可以得出&#xff1a; 软链接本质不是同一个文件&#xff0c;因为inode不同。 作用&#xff1a; 软连接就像是Windows里的快捷方式&#xff0c;里面存放的是目标…