package org.apache.ignite3.internal.partition.replicator.raft.snapshot.incoming;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite3.internal.catalog.Catalog;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.lowwatermark.message.GetLowWatermarkResponse;
import org.apache.ignite3.internal.lowwatermark.message.LowWatermarkMessagesFactory;
import org.apache.ignite3.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite3.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
import org.apache.ignite3.internal.partition.replicator.network.raft.SnapshotMetaResponse;
import org.apache.ignite3.internal.partition.replicator.network.raft.SnapshotMvDataResponse;
import org.apache.ignite3.internal.partition.replicator.network.raft.SnapshotTxDataResponse;
import org.apache.ignite3.internal.partition.replicator.network.replication.BinaryRowMessage;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.RaftSnapshotPartitionMeta;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.SnapshotUri;
import org.apache.ignite3.internal.raft.RaftGroupConfiguration;
import org.apache.ignite3.internal.schema.BinaryRow;
import org.apache.ignite3.internal.storage.RowId;
import org.apache.ignite3.internal.storage.StorageRebalanceException;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.network.ClusterNode;
import org.apache.ignite3.raft.jraft.error.RaftError;
import org.apache.ignite3.raft.jraft.storage.snapshot.SnapshotCopier;
import org.apache.ignite3.raft.jraft.storage.snapshot.SnapshotReader;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite3/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.class */
public class IncomingSnapshotCopier extends SnapshotCopier {
    private static final IgniteLogger LOG;
    private static final PartitionReplicationMessagesFactory TABLE_MSG_FACTORY;
    private static final LowWatermarkMessagesFactory LWM_MSG_FACTORY;
    private static final long NETWORK_TIMEOUT = Long.MAX_VALUE;
    private static final long MAX_MV_DATA_PAYLOADS_BATCH_BYTES_HINT = 102400;
    private static final int MAX_TX_DATA_BATCH_SIZE = 1000;
    private final PartitionSnapshotStorage partitionSnapshotStorage;
    private final Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId;
    private final SnapshotUri snapshotUri;
    private final long waitForMetadataCatchupMs;
    private final AtomicBoolean cancellationGuard = new AtomicBoolean();
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    @Nullable
    private volatile CompletableFuture<PartitionSnapshotMeta> snapshotMetaFuture;

    @Nullable
    private volatile CompletableFuture<Void> rebalanceFuture;

    @Nullable
    private volatile CompletableFuture<?> joinFuture;
    static final /* synthetic */ boolean $assertionsDisabled;

