Kafka Producer的源码在clients
模块下。本章,我们先来看下KafkaProducer在构造的时候,会涉及到哪些内部的核心组件,然后我将讲解KafkaProducer的初始化流程。
一、整体流程
Kafka Producer的核心原理我在前面章节已经详细讲解过了,回顾下这张图,其实就是一条消息从创建到发送的整体流程:
上图中有几个核心组件我在后续分析KafkaProducer的源码时会重点讲解:
- Partitioner: 分区器,用来决定发送的每条消息是路由到Topic的哪个分区里;
- MetaData: Producer发送消息时,需要根据Topic的元数据信息确认发送到哪个Broker。所以,Producer会从Broker集群去拉取元数据,元数据包括Topic信息、分区信息;
- RecordAccumulator: 缓冲区,负责消息的复杂缓冲机制,发送到每个分区的消息会被打包成ProducerBatch,最后一个Broker上的多个分区对应的多个ProducerBatch又会被打包成一个Request;
- NetworkClient: 网络通信组件,负责Kafka Producer与Broker之间的通信;
- Sender线程: 负责从缓冲区里获取消息并发送到Broker;
Kafka Producer还有拦截器和序列化器,这两个了解一下就可以了,不是我们分析的重点。
二、初始化
KafkaProducer 就是Kafka中的生产者,回顾下我们使用Kafka生产者的方式,就是创建KafkaProducer对象,然后传入必要的生产者参数:
public class ProducerDemo {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
// 配置Broker地址,配置几台即可,Producer会Broker拉取Topic的元数据并缓存
props.put("bootstrap.servers", "ressmix01:9092,ressmix02:9092,ressmix03:9092");
// key序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化器
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 要求所有ISR中的副本写入成功
props.put("acks", "-1");
props.put("retries", 3);
// 每个批量发送
props.put("batch.size", 323840);
props.put("linger.ms", 10);
// 发送消息的缓冲区大小,32MB
props.put("buffer.memory", 33554432);
// 发送消息的缓冲区满时,阻塞超时时间
props.put("max.block.ms", 3000);
// 创建一个Producer实例
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// 创建一个条消息
ProducerRecord<String, String> record = new ProducerRecord<>(
"test-topic", "test-key", "test-value");
// 采用异步发送模式
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception == null) {
// 消息发送成功
System.out.println("消息发送成功");
} else {
// 消息发送失败,需要重新发送
}
}
});
Thread.sleep(10 * 1000);
// 采用同步发送模式
// producer.send(record).get();
// 关闭Producer
producer.close();
}
}
2.1 构造
我们来看下KafkaProducer的构造,我省略了一些重载构造器,只留下最核心的部分。其实,整个构造过程比较简单,就是给KafkaProducer组装各类核心组件和配置:
public class KafkaProducer<K, V> implements Producer<K, V> {
// 生产者编号,自增序号
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private String clientId;
// 分区器
private final Partitioner partitioner;
// 单个请求最大大小
private final int maxRequestSize;
// 发送缓冲区大小
private final long totalMemorySize;
// 元数据
private final Metadata metadata;
// 缓冲区(消息累加器)
private final RecordAccumulator accumulator;
// Sender线程
private final Sender sender;
private final Thread ioThread;
// 消息压缩类型
private final CompressionType compressionType;
private final Sensor errors;
private final Time time;
// K/V序列化器
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
// 生产者参数
private final ProducerConfig producerConfig;
// 最大请求阻塞时间
private final long maxBlockTimeMs;
// 请求超时时间
private final int requestTimeoutMs;
// 拦截器
private final ProducerInterceptors<K, V> interceptors;
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
Serializer<V> valueSerializer) {
try {
log.trace("Starting the Kafka producer");
// 生产者配置
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = Time.SYSTEM;
// 设置Producer的ID
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
// 设置分区器:默认为DefaultPartitioner
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
// 重试发送间隔
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
// 设置K/V序列化器
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}
// 设置拦截器
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
// 设置元数据
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
// 单个请求最大大小
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
// 发送缓存区大小
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
// 消息压缩方式,默认不压缩
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
// 设置消息发送阻塞时间;当缓存区满或Producer获取不到元数据时,会进入阻塞,超过阻塞时间后抛出异常
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
// 设置缓冲区(消息累加器)
// batch.size: 消息会被按批次封装成ProducerBatch对象,这个参数用来设置batch的大小
// linger.ms: 消息在缓存区的最大逗留时间,默认值为0,即消息不过缓冲区立即被发送出去
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time);
// 解析配置的Broker地址
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
// 更新元数据
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
// 构建网络通信组件
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
this.metadata,
clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs,
time,
true);
// 构建Sender Runnable任务
// max.in.flight.requests.per.connection:控制一个连接中最大未响应的请求数量,与消息发送的有序性有关
this.sender = new Sender(client,
this.metadata,
this.accumulator,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
Time.SYSTEM,
this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
// 启动Sender线程
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
//...
log.debug("Kafka producer started");
} catch (Throwable t) {
close(0, TimeUnit.MILLISECONDS, true);
throw new KafkaException("Failed to construct kafka producer", t);
}
}
}
上述构造过程中,有几个核心点要注意下:
- 默认情况下,每个请求的最大大小为1MB,发送缓冲区的大小为32MB,请求重试间隔为100ms,缓冲区填满之后的阻塞时间为60s,请求超时时间为30s;
- RecordAccumulator,负责消息的复杂缓冲机制,消息在发送前会被打包成batch,batch的默认大小为16KB。Batch中的消息,最大逗留时间为
linger.ms
,比如说5ms,如果5ms还没凑出来一个batch,就必须立即把这个消息发送出去; - NetworkClient,负责底层的网络通信,一个网络连接最多空闲9分钟,默认情况下每个连接最多允许5个Request没收到响应,重试连接的时间间隔为50ms,Socket发送缓冲区大小为128KB,Socket接收缓冲区大小为32KB;
- Sender线程,负责从缓冲区里获取消息发送到Broker上去,控制了单个消息最大大小为1MB,acks默认为1,表示要Leader Partition写入成功就认为成功,默认重试次数为0,请求超时时间为30s。
三、总结
本章,我对KafkaProdcer的初始化整体流程进行了分析,下一章开始,我将分析KafkaProducer内部的各个核心组件。