package org.apache.ignite.internal.table.distributed.raft.snapshot.incoming;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.message.GetLowWatermarkResponse;
import org.apache.ignite.internal.lowwatermark.message.LowWatermarkMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse;
import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse;
import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataResponse;
import org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotUri;
import org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency;
import org.apache.ignite.internal.tx.message.TxMetaMessage;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.jraft.entity.RaftOutter;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.class */
public class IncomingSnapshotCopier extends SnapshotCopier {
    private static final IgniteLogger LOG;
    private static final PartitionReplicationMessagesFactory TABLE_MSG_FACTORY;
    private static final LowWatermarkMessagesFactory LWM_MSG_FACTORY;
    private static final long NETWORK_TIMEOUT = Long.MAX_VALUE;
    private static final long MAX_MV_DATA_PAYLOADS_BATCH_BYTES_HINT = 102400;
    private static final int MAX_TX_DATA_BATCH_SIZE = 1000;
    private final PartitionSnapshotStorage partitionSnapshotStorage;
    private final SnapshotUri snapshotUri;
    private final long waitForMetadataCatchupMs;
    private final AtomicBoolean cancellationGuard = new AtomicBoolean();
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    @Nullable
    private volatile RaftOutter.SnapshotMeta snapshotMeta;

    @Nullable
    private volatile CompletableFuture<Boolean> metadataSufficiencyFuture;

    @Nullable
    private volatile CompletableFuture<Void> rebalanceFuture;

    @Nullable
    private volatile CompletableFuture<?> joinFuture;
    static final /* synthetic */ boolean $assertionsDisabled;

    public IncomingSnapshotCopier(PartitionSnapshotStorage partitionSnapshotStorage, SnapshotUri snapshotUri, long j) {
        this.partitionSnapshotStorage = partitionSnapshotStorage;
        this.snapshotUri = snapshotUri;
        this.waitForMetadataCatchupMs = j;
    }

    public void start() {
        Executor incomingSnapshotsExecutor = this.partitionSnapshotStorage.getIncomingSnapshotsExecutor();
        LOG.info("Copier is started for the partition [{}]", new Object[]{createPartitionInfo()});
        ClusterNode snapshotSender = getSnapshotSender(this.snapshotUri.nodeName);
        this.metadataSufficiencyFuture = snapshotSender == null ? CompletableFuture.failedFuture(new StorageRebalanceException("Snapshot sender not found: " + this.snapshotUri.nodeName)) : loadSnapshotMeta(snapshotSender).thenCompose(obj -> {
            return waitForMetadataWithTimeout();
        }).thenApply((Function<? super U, ? extends U>) obj2 -> {
            boolean metadataIsSufficientlyComplete = metadataIsSufficientlyComplete();
            if (!metadataIsSufficientlyComplete) {
                logMetadataInsufficiencyAndSetError();
            }
            return Boolean.valueOf(metadataIsSufficientlyComplete);
        });
        this.rebalanceFuture = this.metadataSufficiencyFuture.thenComposeAsync(bool -> {
            return bool.booleanValue() ? this.partitionSnapshotStorage.partition().startRebalance().thenCompose(r8 -> {
                if ($assertionsDisabled || snapshotSender != null) {
                    return loadSnapshotMvData(snapshotSender, incomingSnapshotsExecutor).thenCompose(obj3 -> {
                        return loadSnapshotTxData(snapshotSender, incomingSnapshotsExecutor);
                    }).thenRunAsync(this::setNextRowIdToBuildIndexes, incomingSnapshotsExecutor);
                }
                throw new AssertionError(createPartitionInfo());
            }) : CompletableFutures.nullCompletedFuture();
        }, incomingSnapshotsExecutor);
        this.joinFuture = this.metadataSufficiencyFuture.thenCompose(bool2 -> {
            return bool2.booleanValue() ? this.rebalanceFuture.handleAsync((r4, th) -> {
                return completeRebalance(th);
            }, incomingSnapshotsExecutor).thenCompose((Function<? super U, ? extends CompletionStage<U>>) Function.identity()).thenCompose(r7 -> {
                if ($assertionsDisabled || snapshotSender != null) {
                    return tryUpdateLowWatermark(snapshotSender, incomingSnapshotsExecutor);
                }
                throw new AssertionError(createPartitionInfo());
            }) : CompletableFutures.nullCompletedFuture();
        });
    }

