在RocketMQ持久化原理中,我们已经讲解了生产者(Producer)发送消息的基本原理。本章,我们来看看消费者(Consumer)消费消息的基本原理。
一、消费者组
在RabbitMQ中,每一个消费者实例,都必须属于某个Group ID——群组ID,相当于用一个Group ID把一群Consumer实例归为了一类。这里需要特别注意的是: 同一个消费者 Group ID 下所有 Consumer 实例,订阅关系必须完全一致。如果订阅关系不一致,消息消费的逻辑很容易出现混乱,可能导致消息丢失,甚至出现其它各种莫名其妙的问题!
1.1 订阅关系
什么是订阅关系?我们知道,消息肯定要有其所属的Topic分类,所以订阅关系一致就是指:
- 订阅的 Topic 必须一致;
- 订阅的 Topic 中的 Tag 必须一致。
我们先来看个正确的订阅关系示例:
从上图可以看到,同一个消费者群组中的所有消费者实例,Topic+Tag是完全一致的。
我们再来看个错误的订阅关系示例:
我们可以在代码中通过下面的方式设置消费者的群组、Topic、Tag:
// 设置消费者群组:wms_group
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wms_group");
// 设置Topic:order_topic,Tag:TagA
consumer.subscribe("order_topic", "TagA");
二、消费模式
RabbitMQ支持两种消费模式: 集群消费模式 、 广播消费模式 。
2.1 集群消费模式
所谓集群,就是指使用了相同 Group ID 的消费者,它们属于同一个集群。 当使用集群消费模式时,RocketMQ认为一条消息只需要被集群内的任意一个消费者处理即可。
集群消费模式,一般适用于每条消息只需要被处理一次的场景,消费进度在服务端维护,可靠性更高,也是默认的模式:
上图中,其实只有一个Group ID 1群组,这个群组中的Consumer实例分布在三台不同的机器上,同时这个群组订阅了Topic+Tag(比如:"order_topic", "TagA"),所以对于Broker中的每一条相关消息,只能被这个群组中的某个Consumer实例消费到。
注意:集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。
2.2 广播消费模式
当使用广播消费模式时,RocketMQ会将一条消息推送给集群内的所有消费者,保证消息至少被每个消费者消费一次。
广播消费模式,一般适用于每条消息需要被集群下的每个消费者处理的场景,消费进度在客户端维护,出现重复消费的概率稍大于集群模式:
上图中,只有一个Group ID 1群组,这个群组中的Consumer实例分布在三台不同的机器上,同时这个群组订阅了Topic+Tag(比如:"order_topic", "TagA"),对于Broker中的每一条相关消息,Group ID 1群组中的每一个消费者实例都能消费到该消息。
可以通过以下方式开启消费者实例的广播消费模式:
consumer.setMessageModel(MessageModel.BROADCASTING); //设置广播消费模式
2.3 使用集群模式模拟广播
上面的两个示例都是只有一个群组,这个群组内的Consumer实例都订阅了相同的Topic+Tag。如果我们有多个不同的群组呢?比如下面这种情况:
上图中,一共有Group ID 1、Group ID 2、Group ID 3三个群组,每个群组都订阅了相同的Topic+Tag(比如:"order_topic", "TagA"),这种情况下,对于Broker中的每一条相关消息,三个群组都能消费到该消息,与此同时,这条消息又只能被某个群组中的一个Consumer实例消费到。
比如对于消息MessageN,Group ID 1、Group ID 2、Group ID 3都能消费到这条消息,相当于MessageN拷贝了三份,但是从组内来看,比如Group ID 1,组内只有一个Consumer实例能消费到MessageN。
三、消费方式
3.1 Pull/Push
消息者有两种方式从Broker中的MessageQueue获取消息: Pull方式 和 Push方式 。这两种方式的本质是一样的:都是消费者实例主动发送请求到Broker中拉取消息。Push方式的消息时效性更好一些。
Push方式也只是名字叫Push而已,并不是Broker真的会主动把消息推给Consumer,而是当Consumer主动去获取消息时,如果有新的消息可以消费,那么Broker会立马返回一批消息给Consumer,Consumer处理完后会接着发送请求到Broker拉取下一批消息,这样看起来就好像是Broker在不断推消息给Consumer。
此外,在Push方式下,具有请求挂起/长轮询机制。
请求挂起/长轮询:
当Consumer的请求发送到Broker,如果Broker发现没有新的消息可供消费时,就会让请求线程挂起,默认15秒,在此期间,Broker有后台线程每隔一会儿去检查一下是否有新的消息给Consumer,如果有新的消息到达就会主动唤醒挂起的线程,然后把消息返回给它。
四、底层原理
我们已经从逻辑上了解了消费者的消费模式及消费方式,那么其底层到底是什么样的呢?根据之前学习到的知识,我们知道:
- 一个Topic中的多个MessageQueue会分散在多个Master-Broker上(Slave会去同步数据);
- 每个Broker机器上的一个MessageQueue,对应一个ConsumeQueue(在物理磁盘上其实是对应了多个ConsumeQueue文件)。
- Consumer会从NameServer拉取路由信息,所以它知道自己订阅的Topic中MessageQueue分布在哪些Broker上。
所以对于一个Topic,比如我们的order_topic,分了4个MessageQueue,均匀分布在两台机器上,假设我们现在有一个库存消费者群组——wms_group:
- 如果组内有2个Consumer实例,那么默认情况下可能说就是每个Consumer实例负责2个MessageQueue的读取;
- 如果组内有5个Consumer实例,那么每个Consumer实例负责1个MessageQueue的读取,还剩一个空闲的。
也就是说, 集群模式下,一个Topic中的多个MessageQueue会均匀分摊给同一消费组内的多个Consumer实例去消费,这里的一个原则就是:集群模式下,同一消费者组内,一个MessageQueue只能被一个Consumer实例处理,但是一个Consumer实例可以负责多个MessageQueue的消息处理。
是否均匀分摊需要视Consumer的消费策略来定,默认情况下就是均匀分摊,后面我们讲Consumer源码时再具体讲其它情况。另外,如果多个消费者群组订阅同一个Topic+Tag,从外部看,其实属于广播模式,上面的原则是不适用的。
4.1 消费进度
当消费者实例指定对某个MessageQueue进行消费时,请求到达Broker后,如果是首次消费,Broker就从这个MessageQueue对应的ConsumeQueue文件中,找到第一条消息的地址,然后去CommitLog中根据这个offset地址读取出消息数据,最后将这条消息数据返回给消费者实例。
所以,消费消息的本质是:根据要消费的MessageQueue以及消费位置,去找到对应的ConsumeQueue,读取里面的消息物理offset偏移量,然后到CommitLog中根据offset读取消息数据,返回给消费者。 当消费者处理完一批消息后,会提交一个消费进度到Broker上去,然后Broker就会存储我们的消费进度,以便下次消费使用 。
4.2 消费负载
之前我们提到过,消息者既可以从Master-Broker拉取消息,也可以从Slave-Broker拉取, 那到底什么时候从Master-Broker拉取,什么时候从Slave-Broker拉取呢?
首先,我们要知道,消费者拉取消息时,Broker会去频繁的去读取ConsumeQueue文件,获取offset。之前RocketMQ持久化原理一章我们讲过,RocketMQ会利用PageCache对磁盘文件的读写进行优化。也就是说,ConsumeQueue文件的内容会被预热到PageCache中,因为ConsumeQueue文件的大小只有几MB,所以Broker读取ConsumeQueue文件的内容时,效率是非常高的,几乎都是直接从内存读取。
但是这里注意:Broker读取完ConsumeQueue中的offset后,还要去CommitLog中读取消息内容,CommitLog文件是比较大的,所以是无法把全部数据都放到PageCache中的,只有那些最新写入的消息,一般才会停留在PageCache中,那些比较老的数据,会被OS异步刷到磁盘上。所以,当Broker从CommitLog中读取消息内容时,就有两种可能:
- 读取的是那种刚刚写入CommitLog的消息,此时它们大概率还停留在PageCache中,那么就直接是内存读取,性能是很高的;
- 读取的是比较早之前写入CommitLog的数据,此时它们早就被刷入磁盘了,已经不在PageCache中,那么就只能从磁盘上的文件里读取了,这个性能是比较差的;
所以,结论就很明显了:
- 如果消费者实例一直快速的在拉取并处理消息,速率跟生产者写入的速率差不多,那么每次拉取几乎都是从PageCache从读取数据,此时消费者大概率都是直接从Master-Broker读取消息;
- 如果Master-Broker的负载很高,导致拉取消息的速度很慢,或者消费者实例自身的处理消息速度很慢,导致跟不上生产者写入的速率,此时Master-Broker就会通知消费者下一次开始从它的Slave节点去拉消息。
我们举个例子来理解下,假设PacheCache最多缓存5w条消息,现在生产者已经写入到第10w万条消息,而消费者只拉取到第2万条,那此时Master-Broker就会认为:你消费者还有8w条消息没消费掉,而我的PageCache最多只能容纳5w条,按照你目前的消费速率,大概率需要我从磁盘上加载数据,所以你下次直接从Slave去拉消息吧,不要对我的性能造成影响。
五、总结
本章,我们对消费者Consumer的基本原理进行了讲解,主要包含三种消费模式、两种消费方式,以及消费者消费消息时底层原理是怎么样的。关于消费者(Consumer)的更多细节点和使用最佳实践,读者可以参考官方的文档。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。