/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.tx.impl;

import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.replicator.PartitionGroupId;
import org.apache.ignite3.internal.replicator.ReplicaService;
import org.apache.ignite3.internal.replicator.ReplicationGroupId;
import org.apache.ignite3.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite3.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite3.internal.replicator.message.ReplicaRequest;
import org.apache.ignite3.internal.tx.LockManager;
import org.apache.ignite3.internal.tx.TxState;
import org.apache.ignite3.internal.tx.TxStateMeta;
import org.apache.ignite3.internal.tx.TxStateMetaAbandoned;
import org.apache.ignite3.internal.tx.event.LockEvent;
import org.apache.ignite3.internal.tx.event.LockEventParameters;
import org.apache.ignite3.internal.tx.impl.PlacementDriverHelper;
import org.apache.ignite3.internal.tx.impl.TxCleanupRequestSender;
import org.apache.ignite3.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite3.internal.tx.message.TxMessagesFactory;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.FastTimestamps;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;

public class OrphanDetector {
    private static final IgniteLogger LOG = Loggers.forClass(OrphanDetector.class);
    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final TopologyService topologyService;
    private final ReplicaService replicaService;
    private final PlacementDriverHelper placementDriverHelper;
    private final LockManager lockManager;
    private final TxCleanupRequestSender requestSender;
    private final Executor partitionOperationsExecutor;
    private volatile Supplier<Long> checkTxStateIntervalProvider;
    private VolatileTxStateMetaStorage txLocalStateStorage;

    public OrphanDetector(TopologyService topologyService, ReplicaService replicaService, PlacementDriverHelper placementDriverHelper, LockManager lockManager, TxCleanupRequestSender requestSender, Executor partitionOperationsExecutor) {
        this.topologyService = topologyService;
        this.replicaService = replicaService;
        this.placementDriverHelper = placementDriverHelper;
        this.lockManager = lockManager;
        this.requestSender = requestSender;
        this.partitionOperationsExecutor = partitionOperationsExecutor;
    }

    public void start(VolatileTxStateMetaStorage txLocalStateStorage, Supplier<Long> checkTxStateIntervalProvider) {
        this.txLocalStateStorage = txLocalStateStorage;
        this.checkTxStateIntervalProvider = checkTxStateIntervalProvider;
        this.lockManager.listen(LockEvent.LOCK_CONFLICT, this::lockConflictListener);
    }

    public void stop() {
        this.busyLock.block();
        this.lockManager.removeListener(LockEvent.LOCK_CONFLICT, this::lockConflictListener);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Boolean> lockConflictListener(LockEventParameters params) {
        if (this.busyLock.enterBusy()) {
            try {
                ArrayList<CompletableFuture<Boolean>> futs = new ArrayList<CompletableFuture<Boolean>>(params.lockHolderTxs().size());
                for (UUID txId : params.lockHolderTxs()) {
                    futs.add(this.checkTxOrphanedInternal(txId));
                }
                CompletionStage completionStage = CompletableFutures.allOf(futs).thenApply(unused -> false);
                return completionStage;
            }
            finally {
                this.busyLock.leaveBusy();
            }
        }
        return CompletableFutures.falseCompletedFuture();
    }

    private CompletableFuture<Boolean> checkTxOrphanedInternal(UUID txId) {
        TxStateMeta txState = this.txLocalStateStorage.state(txId);
        if (txState == null || TxState.isFinalState(txState.txState()) || this.isTxCoordinatorAlive(txState)) {
            return CompletableFutures.falseCompletedFuture();
        }
        if (this.makeTxAbandoned(txId, txState)) {
            LOG.info("Conflict was found, and the coordinator of the transaction that holds a lock is not available [txId={}, txCrd={}].", txId, txState.txCoordinatorId());
            if (PartitionGroupId.isAbsent(txState.commitPartitionId())) {
                this.partitionOperationsExecutor.execute(() -> this.requestSender.cleanup(txState.commitPartitionId(), this.topologyService.localMember().name(), txId));
            } else {
                this.partitionOperationsExecutor.execute(() -> this.sendTxRecoveryMessage(txState.commitPartitionId(), txId));
            }
        }
        return CompletableFuture.failedFuture(new Exception());
    }

    private void sendTxRecoveryMessage(ReplicationGroupId cmpPartGrp, UUID txId) {
        ((CompletableFuture)this.placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(cmpPartGrp).thenCompose(replicaMeta -> {
            InternalClusterNode commitPartPrimaryNode = this.topologyService.getByConsistentId(replicaMeta.getLeaseholder());
            if (commitPartPrimaryNode == null) {
                LOG.warn("The primary replica of the commit partition is not available [commitPartGrp={}, tx={}]", cmpPartGrp, txId);
                return CompletableFutures.nullCompletedFuture();
            }
            return this.replicaService.invoke(commitPartPrimaryNode, (ReplicaRequest)TX_MESSAGES_FACTORY.txRecoveryMessage().groupId(ReplicaMessageUtils.toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, cmpPartGrp)).enlistmentConsistencyToken(replicaMeta.getStartTime().longValue()).txId(txId).build());
        })).exceptionally(throwable -> {
            if (throwable != null) {
                LOG.warn("A recovery message for the transaction was handled with the error [tx={}].", (Throwable)throwable, (Object)txId);
            }
            return null;
        });
    }

    private boolean isTxCoordinatorAlive(TxStateMeta txState) {
        return txState.txCoordinatorId() != null && this.topologyService.getById(txState.txCoordinatorId()) != null;
    }

    private boolean makeTxAbandoned(UUID txId, TxStateMeta txState) {
        Object updatedTxState;
        if (!this.isRecoveryNeeded(txState)) {
            return false;
        }
        TxStateMetaAbandoned txAbandonedState = txState.abandoned();
        return txAbandonedState == (updatedTxState = this.txLocalStateStorage.updateMeta(txId, txStateMeta -> {
            if (this.isRecoveryNeeded((TxStateMeta)txStateMeta)) {
                return txAbandonedState;
            }
            return txStateMeta;
        }));
    }

    private boolean isRecoveryNeeded(@Nullable TxStateMeta txState) {
        return txState != null && !TxState.isFinalState(txState.txState()) && txState.txState() != TxState.FINISHING && !this.isTxAbandonedRecently(txState);
    }

    private boolean isTxAbandonedRecently(TxStateMeta txState) {
        if (txState.txState() != TxState.ABANDONED) {
            return false;
        }
        assert (txState instanceof TxStateMetaAbandoned) : "The transaction state does not match the metadata [mata=" + txState + "].";
        TxStateMetaAbandoned txStateAbandoned = (TxStateMetaAbandoned)txState;
        return txStateAbandoned.lastAbandonedMarkerTs() + this.checkTxStateIntervalProvider.get() >= FastTimestamps.coarseCurrentTimeMillis();
    }
}

