/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.tx.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
import org.apache.ignite.internal.tx.PartitionEnlistment;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
import org.apache.ignite.internal.tx.impl.PlacementDriverHelper;
import org.apache.ignite.internal.tx.impl.TxCleanupExceptionUtils;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfoMessage;
import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.util.CompletableFutures;
import org.jetbrains.annotations.Nullable;

/**
 * Sends transaction cleanup messages to the nodes hosting the partitions of a transaction and tracks, per
 * transaction, the partitions for which a "cleanup replicated" notification has not been received yet.
 *
 * <p>Once every tracked partition of a transaction has been reported, the transaction's volatile state meta is
 * updated with the local wall-clock time of that moment.
 */
public class TxCleanupRequestSender {
    private static final IgniteLogger LOG = Loggers.forClass(TxCleanupRequestSender.class);

    /** Used to look up (or wait for) the primary replicas of partitions. */
    private final PlacementDriverHelper placementDriverHelper;

    /** Used to send cleanup messages and to reach the messaging service. */
    private final TxMessageSender txMessageSender;

    /** Transaction id -> partitions that have not yet been reported by a cleanup response. */
    private final ConcurrentMap<UUID, CleanupContext> writeIntentsReplicated = new ConcurrentHashMap<>();

    /** Volatile storage of transaction state meta, updated when all tracked partitions have been reported. */
    private final VolatileTxStateMetaStorage txStateVolatileStorage;

    /**
     * Creates the sender.
     *
     * @param txMessageSender Sender of transaction messages.
     * @param placementDriverHelper Helper used to resolve primary replicas.
     * @param txStateVolatileStorage Volatile transaction state meta storage.
     */
    public TxCleanupRequestSender(
            TxMessageSender txMessageSender,
            PlacementDriverHelper placementDriverHelper,
            VolatileTxStateMetaStorage txStateVolatileStorage
    ) {
        this.txMessageSender = txMessageSender;
        this.placementDriverHelper = placementDriverHelper;
        this.txStateVolatileStorage = txStateVolatileStorage;
    }

    /**
     * Registers a handler for {@link TxMessageGroup} messages. Only {@link TxCleanupMessageResponse} messages that
     * arrive with a {@code null} correlation id are processed; everything else is ignored by this handler.
     */
    public void start() {
        txMessageSender.messagingService().addMessageHandler(TxMessageGroup.class, (msg, sender, correlationId) -> {
            // NOTE(review): a null correlation id presumably means the message is not a reply to an invoke - confirm.
            if (msg instanceof TxCleanupMessageResponse && correlationId == null) {
                assert !(msg instanceof TxCleanupMessageErrorResponse) : "Cleanup error response is not expected here.";

                CleanupReplicatedInfoMessage result = ((TxCleanupMessageResponse) msg).result();

                assert result != null : "Result for the cleanup response cannot be null.";

                onCleanupReplicated(result.asCleanupReplicatedInfo());
            }
        });
    }

    /**
     * Removes the reported partitions from the tracked set of the transaction and, when no tracked partition is left,
     * updates the transaction meta and stops tracking the transaction.
     *
     * @param info Transaction id and partitions reported by a cleanup response.
     */
    private void onCleanupReplicated(CleanupReplicatedInfo info) {
        // The partition set is only mutated inside the map's per-key compute function.
        CleanupContext ctx = writeIntentsReplicated.computeIfPresent(info.txId(), (uuid, cleanupContext) -> {
            cleanupContext.partitions.removeAll(info.partitions());

            return cleanupContext;
        });

        if (ctx != null && ctx.partitions.isEmpty()) {
            markTxnCleanupReplicated(info.txId(), ctx.txState, ctx.commitPartitionId);

            writeIntentsReplicated.remove(info.txId());
        }
    }

