2024-04-03
原文作者:吴声子夜歌 原文地址: https://blog.csdn.net/cold___play/article/details/132331594

MirrorMaker

1、概要介绍

对于Kafka企业级用户而言,一个常见的痛点就是跨机房或跨数据中心(data center,DC)的数据传输。大型企业通常在多个数据中心部署Kafka集群。.这里的数据中心可能是企业拥有的自建机房,也可能是公有云厂商的不同机房。在多个机房部署Kafk集群的优势如下。

  • 实现灾备。
  • 较近的地理位置可缩短延时以及用户响应时间。
  • 实现负载均衡,即每个数据中心上的集群可能只保存部分数据集合。
  • 区别隔离不同优先级的数据处理。

尽管部署方式各种各样,但跨机房部署方案有一个共同的特点:数据需要能够从一个Kafka集群被拷贝到另一个集群,而且还必须支持双向拷贝:某次传输的源集群(source cluster)可能是下次传输的目标集群(target cluster)。

为了实现这样的需求,Kafka默认提供了一个工具MirrorMaker,用来帮助用户实现数据在两个Kafka集群间的拷贝。就具体实现而言,MirrorMaker仅仅是一个consumer+producer的混合体。对于源集群而言,它是一个consumer;而对于目标集群而言,它又是一个producer。
MirrorMaker读取源集群指定topic的数据,然后写入目标集群中的同名topic下。用户可以运行多个MirrorMaker实例增加整体数据拷贝的吞吐量,同时还提升了容错性。毕竟当一个实例崩溃后,其他实例能够自动地承担起它的负载。

在实际生产环境中,源集群和目标集群是完全独立的两套环境:它们上的toc可能设置了不同的分区数且有不同的位移值。基于这个原因,MirrorMaker工具并不能完美地实现容错性一因为consumer的位移值可能是不同的。不过,MirrorMaker依然会保存并使用消息的key来执行分区任务。

2、主要参数

MirrorMaker的脚本名是kafka-mirror–maker…sh(bat),位于Kafka安装目录的bin子目录下
(Windows平台下则位于Kafka安装目录的bin/windows子目录下)。

参数名 参数含义
whitelist 指定一个正则表达式,指定拷贝源集群中的哪些topic。比如a
blacklist 指定一个正则表达式,屏蔽指定topic的拷贝。注意,该参数只使用于老版本consumer
abort.on.send.failure 若设置为true,当发送失败时则关闭MirrorMaker
consumer.config 指定MirrorMaker下consumer的属性文件。至少指定bootstrap.server和group.id
producer.config 指定MirrorMaker下producer的属性文件
consumer.rebalance.listener 指定MirrorMaker使用的consumerrebalance监听器
rebalance.listener.args 指定MirrorMaker使用的consumerrebalance监听器的参数,与consumer.rebalance.listener一同使用
message.handler 指定消息处理器类。消息处理器在consumer获取消息与producer发送消息之间调用
message.handler.args 指定消息处理器类的参数,与message.handler一同使用
num.streams 指定MirrorMaker线程数。默认是1
offset.conmmit.interval.ms 指定MirrorMaker位移提交间隔,默认值为1分钟
help 打印帮助信息

在实际使用中,经常被使用的参数是whitelist、.consumer.config和producer…config。如果
要实现某些特定的拷贝逻辑(比如拷贝指定分区数据或有选择性地拷贝数据等),那么就需要实现特定的消息处理器并使用message.handler进行指定。

一个典型的命令执行如下。该命令读取由consumer.properties文件指定的源Kafka集群,
去读取名为topicA和topicB主题的消息,并写入到由producer.properties文件指定的目标Kafka集群。

    bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist topicA|topicB

3、实例

下面我们结合一个具体的实例来看看如何应用MirrorMaker工具在集群间进行数据拷贝。

在本例中我们搭建2个测试Kafka单点集群,分别用k1、k2表示。两套环境都是单节点的Kafka集群,端口号分别是9092、9093。本例的目标是在k1上生产消息,然后使用MirrorMaker将消息拷贝到k2上,最后在k2上运行consumer以验证消息是否被成功拷贝。

分别设置2套环境,使用不同的Zookeeper chroot:

    zookeeper.connect=localhost:2181/k1
    zookeeper.connect=localhost:2181/k2

分别启动k1、k2

    ./kafka-server-start.sh -daemon ../config/server.properties
    
    ./kafka-server-start.sh -daemon ../config/server2.properties

在k1上创建一个测试topic

    ./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic --partitions 1 --replication-factor 1

创建consumer.config文件

    bootstrap.servers=localhost:9092
    client.id=test.k1Andk2
    group.id=test.k1Andk2
    partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

创建producer.config文件

    bootstrap.servers=localhost:9093
    client.id=test.k1Andk2

启动MirrorMaker

    ./kafka-mirror-maker.sh --consumer.config ../config/consumer.properties --producer.config ../config/producer.properties --whitelist test-topic

在k1上是使用console producer模拟生产消息

    ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
    one
    two
    three
    four

202404032128092661.png

在k2上使用console consumer验证消息是否拷贝成功:

    ./kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test-topic --from-beginning

202404032128096292.png

上述输出表明k1的消息经由MirrorMaker成功拷贝到k2集群。

阅读全文