2023-07-31
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/290

Kafka Producer的源码在clients模块下。本章,我们先来看下KafkaProducer在构造的时候,会涉及到哪些内部的核心组件,然后我将讲解KafkaProducer的初始化流程。

202307312122164171.png

一、整体流程

Kafka Producer的核心原理我在前面章节已经详细讲解过了,回顾下这张图,其实就是一条消息从创建到发送的整体流程:

202307312122172992.png

上图中有几个核心组件我在后续分析KafkaProducer的源码时会重点讲解:

  • Partitioner: 分区器,用来决定发送的每条消息是路由到Topic的哪个分区里;
  • MetaData: Producer发送消息时,需要根据Topic的元数据信息确认发送到哪个Broker。所以,Producer会从Broker集群去拉取元数据,元数据包括Topic信息、分区信息;
  • RecordAccumulator: 缓冲区,负责消息的复杂缓冲机制,发送到每个分区的消息会被打包成ProducerBatch,最后一个Broker上的多个分区对应的多个ProducerBatch又会被打包成一个Request;
  • NetworkClient: 网络通信组件,负责Kafka Producer与Broker之间的通信;
  • Sender线程: 负责从缓冲区里获取消息并发送到Broker;

Kafka Producer还有拦截器和序列化器,这两个了解一下就可以了,不是我们分析的重点。

二、初始化

KafkaProducer 就是Kafka中的生产者,回顾下我们使用Kafka生产者的方式,就是创建KafkaProducer对象,然后传入必要的生产者参数:

    public class ProducerDemo {
    
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
    
            // 配置Broker地址,配置几台即可,Producer会Broker拉取Topic的元数据并缓存
            props.put("bootstrap.servers", "ressmix01:9092,ressmix02:9092,ressmix03:9092");  
            // key序列化器
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // value序列化器
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // 要求所有ISR中的副本写入成功
            props.put("acks", "-1");
            props.put("retries", 3);
            // 每个批量发送
            props.put("batch.size", 323840);
            props.put("linger.ms", 10);
            // 发送消息的缓冲区大小,32MB
            props.put("buffer.memory", 33554432);
            // 发送消息的缓冲区满时,阻塞超时时间
            props.put("max.block.ms", 3000);
    
            // 创建一个Producer实例
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            // 创建一个条消息
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    "test-topic", "test-key", "test-value");
    
            // 采用异步发送模式
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(exception == null) {
                        // 消息发送成功
                        System.out.println("消息发送成功");  
                    } else {
                        // 消息发送失败,需要重新发送
                    }
                }
    
            });
    
            Thread.sleep(10 * 1000); 
    
            // 采用同步发送模式
            // producer.send(record).get(); 
    
            // 关闭Producer
            producer.close();
        }    
    }

2.1 构造