    private CompletableFuture<?> waitForMetadataWithTimeout() {
        return CompletableFuture.anyOf(this.partitionSnapshotStorage.catalogService().catalogReadyFuture(this.snapshotMeta.requiredCatalogVersion()), completeOnMetadataReadinessTimeout());
    }

    private CompletableFuture<?> completeOnMetadataReadinessTimeout() {
        return new CompletableFuture().orTimeout(this.waitForMetadataCatchupMs, TimeUnit.MILLISECONDS).exceptionally(th -> {
            if ($assertionsDisabled || (th instanceof TimeoutException)) {
                return null;
            }
            throw new AssertionError();
        });
    }

    public void join() throws InterruptedException {
        CompletableFuture<?> completableFuture = this.joinFuture;
        if (completableFuture != null) {
            try {
                completableFuture.get();
            } catch (CancellationException e) {
            } catch (ExecutionException e2) {
                Throwable cause = e2.getCause();
                if (cause instanceof CancellationException) {
                    return;
                }
                LOG.error("Error when completing the copier", cause);
                if (isOk()) {
                    setError(RaftError.UNKNOWN, "Unknown error on completion the copier", new Object[0]);
                }
                throw new IllegalStateException(cause);
            }
        }
    }

    public void cancel() {
        if (this.cancellationGuard.compareAndSet(false, true)) {
            this.busyLock.block();
            LOG.info("Copier is canceled for partition [{}]", new Object[]{createPartitionInfo()});
            List list = (List) Stream.of((Object[]) new CompletableFuture[]{this.metadataSufficiencyFuture, this.rebalanceFuture}).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList());
            list.forEach(completableFuture -> {
                completableFuture.cancel(false);
            });
            if (list.isEmpty()) {
                return;
            }
            try {
                join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void close() {
    }

    public SnapshotReader getReader() {
        return new IncomingSnapshotReader(this.snapshotMeta);
    }

    @Nullable
    private ClusterNode getSnapshotSender(String str) {
        return this.partitionSnapshotStorage.topologyService().getByConsistentId(str);
    }

    private CompletableFuture<?> loadSnapshotMeta(ClusterNode clusterNode) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }
        try {
            return this.partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(clusterNode, TABLE_MSG_FACTORY.snapshotMetaRequest().id(this.snapshotUri.snapshotId).build(), NETWORK_TIMEOUT).thenAccept(networkMessage -> {
                this.snapshotMeta = ((SnapshotMetaResponse) networkMessage).meta();
                LOG.info("Copier has loaded the snapshot meta for the partition [{}, meta={}]", new Object[]{createPartitionInfo(), this.snapshotMeta});
            });
        } finally {
            this.busyLock.leaveBusy();
        }
    }

    private boolean metadataIsSufficientlyComplete() {
        RaftOutter.SnapshotMeta snapshotMeta = this.snapshotMeta;
        if ($assertionsDisabled || snapshotMeta != null) {
            return CatalogVersionSufficiency.isMetadataAvailableFor(snapshotMeta.requiredCatalogVersion(), this.partitionSnapshotStorage.catalogService());
        }
        throw new AssertionError();
    }

    private void logMetadataInsufficiencyAndSetError() {
        LOG.warn("Metadata not yet available, rejecting snapshot installation [uri={}, requiredVersion={}].", new Object[]{this.snapshotUri, Integer.valueOf(this.snapshotMeta.requiredCatalogVersion())});
        String format = String.format("Metadata not yet available, URI '%s', required level %s; rejecting snapshot installation.", this.snapshotUri, Integer.valueOf(this.snapshotMeta.requiredCatalogVersion()));
        if (isOk()) {
            setError(RaftError.EBUSY, format, new Object[0]);
        }
    }

