悬赏贴:求在Kafka消费因为网络延时,产生一个日志信息
问题描述:
Kafka因为网络延时或者中断时,将该条信息打印到日志中,就想知道该条日志打印的代码是加在哪里,dowork方法中嘛,kafkaConsumer.poll(timeOut),这条应该是去拉数据的请求吧,不知道是不是加在这附近
答
日志级别,日志配置文件
答
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}
do {
client.maybeTriggerWakeup();
if (includeMetadataInTimeout) {
// try to update assignment metadata BUT do not need to block on the timer for join group
updateAssignmentMetadataIfNeeded(timer, false);
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
log.warn("Still waiting for metadata");
}
}
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.transmitSends();
}
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());
return ConsumerRecords.empty();
} finally {
release();
this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
}
}
1、kafka源码中,如果传入的时间不为0,那么就循环等待超时后,返回空,可以在return ConsumerRecords.empty();之前打日志。