SpringBoot activemq收发消息、配置及原理

SpringBoot集成消息处理框架

Spring framework提供了对JMS和AMQP消息框架的无缝集成,为Spring项目使用消息处理框架提供了极大的便利。

与Spring framework相比,Spring Boot更近了一步,通过auto-configuration机制实现了对jms及amqp主流框架如ActiveMQ、RabbitMQ以及Kafka的自动配置,应用层开发过程中无需自己配置,只要classpath下加入了相应的消息处理框架包,Spring Boot会自动完成加载,程序员直接就可以使用,简直不能再方便了。

JMS

The Java Message Service (JMS) API is a messaging standard that allows application components based on the Java Platform Enterprise Edition (Java EE) to create, send, receive, and read messages. It enables distributed communication that is loosely coupled, reliable, and asynchronous.

JMS是java message service的简称,是一个基于java的消息处理规范,允许基于JAVA EE平台的应用组件创建、发送、接收、读取消息。使得分布式通讯耦合度更低、更加可靠、异步处理。

JMS提供两种编程模式:

Point-to-Point—Messages are sent to a single consumer using a JMS queue.
Publish and Subscribe—Messages are broadcast to all registered listeners through JMS topics.

点对点通讯:消息通过队列发送给单一的消费者。
发布订阅模式:消息通过主题以广播的形式发送给所有的订阅者。

JMS规范规定了5种不同类型的消息体:
在这里插入图片描述

  1. StreamMessage:流式消息,顺序读取。
  2. MapMessage:键值对消息,可顺序读取,也可以通过键随机读取。
  3. TextMessage:文本消息,当初制定规范时候认为xml会成为最主流的消息载体,通过TextMessage可以发送xml格式的文本数据。
  4. ObjectMessage:对象消息,java对象序列化后发送。
  5. BytesMessage:字节消息。

JMS API对消息的发送、接收、存储等操作做了约定,每一个JMS消息框架的提供者(实现者)都必须遵守这些约定。

ActiveMQ

ActiveMQ是Apache旗下的、基于JMS的一款开源消息处理中间件,官网介绍:

Apache ActiveMQ® is the most popular open source, multi-protocol, Java-based message broker. It supports industry standard protocols so users get the benefits of client choices across a broad range of languages and platforms. Connect from clients written in JavaScript, C, C++, Python, .Net, and more. Integrate your multi-platform applications using the ubiquitous AMQP protocol. Exchange messages between your web applications using STOMP over websockets. Manage your IoT devices using MQTT. Support your existing JMS infrastructure and beyond. ActiveMQ offers the power and flexibility to support any messaging use-case.

最流行的开源、多协议、基于JAVA的消息处理中间件。支持工业级协议所以用户可以从多语言、跨平台的客户端选择中受益。等等…

目前ActiveMQ有两个主流版本:
在这里插入图片描述

There are currently two “flavors” of ActiveMQ available - the well-known “classic” broker and the “next generation” broker code-named Artemis. Once Artemis reaches a sufficient level of feature parity with the “Classic” code-base it will become the next major version of ActiveMQ. Initial migration documentation is available as well as a development roadmap for Artemis.

等到代表“下一代”的Artemis成熟之后,就会替代“classic”成为ActiveMQ的主版本。

ActiveMQ的下载安装

官网找一个合适的版本下载安装即可,非常简单。

安装后提供了一个管理端口:
在这里插入图片描述
可以通过管理端口做测试,管理端口是8161,而默认的MQ服务端口是61616。

SpringBoot项目自动配置ActiveMQ

首先初步了解一下SpringBoot对ActiveMQ的集成情况,轻车熟路的,检查一下auto-configuration:

在这里插入图片描述
找到这个JmsAutoConfiguration类:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ Message.class, JmsTemplate.class })
@ConditionalOnBean(ConnectionFactory.class)
@EnableConfigurationProperties(JmsProperties.class)
@Import(JmsAnnotationDrivenConfiguration.class)
public class JmsAutoConfiguration {

很明显,他就是SpringBoot的自动配置类,如果classpath下存在Message.class, JmsTemplate.class 类、以及Spring容器中存在ConnectionFactory Bean的话就会被启用。检查一下代码发现他会自动装配JmsTemplate、JmsMessagingTemplate等对象到Spring IoC容器中。

SpringBoot项目引入ActiveMQ

POM文件引入:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

以上starter包含了spring-jms以及activemq的相关配置,所以通过以上对auto-configuration的分析,JmsTemplate以及JmsMessagingTemplate等相关组件会被SpringBoot自动配置好,后面我们就可以直接拿来使用了。

ps:JmsTemplate组件的自动配置过程源码也比较复杂,今天暂时不涉及。

到此,其实我们并没做什么具体的工作,但是ActiveMQ已经准备好了,我们项目中就可以直接使用JmsTemplate收发消息了。

消息发送

在application.yml文件中加入activeMQ的配置:

spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin

消息发送类中通过自动装配引入JmsTemplate:

