2023-07-29  阅读(5)
原文作者:说好不能打脸 原文地址:https://yinwj.blog.csdn.net/article/details/50698695

4-5、Kafka原理:消费者

作为Apache Kafka消息队列,它的性能指标相当一部分取决于消费者们的性能——只要消息能被快速消费掉不在Broker端形成拥堵,整个Apache Kafka就不会出现性能瓶颈问题。

4-5-1、基本使用

我们首先使用Kafka Client For JAVA API为各位读者演示一下最简单的Kafka消费者端的使用。以下示例代码可以和上文中所给出的生产者代码相对应,形成一个完整的消息创建——接收——发送过程:

    package kafkaTQ;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    
    /**
     * 这是Kafka的topic消费者
     * @author yinwenjie
     */
    public class KafkaConsumer_GroupOne {
        public static void main(String[] args) throws RuntimeException {
            // ==============首先各种连接属性
            // Kafka消费者的完整连接属性在Apache Kafka官网http://kafka.apache.org/documentation.html#consumerconfigs
            // 有详细介绍(请参看Old Consumer Configs。New Consumer Configs是给Kafka V0.9.0.0+使用的)
            // 这里我们设置几个关键属性
            Properties props = new Properties();
            // zookeeper相关的,如果有多个zk节点,这里以“,”进行分割
            props.put("zookeeper.connect", "192.168.61.140:2181");
            props.put("zookeeper.connection.timeout.ms", "10000");
            // 还记得上文的说明吗:对于一个topic而言,同一用户组内的所有用户只被允许访问一个分区。
            // 所以要让多个Consumer实现对一个topic的负载均衡,每个groupid的名称都要一样
            String groupname = "group2";
            props.put("group.id", groupname);
    
            //==============
            ConsumerConfig consumerConfig = new ConsumerConfig(props);
            ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    
            // 我们只创建一个消费者
            HashMap<String, Integer> map = new HashMap<String, Integer>();
            map.put("my_topic2", 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(map);
    
            // 获取并启动消费线程,注意看关键就在这里,一个消费线程可以负责消费一个topic中的多个partition
            // 但是一个partition只能分配到一个消费线程去
            KafkaStream<byte[], byte[]> stream = topicMessageStreams.get("my_topic2").get(0);
            new Thread(new ConsumerThread(stream)).start();
    
            // 接着锁住主线程,让其不退出
            synchronized (KafkaConsumer_GroupTwo.class) {
                try {
                    KafkaConsumer_GroupTwo.class.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace(System.out);
                }
            }
        }
    
        /**
         * @author yinwenjie
         */
        private static class ConsumerThread implements Runnable {
    
            private KafkaStream<byte[], byte[]> stream;
    
            /**
             * @param stream
             */
            public ConsumerThread(KafkaStream<byte[], byte[]> stream) {
                this.stream = stream;
            }
    
            @Override
            public void run() {
                ConsumerIterator<byte[], byte[]> iterator =  this.stream.iterator();
                //============这个消费者获取的数据在这里
                while(iterator.hasNext()){  
                    MessageAndMetadata<byte[], byte[]> message = iterator.next();
                    int partition = message.partition();
                    String topic = message.topic();
                    String messageT = new String(message.message());
                    System.out.println("接收到: " + messageT + "来自于topic:[" + topic + "] + 第partition[" + partition + "]"); 
                }
            }
        }
    }

以上代码片段有几个关键点需要进行一下说明:

  • “map.put(“my_topic2”, 1);” 这句代码表示将会为名叫“my_topic2”的队列创建数量为1的消费者。在一个进程的连接中,您可以指定创建多个topic的消费者数量。例如:
    ......
    # 为my_topic2的队列创建数量为1的消费者
    # 并且为my_topic3的队列创建数量为4的消费者
    map.put("my_topic2", 1);
    map.put("my_topic3", 4);
    ......
  • 每一个消费者都需要一个独立的线程进行工作。您可以将工作任务放入已经创建好的线程池(推荐这样做),也可以像以上代码示例中那样创建一个线程并运行任务。
    ......
    # 使用线程池
    # 这里的参数就是消费者的总数量
    ExecutorService threadPool = Executors.newFixedThreadPool(1);
    threadPool.execute(new ConsumerThread(stream));
    ......
  • 在开发过程中,消费者端无需知道任何一个Broker的位置。但是必须至少知道一个zookeeper服务节点的位置。通过这个位置,消费者端首先会去zookeeper服务上查找指定的topic的分区情况和已有的消费者情况。

4-5-2、分区与消费者负载

Apache Kafka集群中的消费者以线程为单位 ,如在上一小节代码段所示:我们在一个进程中,为Topic为“my_topic2”的队列创建了一个线程,这个线程就是一个消费者——属于名为“group2”的用户组。这时,Topic中所有分区的消息都会交给这个消费线程进行消费。如下图所示:

202307292155102211.png

虽然一个消费者可以同时消费Topic中多个分区(Partition)的消息,但在生产环境下为了获得更优的消费性能并不建议这样做。由于单个消费者线程的处理能力是有限的,一旦出现数据洪峰,消息就会堆积在Broker端无法被处理(如果消费者端使用了线程池,则可能堆积在消费者端,这要看您怎么编写代码)。所以上一个小节那样的消费者编码方式,最多就是用来做做“Hello World”那样的示例,没有更多的使用价值了。

4-5-3、优化 一:

第一种改进方法,就是让 一个消费者只消费一个分区(Partition)中的消息,且整个系统中的消费者大于等于Topic中的分区数量 。设计方案如下:

202307292155116302.png

如上图所示,这个Topic下一共有四个分区(Partition),对应的消费者数量也有四个,但是这四个消费者同属于一个进程,存在于同一个物理节点上。我们根据这个设计方案,更改之前消费者端的代码,如下(为了节约篇幅,只给出主要的更改位置):

    ......
    // 后续创建的所有消费者线程,都是属于group2的消费组
    String groupname = "group2";
    props.put("group.id", groupname);
    
    ......
    // 在这个进程中,为topic名为my_topic2的队列创建了四个消费者
    HashMap<String, Integer> map = new HashMap<String, Integer>();
    map.put("my_topic2", 4);
    Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(map);
    
    ......
    // 为这四个消费者分配四个不同的线程
    // 消费者线程1
    KafkaStream<byte[], byte[]> stream = topicMessageStreams.get("my_topic2").get(0);
    new Thread(new ConsumerThread(stream)).start();
    
    // 消费者线程2
    stream = topicMessageStreams.get("my_topic2").get(1);
    new Thread(new ConsumerThread(stream)).start();
    
    // 消费者线程3
    stream = topicMessageStreams.get("my_topic2").get(2);
    new Thread(new ConsumerThread(stream)).start();
    
    // 消费者线程4
    stream = topicMessageStreams.get("my_topic2").get(3);
    new Thread(new ConsumerThread(stream)).start();
    
    ......
    // 接着锁住主线程,让其不退出
    synchronized (KafkaConsumer_GroupTwo.class) {
        try {
            KafkaConsumer_GroupTwo.class.wait();
        } catch (InterruptedException e) {
            e.printStackTrace(System.out);
        }
    }
    ......

4-5-4、优化 二:

显然“优化方案一”中的做法虽然实现了4消费者分别对应4个分区的负载均衡方案,但是受限于单个物理节点的处理性能,所以这种方案的处理性能还有进一步优化的可能。我们可以在多个节点物理节点上均匀散步这些消费者,对Topic分区中的消息进行一一对应的消费,如下图所示:

202307292155143603.png

上图所示的设计思路中,我们使用了2个物理节点完成消息的消费任务,每个服务节点上开启的消费进程上有两个消费者线程。这样Topic中4个分区的消息就会被均匀分布到2个物理节点中,且每一个物理节点处理两个分区中的消息。

注意:可能您在分别启动这些消费进程的时候,由于时间上存在差异,某一台服务节点上的消费进程将暂时被分配多个分区进行消息接收。但没有关系,当这个消费者性能到达瓶颈,分区中的消息出现拥堵的时候,这个分区就会被新的消费者所代替,直到10个消费者线程分别和10个分区建立一一对应关系为止


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文