2024-04-03
原文作者:吴声子夜歌 原文地址: https://blog.csdn.net/cold___play/article/details/132241408

1、流式处理概述

数据流(也被称为“事件流”或“流数据”)。首先,数据流是无边界数据 集的抽象表示。无边界意味着无限和持续增长。无边界数据集之所以是无限的,是因为随 着时间的推移,新的记录会不断加入进来。

这个简单的模型(事件流)可以表示很多业务活动,比如信用卡交易、股票交易、包裹 递送、流经交换机的网络事件、制造商设备传感器发出的事件、发送出去的邮件、游戏 里物体的移动,等等。这个清单是无穷无尽的,因为几乎每一件事情都可以被看成事件 的序列。除了没有边界外,事件流模型还有其他一些属性。

  • 事件流是有序的
    事件的发生总是有个先后顺序。以金融活动事件为例,先将钱存进账户后再花钱,这与 先花钱再还钱的次序是完全不一样的。后者会出现透支,而前者不会。这是事件流与数 据库表的不同点之一。数据库表里的记录是无序的,而 SQL 语法中的 order by 并不是 关系模型的组成部分,它是为了报表查询而添加的。
  • 不可变的数据记录
    事件一旦发生,就不能被改变。一个金融交易被取消,并不是说它就消失了,相反,这 需要往事件流里添加一个额外的事件,表示前一个交易的取消操作。顾客的一次退货并 不意味着之前的销售记录被删除,相反,退货行为被当成一个额外的事件记录下来。这 是数据流与数据表之间的另一个不同点——可以删除和修改数据表里的记录,但这些操 作只不过是发生在数据库里的事务,这些事务可以被看成事件流。假设你对数据库的二 进制日志(bin log)、预写式日志(WAL)和重做日志(redo log)的概念都很熟悉,那 么就会知道,如果往数据库表插入一条记录,然后将其删除,表里就不会再有这条记 录。但重做日志里包含了两个事务:插入事务和删除事务。
  • 事件流是可重播的
    这是事件流非常有价值的一个属性。用户可以很容易地找出那些不可重播的流(流经 套接字的 TCP 数据包就是不可重播的),但对于大多数业务来说,重播发生在几个月 前(甚至几年前)的原始事件流是一个很重要的需求。可能是为了尝试使用新的分析方 法纠正过去的错误,或是为了进行审计。这也就是为什么我们相信 Kafka 能够让现代业 务领域的流式处理大获成功——可以借助 Kafka 来捕捉和重播事件流。如果没有这项能 力,流式处理充其量只是数据科学实验室里的一个玩具而已。

流式处理是指实时地处 理一个或多个事件流。流式处理是一种编程范式,就像请求与响应范式和批处理范式那 样。下面将对这 3 种范式进行比较,以便更好地理解如何在软件架构中应用流式处理。

  • 请求与响应
    这是延迟最小的一种范式,响应时间处于亚毫秒到毫秒之间,而且响应时间一般非常稳 定。这种处理模式一般是阻塞的,应用程序向处理系统发出请求,然后等待响应。在数 据库领域,这种范式就是线上交易处理(OLTP)。销售点(POS)系统、信用卡处理系 统和基于时间的追踪系统一般都使用这种范式。
  • 批处理
    这种范式具有高延迟和高吞吐量的特点。处理系统按照设定的时间启动处理进程,比如 每天的下午两点开始启动,每小时启动一次等。它读取所有的输入数据(从上一次执行 之后的所有可用数据,或者从月初开始的所有数据等),输出结果,然后等待下一次启 动。处理时间从几分钟到几小时不等,并且用户从结果里读到的都是旧数据。在数据库 领域,它们就是数据仓库(DWH)或商业智能(BI)系统。它们每天加载巨大批次的 数据,并生成报表,用户在下一次加载数据之前看到的都是相同的报表。从规模上来 说,这种范式既高效又经济。但在近几年,为了能够更及时、高效地作出决策,业务要 求在更短的时间内能提供可用的数据,这就给那些为探索规模经济而开发却无法提供低 延迟报表的系统带来了巨大的压力。
  • 流式处理
    这种范式介于上述两者之间。大部分的业务不要求亚毫秒级的响应,不过也接受不了要 等到第二天才知道结果。大部分业务流程都是持续进行的,只要业务报告保持更新,业 务产品线能够持续响应,那么业务流程就可以进行下去,而无需等待特定的响应,也不 要求在几毫秒内得到响应。一些业务流程具有持续性和非阻塞的特点,比如针对可疑信 用卡交易的警告、网络警告、根据供应关系实时调整价格、跟踪包裹。

