/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.tx.impl;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.ComponentStoppingException;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.placementdriver.PlacementDriver;
import org.apache.ignite3.internal.raft.GroupOverloadedException;
import org.apache.ignite3.internal.replicator.ReplicaService;
import org.apache.ignite3.internal.replicator.ReplicationGroupId;
import org.apache.ignite3.internal.replicator.ZonePartitionId;
import org.apache.ignite3.internal.replicator.exception.AwaitReplicaTimeoutException;
import org.apache.ignite3.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite3.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite3.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite3.internal.replicator.message.ReplicaRequest;
import org.apache.ignite3.internal.tx.message.TxMessagesFactory;
import org.apache.ignite3.internal.tx.message.VacuumTxStateReplicaRequest;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.jetbrains.annotations.Nullable;

/**
 * Issues {@link VacuumTxStateReplicaRequest}s to the local node for commit partitions whose primary replica
 * is held by the local node, and reports which transactions the caller may consider vacuumed.
 */
public class PersistentTxStateVacuumizer {
    private static final IgniteLogger LOG = Loggers.forClass(PersistentTxStateVacuumizer.class);
    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    private final ReplicaService replicaService;
    private final InternalClusterNode localNode;
    private final ClockService clockService;
    private final PlacementDriver placementDriver;
    private final FailureProcessor failureProcessor;

    /**
     * Creates the vacuumizer.
     *
     * @param replicaService Service used to send the vacuum request to the local replica.
     * @param localNode Local node; compared against the leaseholder and used as the request target.
     * @param clockService Source of the timestamp used for the primary replica lookup.
     * @param placementDriver Used to resolve the primary replica of each commit partition.
     * @param failureProcessor Receives vacuum failures that are not in the list of expected exceptions.
     */
    public PersistentTxStateVacuumizer(ReplicaService replicaService, InternalClusterNode localNode, ClockService clockService, PlacementDriver placementDriver, FailureProcessor failureProcessor) {
        this.replicaService = replicaService;
        this.localNode = localNode;
        this.clockService = clockService;
        this.placementDriver = placementDriver;
        this.failureProcessor = failureProcessor;
    }

    /**
     * Vacuums persistent transaction states, grouped by commit partition.
     *
     * <p>For every commit partition the primary replica is looked up at the current clock time:
     * <ul>
     *     <li>If the leaseholder is the local node, transactions without a cleanup completion timestamp are
     *     added to the result right away, and the remaining ones are sent in a single
     *     {@link VacuumTxStateReplicaRequest}; they are added to the result (and counted) only if that request
     *     succeeds.</li>
     *     <li>If there is no primary replica, or it is not on the local node, all transactions of the partition
     *     are added to the result without sending a request.</li>
     * </ul>
     *
     * <p>A failed request is logged at debug level when it is one of the expected exceptions, otherwise it is passed
     * to the failure processor. Failures never fail the returned future: it completes with whatever has been
     * collected once all partitions have been processed.
     *
     * @param txIds Transactions to vacuum, keyed by commit partition ID.
     * @return Future with the IDs of transactions that were handled successfully and the number of transaction
     *     states for which a vacuum request succeeded.
     */
    public CompletableFuture<PersistentTxStateVacuumResult> vacuumPersistentTxStates(Map<ZonePartitionId, Set<VacuumizableTx>> txIds) {
        // Partitions are processed asynchronously, so the shared accumulators must be thread-safe.
        Set<UUID> successful = ConcurrentHashMap.newKeySet();
        AtomicInteger vacuumizedPersistentTxnStatesCount = new AtomicInteger();
        List<CompletableFuture<?>> futures = new ArrayList<>();
        HybridTimestamp now = this.clockService.now();

        txIds.forEach((commitPartitionId, txs) -> {
            CompletableFuture<?> future = this.placementDriver.getPrimaryReplica(commitPartitionId, now).thenCompose(replicaMeta -> {
                if (replicaMeta != null && this.localNode.id().equals(replicaMeta.getLeaseholderId())) {
                    Set<UUID> filteredTxIds = new HashSet<>();

                    for (VacuumizableTx tx : txs) {
                        if (tx.cleanupCompletionTimestamp == null) {
                            // No request is sent for these; they are reported as successful immediately.
                            successful.add(tx.txId);
                        } else {
                            filteredTxIds.add(tx.txId);
                        }
                    }

                    if (filteredTxIds.isEmpty()) {
                        return CompletableFutures.nullCompletedFuture();
                    }

                    VacuumTxStateReplicaRequest request = TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest()
                            .enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
                            .groupId(ReplicaMessageUtils.toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartitionId))
                            .transactionIds(filteredTxIds)
                            .build();

                    return this.replicaService.invoke(this.localNode, request).whenComplete((v, e) -> {
                        if (e == null) {
                            successful.addAll(filteredTxIds);
                            vacuumizedPersistentTxnStatesCount.addAndGet(filteredTxIds.size());
                        } else if (expectedException(e)) {
                            LOG.debug("Failed to vacuum tx states from the persistent storage.", e);
                        } else {
                            this.failureProcessor.process(new FailureContext(e, "Failed to vacuum tx states from the persistent storage."));
                        }
                    });
                } else {
                    // The primary replica is absent or is not hosted on this node: report everything as successful.
                    successful.addAll(txs.stream().map(tx -> tx.txId).collect(Collectors.toSet()));

                    return CompletableFutures.nullCompletedFuture();
                }
            });

            futures.add(future);
        });

