ActiveMQ(三)

协议配置

ActiveMQ 支持的协议有 TCP 、 UDP、NIO、SSL、HTTP(S) 、VM 这是activemq 的activemq.xml 中配置文件设置协议的地方

<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumCon    nections=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnect    ions=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConn    ections=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnect    ions=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnection    s=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

默认是使用 openwire 也就是 tcp 连接
默认的Broker 配置,TCP 的Client 监听端口 61616 ,在网络上传输数据,必须序列化数据,消息是通过一个 write protocol 来序列化为字节流。默认情况 ActiveMQ 会把 wire protocol 叫做 Open Wire ,它的目的是促使网络上的效率和数据快速交互
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  1. NIO 协议为ActiveMQ 提供更好的性能
    适合NIO 使用的场景:
    1 当有大量的Client 连接到Broker 上 , 使用NIO 比使用 tcp 需要更少的线程数量,所以使用 NIO
    2 可能对于 Broker 有一个很迟钝的网络传输, NIO 的性能高于 TCP
    连接形式:
    nio://hostname:port?key=value
    在这里插入图片描述
https://activemq.apache.org/configuring-version-5-transports.html

修改 activemq.xml 使之支持 NIO 协议:


        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
           <transportConnector name="auto+nio+ssl" uri="auto+nio+ssl://0.0.0.0:5671?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

ActiveMQ 的可持久化

将MQ 收到的消息存储到文件、硬盘、数据库 等、 则叫MQ 的持久化,这样即使服务器宕机,消息在本地还是有,仍就可以访问到。
官网 : http://activemq.apache.org/persistence
ActiveMQ 支持的消息持久化机制: 带赋值功能的 LeavelDB 、 KahaDB 、 AMQ 、 JDBC
持久化就是高可用的机制,即使服务器宕机了,消息也不会丢失

AMQ 是文件存储形式,写入快、易恢复 默认 32M 在 ActiveMQ 5.3 之后不再适用
KahaDB : 5.4 之后基于日志文件的持久化插件,默认持久化插件,提高了性能和恢复能力
KahaDB 的属性配置 : http://activemq.apache.org/kahadb
它使用一个事务日志和 索引文件来存储所有的地址
在这里插入图片描述
db-<数字>.log 存储数据,一个存满会再次创建 db-2 db-3 …… ,当不会有引用到数据文件的内容时,文件会被删除或归档
db.data 是一个BTree 索引,索引了消息数据记录的消息,是消息索引文件,它作为索引指向了 db-.log 里的消息
一点题外话:就像mysql 数据库,新建一张表,就有这个表对应的 .MYD 文件,作为它的数据文件,就有一个 .MYI 作为索引文件。
db.free 存储空闲页 ID 有时会被清除
db.redo 当 KahaDB 消息存储在强制退出后启动,用于恢复 BTree 索引
lock 顾名思义就是锁
四类文件+一把锁 ==》 KahaDB

LeavelDB : 希望作为以后的存储引擎,5.8 以后引进,也是基于文件的本地数据存储形式,但是比 KahaDB 更快
它比KahaDB 更快的原因是她不使用BTree 索引,而是使用本身自带的 LeavelDB 索引
题外话:为什么LeavelDB 更快,并且5.8 以后就支持,为什么还是默认 KahaDB 引擎,因为 activemq 官网本身没有定论,LeavelDB 之后又出了可复制的LeavelDB 比LeavelDB 更性能更优越,但需要基于 Zookeeper 所以这些官方还没有定论,任就使用 KahaDB

JDBC : 有一部分数据会真实的存储到数据库中
使用JDBC 的持久化,
①修改配置文件,默认 kahaDB
修改之前:

<persistenceAdapter>
       <kahaDB directory="${activemq.data}/kahadb"/>  
 </persistenceAdapter>

修改之后:

        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-xql" createTablesOnStartup="false"/>
        </persistenceAdapter>


         <bean id="mysql-xql" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
                <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
                <property name="url" value="jdbc:mysql://xxxxxxx:3307/activemq?relaxAutoCommit=true&amp;useSSL=false&amp;serverTimezone=GMT"/>
                <property name="username" value="root"/>
                <property name="password" value="xxxx"/>
                <property name="poolPreparedStatements" value="true"/>
        </bean>