    public IncomingSnapshotCopier(PartitionSnapshotStorage partitionSnapshotStorage, SnapshotUri snapshotUri, long j) {
        this.partitionSnapshotStorage = partitionSnapshotStorage;
        this.partitionsByTableId = partitionSnapshotStorage.partitionsByTableId();
        this.snapshotUri = snapshotUri;
        this.waitForMetadataCatchupMs = j;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r1v9, types: [void, java.util.function.Function] */
    /* JADX WARN: Type inference failed for: r2v5, types: [void, java.util.function.Function] */
    @Override // org.apache.ignite3.raft.jraft.storage.snapshot.SnapshotCopier
    public void start() {
        Executor incomingSnapshotsExecutor = this.partitionSnapshotStorage.getIncomingSnapshotsExecutor();
        LOG.info("Copier is started for the partition [{}]", createPartitionInfo());
        ClusterNode snapshotSender = getSnapshotSender(this.snapshotUri.nodeName);
        CompletableFuture failedFuture = snapshotSender == null ? CompletableFuture.failedFuture(new StorageRebalanceException("Snapshot sender not found: " + this.snapshotUri.nodeName)) : loadSnapshotMeta(snapshotSender).thenCompose((Function<? super PartitionSnapshotMeta, ? extends CompletionStage<U>>) partitionSnapshotMeta -> {
            return waitForMetadataWithTimeout(partitionSnapshotMeta).thenApply(obj -> {
                if (metadataIsSufficientlyComplete(partitionSnapshotMeta)) {
                    return partitionSnapshotMeta;
                }
                logMetadataInsufficiencyAndSetError(partitionSnapshotMeta);
                return null;
            });
        });
        this.snapshotMetaFuture = failedFuture;
        CompletableFuture thenComposeAsync = failedFuture.thenComposeAsync((Function) partitionSnapshotMeta2 -> {
            return partitionSnapshotMeta2 == null ? CompletableFutures.nullCompletedFuture() : startRebalance().thenCompose(r9 -> {
                if ($assertionsDisabled || snapshotSender != null) {
                    return loadSnapshotMvData(partitionSnapshotMeta2, snapshotSender, incomingSnapshotsExecutor).thenCompose(obj -> {
                        return loadSnapshotTxData(snapshotSender, incomingSnapshotsExecutor);
                    }).thenRunAsync(() -> {
                        setNextRowIdToBuildIndexes(partitionSnapshotMeta2);
                    }, incomingSnapshotsExecutor);
                }
                throw new AssertionError(createPartitionInfo());
            });
        }, incomingSnapshotsExecutor);
        this.rebalanceFuture = thenComposeAsync;
        this.joinFuture = failedFuture.thenCompose((Function) partitionSnapshotMeta3 -> {
            return partitionSnapshotMeta3 == null ? CompletableFutures.nullCompletedFuture() : thenComposeAsync.handleAsync((r6, th) -> {
                return completeRebalance(partitionSnapshotMeta3, th);
            }, incomingSnapshotsExecutor).thenCompose(Function.identity()).thenCompose(r7 -> {
                if ($assertionsDisabled || snapshotSender != null) {
                    return tryUpdateLowWatermark(snapshotSender, incomingSnapshotsExecutor);
                }
                throw new AssertionError(createPartitionInfo());
            });
        });
    }

    private CompletableFuture<?> waitForMetadataWithTimeout(PartitionSnapshotMeta partitionSnapshotMeta) {
        return CompletableFuture.anyOf(this.partitionSnapshotStorage.catalogService().catalogReadyFuture(partitionSnapshotMeta.requiredCatalogVersion()), completeOnMetadataReadinessTimeout());
    }

    private CompletableFuture<?> completeOnMetadataReadinessTimeout() {
        return new CompletableFuture().orTimeout(this.waitForMetadataCatchupMs, TimeUnit.MILLISECONDS).exceptionally(th -> {
            if ($assertionsDisabled || (th instanceof TimeoutException)) {
                return null;
            }
            throw new AssertionError();
        });
    }

    @Override // org.apache.ignite3.raft.jraft.storage.snapshot.SnapshotCopier
    public void join() throws InterruptedException {
        CompletableFuture<?> completableFuture = this.joinFuture;
        if (completableFuture != null) {
            try {
                completableFuture.get();
            } catch (CancellationException e) {
            } catch (ExecutionException e2) {
                Throwable cause = e2.getCause();
                if (cause instanceof CancellationException) {
                    return;
                }
                LOG.error("Error when completing the copier", cause);
                if (isOk()) {
                    setError(RaftError.UNKNOWN, "Unknown error on completion the copier", new Object[0]);
                }
                throw new IllegalStateException(cause);
            }
        }
    }

    @Override // org.apache.ignite3.raft.jraft.storage.snapshot.SnapshotCopier
    public void cancel() {
        if (this.cancellationGuard.compareAndSet(false, true)) {
            this.busyLock.block();
            LOG.info("Copier is canceled for partition [{}]", createPartitionInfo());
            List list = (List) Stream.of((Object[]) new CompletableFuture[]{this.snapshotMetaFuture, this.rebalanceFuture}).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList());
            list.forEach(completableFuture -> {
                completableFuture.cancel(false);
            });
            if (list.isEmpty()) {
                return;
            }
            try {
                join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
    }

    @Override // org.apache.ignite3.raft.jraft.storage.snapshot.SnapshotCopier
    public SnapshotReader getReader() {
        CompletableFuture<PartitionSnapshotMeta> completableFuture = this.snapshotMetaFuture;
        if ($assertionsDisabled || (completableFuture != null && completableFuture.isDone())) {
            return new IncomingSnapshotReader(completableFuture.isCompletedExceptionally() ? null : completableFuture.join());
        }
        throw new AssertionError();
    }

    @Nullable
    private ClusterNode getSnapshotSender(String str) {
        return this.partitionSnapshotStorage.topologyService().getByConsistentId(str);
    }

    private CompletableFuture<PartitionSnapshotMeta> loadSnapshotMeta(ClusterNode clusterNode) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }
        try {
            return this.partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(clusterNode, TABLE_MSG_FACTORY.snapshotMetaRequest().id(this.snapshotUri.snapshotId).build(), Long.MAX_VALUE).thenApply(networkMessage -> {
                PartitionSnapshotMeta meta = ((SnapshotMetaResponse) networkMessage).meta();
                LOG.info("Copier has loaded the snapshot meta for the partition [{}, meta={}]", createPartitionInfo(), meta);
                return meta;
            });
        } finally {
            this.busyLock.leaveBusy();
        }
    }