    /**
     * Rewrites the volatile meta of the transaction with the current wall-clock time as the cleanup completion
     * timestamp. All other attributes are taken from the existing meta; if there is none, {@code state} is used as
     * the transaction state and the remaining attributes are {@code null}.
     *
     * @param txId Transaction id.
     * @param state State to use when no meta exists for the transaction.
     * @param commitPartitionId Commit partition id to store in the meta.
     */
    private void markTxnCleanupReplicated(UUID txId, TxState state, ReplicationGroupId commitPartitionId) {
        long cleanupCompletionTimestamp = System.currentTimeMillis();

        txStateVolatileStorage.updateMeta(txId, oldMeta -> new TxStateMeta(
                oldMeta == null ? state : oldMeta.txState(),
                oldMeta == null ? null : oldMeta.txCoordinatorId(),
                commitPartitionId,
                oldMeta == null ? null : oldMeta.commitTimestamp(),
                oldMeta == null ? null : oldMeta.tx(),
                oldMeta == null ? null : oldMeta.initialVacuumObservationTimestamp(),
                cleanupCompletionTimestamp,
                oldMeta == null ? null : oldMeta.isFinishedDueToTimeout()
        ));
    }

    /**
     * Sends a cleanup message without partitions to a single node, with {@code commit = false} and no commit
     * timestamp. Recoverable failures are retried against the same node.
     *
     * @param commitPartitionId Commit partition id.
     * @param node Name of the target node.
     * @param txId Transaction id.
     * @return Future completed when the message exchange (including retries) is finished.
     */
    public CompletableFuture<Void> cleanup(ReplicationGroupId commitPartitionId, String node, UUID txId) {
        return sendCleanupMessageWithRetries(commitPartitionId, false, null, txId, node, null);
    }

    /**
     * Starts tracking the enlisted partitions of the transaction and sends cleanup messages to the primary nodes
     * recorded in the enlistments, one message per node.
     *
     * @param commitPartitionId Commit partition id.
     * @param enlistedPartitions Enlisted partitions keyed by replication group id. The map is not modified.
     * @param commit {@code true} for a committed transaction, {@code false} for an aborted one.
     * @param commitTimestamp Commit timestamp, may be {@code null}.
     * @param txId Transaction id.
     * @return Future completed when all per-node message exchanges are finished.
     */
    public CompletableFuture<Void> cleanup(
            ReplicationGroupId commitPartitionId,
            Map<ReplicationGroupId, PartitionEnlistment> enlistedPartitions,
            boolean commit,
            @Nullable HybridTimestamp commitTimestamp,
            UUID txId
    ) {
        // Copy the key set: Map.keySet() is a live view of the caller's map, and onCleanupReplicated() removes elements
        // from the tracked set. Without the copy that would delete entries from the caller's map (possibly while it is
        // being iterated below) or fail on an unmodifiable map.
        writeIntentsReplicated.put(
                txId,
                new CleanupContext(
                        commitPartitionId,
                        new HashSet<>(enlistedPartitions.keySet()),
                        commit ? TxState.COMMITTED : TxState.ABORTED
                )
        );

        Map<String, List<EnlistedPartitionGroup>> partitionsByPrimaryName = new HashMap<>();

        enlistedPartitions.forEach((partitionId, partition) -> {
            List<EnlistedPartitionGroup> enlistedPartitionGroups = partitionsByPrimaryName.computeIfAbsent(
                    partition.primaryNodeConsistentId(),
                    node -> new ArrayList<>()
            );

            enlistedPartitionGroups.add(new EnlistedPartitionGroup(partitionId, partition.tableIds()));
        });

        return cleanupPartitions(commitPartitionId, partitionsByPrimaryName, commit, commitTimestamp, txId);
    }

