目录
ActiveMQ简介
ActiveMQ安装
原生JMS API操作ActiveMQ
SpringBoot与ActiveMQ整合
ActiveMQ消息组成与高级特性
ActiveMQ企业面试经典问题总结
ActiveMQ简介
消息中间件应用场景
异步处理应用解耦流量削锋
异步处理
场景说明:用户注册,需要执行三个业务逻辑,分别为写入用户表,发注册邮件以及注册短信。
串行方式
将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
并行方式
将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
异步处理
引入消息中间件,将部分的业务逻辑,进行异步处理。改造后的架构如下:
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是
50
毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50
毫秒。因此架构改变后,系统的吞吐量提高啦,比串行提高了
3
倍,比并行提高了两倍。
应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图
传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合。如何解决以上问题呢?引入应用消息队列后的方案,如下图:
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功库存系统:订阅下单的消息,采用拉/
推的方式,获取下单信息,库存系统根据下单信息,进行库存操作假 如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。
流量消峰
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。通过加入消息队列完成如下功能:
a
、可以控制活动的人数
b
、可以缓解短时间内高流量压垮应用
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理
常见的消息中间件产品对比
ActiveMQ
简介及
JMS
什么是
ActiveMQ
?
官网:
http://activemq.apache.org/
ActiveMQ
是
Apache
出品,最流行的,能力强劲的开源消息总线。
ActiveMQ
是一个完全支持
JMS1.1
和J2EE 1.4规范的
JMS Provider
实现。我们在本次课程中介绍
ActiveMQ的使用。
什么是
JMS
?
JMS
消息模型
消息中间件一般有两种传递模式:点对点模式
(P2P)
和发布
-
订阅模式
(Pub/Sub)
。
(1) P2P (Point to Point)
点对点模型(
Queue
队列模型)
(2) Publish/Subscribe(Pub/Sub)
发布
/
订阅模型
(Topic
主题模型
)
点对点模型
点对点模型(
Pointer-to-Pointer
):即生产者和消费者之间的消息往来。
每个消息都被发送到特定的消息队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
点对点模型的特点:
- 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中);
- 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列;
- 接收者在成功接收消息之后需向队列应答成功
发布
/
订阅模型
发布
/
订阅(
Publish-Subscribe
)
包含三个角色:主题(
Topic
),发布者(
Publisher
),订阅者(
Subscriber
),多个发布者将消息发送到topic
,系统将这些消息投递到订阅此
topic
的订阅者
发布者发送到
topic
的消息,只有订阅了
topic
的订阅者才会收到消息。
topic
实现了发布和订阅,当你发布一个消息,所有订阅这个topic
的服务都能得到这个消息,所以从
1
到
N
个订阅者都能得到这个消息的拷贝。
发布
/
订阅模型的特点:
每个消息可以有多个消费者;
发布者和订阅者之间有时间上的依赖性(先订阅主题,再来发送消息)。
订阅者必须保持运行的状态,才能接受发布者发布的消息;
JMS
编程
API
(
1
)
ConnectionFactory
创建
Connection
对象的工厂,针对两种不同的
jms
消息模型,分别有
QueueConnectionFactory
和TopicConnectionFactory两种。
(
2
)
Destination
Destination
的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination
是某个队列(
Queue
)或某个主题(
Topic
)
;
对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以,
Destination
实际上就是两种类型的对象:Queue、
Topic
(
3
)
Connection
Connection
表示在客户端和
JMS
系统之间建立的链接(对
TCP/IP socket
的包装)。
Connection
可以产生一个或多个Session
(
4
)
Session
Session
是我们对消息进行操作的接口,可以通过
session
创建生产者、消费者、消息等。
Session
提供了事务的功能,如果需要使用session
发送
/
接收多个消息时,可以将这些发送
/
接收动作放到一个事务 中。
(
5
)
Producter
Producter
(消息生产者):消息生产者由
Session
创建,并用于将消息发送到
Destination
。同样,消息生产者分两种类型:QueueSender
和
TopicPublisher
。可以调用消息生产者的方法(
send
或
publish方法)发送消息。
(
6
)
Consumer
Consumer
(消息消费者):消息消费者由
Session
创建,用于接收被发送到
Destination
的消息。两种 类型:QueueReceiver
和
TopicSubscriber
。可分别通过
session
的
createReceiver(Queue)
或 createSubscriber(Topic)来创建。当然,也可以
session
的
creatDurableSubscriber
方法来创建持久化的订阅者。
(
7
)
MessageListener
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的
onMessage
方法。
EJB
中的MDB(
Message-Driven Bean
)就是一种
MessageListener
。
ActiveMQ安装
第一步:安装 jdk (略)第二步:把 activemq 的压缩包( apache-activemq-5.14.5-bin.tar.gz )上传到 linux 系统第三步:解压缩压缩包 tar -zxvf apache-activemq-5.14.5-bin.tar.gz第四步:进入 apache-activemq-5.14.5 的 bin 目录 cd apache-activemq-5.14.5/bin第五步:启动 activemq ./activemq start (执行 2 次:第一次:生成配置信息;第二次:启动 )第六步:停止 activemq : ./activemq stop
访问
http://192.168.12.132:8161
页面控制台:
http
:
//ip:8161 (
监控
)
请求地址:
tcp
:
//ip:61616
(
java
代码访问消息中间件)
账号:
admin
密码:
admin
图
1
:登陆:
图
2
:点击
Queues
队列或者
Topics
主题消息
列表各列信息含义如下:
Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。Number Of Consumers :消费者 这个是消费者端的消费者数量Messages Enqueued :进入队列的消息 进入队列的总数量 , 包括出队列的。Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量。
原生JMS API操作ActiveMQ
PTP
模式
(
生产者
)
(
1
)引入坐标
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>
</dependencies>
(
2
)编写生产消息的测试类
QueueProducer
步骤:
1. 创建连接工厂2. 创建连接3. 打开连接4. 创建 session5. 创建目标地址( Queue: 点对点消息, Topic :发布订阅消息)6. 创建消息生产者7. 创建消息8. 发送消息9. 释放资源
package com.itheima.producer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 演示点对点模式 -- 消息生产者
*/
public class PTP_Producer {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactoryfactory=newActiveMQConnectionFactory("tcp://192.168.66.133:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.打开连接
connection.start();
//4.创建session
/**
* 参数一:是否开启事务操作
* 参数二:消息确认机制
*/
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
Queue queue = session.createQueue("queue01");
//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
//7.创建消息
//createTextMessage: 文本类型
TextMessage textMessage = session.createTextMessage("test message");
//8.发送消息
producer.send(textMessage);
System.out.println("消息发送完成");
//9.释放资源
session.close();
connection.close();
}
}
观察发送消息的结果:
PTP
模式
(
消费者
)
步骤:
1. 创建连接工厂2. 创建连接3. 打开连接4. 创建 session5. 指定目标地址6. 创建消息的消费者7. 配置消息监听器
package com.itheima.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 演示点对点模式- 消息消费者(第二种方案) -- 更加推荐
*/
public class PTP_Consumer2 {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory
= new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.打开连接
connection.start();
//4.创建session
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5.指定目标地址
Queue queue = session.createQueue("queue01");
//6.创建消息的消费者
MessageConsumer consumer = session.createConsumer(queue);
//7.设置消息监听器来接收消息
consumer.setMessageListener(new MessageListener() {
//处理消息
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收的消息(2):"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//注意:在监听器的模式下千万不要关闭连接,一旦关闭,消息无法接收
}
}
观察消费消息的结果:
Pub/Sub
模式
(
生成者
)
1. 创建连接工厂2. 创建连接3. 打开连接4. 创建 session5. 创建目标地址( Queue: 点对点消息, Topic :发布订阅消息)6. 创建消息生产者7. 创建消息8. 发送消息9. 释放资源
package cn.itcast.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 主题消息,消息的发送方
*/
public class TopicProducer {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.打开连接
connection.start();
//4.创建session
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
Topic topic = session.createTopic("sms");
//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
//7.创建消息
TextMessage message = session.createTextMessage("发短信...");
//8.发送消息
producer.send(message);
System.out.println("发送消息:发短信...");
session.close();;
connection.close();
}
}
查看主题消息:
Pub/Sub
模式
(
消费者
)
1. 创建连接工厂2. 创建连接3. 打开连接4. 创建 session5 指定目标地址6. 创建消息的消费者7. 配置消息监听器
package cn.itcast.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 主题消息,消息的消费方
*/
public class TopicConsumer {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.打开连接
connection.start();
//4.创建session
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
Topic topic = session.createTopic("sms");
//6.创建消息的消费者
MessageConsumer consumer = session.createConsumer(topic);
//7.配置消息监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
SpringBoot与ActiveMQ整合
生产者
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
配置:
server:
port: 9001 #端口
spring:
application:
name: activemq-producer # 服务名称
# springboot与activemq整合配置
activemq:
broker-url: tcp://192.168.66.133:61616 # 连接地址
user: admin # activemq用户名
password: admin # activemq密码
# 指定发送模式 (点对点 false , 发布订阅 true)
jms:
pub-sub-domain: false
public class SpringBootProducer {
//JmsMessagingTemplate: 用于工具类发送消息
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Test
public void ptpSender(){
/**
* 参数一:队列的名称或主题名称
* 参数二:消息内容
*/
jmsMessagingTemplate.convertAndSend("springboot_queue","spring boot
message");
}
}
消费者
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
配置:
server:
port: 9002 #端口
spring:
application:
name: activemq-consumer # 服务名称
# springboot与activemq整合配置
activemq:
broker-url: tcp://192.168.66.133:61616 # 连接地址
user: admin # activemq用户名
password: admin # activemq密码
# 指定发送模式 (点对点 false , 发布订阅 true)
jms:
pub-sub-domain: false
activemq:
name: springboot_queue
/**
* 用于监听消息类(既可以用于队列的监听,也可以用于主题监听)
*/
@Component // 放入IOC容器
public class MsgListener {
/**
* 用于接收消息的方法
* destination: 队列的名称或主题的名称
*/
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
ActiveMQ消息组成与高级特性
JMS消息组成详解
JMS
消息组成格式
整个
JMS
协议组成结构如下:
JMS Message
消息由三部分组成:
1 )消息头2 )消息体3 )消息属性
JMS
消息头
JMS
消息头预定义了若干字段用于客户端与
JMS
提供者之间识别和发送消息,预编译头如下:
红色
为重要的消息头
名称
|
描述
|
JMSDestination
|
消息发送的
Destination
,在发送过程中由提供者设置
|
JMSMessageID
|
唯一标识提供者发送的每一条消息。这个字段是在发送过程中由提供者设置的,客户机只能在消息发送后才能确定消息的 JMSMessageID
|
JMSDeliveryMode
|
消息持久化。包含值
DeliveryMode.PERSISTENT
或者
DeliveryMode.NON_PERSISTENT
。
|
JMSTimestamp
|
提供者发送消息的时间,由提供者在发送过程中设置
|
JMSExpiration
|
消息失效的时间,毫秒,值
0
表明消息不会过期,默认值为
0
|
JMSPriority
| 消息的优先级,由提供者在发送过程中设置。优先级 0 的优先级最低,优先级 9 的优先级最高。0-4为普通消息,5-9为加急消息。ActiveMQ不保证优先级高就一定先发送,只保证了加急消息必须先于普通消息发送。默认值为4 |
JMSCorrelationID
|
通常用来链接响应消息与请求消息,由发送消息的
JMS
程序设置。
|
JMSReplyTo
|
请求程序用它来指出回复消息应发送的地方,由发送消息的
JMS
程序设置
|
JMSType
|
JMS
程序用它来指出消息的类型。
|
JMSRedelivered
|
消息的重发标志,
false
,代表该消息是第一次发生,
true
,代表该消息为重发消息
|
不过需要注意的是,在传送消息时,消息头的值由
JMS
提供者来设置,
因此开发者使用以上
setJMSXXX()
方法分配的值就被忽略了
,只有以下几个值是可以由开发者设置的:
JMSCorrelationID
,
JMSReplyTo
,
JMSType
JMS
消息体
在消息体中,
JMS API
定义了五种类型的消息格式,让我们可以以不同的形式发送和接受消息,并提供了对已有消息格式的兼容。不同的消息类型如下:
JMS
定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· TextMessage-- 一个字符串对象 *· MapMessage-- 一套名称 - 值对· ObjectMessage-- 一个序列化的 Java 对象 *· BytesMessage-- 一个字节的数据流 *· StreamMessage -- Java 原始值的数据流
TextMessage:
写出:
/**
* 发送TextMessage消息
*/
@Test
public void testMessage(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage("文本消息");
return textMessage;
}
});
}
读取:
/**
* 接收TextMessage的方法
*/
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
MapMessage:
发送:
/**
* 发送MapMessage消息
*/
@Test
public void mapMessage(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","张三");
mapMessage.setInt("age",20);
return mapMessage;
}
});
}
接收:
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
System.out.println("名称:"+mapMessage.getString("name"));
System.out.println("年龄:"+mapMessage.getString("age"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
ObjectMessage:
发送:
//发送ObjectMessage消息
@Test
public void test2(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
User user = new User();
user.setName("小苍");
user.setAge(18);
ObjectMessage objectMessage = session.createObjectMessage(user);
return objectMessage;
}
});
}
接收:
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof ObjectMessage){
ObjectMessage objectMessage = (ObjectMessage)message;
try {
User user = (User)objectMessage.getObject();
System.out.println(user.getUsername());
System.out.println(user.getPassword());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
注意:
ActiveMQ5.12
后 ,为了安全考虑,
ActiveMQ
默认不接受自定义的序列化对象,需要将自定义的加入到受信任的列表。
spring:
activemq:
broker-url: tcp://192.168.66.133:61616
user: admin
password: admin
packages:
trust-all: true # 添加所有包到信任列表
BytesMessage:
写出:
//发送BytesMessage消息
@Test
public void test3(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
BytesMessage bytesMessage = session.createBytesMessage();
try {
File file = new File("d:/spring.jpg");
FileInputStream in = new FileInputStream(file);
byte[] bytes = new byte[(int)file.length()];
in.read(bytes);
bytesMessage.writeBytes(bytes);
} catch (Exception e) {
e.printStackTrace();
}
return bytesMessage;
}
});
}
读取:
@JmsListener(destination="${activemq.name}")
public void receiveMessage(Message message) throws Exception {
BytesMessage bytesMessage = (BytesMessage)message;
FileOutputStream out = new FileOutputStream("d:/abc.jpg");
byte[] buf = new byte[(int)bytesMessage.getBodyLength()];
bytesMessage.readBytes(buf);
out.write(buf);
out.close();
}
StreamMessage:
写出:
//发送StreamMessage消息
@Test
public void test4(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("你好,ActiveMQ");
streamMessage.writeInt(20);
return streamMessage;
}
});
}
读取:
@JmsListener(destination="${activemq.name}")
public void receiveMessage(Message message) throws Exception {
StreamMessage streamMessage = (StreamMessage)message;
String str = streamMessage.readString();
int i = streamMessage.readInt();
System.out.println(str);
System.out.println(i);
}
JMS
消息属性
我们可以给消息设置自定义属性,这些属性主要是提供给应用程序的。对于实现消息过滤功能,消息属性非常有用JMS API定义了一些标准属性,这些属性在所有JMS实现中都是可用的。这些标准属性包括:
- JMSMessageID:消息的唯一标识符
- JMSDestination:消息的目标目的地
- JMSDeliveryMode:消息的传递模式(持久化或非持久化)
- JMSPriority:消息的优先级
- JMSExpiration:消息的过期时间
- JMSTimestamp:消息的创建时间戳
- JMSCorrelationID:用于关联消息的唯一标识符
- JMSReplyTo:回复消息应该发送到的目的地
- JMSRedelivered:表示消息是否被重新传递的布尔值
- JMSType:消息的类型
message . setStringProperty ( "Property" , Property ); // 自定义属性
消息持久化
消息持久化是保证消息不丢失的重要方式!!! ActiveMQ提供了以下三种的消息存储方式:
- Memory 消息存储-基于内存的消息存储。
- 基于日志消息存储方式,KahaDB是ActiveMQ的默认日志存储方式,它提供了容量的提升和恢复能力。
- 基于JDBC的消息存储方式-数据存储于数据库(例如:MySQL)中。
ActiveMQ
持久化机制流程图: