消息队列

消息队列

为什么需要消息队列?

异步处理

  • 例子: 秒杀系统
1
如何利用有限的服务器资源,尽可能多地处理短时间内的海量请求   
  • 处理方案: 多步骤异步处理
1
2
3
4
一个秒杀请求的步骤: 风险控制;库存锁定;生成订单;短信通知;更新统计数据 (5步) 

决定秒杀成功的步骤:实际上只有风险控制和库存锁定这 2 个步骤
所以服务端可以在处理完前两步后就返回给用户结果。后面三步异步去处理

秒杀请求异步处理

  • 好处
    • 更快返回结果
    • 减少等待,自然实现了步骤之间的并发,提升系统总体的性能

流量控制

  • 例子: 秒杀系统
1
短时间海量的请求不会压垮秒杀服务
  • 处理方案: 消息队列堆积; 消息队列令牌桶
1
2
3
4
5
6
7
8
9
10
消息队列堆积: 
逻辑: 消息队列直接隔离 网关和秒杀服务, 网关将请求放入消息队列,秒杀服务按照自己的能力处理请求并返回结果,超时按秒杀失败算。
优点: 根据下游的处理能力自动调节流量,达到“削峰填谷”的作用
问题: 增加了系统调用链环节,导致总体的响应时延变长。上下游系统都要将同步调用改为异步消息,增加了系统的复杂度。

消息队列令牌桶: 预估出秒杀服务的处理能力, 以固定大小的消息队列实现一个令牌桶。
令牌桶逻辑: 单位时间放置固定数量的令牌,服务在请求之前必须先获取令牌。 保证单位时间处理的请求不会超过令牌数量,流量控制
逻辑:网关增加一个获取令牌的步骤,没有令牌就拒绝请求
有点:不破坏原有调用链

消息队列堆积流控

消息队列令牌桶流控

服务解耦

  • 例子: 电商
1
2
3
4
5
6
7
8
订单系统创建订单后: 
支付系统需要发起支付流程;
风控系统需要审核订单的合法性;
客服系统需要给用户发短信告知用户;
经营分析系统需要更新统计数据;
......

消息队列:每当订单创建,发送一份数据到消息队列。 下游业务不管怎么需求,订单系统不需要改变。

其他场景

  • 作为发布 / 订阅系统实现一个微服务级系统间的观察者模式;
  • 连接流计算任务和数据;
  • 用于将消息广播给大量接收者。

引入消息队列带来的问题

  • 引入消息队列带来的延迟问题;
  • 增加了系统的复杂度;
  • 可能产生数据不一致的问题。

如何选择消息队列?

选择标准

  • 开源:
    • 遇到问题可以自己改源码紧急避险。 而不是等待开发者不知道什么时候的下一版
  • 流行并有一定社区活跃度:
    • 只要不是很偏的使用场景,使用过程遇到的问题都能在网上找到解决方案
    • 流行的产品与周边生态系统会有一个比较好的集成和兼容,比如,Kafka 和 Flink,Flink 内置了 Kafka 的 Data Source,使用 Kafka 就很容易作为 Flink 的数据源开发流计算应用

及格的消息队列

  • 消息的可靠传递:确保不丢消息;
  • Cluster:支持集群,确保不会因为某个节点宕机导致服务不可用,当然也不能丢消息;
  • 性能:具备足够好的性能,能满足绝大多数场景的性能要求。

产品

  • RabbitMQ : 老牌消息队列
1
2
3
4
5
6
Erlang 语言编写的,最早是为电信行业系统之间的可靠通信设计的,也是少数几个支持 AMQP 协议的消息队列之一
优点: 轻量级、迅捷
问题:
对消息堆积的支持并不好。 当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。
性能比较其他的较差: 支持每秒几万到十几万的数据。应用对性能要求高的不好用
是小众语言Erlang开发
  • RocketMQ
1
2
3
4
5
6
7
8
9
是阿里在 2012 年开源的消息队列产品,后来捐赠给 Apache 软件基金会,2017 正式毕业,成为 Apache 的顶级项目


优点:
性能,稳定性,可靠性 都不错, JAVA开发,有活跃的中文社区
时延低,对在线业务的响应时延做了很多的优化,大多数情况下可以做到毫秒级的响应
每秒钟大概能处理几十万条消息
缺点:
国产的消息队列,相比国外的比较流行的同类产品,在国际上还没有那么流行,与周边生态系统的集成和兼容程度要略逊一筹。
  • kafka
1
2
3
4
5
6
7
Kafka 最早是由 LinkedIn 开发,目前也是 Apache 的顶级项目。Kafka 最初的设计目的是用于处理海量的日志。

优点:
Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域,几乎所有的相关开源软件系统都会优先支持 Kafka。
每秒处理几十万消息
缺点:
异步批量思想带来的问题,同步收发消息的时延相较其他的较高。 因为当客户端发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送,在它的 Broker 中,很多地方都会使用这种“先攒一波再一起处理”的设计
  • 其他消息队列
1
2
ActiveMQ: 老气的开源队列,已过期,社区不活跃
Pulsar: 新兴的开源消息队列产品,最早是由 Yahoo 开发,目前处于成长期,流行度和成熟度相对没有那么高。 可持续关注
  • 总结建议
