package org.apache.ignite.sink.flink;

import java.util.Map;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.Ignition;
import org.apache.ignite.internal.util.typedef.internal.A;

/* loaded from: input_file:org/apache/ignite/sink/flink/IgniteSink.class */
public class IgniteSink<IN> extends RichSinkFunction<IN> {
    private static final long DFLT_FLUSH_FREQ = 10000;
    private final IgniteLogger log;
    private long autoFlushFrequency = DFLT_FLUSH_FREQ;
    private boolean allowOverwrite = false;
    private static volatile boolean stopped = true;
    private static String igniteCfgFile;
    private static String cacheName;

    /* loaded from: input_file:org/apache/ignite/sink/flink/IgniteSink$SinkContext.class */
    private static class SinkContext {

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/ignite/sink/flink/IgniteSink$SinkContext$Holder.class */
        public static class Holder {
            private static final Ignite IGNITE = Ignition.start(IgniteSink.igniteCfgFile);
            private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer(IgniteSink.cacheName);

            private Holder() {
            }
        }

        private SinkContext() {
        }

        private static Ignite getIgnite() {
            return Holder.IGNITE;
        }

        private static IgniteDataStreamer getStreamer() {
            return Holder.STREAMER;
        }

        static /* synthetic */ Ignite access$000() {
            return getIgnite();
        }

        static /* synthetic */ IgniteDataStreamer access$100() {
            return getStreamer();
        }
    }

    public String getCacheName() {
        return cacheName;
    }

    public String getIgniteConfigFile() {
        return igniteCfgFile;
    }

    public long getAutoFlushFrequency() {
        return this.autoFlushFrequency;
    }

    public void setAutoFlushFrequency(long j) {
        this.autoFlushFrequency = j;
    }

    public boolean getAllowOverwrite() {
        return this.allowOverwrite;
    }

    public void setAllowOverwrite(boolean z) {
        this.allowOverwrite = z;
    }

    public IgniteSink(String str, String str2) {
        cacheName = str;
        igniteCfgFile = str2;
        this.log = SinkContext.access$000().log();
    }

    public void start() throws IgniteException {
        A.notNull(igniteCfgFile, "Ignite config file");
        A.notNull(cacheName, "Cache name");
        SinkContext.access$100().autoFlushFrequency(this.autoFlushFrequency);
        SinkContext.access$100().allowOverwrite(this.allowOverwrite);
        stopped = false;
    }

    public void stop() throws IgniteException {
        if (stopped) {
            return;
        }
        stopped = true;
        SinkContext.access$100().close();
        SinkContext.access$000().cache(cacheName).close();
        SinkContext.access$000().close();
    }

    public void invoke(IN in) {
        try {
            if (!(in instanceof Map)) {
                throw new IgniteException("Map as a streamer input is expected!");
            }
            SinkContext.access$100().addData((Map) in);
        } catch (Exception e) {
            this.log.error("Error while processing IN of " + cacheName, e);
        }
    }
}
