/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.kafka.sink;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.IllegalWorkerStateException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.gridgain.kafka.CacheEntry;
import org.gridgain.kafka.DataGrid;
import org.gridgain.kafka.IgniteConnectUtils;
import org.gridgain.kafka.LogFormat;
import org.gridgain.kafka.SystemEvent;
import org.gridgain.kafka.Version;
import org.gridgain.kafka.sink.IgniteSinkConnectorConfig;
import org.gridgain.kafka.sink.SinkRecordParser;
import org.gridgain.kafka.sink.TopicToCacheMapper;
import org.gridgain.kafka.sink.TopicToCacheMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class IgniteSinkTask
extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(IgniteSinkTask.class);
    private volatile boolean isStarted;
    private IgniteSinkConnectorConfig cfg;
    private final Map<String, IgniteDataStreamer<Object, Object>> dataStreamers = new HashMap<String, IgniteDataStreamer<Object, Object>>();
    private SinkRecordParser recordParser;
    private Predicate<CacheEntry> filter;
    private TopicToCacheMapper topicToCacheMapper;

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> cfgMap) {
        if (this.isStarted) {
            return;
        }
        this.cfg = new IgniteSinkConnectorConfig(cfgMap);
        this.recordParser = new SinkRecordParser(this.cfg.keyPolicy(), this.cfg.keyFields());
        this.filter = IgniteConnectUtils.createFilter(this.cfg.cacheFilter(), log);
        DataGrid.SINK.init(this.cfg.igniteCfg());
        this.topicToCacheMapper = TopicToCacheMapperFactory.factorySingleton().newMapper(this.cfg.topicToCacheMapper(), cfgMap);
        this.isStarted = true;
        log.info(LogFormat.message(SystemEvent.SINK_TASK_STARTED, "Sink task started"));
    }

    public void put(Collection<SinkRecord> col) {
        try {
            if (log.isTraceEnabled()) {
                log.trace("Processing " + col.size() + " incoming Sink records.");
            }
            this.processRecordBatch(col);
            if (log.isTraceEnabled()) {
                log.trace("Processed batch of " + col.size() + " Sink records, flushing.");
            }
            this.flushDataStreamers();
            if (log.isTraceEnabled()) {
                log.trace("Successfully written batch of " + col.size() + " records.");
            }
        }
        catch (IllegalWorkerStateException iwse) {
            throw iwse;
        }
        catch (Exception ex) {
            this.handleKafkaConnectFailure(ex);
        }
    }

    private void handleKafkaConnectFailure(Exception ex) {
        log.warn(LogFormat.message(SystemEvent.CACHE_WRITE_FAILURE, ex));
        if (ex.getCause() instanceof IgniteClientDisconnectedException) {
            log.warn("Client has been disconnected from the topology, reconnecting...");
            IgniteClientDisconnectedException disconnectedException = (IgniteClientDisconnectedException)ex.getCause();
            disconnectedException.reconnectFuture().get();
            log.warn("Client has been reconnected to the cluster topology, data streamers will be restarted.");
            this.restartDataStreamers();
            throw new RetriableException("Ignite SinkTask streamers reconnected, put retry required.", (Throwable)ex);
        }
        throw new IllegalWorkerStateException("Stopping SinkTask due to unrecoverable error", (Throwable)ex);
    }

    private void processRecordBatch(Collection<SinkRecord> col) {
        block7: for (SinkRecord rec : col) {
            try {
                Object val;
                Object key;
                String cacheName;
                block10: {
                    cacheName = this.getCacheName(rec.topic());
                    key = this.recordParser.parseKey(rec);
                    val = rec.value() == null ? null : this.recordParser.parseValue(rec, cacheName);
                    try {
                        if (this.filter != null && !this.filter.test(new CacheEntry(cacheName, key, val))) {
                        }
                        break block10;
                    }
                    catch (Exception ex) {
                        log.error(LogFormat.message(SystemEvent.EVAL_FILTER_FAILURE, ex));
                    }
                    continue;
                }
                IgniteDataStreamer<Object, Object> dataStreamer = this.getDataStreamer(cacheName);
                if (val == null) {
                    dataStreamer.removeData(key);
                    continue;
                }
                dataStreamer.addData(key, val);
            }
            catch (Exception e) {
                switch (this.cfg.errorHandlingPolicy()) {
                    case LOG_ONLY: {
                        log.warn(IgniteSinkTask.formatRecordFailureMsg(rec), (Throwable)e);
                        continue block7;
                    }
                }
                log.warn(IgniteSinkTask.formatRecordFailureMsg(rec));
                throw new IllegalWorkerStateException("Stopping SinkTask due to unrecoverable error", (Throwable)e);
            }
        }
    }

    private static String formatRecordFailureMsg(SinkRecord rec) {
        StringBuilder recStr = new StringBuilder().append('[').append("offset=").append(rec.kafkaOffset()).append(", topic=").append(rec.topic()).append(", partition=").append(rec.kafkaPartition()).append(", ts=").append(rec.timestamp()).append(", keySchema=").append(rec.keySchema()).append(", valSchema=").append(rec.valueSchema());
        if (!S.includeSensitive()) {
            recStr.append(", key=***, val=*** see IGNITE_SENSITIVE_DATA_LOGGING");
        } else {
            recStr.append(", key=").append(rec.key());
            recStr.append(", val=").append(rec.value());
        }
        recStr.append(']');
        return LogFormat.message(SystemEvent.FAILED_TO_PROCESS_SINK_RECORD, "Failed to process sink record: " + recStr);
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        log.debug("Flushing on offset commit.");
        this.flushDataStreamers();
    }

    private void flushDataStreamers() {
        for (IgniteDataStreamer<Object, Object> s : this.dataStreamers.values()) {
            s.flush();
        }
    }

    public void stop() {
        if (!this.isStarted) {
            return;
        }
        for (IgniteDataStreamer<Object, Object> s : this.dataStreamers.values()) {
            try {
                s.close();
            }
            catch (Exception ex) {
                log.warn(LogFormat.message(SystemEvent.CACHE_WRITE_FAILURE, ex));
            }
        }
        DataGrid.SINK.close();
        this.isStarted = false;
        log.info(LogFormat.message(SystemEvent.SINK_TASK_STOPPED, "Sink task stopped"));
    }

    private String getCacheName(String topic) {
        return this.topicToCacheMapper.map(topic);
    }

    private IgniteDataStreamer<Object, Object> getDataStreamer(String cacheName) {
        IgniteDataStreamer<Object, Object> streamer = this.dataStreamers.get(cacheName);
        if (streamer == null) {
            streamer = this.createDataStreamer(cacheName, this.cfg.shallProcessUpdates());
            this.dataStreamers.put(cacheName, streamer);
        }
        return streamer;
    }

    private void restartDataStreamers() {
        Set<String> streamers = this.dataStreamers.keySet();
        this.dataStreamers.clear();
        for (String cacheName : streamers) {
            IgniteDataStreamer<Object, Object> streamer = this.createDataStreamer(cacheName, this.cfg.shallProcessUpdates());
            this.dataStreamers.put(cacheName, streamer);
        }
    }

    private IgniteDataStreamer<Object, Object> createDataStreamer(String cacheName, boolean shallProcessUpdates) {
        DataGrid.SINK.ensureCache(cacheName);
        IgniteDataStreamer streamer = DataGrid.SINK.dataStreamer(cacheName);
        if (shallProcessUpdates) {
            streamer.allowOverwrite(true);
        }
        if (this.cfg.streamReceiver() != null) {
            streamer.receiver(IgniteConnectUtils.createStreamReceiver(this.cfg.streamReceiver(), log));
        }
        return streamer;
    }
}

