/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.tx.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite.internal.configuration.SystemPropertyView;
import org.apache.ignite.internal.event.Event;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.handlers.FailureHandler;
import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.metrics.MetricSource;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.message.ErrorReplicaResponse;
import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.systemview.api.SystemView;
import org.apache.ignite.internal.systemview.api.SystemViewProvider;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.ThreadOperation;
import org.apache.ignite.internal.tx.DelayedAckException;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.LocalRwTxCounter;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException;
import org.apache.ignite.internal.tx.PartitionEnlistment;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.TransactionIds;
import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TransactionResult;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxPriority;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.TxStateMetaFinishing;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.DeadlockPreventionPolicyImpl;
import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
import org.apache.ignite.internal.tx.impl.IgniteAbstractTransactionImpl;
import org.apache.ignite.internal.tx.impl.OrphanDetector;
import org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer;
import org.apache.ignite.internal.tx.impl.PlacementDriverHelper;
import org.apache.ignite.internal.tx.impl.PrimaryReplicaExpiredException;
import org.apache.ignite.internal.tx.impl.ReadOnlyImplicitTransactionImpl;
import org.apache.ignite.internal.tx.impl.ReadOnlyTransactionImpl;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
import org.apache.ignite.internal.tx.impl.RemoteReadWriteTransaction;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionExpirationRegistry;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxCleanupRequestHandler;
import org.apache.ignite.internal.tx.impl.TxCleanupRequestSender;
import org.apache.ignite.internal.tx.impl.TxIdAndTimestamp;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.impl.WriteIntentSwitchProcessor;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
import org.apache.ignite.internal.tx.metrics.ResourceVacuumMetrics;
import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
import org.apache.ignite.internal.tx.views.LocksViewProvider;
import org.apache.ignite.internal.tx.views.TransactionsViewProvider;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class TxManagerImpl
implements TxManager,
NetworkMessageHandler,
SystemViewProvider {
    // Keys and default values of tunables.
    // NOTE(review): the *_PROP strings look like system-property names resolved through systemCfg; the code that reads
    // them is not in this part of the file — confirm before relying on it.
    private static final String ABANDONED_CHECK_TS_PROP = "txnAbandonedCheckTs";
    private static final long ABANDONED_CHECK_TS_PROP_DEFAULT_VALUE = 5000L;
    private static final String LOCK_RETRY_COUNT_PROP = "txnLockRetryCount";
    private static final int LOCK_RETRY_COUNT_PROP_DEFAULT_VALUE = 3;
    public static final String RESOURCE_TTL_PROP = "txnResourceTtl";
    private static final int RESOURCE_TTL_PROP_DEFAULT_VALUE = 30000;
    private static final DeadlockPreventionPolicyImpl.TxIdComparators DEFAULT_TX_ID_COMPARATOR = DeadlockPreventionPolicyImpl.TxIdComparators.NATURAL;
    private static final long DEFAULT_LOCK_TIMEOUT = 0L;
    private static final long EXPIRE_FREQ_MILLIS = 1000L;
    private static final IgniteLogger LOG = Loggers.forClass(TxManagerImpl.class);

    // Dependencies supplied through the constructor.
    private final TransactionConfiguration txConfig;
    private final SystemDistributedConfiguration systemCfg;
    private final LockManager lockManager;
    // Fixed-size pool created in the constructor (one thread per available processor); also handed to the cleanup
    // request handler and sender.
    private final ExecutorService writeIntentSwitchPool;
    private final ClockService clockService;
    private final TransactionIdGenerator transactionIdGenerator;
    // Node-local transaction state, keyed by transaction id (see stateMeta / updateTxMeta).
    private final VolatileTxStateMetaStorage txStateVolatileStorage = new VolatileTxStateMetaStorage();
    private final LowWatermark lowWatermark;
    private final PlacementDriver placementDriver;
    private final PlacementDriverHelper placementDriverHelper;
    private final LongSupplier idleSafeTimePropagationPeriodMsSupplier;
    private final AtomicBoolean stopGuard = new AtomicBoolean();
    private final OrphanDetector orphanDetector;
    private final TopologyService topologyService;
    private final MessagingService messagingService;
    // Not assigned by either constructor; presumably set when the component starts — TODO confirm.
    private volatile UUID localNodeId;
    private final TxCleanupRequestHandler txCleanupRequestHandler;
    private final TxCleanupRequestSender txCleanupRequestSender;
    private final TxMessageSender txMessageSender;
    // Listener instances are kept in fields so that the very same objects can be referenced again later.
    private final EventListener<PrimaryReplicaEventParameters> primaryReplicaExpiredListener;
    private final EventListener<PrimaryReplicaEventParameters> primaryReplicaElectedListener;
    private final LocalRwTxCounter localRwTxCounter;
    private final Executor partitionOperationsExecutor;
    private final TransactionInflights transactionInflights;
    private final ReplicaService replicaService;
    private final ScheduledExecutorService commonScheduler;
    private final FailureProcessor failureProcessor;
    private final TransactionsViewProvider txViewProvider = new TransactionsViewProvider();
    private volatile PersistentTxStateVacuumizer persistentTxStateVacuumizer;
    // Explicit read-write transactions and read-only transactions are registered here when they begin.
    private final TransactionExpirationRegistry transactionExpirationRegistry = new TransactionExpirationRegistry();
    @Nullable
    private volatile ScheduledFuture<?> transactionExpirationJobFuture;
    private volatile int lockRetryCount = 0;
    private final MetricManager metricsManager;
    private final TransactionMetricsSource txMetrics;
    // When set, newly started explicit read-write transactions are failed immediately and trackFuture() starts
    // collecting unfinished futures into stopFuts.
    private volatile boolean isStopping;
    // Futures collected by trackFuture() while isStopping is set. Diamond operator instead of the raw type.
    private final ConcurrentLinkedQueue<CompletableFuture<?>> stopFuts = new ConcurrentLinkedQueue<>();

    /**
     * Test-only convenience constructor.
     *
     * <p>Takes the node name, messaging service and topology service from {@code clusterService}, uses
     * {@link ForkJoinPool#commonPool()} as the partition operations executor and installs a {@link FailureManager}
     * backed by a {@link NoOpFailureHandler}. Everything else is forwarded unchanged to the main constructor.
     */
    @TestOnly
    public TxManagerImpl(
            TransactionConfiguration txConfig,
            SystemDistributedConfiguration systemCfg,
            ClusterService clusterService,
            ReplicaService replicaService,
            LockManager lockManager,
            ClockService clockService,
            TransactionIdGenerator transactionIdGenerator,
            PlacementDriver placementDriver,
            LongSupplier idleSafeTimePropagationPeriodMsSupplier,
            LocalRwTxCounter localRwTxCounter,
            RemotelyTriggeredResourceRegistry resourcesRegistry,
            TransactionInflights transactionInflights,
            LowWatermark lowWatermark,
            ScheduledExecutorService commonScheduler,
            MetricManager metricManager
    ) {
        this(
                clusterService.nodeName(),
                txConfig,
                systemCfg,
                clusterService.messagingService(),
                clusterService.topologyService(),
                replicaService,
                lockManager,
                clockService,
                transactionIdGenerator,
                placementDriver,
                idleSafeTimePropagationPeriodMsSupplier,
                localRwTxCounter,
                ForkJoinPool.commonPool(),
                resourcesRegistry,
                transactionInflights,
                lowWatermark,
                commonScheduler,
                new FailureManager(new NoOpFailureHandler()),
                metricManager
        );
    }

    /**
     * Creates the transaction manager and wires its internal collaborators.
     *
     * <p>Besides storing the supplied dependencies, the constructor builds the placement driver helper, a fixed-size
     * {@code tx-async-write-intent} thread pool with one thread per available processor, the transactional message
     * sender, the write intent switch processor, the cleanup request handler and sender, the transaction metrics
     * source and the orphan detector.
     */
    public TxManagerImpl(
            String nodeName,
            TransactionConfiguration txConfig,
            SystemDistributedConfiguration systemCfg,
            MessagingService messagingService,
            TopologyService topologyService,
            ReplicaService replicaService,
            LockManager lockManager,
            ClockService clockService,
            TransactionIdGenerator transactionIdGenerator,
            PlacementDriver placementDriver,
            LongSupplier idleSafeTimePropagationPeriodMsSupplier,
            LocalRwTxCounter localRwTxCounter,
            Executor partitionOperationsExecutor,
            RemotelyTriggeredResourceRegistry resourcesRegistry,
            TransactionInflights transactionInflights,
            LowWatermark lowWatermark,
            ScheduledExecutorService commonScheduler,
            FailureProcessor failureProcessor,
            MetricManager metricManager
    ) {
        // Configuration and infrastructure.
        this.txConfig = txConfig;
        this.systemCfg = systemCfg;
        this.metricsManager = metricManager;
        this.failureProcessor = failureProcessor;
        this.commonScheduler = commonScheduler;
        this.partitionOperationsExecutor = partitionOperationsExecutor;

        // Time related dependencies.
        this.clockService = clockService;
        this.transactionIdGenerator = transactionIdGenerator;
        this.idleSafeTimePropagationPeriodMsSupplier = idleSafeTimePropagationPeriodMsSupplier;
        this.lowWatermark = lowWatermark;

        // Networking, replication and placement.
        this.messagingService = messagingService;
        this.topologyService = topologyService;
        this.replicaService = replicaService;
        this.placementDriver = placementDriver;

        // Transaction bookkeeping.
        this.lockManager = lockManager;
        this.localRwTxCounter = localRwTxCounter;
        this.transactionInflights = transactionInflights;

        // Keep the listener instances in fields so the same objects are available later.
        this.primaryReplicaExpiredListener = this::primaryReplicaExpiredListener;
        this.primaryReplicaElectedListener = this::primaryReplicaElectedListener;

        this.placementDriverHelper = new PlacementDriverHelper(placementDriver, clockService);

        int poolSize = Runtime.getRuntime().availableProcessors();
        ThreadFactory writeIntentThreadFactory = IgniteThreadFactory.create(
                nodeName,
                "tx-async-write-intent",
                LOG,
                new ThreadOperation[]{ThreadOperation.STORAGE_READ, ThreadOperation.STORAGE_WRITE}
        );
        this.writeIntentSwitchPool = Executors.newFixedThreadPool(poolSize, writeIntentThreadFactory);

        this.txMessageSender = new TxMessageSender(messagingService, replicaService, clockService);

        WriteIntentSwitchProcessor switchProcessor =
                new WriteIntentSwitchProcessor(placementDriverHelper, txMessageSender, topologyService);

        this.txCleanupRequestHandler = new TxCleanupRequestHandler(
                messagingService,
                lockManager,
                clockService,
                switchProcessor,
                resourcesRegistry,
                writeIntentSwitchPool
        );

        this.txCleanupRequestSender = new TxCleanupRequestSender(
                txMessageSender,
                placementDriverHelper,
                txStateVolatileStorage,
                writeIntentSwitchPool,
                commonScheduler
        );

        this.txMetrics = new TransactionMetricsSource(clockService);

        this.orphanDetector = new OrphanDetector(
                topologyService,
                replicaService,
                placementDriverHelper,
                lockManager,
                txCleanupRequestSender,
                partitionOperationsExecutor
        );
    }

    /**
     * Common part of the primary replica event listeners: runs {@code action} for the event's replication group.
     *
     * @param eventParameters Event parameters; the group id is expected to be a {@link ZonePartitionId}.
     * @param action Action to run with the group id.
     * @return Future completed with {@code false}.
     */
    private CompletableFuture<Boolean> primaryReplicaEventListener(PrimaryReplicaEventParameters eventParameters, Consumer<ZonePartitionId> action) {
        assert eventParameters.groupId() instanceof ZonePartitionId
                : "Invalid replication group type: " + eventParameters.groupId().getClass();

        ZonePartitionId zonePartitionId = (ZonePartitionId) eventParameters.groupId();

        action.accept(zonePartitionId);

        return CompletableFutures.falseCompletedFuture();
    }

    /**
     * Handles a "primary replica elected" event: when the leaseholder is the local node, sends a recovery cleanup
     * message for the group, addressed to the local node's own name.
     *
     * @param eventParameters Event parameters.
     * @return Future completed with {@code false}.
     */
    private CompletableFuture<Boolean> primaryReplicaElectedListener(PrimaryReplicaEventParameters eventParameters) {
        Consumer<ZonePartitionId> sendRecoveryCleanupIfLocal = groupId -> {
            boolean electedOnThisNode = localNodeId.equals(eventParameters.leaseholderId());

            if (!electedOnThisNode) {
                return;
            }

            String thisNodeName = topologyService.localMember().name();

            txMessageSender.sendRecoveryCleanup(thisNodeName, groupId);
        };

        return primaryReplicaEventListener(eventParameters, sendRecoveryCleanupIfLocal);
    }

    /**
     * Handles a "primary replica expired" event by cancelling the waiting inflights of the affected group.
     *
     * @param eventParameters Event parameters.
     * @return Future completed with {@code false}.
     */
    private CompletableFuture<Boolean> primaryReplicaExpiredListener(PrimaryReplicaEventParameters eventParameters) {
        Consumer<ZonePartitionId> cancelInflights = groupId -> transactionInflights.cancelWaitingInflights(groupId);

        return primaryReplicaEventListener(eventParameters, cancelInflights);
    }

    /**
     * Begins an implicit transaction.
     *
     * <p>A read-only implicit transaction is a lightweight object built from the tracker and the current clock value;
     * it is neither put into the volatile state storage nor counted in the metrics here. A read-write one gets a
     * fresh begin timestamp, default options, a volatile state entry labelled with {@code txLabel}, and is counted as
     * started.
     */
    @Override
    public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly, String txLabel) {
        if (!readOnly) {
            HybridTimestamp startedAt = createBeginTimestampWithIncrementRwTxCounter();

            ReadWriteTransactionImpl rwTx =
                    beginReadWriteTransaction(timestampTracker, startedAt, true, InternalTxOptions.defaults());

            txStateVolatileStorage.initialize(rwTx, txLabel);
            txMetrics.onTransactionStarted();

            return rwTx;
        }

        return new ReadOnlyImplicitTransactionImpl(timestampTracker, clockService.current());
    }

    /**
     * Begins an explicit transaction, read-only or read-write, registers it in the volatile state storage under the
     * label taken from {@code txOptions} and counts it as started.
     */
    @Override
    public InternalTransaction beginExplicit(HybridTimestampTracker timestampTracker, boolean readOnly, InternalTxOptions txOptions) {
        IgniteAbstractTransactionImpl started = readOnly
                ? beginReadOnlyTransaction(timestampTracker, clockService.now(), txOptions)
                : beginReadWriteTransaction(timestampTracker, createBeginTimestampWithIncrementRwTxCounter(), false, txOptions);

        txStateVolatileStorage.initialize(started, txOptions.txLabel());
        txMetrics.onTransactionStarted();

        return started;
    }

    /**
     * Creates a read-write transaction object.
     *
     * <p>The timeout comes from {@code options}, falling back to the configured read-write timeout. Only explicit
     * transactions are put into the expiration registry; if the manager is already stopping, such a transaction is
     * failed right away with a node-stopping error but is still returned to the caller.
     */
    private ReadWriteTransactionImpl beginReadWriteTransaction(HybridTimestampTracker timestampTracker, HybridTimestamp beginTimestamp, boolean implicit, InternalTxOptions options) {
        UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, options.priority());

        long timeoutMillis = getTimeoutOrDefault(options, txConfig.readWriteTimeoutMillis().value());

        ReadWriteTransactionImpl rwTx =
                new ReadWriteTransactionImpl(this, timestampTracker, txId, localNodeId, implicit, timeoutMillis, false);

        if (implicit) {
            return rwTx;
        }

        transactionExpirationRegistry.register(rwTx);

        if (isStopping) {
            rwTx.fail(new TransactionException(
                    ErrorGroups.Common.NODE_STOPPING_ERR,
                    "Failed to finish the transaction because a node is stopping: [txId=" + txId + "]"
            ));
        }

        return rwTx;
    }

    /**
     * Returns the timeout requested in {@code options}, or {@code defaultValue} when the requested timeout is zero.
     */
    private static long getTimeoutOrDefault(InternalTxOptions options, long defaultValue) {
        long requested = options.timeoutMillis();

        if (requested == 0L) {
            return defaultValue;
        }

        return requested;
    }

    /**
     * Creates an explicit read-only transaction.
     *
     * <p>The read timestamp is taken from {@code options} when present; otherwise it is the current read timestamp
     * derived from {@code beginTimestamp}, raised to the tracker's observable timestamp if the tracker has one. The
     * low watermark is locked for the transaction id and released when the transaction future completes (or at once
     * if construction fails).
     *
     * @throws IgniteInternalException If the low watermark lock cannot be taken for the chosen read timestamp.
     */
    private ReadOnlyTransactionImpl beginReadOnlyTransaction(HybridTimestampTracker timestampTracker, HybridTimestamp beginTimestamp, InternalTxOptions options) {
        UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, options.priority());

        HybridTimestamp readTimestamp = options.readTimestamp();

        if (readTimestamp == null) {
            HybridTimestamp observed = timestampTracker.get();
            HybridTimestamp current = currentReadTimestamp(beginTimestamp);

            readTimestamp = observed == null
                    ? current
                    : HybridTimestamp.max(new HybridTimestamp[]{observed, current});
        }

        if (!lowWatermark.tryLock(txId, readTimestamp)) {
            throw new IgniteInternalException(
                    ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR,
                    "Attempted to read data below the garbage collection watermark: [readTimestamp={}, gcTimestamp={}]",
                    new Object[]{readTimestamp, lowWatermark.getLowWatermark()}
            );
        }

        try {
            CompletableFuture<Void> completion = new CompletableFuture<>();

            long timeoutMillis = getTimeoutOrDefault(options, txConfig.readOnlyTimeoutMillis().value());

            ReadOnlyTransactionImpl roTx = new ReadOnlyTransactionImpl(
                    this,
                    timestampTracker,
                    txId,
                    localNodeId,
                    timeoutMillis,
                    readTimestamp,
                    completion
            );

            transactionExpirationRegistry.register(roTx);

            // Release everything held on behalf of the transaction once it is over, whatever the outcome.
            completion.whenComplete((ignored, error) -> {
                lowWatermark.unlock(txId);
                transactionExpirationRegistry.unregister(roTx);
            });

            return roTx;
        } catch (Throwable t) {
            // The transaction was never handed out, so nobody else will release the watermark lock.
            lowWatermark.unlock(txId);

            throw t;
        }
    }

    /**
     * Adds the effective timeout to the physical part of the begin timestamp, saturating at {@link Long#MAX_VALUE}.
     */
    private static long physicalExpirationTimeMillis(HybridTimestamp beginTimestamp, long effectiveTimeoutMillis) {
        long beginPhysical = beginTimestamp.getPhysical();

        return sumWithSaturation(beginPhysical, effectiveTimeoutMillis);
    }

    /**
     * Adds two non-negative values, returning {@link Long#MAX_VALUE} instead of wrapping around on overflow.
     *
     * @param a First addend, asserted to be non-negative.
     * @param b Second addend, asserted to be non-negative.
     * @return {@code a + b}, or {@link Long#MAX_VALUE} when the sum is negative (i.e. overflowed).
     */
    private static long sumWithSaturation(long a, long b) {
        assert a >= 0L : a;
        assert b >= 0L : b;

        long total = a + b;

        // Two non-negative addends can only produce a negative result by overflowing.
        return total < 0L ? Long.MAX_VALUE : total;
    }

    /** Returns the read-only transaction timeout currently set in the transaction configuration. */
    private long defaultReadOnlyTransactionTimeoutMillis() {
        Long configured = txConfig.readOnlyTimeoutMillis().value();

        return configured;
    }

    /** Begins an external read-write transaction with {@link TxPriority#NORMAL} priority. */
    @Override
    public InternalTransaction beginExternal(HybridTimestampTracker timestampTracker, boolean implicit) {
        TxPriority defaultPriority = TxPriority.NORMAL;

        return beginExternal(timestampTracker, implicit, defaultPriority);
    }

    /**
     * Begins an external read-write transaction with the given priority.
     *
     * <p>The transaction id is generated from the current clock value, a {@code PENDING} state with the local node as
     * coordinator and {@link ZonePartitionId#NOT_EXISTING} as commit partition is stored, and the transaction is
     * created with the configured read-write timeout.
     */
    @Override
    public InternalTransaction beginExternal(HybridTimestampTracker timestampTracker, boolean implicit, TxPriority priority) {
        HybridTimestamp startedAt = clockService.now();
        UUID txId = transactionIdGenerator.transactionIdFor(startedAt, priority);

        updateTxMeta(
                txId,
                old -> new TxStateMeta(TxState.PENDING, localNodeId, ZonePartitionId.NOT_EXISTING, null, null, null)
        );

        long timeoutMillis = txConfig.readWriteTimeoutMillis().value();

        return new ReadWriteTransactionImpl(this, timestampTracker, txId, localNodeId, implicit, timeoutMillis, true);
    }

    /**
     * Turns an implicit, non-external read-write transaction into an external one.
     *
     * <p>The local read-write counter is decremented for the transaction's schema timestamp, the stored state is
     * replaced with a {@code PENDING} one that uses {@link ZonePartitionId#NOT_EXISTING} as commit partition, the
     * transaction is flagged as external and the conversion is reported to the metrics.
     *
     * @return The same transaction instance.
     */
    @Override
    public InternalTransaction castToExternal(InternalTransaction tx) {
        assert tx.implicit() && !tx.external() && !tx.isReadOnly()
                : "The Transaction cannot be casted to external [tx=" + tx + "].";

        UUID id = tx.id();
        HybridTimestamp startedAt = tx.schemaTimestamp();

        localRwTxCounter.decrementRwTxCount(startedAt);

        updateTxMeta(
                id,
                old -> new TxStateMeta(TxState.PENDING, localNodeId, ZonePartitionId.NOT_EXISTING, null, null, null)
        );

        tx.external(true);

        txMetrics.onTransactionCastToExternal();

        return tx;
    }

    /**
     * Shifts {@code beginTx} into the past by the idle safe time propagation period plus the maximum clock skew.
     */
    private HybridTimestamp currentReadTimestamp(HybridTimestamp beginTx) {
        long shiftMillis = idleSafeTimePropagationPeriodMsSupplier.getAsLong() + clockService.maxClockSkewMillis();

        return beginTx.subtractPhysicalTime(shiftMillis);
    }

    /** Returns the state that the volatile storage holds for {@code txId}; may be {@code null}. */
    @Override
    @Nullable
    public TxStateMeta stateMeta(UUID txId) {
        TxStateMeta stored = txStateVolatileStorage.state(txId);

        return stored;
    }

    /**
     * Rolls the transaction back when the sender's partition was enlisted with a consistency token different from
     * {@code currentEnlistmentConsistencyToken}.
     *
     * @return Future with the state stored after the rollback, or with {@code txMeta} when the partition is not
     *         enlisted or its token matches.
     */
    @Override
    public CompletableFuture<@Nullable TransactionMeta> checkEnlistedPartitionsAndAbortIfNeeded(TxStateMeta txMeta, InternalTransaction tx, long currentEnlistmentConsistencyToken, ZonePartitionId senderGroupId) {
        PendingTxPartitionEnlistment senderEnlistment = tx.enlistedPartition(senderGroupId);

        boolean tokenUnchanged = senderEnlistment == null
                || senderEnlistment.consistencyToken() == currentEnlistmentConsistencyToken;

        if (tokenUnchanged) {
            return CompletableFuture.completedFuture(txMeta);
        }

        return tx.rollbackAsync().thenApply(ignored -> {
            TxStateMeta afterRollback = stateMeta(tx.id());

            assert TxState.isFinalState(afterRollback.txState());

            return afterRollback;
        });
    }

    /** Returns the states held by the volatile transaction state storage. For tests only. */
    @TestOnly
    public Collection<TxStateMeta> states() {
        Collection<TxStateMeta> stored = txStateVolatileStorage.states();

        return stored;
    }

    /**
     * Applies {@code updater} to the state stored for {@code txId} by delegating to the volatile state storage.
     *
     * @return Whatever the storage reports as the result of the update; may be {@code null}.
     */
    @Override
    @Nullable
    public <T extends TxStateMeta> T updateTxMeta(UUID txId, Function<@Nullable TxStateMeta, TxStateMeta> updater) {
        T updated = txStateVolatileStorage.updateMeta(txId, updater);

        return updated;
    }

    /**
     * Records the final outcome of a transaction locally.
     *
     * <p>On commit the tracker is advanced to {@code ts}. The stored state becomes {@code COMMITTED} or
     * {@code ABORTED}, the outcome is reported to the metrics and the read-write counter is decremented.
     *
     * @param ts Commit timestamp; asserted to be non-null when {@code commit} is {@code true}.
     */
    @Override
    public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, @Nullable HybridTimestamp ts, boolean commit, boolean timeoutExceeded) {
        if (commit) {
            assert ts != null : "RW transaction commit timestamp cannot be null.";

            timestampTracker.update(ts);
        }

        TxState outcome = commit ? TxState.COMMITTED : TxState.ABORTED;

        updateTxMeta(txId, previous -> TxStateMeta.builder(previous, outcome)
                .commitTimestamp(ts)
                .finishedDueToTimeout(timeoutExceeded)
                .build());

        txMetrics.onReadWriteTransactionFinished(txId, commit);

        decrementRwTxCount(txId);
    }

    /** Returns a fresh clock timestamp for a commit and {@code null} for a rollback. */
    @Nullable
    private HybridTimestamp commitTimestamp(boolean commit) {
        if (!commit) {
            return null;
        }

        return clockService.now();
    }

    /**
     * Finishes a read-write transaction.
     *
     * <p>With no enlisted groups the outcome is only recorded locally. Otherwise the stored state is switched to
     * {@code FINISHING}; if a different state object comes back from the update, the call either waits for the finish
     * already in progress or compares the stored final outcome with {@code commitIntent}. The caller that installed
     * the {@code FINISHING} state locks the transaction against new updates and runs the actual finish.
     *
     * @param observableTimestampTracker Tracker passed on to the finish procedure.
     * @param commitPartition Commit partition, may be {@code null}.
     * @param commitIntent {@code true} to commit, {@code false} to roll back.
     * @param timeout Whether the transaction is being finished because of a timeout.
     * @param recovery Recovery flag; when set, the unlock-only shortcut is not used.
     * @param noRemoteWrites Part of the condition for the unlock-only shortcut.
     * @param enlistedGroups Enlisted partitions, must not be {@code null}.
     * @param txId Transaction id.
     * @return Future of the finish operation; fails with {@link MismatchingTransactionOutcomeInternalException} when
     *         the transaction already has (or gets) an outcome different from {@code commitIntent}.
     */
    @Override
    public CompletableFuture<Void> finish(HybridTimestampTracker observableTimestampTracker, @Nullable ZonePartitionId commitPartition, boolean commitIntent, boolean timeout, boolean recovery, boolean noRemoteWrites, Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedGroups, UUID txId) {
        LOG.debug("Finish [commit={}, txId={}, groups={}, commitPartId={}].", new Object[]{commitIntent, txId, enlistedGroups, commitPartition});

        assert enlistedGroups != null;

        if (enlistedGroups.isEmpty()) {
            // Nothing was enlisted, so there is nobody to notify: record the outcome locally and return.
            this.updateTxMeta(txId, old -> TxStateMeta.builder(old, commitIntent ? TxState.COMMITTED : TxState.ABORTED)
                    .txCoordinatorId(this.localNodeId)
                    .commitPartitionId(commitPartition)
                    .commitTimestamp(this.commitTimestamp(commitIntent))
                    .finishedDueToTimeout(timeout)
                    .build());

            this.txMetrics.onReadWriteTransactionFinished(txId, commitIntent);

            this.decrementRwTxCount(txId);

            return CompletableFutures.nullCompletedFuture();
        }

        TxStateMeta txMeta = this.stateMeta(txId);

        TxStateMetaFinishing finishingStateMeta = txMeta == null
                ? new TxStateMetaFinishing(null, commitPartition, timeout, null)
                : txMeta.finishing(timeout);

        // Typed as TxStateMeta (not Object) so that the result can be handed to checkTxOutcome() below.
        TxStateMeta stateMeta = this.updateTxMeta(txId, oldMeta -> finishingStateMeta);

        // A different instance means our FINISHING marker is not the stored state.
        // NOTE(review): presumably someone else is finishing, or has finished, this transaction — confirm against the storage.
        if (finishingStateMeta != stateMeta) {
            if (stateMeta.txState() == TxState.FINISHING) {
                // A finish is in progress: wait for it and then compare outcomes.
                return ((TxStateMetaFinishing) stateMeta).txFinishFuture()
                        .thenCompose(meta -> TxManagerImpl.checkTxOutcome(commitIntent, txId, meta));
            }

            return TxManagerImpl.checkTxOutcome(commitIntent, txId, stateMeta);
        }

        TransactionInflights.ReadWriteTxContext txContext = this.transactionInflights.lockTxForNewUpdates(txId, enlistedGroups);

        return txContext.performFinish(commitIntent, commit -> this.prepareFinish(
                observableTimestampTracker,
                commitPartition,
                commit,
                enlistedGroups,
                txId,
                finishingStateMeta.txFinishFuture(),
                txContext.isNoWrites() && noRemoteWrites && !recovery
        )).whenComplete((unused, throwable) -> {
            // Metrics and the RW counter are only touched when this node is the coordinator recorded in the state.
            if (this.localNodeId.equals(finishingStateMeta.txCoordinatorId())) {
                // States whose commit partition is NOT_EXISTING are the ones created by beginExternal()/castToExternal().
                boolean external = ZonePartitionId.NOT_EXISTING.equals(finishingStateMeta.commitPartitionId());

                if (!external) {
                    this.txMetrics.onReadWriteTransactionFinished(txId, commitIntent && throwable == null);
                }

                this.decrementRwTxCount(txId);
            }

            this.transactionInflights.removeTxContext(txId);
        });
    }

    /**
     * Compares the state of an already finished transaction with the requested outcome.
     *
     * @return Completed future when the state is {@code COMMITTED} exactly when {@code commit} is {@code true};
     *         otherwise a future failed with {@link MismatchingTransactionOutcomeInternalException} carrying the
     *         actual state and commit timestamp.
     */
    private static CompletableFuture<Void> checkTxOutcome(boolean commit, UUID txId, TransactionMeta stateMeta) {
        boolean committed = stateMeta.txState() == TxState.COMMITTED;

        if (committed != commit) {
            String message = "Failed to change the outcome of a finished transaction [txId=" + txId + ", txState=" + stateMeta.txState() + "].";

            TransactionResult actualResult = new TransactionResult(stateMeta.txState(), stateMeta.commitTimestamp());

            return CompletableFuture.failedFuture(new MismatchingTransactionOutcomeInternalException(message, actualResult));
        }

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Chooses the commit timestamp, verifies it when committing, and then finishes the transaction through one of
     * three paths: unlock-only cleanup, cleanup without a commit partition, or a durable finish on the commit
     * partition.
     *
     * <p>The returned future completes with the verification result after the chosen path has completed, so a failed
     * verification is reported to the caller even though the path itself ran with a downgraded (abort) outcome.
     *
     * @param observableTimestampTracker Tracker passed on to the chosen finish path.
     * @param commitPartition Commit partition; {@code null} selects the cleanup-only path.
     * @param commit Requested outcome.
     * @param enlistedGroups Enlisted partitions.
     * @param txId Transaction id.
     * @param txFinishFuture Future to complete with the final transaction state.
     * @param unlockOnly When {@code true}, only cleanup is sent and the stored state is replaced by a freshly built one.
     */
    private CompletableFuture<Void> prepareFinish(HybridTimestampTracker observableTimestampTracker, @Nullable ZonePartitionId commitPartition, boolean commit, Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedGroups, UUID txId, CompletableFuture<TransactionMeta> txFinishFuture, boolean unlockOnly) {
        // Null for a rollback, a fresh clock value for a commit.
        HybridTimestamp commitTimestamp = this.commitTimestamp(commit);
        // Verification is needed only when committing.
        CompletableFuture<Void> verificationFuture = commit ? this.verifyCommitTimestamp(enlistedGroups, commitTimestamp) : CompletableFutures.nullCompletedFuture();
        return ((CompletableFuture)((CompletableFuture)verificationFuture.handle((unused, throwable) -> {
            // A failed verification turns the commit into an abort for the unlock-only and durable paths.
            boolean verifiedCommit = throwable == null && commit;
            Map<ZonePartitionId, PartitionEnlistment> groups = enlistedGroups.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            if (unlockOnly) {
                return this.txCleanupRequestSender.cleanup(null, groups, verifiedCommit, commitTimestamp, txId).thenAccept(ignored -> {
                    // Remember the label before the stored state is replaced.
                    TxStateMeta previous = this.txStateVolatileStorage.state(txId);
                    // The updater yields null — presumably this drops the entry from the volatile storage; confirm.
                    this.txStateVolatileStorage.updateMeta(txId, old -> null);
                    TxStateMeta meta = TxStateMeta.builder(verifiedCommit ? TxState.COMMITTED : TxState.ABORTED).txCoordinatorId(this.localNodeId).commitTimestamp(commitTimestamp).cleanupCompletionTimestamp(System.currentTimeMillis()).txLabel(previous == null ? null : previous.txLabel()).build();
                    txFinishFuture.complete(meta);
                });
            }
            if (commitPartition == null) {
                // NOTE(review): this path is given `commit`, while the other two paths use `verifiedCommit`; if
                // verification failed, the cleanup would still run with commit=true — confirm this is intended.
                return this.cleanupOnly(observableTimestampTracker, commitTimestamp, groups, commit, txId, txFinishFuture);
            }
            return this.durableFinish(observableTimestampTracker, commitPartition, verifiedCommit, groups, txId, commitTimestamp, txFinishFuture);
        // Flatten the nested future, then surface the verification result (including its exception) to the caller.
        })).thenCompose(Function.identity())).thenCompose(r -> verificationFuture);
    }

    /**
     * Finishes a transaction that has no commit partition: stores the final state locally (keeping the previous
     * coordinator id), completes {@code txFinishFuture} with it, advances the tracker on commit and then runs cleanup
     * with {@link ZonePartitionId#NOT_EXISTING} as the commit partition.
     */
    private CompletableFuture<Void> cleanupOnly(HybridTimestampTracker observableTimestampTracker, HybridTimestamp commitTimestamp, Map<ZonePartitionId, PartitionEnlistment> enlistedGroups, boolean commit, UUID txId, CompletableFuture<TransactionMeta> txFinishFuture) {
        TxState outcome = commit ? TxState.COMMITTED : TxState.ABORTED;

        TxStateMeta finalMeta = updateTxMeta(
                txId,
                previous -> new TxStateMeta(outcome, previous.txCoordinatorId(), null, commitTimestamp, null, null)
        );

        txFinishFuture.complete(finalMeta);

        if (commit) {
            observableTimestampTracker.update(commitTimestamp);
        }

        ZonePartitionId noCommitPartition = ZonePartitionId.NOT_EXISTING;

        return cleanup(noCommitPartition, enlistedGroups, commit, commitTimestamp, txId);
    }

    /**
     * Remembers an unfinished future in {@code stopFuts} when the manager is stopping.
     *
     * @param fut Future to track.
     * @return The same future, always.
     */
    private <T> CompletableFuture<T> trackFuture(CompletableFuture<T> fut) {
        boolean needsTracking = !fut.isDone() && isStopping;

        if (needsTracking) {
            stopFuts.add(fut);
        }

        return fut;
    }

    /**
     * Finishes the transaction on its commit partition: awaits the partition's primary replica, sends it the finish
     * request and maps failures.
     *
     * <p>Failure handling, based on the root cause:
     * <ul>
     *     <li>{@link MismatchingTransactionOutcomeInternalException} — the reported result is stored locally,
     *     {@code txFinishFuture} is completed with it and the returned future fails with the cause;</li>
     *     <li>a cause accepted by {@code ReplicatorRecoverableExceptions.isRecoverable} — the whole method is invoked
     *     again on {@code partitionOperationsExecutor}; no retry limit is applied in this method;</li>
     *     <li>anything else — a warning is logged and the returned future fails with the cause.</li>
     * </ul>
     *
     * <p>The resulting future is passed through {@code trackFuture}.
     *
     * @param commitPartition Commit partition, must not be {@code null}.
     * @param commit Outcome to request.
     * @param txFinishFuture Future completed with the final transaction state.
     */
    private CompletableFuture<Void> durableFinish(HybridTimestampTracker observableTimestampTracker, ZonePartitionId commitPartition, boolean commit, Map<ZonePartitionId, PartitionEnlistment> enlistedPartitions, UUID txId, HybridTimestamp commitTimestamp, CompletableFuture<TransactionMeta> txFinishFuture) {
        assert (commitPartition != null);
        // The replica's start time is passed to sendFinishRequest as the enlistment consistency token.
        return this.trackFuture((CompletableFuture)((CompletableFuture)((CompletableFuture)this.placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(commitPartition).thenCompose(meta -> this.sendFinishRequest(observableTimestampTracker, commitPartition, meta.getLeaseholder(), meta.getStartTime().longValue(), commit, enlistedPartitions, txId, commitTimestamp, txFinishFuture))).handle((res, ex) -> {
            if (ex != null) {
                Throwable cause = ExceptionUtils.unwrapRootCause((Throwable)ex);
                if (cause instanceof MismatchingTransactionOutcomeInternalException) {
                    // The transaction already has a different outcome: adopt it locally and report the mismatch.
                    MismatchingTransactionOutcomeInternalException transactionException = (MismatchingTransactionOutcomeInternalException)((Object)((Object)cause));
                    TransactionResult result = transactionException.transactionResult();
                    Object updatedMeta = this.updateTxMeta(txId, old -> TxStateMeta.builder(old, result.transactionState()).commitPartitionId(commitPartition).commitTimestamp(result.commitTimestamp()).build());
                    txFinishFuture.complete((TransactionMeta)updatedMeta);
                    return CompletableFuture.failedFuture(cause);
                }
                if (ReplicatorRecoverableExceptions.isRecoverable((Throwable)cause)) {
                    LOG.debug("Failed to finish Tx. The operation will be retried [txId={}].", ex, new Object[]{txId});
                    // Retry from the beginning (including awaiting the primary replica) on the partition operations executor.
                    return CompletableFuture.supplyAsync(() -> this.durableFinish(observableTimestampTracker, commitPartition, commit, enlistedPartitions, txId, commitTimestamp, txFinishFuture), this.partitionOperationsExecutor).thenCompose(Function.identity());
                }
                LOG.warn("Failed to finish Tx [txId={}].", ex, new Object[]{txId});
                return CompletableFuture.failedFuture(cause);
            }
            return CompletableFutures.nullCompletedFuture();
        // handle() yields a future of a future; flatten it.
        })).thenCompose(Function.identity()));
    }

    /**
     * Sends a single finish request to the given node and applies the reported outcome locally.
     *
     * <p>When the response arrives, the outcome is validated against the requested one (a mismatch throws
     * {@code MismatchingTransactionOutcomeInternalException} from {@code validateTxFinishedAsExpected}), the local
     * tx meta is updated with the reported state and commit timestamp, {@code txFinishFuture} is completed with
     * the updated meta and, for a commit, the observable timestamp tracker is advanced to {@code commitTimestamp}.
     *
     * @param observableTimestampTracker Tracker updated with the commit timestamp on commit.
     * @param commitPartition Commit partition the request is addressed to.
     * @param primaryConsistentId Consistent id of the node the request is sent to.
     * @param enlistmentConsistencyToken Enlistment consistency token sent with the request.
     * @param commit {@code true} to commit, {@code false} to abort.
     * @param enlistedPartitions Enlisted partitions sent with the request.
     * @param txId Transaction id.
     * @param commitTimestamp Commit timestamp sent with the request.
     * @param txFinishFuture Future completed with the updated transaction meta.
     * @return Future that completes once the response has been processed.
     */
    private CompletableFuture<Void> sendFinishRequest(HybridTimestampTracker observableTimestampTracker, ZonePartitionId commitPartition, String primaryConsistentId, Long enlistmentConsistencyToken, boolean commit, Map<ZonePartitionId, PartitionEnlistment> enlistedPartitions, UUID txId, HybridTimestamp commitTimestamp, CompletableFuture<TransactionMeta> txFinishFuture) {
        // Fixed format string: the original lacked a comma before "commit=" and the closing bracket.
        LOG.debug(
                "Finish [partition={}, node={}, enlistmentConsistencyToken={}, commit={}, txId={}, groups={}]",
                commitPartition, primaryConsistentId, enlistmentConsistencyToken, commit, txId, enlistedPartitions
        );

        return this.txMessageSender
                .finish(primaryConsistentId, commitPartition, enlistedPartitions, txId, enlistmentConsistencyToken, commit, commitTimestamp)
                .thenAccept(txResult -> {
                    // Throws if the transaction was already finished with the opposite outcome.
                    TxManagerImpl.validateTxFinishedAsExpected(commit, txId, txResult);

                    TxStateMeta updatedMeta = this.updateTxMeta(txId, old -> TxStateMeta.builder(old, txResult.transactionState())
                            .txCoordinatorId(this.localNodeId)
                            .commitTimestamp(txResult.commitTimestamp())
                            .build());

                    assert TxState.isFinalState(updatedMeta.txState())
                            : "Unexpected transaction state [id=" + txId + ", state=" + updatedMeta.txState() + "].";

                    txFinishFuture.complete(updatedMeta);

                    if (commit) {
                        observableTimestampTracker.update(commitTimestamp);
                    }
                });
    }

    /**
     * Checks that the outcome reported for a transaction matches the outcome that was requested.
     *
     * @param commit {@code true} if a commit was requested, {@code false} if an abort was requested.
     * @param txId Transaction id, used for diagnostics only.
     * @param txResult Result reported for the transaction.
     * @throws MismatchingTransactionOutcomeInternalException If the reported state is {@code COMMITTED} while an abort
     *     was requested, or is not {@code COMMITTED} while a commit was requested.
     */
    private static void validateTxFinishedAsExpected(boolean commit, UUID txId, TransactionResult txResult) {
        boolean reportedCommitted = txResult.transactionState() == TxState.COMMITTED;
        if (commit == reportedCommitted) {
            return;
        }

        TxState expectedState = commit ? TxState.COMMITTED : TxState.ABORTED;
        LOG.error(
                "Failed to finish a transaction that is already finished [txId={}, expectedState={}, actualState={}].",
                txId, expectedState, txResult.transactionState()
        );

        String message = "Failed to change the outcome of a finished transaction [txId=" + txId + ", txState=" + txResult.transactionState() + "].";
        throw new MismatchingTransactionOutcomeInternalException(message, txResult);
    }

    /**
     * Returns the number of finished transactions as reported by the transaction metrics.
     *
     * <p>The metric value is wider than {@code int}; a plain narrowing cast would wrap around once the counter
     * exceeds {@link Integer#MAX_VALUE}, so the value is saturated instead.
     *
     * @return Finished transaction count, capped at {@code Integer.MAX_VALUE}.
     */
    @Override
    public int finished() {
        return (int)Math.min(this.txMetrics.finishedTransactions(), Integer.MAX_VALUE);
    }

    /**
     * Returns the number of active transactions as reported by the transaction metrics.
     *
     * <p>The metric value is wider than {@code int}; a plain narrowing cast would wrap around once the counter
     * exceeds {@link Integer#MAX_VALUE}, so the value is saturated instead.
     *
     * @return Active transaction count, capped at {@code Integer.MAX_VALUE}.
     */
    @Override
    public int pending() {
        return (int)Math.min(this.txMetrics.activeTransactions(), Integer.MAX_VALUE);
    }

    /**
     * Creates and registers a {@code RemoteReadWriteTransaction} for the given transaction id.
     *
     * <p>A zero {@code timeout} is replaced with the configured read-write timeout. The transaction itself is created
     * with that timeout plus the clock service's maximum clock skew, and is registered in the transaction expiration
     * registry before being returned.
     *
     * @param txId Transaction id.
     * @param commitPartId Commit partition id; zone id and partition id must be non-negative (asserted).
     * @param coord Coordinator id passed to the transaction.
     * @param token Token passed to the transaction.
     * @param timeout Timeout in milliseconds, {@code 0} to use the configured read-write timeout.
     * @param external Value returned by the transaction's {@code external()}.
     * @param cb Callback invoked from {@code processDelayedAck} with the (possibly {@code null}) error.
     * @return The registered transaction.
     */
    @Override
    public InternalTransaction beginRemote(final UUID txId, ZonePartitionId commitPartId, UUID coord, long token, long timeout, final boolean external, final Consumer<Throwable> cb) {
        assert (commitPartId.zoneId() >= 0 && commitPartId.partitionId() >= 0) : "Illegal condition for direct mapping: " + commitPartId;
        // Zero means "not specified": fall back to the configured read-write timeout.
        timeout = timeout == 0L ? (Long)this.txConfig.readWriteTimeoutMillis().value() : timeout;
        RemoteReadWriteTransaction tx = new RemoteReadWriteTransaction(txId, commitPartId, coord, token, this.topologyService.localMember(), timeout + this.clockService.maxClockSkewMillis()){
            boolean isTimeout;
            TxState txState;
            // NOTE(review): the super(...) call below is how the decompiler rendered the anonymous-class constructor;
            // txId2, commitGroupId and localNode are not declared in this scope.
            {
                super(txId2, commitGroupId, coord, token, localNode, timeout);
                this.isTimeout = false;
                this.txState = TxState.PENDING;
            }

            @Override
            public TxState state() {
                return this.txState;
            }

            // Marks the transaction as timed out and releases its locks asynchronously on the partition operations executor.
            @Override
            public CompletableFuture<Void> rollbackTimeoutExceededAsync() {
                this.isTimeout = true;
                TxManagerImpl.this.partitionOperationsExecutor.execute(() -> TxManagerImpl.this.lockManager.releaseAll(txId));
                return CompletableFutures.nullCompletedFuture();
            }

            @Override
            public boolean isRolledBackWithTimeoutExceeded() {
                return this.isTimeout;
            }

            // Only flips the locally tracked state to ABORTED; nothing else is done here.
            @Override
            public CompletableFuture<Void> kill() {
                this.txState = TxState.ABORTED;
                return CompletableFutures.nullCompletedFuture();
            }

            // Forwards the error to the caller-supplied callback; anything the callback throws is logged, not propagated.
            @Override
            public void processDelayedAck(Object ignored, @Nullable Throwable err) {
                try {
                    cb.accept(err);
                }
                catch (Throwable t) {
                    LOG.error("Failed to process delayed ack [tx={}]", t, new Object[]{this});
                }
            }

            @Override
            public boolean external() {
                return external;
            }
        };
        this.transactionExpirationRegistry.register(tx);
        return tx;
    }

    /**
     * Starts the transaction manager: starts the lock manager and internal components, subscribes to messaging and
     * placement driver events, schedules the periodic transaction expiration job and registers the metric source.
     *
     * @param componentContext Component context (not used by this method).
     * @return Already completed future.
     */
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        DeadlockPreventionPolicyImpl deadlockPreventionPolicy = new DeadlockPreventionPolicyImpl(DEFAULT_TX_ID_COMPARATOR, 0L);
        this.lockManager.start(deadlockPreventionPolicy);
        this.localNodeId = this.topologyService.localMember().id();
        // This manager itself handles replica messages (see onReceived).
        this.messagingService.addMessageHandler(ReplicaMessageGroup.class, (NetworkMessageHandler)this);
        this.persistentTxStateVacuumizer = new PersistentTxStateVacuumizer(this.replicaService, this.topologyService.localMember(), this.clockService, this.placementDriver, this.failureProcessor);
        this.txStateVolatileStorage.start();
        this.txViewProvider.init(this.localNodeId, this.txStateVolatileStorage.statesMap());
        // The supplier re-reads the system property on every call, falling back to 5000 when it is not set.
        this.orphanDetector.start(this.txStateVolatileStorage, () -> TxManagerImpl.longProperty(this.systemCfg, ABANDONED_CHECK_TS_PROP, 5000L));
        this.txCleanupRequestSender.start();
        this.txCleanupRequestHandler.start();
        this.placementDriver.listen((Event)PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this.primaryReplicaExpiredListener);
        this.placementDriver.listen((Event)PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this.primaryReplicaElectedListener);
        // Expire transactions once per second, starting one second from now.
        this.transactionExpirationJobFuture = this.commonScheduler.scheduleAtFixedRate(this::expireTransactionsUpToNow, 1000L, 1000L, TimeUnit.MILLISECONDS);
        // Read once at start; defaults to 3 and fails with ArithmeticException if the value does not fit into an int.
        this.lockRetryCount = Math.toIntExact(TxManagerImpl.longProperty(this.systemCfg, LOCK_RETRY_COUNT_PROP, 3L));
        this.metricsManager.registerSource((MetricSource)this.txMetrics);
        this.metricsManager.enable((MetricSource)this.txMetrics);
        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Expires all registered transactions whose expiration time is not later than the current physical time.
     *
     * <p>Any {@link Throwable} raised along the way is handed to the failure processor instead of being propagated.
     */
    private void expireTransactionsUpToNow() {
        // Declared outside the try block so the failure message can report it; stays null if reading the clock fails.
        HybridTimestamp now = null;
        try {
            now = this.clockService.current();
            long physicalNow = now.getPhysical();
            this.transactionExpirationRegistry.expireUpTo(physicalNow);
        }
        catch (Throwable failure) {
            String description = String.format("Could not expire transactions up to %s", now);
            this.failureProcessor.process(new FailureContext(failure, description));
        }
    }

    /**
     * Prepares the manager for node stop: raises the {@code isStopping} flag and stops the orphan detector.
     */
    public void beforeNodeStop() {
        this.isStopping = true;
        this.orphanDetector.stop();
    }

    /**
     * Stops the transaction manager.
     *
     * <p>Only the first invocation does any work; later invocations return a completed future. The stop waits for
     * the finish futures that are still incomplete and then, regardless of how they completed, stops the internal
     * components, unsubscribes from placement driver events, cancels the expiration job, aborts the transactions
     * left in the expiration registry and shuts down the write intent switch pool.
     *
     * @param componentContext Component context (not used by this method).
     * @return Future that completes when the stop sequence has run.
     */
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        boolean firstStop = this.stopGuard.compareAndSet(false, true);
        if (!firstStop) {
            return CompletableFutures.nullCompletedFuture();
        }

        // Collect only the finish futures that are still in progress.
        List<CompletableFuture<?>> unfinished = new ArrayList<>();
        for (CompletableFuture<?> finishFuture : this.stopFuts) {
            if (!finishFuture.isDone()) {
                unfinished.add(finishFuture);
            }
        }
        this.stopFuts.clear();

        LOG.debug("Waiting for tx finish futures on shutdown [cnt=" + unfinished.size() + "]");

        // handle() is used so the teardown runs whether the pending futures succeeded or failed.
        return CompletableFutures.allOf(unfinished).handle((ignoredResult, ignoredError) -> {
            this.txStateVolatileStorage.stop();
            this.txCleanupRequestHandler.stop();

            this.placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this.primaryReplicaExpiredListener);
            this.placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this.primaryReplicaElectedListener);

            // The job future is null when the manager was never started.
            ScheduledFuture<?> expirationJob = this.transactionExpirationJobFuture;
            if (expirationJob != null) {
                expirationJob.cancel(false);
            }

            this.transactionExpirationRegistry.abortAllRegistered();

            IgniteUtils.shutdownAndAwaitTermination(this.writeIntentSwitchPool, 10L, TimeUnit.SECONDS);

            return null;
        });
    }

    /**
     * Returns the lock manager used by this transaction manager.
     *
     * @return Lock manager.
     */
    @Override
    public LockManager lockManager() {
        return this.lockManager;
    }

    /**
     * Delegates the cleanup of the given enlisted partitions to the cleanup request sender.
     *
     * @param commitPartitionId Commit partition id, may be {@code null}.
     * @param enlistedPartitions Enlisted partitions keyed by partition id.
     * @param commit {@code true} if the transaction is committed, {@code false} if aborted.
     * @param commitTimestamp Commit timestamp, may be {@code null}.
     * @param txId Transaction id.
     * @return Future returned by the cleanup request sender.
     */
    @Override
    public CompletableFuture<Void> cleanup(@Nullable ZonePartitionId commitPartitionId, Map<ZonePartitionId, PartitionEnlistment> enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId) {
        return this.txCleanupRequestSender.cleanup(commitPartitionId, enlistedPartitions, commit, commitTimestamp, txId);
    }

    /**
     * Delegates the cleanup of the given enlisted partition groups to the cleanup request sender.
     *
     * @param commitPartitionId Commit partition id.
     * @param enlistedPartitions Enlisted partition groups.
     * @param commit {@code true} if the transaction is committed, {@code false} if aborted.
     * @param commitTimestamp Commit timestamp, may be {@code null}.
     * @param txId Transaction id.
     * @return Future returned by the cleanup request sender.
     */
    @Override
    public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId, Collection<EnlistedPartitionGroup> enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId) {
        return this.txCleanupRequestSender.cleanup(commitPartitionId, enlistedPartitions, commit, commitTimestamp, txId);
    }

    /**
     * Delegates a cleanup addressed to a single node to the cleanup request sender.
     *
     * @param commitPartitionId Commit partition id.
     * @param node Node passed to the cleanup request sender.
     * @param txId Transaction id.
     * @return Future returned by the cleanup request sender.
     */
    @Override
    public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId, String node, UUID txId) {
        return this.txCleanupRequestSender.cleanup(commitPartitionId, node, txId);
    }

    /**
     * Vacuums the volatile transaction state storage, using the persistent state vacuumizer for persistent states.
     *
     * <p>Does nothing while the persistent vacuumizer has not been created yet (it is created in
     * {@code startAsync}). The resource TTL is read from the system configuration on every call and defaults
     * to 30000.
     *
     * @param resourceVacuumMetrics Metrics object passed to the storage vacuum.
     * @return Future returned by the storage vacuum, or a completed future when the vacuumizer is absent.
     */
    @Override
    public CompletableFuture<Void> vacuum(ResourceVacuumMetrics resourceVacuumMetrics) {
        boolean vacuumizerReady = this.persistentTxStateVacuumizer != null;
        if (!vacuumizerReady) {
            return CompletableFutures.nullCompletedFuture();
        }

        long observedAt = System.currentTimeMillis();
        long resourceTtl = TxManagerImpl.longProperty(this.systemCfg, RESOURCE_TTL_PROP, 30000L);

        return this.txStateVolatileStorage.vacuum(
                observedAt,
                resourceTtl,
                this.persistentTxStateVacuumizer::vacuumPersistentTxStates,
                resourceVacuumMetrics
        );
    }

    /**
     * Kills the locally known transaction with the given id.
     *
     * <p>Nothing is killed when there is no volatile state for the id, when the state carries no transaction
     * object, or when the transaction is an implicit read-write one.
     *
     * @param txId Transaction id.
     * @return Future with {@code true} once the transaction's own {@code kill()} completes, or an already
     *     completed future with {@code false} when nothing was killed.
     */
    @Override
    public CompletableFuture<Boolean> kill(UUID txId) {
        TxStateMeta meta = this.txStateVolatileStorage.state(txId);
        if (meta == null) {
            return CompletableFutures.falseCompletedFuture();
        }

        InternalTransaction tx = meta.tx();
        if (tx == null) {
            return CompletableFutures.falseCompletedFuture();
        }

        boolean implicitReadWrite = !tx.isReadOnly() && tx.implicit();
        if (implicitReadWrite) {
            return CompletableFutures.falseCompletedFuture();
        }

        return tx.kill().thenApply(ignored -> true);
    }

    /**
     * Returns the lock retry count that was read from the system configuration when the manager started.
     *
     * @return Lock retry count.
     */
    @Override
    public int lockRetryCount() {
        return this.lockRetryCount;
    }

    /**
     * Runs the given task on the write intent switch pool.
     *
     * @param runnable Task to run.
     * @return Future that completes when the task has finished.
     */
    @Override
    public CompletableFuture<Void> executeWriteIntentSwitchAsync(Runnable runnable) {
        return CompletableFuture.runAsync(runnable, this.writeIntentSwitchPool);
    }

    /**
     * Records the completion of a read-only transaction in the metrics and in the transaction inflights tracker.
     *
     * @param commitIntent Commit intent flag passed to the metrics.
     * @param txIdAndTimestamp Holder from which the transaction id is taken.
     */
    void onCompleteReadOnlyTransaction(boolean commitIntent, TxIdAndTimestamp txIdAndTimestamp) {
        UUID txId = txIdAndTimestamp.getTxId();
        this.txMetrics.onReadOnlyTransactionFinished(txId, commitIntent);
        this.transactionInflights.markReadOnlyTxFinished(txId);
    }

    /**
     * Handles replica responses that arrive without a correlation id.
     *
     * <p>Messages that are not a {@code ReplicaResponse}, or that carry a correlation id, are ignored. For an error
     * response, the inflight of the transaction is removed only when the unwrapped cause is a
     * {@code DelayedAckException}. For a regular response, a {@link UUID} result removes the inflight of that
     * transaction, and a {@code WriteIntentSwitchReplicatedInfo} result is forwarded to the cleanup request handler.
     *
     * @param message Received message.
     * @param sender Sender node (not used by this method).
     * @param correlationId Correlation id, {@code null} for messages that are not invocation responses.
     */
    public void onReceived(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) {
        boolean uncorrelatedReplicaResponse = message instanceof ReplicaResponse && correlationId == null;
        if (!uncorrelatedReplicaResponse) {
            return;
        }

        if (message instanceof ErrorReplicaResponse) {
            Throwable reported = ((ErrorReplicaResponse)message).throwable();
            Throwable unwrapped = ExceptionUtils.unwrapCause(reported);
            if (unwrapped instanceof DelayedAckException) {
                DelayedAckException delayedAckError = (DelayedAckException)unwrapped;
                this.transactionInflights.removeInflight(delayedAckError.txId(), delayedAckError);
            }
            return;
        }

        Object payload = ((ReplicaResponse)message).result();
        if (payload instanceof UUID) {
            this.transactionInflights.removeInflight((UUID)payload);
        }
        else if (payload instanceof WriteIntentSwitchReplicatedInfo) {
            this.txCleanupRequestHandler.writeIntentSwitchReplicated((WriteIntentSwitchReplicatedInfo)payload);
        }
    }

    /**
     * Verifies, for every enlisted partition, that the primary replica at the commit timestamp is still the one the
     * partition was enlisted with.
     *
     * <p>A partition fails the check when the placement driver reports no primary replica at the commit timestamp
     * or when the start time of that replica differs from the enlistment consistency token.
     *
     * @param enlistedGroups Enlisted partitions with their enlistment data.
     * @param commitTimestamp Commit timestamp.
     * @return Future that completes when all checks pass, or fails with {@code PrimaryReplicaExpiredException}
     *     when a check fails.
     */
    private CompletableFuture<Void> verifyCommitTimestamp(Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedGroups, HybridTimestamp commitTimestamp) {
        CompletableFuture<?>[] checks = enlistedGroups.entrySet().stream()
                .map(enlistment -> {
                    ZonePartitionId partitionId = enlistment.getKey();
                    long expectedToken = enlistment.getValue().consistencyToken();

                    return this.placementDriver.getPrimaryReplica(partitionId, commitTimestamp).thenAccept(primary -> {
                        boolean sameLease = primary != null && expectedToken == primary.getStartTime().longValue();
                        if (!sameLease) {
                            throw new PrimaryReplicaExpiredException(partitionId, expectedToken, commitTimestamp, primary);
                        }

                        assert commitTimestamp.compareTo(primary.getExpirationTime()) <= 0 : IgniteStringFormatter.format(
                                "Commit timestamp is greater than primary replica expiration timestamp: [groupId = {}, commit timestamp = {}, primary replica expiration timestamp = {}]",
                                partitionId, commitTimestamp, primary.getExpirationTime()
                        );
                    });
                })
                .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(checks);
    }

    /**
     * Returns the system views exposed by the transaction manager: the view produced by the transactions view
     * provider followed by the view produced by a locks view provider built over the lock manager's locks.
     *
     * @return Immutable two-element list of system views.
     */
    public List<SystemView<?>> systemViews() {
        LocksViewProvider locksProvider = new LocksViewProvider(this.lockManager::locks);

        SystemView<?> transactionsView = this.txViewProvider.get();
        SystemView<?> locksView = locksProvider.get();

        return List.of(transactionsView, locksView);
    }

    /**
     * Takes a new timestamp from the clock service and increments the local read-write transaction counter for it.
     * Both steps run inside {@code localRwTxCounter.inUpdateRwTxCountLock}.
     *
     * @return The timestamp that was taken.
     */
    private HybridTimestamp createBeginTimestampWithIncrementRwTxCounter() {
        return this.localRwTxCounter.inUpdateRwTxCountLock(() -> {
            HybridTimestamp beginTs = this.clockService.now();
            this.localRwTxCounter.incrementRwTxCount(beginTs);
            return beginTs;
        });
    }

    /**
     * Decrements the local read-write transaction counter for the begin timestamp derived from the transaction id.
     * The decrement runs inside {@code localRwTxCounter.inUpdateRwTxCountLock}.
     *
     * @param txId Transaction id.
     */
    private void decrementRwTxCount(UUID txId) {
        this.localRwTxCounter.inUpdateRwTxCountLock(() -> {
            this.localRwTxCounter.decrementRwTxCount(TransactionIds.beginTimestamp(txId));
            // The lock helper expects a value-returning closure; there is nothing to return here.
            return null;
        });
    }

    /**
     * Reads a system property from the distributed system configuration as a {@code long}.
     *
     * @param systemProperties System configuration to read from.
     * @param name Property name.
     * @param defaultValue Value returned when the property is not present.
     * @return Parsed property value, or {@code defaultValue} when the property is absent.
     * @throws NumberFormatException If the property is present but its value is not a parsable {@code long}.
     */
    private static long longProperty(SystemDistributedConfiguration systemProperties, String name, long defaultValue) {
        NamedListView<?> properties = (NamedListView<?>)systemProperties.properties().value();
        SystemPropertyView property = (SystemPropertyView)properties.get(name);

        if (property == null) {
            return defaultValue;
        }

        return Long.parseLong(property.propertyValue());
    }

    /**
     * Clears the local read-write transaction counter. Intended for tests only.
     */
    @TestOnly
    public void clearLocalRwTxCounter() {
        this.localRwTxCounter.clear();
    }
}