2、流式处理的概念

2.1、时间

  • 事件时间
    事件时间是指所追踪事件的发生时间和记录的创建时间。例如,度量的获取时间、商店 里商品的出售时间、网站用户访问网页的时间,等等。在 Kafka 0.10.0 和更高版本里, 生产者会自动在记录中添加记录的创建时间。如果这个时间戳与应用程序对“事件时 间”的定义不一样,例如,Kafka 的记录是基于事件发生后的数据库记录创建的,那就 需要自己设置这个时间戳字段。在处理数据流时,事件时间是很重要的。
  • 日志追加时间
    日志追加时间是指事件保存到 broker 的时间。在 Kafka 0.10.0 和更高版本里,如果启用 了自动添加时间戳的功能,或者记录是使用旧版本的生产者客户端生成的,而且没有包 含时间戳,那么 broker 会在接收这些记录时自动添加时间戳。这个时间戳一般与流式 处理没有太大关系,因为用户一般只对事件的发生时间感兴趣。例如,如果要计算每天 生产了多少台设备,就需要计算在那一天实际生产的设备数量,尽管这些事件有可能因 为网络问题到了第二天才进入 Kafka。不过,如果真实的事件时间没有被记录下来,那 么就可以使用日志追加时间,在记录创建之后,这个时间就不会发生改变。
  • 处理时间
    处理时间是指应用程序在收到事件之后要对其进行处理的时间。这个时间可以是在事件 发生之后的几毫秒、几小时或几天。同一个事件可能会被分配不同的时间戳,这取决于 应用程序何时读取这个事件。如果应用程序使用了两个线程来读取同一个事件,这个时 间戳也会不一样!所以这个时间戳非常不可靠,应该避免使用它。

2.2、状态

如果只是单独处理每一个事件,那么流式处理就很简单。例如,如果想从 Kafka 读取在线 购物交易事件流,找出金额超过 10 000 美元的交易,并将结果通过邮件发送给销售人员, 那么可以使用 Kafka 消费者客户端和 SMTP 库,几行代码就可以搞定。
如果操作里包含了多个事件,流式处理就会变得很有意思,比如根据类型计算事件的数 量、移动平均数、合并两个流以便生成更丰富的信息流。在这些情况下,光处理单个事件 是不够的,用户需要跟踪更多的信息,比如这个小时内看到的每种类型事件的个数、需要 合并的事件、将每种类型的事件值相加,等等。事件与事件之间的信息被称为“状态”。
这些状态一般被保存在应用程序的本地变量里。例如,使用散列表来保存移动计数器。事 实上,本书的很多例子就是这么做的。不过,这不是一种可靠的方法,因为如果应用程序 关闭,状态就会丢失,结果就会发生变化,而这并不是用户希望看到的。所以,要小心地 持久化最近的状态,如果应用程序重启,要将其恢复。

  • 本地状态或内部状态
    这种状态只能被单个应用程序实例访问,它们一般使用内嵌在应用程序里的数据库进行 维护和管理。本地状态的优势在于它的速度,不足之处在于它受到内存大小的限制。所 以,流式处理的很多设计模式都将数据拆分到多个子流,这样就可以使用有限的本地状 态来处理它们。
  • 外部状态
    这种状态使用外部的数据存储来维护,一般使用 NoSQL 系统,比如 Cassandra。使用外 部存储的优势在于,它没有大小的限制,而且可以被应用程序的多个实例访问,甚至被 不同的应用程序访问。不足之处在于,引入额外的系统会造成更大的延迟和复杂性。大 部分流式处理应用尽量避免使用外部存储,或者将信息缓存在本地,减少与外部存储发 生交互,以此来降低延迟,而这就引入了如何维护内部和外部状态一致性的问题。