1
2
3
对消息队列功能和性能都没有很高的要求,只需要一个开箱即用易于维护的产品, 选择 RabbitMQ
主要场景是处理在线业务,比如在交易系统中用消息队列传递订单,需要低延迟和金融级的稳定性, 选择 RocketMQ
需要处理海量的消息,像收集日志、监控信息或是前端的埋点这类数据,或是你的应用场景大量使用了大数据、流计算相关的开源产品,选择 Kafka

消息模型: 主题 和 队列

1
2
3
队列是 一种数据结构
队列是先进先出(FIFO, First-In-First-Out)的线性表(Linear List)。
在具体应用中通常用链表或者数组来实现。队列只允许在后端(称为 rear)进行插入操作,在前端(称为 front)进行删除操作。

早期的消息队列

队列模型

1
2
3
4
5
按照 '队列' 数据结构进行设计的 。 消息是严格有序的

问题:
多个消费者, 竞争消费, 每个消费者只会收到其中一部分消息。 不能实现每个消费者消费全量数据
可行办法是: 为每个消费者创建一个单独的队列,让生产者发送多份。浪费资源,并且需要知道多少给消费者不能实现解耦

发布-订阅 模型

发布订阅模型

1
2
3
服务端存放消息的容器称为主题(Topic)

与 队列模型的区别: 一条消息能不能被消费多次

RabbitMQ 的消息模型

1
2
3
4
比较例外,依然坚持着 队列模型。
其解决多个消费者的问题方案: Exchange 模块
exchange位于 生产者和队列之间, 生产者不关心消息发给哪个队列,而是将消息发送给 Exchange,由 Exchange 上配置的策略来决定将消息投递到哪些队列中。
同一份消息如果需要被多个消费者来消费,需要配置 Exchange 将消息发送到多个队列,每个队列中都存放一份完整的消息数据,

RocketMQ 的消息模型

是标准的 发布 - 订阅 模型
rocketMQ消息模型

  • rocketMQ的 队列概念
1
2
3
4
5
6
7
8
9
10
11
12
如何确保不丢失?  请求 - 确认 机制
发布者 ( 发送消息等待服务端确认 ,没有确认就重发)
消费者 ( 收到消息后给服务端发送确认, 服务端没有收到确认就重发 )

请求-确认机制的 问题:
为了确保消息的有序性,在某一条消息被成功消费之前,下一条消息是不能被消费的。
即同一时刻只能有一个消费者消费消息。那么就没法通过水平扩展消费者的数量来提升消费端总体的消费性能

问题解决方案: 主题下增加 队列概念
每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费
(即同一时刻,如果消费者1在处理队列1的数据,消费者2就可以拿队列2的数据进行消费)
注意: RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。
  • 消费组
1
2
3
4
5
6
7
8
9
是通过消费组来体现 订阅者概念的

每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响
同一个消费组中的消费者,是竞争消费的关系,每个消费者负责消费组内的一部分消息

在topic中消息被消费的过程中, 由于消息需要被不同的组进行多次消费,
所以消费完的消息不会立即被删除,而是为每个消费组在每个队列上维护一个消费位置(Consumer Offset), 每次消费一条消息,位置往后移

注: 丢消息的原因大多是由于消费位置处理不当导致的。

kafka的消息模型

1
2
业务层面概念上与RocketMQ的消息模型基本一致。 
只是其队列的概念是 分区(Partition)

如何利用事务消息实现分布式事务?

解决 消息队列 生产者和消费者的 数据一致性问题

事务

1
2
3
4
A: 原子性,一个事务操作不可分割,要么成功,要么失败,
C: 一致性,在事务执行完成之前,读到的一定是更新前的数据,之后读到的一定是更新后的数据
I: 隔离性, 事务的执行不能被其他事务干扰
D: 持久性, 事务一旦完成提交,后续的其他操作和故障都不会对事务的结果产生任何影响。

分布式事务

1
2
3
4
5
6
7
8
9
对于分布式系统来说,严格的实现 ACID 这四个特性几乎是不可能的,或者说实现的代价太大,大到我们无法接受

所以在更多情况下,分布式事务指的是 在分布式系统中事务的不完整实现
如一致性的残血版本: 顺序一致性、最终一致性等

常见的分布式事务实现:
2PC(Two-phase Commit,也叫二阶段提交)、
TCC(Try-Confirm-Cancel)
事务消息

例子

事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景。

电商用户将购物车中的商品一起下单支付 (订单成功后清理购物车中这几个商品)
电商购物车商品下单清空

1
2
3
4
5
6
清理购物车 不是下单支付的主流程, 所以用消息队列异步删除
同时即使下单成功后几秒内商品没清理掉问题也不大(即可接收一定的时延)

1. 消费方(购物车系统): 接收到消息后清理购物车,如果失败,只要不确认消息等待消息队列重传即可。

2. 生产方(订单系统)(事务问题): 创建订单 和 发送消息 需要同时成功或失败

消息队列如何实现分布式事务?

Kafka 和 RocketMQ 都提供了事务相关功能。

