/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.tx.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.IgniteThrottledLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.PartitionEnlistment;
import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
import org.apache.ignite.internal.tx.impl.PlacementDriverHelper;
import org.apache.ignite.internal.tx.impl.TxCleanupExceptionUtils;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfoMessage;
import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.util.CompletableFutures;
import org.jetbrains.annotations.Nullable;

/**
 * Sends transaction cleanup requests to the nodes hosting the enlisted partitions and tracks, per transaction,
 * which partitions have not yet reported that their cleanup was replicated.
 *
 * <p>Cleanup messages that fail with a recoverable exception (as decided by
 * {@link ReplicatorRecoverableExceptions#isRecoverable(Throwable)}) are retried without an upper bound on the
 * number of attempts; after {@link #ATTEMPTS_LOG_THRESHOLD} attempts a throttled warning is logged on each retry.
 */
public class TxCleanupRequestSender {
    private static final IgniteLogger LOG = Loggers.forClass(TxCleanupRequestSender.class);

    /** Number of attempts after which every further recoverable failure is reported through the throttled logger. */
    private static final int ATTEMPTS_LOG_THRESHOLD = 100;

    private final IgniteThrottledLogger throttledLog;

    private final PlacementDriverHelper placementDriverHelper;

    private final TxMessageSender txMessageSender;

    /** Cleanup contexts by transaction ID; an entry lives until all of its partitions reported replicated cleanup. */
    private final ConcurrentMap<UUID, CleanupContext> writeIntentsReplicated = new ConcurrentHashMap<>();

    private final VolatileTxStateMetaStorage txStateVolatileStorage;

    /** Executor on which cleanup responses (and therefore retries) are processed. */
    private final ExecutorService cleanupExecutor;

    /**
     * Creates the sender.
     *
     * @param txMessageSender Sender used for cleanup and transaction state resolution messages.
     * @param placementDriverHelper Helper used to find or await primary replicas.
     * @param txStateVolatileStorage Volatile transaction state storage updated once cleanup is replicated.
     * @param cleanupExecutor Executor on which cleanup responses are handled.
     * @param throttledLogExecutor Executor passed to the throttled logger.
     */
    public TxCleanupRequestSender(
            TxMessageSender txMessageSender,
            PlacementDriverHelper placementDriverHelper,
            VolatileTxStateMetaStorage txStateVolatileStorage,
            ExecutorService cleanupExecutor,
            Executor throttledLogExecutor
    ) {
        this.txMessageSender = txMessageSender;
        this.placementDriverHelper = placementDriverHelper;
        this.txStateVolatileStorage = txStateVolatileStorage;
        this.cleanupExecutor = cleanupExecutor;
        this.throttledLog = Loggers.toThrottledLogger(Loggers.forClass(TxCleanupRequestSender.class), throttledLogExecutor);
    }

    /**
     * Registers a handler for {@link TxMessageGroup} messages that processes {@link TxCleanupMessageResponse}s
     * arriving without a correlation ID as "cleanup replicated" notifications.
     */
    public void start() {
        this.txMessageSender.messagingService().addMessageHandler(TxMessageGroup.class, (msg, sender, correlationId) -> {
            // Responses carrying a correlation ID are not handled here.
            if (msg instanceof TxCleanupMessageResponse && correlationId == null) {
                assert !(msg instanceof TxCleanupMessageErrorResponse) : "Cleanup error response is not expected here.";

                CleanupReplicatedInfoMessage result = ((TxCleanupMessageResponse) msg).result();

                assert result != null : "Result for the cleanup response cannot be null.";

                this.onCleanupReplicated(result.asCleanupReplicatedInfo());
            }
        });
    }

    /**
     * Removes the reported partitions from the transaction's pending set and, once that set is empty, records the
     * cleanup completion in the volatile storage and forgets the context.
     *
     * @param info Transaction ID and the partitions whose cleanup has been replicated.
     */
    private void onCleanupReplicated(CleanupReplicatedInfo info) {
        // The pending set is only mutated inside the map's compute function.
        CleanupContext ctx = this.writeIntentsReplicated.computeIfPresent(info.txId(), (uuid, cleanupContext) -> {
            cleanupContext.partitions.removeAll(info.partitions());

            return cleanupContext;
        });

        if (ctx != null && ctx.partitions.isEmpty()) {
            this.markTxnCleanupReplicated(info.txId(), ctx.txState, ctx.commitPartitionId);

            this.writeIntentsReplicated.remove(info.txId());
        }
    }

