AdminClient
自0.11.0.0版本起,Kafka社区推出了AdminClient和KafkaAdminClient,意在统一所有的集群管理API。使用0.11.0.0及以后版本的用户应该始终使用这个类来管理集群。
虽然和原先服务器端的AdminClient类同名,但这个工具是属于客户端的,因此只需要在管理程序项目中添加kafka-clients依赖即可:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
方法名称 | 作用 |
---|---|
createTopics | 创建一个或多个Topic |
listTopics | 查询Topic列表 |
deleteTopics | 删除一个或多个Topic |
describeTopics | 查询Topic的描述信息 |
describeConfigs | 查询Topic、Broker等的所有配置项信息 |
alterConfigs | 用于修改Topic、Broker等的配置项信息(该方法在新版本中被标记为已过期) |
incrementalAlterConfigs | 同样也是用于修改Topic、Broker等的配置项信息,但功能更多、更灵活,用于代替alterConfigs |
createPartitions | 用于调整Topic的Partition数量,只能增加不能减少或删除,也就是说新设置的Partition数量必须大于之前的Partition数量 |
1、创建AdminClient
/**
 * Entry point: creates an AdminClient connected to the broker at
 * {@code localhost:9092} and closes it again.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // try-with-resources guarantees close() runs even when an admin
    // operation placed inside the block throws.
    try (AdminClient adminClient = AdminClient.create(properties)) {
        // Invoke the admin operations that need the client here.
    }
}
2、获取集群信息
/**
 * Prints the cluster id, the controller node and every node of the Kafka cluster.
 *
 * @param adminClient client used to query the cluster
 * @throws ExecutionException   if one of the cluster lookups completes exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting for a result
 */
public static void describeCluster(AdminClient adminClient) throws ExecutionException, InterruptedException {
    DescribeClusterResult cluster = adminClient.describeCluster();
    // Each accessor returns a future; get() blocks until the value is available.
    String clusterId = cluster.clusterId().get();
    Node controller = cluster.controller().get();
    System.out.println(String.format("Cluster id: %s, controller: %s", clusterId, controller));
    System.out.println("Current cluster nodes info: ");
    cluster.nodes().get().forEach(System.out::println);
}
Cluster id: z-FbYLVTSqK-6TCv0osH3Q, controller: localhost:9092 (id: 0 rack: null)
Current cluster nodes info:
localhost:9092 (id: 0 rack: null)
3、创建topic
/**
 * Creates a topic with 3 partitions and a replication factor of 1.
 *
 * @param adminClient client used to send the request
 * @param topicName   name of the topic to create
 * @throws ExecutionException   if the creation request completes exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting for the result
 */
public static void createTopics(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
    final int partitions = 3;
    final short replicationFactor = 1;
    NewTopic topic = new NewTopic(topicName, partitions, replicationFactor);
    // Block until the request has completed.
    adminClient.createTopics(Arrays.asList(topic)).all().get();
}
4、查看topic列表
/**
 * Lists all topics, including internal ones, printing first the topic names
 * and then the topic listings.
 *
 * @param adminClient client used to query the topics
 * @throws ExecutionException   if the listing request completes exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting for a result
 */
public static void listAllTopics(AdminClient adminClient) throws ExecutionException, InterruptedException {
    // Ask the broker to include internal topics in the result.
    ListTopicsOptions listOptions = new ListTopicsOptions().listInternal(true);
    ListTopicsResult listed = adminClient.listTopics(listOptions);

    // Topic names only.
    Set<String> names = listed.names().get();
    System.out.println(names);

    // Full listings (name plus internal flag).
    Collection<TopicListing> listings = listed.listings().get();
    System.out.println(listings);
}
[test, testTopic, test-create-topic, my-topic, __consumer_offsets]
[(name=test, internal=false), (name=testTopic, internal=false), (name=test-create-topic, internal=false), (name=my-topic, internal=false), (name=__consumer_offsets, internal=true)]
5、删除topic
/**
 * Deletes the given topic and waits for the request to complete.
 *
 * @param adminClient client used to send the request
 * @param topicName   name of the topic to delete
 * @throws ExecutionException   if the deletion request completes exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting for the result
 */