    /**
     * Starts tracking the given partitions of the transaction, resolves their current primary replicas and sends
     * cleanup messages to those nodes. Partitions that currently have no primary are handled asynchronously once a
     * primary appears; the returned future does not wait for them.
     *
     * @param commitPartitionId Commit partition id.
     * @param partitions Partitions to clean up.
     * @param commit {@code true} for a committed transaction, {@code false} for an aborted one.
     * @param commitTimestamp Commit timestamp, may be {@code null}.
     * @param txId Transaction id.
     * @return Future completed when the message exchanges with the resolved primaries are finished.
     */
    public CompletableFuture<Void> cleanup(
            ReplicationGroupId commitPartitionId,
            Collection<EnlistedPartitionGroup> partitions,
            boolean commit,
            @Nullable HybridTimestamp commitTimestamp,
            UUID txId
    ) {
        Map<ReplicationGroupId, EnlistedPartitionGroup> partitionIds = partitions.stream()
                .collect(Collectors.toMap(EnlistedPartitionGroup::groupId, Function.identity()));

        // This replaces any context previously tracked for the same transaction id.
        writeIntentsReplicated.put(
                txId,
                new CleanupContext(
                        commitPartitionId,
                        new HashSet<>(partitionIds.keySet()),
                        commit ? TxState.COMMITTED : TxState.ABORTED
                )
        );

        return placementDriverHelper.findPrimaryReplicas(partitionIds.keySet()).thenCompose(partitionData -> {
            cleanupPartitionsWithoutPrimary(
                    commitPartitionId,
                    commit,
                    commitTimestamp,
                    txId,
                    toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds)
            );

            Map<String, List<EnlistedPartitionGroup>> partitionsByPrimaryName =
                    toPartitionInfosByPrimaryName(partitionData.partitionsByNode, partitionIds);

            return cleanupPartitions(commitPartitionId, partitionsByPrimaryName, commit, commitTimestamp, txId);
        });
    }

    /**
     * Converts a "node name -> group ids" mapping into a "node name -> partition groups" mapping.
     *
     * @param partitionsByNode Group ids keyed by node name.
     * @param partitionIds Partition groups keyed by group id, used for the lookup.
     * @return Partition groups keyed by node name.
     */
    private static Map<String, List<EnlistedPartitionGroup>> toPartitionInfosByPrimaryName(
            Map<String, Set<ReplicationGroupId>> partitionsByNode,
            Map<ReplicationGroupId, EnlistedPartitionGroup> partitionIds
    ) {
        return partitionsByNode.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> toPartitionInfos(entry.getValue(), partitionIds)));
    }

    /**
     * Looks up the partition group of every given group id.
     *
     * @param groupIds Group ids to resolve.
     * @param partitionIds Partition groups keyed by group id.
     * @return Resolved partition groups; an id missing from {@code partitionIds} yields a {@code null} element.
     */
    private static List<EnlistedPartitionGroup> toPartitionInfos(
            Set<ReplicationGroupId> groupIds,
            Map<ReplicationGroupId, EnlistedPartitionGroup> partitionIds
    ) {
        return groupIds.stream().map(partitionIds::get).collect(Collectors.toList());
    }

    /**
     * Waits for primary replicas of the given partitions and then runs the regular per-node cleanup for them.
     *
     * <p>The resulting future is neither returned nor awaited, so callers do not wait for these partitions and
     * failures of this asynchronous chain are not observed here.
     *
     * @param commitPartitionId Commit partition id.
     * @param commit {@code true} for a committed transaction, {@code false} for an aborted one.
     * @param commitTimestamp Commit timestamp, may be {@code null}.
     * @param txId Transaction id.
     * @param partitionsWithoutPrimary Partitions for which no primary replica was found.
     */
    private void cleanupPartitionsWithoutPrimary(
            ReplicationGroupId commitPartitionId,
            boolean commit,
            @Nullable HybridTimestamp commitTimestamp,
            UUID txId,
            List<EnlistedPartitionGroup> partitionsWithoutPrimary
    ) {
        Map<ReplicationGroupId, EnlistedPartitionGroup> partitionIds = partitionsWithoutPrimary.stream()
                .collect(Collectors.toMap(EnlistedPartitionGroup::groupId, Function.identity()));

        placementDriverHelper.awaitPrimaryReplicas(partitionIds.keySet()).thenCompose(partitionIdsByPrimaryName -> {
            Map<String, List<EnlistedPartitionGroup>> partitionsByPrimaryName =
                    toPartitionInfosByPrimaryName(partitionIdsByPrimaryName, partitionIds);

            return cleanupPartitions(commitPartitionId, partitionsByPrimaryName, commit, commitTimestamp, txId);
        });
    }

    /**
     * Sends one cleanup message per node and combines the results.
     *
     * @param commitPartitionId Commit partition id.
     * @param partitionsByNode Partitions to clean up keyed by the name of the node to contact.
     * @param commit {@code true} for a committed transaction, {@code false} for an aborted one.
     * @param commitTimestamp Commit timestamp, may be {@code null}.
     * @param txId Transaction id.
     * @return Future completed when all per-node futures are completed.
     */
    private CompletableFuture<Void> cleanupPartitions(
            ReplicationGroupId commitPartitionId,
            Map<String, List<EnlistedPartitionGroup>> partitionsByNode,
            boolean commit,
            @Nullable HybridTimestamp commitTimestamp,
            UUID txId
    ) {
        List<CompletableFuture<Void>> cleanupFutures = new ArrayList<>();

        for (Map.Entry<String, List<EnlistedPartitionGroup>> entry : partitionsByNode.entrySet()) {
            String node = entry.getKey();
            List<EnlistedPartitionGroup> nodePartitions = entry.getValue();

            cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit, commitTimestamp, txId, node, nodePartitions));
        }

        return CompletableFuture.allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]));
    }

    /**
     * Sends a cleanup message to a node and retries on recoverable failures.
     *
     * <p>On a recoverable failure the attempt is repeated immediately (no delay or attempt limit is applied here):
     * with the same node when {@code partitions} is {@code null}, otherwise through
     * {@link #cleanup(ReplicationGroupId, Collection, boolean, HybridTimestamp, UUID)}, which resolves the primaries
     * of these partitions again. A non-recoverable failure fails the returned future. An error response from the
     * node is only logged (when the helper says it should be) and does not fail the returned future.
     *
     * @param commitPartitionId Commit partition id.
     * @param commit {@code true} for a committed transaction, {@code false} for an aborted one.
     * @param commitTimestamp Commit timestamp, may be {@code null}.
     * @param txId Transaction id.
     * @param node Name of the target node.
     * @param partitions Partitions to clean up on the node, may be {@code null}.
     * @return Future completed when the exchange, including any retries, is finished.
     */
    private CompletableFuture<Void> sendCleanupMessageWithRetries(
            ReplicationGroupId commitPartitionId,
            boolean commit,
            @Nullable HybridTimestamp commitTimestamp,
            UUID txId,
            String node,
            @Nullable Collection<EnlistedPartitionGroup> partitions
    ) {
        CompletableFuture<CompletableFuture<Void>> attempt = txMessageSender
                .cleanup(node, partitions, txId, commit, commitTimestamp)
                .handle((networkMessage, throwable) -> {
                    if (throwable != null) {
                        if (ReplicatorRecoverableExceptions.isRecoverable(throwable)) {
                            if (partitions == null) {
                                // Nothing to re-resolve primaries for: try the same node again.
                                return sendCleanupMessageWithRetries(commitPartitionId, commit, commitTimestamp, txId, node, partitions);
                            }

                            // NOTE(review): this overload re-registers the tracked context of txId with only these
                            // partitions, replacing whatever was tracked before - confirm this is intended.
                            return cleanup(commitPartitionId, partitions, commit, commitTimestamp, txId);
                        }

                        return CompletableFuture.failedFuture(throwable);
                    }

                    if (networkMessage instanceof TxCleanupMessageErrorResponse) {
                        TxCleanupMessageErrorResponse errorResponse = (TxCleanupMessageErrorResponse) networkMessage;

                        if (TxCleanupExceptionUtils.writeIntentSwitchFailureShouldBeLogged(errorResponse.throwable())) {
                            LOG.warn(
                                    "First cleanup attempt failed (the transaction outcome is not affected) [txId={}]",
                                    errorResponse.throwable(),
                                    txId
                            );
                        }
                    }

                    return CompletableFutures.nullCompletedFuture();
                });

        // Flatten the nested future produced by the handler.
        return attempt.thenCompose(next -> next);
    }

    /** Per-transaction tracking state: what is still awaited and what to record once nothing is. */
    private static class CleanupContext {
        /** Commit partition id to store in the transaction meta. */
        private final ReplicationGroupId commitPartitionId;

        /** Partitions not yet reported by a cleanup response. Mutated as responses arrive, so it must be a private copy. */
        private final Set<ReplicationGroupId> partitions;

        /** Transaction state to record if no meta exists for the transaction. */
        private final TxState txState;

        private CleanupContext(ReplicationGroupId commitPartitionId, Set<ReplicationGroupId> partitions, TxState txState) {
            this.commitPartitionId = commitPartitionId;
            this.partitions = partitions;
            this.txState = txState;
        }
    }
}

