mq需要知道的点

一、为什么要使用mq

解耦、异步、削峰

二、mq 有什么优缺点

优点就是在特殊场景下有其对应的好处,解耦、异步、削峰

缺点有以下几个:

系统可用性降低 系统引入的外部依赖越多,越容易挂掉。万一 MQ 挂了,MQ 一挂,整套系 统崩 溃,你不就完了?

系统复杂度提高 硬生生加个MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情 况? 怎么保证消息传递的顺序性?问题一大堆。

一致性问题 A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。

三、kafka、ZeroMQ、ActiveMq、RabbitMq、RocketMq、 有什么区别

对于吞吐量来说 kafka 和 RocketMQ 支撑高吞吐,ActiveMQ 和 RabbitMQ 比他们低一个数量级。 对于 延迟量来说 RabbitMQ 是最低的。

优缺点

四、MQ如何保证高可用的?

RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。 RabbitMQ 有三种模式:单机模式、普通集群模式、 镜像集群模式。

单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的?没人生产用单机模式

普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 ,只会放在一个 实例上,但是每个实例都同步 queue 的元数据(元数据可 以认 为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候, 实际上 如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。 这 方案主要是提 高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

镜像集群模式:这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是, 在 镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例 上,就 是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数 据的意思。然 后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。 RabbitMQ 有很 好的管理控制台,就是在后台新增一个策略,这个策略是镜像集权模式的策略, 指定的时候是可以 要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这 个策略,就会自动将数据同步到其他的节点上去了。这样的话,好处在于, 你任何一个机器宕机 了,没事儿,其他机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其 他 节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器 上,导 致网络带宽压力和消耗很重!RabbitMQ 一个 queue 的数据都是放在一个节点里的, 镜像集群下, 也是每个节点都放这个 queue 的完整数据。

Kafka 一个最基本的架构认识:由多个 broker 组成,每个 broker 是一个节点;你创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每 个 partition 就放一部分数据。这就是天然的分布式消息队列,就是说一个 topic 的数据,是 分散放在 多个机器上的,每个机器就放一部分数据。 Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制 品) 副本机制。每个 partition 的数据都会同步到其他机器上,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道, 然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去, 读的时候就直接读 leader 上的数据即可。只能读写 leader?很简单,要是你可以随意读写每 个 follower ,那么就要 care 数 据一致性的问题,系统复杂度太高,很容易出问题。Kafka 会 均匀地将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。因为如果某个 broker 宕机了,没事儿, 那 个 broker 上面的 partition 在其他机器上都有副本的,如果这上面有某个 partition 的 leader,那 么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。这就 有所谓的高可用性了。写数据的时候,生产者就写 leader,然后 leader 将 数据落地到本地磁盘, 接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会 发送 ack 给 leader ,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产 者。(当 然,这只是其中一种模式,还可以适当调整这个行为)消费的时候,只会从 leader 去读,但是只 有 当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。

五、如何保证消息的可靠传输?如果消息丢了怎么办

数据的丢失问题,可能出现在生产者、 MQ、消费者

生产者丢失:生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问 题 啥的,都有可能。此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开 启 RabbitMQ 事务 channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那 么 生产者会收到异常报错,此时就可以回滚事务 channel.txRollback,然后重试发送消息;如果收到 了消息,那么可以提交事务 channel.txCommit。吞吐量会下来,因为太耗性能。所以 一般来说,如 果你要确保说写 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm 模 式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你 一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。事务机制和 cnofirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那 儿,但是confirm 机制是异步的,你发送一个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接 收 了之后会异步回调你一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。

MQ 中丢失:就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息 写 入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一 般 数据不会丢。设置持久化有两个步骤:创建 queue 的时候将其设置为持久化,这样就可以保 证RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。第二个是发送消息的时 候 将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消 息持久 化到磁盘上去。必须同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启, 也会从磁 盘上重新恢复 queue,恢复这个 queue 里的数据。持久化可以跟生产者那边的 confirm 机制配合起 来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在 持久化到磁盘之前, RabbitMQ 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来 得 及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。

