/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.cdc.core.exec;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.schema.SchemaRegistry;
import org.apache.ignite3.internal.table.TableImpl;
import org.apache.ignite3.internal.table.distributed.PublicApiThreadingTable;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.table.IgniteTables;
import org.apache.ignite3.table.QualifiedName;
import org.apache.ignite3.table.Table;
import org.apache.ignite3.tx.Transaction;
import org.gridgain.internal.cdc.api.Violation;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkInitException;
import org.gridgain.internal.cdc.api.replication.CdcReplicationInstance;
import org.gridgain.internal.cdc.api.replication.CdcReplicationMode;
import org.gridgain.internal.cdc.api.replication.CdcReplicationStatus;
import org.gridgain.internal.cdc.api.sink.SinkDefinition;
import org.gridgain.internal.cdc.api.sink.SinkStatus;
import org.gridgain.internal.cdc.api.sink.TableSink;
import org.gridgain.internal.cdc.api.source.SourceDefinition;
import org.gridgain.internal.cdc.api.source.SourceStatus;
import org.gridgain.internal.cdc.api.source.SourceTableDefinition;
import org.gridgain.internal.cdc.core.exec.CdcReplicationHandle;
import org.gridgain.internal.cdc.core.exec.CdcReplicationSubscriber;
import org.gridgain.internal.cdc.core.exec.LocalSubscribersRegistry;
import org.gridgain.internal.cdc.core.sink.CdcSinkFactory;
import org.gridgain.internal.cdc.core.sink.CdcSinkValidatorFactory;

public class LocalCdcReplicationExecutor {
    private static final IgniteLogger LOG = Loggers.forClass(LocalCdcReplicationExecutor.class);
    private final Supplier<String> localNodeId;
    private final CdcSinkFactory sinkFactory;
    private final CdcSinkValidatorFactory sinkValidatorFactory;
    private final IgniteTables igniteTables;
    private final LocalSubscribersRegistry subscribersRegistry;
    private final ExecutorService executor;

    public LocalCdcReplicationExecutor(Supplier<String> localNodeName, CdcSinkFactory sinkFactory, CdcSinkValidatorFactory sinkValidatorFactory, IgniteTables igniteTables) {
        this.localNodeId = localNodeName;
        this.sinkFactory = sinkFactory;
        this.sinkValidatorFactory = sinkValidatorFactory;
        this.igniteTables = igniteTables;
        this.subscribersRegistry = new LocalSubscribersRegistry();
        this.executor = Executors.newCachedThreadPool(IgniteThreadFactory.create(localNodeName.get(), "cdc", LOG, new ThreadOperation[0]));
    }

    public CompletableFuture<CdcReplicationHandle> startReplication(Transaction tx, CdcReplicationInstance replicationInstance, SourceDefinition source, SinkDefinition sink) {
        return this.verifyReplicationCanBeStarted(replicationInstance).thenComposeAsync(v -> {
            CdcReplicationInstance updatedInstance = replicationInstance.toBuilder().status(CdcReplicationStatus.RUNNING).build();
            Map<QualifiedName, TableSink> tableSinks = this.createTableNameToTableSinkMap(source, sink);
            return ((CompletableFuture)this.initTableSinks(sink, tableSinks).thenApply(ign -> {
                Map<QualifiedName, CdcReplicationSubscriber> subscribersByTableName = this.createContinuoursQuerySibscribersMap(tableSinks);
                this.subscribersRegistry.save(replicationInstance.name(), subscribersByTableName);
                return new CdcReplicationHandle(updatedInstance, this);
            })).exceptionally(th -> new CdcReplicationHandle(updatedInstance.toBuilder().status(CdcReplicationStatus.FAILED).build(), this));
        }, (Executor)this.executor);
    }

