Kafka Streams
1、概述
Kafka一直被认为是一个强大的消息中间件,它实现了高吞吐、高可用和低延时的消息传输能力,这让它成为流式处理系统中完美的数据来源。目前通用的一些流式处理框架如Apache Spark、Apache Flink、Apache Storm等都可以将Kafka作为可靠的数据来源。但遗憾的是,在0.l0.x版本之前,Kafka还并不具备任何数据处理的能力,但在此之后,Kafka Streams应运而生。
Kafka Streams是一个用于处理和分析数据的客户端库。它先把存储在Kafka中的数据进行处理和分析,然后将最终所得的数据结果回写到Kafka或发送到外部系统。它建立在一些非常重要的流式处理概念之上,例如适当区分事件时间和处理时间、窗口支持,以及应用程序状态的简单(高效)管理。同时,它也基于Kafka中的许多概念,例如通过划分主题进行扩展。此外,由于这个原因,它作为一个轻量级的库可以集成到应用程序中。这个应用程序可以根据需要独立运行、在应用程序服务器中运行、作为Docker容器,或者通过资源管理器(如Mesos)进行操作。
Kafka Streams直接解决了流式处理中的很多问题:
- 毫秒级延迟的逐个事件处理。
- 有状态的处理,包括连接(join)和聚合类操作。
- 提供了必要的流处理原语,包括高级流处理DSL和低级处理器API。高级流处理DSL提供了常用流处理变换操作,低级处理器API支持客户端自定义处理器并与状态仓库交互。
- 使用类似DataFlow的模型对无序数据进行窗口化处理。
- 具有快速故障切换的分布式处理和容错能力。
- 无停机滚动部署。
2、单词统计
单词统计是流式处理领域中最常见的示例,这里我们同样使用它来演示一下Kafka Streams的用法。在Kafka的代码中就包含了一个单词统计的示例程序,即org.apache.kafka.streams. examples.wordcount.WordCountDemo
,这个示例中以硬编码的形式用到了两个主题:
streams-plaintext-input
和streams-wordcount-output
。为了能够使示例程序正常运行,我们需要预先准备好这两个主题:
./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic streams-plaintext-input --replication-factor 1 --partitions 1
./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic streams-wordcount-output --replication-factor 1 --partitions 1
这两个主题的详细信息如下:
./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic streams-plaintext-input,streams-wordcount-output
之后就可以运行WordCountDemo这个示例:
./kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
这个示例程序将从主题streams-plaintext-input中读取消息,然后对读取的消息执行单词统计,并将结果持续写入主题streams-wordcount-output。
使用Kafka自带的console producer来生产一些输入数据供WordCount程序消费:
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
再运行console consumer脚本来验证WordCount程序的计算结果:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
输入一些单词:
统计结果:输出结果中的第一列是消息的key,这里表示被计数的单词,第二列是消息的value,这里表示该单词的最新计数。
下面我们通过WordCountDemo程序来了解一下Kafka Streams的开发方式,WordCountDemo程序如下所示:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.3.0</version>
</dependency>
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
/**
* @Author: acton_zhang
* @Date: 2023/8/17 11:41 下午
* @Version 1.0
* 单词统计
*/
public class WordCountDemo {
public static void main(String[] args) {
//构建Kafka Streams的配置
Properties props = new Properties();
//每个Kafka Streams应用程序必须要有一个application.id,这个applicationID用于协调应用实例
//也用于命名内部的本地存储和相关主题。在整个Kafka集群中,applicationId必须是唯一的
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
//bootstrap.servers配置的是Kafka集群的地址,必填
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//最大缓冲字节数
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
//default.key.serde设置key的序列化器
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
//default.value.serde设置value的序列化器
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
//创建StreamBuilder实例
StreamsBuilder builder = new StreamsBuilder();
//创建一个KStream实例,并设定了输入主题streams-plaintext-input
KStream<String, String> source = builder.stream("streams-plaintext-input");
//具体的单词统计逻辑
//KStream是一个由键值对构成的抽象记录流,每个键值对是一个独立单元,即使相同的key也不会被覆盖,类似数据库的插入操作
//KTable可以理解成一个基于表主键的日志更新流,相同key的每条记录只保存最新的一条记录,类似数据库中基于主键的更新
//无论记录流(用KStream定义),还是更新日志流(用KTable定义),都可以从一个或多个Kafka主题数据源来创建。
//一个KStream可以与另一个KStream或KTable进行Join操作,或者聚合成一个KTable。同样,一个KTable也可以转换成一个KStream。
//KStream和KTable都提供了一系列转换操作,每个转换操作都可以转化为一个KStream或KTable对象,将这些转换操作连接在一起就构成了一个处理器拓扑。
KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(
value.toLowerCase(Locale.getDefault()).split(" ")))
.groupBy((key, value) -> value)
.count();
//toStream().to()将单词统计结果写入输出主题streams-wordcount-output,key是String类型,value是Long类型
counts.toStream().to("streams-wordcount-output",
Produced.with(Serdes.String(), Serdes.Long()));
//基于拓扑和配置来订阅一个KafkaStreams对象
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
//启动KafkaStreams引擎
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}