多线程消费实例
KafkaConsumer是非线程安全的。它和KafkaProducer不同,后者是线程安全的,因此用户可以在多个线程中放心地使用同一个KafkaProducer实例,事实上这也是社区推荐的producer使用方法,因为通常它比每个线程维护一个KafkaProducer实例效率要高。
但是对于consumer而言,用户无法直接在多个线程中共享一个KafkaConsumer实例,下面给出两种多线程消费的方法以及各自的实例。
1、每个线程维护一个KafkaConsumer
在这个方法中,用户创建多个线程来消费topic数据。每个线程都会创建专属于该线程的KafkaConsumer实例,如下图所示:
consumer group由多个线程的KafkaConsumer组成,每个线程负责消费固定数目的分区。下面给出一个完整的样例,该样例中包含3个类:
- ConsumerRunnable类:消费线程类,执行真正的消费任务
- ConsumerGroup类:消费线程管理类,创建多个线程类执行消费任务
- ConsumerMain类:测试主方法类
ConsumerRunnable.java
/**
 * Consumption task that owns a private {@link KafkaConsumer} and prints every record it polls.
 *
 * <p>The consumer instance is never shared: each {@code ConsumerRunnable} is meant to be
 * executed by exactly one thread.
 */
public class ConsumerRunnable implements Runnable {
    // Each thread keeps its own private KafkaConsumer instance.
    private final KafkaConsumer<String, String> consumer;

    /**
     * Creates the consumer with auto-commit enabled and subscribes it to {@code topic}.
     *
     * @param brokerList value used for {@code bootstrap.servers}
     * @param groupId    value used for {@code group.id}
     * @param topic      the single topic to subscribe to
     */
    public ConsumerRunnable(String brokerList, String groupId, String topic) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokerList);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true"); // commit offsets automatically
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList(topic));
    }

    /**
     * Polls forever and prints each record. The loop has no normal exit; it ends only when
     * {@code poll} throws, in which case the consumer is closed before the exception propagates.
     */
    @Override
    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(200);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(Thread.currentThread().getName() + " consumed " + record.partition()
                            + "th message with offset: " + record.offset() + " value:" + record.value());
                }
            }
        } finally {
            // Without this the consumer (and its network resources) would leak whenever poll() fails.
            consumer.close();
        }
    }
}
ConsumerGroup.java
/**
 * Creates a fixed number of {@link ConsumerRunnable} tasks that share one group id and topic,
 * and starts each of them on its own thread.
 */
public class ConsumerGroup {
    private List<ConsumerRunnable> consumers;

    /**
     * Builds {@code consumerNum} consumer tasks up front; no thread is started here.
     *
     * @param consumerNum number of consumer tasks to create
     * @param groupId     consumer group id passed to every task
     * @param topic       topic passed to every task
     * @param brokerList  broker list passed to every task
     */
    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
        consumers = new ArrayList<>(consumerNum);
        int remaining = consumerNum;
        while (remaining > 0) {
            consumers.add(new ConsumerRunnable(brokerList, groupId, topic));
            remaining--;
        }
    }

    /** Starts one new thread per consumer task, in creation order. */
    public void execute() {
        for (int idx = 0; idx < consumers.size(); idx++) {
            Thread runner = new Thread(consumers.get(idx));
            runner.start();
        }
    }
}
ConsumerMain.java
public class ConsumerMain {
public static void main(String[] args) {
String brokerList = "localhost:9092";
String groupId = "testGroup";
String topic = "testTopic";
int consumerNum = 3;
ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
consumerGroup.execute();
}
}
2、单KafkaConsumer实例+多worker线程
此方法与第一种方法的区别在于,我们将消息的获取与消息的处理解耦,把后者放入单独的工作者线程中,即所谓的worker线程中。同时在全局维护一个或若干个consumer实例执行消息获取任务,如图所示。
本例使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作。之后worker线程完成处理后上报位移状态,由全局consumer提交位移。代码中共有如下3个类。
- ConsumerThreadHandler类:consumer多线程管理类,用于创建线程池以及为每个线程分配消息集合。另外consumer位移提交也在该类中完成。
- ConsumerWorker类:本质上是一个Runnable,执行真正的消费逻辑并上报位移信息给ConsumerThreadHandler。
- Main类:测试主方法类。
ConsumerThreadHandler.java
/**
 * Owns a single {@link KafkaConsumer} that polls records on one thread and hands every polled
 * batch to a worker thread pool. Workers report the positions they have processed through the
 * shared {@code offsets} map; this class commits those positions from the polling thread.
 *
 * <p>All access to {@code offsets} is guarded by {@code synchronized (offsets)} because the map
 * is written by worker threads and read by the polling thread.
 *
 * <p>NOTE(review): the constructor hard-codes {@code StringDeserializer} for keys and values,
 * so {@code K} and {@code V} are effectively {@code String} whatever type arguments a caller picks.
 */
public class ConsumerThreadHandler<K, V> {
    /** Upper bound, in seconds, on waiting for in-flight workers before the final commit. */
    private static final long WORKER_DRAIN_TIMEOUT_SECONDS = 10L;

