package org.apache.ignite.stream.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.stream.StreamAdapter;

/* loaded from: input_file:org/apache/ignite/stream/kafka/KafkaStreamer.class */
/**
 * Stream adapter that reads messages of a single Kafka topic through a
 * {@link ConsumerConnector} and hands the key and value of every message to the
 * data streamer returned by {@code getStreamer()}.
 *
 * <p>{@link #start()} opens the configured number of Kafka streams for the topic and
 * runs one consuming task per stream on a fixed-size thread pool. {@link #stop()}
 * raises the stop flag, shuts the consumer down and waits a bounded time for the
 * tasks to finish.
 *
 * @param <T> Type parameter passed through to {@link StreamAdapter}; not used directly in this class.
 * @param <K> Key type produced by the key decoder and passed to the data streamer.
 * @param <V> Value type produced by the value decoder and passed to the data streamer.
 */
public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
    /** Default pause, in milliseconds, before a stream is read again after a consumption failure. */
    private static final long DFLT_RETRY_TIMEOUT = 10000;

    /** Maximum time, in milliseconds, {@link #stop()} waits for the consuming tasks to terminate. */
    private static final long STOP_AWAIT_TIMEOUT = 5000L;

    /** Logger, obtained from the Ignite instance in {@link #start()}. */
    private IgniteLogger log;

    /** Thread pool running the consuming tasks; created in {@link #start()}. */
    private ExecutorService executor;

    /** Name of the Kafka topic to consume. */
    private String topic;

    /** Number of Kafka streams requested for the topic, and size of the thread pool. */
    private int threads;

    /** Kafka consumer configuration used to create the connector. */
    private ConsumerConfig consumerCfg;

    /** Decoder for message keys. */
    private Decoder<K> keyDecoder;

    /** Decoder for message values. */
    private Decoder<V> valDecoder;

    /** Kafka consumer connector; created in {@link #start()}. */
    private ConsumerConnector consumer;

    /** Pause, in milliseconds, before a stream is read again after a consumption failure. */
    private long retryTimeout = DFLT_RETRY_TIMEOUT;

    /** Stop flag polled by the consuming tasks; volatile because it is written by the stopping thread. */
    private volatile boolean stopped;

    /**
     * Sets the topic to consume.
     *
     * @param topic Topic name; checked for {@code null} in {@link #start()}.
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /**
     * Sets the number of streams (and pool threads) used to consume the topic.
     *
     * @param threads Thread count; checked to be positive in {@link #start()}.
     */
    public void setThreads(int threads) {
        this.threads = threads;
    }

    /**
     * Sets the Kafka consumer configuration.
     *
     * @param consumerConfig Consumer configuration; checked for {@code null} in {@link #start()}.
     */
    public void setConsumerConfig(ConsumerConfig consumerConfig) {
        this.consumerCfg = consumerConfig;
    }

    /**
     * Sets the decoder applied to message keys.
     *
     * @param decoder Key decoder; checked for {@code null} in {@link #start()}.
     */
    public void setKeyDecoder(Decoder<K> decoder) {
        this.keyDecoder = decoder;
    }

    /**
     * Sets the decoder applied to message values.
     *
     * @param decoder Value decoder; checked for {@code null} in {@link #start()}.
     */
    public void setValueDecoder(Decoder<V> decoder) {
        this.valDecoder = decoder;
    }

    /**
     * Sets the pause before a stream is read again after a consumption failure.
     *
     * @param retryTimeout Pause in milliseconds; values that are not positive are
     *      rejected by the {@code A.ensure} check.
     */
    public void setRetryTimeout(long retryTimeout) {
        A.ensure(retryTimeout > 0, "retryTimeout > 0");

        this.retryTimeout = retryTimeout;
    }

    /**
     * Validates the configuration, connects to Kafka and starts one consuming task
     * per stream of the topic.
     *
     * <p>The streamer, the Ignite instance, the topic, both decoders and the consumer
     * configuration must be set, and the thread count must be positive; each of these
     * is verified with an {@code A.notNull} / {@code A.ensure} check before any
     * resource is created.
     */
    public void start() {
        A.notNull(getStreamer(), "streamer");
        A.notNull(getIgnite(), "ignite");
        A.notNull(this.topic, "topic");
        A.notNull(this.keyDecoder, "key decoder");
        A.notNull(this.valDecoder, "value decoder");
        A.notNull(this.consumerCfg, "kafka consumer config");
        A.ensure(this.threads > 0, "threads > 0");

        this.log = getIgnite().log();
        this.consumer = Consumer.createJavaConsumerConnector(this.consumerCfg);

        // Ask Kafka for 'threads' streams of the single configured topic.
        Map<String, Integer> topicCntMap = new HashMap<>();

        topicCntMap.put(this.topic, this.threads);

        Map<String, List<KafkaStream<K, V>>> consumerMap =
            this.consumer.createMessageStreams(topicCntMap, this.keyDecoder, this.valDecoder);

        List<KafkaStream<K, V>> streams = consumerMap.get(this.topic);

        this.executor = Executors.newFixedThreadPool(this.threads);

        // Reset the flag before submitting the tasks so that they do not exit immediately.
        this.stopped = false;

        for (final KafkaStream<K, V> stream : streams) {
            this.executor.execute(new Runnable() {
                @Override
                public void run() {
                    consumeStream(stream);
                }
            });
        }
    }

    /**
     * Reads messages from one Kafka stream and adds them to the data streamer until
     * the stop flag is raised or the consuming thread is interrupted.
     *
     * <p>A failure to add a single message is logged and the message is skipped. A
     * failure while reading from the stream is logged and the stream is read again
     * after the retry timeout.
     *
     * @param stream Kafka stream to consume.
     */
    private void consumeStream(KafkaStream<K, V> stream) {
        while (!this.stopped) {
            try {
                ConsumerIterator<K, V> it = stream.iterator();

                while (it.hasNext() && !this.stopped) {
                    MessageAndMetadata<K, V> msg = it.next();

                    try {
                        getStreamer().addData(msg.key(), msg.message());
                    } catch (Exception e) {
                        U.error(this.log, "Message is ignored due to an error [msg=" + msg + ']', e);
                    }
                }
            } catch (Exception e) {
                U.error(this.log, "Message can't be consumed from stream. Retry after " + this.retryTimeout + " ms.", e);

                try {
                    Thread.sleep(this.retryTimeout);
                } catch (InterruptedException ignored) {
                    // Keep the interrupt status visible to the pool and stop consuming:
                    // swallowing it would make the task ignore the cancellation request.
                    Thread.currentThread().interrupt();

                    return;
                }
            }
        }
    }

    /**
     * Stops consuming: raises the stop flag, shuts the Kafka consumer down and waits
     * up to {@link #STOP_AWAIT_TIMEOUT} milliseconds for the consuming tasks to finish.
     *
     * <p>Does nothing for resources that were never created, so it may be called
     * without a preceding {@link #start()}. If the calling thread is interrupted
     * while waiting, its interrupt status is restored before returning.
     */
    public void stop() {
        this.stopped = true;

        if (this.consumer != null) {
            this.consumer.shutdown();
        }

        if (this.executor != null) {
            this.executor.shutdown();

            try {
                if (!this.executor.awaitTermination(STOP_AWAIT_TIMEOUT, TimeUnit.MILLISECONDS) && this.log.isDebugEnabled()) {
                    this.log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
                }
            } catch (InterruptedException ignored) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Interrupted during shutdown, exiting uncleanly.");
                }

                // Restore the interrupt status so that the caller can observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }
}
