在第一章概述中,我们的背景案例存在一个问题——状态补偿问题。也就是说,订单系统需要一种补偿机制去确认某些订单的最终状态。比如,因为网络等原因,订单系统没收到“成功”的支付通知,此时订单的状态就是不确定的。一种常见的思路是:订单系统后台起一个定时任务,每隔一段时间扫描下所有”中间状态“的订单,确认下是否要关闭它。但是这种方案在订单量小的情况下还好说,一旦订单量大了,就会出现性能瓶颈,而且重复扫描的效率也太低。本章,我们就来看下如何通过RocketMQ来解决这个问题。在RocketMQ中,有两种特殊类型的消息:定时消息和延时消息。一、延时消息所谓延时消息,就是当Producer将消息发送到MQ后,
我在《分布式框架之高性能:消息有序性》中,曾经介绍过RabbitMQ和Kafka实现消息有序性的基本思路。在RocketMQ中,提供了一种顺序消息类型。所谓顺序消息(FIFO消息),是一种严格按照顺序来发布和消费的消息。顺序发布和顺序消费是指对于指定的一个Topic:生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。一、大数据传输问题还记得第一章系统背景中的“大数据传输问题”吗?我们今天就以这个问题为引子,来讲讲MQ中的消息有序性。先来回顾下系统存在的问题:“大数据系统”需要从订单数据库里查询订单数据,由于数据库中每日的订单数据通常是比较
本章,我们来看下RocketMQ中的重复消费问题。在分布式系列中,我已经介绍过了消费的幂等性,读者可以结合那篇文章一起来阅读本章内容。一、幂等性所谓幂等,就是当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就实现可消息幂等。例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为100元。如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费100元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合
我曾经在分布式系列中讲解过消息丢失,当时主要从RabbitMQ和kafka是如何应对消息丢失的。本章,我们先来回顾下消息丢失的场景,然后看看RocketMQ是如何解决消息丢失的问题的。一、消息丢失场景在《系统改造:异步、解耦、削峰》一章,我们通过引入RocketMQ使得系统之间解耦,如下图:物流、积分、促销、通知系统通过订阅RocketMQ中的消息与订单系统进行交互。那么,这个过程中如果消息丢失了怎么办呢?消息丢失可能发生在以下几个场景。1.1生产者消息丢失订单系统,发送支付成功的消息到MQ中,这个时候恰巧可能网络发生了抖动,也就是网络突然有点问题,导致这次网络通信失败了,消息就丢失了:1.2
本章,我们来讲解下RocketMQ中的底层通信原理,RabbitMQ基于Netty扩展出一套高性能网络通信框架。我们先来看下Broker的网络通信架构是怎样的。一、LinuxIO模型Netty底层采用了IO多路复用技术(IOmultiplexing),而IOmultiplexing又是Linux的五种IO模型之一,所以在聊Broker的网络通信架构前,我们需要了解下什么是Linux的IO模型。简单来说,Linux系统一共有五种I/O模型:阻塞IO(blockingIO):即传统的IO模型;非阻塞IO(NonblockingIO):注意这里所说的NIO并非Java的NIO库;IO多路复用(IOm
在RocketMQ持久化原理中,我们已经讲解了生产者(Producer)发送消息的基本原理。本章,我们来看看消费者(Consumer)消费消息的基本原理。一、消费者组在RabbitMQ中,每一个消费者实例,都必须属于某个GroupID——群组ID,相当于用一个GroupID把一群Consumer实例归为了一类。这里需要特别注意的是:同一个消费者GroupID下所有Consumer实例,订阅关系必须完全一致。如果订阅关系不一致,消息消费的逻辑很容易出现混乱,可能导致消息丢失,甚至出现其它各种莫名其妙的问题!1.1订阅关系什么是订阅关系?我们知道,消息肯定要有其所属的Topic分类,所以订阅关系一
在RocketMQ架构一章中,我们简单介绍过NameServer和Broker是如何实现高可用的。NameServer是数据集中集群,可用性没什么特别好说的。本章,我们重点来看下RocketMQ4.5版本以后的Broker高可用原理。一、DLedger机制RocketMQ4.5版本以后,引入了DLedger机制,DLedger是利用了Raft算法实现Broker主从节点的故障自动转移以及数据同步。比如下图中,Broker集群中有一个Master-Broker节点,2个Slave-Broker节点,那么一份数据就会有三个副本,在DLedger机制中,我们通常把Master-Broker节点叫做L
通过前面的章节,我们已经知道了数据在RocketMQ中是分布式存储的。生产者发送消息时,先从NameServer获取到路由信息,然后根据一定算法将消息发送到某个Master-Broker中。但是,Topic是一个逻辑概念,对于某个Topic来说,属于它的消息分布在不同的Broker上,那如何决定消息到底分布在哪个Broker上呢?一、MessageQueue我们在创建Topic时,需要指定一个很关键的参数——MessageQueue,比如下图,我们可以在RocketMQ的可视化工作台里去创建一个名为order_topic的Topic,指定包含4个MessageQueue:那么Topic、Mes
前几个章节,我们已经对RocketMQ的基本架构、生产部署进行了详细介绍。从本章开始,我们就要利用RocketMQ对第一章中介绍的订单系统进行改造了。回顾下第一章的系统背景,我们的订单系统的逻辑架构图下:上述架构存在如下问题:同步调用问题:用户支付完一笔订单,订单系统就要同步执行一系列接口调用,导致响应慢,用户体验差;性能问题:瓶颈在数据库,无法应对“促销”之类的活动带来的峰值流量;耦合问题:积分、促销、通知、物流应当与核心链路解耦;大数据传输问题:外部系统会从订单数据库里查询订单数据,消耗系统本身的资源;状态补偿问题:”中间状态“的订单通过批量扫描的方式进行状态补偿,效率非常低。本章,我们来
上一章,我们介绍了RocketMQ的生产部署,由于在正式生产部署前一般都需要进行性能测试,所以本章我们来看下如何对RocketMQ进行性能测试。一、性能指标监控既然是性能测试,那么必然要看RocketMQ集群能承载的最高QPS是多少?同时在承载这个QPS的同时,各个机器的CPU、IO、磁盘、网络、内存的负载情况,以及JVM的GC情况等等。我们如何去观察这些指标吗?通常来说,指标分为两部分:机器本身指标:也就是上述说的CPU、内存、IO等等;RocketMQ指标:生产消息TPS、消费消息TPS、集群整体情况等等;1.1机器指标对于机器指标,最简单的方式我们可以通过OS自身的一些命令进行观察,比如
上一章,我们介绍了RocketMQ的基本架构,但是对于RocketMQ的很多底层原理还没有展开讨论,比如Rocket是如何做到高性能的?如何进行持久化?如何防止消息丢失等等。本章,我们先来看下如果要在我们自己的生产环境部署RocketMQ,需要做哪些事情?如何进行部署?后续章节再去研究RocketMQ的底层原理。一、业务量评估首先,我们需要对业务量(日活用户数、日访问数、日新增数据量等运营指标)进行一个预估,可以参考已有的一些类似系统,具体的指标因业务而已。系统上线规划时,一般需要考虑三个时间段:一月、一年、三年。然后根据评估的业务量确定系统的逻辑架构图,要用多少台机器,每台机器什么配置等等。
本章,我们先来看下RocketMQ的基本架构,涉及哪些核心组件,以及各个组件之间的关系又是怎样的。一、基本架构在RockectMQ中,一共有四个核心组件:NameServer、Broker、Producer、Customer,它们之间的基本关系可以用RocketMQ官方的一张图表示:上图中,BrokerCluster就是各个RocketMQ进程,ProducerCluster和ConsumerCluster分别是生产者和消费者,NameServerCluster是路由中心。看不懂?没关系,我们下面将一一分析上述的各个组件。1.1Broker我们在每台机器上部署的RocketMQ进程,就称为Br
本系列的第一篇,我们先来看下消息中间件能做些什么?从架构上来讲,我们之所以要在自己的系统中引入MQ,无非就是三个目的:解耦、异步、削峰。本章我们会先引入一个系统示例,以此为背景,展开我们对消息中间件使用的讨论,后续我们会分章节对一些核心问题进行分析。这里补充一句,在使用各种分布式技术的过程中,我们脑子里一定要有四个概念:高性能、高可用、可扩展、数据一致性。这四个概念我在《分布式系统从理论到实战系列》中一直反复强调的,如果你对一些常用的分布式框架很熟悉,就会明白,无论是哪一种分布式框架,其底层所有核心功能的实现一定是围绕上述四个基本概念来展开的。一、系统背景我们先来看一下业务系统的背景,这个系统
从本章开始,我将分析Kafka中的消费者(Consumer)的源码。我在之前的章节已经对Consumer的基本原理进行了分析,我们先来回顾一下,然后我再从源码层面对GroupCoordinator这个消费者组协调器进行分析。我之前粗略的讲解过GroupCoordinator:每个消费者组都会选择一个Broker作为自己的Coordinator,这个GroupCoordinator协调器负责监控这个消费组里的各个消费者的心跳,判断它们是否宕机,如果宕机则进行Rebalance。那么,GroupCoordinator的底层实现是怎样的?Consumer又是如何与它进行通信的?这就是本章我们要了解的
当Kafka集群中的一个Broker节点突然宕机,那么这个节点上的分区副本就已经处于功能失效的状态了,Kafka并不会自动将这些失效分区副本迁移到其它Broker节点上;或者说,当集群中新增一个Broker时,这个Broker上没有任何分区副本,Kafka也不会自动将其它Broker上已有的一些分区副本重分配到该新加入的Broker中。所以,这就是涉及到了分区重分配(reblance)问题,本章,我就对分区重分配的整体流程和底层原理进行讲解。一、重分配策略执行分区重分配,第一件事情就是要生成重分配方案。Kafka提供了kafka-reassign-partitions.sh脚本,用来生成重分配
了解了Broker集群的选举以及整体的集群管理机制,我们来看下Kafka创建Topic,以及对分区副本进行管理的流程。通常来说,我们会通过Kafka自带的kafka-topics.sh脚本来创建Topic。那么,当我们指定了一个Topic的分区数、每个分区的副本数之后,Controller(LeaderBroker)是如何选择Leader副本?又是如何分配在Broker集群中分配这些副本的呢?本章,我就对Topic的分区副本分配原理进行讲解。如果Producer发送消息时指定了一个不存在的Topic,也会默认创建(分区1,副本1),可以通过Broker端的参数auto.create.topic
在Kafka集群中,会有一个或多个Broker,那么Kafka是如何管理这些Broker的呢?从本章开始,我将讲解Kafka的集群管理,这块内容本质属于ReplicationSubsystem,但是又相对独立,所以我单独拎出来讲解。Kafka集群会选举一个Broker作为控制器(KafkaController),由它来负责管理整个集群中的所有分区和副本的状态:当某个分区的Leader副本出现故障时,由Controller负责为该分区选举新的Leader副本;当检测到某个分区的ISR集合发生变化时,由Controller负责通知所有Broker更新其元数据信息;当为某个Topic增加分区数量时,
本章,我将对副本同步过程中的延迟读写情况进行讲解。什么是延迟读写?这要分两种情况:延迟读取:也就是说,生产者可能发送消息并不频繁,各个分区的Leader和Follower完全处于同步一致的状态,这时,Leader本身没什么消息写入,而此时Follower又发送了Fetch请求过来同步消息,此时Leader就会延迟读取消息;延迟写入:主要就是Producer的acks参数设置为all或-1,必须等待所有ISR中的Follower都拉取到了这批数据后,才可以响应。本章,我就对这两种情况进行讲解。一、延迟读取首先,我们来看如果Broker端接受到Fetch请求后,没有最新的消息可供读取会怎么做?1.
Broker接受到Follower发送过来的Fetch请求后,会由Broker的KafkaAPI层处理,然后由各个Leader分区进行处理://KafkaApis.scalaclassKafkaApis(valrequestChannel:RequestChannel,valreplicaManager:ReplicaManager,valadminManager:AdminManager,valcoordinator:GroupCoordinator,valcontroller:KafkaController,valzkUtils:ZkUtils,valbrokerId:Int,valcon
从本章开始,我将讲解KafkaBroker的副本子系统(ReplicationSubsystem)。在Kafka中,副本是最重要的概念之一,副本机制是Kafka实现高可用的基础。Kafka会将同一个分区的多个副本分散在不同的Broker机器上,其中的某个副本会被指定为Leader,负责响应客户端的读写请求,其它副本自动成为Follower,向Leader副本发送读取请求,同步最新写入的数据。那么接下来的几章,我就针对Kafka副本子系统的副本同步功能进行讲解,主要包括以下内容:Follower副本从Leader拉取数据的全流程;Leader副本的LEO和HW更新机制;ISR列表的更新机制。本章
本章,我来带大家学习一下Kafka源码中的索引对象。在Kafka中索引类的继承关系如下图:AbstractIndex:定义了最顶层的抽象类,这个类封装了所有索引类型的公共操作;LazyIndex:AbstractIndex的一个包装类,实现索引项延迟加载,这个类主要是为了提升性能;OffsetIndex:位移索引,保存<相对位移值,文件磁盘物理位置>;TimeIndex:时间戳索引,保存<时间戳,相对位移值>;TransactionIndex:事务索引,为已中止事务(AbortedTranscation)保存重要的元数据信息,只有启用Kafka事务后,这个索引才有可能出
本章,我们来看下LogSegment的源码,LogSegment就是对日志段的抽象:classLogSegment(vallog:FileRecords,valindex:OffsetIndex,valtimeIndex:TimeIndex,valbaseOffset:Long,valindexIntervalBytes:Int,valrollJitterMs:Long,time:Time)extendsLogging{}可以看到,LogSegment类的声明包含了以下信息:FileRecords:实际保存Kafka消息的对象;OffsetIndex:位移索引;TimeIndex:时间戳索引;
Log是LogSegment日志段的容器,里面定义了很多管理日志段的操作。Log对象是KafkaBroker最核心的部分://Log.scalaclassLog(@volatilevardir:File,@volatilevarconfig:LogConfig,@volatilevarrecoveryPoint:Long=0L,scheduler:Scheduler,time:Time=Time.SYSTEM)extendsLoggingwithKafkaMetricsGroup{}Log中包含两个核心属性:dir和logStartOffset。dir是主题分区日志所在的文件夹路径,比如&qu
从开始本章,我将讲解KafkaServer的日志子系统(LogSubsystem)。LogSubsystem负责的核心工作就是日志的持久化,也就是写消息到磁盘。Kafka之所以具有极高的性能,和LogSubsystem的优秀设计是分不开的。本章,我先带大家回顾下Kafka的整个日志文件系统的设计,然后对Server端的几个核心日志组件进行整体讲解。一、日志结构我们回顾下《透彻理解Kafka(二)——消息存储:日志格式》中的内容,假设有一个名为“topic”的主题,此主题具有4个分区,那么在物理存储上就有topic-0、topic-1、topic-2、topic-3四个目录:[root@node
上一章,我讲到KafkaRequestHandlerPool最终会将请求交给API层的KafkaApis处理。本章,我们就来看下KafkaApis的整体架构。KafkaApis类定义在源码文件KafkaApis.scala中:classKafkaApis(valrequestChannel:RequestChannel,valreplicaManager:ReplicaManager,valadminManager:AdminManager,valcoordinator:GroupCoordinator,valcontroller:KafkaController,valzkUtils:ZkUt
本章,我将对网络层中的最后两个组件RequestChannel和RequestHandler进行讲解。严格来说,RequestChannel和RequestHandler并不属于网络层(NetworkLayer),当然,它们也不属于API层(APILayer)。我将RequestChannel和RequestHandler定位为KafkaServer端网络层与API层之间交互的中间件。本章,我会分析RequestChannel和RequestHandler的整体架构,以及它们对请求/响应的底层处理细节。一、RequestChannel首先,我们来看RequestChannel。RequestC
上一章讲到,Acceptor线程通过创建SocketChannel与客户端完成三次握手,建立TCP连接后,会将SocketChannel交给Processor线程处理。本章,我们就来看看Processor线程内部的运行机制。一、整体架构Processer线程的数量是通过num.network.threads配置的,默认3个,即一个Acceptor线程对应3个Processor线程。Processor会对读/写请求进行处理,我后面会分两部分分别讲解读/写流程,这样流程会更清晰,也便于读者理解,我们先来看Processer线程的整体工作流程。1.1工作流程Acceptor线程通过调用Process
NetworkLayer(网络层)是KafkaBroker处理所有请求的入口。Kafka基于JavaNIO实现了一套Reactor线程模型,其核心流程就是与客户端建立连接,然后对请求进行解析,封装成Request对象传递给API层,同时接受API层的处理结果,封装后响应给客户端。本章,我将先对网络层的整体架构进行分析,然后对其中的一个核心组件——Acceptor线程进行讲解。一、整体架构我们来看下网络层的整体架构:整个网路层,包含的核心组件和功能说明如下:SocketServer#网络层的整体封装类|--Acceptor#Acceptor线程,负责监听并建立与客户端的连接|--Processo
Kafka服务端(也就是Broker)的源码是采用Scala和Java编写的,Scala是一门JVM语言,Kafka用它写服务端源码主要是利用它的一些语法糖,简化编码,所以基本不影响我们阅读Broker的源码。服务端源码主要存放在core模块中:我们分析Kafka服务端的源码,需要重点关注以下四个方面,后续我也会分四个部分逐一讲解,本章我先从整体上讲解Broker的架构:网络通信;磁盘读写;副本同步;集群管理。一、整体架构KafkaBroker的整体架构可以用下面这张图表示:可以看到,Broker的各个模块的职责和架构分层非常清晰:NetworkLayer(网络层):负责与客户端建立连接并处理
KafkaProducer发送消息的过程中可能会出现消息超时的问题。本章,我会从Kafka客户端的底层对该问题进行讲解。一、超时场景我们先来看下,哪些情况下会出现超时问题:RecordBatch长时间停留在BufferPool缓冲区中,压根没有被Sender线程获取;Sender线程将消息发送出去了,但是一直没有收到响应,NetworkSend请求长时间积压在InFlightRequests中。1.1BufferPool超时我们先来看第一种情况。Sender线程的运行主流程中有这么一行代码://Sender.javavoidrun(longnow){//...//5.剔除超时的batch(默认