Maven项目搭建及Destination(队列、主题)
- 一、Idea中Maven项目准备
- 1.创建Module
- 2.创建java包
- 3.配置pom.xml
- 二、队列(Queue)
- 1.JMS编程架构
- 2.代码实现生产者
- 3.代码实现消费者
- 4.队列消费者三大情况
- 三、消费者类型
- 1.同步式消费者
- 1.1 一直阻塞
- 1.2 超时阻塞
- 2.异步监听消费者
- 三、主题(Topic)
- 1.发布主题生产者
- 2.订阅主题消费者
- 四、Queue和Topic对比
一、Idea中Maven项目准备
1.创建Module
2.创建java包
3.配置pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qingsi.activemq</groupId>
<artifactId>activemq_test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<!-- activemq所需要的jar包配置 -->
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.15</version>
</dependency>
<!-- 下面是junit/logback等基础配置 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
</project>
二、队列(Queue)
1.JMS编程架构
2.代码实现生产者
package com.qingsi.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂, 采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话session
// 两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(是队列还是主题)
Queue queue = session.createQueue(QUEUE_NAME);
// 5.创建消息的生产者
MessageProducer producer = session.createProducer(queue);
// 6.使用生产者生成3条消息发送到MQ队列
for (int i = 1; i <= 3; i++) {
// 7.创建消息
TextMessage textMessage = session.createTextMessage("msg--" + i);// 最简单的字符串
// 8.通过producer发送给mq
producer.send(textMessage);
}
// 9.关闭资源
producer.close();
session.close();
connection.close();
System.out.println("MQ消息发布完成");
}
}
- 运行后,登录ActiveMQ后台查看
- 参数详解:
- Number Of Pending Messages=等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
- Number Of Consumers=消费者数量,消费者端的消费者数量。
- Messages Enqueued=进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
- Messages Dequeued=出队消息数,可以理解为是消费者消费掉的数量。
- 总结:
- 当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
- 当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。
- 当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。
3.代码实现消费者
package com.qingsi.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂, 采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话session
// 两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(是队列还是主题)
Queue queue = session.createQueue(QUEUE_NAME);
// 5.创建消息的消费者
MessageConsumer consumer = session.createConsumer(queue);
while (true){
TextMessage textMessage = (TextMessage)consumer.receive();
if(null != textMessage){
System.out.println("消费者收到消息:" + textMessage.getText());
}else {
break;
}
}
consumer.close();
session.close();
connection.close();
}
}
- 现象:
- 还有1个消费者在连接
- 消息入队3个
- 消息出队3个
4.队列消费者三大情况
- 先生产消息,只启动1号消费者,1号消费者可以消费消息。
- 先生产消息,先启动1号消费者再启动2号消费者,2号消费者不能消费到消息,都被1号消费者消费了。
- 先启动2个消费者,再生产6条消息。
分析:这6条消息会被发送到同一个队列中。由于队列是点对点的消息传递模型,每条消息只能被一个消费者接收和处理。因此,在这种情况下,两个消费者会以轮询的方式交替接收这6条消息。也就是说,消费者1会接收3条消息,而消费者2会接收另外3条消息。
三、消费者类型
- 队列和主题都有这种类型
1.同步式消费者
- 同步消费者是一种阻塞式的消费方式,在接收到消息之后,消费者会暂停执行并等待消息的处理完成,然后再继续执行后续代码。同步消费者使用 receive() 方法来接收消息,它会一直阻塞直到接收到消息或超时。
- 一直组合和固定时间阻塞:区别在于receive方法,是否传入时间
1.1 一直阻塞
- 如果一直没有消息,那么会一直阻塞在receive方法
package com.qingsi.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂, 采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话session
// 两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(是队列还是主题)
Queue queue = session.createQueue(QUEUE_NAME);
// 5.创建消息的消费者
MessageConsumer consumer = session.createConsumer(queue);
while (true){
TextMessage textMessage = (TextMessage)consumer.receive();
if(null != textMessage){
System.out.println("消费者收到消息:" + textMessage.getText());
}else {
break;
}
}
consumer.close();
session.close();
connection.close();
}
}
1.2 超时阻塞
- 如果一直没有消息,等待超过设定的时候,那么就结束receive方法的阻塞
package com.qingsi.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂, 采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话session
// 两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(是队列还是主题)
Queue queue = session.createQueue(QUEUE_NAME);
// 5.创建消息的消费者
MessageConsumer consumer = session.createConsumer(queue);
while (true){
TextMessage textMessage = (TextMessage)consumer.receive(4000L);
if(null != textMessage){
System.out.println("消费者收到消息:" + textMessage.getText());
}else {
break;
}
}
consumer.close();
session.close();
connection.close();
}
}
2.异步监听消费者
- 异步消费者是一种非阻塞式的消费方式,它通过注册一个消息监听器(Message Listener)来接收消息,并在消息到达时异步地触发监听器进行消息处理。异步消费者不会暂停执行,而是继续执行后续代码。
package com.qingsi.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
// 1.创建连接工厂, 采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话session
// 两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(是队列还是主题)
Queue queue = session.createQueue(QUEUE_NAME);
// 5.创建消息的消费者
MessageConsumer consumer = session.createConsumer(queue);
// 6.通过监听的方式来消费消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者消费消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 保证控制台不关掉
System.in.read();
consumer.close();
session.close();
connection.close();
}
}
三、主题(Topic)
- 注意:
先启动订阅者再启动生产者,不然发送的消息是废消息
1.发布主题生产者
package com.qingsi.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsTopicProduce {
public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂, 采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话session
// 两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(是队列还是主题)
Topic topic = session.createTopic(TOPIC_NAME);
// 5.创建消息的生产者
MessageProducer producer = session.createProducer(topic);
// 6.使用生产者生成3条消息发送到MQ主题
for (int i = 1; i <= 3; i++) {
// 7.创建消息
TextMessage textMessage = session.createTextMessage("msg-topic--" + i);// 最简单的字符串
// 8.通过producer发送给mq
producer.send(textMessage);
}
// 9.关闭资源
producer.close();
session.close();
connection.close();
System.out.println("MQ消息发布到topic完成");
}
}
2.订阅主题消费者
package com.qingsi.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsTopicConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException, IOException {
System.out.println("我是1号消费者");
// 1.创建连接工厂, 采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话session
// 两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(是队列还是主题)
Topic topic = session.createTopic(TOPIC_NAME);
// 5.创建消息的消费者
MessageConsumer consumer = session.createConsumer(topic);
// 6.通过监听的方式来消费消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("我是1号消费者消费消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 保证控制台不关掉
System.in.read();
consumer.close();
session.close();
connection.close();
}
}
- 结果:运行了2个消费者,入队3条消息,每个消费者都要消费一次,那么总共消费了6条
四、Queue和Topic对比
比较的项目 | Topic队列模式 | Queue队列模式 |
---|---|---|
工作模式 | "订阅-发布"模式,如果当前没有订阅者,消息将会被丢弃,如果有多个订阅者,那么这些订阅者都会收到消息 | "负载均衡"模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送给其中一个消费者,并且要求消费者ack信息 |
有无状态 | 无状态 | Queue数据默认会在mq服务器上已文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面,也可以配置成DB存储 |
传递完整性 | 如果没有订阅者,消息会被丢弃 | 消息不会被丢弃 |
处理效率 | 由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 | 由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的 |