package org.gridgain.kafka.sink;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Predicate;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.gridgain.kafka.CacheEntry;
import org.gridgain.kafka.DataGrid;
import org.gridgain.kafka.LogFormat;
import org.gridgain.kafka.SystemEvent;
import org.gridgain.kafka.TopicNaming;
import org.gridgain.kafka.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/gridgain/kafka/sink/IgniteSinkTask.class */
public final class IgniteSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(IgniteSinkTask.class);
    private IgniteSinkConnectorConfig cfg;
    private SinkRecordParser recordParser;
    private Predicate<CacheEntry> filter;
    private volatile boolean isStarted = false;
    private final Map<String, IgniteDataStreamer<Object, Object>> dataStreamers = new HashMap();

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> map) {
        if (this.isStarted) {
            return;
        }
        this.cfg = new IgniteSinkConnectorConfig(map);
        this.recordParser = new SinkRecordParser(this.cfg.keyPolicy(), this.cfg.keyFields());
        this.filter = createFilter(this.cfg.cacheFilter());
        DataGrid.SINK.init(this.cfg.igniteCfg());
        this.isStarted = true;
        log.info(LogFormat.message(SystemEvent.SINK_TASK_STARTED, "Sink task started"));
    }

    public void put(Collection<SinkRecord> collection) {
        Object parseKey;
        Object parseValue;
        for (SinkRecord sinkRecord : collection) {
            String cacheName = getCacheName(sinkRecord.topic());
            try {
                parseKey = this.recordParser.parseKey(sinkRecord);
                parseValue = sinkRecord.value() == null ? null : this.recordParser.parseValue(sinkRecord);
                try {
                } catch (Exception e) {
                    log.error(LogFormat.message(SystemEvent.EVAL_FILTER_FAILURE, e));
                }
            } catch (Exception e2) {
                log.warn(LogFormat.message(SystemEvent.CACHE_WRITE_FAILURE, e2));
            }
            if (this.filter != null && !this.filter.test(new CacheEntry(cacheName, parseKey, parseValue))) {
                return;
            }
            IgniteDataStreamer<Object, Object> dataStreamer = getDataStreamer(cacheName);
            if (parseValue == null) {
                dataStreamer.removeData(parseKey);
            } else {
                dataStreamer.addData(parseKey, parseValue);
            }
            if (!this.cfg.flushOnOffsetCommit().booleanValue()) {
                dataStreamer.flush();
            }
        }
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        if (this.cfg.flushOnOffsetCommit().booleanValue()) {
            Iterator<IgniteDataStreamer<Object, Object>> it = this.dataStreamers.values().iterator();
            while (it.hasNext()) {
                try {
                    it.next().flush();
                } catch (Exception e) {
                    log.warn(LogFormat.message(SystemEvent.CACHE_WRITE_FAILURE, e));
                }
            }
        }
    }

    public void stop() {
        if (this.isStarted) {
            Iterator<IgniteDataStreamer<Object, Object>> it = this.dataStreamers.values().iterator();
            while (it.hasNext()) {
                try {
                    it.next().close();
                } catch (Exception e) {
                    log.warn(LogFormat.message(SystemEvent.CACHE_WRITE_FAILURE, e));
                }
            }
            DataGrid.SINK.close();
            this.isStarted = false;
            log.info(LogFormat.message(SystemEvent.SINK_TASK_STOPPED, "Sink task stopped"));
        }
    }

    private static Predicate<CacheEntry> createFilter(Class cls) {
        if (cls == null) {
            return null;
        }
        try {
            return (Predicate) cls.newInstance();
        } catch (Exception e) {
            throw new ConnectException(LogFormat.message(SystemEvent.CREATE_FILTER_FAILURE, e));
        }
    }

    private String getCacheName(String str) {
        String cacheName = new TopicNaming(this.cfg.topicPrefix()).cacheName(str);
        String cachePrefix = this.cfg.cachePrefix();
        return cachePrefix == null ? cacheName : cachePrefix + cacheName;
    }

    private IgniteDataStreamer<Object, Object> getDataStreamer(String str) {
        IgniteDataStreamer<Object, Object> igniteDataStreamer = this.dataStreamers.get(str);
        if (igniteDataStreamer == null) {
            DataGrid.SINK.ensureCache(str);
            igniteDataStreamer = DataGrid.SINK.dataStreamer(str);
            if (this.cfg.shallProcessUpdates().booleanValue()) {
                igniteDataStreamer.allowOverwrite(true);
            }
            this.dataStreamers.put(str, igniteDataStreamer);
        }
        return igniteDataStreamer;
    }
}
