出现的问题,原本4个操作,要么全部执行,要么全部不执行------->强一致性
但是现在分开了-----------最终一致性
强一致性:指在消息传递的过程中,系统会确保每个消息被精确地按照发送的顺序被传递,并且每个消息都会被正确地处理。强一致性保证了系统中的所有节点都具有相同的视图,即任何节点查询的数据都与其他节点查询的数据一致。在强一致性模型下,无论系统中发生了任何错误,所有节点在任何时候都能够看到完全相同的数据。
最终一致性:是指系统可以容忍在消息传递过程中的一定程度的不一致,但是最终会保证所有的节点都达到一致的状态。最终一致性不会保证在每个节点上看到的数据是相同的,但是当系统达到稳定状态时,每个节点的数据都会达到相同的状态。在最终一致性模型下,如果发生了某些错误,节点在一定时间内仍可能会看到不一致的数据,但是当系统达到一定时间后,所有节点上的数据将最终达到一致状态。
应用场景:
1.异步解耦,就是上面这张图
2.削峰填谷
3.消息分发
缺点:系统可用性降低,一旦MQ宕机,对业务造成影响,当然可以集群
系统复杂度提高
一致性问题
常用消息中间件
ActivitiMQ RabbitMQ RocketMQ Kafka
RabbitMQ是使用Erlang语言开发的AMQP的开源实现.好比java中的JDBC,不同厂商都要来实现JDBC,AMQP就是不同厂商去实现AMQP,RabbitMQ就是具体实现
安装RabbitMQ前
要先下载安装Erlang
Erlang及RabbitMQ安装版本的选择
下载时一定要注意版本兼容性
版本兼容说明地址:RabbitMQ Erlang Version Requirements — RabbitMQ
我们选择的版本
Erlang官网下载:https://www.erlang.org/downloads
RabbitMQ官方下载地址:https://www.rabbitmq.com/download.html
工作模式总结
1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。
2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。
3、发布订阅模式 Publish/subscribe
需要设置类型为 fanout 的交换机 ,并且交换机和队列进行绑定 ,当发送消息到交换机后,交换机会将消
息发送到绑定的队列。
4、路由模式 Routing
需要设置类型为 direct 的交换机 ,交换机和队列进行绑定 , 并且指定 routing key,当发送消息到交换机 后 ,交换机会根据 routing key 将消息发送到对应的队列。
5、通配符模式 Topic
需要设置类型为 topic 的交换机 ,交换机和队列进行绑定 ,并且指定通配符方式的 routing key ,当发送消息到交换机后 ,交换机会根据 routing key 将消息发送到对应的队列。
关于Windows的Rabbit安装https://tiantian.blog.csdn.net/article/details/130452949?spm=1001.2014.3001.5502
注意 RabbitMQ中 队列持久化和消息持久化是两个概念
入门案例 简单模式 生产者代码
public static void main(String[] args) throws IOException, TimeoutException { //1 创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2 设置rabbitmq ip地址 connectionFactory.setHost("localhost"); //3 创建连接对象 Conection对象 Connection connection=connectionFactory.newConnection(); //4 创建管道 Chanel Channel channel=connection.createChannel(); //5 设置队列属性 /** * 第一个参数:队列的名称 * 第二个参数:队列是否要持久化 * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的) * 第四个参数:是否自动删除消息 * 第五个参数:是否要设置一些额外的参数 */ channel.queueDeclare("01-hello",false,false,false,null); //6 发送消息 /** * 第一个参数:交换机名称 没有交换机就设置"" * 第二个参数:路由key * 第三个参数:消息属性 * 第四个参数:消息内容 */ channel.basicPublish("","01-hello",null, "hello-rabbitMQ".getBytes(StandardCharsets.UTF_8)); //7 关闭消息 //channel.close(); connection.close(); }
消费者 前面5步都是一样的
注意消费者 需要持续监听,不要关闭
public static void main(String[] args) throws IOException, TimeoutException { //1 创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2 设置rabbitmq ip地址 connectionFactory.setHost("localhost"); //3 创建连接对象 Conection对象 Connection connection=connectionFactory.newConnection(); //4 创建管道 Chanel Channel channel=connection.createChannel(); //5 设置队列属性 /** * 第一个参数:队列的名称 * 第二个参数:队列是否要持久化 * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的) * 第四个参数:是否自动删除消息 * 第五个参数:是否要设置一些额外的参数 */ channel.queueDeclare("01-hello",false,false,false,null); //6 使用chanel 去 rabbitmq 获取消息进行消费 /** * 第一个参数:队列的名称 * 第二个参数:是否自动签收 * 第三个参数:消息属性 * 第四个参数:消息内容 */ channel.basicConsume("01-hello", true,new DeliverCallback(){ /** * 当消息从mq 中取出来了会回调这个方法 * 消费者消费消息就在这个 handle中进行处理 */ @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println("消息中的内容为:"+new String(delivery.getBody())); } },new CancelCallback(){ /** * 当消息取消了会回调这个方法 */ @Override public void handle(String s) throws IOException { System.out.println(111); } }); //7 关闭消息
注意消费者 需要持续监听,不要关闭
//channel.close(); //connection.close(); }