2.3、流和表的二元性

表就是记录的集合,每个表都有一个主键,并包含了一系列由 schema 定义的属性。表的记录是可变的(可以在表上面执行更新和删除操作)。我们可以 通过查询表数据获知某一时刻的数据状态。例如,通过查询 CUSTOMERS_CONTACTS 这 个表,就可以获取所有客户的联系信息。如果表被设计成不包含历史信息,那么就找不到 客户过去的联系信息了。

在将表与流进行对比时,可以这么想:流包含了变更——流是一系列事件,每个事件就是 一个变更。表包含了当前的状态,是多个变更所产生的结果。所以说,表和流是同一个硬 币的两面——世界总是在发生变化,用户有时候关注变更事件,有时候则关注世界的当前 状态。如果一个系统允许使用这两种方式来查看数据,那么它就比只支持一种方式的系统 强大。

为了将表转化成流,需要捕捉到在表上所发生的变更,将“insert”、“update”和“delete”事件保存到流里。大部分数据库提供了用于捕捉变更的“Change Data Capture”(CDC)解 决方案,Kafka 连接器将这些变更发送到 Kafka,用于后续的流式处理。
为了将流转化成表,需要“应用”流里所包含的所有变更,这也叫作流的“物化”。首 先在内存里、内部状态存储或外部数据库里创建一个表,然后从头到尾遍历流里的所 有事件,逐个地改变状态。在完成这个过程之后,得到了一个表,它代表了某个时间 点的状态。

假设有一个鞋店,某零售活动可以使用一个事件流来表示:

    “红色、蓝色和绿色鞋子到货”
    “蓝色鞋子卖出”
    “红色鞋子卖出”
    “蓝色鞋子退货”
    “绿色鞋子卖出”

如果想知道现在仓库里还有哪些库存,或者到目前为止赚了多少钱,需要对视图进行物 化。图 11-1 告诉我们,目前还有蓝色和黄色鞋子,账户上有 170 美元。如果想知道鞋店的 繁忙程度,可以查看整个事件流,会发现总共发生了 5 个交易,还可以查出为什么蓝色鞋 子被退货。

202404032127251831.png

2.4、时间窗口

大部分针对流的操作都是基于时间窗口的,比如移动平均数、一周内销量最好的产品、系 统的 99 百分位等。两个流的合并操作也是基于时间窗口的,我们会合并发生在相同时间 片段上的事件。不过,很少人会停下来仔细想想时间窗口的类型。例如,在计算移动平均 数时,需要知道以下几个问题:

  • 窗口的大小:是基于 5 分钟进行平均,还是 15 分钟,或者一天?窗口越小,就能越快 地发现变更,不过噪声也越多。窗口越大,变更就越平滑,不过延迟也越严重,如果价 格涨了,需要更长的时间才能看出来。
  • 窗口移动的频率(“移动间隔”):5 分钟的平均数可以每分钟变化一次,或者每秒钟变 化一次,或者每当有新事件到达时发生变化。如果“移动间隔”与窗口大小相等,这种 情况被称为“滚动窗口(tumbling window)”。如果窗口随着每一条记录移动,这种情况 被称为“滑动窗口(sliding window)”。
  • 窗口的可更新时间多长:假设计算了 00:00 到 00:05 之间的移动平均数,一个小时之后 又得到了一些“事件时间”是 00:02 的事件,那么需要更新 00:00 到 00:05 这个窗口的 结果吗?或者就这么算了?理想情况下,可以定义一个时间段,在这个时间段内,事件 可以被添加到与它们相应的时间片段里。如果事件处于 4 个小时以内,那么就更新它们, 否则就忽略它们。