    private CompletableFuture<?> loadSnapshotMvData(ClusterNode clusterNode, Executor executor) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }
        try {
            CompletableFuture<?> thenComposeAsync = this.partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(clusterNode, TABLE_MSG_FACTORY.snapshotMvDataRequest().id(this.snapshotUri.snapshotId).batchSizeHint(MAX_MV_DATA_PAYLOADS_BATCH_BYTES_HINT).build(), NETWORK_TIMEOUT).thenComposeAsync(networkMessage -> {
                SnapshotMvDataResponse snapshotMvDataResponse = (SnapshotMvDataResponse) networkMessage;
                for (SnapshotMvDataResponse.ResponseEntry responseEntry : snapshotMvDataResponse.rows()) {
                    for (int i = 0; i < responseEntry.rowVersions().size(); i++) {
                        if (!this.busyLock.enterBusy()) {
                            return CompletableFutures.nullCompletedFuture();
                        }
                        try {
                            writeVersion(responseEntry, i);
                            this.busyLock.leaveBusy();
                        } catch (Throwable th) {
                            this.busyLock.leaveBusy();
                            throw th;
                        }
                    }
                }
                if (snapshotMvDataResponse.finish()) {
                    LOG.info("Copier has finished loading multi-versioned data [{}, rows={}]", new Object[]{createPartitionInfo(), Integer.valueOf(snapshotMvDataResponse.rows().size())});
                    return CompletableFutures.nullCompletedFuture();
                }
                LOG.info("Copier has loaded a portion of multi-versioned data [{}, rows={}]", new Object[]{createPartitionInfo(), Integer.valueOf(snapshotMvDataResponse.rows().size())});
                return loadSnapshotMvData(clusterNode, executor);
            }, executor);
            this.busyLock.leaveBusy();
            return thenComposeAsync;
        } catch (Throwable th) {
            this.busyLock.leaveBusy();
            throw th;
        }
    }

    private CompletableFuture<Void> loadSnapshotTxData(ClusterNode clusterNode, Executor executor) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }
        try {
            CompletableFuture<Void> thenComposeAsync = this.partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(clusterNode, TABLE_MSG_FACTORY.snapshotTxDataRequest().id(this.snapshotUri.snapshotId).maxTransactionsInBatch(MAX_TX_DATA_BATCH_SIZE).build(), NETWORK_TIMEOUT).thenComposeAsync(networkMessage -> {
                SnapshotTxDataResponse snapshotTxDataResponse = (SnapshotTxDataResponse) networkMessage;
                if (!$assertionsDisabled && snapshotTxDataResponse.txMeta().size() != snapshotTxDataResponse.txIds().size()) {
                    throw new AssertionError(createPartitionInfo());
                }
                for (int i = 0; i < snapshotTxDataResponse.txMeta().size(); i++) {
                    if (!this.busyLock.enterBusy()) {
                        return CompletableFutures.nullCompletedFuture();
                    }
                    try {
                        this.partitionSnapshotStorage.partition().addTxMeta((UUID) snapshotTxDataResponse.txIds().get(i), ((TxMetaMessage) snapshotTxDataResponse.txMeta().get(i)).asTxMeta());
                        this.busyLock.leaveBusy();
                    } catch (Throwable th) {
                        this.busyLock.leaveBusy();
                        throw th;
                    }
                }
                if (snapshotTxDataResponse.finish()) {
                    LOG.info("Copier has finished loading transaction meta [{}, metas={}]", new Object[]{createPartitionInfo(), Integer.valueOf(snapshotTxDataResponse.txMeta().size())});
                    return CompletableFutures.nullCompletedFuture();
                }
                LOG.info("Copier has loaded a portion of transaction meta [{}, metas={}]", new Object[]{createPartitionInfo(), Integer.valueOf(snapshotTxDataResponse.txMeta().size())});
                return loadSnapshotTxData(clusterNode, executor);
            }, executor);
            this.busyLock.leaveBusy();
            return thenComposeAsync;
        } catch (Throwable th) {
            this.busyLock.leaveBusy();
            throw th;
        }
    }

    private CompletableFuture<Void> completeRebalance(@Nullable Throwable th) {
        if (!this.busyLock.enterBusy()) {
            if (isOk()) {
                setError(RaftError.ECANCELED, "Copier is cancelled", new Object[0]);
            }
            return this.partitionSnapshotStorage.partition().abortRebalance();
        }
        try {
            if (th != null) {
                LOG.error("Partition rebalancing error [{}]", th, new Object[]{createPartitionInfo()});
                if (isOk()) {
                    setError(RaftError.UNKNOWN, th.getMessage(), new Object[0]);
                }
                CompletableFuture thenCompose = this.partitionSnapshotStorage.partition().abortRebalance().thenCompose(r3 -> {
                    return CompletableFuture.failedFuture(th);
                });
                this.busyLock.leaveBusy();
                return thenCompose;
            }
            RaftOutter.SnapshotMeta snapshotMeta = this.snapshotMeta;
            RaftGroupConfiguration raftGroupConfiguration = new RaftGroupConfiguration(snapshotMeta.peersList(), snapshotMeta.learnersList(), snapshotMeta.oldPeersList(), snapshotMeta.oldLearnersList());
            LOG.info("Copier completes the rebalancing of the partition: [{}, lastAppliedIndex={}, lastAppliedTerm={}, raftGroupConfig={}]", new Object[]{createPartitionInfo(), Long.valueOf(snapshotMeta.lastIncludedIndex()), Long.valueOf(snapshotMeta.lastIncludedTerm()), raftGroupConfiguration});
            CompletableFuture<Void> finishRebalance = this.partitionSnapshotStorage.partition().finishRebalance(snapshotMeta.lastIncludedIndex(), snapshotMeta.lastIncludedTerm(), raftGroupConfiguration);
            this.busyLock.leaveBusy();
            return finishRebalance;
        } catch (Throwable th2) {
            this.busyLock.leaveBusy();
            throw th2;
        }
    }

    private int partId() {
        return this.partitionSnapshotStorage.partition().partitionKey().partitionId();
    }

    private String createPartitionInfo() {
        return "tableId=" + this.partitionSnapshotStorage.partition().partitionKey().tableId() + ", partitionId=" + partId();
    }

    private void writeVersion(SnapshotMvDataResponse.ResponseEntry responseEntry, int i) {
        RowId rowId = new RowId(partId(), responseEntry.rowId());
        BinaryRowMessage binaryRowMessage = (BinaryRowMessage) responseEntry.rowVersions().get(i);
        BinaryRow asBinaryRow = binaryRowMessage == null ? null : binaryRowMessage.asBinaryRow();
        PartitionAccess partition = this.partitionSnapshotStorage.partition();
        int requiredCatalogVersion = this.snapshotMeta.requiredCatalogVersion();
        if (i != responseEntry.timestamps().length) {
            partition.addWriteCommitted(rowId, asBinaryRow, HybridTimestamp.hybridTimestamp(responseEntry.timestamps()[i]), requiredCatalogVersion);
            return;
        }
        if (!$assertionsDisabled && responseEntry.txId() == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && responseEntry.commitTableId() == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && responseEntry.commitPartitionId() == -1) {
            throw new AssertionError();
        }
        partition.addWrite(rowId, asBinaryRow, responseEntry.txId(), responseEntry.commitTableId().intValue(), responseEntry.commitPartitionId(), requiredCatalogVersion);
    }

    private void setNextRowIdToBuildIndexes() {
        if (this.busyLock.enterBusy()) {
            try {
                Map nextRowIdToBuildByIndexId = this.snapshotMeta.nextRowIdToBuildByIndexId();
                if (!CollectionUtils.nullOrEmpty(nextRowIdToBuildByIndexId)) {
                    this.partitionSnapshotStorage.partition().setNextRowIdToBuildIndex((Map) nextRowIdToBuildByIndexId.entrySet().stream().collect(Collectors.toMap((v0) -> {
                        return v0.getKey();
                    }, entry -> {
                        return new RowId(partId(), (UUID) entry.getValue());
                    })));
                }
            } finally {
                this.busyLock.leaveBusy();
            }
        }
    }

    private CompletableFuture<Void> tryUpdateLowWatermark(ClusterNode clusterNode, Executor executor) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }
        try {
            CompletableFuture<Void> thenAcceptAsync = this.partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(clusterNode, LWM_MSG_FACTORY.getLowWatermarkRequest().build(), NETWORK_TIMEOUT).thenAcceptAsync(networkMessage -> {
                HybridTimestamp nullableHybridTimestamp = HybridTimestamp.nullableHybridTimestamp(((GetLowWatermarkResponse) networkMessage).lowWatermark());
                if (nullableHybridTimestamp != null) {
                    this.partitionSnapshotStorage.partition().updateLowWatermark(nullableHybridTimestamp);
                }
            }, executor);
            this.busyLock.leaveBusy();
            return thenAcceptAsync;
        } catch (Throwable th) {
            this.busyLock.leaveBusy();
            throw th;
        }
    }

    static {
        $assertionsDisabled = !IncomingSnapshotCopier.class.desiredAssertionStatus();
        LOG = Loggers.forClass(IncomingSnapshotCopier.class);
        TABLE_MSG_FACTORY = new PartitionReplicationMessagesFactory();
        LWM_MSG_FACTORY = new LowWatermarkMessagesFactory();
    }
}