    /**
     * Updates the volatile transaction meta with the final state, commit partition, commit timestamp and the
     * cleanup completion timestamp.
     *
     * <p>If the transaction is committed but no commit timestamp is available locally, the timestamp is requested
     * from the current primary replica of the commit partition first. The update is asynchronous and its result is
     * not awaited.
     *
     * @param txId Transaction ID.
     * @param state Final transaction state.
     * @param commitPartitionId Commit partition ID.
     */
    private void markTxnCleanupReplicated(UUID txId, TxState state, ZonePartitionId commitPartitionId) {
        long cleanupCompletionTimestamp = System.currentTimeMillis();

        TxStateMeta txStateMeta = this.txStateVolatileStorage.state(txId);

        CompletionStage<HybridTimestamp> commitTimestampFuture;

        if (state == TxState.COMMITTED && (txStateMeta == null || txStateMeta.commitTimestamp() == null)) {
            // The commit timestamp is unknown locally: ask the commit partition's primary replica for it.
            commitTimestampFuture = this.placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(commitPartitionId)
                    .thenCompose(replicaMeta -> {
                        String primaryNode = replicaMeta.getLeaseholder();
                        HybridTimestamp startTime = replicaMeta.getStartTime();

                        return this.txMessageSender.resolveTxStateFromCommitPartition(
                                primaryNode,
                                txId,
                                commitPartitionId,
                                startTime.longValue(),
                                null,
                                null
                        ).thenApply(TransactionMeta::commitTimestamp);
                    });
        } else {
            HybridTimestamp existingCommitTs = txStateMeta == null ? null : txStateMeta.commitTimestamp();

            commitTimestampFuture = CompletableFuture.completedFuture(existingCommitTs);
        }

        commitTimestampFuture.thenAccept(commitTimestamp -> this.txStateVolatileStorage.updateMeta(
                txId,
                oldMeta -> TxStateMeta.builder(oldMeta, state)
                        .commitPartitionId(commitPartitionId)
                        .commitTimestamp(commitTimestamp)
                        .cleanupCompletionTimestamp(cleanupCompletionTimestamp)
                        .build()
        ));
    }