窗口可以与时间对齐,比如 5 分钟的窗口如果每分钟移动一次,那么第一个分片可以是 00:00~00:05,第二个就是 00:01~00:06。它也可以不与时间对齐,应用可以在任何时候启 动,那么第一个分片有可能是 03:17~03:22。滑动窗口永远不会与时间对齐,因为只要有新 记录到达,它们就会发生移动。

202404032127255462.png

3、流式处理的设计模式

3.1、单个事件处理

处理单个事件是流式处理最基本的模式。这个模式也叫 map 或 filter 模式,因为它经常被 用于过滤无用的事件或者用于转换事件(map 这个术语是从 Map-Reduce 模式中来的,map 阶段转换事件,reduce 阶段聚合转换过的事件)。

在这种模式下,应用程序读取流中的事件,修改它们,然后把事件生成到另一个流上。比 如,一个应用程序从一个流中读取日志消息,并把 ERROR 级别的消息写到高优先级的流 中,同时把其他消息写到低优先级的流中。再如,一个应用程序从流中读取事件,并把事 件从 JSON 格式改为 Avro 格式。这类应用程序不需要在程序内部维护状态,因为每一个 事件都是独立处理的。这也意味着,从错误中恢复或进行负载均衡会非常容易,因为不需 要进行恢复状态的操作,只需要将事件交给应用程序的另一个实例去处理。
这种模式可以使用一个生产者和一个消费者来实现,如图 所示。

202404032127259943.png

3.2、使用本地状态

大部分流式处理应用程序关心的是如何聚合信息,特别是基于时间窗口进行聚合。例如, 找出每天最低和最高的股票交易价格并计算移动平均数。
要实现这些聚合操作,需要维护流的状态。在本例中,为了计算每天的最小价格和平均价 格,需要将最小值和最大值保存下来,并将它们与每一个新值进行对比。
这些操作可以通过本地状态(而不是共享状态)来实现,因为本例中的每一个操作都是基 于组的聚合操作,如图所示。例如,基于各个股票代码进行聚合,而不是基于整个股 票市场。我们使用了一个 Kafka 分区器来确保具有相同股票代码的事件总是被写入相同的 分区。应用程序的每个实例从分配给它们的分区上获取事件(这是 Kafka 的消费者保证)。 也就是说,应用程序的每一个实例都可以维护一个股票代码子集的状态。

202404032127266684.png

如果流式处理应用程序包含了本地状态,情况就会变得非常复杂,而且还需要解决下列的一些问题。

  • 内存使用
    应用实例必须有可用的内存来保存本地状态。
  • 持久化
    要确保在应用程序关闭时不会丢失状态,并且在应用程序重启后或者切换到另一个应用 实例时可以恢复状态。Streams 可以很好地处理这些问题,它使用内嵌的 RocksDB 将本 地状态保存在内存里,同时持久化到磁盘上,以便在重启后可以恢复。本地状态的变更 也会被发送到 Kafka 主题上。如果 Streams 节点崩溃,本地状态并不会丢失,可以通过 重新读取 Kafka 主题上的事件来重建本地状态。例如,如果本地状态包含“IBM 当前 最小价格是 167.19”,并且已经保存到了 Kafka 上,那么稍后就可以通过读取这些数据 来重建本地缓存。这些 Kafka 主题使用了压缩日志,以确保它们不会无限量地增长,方 便重建状态。
  • 再均衡
    有时候,分区会被重新分配给不同的消费者。在这种情况下,失去分区的实例必须把最 后的状态保存起来,同时获得分区的实例必须知道如何恢复到正确的状态。

