三、Maven项目搭建及Destination(队列、主题)

Maven项目搭建及Destination(队列、主题)

  • 一、Idea中Maven项目准备
    • 1.创建Module
    • 2.创建java包
    • 3.配置pom.xml
  • 二、队列(Queue)
    • 1.JMS编程架构
    • 2.代码实现生产者
    • 3.代码实现消费者
    • 4.队列消费者三大情况
  • 三、消费者类型
    • 1.同步式消费者
      • 1.1 一直阻塞
      • 1.2 超时阻塞
    • 2.异步监听消费者
  • 三、主题(Topic)
    • 1.发布主题生产者
    • 2.订阅主题消费者
  • 四、Queue和Topic对比

一、Idea中Maven项目准备

1.创建Module

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2.创建java包

在这里插入图片描述
在这里插入图片描述

3.配置pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.qingsi.activemq</groupId>
    <artifactId>activemq_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <!-- activemq所需要的jar包配置 -->
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.11</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>4.15</version>
        </dependency>
        <!--  下面是junit/logback等基础配置  -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

</project>

二、队列(Queue)

1.JMS编程架构

在这里插入图片描述

2.代码实现生产者

package com.qingsi.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsProduce {

    public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        // 1.创建连接工厂, 采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3.创建会话session
        // 两个参数,第一个叫事务,第二个叫签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4.创建目的地(是队列还是主题)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5.创建消息的生产者
        MessageProducer producer = session.createProducer(queue);
        // 6.使用生产者生成3条消息发送到MQ队列
        for (int i = 1; i <= 3; i++) {
            // 7.创建消息
            TextMessage textMessage = session.createTextMessage("msg--" + i);// 最简单的字符串
            // 8.通过producer发送给mq
            producer.send(textMessage);
        }
        // 9.关闭资源
        producer.close();
        session.close();
        connection.close();

        System.out.println("MQ消息发布完成");
    }

}

  • 运行后,登录ActiveMQ后台查看
    在这里插入图片描述
  • 参数详解:
    • Number Of Pending Messages=等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
    • Number Of Consumers=消费者数量,消费者端的消费者数量。
    • Messages Enqueued=进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
    • Messages Dequeued=出队消息数,可以理解为是消费者消费掉的数量。
  • 总结:
    • 当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
    • 当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。
    • 当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。

3.代码实现消费者

package com.qingsi.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer {
    public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        // 1.创建连接工厂, 采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3.创建会话session
        // 两个参数,第一个叫事务,第二个叫签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4.创建目的地(是队列还是主题)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5.创建消息的消费者
        MessageConsumer consumer = session.createConsumer(queue);
        while (true){
            TextMessage textMessage = (TextMessage)consumer.receive();
            if(null != textMessage){
                System.out.println("消费者收到消息:" + textMessage.getText());
            }else {
                break;
            }
        }
        consumer.close();
        session.close();
        connection.close();
    }
}

在这里插入图片描述

  • 现象:
    • 还有1个消费者在连接
    • 消息入队3个
    • 消息出队3个

4.队列消费者三大情况

  • 先生产消息,只启动1号消费者,1号消费者可以消费消息。
  • 先生产消息,先启动1号消费者再启动2号消费者,2号消费者不能消费到消息,都被1号消费者消费了。
  • 先启动2个消费者,再生产6条消息。
    • 分析:这6条消息会被发送到同一个队列中。由于队列是点对点的消息传递模型,每条消息只能被一个消费者接收和处理。因此,在这种情况下,两个消费者会以轮询的方式交替接收这6条消息。也就是说,消费者1会接收3条消息,而消费者2会接收另外3条消息。

三、消费者类型

  • 队列和主题都有这种类型

1.同步式消费者

  • 同步消费者是一种阻塞式的消费方式,在接收到消息之后,消费者会暂停执行并等待消息的处理完成,然后再继续执行后续代码。同步消费者使用 receive() 方法来接收消息,它会一直阻塞直到接收到消息或超时。
  • 一直组合和固定时间阻塞:区别在于receive方法,是否传入时间

1.1 一直阻塞

  • 如果一直没有消息,那么会一直阻塞在receive方法
