2024-04-03
原文作者:吴声子夜歌 原文地址: https://blog.csdn.net/cold___play/article/details/132287669

无消息丢失配置

Java版本producer用户采用异步发送机制。KafkaProducer.send方法仅仅把消息放入缓冲区中,由一个专属I/O线程负责从缓冲区中提取消息并封装进消息batch中,然后发送出去。显然,这个过程中存在着数据丢失的窗口:若I/O线程发送之前producer崩溃,则存储缓冲区中的消息全部丢失了。这是producer需要处理的很重要的问题。

producer的另一个问题就是消息的乱序。假设客户端依次执行下面的语句发送两条消息到相同的分区:

    producer.send(record1);
    producer.send(record2);

若此时由于某些原因(比如瞬时的网络抖动)导致record1未发送成功,同时Kafka又配置了重试机制以及max.in.flight.requests.per.connection大于1(默认值是5),那么producer重试recordl成功后,recordl在日志中的位置反而位于record2之后,这样造成了消息的乱序。要知道很多实际使用场景中都有事件强顺序保证的要求。

鉴于producer的这两个问题,应该如何规避呢?首先,对于消息丢失的问题,很容易想到的一个方案就是:既然异步发送可能丢失数据,改成同步发送似乎是一个不错的主意。比如这样:

    producer.send(record).get();

采用同步发送当然是可以的,但是性能会很差,并不推荐在实际场景中使用。因此最好能有一份配置,既使用异步方式还能有效地避免数据丢失,即使出现producer崩溃的情况也不会有问题。

首先给出producer端“无消息丢失配置”,然后再分别解释每个参数配置的含义。具体配置参数列表如下。

  • block.on.buffer.full=true
  • acks=all or -1
  • retries=Integer.MAX_VALUE
  • max.in.flight.requests.per.connection=1
  • 使用带回调机制的send发送消息,即KafkaProducer.send(record, callback)
  • Callback逻辑中显式地立即关闭producer,使用close(0)
  • unclean.leader.election.enable=false
  • replication.factor=3
  • min.insync.replicas=2
  • replication.factor>min.insync.replicas
  • enable.auto.commit=false

1、producer端配置

block.on.buffer.full = true

实际上这个参数在Kafka0.9.0.0版本已经被标记为“deprecated”,并使用max.block.ms参数替代,但这里还是推荐用户显式地设置它为true,使得内存缓冲区被填满时producer处于阻塞状态并停止接收新的消息而不是抛出异常;否则producer生产速度过快会耗尽缓冲区。新版本Kafka(0.10.0.0之后)可以不用理会这个参数,转而设置max.block.ms即可。

acks = all

设置acks为all很容易理解,即必须要等到所有follower都响应了发送消息才能认为提交成功,这是producer端最强程度的持久化保证。

retries = Integer.MAX_VALUE

设置成MAX VALUE纵然有些极端,但其实想表达的是producer要开启无限重试。用户不必担心producer会重试那些肯定无法恢复的错误,当前producer只会重试那些可恢复的异常情况,所以放心地设置一个比较大的值通常能很好地保证消息不丢失。

使用带有回调机制的send

不要使用KafkaProducer中单参数的send方法,因为该send调用仅仅是把消息发出而不会理会消息发送的结果。如果消息发送失败,该方法不会得到任何通知,故可能造成数据的丢失。实际环境中一定要使用带回调机制的send版本,即KafkaProducer.send(record,.callback)。

Callback逻辑中显式立即关闭producer

在Callback的失败处理逻辑中显式调用KafkaProducer.close(0)。这样做的目的是为了处理
消息的乱序问题。若不使用close(0),默认情况下producer会被允许将未完成的消息发送出去,这样就有可能造成消息乱序。

2、broker端配置

unclean.leader.election.enable = false

关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,从而避免broker端因日志水位截断而造成的消息丢失。

replication.factor >= 3

设置成3主要是参考了Hadoop及业界通用的三备份原则,其实这里想强调的是一定要使用多个副本来保存分区的消息。

min.insync.replicas > 1

用于控制某条消息至少被写入到ISR中的多少个副本才算成功,设置成大于1是为了提升producer端发送语义的持久性。记住只有在producer端acks被设置成all或-l时,这个参数才有意义。在实际使用时,不要使用默认值。

确保replication.factor > min.insync.replicas

若两者相等,那么只要有一个副本挂掉,分区就无法正常工作,虽然有很高的持久性但可用性被极大地降低了。推荐配置成replication.factor=min.insyn.replicas+l。

阅读全文