    @Autowired
    private JmsTemplate jmsTemplate;

消息发送方法:

    public void sendMessage(String message){

        jmsTemplate.send("active.myqueue", new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                // 也可以创建对象 session.createObjectMessage()
                TextMessage textMessage = session.createTextMessage();
                textMessage.setText(message);
                return textMessage;
            }
        });
    }

Controller中加入对sendMessage方法的调用,以便测试:

    @GetMapping ("/sendmessage/{msg}")
    public String sendMessage(@PathVariable String msg){
        userService.sendMessage(msg);
        return "hello";
    }

消息接收

同样,消息接收方法中通过自动装配引入JmsTemplate。

消息接收方法:

    public String revieveMessage(){
        String message = (String)jmsTemplate.receiveAndConvert("active.myqueue");
        log.info("revieved message:"+message);
        return message;
    }

Controller方法中加入接收方法以便测试:

    @GetMapping ("/recievemsg")
    public String recieveMessage(){
        return userService.revieveMessage();
    }

这样,activeMQ的“点对点模式”收发消息的代码准备完毕,非常简单。

验证

首先启动activeMQ,启动之后在ActiveMQ的管理端监控队列(截图之前我已经测试过一次消息收发了,如果没测试过的话,队列是空的,不会存在active.myqueue这个队列):
在这里插入图片描述
启动我们刚才创建的测试应用,测试发送消息:
在这里插入图片描述
发送消息 1230230,收到反馈“hello”,说明消息发送成功。

我们从ActiveMQ管理端查看队列:
在这里插入图片描述
Number of pending Message 队列中尚未消费的消息数量为1,说明我们刚才的消息已经成功发送到队列中了。

消息接收测试:
在这里插入图片描述
成功接收到消息1230230。

再次从ActiveMQ管理端验证:
在这里插入图片描述
队列中的pending message数量变为0,入队数量和出队数量都为1,说明刚才的消息已经被成功消费。

此时,队列中尚未消费的数量为0的情况下,如果再次执行消息消费方法(recievemsg方法),消费方法会阻塞等待,直到再次调用消息发送方法发送一条新消息到队列、消费方法获取到新消息后结束阻塞等待。

监听器方式接收消息

发布订阅模式与点对点模式的区别主要是在消息接收端,Spring提供了接收消息的注解@JmsListener。

@JmsListener需要与@Component家族的注解结合使用,UserService中编写两个listener监听active.listenqueue::


@Service
@Slf4j
public class UserService {
    @Autowired
    private JmsTemplate jmsTemplate;

   @JmsListener(destination = "active.listenqueue")
    public void revieveMsgListener(String content){
        log.info("revieveMsgListener:"+ content);
    }

    @JmsListener(destination = "active.listenqueue")
    public void revieveMsgListenerA(String content){
        log.info("revieveMsgListenerA"+content);
    }

UserService编写一个向该ActiveMQ发送消息的方法:

 public void sendMessage(String message){
        jmsTemplate.send("active.listenqueue", new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                // 也可以创建对象 session.createObjectMessage()
                TextMessage textMessage = session.createTextMessage();
                textMessage.setText(message);
                return textMessage;
            }
        });
    }

Controller中调用发送方法:

    @GetMapping ("/sendmessage/{msg}")
    public String sendMessage(@PathVariable String msg){
        userService.sendMessage(msg);
        return "hello";
    }

}

启动activeMQ,启动项目之后,查看activemq的admin端:
在这里插入图片描述

active.listenqueue以及2个监听器已经注册到ActiveMQ中,发送消息:
在这里插入图片描述

检查应用已经接收到了消息:
在这里插入图片描述
但是只有一个监听器接收到了消息,反复发送消息,后台log发现两个监听器轮番收到消息、但是一条消息不能被两个监听器同时接收到:
在这里插入图片描述

发布订阅模式

发布订阅模式下,消息发送给topic,订阅者仅订阅感兴趣的topic内的消息,消息消费完成后并不会从topic中消失,多个消费者可以从同一个topic内消费消息,所以,一条消息允许被多次消费。