package com.qingsi.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer {
    public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        // 1.创建连接工厂, 采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3.创建会话session
        // 两个参数,第一个叫事务,第二个叫签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4.创建目的地(是队列还是主题)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5.创建消息的消费者
        MessageConsumer consumer = session.createConsumer(queue);
        while (true){
            TextMessage textMessage = (TextMessage)consumer.receive();
            if(null != textMessage){
                System.out.println("消费者收到消息:" + textMessage.getText());
            }else {
                break;
            }
        }
        consumer.close();
        session.close();
        connection.close();
    }
}

1.2 超时阻塞

  • 如果一直没有消息,等待超过设定的时候,那么就结束receive方法的阻塞
package com.qingsi.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer {
    public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        // 1.创建连接工厂, 采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3.创建会话session
        // 两个参数,第一个叫事务,第二个叫签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4.创建目的地(是队列还是主题)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5.创建消息的消费者
        MessageConsumer consumer = session.createConsumer(queue);
        while (true){
            TextMessage textMessage = (TextMessage)consumer.receive(4000L);
            if(null != textMessage){
                System.out.println("消费者收到消息:" + textMessage.getText());
            }else {
                break;
            }
        }
        consumer.close();
        session.close();
        connection.close();
    }
}

2.异步监听消费者

  • 异步消费者是一种非阻塞式的消费方式,它通过注册一个消息监听器(Message Listener)来接收消息,并在消息到达时异步地触发监听器进行消息处理。异步消费者不会暂停执行,而是继续执行后续代码。
package com.qingsi.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

public class JmsConsumer {
    public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException, IOException {
        // 1.创建连接工厂, 采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3.创建会话session
        // 两个参数,第一个叫事务,第二个叫签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4.创建目的地(是队列还是主题)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5.创建消息的消费者
        MessageConsumer consumer = session.createConsumer(queue);
        // 6.通过监听的方式来消费消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者消费消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        // 保证控制台不关掉
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    }
}

三、主题(Topic)

  • 注意:先启动订阅者再启动生产者,不然发送的消息是废消息

1.发布主题生产者

package com.qingsi.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsTopicProduce {

    public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws JMSException {
        // 1.创建连接工厂, 采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3.创建会话session
        // 两个参数,第一个叫事务,第二个叫签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4.创建目的地(是队列还是主题)
        Topic topic = session.createTopic(TOPIC_NAME);
        // 5.创建消息的生产者
        MessageProducer producer = session.createProducer(topic);
        // 6.使用生产者生成3条消息发送到MQ主题
        for (int i = 1; i <= 3; i++) {
            // 7.创建消息
            TextMessage textMessage = session.createTextMessage("msg-topic--" + i);// 最简单的字符串
            // 8.通过producer发送给mq
            producer.send(textMessage);
        }
        // 9.关闭资源
        producer.close();
        session.close();
        connection.close();

        System.out.println("MQ消息发布到topic完成");
    }

}

2.订阅主题消费者

package com.qingsi.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

public class JmsTopicConsumer {
    public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws JMSException, IOException {
        System.out.println("我是1号消费者");
        // 1.创建连接工厂, 采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3.创建会话session
        // 两个参数,第一个叫事务,第二个叫签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4.创建目的地(是队列还是主题)
        Topic topic = session.createTopic(TOPIC_NAME);
        // 5.创建消息的消费者
        MessageConsumer consumer = session.createConsumer(topic);
        // 6.通过监听的方式来消费消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("我是1号消费者消费消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        // 保证控制台不关掉
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    }
}

在这里插入图片描述

  • 结果:运行了2个消费者,入队3条消息,每个消费者都要消费一次,那么总共消费了6条

四、Queue和Topic对比

