package org.apache.ignite.stream.storm;

import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.Ignition;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.stream.StreamAdapter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

/* loaded from: input_file:org/apache/ignite/stream/storm/StormStreamer.class */
public class StormStreamer<K, V> extends StreamAdapter<Tuple, K, V> implements IRichBolt {
    private static final long DFLT_FLUSH_FREQ = 10000;
    private static final String DFLT_TUPLE_FIELD = "ignite";
    private IgniteLogger log;
    private String igniteTupleField = DFLT_TUPLE_FIELD;
    private long autoFlushFrequency = DFLT_FLUSH_FREQ;
    private boolean allowOverwrite = false;
    private static volatile boolean stopped = true;
    private OutputCollector collector;
    private static String igniteConfigFile;
    private static String cacheName;

    /* loaded from: input_file:org/apache/ignite/stream/storm/StormStreamer$StreamerContext.class */
    public static class StreamerContext {

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/ignite/stream/storm/StormStreamer$StreamerContext$Holder.class */
        public static class Holder {
            private static final Ignite IGNITE = Ignition.start(StormStreamer.igniteConfigFile);
            private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer(StormStreamer.cacheName);

            private Holder() {
            }
        }

        private StreamerContext() {
        }

        public static Ignite getIgnite() {
            return Holder.IGNITE;
        }

        public static IgniteDataStreamer getStreamer() {
            return Holder.STREAMER;
        }
    }

    public String getIgniteTupleField() {
        return this.igniteTupleField;
    }

    public void setIgniteTupleField(String str) {
        this.igniteTupleField = str;
    }

    public String getCacheName() {
        return cacheName;
    }

    public void setCacheName(String str) {
        cacheName = str;
    }

    public String getIgniteConfigFile() {
        return igniteConfigFile;
    }

    public void setIgniteConfigFile(String str) {
        igniteConfigFile = str;
    }

    public long getAutoFlushFrequency() {
        return this.autoFlushFrequency;
    }

    public void setAutoFlushFrequency(long j) {
        this.autoFlushFrequency = j;
    }

    public boolean getAllowOverwrite() {
        return this.allowOverwrite;
    }

    public void setAllowOverwrite(boolean z) {
        this.allowOverwrite = z;
    }

    public void start() throws IgniteException {
        A.notNull(igniteConfigFile, "Ignite config file");
        A.notNull(cacheName, "Cache name");
        A.notNull(this.igniteTupleField, "Ignite tuple field");
        setIgnite(StreamerContext.getIgnite());
        IgniteDataStreamer streamer = StreamerContext.getStreamer();
        streamer.autoFlushFrequency(this.autoFlushFrequency);
        streamer.allowOverwrite(this.allowOverwrite);
        setStreamer(streamer);
        this.log = getIgnite().log();
        stopped = false;
    }

    public void stop() throws IgniteException {
        if (stopped) {
            return;
        }
        stopped = true;
        getIgnite().dataStreamer(cacheName).close(true);
        getIgnite().close();
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        start();
        this.collector = outputCollector;
    }

    public void execute(Tuple tuple) {
        if (stopped) {
            return;
        }
        if (!(tuple.getValueByField(this.igniteTupleField) instanceof Map)) {
            throw new IgniteException("Map as a streamer input is expected!");
        }
        Map map = (Map) tuple.getValueByField(this.igniteTupleField);
        try {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Tuple (id:" + tuple.getMessageId() + ") from storm: " + map);
            }
            getStreamer().addData(map);
            this.collector.ack(tuple);
        } catch (Exception e) {
            this.log.error("Error while processing tuple of " + map, e);
            this.collector.fail(tuple);
        }
    }

    public void cleanup() {
        stop();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
