package org.apache.flink.streaming.connectors.kafka.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.connectors.kafka.internals.Handover;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;

/**
 * Daemon thread that owns a {@link KafkaConsumer}, polls records from it and passes each polled
 * batch to a {@link Handover}.
 *
 * <p>Besides fetching, the thread also
 *
 * <ul>
 *   <li>sends asynchronous offset commits that were requested through
 *       {@link #setOffsetsToCommit(Map, KafkaCommitCallback)}, and
 *   <li>assigns partitions that show up in the unassigned-partitions queue to the consumer
 *       (see {@link #reassignPartitions(List)}).
 * </ul>
 *
 * <p>Other threads interact with the consumer only through {@link KafkaConsumer#wakeup()}, and
 * only while holding {@code consumerReassignmentLock}.
 *
 * @param <T> type parameter of the {@link KafkaTopicPartitionState} handled by this thread
 */
@Internal
public class KafkaConsumerThread<T> extends Thread {
    /** Logger supplied by the creator of this thread. */
    private final Logger log;

    /** Receives the polled record batches and any error raised by this thread. */
    private final Handover handover;

    /** Most recently requested offsets to commit, or {@code null} if nothing is pending. */
    private final AtomicReference<Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback>> nextOffsetsToCommit;

    /** Configuration passed to {@link #getConsumer(Properties)}. */
    private final Properties kafkaProperties;

    /** Queue of partitions that still have to be assigned to the consumer. */
    private final ClosableBlockingQueue<KafkaTopicPartitionState<T, TopicPartition>> unassignedPartitionsQueue;

    /** Timeout handed to {@code KafkaConsumer.poll(long)}. */
    private final long pollTimeout;

    /** Whether the consumer's own metrics are registered as gauges. */
    private final boolean useMetrics;

    /** Second metric group that receives the same gauges as {@link #consumerMetricGroup}. */
    @Deprecated
    private final MetricGroup subtaskMetricGroup;

    /** Metric group that receives one gauge per Kafka consumer metric. */
    private final MetricGroup consumerMetricGroup;

    /** The consumer; {@code null} before creation and while a reassignment is running. */
    private volatile KafkaConsumer<byte[], byte[]> consumer;

    /** Guards swapping {@link #consumer} in and out and the wake-up calls issued on it. */
    private final Object consumerReassignmentLock;

    /**
     * Set once the first non-empty partition batch is handed to {@link #reassignPartitions(List)}.
     * Not volatile: within this class it is only touched by {@link #run()} and
     * {@link #reassignPartitions(List)}.
     */
    private boolean hasAssignedPartitions;

    /** A wake-up was requested while {@link #consumer} was {@code null}. */
    private volatile boolean hasBufferedWakeup;

    /** Cleared by {@link #shutdown()}; checked by the fetch loop. */
    private volatile boolean running;

    /** An asynchronous commit was sent and its callback has not fired yet. */
    private volatile boolean commitInProgress;

    /**
     * Thrown by {@link #reassignPartitions(List)} when the reassignment was aborted because the
     * consumer raised a {@link WakeupException}; the fetch loop reacts by starting its next
     * iteration.
     */
    public static class AbortedReassignmentException extends Exception {
        private static final long serialVersionUID = 1;

        private AbortedReassignmentException() {
        }
    }

    /**
     * Adapter from Kafka's {@link OffsetCommitCallback} to the {@link KafkaCommitCallback} passed
     * to {@link #setOffsetsToCommit(Map, KafkaCommitCallback)}. It also clears
     * {@code commitInProgress} so that the next pending commit can be sent.
     */
    private class CommitCallback implements OffsetCommitCallback {
        private final KafkaCommitCallback internalCommitCallback;

