/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.partition.replicator.raft.snapshot.incoming;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite3.internal.catalog.Catalog;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.IgniteThrottledLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.lowwatermark.message.GetLowWatermarkResponse;
import org.apache.ignite3.internal.lowwatermark.message.LowWatermarkMessagesFactory;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.NetworkMessage;
import org.apache.ignite3.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite3.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
import org.apache.ignite3.internal.partition.replicator.network.raft.SnapshotMetaResponse;
import org.apache.ignite3.internal.partition.replicator.network.raft.SnapshotMvDataResponse;
import org.apache.ignite3.internal.partition.replicator.network.raft.SnapshotTxDataResponse;
import org.apache.ignite3.internal.partition.replicator.network.replication.BinaryRowMessage;
import org.apache.ignite3.internal.partition.replicator.network.replication.BinaryRowVersionMessage;
import org.apache.ignite3.internal.partition.replicator.raft.PartitionSnapshotInfo;
import org.apache.ignite3.internal.partition.replicator.raft.PartitionSnapshotInfoSerializer;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.SnapshotUri;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotReader;
import org.apache.ignite3.internal.raft.RaftGroupConfiguration;
import org.apache.ignite3.internal.raft.RaftGroupConfigurationSerializer;
import org.apache.ignite3.internal.schema.BinaryRow;
import org.apache.ignite3.internal.storage.RowId;
import org.apache.ignite3.internal.storage.StorageRebalanceException;
import org.apache.ignite3.internal.storage.engine.MvPartitionMeta;
import org.apache.ignite3.internal.storage.lease.LeaseInfo;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.versioned.VersionedSerialization;
import org.apache.ignite3.raft.jraft.error.RaftError;
import org.apache.ignite3.raft.jraft.storage.snapshot.SnapshotCopier;
import org.apache.ignite3.raft.jraft.storage.snapshot.SnapshotReader;
import org.jetbrains.annotations.Nullable;

/**
 * Snapshot copier that installs a partition snapshot sent by a remote node.
 *
 * <p>The work is started by {@link #start()} and proceeds asynchronously in these stages:
 * <ol>
 *     <li>the snapshot meta is requested from the sender named in the snapshot URI;</li>
 *     <li>the copier waits (with a timeout) until the catalog version required by the meta is available locally;</li>
 *     <li>rebalance is started on the partition storages and on the transaction state storage;</li>
 *     <li>multi-versioned data and transaction metas are loaded batch by batch;</li>
 *     <li>rebalance is finished (or aborted on error/cancellation) and the low watermark is requested from the sender.</li>
 * </ol>
 *
 * <p>{@link #cancel()} blocks the busy lock, so every stage that fails to enter the lock stops doing work.
 */
public class IncomingSnapshotCopier extends SnapshotCopier {
    private static final IgniteLogger LOG = Loggers.forClass(IncomingSnapshotCopier.class);

    private static final PartitionReplicationMessagesFactory TABLE_MSG_FACTORY = new PartitionReplicationMessagesFactory();

    private static final LowWatermarkMessagesFactory LWM_MSG_FACTORY = new LowWatermarkMessagesFactory();

    /** Timeout value passed to every messaging service invocation made by the copier. */
    private static final long NETWORK_TIMEOUT = Long.MAX_VALUE;

    /** Batch size hint (in bytes) sent with each multi-versioned data request. */
    private static final long MAX_MV_DATA_PAYLOADS_BATCH_BYTES_HINT = 102400L;

    /** Maximum number of transactions requested in a single transaction data batch. */
    private static final int MAX_TX_DATA_BATCH_SIZE = 1000;

    private final PartitionSnapshotStorage partitionSnapshotStorage;

    private final SnapshotUri snapshotUri;

    /** How long (in milliseconds) to wait for the locally available catalog to reach the version required by the snapshot. */
    private final long waitForMetadataCatchupMs;

    /** Makes sure the body of {@link #cancel()} is executed at most once. */
    private final AtomicBoolean cancellationGuard = new AtomicBoolean();

    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    private final Executor executor;

    private final IgniteThrottledLogger throttledLogger;

    /** Completes with the snapshot context, or with {@code null} if the snapshot must not be installed. */
    @Nullable
    private volatile CompletableFuture<SnapshotContext> snapshotMetaFuture;