消费端丢失:你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了, RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的 ack 机制,简单来 说,就是你关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码 里确 保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack? 那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处 理,消息是不会丢的。

六、如何保证消息的顺序性

  • 使用单个队列和消费者:这是最直接的方法,通过确保只有一个消费者消费一个队列中的消息,可以保证消息的严格顺序。这种方法适用于对性能要求不高,且所有消息都需要按照先进先出(FIFO)原则处理的场景。然而,这种方法会限制系统的扩展性和吞吐量。
  • 使用多个队列和消费者:在这种方法中,可以将消息拆分到多个队列中,每个队列由一个消费者消费。这可以通过使用Sharding Key(分区键)来实现,确保具有相同Sharding Key的消息被发送到同一个队列中,从而保持局部的消息顺序。这种方法适用于高性能要求的场景,其中消息可以根据业务逻辑进行分组,并保证同一组内的消息顺序。
  • 使用内存队列:在消费者内部,可以使用内存队列来进一步处理从MQ接收到的消息。这样,即使是从多个队列接收到的消息,也可以在消费者内部通过内存队列的排序和分发,保证它们按照特定的顺序进行处理。
  • 控制生产者和消费者的行为:确保生产者以单一线程或串行方式发送消息,以及消费者以正确的顺序处理消息,也是保证消息顺序性的关键。这可能涉及到生产者和消费者的设计调整,以确保它们的行为符合保证消息顺序性的要求。
  • 使用特定的MQ特性:某些MQ系统提供了特定的功能或配置选项来支持顺序消息的处理,如RocketMQ中的“消息组”概念。利用这些特性可以更方便地实现局部或全局的消息顺序性需求。

综上所述,保证消息的顺序性需要综合考虑业务需求、MQ系统的特性和性能要求,通过合理的队列设计、消费者和生产者的行为控制以及利用MQ系统的特定功能来实现。

七、如何解决消息队列的延时以及过期失效问题?消息队列满了以 后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

消息积压处理办法:临时紧急扩容:

先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。 新建一个 topic ,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理, 直接均匀轮询写入临时建立好的 10 倍数量的 queue。 接着临时征用10倍的机器来部署 consumer ,每一 批 consumer 消费一个临时 queue 的数据。这种做法相当于临时将 queue 资源和 consumer资源扩大10 倍,以正常的10 倍速度来消费数据。 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。

MQ 中消息失效: 假设你用的是 RabbitMQ , RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就 会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压 在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重蹈,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过 高峰期以后,将丢失的那批数据,写个临时程序,一点一点地查出来,然后重新灌入 mq 里面去, 把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了, 你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。 mq 消息队列快满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行得太慢了,你临时写程序, 接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案, 到了晚上再补数据吧。

八、让你来设计一个消息队列,你会怎么设计

首先这个 mq 得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎 么 搞?设计个分布式的系统呗,参照一下 kafka 的设计理念,broker -> topic -> partition, 每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition , 然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?

其次你得考虑一下这个 mq 的数据要不要落地磁盘吧?那肯定要了,落磁盘才能保证别进程挂了 数 据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺 序读 写的性能是很高的,这就是 kafka 的思路。

最后你考虑一下你的 mq 的可用性啊?这个事儿,具体参考之前可用性那个环节讲解的 kafka 的高 可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。

九、kafka 如何不消费重复数据?比如扣款,我们不能重复地扣

你需要让生产者发送每条数据的时候 ,里面加一个全局唯一的 id , 类似订单 id 之类的东西 ,然后你这里消费到了之后, 先根据这个id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过 ,你就处理,然后这个 id 写 Redis。 如果消费过了 , 那你就别处理了 , 保证别重复处理相同的消息即可。