默认情况下,SpringBoot集成ActiveMQ采用的是点对点队列模式,application.yml文件配置 spring:jms:pub-sub-domain参数为true开启topic模式:

spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
  jms:
    pub-sub-domain: true

启动activeMQ,启动项目,topic下看到项目中启动的两个topic:
在这里插入图片描述
调用sendmessage方法发送消息:
在这里插入图片描述
检查ActiveMQ状态,可以发现1条消息成功发送到队列中,被2个消费者分别消费了一次、消息共被消费了2次:
在这里插入图片描述
检查log:
在这里插入图片描述
两个监听方法都接收到了消息。

OK,let’s say it’s a day!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/351995.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

《WebKit技术内幕》学习之十五(3): Web前端之未来

3 Web应用和Web运行环境 3.1 Web应用 HTML5提供了强大的能力&#xff0c;而不是支持Web网页这么简单。就目前而言&#xff0c;它已经初步提供了支持Web网页向Web应用方向发展的能力。相对于本地应用&#xff08;Native Application&#xff09;&#xff0c;Web前端领域也能够…

NIO-Selector详解

NIO-Selector详解 Selector概述 Selector选择器&#xff0c;也可以称为多路复⽤器。它是Java NIO的核⼼组件之⼀&#xff0c;⽤于检查⼀个或多个Channel的状态是否处于可读、可写、可连接、可接收等。通过⼀个Selector选择器管理多个Channel&#xff0c;可以实现⼀个线程管理…

深入浅出AI落地应用分析:AI视频生成Top 5应用

接下俩会每周集中体验一些通用或者垂直的AI落地应用&#xff0c;主要以一些全球或者国外国内排行较前的产品为研究对象&#xff0c;「AI 产品榜&#xff1a; aicpb.com」以专题的方式在博客进行分享。 一、Loom 二、Runway 产品链接&#xff1a;https://app.runwayml.com/ …

企业职能部门员工忙闲不均,如何调动积极性?

案例企业背景&#xff1a; 某企业隶属于中国航天科技集团公司&#xff0c;致力于光纤陀螺系统、微机电惯性系统、光纤传感系统等高新技术产品的研发。公司具有雄厚的新型惯导和光电传感技术基础&#xff0c;多年来开创了我国光纤陀螺技术在武器、卫星和载人飞船等多个任务上的…

代码随想录Day32 | 122.买卖股票的最佳时机II 55. 跳跃游戏 45.跳跃游戏II

代码随想录Day32 | 122.买卖股票的最佳时机II 55. 跳跃游戏 45.跳跃游戏II 122.买卖股票的最佳时机II55.跳跃游戏45.跳跃游戏II 122.买卖股票的最佳时机II 文档讲解&#xff1a;代码随想录 视频讲解&#xff1a; 贪心算法也能解决股票问题&#xff01;LeetCode&#xff1a;122.…

【笔记】顺利通过EMC试验(16-41)-视频笔记

目录 视频链接 P1:电子设备中有哪些主要骚扰源 P2:怎样减小DC模块的骚扰 P3:PCB上的辐射源究竟在哪里 P4:怎样控制PCB板的电磁辐射 P5:多层线路板是解决电磁兼容问题的简单方法 P6:怎样处理地线上的裂缝 P7:怎样降低时钟信号的辐射 P8:为什么IO接口的处理特别重要 P9…

ARIMA模型:Python实现

ARIMA模型&#xff1a;Python实现 自回归移动平均模型&#xff08;ARIMA&#xff09;是一种经典的时间序列分析和预测方法。前期已介绍了ARIMA的概念和公式&#xff0c;本文将介绍ARIMA模型的理论基础&#xff0c;并提供详细的Python代码实现&#xff0c;帮助读者了解如何应用…

VS生成报错:MSB8036 The Windows SDK version 8.1 was not found.找不到 Windows SDK 版本 8.1

目录 一、查看本机SDK二、 解决法一&#xff1a;适配本电脑的SDK法二&#xff1a;下载SDK 8.1 VS生成报错&#xff1a;MSB8036 找不到 Windows SDK 版本 8.1。请安装所需版本的 Windows SDK&#xff0c;或者在项目属性页中或通过右键单击解决方案并选择“重定解决方案目标”来更…

用ChatGPT写申请文书写进常春藤联盟?

一年前&#xff0c;ChatGPT 的发布引发了教育工作者的恐慌。现在&#xff0c;各大学正值大学申请季&#xff0c;担心学生会利用人工智能工具伪造入学论文。但是&#xff0c;聊天机器人创作的论文足以骗过大学招生顾问吗&#xff1f; ChatGPT简介 ChatGPT&#xff0c;全称聊天生…

