2023-07-31
Original author: Ressmix. Original article: https://www.tpvlog.com/article/290

Starting with this chapter, I will analyze the Kafka source code:

(figure: the four modules of the Kafka source code)

Functionally, the Kafka source code is divided into four major modules:

  • Server-side source: the foundation of Kafka's architecture and its many notable features;
  • Java client source: defines the mechanisms for interacting with the Broker side, as well as common supporting component code;
  • Connect source: implements high-performance data transfer between Kafka and external systems;
  • Streams source: implements real-time stream processing.

In this series, I will only analyze the Java client source and the server-side source. Let's first set up the source environment. The Kafka server side is written in Scala, while the latest client source is written in Java; in addition, the Kafka project is managed with Gradle. So we first need to install the JDK, Scala, and Gradle locally:

  • JDK:1.8
  • Scala:2.10.6
  • Gradle:3.1

In subsequent chapters I use Windows for demonstration; Linux and Mac OS are essentially the same. The official Kafka documentation is also an excellent reference: http://kafka.apache.org/0102/documentation.html . I recommend reading it systematically when you have time.

1. Environment Preparation

1.1 Scala and Gradle

I won't go into the installation of the JDK and Gradle; the main thing is to remember to set the JAVA_HOME and GRADLE_HOME environment variables. For Scala, download the MSI installer from the official site. After installation, verify that all three components are installed correctly:

    java -version
    scala -version
    gradle -version

1.2 ZooKeeper

Since Kafka depends on ZooKeeper, we also need to install and deploy ZooKeeper. I use version 3.4.10 here. After unpacking it, copy the zoo_sample.cfg file under the conf directory, rename the copy to zoo.cfg, and change dataDir and dataLogDir to point to directories of your own:

    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial 
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just 
    # example sakes.
    dataDir=D:/software/zookeeper-3.4.10/data
    dataLogDir=D:/software/zookeeper-3.4.10/log
    # the port at which the clients will connect
    clientPort=2181
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the 
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1

Finally, run the zkServer.cmd file under the bin directory to start it:

    2018-06-07 21:08:47,280 [myid:] - INFO  [main:Environment@100] - Server environment:java.compiler=<NA>
    2018-06-07 21:08:47,282 [myid:] - INFO  [main:Environment@100] - Server environment:os.name=Windows 10
    2018-06-07 21:08:47,283 [myid:] - INFO  [main:Environment@100] - Server environment:os.arch=amd64
    2018-06-07 21:08:47,283 [myid:] - INFO  [main:Environment@100] - Server environment:os.version=10.0
    2018-06-07 21:08:47,284 [myid:] - INFO  [main:Environment@100] - Server environment:user.name=Ressmix
    2018-06-07 21:08:47,285 [myid:] - INFO  [main:Environment@100] - Server environment:user.home=C:\Users\Ressmix
    2018-06-07 21:08:47,286 [myid:] - INFO  [main:Environment@100] - Server environment:user.dir=D:\software\zookeeper-3.4.10\bin
    2018-06-07 21:08:47,324 [myid:] - INFO  [main:ZooKeeperServer@829] - tickTime set to 2000
    2018-06-07 21:08:47,325 [myid:] - INFO  [main:ZooKeeperServer@838] - minSessionTimeout set to -1
    2018-06-07 21:08:47,326 [myid:] - INFO  [main:ZooKeeperServer@847] - maxSessionTimeout set to -1
    2018-06-07 21:08:49,337 [myid:] - INFO  [main:NIOServerCnxnFactory@89] - binding to port 0.0.0.0/0.0.0.0:2181
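As an optional sanity check (a sketch, assuming the same 3.4.10 install directory), you can connect with the CLI bundled with ZooKeeper to confirm it is actually serving:

```shell
# connect to the local ZooKeeper with the bundled CLI
# (Windows: bin\zkCli.cmd, Linux/Mac: bin/zkCli.sh)
bin/zkCli.sh -server localhost:2181

# inside the zkCli shell, listing the root should succeed on a fresh install:
#   ls /
#   [zookeeper]
```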

1.3 Kafka Source Code

Finally, download the Kafka source. I use version 0.10.2.2 here: download the kafka-0.10.2.2-src.tgz source archive and unpack it. Then open a Windows command prompt, change into the kafka-0.10.2.2-src directory, and run gradle idea to generate the IntelliJ IDEA project files for the source. On success, you will see:

    BUILD SUCCESSFUL
    Total time: 9 mins 26.645 secs

To import into Eclipse instead, run gradle eclipse.
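For reference, the Gradle tasks involved here are (run from the unpacked source root; gradle jar is only needed if you also want runnable jars):

```shell
# run inside the kafka-0.10.2.2-src directory
gradle idea      # generate IntelliJ IDEA project files
gradle eclipse   # or generate Eclipse project files instead
gradle jar       # (optional) build the broker jars
```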

2. Starting Kafka

2.1 Building the Source

After generating the project files, import the Kafka source into IntelliJ IDEA; remember to install the Scala plugin first:

(screenshots: installing the Scala plugin and importing the project into IntelliJ IDEA)

After the import, the project builds automatically; a successful build looks like this:

(screenshot: the project after a successful build in IntelliJ IDEA)

2.2 Kafka Configuration

Once the project is built and imported successfully, we need to configure a few things for Kafka.

Logging configuration

First, copy log4j.properties from the config directory to core/src/main/resources (create the resources directory if it does not exist), and add the following entry:

    kafka.logs.dir = logs/my-kafka-log

(screenshot: log4j.properties placed under core/src/main/resources)

Only then will the Kafka server print logs once it is running. Next, modify server.properties under the config directory; the key setting is the log storage directory log.dirs.
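For reference, the handful of server.properties entries worth checking before the first start looks roughly like this (an excerpt with example values from my setup; adjust the paths and addresses to yours):

```properties
# config/server.properties (excerpt, example values)
broker.id=0
listeners=PLAINTEXT://localhost:9092
# where partition data is stored -- point this at a writable directory
log.dirs=D:/software/kafka-logs
# must match the ZooKeeper address configured earlier
zookeeper.connect=localhost:2181
```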

Startup configuration

Finally, we configure a run configuration for Kafka's entry class, Kafka.scala:

(screenshot: the IntelliJ IDEA run configuration for Kafka.scala)
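In case the screenshot is unavailable, the run configuration boils down to the following (the working directory and module name depend on how Gradle generated your project, so treat these as example values):

```
Main class:        kafka.Kafka
Program arguments: config/server.properties
Working directory: <kafka source root>
Module classpath:  core
```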

Then we can start Kafka. Note that ZooKeeper must be started first:

    ...
    [2018-06-07 00:06:38,329] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
    [2018-06-07 00:06:38,348] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.GroupCoordinator)
    [2018-06-07 00:06:38,351] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.GroupCoordinator)
    [2018-06-07 00:06:38,353] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 3 milliseconds. (kafka.coordinator.GroupMetadataManager)
    [2018-06-07 00:06:38,401] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
    [2018-06-07 00:06:38,451] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    [2018-06-07 00:06:38,490] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    [2018-06-07 00:06:38,493] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(windows10.microdone.cn,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
    [2018-06-07 00:06:38,517] WARN Error while loading kafka-version.properties :null (org.apache.kafka.common.utils.AppInfoParser)
    [2018-06-07 00:06:38,522] INFO Kafka version : unknown (org.apache.kafka.common.utils.AppInfoParser)
    [2018-06-07 00:06:38,522] INFO Kafka commitId : unknown (org.apache.kafka.common.utils.AppInfoParser)
    [2018-06-07 00:06:38,524] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

3. Summary

In this chapter, I finished setting up the environment for analyzing the Kafka source code. You can start a Kafka client program yourself and try sending a message to verify the setup. Starting from the next chapter, we will formally dive into the Kafka source analysis.
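The simplest client for such a verification is the console producer/consumer shipped with Kafka (a sketch for the 0.10.x scripts; on Windows use the .bat counterparts under bin\windows, and note these scripts need the jars from gradle jar or a binary distribution):

```shell
# create a test topic (0.10.x still creates topics through ZooKeeper)
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test

# send a few messages (type lines, Ctrl+C to quit)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

# read them back in another terminal
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic test --from-beginning
```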
