1、KafkaConsumer概念
1.1、消费者和消费者群组
Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者 接收主题一部分分区的消息。
假设主题 T1 有 4 个分区,我们创建了消费者 C1,它是群组 G1 里唯一的消费者,我们用 它订阅主题 T1。消费者 C1 将收到主题 T1 全部 4 个分区的消息,如图所示。
如果在群组 G1 里新增一个消费者 C2,那么每个消费者将分别从两个分区接收消息。我们 假设消费者 C1 接收分区 0 和分区 2 的消息,消费者 C2 接收分区 1 和分区 3 的消息,如图所示。
如果群组 G1 有 4 个消费者,那么每个消费者可以分配到一个分区,如图所示。
如果我们往群组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被 闲置,不会接收到任何消息,如图所示。
往群组里增加消费者是横向伸缩消费能力的主要方式。Kafka 消费者经常会做一些高延迟 的操作,比如把数据写到数据库或 HDFS,或者使用数据进行比较耗时的计算。在这些情 况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负 载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。我们有必要为主题 创建大量的分区,在负载增长时可以加入更多的消费者。不过要注意,不要让消费者的数 量超过主题分区的数量,多余的消费者只会被闲置。
除了通过增加消费者来横向伸缩单个应用程序外,还经常出现多个应用程序从同一个主题 读取数据的情况。实际上,Kafka 设计的主要目标之一,就是要让 Kafka 主题里的数据能 够满足企业各种应用场景的需求。在这些场景里,每个应用程序可以获取到所有的消息, 而不只是其中的一部分。只要保证每个应用程序有自己的消费者群组,就可以让它们获取 到主题所有的消息。不同于传统的消息系统,横向伸缩 Kafka 消费者和消费者群组并不会 对性能造成负面影响。
在上面的例子里,如果新增一个只包含一个消费者的群组 G2,那么这个消费者将从主题 T1 上接收所有的消息,与群组 G1 之间互不影响。群组 G2 可以增加更多的消费者,每个 消费者可以消费若干个分区,就像群组 G1 那样,如图所示。总的来说,群组 G2 还是 会接收到所有消息,不管有没有其他群组存在。
简而言之,为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者群组, 然后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每个消费者只处理一部分 消息。
1.2、消费者群组和分区再均衡
群组里的消费者共同读取主题的分区。一个新的消费者加 入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变 化时,比如管理员添加了新的分区,会发生分区重分配。
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡
。再均衡非常 重要,它为消费者群组带来了高可用性和伸缩性,不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消 息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消 费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢 应用程序。
消费者通过向被指派为群组协调器
的 broker(不同的群组可以有不同的协调器)发送心跳 来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间 间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会 话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才 会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者 时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理 停顿。
当消费者要加入群组时,它会向群组协调器发送一个 JoinGroup 请求。第一 个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列 表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的), 并负责给每一个消费者分配分区。它使用一个实现了 PartitionAssignor 接 口的类来决定哪些分区应该被分配给哪个消费者。
分配 完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发 送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组 里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。
2、创建Kafka消费者
依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
在读取消息之前,需要先创建一个 KafkaConsumer 对象。创建 KafkaConsumer 对象与创建 KafkaProducer 对象非常相似,把想要传给消费者的属性放在 Properties 对象里。在这里,我们只需要使用 3 个必要的属性:bootstrap. servers、key.deserializer 和 value.deserializer。
第 1 个 属 性 bootstrap.servers 指 定 了 Kafka 集 群 的 连 接 字 符 串。 它 的 用 途 与 在 KafkaProducer 中的用途是一样的。另外两个属性 key. deserializer 和 value.deserializer 与生产者的 serializer 定义也很类似,不过它们不是使 用指定的类把 Java 对象转成字节数组,而是使用指定的类把字节数组转成 Java 对象。第 4 个属性 group.id 不是必需的,不过我们现在姑且认为它是必需的。它指定了 KafkaConsumer 属于哪一个消费者群组。创建不属于任何一个群组的消费者也是可以的,只 是这样做不太常见。
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "testGroup");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
3、订阅主题
subscribe() 方法接受一个主题列表作为参数,使用起来很简单:
consumer.subscribe(Collections.singleton("testTopic"));
也可以在调用 subscribe() 方法时传入一个正则表达式。正则表达式可以匹配多个主 题,如果有人创建了新的主题,并且主题的名字与正则表达式匹配,那么会立即触发一次 再均衡,消费者就可以读取新添加的主题。如果应用程序需要读取多个主题,并且可以处 理不同类型的数据,那么这种订阅方式就很管用。在 Kafka 和其他系统之间复制数据时, 使用正则表达式的方式订阅多个主题是很常见的做法。
consumer.subscribe(Pattern.compile("test.*"));
同时,也可以在订阅主题时添加一个再分配监听器ConsumerRebalanceListener。
consumer.subscribe(Collections.singleton("testTopic"), new ConsumerRebalanceListener() {
//开启新一轮的重平衡前会调用,一般用于手动提交位移,及审计功能
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
final TreeSet<String> treeSet = new TreeSet<String>();
partitions.forEach(p -> treeSet.add(p.toString()));
System.out.println("kafka rebalance start. onParitionsRevoked " + System.currentTimeMillis() + treeSet);
}
//在重平衡结束后会调用,一般用于消费逻辑处理
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
final TreeSet<String> treeSet = new TreeSet<String>();
partitions.forEach(p -> treeSet.add(p.toString()));
System.out.println("kafka rebalance start. onPartitionsAssigned " + System.currentTimeMillis() + treeSet);
}
});
4、轮询
消息轮询是消费者 API 的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅 了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据, 开发者只需要使用一组简单的 API 来处理从分区返回的数据。消费者代码的主要部分如下 所示:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,partition = %s,offset = %d,customer = %s,country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
System.out.println("value:" + record.value());
}
}
} finally {
consumer.close();
}
- 这是一个无限循环。消费者实际上是一个长期运行的应用程序,它通过持续轮询向 Kafka 请求数据。
- 消费者必须持续对 Kafka 进 行轮询,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。传给 poll() 方法的参数是一个超时时间,用于控制 poll() 方法的阻塞时间(在消费者的缓 冲区里没有可用数据时会发生阻塞)。如果该参数被设为 0,poll() 会立即返回,否则 它会在指定的毫秒数内一直等待 broker 返回数据。
- poll() 方法返回一个记录列表。每条记录都包含了记录所属主题的信息、记录所在分 区的信息、记录在分区里的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐 条处理这些记录。poll() 方法有一个超时参数,它指定了方法在多久之后可以返回, 不管有没有可用的数据都要返回。超时时间的设置取决于应用程序对响应速度的要求, 比如要在多长时间内把控制权归还给执行轮询的线程。
- 在退出应用程序之前使用 close() 方法关闭消费者。网络连接和 socket 也会随之关闭, 并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡, 因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息。
轮询不只是获取数据那么简单。在第一次调用新消费者的 poll() 方法时,它会负责查找 GroupCoordinator,然后加入群组,接受分配的分区。如果发生了再均衡,整个过程也是 在轮询期间进行的。当然,心跳也是从轮询里发送出去的。所以,我们要确保在轮询期间 所做的任何处理工作都应该尽快完成。
5、消费者的配置
除了bootstrap. servers、group.id、key.deserializer 和 value.deserializer。Kafka 的文档列出了所有与消费者相 关的配置说明。大部分参数都有合理的默认值,一般不需要修改它们,不过有一些参数与消费 者的性能和可用性有很大关系。接下来介绍这些重要的属性。
-
fetch.min.bytes
该属性指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时, 如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时 才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很 活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用 数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果 消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。 -
fetch.max.wait.ms
我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而 feth. max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms。如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟。如果要降低 潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。如果 fetch.max.wait.ms 被设 为 100ms,并且 fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,要么返 回 1MB 数据,要么在 100ms 后返回所有可用的数据,就看哪个条件先得到满足。 -
max.partition.fetch.bytes
该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB,也 就是说,KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.partition. fetch.bytes 指定的字节。如果一个主题有 20 个分区和 5 个消费者,那么每个消费者需要 至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因 为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。max.partition. fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属 性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用 poll() 方法 来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更 多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可 以把 max.partition.fetch.bytes 值改小,或者延长会话过期时间。 -
session.timeout.ms
该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如 果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为 已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。该属性与 heartbeat.interval.ms 紧 密 相 关。heartbeat.interval.ms 指 定 了 poll() 方 法 向 协 调 器 发送心跳的频率,session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,一 般需要同时修改这两个属性,heartbeat.interval.ms 必须比 session.timeout.ms 小,一 般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat. interval.ms 应该是 1s。把 session.timeout.ms 值设得比默认值小,可以更快地检测和恢 复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设 置得大一些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。 -
auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长 时间失效,包含偏移量的记录已经过时并被删除)该作何处理。它的默认值是 latest,意 思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之 后生成的记录)。另一个值是 earliest,意思是说,在偏移量无效的情况下,消费者将从 起始位置读取分区的记录。 -
enable.auto.commit
该属性指定了消费者是否自动提交偏移 量,默认值是 true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自 己控制何时提交偏移量。如果把它设为 true,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。 -
partition.assignment.strategy
分区会被分配给群组里的消费者。PartitionAssignor 根据给定的消费者和主 题,决定哪些分区应该被分配给哪个消费者。Kafka 有两个默认的分配策略。- Range
该策略会把主题的若干个连续的分区分配给消费者。假设消费者 C1 和消费者 C2 同时 订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这 两个主题的分区 0 和分区 1,而消费者 C2 分配到这两个主题的分区 2。因为每个主题 拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消 费者更多的分区。只要使用了 Range 策略,而且分区数量无法被消费者数量整除,就会 出现这种情况。 - RoundRobin
该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobin 策略来给消费者 C1 和消费者 C2 分配分区,那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1 的分区 1 以及主题 T2 的分区 0 和分区 2。一般 来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin 策略会给所 有消费者分配相同数量的分区(或最多就差一个分区)。 - 可以通过设置 partition.assignment.strategy 来选择分区策略。默认使用的是 org. apache.kafka.clients.consumer.RangeAssignor,这个类实现了 Range 策略,不过也可以 把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor。我们还可以使用自定 义策略,在这种情况下,partition.assignment.strategy 属性的值就是自定义类的名字。
- Range
-
client.id
该属性可以是任意字符串,broker 用它来标识从客户端发送过来的消息,通常被用在日志、 度量指标和配额里。 -
max.poll.records
该属性用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处 理的数据量。 -
receive.buffer.bytes 和 send.buffer.bytes
socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为 -1,就使用操 作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这 些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
6、提交和偏移量
每次调用 poll() 方法,它总是返回由生产者写入 Kafka 但还没有被消费者读取过的记录, 我们因此可以追踪到哪些记录是被群组里的哪个消费者读取的。之前已经讨论过,Kafka 不会像其他 JMS 队列那样需要得到消费者的确认,这是 Kafka 的一个独特之处。相反,消 费者可以使用 Kafka 来追踪消息在分区里的位置(偏移量)。
我们把更新分区当前位置的操作叫作提交
。
消费者往一个叫作 _consumer_offset
的特殊主题发送 消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有 什么用处。不过,如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完 成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续 之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方 继续处理。
如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的 消息就会被重复处理,如图所示。
如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失,如图所示。
所以,处理偏移量的方式对客户端会有很大的影响。
KafkaConsumer API 提供了很多种方式来提交偏移量。
6.1、自动提交
最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true,那 么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔 由 auto.commit.interval.ms 控制,默认值是 5s。与消费者里的其他东西一样,自动提交 也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那 么就会提交从上一次轮询返回的偏移量。
假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地 提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无法完全避免的。
6.2、提交当前偏移量
消费者 API 提供了另一种提交偏移量的方式,开发者可以在必要的时候 提交当前偏移量,而不是基于时间间隔。
把 auto.commit.offset 设为 false,让应用程序决定何时提交偏移量。使用 commitSync() 提交偏移量最简单也最可靠。这个 API 会提交由 poll() 方法返回的最新偏移量,提交成 功后马上返回,如果提交失败就抛出异常。
要记住,commitSync() 将会提交由 poll() 返回的最新偏移量,所以在处理完所有记录后要 确保调用了 commitSync(),否则还是会有丢失消息的风险。如果发生了再均衡,从最近一 批消息到发生再均衡之间的所有消息都将被重复处理。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,partition = %s,offset = %d,customer = %s,country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
System.out.println("value:" + record.value());
try {
//提交offset
consumer.commitSync();
} catch (CommitFailedException e) {
e.printStackTrace();
}
}
}
} finally {
consumer.close();
}
6.3、异步提交
手动提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了再 均衡,会增加重复消息的数量。这个时候可以使用异步提交 API。我们只管发送提交请求,无需等待 broker 的响应。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,partition = %s,offset = %d,customer = %s,country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
System.out.println("value:" + record.value());
try {
//异步提交offset
consumer.commitAsync();
} catch (CommitFailedException e) {
e.printStackTrace();
}
}
}
} finally {
consumer.close();
}
在成功提交或碰到无法恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会,这也是 commitAsync() 不好的一个地方。它之所以不进行重试,是因为在它收到 服务器响应的时候,可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用 于提交偏移量 2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会 作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync() 重新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。这个时 候如果发生再均衡,就会出现重复消息。
之所以提到这个问题的复杂性和提交顺序的重要性,是因为 commitAsync() 也支持回 调,在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标,不 过如果你要用它来进行重试,一定要注意提交的顺序。
//回调
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println(offsets);
}
}
});
6.4、同步和异步组合提交
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败 是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或 再均衡前的最后一次提交,就要确保能够提交成功。
因此,在消费者关闭前一般会组合使用 commitAsync() 和 commitSync()。它们的工作原理 如下:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",record.topic(),record.partition(),record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
- 如果一切正常,我们使用 commitAsync() 方法来提交。这样速度更快,而且即使这次提 交失败,下一次提交很可能会成功。
- 如果直接关闭消费者,就没有所谓的“下一次提交”了。使用 commitSync() 方法会一 直重试,直到提交成功或发生无法恢复的错误。
6.5、提交特定的偏移量
消费者 API 允许在调用 commitSync() 和 commitAsync() 方法时传进去希望提交 的分区和偏移量的 map。假设你处理了半个批次的消息,最后一个来自主题“customers” 分区 3 的消息的偏移量是 5000,你可以调用 commitSync() 方法来提交它。不过,因为消 费者可能不只读取一个分区,你需要跟踪所有分区的偏移量,所以在这个层面上控制偏移 量的提交会让代码变复杂。
public class CommitOffset {
//用于跟踪偏移的map
private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
//记录处理记录条数
static int count = 0;
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "testGroup");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//订阅主题
consumer.subscribe(Collections.singleton("testTopic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
//模拟处理数据
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
//记录offset
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
//每1000条提交一次偏移量
if (count % 1000 == 0) {
consumer.commitAsync(currentOffsets,null);
}
count++;
}
}
} finally {
consumer.close();
}
}
}
7、再均衡监听器
消费者在退出和进行分区再均衡之前,会做一些清理工作。你会在消费者失去对一个分区的所有权之前提交最后一个已处理记录的偏移量。如果消费 者准备了一个缓冲区用于处理偶发的事件,那么在失去分区所有权之前,需要处理在缓冲 区累积下来的记录。你可能还需要关闭文件句柄、数据库连接等。
在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行一些应用程序代 码,在调用 subscribe() 方法时传进去一个 ConsumerRebalanceListener 实例就可以了。 ConsumerRebalanceListener 有两个需要实现的方法。
- public void onPartitionsRevoked(Collection partitions)方法会在 再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接 管分区的消费者就知道该从哪里开始读取了。
- public void onPartitionsAssigned(Collection partitions)方法会在 重新分配分区之后和消费者开始读取消息之前被调用。
//订阅主题
consumer.subscribe(Collections.singleton("testTopic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
});
8、从特定偏移量处开始处理记录
如果你想从分区的起始位置开始读取消息,或者直接跳到分区的末尾开始读取消息,可以使 用seekToBeginning(Collection<TopicPartition> tp)
和seekToEnd(Collection<TopicPartition> tp)
这两个方法。
API | 说明 |
---|---|
API | 说明 |
publicvoidassign(Collection<TopicPartition>partitions) | 给当前消费者手动分配一系列主题分区。手动分配分区不支持增量分配,如果先前有分配分区,则该操作会覆盖之前的分配。如果给出的主题分区是空的,则等价于调用unsubscribe方法。手动分配主题分区的方法不使用消费组管理功能。当消费组成员变了,或者集群或主题的元数据改变了,不会触发分区分配的再平衡。手动分区分配assign(Collection)不能和自动分区分配subscribe(Collection,ConsumerRebalanceListener)一起使用。如果启用了自动提交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区分配中的消费偏移量进行异步提交。 |
publicSet<TopicPartition>assignment() | 获取给当前消费者分配的分区集合。如果订阅是通过调用assign方法直接分配主题分区,则返回相同的集合。如果使用了主题订阅,该方法返回当前分配给该消费者的主题分区集合。如果分区订阅还没开始进行分区分配,或者正在重新分配分区,则会返回none。 |
publicMap<String,List<PartitionInfo>>listTopics() | 获取对用户授权的所有主题分区元数据。该方法会对服务器发起远程调用。 |
publicList<PartitionInfo>partitionsFor(Stringtopic) | 获取指定主题的分区元数据。如果当前消费者没有关于该主题的元数据,就会对服务器发起远程调用。 |
publicMap<TopicPartition,Long>beginningOffsets(Collection<TopicPartition>partitions) | 对于给定的主题分区,列出它们第一个消息的偏移量。注意,如果指定的分区不存在,该方法可能会永远阻塞。该方法不改变分区的当前消费者偏移量。 |
publicvoidseekToEnd(Collection<TopicPartition>partitions) | 将偏移量移动到每个给定分区的最后一个。该方法延迟执行,只有当调用过poll方法或position方法之后才可以使用。如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后。如果设置了隔离级别为:isolation.level=read_committed,则会将分区的消费偏移量移动到最后一个稳定的偏移量,即下一个要消费的消息现在还是未提交状态的事务消息。 |
publicvoidseek(TopicPartitionpartition,longoffset) | 将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下一条要消费的消息偏移量。若该方法多次调用,则最后一次的覆盖前面的。如果在消费中间随意使用,可能会丢失数据。 |
publiclongposition(TopicPartitionpartition) | 检查指定主题分区的消费偏移量 |
publicvoidseekToBeginning(Collection<TopicPartition>partitions) | 将给定每个分区的消费者偏移量移动到它们的起始偏移量。该方法懒执行,只有当调用过poll方法或position方法之后才会执行。如果没有提供分区,则将所有分配给当前消费者的分区消费偏移量移动到起始偏移量。 |
准备数据:
# 生成消息文件
[root@node1 ~]# for i in `seq 60`; do echo "hello lagou $i" >> nm.txt; done
# 创建主题,三个分区,每个分区一个副本
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_demo_01 --partitions 3 --replication-factor 1
# 将消息生产到主题中
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_demo_01 < nm.txt
代码:
package com.lagou.kafka.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
/**
* # 生成消息文件
* [root@node1 ~]# for i in `seq 60`; do echo "hello lagou $i" >> nm.txt; done
* # 创建主题,三个分区,每个分区一个副本
* [root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_demo_01 --partitions 3 --replication-factor 1
* # 将消息生产到主题中
* [root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 -- topic tp_demo_01 < nm.txt
* <p>
* 消费者位移管理
*/
public class MyConsumerMgr1 {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
/**
* 给当前消费者手动分配一系列主题分区。
* 手动分配分区不支持增量分配,如果先前有分配分区,则该操作会覆盖之前的分配。
* 如果给出的主题分区是空的,则等价于调用unsubscribe方法。
* 手动分配主题分区的方法不使用消费组管理功能。当消费组成员变了,或者集群或主题的元数据改变了,不会触发分区分配的再平衡。
*
* 手动分区分配assign(Collection)不能和自动分区分配subscribe(Collection, ConsumerRebalanceListener)一起使用。
* 如果启用了自动提交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区分配中的消费偏移量进行异步提交。
*
*/
/* consumer.assign(Arrays.asList(new TopicPartition("tp_demo_01", 0)));
Set<TopicPartition> assignment = consumer.assignment();
for (TopicPartition topicPartition : assignment) {
System.out.println(topicPartition);
}
// 获取对用户授权的所有主题分区元数据。该方法会对服务器发起远程调用。
Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
stringListMap.forEach((k, v) -> {
System.out.println("主题:" + k);
v.forEach(info -> {
System.out.println(info);
});
});
Set<String> strings = consumer.listTopics().keySet();
strings.forEach(topicName -> {
System.out.println(topicName);
});
List<PartitionInfo> partitionInfos = consumer.partitionsFor("tp_demo_01");
for (PartitionInfo partitionInfo : partitionInfos) {
Node leader = partitionInfo.leader();
System.out.println(leader);
System.out.println(partitionInfo);
// 当前分区在线副本
Node[] nodes = partitionInfo.inSyncReplicas();
// 当前分区下线副本
Node[] nodes1 = partitionInfo.offlineReplicas();
} */
// 手动分配主题分区给当前消费者
consumer.assign(Arrays.asList(
new TopicPartition("tp_demo_01", 0),
new TopicPartition("tp_demo_01", 1),
new TopicPartition("tp_demo_01", 2)
));
// 列出当前主题分配的所有主题分区
/* Set<TopicPartition> assignment = consumer.assignment();
assignment.forEach(k -> {
System.out.println(k);
}); */
// 对于给定的主题分区,列出它们第一个消息的偏移量。
// 注意,如果指定的分区不存在,该方法可能会永远阻塞。
// 该方法不改变分区的当前消费者偏移量。
// Map<TopicPartition, Long> topicPartitionLongMap = consumer.beginningOffsets(consumer.assignment());
/* topicPartitionLongMap.forEach((k, v) -> {
System.out.println("主题:" + k.topic() + "\t分区:" + k.partition() + "偏移量\t" + v);
}); */
// 将偏移量移动到每个给定分区的最后一个。
// 该方法延迟执行,只有当调用过poll方法或position方法之后才可以使用。
// 如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后。
// 如果设置了隔离级别为:isolation.level=read_committed,则会将分区的消费偏移量移动到
// 最后一个稳定的偏移量,即下一个要消费的消息现在还是未提交状态的事务消息。
// consumer.seekToEnd(consumer.assignment());
// 将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下一条要消费的消息偏移量。
// 若该方法多次调用,则最后一次的覆盖前面的。
// 如果在消费中间随意使用,可能会丢失数据。
// consumer.seek(new TopicPartition("tp_demo_01", 1), 10);
// 检查指定主题分区的消费偏移量
/* long position = consumer.position(new TopicPartition("tp_demo_01", 1));
System.out.println(position); */
consumer.seekToEnd(Arrays.asList(new TopicPartition("tp_demo_01", 1)));
// 检查指定主题分区的消费偏移量
long position = consumer.position(new TopicPartition("tp_demo_01", 1));
System.out.println(position);
// 关闭生产者
consumer.close();
}
}
9、优雅退出
如果确定要退出循环,需要通过另一个线程调用 consumer.wakeup() 方法。如果循环运行 在主线程里,可以在 ShutdownHook 里调用该方法。要记住,consumer.wakeup() 是消费者 唯一一个可以从其他线程里安全调用的方法。调用 consumer.wakeup() 可以退出 poll(), 并抛出 WakeupException 异常,或者如果调用 consumer.wakeup() 时线程没有等待轮询,那 么异常将在下一轮调用 poll() 时抛出。我们不需要处理 WakeupException,因为它只是用 于跳出循环的一种方式。不过,在退出线程之前调用 consumer.close() 是很有必要的,它 会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来 就会触发再均衡,而不需要等待会话超时。
//优雅关闭
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
System.out.println("Starting exit...");
consumer.wakeup();
}
});
10、反序列化器
消费者需要用反序列化器把从 Kafka 接收到的字节数组转换成 Java 对象。 在 前面的例子里,我们假设每个消息的键值对都是字符串,所以我们使用了默认的 String Deserializer。
10.1、默认的反序列化器
org.apache.kafka.common.serialization.Deserializer接口是kafka中反序列化器的顶级接口,所有的反序列化器都必须实现该接口。
ByteArrayDeserializer
VoidDeserializer
UUIDDeserializer
StringDeserializer
ShortDeserializer
LongDeserializer
ListDeserializer
IntegerDeserializer
FloatDeserializer
DoubleDeserializer
BytesDeserializer
ByteBufferDeserializer
Deserializer接口的代码如下:
public interface Deserializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
//配置当前类
}
T deserialize(String topic, byte[] data);
default T deserialize(String topic, Headers headers, byte[] data) {
//反序列化操作
return deserialize(topic, data);
}
@Override
default void close() {
//关闭序列化器
}
}
10.2、自定义反序列化器
自定义对象:
public class Customer {
private int customerID;
private String customerName;
public Customer(int customerID, String customerName) {
this.customerID = customerID;
this.customerName = customerName;
}
public int getCustomerID() {
return customerID;
}
public void setCustomerID(int customerID) {
this.customerID = customerID;
}
public String getCustomerName() {
return customerName;
}
public void setCustomerName(String customerName) {
this.customerName = customerName;
}
}
自定义序列化器:
public class CustomerDeserializer implements Deserializer<Customer> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
//不做任何配置
}
@Override
public Customer deserialize(String topic, byte[] data) {
int id;
int nameSize;
String name;
try {
if (data == null)
return null;
if (data.length < 8)
throw new SerializationException("Size of data reveived by IntegerDeserializer is shorter than expected");
ByteBuffer buffer = ByteBuffer.wrap(data);
id = buffer.getInt();
nameSize = buffer.getInt();
byte[] nameBytes = new byte[nameSize];
buffer.get(nameBytes);
name = new String(nameBytes, "UTF-8");
return new Customer(id, name);
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
@Override
public void close() {
//不需要关闭任何资源
}
}
11、独立消费者
不需要订阅主题,取而代之的是为自己分配分区。一个消费者可以订 阅主题(并加入消费者群组),或者为自己分配分区,但不能同时做这两件事情。
List<PartitionInfo> partitionInfos = null;
//向集群请求主题可用的分区。如果只打算读取特定分区,可以跳过这一步。
partitionInfos = consumer.partitionsFor("topic");
if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos)
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
//知道需要哪些分区之后,调用 assign() 方法。
consumer.assign(partitions);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record: records) {
System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitSync(); }
}
除了不会发生再均衡,也不需要手动查找分区,其他的看起来一切正常。不过要记住,如 果主题增加了新的分区,消费者并不会收到通知。所以,要么周期性地调用 consumer. partitionsFor() 方法来检查是否有新分区加入,要么在添加新分区后重启应用程序。