C++ 之LeetCode刷题记录(二十)

&#x1f604;&#x1f60a;&#x1f606;&#x1f603;&#x1f604;&#x1f60a;&#x1f606;&#x1f603; 开始cpp刷题之旅。 依旧是追求耗时0s的一天。 110. 平衡二叉树 给定一个二叉树&#xff0c;判断它是否是高度平衡的二叉树。 本题中&#xff0c;一棵高度平衡二…

构建外卖跑腿系统:技术实现与架构设计

在当今数字化时代&#xff0c;外卖跑腿系统已成为人们生活中不可或缺的一部分。本文将探讨如何利用先进的技术和架构设计&#xff0c;开发一个高效、可靠的外卖跑腿系统。 1. 技术选型 在开发外卖跑腿系统之前&#xff0c;我们需要仔细选择适合的技术栈&#xff0c;以确保系…

[C++13]:stack queue priority_queue 模拟实现

stack && queue && priority_queue 模拟实现 一.stack1.概念&#xff1a;2.使用&#xff1a;3.模拟实现&#xff1a;一些题目&#xff1a;1.最小栈&#xff1a;2.栈的压入弹出序列&#xff1a;3.逆波兰表达式求值&#xff1a; 二.queue1.概念&#xff1a;2.使用…

SpringBoot之时间数据前端显示格式化

背景 在实际我们通常需要在前端显示对数据操作的时间或者最近的更新时间&#xff0c;如果我们只是简单的使用 LocalDateTime.now()来传入数据不进行任何处理那么我们就会得到非常难看的数据 解决方式&#xff1a; 1). 方式一 在属性上加上注解&#xff0c;对日期进行格式…

Web3 游戏开发者的数据分析指南

作者&#xff1a;lesleyfootprint.network 在竞争激烈的 Web3 游戏行业中&#xff0c;成功不仅仅取决于游戏的发布&#xff0c;还需要在游戏运营过程中有高度的敏锐性&#xff0c;以应对下一次牛市的来临。 人们对 2024 年的游戏行业充满信心。A16Z GAMES 和 GAMES FUND ONE …

探索IOC和DI:解密Spring框架中的依赖注入魔法

IOC与DI的详细解析 IOC详解1 bean的声明2 组件扫描 DI详解 IOC详解 1 bean的声明 IOC控制反转&#xff0c;就是将对象的控制权交给Spring的IOC容器&#xff0c;由IOC容器创建及管理对象。IOC容器创建的对象称为bean对象。 要把某个对象交给IOC容器管理&#xff0c;需要在类上…

深度学习知识

context阶段和generation阶段的不同 context阶段&#xff08;又称 Encoder&#xff09;主要对输入编码&#xff0c;产生 CacheKV(CacheKV 实际上记录的是 Transformer 中 Attention 模块中 Key 和 Value 的值&#xff09;&#xff0c;在计算完 logits 之后会接一个Sampling 采…

【MySQL进阶】InnoDB引擎存储结构和架构

文章目录 逻辑存储结构架构内存结构Buffer Pool&Adaptive Hash IndexChange BufferLog Buffer 磁盘结构 逻辑存储结构 表空间&#xff08;Tablespaces&#xff09;&#xff1a;InnoDB使用表空间来管理存储表和索引的数据文件。每个表空间包含一个或多个数据文件&#xff0c…

【学网攻】 第(9)节 -- 路由器使用以及原理

系列文章目录 目录 系列文章目录 文章目录 前言 一、路由器是什么&#xff1f; 二、实验 1.引入 总结 文章目录 【学网攻】 第(1)节 -- 认识网络【学网攻】 第(2)节 -- 交换机认识及使用【学网攻】 第(3)节 -- 交换机配置聚合端口【学网攻】 第(4)节 -- 交换机划分Vlan…

解锁一些SQL注入的姿势

昨天课堂上布置了要去看一些sql注入的案例&#xff0c;以下是我的心得&#xff1a; ​​​​​​​ ​​​​​​​ ​​​​​​​ 1.新方法 打了sqli的前十关&#xff0c;我发现一般都是联合查询&#xff0c;但是有没有不是联合查询的方法呢&#xf…

Python基础学习 -05 基本类型

Python3 基本数据类型 Python 中的变量不需要声明。每个变量在使用前都必须赋值&#xff0c;变量赋值以后该变量才会被创建。 在 Python 中&#xff0c;变量就是变量&#xff0c;它没有类型&#xff0c;我们所说的"类型"是变量所指的内存中对象的类型。 等号&…