        CommitCallback(KafkaCommitCallback kafkaCommitCallback) {
            this.internalCommitCallback = Preconditions.checkNotNull(kafkaCommitCallback);
        }

        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            KafkaConsumerThread.this.commitInProgress = false;
            if (exc == null) {
                this.internalCommitCallback.onSuccess();
            } else {
                KafkaConsumerThread.this.log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", exc);
                this.internalCommitCallback.onException(exc);
            }
        }
    }

    /**
     * Creates the thread as a daemon thread; it does not start it and does not yet create the
     * Kafka consumer (that happens in {@link #run()}).
     *
     * @param logger logger to use; must not be {@code null}
     * @param handover hand-over point for polled records and errors; must not be {@code null}
     * @param properties configuration for the Kafka consumer; must not be {@code null}
     * @param closableBlockingQueue queue of partitions still to be assigned; must not be
     *     {@code null}
     * @param str name of this thread
     * @param j timeout passed to {@code KafkaConsumer.poll(long)}
     * @param z whether to register the consumer's metrics as gauges
     * @param metricGroup group stored as the consumer metric group; must not be {@code null}
     * @param metricGroup2 group stored as the (deprecated) subtask metric group; must not be
     *     {@code null}
     */
    public KafkaConsumerThread(Logger logger, Handover handover, Properties properties, ClosableBlockingQueue<KafkaTopicPartitionState<T, TopicPartition>> closableBlockingQueue, String str, long j, boolean z, MetricGroup metricGroup, MetricGroup metricGroup2) {
        super(str);
        setDaemon(true);
        this.log = Preconditions.checkNotNull(logger);
        this.handover = Preconditions.checkNotNull(handover);
        this.kafkaProperties = Preconditions.checkNotNull(properties);
        this.consumerMetricGroup = Preconditions.checkNotNull(metricGroup);
        this.subtaskMetricGroup = Preconditions.checkNotNull(metricGroup2);
        this.unassignedPartitionsQueue = Preconditions.checkNotNull(closableBlockingQueue);
        this.pollTimeout = j;
        this.useMetrics = z;
        this.consumerReassignmentLock = new Object();
        this.nextOffsetsToCommit = new AtomicReference<>();
        this.running = true;
    }

    /**
     * Creates the consumer, optionally registers its metrics, and runs the fetch loop until
     * {@link #shutdown()} is called or an error occurs.
     *
     * <p>Any {@link Throwable} is reported to the handover. If the consumer was created, the
     * handover and the consumer are each closed exactly once when the method exits, whatever
     * the exit path.
     */
    @Override
    public void run() {
        // shutdown() was called before the thread got to run: nothing to set up or tear down
        if (!this.running) {
            return;
        }

        final Handover handover = this.handover;
        try {
            this.consumer = getConsumer(this.kafkaProperties);

            if (this.useMetrics) {
                registerKafkaMetrics();
            }

            // shutdown() may have been called while the consumer was being set up
            if (!this.running) {
                return;
            }

            runFetchLoop(handover);
        } catch (Throwable t) {
            // let the consumer of the handover know why this thread stopped
            handover.reportError(t);
        } finally {
            handover.close();
            closeConsumer();
        }
    }

    /** Registers every metric exposed by the consumer as a gauge in both metric groups. */
    private void registerKafkaMetrics() {
        final Map<MetricName, ? extends Metric> metrics = this.consumer.metrics();
        if (metrics == null) {
            this.log.info("Consumer implementation does not support metrics");
            return;
        }
        for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
            final String metricName = metric.getKey().name();
            this.consumerMetricGroup.gauge(metricName, new KafkaMetricWrapper(metric.getValue()));
            this.subtaskMetricGroup.gauge(metricName, new KafkaMetricWrapper(metric.getValue()));
        }
    }

    /**
     * Main loop: send a pending commit, pick up new partitions, poll, hand over.
     *
     * @param handover target for the polled batches
     * @throws Exception whatever the queue, the consumer or the handover throws, other than the
     *     wake-up signals handled here
     */
    private void runFetchLoop(Handover handover) throws Exception {
        // Last polled batch; survives an iteration when the hand-over was interrupted by a wake-up.
        ConsumerRecords<byte[], byte[]> records = null;

        while (this.running) {
            commitPendingOffsets();

            try {
                // Without any assigned partition there is nothing to poll, so wait for partitions
                // instead of spinning. shutdown() closes the queue, which presumably releases a
                // blocked call - TODO confirm against ClosableBlockingQueue.
                final List<KafkaTopicPartitionState<T, TopicPartition>> newPartitions =
                        this.hasAssignedPartitions
                                ? this.unassignedPartitionsQueue.pollBatch()
                                : this.unassignedPartitionsQueue.getBatchBlocking();
                if (newPartitions != null) {
                    reassignPartitions(newPartitions);
                }
            } catch (AbortedReassignmentException e) {
                // the partitions were put back into the queue; retry in the next iteration
                continue;
            }

            if (!this.hasAssignedPartitions) {
                continue;
            }

            if (records == null) {
                try {
                    records = this.consumer.poll(this.pollTimeout);
                } catch (WakeupException e) {
                    // Woken up (commit request or shutdown): there is no batch to hand over, so
                    // restart the loop instead of passing a null batch to the handover.
                    continue;
                }
            }

            try {
                handover.produce(records);
                records = null;
            } catch (Handover.WakeupException e) {
                // hand-over was interrupted; keep the batch and offer it again next iteration
            }
        }
    }

    /** Sends the pending offsets, if any, unless a previous commit is still in flight. */
    private void commitPendingOffsets() {
        if (this.commitInProgress) {
            return;
        }
        final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> pending =
                this.nextOffsetsToCommit.getAndSet(null);
        if (pending == null) {
            return;
        }
        this.log.debug("Sending async offset commit request to Kafka broker");
        this.commitInProgress = true;
        this.consumer.commitAsync(pending.f0, new CommitCallback(pending.f1));
    }

    /** Closes the consumer if it was created; a failure to close is logged, not propagated. */
    private void closeConsumer() {
        final KafkaConsumer<byte[], byte[]> toClose = this.consumer;
        if (toClose == null) {
            // creating the consumer failed, so there is nothing to close
            return;
        }
        try {
            toClose.close();
        } catch (Throwable t) {
            this.log.warn("Error while closing Kafka consumer", t);
        }
    }

    /**
     * Asks the thread to stop: clears the running flag, closes the unassigned-partitions queue
     * and wakes up both the handover producer and the consumer. Does not wait for the thread to
     * terminate.
     */
    public void shutdown() {
        this.running = false;
        this.unassignedPartitionsQueue.close();
        this.handover.wakeupProducer();
        wakeupConsumer();
    }

    /**
     * Registers offsets to be committed asynchronously by this thread and wakes the thread up so
     * it notices the request. Offsets registered earlier but not yet sent are replaced, and a
     * warning is logged in that case.
     *
     * @param map offsets to commit, per partition
     * @param kafkaCommitCallback notified about the outcome of the commit
     */
    void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> map, @Nonnull KafkaCommitCallback kafkaCommitCallback) {
        if (this.nextOffsetsToCommit.getAndSet(Tuple2.of(map, kafkaCommitCallback)) != null) {
            this.log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.");
        }
        this.handover.wakeupProducer();
        wakeupConsumer();
    }

    /**
     * Wakes up the consumer, or remembers the request if the consumer is currently not exposed
     * (not created yet, or isolated by a running reassignment).
     */
    private void wakeupConsumer() {
        synchronized (this.consumerReassignmentLock) {
            if (this.consumer != null) {
                this.consumer.wakeup();
            } else {
                this.hasBufferedWakeup = true;
            }
        }
    }

    /**
     * Adds the given partitions to the consumer's assignment while keeping the positions of the
     * partitions that were already assigned, and positions the consumer on the new partitions
     * according to their state's offset (earliest / latest / group offset sentinel, or a concrete
     * offset).
     *
     * <p>While this method runs the consumer field is set to {@code null}, so wake-up requests
     * from other threads are buffered instead of reaching the consumer. On every exit path the
     * consumer is put back into the field.
     *
     * @param list the partitions to add; an empty list makes this method a no-op
     * @throws AbortedReassignmentException if the consumer raised a {@link WakeupException}; the
     *     previous assignment is restored and the partitions are re-added to the queue
     * @throws Exception any other failure raised by the consumer
     */
    @VisibleForTesting
    void reassignPartitions(List<KafkaTopicPartitionState<T, TopicPartition>> list) throws Exception {
        if (list.isEmpty()) {
            return;
        }
        this.hasAssignedPartitions = true;
        boolean reassignmentStarted = false;

        // Hide the consumer from shutdown() / setOffsetsToCommit() for the duration of the
        // reassignment.
        final KafkaConsumer<byte[], byte[]> isolatedConsumer;
        synchronized (this.consumerReassignmentLock) {
            isolatedConsumer = this.consumer;
            this.consumer = null;
        }

        final Map<TopicPartition, Long> oldPositions = new HashMap<>();
        try {
            // remember where the consumer currently stands on the partitions it already has
            for (TopicPartition oldPartition : isolatedConsumer.assignment()) {
                oldPositions.put(oldPartition, isolatedConsumer.position(oldPartition));
            }

            final List<TopicPartition> newAssignment = new ArrayList<>(list.size() + oldPositions.size());
            newAssignment.addAll(oldPositions.keySet());
            newAssignment.addAll(convertKafkaPartitions(list));

            isolatedConsumer.assign(newAssignment);
            reassignmentStarted = true;

            // put the old partitions back to the remembered positions
            for (Map.Entry<TopicPartition, Long> oldPosition : oldPositions.entrySet()) {
                isolatedConsumer.seek(oldPosition.getKey(), oldPosition.getValue());
            }

            // Position the new partitions. For the sentinel cases the state's offset is replaced
            // by (consumer position - 1), mirroring the seek(offset + 1) of the concrete case.
            for (KafkaTopicPartitionState<T, TopicPartition> newPartition : list) {
                final TopicPartition handle = newPartition.getKafkaPartitionHandle();
                if (newPartition.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
                    isolatedConsumer.seekToBeginning(Collections.singletonList(handle));
                    newPartition.setOffset(isolatedConsumer.position(handle) - 1);
                } else if (newPartition.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
                    isolatedConsumer.seekToEnd(Collections.singletonList(handle));
                    newPartition.setOffset(isolatedConsumer.position(handle) - 1);
                } else if (newPartition.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                    newPartition.setOffset(isolatedConsumer.position(handle) - 1);
                } else {
                    isolatedConsumer.seek(handle, newPartition.getOffset() + 1);
                }
            }

            // re-expose the consumer and deliver a wake-up that arrived in the meantime
            synchronized (this.consumerReassignmentLock) {
                this.consumer = isolatedConsumer;
                if (this.hasBufferedWakeup) {
                    this.consumer.wakeup();
                    this.hasBufferedWakeup = false;
                }
            }
        } catch (WakeupException e) {
            synchronized (this.consumerReassignmentLock) {
                this.consumer = isolatedConsumer;

                // undo a reassignment that already touched the consumer
                if (reassignmentStarted) {
                    this.consumer.assign(new ArrayList<>(oldPositions.keySet()));
                    for (Map.Entry<TopicPartition, Long> oldPosition : oldPositions.entrySet()) {
                        this.consumer.seek(oldPosition.getKey(), oldPosition.getValue());
                    }
                }

                this.hasBufferedWakeup = false;

                // hand the partitions back so a later loop iteration retries them
                for (KafkaTopicPartitionState<T, TopicPartition> newPartition : list) {
                    this.unassignedPartitionsQueue.add(newPartition);
                }
                throw new AbortedReassignmentException();
            }
        } catch (Throwable t) {
            // Any other failure: re-expose the consumer before propagating, otherwise the field
            // stays null and run() can no longer close the consumer.
            synchronized (this.consumerReassignmentLock) {
                this.consumer = isolatedConsumer;
            }
            throw t;
        }
    }

    /**
     * Creates the Kafka consumer used by this thread.
     *
     * @param properties consumer configuration
     * @return a new {@link KafkaConsumer} built from {@code properties}
     */
    @VisibleForTesting
    KafkaConsumer<byte[], byte[]> getConsumer(Properties properties) {
        return new KafkaConsumer<>(properties);
    }

    /**
     * Extracts the Kafka partition handle of every partition state, preserving order.
     *
     * @param list partition states to convert
     * @return a new list holding one {@link TopicPartition} per element of {@code list}
     */
    private static <T> List<TopicPartition> convertKafkaPartitions(List<KafkaTopicPartitionState<T, TopicPartition>> list) {
        final List<TopicPartition> handles = new ArrayList<>(list.size());
        for (KafkaTopicPartitionState<T, TopicPartition> partitionState : list) {
            handles.add(partitionState.getKafkaPartitionHandle());
        }
        return handles;
    }
}
