消息队列应用场景
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合
,异步消息
,流量 削锋
等问题实现高性能,高可用,可伸缩和最终一致性:
解耦:
异步:
削峰:
常见类型:
ps:
CAP理论
在分布式系统中,也有类似数据库ACID的特性,那就是CAP,他们分别是:
1.Consistency 一致性
强调进群节点中数据一致。在分布式中一致性又包括强一致性和弱一致性,强一致性就是指在任何时刻任何节点看到的数据都是一样的;
弱一致性一般实现是最终一致性,即刚开始可能存在差异,但随着时间的推移,最终数据保持一致。
2.Availability 可用性
强调集群在任何时间内都正常使用
3.Partition Tolerance 分区容错性
即使某一部分集群坏掉,另一部分仍能正常工作。
这三个特性只能满足其中两个,牺牲另一个。大部分系统也都是如此:
一般来说分布式集群都会保证P优先,即集群部分节点坏死不影响整个集群的使用,然后再去追求C和A。因为如果放弃P——分区可用性,那不如就直接使用多个传统数据库了。事实上,很多微服务分库分表就是这个道理。
如果追求强一致性,那么势必会导致可用性下降。比如在Master-Slave的场景中,Master负责数据写入,然后分发给各个节点,所有节点都写入成功,才算写入,这样保证了强一致性,但是延迟也会随之增加,导致可用性降低。
因此在可用性和一致性之间,就出现了各种解决方案,如时序一致性、最终一致性等等。
BASE理论
BASE理论是对CAP理论的延伸,核心思想是即使无法做到强一致性(Strong Consistency,CAP的一致性就是强一致性),但应用可以采用适合的方式达到最终一致性(Eventual Consitency)。
BASE是指基本可用(Basically Available)、软状态( Soft State)、最终一致性( Eventual Consistency)。
1.基本可用(Basically Available)
基本可用是指分布式系统在出现故障的时候,允许损失部分可用性,即保证核心可用。
电商大促时,为了应对访问量激增,部分用户可能会被引导到降级页面,服务层也可能只提供降级服务,这就是损失部分可用性的体现。
2.软状态( Soft State)
软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。
分布式存储中一般一份数据至少会有三个副本,允许不同节点间副本同步的延时就是软状态的体现。mysql replication的异步复制也是一种体现。
3.最终一致性( Eventual Consistency)
最终一致性是指系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。
弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况。
BASE和ACID代表两种截然相反的设计理念,ACID注重一致性,是传统关系型数据库(MySQL)的设计思路,BASE关注高可用性。
当今大规模、跨数据中心的分布式系统(如云计算)大多同时采用这两种设计理念,并在两者之间寻求平衡。
消息队列结构
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。给予此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比http
2007年,Rabbit计数公司给予AMQP标准开发的RabbitMQ 1.0 发布。RabbitMQ采用Erlang语言开发。Erlang语言由Ericson设计,专门为开发高并发和分布式系统的一种语言,在典型领域使用广泛。
RabbitMQ基础架构如下图:
RabbitMQ中的相关概念:
Broker:接收和分发消息的应用,RabbitMQ server 就是Message BrokerVirtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等Connection:publisher/consumer 和 broker之间的TCP连接Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候简历TCP Connection的开销将是巨大的,效率也低。Channel是在Connection内部简历的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel进行通讯,AMQP method包含了channel id 帮组客户端和message broker 识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统简历TCP connectino的开销Exchange:message到达broker的第一站,根据分发规则,配匹查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point),topic(publish-subscribe)and fanout(multicast)Queue:消息最终被送到这里等待consumer取走Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。
安装
windows环境下安装RabbitMQ(超详细) windows环境下安装RabbitMQ(超详细)_windows安装rabbitmq_luckySnow-julyo的博客-CSDN博客
RabbitMQ超详细安装教程(Linux)RabbitMQ超详细安装教程(Linux)_rabbitmq安装_Baret-H的博客-CSDN博客
RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ
验证:rabbitmqctl status
http://localhost:15672
springboot 集成
引入依赖:
配置文件:
server: port: 8081spring: application:
name: test-rabbitmq-producer
rabbitmq:
server: port: 8081 spring: application: name: test-rabbitmq-producer rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtualHost: /
配置文件:
package com.example.testmq2.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Scope; @SpringBootConfiguration public class RabbitmqConfig { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualhost; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualhost); //connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } }
模式:
-
direct
-
fanout
-
topic
-
Headers exchange
生产者
@Autowired private RabbitTemplate rabbitTemplate;
消费者
@SpringBootTest @RabbitListener(queues = {"q2"})//从哪个队列取消息 class TestMq2ApplicationTests2 { @Autowired RabbitTemplate rabbitTemplate; @Test void contextLoads() { //第一个参数:发送的队列 第二个参数: 发送的信息 } @RabbitHandler//取到消息后的处理方法 public void receiverMsg(String msg){ System.out.println(msg); } }
集群与负载均衡
略
rabbitmq的五种工作模式及应用场景
rabbitmq的五种工作模式及应用场景**简单模式
1个生产者将消息交给默认的交换机(AMQP default)2 交换机获取消息后交给绑定这个生产者的队列(关系是通过队列名称完成)3 监听当前队列的消费者获取消息,执行消费逻辑应用场景:短信,聊天场景:有一个oa系统,用户通过接收手机验证码进行注册,页面上点击获取验证码后,将验证码放到消息队列,然后短信服务从队列中获取到验证码,并发送给用户工作模式(轮询)
1 生产者将消息交个交换机2 交换机交给绑定的队列3 队列由多个消费者同时监听,只有其中一个能够获取这一条消息,形成了资源的争抢,谁的资源空闲大,争抢到的可能越大;
应用场景:抢红包,大型系统的资源调度
场景:有一个电商平台,有两个订单服务,用户下单的时候,任意一个订单服务消费用户的下单请求生成订单即可。不用两个订单服务同时消费用户的下单请求
3 发布订阅
1 生产者扔给交换机消息2 交换机根据自身的类型将会把所有消息复制同步到所有与其绑定的队列3 每个队列可以有一个消费者,接收消息进行消费逻辑应用场景:邮件群发,广告场景:有一个商城,我们新添加一个商品后,可能同时需要去更新缓存和数据库
4 路由模式
1 生产者还是将消息发送给交换机,消息携带具体的路由key(routingKey)2 交换机类型direct,将接收到的消息中的routingKey,比对与之绑定的队列的routingKey3 消费者监听一个队列,获取消息,执行消费逻辑应用场景:根据生产者的要求发送给特定的一个或者一批队列;错误的通报;场景:还是一样,有一个商城,新添加了一个商品,实时性不是很高,只需要添加到数据库即可,不用刷新缓存
5 topic主题模式
1 生产端发送消息,消息携带具体的路由key2 交换机的类型topic3 队列绑定交换机不在使用具体的路由key而是一个范围值.orange. : haha.orange.haha,haha.haha.orange.hahalazy.# : haha.lazy.haha.haha,layz.alsdhfsh(sh9ou)N0*表示一个字符串(不能携带特殊符号) 例如 *表示 haha,item,update
表示任意字符串
场景:还是一样,有一个商城,新添加了一个商品,实时性不是很高,只需要添加到数据库即可,数据库包含了主数据库mysql1和从数据库mysql2的内容,不用刷新缓存
topic主题模式和路由模式区别:路由模式中的queue绑定携带的是具体的key值,路由细化划分topic主题模式queue携带的是范围的匹配,某一类的消息获取
ps:
@RabbitListener(bindings = { @QueueBinding( value = @Queue,//声明临时队列 exchange = @Exchange(value = "fanoutsr",type = "fanout") ) })
ps: 通过代码动态:
、package com.example.config;
import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;
-
@Configurationpublic class RabbitmqConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";public static final String QUEUE_INFORM_SMS = "queue_inform_sms";public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";public static final String ROUTINGKEY_EMAIL="inform.#.email.#";public static final String ROUTINGKEY_SMS="inform.#.sms.#";
//声明交换机@Bean(EXCHANGE_TOPICS_INFORM)public Exchange EXCHANGE_TOPICS_INFORM(){
//durable(true) 持久化,mq重启之后交换机还在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
//声明QUEUE_INFORM_EMAIL队列@Bean(QUEUE_INFORM_EMAIL)public Queue QUEUE_INFORM_EMAIL(){
return new Queue(QUEUE_INFORM_EMAIL);
}//声明QUEUE_INFORM_SMS队列@Bean(QUEUE_INFORM_SMS)public Queue QUEUE_INFORM_SMS(){
return new Queue(QUEUE_INFORM_SMS);
}
//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey@Beanpublic Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}//ROUTINGKEY_SMS队列绑定交换机,指定routingKey@Beanpublic Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}