/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.kafka.source;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.internal.client.table.ClientSchema;
import org.apache.ignite.internal.continuousquery.ContinuousQueryEventWatermark;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.ContinuousQueryOptions;
import org.apache.ignite.table.ContinuousQueryWatermark;
import org.apache.ignite.table.IgniteTables;
import org.apache.ignite.table.Table;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.gridgain.kafka.GridGainKafkaConnectVersion;
import org.gridgain.kafka.source.ContinuousQuerySubscriber;
import org.gridgain.kafka.source.GridGainSourceConnectorConfig;
import org.gridgain.kafka.source.KafkaSchema;
import org.gridgain.kafka.source.OffsetUtils;
import org.gridgain.kafka.source.SourceEvent;
import org.gridgain.kafka.source.SourceOffsetFailMode;
import org.gridgain.kafka.source.SourceOffsetMode;
import org.gridgain.kafka.source.SourceRecordConverter;
import org.gridgain.kafka.source.SourceRecordConverterConfig;
import org.gridgain.kafka.util.TopicTableNameConverter;
import org.gridgain.lang.GridgainErrorGroups;
import org.jetbrains.annotations.Nullable;

public class GridGainSourceTask
extends SourceTask {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final BlockingQueue<SourceEvent> events = new LinkedBlockingQueue<SourceEvent>();
    private final Map<ClientSchema, KafkaSchema> schemaCache = new HashMap<ClientSchema, KafkaSchema>();
    private int pollIntervalMs = ContinuousQueryOptions.DEFAULT.pollIntervalMs();
    private int prefetchPages = 1;
    private int schemaCacheMaxSize = 1000;
    private int pageSize = ContinuousQueryOptions.DEFAULT.pageSize();
    private SourceRecordConverterConfig sourceRecordConverterConfig = SourceRecordConverterConfig.DEFAULT;
    private TopicTableNameConverter tableTopicNameConverter;
    @Nullable
    private IgniteClient igniteClient;
    private SourceOffsetFailMode sourceOffsetFailMode = SourceOffsetFailMode.FAIL;
    private final List<ContinuousQuerySubscriber> subscribers = new ArrayList<ContinuousQuerySubscriber>();

    public String version() {
        return GridGainKafkaConnectVersion.getVersion();
    }

    public void start(Map<String, String> map) {
        GridGainSourceConnectorConfig cfg = new GridGainSourceConnectorConfig(map);
        this.pollIntervalMs = cfg.pollInterval();
        this.prefetchPages = cfg.prefetchPages();
        this.schemaCacheMaxSize = cfg.schemaCacheMaxSize();
        this.sourceOffsetFailMode = cfg.sourceOffsetFailMode();
        this.pageSize = cfg.pageSize();
        this.tableTopicNameConverter = new TopicTableNameConverter(cfg::tableToTopicNameRegexPair);
        this.igniteClient = cfg.clientBuilder().build();
        IgniteTables tables = this.igniteClient.tables();
        this.sourceRecordConverterConfig = new SourceRecordConverterConfig(cfg.dateMode(), cfg.timeMode(), cfg.dateTimeMode(), cfg.timestampMode());
        String[] tableNames = cfg.tables();
        if (tableNames.length == 0) {
            throw new ConnectException("No tables specified");
        }
        for (String tableName : tableNames) {
            Table table = tables.table(tableName);
            if (table == null) {
                throw new ConnectException("Table not found: " + tableName);
            }
            boolean saveWatermark = cfg.sourceOffsetMode() == SourceOffsetMode.ALL;
            ContinuousQuerySubscriber subscriber = new ContinuousQuerySubscriber(tableName, table, this.events, saveWatermark);
            this.subscribers.add(subscriber);
            ContinuousQueryOptions.Builder optionsBuilder = this.getOptionsBuilder();
            if (cfg.sourceOffsetMode() != SourceOffsetMode.NONE) {
                ContinuousQueryEventWatermark cqWatermark = OffsetUtils.readWatermark(this.context.offsetStorageReader(), tableName);
                optionsBuilder.watermark((ContinuousQueryWatermark)cqWatermark);
            }
            table.recordView().queryContinuously((Flow.Subscriber)subscriber, optionsBuilder.build());
        }
        this.running.set(true);
    }

    @Nullable
    public List<SourceRecord> poll() throws InterruptedException {
        ArrayList<SourceRecord> result = null;
        Instant pollStart = Instant.now();
        do {
            if (!this.running.get()) {
                return null;
            }
            SourceEvent event = this.events.poll(this.pollIntervalMs, TimeUnit.MILLISECONDS);
            if (event == null) break;
            result = result == null ? new ArrayList<SourceRecord>() : result;
            result.add(SourceRecordConverter.toSourceRecord(event, this.tableTopicNameConverter, this.schemaCache, this.schemaCacheMaxSize, this.sourceRecordConverterConfig));
        } while (result.size() < this.pageSize && Duration.between(pollStart, Instant.now()).toMillis() < (long)this.pollIntervalMs);
        for (ContinuousQuerySubscriber subscriber : this.subscribers) {
            Throwable error = subscriber.error();
            if (error != null) {
                Throwable cause = ExceptionUtils.unwrapCause((Throwable)error);
                if (cause instanceof IgniteException && GridgainErrorGroups.ContinuousQuery.WATERMARK_TOO_OLD_ERR == ((IgniteException)cause).code() && SourceOffsetFailMode.IGNORE == this.sourceOffsetFailMode) {
                    this.restartQueryWithoutWatermark(subscriber);
                    continue;
                }
                throw new ConnectException("Continuous query error", error);
            }
            subscriber.ensureRequested(this.prefetchPages);
        }
        return result;
    }

    private void restartQueryWithoutWatermark(ContinuousQuerySubscriber subscriber) {
        subscriber.reset();
        subscriber.table().recordView().queryContinuously((Flow.Subscriber)subscriber, this.getOptionsBuilder().build());
    }

    public void stop() {
        this.running.set(false);
        for (ContinuousQuerySubscriber subscriber : this.subscribers) {
            subscriber.cancel();
        }
        this.closeClient();
    }

    private void closeClient() {
        IgniteClient client = this.igniteClient;
        this.igniteClient = null;
        if (client != null) {
            try {
                client.close();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private ContinuousQueryOptions.Builder getOptionsBuilder() {
        return ContinuousQueryOptions.builder().pageSize(this.pageSize).pollIntervalMs(this.pollIntervalMs).skipOldEntries(true);
    }
}