比较的项目Topic队列模式Queue队列模式
工作模式"订阅-发布"模式,如果当前没有订阅者,消息将会被丢弃,如果有多个订阅者,那么这些订阅者都会收到消息"负载均衡"模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送给其中一个消费者,并且要求消费者ack信息
有无状态无状态Queue数据默认会在mq服务器上已文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面,也可以配置成DB存储
传递完整性如果没有订阅者,消息会被丢弃消息不会被丢弃
处理效率由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/390945.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【MATLAB】鲸鱼算法优化混合核极限学习机(WOA-HKELM)回归预测算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~也可转原文链接获取~ 1 基本定义 鲸鱼算法优化混合核极限学习机&#xff08;WOA-HKELM&#xff09;回归预测算法是一种结合鲸鱼优化算法和混合核极限学习机的混合算法。其原理主要包含以下几个步骤&#xff1a; 初始化&am…

VMware Tools安装教程(适用windows虚拟机)

一、资源 VMware-tools安装包已绑定在资源中 二、步骤 1、点击已经开启的虚拟机中的此图标&#xff0c;点击设置 2、将镜像文件选中&#xff0c;点击确定 3、之后会自动进入安装过程&#xff0c;点击下一步 4、选择典型安装&#xff0c;下一步直到完成&#xff0c;完成后重启…

Swift Combine 合并多个管道以更新 UI 元素 从入门到精通十七

Combine 系列 Swift Combine 从入门到精通一Swift Combine 发布者订阅者操作者 从入门到精通二Swift Combine 管道 从入门到精通三Swift Combine 发布者publisher的生命周期 从入门到精通四Swift Combine 操作符operations和Subjects发布者的生命周期 从入门到精通五Swift Com…

【Deep Learning 3】CNN卷积神经网络

&#x1f31e;欢迎来到机器学习的世界 &#x1f308;博客主页&#xff1a;卿云阁 &#x1f48c;欢迎关注&#x1f389;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; &#x1f31f;本文由卿云阁原创&#xff01; &#x1f4c6;首发时间&#xff1a;&#x1f339;2024年2月17日&…

Netty中的内置通信模式、Bootstrap和ChannelInitializer

内置通信传输模式 NIO:io.netty.channel.socket.nio 使用java.nio.channels包作为基础–基于选择器的方式Epoll:io.netty.channel.epoll由JNI驱动的epoll()和非阻塞IO.这个传输支持只有在Linux上可用的多种特性&#xff0c;如果SO_REUSEPORT&#xff0c;比NIO传输更快&#xf…

【深度学习】Pytorch 系列教程(三):PyTorch数据结构:2、张量的数学运算(1):向量运算(加减乘除、数乘、内积、外积、范数、广播机制)

文章目录 一、前言二、实验环境三、PyTorch数据结构0、分类1、Tensor&#xff08;张量&#xff09;1. 维度&#xff08;Dimensions&#xff09;2. 数据类型&#xff08;Data Types&#xff09;3. GPU加速&#xff08;GPU Acceleration&#xff09; 2、张量的数学运算1. 向量运算…

数字孪生与智慧城市:共筑未来城市的科技基石

一、引言 随着科技的飞速发展&#xff0c;数字孪生与智慧城市已成为未来城市建设的两大关键技术。数字孪生为城市提供了一个虚拟的数字镜像&#xff0c;使我们能全面、深入地了解城市的运行状态。而智慧城市则借助先进的信息通信技术&#xff0c;提升城市的智能化水平&#xf…

算法刷题:复写零

复写零 .习题链接题目描述算法原理初始值步骤1步骤2我的答案: . 习题链接 复写零 题目描述 给你一个长度固定的整数数组 arr &#xff0c;请你将该数组中出现的每个零都复写一遍&#xff0c;并将其余的元素向右平移。 注意&#xff1a;请不要在超过该数组长度的位置写入元素…

【OpenAI Sora】开启未来:视频生成模型作为终极世界模拟器的突破之旅

这份技术报告主要关注两个方面&#xff1a;&#xff08;1&#xff09;我们的方法将各种类型的视觉数据转化为统一的表示形式&#xff0c;从而实现了大规模生成模型的训练&#xff1b;&#xff08;2&#xff09;对Sora的能力和局限性进行了定性评估。报告中不包含模型和实现细节…

CCF编程能力等级认证GESP—C++6级—20231209

CCF编程能力等级认证GESP—C6级—20231209 单选题&#xff08;每题 2 分&#xff0c;共 30 分&#xff09;判断题&#xff08;每题 2 分&#xff0c;共 20 分&#xff09;编程题 (每题 25 分&#xff0c;共 50 分)闯关游戏工作沟通 答案及解析单选题判断题编程题1编程题2 单选题…

