Skip to content

RabbitMQ如何保证消息不丢失

生产阶段:确保消息成功发送到 RabbitMQ

在生产阶段,核心是确保生产者发送的消息能够被 RabbitMQ 服务端成功接收并处理。

方案一:AMQP 事务机制 (Transactions)

生产者可以将信道(Channel)设置为事务模式。在此模式下,生产者可以发布一条或多条消息,然后提交事务。只有当事务成功提交,消息才会被确认。如果在提交过程中发生任何错误,生产者可以回滚事务,就像所有消息都没有被发送过一样。

  • 优点:提供了“要么全部成功,要么全部失败”的原子性保证。
  • 缺点:事务机制是同步的,非常耗费性能。它会阻塞生产者,直到 RabbitMQ 返回事务提交的结果,这使得吞吐量大幅下降,有时甚至会降低 250 倍。

方案二:发送方确认机制 (Publisher Confirms)

由于事务机制性能开销巨大,RabbitMQ 引入了更轻量级的发送方确认机制。 这是一种异步的确认方式,当生产者将信道设置为 confirm 模式后,所有在该信道上发布的消息都会被分配一个唯一的 ID。一旦消息被 RabbitMQ 正确接收,它就会发送一个确认(ack)给生产者。

  • 优点:异步机制,性能开销远小于事务。确认信息可以批量发送,也可以逐条发送,非常灵活。 这是业界推荐的、用于保证生产者消息可靠性的首选方案。
  • 如何工作
    • 持久化消息:当消息被写入所有目标队列的磁盘后,RabbitMQ 会发送 ack
    • 非持久化消息:当消息到达所有目标队列后,ack 就会被发送。
    • 如果 RabbitMQ 发生内部错误导致无法处理消息,则会发送一个 nack (Negative Acknowledgement)。生产者收到 nack 或在超时后仍未收到 ack,就可以进行消息的重发。

存储阶段:确保消息在 RabbitMQ 服务端不丢失

即使消息成功到达了 RabbitMQ,如果 RabbitMQ 服务节点宕机或重启,内存中的消息也会丢失。持久化机制是解决此问题的关键。

持久化三要素

要使消息在 RabbitMQ 重启后依然存在,必须同时满足以下三个条件: 1. 交换机(Exchange)持久化:在声明交换机时,将其 durable 属性设置为 true。 2. 队列(Queue)持久化:在声明队列时,也需要将其 durable 属性设置为 true。 3. 消息(Message)持久化:在发送消息时,将其投递模式(delivery_mode)设置为 2 (persistent)。

注意:这三个条件缺一不可。如果消息是持久化的,但它被路由到的队列不是持久化的,那么在 Broker 重启后,队列会消失,消息也会随之丢失。

消费阶段:确保消费者成功处理消息

消息被成功推送到消费者后,如果消费者在处理过程中发生异常(例如服务器宕机或程序崩溃),这条消息就有可能丢失。

消费者确认机制 (Consumer Acknowledgements)

为了解决这个问题,RabbitMQ 允许消费者在处理完消息后,向服务端发送一个确认回执(ack)。

  • 自动确认 (Automatic Acknowledgement):在这种模式下,RabbitMQ 在发送消息后会立即将其标记为已删除。 如果此时消费者处理失败或在处理过程中宕机,消息就会丢失。因此,这种模式的安全性较低。
  • 手动确认 (Manual Acknowledgement):这是保证消费可靠性的推荐方式。RabbitMQ 将消息发送给消费者后,会等待消费者的ack。如果消费者成功处理了消息,就发送 ack,RabbitMQ 才会将消息删除。如果消费者的连接断开(或在超时时间内未ack),RabbitMQ 会认为消息没有被成功处理,并会将其重新投递给其他消费者(或同一个消费者,如果它恢复了)。

消费者流量控制 (Prefetch Count)

在使用手动确认时,为了防止 RabbitMQ 向一个消费者推送过多其来不及处理的消息,导致该消费者内存耗尽,可以使用 basic.qos 方法设置预取值(Prefetch Count)。 这个值限制了一个消费者可以同时持有的未确认消息的数量。 当未确认消息达到这个数量时,RabbitMQ 将停止向该消费者投递新消息,直到有消息被确认为止。

高可用性:确保 RabbitMQ 服务本身不宕机

以上机制解决了单个环节的可靠性问题,但如果 RabbitMQ 的单个节点发生硬件故障或网络分区,整个服务就会变得不可用。

集群 (Clustering)

可以将多个 RabbitMQ 节点组成一个集群。集群中的所有节点共享交换机、队列、用户等元数据。 但需要注意的是,默认情况下,队列及其中的消息内容只存储在创建该队列的那个节点上。

镜像队列 (Mirrored Queues) / Quorum 队列

为了解决集群中队列内容单点存储的问题,可以配置高可用策略。 * 镜像队列(已弃用): 这是传统的 HA 方案,一个队列会有一个主节点(Leader)和多个从节点(Mirrors)。所有操作首先在主节点上执行,然后广播到从节点。如果主节点宕机,一个从节点会被提升为新的主节点。 * Quorum 队列: 这是 RabbitMQ 3.8 版本以后推荐的、用于替代镜像队列的新一代高可用和数据安全方案。 它基于 Raft 一致性协议,提供了比传统镜像队列更强的数据一致性保证。

通过集群和队列复制,即使某个 RabbitMQ 节点完全失效,其他节点上仍然有消息的完整副本,服务可以继续进行,从而避免了单点故障。

环节 可能的问题 解决方案
生产者 -> Broker 网络抖动或 Broker 故障导致消息发送失败。 使用 Publisher Confirms (发送方确认) 机制,并配合重试逻辑。
Broker 内部存储 Broker 节点宕机或重启导致内存中的消息丢失。 1. 将 Exchange 设置为持久化。
2. 将 Queue 设置为持久化。
3. 将 Message 的投递模式设为持久化。
Broker -> 消费者 消息已投递,但消费者在处理完成前就宕机。 使用 手动消息确认 (Manual ACK) 机制,并控制 Prefetch 数量。
Broker 自身高可用 单个 Broker 节点硬件故障或网络不可达。 部署 RabbitMQ 集群,并对关键队列使用 Quorum 队列镜像队列 进行数据复制。