    private boolean metadataIsSufficientlyComplete(PartitionSnapshotMeta partitionSnapshotMeta) {
        return partitionSnapshotMeta.requiredCatalogVersion() <= this.partitionSnapshotStorage.catalogService().latestCatalogVersion();
    }

    private void logMetadataInsufficiencyAndSetError(PartitionSnapshotMeta partitionSnapshotMeta) {
        LOG.warn("Metadata not yet available, rejecting snapshot installation [uri={}, requiredVersion={}].", this.snapshotUri, Integer.valueOf(partitionSnapshotMeta.requiredCatalogVersion()));
        String format = String.format("Metadata not yet available, URI '%s', required level %s; rejecting snapshot installation.", this.snapshotUri, Integer.valueOf(partitionSnapshotMeta.requiredCatalogVersion()));
        if (isOk()) {
            setError(RaftError.EBUSY, format, new Object[0]);
        }
    }

    private CompletableFuture<?> loadSnapshotMvData(PartitionSnapshotMeta partitionSnapshotMeta, ClusterNode clusterNode, Executor executor) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }
        try {
            CompletableFuture thenComposeAsync = this.partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(clusterNode, TABLE_MSG_FACTORY.snapshotMvDataRequest().id(this.snapshotUri.snapshotId).batchSizeHint(MAX_MV_DATA_PAYLOADS_BATCH_BYTES_HINT).build(), Long.MAX_VALUE).thenComposeAsync(networkMessage -> {
                SnapshotMvDataResponse snapshotMvDataResponse = (SnapshotMvDataResponse) networkMessage;
                for (SnapshotMvDataResponse.ResponseEntry responseEntry : snapshotMvDataResponse.rows()) {
                    for (int i = 0; i < responseEntry.rowVersions().size(); i++) {
                        if (!this.busyLock.enterBusy()) {
                            return CompletableFutures.nullCompletedFuture();
                        }
                        try {
                            writeVersion(partitionSnapshotMeta, responseEntry, i);
                            this.busyLock.leaveBusy();
                        } catch (Throwable th) {
                            this.busyLock.leaveBusy();
                            throw th;
                        }
                    }
                }
                if (snapshotMvDataResponse.finish()) {
                    LOG.info("Copier has finished loading multi-versioned data [{}, rows={}]", createPartitionInfo(), Integer.valueOf(snapshotMvDataResponse.rows().size()));
                    return CompletableFutures.nullCompletedFuture();
                }
                LOG.info("Copier has loaded a portion of multi-versioned data [{}, rows={}]", createPartitionInfo(), Integer.valueOf(snapshotMvDataResponse.rows().size()));
                return loadSnapshotMvData(partitionSnapshotMeta, clusterNode, executor);
            }, executor);
            this.busyLock.leaveBusy();
            return thenComposeAsync;
        } catch (Throwable th) {
            this.busyLock.leaveBusy();
            throw th;
        }
    }

    private CompletableFuture<Void> loadSnapshotTxData(ClusterNode clusterNode, Executor executor) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }
        try {
            CompletableFuture thenComposeAsync = this.partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(clusterNode, TABLE_MSG_FACTORY.snapshotTxDataRequest().id(this.snapshotUri.snapshotId).maxTransactionsInBatch(1000).build(), Long.MAX_VALUE).thenComposeAsync(networkMessage -> {
                SnapshotTxDataResponse snapshotTxDataResponse = (SnapshotTxDataResponse) networkMessage;
                if (!$assertionsDisabled && snapshotTxDataResponse.txMeta().size() != snapshotTxDataResponse.txIds().size()) {
                    throw new AssertionError(createPartitionInfo());
                }
                for (int i = 0; i < snapshotTxDataResponse.txMeta().size(); i++) {
                    if (!this.busyLock.enterBusy()) {
                        return CompletableFutures.nullCompletedFuture();
                    }
                    try {
                        this.partitionSnapshotStorage.txState().addTxMeta(snapshotTxDataResponse.txIds().get(i), snapshotTxDataResponse.txMeta().get(i).asTxMeta());
                        this.busyLock.leaveBusy();
                    } catch (Throwable th) {
                        this.busyLock.leaveBusy();
                        throw th;
                    }
                }
                if (snapshotTxDataResponse.finish()) {
                    LOG.info("Copier has finished loading transaction meta [{}, metas={}]", createPartitionInfo(), Integer.valueOf(snapshotTxDataResponse.txMeta().size()));
                    return CompletableFutures.nullCompletedFuture();
                }
                LOG.info("Copier has loaded a portion of transaction meta [{}, metas={}]", createPartitionInfo(), Integer.valueOf(snapshotTxDataResponse.txMeta().size()));
                return loadSnapshotTxData(clusterNode, executor);
            }, executor);
            this.busyLock.leaveBusy();
            return thenComposeAsync;
        } catch (Throwable th) {
            this.busyLock.leaveBusy();
            throw th;
        }
    }

    private CompletableFuture<Void> completeRebalance(PartitionSnapshotMeta partitionSnapshotMeta, @Nullable Throwable th) {
        if (!this.busyLock.enterBusy()) {
            if (isOk()) {
                setError(RaftError.ECANCELED, "Copier is cancelled", new Object[0]);
            }
            return abortRebalance();
        }
        try {
            if (th == null) {
                RaftGroupConfiguration raftGroupConfiguration = new RaftGroupConfiguration(partitionSnapshotMeta.cfgIndex(), partitionSnapshotMeta.cfgTerm(), partitionSnapshotMeta.peersList(), partitionSnapshotMeta.learnersList(), partitionSnapshotMeta.oldPeersList(), partitionSnapshotMeta.oldLearnersList());
                LOG.info("Copier completes the rebalancing of the partition: [{}, lastAppliedIndex={}, lastAppliedTerm={}, raftGroupConfig={}]", createPartitionInfo(), Long.valueOf(partitionSnapshotMeta.lastIncludedIndex()), Long.valueOf(partitionSnapshotMeta.lastIncludedTerm()), raftGroupConfiguration);
                CompletableFuture<Void> finishRebalance = finishRebalance(RaftSnapshotPartitionMeta.fromSnapshotMeta(partitionSnapshotMeta, raftGroupConfiguration));
                this.busyLock.leaveBusy();
                return finishRebalance;
            }
            LOG.error("Partition rebalancing error [{}]", th, createPartitionInfo());
            if (isOk()) {
                setError(RaftError.UNKNOWN, th.getMessage(), new Object[0]);
            }
            CompletableFuture thenCompose = abortRebalance().thenCompose(r3 -> {
                return CompletableFuture.failedFuture(th);
            });
            this.busyLock.leaveBusy();
            return thenCompose;
        } catch (Throwable th2) {
            this.busyLock.leaveBusy();
            throw th2;
        }
    }

    private int partId() {
        return this.partitionSnapshotStorage.partitionKey().partitionId();
    }

    private String createPartitionInfo() {
        return this.partitionSnapshotStorage.partitionKey().toString();
    }

    private void writeVersion(PartitionSnapshotMeta partitionSnapshotMeta, SnapshotMvDataResponse.ResponseEntry responseEntry, int i) {
        PartitionMvStorageAccess partitionMvStorageAccess = (PartitionMvStorageAccess) this.partitionsByTableId.get(responseEntry.tableId());
        RowId rowId = new RowId(partId(), responseEntry.rowId());
        BinaryRowMessage binaryRowMessage = responseEntry.rowVersions().get(i);
        BinaryRow asBinaryRow = binaryRowMessage == null ? null : binaryRowMessage.asBinaryRow();
        int requiredCatalogVersion = partitionSnapshotMeta.requiredCatalogVersion();
        if (i != responseEntry.timestamps().length) {
            partitionMvStorageAccess.addWriteCommitted(rowId, asBinaryRow, HybridTimestamp.hybridTimestamp(responseEntry.timestamps()[i]), requiredCatalogVersion);
            return;
        }
        if (!$assertionsDisabled && responseEntry.txId() == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && responseEntry.commitTableOrZoneId() == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && responseEntry.commitPartitionId() == -1) {
            throw new AssertionError();
        }
        partitionMvStorageAccess.addWrite(rowId, asBinaryRow, responseEntry.txId(), responseEntry.commitTableOrZoneId().intValue(), responseEntry.commitPartitionId(), requiredCatalogVersion);
    }

    private void setNextRowIdToBuildIndexes(PartitionSnapshotMeta partitionSnapshotMeta) {
        if (this.busyLock.enterBusy()) {
            try {
                Map<Integer, UUID> nextRowIdToBuildByIndexId = partitionSnapshotMeta.nextRowIdToBuildByIndexId();
                if (CollectionUtils.nullOrEmpty(nextRowIdToBuildByIndexId)) {
                    return;
                }
                Catalog catalog = this.partitionSnapshotStorage.catalogService().catalog(partitionSnapshotMeta.requiredCatalogVersion());
                HashMap hashMap = new HashMap();
                nextRowIdToBuildByIndexId.forEach((num, uuid) -> {
                    ((Map) hashMap.computeIfAbsent(Integer.valueOf(catalog.index(num.intValue()).tableId()), num -> {
                        return new HashMap();
                    })).put(num, new RowId(partId(), uuid));
                });
                hashMap.forEach((num2, map) -> {
                    ((PartitionMvStorageAccess) this.partitionSnapshotStorage.partitionsByTableId().get(num2)).setNextRowIdToBuildIndex(map);
                });
                this.busyLock.leaveBusy();
            } finally {
                this.busyLock.leaveBusy();
            }
        }
    }

    private CompletableFuture<Void> tryUpdateLowWatermark(ClusterNode clusterNode, Executor executor) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }
        try {
            CompletableFuture<Void> thenAcceptAsync = this.partitionSnapshotStorage.outgoingSnapshotsManager().messagingService().invoke(clusterNode, LWM_MSG_FACTORY.getLowWatermarkRequest().build(), Long.MAX_VALUE).thenAcceptAsync(networkMessage -> {
                HybridTimestamp nullableHybridTimestamp = HybridTimestamp.nullableHybridTimestamp(((GetLowWatermarkResponse) networkMessage).lowWatermark());
                if (nullableHybridTimestamp != null) {
                    this.partitionsByTableId.values().forEach(partitionMvStorageAccess -> {
                        partitionMvStorageAccess.updateLowWatermark(nullableHybridTimestamp);
                    });
                }
            }, executor);
            this.busyLock.leaveBusy();
            return thenAcceptAsync;
        } catch (Throwable th) {
            this.busyLock.leaveBusy();
            throw th;
        }
    }

    private CompletableFuture<Void> startRebalance() {
        return CompletableFuture.allOf(aggregateFutureFromPartitions((v0) -> {
            return v0.startRebalance();
        }), this.partitionSnapshotStorage.txState().startRebalance());
    }

    private CompletableFuture<Void> finishRebalance(RaftSnapshotPartitionMeta raftSnapshotPartitionMeta) {
        return CompletableFuture.allOf(aggregateFutureFromPartitions(partitionMvStorageAccess -> {
            return partitionMvStorageAccess.finishRebalance(raftSnapshotPartitionMeta);
        }), this.partitionSnapshotStorage.txState().finishRebalance(raftSnapshotPartitionMeta));
    }

    private CompletableFuture<Void> abortRebalance() {
        return CompletableFuture.allOf(aggregateFutureFromPartitions((v0) -> {
            return v0.abortRebalance();
        }), this.partitionSnapshotStorage.txState().abortRebalance());
    }

    private CompletableFuture<Void> aggregateFutureFromPartitions(Function<PartitionMvStorageAccess, CompletableFuture<Void>> function) {
        return CompletableFuture.allOf((CompletableFuture[]) this.partitionsByTableId.values().stream().map(function).toArray(i -> {
            return new CompletableFuture[i];
        }));
    }

    static {
        $assertionsDisabled = !IncomingSnapshotCopier.class.desiredAssertionStatus();
        LOG = Loggers.forClass(IncomingSnapshotCopier.class);
        TABLE_MSG_FACTORY = new PartitionReplicationMessagesFactory();
        LWM_MSG_FACTORY = new LowWatermarkMessagesFactory();
    }
}
