package org.gridgain.kafka.sink;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.client.BasicAuthenticator;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.client.IgniteClientConnectionException;
import org.apache.ignite.client.SslConfiguration;
import org.apache.ignite.internal.client.table.ClientSchema;
import org.apache.ignite.internal.client.table.ClientTable;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.table.DataStreamerException;
import org.apache.ignite.table.DataStreamerItem;
import org.apache.ignite.table.DataStreamerOperationType;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.gridgain.kafka.GridGainKafkaConnectVersion;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/gridgain/kafka/sink/GridGainSinkTask.class */
public final class GridGainSinkTask extends SinkTask {
    private static final IgniteLogger log;
    private TopicTableNameConverter topicTableNameConverter;

    @Nullable
    private IgniteClient igniteClient;
    private final Map<String, Streamer> tableToStreamer = new HashMap();
    private final Map<String, Table> cachedTables = new HashMap();
    private SinkRecordConvertOptions convertOptions;
    private FlushMode flushMode;
    private int retryBackoff;
    private DataStreamerOptions dataStreamerOptions;
    private ErrorHandlingPolicy errorPolicy;

    @Nullable
    private ErrantRecordReporter errantRecordReporter;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gridgain/kafka/sink/GridGainSinkTask$Streamer.class */
    public static final class Streamer {
        private final CompletableFuture<Void> future;
        private final StreamerPublisher<DataStreamerItem<Tuple>> publisher;
        private final ClientTable table;

        private Streamer(CompletableFuture<Void> completableFuture, StreamerPublisher<DataStreamerItem<Tuple>> streamerPublisher, ClientTable clientTable) {
            this.future = completableFuture;
            this.publisher = streamerPublisher;
            this.table = clientTable;
        }

        private void flushAndClose() {
            this.publisher.close();
            this.future.join();
        }
    }

    public String version() {
        return GridGainKafkaConnectVersion.getVersion();
    }

    public void start(Map<String, String> map) {
        GridGainSinkConnectorConfig gridGainSinkConnectorConfig = new GridGainSinkConnectorConfig(map);
        this.topicTableNameConverter = new TopicTableNameConverter(gridGainSinkConnectorConfig);
        this.igniteClient = IgniteClient.builder().addresses(gridGainSinkConnectorConfig.igniteAddresses().split(",")).connectTimeout(gridGainSinkConnectorConfig.igniteConnectTimeout()).reconnectInterval(gridGainSinkConnectorConfig.igniteReconnectInterval()).heartbeatInterval(gridGainSinkConnectorConfig.igniteHeartbeatInterval()).heartbeatTimeout(gridGainSinkConnectorConfig.igniteHeartbeatTimeout()).ssl(SslConfiguration.builder().enabled(gridGainSinkConnectorConfig.igniteSslEnabled()).clientAuth(gridGainSinkConnectorConfig.igniteSslClientAuthenticationMode()).ciphers(gridGainSinkConnectorConfig.igniteSslCiphers()).keyStorePath(gridGainSinkConnectorConfig.igniteSslKeyStorePath()).keyStorePassword(gridGainSinkConnectorConfig.igniteSslKeyStorePassword()).trustStorePath(gridGainSinkConnectorConfig.igniteSslTrustStorePath()).trustStorePassword(gridGainSinkConnectorConfig.igniteSslTrustStorePassword()).build()).authenticator(gridGainSinkConnectorConfig.igniteAuthenticatorBasicUsername().isBlank() ? null : BasicAuthenticator.builder().username(gridGainSinkConnectorConfig.igniteAuthenticatorBasicUsername()).password(gridGainSinkConnectorConfig.igniteAuthenticatorBasicPassword()).build()).build();
        this.convertOptions = new SinkRecordConvertOptions(gridGainSinkConnectorConfig.nestedStructMode(), gridGainSinkConnectorConfig.nestedStructConcatSeparator());
        this.flushMode = gridGainSinkConnectorConfig.flushMode();
        this.retryBackoff = gridGainSinkConnectorConfig.retryBackoff();
        this.dataStreamerOptions = DataStreamerOptions.builder().pageSize(gridGainSinkConnectorConfig.igniteStreamerPageSize()).retryLimit(gridGainSinkConnectorConfig.igniteStreamerRetryLimit()).autoFlushInterval(gridGainSinkConnectorConfig.igniteStreamerAutoFlushInterval()).perPartitionParallelOperations(gridGainSinkConnectorConfig.igniteStreamerParallelOps()).build();
        this.errorPolicy = gridGainSinkConnectorConfig.errorHandlingPolicy();
        try {
            this.errantRecordReporter = this.context.errantRecordReporter();
        } catch (NoClassDefFoundError | NoSuchMethodError e) {
            this.errantRecordReporter = null;
        }
        log.info("GridGainSinkTask started, connected to nodes: " + ((String) this.igniteClient.connections().stream().map(clusterNode -> {
            return clusterNode.address().toString();
        }).collect(Collectors.joining(", "))), new Object[0]);
    }

