activemq消息中间件

ActiveMQ消息中间件详解

下载地址:https://activemq.apache.org/activemq-5015009-release

1、MQ的产品种类

1.1、消息中间件的特性/共同特性/共同维度

  • Kafka(大数据专用、由java/scala编写
    • API发送和接收
    • MQ的高可用性
    • MQ的集群和容错配置
    • MQ持久化
      • radis 特性:持久化
    • 延时发送/定时投递
    • 签收机制
    • Spring整合
  • RabbitMQ(erlang编写
    • API发送和接收
    • MQ的高可用性
    • MQ的集群和容错配置
    • MQ持久化
      • radis 特性:持久化
    • 延时发送/定时投递
    • 签收机制
    • Spring整合
  • RocketMQ(java编写
    • API发送和接收
    • MQ的高可用性
    • MQ的集群和容错配置
    • MQ持久化
      • radis 特性:持久化
    • 延时发送/定时投递
    • 签收机制
    • Spring整合
  • ActiveMQ
    • API发送和接收
    • MQ的高可用性
    • MQ的集群和容错配置
    • MQ持久化
      • radis 特性:持久化
    • 延时发送/定时投递
    • 签收机制
    • Spring整合

1.2 入门场景使用概述

订单秒杀系统下单之后存在在业务的流程

(读取订单、库存检查、库存冻结、余额检查、余额冻结、订单生成、余额扣减、库存扣减、生成流水、余额冻结、库存解冻)

RPC接口基本是同步调用,整体的服务性能遵循“木桶理论”,即整体系统的耗时取决于最慢的那个接口。比如A调用B/C/D都是50 ms,但是B调用B1花费的时间为2000 ms,那么将会拖累整个系统的服务性能。

image-20230531220137860

注:在设计系统时,明确达到的目标

  1. 要做到系统解耦,当新的模块接进来时,要做到代码的改动最小;能够解耦
  2. 设置流量缓冲池,可以让后端按照系统自身的吞吐能力进行消费,不被冲垮;能够削峰
  3. 强弱依赖梳理能将非常关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步

消息中间件的作用:解耦、削峰、异步;

定义:发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题中,在合适的时候,消息服务器会将消息转发给消息接收者。在这个过程中,发送和接收都是异步的,也就是发送无需等待,而且发送者和接收者的生命周期也没有必然的关系;尤其在发布pub/订阅sub模式下,也可以完成一对多的通信,即让一个消息有多个接收者。

image-20230531220905761

队列(queue):相当于发短信,一对一。

主题(topic):相当于朋友圈,需要订阅,公众号;一对多。

特点

  • 异步处理
  • 应用系统之间解耦
  • 削峰

image-20230531222008307

ActiveMQ的解压安装

  • 在官网进行下载(下载地址:https://activemq.apache.org/activemq-5015009-release)

image-20230603215545727

注:建议下载linux版本,消息队列大多数情况都是在集群的环境下进行部署,而集群的使用大多数是使用linux

  • 上传linux压缩包并进行解压(注:外来文件放在/opt下

image-20230603220051707

  • 进行启动(在bin目录下)
./activemq start #启动
./activemq stop #关闭
./activemq restart #重启
  • 查看启动是否成功activemq的默认进程编号为61616,查看进程是否被占用
netstat -anp|grep 61616
lsof -i 61616
ps -ef|grep activemq
#可以访问http://localhost:8161 可现可视化界面
#默认的用户名/密码: admin/admin

注:页面访问,点击进行登录

  • 携带日志启动
./activemq start > 路径/run_activemq.log

image-20230603221649898

1.3、JMS编码总体架构

image-20230604165513141

2、代码编写

  • IDEA创建新的maven工程,并引进相应版本的依赖。可在你下载ActiveMQ的版本网页查找对应版本的maven依赖

image-20230604171146556

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.15.9</version>
</dependency>

2.1、队列(Queues)实现

注:ActiveMQ的访问地址的方式为tcp的方式

进行生产者进行消息发送

//生产者
public static  void queueDemo() throws JMSException {
        String path="tcp://192.168.160.128:61616/";
        String name ="queue01";

        //创建连接工厂,采用默认的用户名和密码
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(path);
        //创建连接并启动
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //创建会话session,第一个参数是事务,第二个参数是签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目的地
        Queue queue = session.createQueue(name);
        //创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        //进行消息发送
        for (int i =0;i<3;i++){
            //创建消息
            String message = "这是第"+i+"条消息";
            //创建消息
            Message textMessage = session.createTextMessage(message);
            //消息发送
            producer.send(textMessage);
        }
        producer.close();
        session.close();
        connection.close();
        System.err.println("消息发送完成");

    }

//消费者
public static void queueConsumersDemo() throws JMSException {
        String path = "tcp://192.168.160.128:61616";
        String name ="queue01";
        //创建连接工厂
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(path);
        //创建链接
        Connection connection = connectionFactory.createConnection();
        //启动
        connection.start();
        //创建session
        Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);
        //创建消费者
        Queue queue = session.createQueue(name);
        //创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        while (true){
            //可以通过recive设置等待时间,当超时时,消费者自动关闭
            //Message receive = consumer.receive(4000L);
            Message receive = consumer.receive();
            if (receive != null){
                System.err.println(receive);
            }
        }
    }

注:tcp对应的进程的端口为61616,注意连接的地址

image-20230604174451423

image-20230604180026455

队列监听setMessageListener

 public static void setListener() throws JMSException, IOException {
        ActiveMQConnectionFactory MQ = new ActiveMQConnectionFactory(path);
        Connection conn = MQ.createConnection();
        conn.start();

        Session session = conn.createSession(false, AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(name);
        MessageConsumer consumer = session.createConsumer(queue);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message!=null ){
                    try {
//                        TextMessage receive = (TextMessage) consumer.receive();
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("==>>>"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        //防止后台关闭,对MQ进行监听,当有消息的时候进行输出
        System.in.read();
        consumer.close();
        session.close();
        conn.close();
    }

在当多个消费者进行等待,之后生产者进行消息的产生,每个消费者对消息进行平均分配,类似于负载均衡。

例:当两个消费者在进行等待时,生产者产生6条消息,则两个消费者对者6条消息进行平均分配,没人三条。

2.2、主题(topic)代码实现

特点:

  1. 生产者将消息发送到topic中,每个消息可以有多个消费者,属于1:N的关系
  2. 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息
  3. 生产者生产时,topic不保存消息它是无状态不能落地,例如无人订阅就去生产,那是一条废消息,所以,一般先启动消费者在启动生产者

JMS规范允许客户创建持久订阅,还在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,类似于微信公众号的订阅

//主题生产者
    public static void topicProduce() throws JMSException {
        ActiveMQConnectionFactory mq = new ActiveMQConnectionFactory(path);
        Connection connection = mq.createConnection();
        connection.start();
        Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);

        MessageProducer producer = session.createProducer(topic);

        for (int i =0; i <3; i++){

            String tpc = "这是第一条消息";
            TextMessage textMessage = session.createTextMessage(tpc);
            producer.send(textMessage);
        }
        producer.close();
        session.close();
        connection.close();
        System.err.println("主题发送成功====");

    }
//创建消费者
public static void topicConsumer() throws JMSException, IOException {
        ActiveMQConnectionFactory mq = new ActiveMQConnectionFactory(path);
        Connection connection = mq.createConnection();
        connection.start();
        Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.err.println("============="+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    }

注:主题的启动顺序,先启动消费者,在启动生产者。消费者每个都会得到一份生产者发出的所有的信息。

image-20230615220347023

3、浅谈JMS与JavaEE

3.1、什么是JavaEE

JavaEE是一套使用Java进行企业级应用开发的大家一直遵循的13个核心规范工业标准。JavaEE平台提供了一个基于组件的方法来加快设计、开发、装配即部署企业应用程序。

image-20230615221826581

3.2、什么时JMS(Java消息服务)

Java消息服务是指的两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序的开发,在JavaEE中,当两个应用程序使用JMS进行通信时,他们之间并不是直接相连,而是通过一个共同的消息收发服务组件关联起来以达到解耦、异步、削峰的效果

image-20230615222420549

3.3、MQ中间件的其他落地产品

image-20230615222605081

特性ActiveMQRabbitMQRocketMQKafka
单机吞吐量万级,比 RocketMQ、Kafka 低一个数量级同 ActiveMQ10 万级,支撑高吞吐10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
opic 数量对吞吐量的影响topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topictopic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性ms 级微秒级,这是 RabbitMQ 的一大特点,延迟最低ms 级延迟在 ms 级以内
可用性高,基于主从架构实现高可用同 ActiveMQ非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置,可以做到 0 丢失同 RocketMQ
功能支持MQ 领域的功能极其完备基于 erlang 开发,并发能力很强,性能极好,延时很低MQ 功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
其他Apache软件基金会开发、起步较早,但没有经过大量吞吐场景验证,目前社区不是很活跃开源,稳定,社区活跃度高阿里出品,目前已交给Apache,但社区活跃度较低Apache软件基金会开发、开源、高通吐量,社区活跃度高

3.4、JMS组成结构和特点

  1. JMS provider 实现JMS接口和规范的消息中间件,也就是我们的MQ服务器
  2. JMS producer 消息的生产者 创建和发送消息的客户端应用
  3. JMS consumer 消息消费者 接受和处理JMS消息的终端
  4. JMS message 产生的消息信息体
    • 消息头
    • 消息属性
    • 消息体 封装具体的消息数据,5中消息体格式,发送和接收的消息体类型必须一一对应

注:发送什么类型的消息,就得接收什么类型的消息,要一一对应

消息持久模式与非持久模式

  • 持久性,应该被传送“一次仅仅一次”,这意味着如果JMS提供者出现故障,该消息不会丢失。他会在服务器恢复之后再次传递(注:主要没有被发送,便会一直在消息服务器中存储)
  • 非持久,最多会传送一次,只要服务器出现故障,消息就会被丢失

消息头属性

  1. JMSDestination 消息发动的目的地
  2. JMSDeliveryMode 消息是否持久
  3. JMSExpiration 消息的过期时间(默认时永不过期)可设置消息在一定时间之后会过期。消息的过期时间Destination的send方法中的timeToLive值加上发送时刻的GMT时间值如果timeToLive的值为0,表示消息永不过期,如果发送后,在消息过期时间之后消息还没有被发送到消息的目的地,则该消息被清除。
  4. JMSPriority 消息优先级,从0-9十个级别,0-4是普通消息,5-9是加急消息。JMS不要求MQ严格按照者是个优先级发送消息,但必须保重加急消息要先于普通消息到达默认级别是4
  5. JMSMessageId 消息ID,消息的唯一识别方式。

消息体的五种属性

  1. TextMessage 字符串类型
  2. MapMessage Map类型
  3. BytesMessage 字节类型(二进制数组消息)
  4. StreamMessage 流类型
  5. ObjectMessage 对象类型

消息属性

如果需要除消息头字段以外的值,那么可以使用消息属性,识别/去重/重点标注等操作非常有用的方法。他们是以属性名和**属性值对(K:V)**的形式指定的,可以将属性是为消息头的扩展,属性指定一些消息头没有包括的附加消息,比如可以在属性里指定消息选择器。

消息的属性就像可以分配给一条消息的附加消息头一样,他们可以允许开发者添加有关消息的不透明附加消息,他们还用于暴漏消息选择器在消息过滤是使用的数据。

例:

TextMessage message = session.createTextMessage();
message.setText(text);
message.setStringProperty("username","ABC")// 进行消息自定义

3.5、消息持久性

3.5.1、持久的队列(Queue)

消息队列中,默认的消息为消息持久化

采用持久性消息,当ActiveMQ宕机时,未消费的消息的数量保持不变,有利于保持消息而不被丢失。

队列的默认传送模式,此模式保证这些消息被成功的发送一次和成功的使用一次。对于这些消息可靠性是优先考虑的因素。

可靠性另一个重要的方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

3.5.2、持久主题(topic)

  • 先启动订阅,在启动生产
	//持久化主题,消息消费者
    public static void lastingTopicConsumer() throws JMSException, IOException {
        ActiveMQConnectionFactory mq = new ActiveMQConnectionFactory(path);
        Connection connection = mq.createConnection();
       connection.setClientID(name);


        Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");

        connection.start();

        //主题的订阅者
        Message receive = topicSubscriber.receive();
        while (receive!=null){

            TextMessage textMessage =(TextMessage) receive;
            System.out.println("====="+textMessage.getText());
            receive = topicSubscriber.receive(5000);
        }


        System.in.read();
        session.close();
        connection.close();
    }
  1. 一定要先运行一次消费者,类似于订阅这个主题
  2. 然后在运行生产者发送信息
  3. 无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收到的消息接收下来

4、ActiveMQ的broker

定义:相当于一个ActiveMQ服务器实例

Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,

在用的时候再去启动这样节省了资源,也保证了可靠性。

方式:

用ActiveMQ Broker作为独立的消息服务器来构建java应用

ActiveMQ也支持在VM中通信基于嵌入式的broker,能够无缝的集成其他java应用

所需mvn依赖

		//ActiveMQ核心依赖
		<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>
            
         // 整合spring所需要的依赖  
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>
            
          //进行json数据格式转换,在进行Java内嵌activqMQ时,需要引进,否则会报错
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.5</version>
        </dependency>

代码实现

//相当于在本机上边开启了一个ActiveMQ的服务
public static void main(String[] args) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setUseJmx(true);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }

image-20230705214051813

5、spring整合ActiveMQ

application.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--开启包的自动扫描-->
    <context:component-scan base-package="com.atguigu.activemq"/>
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp:192.168.160.128"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"/>
    </bean>
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-active-queue"/>
    </bean>
    <bean id="jsmTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination" ref="destinationQueue"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>

生产者

package com.atguigu.activemq;

import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

@Service
public class Producer {
    @Autowired
    private JmsTemplate  jmsTemplate;

    public static void main(String[] args) {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
        Producer producer =(Producer) applicationContext.getBean("Producer");
        producer.jmsTemplate.send(new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage("********当前消息");
                return textMessage;
            }
        });
    }
}

消费者

package com.atguigu.activemq;

import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;


@Service
public class Customer {
    @Autowired
    private JmsTemplate jmsTemplate;

    public static void main(String[] args) {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
        Customer customer =(Customer) applicationContext.getBean("Customer");
        String receiveAndConvert =(String) customer.jmsTemplate.receiveAndConvert();
        System.out.println(receiveAndConvert);
    }
}

mvn依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>ActiveMQDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!--activemq需要的jav包-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.9</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-orm</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.6.8</version>
        </dependency>
        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib</artifactId>
            <version>2.1_2</version>
        </dependency>
        <!--下面是junit/log4等通用配置-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

</project>

6、springBoot整合ActiveMQ

7、ActiveMQ的传输协议

  • ActiveMQ支持的client-broker通讯协议有:TCP、NIO、 UDP、SSL、Https(Http)、VM 。其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的标签之内。

image-20230715134903343

注:在ActiveMQ中默认的使用TCP协议,也默认支持多种的传输协议

设置支持NIO网络协议

<transportConnector name="nio" uri="nio://0.0.0.0:61618"/>

设置进行多协议支持,在5.13之后在支持auto多协议同时支持

<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61617?auto.protocols=default,stomp"/>

image-20230715142346935

8、ActiveMQ的消息存储和持久化

为了避免以为宕机之后数据的丢失,需要做到重启之后可以恢复消息队列,消息系统一般都会采用持久化机制。ActiveMQ的消息持久化机制又JDBC、AMQ、KahaDB(默认使用)、LevelDB,无论使用哪一种持久化的机制,消息存储的逻辑都是一致的。

消息存储机制:

就是在发送者在发送出去之后,消息中心首先将消息存储到本地的数据文件、内存数据库或者远程数据库等在试图将消息在发送给接收者,成功则将消息从存储中进行删除,失败则继续尝试发送。消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要将消息发送出去。

KahaDB:在5.3版本之后建议推荐使用KahaDB存储方式,在5.4版本之后,默认使用kahaDB存储方式

image-20230715145324601

默认的文件的存储位置在activeMQ下的data文件中

image-20230715145453094

8.1、KahaDB存储机制的原理

  • 在KahaDB在消息目录进行存储时。只有4类文件和一个lock。以下四个文件还有一个db.free 四类文件一把锁

  • KahaDB会将消息存储在db-.log文件之中,当一个文件已满时(默认的一个文件的大小为32MB),会自动创建一个新的文件进行相关的存储。例:db-1.log、db-2.log …。当不会再有引用到数据文件中的任何数据消息时,文件会被删除或者时归档。
  • db.data该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,他是消息的索引文件,本质是B-Tree(B树),使用B-Tree作为索引执行db-.log文件中存储的消息
  • db.free当前db.data文件那些页面是空闲的,文件具体内通过是所有空闲页面的ID
  • db.radio是用来进行消息恢复的,当KahaDB消息存储被强制退出后启动,用于恢复BTree索引
  • lock为文件的读取进行添加锁机制,防止数据出现混乱。

8.2、JDBC消息存储(一部分消息会被存储到数据库中)

注:对于长时间的存储,建议使用JDBC的存储方式

  • 将数据库驱动jar包放到MQ的lib文件夹下
  • 做JDBC吃持久化的配置,对文件进行修改适配

image-20230715153921537

<persistenceAdapter> 
  <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>   
</persistenceAdapter>

dataSource指定将要引用的持久化数据库的bean名称;
createTablesOnStartup 是否在启动的时候创建数据表,默认值是true;
这样每次启动都会去创建数据表,一般是第一次启动的时候设置为true 之后改成false;

  • 数据库连接池的配置
relaxAutoCommit 表示进行自动提交

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> 
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> 
    <property name="username" value="activemq"/> 
    <property name="password" value="activemq"/> 
    <property name="poolPreparedStatements" value="true"/> 
</bean> 

注:注意配置信息将要添加的位置,否则可能会报错:

**image-20230715155441934

  • 建仓SQL和建表说明
  1. 创建对应名称的数据库
  2. 创建表,默认表名:
    1. ACTIVEMQ_MSGS
    2. ACTIVEMQ_ACKS
    3. ACTIVEMQ_LOCK
  3. 如果新建数据库OK+上述的配置OK+代码运行OK,3表会自动生成。

ACTIVEMQ_MSGS表字段:

ID:自增数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者的主键
MSG_SEQ:是发下哦那个消息的顺序,MSGID_PROD+MSG_SEQ可以足证JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01
MSG:消息本体的Java序列对象的二进制数据
PRIORITY:优先级,0-9,数值越大优先级越高

ACTIVEMQ_ACKS表字段:用于存储订阅关系,如果是持久化topic,订阅者和服务器的订阅关系在这个表里面进行保存;

CONTAINER:消息的介绍
SUB_DEST:如果使用的是Static集群,这个字段会有集群其他系统的信息
CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器,可以选择之消费满足条件的信息,条件可以用自定义的属性进行实现,支持多属性ANDOR
LAST_ACKED_ID:记录消费过的消息的ID

可能出现的错误

在这里插入图片描述

解决方法:

在保证activeMQ在使用JDBC消息持久化时,所需要的jar包,以及数据库密码,用户、地址都没有错误的情况下,造成报错的原因可能是数据库远程连接的权限没有被放开,导致数据库在进行远程连接时,连接失败。这种情况下,需要放开数据库远程连接的权限。

  • 在本地通过连接数据库的工具或者cmd命令框进入到数据库都可以,在这我采用cmd
mysql -u root -p

image-20230716162207402

  • 查看mysql中存在的库,并使用mysql库
show databases;
use mysql;

image-20230716162533190

  • 查看对应表
select User,Host from user;

image-20230716162455449

注:如果权限没有被放开的情况下,root所对应的Host为localhost,这是我们需要将其改为%

update set Host='%' where User='root';
  • 进行配置刷新
flush privileges;

注:在修改完成之后一定要进行配置刷新,否则相关修改不起作用

  • 在服务器重启activeMQ,进行日志查看

image-20230716162852413

  • 启动成功,activeMQ会在你对应的数据库中创建相关的表

image-20230716162949701

注:在使用消息持久化存储机制时,一定要将activeMQ设置为持久化。否则不会将信息存储到数据库中

点对点类型
在DeliveryMode设置为NODE_PERSISTENCE(非持久化)时,消息保存在内存中

在DeliveryMode设置为PERSISTENCE(持久化),消息保存在broker的相应的文件或者数据库中

而且点对点类型中消息一旦被消费,消息就会在存储的位置进行删除操作。

在使用Topic时,在消息持久化,消息在被消费的时候,消息不会被删除。

8.3、开发中遇到的问题

如果是queue

在没有消费者消费的情况下会将信息保存在activemq_msgs表中,只要有任意消费者已经消费过了,消费之后这些消息将被删除。

如果时topic

一般是先启动消费订阅然后在生产的情况下,会将消息保存到activemq_acks表中

数据库jar包

记得需要使用到的相关的jar文件,放置到lib目录下,mysql-jdbc驱动的jar包和对应的数据库连接池的jar包。默认是dbcp

createTableOnStartup属性
在jdbcPersistenceAdapter标签中设置了createTableOnStartup属性为true时,在第一次启动ActiveMQ时,ActiveMQ服务节点会自动在数据库中创建相关的数据库表,启动完成后可以去掉这个属性,或者是将属性值修改为false。其属性值默认为true,建议在不用的时候将属性值,修改为false

下划线的问题

“java.lang.lllegalStateExcepton:BeanFactory not initialized or already closed”

产生报错的原因是因为您的操作系统的机器名中存在“_”符号,请修改机器名并重启可以解决相关问题。

8.4、高性能缓存(ActiveMQ Journal)

在activemq.xml中进行配置

<persistenceFactory>
    <journalPersistenceAdapterFactory journalLogFiles="4"
                                      journalLogFileSize=""
                                      useJournal="true"
                                      useQuickJournal="true"
                                      dataSource="#mysql-ds"
                                      dataDirectory="activemq-data"/>
</persistenceFactory>    

配置完成之后,对activeMQ进行重启操作

9、ActiveMQ的高级特性

9.1、ActiveMQ异步传输以及确认发送成功

注:ActiveMQ默认的消息发送方式为异步传输,但是建议在进行代码编写时,再次将消息传输的方式设置为异步
注:当消息设置为不采用事务,但是却将消息设置为持久化时,MQ会默认将消息的传输 方式设置为同步消息传输。这样的传输方式,只有当消息发送出去之后,只有被接受返回成功之后,才会进行下一个消息的处理,容易造成堵塞。

注:在进行异步消息传输时,MQ允许存在极少量的数据丢失,也会存在极少量数据丢失的情况。

例:

cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

((ActiveMQConnection)connection).setUseAsyncSend(true);

注:在使用异步消息队列是,注意要采用消息的回调,来确认消息是否发送成功

 public static  void queueDemo() throws JMSException {
//        String path="tcp://192.168.160.128:61616/";
        String name ="queue01";

        //创建连接工厂,采用默认的用户名和密码
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
        //创建连接并启动
        Connection connection = connectionFactory.createConnection();

        connection.start();
        //创建会话session,第一个参数是事务,第二个参数是签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目的地
        Queue queue = session.createQueue(name);
        //创建消息生产者
        ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);

        //进行消息发送
        for (int i =0;i<3;i++){
            //创建消息
            String message = "这是第"+i+"条消息";
            //创建消息
            Message textMessage = session.createTextMessage(message);
            textMessage.setJMSMessageID(UUID.randomUUID().toString().replaceAll("-","")); //设置消息的id
            //消息发送
            producer.send(textMessage, new AsyncCallback() {
                @Override
                public void onSuccess() {
                    //进行消息的回调
                    System.out.println("发送成功的消息"+textMessage.toString());
                }

                @Override
                public void onException(JMSException e) {
                     //进行消息的回调
                    System.out.println("发送失败的消息"+textMessage.toString());
                }
            });
        }
        producer.close();
        session.close();
        connection.close();
        System.err.println("消息发送完成");

    }

9.2、消息的延时发送和定时发送

image-20230726211612426

9.3、分发策略

9.4、ActiveMQ重试机制(重新交付政策)

官网地址:https://activemq.apache.org/redelivery-policy

引起消息重发的情况

Client用了transaction且在session中调用了rollback();(没被签收被回调)

Client用了transactions且在调用commit()之前关闭或者时没有commit(事务没有被提交)

Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()

默认:每一秒钟发送6次,当消息发送超过最大次数时,该消息会被标志位异常消息,最终会被存储到死信队列

常用属性:

image-20230726213519746

9.5、死信队列

处理发送失败的消息

  • 一般的生产环境之中,MQ一般会设置两个队列:核心业务队列和死信队列
  • 核心业务队列:就是处理正常的业务信息,死信队列主主要是处理异常的业务队列信息

将所有的DeadLetter保存到一个共享的队列之中,是ActiveMQ的默认的策略

共享队列默认为ActiveMQ.DLQ可以通过deadLetterQueue属性进行设定

<deadLetterStratege>
    <shareDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStratege>   

可以将定时自动删除死信队列中的消息,或者可以存储非持久的异常消息

9.6、如何保证消息不被重复消费

可以通过radis进行相关的解决

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/49285.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker 之 Consul容器服务更新与发现

一、Consul介绍 1、什么是服务注册与发现 服务注册与发现是微服务架构中不可或缺的重要组件。起初服务都是单节点的&#xff0c;不保障高可用性&#xff0c;也不考虑服务的压力承载&#xff0c;服务之间调用单纯的通过接口访问。直到后来出现了多个节点的分布式架构&#xff…

Android平台GB28181设备接入侧如何同时对外输出RTSP流?

技术背景 GB28181的应用场景非常广泛&#xff0c;如公共安全、交通管理、企业安全、教育、医疗等众多领域&#xff0c;细分场景可用于如执法记录仪、智能安全帽、智能监控、智慧零售、智慧教育、远程办公、明厨亮灶、智慧交通、智慧工地、雪亮工程、平安乡村、生产运输、车载终…

Flutter的开发环境搭建-图解

前言&#xff1a;Flutter作为一个移动应用开发框架&#xff0c;具有许多优点和一些局限性。最大的优点就是-跨平台开发&#xff1a;Flutter可以在iOS和Android等多个平台上进行跨平台开发&#xff0c;使用一套代码编写应用程序&#xff0c;节省开发时间和成本。 Flutter可以编…

了解Unity编辑器之组件篇Mesh(三)

Mesh&#xff1a;是一种三维模型的表示形式&#xff0c;它由一系列顶点、三角形&#xff08;或其他多边形&#xff09;和相关属性组成。Mesh用于表示物体的外观和形状&#xff0c;它是可见物体的基本组成部分。通过操作Mesh&#xff0c;开发者可以实现各种视觉效果、物理模拟和…

基于PCA和小波算法联合实现红外与可见光图像融合的Matlab仿真(完整源码+35组数据集)

以下是一个使用PCA和小波实现红外与可见光图像融合的Matlab仿真完整源码。源码中只需修改红外图像&#xff08;IR.bmp&#xff09;和可见光图像&#xff08;VI.bmp&#xff09;名字即可 文章目录 效果展示数据集展示步骤说明完整源码下载地址 效果展示 最终融合效果展示&#x…

java执行ffmpeg命名的Docker镜像制作

今天来记录一下通过Dockerfile制作docker镜像的过程 背景 我需要通过java服务调用ffmpeg去执行视频合并的功能&#xff0c;想把这个环境封装到docker镜像当中&#xff0c;方便以后迁移部署。 实现方法 随便找一个路径创建一个Dockerfile文件 touch Dockerfilevim Dockerfi…

如何使用vscode连接远程服务器

1、安装remote-ssh 在应用商店搜索remote-ssh&#xff0c;安装remote-ssh 2、安装完成后会出现远程资源管理器 3、点击远程资源管理器 --ssh的➕号&#xff0c;在输出框内输入要连接的服务器ip及账户名 如&#xff1a;ssh 账户名ip地址 4、输入后回车保存 5、保存后刷新一下 6…

Redis学习路线(1)—— Redis的安装

一、NoSQL SQL VS NoSQL 1、名称 SQL 主要是指关系数据库。NoSQL 主要是指非关系数据库。 2、存储结构 SQL 是结构化的数据库&#xff0c;以表格的形式存储数据。NoSQL 是非结构化的数据库&#xff0c;以Key-Value&#xff08;Redis&#xff09;&#xff0c;JSON格式文档&…

【React】版本正确安装echarts-liquidfill(水球图表)包引入不成功问题

目标效果图&#xff1a; 安装&#xff1a; npm install echarts npm install echarts-liquidfill 引入&#xff1a; Import:import * as echarts from echarts; import echarts-liquidfill 或 import echarts-liquidfill/src/liquidFill.jsOr:import * as echarts from…

TreeMap的底层实现

0. 你需要知道的TreeMap的内置属性 0.1 节点属性 K key; // 键 V value; // 值 Entry<K,V> left; // 左子节点 Entry<K,V> right; // 右子节点 Entry<K,V> parent; // 父节点 boolean color; // 节点的颜色0.2 成员变量 //比较器对象private f…

Android性能优化之游戏引擎初始化ANR

近期&#xff0c;着手对bugly上的anr 处理&#xff0c;记录下优化的方向。 借用网上的一张图&#xff1a; 这里的anr 问题是属于主线程的call 耗时操作。需要使用trace 来获取发生anr前一些列的耗时方法调用时间&#xff0c;再次梳理业务&#xff0c;才可能解决。 问题1 ja…

14 Linux实操篇-进程管理(重点)

14 Linux实操篇-进程管理&#xff08;重点&#xff09; 文章目录 14 Linux实操篇-进程管理&#xff08;重点&#xff09;14.1 进程的基本操作14.1.1 进程和程序14.1.2 父进程和子进程14.1.3 常见的Linux进程14.1.4 显示系统执行的进程-ps14.1.5 终止进程-kill/killall14.1.6 查…

LeetCode 75 第十二题(11)盛最多水的容器

目录 题目: 示例: 分析: 代码: 题目: 示例: 分析: 配合着示例给出的图片我们可以得知找出盛水最多的容器是什么意思,给一个数组,找出数组中两个元素能围成的最大的矩阵面积是多少. 比较直观的想法是套两层for循环暴力解出来,但是这题是中等难度题,一般中等题是没法用暴力得…

【动态规划刷题 2】使⽤最⼩花费爬楼梯 解码⽅法

使⽤最⼩花费爬楼梯 746 . 使用最小花费爬楼梯 链接: 746 . 使用最小花费爬楼梯 给你一个整数数组 cost &#xff0c;其中 cost[i] 是从楼梯第 i 个台阶向上爬需要支付的费用。一旦你支付此费用&#xff0c;即可选择向上爬一个或者两个台阶。 你可以选择从下标为 0 或下标为 …

【JAVA】 String 类简述笔记

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️初识JAVA】 文章目录 前言String类创建一个String类 常用方法字符串长度 length() 方法连接字符串 concat() 方法创建格式化字符串 format()功能 前言 string是C、java、VB等编程语言中的字符串&…

安卓:百度地图开发(超详细)

一、百度地图介绍 百度地图SDK是一套供开发者使用的软件开发工具包&#xff08;SDK&#xff09;&#xff0c;用于在Android应用程序中集成和使用百度地图功能。通过使用百度地图SDK&#xff0c;开发者可以实现在自己的应用中显示地图、获取定位信息、进行搜索、导航等功能。 百…

办公软件巨头CCED、WPS迎来新挑战,新款办公软件已形成普及之势

办公软件巨头CCED、WPS的成长经历 众所周知&#xff0c;CCED和WPS是中国办公软件行业的两大知名品牌。 但它们的成长经历不是一蹴而就的&#xff0c;都是经历了漫长的发展过程的。 CCED是中国大陆早期的一款文本编辑器&#xff0c;它在上个世纪80年代末和90年代初非常流行。 …

数学建模-MATLAB三维作图

导出图片用无压缩tif会更清晰 帮助文档&#xff1a;doc 函数名 matlab代码导出为PDF 新建实时脚本或右键文件转换为实时脚本实时编辑器-全部运行-内嵌显示保存为PDF

githack的安装步骤+一次错误体验

一.githack的安装步骤 1.要在Kali Linux上安装GitHack工具&#xff0c;您可以按照以下步骤操作&#xff1a; 打开终端并使用以下命令克隆GitHack存储库&#xff1a; git clone https://github.com/lijiejie/GitHack.git2.进入GitHack目录&#xff1a; cd GitHack3.安装依赖项…

Linux下安装RabbitMQ教程

官方安装指南&#xff1a;https://www.rabbitmq.com/install-rpm.html 我们将要安装的RabbitMQ的版本是3.8.2 el/7/rabbitmq-server-3.8.2-1.el7.noarch.rpm - rabbitmq/rabbitmq-server packagecloud 不需要单独安装Erlang环境。 2. 环境配置&#xff1a; 前提&#xff…