3.3、多阶段处理和重分区

本地状态对按组聚合操作起到很大的作用。但如果需要使用所有可用的信息来获得一个结 果呢?例如,假设要发布每天的“前 10 支”股票,这 10 支股票需要从每天的交易股票中 挑选出来。很显然,如果只是在每个应用实例上进行处理是不够的,因为 10 支股票分布 在多个实例上,如图 11-5 所示。我们需要一个两阶段解决方案。首先,计算每支股票当天 的涨跌,这个可以在每个实例上进行。然后将结果写到一个包含了单个分区的新主题上。 另一个单独的应用实例读取这个分区,找出当天的前 10 支股票。新主题只包含了每支股 票的概要信息,比其他包含交易信息的主题要小很多,所以流量很小,使用单个应用实例 就足以应付。不过,有时候需要更多的步骤才能生成结果。

202404032127272555.png
这种多阶段处理对于写过 Map-Reduce 代码的人来说应该很熟悉,因为他们经常要使用多 个 reduce 步骤。如果写过 Map-Reduce 代码,就应该知道,处理每个 reduce 步骤的应用需 要被隔离开来。与 Map-Reduce 不同的是,大多数流式处理框架可以将多个步骤放在同一 个应用里,框架会负责调配每一步需要运行哪一个应用实例(或 worker)。

3.4、使用外部查找——流和表的连接

有时候,流式处理需要将外部数据和流集成在一起,比如使用保存在外部数据库里的规则来验证事务,或者将用户信息填充到点击事件当中。

很明显,为了使用外部查找来实现数据填充,可以这样做:对于事件流里的每一个点击事 件,从用户信息表里查找相关的用户信息,从中抽取用户的年龄和性别信息,把它们包含 在点击事件里,然后将事件发布到另一个主题上。

202404032127278126.png

这种方式最大的问题在于,外部查找会带来严重的延迟,一般在 5~15ms 之间。这在很多 情况下是不可行的。另外,外部数据存储也无法接受这种额外的负载——流式处理系统每 秒钟可以处理 10~50 万个事件,而数据库正常情况下每秒钟只能处理 1 万个事件,所以需 要伸缩性更强的解决方案。

为了获得更好的性能和更强的伸缩性,需要将数据库的信息缓存到流式处理应用程序里。 不过,要管理好这个缓存也是一个挑战。比如,如何保证缓存里的数据是最新的?如果刷 新太频繁,那么仍然会对数据库造成压力,缓存也就失去了作用。如果刷新不及时,那么 流式处理中所用的数据就会过时。

如果能够捕捉数据库的变更事件,并形成事件流,流式处理作业就可以监听事件流,并及 时更新缓存。捕捉数据库的变更事件并形成事件流,这个过程被称为 CDC——变更数据捕 捉(Change Data Capture)。如果使用了 Connect,就会发现,有一些连接器可以用于执行 CDC 任务,把数据库表转成变更事件流。这样就拥有了数据库表的私有副本,一旦数据库 发生变更,用户会收到通知,并根据变更事件更新私有副本里的数据。

202404032127282827.png

这样一来,当收到点击事件时,可以从本地的缓存里查找 user_id,并将其填充到点击事件 里。因为使用的是本地缓存,它具有更强的伸缩性,而且不会影响数据库和其他使用数据 库的应用程序。

3.5、流与流的连接

有时候需要连接两个真实的事件流。什么是“真实”的流?本章开始的时候曾经说过,流 是无边界的。如果使用一个流来表示一个表,那么就可以忽略流的大部分历史事件,因为 你只关心表的当前状态。不过,如果要连接两个流,那么就是在连接所有的历史事件—— 将两个流里具有相同键和发生在相同时间窗口内的事件匹配起来。这就是为什么流和流的 连接也叫作基于时间窗口的连接(windowed-join)。

