In the previous chapter, I walked through how the Kafka client handles packet splitting when sending requests. The client maintains one long-lived TCP connection to each Broker and fully writes one request before sending the next, so sticky packets are generally not a problem on the send path.
Reading a Broker's responses is a different story. Two situations can occur:
- a single read returns the complete responses of several requests back-to-back — this is the "sticky packet" case;
- a single read returns only part of one response, so the client must read multiple times and stitch the pieces together — this is the "split packet" case.
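To make these two terms concrete, here is a minimal sketch (illustrative only, not Kafka source; FramingDemo and frame() are invented names) of the 4-byte length-prefixed framing Kafka uses on the wire, and what a single socket read may deliver in each case:
// FramingDemo.java -- illustrative only, not Kafka source
import java.nio.ByteBuffer;
public class FramingDemo {
    // One Kafka-style frame: a 4-byte big-endian length header followed by the payload
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length).put(payload);
        buf.flip();
        return buf;
    }
    public static void main(String[] args) {
        byte[] r1 = "response-1".getBytes();
        byte[] r2 = "response-2".getBytes();
        // Sticky packets: one read() drains BOTH complete frames at once
        ByteBuffer stickyRead = ByteBuffer.allocate(frame(r1).remaining() + frame(r2).remaining());
        stickyRead.put(frame(r1)).put(frame(r2)).flip();
        System.out.println("sticky: one read returned " + stickyRead.remaining() + " bytes = 2 full frames");
        // Split packets: one read() returns only 2 of the 4 header bytes;
        // the rest of the header and the payload arrive on later reads
        ByteBuffer whole = frame(r1);
        byte[] partial = new byte[2];
        whole.get(partial);
        System.out.println("split: one read returned " + partial.length + " bytes = not even a full header");
    }
}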
In this chapter, I'll first walk through how the Kafka client handles the OP_READ event, and then analyze how split and sticky packets are resolved when reading responses.
1. Reading the Response
Let's first review how the Kafka client reads response data at the lowest level. Everything still goes through the Selector.poll() method, which should be familiar by now:
// Selector.java
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives; // completed responses, staged per Broker (channel)
private final List<NetworkReceive> completedReceives; // the earliest completed response of each Broker, one per poll

public void poll(long timeout) throws IOException {
    //...
    // 1. Process the ready SelectionKeys
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }
    // 2. Move each channel's earliest fully-read response into completedReceives
    addToCompletedReceives();
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    // ...
    // an OP_READ event fired on this channel, and no completed receive is already staged for it
    if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
        NetworkReceive networkReceive; // holds the bytes read so far
        // keep reading from the channel; a non-null return means one complete response was read
        while ((networkReceive = channel.read()) != null)
            // stage the completed response
            addToStagedReceives(channel, networkReceive);
    }
}

private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    // create the per-channel deque on first use
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
    // append the response to the deque
    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}

private void addToCompletedReceives() {
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            if (!channel.isMute()) {
                Deque<NetworkReceive> deque = entry.getValue();
                addToCompletedReceives(channel, deque);
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}

private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
    // take the earliest completed response (the head of the deque)
    NetworkReceive networkReceive = stagedDeque.poll();
    this.completedReceives.add(networkReceive);
    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}
1.1 The Read Flow
The KafkaChannel.read() method called from the Selector.poll() main flow reads bytes from the underlying SocketChannel and wraps them in a NetworkReceive object, cached in the channel's receive field, which represents the response currently being read. Once a response has been read in full, read() returns it and clears the receive cache. This mirrors the request-sending logic from the previous chapter, and exists precisely because a response may arrive split across several reads:
// KafkaChannel.java
private NetworkReceive receive; // the response currently being read

public NetworkReceive read() throws IOException {
    NetworkReceive result = null;
    // lazily create the NetworkReceive; maxReceiveSize caps the response size, id identifies the Broker
    if (receive == null) {
        receive = new NetworkReceive(maxReceiveSize, id);
    }
    // read bytes from the channel
    receive(receive);
    // if the response has been read in full, return it and clear the cache
    if (receive.complete()) {
        receive.payload().rewind(); // reset the payload's position to 0
        result = receive;
        receive = null; // clear the in-progress receive
    }
    return result;
}

// delegate to NetworkReceive to read bytes from the underlying SocketChannel
private long receive(NetworkReceive receive) throws IOException {
    return receive.readFrom(transportLayer);
}
As the code above shows, the actual byte reading is delegated to NetworkReceive.readFrom(). So what exactly is NetworkReceive? Let's keep going.
1.2 NetworkReceive
NetworkReceive represents one complete response. It consists of a 4-byte size header followed by the payload, where the header holds the payload's size in bytes. It is the receiving-side counterpart of the NetworkSend object covered in the previous chapter:
// NetworkReceive.java
public class NetworkReceive implements Receive {
    public final static String UNKNOWN_SOURCE = "";
    public final static int UNLIMITED = -1;
    // Broker ID
    private final String source;
    // 4-byte size-header buffer
    private final ByteBuffer size;
    // maximum allowed payload size
    private final int maxSize;
    // payload buffer
    private ByteBuffer buffer;

    public NetworkReceive(int maxSize, String source) {
        this.source = source;
        this.size = ByteBuffer.allocate(4);
        this.buffer = null;
        this.maxSize = maxSize;
    }

    @Override
    public boolean complete() {
        // a response is complete only when the 4-byte header AND the payload are both fully read
        return !size.hasRemaining() && !buffer.hasRemaining();
    }

    public long readFrom(ScatteringByteChannel channel) throws IOException {
        return readFromReadableChannel(channel);
    }

    public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
        int read = 0;
        if (size.hasRemaining()) { // the size header is not yet full
            // read into the header buffer; with a split packet this may yield fewer than 4 bytes
            int bytesRead = channel.read(size);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
            if (!size.hasRemaining()) { // the header is now full (all 4 bytes read)
                // decode the payload size
                size.rewind();
                int receiveSize = size.getInt();
                if (receiveSize < 0)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                if (maxSize != UNLIMITED && receiveSize > maxSize)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                // allocate the payload buffer
                this.buffer = ByteBuffer.allocate(receiveSize);
            }
        }
        if (buffer != null) {
            // read the payload (possibly only partially)
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
        }
        return read;
    }

    // return the payload
    public ByteBuffer payload() {
        return this.buffer;
    }
}
This code resolves both the split-packet and the sticky-packet problem.
Consider the "sticky packet" case first, where the channel's buffer holds the complete responses of several requests back-to-back:
- the channel fills the 4-byte header buffer in a single read;
- the header is decoded to obtain the payload size;
- exactly that many payload bytes are read and not one byte more, so the bytes of the next response remain untouched in the channel;
- the response is now complete, so channel.read() returns it and the while loop in Selector.pollSelectionKeys() stages it, then immediately starts reading the next response; the loop only exits once a read no longer yields a complete response. Each staged response is eventually moved to the completedReceives list.
Now the "split packet" case, where a single read does not deliver a complete response — perhaps only part of the 4-byte header, or only part of the payload:
- the channel tries to fill the header buffer in one read but comes up short, because the response was split across TCP segments; channel.read() returns null and the while loop in Selector.pollSelectionKeys() exits for this round;
- KafkaChannel keeps the partially-filled receive object cached in its receive field, so when the next OP_READ fires, readFromReadableChannel() resumes exactly where it left off, finishing the header and then the payload;
- once the response has been read in full, the while loop in Selector.pollSelectionKeys() stages it, and it is eventually moved to the completedReceives list.
The sketch below simulates both cases.
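Here is a minimal, self-contained simulation of the mechanism above (illustrative code, not Kafka source; ChunkedChannel, Frame, and drain are all invented names). With a large chunk size both frames arrive in one read and the inner loop stages them back-to-back (sticky); with a tiny chunk size every header and payload arrives in pieces, and the cached in-progress frame survives across simulated polls (split):
// FrameReadDemo.java -- illustrative only, not Kafka source
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
public class FrameReadDemo {
    /** A channel that returns at most `chunk` bytes per read(), so small values force split packets. */
    static class ChunkedChannel implements ReadableByteChannel {
        private final ByteBuffer data;
        private final int chunk;
        ChunkedChannel(ByteBuffer data, int chunk) { this.data = data; this.chunk = chunk; }
        public int read(ByteBuffer dst) {
            int n = Math.min(chunk, Math.min(dst.remaining(), data.remaining()));
            for (int i = 0; i < n; i++) dst.put(data.get());
            return (n == 0 && !data.hasRemaining()) ? -1 : n;
        }
        public boolean isOpen() { return true; }
        public void close() {}
    }
    /** Simplified NetworkReceive: a 4-byte size header followed by the payload. */
    static class Frame {
        final ByteBuffer size = ByteBuffer.allocate(4);
        ByteBuffer body;
        /** Returns true once the whole frame has been read; safe to call repeatedly. */
        boolean readFrom(ReadableByteChannel ch) throws IOException {
            if (size.hasRemaining()) {
                ch.read(size);                       // may fill fewer than 4 bytes (split header)
                if (size.hasRemaining()) return false;
                size.rewind();
                body = ByteBuffer.allocate(size.getInt());
            }
            ch.read(body);                           // may also be partial (split payload)
            return !body.hasRemaining();
        }
    }
    /** Mimics Selector.pollSelectionKeys(): the in-progress frame is kept across "polls". */
    static int drain(int chunk) throws IOException {
        ByteBuffer wire = ByteBuffer.allocate(32);   // two complete frames back-to-back
        for (String s : new String[]{"hello", "kafka"})
            wire.putInt(s.length()).put(s.getBytes());
        wire.flip();
        ReadableByteChannel ch = new ChunkedChannel(wire, chunk);
        Frame frame = new Frame();                   // the cached, in-progress receive
        int polls = 0, done = 0;
        while (done < 2) {
            polls++;                                 // one simulated OP_READ event
            while (frame.readFrom(ch)) {             // a complete frame was assembled
                frame.body.rewind();
                System.out.println("  got frame: " + new String(frame.body.array()));
                frame = new Frame();                 // start on the next frame in the buffer
                done++;
                if (done == 2) break;
            }
        }
        return polls;
    }
    public static void main(String[] args) throws IOException {
        System.out.println("sticky case (chunk=100), polls: " + drain(100)); // both frames in one read
        System.out.println("split case  (chunk=3),   polls: " + drain(3));   // pieces across many reads
    }
}
Running it recovers the same two frames in both modes; the sticky case finishes in a single poll while the split case needs several, which is exactly the behavior of the while loop and the cached receive field above.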
2. Handling the Response
Once the responses have been read via NetworkReceive, the Selector keeps each Broker's completed responses staged in a Map; it then walks this Map and moves the earliest completed response of each Broker into the completedReceives list.
2.1 Staging Completed Responses
The completed responses handed to the upper layer are kept in the Selector's completedReceives field. Note that addToCompletedReceives() moves only the head of each Broker's deque per poll, so one Broker's responses reach the upper layer strictly in arrival order:
// Selector.java
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives; // completed responses, staged per Broker (channel)
private final List<NetworkReceive> completedReceives; // the earliest completed response of each Broker, one per poll

public void poll(long timeout) throws IOException {
    //...
    // 1. Process the ready SelectionKeys
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }
    // 2. Move each channel's earliest fully-read response into completedReceives
    addToCompletedReceives();
}

private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
    // take the earliest completed response (the head of the deque)
    NetworkReceive networkReceive = stagedDeque.poll();
    this.completedReceives.add(networkReceive);
    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}
2.2 Parsing the Response
All fully-read responses now sit in the completedReceives list. Let's look at how the Kafka client processes them:
// NetworkClient.java
public List<ClientResponse> poll(long timeout, long now) {
    // ... metadataTimeout comes from metadataUpdater.maybeUpdate(now), and the
    // responses list is created earlier in this method
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    // process the completed receives
    handleCompletedReceives(responses, updatedNow);
    // invoke the completion callbacks
    for (ClientResponse response : responses) {
        try {
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
    return responses;
}

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    // iterate over the completed receives
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source(); // Broker ID
        // 1. Remove the oldest in-flight request sent to this Broker
        InFlightRequest req = inFlightRequests.completeNext(source);
        // 2. Parse the response payload
        AbstractResponse body = parseResponse(receive.payload(), req.header);
        if (req.isInternalRequest && body instanceof MetadataResponse) // metadata update response
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse) // API versions response
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            // 3. Add the completed result to the responses list
            responses.add(req.completed(body, now));
    }
}

public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
    // parse the response header
    ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
    short apiKey = requestHeader.apiKey();
    short apiVer = requestHeader.apiVersion();
    // parse the response body
    Struct responseBody = ProtoUtils.responseSchema(apiKey, apiVer).read(responseBuffer);
    // verify that this response answers the request we paired it with
    correlate(requestHeader, responseHeader);
    // build and return the AbstractResponse
    return AbstractResponse.getResponse(apiKey, responseBody);
}

// check that the request and response belong together
private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
    // the correlation_id uniquely identifies a request on this connection
    if (requestHeader.correlationId() != responseHeader.correlationId())
        throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
            + ") does not match request (" + requestHeader.correlationId() + "), request header: " + requestHeader);
}
The handleCompletedReceives method works as follows:
- iterate over the completedReceives list, processing each response with the steps below;
- determine which Broker the response came from, and remove the oldest in-flight request cached for that Broker from InFlightRequests;
- parse the response, and check that the correlationId in the response header matches the one in the request header, throwing an exception if they don't pair up;
- build a ClientResponse from the parsed content and collect it.
The key point to understand: because requests on a single TCP connection are answered strictly in order, the oldest in-flight request for a Broker always pairs with that Broker's oldest unprocessed response. A minimal sketch of this FIFO pairing follows.
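To see why this FIFO pairing holds, here is a tiny self-contained sketch (illustrative only; PendingRequest and the deque-based bookkeeping are invented stand-ins for InFlightRequests): requests are appended when sent, and since the Broker answers them in order, matching a response is simply polling the head and comparing correlation ids:
// CorrelationDemo.java -- illustrative only, not Kafka source
import java.util.ArrayDeque;
import java.util.Deque;
public class CorrelationDemo {
    static class PendingRequest {
        final int correlationId;
        PendingRequest(int id) { this.correlationId = id; }
    }
    public static void main(String[] args) {
        Deque<PendingRequest> inFlight = new ArrayDeque<>();
        // send three requests on one connection
        for (int id = 1; id <= 3; id++)
            inFlight.addLast(new PendingRequest(id));
        // the Broker answers in the same order, so responses arrive carrying ids 1, 2, 3
        for (int responseId = 1; responseId <= 3; responseId++) {
            PendingRequest oldest = inFlight.pollFirst();   // like completeNext(): take the oldest
            if (oldest.correlationId != responseId)          // like correlate(): ids must pair up
                throw new IllegalStateException("mismatched correlation id");
            System.out.println("response " + responseId + " matched request " + oldest.correlationId);
        }
    }
}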
2.3 Invoking the Callbacks
After a response is parsed, the client iterates over the ClientResponse list and hands each one to its completion handler, RequestCompletionHandler:
// NetworkClient.java
public List<ClientResponse> poll(long timeout, long now) {
    //...
    // invoke each response's completion callback
    for (ClientResponse response : responses) {
        try {
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
    return responses;
}
A ClientResponse bundles the originating request's header with the parsed response body:
public class ClientResponse {
    private final RequestHeader requestHeader; // header of the originating request
    private final RequestCompletionHandler callback; // completion callback
    private final String destination; // Broker ID
    private final long receivedTimeMs;
    private final long latencyMs;
    private final boolean disconnected;
    private final RuntimeException versionMismatch;
    private final AbstractResponse responseBody; // parsed response body

    public void onComplete() {
        if (callback != null)
            callback.onComplete(this);
    }
}
The RequestCompletionHandler is created inside the Sender thread. Its onComplete() method walks the per-partition results in the response, maps each one back to the RecordBatch assembled at send time, and finally triggers the user-defined callbacks:
// Sender.java
private void sendProduceRequest(long now, int destination, short acks,
                                int timeout, List<RecordBatch> batches) {
    // ... earlier in this method, recordsByPartition (TopicPartition -> RecordBatch)
    // is built from `batches`
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
}
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches,
                                   long now) {
    int correlationId = response.requestHeader().correlationId();
    // branch on the overall state of the response
    if (response.wasDisconnected()) {
        //...
    } else if (response.versionMismatch() != null) {
        //...
    } else {
        // a normal response
        if (response.hasResponse()) {
            ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
            // iterate over the per-partition results
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry :
                    produceResponse.responses().entrySet()) {
                TopicPartition tp = entry.getKey(); // the partition
                ProduceResponse.PartitionResponse partResp = entry.getValue();
                // `batches` was assembled at send time, so each partition maps back to its RecordBatch
                RecordBatch batch = batches.get(tp);
                // complete the batch
                completeBatch(batch, partResp, correlationId, now);
            }
        }
        // no per-partition responses expected (acks == 0)
        else {
            for (RecordBatch batch : batches.values()) {
                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
            }
        }
    }
}
private void completeBatch(RecordBatch batch, ProduceResponse.PartitionResponse response,
                           long correlationId, long now) {
    Errors error = response.error;
    // 1. An error occurred, and the batch can be retried
    if (error != Errors.NONE && canRetry(batch, error)) {
        // re-enqueue the batch at the head of the accumulator's queue so it is sent again later
        this.accumulator.reenqueue(batch, now);
    }
    else {
        RuntimeException exception;
        if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
            exception = new TopicAuthorizationException(batch.topicPartition.topic());
        else
            exception = error.exception();
        // 2. Terminal outcome (success, or a non-retriable error): run the callbacks
        batch.done(response.baseOffset, response.logAppendTime, exception);
        // release the batch's buffer back to the accumulator
        this.accumulator.deallocate(batch);
        if (error != Errors.NONE)
            this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
    }
    // 3. Metadata-related errors
    if (error.exception() instanceof InvalidMetadataException) {
        if (error.exception() instanceof UnknownTopicOrPartitionException)
            // the cached metadata is stale; request a refresh
            metadata.requestUpdate();
    }
    if (guaranteeMessageOrder)
        this.accumulator.unmutePartition(batch.topicPartition);
}
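For completeness, the canRetry() predicate referenced above boils down to two checks. This is sketched from the general shape of the client's source in this era, so treat the exact member access as approximate: the batch must still have retry attempts left, and the error must be a transient, retriable one:
// Sender.java (approximate sketch -- whether `attempts` is a field or a method varies by version)
private boolean canRetry(RecordBatch batch, Errors error) {
    // retry only while attempts remain and the error is transient
    return batch.attempts < this.retries && error.exception() instanceof RetriableException;
}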
Finally, RecordBatch.done(): it records the outcome on the batch's future and then triggers the user-defined callbacks:
// RecordBatch.java
public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
    log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
        topicPartition, baseOffset, exception);
    if (completed.getAndSet(true))
        throw new IllegalStateException("Batch has already been completed");
    // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
    produceFuture.set(baseOffset, logAppendTime, exception);
    // execute the user callbacks
    for (Thunk thunk : thunks) {
        try {
            // 1. the request succeeded
            if (exception == null) {
                // metadata describing where the records landed
                RecordMetadata metadata = thunk.future.value();
                // invoke the user callback
                thunk.callback.onCompletion(metadata, null);
            }
            // 2. the request failed (possibly after exhausting all retries)
            else {
                thunk.callback.onCompletion(null, exception);
            }
        } catch (Exception e) {
            log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
        }
    }
    produceFuture.done();
}
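The "user-defined callback" invoked above is simply the Callback passed to KafkaProducer.send(). A minimal usage sketch (the broker address and topic name are placeholders):
// CallbackExample.java -- the broker address and topic name below are placeholders
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
public class CallbackExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
                new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        // this is exactly what RecordBatch.done() ends up invoking
                        if (exception != null)
                            exception.printStackTrace(); // failed, possibly after all retries
                        else
                            System.out.printf("sent to %s-%d@%d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    }
                });
        }
    }
}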
3. Summary
In this chapter, I analyzed in detail how the Kafka client processes responses. The points worth remembering are how the client resolves the split-packet and sticky-packet problems when reading responses, and what happens after a response is parsed: callback dispatch, error and retry handling, and releasing accumulator buffers.
The client's KafkaChannel caches the NetworkReceive object for the response currently being read; until that response has been read in full, the cache is kept, so each new OP_READ event resumes the read exactly where the previous one stopped.