    public void put(Collection<SinkRecord> collection) {
        Boolean trySubmit;
        if (collection.isEmpty()) {
            return;
        }
        try {
            for (SinkRecord sinkRecord : collection) {
                Streamer streamer = getStreamer(sinkRecord.topic());
                DataStreamerItem<Tuple> convertRecordToStreamerItem = convertRecordToStreamerItem(sinkRecord, (ClientSchema) streamer.table.getLatestSchema().join());
                if (convertRecordToStreamerItem != null && (trySubmit = trySubmit(sinkRecord, streamer, convertRecordToStreamerItem, true)) != null && !trySubmit.booleanValue()) {
                    this.context.timeout(this.retryBackoff);
                    throw new RetriableException("Streamer buffer is full, retry later");
                }
            }
            if (this.flushMode == FlushMode.ON_PUT) {
                flush();
            }
        } catch (IgniteClientConnectionException e) {
            this.context.timeout(this.retryBackoff);
            throw new RetriableException(e.getMessage());
        }
    }

    @Nullable
    private Boolean trySubmit(SinkRecord sinkRecord, Streamer streamer, DataStreamerItem<Tuple> dataStreamerItem, boolean z) {
        try {
            return Boolean.valueOf(streamer.publisher.submit(dataStreamerItem));
        } catch (Throwable th) {
            if (z && streamer.future.isCompletedExceptionally()) {
                return trySubmit(sinkRecord, getStreamer(sinkRecord.topic()), dataStreamerItem, false);
            }
            switch (this.errorPolicy) {
                case LOG_ONLY:
                    log.error("Failed to submit record: " + th.getMessage(), th);
                    return null;
                case DEAD_LETTER_QUEUE:
                    if (this.errantRecordReporter == null) {
                        throw new ConnectException("Failed to submit record, DEAD_LETTER_QUEUE error policy is configured, but errantRecordReporter is not available: " + th.getMessage(), th);
                    }
                    this.errantRecordReporter.report(sinkRecord, th);
                    return null;
                case STOP_TASK:
                default:
                    throw new ConnectException("Failed to submit record: " + th.getMessage(), th);
            }
        }
    }

    @Nullable
    private DataStreamerItem<Tuple> convertRecordToStreamerItem(SinkRecord sinkRecord, ClientSchema clientSchema) {
        try {
            return DataStreamerItem.of(SinkRecordConverter.convert(sinkRecord, clientSchema, this.convertOptions), sinkRecord.value() == null ? DataStreamerOperationType.REMOVE : DataStreamerOperationType.PUT);
        } catch (Throwable th) {
            switch (this.errorPolicy) {
                case LOG_ONLY:
                    log.error("Failed to convert record: " + th.getMessage(), th);
                    return null;
                case DEAD_LETTER_QUEUE:
                    if (this.errantRecordReporter == null) {
                        throw new ConnectException("Failed to convert record, DEAD_LETTER_QUEUE error policy is configured, but errantRecordReporter is not available: " + th.getMessage(), th);
                    }
                    this.errantRecordReporter.report(sinkRecord, th);
                    return null;
                case STOP_TASK:
                default:
                    throw new ConnectException("Failed to convert record: " + th.getMessage(), th);
            }
        }
    }