    private final KafkaConsumer<K, V> consumer;
    // volatile: assigned by the thread running consume(), read by close() from another thread.
    private volatile ExecutorService executors;
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    /**
     * Creates the consumer with auto-commit disabled and subscribes it to {@code topic}.
     *
     * @param brokerList value used for {@code bootstrap.servers}
     * @param groupId    value used for {@code group.id}
     * @param topic      the single topic to subscribe to
     */
    public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokerList);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "false"); // offsets are committed manually
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<K, V>(properties);
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit through the locked snapshot path: workers may be mutating the map right now.
                commitOffsets();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                synchronized (offsets) {
                    offsets.clear();
                }
            }
        });
    }

    /**
     * Runs the poll loop on the calling thread until {@link #close()} wakes the consumer up.
     * Each non-empty batch is submitted to the pool; reported offsets are committed after every poll.
     * On exit the pool is drained, the remaining offsets are committed and the consumer is closed.
     *
     * @param threadNumber number of threads in the worker pool
     */
    public void consume(int threadNumber) {
        executors = new ThreadPoolExecutor(
                threadNumber,
                threadNumber,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(1000L);
                if (!records.isEmpty()) {
                    executors.submit(new ConsumerWorker<>(records, offsets));
                }
                commitOffsets();
            }
        } catch (WakeupException ignored) {
            // Expected: close() calls consumer.wakeup() to break out of the loop.
        } finally {
            try {
                // Let in-flight workers report their offsets before the last commit.
                awaitWorkers();
                commitOffsets();
            } finally {
                // Close even if the final commit fails.
                consumer.close();
            }
        }
    }

    /** Stops accepting new tasks and waits a bounded time for running workers to finish. */
    private void awaitWorkers() {
        ExecutorService pool = executors;
        pool.shutdown();
        try {
            // A timeout is tolerated: offsets not reported in time are simply not committed.
            pool.awaitTermination(WORKER_DRAIN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Commits and clears the offsets reported so far; does nothing when none are pending. */
    private void commitOffsets() {
        // Copy under the lock and commit outside it, to keep the time offsets is locked short.
        Map<TopicPartition, OffsetAndMetadata> snapshot;
        synchronized (offsets) {
            if (offsets.isEmpty()) {
                return;
            }
            snapshot = Collections.unmodifiableMap(new HashMap<>(offsets));
            offsets.clear();
        }
        consumer.commitSync(snapshot);
    }

    /**
     * Requests shutdown from another thread: wakes the consumer so that {@link #consume(int)}
     * leaves its loop, and stops the worker pool from accepting new tasks if it exists yet.
     */
    public void close() {
        consumer.wakeup();
        ExecutorService pool = executors;
        if (pool != null) {
            pool.shutdown();
        }
    }
}
ConsumerWorker.java
/**
 * Task that processes one polled batch of records and reports, per partition, the next offset
 * to commit into a map shared with the submitter.
 */
public class ConsumerWorker<K, V> implements Runnable {
    private final ConsumerRecords<K, V> records;
    private final Map<TopicPartition, OffsetAndMetadata> offsets;

    /**
     * @param records the batch this worker processes
     * @param offsets shared map receiving the reported offsets; guarded by its own monitor
     */
    public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.records = records;
        this.offsets = offsets;
    }

    /** Handles the batch partition by partition, reporting an offset after each partition. */
    @Override
    public void run() {
        for (TopicPartition tp : records.partitions()) {
            // Records of this batch that belong to the current partition.
            List<ConsumerRecord<K, V>> batch = this.records.records(tp);
            printAll(batch);
            int lastIndex = batch.size() - 1;
            report(tp, batch.get(lastIndex).offset());
        }
    }

    /** Prints every record of one partition together with the current thread's name. */
    private void printAll(List<ConsumerRecord<K, V>> batch) {
        for (ConsumerRecord<K, V> rec : batch) {
            System.out.println(Thread.currentThread().getName() + " consumed " + rec.partition()
                    + "th message with offset: " + rec.offset() + " value:" + rec.value());
        }
    }

    /**
     * Stores {@code lastOffset + 1} for the partition unless a strictly larger offset
     * is already recorded there.
     */
    private void report(TopicPartition tp, long lastOffset) {
        synchronized (offsets) {
            boolean store = !offsets.containsKey(tp)
                    || offsets.get(tp).offset() <= lastOffset + 1;
            if (store) {
                offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
            }
        }
    }
}
Main.java
/** Entry point for the single-consumer plus worker-pool example. */
public class Main {
    /**
     * Runs the handler's poll loop on a background thread for about 20 seconds, then asks it to close.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String brokerList = "localhost:9092";
        String topic = "testTopic";
        String groupId = "testGroup";
        // String type arguments: the handler configures StringDeserializer for both keys and values.
        final ConsumerThreadHandler<String, String> handler = new ConsumerThreadHandler<>(brokerList, groupId, topic);
        final int cpuCount = Runtime.getRuntime().availableProcessors();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                handler.consume(cpuCount);
            }
        };
        new Thread(runnable).start();
        try {
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers, then fall through and shut down early.
            Thread.currentThread().interrupt();
        }
        System.out.println("Starting to close the consumer....");
        handler.close();
    }
}