2023-07-31
Original author: Ressmix. Original URL: https://www.tpvlog.com/article/290

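In this chapter I cover the last two components of the network layer: RequestChannel and RequestHandler. Strictly speaking, RequestChannel and RequestHandler do not belong to the Network Layer, and they do not belong to the API Layer either.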

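I treat RequestChannel and RequestHandler as the middleware through which the network layer and the API layer of the Kafka server interact. In this chapter I analyze their overall architecture and the low-level details of how they handle requests and responses.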

1. RequestChannel

We start with RequestChannel. It buffers the Request objects sent by Processor threads and also accepts Response objects from the API layer. Internally it holds two kinds of queues:

  • One request queue, requestQueue
  • N response queues, responseQueues

requestQueue is an ArrayBlockingQueue shared by all Processor threads. Each Processor thread wraps an incoming request in a Request object and puts it on this queue.

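Each response queue is a LinkedBlockingQueue, and every Processor thread owns exactly one. The API layer wraps each response in a Response object and puts it on the queue of the Processor that received the request.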
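The two kinds of queues can be sketched in a few lines of Scala. This is a minimal illustration, not Kafka's code: `MiniRequestChannel`, `Request`, and `Response` here are hypothetical stand-ins, and the payloads are plain strings.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue, TimeUnit}

// Hypothetical stand-ins for RequestChannel.Request / RequestChannel.Response.
final case class Request(processor: Int, payload: String)
final case class Response(processor: Int, payload: String)

// Simplified sketch: one bounded request queue shared by all processors,
// plus one unbounded response queue per processor.
class MiniRequestChannel(numProcessors: Int, queueSize: Int) {
  private val requestQueue = new ArrayBlockingQueue[Request](queueSize)
  private val responseQueues: Array[BlockingQueue[Response]] =
    Array.fill[BlockingQueue[Response]](numProcessors)(new LinkedBlockingQueue[Response]())

  // Called by a processor thread; blocks while the shared queue is full.
  def sendRequest(request: Request): Unit = requestQueue.put(request)

  // Called by a handler thread; returns null if nothing arrives in time.
  def receiveRequest(timeoutMs: Long): Request =
    requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)

  // Called by the API layer; routes the response to the owning processor.
  def sendResponse(response: Response): Unit =
    responseQueues(response.processor).put(response)

  // Called by a processor thread; non-blocking, returns null when empty.
  def receiveResponse(processor: Int): Response =
    responseQueues(processor).poll()
}
```

Because the request queue is bounded, a full queue makes `sendRequest` block, which pushes back on the processors; the per-processor response queues ensure a response returns to the processor that owns the connection.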

1.1 Internal structure

The overall structure looks roughly like this:

[Figure: overall structure of RequestChannel (202307312123316611.png)]

Accepting a Request sent by a Processor thread:

    // RequestChannel.scala
    
    def sendRequest(request: RequestChannel.Request) {
      requestQueue.put(request)
    }

Fetching a Response object to hand back to the Processor thread:

    // RequestChannel.scala
    
    def receiveResponse(processor: Int): RequestChannel.Response = {
      val response = responseQueues(processor).poll()
      if (response != null)
        response.request.responseDequeueTimeMs = Time.SYSTEM.milliseconds
      response
    }

Accepting a response from the API layer and adding it to the matching response queue:

    // RequestChannel.scala
    
    def sendResponse(response: RequestChannel.Response) {
      responseQueues(response.processor).put(response)
      for(onResponse <- responseListeners)
        onResponse(response.processor)
    }
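The listener loop in sendResponse deserves a closer look: after enqueuing, every registered callback is told which processor now has a pending response. The sketch below illustrates that hook under simplifying assumptions; `ResponseSide` and `addResponseListener` as written here are hypothetical, and responses are plain strings. One plausible use of such a hook, which the excerpt above does not show, is waking a processor that is blocked waiting for network events.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of the response side of the channel with a listener hook.
class ResponseSide(numProcessors: Int) {
  private val responseQueues = Array.fill(numProcessors)(new LinkedBlockingQueue[String]())
  private val responseListeners = ArrayBuffer.empty[Int => Unit]

  // Register a callback that receives the id of the processor with a new response.
  def addResponseListener(onResponse: Int => Unit): Unit =
    responseListeners += onResponse

  def sendResponse(processor: Int, body: String): Unit = {
    responseQueues(processor).put(body)
    // Notify every listener which processor now has a pending response.
    for (onResponse <- responseListeners) onResponse(processor)
  }

  // Non-blocking: returns None when the processor's queue is empty.
  def receiveResponse(processor: Int): Option[String] =
    Option(responseQueues(processor).poll())
}
```

Note that the response is enqueued before the listeners run, so a listener that immediately polls the queue will find the response there.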

2. RequestHandler

Now for the other component, RequestHandler. It is created when the Kafka server starts, and it delegates the actual work to the API layer:

    // KafkaServer.scala
    
    var apis: KafkaApis = null
    var requestHandlerPool: KafkaRequestHandlerPool = null
    def startup() {
        try {
            //...
            if (canStartup) {
                // 14. Start the request handler thread pool
                apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager,
                                     groupCoordinator,kafkaController, zkUtils, config.brokerId,
                                     config, metadataCache, metrics, authorizer, quotaManagers,
                                     clusterId, time)
    
                requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,
                                                                 socketServer.requestChannel,
                                                                 apis, time, config.numIoThreads)
            }
        }
        catch {
            case e: Throwable =>
                fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
                isStartingUp.set(false)
                shutdown()
                throw e
        }
    }

The whole processing flow can be summarized by the following diagram:

[Figure: request processing flow through RequestChannel and RequestHandler (202307312123338842.png)]

2.1 KafkaRequestHandlerPool

Let's look inside KafkaRequestHandlerPool. It wraps a group of threads and can be viewed as a thread pool. Once started, each thread keeps running its KafkaRequestHandler task:

    // KafkaRequestHandlerPool.scala
    
    class KafkaRequestHandlerPool(val brokerId: Int,
                                  val requestChannel: RequestChannel,
                                  val apis: KafkaApis,
                                  time: Time,
                                  numThreads: Int) extends Logging with KafkaMetricsGroup {
      // Worker threads, 8 by default
      val threads = new Array[Thread](numThreads)
      // The KafkaRequestHandler tasks, one per thread
      val runnables = new Array[KafkaRequestHandler](numThreads)
      for(i <- 0 until numThreads) {
        runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
        threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
        // Start the thread
        threads(i).start()
      }
    }

2.2 KafkaRequestHandler

KafkaRequestHandler is a Runnable task. It keeps fetching Request objects from the RequestChannel's request queue and hands each one to the Kafka API layer for processing:

    // KafkaRequestHandler.scala
    
    def run() {
      while(true) {
        try {
          var req : RequestChannel.Request = null
          while (req == null) {
            val startSelectTime = time.nanoseconds
            // Fetch a Request from the RequestChannel's request queue (the argument is the poll timeout)
            req = requestChannel.receiveRequest(300)
          }
          //...
          // Hand the request to the API layer
          apis.handle(req)
        } catch {
          case e: Throwable => error("Exception when handling request", e)
        }
      }
    }
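The pool-plus-loop pattern above can be reproduced with standard JDK classes. The following is a minimal sketch under stated assumptions: `MiniHandlerPool` is a hypothetical name, requests are plain strings, and the `running` flag with `shutdown()` is added here only so that the example terminates cleanly; the loop shown above simply runs `while(true)`.

```scala
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, TimeUnit}

// Hypothetical handler pool: numThreads daemon threads drain one shared queue.
class MiniHandlerPool(queue: ArrayBlockingQueue[String],
                      numThreads: Int,
                      handle: String => Unit) {
  // Assumption for this sketch: a flag that lets the loops exit.
  @volatile private var running = true

  private val threads: Seq[Thread] = (0 until numThreads).map { i =>
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        while (running) {
          try {
            // Wait up to 300 ms so the loop can re-check the running flag.
            val req = queue.poll(300, TimeUnit.MILLISECONDS)
            if (req != null) handle(req)
          } catch {
            case _: InterruptedException => running = false
            // A failing request is logged and must not kill the worker thread.
            case e: Exception => System.err.println(s"Exception when handling request: $e")
          }
        }
      }
    }, s"mini-request-handler-$i")
    t.setDaemon(true)
    t.start()
    t
  }

  // Stop all workers and wait for them to finish their current iteration.
  def shutdown(): Unit = {
    running = false
    threads.foreach(_.join())
  }
}
```

Polling with a timeout instead of a blocking take keeps each worker responsive to the stop flag, and catching exceptions inside the loop means one bad request does not take a handler thread out of the pool.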

As an aside, let's look inside the Kafka API layer. It simply dispatches on the type of the Request:

    // KafkaApis.scala
    
    def handle(request: RequestChannel.Request) {
      try {
        trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
          format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
        // Dispatch on the request type
        ApiKeys.forId(request.requestId) match {
          case ApiKeys.PRODUCE => handleProducerRequest(request)
          case ApiKeys.FETCH => handleFetchRequest(request)
          case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
          case ApiKeys.METADATA => handleTopicMetadataRequest(request)
          case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
          case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
          case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
          case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
          case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
          case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
          case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
          case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
          case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
          case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
          case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
          case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
          case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
          case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
          case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
          case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
          case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
          case requestId => throw new KafkaException("Unknown api code " + requestId)
        }
      } catch {
        //...
      } finally
        request.apiLocalCompleteTimeMs = time.milliseconds
    }
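The dispatch in handle is ordinary Scala pattern matching. A stripped-down sketch of the same idea follows; the `ApiKey` hierarchy, `ApiRequest`, and `MiniApis` are hypothetical and much smaller than Kafka's real ApiKeys enum.

```scala
// Hypothetical request kinds; Kafka's real ApiKeys enum has many more entries.
sealed trait ApiKey
case object Produce extends ApiKey
case object Fetch extends ApiKey
case object Metadata extends ApiKey

final case class ApiRequest(key: ApiKey, body: String)

object MiniApis {
  // Route each request to the handler for its type and return the handler's result.
  def handle(request: ApiRequest): String = request.key match {
    case Produce  => s"produced:${request.body}"
    case Fetch    => s"fetched:${request.body}"
    case Metadata => s"metadata:${request.body}"
  }
}
```

With a sealed trait the compiler can warn about a missing case. The code above instead matches on a Java enum and ends with a catch-all case that throws a KafkaException for unknown API codes.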

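Let's focus on handleProducerRequest, which handles requests from ordinary producers. When processing completes, the following callback is invoked. As you can see, the final result is wrapped in a Response object and handed to the RequestChannel: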

    def produceResponseCallback(delayTimeMs: Int) {
      // The request does not require an ACK
      if (produceRequest.acks == 0) {
        //...
      }
      // The request requires an ACK
      else {
        val respBody = request.header.apiVersion match {
          case 0 => new ProduceResponse(mergedResponseStatus.asJava)
          case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
          case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.")
        }
        // Enqueue the response on the RequestChannel
        requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
      }
    }
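The version match in the callback uses a bound alternative pattern, `version@(1 | 2)`, to build a response that fits the request's API version. Here is a small self-contained sketch of that pattern; `ProduceReply` and `ReplyBuilder` are hypothetical, and setting the throttle time to 0 for version 0 is an assumption of this sketch, not something the original code states.

```scala
// Hypothetical response type; not Kafka's ProduceResponse.
final case class ProduceReply(status: String, throttleTimeMs: Int, version: Int)

object ReplyBuilder {
  def build(apiVersion: Int, status: String, delayTimeMs: Int): ProduceReply =
    apiVersion match {
      // Version 0: no throttle time is reported (modelled as 0 in this sketch).
      case 0 => ProduceReply(status, 0, 0)
      // Versions 1 and 2 share one branch; `version` is bound to the matched value.
      case version @ (1 | 2) => ProduceReply(status, delayTimeMs, version)
      // Any other version is rejected loudly so the code gets updated.
      case version =>
        throw new IllegalArgumentException(s"Version `$version` is not handled.")
    }
}
```

Failing on an unknown version, rather than falling back to the newest known format, keeps a protocol upgrade from silently producing responses the client cannot parse.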

3. Summary

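In this chapter I covered the last two components of the network layer, RequestChannel and RequestHandler. Their design is clear and simple: in essence, they use separate in-memory queues and a thread pool to process requests and responses, which decouples the network threads from the handler threads and raises overall throughput. This approach is very common in production systems. I also cover it in the hands-on part of my column "Distributed Systems: From Theory to Practice", which interested readers may want to look at.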
