MirrorMaker
1、概要介绍
对于Kafka企业级用户而言,一个常见的痛点就是跨机房或跨数据中心(data center,DC)的数据传输。大型企业通常在多个数据中心部署Kafka集群。.这里的数据中心可能是企业拥有的自建机房,也可能是公有云厂商的不同机房。在多个机房部署Kafk集群的优势如下。
- 实现灾备。
- 较近的地理位置可缩短延时以及用户响应时间。
- 实现负载均衡,即每个数据中心上的集群可能只保存部分数据集合。
- 区别隔离不同优先级的数据处理。
尽管部署方式各种各样,但跨机房部署方案有一个共同的特点:数据需要能够从一个Kafka集群被拷贝到另一个集群,而且还必须支持双向拷贝:某次传输的源集群(source cluster)可能是下次传输的目标集群(target cluster)。
为了实现这样的需求,Kafka默认提供了一个工具MirrorMaker,用来帮助用户实现数据在两个Kafka集群间的拷贝。就具体实现而言,MirrorMaker仅仅是一个consumer+producer的混合体。对于源集群而言,它是一个consumer;而对于目标集群而言,它又是一个producer。
MirrorMaker读取源集群指定topic的数据,然后写入目标集群中的同名topic下。用户可以运行多个MirrorMaker实例增加整体数据拷贝的吞吐量,同时还提升了容错性。毕竟当一个实例崩溃后,其他实例能够自动地承担起它的负载。
在实际生产环境中,源集群和目标集群是完全独立的两套环境:它们上的toc可能设置了不同的分区数且有不同的位移值。基于这个原因,MirrorMaker工具并不能完美地实现容错性一因为consumer的位移值可能是不同的。不过,MirrorMaker依然会保存并使用消息的key来执行分区任务。
2、主要参数
MirrorMaker的脚本名是kafka-mirror–maker…sh(bat),位于Kafka安装目录的bin子目录下
(Windows平台下则位于Kafka安装目录的bin/windows子目录下)。
参数名 | 参数含义 |
---|---|
whitelist | 指定一个正则表达式,指定拷贝源集群中的哪些topic。比如a |
blacklist | 指定一个正则表达式,屏蔽指定topic的拷贝。注意,该参数只使用于老版本consumer |
abort.on.send.failure | 若设置为true,当发送失败时则关闭MirrorMaker |
consumer.config | 指定MirrorMaker下consumer的属性文件。至少指定bootstrap.server和group.id |
producer.config | 指定MirrorMaker下producer的属性文件 |
consumer.rebalance.listener | 指定MirrorMaker使用的consumerrebalance监听器 |
rebalance.listener.args | 指定MirrorMaker使用的consumerrebalance监听器的参数,与consumer.rebalance.listener一同使用 |
message.handler | 指定消息处理器类。消息处理器在consumer获取消息与producer发送消息之间调用 |
message.handler.args | 指定消息处理器类的参数,与message.handler一同使用 |
num.streams | 指定MirrorMaker线程数。默认是1 |
offset.conmmit.interval.ms | 指定MirrorMaker位移提交间隔,默认值为1分钟 |
help | 打印帮助信息 |
在实际使用中,经常被使用的参数是whitelist、.consumer.config和producer…config。如果
要实现某些特定的拷贝逻辑(比如拷贝指定分区数据或有选择性地拷贝数据等),那么就需要实现特定的消息处理器并使用message.handler进行指定。
一个典型的命令执行如下。该命令读取由consumer.properties文件指定的源Kafka集群,
去读取名为topicA和topicB主题的消息,并写入到由producer.properties文件指定的目标Kafka集群。
bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist topicA|topicB
3、实例
下面我们结合一个具体的实例来看看如何应用MirrorMaker工具在集群间进行数据拷贝。
在本例中我们搭建2个测试Kafka单点集群,分别用k1、k2表示。两套环境都是单节点的Kafka集群,端口号分别是9092、9093。本例的目标是在k1上生产消息,然后使用MirrorMaker将消息拷贝到k2上,最后在k2上运行consumer以验证消息是否被成功拷贝。
分别设置2套环境,使用不同的Zookeeper chroot:
zookeeper.connect=localhost:2181/k1
zookeeper.connect=localhost:2181/k2
分别启动k1、k2
./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server2.properties
在k1上创建一个测试topic
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic --partitions 1 --replication-factor 1
创建consumer.config文件
bootstrap.servers=localhost:9092
client.id=test.k1Andk2
group.id=test.k1Andk2
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
创建producer.config文件
bootstrap.servers=localhost:9093
client.id=test.k1Andk2
启动MirrorMaker
./kafka-mirror-maker.sh --consumer.config ../config/consumer.properties --producer.config ../config/producer.properties --whitelist test-topic
在k1上是使用console producer模拟生产消息
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
one
two
three
four
在k2上使用console consumer验证消息是否拷贝成功:
./kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test-topic --from-beginning
上述输出表明k1的消息经由MirrorMaker成功拷贝到k2集群。