public static void deleteTopics(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
    DeleteTopicsResult deletion = adminClient.deleteTopics(Arrays.asList(topicName));
    // Block until the request has completed.
    deletion.all().get();
}
6、查询topic信息
一个Topic会有自身的描述信息,例如:partition的数量,副本集的数量,是否为internal等等。AdminClient中提供了describeTopics方法来查询这些描述信息。代码示例:
/**
 * Prints the description of the given topic and of the
 * {@code __consumer_offsets} topic, one line per topic.
 *
 * @param adminClient client used to query the topics
 * @param topicName   name of the topic to describe
 * @throws ExecutionException   if the describe request completes exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting for the result
 */
public static void describeTopics(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
    // Wait for the descriptions of both topics to be available.
    Map<String, TopicDescription> descriptions =
            adminClient.describeTopics(Arrays.asList(topicName, "__consumer_offsets")).all().get();
    descriptions.forEach((name, description) -> System.out.println(name + "---->" + description));
}
testTopic---->(name=testTopic, internal=false, partitions=(partition=0, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)), authorizedOperations=null)
__consumer_offsets---->(name=__consumer_offsets, internal=true, partitions=(partition=0, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=1, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=2, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=3, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=4, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=5, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=6, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=7, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=8, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=9, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=10, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=11, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=12, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=13, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), 
isr=localhost:9092 (id: 0 rack: null)),(partition=14, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=15, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=16, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=17, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=18, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=19, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=20, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=21, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=22, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=23, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=24, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=25, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=26, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=27, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: 
null)),(partition=28, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=29, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=30, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=31, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=32, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=33, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=34, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=35, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=36, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=37, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=38, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=39, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=40, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=41, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=42, 
leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=43, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=44, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=45, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=46, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=47, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=48, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)),(partition=49, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)), authorizedOperations=null)
7、查询配置信息
/**
 * Prints the configuration entries returned for the given topic.
 *
 * @param adminClient client used to query the configuration
 * @param topicName   name of the topic whose configuration is printed
 * @throws ExecutionException   if the describe request completes exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting for the result
 */
public static void describeConfig(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
    ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    Map<ConfigResource, Config> configsByResource =
            adminClient.describeConfigs(Collections.singleton(topicResource)).all().get();
    configsByResource.forEach((resource, config) -> {
        System.out.println(String.format("Resource type: %s, resource name: %s", resource.type(), resource.name()));
        // One "name = value" line per configuration entry.
        for (ConfigEntry entry : config.entries()) {
            System.out.println(entry.name() + " = " + entry.value());
        }
    });
}
Resource type: TOPIC, resource name: testTopic
compression.type = producer
leader.replication.throttled.replicas =
message.downconversion.enable = true
min.insync.replicas = 1
segment.jitter.ms = 0
cleanup.policy = delete
flush.ms = 9223372036854775807
follower.replication.throttled.replicas =
segment.bytes = 1073741824
retention.ms = 604800000
flush.messages = 9223372036854775807
message.format.version = 3.0-IV1
file.delete.delay.ms = 60000
max.compaction.lag.ms = 9223372036854775807
max.message.bytes = 1048588
min.compaction.lag.ms = 0
message.timestamp.type = CreateTime
preallocate = false
min.cleanable.dirty.ratio = 0.5
index.interval.bytes = 4096
unclean.leader.election.enable = false
retention.bytes = -1
delete.retention.ms = 86400000
segment.ms = 604800000
message.timestamp.difference.max.ms = 9223372036854775807
segment.index.bytes = 10485760
8、修改配置信息
8.1、alterConfigs
在早期版本中,使用alterConfigs方法来修改配置项。代码示例:
/**
 * Sets {@code cleanup.policy} to {@code compact} on the given topic via
 * {@code alterConfigs}, then prints the topic's configuration.
 *
 * @param adminClient client used to send the request
 * @param topicName   name of the topic to modify
 * @throws ExecutionException   if the alter or describe request completes exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting for a result
 */