    /**
     * Sends a cleanup message for the transaction to a single node, with {@code commit == false}, no commit
     * timestamp and no explicit partition list. Recoverable failures are retried against the same node.
     *
     * @param commitPartitionId Commit partition ID.
     * @param node Name of the target node.
     * @param txId Transaction ID.
     * @return Future completed when the cleanup message exchange is finished.
     */
    public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId, String node, UUID txId) {
        return this.sendCleanupMessageWithRetries(commitPartitionId, false, null, txId, node, null, 0);
    }

    /**
     * Sends cleanup messages for the enlisted partitions, grouping them by the primary node recorded in each
     * enlistment.
     *
     * <p>When {@code commitPartitionId} is not {@code null}, a cleanup context tracking the enlisted partitions is
     * registered for the transaction (replacing any previous one).
     *
     * @param commitPartitionId Commit partition ID, or {@code null} if it is not known.
     * @param enlistedPartitions Enlisted partitions by partition ID; this map is not modified.
     * @param commit {@code true} for commit, {@code false} for abort.
     * @param commitTimestamp Commit timestamp, may be {@code null}.
     * @param txId Transaction ID.
     * @return Future completed when all cleanup message exchanges are finished.
     */
    public CompletableFuture<Void> cleanup(
            @Nullable ZonePartitionId commitPartitionId,
            Map<ZonePartitionId, PartitionEnlistment> enlistedPartitions,
            boolean commit,
            @Nullable HybridTimestamp commitTimestamp,
            UUID txId
    ) {
        if (commitPartitionId != null) {
            // Defensive copy: Map.keySet() is a live view of the caller's map and the context removes elements
            // from its set as partitions report back. Without the copy the caller's map would lose entries
            // (or the removal would throw for an unmodifiable map).
            this.writeIntentsReplicated.put(txId, new CleanupContext(
                    commitPartitionId,
                    new HashSet<>(enlistedPartitions.keySet()),
                    commit ? TxState.COMMITTED : TxState.ABORTED
            ));
        }

        Map<String, List<EnlistedPartitionGroup>> partitionsByPrimaryName = new HashMap<>();

        enlistedPartitions.forEach((partitionId, partition) ->
                partitionsByPrimaryName
                        .computeIfAbsent(partition.primaryNodeConsistentId(), node -> new ArrayList<>())
                        .add(new EnlistedPartitionGroup(partitionId, partition.tableIds()))
        );

        return this.cleanupPartitions(commitPartitionId, partitionsByPrimaryName, commit, commitTimestamp, txId, 0);
    }

    /**
     * Sends cleanup messages for the given partitions after looking up their current primary replicas.
     *
     * @param commitPartitionId Commit partition ID, or {@code null} if it is not known.
     * @param partitions Enlisted partition groups to clean up.
     * @param commit {@code true} for commit, {@code false} for abort.
     * @param commitTimestamp Commit timestamp, may be {@code null}.
     * @param txId Transaction ID.
     * @return Future completed when cleanup of the partitions that currently have a primary is finished.
     */
    public CompletableFuture<Void> cleanup(
            @Nullable ZonePartitionId commitPartitionId,
            Collection<EnlistedPartitionGroup> partitions,
            boolean commit,
            @Nullable HybridTimestamp commitTimestamp,
            UUID txId
    ) {
        return this.cleanup(commitPartitionId, partitions, commit, commitTimestamp, txId, 0);
    }

    /**
     * Same as the public collection-based overload, additionally carrying the number of attempts made so far.
     * This is also the entry point used when a cleanup message with an explicit partition list is retried.
     *
     * @param attemptsMade Number of attempts already made.
     */
    private CompletableFuture<Void> cleanup(
            @Nullable ZonePartitionId commitPartitionId,
            Collection<EnlistedPartitionGroup> partitions,
            boolean commit,
            @Nullable HybridTimestamp commitTimestamp,
            UUID txId,
            int attemptsMade
    ) {
        Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds = partitions.stream()
                .collect(Collectors.toMap(EnlistedPartitionGroup::groupId, Function.identity()));

        if (commitPartitionId != null) {
            this.writeIntentsReplicated.put(txId, new CleanupContext(
                    commitPartitionId,
                    new HashSet<>(partitionIds.keySet()),
                    commit ? TxState.COMMITTED : TxState.ABORTED
            ));
        }

        return this.placementDriverHelper.findPrimaryReplicas(partitionIds.keySet()).thenCompose(partitionData -> {
            // Partitions without a primary are handled in the background; the returned future does not wait for them.
            this.cleanupPartitionsWithoutPrimary(
                    commitPartitionId,
                    commit,
                    commitTimestamp,
                    txId,
                    toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds),
                    attemptsMade
            );

            Map<String, List<EnlistedPartitionGroup>> partitionsByPrimaryName =
                    toPartitionInfosByPrimaryName(partitionData.partitionsByNode, partitionIds);

            return this.cleanupPartitions(commitPartitionId, partitionsByPrimaryName, commit, commitTimestamp, txId, attemptsMade);
        });
    }

    /**
     * Replaces partition IDs with the corresponding enlisted partition groups, keeping the grouping by node name.
     *
     * @param partitionsByNode Partition IDs by node name.
     * @param partitionIds Enlisted partition groups by partition ID.
     * @return Enlisted partition groups by node name.
     */
    private static Map<String, List<EnlistedPartitionGroup>> toPartitionInfosByPrimaryName(
            Map<String, Set<ZonePartitionId>> partitionsByNode,
            Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds
    ) {
        return partitionsByNode.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> toPartitionInfos(entry.getValue(), partitionIds)));
    }

    /**
     * Looks up the enlisted partition group for every given partition ID.
     *
     * @param groupIds Partition IDs.
     * @param partitionIds Enlisted partition groups by partition ID.
     * @return Groups in the iteration order of {@code groupIds}.
     */
    private static List<EnlistedPartitionGroup> toPartitionInfos(
            Set<ZonePartitionId> groupIds,
            Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds
    ) {
        return groupIds.stream().map(partitionIds::get).collect(Collectors.toList());
    }

    /**
     * Awaits primary replicas for the given partitions and then runs the regular cleanup for them.
     * The resulting future is intentionally not returned, so callers do not wait for this part of the cleanup.
     */
    private void cleanupPartitionsWithoutPrimary(
            @Nullable ZonePartitionId commitPartitionId,
            boolean commit,
            @Nullable HybridTimestamp commitTimestamp,
            UUID txId,
            List<EnlistedPartitionGroup> partitionsWithoutPrimary,
            int attemptsMade
    ) {
        Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds = partitionsWithoutPrimary.stream()
                .collect(Collectors.toMap(EnlistedPartitionGroup::groupId, Function.identity()));

        this.placementDriverHelper.awaitPrimaryReplicas(partitionIds.keySet()).thenCompose(partitionIdsByPrimaryName -> {
            Map<String, List<EnlistedPartitionGroup>> partitionsByPrimaryName =
                    toPartitionInfosByPrimaryName(partitionIdsByPrimaryName, partitionIds);

            return this.cleanupPartitions(commitPartitionId, partitionsByPrimaryName, commit, commitTimestamp, txId, attemptsMade);
        });
    }

    /**
     * Sends one cleanup message per node and combines the results.
     *
     * @param partitionsByNode Enlisted partition groups by target node name.
     * @return Future completed when the exchanges with all nodes are finished.
     */
    private CompletableFuture<Void> cleanupPartitions(
            @Nullable ZonePartitionId commitPartitionId,
            Map<String, List<EnlistedPartitionGroup>> partitionsByNode,
            boolean commit,
            @Nullable HybridTimestamp commitTimestamp,
            UUID txId,
            int attemptsMade
    ) {
        List<CompletableFuture<Void>> cleanupFutures = new ArrayList<>();

        for (Map.Entry<String, List<EnlistedPartitionGroup>> entry : partitionsByNode.entrySet()) {
            String node = entry.getKey();
            List<EnlistedPartitionGroup> nodePartitions = entry.getValue();

            // When the commit partition is unknown the message is sent without an explicit partition list.
            cleanupFutures.add(this.sendCleanupMessageWithRetries(
                    commitPartitionId,
                    commit,
                    commitTimestamp,
                    txId,
                    node,
                    commitPartitionId == null ? null : nodePartitions,
                    attemptsMade
            ));
        }

        return CompletableFuture.allOf(cleanupFutures.toArray(new CompletableFuture[0]));
    }

    /**
     * Sends a cleanup message to the node and retries on recoverable failures.
     *
     * <p>Without an explicit partition list the same node is retried. With a partition list the retry goes through
     * {@code cleanup(...)}, which looks the primary replicas up again. Non-recoverable failures fail the returned
     * future. An error response from the remote node is at most logged and does not fail the future.
     *
     * @param commitPartitionId Commit partition ID, or {@code null} if it is not known.
     * @param commit {@code true} for commit, {@code false} for abort.
     * @param commitTimestamp Commit timestamp, may be {@code null}.
     * @param txId Transaction ID.
     * @param node Name of the target node.
     * @param partitions Partitions to clean up on the node, or {@code null} to send no explicit list.
     * @param attemptsMade Number of attempts already made.
     * @return Future completed when the exchange (including retries) is finished.
     */
    private CompletableFuture<Void> sendCleanupMessageWithRetries(
            @Nullable ZonePartitionId commitPartitionId,
            boolean commit,
            @Nullable HybridTimestamp commitTimestamp,
            UUID txId,
            String node,
            @Nullable Collection<EnlistedPartitionGroup> partitions,
            int attemptsMade
    ) {
        return this.txMessageSender.cleanup(node, partitions, txId, commit, commitTimestamp)
                .<CompletableFuture<Void>>handleAsync((networkMessage, throwable) -> {
                    if (throwable != null) {
                        if (ReplicatorRecoverableExceptions.isRecoverable(throwable)) {
                            if (attemptsMade > ATTEMPTS_LOG_THRESHOLD) {
                                this.throttledLog.warn(
                                        "Unsuccessful transaction cleanup after {} attempts, keep retrying [txId={}]",
                                        throwable,
                                        ATTEMPTS_LOG_THRESHOLD,
                                        txId
                                );
                            }

                            if (partitions == null) {
                                return this.sendCleanupMessageWithRetries(
                                        commitPartitionId,
                                        commit,
                                        commitTimestamp,
                                        txId,
                                        node,
                                        partitions,
                                        attemptsMade + 1
                                );
                            }

                            return this.cleanup(commitPartitionId, partitions, commit, commitTimestamp, txId, attemptsMade + 1);
                        }

                        return CompletableFuture.failedFuture(throwable);
                    }

                    if (networkMessage instanceof TxCleanupMessageErrorResponse) {
                        TxCleanupMessageErrorResponse errorResponse = (TxCleanupMessageErrorResponse) networkMessage;

                        if (TxCleanupExceptionUtils.writeIntentSwitchFailureShouldBeLogged(errorResponse.throwable())) {
                            LOG.warn(
                                    "First cleanup attempt failed (the transaction outcome is not affected) [txId={}]",
                                    errorResponse.throwable(),
                                    txId
                            );
                        }
                    }

                    return CompletableFutures.nullCompletedFuture();
                }, this.cleanupExecutor)
                // Flatten the nested future produced by the handler.
                .thenCompose(future -> future);
    }

    /** Per-transaction cleanup state: the commit partition, the final state and the partitions still pending. */
    private static class CleanupContext {
        private final ZonePartitionId commitPartitionId;

        /** Partitions that have not reported replicated cleanup yet. Elements are removed from this set, so it must be mutable. */
        private final Set<ZonePartitionId> partitions;

        private final TxState txState;

        private CleanupContext(ZonePartitionId commitPartitionId, Set<ZonePartitionId> partitions, TxState txState) {
            this.commitPartitionId = commitPartitionId;
            this.partitions = partitions;
            this.txState = txState;
        }
    }
}

