/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.dcr.table;

import java.time.Instant;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.future.InFlightFutures;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.lang.AsyncCursor;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.ContinuousQueryOptions;
import org.apache.ignite.table.ContinuousQueryWatermark;
import org.apache.ignite.table.DataStreamerItem;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
import org.gridgain.internal.dcr.event.FstProgressEvent;
import org.gridgain.internal.dcr.event.ReplicationEventHandler;
import org.gridgain.internal.dcr.event.TableEvent;
import org.gridgain.internal.dcr.exception.ReplicationNoSourceTableException;
import org.gridgain.internal.dcr.metrics.DcrMetricSource;
import org.gridgain.internal.dcr.table.ReplicationSubscriber;
import org.gridgain.internal.dcr.table.TableManager;
import org.gridgain.internal.dcr.table.schema.ManualSyncPolicy;
import org.gridgain.internal.dcr.table.schema.OneTimeSchemaSyncChecker;
import org.gridgain.internal.dcr.table.schema.SchemaSyncPolicy;
import org.jetbrains.annotations.Nullable;

public class TableReplication {
    private static final IgniteLogger LOG = Loggers.forClass(TableReplication.class);
    private final String nodeName;
    private final TableManager tableManager;
    private final InFlightFutures inFlightFutures;
    private final Executor fstExecutionPool;
    private final AtomicReference<ReplicationSubscriber> subscriber = new AtomicReference();
    @Nullable
    private Instant flushPoint;
    private final ReplicationEventHandler<FstProgressEvent> progressHandler;
    private final DcrMetricSource metricSource;
    private final ClockService clockService;
    private final AtomicLong replicatedCount = new AtomicLong(0L);
    private volatile long totalCount = 0L;

    public TableReplication(String nodeName, TableManager tableManager, Executor fstExecutionPool, @Nullable Instant flushPoint, ReplicationEventHandler<FstProgressEvent> progressHandler, DcrMetricSource metricSource, ClockService clockService) {
        this.nodeName = nodeName;
        this.tableManager = tableManager;
        this.fstExecutionPool = fstExecutionPool;
        this.flushPoint = flushPoint;
        this.progressHandler = progressHandler;
        this.metricSource = metricSource;
        this.clockService = clockService;
        this.inFlightFutures = new InFlightFutures();
    }

    public void updateFlushPoint(Instant flushPoint) {
        this.flushPoint = flushPoint;
        ReplicationSubscriber replicationSubscriber = this.subscriber.get();
        if (replicationSubscriber != null) {
            replicationSubscriber.flushPoint(flushPoint);
        }
    }

    public void start(ReplicationEventHandler<TableEvent> handler) {
        long observableTimestamp = this.tableManager.observableTimestamp();
        HybridTimestamp hybridObservableTimestamp = HybridTimestamp.hybridTimestamp((long)observableTimestamp);
        Instant fstStartTime = Instant.ofEpochMilli(hybridObservableTimestamp.getPhysical());
        Transaction fstTransaction = this.tableManager.startRemoteTransaction(new TransactionOptions().timeoutMillis(Long.MAX_VALUE).readOnly(true));
        Transaction totalEntryTransaction = this.tableManager.startRemoteTransaction(new TransactionOptions().readOnly(true));
        CompletableFuture<Void> calculateTotalCount = CompletableFuture.supplyAsync(() -> this.calculateTotalEntries(totalEntryTransaction), this.fstExecutionPool);
        ManualSyncPolicy schemaSyncPolicy = new ManualSyncPolicy(this.nodeName, this.tableManager);
        CompletableFuture<Void> fstFuture = this.fst(fstTransaction, schemaSyncPolicy);
        this.inFlightFutures.registerFuture(calculateTotalCount);
        this.inFlightFutures.registerFuture(fstFuture);
        CompletionStage cqFuture = ((CompletableFuture)((CompletableFuture)((CompletableFuture)calculateTotalCount.whenComplete((unused, throwable) -> {
            if (throwable != null) {
                handler.handle(TableEvent.failed(throwable));
            }
            totalEntryTransaction.commit();
        })).thenCompose(unused -> fstFuture)).whenComplete((unused, throwable) -> {
            if (throwable != null) {
                handler.handle(TableEvent.failed(throwable));
            } else {
                handler.handle(TableEvent.fstFinished());
            }
            fstTransaction.commit();
        })).thenComposeAsync(unused -> {
            CompletableFuture<Void> finishFuture = new CompletableFuture<Void>();
            ReplicationSubscriber subscriber = new ReplicationSubscriber(this.tableManager, schemaSyncPolicy, this.flushPoint, finishFuture, this.metricSource, this.clockService);
            ReplicationSubscriber previousSubscribe = this.subscriber.getAndSet(subscriber);
            if (previousSubscribe != null) {
                LOG.info("Replication for table " + this.tableManager.tableName() + " has been completed. New one started.", new Object[0]);
                finishFuture.complete(null);
            }
            this.tableManager.remoteTable().queryContinuously((Flow.Subscriber)subscriber, ContinuousQueryOptions.builder().watermark(ContinuousQueryWatermark.ofInstant((Instant)fstStartTime)).enableEmptyBatches(true).skipOldEntries(true).build());
            return finishFuture;
        }, this.fstExecutionPool);
        this.inFlightFutures.registerFuture((CompletableFuture)cqFuture);
        ((CompletableFuture)cqFuture).whenComplete((unused, throwable) -> {
            if (throwable != null) {
                handler.handle(TableEvent.failed(throwable));
            } else {
                handler.handle(TableEvent.finished());
            }
            this.closeSubscriber();
        });
    }