public static void alterConfigs(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
    // The resource to modify: a topic identified by its name.
    ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);

    // The new configuration, expressed as a collection of ConfigEntry.
    ConfigEntry compactPolicy = new ConfigEntry("cleanup.policy", "compact");
    Map<ConfigResource, Config> updates = new HashMap<>();
    updates.put(topicResource, new Config(Arrays.asList(compactPolicy)));

    // Send the request and wait for it to complete.
    adminClient.alterConfigs(updates).all().get();

    // Print the resulting configuration.
    describeConfig(adminClient, topicName);
}
Resource type: TOPIC, resource name: my-topic
compression.type = producer
leader.replication.throttled.replicas =
message.downconversion.enable = true
min.insync.replicas = 1
segment.jitter.ms = 0
cleanup.policy = compact
flush.ms = 9223372036854775807
follower.replication.throttled.replicas =
segment.bytes = 1073741824
retention.ms = 604800000
flush.messages = 9223372036854775807
message.format.version = 3.0-IV1
file.delete.delay.ms = 60000
max.compaction.lag.ms = 9223372036854775807
max.message.bytes = 1048588
min.compaction.lag.ms = 0
message.timestamp.type = CreateTime
preallocate = false
min.cleanable.dirty.ratio = 0.5
index.interval.bytes = 4096
unclean.leader.election.enable = false
retention.bytes = -1
delete.retention.ms = 86400000
segment.ms = 604800000
message.timestamp.difference.max.ms = 9223372036854775807
segment.index.bytes = 10485760
8.2、incrementalAlterConfigs
在新版本中则是使用incrementalAlterConfigs方法来修改Topic的配置项,该方法使用起来相对于alterConfigs要略微复杂一些,但因此功能更多、更灵活。代码示例:
/**
 * Sets {@code preallocate} to {@code false} on the given topic via
 * {@code incrementalAlterConfigs}, then prints the topic's configuration.
 *
 * @param adminClient client used to send the request
 * @param topicName   name of the topic to modify
 * @throws ExecutionException   if the alter or describe request completes exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting for a result
 */
public static void incrementalAlterConfigs(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
    // The resource to modify: a topic identified by its name.
    ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);

    // Each change pairs a ConfigEntry with an operation type; several
    // operations can be submitted for the same resource.
    AlterConfigOp setPreallocate = new AlterConfigOp(
            new ConfigEntry("preallocate", "false"), AlterConfigOp.OpType.SET);
    Map<ConfigResource, Collection<AlterConfigOp>> updates = new HashMap<>();
    updates.put(topicResource, Arrays.asList(setPreallocate));

    // Send the request and wait for it to complete.
    adminClient.incrementalAlterConfigs(updates).all().get();

    // Print the resulting configuration.
    describeConfig(adminClient, topicName);
}
Resource type: TOPIC, resource name: my-topic
compression.type = producer
leader.replication.throttled.replicas =
message.downconversion.enable = true
min.insync.replicas = 1
segment.jitter.ms = 0
cleanup.policy = compact
flush.ms = 9223372036854775807
follower.replication.throttled.replicas =
segment.bytes = 1073741824
retention.ms = 604800000
flush.messages = 9223372036854775807
message.format.version = 3.0-IV1
file.delete.delay.ms = 60000
max.compaction.lag.ms = 9223372036854775807
max.message.bytes = 1048588
min.compaction.lag.ms = 0
message.timestamp.type = CreateTime
preallocate = false
min.cleanable.dirty.ratio = 0.5
index.interval.bytes = 4096
unclean.leader.election.enable = false
retention.bytes = -1
delete.retention.ms = 86400000
segment.ms = 604800000
message.timestamp.difference.max.ms = 9223372036854775807
segment.index.bytes = 10485760
9、修改partition数量
在创建Topic时我们需要设定Partition的数量,但如果觉得初始设置的Partition数量太少了,那么就可以使用createPartitions方法来调整Topic的Partition数量,但是需要注意在Kafka中Partition只能增加不能减少。代码示例:
/**
 * Requests that the given topic's partition count be increased to 2.
 *
 * <p>Partitions can only be added, never removed, so this presumably fails
 * for a topic that already has 2 or more partitions.
 *
 * @param adminClient client used to send the request
 * @param topicName   name of the topic whose partition count is increased
 * @throws ExecutionException   if the request completes exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting for the result
 */
public static void incrPartitions(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {
    // Target a total of 2 partitions for the topic.
    NewPartitions target = NewPartitions.increaseTo(2);
    Map<String, NewPartitions> requests = new HashMap<>();
    requests.put(topicName, target);
    // Send the request and wait for it to complete.
    adminClient.createPartitions(requests).all().get();
}