消息队列重要知识整理

11 minute

使用消息队列的目的

  • 解耦:降低模块之间耦合度,提高业务灵活度。
  • 异步:异步地执行一系列互不影响的操作,降低执行所需时间。
  • 削峰:防止高并发场景下大量数据操作直接访问数据库,导致系统不可用。

消息队列的高可用性

保证高可用需要分消息队列选型来确定,相关重要文章如下:

RabbitMQ - 镜像集群模式

跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论是元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。

Kafka - 主从集群模式

Kafka 由多个 broker 组成,每个 broker 是一个节点;你创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据。

Kafka 提供了 HA 机制,就是 replica(复制品) 副本机制。每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。

也就是说 Kafka 只能读写 leader。要是你可以随意读写每个 follower,那么就要 care 数据一致性的问题,系统复杂度太高,很容易出问题。Kafka 会均匀地将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。

RocketMQ - 主从集群模式

Master(主节点)可以进行读和写操作,Slave(从节点)只可以读,不进行写操作。也就是说,不同于 Kafka 只能读写主节点,RocketMQ 中,Producer 可以和主节点的 Broker 连接写入消息,而 Consumer 可以连接主或从节点的的 Broker 来读取消息。

RocketMQ 提供两种数据同步模式:

  • 异步模式:每个 Master 配置一个 Slave,有多组 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟(毫秒级)。
  • 同步模式:每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功。

消费如何做到幂等性

从源头角度解决,就要确保不重复消费某条消息,从结果角度解决,就要确保重复消费后对实际业务没有任何影响。

针对这两个角度,有两种解决方案:

  1. 状态判断法:消费者消费数据后把消费数据记录在 redis 中,比如设定一个可唯一确定该条消息的全局 ID,下次消费时先到 redis 中查看是否存在该消息,存在则表示消息已经消费过,直接丢弃消息。
  2. 业务判断法:通常数据消费后都需要插入到数据库中,使用数据库的唯一性约束防止重复消费。每次消费直接尝试插入数据,如果提示唯一性字段重复,则直接丢失消息。

消息的可靠性传输

消息传输过程中,在生产者、消息队列存储端、消费者端三个点都可能发生丢失。且对于不同的消息队列,需要进行不同的考虑。具体可以参考文章:

总结地讲,我们可以从消息可能丢失的三个地方入手,一般而言对于生产者和消费者端,都要开启确认模式,以 ACK 的方式来确认消息是否被成功生产或消费,注意是要进行手动的 ACK 而不是自动 ACK。对于消息队列存储端,需要关注磁盘,要开启消息的持久化,同时可以采用同步刷盘的方式确保消息被顺利存入。

消息消费的顺序性

参考文章:如何保证消息的顺序性?

总结如下,对于 RabbitMQ 和 RocketMQ,将同一事务的消息顺序存入同一队列,且该队列绑定给一个消费者进行消费即可。对于 Kafka,由于一个 topic 下同一个 partition 中的消息是有序的,将同一事务的消息顺序存入一个 partition,由一个消费者进行消费即可。

最后对于消费者端可能存在的多线程消费方式,只需要一个事务中的消息被一个线程处理即可,可以在线程处理前增加个内存队列,每个线程只负责处理其中一个内存队列的消息,同一个事务中的消息发送到同一个内存队列中即可。

消息积压问题的处理

  1. 水平扩展消费者:消费者数量增多,则可以并行提升消息消费的速度,从而避免消息积压的问题。
  2. 优化消费者处理速度:提升消费者的消费速度也可以避免消息积压的问题,它的解决方案有:
    • 优化消费者处理消息的逻辑,减少不必要的计算和 I/O 操作。
    • 对于可以并行处理的任务,使用多线程或异步处理来提高吞吐量。
  3. 限流生产者和使用背压机制:
    • 在生产者端实施限流策略,确保消息产生的速度不会超过系统的处理能力。
    • 使用背压机制,即当消息队列达到某个阈值时,通知生产者降低发送速率或暂停发送。
  4. 使用死信队列:在消费者处理消息出现失败或超时的情况下,加入消息重试机制或将异常消息放入死信队列,避免异常消息一直占用队列资源。
  5. 监控和告警:设置合理的告警阈值,当消息积压达到一定程度时及时发出告警,以便快速响应和处理。

RocketMQ 消费状态

PushConsumer 有如下五种消费状态:

  • Ready:已就绪状态。消息在 Apache RocketMQ 服务端已就绪,可以被消费者消费。
  • Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。
  • WaitingRetry:待重试状态,PushConsumer 独有的状态。当消费者消息处理失败或消费超时,会触发消费重试逻辑判断。如果当前重试次数未达到最大次数,则该消息变为待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。
  • Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。
  • DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。可以通过消费死信队列的消息进行业务恢复。

和 PushConsumer 消费重试策略不同的是,SimpleConsumer 消费者的重试间隔是预分配的,每次获取消息时,消费者会在调用 API 时设置一个不可见时间参数 InvisibleDuration,即消息的最大处理时长。若消息消费失败则触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。

TO BE CONTINUED