十、kafka 如何减少数据丢失?

  1. 增加副本数:Kafka中的每个分区都可以有多个副本,通过增加副本数可以提高容错性。

  2. 设置复制策略:确保min.insync.replicas的设置足够高,这样才能确保数据被正确复制。

  3. acks 参数:在生产者端设置合适的acks参数。如果设置为all,则只有当所有同步副本都收到消息时,才认为消息写入成功。

  4. unclean.leader.election.enable:如果开启了unclean.leader.election.enable,当ISR(In-Sync Replica集合)中的副本全部失效时,未同步的副本可以被选举为新的leader,减少数据丢失。

  5. 配置持久化存储:确保Kafka的数据存储用的硬盘是可靠的,使用SSD或RAID。

  6. 使用事务:如果确保数据要么全部成功,要么全部不成功,可以使用Kafka事务。

  7. 批量发送:通过批量发送减少网络通信次数,提高吞吐量。

  8. 设置合理的超时时间:合理设置生产者和消费者的超时时间,以便在网络延迟下正确等待。

以下是一个简单的Kafka生产者配置示例,增加数据可靠性:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

在这个配置中,acks设置为all,表示只有当所有参与复制的节点都确认收到消息时,才认为消息发送成功。通过这种方式,可以减少数据丢失的风险。其他参数也可以根据具体需求进行调整。

十一、kafka 的高可用机制是什么?

Kafka的高可用机制主要体现在以下几个方面:

  1. Broker与Partition:Kafka集群由多个Broker节点组成,每个Broker负责存储和转发消息。Kafka将每个主题(Topic)划分为多个Partition,每个Partition是一个有序、不可变的消息序列,分布在不同的Broker上,提供水平扩展能力。
  2. 副本与ISR机制:每个Partition都有多个副本,分别存储在不同的Broker上。其中一个副本被选举为Leader Replica,负责处理所有对该Partition的读写请求;其他副本作为Follower Replica,从Leader Replica异步拉取数据进行同步。Kafka维护一个ISR(In-Sync Replica Set)集合,包含与Leader Replica保持同步的Follower Replica。只有处于ISR中的Follower Replica才有资格在Leader Replica故障时被选举为新的Leader。
  3. Leader选举:当Leader Replica出现故障时,Kafka依赖ZooKeeper进行Leader选举。ZooKeeper监控各Broker的状态,一旦检测到Leader Replica失效,会触发新的选举,从ISR中选出新的Leader Replica。
  4. 数据持久化与故障恢复:Kafka将消息数据持久化存储在磁盘上,确保即使Broker重启或故障,数据也能得以恢复。在故障恢复期间,新的Leader Replica被选举出来后,未完成同步的Follower Replica会从新的Leader Replica处拉取缺失的消息,重新加入ISR。
  5. 跨数据中心复制:Kafka支持跨数据中心的数据复制和同步,可以实现数据的异地备份和容灾,提高系统的可用性和灾备能力。
  6. 回溯消费:Kafka支持消费者进行回溯消费,即消费者可以重新消费历史消息,这在一定程度上也能保证数据的完整性和一致性。

总的来说,Kafka的高可用机制通过多副本、ISR、Leader选举、数据持久化、跨数据中心复制以及回溯消费等技术手段,确保了Kafka系统的高可用性和数据的可靠性。

十二、kafka 分布式(不是单机)的情况下,如何保证消息的顺序消费?

Kafka 分布式的单位是 partition ,同一个 partition 用一个 write ahead log 组织,所以可 以保证 FIFO 的顺序。不同 partition 之间不能保证顺序。但是绝大多数用户都可以通过 message key 来定 义 , 因为同一个 key 的 message 可以保证只发送到同一个 partition。

Kafka 中发送 1 条消息的时候 , 可以指定(topic, partition, key) 3 个参数。 partiton 和 key 是 可 选 的 。 如 果 你 指 定 了 partition ,那就是所有消息发往同 1 个partition ,就是有序的 。 并且在消费端,Kafka 保证,1 个 partition 只 能 被1个 consumer 消 费 。或者你指定 key( 比如 order id),具有同1个 key的所有消息 , 会发往同1个partition。

十三、kafka 如何控制消费的位置