        // handle() deliberately ignores the exception so that one failed partition does not hide the others' results.
        return CompletableFutures.allOf(futures)
                .handle((unused, unusedEx) -> new PersistentTxStateVacuumResult(successful, vacuumizedPersistentTxnStatesCount.get()));
    }

    /**
     * Tells whether a vacuum failure is one of the exception types that are only logged and not reported to the
     * failure processor.
     *
     * @param e Failure of the vacuum request.
     * @return {@code true} if the cause chain of {@code e} contains one of the listed exception types.
     */
    private static boolean expectedException(Throwable e) {
        return ExceptionUtils.hasCause(
                e,
                PrimaryReplicaMissException.class,
                NodeStoppingException.class,
                ComponentStoppingException.class,
                GroupOverloadedException.class,
                AwaitReplicaTimeoutException.class
        );
    }

    /**
     * Result of {@link #vacuumPersistentTxStates(Map)}.
     */
    public static class PersistentTxStateVacuumResult {
        /** IDs of the transactions collected as successfully handled. */
        final Set<UUID> txnsToVacuum;

        /** Number of transaction states covered by vacuum requests that completed without an error. */
        final int vacuumizedPersistentTxnStatesCount;

        /**
         * Creates the result.
         *
         * @param txnsToVacuum IDs of the transactions collected as successfully handled.
         * @param vacuumizedPersistentTxnStatesCount Number of transaction states vacuumed via successful requests.
         */
        public PersistentTxStateVacuumResult(Set<UUID> txnsToVacuum, int vacuumizedPersistentTxnStatesCount) {
            this.txnsToVacuum = txnsToVacuum;
            this.vacuumizedPersistentTxnStatesCount = vacuumizedPersistentTxnStatesCount;
        }
    }

    /**
     * A transaction that is a candidate for vacuuming, together with its optional cleanup completion timestamp.
     */
    public static class VacuumizableTx {
        /** Transaction ID. */
        final UUID txId;

        /** Cleanup completion timestamp, or {@code null} if there is none. */
        @Nullable
        final Long cleanupCompletionTimestamp;

        /**
         * Creates the candidate.
         *
         * @param txId Transaction ID.
         * @param cleanupCompletionTimestamp Cleanup completion timestamp, may be {@code null}.
         */
        VacuumizableTx(UUID txId, @Nullable Long cleanupCompletionTimestamp) {
            this.txId = txId;
            this.cleanupCompletionTimestamp = cleanupCompletionTimestamp;
        }
    }
}