我们来看下KafkaProducer的构造,我省略了一些重载构造器,只留下最核心的部分。其实,整个构造过程比较简单,就是给KafkaProducer组装各类核心组件和配置:

    public class KafkaProducer<K, V> implements Producer<K, V> {
        // 生产者编号,自增序号
        private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
        private String clientId;
    
        // 分区器
        private final Partitioner partitioner;
        // 单个请求最大大小
        private final int maxRequestSize;
        // 发送缓冲区大小
        private final long totalMemorySize;
    
        // 元数据
        private final Metadata metadata;
    
        // 缓冲区(消息累加器)
        private final RecordAccumulator accumulator;
    
        // Sender线程
        private final Sender sender;
        private final Thread ioThread;
    
        // 消息压缩类型
        private final CompressionType compressionType;
        private final Sensor errors;
        private final Time time;
    
        // K/V序列化器
        private final Serializer<K> keySerializer;
        private final Serializer<V> valueSerializer;
    
        // 生产者参数
        private final ProducerConfig producerConfig;
        // 最大请求阻塞时间
        private final long maxBlockTimeMs;
        // 请求超时时间
        private final int requestTimeoutMs;
    
        // 拦截器
        private final ProducerInterceptors<K, V> interceptors;
    
        private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, 
                              Serializer<V> valueSerializer) {
            try {
                log.trace("Starting the Kafka producer");
                // 生产者配置
                Map<String, Object> userProvidedConfigs = config.originals();
                this.producerConfig = config;
                this.time = Time.SYSTEM;
    
                // 设置Producer的ID
                clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
                if (clientId.length() <= 0)
                    clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
                // 设置分区器:默认为DefaultPartitioner
                this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
                // 重试发送间隔
                long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
    
                // 设置K/V序列化器
                if (keySerializer == null) {
                    this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                            Serializer.class);
                    this.keySerializer.configure(config.originals(), true);
                } else {
                    config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                    this.keySerializer = keySerializer;
                }
                if (valueSerializer == null) {
                    this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                            Serializer.class);
                    this.valueSerializer.configure(config.originals(), false);
                } else {
                    config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                    this.valueSerializer = valueSerializer;
                }
    
                // 设置拦截器
                List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                        ProducerInterceptor.class);
                this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
    
                // 设置元数据
                this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
                // 单个请求最大大小
                this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
                // 发送缓存区大小
                this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
                // 消息压缩方式,默认不压缩
                this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
    
                // 设置消息发送阻塞时间;当缓存区满或Producer获取不到元数据时,会进入阻塞,超过阻塞时间后抛出异常
                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
    
                // 设置缓冲区(消息累加器)
                // batch.size: 消息会被按批次封装成ProducerBatch对象,这个参数用来设置batch的大小
                // linger.ms: 消息在缓存区的最大逗留时间,默认值为0,即消息不过缓冲区立即被发送出去
                this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                        this.totalMemorySize,
                        this.compressionType,
                        config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                        retryBackoffMs,
                        metrics,
                        time);
    
                // 解析配置的Broker地址
                List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
                // 更新元数据
                this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
                // 构建网络通信组件
                ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
                NetworkClient client = new NetworkClient(
                        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                        this.metadata,
                        clientId,
                        config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                        this.requestTimeoutMs,
                        time,
                        true);
    
                // 构建Sender Runnable任务
                // max.in.flight.requests.per.connection:控制一个连接中最大未响应的请求数量,与消息发送的有序性有关
                this.sender = new Sender(client,
                        this.metadata,
                        this.accumulator,
                        config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                        (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                        config.getInt(ProducerConfig.RETRIES_CONFIG),
                        this.metrics,
                        Time.SYSTEM,
                        this.requestTimeoutMs);
                String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
                // 启动Sender线程
                this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
                this.ioThread.start();
    
                //...
                log.debug("Kafka producer started");
            } catch (Throwable t) {
                close(0, TimeUnit.MILLISECONDS, true);
                throw new KafkaException("Failed to construct kafka producer", t);
            }
        }
    }

上述构造过程中,有几个核心点要注意下:

  • 默认情况下,每个请求的最大大小为1MB,发送缓冲区的大小为32MB,请求重试间隔为100ms,缓冲区填满之后的阻塞时间为60s,请求超时时间为30s;
  • RecordAccumulator,负责消息的复杂缓冲机制,消息在发送前会被打包成batch,batch的默认大小为16KB。Batch中的消息,最大逗留时间为linger.ms,比如说5ms,如果5ms还没凑出来一个batch,就必须立即把这个消息发送出去;
  • NetworkClient,负责底层的网络通信,一个网络连接最多空闲9分钟,默认情况下每个连接最多允许5个Request没收到响应,重试连接的时间间隔为50ms,Socket发送缓冲区大小为128KB,Socket接收缓冲区大小为32KB;
  • Sender线程,负责从缓冲区里获取消息发送到Broker上去,控制了单个消息最大大小为1MB,acks默认为1,表示要Leader Partition写入成功就认为成功,默认重试次数为0,请求超时时间为30s。

三、总结

本章,我对KafkaProdcer的初始化整体流程进行了分析,下一章开始,我将分析KafkaProducer内部的各个核心组件。

阅读全文