    /** Completes when the data loading part of the rebalance is done. */
    @Nullable
    private volatile CompletableFuture<Void> rebalanceFuture;

    /** Future awaited by {@link #join()}; covers rebalance completion and the low watermark update. */
    @Nullable
    private volatile CompletableFuture<Void> joinFuture;

    /**
     * Constructor.
     *
     * @param partitionSnapshotStorage Snapshot storage of the partition the snapshot is installed into.
     * @param snapshotUri URI of the snapshot; carries the sender node name and the snapshot ID.
     * @param executor Executor used for the asynchronous stages and for the throttled logger.
     * @param waitForMetadataCatchupMs How long to wait for the required catalog version, in milliseconds.
     */
    public IncomingSnapshotCopier(
            PartitionSnapshotStorage partitionSnapshotStorage,
            SnapshotUri snapshotUri,
            Executor executor,
            long waitForMetadataCatchupMs
    ) {
        this.partitionSnapshotStorage = partitionSnapshotStorage;
        this.snapshotUri = snapshotUri;
        this.executor = executor;
        this.throttledLogger = Loggers.toThrottledLogger(LOG, executor);
        this.waitForMetadataCatchupMs = waitForMetadataCatchupMs;
    }

    /** Starts the asynchronous snapshot installation; the outcome is observed through {@link #join()}. */
    @Override
    public void start() {
        LOG.info("Copier is started for the partition [{}]", createPartitionInfo());

        InternalClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName);

        CompletableFuture<SnapshotContext> metadataSufficiencyFuture;

        if (snapshotSender == null) {
            metadataSufficiencyFuture = CompletableFuture.failedFuture(
                    new StorageRebalanceException("Snapshot sender not found: " + snapshotUri.nodeName)
            );
        } else {
            metadataSufficiencyFuture = loadSnapshotMeta(snapshotSender).thenCompose(this::awaitMetadataAndCreateContext);
        }

        this.snapshotMetaFuture = metadataSufficiencyFuture;

        CompletableFuture<Void> rebalance = metadataSufficiencyFuture.thenComposeAsync(snapshotContext -> {
            if (snapshotContext == null) {
                // The snapshot is not going to be installed, nothing to load.
                return CompletableFutures.<Void>nullCompletedFuture();
            }

            assert snapshotSender != null : createPartitionInfo();

            return startRebalance(snapshotContext)
                    .thenCompose(v -> loadSnapshotMvData(snapshotContext, snapshotSender))
                    .thenCompose(v -> loadSnapshotTxData(snapshotSender))
                    .thenRunAsync(() -> setNextRowIdToBuildIndexes(snapshotContext), executor);
        }, executor);

        this.rebalanceFuture = rebalance;