    private Streamer getStreamer(String str) {
        String tableName = tableName(str);
        Streamer computeIfAbsent = this.tableToStreamer.computeIfAbsent(tableName, str2 -> {
            return createStreamer(tableName, str);
        });
        if (computeIfAbsent.future.isCompletedExceptionally()) {
            streamerFlushAndClose(computeIfAbsent);
            computeIfAbsent = createStreamer(tableName, str);
            this.tableToStreamer.put(tableName, computeIfAbsent);
        }
        return computeIfAbsent;
    }

    private Streamer createStreamer(String str, String str2) {
        if (!$assertionsDisabled && this.igniteClient == null) {
            throw new AssertionError();
        }
        ClientTable table = table(str);
        if (table == null) {
            throw new ConnectException("Ignite table '" + str + "' is not found for Kafka topic '" + str2 + "'");
        }
        StreamerPublisher streamerPublisher = new StreamerPublisher();
        return new Streamer(table.recordView().streamData(streamerPublisher, this.dataStreamerOptions), streamerPublisher, table);
    }

    @Nullable
    private Table table(String str) {
        Table table = this.cachedTables.get(str);
        if (table != null) {
            return table;
        }
        Table table2 = this.igniteClient.tables().table(str);
        if (table2 != null) {
            this.cachedTables.put(str, table2);
        }
        return table2;
    }

    public void stop() {
        this.cachedTables.clear();
        closeClient();
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        super.flush(map);
        Iterator<TopicPartition> it = map.keySet().iterator();
        while (it.hasNext()) {
            Streamer remove = this.tableToStreamer.remove(tableName(it.next().topic()));
            if (remove != null) {
                streamerFlushAndClose(remove);
            }
        }
    }

    private void flush() {
        Iterator<Streamer> it = this.tableToStreamer.values().iterator();
        while (it.hasNext()) {
            streamerFlushAndClose(it.next());
        }
        this.tableToStreamer.clear();
    }

    private void streamerFlushAndClose(Streamer streamer) {
        try {
            streamer.flushAndClose();
        } catch (Throwable th) {
            switch (this.errorPolicy) {
                case LOG_ONLY:
                    log.error("Failed to flush streamer: " + th.getMessage(), th);
                    return;
                case DEAD_LETTER_QUEUE:
                    if (this.errantRecordReporter == null) {
                        throw new ConnectException("Failed to flush streamer, DEAD_LETTER_QUEUE error policy is configured, but errantRecordReporter is not available: " + th.getMessage(), th);
                    }
                    Iterator<Object> it = getFailedItems(th).iterator();
                    while (it.hasNext()) {
                        try {
                            this.errantRecordReporter.report(((SinkRecordTuple) ((DataStreamerItem) it.next()).get()).record(), th).get();
                        } catch (Throwable th2) {
                            throw new ConnectException("Failed to report errant record: " + th2.getMessage(), th2);
                        }
                    }
                    return;
                case STOP_TASK:
                default:
                    throw new ConnectException("Failed to flush streamer: " + th.getMessage(), th);
            }
        }
    }

    private void closeClient() {
        Iterator<Streamer> it = this.tableToStreamer.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().flushAndClose();
            } catch (Throwable th) {
                log.error("Failed to flush and close streamer: " + th.getMessage(), th);
            }
        }
        IgniteClient igniteClient = this.igniteClient;
        this.igniteClient = null;
        if (igniteClient != null) {
            try {
                igniteClient.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private String tableName(String str) {
        return this.topicTableNameConverter.tableName(str);
    }

    private static Set<Object> getFailedItems(Throwable th) {
        while (!(th instanceof DataStreamerException)) {
            th = th.getCause();
            if (th == null) {
                return Set.of();
            }
        }
        return ((DataStreamerException) th).failedItems();
    }

    static {
        $assertionsDisabled = !GridGainSinkTask.class.desiredAssertionStatus();
        log = Loggers.forClass(GridGainSinkTask.class);
    }
}
