2024-04-03
原文作者:吴声子夜歌 原文地址: https://blog.csdn.net/cold___play/article/details/132324514

AdminClient

自0.ll.0.0版本起,Kafka社区推出了AdminClient和KafkaAdminClient,意在统一所有的集群管理API。使用0.11.0.0及以后版本的用户应该始终使用这个类来管理集群。

虽然和原先服务器端的AdminClient类同名,但这个工具是属于客户端的,因此只需要在管理程序项目中添加kafka clients依赖即可:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.0</version>
    </dependency>
方法名称 作用
createTopics 创建一个或多个Topic
listTopics 查询Topic列表
deleteTopics 删除一个或多个Topic
describeTopics 查询Topic的描述信息
describeConfigs 查询Topic、Broker等的所有配置项信息
alterConfigs 用于修改Topic、Broker等的配置项信息(该方法在新版本中被标记为已过期)
incrementalAlterConfigs 同样也是用于修改Topic、Broker等的配置项信息,但功能更多、更灵活,用于代替alterConfigs
createPartitions 用于调整Topic的Partition数量,只能增加不能减少或删除,也就是说新设置的Partition数量必须大于等于之前的Partition数量

1、创建AdminClient

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(properties);
        adminClient.close();
    }

2、获取集群信息

    /**
     * 获取Kafka集群信息
     * @param adminClient
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void describeCluster(AdminClient adminClient) throws ExecutionException, InterruptedException {
        DescribeClusterResult ret = adminClient.describeCluster();
        System.out.println(String.format("Cluster id: %s, controller: %s",
                ret.clusterId().get(), ret.controller().get()));
        System.out.println("Current cluster nodes info: ");
        for (Node node : ret.nodes().get()) {
            System.out.println(node);
        }
    }
    Cluster id: z-FbYLVTSqK-6TCv0osH3Q, controller: localhost:9092 (id: 0 rack: null)
    Current cluster nodes info: 
    localhost:9092 (id: 0 rack: null)

3、创建topic

    /**
     * 创建一个分区数量为3,副本数量为1的主题
     * @param adminClient
     * @param topicName
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void createTopics(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
        NewTopic newTopic = new NewTopic(topicName, 3, (short)1);
        CreateTopicsResult ret = adminClient.createTopics(Arrays.asList(newTopic));
        ret.all().get();
    }

4、查看topic列表

    /**
     * 列出所有topic
     * @param adminClient
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void listAllTopics(AdminClient adminClient) throws ExecutionException, InterruptedException {
        //可配置选项
        ListTopicsOptions options = new ListTopicsOptions();
        //是否列出内部使用的topics
        options.listInternal(true);
        ListTopicsResult topicsResult = adminClient.listTopics(options);
        //打印topics名称
        Set<String> topicNames = topicsResult.names().get();
        System.out.println(topicNames);
        //打印topics信息
        Collection<TopicListing> topicListings = topicsResult.listings().get();
        System.out.println(topicListings);
    }
    [test, testTopic, test-create-topic, my-topic, __consumer_offsets]
    [(name=test, internal=false), (name=testTopic, internal=false), (name=test-create-topic, internal=false), (name=my-topic, internal=false), (name=__consumer_offsets, internal=true)]

5、删除topic

    /**
     * 删除给定topic
     * @param adminClient
     * @param topicName
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void deleteTopics(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
        DeleteTopicsResult ret = adminClient.deleteTopics(Arrays.asList(topicName));
        ret.all().get();
    }

6、查询topic信息

一个Topic会有自身的描述信息,例如:partition的数量,副本集的数量,是否为internal等等。AdminClient中提供了describeTopics方法来查询这些描述信息。代码示例:

    /**
     * 查看topic详细信息
     * @param adminClient
     * @param topicName
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void describeTopics(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
        DescribeTopicsResult ret = adminClient.describeTopics(Arrays.asList(topicName, "__consumer_offsets"));
        //等待返回结果完成
        Map<String, TopicDescription> topics = ret.all().get();
        for (Map.Entry<String, TopicDescription> entry : topics.entrySet()) {
            System.out.println(entry.getKey() + "---->" + entry.getValue());
        }
    
    }
    testTopic---->(name=testTopic, internal=false, partitions=(partition=0, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)), authorizedOperations=null)
    __consumer_offsets---->(name=__consumer_offsets, internal=true, partitions=(partition=0, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=1, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=2, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=3, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=4, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=5, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=6, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=7, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=8, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=9, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=10, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=11, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=12, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=13, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=14, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=15, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=16, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=17, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=18, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=19, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=20, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=21, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=22, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=23, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=24, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=25, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=26, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=27, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=28, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=29, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=30, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=31, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=32, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=33, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=34, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=35, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=36, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=37, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=38, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=39, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=40, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=41, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=42, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=43, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=44, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=45, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=46, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=47, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=48, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=49, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)), authorizedOperations=null)

7、查询配置信息

    /**
     * 获取指定topic的topic级别参数配置信息
     * @param adminClient
     * @param topicName
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void describeConfig(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
        DescribeConfigsResult ret = adminClient.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName)));
        Map<ConfigResource, Config> configMap = ret.all().get();
        for (Map.Entry<ConfigResource, Config> entry : configMap.entrySet()) {
            ConfigResource key = entry.getKey();
            Config value = entry.getValue();
            System.out.println(String.format("Resource type: %s, resource name: %s", key.type(), key.name()));
            Collection<ConfigEntry> configEntries = value.entries();
            for (ConfigEntry each : configEntries) {
                System.out.println(each.name() + " = " + each.value());
            }
        }
    }
    Resource type: TOPIC, resource name: testTopic
    compression.type = producer
    leader.replication.throttled.replicas = 
    message.downconversion.enable = true
    min.insync.replicas = 1
    segment.jitter.ms = 0
    cleanup.policy = delete
    flush.ms = 9223372036854775807
    follower.replication.throttled.replicas = 
    segment.bytes = 1073741824
    retention.ms = 604800000
    flush.messages = 9223372036854775807
    message.format.version = 3.0-IV1
    file.delete.delay.ms = 60000
    max.compaction.lag.ms = 9223372036854775807
    max.message.bytes = 1048588
    min.compaction.lag.ms = 0
    message.timestamp.type = CreateTime
    preallocate = false
    min.cleanable.dirty.ratio = 0.5
    index.interval.bytes = 4096
    unclean.leader.election.enable = false
    retention.bytes = -1
    delete.retention.ms = 86400000
    segment.ms = 604800000
    message.timestamp.difference.max.ms = 9223372036854775807
    segment.index.bytes = 10485760

8、修改配置信息

8.1、alterConfigs

在早期版本中,使用alterConfigs方法来修改配置项。代码示例:

    /**
     * 修改指定topic的topic级别参数
     * @param adminClient
     * @param topicName
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void alterConfigs(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
        // 指定ConfigResource的类型及名称
        ConfigResource configResource = new ConfigResource(
                ConfigResource.Type.TOPIC, topicName
        );
        // 配置项以ConfigEntry形式存在
        Config config = new Config(Arrays.asList(
                new ConfigEntry("cleanup.policy", "compact")
        ));
        //构建
        Map<ConfigResource, Config> configMap = new HashMap<>();
        configMap.put(configResource, config);
        //执行
        AlterConfigsResult ret = adminClient.alterConfigs(configMap);
        ret.all().get();
    
        //查看
        describeConfig(adminClient, topicName);
    }
    Resource type: TOPIC, resource name: my-topic
    compression.type = producer
    leader.replication.throttled.replicas = 
    message.downconversion.enable = true
    min.insync.replicas = 1
    segment.jitter.ms = 0
    cleanup.policy = compact
    flush.ms = 9223372036854775807
    follower.replication.throttled.replicas = 
    segment.bytes = 1073741824
    retention.ms = 604800000
    flush.messages = 9223372036854775807
    message.format.version = 3.0-IV1
    file.delete.delay.ms = 60000
    max.compaction.lag.ms = 9223372036854775807
    max.message.bytes = 1048588
    min.compaction.lag.ms = 0
    message.timestamp.type = CreateTime
    preallocate = false
    min.cleanable.dirty.ratio = 0.5
    index.interval.bytes = 4096
    unclean.leader.election.enable = false
    retention.bytes = -1
    delete.retention.ms = 86400000
    segment.ms = 604800000
    message.timestamp.difference.max.ms = 9223372036854775807
    segment.index.bytes = 10485760

8.2、incrementalAlterConfigs

在新版本中则是使用incrementalAlterConfigs方法来修改Topic的配置项,该方法使用起来相对于alterConfigs要略微复杂一些,但因此功能更多、更灵活。代码示例:

    /**
     * 修改指定topic的topic级别配置
     * @param adminClient
     * @param topicName
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void incrementalAlterConfigs(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
        // 指定ConfigResource的类型及名称
        ConfigResource configResource = new ConfigResource(
                ConfigResource.Type.TOPIC, topicName
        );
        // 配置项同样以ConfigEntry形式存在,只不过增加了操作类型
        // 以及能够支持操作多个配置项,相对来说功能更多、更灵活
        Collection<AlterConfigOp> configs = Arrays.asList(
                new AlterConfigOp(
                        new ConfigEntry("preallocate", "false"),
                        AlterConfigOp.OpType.SET
                )
        );
        Map<ConfigResource, Collection<AlterConfigOp>> configMaps = new HashMap<>();
        configMaps.put(configResource, configs);
        AlterConfigsResult result = adminClient.incrementalAlterConfigs(configMaps);
        result.all().get();
    
        //查看
        describeConfig(adminClient, topicName);
    }
    Resource type: TOPIC, resource name: my-topic
    compression.type = producer
    leader.replication.throttled.replicas = 
    message.downconversion.enable = true
    min.insync.replicas = 1
    segment.jitter.ms = 0
    cleanup.policy = compact
    flush.ms = 9223372036854775807
    follower.replication.throttled.replicas = 
    segment.bytes = 1073741824
    retention.ms = 604800000
    flush.messages = 9223372036854775807
    message.format.version = 3.0-IV1
    file.delete.delay.ms = 60000
    max.compaction.lag.ms = 9223372036854775807
    max.message.bytes = 1048588
    min.compaction.lag.ms = 0
    message.timestamp.type = CreateTime
    preallocate = false
    min.cleanable.dirty.ratio = 0.5
    index.interval.bytes = 4096
    unclean.leader.election.enable = false
    retention.bytes = -1
    delete.retention.ms = 86400000
    segment.ms = 604800000
    message.timestamp.difference.max.ms = 9223372036854775807
    segment.index.bytes = 10485760

9、修改partition数量

在创建Topic时我们需要设定Partition的数量,但如果觉得初始设置的Partition数量太少了,那么就可以使用createPartitions方法来调整Topic的Partition数量,但是需要注意在Kafka中Partition只能增加不能减少。代码示例:

    /**
     * 增加指定Topic的Partition数量
     * @param adminClient
     * @param topicName
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void incrPartitions(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
        Map<String, NewPartitions> newPartitions = new HashMap<>();
        //将Partition数量调整为2
        newPartitions.put(topicName, NewPartitions.increaseTo(2));
        CreatePartitionsResult ret = adminClient.createPartitions(newPartitions);
        ret.all().get();
    }
阅读全文