④  让linux 上activemq 可以访问到 mysql ,之后产生消息。
    ActiveMQ 启动后会自动在 mysql 的activemq 数据库下创建三张表:activemq_msgs 、activemq_acks、activemq_lock   
activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存
activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker
activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中
  
 点对点会在数据库的数据表 ACTIVEMQ_MSGS 中加入消息的数据,且在点对点时,消息被消费就会从数据库中删除  
   但是对于主题,订阅方式接受到的消息,会在 ACTIVEMQ_MSGS 存储消息,即使MQ 服务器下线,并在 ACTIVEMQ_ACKS 中存储消费者信息 。 并且存储以 activemq 为主,当activemq 中的消息被删除后,数据库中的也会自动被删除。  

在这里插入图片描述
持久化消息是指:
MQ 所在的服务器down 了消息也不会丢失
持久化机制演化过程:
从最初的AMQ Message Store 方案到 ActiveMQ V4版本推出的High performance journal (高性能事务)附件,并且同步推出了关系型数据库的存储方案, ActiveMQ 5.3 版本有推出了KahaDB 的支持,(也是5.4之后的默认持久化方案),后来ActiveMQ 从5.8开始支持LevelDB ,现在5.9 提供了 Zookeeper + LevelDB 的集群化方案。
ActiveMQ 消息持久化机制有:
AMQ 基于日志文件
KahaDB 基于日志文件,5.4 之后的默认持久化
JDBC 基于第三方数据库
LevelDB : 基于文件的本地数据库存储,从5.8 之后推出了LevelDB 性能高于 KahaDB
ReplicatedLevelDB Store 从5.8之后提供了基于LevelDB 和Zookeeper 的数据复制方式,用于Master-slave方式的首数据复制选方案

但是无论使用哪种持久化方式,消息的存储逻辑都一样

小结

  • 1> 引入消息队列后 如何保证高可用性
    持久化、事务、签收、 以及带复制的 Leavel DB + zookeeper 主从集群搭建
  • 2> 异步投递 Async send
    对于一个慢消费者,使用同步有可能造成堵塞,消息消费较慢时适合用异步发送消息
    activemq 支持同步异步 发送的消息,默认异步。当你设定同步发送的方式和 未使用事务的情况下发持久化消息,这时是同步的。
    如果没有使用事务,且发送的是持久化消息,每次发送都会阻塞一个生产者直到 broker 发回一个确认,这样做保证了消息的安全送达,但是会阻塞客户端,造成很大延时 。
    在高性能要求下,可以使用异步提高producer 的性能。但会消耗较多的client 端内存,也不能完全保证消息发送成功。在 useAsyncSend = true 情况下容忍消息丢失。

// 开启异步投递
activeMQConnectionFactory.setUseAsyncSend(true);

在这里插入图片描述

如何在投递快还可以保证消息不丢失 ?
异步发送消息丢失的情况场景是: UseAsyncSend 为 true 使用 producer(send)持续发送消息,消息不会阻塞,生产者会认为所有的 send 消息均会被发送到 MQ ,如果MQ 突然宕机,此时生产者端尚未同步到 MQ 的消息均会丢失 。
故 正确的异步发送方法需要接收回调
同步发送和异步发送的区别就在于 :
同步发送send 不阻塞就代表消息发送成功
异步发送需要接收回执并又客户端在判断一次是否发送

在代码中接收回调的函数 :

activeMQConnectionFactory.setUseAsyncSend(true);
    ……  
    
 for (int i = 1; i < 4 ; i++) {
         textMessage = session.createTextMessage("msg--" + i);
      textMessage.setJMSMessageID(UUID.randomUUID().toString()+"--  orderr");
     String msgid = textMessage.getJMSMessageID();
            messageProducer.send(textMessage, new AsyncCallback() {
                @Override
                public void onSuccess() {
                    // 发送成功怎么样
                    System.out.println(msgid+"has been successful send ");
                }

                @Override
                public void onException(JMSException e) {
                    // 发送失败怎么样
                    System.out.println(msgid+" has been failure send ");
                }
            });
}    

在这里插入图片描述

long delay = 3 * 1000 ;
long perid = 4 * 1000 ;
int repeat = 7 ;
for (int i = 1; i < 4 ; i++) {
    TextMessage textMessage = session.createTextMessage("delay msg--" + i);
    // 消息每过 3 秒投递,每 4 秒重复投递一次 ,一共重复投递 7 次
    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,perid);
    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);

    messageProducer.send(textMessage);
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/4163.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

