悬赏贴:求在Kafka消费因为网络延时,产生一个日志信息

悬赏贴:求在Kafka消费因为网络延时,产生一个日志信息

问题描述:

Kafka因为网络延时或者中断时,将该条信息打印到日志中,就想知道该条日志打印的代码是加在哪里,dowork方法中嘛,kafkaConsumer.poll(timeOut),这条应该是去拉数据的请求吧,不知道是不是加在这附近

日志级别,日志配置文件


    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
        acquireAndEnsureOpen();
        try {
            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }
            do {
                client.maybeTriggerWakeup();
                if (includeMetadataInTimeout) {
                    // try to update assignment metadata BUT do not need to block on the timer for join group
                    updateAssignmentMetadataIfNeeded(timer, false);
                } else {
                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
                        log.warn("Still waiting for metadata");
                    }
                }
                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
                if (!records.isEmpty()) {
                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                        client.transmitSends();
                    }

                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
            } while (timer.notExpired());

            return ConsumerRecords.empty();
        } finally {
            release();
            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
        }
    }

1、kafka源码中,如果传入的时间不为0,那么就循环等待超时后,返回空,可以在return ConsumerRecords.empty();之前打日志。