package org.gridgain.kafka.source;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.internal.client.table.ClientSchema;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.ContinuousQueryOptions;
import org.apache.ignite.table.IgniteTables;
import org.apache.ignite.table.Table;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.gridgain.kafka.GridGainKafkaConnectVersion;
import org.gridgain.kafka.util.TopicTableNameConverter;
import org.gridgain.lang.GridgainErrorGroups;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/gridgain/kafka/source/GridGainSourceTask.class */
public class GridGainSourceTask extends SourceTask {
    private TopicTableNameConverter tableTopicNameConverter;

    @Nullable
    private IgniteClient igniteClient;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final BlockingQueue<SourceEvent> events = new LinkedBlockingQueue();
    private final Map<ClientSchema, KafkaSchema> schemaCache = new HashMap();
    private int pollIntervalMs = ContinuousQueryOptions.DEFAULT.pollIntervalMs();
    private int prefetchPages = 1;
    private int schemaCacheMaxSize = 1000;
    private int pageSize = ContinuousQueryOptions.DEFAULT.pageSize();
    private SourceRecordConverterConfig sourceRecordConverterConfig = SourceRecordConverterConfig.DEFAULT;
    private SourceOffsetFailMode sourceOffsetFailMode = SourceOffsetFailMode.FAIL;
    private final List<ContinuousQuerySubscriber> subscribers = new ArrayList();

    public String version() {
        return GridGainKafkaConnectVersion.getVersion();
    }

    public void start(Map<String, String> map) {
        GridGainSourceConnectorConfig gridGainSourceConnectorConfig = new GridGainSourceConnectorConfig(map);
        this.pollIntervalMs = gridGainSourceConnectorConfig.pollInterval();
        this.prefetchPages = gridGainSourceConnectorConfig.prefetchPages();
        this.schemaCacheMaxSize = gridGainSourceConnectorConfig.schemaCacheMaxSize();
        this.sourceOffsetFailMode = gridGainSourceConnectorConfig.sourceOffsetFailMode();
        this.pageSize = gridGainSourceConnectorConfig.pageSize();
        Objects.requireNonNull(gridGainSourceConnectorConfig);
        this.tableTopicNameConverter = new TopicTableNameConverter((v1) -> {
            return r3.tableToTopicNameRegexPair(v1);
        });
        this.igniteClient = gridGainSourceConnectorConfig.clientBuilder().build();
        IgniteTables tables = this.igniteClient.tables();
        this.sourceRecordConverterConfig = new SourceRecordConverterConfig(gridGainSourceConnectorConfig.dateMode(), gridGainSourceConnectorConfig.timeMode(), gridGainSourceConnectorConfig.dateTimeMode(), gridGainSourceConnectorConfig.timestampMode());
        String[] tables2 = gridGainSourceConnectorConfig.tables();
        if (tables2.length == 0) {
            throw new ConnectException("No tables specified");
        }
        for (String str : tables2) {
            Table table = tables.table(str);
            if (table == null) {
                throw new ConnectException("Table not found: " + str);
            }
            ContinuousQuerySubscriber continuousQuerySubscriber = new ContinuousQuerySubscriber(str, table, this.events, gridGainSourceConnectorConfig.sourceOffsetMode() == SourceOffsetMode.ALL);
            this.subscribers.add(continuousQuerySubscriber);
            ContinuousQueryOptions.Builder optionsBuilder = getOptionsBuilder();
            if (gridGainSourceConnectorConfig.sourceOffsetMode() != SourceOffsetMode.NONE) {
                optionsBuilder.watermark(OffsetUtils.readWatermark(this.context.offsetStorageReader(), str));
            }
            table.recordView().queryContinuously(continuousQuerySubscriber, optionsBuilder.build());
        }
        this.running.set(true);
    }

    @Nullable
    public List<SourceRecord> poll() throws InterruptedException {
        ArrayList arrayList = null;
        while (this.running.get()) {
            SourceEvent poll = this.events.poll(this.pollIntervalMs, TimeUnit.MILLISECONDS);
            if (poll == null) {
                for (ContinuousQuerySubscriber continuousQuerySubscriber : this.subscribers) {
                    Throwable error = continuousQuerySubscriber.error();
                    if (error != null) {
                        IgniteException unwrapCause = ExceptionUtils.unwrapCause(error);
                        if (!(unwrapCause instanceof IgniteException) || GridgainErrorGroups.ContinuousQuery.WATERMARK_TOO_OLD_ERR != unwrapCause.code() || SourceOffsetFailMode.IGNORE != this.sourceOffsetFailMode) {
                            throw new ConnectException("Continuous query error", error);
                        }
                        restartQueryWithoutWatermark(continuousQuerySubscriber);
                    } else {
                        continuousQuerySubscriber.ensureRequested(this.prefetchPages);
                    }
                }
                return arrayList;
            }
            arrayList = arrayList == null ? new ArrayList() : arrayList;
            arrayList.add(SourceRecordConverter.toSourceRecord(poll, this.tableTopicNameConverter, this.schemaCache, this.schemaCacheMaxSize, this.sourceRecordConverterConfig));
        }
        return null;
    }

    private void restartQueryWithoutWatermark(ContinuousQuerySubscriber continuousQuerySubscriber) {
        continuousQuerySubscriber.reset();
        continuousQuerySubscriber.table().recordView().queryContinuously(continuousQuerySubscriber, getOptionsBuilder().build());
    }

    public void stop() {
        this.running.set(false);
        Iterator<ContinuousQuerySubscriber> it = this.subscribers.iterator();
        while (it.hasNext()) {
            it.next().cancel();
        }
        closeClient();
    }

    private void closeClient() {
        IgniteClient igniteClient = this.igniteClient;
        this.igniteClient = null;
        if (igniteClient != null) {
            try {
                igniteClient.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private ContinuousQueryOptions.Builder getOptionsBuilder() {
        return ContinuousQueryOptions.builder().pageSize(this.pageSize).pollIntervalMs(this.pollIntervalMs);
    }
}
