Starting with this chapter, I will analyze the Kafka source code. Functionally, the source is divided into four major modules:
- Server-side source (the core module): the foundation of Kafka's architecture and its many notable features;
- Java client source (the clients module): defines the mechanism for interacting with the broker, along with common supporting code shared with the broker side;
- Connect source: implements high-performance data transfer between Kafka and external systems;
- Streams source: implements real-time stream processing.
In this series, I will analyze only the Java client source and the server-side source. Let's first set up the source environment. Kafka's server-side code is written in Scala, while the current client code is written in Java, and the project itself is built with Gradle. So we first need to install the JDK, Scala, and Gradle on the local machine:
- JDK: 1.8
- Scala: 2.10.6
- Gradle: 3.1
In the following chapters I will use Windows for demonstration; the process is essentially the same on Linux and macOS. In addition, the official Kafka documentation is an excellent reference: http://kafka.apache.org/0102/documentation.html . I recommend readers take the time to go through it systematically.
1. Environment Setup
1.1 JDK, Scala and Gradle
I will not go into the details of installing and configuring the JDK and Gradle; just remember to set the JAVA_HOME and GRADLE_HOME environment variables. For Scala, download the MSI installer from the official website. Once everything is installed, verify that all three components work:
java -version
scala -version
gradle -version
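If any of these commands is not recognized, re-check the environment variables. On Windows they can also be set from the command line, for example as below; the install paths are placeholders for my machine, so substitute your own, the bin subdirectories must be on Path as well, and note that setx only affects newly opened consoles:
setx JAVA_HOME "C:\Program Files\Java\jdk1.8.0_181"
setx GRADLE_HOME "D:\software\gradle-3.1"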
1.2 ZooKeeper
Since Kafka depends on ZooKeeper, we also need to install and deploy it; I am using version 3.4.10 here. After unpacking, make a copy of the zoo_sample.cfg file under the conf directory, rename it to zoo.cfg, and point dataDir and dataLogDir at directories of your own:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# use forward slashes: zoo.cfg is parsed as a Java properties
# file, in which backslashes act as escape characters
dataDir=D:/software/zookeeper-3.4.10/data
dataLogDir=D:/software/zookeeper-3.4.10/log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
Finally, run the zkServer.cmd script under the bin directory to start ZooKeeper:
2018-06-07 21:08:47,280 [myid:] - INFO [main:Environment@100] - Server environment:java.compiler=<NA>
2018-06-07 21:08:47,282 [myid:] - INFO [main:Environment@100] - Server environment:os.name=Windows 10
2018-06-07 21:08:47,283 [myid:] - INFO [main:Environment@100] - Server environment:os.arch=amd64
2018-06-07 21:08:47,283 [myid:] - INFO [main:Environment@100] - Server environment:os.version=10.0
2018-06-07 21:08:47,284 [myid:] - INFO [main:Environment@100] - Server environment:user.name=Ressmix
2018-06-07 21:08:47,285 [myid:] - INFO [main:Environment@100] - Server environment:user.home=C:\Users\Ressmix
2018-06-07 21:08:47,286 [myid:] - INFO [main:Environment@100] - Server environment:user.dir=D:\software\zookeeper-3.4.10\bin
2018-06-07 21:08:47,324 [myid:] - INFO [main:ZooKeeperServer@829] - tickTime set to 2000
2018-06-07 21:08:47,325 [myid:] - INFO [main:ZooKeeperServer@838] - minSessionTimeout set to -1
2018-06-07 21:08:47,326 [myid:] - INFO [main:ZooKeeperServer@847] - maxSessionTimeout set to -1
2018-06-07 21:08:49,337 [myid:] - INFO [main:NIOServerCnxnFactory@89] - binding to port 0.0.0.0/0.0.0.0:2181
1.3 Kafka Source Code
Finally, download the Kafka source code. I am using version kafka-0.10.2.2 here: download the kafka-0.10.2.2-src.tgz source archive and unpack it. Then open a Windows command prompt, change into the kafka-0.10.2.2-src directory, and run gradle idea to generate the IntelliJ IDEA project files. On success, the build ends with:
BUILD SUCCESSFUL
Total time: 9 mins 26.645 secs
To import into Eclipse instead, run gradle eclipse.
2. Starting Kafka
2.1 Building the Source
After generating the project files, import the source into IntelliJ IDEA; remember to install the Scala plugin first. Once imported, the project builds automatically.
2.2 Kafka Configuration
Once the project has been built and imported successfully, we need to do some configuration before running Kafka.
Logging configuration
First, copy log4j.properties from the config directory into the core/src/main/resources directory (create the resources directory if it does not exist), and add the following entry:
kafka.logs.dir = logs/my-kafka-log
This puts a log4j configuration on the classpath, so the Kafka server actually prints logs when it runs; the added entry controls where the file appenders, which resolve ${kafka.logs.dir}, write their output. Next, modify server.properties under the config directory; the key setting is log.dirs, the directory where Kafka stores its partition data (the message logs).
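For reference, here is a minimal sketch of the server.properties entries that matter for a local run. The values are examples from my machine, so adjust them to your own; forward slashes again sidestep escaping issues:
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=D:/software/kafka-logs
zookeeper.connect=localhost:2181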
Startup configuration
Finally, we set up a run configuration for Kafka's entry class, Kafka.scala.
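The server's entry point is the kafka.Kafka object in core/src/main/scala/kafka/Kafka.scala, whose main method expects the path of a server.properties file as its first argument. A typical Application run configuration therefore looks roughly like this; the module name below is what IDEA generated for me and may differ on your machine:
Main class: kafka.Kafka
Program arguments: config/server.properties
Working directory: <project root>
Use classpath of module: core_main
With the run configuration in place, Kafka can be started; note that ZooKeeper must be running first. A successful startup ends with log output like this: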
...
[2018-06-07 00:06:38,329] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2018-06-07 00:06:38,348] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.GroupCoordinator)
[2018-06-07 00:06:38,351] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.GroupCoordinator)
[2018-06-07 00:06:38,353] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 3 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2018-06-07 00:06:38,401] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2018-06-07 00:06:38,451] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2018-06-07 00:06:38,490] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2018-06-07 00:06:38,493] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(windows10.microdone.cn,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2018-06-07 00:06:38,517] WARN Error while loading kafka-version.properties :null (org.apache.kafka.common.utils.AppInfoParser)
[2018-06-07 00:06:38,522] INFO Kafka version : unknown (org.apache.kafka.common.utils.AppInfoParser)
[2018-06-07 00:06:38,522] INFO Kafka commitId : unknown (org.apache.kafka.common.utils.AppInfoParser)
[2018-06-07 00:06:38,524] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
3. Summary
In this chapter, we finished setting up the environment for analyzing the Kafka source code. As a final check, readers can write a small Kafka client program and try sending a message, for example the minimal producer sketched below.
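The sketch uses the 0.10.x Java client API; the topic name test and the class name ProducerDemo are arbitrary choices of mine, and by default the broker auto-creates the topic on first use:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // the broker started from source listens on localhost:9092 by default
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // send() is asynchronous; close() flushes any pending records before returning
        producer.send(new ProducerRecord<>("test", "hello kafka"));
        producer.close();
    }
}

Starting from the next chapter, we will formally dive into the Kafka source code.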