假设有一个由网站用户输入的搜索事件流和一个由用户对搜索结果进行点击的事件流。对用 户的搜索和用户对搜索结果的点击进行匹配,就可以知道哪一个搜索的热度更高。很显然, 我们需要基于搜索关键词进行匹配,而且每个关键词只能与一定时间窗口内的事件进行匹 配——假设用户在输入搜索关键词后几秒钟就会点击搜索结果。因此,我们为每一个流维护 了以几秒钟为单位的时间窗口,并对这些时间窗口事件结果进行匹配。

202404032127287978.png
在 Streams 中,上述的两个流都是通过相同的键来进行分区的,这个键也是用于连接两个 流的键。这样一来,user_id:42 的点击事件就被保存在点击主题的分区 5 上,而所有 user_ id:42 的搜索事件被保存在搜索主题的分区 5 上。Streams 可以确保这两个主题的分区 5 的 事件被分配给同一个任务,这个任务就会得到所有与 user_id:42 相关的事件。Streams 在内 嵌的 RocksDB 里维护了两个主题的连接时间窗口,所以能够执行连接操作。

3.6、乱序的事件

不管是对于流式处理还是传统的 ETL 系统来说,处理乱序事件都是一个挑战。物联网领域 经常发生乱序事件:一个移动设备断开 WiFi 连接几个小时,在重新连上 WiFi 之后将几个 小时累积的事件一起发送出去,如图 11-9 所示。这在监控网络设备(故障交换机被修复之 前不会发送任何诊断数据)或进行生产(装置间的网络连接非常不可靠)时也时有发生。

202404032127293609.png

要让流处理应用程序处理好这些场景,需要做到以下几点。

  • 识别乱序的事件。应用程序需要检查事件的时间,并将其与当前时间进行比较。
  • 规定一个时间段用于重排乱序的事件。比如 3 个小时以内的事件可以重排,但 3 周以外的事件就可以直接扔掉。
  • 具有在一定时间段内重排乱序事件的能力。这是流式处理应用与批处理作业的一个主要不同点。假设有一个每天运行的作业,一些事件在作业结束之后才到达,那么可以重新 运行昨天的作业来更新事件。而在流式处理中,“重新运行昨天的作业”这种情况是不 存在的,乱序事件和新到达的事件必须一起处理。
  • 具备更新结果的能力。如果处理的结果保存到数据库里,那么可以通过 put 或 update 对 结果进行更新。如果流应用程序通过邮件发送结果,那么要对结果进行更新,就需要很 巧妙的手段。

3.7、重新处理

最后一个很重要的模式是重新处理事件,该模式有两个变种。

  • 我们对流式处理应用进行了改进,使用新版本应用处理同一个事件流,生成新的结果, 并比较两种版本的结果,然后在某个时间点将客户端切换到新的结果流上。
  • 现有的流式处理应用出现了缺陷,修复缺陷之后,重新处理事件流并重新计算结果。

对于第一种情况,Kafka 将事件流长时间地保存在可伸缩的数据存储里。也就是说,要使用两个版本的流式处理应用来生成结果,只需要满足如下条件:

  • 将新版本的应用作为一个新的消费者群组;
  • 让它从输入主题的第一个偏移量开始读取数据(这样它就拥有了属于自己的输入流事件副本);
  • 检查结果流,在新版本的处理作业赶上进度时,将客户端应用程序切换到新的结果流上。

第二种情况有一定的挑战性。它要求“重置”应用,让应用回到输入流的起始位置开始处 理,同时重置本地状态(这样就不会将两个版本应用的处理结果混淆起来了),而且还可能需要清理之前的输出流。虽然 Streams 提供了一个工具用于重置应用的状态,不过如果 有条件运行两个应用程序并生成两个结果流,还是建议使用第一种方案。第一种方案更加 安全,多个版本可以来回切换,可以比较不同版本的结果,而且不会造成数据的丢失,也 不会在清理过程中引入错误。

阅读全文