package org.gridgain.kafka.sink;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.IllegalWorkerStateException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.gridgain.kafka.CacheEntry;
import org.gridgain.kafka.DataGrid;
import org.gridgain.kafka.IgniteConnectUtils;
import org.gridgain.kafka.LogFormat;
import org.gridgain.kafka.SystemEvent;
import org.gridgain.kafka.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/gridgain/kafka/sink/IgniteSinkTask.class */
public final class IgniteSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) IgniteSinkTask.class);
    private volatile boolean isStarted;
    private IgniteSinkConnectorConfig cfg;
    private final Map<String, IgniteDataStreamer<Object, Object>> dataStreamers = new HashMap();
    private SinkRecordParser recordParser;
    private Predicate<CacheEntry> filter;
    private TopicToCacheMapper topicToCacheMapper;

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> map) {
        if (this.isStarted) {
            return;
        }
        this.cfg = new IgniteSinkConnectorConfig(map);
        this.recordParser = new SinkRecordParser(this.cfg.keyPolicy(), this.cfg.keyFields());
        this.filter = IgniteConnectUtils.createFilter(this.cfg.cacheFilter(), log);
        DataGrid.SINK.init(this.cfg.igniteCfg());
        this.topicToCacheMapper = TopicToCacheMapperFactory.factorySingleton().newMapper(this.cfg.topicToCacheMapper(), map);
        this.isStarted = true;
        log.info(LogFormat.message(SystemEvent.SINK_TASK_STARTED, "Sink task started"));
    }

    public void put(Collection<SinkRecord> collection) {
        try {
            log.trace("Processing " + collection.size() + " incoming Sink records.");
            processRecordBatch(collection);
            log.trace("Processed batch of " + collection.size() + " Sink records, flushing.");
            flushDataStreamers();
            log.trace("Successfully written batch of " + collection.size() + " records.");
        } catch (Exception e) {
            handleKafkaConnectFailure(e);
        }
    }

    private Exception handleKafkaConnectFailure(Exception exc) {
        log.warn(LogFormat.message(SystemEvent.CACHE_WRITE_FAILURE, exc));
        if (!(exc.getCause() instanceof IgniteClientDisconnectedException)) {
            log.error("Stopping SinkTask due to unrecoverable exception", (Throwable) exc);
            throw new IllegalWorkerStateException("Stopping SinkTask due to unrecoverable error", exc);
        }
        log.warn("Client has been disconnected from the topology, reconnecting...");
        ((IgniteClientDisconnectedException) exc.getCause()).reconnectFuture().get();
        log.warn("Client has been reconnected to the cluster topology, data streamers will be restarted.");
        restartDataStreamers();
        throw new RetriableException("Ignite SinkTask streamers reconnected, put retry required.", exc);
    }

    private void processRecordBatch(Collection<SinkRecord> collection) {
        for (SinkRecord sinkRecord : collection) {
            String cacheName = getCacheName(sinkRecord.topic());
            Object parseKey = this.recordParser.parseKey(sinkRecord);
            Object parseValue = sinkRecord.value() == null ? null : this.recordParser.parseValue(sinkRecord, cacheName);
            try {
                if (this.filter == null || this.filter.test(new CacheEntry(cacheName, parseKey, parseValue))) {
                    IgniteDataStreamer<Object, Object> dataStreamer = getDataStreamer(cacheName);
                    if (parseValue == null) {
                        dataStreamer.removeData(parseKey);
                    } else {
                        dataStreamer.addData(parseKey, parseValue);
                    }
                }
            } catch (Exception e) {
                log.error(LogFormat.message(SystemEvent.EVAL_FILTER_FAILURE, e));
            }
        }
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        log.debug("Flushing on offset commit.");
        flushDataStreamers();
    }

    private void flushDataStreamers() {
        Iterator<IgniteDataStreamer<Object, Object>> it = this.dataStreamers.values().iterator();
        while (it.hasNext()) {
            it.next().flush();
        }
    }

    public void stop() {
        if (this.isStarted) {
            Iterator<IgniteDataStreamer<Object, Object>> it = this.dataStreamers.values().iterator();
            while (it.hasNext()) {
                try {
                    it.next().close();
                } catch (Exception e) {
                    log.warn(LogFormat.message(SystemEvent.CACHE_WRITE_FAILURE, e));
                }
            }
            DataGrid.SINK.close();
            this.isStarted = false;
            log.info(LogFormat.message(SystemEvent.SINK_TASK_STOPPED, "Sink task stopped"));
        }
    }

    private String getCacheName(String str) {
        return this.topicToCacheMapper.map(str);
    }

    private IgniteDataStreamer<Object, Object> getDataStreamer(String str) {
        IgniteDataStreamer<Object, Object> igniteDataStreamer = this.dataStreamers.get(str);
        if (igniteDataStreamer == null) {
            igniteDataStreamer = createDataStreamer(str, this.cfg.shallProcessUpdates().booleanValue());
            this.dataStreamers.put(str, igniteDataStreamer);
        }
        return igniteDataStreamer;
    }

    private void restartDataStreamers() {
        Set<String> keySet = this.dataStreamers.keySet();
        this.dataStreamers.clear();
        for (String str : keySet) {
            this.dataStreamers.put(str, createDataStreamer(str, this.cfg.shallProcessUpdates().booleanValue()));
        }
    }

    private IgniteDataStreamer<Object, Object> createDataStreamer(String str, boolean z) {
        DataGrid.SINK.ensureCache(str);
        IgniteDataStreamer<Object, Object> dataStreamer = DataGrid.SINK.dataStreamer(str);
        if (z) {
            dataStreamer.allowOverwrite(true);
        }
        if (this.cfg.streamReceiver() != null) {
            dataStreamer.receiver(IgniteConnectUtils.createStreamReceiver(this.cfg.streamReceiver(), log));
        }
        return dataStreamer;
    }
}
