2024-04-03
原文作者:吴声子夜歌 原文地址: https://blog.csdn.net/cold___play/article/details/132351928

Kafka Streams

1、概述

Kafka一直被认为是一个强大的消息中间件,它实现了高吞吐、高可用和低延时的消息传输能力,这让它成为流式处理系统中完美的数据来源。目前通用的一些流式处理框架如Apache Spark、Apache Flink、Apache Storm等都可以将Kafka作为可靠的数据来源。但遗憾的是,在0.l0.x版本之前,Kafka还并不具备任何数据处理的能力,但在此之后,Kafka Streams应运而生。

Kafka Streams是一个用于处理和分析数据的客户端库。它先把存储在Kafka中的数据进行处理和分析,然后将最终所得的数据结果回写到Kafka或发送到外部系统。它建立在一些非常重要的流式处理概念之上,例如适当区分事件时间和处理时间、窗口支持,以及应用程序状态的简单(高效)管理。同时,它也基于Kafka中的许多概念,例如通过划分主题进行扩展。此外,由于这个原因,它作为一个轻量级的库可以集成到应用程序中。这个应用程序可以根据需要独立运行、在应用程序服务器中运行、作为Docker容器,或者通过资源管理器(如Mesos)进行操作。

Kafka Streams直接解决了流式处理中的很多问题:

  • 毫秒级延迟的逐个事件处理。
  • 有状态的处理,包括连接(join)和聚合类操作。
  • 提供了必要的流处理原语,包括高级流处理DSL和低级处理器API。高级流处理DSL提供了常用流处理变换操作,低级处理器API支持客户端自定义处理器并与状态仓库交互。
  • 使用类似DataFlow的模型对无序数据进行窗口化处理。
  • 具有快速故障切换的分布式处理和容错能力。
  • 无停机滚动部署。

2、单词统计

单词统计是流式处理领域中最常见的示例,这里我们同样使用它来演示一下Kafka Streams的用法。在Kafka的代码中就包含了一个单词统计的示例程序,即org.apache.kafka.streams. examples.wordcount.WordCountDemo,这个示例中以硬编码的形式用到了两个主题:
streams-plaintext-inputstreams-wordcount-output。为了能够使示例程序正常运行,我们需要预先准备好这两个主题:

    ./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic streams-plaintext-input --replication-factor 1 --partitions 1
    
    ./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic streams-wordcount-output --replication-factor 1 --partitions 1

这两个主题的详细信息如下:

    ./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic streams-plaintext-input,streams-wordcount-output

202404032128510281.png

之后就可以运行WordCountDemo这个示例:

    ./kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

这个示例程序将从主题streams-plaintext-input中读取消息,然后对读取的消息执行单词统计,并将结果持续写入主题streams-wordcount-output。

使用Kafka自带的console producer来生产一些输入数据供WordCount程序消费:

    ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input

再运行console consumer脚本来验证WordCount程序的计算结果:

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

输入一些单词:

202404032128514402.png
统计结果:输出结果中的第一列是消息的key,这里表示被计数的单词,第二列是消息的value,这里表示该单词的最新计数。

202404032128518943.png

下面我们通过WordCountDemo程序来了解一下Kafka Streams的开发方式,WordCountDemo程序如下所示:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.3.0</version>
    </dependency>
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;
    
    import java.util.Arrays;
    import java.util.Locale;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * @Author: acton_zhang
     * @Date: 2023/8/17 11:41 下午
     * @Version 1.0
     * 单词统计
     */
    public class WordCountDemo {
        public static void main(String[] args) {
            //构建Kafka Streams的配置
            Properties props = new Properties();
            //每个Kafka Streams应用程序必须要有一个application.id,这个applicationID用于协调应用实例
            //也用于命名内部的本地存储和相关主题。在整个Kafka集群中,applicationId必须是唯一的
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
            //bootstrap.servers配置的是Kafka集群的地址,必填
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            //最大缓冲字节数
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            //default.key.serde设置key的序列化器
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            //default.value.serde设置value的序列化器
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    
            //创建StreamBuilder实例
            StreamsBuilder builder = new StreamsBuilder();
            //创建一个KStream实例,并设定了输入主题streams-plaintext-input
            KStream<String, String> source = builder.stream("streams-plaintext-input");
            //具体的单词统计逻辑
            //KStream是一个由键值对构成的抽象记录流,每个键值对是一个独立单元,即使相同的key也不会被覆盖,类似数据库的插入操作
            //KTable可以理解成一个基于表主键的日志更新流,相同key的每条记录只保存最新的一条记录,类似数据库中基于主键的更新
            //无论记录流(用KStream定义),还是更新日志流(用KTable定义),都可以从一个或多个Kafka主题数据源来创建。
            //一个KStream可以与另一个KStream或KTable进行Join操作,或者聚合成一个KTable。同样,一个KTable也可以转换成一个KStream。
            //KStream和KTable都提供了一系列转换操作,每个转换操作都可以转化为一个KStream或KTable对象,将这些转换操作连接在一起就构成了一个处理器拓扑。
            KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(
                    value.toLowerCase(Locale.getDefault()).split(" ")))
                    .groupBy((key, value) -> value)
                    .count();
            //toStream().to()将单词统计结果写入输出主题streams-wordcount-output,key是String类型,value是Long类型
            counts.toStream().to("streams-wordcount-output",
                    Produced.with(Serdes.String(), Serdes.Long()));
            //基于拓扑和配置来订阅一个KafkaStreams对象
            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            final CountDownLatch latch = new CountDownLatch(1);
            Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
            //启动KafkaStreams引擎
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }
    }
阅读全文