    private Map<QualifiedName, CdcReplicationSubscriber> createContinuoursQuerySibscribersMap(Map<QualifiedName, TableSink> tableSinks) {
        return tableSinks.entrySet().stream().map(e -> {
            Table table = this.igniteTables.table((QualifiedName)e.getKey());
            CdcReplicationSubscriber subscriber = new CdcReplicationSubscriber((TableSink)e.getValue());
            table.keyValueView().queryContinuously(subscriber);
            return new AbstractMap.SimpleEntry<QualifiedName, CdcReplicationSubscriber>((QualifiedName)e.getKey(), subscriber);
        }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    private CompletableFuture<Void> initTableSinks(SinkDefinition sink, Map<QualifiedName, TableSink> tableSinks) {
        return this.getSinkStatus(sink).thenApply(status -> {
            if (!status.success()) {
                throw new CdcSinkInitException(String.format("Sink `%s` validation failed: %s", sink.name(), status.violations()));
            }
            tableSinks.forEach((tableName, tableSink) -> {
                Table table = this.igniteTables.table((QualifiedName)tableName);
                if (table == null) {
                    throw new CdcSinkInitException("Ignite table does not exist: " + tableName);
                }
                if (!(table instanceof PublicApiThreadingTable)) {
                    throw new IllegalStateException();
                }
                TableImpl internalTable = ((PublicApiThreadingTable)table).unwrap(TableImpl.class);
                SchemaRegistry schema = internalTable.schemaView();
                tableSink.init((QualifiedName)tableName, schema.lastKnownSchema().columns(), sink);
            });
            return null;
        });
    }

    private Map<QualifiedName, TableSink> createTableNameToTableSinkMap(SourceDefinition source, SinkDefinition sink) {
        return Arrays.stream(source.tables()).map(tableDefinition -> {
            TableSink tableSink = this.sinkFactory.createTableSink(sink);
            return new AbstractMap.SimpleEntry<QualifiedName, TableSink>(tableDefinition.qualifiedName(), tableSink);
        }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    private CompletableFuture<Void> verifyReplicationCanBeStarted(CdcReplicationInstance replicationInstance) {
        List<String> nodes = replicationInstance.executionNodes().nodes();
        if (nodes == null || nodes.isEmpty() || !nodes.contains(this.localNodeId.get())) {
            return CompletableFuture.failedFuture(new IllegalStateException(String.format("CDC replication instance must be executed on the local node (nodeId=%s), but replication defined executionNodes=%s.", this.localNodeId, nodes)));
        }
        if (replicationInstance.mode() != CdcReplicationMode.NEW_DATA) {
            return CompletableFuture.failedFuture(new IllegalStateException("CDC replication can be started only in NEW_DATA mode, but was: " + replicationInstance.mode()));
        }
        return CompletableFuture.completedFuture(null);
    }

    public CompletableFuture<Void> stopReplication(CdcReplicationInstance instance) {
        return CompletableFuture.runAsync(() -> {
            Map<QualifiedName, CdcReplicationSubscriber> subscribersByTableName = this.subscribersRegistry.removeByReplicationName(instance.name());
            if (subscribersByTableName == null) {
                return;
            }
            subscribersByTableName.forEach((tableName, subscriber) -> subscriber.stop());
        }, this.executor);
    }

    public CompletableFuture<SourceStatus> getSourceStatus(SourceDefinition sourceDefinition) {
        return CompletableFuture.completedFuture(SourceStatus.builder().definition(sourceDefinition).checks(this.tablesExistChecks(sourceDefinition)).build());
    }

    private List<Violation> tablesExistChecks(SourceDefinition sourceDefinition) {
        ArrayList<Violation> result = new ArrayList<Violation>();
        for (SourceTableDefinition table : sourceDefinition.tables()) {
            String checkName = "Table `" + table.qualifiedName().toCanonicalForm() + "` should exist";
            Table tbl = this.igniteTables.table(table.qualifiedName());
            if (tbl != null) continue;
            result.add(Violation.create(checkName, "There is no table with such a name."));
        }
        return result;
    }

    public CompletableFuture<SinkStatus> getSinkStatus(SinkDefinition sinkDefinition) {
        return CompletableFuture.completedFuture(SinkStatus.builder().definition(sinkDefinition).violation(this.sinkValidatorFactory.createValidator(sinkDefinition).validate(sinkDefinition)).build());
    }
}