    @Nullable
    private Void calculateTotalEntries(Transaction totalEntryTransaction) {
        try (ResultSet result = this.tableManager.client().sql().execute(totalEntryTransaction, "select count(*) from " + this.tableManager.tableName(), new Object[0]);){
            if (result.hasRowSet() && result.hasNext()) {
                this.totalCount = ((SqlRow)result.next()).longValue(0);
                this.progressHandler.handle(new FstProgressEvent(this.tableManager.tableName(), this.totalCount, this.replicatedCount.get()));
            }
        }
        catch (SqlException e) {
            throw new ReplicationNoSourceTableException(e.getMessage(), this.nodeName, this.tableManager.tableName());
        }
        return null;
    }

    private CompletableFuture<Void> fst(Transaction transaction, SchemaSyncPolicy<Map.Entry<Tuple, Tuple>> schemaSyncPolicy) {
        return this.tableManager.remoteTable().queryAsync(transaction, null).thenComposeAsync(cursor -> {
            SubmissionPublisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>> publisher = new SubmissionPublisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>>();
            CompletableFuture future = this.tableManager.localTable().streamData(publisher, null);
            return ((CompletableFuture)this.processPage((AsyncCursor<Map.Entry<Tuple, Tuple>>)cursor, publisher, new OneTimeSchemaSyncChecker(schemaSyncPolicy)).whenComplete((unused, t) -> {
                if (t == null) {
                    publisher.close();
                } else {
                    publisher.closeExceptionally((Throwable)t);
                }
            })).thenCompose(unused -> future);
        }, this.fstExecutionPool);
    }

    private CompletableFuture<? extends AsyncCursor<Tuple>> processPage(AsyncCursor<Map.Entry<Tuple, Tuple>> cursor, SubmissionPublisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>> publisher, SchemaSyncPolicy<Map.Entry<Tuple, Tuple>> schemaSyncPolicy) {
        Iterable currentPage = cursor.currentPage();
        for (Map.Entry tupleEntry : currentPage) {
            schemaSyncPolicy.isSchemaSync(tupleEntry);
            publisher.submit((DataStreamerItem<Map.Entry<Tuple, Tuple>>)DataStreamerItem.of((Object)tupleEntry));
            this.replicatedCount.incrementAndGet();
            this.progressHandler.handle(new FstProgressEvent(this.tableManager.tableName(), this.totalCount, this.replicatedCount.get()));
        }
        if (cursor.hasMorePages()) {
            return cursor.fetchNextPage().thenCompose(nextCursor -> this.processPage((AsyncCursor<Map.Entry<Tuple, Tuple>>)nextCursor, publisher, schemaSyncPolicy));
        }
        return CompletableFutures.nullCompletedFuture();
    }

    public void stop() {
        this.inFlightFutures.cancelInFlightFutures();
        this.closeSubscriber();
    }

    private void closeSubscriber() {
        ReplicationSubscriber sub = this.subscriber.getAndSet(null);
        if (sub != null) {
            sub.stop();
        }
    }
}

