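In the previous chapter, we saw that the Sender thread wraps each request in a ClientRequest, caches it in InFlightRequests and in the KafkaChannel, and finally registers interest in the OP_WRITE event on that KafkaChannel.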
In this chapter, we look at the last step of each iteration of the Sender thread's loop:
// Sender.java
this.client.poll(pollTimeout, now);
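I already covered how NetworkClient.poll() works under the hood in an earlier chapter, but the focus there was on establishing connections (OP_CONNECT). In this chapter, I focus on how the Selector handles read and write events (OP_READ/OP_WRITE) on a Channel.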
1. Listening for Events
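The Kafka client wraps Java NIO, but the fundamentals are unchanged. A Java NIO Selector lets a single thread manage multiple sockets: we register multiple Channels with one Selector (each Channel represents a socket; in Java NIO this is a SocketChannel) and specify the events each one is interested in.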
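Then, when Selector.select() runs and finds that an event of interest has occurred on a Channel, the Selector records it by placing the channel's SelectionKey in its selected-key set, and we can handle the event accordingly.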
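Stripped of Kafka's wrappers, this pattern looks as follows in plain Java NIO. It is a minimal, self-contained sketch (the class SelectLoopDemo and the loopback setup are illustrative, not Kafka code): one thread and one Selector serve a listening socket, a client socket and the accepted connection, and each pass handles only the keys that select() reported.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// One thread and one Selector drive three channels over loopback:
// a listening server, the client, and the server side of the accepted connection.
class SelectLoopDemo {
    static String roundTrip(String message) {
        SocketChannel serverSide = null;
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // any free port
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            client.configureBlocking(false);
            // A non-blocking connect may finish at once; then OP_CONNECT is not needed.
            boolean connected = client.connect(server.getLocalAddress());
            client.register(selector, connected ? SelectionKey.OP_WRITE : SelectionKey.OP_CONNECT);

            ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
            int expected = out.remaining();
            ByteBuffer in = ByteBuffer.allocate(1024);
            ByteArrayOutputStream received = new ByteArrayOutputStream();
            long deadline = System.currentTimeMillis() + 5000;

            while (received.size() < expected && System.currentTimeMillis() < deadline) {
                selector.select(100); // wait until a registered channel is ready (or 100 ms pass)
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove(); // the Selector never removes keys from the selected set itself
                    if (key.isAcceptable()) {
                        SocketChannel accepted = server.accept();
                        if (accepted != null) {
                            serverSide = accepted;
                            serverSide.configureBlocking(false);
                            serverSide.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isConnectable()) {
                        if (client.finishConnect())
                            key.interestOps(SelectionKey.OP_WRITE); // connected: now wait to write
                    } else if (key.isWritable()) {
                        client.write(out);
                        if (!out.hasRemaining())
                            key.interestOps(0); // nothing left to send
                    } else if (key.isReadable()) {
                        in.clear();
                        int n = ((SocketChannel) key.channel()).read(in);
                        if (n > 0)
                            received.write(in.array(), 0, n);
                    }
                }
            }
            return new String(received.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            try {
                if (serverSide != null) serverSide.close();
            } catch (IOException ignored) {
                // best-effort cleanup
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```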
Each SelectionKey is associated with one KafkaChannel. The Kafka Selector listens for the following events:
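- OP_CONNECT: connect-ready event, i.e. a pending connection can be completed;
- OP_WRITE: write-ready event, i.e. the client can write data to the socket buffer;
- OP_READ: read-ready event, i.e. the client can read data from the socket buffer.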
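Interest and ready sets are plain bit masks (in java.nio.channels.SelectionKey, OP_READ = 1, OP_WRITE = 4, OP_CONNECT = 8, OP_ACCEPT = 16), which is why one key can watch several events at once. A small sketch that decodes such a mask; the class OpsDemo and its describe() helper are hypothetical:

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

class OpsDemo {
    // Lists the client-side events contained in an interest or ready mask.
    static String describe(int ops) {
        List<String> names = new ArrayList<>();
        if ((ops & SelectionKey.OP_CONNECT) != 0) names.add("OP_CONNECT");
        if ((ops & SelectionKey.OP_READ) != 0) names.add("OP_READ");
        if ((ops & SelectionKey.OP_WRITE) != 0) names.add("OP_WRITE");
        return names.isEmpty() ? "none" : String.join("|", names);
    }

    public static void main(String[] args) {
        System.out.println(describe(SelectionKey.OP_CONNECT));                      // OP_CONNECT
        System.out.println(describe(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // OP_READ|OP_WRITE
        System.out.println(describe(0));                                            // none
    }
}
```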
// NetworkClient.java
public List<ClientResponse> poll(long timeout, long now) {
    //...
    try {
        // Call poll() on org.apache.kafka.common.network.Selector to poll the Channels
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    //...
}
2. Selector Polling
Let's look at Selector's poll method:
// Selector.java
public void poll(long timeout) throws IOException {
    //...
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }
    //...
}
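The second call exists because a non-blocking SocketChannel.connect() can succeed immediately (typically when the Broker is on the same host). Kafka's Selector.connect() puts such keys into immediatelyConnectedKeys instead of waiting for an OP_CONNECT event, and pollSelectionKeys(..., true, ...) then finishes them. The sketch below reproduces the idea with plain NIO against a loopback server; the class ImmediateConnectDemo is illustrative, and whether connect() returns true is platform-dependent, so both paths are handled.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

class ImmediateConnectDemo {
    // Returns true once the client connection is finished, via either path.
    static boolean connect() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 0));
             SocketChannel client = SocketChannel.open()) {
            client.configureBlocking(false);
            boolean connected = client.connect(server.getLocalAddress());
            SelectionKey key = client.register(selector, SelectionKey.OP_CONNECT);
            Set<SelectionKey> immediatelyConnectedKeys = new HashSet<>();
            if (connected) {
                immediatelyConnectedKeys.add(key); // do not wait for OP_CONNECT on this key
                key.interestOps(0);
            }
            long deadline = System.currentTimeMillis() + 5000;
            while (System.currentTimeMillis() < deadline) {
                // Path 1: keys that connected immediately (isImmediatelyConnected == true)
                for (SelectionKey k : immediatelyConnectedKeys) {
                    if (client.finishConnect()) { // returns true at once for a connected channel
                        k.interestOps(SelectionKey.OP_READ);
                        return true;
                    }
                }
                // Path 2: keys the Selector reported as connectable
                selector.select(100);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey k = it.next();
                    it.remove();
                    if (k.isConnectable() && client.finishConnect()) {
                        k.interestOps(SelectionKey.OP_READ); // OP_CONNECT -> OP_READ
                        return true;
                    }
                }
            }
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(connect());
    }
}
```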
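The core is pollSelectionKeys: the Selector iterates over a batch of SelectionKeys and, for each key on which an event of interest has occurred, performs the corresponding handling: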
// Selector.java
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    // Iterate over the SelectionKeys
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);
        try {
            if (isImmediatelyConnected || key.isConnectable()) {
                // 1. Finish connecting (OP_CONNECT); on success, interest switches to OP_READ
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                } else
                    continue;
            }
            // 2. Read data (OP_READ)
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }
            // 3. Write data (OP_WRITE)
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }
            //...
        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel, true);
        }
    }
}
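The three branches are independent `if`s rather than an `if/else` chain, so one pass over a key can, for example, both read and write when both bits are ready. A simulation of just this control flow, where readyOps stands in for the key's ready set and connectOk for the result of finishConnect(); the class DispatchOrderDemo is hypothetical and assumes channel.ready() is true, as for an already-connected plaintext channel:

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

class DispatchOrderDemo {
    // Replays the branch order of pollSelectionKeys for a single key.
    // Assumption: channel.ready() is true, so only the key's ready bits matter.
    static List<String> handle(int readyOps, boolean isImmediatelyConnected, boolean connectOk) {
        List<String> actions = new ArrayList<>();
        if (isImmediatelyConnected || (readyOps & SelectionKey.OP_CONNECT) != 0) {
            if (!connectOk)
                return actions;              // the `continue`: skip read/write for this key
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_READ) != 0)
            actions.add("read");             // drain complete receives
        if ((readyOps & SelectionKey.OP_WRITE) != 0)
            actions.add("write");            // continue the pending Send
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(handle(SelectionKey.OP_READ | SelectionKey.OP_WRITE, false, true)); // [read, write]
        System.out.println(handle(SelectionKey.OP_CONNECT, false, false));                     // []
        System.out.println(handle(0, true, true));                                             // [finishConnect]
    }
}
```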
2.1 OP_CONNECT
Interest in OP_CONNECT is registered when the client first tries to establish a connection to a Broker. Ultimately, Selector.connect() registers the listener for the OP_CONNECT event:
// Selector.java
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    //...
    // Register interest in the OP_CONNECT event on the SocketChannel
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    //...
}
After the listener is registered, subsequent iterations of the Sender thread's loop call Selector.poll(), which eventually completes the connection through KafkaChannel.finishConnect():
// Selector.java
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    //... (inside the loop over the selected keys)
    if (isImmediatelyConnected || key.isConnectable()) {
        if (channel.finishConnect()) {
            this.connected.add(channel.id());
            this.sensors.connectionCreated.record();
        } else
            continue;
    }
    //...
}
// KafkaChannel.java
public boolean finishConnect() throws IOException {
    return transportLayer.finishConnect();
}
// SslTransportLayer.java
public boolean finishConnect() throws IOException {
    // Call SocketChannel.finishConnect() to complete the connection
    boolean connected = socketChannel.finishConnect();
    if (connected)
        // Once connected, stop watching OP_CONNECT and start watching OP_READ
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}
Once the connection is established, this code removes interest in OP_CONNECT and adds interest in OP_READ.
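The interestOps expression in finishConnect() relies on Java operator precedence: `~` binds first, then `&`, then `|`, so it reads as `(ops & ~OP_CONNECT) | OP_READ`. It clears only the OP_CONNECT bit and sets OP_READ, leaving any other bits untouched. A quick check with plain ints, no socket needed (the class InterestTransitions is illustrative):

```java
import java.nio.channels.SelectionKey;

class InterestTransitions {
    // Same expression as in finishConnect(): equivalent to (ops & ~OP_CONNECT) | OP_READ
    static int afterConnect(int ops) {
        return ops & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
    }

    public static void main(String[] args) {
        System.out.println(afterConnect(SelectionKey.OP_CONNECT));                         // 1 (OP_READ)
        System.out.println(afterConnect(SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE)); // 5 (OP_READ | OP_WRITE)
    }
}
```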
The full call chain for establishing a connection is shown in the figure below:
2.2 OP_READ
Interest in OP_READ is registered once the client has finished connecting to the Broker:
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
After the listener is registered, subsequent iterations of the Sender thread's loop call Selector.pollSelectionKeys(); when an OP_READ event occurs, data is read through KafkaChannel.read():
// Selector.java
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    //...
    if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
        NetworkReceive networkReceive;
        // Read data
        while ((networkReceive = channel.read()) != null)
            addToStagedReceives(channel, networkReceive);
    }
    //...
}
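The `while` loop exists because KafkaChannel.read() returns a NetworkReceive only once a complete message has arrived, and null otherwise; the socket may hold several messages, or only part of one. Kafka frames every message with a 4-byte size prefix. The simplified sketch below (FrameAccumulator is hypothetical and buffers everything in memory, whereas Kafka reads from the socket into per-receive buffers) shows the same read-until-null contract:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FrameAccumulator {
    private ByteBuffer pending = ByteBuffer.allocate(0); // bytes received but not yet consumed

    // Bytes arrive in arbitrary chunks, as from successive socket reads.
    void append(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();
        pending = merged;
    }

    // Returns one complete payload, or null if the size prefix or payload is still incomplete.
    byte[] read() {
        if (pending.remaining() < 4)
            return null;
        int size = pending.getInt(pending.position()); // peek at the prefix without consuming it
        if (pending.remaining() < 4 + size)
            return null;
        pending.getInt();                              // consume the prefix
        byte[] payload = new byte[size];
        pending.get(payload);
        return payload;
    }

    // Same shape as: while ((networkReceive = channel.read()) != null) addToStagedReceives(...)
    List<String> drain() {
        List<String> messages = new ArrayList<>();
        byte[] payload;
        while ((payload = read()) != null)
            messages.add(new String(payload, StandardCharsets.UTF_8));
        return messages;
    }

    // Encodes a message as [4-byte big-endian length][payload].
    static byte[] frame(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    public static void main(String[] args) {
        FrameAccumulator acc = new FrameAccumulator();
        byte[] second = frame("two");
        acc.append(frame("one"));
        acc.append(Arrays.copyOfRange(second, 0, 3)); // only part of the next size prefix
        System.out.println(acc.drain());              // [one]
        acc.append(Arrays.copyOfRange(second, 3, second.length));
        System.out.println(acc.drain());              // [two]
    }
}
```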
2.3 OP_WRITE
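Interest in OP_WRITE is registered while the client is sending a message. The KafkaChannel caches the request to be sent and, at the same time, adds interest in the OP_WRITE event on that Channel: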
// KafkaChannel.java
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send; // cached inside the KafkaChannel
    // Add interest in the OP_WRITE event
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
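Because setSend() throws when a previous Send is still in progress, each KafkaChannel holds at most one outstanding Send, and OP_WRITE stays in the interest set only while that Send exists. A minimal model of this slot (SendSlot is hypothetical; a String stands in for Send, and the interest set is a plain int rather than a real SelectionKey):

```java
import java.nio.channels.SelectionKey;

class SendSlot {
    private String send;                              // stands in for KafkaChannel.send
    private int interestOps = SelectionKey.OP_READ;   // a connected channel already watches OP_READ

    void setSend(String send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;
        interestOps |= SelectionKey.OP_WRITE;         // like transportLayer.addInterestOps(OP_WRITE)
    }

    // Called once the whole Send has been written to the socket.
    String completeSend() {
        String done = send;
        send = null;
        interestOps &= ~SelectionKey.OP_WRITE;        // like transportLayer.removeInterestOps(OP_WRITE)
        return done;
    }

    int interestOps() {
        return interestOps;
    }

    public static void main(String[] args) {
        SendSlot slot = new SendSlot();
        slot.setSend("request-1");
        System.out.println(slot.interestOps());       // 5 (OP_READ | OP_WRITE)
        try {
            slot.setSend("request-2");
        } catch (IllegalStateException e) {
            System.out.println("rejected: previous send still in progress");
        }
        System.out.println(slot.completeSend());      // request-1
        System.out.println(slot.interestOps());       // 1 (OP_READ)
    }
}
```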
After the listener is registered, subsequent iterations of the Sender thread's loop call Selector.pollSelectionKeys(); when an OP_WRITE event occurs, data is written through KafkaChannel.write():
// Selector.java
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    //...
    if (channel.ready() && key.isWritable()) {
        // Write data
        Send send = channel.write();
        if (send != null) {
            // Add the request that was sent to the completed list, completedSends
            this.completedSends.add(send);
            this.sensors.recordBytesSent(channel.id(), send.size());
        }
    }
    //...
}
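Finally, note that once a message has been completely written, interest in OP_WRITE is removed; the channel then watches only OP_READ, and OP_WRITE is registered again only when the next request is cached for sending: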
// KafkaChannel.java
private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        // Once the write has finished, remove interest in OP_WRITE
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}
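A Send is not necessarily written in one go: writeTo() writes only as much as the socket's send buffer accepts, so send.completed() stays false, KafkaChannel.write() returns null, and OP_WRITE remains set so that a later poll reports the channel as writable again. The sketch below counts how many OP_WRITE events a message needs when a hypothetical socket accepts at most capacityPerCall bytes per write. It models the loop with a ByteBuffer and an int interest set rather than real sockets, and assumes the socket becomes writable again on every poll.

```java
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;

class PartialWriteDemo {
    // Returns how many writable events are needed to flush a message of messageBytes bytes.
    static int writeEventsNeeded(int messageBytes, int capacityPerCall) {
        if (capacityPerCall <= 0)
            throw new IllegalArgumentException("capacityPerCall must be positive");
        ByteBuffer send = ByteBuffer.allocate(messageBytes);              // the pending Send
        int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;   // state after setSend()
        int events = 0;
        while ((interestOps & SelectionKey.OP_WRITE) != 0) {             // Selector reports OP_WRITE
            events++;
            int written = Math.min(capacityPerCall, send.remaining());  // partial send.writeTo(...)
            send.position(send.position() + written);
            if (!send.hasRemaining())                                     // send.completed()
                interestOps &= ~SelectionKey.OP_WRITE;                    // removeInterestOps(OP_WRITE)
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(writeEventsNeeded(10, 4));   // 3 (4 + 4 + 2 bytes)
        System.out.println(writeEventsNeeded(10, 100)); // 1
    }
}
```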
3. Post-Send Processing
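Sending the request does not end the flow, because the client has not yet received the Broker's response. The client still does two things:
- process the response;
- invoke the callback.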
3.1 Callback Handling
After Selector.poll() has written the data, it adds the request that was just sent to the completed-sends list, completedSends:
public class Selector implements Selectable {
    private final List<Send> completedSends;

    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        //...
        if (channel.ready() && key.isWritable()) {
            // Write the message (send the request)
            Send send = channel.write();
            if (send != null) {
                // Add it to the completed-sends list
                this.completedSends.add(send);
                this.sensors.recordBytesSent(channel.id(), send.size());
            }
        }
    }
}
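Next, NetworkClient processes the requests that have been sent. If a request does not expect a response, it immediately removes the most recently sent request for that Broker from inFlightRequests and then runs the callback through ClientResponse.onComplete(). That callback is a RequestCompletionHandler; for produce requests it is the handler created in the Sender thread, and it is through this handler that the Callback we supplied when sending the message is eventually invoked: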
// NetworkClient.java
public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    //...
    // Process the requests that have been sent
    handleCompletedSends(responses, updatedNow);
    //...
    // Run the callbacks
    for (ClientResponse response : responses) {
        try {
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
    return responses;
}

private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // Iterate over the requests that have been sent
    for (Send send : this.selector.completedSends()) {
        // The most recent request sent to this Broker
        InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
        // No response expected, i.e. acks == 0
        if (!request.expectResponse) {
            // Remove the most recently sent request from inFlightRequests
            this.inFlightRequests.completeLastSent(send.destination());
            // Add it to the response list
            responses.add(request.completed(null, now));
        }
    }
}

// ClientResponse.java
public void onComplete() {
    if (callback != null)
        callback.onComplete(this);
}
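Using lastSent() here works because a channel holds only one Send at a time, so the send that just completed for a node belongs to the request most recently added for that node. A hypothetical miniature of this bookkeeping (not Kafka's InFlightRequests class; it assumes, as in the Kafka code discussed here, that new requests are pushed onto the head of a per-node deque):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InFlightSketch {
    static final class Request {
        final String destination;
        final int correlationId;
        final boolean expectResponse;   // false for a produce request with acks == 0

        Request(String destination, int correlationId, boolean expectResponse) {
            this.destination = destination;
            this.correlationId = correlationId;
            this.expectResponse = expectResponse;
        }
    }

    // One deque per node; the most recently sent request sits at the head.
    private final Map<String, Deque<Request>> requests = new HashMap<>();

    void add(Request request) {
        requests.computeIfAbsent(request.destination, k -> new ArrayDeque<>()).addFirst(request);
    }

    int inFlightCount(String node) {
        Deque<Request> queue = requests.get(node);
        return queue == null ? 0 : queue.size();
    }

    // Mirrors handleCompletedSends: each completed send maps to the last request sent to that node.
    List<Integer> handleCompletedSends(List<String> completedSendDestinations) {
        List<Integer> completed = new ArrayList<>();
        for (String node : completedSendDestinations) {
            Deque<Request> queue = requests.get(node);
            Request lastSent = queue.peekFirst();        // inFlightRequests.lastSent(node)
            if (!lastSent.expectResponse) {
                queue.pollFirst();                        // inFlightRequests.completeLastSent(node)
                completed.add(lastSent.correlationId);    // its callback would run now
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        InFlightSketch inFlight = new InFlightSketch();
        inFlight.add(new Request("broker-1", 1, true));   // acks = 1: wait for the response
        System.out.println(inFlight.handleCompletedSends(Collections.singletonList("broker-1"))); // []
        inFlight.add(new Request("broker-1", 2, false));  // acks = 0: done once written
        System.out.println(inFlight.handleCompletedSends(Collections.singletonList("broker-1"))); // [2]
        System.out.println(inFlight.inFlightCount("broker-1"));                                  // 1
    }
}
```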
For produce requests, this callback is defined in the Sender thread:
// Sender.java
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    //...
    // Create the callback
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    String nodeId = Integer.toString(destination);
    // Note: acks != 0 becomes the value of expectResponse
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
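The callback is captured inside the request when the request is created, and `acks != 0` is stored as expectResponse; whichever path later completes the request (handleCompletedSends for acks = 0, response handling otherwise) ends up invoking that captured callback. A hypothetical miniature of this wiring (the Handler interface stands in for RequestCompletionHandler; none of these names are Kafka's):

```java
import java.util.ArrayList;
import java.util.List;

class CallbackWiringDemo {
    interface Handler {                        // plays the role of RequestCompletionHandler
        void onComplete(String responseOrNull);
    }

    static final class Request {
        final boolean expectResponse;
        final Handler callback;

        Request(boolean expectResponse, Handler callback) {
            this.expectResponse = expectResponse;
            this.callback = callback;
        }
    }

    // Like sendProduceRequest: build the callback, then the request with expectResponse = (acks != 0).
    static Request newProduceRequest(short acks, List<String> log) {
        Handler callback = response ->
                log.add(response == null ? "completed without response" : "completed with " + response);
        return new Request(acks != 0, callback);
    }

    // Like ClientResponse.onComplete(): run the captured callback if there is one.
    static void complete(Request request, String response) {
        if (request.callback != null)
            request.callback.onComplete(request.expectResponse ? response : null);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        complete(newProduceRequest((short) 0, log), null); // acks = 0: finished once the send completes
        complete(newProduceRequest((short) 1, log), "ok"); // acks = 1: finished when the response arrives
        System.out.println(log); // [completed without response, completed with ok]
    }
}
```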
Handling the response:
// Sender.java
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
    //...
    if (response.hasResponse()) {
        ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
        for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
            TopicPartition tp = entry.getKey();
            ProduceResponse.PartitionResponse partResp = entry.getValue();
            RecordBatch batch = batches.get(tp);
            // Handle the response for each record batch
            completeBatch(batch, partResp, correlationId, now);
        }
        this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
        this.sensors.recordThrottleTime(produceResponse.getThrottleTime());
    } else {
        // this is the acks = 0 case, just complete all requests
        for (RecordBatch batch : batches.values()) {
            completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
        }
    }
}
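The two branches differ only in where each batch's result comes from: with a response, each partition's batch is completed with that partition's own result; with acks = 0 there is nothing to inspect, so every batch is completed as successful (Errors.NONE). A hypothetical miniature with partitions and error codes reduced to strings (the class ProduceResponseDemo and the sample values are illustrative):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class ProduceResponseDemo {
    // perPartitionErrors == null models response.hasResponse() == false (acks = 0).
    static Map<String, String> completeBatches(Set<String> batchPartitions, Map<String, String> perPartitionErrors) {
        Map<String, String> outcome = new TreeMap<>();  // partition -> error code its batch completes with
        if (perPartitionErrors != null) {
            for (Map.Entry<String, String> entry : perPartitionErrors.entrySet())
                outcome.put(entry.getKey(), entry.getValue()); // completeBatch(batch, partResp, ...)
        } else {
            for (String partition : batchPartitions)
                outcome.put(partition, "NONE");            // completeBatch(batch, PartitionResponse(Errors.NONE), ...)
        }
        return outcome;
    }

    public static void main(String[] args) {
        Set<String> partitions = new HashSet<>(Arrays.asList("topic-0", "topic-1"));
        System.out.println(completeBatches(partitions, null)); // {topic-0=NONE, topic-1=NONE}

        Map<String, String> response = new LinkedHashMap<>();
        response.put("topic-0", "NONE");
        response.put("topic-1", "NOT_LEADER_FOR_PARTITION");
        System.out.println(completeBatches(partitions, response)); // {topic-0=NONE, topic-1=NOT_LEADER_FOR_PARTITION}
    }
}
```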
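I won't go into the details of response handling beyond this point; readers can follow the same line of reasoning through the source code.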
4. Summary
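In this chapter, I covered the last and most central step of the Sender thread's loop: Selector polling. The Kafka client handles requests entirely in the NIO style, and the Sender thread's asynchronous request-handling mechanism is well worth studying and borrowing from.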
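This chapter completes the analysis of the overall message-sending flow of the Kafka client, along with a first look at how the underlying NIO components work. Starting from the next chapter, I will go deeper into Kafka's underlying NIO networking components, including how they handle TCP message framing (the so-called "sticky packet" and "split packet" problems), exception handling, and more.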