1、生产者概览
生产者向Kafka发送消息的主要步骤:
从创建一个 ProducerRecord 对象开始,ProducerRecord 对象需要包含目标主题和要发 送的内容。我们还可以指定键或分区。在发送 ProducerRecord 对象时,生产者要先把键和 值对象序列化成字节数组,这样它们才能够在网络上传输。
接下来,数据被传给分区器。如果之前在 ProducerRecord 对象里指定了分区,那么分区器 就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据 ProducerRecord 对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和 分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消 息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。
服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入 失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还 是失败,就返回错误信息。
依赖如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
2、创建Kafka生产者
要往 Kafka 写入消息,首先要创建一个生产者对象,并设置一些属性。Kafka 生产者有 3个必选的属性。
- bootstrap.servers
该属性指定 broker 的地址清单,地址的格式为 host:port。清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要 提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。 - key.serializer
broker 希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因 此可以把 Java 对象作为键和值发送给 broker。这样的代码具有良好的可读性,不过生 产者需要知道如何把这些 Java 对象转换成字节数组。key.serializer 必须被设置为一 个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,生产者会使 用这个类把键对象序列化成字节数组。Kafka 客户端默认提供了 ByteArraySerializer
(这个只做很少的事情)、StringSerializer 和 IntegerSerializer,因此,如果你只 使用常见的几种 Java 对象类型,那么就没必要实现自己的序列化器。要注意,key. serializer 是必须设置的,就算你打算只发送值内容。 - value.serializer
与 key.serializer 一样,value.serializer 指定的类会将值序列化。如果键和值都是字 符串,可以使用与 key.serializer 一样的序列化器。如果键是整数类型而值是字符串, 那么需要使用不同的序列化器。
Properties properties = new Properties();//Properties配置对象
properties.put("bootstrap.servers", "localhost:9092");
//因为发送的键值都是字符串类型,所以使用内置的StringSerializer
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//创建生产者
KafkaProducer producer = new KafkaProducer<String, String>(properties);
3、发送消息到Kafka
实例化生产者对象后,接下来就可以开始发送消息了。发送消息主要有以下 3 种方式。
- 发送并忘记(fire-and-forget)
把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候 也会丢失一些消息。 - 同步发送
我们使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待, 就可以知道消息是否发送成功。 - 异步发送
我们调用 send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数。
3.1、ProducerRecord
ProducerRecord<K, V>有6个属性:
private final String topic; //必填-主题
private final Integer partition; //分区号
private final Headers headers; //消息头部,设置和应用相关的信息
private final K key; //键,附加信息,计算分区
private final V value; //必填-值(消息体)
private final Long timestamp; //消息时间戳
6个构造方法:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) { }
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {}
public ProducerRecord(String topic, Integer partition, K key, V value) {}
public ProducerRecord(String topic, K key, V value) {}
public ProducerRecord(String topic, V value) { }
3.2、发送并忘记
最简单的消息发送方式如下:
//发送消息到Kafka
ProducerRecord<String, String> record =
new ProducerRecord<String, String>("testTopic", "testKey", "testValue");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
- 生产者的 send() 方法将 ProducerRecord 对象作为参数,所以我们要先创建一个 ProducerRecord 对象。它需要目标主题的名字和要发送的键和值对象,它们都是字符 串。键和值对象的类型必须与序列化器和生产者对象相匹配。
- 我们使用生产者的 send() 方法发送 ProducerRecord 对象。从生产者的架构图里可以看 到,消息先是被放进缓冲区,然后使用单独的线程发送到服务器端。send() 方法会返 回一个包含 RecordMetadata 的 Future 对象,不过因为我们会忽略返回值,所以无法知 道消息是否发送成功。如果不关心发送结果,那么可以使用这种发送方式。比如,记录 Twitter 消息日志,或记录不太重要的应用程序日志。
- 我们可以忽略发送消息时可能发生的错误或在服务器端可能发生的错误,但在发送消 息之前,生产者还是有可能发生其他的异常。这些异常有可能是 SerializationException (说明序列化消息失败)、BufferExhaustedException 或 TimeoutException(说明缓冲区已满),又或者是 InterruptException(说明发送线程被中断)。
3.3、同步发送消息
//同步发送消息
ProducerRecord<String, String> record =
new ProducerRecord<String, String>("testTopic", "testKey", "testValue");
try {
RecordMetadata metadata = (RecordMetadata) producer.send(record).get();
System.out.println(metadata.offset());
} catch (Exception e) {
e.printStackTrace();
}
- producer.send() 方法先返回一个 Future 对象,然后调用 Future 对象的 get() 方法等待 Kafka 响应。如果服务器返回错误,get() 方法会抛出异常。如果没有发生错 误,我们会得到一个 RecordMetadata 对象,可以用它获取消息的偏移量。
- 如果在发送数据之前或者在发送过程中发生了任何错误,比如 broker 返回了一个不允 许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常。我们只是简单地把 异常信息打印出来。
KafkaProducer 一般会发生两类错误。其中一类是可重试错误,这类错误可以通过重发消息 来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主(no leader)”错误则可 以通过重新为分区选举首领来解决。KafkaProducer 可以被配置成自动重试,如果在多次重 试后仍无法解决问题,应用程序会收到一个重试异常。另一类错误无法通过重试解决,比如
“消息太大”异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。
3.4、异步发送消息
//异步发送消息
ProducerRecord<String, String> record =
new ProducerRecord<String, String>("testTopic", "testKey", "testValue");
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("offset:" + recordMetadata.offset());
}
}
});
//阻塞,等待执行回调
System.in.read();
- 在send()方法指定Callback回调函数,kafka在返回响应时调用该函数来实现异步发送的确认。
- 成功时RecordMetadate不为空,失败时exception不为空,互斥。kakfaProducer可以保证回调函数的顺序。
4、生产者的配置
除了上述必填的参数,生产者还有很多可配置的参数,在 Kafka 文档里都有说明,它们大部分都有合理的默认 值,所以没有必要去修改它们。不过有几个参数在内存使用、性能和可靠性方面对生产者 影响比较大,接下来我们会一一说明。
-
acks
acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。 这个参数对消息丢失的可能性有重要影响。该参数有如下选项。- 如果 acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说, 如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢 失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大 速度发送消息,从而达到很高的吞吐量。
- 如果 acks=1(默认值),只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功 响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来), 生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个 没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象 的 get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回 调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生 产者在收到服务器响应之前可以发送多少个消息)。
- 如果 acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自 服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算 有服务器发生崩溃,整个集群仍然可以运行。不过,它的 延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。
-
buffer.memory
该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果 应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候, send() 方法调用要么被阻塞,要么抛出异常,取决于如何设置 block.on.buffer.full 参数
(在 0.9.0.0 版本里被替换成了 max.block.ms,表示在抛出异常之前可以阻塞一段时间)。 -
compression.type
默认情况下,消息发送时不会被压缩。该参数可以设置为 snappy、gzip 或 lz4,它指定了 消息被发送给 broker 之前使用哪一种压缩算法进行压缩。snappy 压缩算法由 Google 发明, 它占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网 络带宽,可以使用这种算法。gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩 比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和 存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。 -
retries
生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况 下,retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会 放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过 retry.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前, 先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间), 让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不 过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。一般情 况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。 你只需要处理那些不可重试的错误或重试次数超出上限的情况。 -
batch.size
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指 定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满, 批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满 的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大, 也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频 繁地发送消息,会增加一些额外的开销。 -
linger.ms
该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在 批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生 产者就会把消息发送出去,就算批次里只有一个消息。把 linger.ms 设置成比 0 大的数, 让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延 迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。 -
client.id
该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指 标里。 -
max.in.flight.requests.per.connection
该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用 越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务 器的,即使发生了重试。 -
timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间,metadata. fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器 返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误
(抛出异常或执行回调)。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回 一个错误。 -
max.block.ms
该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞 时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻 塞时间达到 max.block.ms 时,生产者会抛出超时异常。 -
max.request.size
该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指 单个请求里所有消息总的大小。例如,假设这个值为 1MB,那么可以发送的单个最大消 息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每 个消息大小为 1KB。另外,broker 对可接收的消息最大值也有自己的限制(message.max. bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。 -
receive.buffer.bytes 和 send.buffer.bytes
这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1, 就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以 适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
顺序保证
Kafka 可以保证同一个分区里的消息是有序的。也就是说,如果生产者按照 一定的顺序发送消息,broker 就会按照这个顺序把它们写入分区,消费者也 会按照同样的顺序读取它们。在某些情况下,顺序是非常重要的。例如,往 一个账户存入 100 元再取出来,这个与先取钱再存钱是截然不同的!不过, 有些场景对顺序不是很敏感。
如果把 retries 设为非零整数,同时把 max.in.flight.requests.per.connection 设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入 成功,broker 会重试写入第一个批次。如果此时第一个批次也写入成功,那 么两个批次的顺序就反过来了。
一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是 很关键的,所以不建议把 retries 设为 0。可以把 max.in.flight.requests. per.connection 设为 1,这样在生产者尝试发送第一批消息时,就不会有其 他的消息发送给 broker。不过这样会严重影响生产者的吞吐量,所以只有在 对消息的顺序有严格要求的情况下才能这么做。
5、序列化器
Kafka提供的所有序列化器都实现了org.apache.kafka.common.serialization.Serializer接口,kafka默认为我们提供了六种基本类型的包装类(Byte、Double、Float、Long、Short、Integer)的序列化器,还提供了String、List、UUID等类型的序列化器。
ByteArraySerializer
ByteBufferSerializer
BytesSerializer
StringSerializer
IntegerSerializer
VoidSerializer
ShortSerializer
UUIDSerializer
LongSerializer
FloatSerializer
ListSerializer
DoubleSerializer
5.1、自定义序列化器
如果发送到 Kafka 的对象不是简单的字符串或整型,那么可以使用序列化框架来创建消息 记录,如 Avro、Thrift 或 Protobuf,或者使用自定义序列化器。我们强烈建议使用通用的 序列化框架。
假设有一个客户类:
public class Customer {
private int customerID;
private String customerName;
public Customer(int customerID, String customerName) {
this.customerID = customerID;
this.customerName = customerName;
}
public int getCustomerID() {
return customerID;
}
public void setCustomerID(int customerID) {
this.customerID = customerID;
}
public String getCustomerName() {
return customerName;
}
public void setCustomerName(String customerName) {
this.customerName = customerName;
}
}
现在我们要为这个类创建一个序列化器,它看起来可能是这样的:
public class CustomerSerializer implements Serializer<Customer> {
//用来配置当前类
public void configure(Map<String, ?> configs, boolean isKey) {
//不做任何配置
}
/**
* 执行序列化操作
* @param topic
* @param data
* @return
* Customer对象被序列化成:
* 表示customerID的4字节整数
* 表示customerName长度的4字节整数(如果customerName为空,则长度为0)
* 表示customerName的N个字节
*/
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null) {
return null;
} else {
if (data.getCustomerName() != null) {
serializedName = data.getCustomerName().getBytes("UTF-8");
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getCustomerID());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
//关闭当前序列化器
public void close() {
//不需要关系任何东西
}
}
只要使用这个 CustomerSerializer,就可以把消息记录定义成 ProducerRecord<String, Customer>,并且可以直接把 Customer 对象传给生产者。这个例子很简单,不过代码看起 来太脆弱了——如果我们有多种类型的消费者,可能需要把 customerID 字段变成长整型, 或者为 Customer 添加 startDate 字段,这样就会出现新旧消息的兼容性问题。在不同版 本的序列化器和反序列化器之间调试兼容性问题着实是个挑战——你需要比较原始的字节 数组。更糟糕的是,如果同一个公司的不同团队都需要往 Kafka 写入 Customer 数据,那 么他们就需要使用相同的序列化器,如果序列化器发生改动,他们几乎要在同一时间修改 代码。
5.2、使用Avro序列化
Apache Avro(以下简称 Avro)是一种与编程语言无关的序列化格式。Doug Cutting 创建了这个项目,目的是提供一种共享数据文件的方式。
Avro 数据通过与语言无关的 schema 来定义。schema 通过 JSON 来描述,数据被序列化 成二进制文件或 JSON 文件,不过一般会使用二进制文件。Avro 在读写文件时需要用到 schema,schema 一般会被内嵌在数据文件里。
Avro 有一个很有意思的特性是,当负责写消息的应用程序使用了新的 schema,负责读 消息的应用程序可以继续处理消息而无需做任何改动,这个特性使得它特别适合用在像 Kafka 这样的消息系统上。
假设最初的 schema 是这样的:
{
"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "faxNumber",
"type": [
"null",
"string"
],
"default": "null"
}
]
}
假设我们已经使用了这个 schema 几个月的时间,并用它生成了几个太字节的数据。现在,我们决定在新版本里做一些修改。因为在 21 世纪不再需要 faxNumber 字段,需要用 email字段来代替它。 新的 schema 如下:
{
"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "email",
"type": [
"null",
"string"
],
"default": "null"
}
]
}
更新到新版的 schema 后,旧记录仍然包含 faxNumber 字段,而新记录则包含 email 字段。 部分负责读取数据的应用程序进行了升级,那么它们是如何处理这些变化的呢?
在应用程序升级之前,它们会调用类似 getName()、getId() 和 getFaxNumber() 这样的方 法。如果碰到使用新 schema 构建的消息,getName() 和 getId() 方法仍然能够正常返回, 但 getFaxNumber() 方法会返回 null,因为消息里不包含传真号码。
在应用程序升级之后,getEmail() 方法取代了 getFaxNumber() 方法。如果碰到一个使用旧 schema 构建的消息,那么 getEmail() 方法会返回 null,因为旧消息不包含邮件地址。
现在可以看出使用 Avro 的好处了:我们修改了消息的 schema,但并没有更新所有负责读 取数据的应用程序,而这样仍然不会出现异常或阻断性错误,也不需要对现有数据进行大 幅更新。
不过这里有以下两个需要注意的地方。
- 用于写入数据和读取数据的 schema 必须是相互兼容的。Avro 文档提到了一些兼容性 原则。
- 反序列化器需要用到用于写入数据的 schema,即使它可能与用于读取数据的 schema 不 一样。Avro 数据文件里就包含了用于写入数据的 schema,不过在 Kafka 里有一种更好 的处理方式,
6、分区
ProducerRecord 对象包含了目标主题、键和值。Kafka 的消息是一个个 键值对,ProducerRecord 对象可以只包含目标主题和值,键可以设置为默认的 null,不 过大多数应用程序会用到键。键有两个用途:可以作为消息的附加信息,也可以用来 决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。也就是 说,如果一个进程只从一个主题的分区读取数据,那么具有相 同键的所有记录都会被该进程读取。要创建一个包含键值的记录,只需像下面这样创建 ProducerRecord 对象:
ProducerRecord<Integer, String> record =
new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");
如果要创建键为 null 的消息,不指定键就可以了:
ProducerRecord<Integer, String> record =
new ProducerRecord<>("CustomerCountry", "USA");
如果键值为 null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用 的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。
如果键不为空,并且使用了默认的分区器,那么 Kafka 会对键进行散列(使用 Kafka 自己 的散列算法,即使升级 Java 版本,散列值也不会发生变化),然后根据散列值把消息映射 到特定的分区上。这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进 行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。这也意味着,如果写入 数据的分区是不可用的,那么就会发生错误。
只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。举个例子,在 分区数量保持不变的情况下,可以保证用户 045189 的记录总是被写到分区 34。在从分 区读取数据时,可以进行各种优化。不过,一旦主题增加了新的分区,这些就无法保证 了——旧数据仍然留在分区 34,但新的记录可能被写到其他分区上。如果要使用键来映射 分区,那么最好在创建主题的时候就把分区规划好,而且永远不要增加新分区。
6.1、PartitionInfo
如果没有提前创建消息所属的主题,默认情况下主题的分区数量只有一个。一个主题只有一个分区时,会导致同一个主题的所有消息都只会保存到一个节点上。一般我们会提前创建主题,指定更多的分区数,这样同一个主题的所有消息就会分散在不同的节点上。
PartitionInfo对象表示一个分区的分布信息,它的成员变量有主题名称、分区编号、所在的主副本节点、所有的副本、ISR列表。把消息和PartitionInfo组合起来,就能表示消息被发送到哪个主题的哪个分区。相关代码如下:
public class PartitionInfo {
private final String topic;//主题名称
private final int partition;//分区编号
private final Node leader;//分区的主节点
private final Node[] replicas;//分区的所有副本节点
private final Node[] inSyncReplicas;//分区中处于ISR的副本
private final Node[] offlineReplicas;//分区中处于离线的副本
}
6.2、实现自定义分区策略
partition() 方法用来计算分区号,返回值为 int 类型。partition() 方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。
public class MyPartitioner implements Partitioner {
//分区操作
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceof String))) {
throw new InvalidRecordException("We expect all messages to have customer name as key");
}
if (((String)key).equals("special")) {
return numPartitions;//key为special的消息,总是被分配到最后一个分区
}
//其它记录被散列到其他分区
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1));
}
//关闭分区器
public void close() {
//不做任何操作
}
//配置
public void configure(Map<String, ?> configs) {
//不做任何配置
}
}
然后通过partitioner.class指定分区器:
Properties properties = new Properties();
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
7、完整示例
新的生产者应用程序使用KafkaProducer对象代表一个生产者客户端进程。生产者要发送消息,并不是直接发送给服务端,而是先在客户端把消息放入队列中,然后由一个消息发送线程从队列中拉取消息,以批量的方式发送消息给服务端。Kafka的记录收集器(RecordAccumulator)负责缓存生产者客户端产生的消息,发送线程(Sender)负责读取记录收集器的批量消息,通过网络发送给服务端。为了保证客户端网络请求的快速响应,Kafka使用选择(Selector)处理网络连接和读写处理,使用网络连接(NetworkClient)处理客户端网络请求。
//下面的生产者示例代码来自Kafka源码根目录的examples包
public class Producer extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final Boolean isAsync;
public Producer( String topic, Boolean isAsync) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("client.id", "DemoProducer");
properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
this.producer = new KafkaProducer<Integer, String>(properties);
this.topic = topic;
this.isAsync = isAsync;
}
@Override
public void run() {
int num = 1;
while (true) {
String msg = "Message_" + num;
if (isAsync) { //异步发送
producer.send(new ProducerRecord<Integer, String>(topic, num, msg), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("#offset:" + metadata.offset());
}
});
} else { //同步发送
try {
producer.send(new ProducerRecord<Integer, String>(topic, num, msg)).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
++num;
}
}
}
其中Producer类首先根据配置文件创建了一个生产者客户端对象KafkaProducer。在构造函数中,isAsync参数表示生产者发送消息的模式,值为true表示异步,值为false表示同步。每一条消息都会包装成一条生产者记录(ProducerRecord),并传给生产者客户端对象的send发送方法,表示生产者客户端产生的一条消息通过生产者客户端对象发送给Kafka服务端集群。
异步发送模式还传递了一个Callback匿名回调类,当客户端发送完一条消息,并且这条消息成功地存储到Kafka集群后,服务端会调用匿名回调类的回调方法(onCompletion)。异步发送指的是生产者发送完一条消息后,不需要关心服务端处理完了没有,可以接着发送下一条消息。服务端在处理完每一条消息后,会自动触发回调函数,返回响应结果给客户端。如果是以同步模式发送消息,生产者客户端应用程序不需要提供回凋函数。同步模式下,生产者发送完一条消息后,必须等待服务端返回
响应结果,然后才能发送下一条消息。
KafkaProducer只用了一个send方法,就可以完成同步和异步两种模式的消息发送,这是因为send方法返回的是一个Future。基于Future,我们可以实现同步或异步的消息发送语义。
- 同步。调用send返回Futurel时,需要立即调用get,因为Future.get在没有返回结果时会一直阻塞。
- 异步。提供一个回调,调用sed后可以继续发送消息而不用等待。当有结果返回时,会自动执行回调函数。
生产者客户端对象KafkaProducer的send方法的处理逻辑是:首先序列化消息的key和value(消息必须序列化成二进制流的形式才能在网络中传输),然后为每一条消息选择对应的分区(表示要将消息存储到Kafka集群的哪个节点上),最后通知发送线程发送消息。