# RabbitMQ浅析

RabbitMQ 是一款开源的,使用 Erlang 语言编写的,基于 AMQP 协议的消息中间件

图片

# 1. 基本概念

  • AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全
  • AMQP 协议对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次

# 1.1. 常用组件

  • Connection: 连接,应用程序与Server的网络连接,TCP连接
  • Channel: 信道,消息读写等操作在信道中进行
  • Message: 消息,应用程序和服务器之间传送的数据
  • Virtual Host: 虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个 Exchange 和 Queue
  • Exchange: 交换器,接收消息,按照路由规则将消息路由到一个或者多个队列
  • Binding: 绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个 RoutingKey
  • RoutingKey: 路由键,生产者将消息发送给交换器时会发一个 RoutingKey,用来指定路由规则
  • Queue: 消息队列,用来保存消息,供消费者消费
  • Broker: 标识消息队列服务器实体

# 1.2. 交换器类型

  • Direct Exchange: 完全匹配,消息中的路由键(routing key)和 Binding 中的 binding key 一致
  • Topic Exchange: 模糊匹配,两个通配符:"#"和"*",#匹配0个或多个单词,*只匹配一个单词
  • Fanout Exchange: 广播模式,不处理路由键,把所有发送到交换器的消息路由到所有绑定的队列中
  • Headers Exchange: 忽略路由规则,根据消息中的headers属性来匹配,性能较低(不常用)

# 2. 优点

优点

# 2.1. 解耦(最终一致性)

用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口,假如库存系统无法访问,则订单减库存将失败,从而导致订单失败

引入 MQ 用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功,库存系统订阅下单消息,进行增减库存操作,即使库存系统崩溃了,也不影响正常下单,实现解耦

# 2.2. 异步(提升效率)

比如用户注册,需要发注册通知。传统的做法有两种 1.串行的方式;2.并行方式。注册用户入库耗时 200ms,发送通知耗时 200ms,串行调用的话是耗费时间总和,改用 MQ 异步发送通知,大大缩短了整体的响应时间,提升效率

# 2.3. 削峰(量力而行)

比如秒杀场景,高峰期时每秒有 5000 个请求打到 MySQL 上,而 MySQL 最多每秒处理 2000 个请求,过多的请求会导致 MySQL 瘫痪

引入 MQ 将高峰期的请求直接写入 MQ,MySQL 根据自己的处理能力去 MQ 拉去请求进行处理,缓解 MySQL 压力,虽然期间会有短暂的消息积压,不过高峰期过后消息就会被快速消费掉

限流

RabbitMQ 提供了一种 QOS(服务质量保证)功能。即在非自动确认消息的前提下,如果一定数目的消息未被确认前,不进行消费新的消息

# 3. 缺点

缺点

# 3.1. 系统的可用性降低

引入外部依赖越多,系统越容易挂掉,一旦 MQ 挂了,也会导致整个系统不可用

# 3.2. 系统的复杂性提高

  • 幂等性: 如何保证消息没有重复消费
  • 顺序性: 如何保证消息传递的顺序
  • 可靠性: 如何保证消息不丢失

# 3.3. 数据一致性的问题

A 系统发送完消息直接返回成功,但是 BCD 系统之中若有系统写库失败,则会产生数据不一致的问题

# 4. 重复消费

幂等性

使用全局唯一 ID + 指纹码(全局唯一 ID:雪花算法生成的业务表的主键。指纹码:时间戳、UUID、订单号),并发量不高的情况下可以在数据库维护一张消费记录表维护处理状态,查询是否处理消费消息,并发量很高使用分布式锁解决并发问题,将全局 ID 写入 Redis 的 Key,Redis的 Value 值保存处理状态,判断是否处理消费消息

# 5. 顺序消费

顺序性

在 MQ 里面创建多个 Queue,使用 Hash 算法将需要排序的数据有顺序的放入同一个 Queue,每个 Queue 对应一个 Consumer,或者就一个 Queue,对应一个 Consumer,这个 Consumer 内部用内存队列排队,然后分发给不同的 Worker 来处理

# 6. 消息丢失

可靠性

# 6.1. 生产者丢消息

  • 生产者发送消息时网络波动导致
  • 代码/配置导致

事例1

一般情况下,生产者使用 Confirm 模式投递消息,如果方案不够严谨,比如 RabbitMQ Server 接收消息失败后会发送 nack 消息通知生产者,生产者监听消息失败或者没做任何事情,消息存在丢失风险

事例2

生产者发送消息到 Exchange 后,发送的路由和 Queue 没有绑定,消息会存在丢失情况,下面会讲到具体的例子,保证意外情况的发生,即使发生,也在可控范围内

事务机制(基于AMQP协议)

发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit()),吞吐量下降,同步处理,不推荐

Confirm 模式

在生产者那里设置开启 Confirm 模式后,每次写的消息都会分配一个唯一的 ID,如果成功写入了 RabbitMQ 中,RabbitMQ 会回调生产者 Ack 接口,说明这个消息成功写入了。如果 RabbitMQ 没能处理这个消息,会回调一个 nack 接口,说明这个消息接收失败,需要重发。而且还可以在内存里维护每个消息 ID 的状态,如果超过一定时间还没有接受到这个消息的回调,也可以重发

# 6.2. MQ丢消息

消息未完全持久化

  • 创建 Queue 时设置持久化
  • 发送消息时设置持久化

开启 RabbitMQ 持久化

设置持久化有两个步骤:

  • 创建 Queue 的时候将其设置为持久化,队列的持久化标识 durable 设置为 true,这样就可以保证持久化 Queue 的元数据,但是不会持久化 Queue 里的消息
  • 发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化。此时 RabbitMQ 就会将消息持久化到磁盘了
  • 注意:必须同时设置这两个才可以

这个持久化可以跟生产者那边的 Confirm 机制配合起来,只有消息被持久化到磁盘之后,才会回调生产者的 ack 接口,所以即使是持久化到磁盘之前 MQ 挂了,生产者收不到回调,也会重发

镜像集群模式保证消息高可用

# 6.3. 消费者丢消息

消费端接收到相关消息之后,消费端还没来得及处理消息,消费端机器就宕机了

  • 关闭消费者的自动 ack 机制,采用手动 ack 形式
  • 消费者处理完消息后手动 ack 通知 MQ 删除消息

# 7. 消息集群

可以使用镜像集群模式,RabbitMQ 有两种集群模式:普通集群模式和镜像集群模式

普通集群模式并不能保证消息队列的可用性。它的实现方式就是在多台机器上启动多个 RabbitMQ 实例,但是创建的队列只会保存在其中一个实例中,其他的实例只是同步队列的配置信息,通过配置信息来找到队列所在的实例。因此,普通集群模式只能提高消息队列的吞吐量,如果保存队列的实例宕机,那么整个集群还是会失效

镜像集群模式才是 RabbitMQ 的高可用模式,跟普通集群模式的区别在于创建的队列会同步保存在所有实例中,这样即使其中一个实例宕机,还有其他的实例可以正常工作,这样就保证了消息队列的可用性

参考

上次更新时间: 2023-12-15 03:14:55