在使用消息队列中的过程中,很可能会出现消息丢失的情况。本章我们就来分析哪些场景下可能出现消息丢失,然后继续以RabbitMQ和Kafka为例,介绍这两种消息队列是如何保证消息不丢失的。
一、丢失场景
一般来说,消息丢失无非就是发生在三个阶段: 生产者投递消息 、 消息队列存储消息 、 消费者消费消息 。
生产者投递消息:
生产者在写消息的过程中,由于网络等原因,导致消息队列没接收到消息,从而出现消息丢失的现象。
消息队列存储消息:
消息队列接受到消息后,一般先暂存到内存中,然后再持久化,如果在持久化前消息队列自身挂掉了,就可能导致消息丢失。
消费者消费消息:
消费者消费到了消息,但是还没来得及处理完成,然后就自己挂掉了,而消息队列则认为消费者已经处理完了。
二、RabbitMQ
我们先来看下RabbitMQ是如何应对上述三种消息丢失的场景的。
2.1 生产者投递消息丢失
RabbitMQ有两种方式可以保证生产者对消息的100%成功投递:事务机制、Confirm机制。
事务机制
- 生产者发送数据之前开启Rabbitmq事务
channel.txSelect
,然后发送消息; - 如果消息没有被接收到,生产者会收到异常报错,此时可以回滚事务
channel.txRollback
,然后重新发送; - 如果消息被接受,则可以提交事务
channel.txCommit
。
channel.txSelect(); //开启事务
try{
// 发送消息
}catch(Exception ex){
channel.txRollback(); //回滚事务
// 重新发送消息
}
channel.txCommit(); //提交事务
缺点:
RabbitMQ的事务机制是同步的,会导致吞吐量大幅下降。
Confirm机制
生产者可以开启confitm模式:
- 每次写消息时会给消息分配一个唯一id;
- 如果RabbitMQ收到了该消息,会回调生产者的
ack
接口,表示接受成功; - 如果RabbitMQ接受消息失败,会回调生产者的
nack
接口,表示接受或处理失败,生产者在nack方法内进行重试发送;
//开启confirm模式
channel.confirm();
// 发送消息,然后就不管了
send();
/**
* 消息成功被接受后回调
*/
public void ack(String messageId){
}
/**
* 消息接受失败时回调
*/
public void nack(String messageId){
send();
}
如果因为网络原因,这两个方法都没有被回调,生产者可以自己维护消息id的状态,对一些超时的消息,根据状态进行重发。
事务机制和confirm机制最大的不同在于:事务机制是同步的,会导致生产者线程阻塞。而Confirm机制是异步的,采用回调来确认消息是否发送成功,所以生产环境一般都用Confirm机制保证生产者对消息的100%可靠投递。
2.2 MQ故障丢失
因MQ故障而导致消息丢失的解决方案就是 持久化 。在RabbitMQ中开启持久化的方式如下:
- 创建queue时,将其设置为持久化,这样RabbitMQ会持久化queue的元数据;
- 生产者发送消息时,消息的
deliveryMode
设置为2,此时RabbitMQ就会将消息持久化到磁盘上去。
经过这样的设置,当RabbitMQ接受到消息然后持久化完成后,即使MQ挂了,重启后也可以从磁盘恢复queue和消息。但是还要注意一种情况:RabbitMQ接收到了消息,但是还没来得及持久化到磁盘,自己就挂了。此时,会导致MQ内存里的一点点数据丢失,但是这个概率是很小的。
我们可以配合生产者的confirm机制来解决上述问题。由于开启持久化后,只有消息被持久化到磁盘,才会回调生产者的ack接口,所以生产者在收不到ack的情况下,可以进行重发,这样哪怕持久化到磁盘前MQ自身挂了,也可以保证恢复后收到重发的消息。
2.3 消息投递后丢失
这种情况是因为消费者自身问题或网络问题造成的。RabbitMQ有一个消费者的autoAck机制,当消费者消费成功后,会自动通知RabbitMQ已经消费成功,此时如果消费者自身出现异常,就会导致消息丢失。
解决方案如下:
- 关闭RabbitMQ的消费者autoAck机制;
- 消费者消费完消息,自身逻辑处理成功后,再进行手动ack确认。(这种情况下,消费者必须要保证自身接口的幂等性)
三、Kafka
3.1 生产者投递消息丢失
Kafka可以通过一些配置来保证生产者对消息的100%可靠投递。
我们之前在《分布式框架之高性能:消息队列的可用性》一章中提到过,Kafka只有Leader节点会接受消息的读/写。Leader接受到生产发送来的消息后,只要当其它所有follower都同步成功,才会响应给生产者,否则生产者会不断重试,直到收到成功响应。
可以通过配置
ack=all
,来确保所有follower都进行消息同步。
3.2 MQ故障丢失
这种情况通常出现在Leader选举时:
- 某个partiton的Leader正在和其它Follower同步消息;
- 这个partiton所在的Broker突然挂了,部分Follower还没同步完成;
- 新选举的Leader刚好是之前没同步完成的,此时它就缺少了一些数据。
Kafka可以通过配置来解决这个问题,具体配置如下:
- topic设置
replication.factor
参数:值必须大于1,该参数用于配置每个topic的partition的副本数,即每份数据一共存在replication.factor+1
份拷贝; - kafka服务端设置
min.insync.replicas
参数:值必须大于1,这个是要求一个leader至少感知到有一个follower还跟自己保持联系,这样才能确保leader挂了还有一个follower候补; - producer端设置
acks=all
:这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了; - producer端设置
retries=MAX
:这个是要求生成者确保消息发送成功,一旦失败,就无限重试。
3.3 消息投递后丢失
我们之前在《分布式框架之高性能:消息队列的可用性》一章中提到过,消费者消费完消息后,会将消息的offset告知Kafka,表示这条消息已经被成功消费。
默认情况下,消费者会自动提交offset,如果此时消费者挂了,那么Kafka依然会认为消费者已经成功消费了该消息,从而出现消息丢失。
所以,Kafka对“消息投递后丢失”这一场景的问题处理方式和RabbitMQ类似,一般 消费者需要关闭自动提交offset ,等处理完消息后自己手动提交offset,就可以保证消息不会丢。但依然一样可能会出现重复消费问题,比如消费者刚处理完,还没提交offset结果自己挂了或因为网络原因Kafka没收到通知。所以消费者端仍然需要保证接口的幂等性。
四、总结
本章,我们介绍了在使用消息队列过程中出现消息丢失的几种场景,并以RabbitMQ和Kafka为例介绍了解决方案。总体来说,要保证消息不丢失,就是以下几种思路:
- 要保证消息投递的100%成功,基本思路就是消息队列的ack机制,以及生产者的最大努力投递;
- 要保证MQ故障消息不丢失,基本思路就是MQ自身做好持久化,或数据同步机制;
- 要保证消费者的100%消费成功,基本思路就是消费者的手动确认,以及消费者自身接口的幂等性保证。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。