        this.joinFuture = metadataSufficiencyFuture.thenCompose(snapshotContext -> {
            if (snapshotContext == null) {
                return CompletableFutures.<Void>nullCompletedFuture();
            }

            assert snapshotSender != null : createPartitionInfo();

            // completeRebalance() must run both on success and on failure of the loading stage, hence handleAsync().
            return rebalance
                    .handleAsync((v, throwable) -> completeRebalance(snapshotContext, throwable), executor)
                    .thenCompose(Function.identity())
                    .thenCompose(v -> tryUpdateLowWatermark(snapshotContext, snapshotSender));
        });
    }

    /**
     * Waits for the catalog version required by the snapshot and builds the snapshot context.
     *
     * @param snapshotMeta Loaded snapshot meta; {@code null} when {@link #loadSnapshotMeta} could not enter the busy lock.
     * @return Future completing with the context, or with {@code null} if the snapshot must not be installed
     *         (copier cancelled, or required metadata is still not available after the wait).
     */
    private CompletableFuture<SnapshotContext> awaitMetadataAndCreateContext(@Nullable PartitionSnapshotMeta snapshotMeta) {
        if (snapshotMeta == null) {
            // loadSnapshotMeta() yields null only when the busy lock is blocked, i.e. after cancel(). Report the
            // cancellation the same way completeRebalance() does instead of failing with an NPE on the null meta.
            if (isOk()) {
                setError(RaftError.ECANCELED, "Copier is cancelled");
            }

            return CompletableFutures.nullCompletedFuture();
        }

        return waitForMetadataWithTimeout(snapshotMeta).thenApply(unused -> {
            if (!metadataIsSufficientlyComplete(snapshotMeta)) {
                logMetadataInsufficiencyAndSetError(snapshotMeta);

                return null;
            }

            return new SnapshotContext(snapshotMeta, partitionSnapshotStorage.partitionsByTableId());
        });
    }

    /**
     * Returns a future that completes when either the required catalog version becomes available or the
     * catch-up timeout elapses, whichever happens first.
     */
    private CompletableFuture<?> waitForMetadataWithTimeout(PartitionSnapshotMeta snapshotMeta) {
        CompletableFuture<Void> metadataReadyFuture = partitionSnapshotStorage.catalogService()
                .catalogReadyFuture(snapshotMeta.requiredCatalogVersion());

        CompletableFuture<?> readinessTimeoutFuture = completeOnMetadataReadinessTimeout();

        return CompletableFuture.anyOf(metadataReadyFuture, readinessTimeoutFuture);
    }

    /** Returns a future that completes normally (with {@code null}) once {@link #waitForMetadataCatchupMs} elapses. */
    private CompletableFuture<?> completeOnMetadataReadinessTimeout() {
        return new CompletableFuture<Void>()
                .orTimeout(waitForMetadataCatchupMs, TimeUnit.MILLISECONDS)
                .exceptionally(ex -> {
                    // Nobody else holds a reference to the source future, so a timeout is the only way it can complete.
                    assert ex instanceof TimeoutException;

                    return null;
                });
    }

    /**
     * Waits for the installation to finish.
     *
     * <p>Cancellation is not treated as an error. Any other failure is reported to the failure processor, recorded
     * as the copier error (unless one is already set) and rethrown.
     *
     * @throws InterruptedException If the waiting thread is interrupted.
     * @throws IllegalStateException If the installation failed with anything but cancellation.
     */
    @Override
    public void join() throws InterruptedException {
        CompletableFuture<Void> fut = joinFuture;

        if (fut == null) {
            return;
        }

        try {
            fut.get();
        } catch (CancellationException ignored) {
            // Cancellation is an expected outcome.
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();

            if (cause instanceof CancellationException) {
                return;
            }

            partitionSnapshotStorage.failureProcessor().process(new FailureContext(e, "Error when completing the copier"));

            if (isOk()) {
                setError(RaftError.UNKNOWN, "Unknown error on completion the copier");
            }

            throw new IllegalStateException(cause);
        }
    }

    /**
     * Cancels the installation: blocks the busy lock, cancels the in-flight futures and waits for completion.
     * Only the first invocation has an effect.
     */
    @Override
    public void cancel() {
        if (!cancellationGuard.compareAndSet(false, true)) {
            return;
        }

        busyLock.block();

        LOG.info("Copier is canceled for partition [{}]", createPartitionInfo());

        List<CompletableFuture<?>> futuresToCancel = Stream.<CompletableFuture<?>>of(snapshotMetaFuture, rebalanceFuture)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        futuresToCancel.forEach(future -> future.cancel(false));

        if (!futuresToCancel.isEmpty()) {
            try {
                join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void close() {
        // No-op.
    }

    /**
     * Returns a reader over the loaded snapshot meta. The reader gets a {@code null} meta if the meta future
     * completed exceptionally or with a {@code null} context.
     */
    @Override
    public SnapshotReader getReader() {
        CompletableFuture<SnapshotContext> metaFuture = snapshotMetaFuture;

        assert metaFuture != null && metaFuture.isDone();

        SnapshotContext context = metaFuture.isCompletedExceptionally() ? null : metaFuture.join();

        return new IncomingSnapshotReader(context == null ? null : context.meta);
    }

    /** Looks up the snapshot sender in the topology by its consistent ID; {@code null} if it is not there. */
    @Nullable
    private InternalClusterNode getSnapshotSender(String nodeName) {
        return partitionSnapshotStorage.topologyService().getByConsistentId(nodeName);
    }

    /**
     * Requests the snapshot meta from the sender.
     *
     * @return Future with the meta, or a future completed with {@code null} if the busy lock cannot be entered.
     */
    private CompletableFuture<PartitionSnapshotMeta> loadSnapshotMeta(InternalClusterNode snapshotSender) {
        if (!busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }

        try {
            return partitionSnapshotStorage.messagingService().invoke(
                    snapshotSender,
                    TABLE_MSG_FACTORY.snapshotMetaRequest().id(snapshotUri.snapshotId).build(),
                    NETWORK_TIMEOUT
            ).thenApply(response -> {
                PartitionSnapshotMeta snapshotMeta = ((SnapshotMetaResponse) response).meta();

                LOG.info("Copier has loaded the snapshot meta for the partition [{}, meta={}]", createPartitionInfo(), snapshotMeta);

                return snapshotMeta;
            });
        } finally {
            busyLock.leaveBusy();
        }
    }

    /** Tells whether the catalog version required by the snapshot is already available locally. */
    private boolean metadataIsSufficientlyComplete(PartitionSnapshotMeta snapshotMeta) {
        return partitionSnapshotStorage.catalogService().catalogReadyFuture(snapshotMeta.requiredCatalogVersion()).isDone();
    }

    /** Logs that the required metadata is missing and marks the copier as failed with {@code EBUSY} (if no error is set yet). */
    private void logMetadataInsufficiencyAndSetError(PartitionSnapshotMeta snapshotMeta) {
        LOG.warn(
                "Metadata not yet available, rejecting snapshot installation [uri={}, requiredVersion={}].",
                snapshotUri,
                snapshotMeta.requiredCatalogVersion()
        );

        if (isOk()) {
            // The values are passed as format arguments rather than being pre-formatted into the pattern,
            // so a '%' inside the URI cannot break the formatting of the error message.
            setError(
                    RaftError.EBUSY,
                    "Metadata not yet available, URI '%s', required level %s; rejecting snapshot installation.",
                    snapshotUri,
                    snapshotMeta.requiredCatalogVersion()
            );
        }
    }

    /**
     * Requests multi-versioned data from the sender batch by batch and writes every received row version to the
     * matching partition storage. Stops silently as soon as the busy lock cannot be entered.
     */
    private CompletableFuture<Void> loadSnapshotMvData(SnapshotContext snapshotContext, InternalClusterNode snapshotSender) {
        if (!busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }

        try {
            return partitionSnapshotStorage.messagingService().invoke(
                    snapshotSender,
                    TABLE_MSG_FACTORY.snapshotMvDataRequest()
                            .id(snapshotUri.snapshotId)
                            .batchSizeHint(MAX_MV_DATA_PAYLOADS_BATCH_BYTES_HINT)
                            .build(),
                    NETWORK_TIMEOUT
            ).thenComposeAsync(response -> {
                SnapshotMvDataResponse snapshotMvDataResponse = (SnapshotMvDataResponse) response;

                for (SnapshotMvDataResponse.ResponseEntry entry : snapshotMvDataResponse.rows()) {
                    // Write all versions of the row; the lock is taken per version so that cancel() is not held up for long.
                    for (int i = 0; i < entry.rowVersions().size(); i++) {
                        if (!busyLock.enterBusy()) {
                            return CompletableFutures.<Void>nullCompletedFuture();
                        }

                        try {
                            writeVersion(snapshotContext, entry, i);
                        } finally {
                            busyLock.leaveBusy();
                        }
                    }
                }

                if (snapshotMvDataResponse.finish()) {
                    LOG.info(
                            "Copier has finished loading multi-versioned data [{}, rows={}]",
                            createPartitionInfo(),
                            snapshotMvDataResponse.rows().size()
                    );

                    return CompletableFutures.<Void>nullCompletedFuture();
                }

                LOG.info(
                        "Copier has loaded a portion of multi-versioned data [{}, rows={}]",
                        createPartitionInfo(),
                        snapshotMvDataResponse.rows().size()
                );

                // Request the next batch.
                return loadSnapshotMvData(snapshotContext, snapshotSender);
            }, executor);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Requests transaction metas from the sender batch by batch and adds them to the transaction state storage.
     * Stops silently as soon as the busy lock cannot be entered.
     */
    private CompletableFuture<Void> loadSnapshotTxData(InternalClusterNode snapshotSender) {
        if (!busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }

        try {
            return partitionSnapshotStorage.messagingService().invoke(
                    snapshotSender,
                    TABLE_MSG_FACTORY.snapshotTxDataRequest()
                            .id(snapshotUri.snapshotId)
                            .maxTransactionsInBatch(MAX_TX_DATA_BATCH_SIZE)
                            .build(),
                    NETWORK_TIMEOUT
            ).thenComposeAsync(response -> {
                SnapshotTxDataResponse snapshotTxDataResponse = (SnapshotTxDataResponse) response;

                assert snapshotTxDataResponse.txMeta().size() == snapshotTxDataResponse.txIds().size() : createPartitionInfo();

                for (int i = 0; i < snapshotTxDataResponse.txMeta().size(); i++) {
                    if (!busyLock.enterBusy()) {
                        return CompletableFutures.<Void>nullCompletedFuture();
                    }

                    try {
                        partitionSnapshotStorage.txState().addTxMeta(
                                snapshotTxDataResponse.txIds().get(i),
                                snapshotTxDataResponse.txMeta().get(i).asTxMeta()
                        );
                    } finally {
                        busyLock.leaveBusy();
                    }
                }

                if (snapshotTxDataResponse.finish()) {
                    LOG.info(
                            "Copier has finished loading transaction meta [{}, metas={}]",
                            createPartitionInfo(),
                            snapshotTxDataResponse.txMeta().size()
                    );

                    return CompletableFutures.<Void>nullCompletedFuture();
                }

                LOG.info(
                        "Copier has loaded a portion of transaction meta [{}, metas={}]",
                        createPartitionInfo(),
                        snapshotTxDataResponse.txMeta().size()
                );

                // Request the next batch.
                return loadSnapshotTxData(snapshotSender);
            }, executor);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Finishes the rebalance if loading succeeded; aborts it if loading failed or the copier is cancelled.
     *
     * @param snapshotContext Snapshot context.
     * @param throwable Error of the loading stage, {@code null} on success.
     * @return Future of the finish/abort operation; on a loading error it completes exceptionally with {@code throwable}.
     */
    private CompletableFuture<Void> completeRebalance(SnapshotContext snapshotContext, @Nullable Throwable throwable) {
        if (!busyLock.enterBusy()) {
            if (isOk()) {
                setError(RaftError.ECANCELED, "Copier is cancelled");
            }

            return abortRebalance(snapshotContext);
        }

        try {
            if (throwable != null) {
                String errorMessage = String.format("Partition rebalancing error [%s]", createPartitionInfo());

                partitionSnapshotStorage.failureProcessor().process(new FailureContext(throwable, errorMessage));

                if (isOk()) {
                    // The exception message is arbitrary text, so it is passed as an argument and never as the
                    // format pattern: a '%' inside it must not make error reporting itself fail.
                    setError(RaftError.UNKNOWN, "%s", throwable.getMessage());
                }

                return abortRebalance(snapshotContext)
                        .exceptionally(e -> {
                            // Keep the abort failure attached to the original error instead of losing it.
                            throwable.addSuppressed(e);

                            return null;
                        })
                        .thenCompose(unused -> CompletableFuture.<Void>failedFuture(throwable));
            }

            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Copier completes the rebalancing of the partition: [{}, meta={}]",
                        createPartitionInfo(),
                        snapshotContext.meta
                );
            }

            return finishRebalance(mvPartitionMeta(snapshotContext), snapshotContext);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /** Builds the storage-level partition meta (including the serialized snapshot info) from the snapshot context. */
    private static MvPartitionMeta mvPartitionMeta(SnapshotContext snapshotContext) {
        PartitionSnapshotMeta meta = snapshotContext.meta;

        LeaseInfo leaseInfo = leaseInfo(meta);

        byte[] raftGroupConfigBytes = VersionedSerialization.toBytes(raftGroupConfig(meta), RaftGroupConfigurationSerializer.INSTANCE);

        PartitionSnapshotInfo snapshotInfo = new PartitionSnapshotInfo(
                meta.lastIncludedIndex(),
                meta.lastIncludedTerm(),
                leaseInfo,
                raftGroupConfigBytes,
                (Collection<Integer>) snapshotContext.partitionsByTableId.keySet()
        );

        byte[] snapshotInfoBytes = VersionedSerialization.toBytes(snapshotInfo, PartitionSnapshotInfoSerializer.INSTANCE);

        return new MvPartitionMeta(
                meta.lastIncludedIndex(),
                meta.lastIncludedTerm(),
                raftGroupConfigBytes,
                leaseInfo,
                snapshotInfoBytes
        );
    }

    /** Extracts the Raft group configuration carried by the snapshot meta. */
    private static RaftGroupConfiguration raftGroupConfig(PartitionSnapshotMeta meta) {
        return new RaftGroupConfiguration(
                meta.cfgIndex(),
                meta.cfgTerm(),
                meta.sequenceToken(),
                meta.oldSequenceToken(),
                meta.peersList(),
                meta.learnersList(),
                meta.oldPeersList(),
                meta.oldLearnersList()
        );
    }

    /** Extracts the lease info carried by the snapshot meta; {@code null} if the meta has no primary replica node ID. */
    @Nullable
    private static LeaseInfo leaseInfo(PartitionSnapshotMeta meta) {
        if (meta.primaryReplicaNodeId() == null) {
            return null;
        }

        return new LeaseInfo(meta.leaseStartTime(), meta.primaryReplicaNodeId(), meta.primaryReplicaNodeName());
    }

    private int partId() {
        return partitionSnapshotStorage.partitionKey().partitionId();
    }

    /** Returns the string form of the partition key; used in log and assertion messages. */
    private String createPartitionInfo() {
        return partitionSnapshotStorage.partitionKey().toString();
    }

    /**
     * Writes one version of a row to the partition storage of the table the row belongs to.
     *
     * @param snapshotContext Snapshot context.
     * @param entry Response entry holding all versions of the row.
     * @param entryIndex Index of the version inside {@code entry.rowVersions()}.
     */
    private void writeVersion(SnapshotContext snapshotContext, SnapshotMvDataResponse.ResponseEntry entry, int entryIndex) {
        PartitionMvStorageAccess partition = snapshotContext.partitionsByTableId.get(entry.tableId());

        if (partition == null) {
            throttledLogger.warn("No partition storage found locally for tableId={} while installing a snapshot", entry.tableId());

            return;
        }

        RowId rowId = new RowId(partId(), entry.rowId());

        BinaryRowMessage rowVersion = entry.rowVersions().get(entryIndex);

        BinaryRow binaryRow = rowVersion == null ? null : rowVersion.asBinaryRow();

        int snapshotCatalogVersion = snapshotContext.meta.requiredCatalogVersion();

        if (entryIndex == entry.timestamps().length) {
            // A version that has no timestamp of its own is written as an uncommitted write (write intent).
            assert entry.txId() != null;
            assert entry.commitTableOrZoneId() != null;
            assert entry.commitPartitionId() != -1;

            partition.addWrite(
                    rowId,
                    binaryRow,
                    entry.txId(),
                    entry.commitTableOrZoneId(),
                    entry.commitPartitionId(),
                    snapshotCatalogVersion,
                    isArchivation(rowVersion)
            );
        } else {
            partition.addWriteCommitted(
                    rowId,
                    binaryRow,
                    HybridTimestamp.hybridTimestamp(entry.timestamps()[entryIndex]),
                    snapshotCatalogVersion
            );
        }
    }

    /** Returns {@code true} only for a {@link BinaryRowVersionMessage} that reports itself as archived. */
    private static boolean isArchivation(@Nullable BinaryRowMessage rowVersion) {
        return rowVersion instanceof BinaryRowVersionMessage && ((BinaryRowVersionMessage) rowVersion).isArchived();
    }

    /**
     * Passes the "next row ID to build" positions of indexes from the snapshot meta to the partition storages,
     * grouped by the table each index belongs to. Does nothing if the busy lock cannot be entered.
     */
    private void setNextRowIdToBuildIndexes(SnapshotContext snapshotContext) {
        if (!busyLock.enterBusy()) {
            return;
        }

        try {
            Map<Integer, UUID> nextRowUuidToBuildByIndexId = snapshotContext.meta.nextRowIdToBuildByIndexId();

            if (CollectionUtils.nullOrEmpty(nextRowUuidToBuildByIndexId)) {
                return;
            }

            Catalog catalog = partitionSnapshotStorage.catalogService().catalog(snapshotContext.meta.requiredCatalogVersion());

            Int2ObjectOpenHashMap<Map<Integer, RowId>> nextRowIdToBuildByIndexIdAndTableId = new Int2ObjectOpenHashMap<>();

            nextRowUuidToBuildByIndexId.forEach((indexId, rowUuid) -> {
                int tableId = catalog.index(indexId).tableId();

                nextRowIdToBuildByIndexIdAndTableId.computeIfAbsent(tableId, unused -> new HashMap<>())
                        .put(indexId, new RowId(partId(), rowUuid));
            });

            for (Int2ObjectMap.Entry<Map<Integer, RowId>> e : nextRowIdToBuildByIndexIdAndTableId.int2ObjectEntrySet()) {
                int tableId = e.getIntKey();

                PartitionMvStorageAccess partitionAccess = snapshotContext.partitionsByTableId.get(tableId);

                if (partitionAccess == null) {
                    throttledLogger.warn("No partition storage found locally for tableId={} while installing a snapshot", tableId);

                    continue;
                }

                partitionAccess.setNextRowIdToBuildIndex(e.getValue());
            }
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Requests the low watermark from the sender and, if the sender reports one, applies it to every partition
     * storage of the snapshot. Does nothing if the busy lock cannot be entered.
     */
    private CompletableFuture<Void> tryUpdateLowWatermark(SnapshotContext snapshotContext, InternalClusterNode snapshotSender) {
        if (!busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }

        try {
            return partitionSnapshotStorage.messagingService().invoke(
                    snapshotSender,
                    LWM_MSG_FACTORY.getLowWatermarkRequest().build(),
                    NETWORK_TIMEOUT
            ).thenAcceptAsync(response -> {
                GetLowWatermarkResponse getLowWatermarkResponse = (GetLowWatermarkResponse) response;

                HybridTimestamp senderLowWatermark = HybridTimestamp.nullableHybridTimestamp(getLowWatermarkResponse.lowWatermark());

                if (senderLowWatermark != null) {
                    snapshotContext.partitionsByTableId.values()
                            .forEach(mvPartition -> mvPartition.updateLowWatermark(senderLowWatermark));
                }
            }, executor);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /** Starts rebalance on all partition storages and on the transaction state storage. */
    private CompletableFuture<Void> startRebalance(SnapshotContext snapshotContext) {
        if (!busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }

        try {
            return CompletableFuture.allOf(
                    aggregateFutureFromPartitions(PartitionMvStorageAccess::startRebalance, snapshotContext),
                    partitionSnapshotStorage.txState().startRebalance()
            );
        } finally {
            busyLock.leaveBusy();
        }
    }

    /** Finishes rebalance on all partition storages first and on the transaction state storage after that. */
    private CompletableFuture<Void> finishRebalance(MvPartitionMeta meta, SnapshotContext snapshotContext) {
        return aggregateFutureFromPartitions(mvPartition -> mvPartition.finishRebalance(meta), snapshotContext)
                .thenComposeAsync(v -> partitionSnapshotStorage.txState().finishRebalance(meta), executor);
    }

    /** Aborts rebalance on all partition storages and on the transaction state storage. */
    private CompletableFuture<Void> abortRebalance(SnapshotContext snapshotContext) {
        return CompletableFuture.allOf(
                aggregateFutureFromPartitions(PartitionMvStorageAccess::abortRebalance, snapshotContext),
                partitionSnapshotStorage.txState().abortRebalance()
        );
    }

    /** Applies the action to every partition storage of the snapshot and combines the resulting futures into one. */
    private static CompletableFuture<Void> aggregateFutureFromPartitions(
            Function<PartitionMvStorageAccess, CompletableFuture<Void>> action,
            SnapshotContext snapshotContext
    ) {
        CompletableFuture<?>[] futures = snapshotContext.partitionsByTableId.values().stream()
                .map(action)
                .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(futures);
    }

    /** Snapshot meta together with the partition storages (keyed by table ID) the snapshot is installed into. */
    private static class SnapshotContext {
        final PartitionSnapshotMeta meta;

        final Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId;

        SnapshotContext(PartitionSnapshotMeta meta, Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId) {
            this.meta = meta;
            this.partitionsByTableId = partitionsByTableId;
        }
    }
}

