/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.IgniteThrottledLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.message.GetLowWatermarkResponse;
import org.apache.ignite.internal.lowwatermark.message.LowWatermarkMessagesFactory;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse;
import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse;
import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataResponse;
import org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowVersionMessage;
import org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfo;
import org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfoSerializer;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccess;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotReader;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotStats;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.ReplicationLogStorageKey;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupConfigurationSerializer;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteBusyLock;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.apache.ignite.internal.versioned.VersionedSerializer;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.jetbrains.annotations.Nullable;

public class IncomingSnapshotCopier
extends SnapshotCopier {
    /** Logger of this class. */
    private static final IgniteLogger LOG = Loggers.forClass(IncomingSnapshotCopier.class);
    /** Factory of the snapshot meta / MV data / TX data request messages sent to the snapshot sender. */
    private static final PartitionReplicationMessagesFactory TABLE_MSG_FACTORY = new PartitionReplicationMessagesFactory();
    /** Factory of the low watermark request message sent to the snapshot sender. */
    private static final LowWatermarkMessagesFactory LWM_MSG_FACTORY = new LowWatermarkMessagesFactory();
    /** Timeout, in milliseconds, of a network request (2 minutes). */
    private static final long NETWORK_TIMEOUT_MILLIS = 120000L;
    /** Batch size hint, in bytes (100 KiB), passed with a multi-versioned data request. */
    private static final long MAX_MV_DATA_PAYLOADS_BATCH_BYTES_HINT = 102400L;
    /** Maximum number of transactions requested in a single TX data batch. */
    private static final int MAX_TX_DATA_BATCH_SIZE = 1000;
    /** Snapshot storage of the partition; gives access to the partition key, messaging, topology, catalog and TX state. */
    private final PartitionSnapshotStorage partitionSnapshotStorage;
    /** URI of the snapshot being installed; carries the snapshot ID and the sender node name. */
    private final SnapshotUri snapshotUri;
    /** How long, in milliseconds, to wait for the required catalog version before giving up. */
    private final long waitForMetadataCatchupMs;
    /** Makes the body of {@link #cancel()} run at most once. */
    private final AtomicBoolean cancellationGuard = new AtomicBoolean();
    /** Busy lock entered by every stage before it does work; {@link #cancel()} blocks it. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    /** Executor on which the asynchronous stages of the installation are run. */
    private final Executor executor;
    /** Throttled wrapper around {@link #LOG}, used for the "no partition storage found locally" warnings. */
    private final IgniteThrottledLogger throttledLogger;
    /** Future of the snapshot context, {@code null} until {@link #start()} is called. */
    @Nullable
    private volatile CompletableFuture<SnapshotContext> snapshotMetaFuture;
    /** Future of the data loading stages, {@code null} until {@link #start()} is called. */
    @Nullable
    private volatile CompletableFuture<Void> rebalanceFuture;
    /** Future awaited by {@link #join()}, {@code null} until {@link #start()} is called. */
    @Nullable
    private volatile CompletableFuture<Void> joinFuture;
    /** Per-copier phase timings and counters that are reported in the log messages. */
    private final IncomingSnapshotStats snapshotStats = new IncomingSnapshotStats();
    /** Metrics source that is notified about the start and the end of every installation phase. */
    private final RaftSnapshotsMetricsSource snapshotsMetricsSource;

    /**
     * Creates a copier; nothing is requested from the network until {@link #start()} is called.
     *
     * @param partitionSnapshotStorage Snapshot storage of the partition being rebalanced.
     * @param snapshotUri URI of the snapshot to install.
     * @param executor Executor for the asynchronous stages; it also backs the throttled logger.
     * @param waitForMetadataCatchupMs How long, in milliseconds, to wait for the required catalog version.
     * @param snapshotsMetricsSource Metrics source notified about the installation phases.
     */
    public IncomingSnapshotCopier(
            PartitionSnapshotStorage partitionSnapshotStorage,
            SnapshotUri snapshotUri,
            Executor executor,
            long waitForMetadataCatchupMs,
            RaftSnapshotsMetricsSource snapshotsMetricsSource
    ) {
        this.partitionSnapshotStorage = partitionSnapshotStorage;
        this.snapshotUri = snapshotUri;
        this.waitForMetadataCatchupMs = waitForMetadataCatchupMs;
        this.snapshotsMetricsSource = snapshotsMetricsSource;

        this.executor = executor;
        this.throttledLogger = Loggers.toThrottledLogger(LOG, executor);
    }

    /**
     * Starts the asynchronous snapshot installation and publishes the futures that {@link #join()}, {@link #cancel()} and
     * {@link #getReader()} consult later. The method itself only builds the chain of stages.
     */
    public void start() {
        InternalClusterNode snapshotSender;
        this.snapshotStats.onSnapshotInstallationStart();
        this.snapshotsMetricsSource.onSnapshotInstallationStart();
        if (LOG.isInfoEnabled()) {
            LOG.info("Rebalance is started [snapshotId={}, {}]", new Object[]{this.snapshotUri.snapshotId, this.createPartitionInfo()});
        }
        // Stage 1. The sender is looked up by the node name from the URI. When it is not found the stage fails with
        // a StorageRebalanceException; otherwise the snapshot meta is loaded and the required catalog version is awaited
        // (bounded by a timeout). The stage yields a SnapshotContext, or null when the catalog version is still not
        // available after the wait (an error is set on this copier in that case).
        CompletableFuture metadataSufficiencyFuture = (snapshotSender = this.getSnapshotSender(this.snapshotUri.nodeName)) == null ? CompletableFuture.failedFuture((Throwable)new StorageRebalanceException("Snapshot sender not found: " + this.snapshotUri.nodeName)) : this.loadSnapshotMeta(snapshotSender).thenCompose(snapshotMeta -> this.waitForMetadataWithTimeout((PartitionSnapshotMeta)snapshotMeta).thenApply(unused -> {
            boolean metadataIsSufficientlyComplete = this.metadataIsSufficientlyComplete((PartitionSnapshotMeta)snapshotMeta);
            if (!metadataIsSufficientlyComplete) {
                this.logMetadataInsufficiencyAndSetError((PartitionSnapshotMeta)snapshotMeta);
                return null;
            }
            return new SnapshotContext((PartitionSnapshotMeta)snapshotMeta, this.partitionSnapshotStorage.partitionsByTableId());
        }));
        this.snapshotMetaFuture = metadataSufficiencyFuture;
        // Stage 2. Skipped entirely when stage 1 yielded null. Otherwise: prepare the storages, load the multi-versioned
        // data, load the transaction metas, and finally set the next row IDs for index building.
        CompletionStage rebalanceFuture = metadataSufficiencyFuture.thenComposeAsync(snapshotContext -> {
            if (snapshotContext == null) {
                return CompletableFutures.nullCompletedFuture();
            }
            // A non-null context is only produced on the branch where the sender was found.
            assert (snapshotSender != null) : this.createPartitionInfo();
            return ((CompletableFuture)((CompletableFuture)this.startRebalance((SnapshotContext)snapshotContext).thenCompose(v -> this.loadSnapshotMvData((SnapshotContext)snapshotContext, snapshotSender))).thenCompose(v -> this.loadSnapshotTxData(snapshotSender))).thenRunAsync(() -> this.setNextRowIdToBuildIndexes((SnapshotContext)snapshotContext), this.executor);
        }, this.executor);
        this.rebalanceFuture = rebalanceFuture;
        // NOTE(review): lambda$start$8 is a compiler-generated lambda name that the decompiler failed to inline; its body is
        // not visible in this part of the file, so what joinFuture does on top of rebalanceFuture cannot be stated here.
        this.joinFuture = metadataSufficiencyFuture.thenCompose(arg_0 -> this.lambda$start$8(snapshotSender, (CompletableFuture)rebalanceFuture, arg_0));
    }

    /**
     * Waits until either the catalog version required by the snapshot becomes ready or the catch-up timeout elapses,
     * whichever happens first. The returned future does not tell which of the two happened.
     *
     * @param snapshotMeta Meta of the snapshot being installed.
     * @return Future completed when the wait is over.
     */
    private CompletableFuture<?> waitForMetadataWithTimeout(PartitionSnapshotMeta snapshotMeta) {
        snapshotStats.onWaitingCatalogPhaseStart();
        snapshotsMetricsSource.onWaitingCatalogPhaseStart();

        if (LOG.isInfoEnabled()) {
            LOG.info(
                    "Waiting for catalog version [snapshotId={}, {}, catalogVersion={}]",
                    snapshotUri.snapshotId, createPartitionInfo(), snapshotMeta.requiredCatalogVersion()
            );
        }

        CompletableFuture<?> catalogReady = partitionSnapshotStorage.catalogService()
                .catalogReadyFuture(snapshotMeta.requiredCatalogVersion());
        CompletableFuture<?> timeoutElapsed = completeOnMetadataReadinessTimeout();

        CompletableFuture<Object> waitOver = CompletableFuture.anyOf(catalogReady, timeoutElapsed);

        return waitOver.whenComplete((ignored, throwable) -> {
            snapshotStats.onWaitingCatalogPhaseEnd();
            snapshotsMetricsSource.onWaitingCatalogPhaseEnd();

            if (!LOG.isInfoEnabled()) {
                return;
            }

            LOG.info(
                    "Finished waiting for the catalog readiness [snapshotId={}, {}, waitingTime={}ms, result={}]",
                    snapshotUri.snapshotId,
                    createPartitionInfo(),
                    snapshotStats.totalWaitingCatalogPhaseDuration(),
                    metadataIsSufficientlyComplete(snapshotMeta) ? "success" : "timeout"
            );
        });
    }

    /**
     * Creates a future that completes normally (with {@code null}) once {@code waitForMetadataCatchupMs} milliseconds
     * have passed.
     *
     * @return Future acting as a timer.
     */
    private CompletableFuture<?> completeOnMetadataReadinessTimeout() {
        // Nobody completes this future explicitly, so the only way for it to finish is the timeout below.
        CompletableFuture<Void> timer = new CompletableFuture<>();

        return timer
                .orTimeout(waitForMetadataCatchupMs, TimeUnit.MILLISECONDS)
                .exceptionally(ex -> {
                    assert ex instanceof TimeoutException;

                    return null;
                });
    }

    /**
     * Blocks until the installation started by {@link #start()} is over. Returns immediately when the copier has not
     * been started.
     *
     * <p>A cancellation, as well as a failure caused by {@link CancellationException}, {@link NodeStoppingException} or
     * {@link RecipientLeftException}, is swallowed. Any other failure is handed to the failure processor, recorded as
     * an error of this copier (unless an error is already set) and rethrown.
     *
     * @throws InterruptedException If the waiting thread is interrupted.
     * @throws IllegalStateException If the installation failed for an unexpected reason.
     */
    public void join() throws InterruptedException {
        CompletableFuture<Void> awaited = joinFuture;

        if (awaited == null) {
            return;
        }

        try {
            awaited.get();
        } catch (CancellationException ignored) {
            // The copier was cancelled, there is nothing to report.
        } catch (ExecutionException e) {
            boolean expectedTermination = ExceptionUtils.hasCause(
                    e,
                    new Class[]{CancellationException.class, NodeStoppingException.class, RecipientLeftException.class}
            );

            if (expectedTermination) {
                return;
            }

            partitionSnapshotStorage.failureProcessor().process(new FailureContext(e, "Error when completing the copier"));

            if (isOk()) {
                setError(RaftError.UNKNOWN, "Unknown error on completion the copier", new Object[0]);
            }

            throw new IllegalStateException(e);
        }
    }

    /**
     * Cancels the installation. Only the first call has an effect.
     *
     * <p>The busy lock is blocked so that the stages stop doing work, the snapshot meta and rebalance futures (when
     * present) are cancelled, and then the method waits for the installation to wind down through {@link #join()}.
     * If that wait is interrupted, the interrupt flag of the current thread is restored.
     */
    public void cancel() {
        boolean firstCall = cancellationGuard.compareAndSet(false, true);

        if (!firstCall) {
            return;
        }

        busyLock.block();

        if (LOG.isInfoEnabled()) {
            LOG.info("Rebalance is canceled [snapshotId={}, {}]", snapshotUri.snapshotId, createPartitionInfo());
        }

        List<CompletableFuture<?>> startedFutures = Stream.<CompletableFuture<?>>of(snapshotMetaFuture, rebalanceFuture)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        for (CompletableFuture<?> future : startedFutures) {
            future.cancel(false);
        }

        if (startedFutures.isEmpty()) {
            // The copier has never been started, so there is nothing to wait for.
            return;
        }

        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Does nothing: this implementation performs no action on close. */
    public void close() {
        // No-op.
    }

    /**
     * Returns a reader over the meta of the installed snapshot. Must only be called after the snapshot meta future
     * has completed.
     *
     * @return Reader wrapping the snapshot meta, or wrapping {@code null} when the meta stage failed, was cancelled
     *      or yielded no context.
     */
    public SnapshotReader getReader() {
        CompletableFuture<SnapshotContext> metaFuture = snapshotMetaFuture;

        assert metaFuture != null && metaFuture.isDone();

        PartitionSnapshotMeta meta = null;

        if (!metaFuture.isCompletedExceptionally()) {
            SnapshotContext context = metaFuture.join();

            if (context != null) {
                meta = context.meta;
            }
        }

        return new IncomingSnapshotReader(meta);
    }

    /**
     * Looks up the snapshot sender in the topology by its consistent ID.
     *
     * @param nodeName Consistent ID (name) of the sender node.
     * @return Whatever the topology service returns for that ID; may be {@code null}.
     */
    @Nullable
    private InternalClusterNode getSnapshotSender(String nodeName) {
        InternalClusterNode sender = partitionSnapshotStorage.topologyService().getByConsistentId(nodeName);

        return sender;
    }

    /**
     * Requests the snapshot meta from the snapshot sender.
     *
     * @param snapshotSender Node that sends the snapshot.
     * @return Future of the snapshot meta; completed with {@code null} without sending anything when the copier has
     *      been cancelled (the busy lock is blocked).
     */
    private CompletableFuture<PartitionSnapshotMeta> loadSnapshotMeta(InternalClusterNode snapshotSender) {
        if (!busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }

        // Everything after a successful enterBusy() is inside the try block, so the lock is released even when
        // the bookkeeping or logging below throws.
        try {
            snapshotStats.onLoadSnapshotPhaseStart();
            snapshotsMetricsSource.onLoadSnapshotMetaPhaseStart();

            if (LOG.isInfoEnabled()) {
                LOG.info("Start loading snapshot meta [snapshotId={}, {}]", snapshotUri.snapshotId, createPartitionInfo());
            }

            NetworkMessage request = TABLE_MSG_FACTORY.snapshotMetaRequest().id(snapshotUri.snapshotId).build();

            return partitionSnapshotStorage.messagingService()
                    .invoke(snapshotSender, request, NETWORK_TIMEOUT_MILLIS)
                    .thenApply(response -> {
                        PartitionSnapshotMeta snapshotMeta = ((SnapshotMetaResponse) response).meta();

                        snapshotStats.onLoadSnapshotPhaseEnd();
                        snapshotsMetricsSource.onLoadSnapshotMetaPhaseEnd();

                        if (LOG.isInfoEnabled()) {
                            LOG.info(
                                    "Snapshot meta has been loaded [snapshotId={}, {}, meta={}, loadingTime={}ms]",
                                    snapshotUri.snapshotId,
                                    createPartitionInfo(),
                                    snapshotMeta,
                                    snapshotStats.totalLoadSnapshotPhaseDuration()
                            );
                        }

                        return snapshotMeta;
                    });
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Tells whether the catalog version required by the snapshot is ready, judged by whether the catalog readiness
     * future for that version is done at the moment of the call.
     *
     * @param snapshotMeta Meta of the snapshot being installed.
     * @return {@code true} if the readiness future of the required catalog version is done.
     */
    private boolean metadataIsSufficientlyComplete(PartitionSnapshotMeta snapshotMeta) {
        int requiredVersion = snapshotMeta.requiredCatalogVersion();

        CompletableFuture<?> catalogReady = partitionSnapshotStorage.catalogService().catalogReadyFuture(requiredVersion);

        return catalogReady.isDone();
    }

    /**
     * Logs a warning that the required catalog version is not available and, unless an error is already set, marks
     * this copier as failed with {@link RaftError#EBUSY}.
     *
     * @param snapshotMeta Meta of the snapshot whose installation is rejected.
     */
    private void logMetadataInsufficiencyAndSetError(PartitionSnapshotMeta snapshotMeta) {
        LOG.warn(
                "Metadata not yet available, rejecting snapshot installation [uri={}, requiredVersion={}].",
                snapshotUri, snapshotMeta.requiredCatalogVersion()
        );

        String errorMessage = String.format(
                "Metadata not yet available, URI '%s', required level %s; rejecting snapshot installation.",
                snapshotUri, snapshotMeta.requiredCatalogVersion()
        );

        if (isOk()) {
            // setError() takes a format string plus arguments. The message is already formatted, so it is passed as
            // an argument of "%s": a '%' inside the URI must not be interpreted as a conversion.
            setError(RaftError.EBUSY, "%s", new Object[]{errorMessage});
        }
    }

    /**
     * Requests a batch of multi-versioned data from the snapshot sender and writes every received row version into
     * the local partition storages. Unless the response is marked as the last one, the method calls itself to request
     * the next batch.
     *
     * <p>Processing stops silently (the returned future completes with {@code null}) as soon as the busy lock can no
     * longer be entered, i.e. after {@link #cancel()}.
     *
     * @param snapshotContext Context of the snapshot being installed.
     * @param snapshotSender Node that sends the snapshot.
     * @return Future completed when all batches have been processed.
     */
    private CompletableFuture<?> loadSnapshotMvData(SnapshotContext snapshotContext, InternalClusterNode snapshotSender) {
        if (!busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }

        // Everything after a successful enterBusy() is inside the try block, so the lock is released even when
        // the bookkeeping or logging below throws.
        try {
            snapshotStats.onLoadMvDataPhaseStart();
            snapshotsMetricsSource.onLoadMvDataPhaseStart();

            if (LOG.isInfoEnabled()) {
                LOG.info("Start loading multi-versioned data [snapshotId={}, {}]", snapshotUri.snapshotId, createPartitionInfo());
            }

            NetworkMessage request = TABLE_MSG_FACTORY.snapshotMvDataRequest()
                    .id(snapshotUri.snapshotId)
                    .batchSizeHint(MAX_MV_DATA_PAYLOADS_BATCH_BYTES_HINT)
                    .build();

            return partitionSnapshotStorage.messagingService()
                    .invoke(snapshotSender, request, NETWORK_TIMEOUT_MILLIS)
                    .thenComposeAsync(response -> {
                        SnapshotMvDataResponse snapshotMvDataResponse = (SnapshotMvDataResponse) response;

                        for (SnapshotMvDataResponse.ResponseEntry entry : snapshotMvDataResponse.rows()) {
                            for (int i = 0; i < entry.rowVersions().size(); i++) {
                                // The lock is taken per row version so that a cancellation is noticed quickly.
                                if (!busyLock.enterBusy()) {
                                    return CompletableFutures.nullCompletedFuture();
                                }

                                try {
                                    writeVersion(snapshotContext, entry, i);
                                } finally {
                                    busyLock.leaveBusy();
                                }
                            }
                        }

                        snapshotStats.onMvBatchProcessing(snapshotMvDataResponse.rows().size());

                        if (snapshotMvDataResponse.finish()) {
                            snapshotStats.onLoadMvDataPhaseEnd();
                            snapshotsMetricsSource.onLoadMvDataPhaseEnd();

                            if (LOG.isInfoEnabled()) {
                                // One argument per placeholder: the totals come from the accumulated statistics,
                                // not from the size of the last batch.
                                LOG.info(
                                        "Multi-versioned data has been loaded [snapshotId={}, {}, totalRows={}, totalBatches={}, mvDataLoadingTime={}ms]",
                                        snapshotUri.snapshotId,
                                        createPartitionInfo(),
                                        snapshotStats.totalMvDataRows(),
                                        snapshotStats.totalMvDataBatches(),
                                        snapshotStats.loadMvDataPhaseDuration()
                                );
                            }

                            return CompletableFutures.nullCompletedFuture();
                        }

                        if (LOG.isDebugEnabled()) {
                            LOG.debug(
                                    "A portion of multi-versioned data has been loaded [snapshotId={}, {}, rows={}]",
                                    snapshotUri.snapshotId, createPartitionInfo(), snapshotMvDataResponse.rows().size()
                            );
                        }

                        // Request the rest of the data.
                        return loadSnapshotMvData(snapshotContext, snapshotSender);
                    }, executor);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Requests a batch of transaction metas from the snapshot sender and adds every received meta to the local TX
     * state storage. Unless the response is marked as the last one, the method calls itself to request the next batch.
     *
     * <p>Processing stops silently (the returned future completes with {@code null}) as soon as the busy lock can no
     * longer be entered, i.e. after {@link #cancel()}.
     *
     * @param snapshotSender Node that sends the snapshot.
     * @return Future completed when all batches have been processed.
     */
    private CompletableFuture<Void> loadSnapshotTxData(InternalClusterNode snapshotSender) {
        if (!busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }

        // Everything after a successful enterBusy() is inside the try block, so the lock is released even when
        // the bookkeeping or logging below throws.
        try {
            snapshotStats.onLoadTxMetasPhaseStart();
            snapshotsMetricsSource.onLoadTxMetasPhaseStart();

            if (LOG.isInfoEnabled()) {
                LOG.info("Start loading transaction meta data [snapshotId={}, {}]", snapshotUri.snapshotId, createPartitionInfo());
            }

            NetworkMessage request = TABLE_MSG_FACTORY.snapshotTxDataRequest()
                    .id(snapshotUri.snapshotId)
                    .maxTransactionsInBatch(MAX_TX_DATA_BATCH_SIZE)
                    .build();

            return partitionSnapshotStorage.messagingService()
                    .invoke(snapshotSender, request, NETWORK_TIMEOUT_MILLIS)
                    .thenComposeAsync(response -> {
                        SnapshotTxDataResponse snapshotTxDataResponse = (SnapshotTxDataResponse) response;

                        assert snapshotTxDataResponse.txMeta().size() == snapshotTxDataResponse.txIds().size() : createPartitionInfo();

                        for (int i = 0; i < snapshotTxDataResponse.txMeta().size(); i++) {
                            // The lock is taken per transaction so that a cancellation is noticed quickly.
                            if (!busyLock.enterBusy()) {
                                return CompletableFutures.nullCompletedFuture();
                            }

                            try {
                                partitionSnapshotStorage.txState().addTxMeta(
                                        snapshotTxDataResponse.txIds().get(i),
                                        snapshotTxDataResponse.txMeta().get(i).asTxMeta()
                                );
                            } finally {
                                busyLock.leaveBusy();
                            }
                        }

                        snapshotStats.onTxMetasBatchProcessing(snapshotTxDataResponse.txMeta().size());

                        if (snapshotTxDataResponse.finish()) {
                            snapshotStats.onLoadTxMetasPhaseEnd();
                            snapshotsMetricsSource.onLoadTxMetasPhaseEnd();

                            if (LOG.isInfoEnabled()) {
                                // One argument per placeholder: the totals come from the accumulated statistics,
                                // not from the size of the last batch.
                                LOG.info(
                                        "Transaction meta has been loaded [snapshotId={}, {}, totalMetas={}, totalBatches={}, metaLoadingTime={}ms]",
                                        snapshotUri.snapshotId,
                                        createPartitionInfo(),
                                        snapshotStats.totalTxMetas(),
                                        snapshotStats.totalTxMetasBatches(),
                                        snapshotStats.loadTxMetasPhaseDuration()
                                );
                            }

                            return CompletableFutures.nullCompletedFuture();
                        }

                        if (LOG.isDebugEnabled()) {
                            LOG.debug(
                                    "A portion of transaction meta has been loaded [snapshotId={}, {}, metas={}]",
                                    snapshotUri.snapshotId, createPartitionInfo(), snapshotTxDataResponse.txMeta().size()
                            );
                        }

                        // Request the rest of the data.
                        return loadSnapshotTxData(snapshotSender);
                    }, executor);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Finishes the rebalance of the storages according to the outcome of the data loading.
     *
     * <ul>
     *     <li>Copier cancelled (busy lock blocked): sets {@link RaftError#ECANCELED} unless an error is already set and
     *     aborts the rebalance.</li>
     *     <li>{@code throwable} is not {@code null}: reports it to the failure processor, sets
     *     {@link RaftError#UNKNOWN} unless an error is already set, aborts the rebalance and fails the returned future
     *     with {@code throwable}.</li>
     *     <li>Otherwise: finishes the rebalance with the meta built from the snapshot context.</li>
     * </ul>
     *
     * @param snapshotContext Context of the snapshot being installed.
     * @param throwable Failure of the data loading, {@code null} if it succeeded.
     * @return Future of the abort or finish operation.
     */
    private CompletableFuture<Void> completeRebalance(SnapshotContext snapshotContext, @Nullable Throwable throwable) {
        snapshotStats.onSnapshotInstallationEnd();
        snapshotsMetricsSource.onSnapshotInstallationEnd();

        if (!busyLock.enterBusy()) {
            if (isOk()) {
                setError(RaftError.ECANCELED, "Copier is cancelled", new Object[0]);
            }

            return abortRebalance(snapshotContext);
        }

        try {
            if (throwable != null) {
                String errorMessage = String.format("Partition rebalancing error [%s]", createPartitionInfo());

                partitionSnapshotStorage.failureProcessor().process(new FailureContext(throwable, errorMessage));

                if (isOk()) {
                    // setError() takes a format string plus arguments. An arbitrary exception message may contain
                    // '%', so it is passed as an argument of "%s" instead of being used as the format itself.
                    setError(RaftError.UNKNOWN, "%s", new Object[]{throwable.getMessage()});
                }

                return abortRebalance(snapshotContext)
                        .exceptionally(abortError -> {
                            // Throwable.addSuppressed() rejects self-suppression with an IllegalArgumentException.
                            if (abortError != throwable) {
                                throwable.addSuppressed(abortError);
                            }

                            return null;
                        })
                        .thenCompose(unused -> CompletableFuture.<Void>failedFuture(throwable));
            }

            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Rebalance is done [{}, meta={}, rebalanceTime={}ms]",
                        new Object[]{createPartitionInfo(), snapshotContext.meta, snapshotStats.totalSnapshotInstallationDuration()}
                );
            }

            MvPartitionMeta snapshotMeta = mvPartitionMeta(snapshotContext);

            return finishRebalance(snapshotMeta, snapshotContext);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Builds the partition meta that is handed to the storages when the rebalance is finished. It consists of the last
     * included index and term, the serialized raft group configuration, the lease information and the serialized
     * snapshot info.
     *
     * @param snapshotContext Context of the snapshot being installed.
     * @return Partition meta.
     */
    private static MvPartitionMeta mvPartitionMeta(SnapshotContext snapshotContext) {
        PartitionSnapshotMeta snapshotMeta = snapshotContext.meta;

        RaftGroupConfiguration groupConfig = raftGroupConfig(snapshotMeta);
        LeaseInfo lease = leaseInfo(snapshotMeta);

        byte[] groupConfigBytes = VersionedSerialization.toBytes(groupConfig, RaftGroupConfigurationSerializer.INSTANCE);

        PartitionSnapshotInfo info = new PartitionSnapshotInfo(
                snapshotMeta.lastIncludedIndex(),
                snapshotMeta.lastIncludedTerm(),
                lease,
                groupConfigBytes,
                (Collection<Integer>) snapshotContext.partitionsByTableId.keySet()
        );

        byte[] infoBytes = VersionedSerialization.toBytes(info, PartitionSnapshotInfoSerializer.INSTANCE);

        return new MvPartitionMeta(
                snapshotMeta.lastIncludedIndex(),
                snapshotMeta.lastIncludedTerm(),
                groupConfigBytes,
                lease,
                infoBytes
        );
    }

    /**
     * Copies the raft group configuration fields of the snapshot meta into a {@link RaftGroupConfiguration}.
     *
     * @param meta Snapshot meta.
     * @return Raft group configuration.
     */
    private static RaftGroupConfiguration raftGroupConfig(PartitionSnapshotMeta meta) {
        RaftGroupConfiguration config = new RaftGroupConfiguration(
                meta.cfgIndex(),
                meta.cfgTerm(),
                meta.sequenceToken(),
                meta.oldSequenceToken(),
                meta.peersList(),
                meta.learnersList(),
                meta.oldPeersList(),
                meta.oldLearnersList()
        );

        return config;
    }

    /**
     * Extracts the lease information from the snapshot meta.
     *
     * @param meta Snapshot meta.
     * @return Lease information, or {@code null} when the meta has no primary replica node ID.
     */
    @Nullable
    private static LeaseInfo leaseInfo(PartitionSnapshotMeta meta) {
        return meta.primaryReplicaNodeId() == null
                ? null
                : new LeaseInfo(meta.leaseStartTime(), meta.primaryReplicaNodeId(), meta.primaryReplicaNodeName());
    }

    /**
     * Returns the ID of the partition this copier works with, taken from the partition key of the snapshot storage.
     *
     * @return Partition ID.
     */
    private int partId() {
        PartitionKey key = partitionSnapshotStorage.partitionKey();

        return key.partitionId();
    }

    /**
     * Builds the {@code zoneId=..., partitionId=...} fragment that identifies the partition in log and error messages.
     *
     * @return Partition description.
     */
    private String createPartitionInfo() {
        PartitionKey key = partitionSnapshotStorage.partitionKey();

        String zonePart = "zoneId=" + key.zoneId();
        String partitionPart = ", partitionId=" + key.partitionId();

        return zonePart + partitionPart;
    }

    /**
     * Writes a single row version of a response entry into the partition storage of the entry's table.
     *
     * <p>A version whose index has a matching timestamp is written as committed. The version whose index equals the
     * number of timestamps has no commit timestamp and is written as a write intent of the entry's transaction.
     * When there is no local storage for the table, a (throttled) warning is logged and the version is skipped.
     *
     * @param snapshotContext Context of the snapshot being installed.
     * @param entry Response entry that holds the versions of one row.
     * @param entryIndex Index of the version inside {@code entry.rowVersions()}.
     */
    private void writeVersion(SnapshotContext snapshotContext, SnapshotMvDataResponse.ResponseEntry entry, int entryIndex) {
        PartitionMvStorageAccess storage = (PartitionMvStorageAccess) snapshotContext.partitionsByTableId.get(entry.tableId());

        if (storage == null) {
            throttledLogger.warn(
                    "No partition storage found locally for tableId={} while installing a snapshot",
                    new Object[]{entry.tableId()}
            );

            return;
        }

        RowId rowId = new RowId(partId(), entry.rowId());

        BinaryRowMessage versionMessage = entry.rowVersions().get(entryIndex);

        BinaryRow row = null;

        if (versionMessage != null) {
            row = versionMessage.asBinaryRow();
        }

        int catalogVersion = snapshotContext.meta.requiredCatalogVersion();

        boolean hasCommitTimestamp = entryIndex != entry.timestamps().length;

        if (hasCommitTimestamp) {
            storage.addWriteCommitted(rowId, row, HybridTimestamp.hybridTimestamp(entry.timestamps()[entryIndex]), catalogVersion);

            return;
        }

        assert entry.txId() != null;
        assert entry.commitTableOrZoneId() != null;
        assert entry.commitPartitionId() != -1;

        storage.addWrite(
                rowId,
                row,
                entry.txId(),
                entry.commitTableOrZoneId(),
                entry.commitPartitionId(),
                catalogVersion,
                isArchivation(versionMessage)
        );
    }

    /**
     * Tells whether the row version message is a {@link BinaryRowVersionMessage} flagged as archived.
     *
     * @param rowVersion Row version message, may be {@code null}.
     * @return {@code true} only for a {@link BinaryRowVersionMessage} whose {@code isArchived()} is {@code true}.
     */
    private static boolean isArchivation(@Nullable BinaryRowMessage rowVersion) {
        if (!(rowVersion instanceof BinaryRowVersionMessage)) {
            return false;
        }

        return ((BinaryRowVersionMessage) rowVersion).isArchived();
    }

    /**
     * Passes the "next row ID to build" of every index listed in the snapshot meta to the partition storage of the
     * table the index belongs to. The table of an index is resolved through the catalog of the version required by
     * the snapshot. Tables without a local partition storage are skipped with a (throttled) warning.
     *
     * <p>Does nothing when the copier has been cancelled (the busy lock is blocked). The end of the phase is recorded
     * whenever the phase completes without an exception, including the case when the meta lists no indexes.
     *
     * @param snapshotContext Context of the snapshot being installed.
     */
    private void setNextRowIdToBuildIndexes(SnapshotContext snapshotContext) {
        if (!busyLock.enterBusy()) {
            return;
        }

        // Everything after a successful enterBusy() is inside the try block, so the lock is always released.
        try {
            snapshotStats.onSetRowIdToBuildPhaseStart();
            snapshotsMetricsSource.onSetRowIdToBuildPhaseStart();

            Map<Integer, UUID> nextRowUuidToBuildByIndexId = snapshotContext.meta.nextRowIdToBuildByIndexId();

            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Setting next row ID for index building [snapshotId={}, {}, indexIdToRowId={}]",
                        snapshotUri.snapshotId, createPartitionInfo(), nextRowUuidToBuildByIndexId
                );
            }

            if (!CollectionUtils.nullOrEmpty(nextRowUuidToBuildByIndexId)) {
                Catalog catalog = partitionSnapshotStorage.catalogService().catalog(snapshotContext.meta.requiredCatalogVersion());

                // Table ID -> (index ID -> next row ID to build).
                Int2ObjectMap<Map<Integer, RowId>> nextRowIdToBuildByIndexIdAndTableId = new Int2ObjectOpenHashMap<>();

                for (Map.Entry<Integer, UUID> indexEntry : nextRowUuidToBuildByIndexId.entrySet()) {
                    Integer indexId = indexEntry.getKey();

                    int tableId = catalog.index(indexId.intValue()).tableId();

                    Map<Integer, RowId> rowIdsOfTable = nextRowIdToBuildByIndexIdAndTableId.get(tableId);

                    if (rowIdsOfTable == null) {
                        rowIdsOfTable = new HashMap<>();

                        nextRowIdToBuildByIndexIdAndTableId.put(tableId, rowIdsOfTable);
                    }

                    rowIdsOfTable.put(indexId, new RowId(partId(), indexEntry.getValue()));
                }

                for (Int2ObjectMap.Entry<Map<Integer, RowId>> tableEntry : nextRowIdToBuildByIndexIdAndTableId.int2ObjectEntrySet()) {
                    int tableId = tableEntry.getIntKey();

                    PartitionMvStorageAccess partitionAccess = (PartitionMvStorageAccess) snapshotContext.partitionsByTableId.get(tableId);

                    if (partitionAccess == null) {
                        throttledLogger.warn(
                                "No partition storage found locally for tableId={} while installing a snapshot",
                                new Object[]{tableId}
                        );

                        continue;
                    }

                    partitionAccess.setNextRowIdToBuildIndex(tableEntry.getValue());
                }
            }

            // Recorded for the "nothing to set" case as well, so that every phase start has a matching phase end.
            snapshotStats.onSetRowIdToBuildPhaseEnd();
            snapshotsMetricsSource.onSetRowIdToBuildPhaseEnd();

            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Finished setting next row ID for index building [snapshotId={}, {}, totalTime={}ms]",
                        snapshotUri.snapshotId, createPartitionInfo(), snapshotStats.totalSetRowIdToBuildPhaseDuration()
                );
            }
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Requests the low watermark of the snapshot sender and, when the sender has one, applies it to every partition
     * storage of the snapshot context.
     *
     * @param snapshotContext Context of the snapshot being installed.
     * @param snapshotSender Node that sends the snapshot.
     * @return Future completed when the low watermark has been applied; completed immediately without sending anything
     *      when the copier has been cancelled (the busy lock is blocked).
     */
    private CompletableFuture<Void> tryUpdateLowWatermark(SnapshotContext snapshotContext, InternalClusterNode snapshotSender) {
        if (!busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }

        try {
            NetworkMessage request = LWM_MSG_FACTORY.getLowWatermarkRequest().build();

            return partitionSnapshotStorage.messagingService()
                    .invoke(snapshotSender, request, NETWORK_TIMEOUT_MILLIS)
                    .thenAcceptAsync(response -> {
                        GetLowWatermarkResponse getLowWatermarkResponse = (GetLowWatermarkResponse) response;

                        HybridTimestamp senderLowWatermark = HybridTimestamp.nullableHybridTimestamp(getLowWatermarkResponse.lowWatermark());

                        if (senderLowWatermark != null) {
                            snapshotContext.partitionsByTableId.values()
                                    .forEach(mvPartition -> mvPartition.updateLowWatermark(senderLowWatermark));
                        }
                    }, executor);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Prepares the storages for loading the snapshot data: starts the rebalance of every partition storage and of the
     * TX state storage, and after both are done, of the replication log storages.
     *
     * @param snapshotContext Context of the snapshot being installed.
     * @return Future completed when the storages are prepared; completed immediately without doing anything when the
     *      copier has been cancelled (the busy lock is blocked).
     */
    private CompletableFuture<Void> startRebalance(SnapshotContext snapshotContext) {
        if (!busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }

        // Everything after a successful enterBusy() is inside the try block, so the lock is released even when
        // the bookkeeping or logging below throws.
        try {
            snapshotStats.onPreparingStoragePhaseStart();
            snapshotsMetricsSource.onPreparingStoragePhaseStart();

            if (LOG.isInfoEnabled()) {
                LOG.info("Preparing storages for snapshot installation [snapshotId={}, {}]", snapshotUri.snapshotId, createPartitionInfo());
            }

            CompletableFuture<Void> storagesStarted = CompletableFuture.allOf(
                    aggregateFutureFromPartitions(PartitionMvStorageAccess::startRebalance, snapshotContext),
                    partitionSnapshotStorage.txState().startRebalance()
            );

            return storagesStarted
                    .thenComposeAsync(unused -> startRebalanceForReplicationLogStorages(snapshotContext), executor)
                    // The end of the phase is recorded for a failed preparation as well.
                    .whenComplete((ignore, throwable) -> {
                        snapshotStats.onPreparingStoragePhaseEnd();
                        snapshotsMetricsSource.onPreparingStoragePhaseEnd();

                        if (LOG.isInfoEnabled()) {
                            LOG.info(
                                    "Storages are prepared to load data [snapshotId={}, {}, preparationTime={}ms]",
                                    snapshotUri.snapshotId, createPartitionInfo(), snapshotStats.totalPreparingStoragePhaseDuration()
                            );
                        }
                    });
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Finishes the rebalance of every partition storage and, once all of them are done, of the TX state storage.
     *
     * @param meta Partition meta to finish the rebalance with.
     * @param snapshotContext Context of the snapshot being installed.
     * @return Future completed when the TX state storage has finished its rebalance.
     */
    private CompletableFuture<Void> finishRebalance(MvPartitionMeta meta, SnapshotContext snapshotContext) {
        return aggregateFutureFromPartitions(partition -> partition.finishRebalance(meta), snapshotContext)
                .thenComposeAsync(unused -> partitionSnapshotStorage.txState().finishRebalance(meta), executor);
    }

    /**
     * Aborts the rebalance on every partition MV storage of the snapshot context and on the TX state storage.
     *
     * @param snapshotContext Snapshot context providing the partition MV storages.
     * @return Future that completes when all of the abort futures have completed.
     */
    private CompletableFuture<Void> abortRebalance(SnapshotContext snapshotContext) {
        CompletableFuture<Void> mvStoragesAborted = IncomingSnapshotCopier.aggregateFutureFromPartitions(PartitionMvStorageAccess::abortRebalance, snapshotContext);
        CompletableFuture<?> txStateAborted = this.partitionSnapshotStorage.txState().abortRebalance();

        return CompletableFuture.allOf(mvStoragesAborted, txStateAborted);
    }

    /**
     * Applies {@code action} to every partition MV storage held by the snapshot context and combines the
     * resulting futures into one.
     *
     * @param action Asynchronous operation to invoke on each partition MV storage.
     * @param snapshotContext Snapshot context providing the partition MV storages.
     * @return Future that completes when the futures produced for all partitions have completed.
     */
    private static CompletableFuture<Void> aggregateFutureFromPartitions(Function<PartitionMvStorageAccess, CompletableFuture<Void>> action, SnapshotContext snapshotContext) {
        Stream<CompletableFuture<Void>> perPartition = snapshotContext.partitionsByTableId.values().stream().map(action);
        CompletableFuture<?>[] pending = perPartition.toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(pending);
    }

    /**
     * Schedules, on {@code executor}, the start of rebalance for every replication log storage that backs
     * the partitions of the snapshot context.
     *
     * <p>The set of storage keys is collected synchronously under the busy lock; the per-key work runs
     * later inside {@code IgniteUtils.inBusyLockSafe}.
     *
     * <p>NOTE(review): the decompiler (CFR) reported "Removed try catching itself - possible behaviour
     * change" for this method, so the exception handling may differ slightly from the original bytecode.
     *
     * @param snapshotContext Snapshot context providing the partition MV storages.
     * @return Future of the scheduled task, or the future returned by
     *     {@code CompletableFutures.nullCompletedFuture()} if the busy lock could not be entered.
     */
    private CompletableFuture<Void> startRebalanceForReplicationLogStorages(SnapshotContext snapshotContext) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }
        try {
            Set<ReplicationLogStorageKey> storageKeys = this.collectReplicationLogStorageKeys(snapshotContext);
            Runnable rebalanceTask = () -> IgniteUtils.inBusyLockSafe((IgniteBusyLock)this.busyLock, () -> {
                for (ReplicationLogStorageKey storageKey : storageKeys) {
                    this.startRebalanceForReplicationLogStorage(storageKey);
                }
            });

            return CompletableFuture.runAsync(rebalanceTask, this.executor);
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Builds the set of replication log storage keys for the partitions of the snapshot context: one
     * {@code ReplicationLogStorageKey.create(...)} call per partition MV storage, with the results
     * collected into a set.
     *
     * @param snapshotContext Snapshot context providing the partition MV storages.
     * @return Set of the created keys.
     */
    private Set<ReplicationLogStorageKey> collectReplicationLogStorageKeys(SnapshotContext snapshotContext) {
        Stream<ReplicationLogStorageKey> keysPerPartition = snapshotContext.partitionsByTableId.values().stream()
                .map(mvStorage -> ReplicationLogStorageKey.create(this.partitionSnapshotStorage, mvStorage));

        return keysPerPartition.collect(Collectors.toSet());
    }

    /**
     * Starts rebalance for a single replication log storage: calls {@code destroy} on the log storage
     * for the key's replication group and then {@code createMetaStorage} for the same group.
     *
     * @param key Key identifying the replication log storage (replication group ID and volatility flag).
     * @throws IgniteInternalException With code {@code NODE_STOPPING_ERR} if a {@link NodeStoppingException}
     *     is raised while working with the log storage.
     */
    private void startRebalanceForReplicationLogStorage(ReplicationLogStorageKey key) throws IgniteInternalException {
        if (LOG.isInfoEnabled()) {
            LOG.info("Start rebalance for the replication log storage [snapshotId={}, {}]", this.snapshotUri.snapshotId, key);
        }

        try {
            LogStorageAccess replicationLog = this.partitionSnapshotStorage.logStorage();
            ReplicationGroupId groupId = (ReplicationGroupId)key.replicationGroupId();

            replicationLog.destroy(groupId, key.isVolatile());
            replicationLog.createMetaStorage(groupId);
        }
        catch (NodeStoppingException stopping) {
            // Surface node shutdown as an unchecked internal exception, keeping the original cause.
            throw new IgniteInternalException(ErrorGroups.Common.NODE_STOPPING_ERR, (Throwable)stopping);
        }
    }

    /**
     * Compiler-generated lambda body that the decompiler emitted as a standalone method (the name suggests
     * it belongs to {@code start} - TODO confirm against the call site, which is outside this view).
     *
     * <p>When a snapshot context is present, it waits for {@code rebalanceFuture}, runs
     * {@code completeRebalance} with the rebalance outcome on {@code executor}, and afterwards tries to
     * update the low watermark using the snapshot sender.
     *
     * @param snapshotSender Node the snapshot is received from; asserted to be non-null when a context exists.
     * @param rebalanceFuture Future of the rebalance whose outcome is passed to {@code completeRebalance}.
     * @param snapshotContext Snapshot context, or {@code null} if there is none.
     * @return Stage of the described chain, or the future returned by
     *     {@code CompletableFutures.nullCompletedFuture()} when {@code snapshotContext} is {@code null}.
     */
    private /* synthetic */ CompletionStage lambda$start$8(InternalClusterNode snapshotSender, CompletableFuture rebalanceFuture, SnapshotContext snapshotContext) {
        // No snapshot context: nothing to complete, short-circuit.
        if (snapshotContext == null) {
            return CompletableFutures.nullCompletedFuture();
        }
        // The partition info is used as the assertion message to ease diagnostics.
        assert (snapshotSender != null) : this.createPartitionInfo();
        // handleAsync receives both the normal result and the failure of rebalanceFuture, so completeRebalance
        // runs in either case; thenCompose(Function.identity()) flattens the stage it returns. The low
        // watermark update is chained with thenCompose, so it only runs if that stage completes normally.
        return ((CompletableFuture)((CompletableFuture)rebalanceFuture.handleAsync((v, throwable) -> this.completeRebalance(snapshotContext, (Throwable)throwable), this.executor)).thenCompose(Function.identity())).thenCompose(v -> this.tryUpdateLowWatermark(snapshotContext, snapshotSender));
    }

    /**
     * Holder for the data shared by the steps of a snapshot installation: the snapshot meta and the
     * partition MV storages it applies to. Both references are fixed at construction time.
     */
    private static class SnapshotContext {
        /** Snapshot meta supplied at construction. */
        final PartitionSnapshotMeta meta;

        /** Partition MV storage accessors; keyed by table ID, judging by the field name. */
        final Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId;

        /**
         * Creates a context.
         *
         * @param snapshotMeta Snapshot meta.
         * @param mvPartitions Partition MV storage accessors.
         */
        SnapshotContext(PartitionSnapshotMeta snapshotMeta, Int2ObjectMap<PartitionMvStorageAccess> mvPartitions) {
            this.partitionsByTableId = mvPartitions;
            this.meta = snapshotMeta;
        }
    }
}