利用摄影测量进行地形建模的介绍

一、前言 从一个地方到另一个地方的地球表面由连续和突然的海拔变化组成&#xff0c;个人和社会都必须应对这些变化。 水从高山和丘陵向下流&#xff0c;从溪流流入河流&#xff0c;形成三角洲&#xff0c;最终汇入大海。 三维 (3D) 地面信息的获取和表示一直是与行星表面相关的…

RK3568平台开发系列讲解(调试篇)Linux 内核的日志打印

🚀返回专栏总目录 文章目录 一、dmseg 命令二、查看 kmsg 文件三、调整内核打印等级沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇将 Linux 内核的日志打印进行梳理。 一、dmseg 命令 在终端使用 dmseg 命令可以获取内核打印信息,该命令的具体使用方法如下所…

Downie 4 4.6.12 MAC上最好的一款视频下载工具

Downie for Mac 简介 Downie是Mac下一个简单的下载管理器&#xff0c;可以让您快速将不同的视频网站上的视频下载并保存到电脑磁盘里然后使用您的默认媒体播放器观看它们。 Downie 4 Downie 4 for Mac Downie 4 for Mac软件特点 支持许多站点 -当前支持1000多个不同的站点&…

叮咚,您有一封告白信件待查收(原生HTML+CSS+JS绘制表白信件,代码+链接+步骤详解)

马上就要5月20号啦&#xff0c;准备好如何向心仪的她/他表白了嘛&#xff01;特此出一篇告白小信件&#xff0c;效果图如下。纯htmlcss绘制&#xff0c;包含详细教程注释&#xff0c;干货满满哦。 链接置于文章结尾总结处。 文章目录一、叮咚&#xff01;查收您的信件&#x…

Spring Cloud Alibaba全家桶(七)——Sentinel控制台规则配置

前言 本文小新为大家带来 Sentinel控制台规则配置 相关知识&#xff0c;具体内容包括流控规则&#xff08;包括&#xff1a;QPS流控规则&#xff0c;并发线程数流控规则&#xff09;&#xff0c;BlockException统一异常处理&#xff0c;流控模式&#xff08;包括&#xff1a;直…

thinkphp内核开源商城APP小程序H5开源源码讲解

系统功能介绍 支持点餐、桌码点餐 知识付费、家政功能 公众号管理 设置自定义菜单、被关注回复、关键字回复&#xff0c;查看公众号粉丝、素材管理、素材群发、模板消息群发、活跃粉丝群发等功能 用户领卡后在微信卡包中展示&#xff0c;实现会员卡买单消费等功能&#xff0c;…

Python实战,爬取金融期货数据

大家好&#xff0c;我是毕加锁。 今天给大家带来的是 Python实战&#xff0c;爬取金融期货数据 文末送书&#xff01; 文末送书&#xff01; 文末送书&#xff01; 任务简介 首先&#xff0c;客户原需求是获取https://hq.smm.cn/copper网站上的价格数据(注&#xff1a;获取的是…

【LeetCode】剑指 Offer 39. 数组中出现次数超过一半的数字 p205 -- Java Version

题目链接&#xff1a;https://leetcode.cn/problems/shu-zu-zhong-chu-xian-ci-shu-chao-guo-yi-ban-de-shu-zi-lcof/ 1. 题目介绍&#xff08;39. 数组中出现次数超过一半的数字&#xff09; 数组中有一个数字出现的次数超过数组长度的一半&#xff0c;请找出这个数字。 你可…

js 数据类型

1.概念 数据类型指的是可以在程序中存储和操作的值的类型&#xff0c;每种编程语言都有其支持的数据类型&#xff0c;不同的数据类型用来存储不同的数据&#xff0c;例如文本、数值、图像等。 JavaScript 是一种动态类型的语言&#xff0c;在定义变量时不需要提前指定变量的类…

如何用iOS自带摄像头进行拍摄获取视频流以及OpenCV图像处理实时显示