二叉树入门算法题详解

二叉树入门题目详解 首先知道二叉树是什么&#xff1a; 代码随想录 (programmercarl.com) 了解后知道其实二叉树就是特殊的链表&#xff0c;只是每个根节点节点都与两个子节点相连而其实图也是特殊的链表&#xff0c;是很多节点互相连接&#xff1b;这样说只是便于理解和定义…

安卓TextView 拖动命名

需求&#xff1a;该布局文件使用线性布局来排列三个文本视图和一个按钮&#xff0c;分别用于显示两个动物名称以及占位文本视图。在占位文本视图中&#xff0c;我们为其设置了背景和居中显示样式&#xff0c;并用其作为接收拖放操作的目标 效果图&#xff1b; 实现代码 第一布…

大数据02-数据仓库

零、文章目录 大数据02-数据仓库 1、数据仓库介绍 &#xff08;1&#xff09;基本概念 数据仓库&#xff0c;英文名称为Data Warehouse&#xff0c;可简写为DW或DWH。数据仓库的目的是构建面向分析的集成化数据环境&#xff0c;为企业提供决策支持&#xff08;Decision Sup…

牛客网SQL进阶123:高难度试卷的得分的截断平均值

官网链接&#xff1a; SQL类别高难度试卷得分的截断平均值_牛客题霸_牛客网牛客的运营同学想要查看大家在SQL类别中高难度试卷的得分情况。 请你帮她从exam_。题目来自【牛客题霸】https://www.nowcoder.com/practice/a690f76a718242fd80757115d305be45?tpId240&tqId2180…

[Android]Frida-hook环境配置

准备阶段 反编译工具:Jadx能够理解Java语言能编写小型的JavaScript代码连接工具:adb设备:Root的安卓机器&#xff0c;或者模拟器 Frida&#xff08;https://frida.re/&#xff09; 就像是你计算机或移动设备的妙妙工具。它帮助你查看其他程序或应用内部发生的事情&#xff0…

OpenAI发布文生视频大模型Sora

关注卢松松&#xff0c;会经常给你分享一些我的经验和观点。 一觉醒来发现自己快失业了&#xff0c;Open AI又放大招了。没有任何消息&#xff0c;没有任何预热&#xff0c;直接王炸。 OpenAI突然发布文生视频大模型Sora&#xff0c;生成一段长达1分钟的高清流畅视频。它能模…

机器学习8-决策树

决策树&#xff08;Decision Tree&#xff09;是一种强大且灵活的机器学习算法&#xff0c;可用于分类和回归问题。它通过从数据中学习一系列规则来建立模型&#xff0c;这些规则对输入数据进行递归的分割&#xff0c;直到达到某个终止条件。 决策树的构建过程&#xff1a; 1.…

selenium定位元素报错:‘WebDriver‘ object has no attribute ‘find_element_by_id‘

Selenium更新到 4.x版本后&#xff0c;以前的一些常用的代码的语法发生了改变 from selenium import webdriver browser webdriver.Chrome() browser.get(https://www.baidu.com) input browser.find_element_by_id(By.ID,kw) input.send_keys(Python)目标&#xff1a;希望通…

Python静态方法和类方法的区别和应用

实际上&#xff0c;Python 完全支持定义类方法&#xff0c;甚至支持定义静态方法。Python 的类方法和静态方法很相似&#xff0c;它们都推荐使用类来调用&#xff08;其实也可使用对象来调用&#xff09;。 类方法和静态方法的区别在于&#xff0c;Python会自动绑定类方法的第…

第三节:基于 InternLM 和 LangChain 搭建你的知识库(课程笔记)

视频链接&#xff1a;https://www.bilibili.com/video/BV1sT4y1p71V/?vd_source3bbd0d74033e31cbca9ee35e111ed3d1 文档地址&#xff1a; https://github.com/InternLM/tutorial/tree/main/langchain 课程笔记&#xff1a; 1.仅仅包含训练时间点之前的数据&#xff0c;无法…