通过前面的章节,我们已经知道了数据在RocketMQ中是分布式存储的。生产者发送消息时,先从NameServer获取到路由信息,然后根据一定算法将消息发送到某个Master-Broker中。但是, Topic是一个逻辑概念 ,对于某个Topic来说,属于它的消息分布在不同的Broker上,那如何决定消息到底分布在哪个Broker上呢?
一、MessageQueue
我们在创建Topic时,需要指定一个很关键的参数——MessageQueue,比如下图,我们可以在RocketMQ的可视化工作台里去创建一个名为order_topic
的Topic,指定包含4个MessageQueue:
那么Topic、MessageQueue、Broker之间到底是什么关系?
可以先简单的认为: Topic是消息的逻辑分类,消息保存在MessageQueue中,MessageQueue分布在Master-Broker上,Slave-Broker从Master-Broker同步数据。
事实上,MessageQueue本质就是一个数据分片的机制。比如order_topic一共有1万条消息,那么可以大致认为每个MessageQueue保存2500条消息。但是,这不是绝对的,需要根据Producer写入消息的策略来定,可能有的MessageQueue中消息多些,有的少些。我们先暂且认为消息是在MessageQueue上平均分配的,然后MessageQueue也可能平均分布在Master-Broker上,如下图:
二、消息发送
2.1 消息发送策略
我们之前说过,Producer会从NameServer拉取路由信息,那么Producer就知道了Broker集群的情况:每个Master-Broker分布着哪些Topic?每个Topic有多少MessageQueue?这些MessageQueue分布在哪些Master-Broker上?
这样的话,Producer就可以按照一定策略将消息写入到Master-Broker中,比如对于order_topic
,Producer现在知道了它有4个MessageQueue,均匀分布在两个Master-Broker上,那么Producer就可能采取Round Robin策略均匀的order_topic类型的消息写入到各个MessageQueue中。
2.2 消息发送容错
那如果某个Master-Broker故障了怎么办?RocketMQ虽然有Dledger机制可以实现故障自动转移,但是主从切换需要时间,在故障的这段时间里Producer访问那个故障的Master-Broker都会失败:
RocketMQ的Producer中有个开关,叫做sendLatencyFaultEnable
,一旦打开了这个开关,就会开启生产者的自动容错机制,比如Producer访问一个Master-Broker发现网络延迟有500ms,之后一直延迟,那么就会自动回避访问这个Broker一段时间,比如接下来3000ms内,就不会访问这个Broker了。
这样就可以避免一个Master-Broker故障后,短时间内Producer频繁的发送消息到这个故障的Broker上去,出现大量写入异常。
三、消息存储
RocketMQ中最核心的一个环节就是Broker中的消息数据存储,也就是所谓的消息持久化。
3.1 CommitLog
生产者发送消息到Broker后,Master-Broker会将消息写入磁盘上的一个日志文件——CommitLog,按照顺序写入文件末尾,CommitLog中包含了各种各样不通类型的Topic对应的消息内容,如下图:
CommitLog文件每个限定最大1GB,Master-Broker收到消息之后就将内容直接追加到文件末尾,如果一个CommitLog写满了1GB,就会创建一个新的CommitLog文件。
Broker以顺序的方式将消息写入CommitLog磁盘文件,也就是每次写入就是在文件末尾追加一条数据就可以了,对文件进行顺序写的性能要比随机写的性能高很多。
3.2 ConsumeQueue
我们之前说过,消息是保存在MessageQueue中的,那这个CommitLog和MessageQueue是什么关系呢?事实上,对于每一个Topic,它在某个Broker所在的机器上都会有一些MessageQueue,每一个MessageQueue又会有很多 ConsumeQueue 文件,这些ConsumeQueue文件里存储的是一条消息对应在CommitLog文件中的 offset 偏移量。
举个例子,假设对于order_topic这个Topic,它在Broker集群中一共有4个MessageQueue:queue1、queue2、queue3、queue3,均匀分布在两个Master-Broker中,Producer选择queue1这个MessageQueue发送了一条“消息A”。那么:
- 首先Master-Broker接收到消息A后,将其内容顺序写入自己机器上的CommitLog文件末尾;
- 然后,这个Master-Broker会将消息A在CommitLog文件中的物理位置——offset,写入queue1对应的ConsumeQueue文件末尾;
整个过程如下图所示:
实际上,ConsumeQueue中存储的不仅仅只是消息在CommitLog中的offset偏移量,还包含了消息长度、tag hashcode等信息,一条数据是20个字节,每个ConsumeQueue文件能保存30万条数据,所以每个ConsumeQueue文件的大小约为5.72MB。
我们可以在Master-Broker所在机器的如下目录中找到ConsumeQueue文件:
$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
上述示例中,我们的Tpoic是order_topic
,queueId是queue1,comsumequeue的名称是comsumequeue1,所以可以在如下路径找到ConsumeQueue文件:
$HOME/store/consumequeue/order_topic/queue1/comsumequeue1
四、存储性能优化
4.1 PageCache
我们知道,Broker在将消息写入CommitLog时,采用了“顺序写”的方式,这样可以大大提升性能。但是,光这样做还是不够的,因为毕竟仍然是磁盘IO操作,要想进一步提升性能,必须利用内存。
所以,Broker将数据写入CommitLog文件的时候,其实不是直接写入底层的物理磁盘文件,而是先进入OS的 PageCache 内存缓存中,后续由OS后台线程异步化的将OS PageCache中的数据刷入底层的磁盘文件中:
所以,RocketMQ正式通过 磁盘文件顺序写+OS PageCache写入+OS异步刷盘 的策略来保证消息写入的性能。
在上述这种异步刷盘的模式下,Producer将消息发送给Broker,Broker将消息写入OS PageCache中,就会直接返回ACK给生产者,生产者收到ACK消息就认为写入成功了。
有异步刷盘就有同步刷盘,同步刷盘主要的不同点就是,只有Broker强制把这条消息刷入底层的磁盘文件后,才会返回ACK给生产者。
在异步刷盘的模式下,如果Broker将消息写入PageCahe并响应给生产者后突然宕机,此时消息在缓存中没有写入底层的磁盘文件,就会造成消息丢失——生产者认为发送成功,实际上消息写入失败。我会在后续章节对MQ中的消息丢失问题专门讲解。
4.2 mmap
RocketMQ除了利用PageCache和异步刷盘来提升性能外,还使用了一种叫做 mmap 的技术。
多次拷贝问题
传统的磁盘IO操作,当我们的程序需要将数据写入操作系统的磁盘上时(或从磁盘上读取文件内容到程序中),涉及用户空间和内核空间的转换,我们的程序进程有自己独立的内存空间,而操作系统也有自己的内核缓存区,所以一次数据写入或读取过程是像下面这样的:
可以看到,无论是写入还是读取,都涉及两次数据拷贝,这就是传统IO的数据多次拷贝问题。
内存映射
RocketMQ底层对CommitLog、ConsumeQueue之类的磁盘文件的读写操作,基本上都会采用mmap技术配合pagecache进行文件读写优化。
mmap技术会先将一个磁盘文件的物理地址(比如CommitLog文件或ConsumeQueue文件)映射到程序进程自己的私有内存里来,所谓映射并不是直接将磁盘文件中的数据读取到内存,而是 把磁盘文件的一些物理地址和用户进程私有空间的一些虚拟内存地址进行映射,这个过程并没有任何的数据拷贝操作 ,如下图:
mmap技术在进行文件映射时,对文件大小一般限制,在1.5GB~2GB之间,所以RocketMQ才让CommitLog单个文件在不超过1GB,ConsumeQueue文件是5.72MB。
上述这个 地址映射 的过程,具体到代码层面,RocketMQ是利用JDK NIO包下的MappedByteBuffer.map()
来实现的,其底层就是基于mmap技术。
所以,假设现在要写入消息内容到CommitLog文件中,那么过程是这样的:
- 首先,Broker会将一个CommitLog文件的物理地址通过
MappedByteBuffer.map()
映射到Broker进程自己的虚拟内存中; - 接着,写入消息时,消息内容会先进入PageCache(PageCahe就对应虚拟内存);
- OS的线程异步将PageCache中的内容刷入CommitLog磁盘中(顺序写入)。
上述整个过程没有“内核IO缓存区”参与,只有一次数据拷贝过程,就是从PageCache里拷贝到磁盘文件里而已!这个就是使用mmap技术之后,相比于传统磁盘IO的一个性能优化。
Broker会针对磁盘上的各种CommitLog、ConsumeQueue文件预先分配好MappedFile,也就是提前对一些可能接下来要读写的磁盘文件进行内存映射,这样后续读写文件时,就可以直接执行了。
4.3 文件预热
上述我们讲了mmap技术对消息写入的优化,事实上,对于消息读取,整个过程也只有一次数据拷贝而已,其过程如下:
- 读取消息时,首先判断要读取的数据是否在PageCache里?如果在的话,直接从PageCache里读取;
- 如果PageCache里没有,就会从磁盘文件里加载数据到PageCache中(这个过程利用mmap技术只需一次拷贝);
- PageCache在加载数据时,会将被加载数据的临近其它数据块也一起加载,提升命中率。
RocketMQ在提前进行内存映射后,还会提前将后面可能频繁读取的一些数据(比如CommitLog、ConsumeQueue文件),尽可能多的加载到PageCache中,这就是所谓的 文件预热 。
五、总结
本章,我们深入讲解RocketMQ的持久化机制,其核心就是:CommitLog保存消息内容,ConsumeQueue保存消息地址——offset。
RocketMQ为了提升消息持久化的性能,采用了如下手段:
- 顺序写入消息内容至磁盘文件尾;
- 将消息先写入PageCache,然后异步刷盘;
- 利用mmap技术进行内存映射,以减少数据拷贝次数;
- 提前加载磁盘文件内容到PageCache中,进行文件预热,提升读取效率。