package org.gridgain.internal.dcr.table;

import java.time.Instant;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.ignite3.internal.future.InFlightFutures;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.lang.AsyncCursor;
import org.apache.ignite3.sql.ResultSet;
import org.apache.ignite3.sql.SqlException;
import org.apache.ignite3.sql.SqlRow;
import org.apache.ignite3.table.ContinuousQueryOptions;
import org.apache.ignite3.table.ContinuousQueryWatermark;
import org.apache.ignite3.table.DataStreamerItem;
import org.apache.ignite3.table.Tuple;
import org.apache.ignite3.tx.Transaction;
import org.gridgain.internal.dcr.event.FstProgressEvent;
import org.gridgain.internal.dcr.event.ReplicationEventHandler;
import org.gridgain.internal.dcr.event.TableEvent;
import org.gridgain.internal.dcr.exception.ReplicationNoSourceTableException;
import org.gridgain.internal.dcr.metrics.DcrMetricSource;
import org.gridgain.internal.dcr.table.schema.ManualSyncPolicy;
import org.gridgain.internal.dcr.table.schema.OneTimeSchemaSyncChecker;
import org.gridgain.internal.dcr.table.schema.SchemaSyncPolicy;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/gridgain/internal/dcr/table/TableReplication.class */
/**
 * Replicates a single table from the remote side into the local table.
 *
 * <p>Replication runs in two phases, both driven by {@link #start(ReplicationEventHandler)}:
 * <ol>
 *   <li>a full state transfer (FST) that reads the remote table through a query cursor and streams
 *       every entry into the local table, reporting progress through the FST progress handler;</li>
 *   <li>a continuous query on the remote table that feeds a {@link ReplicationSubscriber}.</li>
 * </ol>
 *
 * <p>All futures created by {@code start} are registered so that {@link #stop()} can cancel them.
 */
public class TableReplication {
    private static final IgniteLogger LOG = Loggers.forClass(TableReplication.class);

    private final String nodeName;
    private final TableManager tableManager;
    private final Executor fstExecutionPool;

    /**
     * Flush point handed to the subscriber when it is created. Volatile because it is written by
     * {@link #updateFlushPoint(Instant)} on the caller's thread and read on {@code fstExecutionPool}.
     */
    @Nullable
    private volatile Instant flushPoint;

    private final ReplicationEventHandler<FstProgressEvent> progressHandler;
    private final DcrMetricSource metricSource;

    /** Subscriber of the currently running continuous query, or {@code null} when none is active. */
    private final AtomicReference<ReplicationSubscriber> subscriber = new AtomicReference<>();

    /** Number of entries submitted to the local data streamer so far. */
    private final AtomicLong replicatedCount = new AtomicLong(0);

    /** Row count of the source table as reported by the count query; {@code 0} until it completes. */
    private volatile long totalCount = 0;

    private final InFlightFutures inFlightFutures = new InFlightFutures();

    /**
     * Creates a replication for one table.
     *
     * @param str name of the node; used for the schema sync policy and in error reporting
     * @param tableManager provides access to the remote and local tables and remote transactions
     * @param executor pool on which the count query and the asynchronous stages are executed
     * @param instant initial flush point passed to the subscriber, may be {@code null}
     * @param replicationEventHandler receives FST progress events
     * @param dcrMetricSource metric source passed to the subscriber
     */
    public TableReplication(String str, TableManager tableManager, Executor executor, @Nullable Instant instant, ReplicationEventHandler<FstProgressEvent> replicationEventHandler, DcrMetricSource dcrMetricSource) {
        this.nodeName = str;
        this.tableManager = tableManager;
        this.fstExecutionPool = executor;
        this.flushPoint = instant;
        this.progressHandler = replicationEventHandler;
        this.metricSource = dcrMetricSource;
    }

    /**
     * Stores a new flush point and forwards it to the active subscriber, if there is one.
     *
     * @param instant the new flush point
     */
    public void updateFlushPoint(Instant instant) {
        this.flushPoint = instant;

        ReplicationSubscriber current = this.subscriber.get();
        if (current != null) {
            current.flushPoint(instant);
        }
    }

