/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.kafka.sink;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.client.IgniteClientConnectionException;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.client.table.ClientSchema;
import org.apache.ignite.internal.client.table.ClientTable;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.table.DataStreamerException;
import org.apache.ignite.table.DataStreamerItem;
import org.apache.ignite.table.DataStreamerOperationType;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.ReceiverDescriptor;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.gridgain.kafka.GridGainKafkaConnectVersion;
import org.gridgain.kafka.sink.ErrorHandlingPolicy;
import org.gridgain.kafka.sink.FlushMode;
import org.gridgain.kafka.sink.GridGainSinkConnectorConfig;
import org.gridgain.kafka.sink.SinkRecordConvertOptions;
import org.gridgain.kafka.sink.SinkRecordConverter;
import org.gridgain.kafka.sink.SinkRecordReceiverItem;
import org.gridgain.kafka.sink.SinkRecordTuple;
import org.gridgain.kafka.sink.StreamerPublisher;
import org.gridgain.kafka.sink.UnmappedFieldPolicy;
import org.gridgain.kafka.util.TopicTableNameConverter;
import org.jetbrains.annotations.Nullable;

public final class GridGainSinkTask
extends SinkTask {
    private static final IgniteLogger log = Loggers.forClass(GridGainSinkTask.class);
    private TopicTableNameConverter topicTableNameConverter;
    @Nullable
    private IgniteClient igniteClient;
    private final Map<String, Streamer> tableToStreamer = new HashMap<String, Streamer>();
    private final Map<String, Table> cachedTables = new HashMap<String, Table>();
    private SinkRecordConvertOptions convertOptions;
    private FlushMode flushMode;
    private int retryBackoff;
    private DataStreamerOptions dataStreamerOptions;
    private ErrorHandlingPolicy errorPolicy;
    @Nullable
    private ErrantRecordReporter errantRecordReporter;
    @Nullable
    private String receiverClassName;
    private List<DeploymentUnit> receiverDeploymentUnits;

    public String version() {
        return GridGainKafkaConnectVersion.getVersion();
    }

    public void start(Map<String, String> map) {
        GridGainSinkConnectorConfig cfg = new GridGainSinkConnectorConfig(map);
        this.topicTableNameConverter = new TopicTableNameConverter(cfg::topicToTableNameRegexPair);
        this.igniteClient = cfg.clientBuilder().build();
        UnmappedFieldPolicy unmappedFieldPolicy = cfg.igniteStreamerReceiverClassName() == null ? cfg.unmappedFieldPolicy() : UnmappedFieldPolicy.IGNORE;
        this.convertOptions = new SinkRecordConvertOptions(cfg.nestedStructMode(), cfg.nestedStructConcatSeparator(), cfg.dropKeyFields(), cfg.dropValueFields(), unmappedFieldPolicy);
        this.flushMode = cfg.flushMode();
        this.retryBackoff = cfg.retryBackoff();
        this.dataStreamerOptions = DataStreamerOptions.builder().pageSize(cfg.igniteStreamerPageSize()).retryLimit(cfg.igniteStreamerRetryLimit()).autoFlushInterval(cfg.igniteStreamerAutoFlushInterval()).perPartitionParallelOperations(cfg.igniteStreamerParallelOps()).build();
        this.errorPolicy = cfg.errorHandlingPolicy();
        this.receiverClassName = cfg.igniteStreamerReceiverClassName();
        this.receiverDeploymentUnits = cfg.igniteStreamerReceiverDeploymentUnits();
        try {
            this.errantRecordReporter = this.context.errantRecordReporter();
        }
        catch (NoClassDefFoundError | NoSuchMethodError e) {
            this.errantRecordReporter = null;
        }
        log.info("GridGainSinkTask started, connected to nodes: " + this.igniteClient.connections().stream().map(c -> c.address().toString()).collect(Collectors.joining(", ")), new Object[0]);
    }

    public void put(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            return;
        }
        try {
            for (SinkRecord record : collection) {
                Boolean submitted;
                Streamer streamer = this.getStreamer(record.topic());
                ClientSchema schema = (ClientSchema)streamer.table.getLatestSchema().join();
                Object item = this.convertRecordToStreamerItem(record, schema);
                if (item == null || (submitted = this.trySubmit(record, streamer, item, true)) == null || submitted.booleanValue()) continue;
                this.context.timeout((long)this.retryBackoff);
                throw new RetriableException("Streamer buffer is full, retry later");
            }
            if (this.flushMode == FlushMode.ON_PUT) {
                this.flush();
            }
        }
        catch (IgniteClientConnectionException e) {
            this.context.timeout((long)this.retryBackoff);
            throw new RetriableException(e.getMessage());
        }
    }

    @Nullable
    private Boolean trySubmit(SinkRecord record, Streamer streamer, Object item, boolean allowRetry) {
        try {
            return streamer.submit(item);
        }
        catch (Throwable e) {
            if (allowRetry && streamer.future.isCompletedExceptionally()) {
                streamer = this.getStreamer(record.topic());
                return this.trySubmit(record, streamer, item, false);
            }
            switch (this.errorPolicy) {
                case LOG_ONLY: {
                    log.error("Failed to submit record: " + e.getMessage(), e);
                    return null;
                }
                case DEAD_LETTER_QUEUE: {
                    if (this.errantRecordReporter != null) {
                        this.errantRecordReporter.report(record, e);
                        return null;
                    }
                    throw new ConnectException("Failed to submit record, DEAD_LETTER_QUEUE error policy is configured, but errantRecordReporter is not available: " + e.getMessage(), e);
                }
            }
            throw new ConnectException("Failed to submit record: " + e.getMessage(), e);
        }
    }

    @Nullable
    private Object convertRecordToStreamerItem(SinkRecord record, ClientSchema schema) {
        try {
            if (this.receiverClassName == null) {
                Tuple rec = SinkRecordConverter.convert(record, schema, this.convertOptions);
                boolean delete = record.value() == null;
                DataStreamerOperationType op = delete ? DataStreamerOperationType.REMOVE : DataStreamerOperationType.PUT;
                return DataStreamerItem.of((Object)rec, (DataStreamerOperationType)op);
            }
            Tuple key = SinkRecordConverter.convert(record, schema, this.convertOptions);
            Tuple payload = SinkRecordConverter.convertForReceiver(record);
            return new SinkRecordReceiverItem(key, payload);
        }
        catch (Throwable e) {
            switch (this.errorPolicy) {
                case LOG_ONLY: {
                    log.error("Failed to convert record: " + e.getMessage(), e);
                    return null;
                }
                case DEAD_LETTER_QUEUE: {
                    if (this.errantRecordReporter != null) {
                        this.errantRecordReporter.report(record, e);
                        return null;
                    }
                    throw new ConnectException("Failed to convert record, DEAD_LETTER_QUEUE error policy is configured, but errantRecordReporter is not available: " + e.getMessage(), e);
                }
            }
            throw new ConnectException("Failed to convert record: " + e.getMessage(), e);
        }
    }

    private Streamer getStreamer(String topic) {
        String tableName = this.tableName(topic);
        Streamer streamer = this.tableToStreamer.computeIfAbsent(tableName, t -> this.createStreamer(tableName, topic));
        if (streamer.future.isCompletedExceptionally()) {
            this.streamerFlushAndClose(streamer);
            streamer = this.createStreamer(tableName, topic);
            this.tableToStreamer.put(tableName, streamer);
        }
        return streamer;
    }

    private Streamer createStreamer(String tableName, String topic) {
        assert (this.igniteClient != null);
        Table table = this.table(tableName);
        if (table == null) {
            throw new ConnectException("Ignite table '" + tableName + "' is not found for Kafka topic '" + topic + "'");
        }
        if (this.receiverClassName == null) {
            StreamerPublisher publisher = new StreamerPublisher();
            CompletableFuture streamerFut = table.recordView().streamData(publisher, this.dataStreamerOptions);
            return new Streamer(streamerFut, publisher, (ClientTable)table);
        }
        StreamerPublisher publisher = new StreamerPublisher();
        Function<SinkRecordReceiverItem, Tuple> keyFunc = SinkRecordReceiverItem::key;
        Function<SinkRecordReceiverItem, Tuple> payloadFunc = SinkRecordReceiverItem::payload;
        ReceiverDescriptor receiver = ReceiverDescriptor.builder((String)this.receiverClassName).units(this.receiverDeploymentUnits).build();
        Tuple receiverArg = Tuple.create().set("topic", (Object)topic).set("table", (Object)tableName);
        CompletableFuture streamerFut = table.recordView().streamData(publisher, keyFunc, payloadFunc, receiver, null, this.dataStreamerOptions, (Object)receiverArg);
        return new Streamer(streamerFut, publisher, (ClientTable)table);
    }

    @Nullable
    private Table table(String tableName) {
        Table table = this.cachedTables.get(tableName);
        if (table != null) {
            return table;
        }
        table = this.igniteClient.tables().table(tableName);
        if (table != null) {
            this.cachedTables.put(tableName, table);
        }
        return table;
    }

    public void stop() {
        this.cachedTables.clear();
        this.closeClient();
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        super.flush(currentOffsets);
        for (TopicPartition tp : currentOffsets.keySet()) {
            String tableName = this.tableName(tp.topic());
            Streamer streamer = this.tableToStreamer.remove(tableName);
            if (streamer == null) continue;
            this.streamerFlushAndClose(streamer);
        }
    }

    private void flush() {
        for (Streamer streamer : this.tableToStreamer.values()) {
            this.streamerFlushAndClose(streamer);
        }
        this.tableToStreamer.clear();
    }

    private void streamerFlushAndClose(Streamer streamer) {
        try {
            streamer.flushAndClose();
        }
        catch (Throwable t) {
            switch (this.errorPolicy) {
                case LOG_ONLY: {
                    log.error("Failed to flush streamer: " + t.getMessage(), t);
                    break;
                }
                case DEAD_LETTER_QUEUE: {
                    if (this.errantRecordReporter != null) {
                        Set<Object> failedItems = GridGainSinkTask.getFailedItems(t);
                        for (Object obj : failedItems) {
                            DataStreamerItem item = (DataStreamerItem)obj;
                            SinkRecordTuple tuple = (SinkRecordTuple)item.get();
                            try {
                                this.errantRecordReporter.report(tuple.record(), t).get();
                            }
                            catch (Throwable e) {
                                throw new ConnectException("Failed to report errant record: " + e.getMessage(), e);
                            }
                        }
                        break;
                    }
                    throw new ConnectException("Failed to flush streamer, DEAD_LETTER_QUEUE error policy is configured, but errantRecordReporter is not available: " + t.getMessage(), t);
                }
                default: {
                    throw new ConnectException("Failed to flush streamer: " + t.getMessage(), t);
                }
            }
        }
    }

    private void closeClient() {
        for (Streamer streamer : this.tableToStreamer.values()) {
            try {
                streamer.flushAndClose();
            }
            catch (Throwable t) {
                log.error("Failed to flush and close streamer: " + t.getMessage(), t);
            }
        }
        IgniteClient client = this.igniteClient;
        this.igniteClient = null;
        if (client != null) {
            try {
                client.close();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private String tableName(String topic) {
        return this.topicTableNameConverter.convert(topic);
    }

    private static Set<Object> getFailedItems(Throwable t) {
        do {
            if (!(t instanceof DataStreamerException)) continue;
            return ((DataStreamerException)t).failedItems();
        } while ((t = t.getCause()) != null);
        return Set.of();
    }

    private static final class Streamer {
        private final CompletableFuture<Void> future;
        private final StreamerPublisher publisher;
        private final ClientTable table;

        private Streamer(CompletableFuture<Void> future, StreamerPublisher<?> publisher, ClientTable table) {
            this.future = future;
            this.publisher = publisher;
            this.table = table;
        }

        private boolean submit(Object item) {
            return this.publisher.submit(item);
        }

        private void flushAndClose() {
            this.publisher.close();
            this.future.join();
        }
    }
}

