When KafkaProducer sends messages, a message timeout can occur. In this chapter, I will walk through this problem from the internals of the Kafka client.
1. Timeout Scenarios
Let's first look at the situations in which a timeout can occur:
- A RecordBatch sits in the BufferPool buffer for a long time and is never picked up by the Sender thread;
- The Sender thread has sent the messages out, but no response ever arrives, so the NetworkSend request lingers in InFlightRequests.
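Both scenarios are governed by a handful of producer settings. The following configuration sketch lists the relevant knobs; the concrete values are purely illustrative (request.timeout.ms is shown at its stock default of 30s):
// Producer configuration sketch: the knobs involved in both timeout scenarios (values are illustrative)
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);       // request.timeout.ms: drives both timeout checks
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);                    // linger.ms: how long a non-full batch may wait before sending
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);           // retry.backoff.ms: extra grace period for retried batches
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024); // buffer.memory: total size of the BufferPool
KafkaProducer<String, String> producer = new KafkaProducer<>(props);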
1.1 BufferPool Timeout
Let's look at the first case. The Sender thread's main run loop contains this line of code:
// Sender.java
void run(long now) {
    //...
    // 5. Abort batches that have expired (i.e. exceeded request.timeout.ms, 30s by default)
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
}
Let's look at RecordAccumulator's abortExpiredBatches method. Its logic is as follows:
// RecordAccumulator.java
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;

public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
    List<RecordBatch> expiredBatches = new ArrayList<>();
    int count = 0;
    // 1. Iterate over every partition's deque looking for expired RecordBatches
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        Deque<RecordBatch> dq = entry.getValue();
        TopicPartition tp = entry.getKey();
        if (!muted.contains(tp)) {
            synchronized (dq) {
                RecordBatch lastBatch = dq.peekLast();
                Iterator<RecordBatch> batchIterator = dq.iterator();
                while (batchIterator.hasNext()) {
                    RecordBatch batch = batchIterator.next();
                    boolean isFull = batch != lastBatch || batch.isFull();
                    // check whether this batch has expired
                    if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
                        expiredBatches.add(batch);
                        count++;
                        batchIterator.remove(); // remove it from the deque
                    } else {
                        break;
                    }
                }
            }
        }
    }
    // 2. Trigger the callbacks of the expired batches
    if (!expiredBatches.isEmpty()) {
        log.trace("Expired {} batches in accumulator", count);
        for (RecordBatch batch : expiredBatches) {
            // invoke the user callbacks with a TimeoutException
            batch.expirationDone();
            // return the allocated buffer to the BufferPool
            deallocate(batch);
        }
    }
    return expiredBatches;
}
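The actual expiry decision happens inside RecordBatch.maybeExpire. Roughly speaking, a batch expires once it has waited more than request.timeout.ms beyond the point at which it became sendable. The following is a simplified, standalone sketch of that rule, not the literal Kafka source; the parameter names sinceLastAppendMs, sinceCreateMs and sinceLastAttemptMs are illustrative:
// Simplified sketch of the expiry rules checked by RecordBatch.maybeExpire (illustrative only)
boolean isExpired(boolean isFull, boolean inRetry,
                  long requestTimeoutMs, long lingerMs, long retryBackoffMs,
                  long sinceLastAppendMs, long sinceCreateMs, long sinceLastAttemptMs) {
    if (!inRetry && isFull)
        // a full batch only gets request.timeout.ms, counted from the last append
        return sinceLastAppendMs > requestTimeoutMs;
    if (!inRetry)
        // a non-full batch is first granted linger.ms to fill up
        return sinceCreateMs > lingerMs + requestTimeoutMs;
    // a batch that is being retried is first granted retry.backoff.ms
    return sinceLastAttemptMs > retryBackoffMs + requestTimeoutMs;
}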
RecordBatch's expirationDone method eventually calls the internal done method, which is what triggers the execution of the callbacks:
// RecordBatch.java
void expirationDone() {
    if (expiryErrorMessage == null)
        throw new IllegalStateException("Batch has not expired");
    this.done(-1L, Record.NO_TIMESTAMP,
            new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
}

public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
    //...
    // execute callbacks
    for (Thunk thunk : thunks) {
        try {
            if (exception == null) {
                RecordMetadata metadata = thunk.future.value();
                thunk.callback.onCompletion(metadata, null);
            } else {
                thunk.callback.onCompletion(null, exception);
            }
        } catch (Exception e) {
            log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
        }
    }
    produceFuture.done();
}
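From the application's point of view, this is the TimeoutException that arrives in the Callback passed to KafkaProducer.send (or via the returned Future). A minimal application-side sketch; the topic name and the already-configured producer are assumptions:
// Application-side sketch: observing an expired/timed-out record via the send callback
void sendWithTimeoutLogging(KafkaProducer<String, String> producer, String key, String value) {
    producer.send(new ProducerRecord<>("demo-topic", key, value), (metadata, exception) -> {
        if (exception instanceof org.apache.kafka.common.errors.TimeoutException) {
            // delivered by RecordBatch.done() when the batch expired, or when the broker request timed out
            System.err.println("Record expired before it was acknowledged: " + exception.getMessage());
        } else if (exception != null) {
            System.err.println("Send failed: " + exception);
        } else {
            System.out.println("Sent to " + metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
        }
    });
}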
1.2 InFlightRequests Timeout
Now let's look at the other timeout scenario. The Sender thread's main loop calls NetworkClient's poll method:
// Sender.java
void run(long now) {
    //...
    this.client.poll(pollTimeout, now);
}
NetworkClient's poll method contains a piece of timeout-handling logic: if it finds that a request to a broker has timed out, i.e. no response has arrived within request.timeout.ms (30s by default), it closes the connection to that broker on the assumption that the broker has failed. At the same time, it cleans up the in-memory data structures and marks the metadata as needing to be refreshed:
// NetworkClient.java
public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    //... (updatedNow and the responses list are initialized here in the full method)
    // handle requests that have timed out
    handleTimedOutRequests(responses, updatedNow);
    return responses;
}
private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
    // find the brokers that have at least one timed-out in-flight request
    List<String> nodeIds = this.inFlightRequests
            .getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
    for (String nodeId : nodeIds) {
        // close the connection to that broker
        this.selector.close(nodeId);
        processDisconnection(responses, nodeId, now);
    }
    // mark the metadata for a refresh
    if (!nodeIds.isEmpty())
        metadataUpdater.requestUpdate();
}
private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
    connectionStates.disconnected(nodeId, now);
    nodeApiVersions.remove(nodeId);
    nodesNeedingApiVersionsFetch.remove(nodeId);
    // clear the requests cached in InFlightRequests for this broker
    for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
        if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id)
            metadataUpdater.handleDisconnection(request.destination);
        else
            responses.add(request.disconnected(now));
    }
}
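What happens to the requests that clearAll removes? Each of them is turned into a ClientResponse flagged as disconnected (request.disconnected(now) above), and the Sender treats such a response as a retriable network error: the batches behind it are either re-enqueued for another attempt or failed through the same RecordBatch.done() path shown in section 1.1. The following is a paraphrased sketch of that handling based on my reading of the 1.x client; the completeBatch signature is simplified and may differ between versions:
// Sender.java (paraphrased sketch): reaction to a produce request whose connection was closed
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
    if (response.wasDisconnected()) {
        // the connection was torn down (e.g. by handleTimedOutRequests), so every batch of this
        // request is completed with a retriable NETWORK_EXCEPTION; completeBatch() then either
        // re-enqueues the batch (if retries are left) or fails it via RecordBatch.done()
        for (RecordBatch batch : batches.values())
            completeBatch(batch, Errors.NETWORK_EXCEPTION, now); // signature simplified for this sketch
        return;
    }
    //... normal response handling omitted
}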
The timeout check itself:
// InFlightRequests.java
public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
    List<String> nodeIds = new LinkedList<>();
    for (Map.Entry<String, Deque<NetworkClient.InFlightRequest>> requestEntry : requests.entrySet()) {
        String nodeId = requestEntry.getKey();
        Deque<NetworkClient.InFlightRequest> deque = requestEntry.getValue();
        if (!deque.isEmpty()) {
            // requests are added at the head of the deque, so peekLast() is the oldest in-flight request
            NetworkClient.InFlightRequest request = deque.peekLast();
            // timed out if (current time - request send time) exceeds request.timeout.ms
            long timeSinceSend = now - request.sendTimeMs;
            if (timeSinceSend > requestTimeout)
                nodeIds.add(nodeId);
        }
    }
    return nodeIds;
}
2. Summary
In this chapter, I explained the message timeout problem that can occur while KafkaProducer sends messages. It breaks down into two cases:
- the request is stuck in the BufferPool;
- the request is stuck in InFlightRequests.
In either case, the timeout check is driven by the parameter request.timeout.ms, whose default value is 30s. And in both cases, a timeout eventually triggers the execution of the user callbacks.