消息队列实现事务

半消息: 在事务提交之前,对消费者不可见

  • 问题: 第4步,事务提交失败怎么办
1
2
3
4
5
6
7
kafka: 
粗暴解决,抛出异常,让用户自行处理,可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿

RocketMQ:
增加了事务反查的机制来解决事务消息提交失败的问题
消息队列broker在收到半消息,然后没有收到提交或回滚请求,会定期去进行事务反查,然后根据事务反查结果进行下一步操作
(该机制,业务代码提供反查接口,如以上例子中,反查接口为:查询该订单在本地事务是否创建成功)

RocketMQ事务实现流程

如何确保消息不会丢失?

检测消息丢失的方法

用消息队列最尴尬的情况不是丢消息,而是消息丢了还不知道

1
2
3
1. IT 基础设施比较完善的公司,一般都有分布式链路追踪系统,使用类似的追踪系统可以很方便地追踪每一条消息

2. 如果没有, 一个比较简单的方法,利用消息队列的有序性来验证是否有消息丢失
  • 利用消息队列的有序性来验证是否有消息丢失
1
2
3
4
5
6
7
8
9
10
11
12
原理: 
在 Producer 端,给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性

用法:
拦截器机制(大多数消息队列的客户端都支持): 在生产者发消息之前的拦截器中将序号注入到消息中,在消费者收到消息的拦截器中检测序号的连续性 (不侵入业务,系统稳定后就可关闭)

问题:
1. 像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。
2. 如果 生产者是多实例的, 并不好协调多实例之间的消息发送顺序,需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

建议:
Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。

确保消息可靠传递

消息 生成,存储,消费 三阶段

  • 生产阶段:从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
1
通过最常用的请求确认机制,来保证消息的可靠传递  
  • 存储阶段: 消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
1
2
3
4
5
6
7
8
9
10
正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题
对可靠性要求高的,一般可以对broker的配置:
1. 消息存储进磁盘再进行确认
2. 如果是集群模式,至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。
```

- 消费阶段: Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

```text
同生成阶段一样使用确认机制, 客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应

如何处理 消费过程的重复消息?

消息服务质量

  • At most once : 至多一次,没有消息可靠性
  • At least once: 至少一次, 不允许丢消息,但是允许有少量重复消息出现。
  • Exactly once:恰好一次, 不允许丢失也不允许重复
1
大多数消息队列的消息质量是达到 至少一次

重复消息 解决方案: 用幂等性解决重复消息问题

1
2
3
4
在消费端,让我们消费消息的操作具备幂等性
幂等性: 任意多次执行所产生的影响均与一次执行的影响相同

从对系统的影响来看: At least once + 幂等消费 = Exactly once
  • 幂等方案一 : 数据库的唯一约束
1
2
例子: 将账户 X 的余额加 100 元 
设计: 在数据库中建一张转账流水表,先建账单再更新余额。 根据账单ID实现幂等
  • 幂等方案二: 为更新的数据设置前置条件
1
2
例子: 将账户 X 的余额加 100 元  
设计: 加前置条件, 如果账户 X 当前的余额为 500 元,将余额加 100 元
  • 幂等方案三: 记录并检查操作 (通用,但难)
1
2
3
4
5
6
给每条消息指定全局唯一的ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

难点:
全局唯一ID (高可用,高性能)
检查消费状态,更新数据,设置消费状态 三个操作必须保持原子性
分布式, 多个消费者收到重复消息, 同时执行,操作两次。 ( 需要 分布式事务或锁 )

消息积压 如何处理?

1
消息积压原因: 一定是系统中的某个部分出现了性能问题,来不及处理上游发送的消息

优化性能来避免消息积压

1
2
3
4
5
对于性能的优化,主要体现在生产者和消费者这一收一发两部分的业务逻辑中
原因: 消息队列本身的处理能力要远大于业务系统的处理能力
一般业务系统需要处理的业务逻辑远比消息队列要复杂:
消息队列单个节点每秒可以处理几万至几十万的消息,而业务系统单个节点单个节点每秒钟可以处理几百到几千次请求,已经可以算是性能非常好的了
所以性能优化,一般是在消息的收发两端,让业务代码和消息队列配合,达到一个最佳的性能
  • 发送端性能优化
1
2
3
4
代码发送消息的性能上不去,需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的

1. 并发: 如果是处理在线业务,将业务做成并发就可以了。RPC框架都是多线程支持多并发的,所以只需要提高业务并发就可以了
2. 批量: 如果是离线分析系统,不关心时延。适合批量从数据库读取,然后批量发送
  • 消费端性能优化
1
2
3
4
5
大部分的性能问题都出现在消费端,消费速度 < 生产速度(即性能倒挂),就会造成消息积压

暂时倒挂:只要后续消费端性能恢复,积压的消息就可以被消化
一直倒挂:要么消息队列的存储被填满无法提供服务,要么消息丢失,对于整个系统来说都是严重故障

系统设计: 一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。

1
2
水平扩容消费者实例数量,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的  
因为队列消息有序性, 如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。 对于消费者来说,在每个分区上实际上只能支持单线程消费。