    /**
     * Starts the replication: counts the source rows, performs the full state transfer and then
     * subscribes to a continuous query on the remote table.
     *
     * <p>The continuous query watermark is the physical time of the observable timestamp captured
     * before the full state transfer begins.
     *
     * @param replicationEventHandler receives {@code failed}, {@code fstFinished} and
     *     {@code finished} table events
     */
    public void start(ReplicationEventHandler<TableEvent> replicationEventHandler) {
        Instant watermark = Instant.ofEpochMilli(HybridTimestamp.hybridTimestamp(this.tableManager.observableTimestamp()).getPhysical());

        // NOTE(review): the boolean argument presumably requests a read-only transaction - confirm.
        Transaction fstTx = this.tableManager.startRemoteTransaction(true);
        Transaction countTx = this.tableManager.startRemoteTransaction(true);

        CompletableFuture<Void> countFuture =
                CompletableFuture.supplyAsync(() -> calculateTotalEntries(countTx), this.fstExecutionPool);

        ManualSyncPolicy syncPolicy = new ManualSyncPolicy(this.nodeName, this.tableManager);
        CompletableFuture<Void> fstFuture = fst(fstTx, syncPolicy);

        this.inFlightFutures.registerFuture(countFuture);
        this.inFlightFutures.registerFuture(fstFuture);

        // NOTE(review): an exception propagates down the chain, so a failure in an early stage is
        // reported to the handler by every later whenComplete as well.
        CompletableFuture<Void> replicationFuture = countFuture
                .whenComplete((ignored, th) -> {
                    if (th != null) {
                        replicationEventHandler.handle(TableEvent.failed(th));
                    }
                    countTx.commit();
                })
                .thenCompose(ignored -> fstFuture)
                .whenComplete((ignored, th) -> {
                    if (th != null) {
                        replicationEventHandler.handle(TableEvent.failed(th));
                    } else {
                        replicationEventHandler.handle(TableEvent.fstFinished());
                    }
                    fstTx.commit();
                })
                .thenComposeAsync(ignored -> {
                    CompletableFuture<Void> subscriberFuture = new CompletableFuture<>();
                    ReplicationSubscriber newSubscriber = new ReplicationSubscriber(
                            this.tableManager, syncPolicy, this.flushPoint, subscriberFuture, this.metricSource);

                    // NOTE(review): a previously registered subscriber is replaced but not stopped
                    // here, and the new subscriber's future is completed right away - confirm intent.
                    if (this.subscriber.getAndSet(newSubscriber) != null) {
                        LOG.info("Replication for table " + this.tableManager.tableName() + " has been completed. New one started.");
                        subscriberFuture.complete(null);
                    }

                    this.tableManager.remoteTable().queryContinuously(
                            newSubscriber,
                            ContinuousQueryOptions.builder().watermark(ContinuousQueryWatermark.ofInstant(watermark)).build());

                    return subscriberFuture;
                }, this.fstExecutionPool);

        this.inFlightFutures.registerFuture(replicationFuture);

        replicationFuture.whenComplete((ignored, th) -> {
            if (th != null) {
                replicationEventHandler.handle(TableEvent.failed(th));
            } else {
                replicationEventHandler.handle(TableEvent.finished());
            }
            closeSubscriber();
        });
    }

