消息队列相关知识整理汇总

11 minute

使用消息队列的目的

  • 解耦:降低模块之间耦合度,提高业务灵活度。
  • 异步:异步地执行一系列互不影响的操作,降低执行所需时间。
  • 削峰:防止高并发场景下大量数据操作直接访问数据库,导致系统不可用。

消息队列的高可用性

保证高可用需要分消息队列选型来确定,相关重要文章如下:

RabbitMQ - 镜像集群模式

跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论是元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。

Kafka - 主从集群模式

Kafka 由多个 broker 组成,每个 broker 是一个节点。创建一个 topic,这个 topic 根据 key(也可以是随机) 可划分为多个 partition,每个 broker 都存有所有的 partition。对于每个 key 对应的多个 partition,其中一个 broker 下的 partition 会被选为 leader,其他 partition 作为 follower。

任何一个 partition 都可用于读,写则只作用于 leader。在正常情况下,任何一个 broker 都可以得到利用,因为每个主题的某个 key 对应的 leader partition 可能存在于任一 broker 中。而由于每个 broker 都存了所有 partition 数据,当出现故障则进行选举,调整 leader-follower 关系即可。

arch-kafka

RocketMQ - 主从集群模式

Master(主节点)可以进行读和写操作,Slave(从节点)只可以读,不进行写操作。RocketMQ 提供两种数据同步模式:

  • 异步模式:每个 Master 配置一个 Slave,有多组 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟(毫秒级)。
  • 同步模式:每个 Master 配置一个 Slave,有多组 Master-Slave,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功。

arch-rocketmq

消费如何做到幂等性

从源头角度解决,就要确保不重复消费某条消息,从结果角度解决,就要确保重复消费后对实际业务没有任何影响。

针对这两个角度,有两种解决方案:

  1. 状态判断法:消费者消费数据后把消费数据记录在 redis 中,比如设定一个可唯一确定该条消息的全局 ID,下次消费时先到 redis 中查看是否存在该消息,存在则表示消息已经消费过,直接丢弃消息。
  2. 业务判断法:通常数据消费后都需要插入到数据库中,使用数据库的唯一性约束防止重复消费。每次消费直接尝试插入数据,如果提示唯一性字段重复,则直接丢失消息。

消息的可靠性传输

开启确认模式

消息传输过程中,在生产者、消息队列存储端、消费者端三个点都可能发生丢失。且对于不同的消息队列,需要进行不同的考虑。具体可以参考文章:

总结地讲,我们可以从消息可能丢失的三个地方入手,一般而言对于生产者和消费者端,都要开启确认模式,以 ACK 的方式来确认消息是否被成功生产或消费,注意是要进行手动的 ACK 而不是自动 ACK。对于消息队列存储端,需要关注磁盘,要开启消息的持久化,同时可以采用同步刷盘的方式确保消息被顺利存入。

消息消费的顺序性

参考文章:如何保证消息的顺序性?

总结如下,对于 RabbitMQ 和 RocketMQ,将同一事务的消息顺序存入同一队列,且该队列绑定给一个消费者进行消费即可。对于 Kafka,由于一个 topic 下同一个 partition 中的消息是有序的,将同一事务的消息顺序存入一个 partition,由一个消费者进行消费即可。

最后对于消费者端可能存在的多线程消费方式,只需要一个事务中的消息被一个线程处理即可,可以在线程处理前增加个内存队列,每个线程只负责处理其中一个内存队列的消息,同一个事务中的消息发送到同一个内存队列中即可。

消息积压问题的处理

  1. 水平扩展消费者:消费者数量增多,则可以并行提升消息消费的速度,从而避免消息积压的问题。
  2. 优化消费者处理速度:提升消费者的消费速度也可以避免消息积压的问题,它的解决方案有:
    • 优化消费者处理消息的逻辑,减少不必要的计算和 I/O 操作。
    • 对于可以并行处理的任务,使用多线程或异步处理来提高吞吐量。
  3. 限流生产者和使用背压机制:
    • 在生产者端实施限流策略,确保消息产生的速度不会超过系统的处理能力。
    • 使用背压机制,即当消息队列达到某个阈值时,通知生产者降低发送速率或暂停发送。
  4. 使用死信队列:在消费者处理消息出现失败或超时的情况下,加入消息重试机制或将异常消息放入死信队列,避免异常消息一直占用队列资源。
  5. 监控和告警:设置合理的告警阈值,当消息积压达到一定程度时及时发出告警,以便快速响应和处理。

RocketMQ 消费状态

PushConsumer 有如下五种消费状态:

  • Ready:已就绪状态。消息在 Apache RocketMQ 服务端已就绪,可以被消费者消费。
  • Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。
  • WaitingRetry:待重试状态,PushConsumer 独有的状态。当消费者消息处理失败或消费超时,会触发消费重试逻辑判断。如果当前重试次数未达到最大次数,则该消息变为待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。
  • Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。
  • DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。可以通过消费死信队列的消息进行业务恢复。

和 PushConsumer 消费重试策略不同的是,SimpleConsumer 消费者的重试间隔是预分配的,每次获取消息时,消费者会在调用 API 时设置一个不可见时间参数 InvisibleDuration,即消息的最大处理时长。若消息消费失败则触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。

Reference