package org.apache.ignite.stream.kafka;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.stream.StreamAdapter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;

/* loaded from: input_file:org/apache/ignite/stream/kafka/KafkaStreamer.class */
public class KafkaStreamer<K, V> extends StreamAdapter<ConsumerRecord, K, V> {
    private static final long DFLT_TIMEOUT = 100;
    private IgniteLogger log;
    private ExecutorService executor;
    private List<String> topics;
    private int threads;
    private Properties consumerCfg;
    private long timeout = DFLT_TIMEOUT;
    private final List<KafkaStreamer<K, V>.ConsumerTask> consumerTasks = new ArrayList();

    /* loaded from: input_file:org/apache/ignite/stream/kafka/KafkaStreamer$ConsumerTask.class */
    class ConsumerTask implements Callable<Void> {
        private final KafkaConsumer<?, ?> consumer;
        private volatile boolean stopped;

        public ConsumerTask(Properties properties) {
            this.consumer = new KafkaConsumer<>(properties);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() {
            this.consumer.subscribe(KafkaStreamer.this.topics);
            while (!this.stopped) {
                try {
                    try {
                        Iterator it = this.consumer.poll(KafkaStreamer.this.timeout).iterator();
                        while (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            try {
                                KafkaStreamer.this.addMessage(consumerRecord);
                            } catch (Exception e) {
                                U.error(KafkaStreamer.this.log, "Record is ignored due to an error [record = " + consumerRecord + ']', e);
                            }
                        }
                    } catch (WakeupException e2) {
                        KafkaStreamer.this.log.info("Consumer is being stopped.");
                        this.consumer.close();
                        return null;
                    } catch (KafkaException e3) {
                        KafkaStreamer.this.log.error("Kafka error", e3);
                        this.consumer.close();
                        return null;
                    }
                } catch (Throwable th) {
                    this.consumer.close();
                    throw th;
                }
            }
            this.consumer.close();
            return null;
        }

        public void stop() {
            this.stopped = true;
            if (this.consumer != null) {
                this.consumer.wakeup();
            }
        }
    }

    public void setTopic(List<String> list) {
        this.topics = list;
    }

    public void setThreads(int i) {
        this.threads = i;
    }

    public void setConsumerConfig(Properties properties) {
        this.consumerCfg = properties;
    }

    public void setTimeout(long j) {
        A.ensure(j > 0, "timeout > 0");
        this.timeout = j;
    }

    public void start() {
        A.notNull(getStreamer(), "streamer");
        A.notNull(getIgnite(), "ignite");
        A.notNull(this.topics, "topics");
        A.notNull(this.consumerCfg, "kafka consumer config");
        A.ensure(this.threads > 0, "threads > 0");
        A.ensure((null == getSingleTupleExtractor() && null == getMultipleTupleExtractor()) ? false : true, "Extractor must be configured");
        this.log = getIgnite().log();
        this.executor = Executors.newFixedThreadPool(this.threads);
        IntStream.range(0, this.threads).forEach(i -> {
            this.consumerTasks.add(new ConsumerTask(this.consumerCfg));
        });
        Iterator<KafkaStreamer<K, V>.ConsumerTask> it = this.consumerTasks.iterator();
        while (it.hasNext()) {
            this.executor.submit(it.next());
        }
    }

    public void stop() {
        Iterator<KafkaStreamer<K, V>.ConsumerTask> it = this.consumerTasks.iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                if (!this.executor.awaitTermination(5000L, TimeUnit.MILLISECONDS) && this.log.isDebugEnabled()) {
                    this.log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
                }
            } catch (InterruptedException e) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Interrupted during shutdown, exiting uncleanly.");
                }
            }
        }
    }
}