目录概述一、如何用Swift调用OpenCV库1.项目引入OpenCV库2.桥接OpenCV及Swift二、运用AVFoundation获取实时图像数据1.建立视频流数据捕获框架2.建立 Capture Session3.取得并配置 Capture Devices4.设定 Device Inputs5.配置Video Data Output输出6.工程隐私权限配置7.处理相机…

基于Java Web的图书管理系统

目录 1.系统简要概述 2.系统主要用到的数据库表 3.主要功能 管理员&#xff1a; 用户&#xff1a; 3.1管理员功能 3.11登录 3.12添加学生 3.13查看学生 3.14删除学生 3.15添加书籍 3.16查看书籍 3.2用户端功能 3.2.1登录 3.2.2注册 3.2.3查询图书 3.2.4借阅书籍…

【云原生】初识 Kubernetes — pod 的前世今生

目录标题前言&#x1f433; Kubernetes到底是什么&#xff1f;&#x1f42c; K8s 的由来&#x1f42c;K8s 的工作方式&#x1f42c; K8s 主要组件&#x1f40b;Master 组件&#x1f40b;Node 组件&#x1f433; pod 是什么&#xff1f;&#x1f42c;pod 的概念&#x1f42c;控制…

Kafka在Mac下的安装与使用

mac 安装kafka安装kafka的原因安装kafka启动Zookeeper启动Kafka创建topic查看topic生产数据消费数据关闭zookeeper关闭kafka测试安装kafka的原因 用户微服务登录后需要向广告微服务中发送用户登录的信息以获取用户画像&#xff08;这个过程是异步的&#xff09;&#xff0c;故…

雷电4模拟器安装xposed框架(2022年)

别问我都2202年了为什么还在用雷电4安卓7。我特么哪知道Xposed的相关资料这么难找啊&#xff0c;只能搜到一些老旧的资料&#xff0c;尝试在老旧的平台上实现了。 最初的Xposed框架现在已经停止更新了&#xff0c;只支持到安卓8。如果要在更高版本的安卓系统上使用Xposed得看看…

mac程序员必备的20款软件

今天给大家分享一下我作为一名后端程序员工作中常用的软件&#xff0c;相信下面我要介绍的很多软件对大家来说并不陌生&#xff0c;mac程序员必备的20款软件能够在不同岗位上提升大家的效率和体验。 1、Chrome 我们首先来介绍一些开发常用工具&#xff0c;先是浏览器&#xff…

手撕二叉树--堆的接口实现(附源码+图解)

堆的接口实现&#xff08;附源码图解&#xff09; 文章目录堆的接口实现&#xff08;附源码图解&#xff09;前言一、定义结构体二、接口实现&#xff08;附图解源码&#xff09;1.初始化堆2.销毁堆3.尾插数据&#xff08;1&#xff09;向上调整&#xff08;2&#xff09;交换函…

Elasticsearch 需要了解的都在这

ES选主过程&#xff1f;其实ES的选主过程其实没有很高深的算法加持&#xff0c;启动过程中对接点的ID进行排序&#xff0c;取ID最大节点作为Master节点&#xff0c;那么如果选出来的主节点中存储的元信息不是最新的怎么办&#xff1f;其实他是分了2个步骤做这件事&#xff0c;先…

es-head插件插入查询以及条件查询(五)

es-head插件插入查询以及条件查询 1.es-head插件页面介绍 页面详细介绍 2.es-head查询语句 2.1.查询索引中的全部数据 curl命令交互&#xff0c;采用GET请求 语法格式&#xff1a; curl -XGET es地址:9200/索引名/_search?pretty [rootelaticsearch ~]# curl -XGET 192…

Mac环境变量配置(Java)

1.打开终端&#xff1a; 2.输入命令&#xff1a;【/usr/libexec/java_home -V】,查看默认的jdk下载地址&#xff08;绿色下划线的就是jdk默认路径&#xff09;&#xff08;注意⚠️&#xff1a;命令行终端是区分大小写的【-v 是不对的&#xff0c;必须是大写 -V】&#xff09; …

Windows Server 2016远程桌面配置全过程

镜像下载 系统镜像网址 本次下载的是 Windows Server 2016 (Updated Feb 2018) (x64) - DVD (Chinese-Simplified) 远程桌面配置 Step 1 在开始菜单搜索服务&#xff0c;打开服务器管理器&#xff0c;点击右上角的管理按钮 Step 2 添加角色控制&#xff0c;点击下一步 S…