消息队列基本概念
# 消息队列基本概念
# 1. 什么是消息队列,实现原理?
可以把消息队列看作是一个存放消息的容器,消费方从容器中按顺序取出消息,提供方往容器中存放消息。
作用:
- 通过异步处理提高系统性能(减少响应所需时间)。
- 削峰/限流
- 降低系统耦合性。
# 2. 异步处理?
项目的请求链路越来越长,响应变慢。同过消息队列可以减少请求的等待,还能让服务异步并发处理,提升系统总体性能。
比如电商项目中,扣款、扣库存、下单、积分服务、短信服务等,整个过程比较慢。相当于扣款和下单,积分和短信服务没必要立刻完成,因此只要扣款和下单成功后就接收流程,把积分和短信服务扔到消息队列中进行消费。
# 3. 服务解耦?
使用MQ的发布Pub/Sub订阅模型可以将系统进行解耦。
比如订单服务原来只有下单、扣款,但通过业务的不断扩充,为迎合下游系统,需要经常更改订单系统,任何一个下游系统接口的比更都会影响订单服务。
采用消息队列解决系统直接的耦合问题,订单服务把订单相关信息扔到消息队列中,下游系统谁要谁就订阅整个主题。
# 4. 削峰/限流?
服务后端都比较弱,类似于秒杀活动的爆发式流量打过来根本顶不住,因此需要一个中间件做缓冲。
网关的请求先放入消息队列中,后端服务尽自己最大能力去消息队列中消费请求。
# 5. 使用消息队列产生的问题?
系统可用性降低
系统引入外部依赖越多,越容易挂掉。MQ 挂掉了,整个系统就崩溃了。
系统复杂度提高
如何处理消息丢失、重复消息、消息传递的顺序性。
一致性问题
A 系统生成消息到消息队列中,BCD 从消息队列中消费消息。A 处理完返回成功,BD 也写库成功,结果 C 系统写库失败,该怎么办?
# 6. 消息队列的模型?
队列模型:生产者往队列里发送信息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者,消费者之间是竞争关系,每条消息只能被一个消费者消费。
发布/订阅模型:一条消息能被多个消费者消费,将消息发往一个Topic主题中,所有订阅这个Topic的订阅者都能消费这条消息。
# 7. 如何保证消息不丢失?
生产消息
生产者发送消息至 Broker,需要处理 Broker 的响应。如果 Broker 返回写入失败等错误信息,需要重新发送。
存储消息
Broker 在消息刷盘之后再给生产者响应。
如果消息写入缓冲就返回响应,那么机器突然没电了,消息丢失,而生产者以为发送成功了。
消费消息
消费者从队列中拿出消息消费,真正执行完业务逻辑之后,再发送给Broker消费成功。
# 8. RabbitMQ 保证消息不丢失?
# 8.1 生产者到 MQ 过程中丢失消息
保证生产者能够收到 MQ 处理成功的应答,也就是 MQ 落盘后给生产者一个应答。
解决方案一:事务模式(同步方式)
消息发送完之后会阻塞等待 MQ 回应。在此期间无法发送下一条消息,严重降低吞吐量与性能。
解决方案二:Confirm 模式(异步方式)
生产者投递消息后,如果 Broker 收到消息,则会给生产者一个应答。生产者接收应答,来确定这条消息是否正常的发送到 Broker。
第一步,在 channel 上开启确认模式:
channel.confirmSelect()
第二步,在 channel 上添加监听:
channel.addConfirmListener(ConfirmListener listener);
,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送或记录日志等后续处理。//指定消息的投递模式:confirm 确认模式 channel.confirmSelect(); // 发送消息 ... //添加一个确认监听, 这里就不关闭连接了,为了能保证能收到监听消息 channel.addConfirmListener(new ConfirmListener() { // 返回成功的回调函数 public void handleAck(long deliveryTag, boolean multiple) throws IOException { ... } // 返回失败的回调函数 public void handleNack(long deliveryTag, boolean multiple) throws IOException { ... } });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
开启 Confirm 后,消息落盘后会异步 ack。
利用本地消息表(mysql 或 redis)记录消息状态,发送并落盘成功后,立即删除该消息记录。对于那些处理失败的消息,再使用定时任务进行重新发送。
# 8.2 MQ 丢失消息
⭐ 消息到达 MQ,但 MQ 出现内部错误,无法处理该消息
当开启 Confirm 机制,此时 MQ 会回传 nack 代表处理失败。
⭐ 消息还没刷盘,MQ 就宕机了
由于 MQ 没有响应,生产者收不到应答信息会进行消息重发。
⭐ 开启持久化,硬盘坏了,无法恢复数据
如果硬盘坏了数据就无法恢复了。需要采用集群部署
- 默认集群部署,消息只会存在当前节点中,不会同步到其他节点。
- 镜像集群部署,消息会同步到到其他节点上,可以设置同步的节点个数,但吞吐量会下降。
# 8.3 消费者丢失消息
- 消费端采用自动 ack 机制,还没处理完毕,消费者宕机。解决:改为手动 ack,当消息正确处理完毕后,再通知 mq。消费端处理消息异常后,回传 nack,这样 MQ 会把这条消息投递到另一个消费者上。
- 消费端处理完消息后,回传 ack 时发生网络中断,mq 未收到 ack。解决:mq 会将超时未 ack 的消息放回队列。
# 8.4 注意点
- MQ 的 ack 回传是批量异步的方式,生产端对 ack 的监听也是异步的
- 在重试的补偿机制下,消费者需要保证幂等。
# 8 如何处理重复消费?
生产者发送给 Broker,Broker 已经写入了, 但是由于网络原因生产者没有收到响应,又重发一次。
消费者拿消息消费,业务逻辑处理完,事务已经提交,还没响应给 Broker,就挂掉了,此时其他消费者将重复消费。
重复消费是不可避免的,所以只能通过幂等解决重复消费带来的影响。
结合业务的几个思路:
- 写数据库,根据主键查一下,如果有数据就不要再插入了,更新一下。
- 写 redis,没关系,set 天然幂等性。
- 数据库使用唯一键来保证不会重复插入多条数据。
- 可以将消息中的业务参数组合成一个 key,利用数据库唯一索引或者 redis 来判断之前是否执行过。
# 9 如何保证消息的有序性?
顺序消费的原因
比如流程为:增加、修改、删除;如果顺序不对的话就会出错。
全局有序性
一个单线程生产者、一个队列、一个单线程消费者
部分有序
将Topic内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,如何每个队列对应一个单线程处理的消费者。
RabbitMQ 解决
- 给 RabbitMQ 创建多个 queue,每个消费者固定消费一个 queue 的消息
- 生产者发送消息的时候,同一个类型的消息发送到同一个 queue 中
- 由于同一个 queue 的消息是一定会保证有序的,那么同一个订单号的消息就只会被一个消费者顺序消费,从而保证了消息的顺序性
# 10. 如何处理消息堆积
产生原因:
- 生产者的生成速度与消费者的消费速度不匹配。
- 有可能时因为消息消费失败反复重试造成的,也有可能时消费者消费能力弱,渐渐地消息就积压了。
解决办法:定位消息慢的原因
- 有 bug 处理 bug
- 消费能力弱,优化消费逻辑
优化的方式(消息快堆挤满了怎么办):
- 提高消费并行度:绝大部分消费行为都属 IO 密集型,可能是操作数据库,或者是调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量。
- 批量方式消费:某些业务可以批量方式消费,可以很大程度上提高吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1s,一次处理 10 个订单耗时 2s。
- 跳过非重要信息:如果发生消息堆积,消费速度一直追不上生成速度,如果业务要求不高的话,可以选择丢弃不重要的消息。
大量消息在 MQ 里积压几个小时没解决?
- 解决消费者消费慢的问题
- 扩容消费者快速消费掉 MQ 内积压的大量消息
- 等积压的消息处理完毕后再恢复到原先部署的架构。
MQ 消息过期失效?
消息大量积压,导致大量数据丢失。可以批量重新导入,当高峰时间过去后,将丢失的数据重新导入 MQ 中重新处理。
# 11 Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级,比 RocketMQ、Kafka 低一个数量级 | 同 ActiveMQ | 10 万级,支撑高吞吐 | 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic 数量对吞吐量的影响 | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic | topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源 | ||
时效性 | ms 级 | 微秒级,这是 RabbitMQ 的一大特点,延迟最低 | ms 级 | 延迟在 ms 级以内 |
可用性 | 高,基于主从架构实现高可用 | 同 ActiveMQ | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 基本不丢 | 经过参数优化配置,可以做到 0 丢失 | 同 RocketMQ |
功能支持 | MQ 领域的功能极其完备 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
综上,各种对比之后,有如下建议:
一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了。
后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高。
不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache (opens new window),但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。
所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。
如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
# 12 Rabbit的高可用性
三种模式:单机模式、普通集群模式、镜像集群模式
单机模式
Demo级别,生成不使用单机模式
普通集群模式(无高可用性)
多台机器上启动多台 RabbitMQ 实例,创建的 Queue 只会放在一个 RabbitMQ 实例上,消息只会存在与当前节点中,并不会同步到其他节点,其他节点也仅只会同步该节点的队列结构。可用性无保障,queue 所在节点宕机,数据就丢了。
镜像集群模式(高可用)
queue 的元数据和消息都存在于多个实例上,每个 RabbitMQ 节点都有 queue 的完整镜像,包含 queue 的全部数据信息,每次把消息写到 queue 的时候,都会自动把消息同步到多个实例 queue 上。
缺点:性能开销大,消息需要同步到所有节点上;没有可扩展性,如果某个queue负担很重,加了机器也没有用,无法线性扩展。
# 13 设计一个 MQ?
- 模型:
- 点/点模型
- 发布/订阅模型
- 高并发
- 可以快速扩容,水平扩展
- 高可用
- 数据要持久化到磁盘防止数据丢失,顺序写入磁盘,因为顺序度的性能高于随机读
- Leader & Follower 机制,防止一个节点挂掉无法提供服务。通过选举出一个节点对外面提供服务。
# 14 RabbiMQ 做秒杀比 Kafka 的优点
- RabbitMQ 是延迟最低的,微妙级别,Kafka 和 RocketMQ 都说毫秒级别的。
- RabbitMQ 中的消息更不容易丢失。
推荐文章: