了解了Broker集群的选举以及整体的集群管理机制,我们来看下Kafka创建Topic,以及对分区副本进行管理的流程。通常来说,我们会通过Kafka自带的kafka-topics.sh
脚本来创建Topic。那么,当我们指定了一个Topic的分区数、每个分区的副本数之后,Controller(Leader Broker)是如何选择Leader副本?又是如何分配在Broker集群中分配这些副本的呢?
本章,我就对Topic的分区副本分配原理进行讲解。
如果Producer发送消息时指定了一个不存在的Topic,也会默认创建(分区1,副本1),可以通过Broker端的参数
auto.create.topics.enable
禁止默认创建的行为,生产环境建议禁止掉。
一、创建Topic
通过上一章的讲解,我们应该已经明白, 集群中的每个Broker都知道整个集群的元数据信息 。所谓元数据信息就是:集群中的每个Broker上有哪些Topic分区,每个Topic的分区信息,这些分区的Leader副本在哪,Follower副本在哪……
1.1 脚本使用
所以,Controller需要对这些Topic的分区进行管理,我以一个Topic的创建作为示例进行讲解,便于大家理解。首先,我们来看Topic的创建流程:
创建Topic通过脚本kafka-topics.sh
:
# kafka-topics.sh
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
本质是执行了 TopicCommand 命令:
// TopicCommand.scala
def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
// Topic名称
val topic = opts.options.valueOf(opts.topicOpt)
// 配置
val configs = parseTopicConfigsToBeAdded(opts)
val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
try {
// 1.手动指定分区
if (opts.options.has(opts.replicaAssignmentOpt)) {
val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
} else {
// 2.自动分配分区
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
// 配置的分区数
val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
// 配置的副本数
val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
else RackAwareMode.Enforced
// 创建主题
AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
}
println("Created topic \"%s\".".format(topic))
} catch {
case e: TopicExistsException => if (!ifNotExists) throw e
}
}
我们重点看它的自动分配分区分支,调用了AdminUtils.createTopic()
来创建Topic并对分区副本进行分配:
// AdminUtils.scala
def createTopic(zkUtils: ZkUtils,
topic: String,
partitions: Int,
replicationFactor: Int,
topicConfig: Properties = new Properties,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
// 1.从Zookeeper中获取Broker集群的元数据信息
val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
// 2.基于一定的算法,将分区副本分配给各个Broker
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas,
partitions, replicationFactor)’
// 3.将分配好最终策略,直接写入Zookeeper中的/brokers/topics/[Topic名称]节点中
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
}
可以看到, 创建Topic的本质就是根据Topic的分区数、每个分区的副本数,基于一定的算法把它们分配给各个Broker,然后把分配策略写入到Zookeeper中 。
所谓分区副本分配策略,我这里简单解释下,假设有个Topic设置3个分区,每个分区2个副本,那么分配结果可能就是下面这个样子:
partition1 -> [0,1] #分区1的Leader副本分配在Broker0,Follower副本分配在Broker1
partition2 -> [2,0] #分区2的Leader副本分配在Broker2,Follower副本分配在Broker0
partition3 -> [1,2] #分区3的Leader副本分配在Broker1,Follower副本分配在Broker2
至于具体的分区副本分配算法,我就不赘述了,读者可以自己去
AdminUtils.assignReplicasToBrokers
方法里瞅一瞅,无非就是类似负载均衡之类的策略,我重点关注分区副本分配的整体流程。
二、副本管理
创建Topic只是将 分区副本分配策略 写入到了Zookeeper的/brokers/topics/[Topic名称]
节点中,那么接下来Controller如何根据策略来进行执行副本分配?如何对副本进行管理呢?
2.1 监听Topic创建
显然,Controller是可以感知到新Topic的创建的,也就是说它会去监听/brokers/topics
节点的变化,整个监听的过程我用下面这张图来表示:
我们来看下底层的源码:
// KafkaController.scala
// Broker选举成为Leader后,会调用该方法
def onControllerFailover() {
if(isRunning) {
info("Broker %d starting become controller state transition".format(config.brokerId))
readControllerEpochFromZookeeper()
incrementControllerEpoch(zkUtils.zkClient)
registerReassignedPartitionsListener()
registerIsrChangeNotificationListener()
registerPreferredReplicaElectionListener()
// 关键看这里
partitionStateMachine.registerListeners()
//...
}
else
info("Controller has been shut down, aborting startup/failover")
}
onControllerFailover方法中调用了PartitionStateMachine.registerListeners()
,它会去监听/brokers/topics/
节点的变化:
// PartitionStateMachine.scala
def registerListeners() {
registerTopicChangeListener()
registerDeleteTopicListener()
}
private def registerTopicChangeListener() = {
// 监听“/brokers/topics”节点的变化
zkUtils.zkClient.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
}
private def registerDeleteTopicListener() = {
// 监听“/admin/delete_topics”节点的变化
zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
}
我们来看下TopicChangeListener这个监听器,它的内部就 根据“/brokers/topics”节点下的子节点变化,筛选出新增的Topic, 然后按照分区维度维护成一个Map[TopicAndPartition, Seq[Int]]
:
// PartitionStateMachine.scalaSS
class TopicChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {
protected def logName = "TopicChangeListener"
// 当“/brokers/topics”节点下的子节点发生变化时,会触发Controller调用该方法
def doHandleChildChange(parentPath: String, children: Seq[String]) {
inLock(controllerContext.controllerLock) {
if (hasStarted.get) {
try {
val currentChildren = {
debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
children.toSet
}
// 新创建的分区
val newTopics = currentChildren -- controllerContext.allTopics
// 删除的分区
val deletedTopics = controllerContext.allTopics -- currentChildren
controllerContext.allTopics = currentChildren
// 获取分区副本分配策略
val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
!deletedTopics.contains(p._1.topic))
controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
deletedTopics, addedPartitionReplicaAssignment))
if (newTopics.nonEmpty)
// 关键看这里,
controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
} catch {
case e: Throwable => error("Error while handling new topic", e)
}
}
}
}
}
最后,上述代码会调用KafkaController的onNewTopicCreation
方法,发送Topic的元数据信息给各个Broker,这个Broker就可以进行一些初始化操作,比如新建分区日志段,准备接受Producer发送过来的消息等等:
// KafkaController.scala
def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
info("New topic creation callback for %s".format(newPartitions.mkString(",")))
// subscribe to partition changes
topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
// 按照分配策略,发送Topic的元数据信息给各个Broker
onNewPartitionCreation(newPartitions)
}
三、总结
本章,我对Topic创建的整体流程和底层原理进行了讲解,Controller会监听新Topic的创建,同时对分区副本进行管理,向新的元数据信息发送给集群中的其它Broker。