    /**
     * Runs {@code select count(*)} against the source table, stores the result in
     * {@link #totalCount} and publishes an initial progress event.
     *
     * @param transaction transaction in which the count query is executed
     * @return always {@code null}; the {@code Void} return type lets the method be used as a supplier
     * @throws ReplicationNoSourceTableException if the query fails with an {@link SqlException}
     */
    @Nullable
    private Void calculateTotalEntries(Transaction transaction) {
        String countQuery = "select count(*) from " + this.tableManager.tableName();

        // try-with-resources closes the result set even when reading the row or the handler throws.
        try (ResultSet<SqlRow> rows = this.tableManager.client().sql().execute(transaction, countQuery)) {
            if (rows.hasRowSet() && rows.hasNext()) {
                long total = rows.next().longValue(0);
                this.totalCount = total;
                this.progressHandler.handle(new FstProgressEvent(this.tableManager.tableName(), total, this.replicatedCount.get()));
            }
            return null;
        } catch (SqlException e) {
            // NOTE(review): only the message is carried over; the original exception is not attached as a cause.
            throw new ReplicationNoSourceTableException(e.getMessage(), this.nodeName, this.tableManager.tableName());
        }
    }

    /**
     * Performs the full state transfer: opens a cursor over the remote table and streams every
     * entry into the local table.
     *
     * @param transaction transaction in which the remote table is read
     * @param schemaSyncPolicy policy consulted for every entry through a {@link OneTimeSchemaSyncChecker}
     * @return future of the local data streamer, completed after all pages have been submitted
     */
    private CompletableFuture<Void> fst(Transaction transaction, SchemaSyncPolicy<Map.Entry<Tuple, Tuple>> schemaSyncPolicy) {
        return this.tableManager.remoteTable().queryAsync(transaction, null).thenComposeAsync(cursor -> {
            SubmissionPublisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>> publisher = new SubmissionPublisher<>();
            CompletableFuture<Void> streamFuture = this.tableManager.localTable().streamData(publisher, null);

            CompletableFuture<Void> pagesFuture;
            try {
                pagesFuture = processPage(cursor, publisher, new OneTimeSchemaSyncChecker(schemaSyncPolicy));
            } catch (Throwable t) {
                // A synchronous failure on the first page must still close the publisher below,
                // otherwise the local streamer would be left waiting for items.
                pagesFuture = CompletableFuture.failedFuture(t);
            }

            return pagesFuture
                    .whenComplete((ignored, th) -> {
                        if (th == null) {
                            publisher.close();
                        } else {
                            publisher.closeExceptionally(th);
                        }
                    })
                    .thenCompose(ignored -> streamFuture);
        }, this.fstExecutionPool);
    }

    /**
     * Submits every entry of the cursor's current page to the publisher, emitting a progress event
     * per entry, then continues with the next page if there is one.
     *
     * @param asyncCursor cursor positioned on the page to process
     * @param submissionPublisher publisher feeding the local data streamer
     * @param schemaSyncPolicy policy invoked for every entry before it is submitted
     * @return future completed with {@code null} once the last page has been processed
     */
    private CompletableFuture<Void> processPage(AsyncCursor<Map.Entry<Tuple, Tuple>> asyncCursor, SubmissionPublisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>> submissionPublisher, SchemaSyncPolicy<Map.Entry<Tuple, Tuple>> schemaSyncPolicy) {
        for (Map.Entry<Tuple, Tuple> entry : asyncCursor.currentPage()) {
            // The return value is intentionally unused, as in the original code.
            schemaSyncPolicy.isSchemaSync(entry);
            submissionPublisher.submit(DataStreamerItem.of(entry));

            long replicated = this.replicatedCount.incrementAndGet();
            this.progressHandler.handle(new FstProgressEvent(this.tableManager.tableName(), this.totalCount, replicated));
        }

        if (!asyncCursor.hasMorePages()) {
            return CompletableFutures.nullCompletedFuture();
        }

        return asyncCursor.fetchNextPage()
                .thenCompose(nextPage -> processPage(nextPage, submissionPublisher, schemaSyncPolicy));
    }

    /** Cancels all in-flight futures and stops the active subscriber, if any. */
    public void stop() {
        this.inFlightFutures.cancelInFlightFutures();
        closeSubscriber();
    }

    /** Atomically clears the subscriber reference and stops the subscriber that was registered. */
    private void closeSubscriber() {
        ReplicationSubscriber previous = this.subscriber.getAndSet(null);
        if (previous != null) {
            previous.stop();
        }
    }
}