kafka 使用 seek(TopicPartition, long)指定新的消费位置。用于查找服务器保留的最早和最新的 offset 的特殊的方法也可用( seekToBeginning(Collection) 和 seekToEnd(Collection)

十四、消费者故障 ,出现活锁问题如何解

出现“ 活锁” 的情况 , 是它持续地发送心跳 , 但是没有处理。为了预防消费者在这种情 况下一 直持有分区,我们使用 max.poll.interval.ms 活跃检测机制。 在此基础上 , 如果 你调用的 poll 的频率大于最大间隔 , 则客户端将主动地离开组 , 以便其他消费者接管该分区。发生这种情况时 , 你会看到 offset 提交失败( 调用 commitSync() 引发的 CommitFailed Exception )。这是一 种安全机制 ,保障只有活动成员能够提交 offset。

消费者提供两个配置设置来控制 poll 循环

max.poll.interval.ms :增大 poll 的间隔,可以为消费者提供更多的时间去处理返回的消 息( 调 用 poll(long)返回的消息 ,通常返回的消息都是一批) 。缺点是此值越大将会延迟组重新平衡。

max.poll.records : 此设置限制每次调用 poll 返回的消息数 , 这样可以更容易地预测每 次 poll 间隔要处理的最大值。通过调整此值,可以减少 poll 间隔,减少重新平衡分组的。

对于消息处理时间不可预测的情况,这些选项是不够的。处理这种情况的推荐方法是将 消息处理移到 另一个线程中,让消费者继续调用 poll 。但是必须注意确保已提交的 offset 不超过实际位置。另外 , 你必须禁用自动提交 , 并只有在线程完成处理后才为记 录手动提交偏移量( 取决于你) 。 还要注意,你需要 pause 暂停分区 ,不会从poll 接 收到新消息 ,让线程处理完之前返回的消息( 如果你的处理能力比拉取消息的慢 ,那创 建新线程将导致你机器内存溢出)

十五、消费者如何不自动提交偏移量 , 由应用提交?

将 auto.commit.offset 设为 false ,然后在处理一批消息后 commitSync() 或者异步提 commitAsync()

ConsumerRecords<> records = consumer.poll();
for (ConsumerRecord <> record : records) {
    try{
        consumer.commitSync()
    } catch(Exception e) {
    }
}

十六、kafka 的 ack 的三种机制

request.required.acks 有三个值 0、1、-1(all)

  • 0:生产者不会等待 broker 的 ack ,这个延迟最低但是存储的保证最弱当 server 挂掉的时 候就会丢数据。
  • 1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader 挂掉后 他不确保 是否复制完成新 leader 也会导致数据丢失。
  • -1(all) : 服务端会等所有的 follower 的副本收到数据后才会收到 leader 发出的 ack , 这样数据不会丢失

十七、Kafka 与传统 MQ 消息系统之间有三个关键区

  1. Kafka 持久化日志 , 这些日志可以被重复读取和无限期保留
  2. Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数 据提升容错 能力和高可用性
  3. Kafka 支持实时的流式处理

十八、Kafka 判断一个节点是否还活着有哪两个条件?

  1. 节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接
  2. 如果节点是个 follower,他必须能及时地同步 leader 的写操作,延时不能太久

十九、数据传输的事务定义有哪三种?

和 MQTT 的事务定义一样都是 3 种。

  • 最多一次: 消息不会被重复发送 , 最多被传输一次 , 但也有可能一次不传输
  • 最少一次: 消息不会被漏发送 , 最少被传输一次 , 但也有可能被重复传输.
  • 精确的一次( Exactly once ): 不会漏传输也不会重复传输 , 每个消息都传 输一次 而且仅仅被传输一次 , 这是大家所期望

二十、Zookeeper 对于 Kafka 的作用是什么

Zookeeper 是一个开放源码的、高性能的协调服务,它用于 Kafka 的分布式应用。

Zookeeper 主要用于在集群中不同节点之间进行通信,在 Kafka 中 ,它被用于提交偏移量 , 因此如果节点在任何情况下都失败了 , 它都可以从之前提交的偏移量中获取。

除此之外,它还执行其他活动,如: leader 检测、分布式同步、配置管理、识别新节点何时离开或 连接、集群、节点实时状态等等。

二十一、讲一下主从同步

漫游Kafka设计篇之主从同步_kafka主从同步原理-CSDN博客

二十二、讲讲 kafka 维护消费状态跟踪的方法

大部分消息系统在 broker 端的维护消息被消费的记录: 一个消息被分发到 consumer 后 broker 就马上进行标记或者等待 customer 的通知后进行标记。这样也可以在消息在消费后立马删除以减少 空间占用。

但是这样会不会有什么问题呢?如果一条消息发送出去之后就立即被标记为消费过的 , 一旦 consumer 处理消息时失败了( 比如程序崩溃) 消息就丢失了。为了解决这个问 题 ,很多消息 系统提供了另外一个功能: 当消息被发送出去之后仅仅被标记为已发送状态 , 当接到 consumer 已 经消费成功的通知后才标记为已被消费的状态。

这虽然解决了消息丢失的问题,但产生了新问题,首先如果 consumer 处理消息成功了但是向 broker 发送响应时失败了,这条消息将被消费两次。第二个问题时 , broker 必须维护每条消息的状态 , 并且每次都要先锁住消息然后更改状态然后释放锁。这样麻烦又来了,且不说要维护大量的状态数据 ,比如如果消息发送出去但没有收到消费成功的通知 , 这条消息将一直处于被锁定的状态 , Kafka 采用了不同的策略。Topic 被分成了若干分区, 每个分区在同一时间只被一个 consumer 消费。这意 味着每个分区被消费的消息在日志中 的位置仅仅是一个简单的整数:offset。这样就很容易标记每个分区消费状态就很容易了 ,仅仅需要一个整数而已。这样消费状态的跟踪就很简单了。

这带来了另外一个好处: consumer 可以把 offset 调成一个较老的值 ,去重新消费者的消息。这 对传统的消息系统来说看起来有些不可思议,但确实是非常有用的,谁规定了一条消息只能被消费 一次呢?

二十三、consumer 是推还是拉

Kafka 最初考虑的问题是 , customer 应该从 brokes 拉取消息还是 brokers 将消息推送 到 consumer ,也就是 pull 还 push。在这方面,Kafka 遵循了一种大部分消息系统共同的传统的设计: producer 将消息推送到 broker , consumer 从 broker 拉取消息

一些消息系统比如 Scribe 和 Apache Flume 采用了 push 模式 , 将消息推送到下游的 consumer。这样做有好处也有坏处: 由 broker 决定消息推送的速率 , 对于不同消费速率的 consumer 就不太好处理了。消息系统都致力于让 consumer 以最大的速率最快速 地消费消息 ,但 不幸的是 ,push 模式下 ,当 broker 推送的速率远大于 consumer 消费的速率时 ,consumer 恐怕就要崩溃了。最终 Kafka 还是选取了传统的 pull 模式。

Pull 模式的另外一个好处是 consumer 可以自主决定是否批量地从 broker 拉取数据。

Push 模式必须在不知道下游 consumer 消费能力和消费策略的情况下决定是立即推送每条消息还 是缓存之后批量推送。如果为了避免 consumer 崩溃而采用较低的推送速率,将可能导致一次只推送 较少的消息而造成浪费。 Pull 模式下 , consumer 就可以根据自己的消费能力去决定这些策略。

Pull 有个缺点是,如果 broker 没有可供消费的消息 ,将导致 consumer 不断在循环中轮询,直到新消息到达。 为了避免这点,Kafka 有个参数可以让 consumer 阻塞知道新消息到达(当然也可以阻塞小道消息的数量达到某个特定的量这样就可以批量发送)。

二十四、生产者和消费者的命令行是什么?

生产者在主题上发布消息:

bin/kafka-console-producer.sh --broker-list 192.168.43.49:9092 --topic Hello- Kafka 

注意这里的 IP 是 server. properties 中的 listeners 的配置 。 接下来每个新行就是输入一条新消 息。

消费者接收消息:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Hello- Kafka --from-beginning

如何获取 topic 主题的列表:

bin/kafka -topics. sh --list - -zookeeper localhost:2181

二十五、使用 rabbitmq 的场景

1、服务间异步通信 2 、 顺序消费 3 、 定时任务 4 、 请求削峰

二十六、如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费 了消息?

发送方确认模式

将信道设置成 confirm 模式( 发送方确认模式) ,则所有在信道上发布的消息都会被指派一个唯一 的 ID。 咕泡科技出品,严禁商用 一旦消息被投递到目的队列后 ,或者消息被写入磁盘后( 可持久化的消息) ,信道会 发送一个确 认给生产者( 包含消息唯一 ID )。 如果 RabbitMQ 发生内部错误从而导致消息丢失 , 会发送一条 nack( not acknowledged , 未 确认)消息。 发送方确认模式是异步的 ,生产者应用程序在等待确认的同时 ,可以继续发送消 息 。 当确认消息 到达生产者应用程序, 生产者应用程序的回调方法就会被触发来处理确认消息。

接收方确认机制

接收方消息确认机制 消费者接受每一条消息后都必须进行确认( 消息接收和消息确认是两个不同操作)。 只有消费者确 认了消息 , RabbitMQ 才能安全地把消息从队列中删除。 这里并没有用到超时机制 , RabbitMQ 仅通过 Consumer 的连接中断来确认是否需要重 新发送消 息。也就是说 ,只要连接不中断 , RabbitMQ 给了 Consumer 足够长的时间来处理消息。保证数 据的最终一致性;

二十七、rabbitmq 如何避免消息重复投递或重复消费?

在消息生产时 , MQ 内部针对每条生产者发送的消息生成一个 inner-msg-id , 作为去重的依据 ( 消息投递失败并重传) , 避免重复的消息进入队列;

在消息消费时,要求消息体中必须有一个 bizId( 对于同一业务全局唯一,如支付 ID、 订单 ID、帖 子 ID 等)作为去重的依据 , 避免同一条消息被重复消费

二十八、rabbitmq 消息基于什么传输?

由于 TCP 连接的创建和销毁开销较大 , 且并发数受系统资源限制 , 会造成性能瓶颈。 RabbitMQ 使用信道的方式来传输数据。信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信 道数量没有限制。

二十九、rabbitmq 消息如何分发?

若该队列至少有一 个消费者订阅,消息将以循环(round- robin )的方式发送给消费者 。 每条消息只会分发给一个订阅的消费者( 前提是消费者能够正常处理消息并进行确认)。 通过路由可实现多消费的功能。

三十、rabbitmq 消息怎么路由

消息提供方- 〉路由- 〉一至多个队列

消息发布到交换器时 , 消息将拥有一个路由键( routing key ) , 在消息创建时设定。 通过队列路由键 , 可以把队列绑定到交换器上。 消息到达交换器后 , RabbitMQ 会将消息的路由键与队列的路由键进行匹配( 针对不同 的交换器 有不同的路由规则) ;

常用的交换器主要分为以下三种:

  • fanout :如果交换器收到消息 ,将会广播到所有绑定的队列上
  • direct : 如果路由键完全匹配 ,消息就被投递到相应的队列
  • topic: 可以使来自不同源头的消息能够到达同一个队列 。 使用 topic 交换器时,可以使用通配符

三十一、rabbitmq 如何确保消息不丢失?

消息持久化 , 当然前提是队列必须持久化 RabbitMQ 确保持久性消息能从服务器重启中恢复的方式是 ,将它们写入磁盘上的一个 持久化日志 文件 , 当发布一条持久性消息到持久交换器上时 , Rabbit 会在消息提交到日志文件后才发送响应。

一旦消费者从持久队列中消费了一条持久化消息 ,RabbitMQ 会在持久化日志中把这条消 息标记为 等待垃圾收集。如果持久化消息在被消费之前 RabbitMQ 重启 , 那么 Rabbit 会 自动重建交换器 和队列( 以及绑定) ,并重新发布持久化日志文件中的消息到合适的队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/750872.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Jetpack - Navigation: 一个全面的安卓开发指南

引言 导航是任何安卓应用程序中至关重要的部分。无缝地在不同的屏幕之间移动并传递数据&#xff0c;对于流畅的用户体验来说至关重要。在这篇博客中&#xff0c;我们将深入探讨Jetpack的Navigation组件&#xff0c;这个强大的框架旨在简化安卓应用中的导航。我们将涵盖从设置和…

应急响应靶机-Linux(1)

前言 本次应急响应靶机采用的是知攻善防实验室的Linux-1应急响应靶机 靶机下载地址为&#xff1a; https://pan.quark.cn/s/4b6dffd0c51a 相关账户密码&#xff1a; defend/defend root/defend 解题 第一题-攻击者的IP地址 先找到的三个flag&#xff0c;最后才找的ip地址 所…

openinstall拥抱鸿蒙生态,SDK全面适配HarmonyOS NEXT

作为国内领先的App渠道统计与深度链接服务商&#xff0c;openinstall持续推动鸿蒙生态建设&#xff0c;近日正式发布openinstall HarmonyOS SDK&#xff0c;并成功入驻鸿蒙生态伙伴SDK专区&#xff0c;成为华为鸿蒙生态的合作伙伴&#xff0c;为鸿蒙应用开发者带来安全合规、高…

C语言的内存知识

这节我们主要认识一下内存&#xff0c;便于理解指针操作和后续内存管理。 一、内存分区模型 C程序在执行时&#xff0c;将内存大方向划分为4个区域 &#xff08;可以结合函数小节的函数栈帧部分看一下&#xff09; ⚪ 代码区:存放函数体的二进制代码&#xff0c;由操作系统进…

Java | Leetcode Java题解之第174题地下城游戏

题目&#xff1a; 题解&#xff1a; class Solution {public int calculateMinimumHP(int[][] dungeon) {int n dungeon.length, m dungeon[0].length;int[][] dp new int[n 1][m 1];for (int i 0; i < n; i) {Arrays.fill(dp[i], Integer.MAX_VALUE);}dp[n][m - 1] …

HarmonyOS ArkUi Tabs+TabContent+List实现tab吸顶功能

Demo效果 Entry Component struct StickyNestedScroll {State message: string Hello WorldState arr: number[] []scroller new Scroller()StyleslistCard() {.backgroundColor(Color.White).height(72).width("100%").borderRadius(12)}build() {Scroll(this.sc…

火山引擎ByteHouse:新一代云数仓必不可少的五大核心能力

从数据库领域的发展历程来看&#xff0c;分析型数据库已有 40 多年的发展历史&#xff0c;与数据库基本同时代。从OLTP 和 OLAP 的分支来看&#xff0c;分析型数据库支持了海量数据规模下的聚合性分析。尤其是随着移动互联网甚至 AI 等领域的发展&#xff0c;用户画像行为分析的…

AFLNet入门教学——测试RTSP协议实现Live555(Ubuntu)

1、简介 本文旨在使用AFLNet对RTSP协议实现Live555进行模糊测试。实验环境为&#xff1a;Ubuntu22.04.4AFLNet安装参考&#xff1a;AFLNet入门教学——安装&#xff08;Ubuntu22.04.4&#xff09;-CSDN博客 2、安装Live555 本次实验采取的是live555在2018年8月28日上传的版本…

【应届应知应会】Linux常用指令

SueWakeup 个人主页&#xff1a;SueWakeup 系列专栏&#xff1a;学习技术栈 个性签名&#xff1a;保留赤子之心也许是种幸运吧 本文封面由 凯楠&#x1f4f8;友情提供 目录 文件与目录管理 目录操作命令&#xff1a; ls [选项] [目录或文件] mkdir 文件操作命令&#xf…

Django 如何使用视图动态输出 CSV 以及 PDF

Django 如何使用视图动态输出 CSV 以及 PDF 这一篇我们需要用到 python 的 csv 和 reportLab 库&#xff0c;通过django视图来定义输出我们需要的 csv 或者 pdf 文件。 csv文件 打开我们的视图文件 testsite/members/views.py 。新增一个视图方法&#xff1a; import csv …

链在一起怎么联机 链在一起远程同玩联机教程

steam中最近特别热门的多人跑酷冒险的游戏&#xff1a;《链在一起》&#xff0c;英文名称叫做Chained Together&#xff0c;在游戏中我们需要开始自己的旅程&#xff0c;在地狱的深处&#xff0c;与我们的同伴被链在一起。我们的任务是通过尽可能高的攀登逃离地狱。每一次跳跃都…

【Python机器学习】自动化特征选择——迭代特征选择

在单变量测试中&#xff0c;没有使用模型&#xff1b;在基于模型的选择中&#xff0c;使用单个模型来选择特征。而在迭代特征选择中&#xff0c;将会构造一系列模型&#xff0c;每个模型都使用不同数量的特征。有两种基本方法&#xff1a; 1、开始时没有特征&#xff0c;然后逐…

前端主流框架-JQuery

Javascript DOM 1 DOM模型Document对象 1.1 DOM模型 DOM【Document Object Model】 &#xff1a;文档对象模型。直白的讲就是通过程序解析结构化文档&#xff08;xml&#xff0c;html&#xff09;的时候&#xff0c;在内存中生成的包含当前结构化文档中所有内容的一个对象模型…

openlayers 轨迹回放(历史轨迹)(postrender事件和render方法)

openlayers 轨迹回放&#xff08;历史轨迹&#xff09;&#xff08;postrender事件和render方法&#xff09; 本篇介绍一下使用openlayers轨迹回放&#xff08;历史轨迹&#xff09;&#xff08;postrender事件和render方法&#xff09; 1 需求 轨迹回放&#xff08;历史轨迹…

网络问题排障专题-AF网络问题排障

目录 一、数据交换基本原理 1、ARP协议工作原理 数据包如图&#xff1a; 2、二层交换工作原理 简述核心概念&#xff1a; 二层交换原理-VLAN标签 3、三层交换工作原理 二、AF各种部署模式数据转发流程 1、路由模式数据转发流程 三、分层/分组逐一案例讲解 1、问题现…

《非暴力沟通》

The English name of the book: Nonviolent Communication 我对《非暴力沟通》的理解总归于一句话&#xff1a;我们所认识的世界&#xff0c;来源于我们的认知里的世界。我们总喜欢用“说教”的方式&#xff0c;评论他人的行为。这本书讲述如何摘掉偏见。 文章&#xff1a;

海外仓货物何如高效入库:入库区域规划策略,附规划图

作为海外仓布局的一部分&#xff0c;入库区可以说是所有业务流程的开端&#xff0c;也是最重要的区域之一。如果海外仓的入库区布局不合理&#xff0c;会直接导致后续所有的作业流程都出现拥堵、低效。 今天我们就会给大家分享海外仓入库区的规划指南&#xff0c;通过科学的规…

压缩pdf文件大小的方法,如何压缩pdf格式的大小

pdf太大怎么压缩&#xff1f;当你需要通过电子邮件发送一个PDF文件&#xff0c;却发现文件太大无法成功发出时&#xff0c;这些情况下&#xff0c;我们都需要找到一种方法来压缩PDF文件&#xff0c;以便更便捷地进行分享和传输。PDF文件的大小通常与其中包含的图片、图形和文本…

leetCode.91. 解码方法

leetCode.91. 解码方法 题目思路 题解 class Solution { public:int numDecodings(string s) {int n s.size();// dp 中f[0]一般不做使用&#xff0c;只是存一个初值1&#xff0c;表示默认由一种方案s s;vector<int> f( n 1 );f[0] 1;for ( int i 1; i < n;…

SRC公益上分的小技巧二

前言 漏洞挖掘有时候换几个思路&#xff0c;事半功倍 下面讲解一些很简单&#xff0c;但是实用的思路 案例一、若依系统配置不当 讲解了这么多系统&#xff0c;兜兜转转又回到了若依 其实最早的若依系统&#xff0c;在js中已经将账号密码自动填充&#xff0c;我们一访问就…
最新文章