/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.transactions;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.GridKernalState;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.transactions.TrackCommittedResult;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.U;

/**
 * Keeps track of transactions that were prepared on this node and have not been committed or rolled back yet,
 * optionally records which transactions were prepared/committed during a tracking window, and builds a
 * graph of transactions that touched keys previously written by other tracked transactions.
 * <p>
 * Locking protocol visible in this class: transaction event callbacks ({@code onTx*}, {@code onKeys*}) take the
 * read lock of {@link #stateLock}; control methods (start/stop tracking, awaiting) assert that the calling thread
 * already holds the write lock, which is acquired through {@link #writeLockState()} and released through
 * {@link #writeUnlockState()}.
 */
public class LocalPendingTransactionsTracker {
    /** Shared cache context; used here for the logger, the timeout processor and the kernal gateway state. */
    private final GridCacheSharedContext<?, ?> cctx;

    /** Logger. */
    private final IgniteLogger log;

    /** Near xid versions of transactions that reported key reads/writes and have not finished yet. */
    private final Set<GridCacheVersion> currentlyCommittingTxs = U.newConcurrentHashSet();

    /** Whether the tracker is active; initial value comes from {@code IGNITE_PENDING_TX_TRACKER_ENABLED}. */
    private volatile boolean enabled = IgniteSystemProperties.getBoolean("IGNITE_PENDING_TX_TRACKER_ENABLED", false);

    /** Near xid version -> number of prepare events not yet matched by a commit or rollback event. */
    private final ConcurrentHashMap<GridCacheVersion, Integer> preparedCommittedTxsCounters = new ConcurrentHashMap<>();

    /** Transactions prepared while prepared-tracking was switched on. */
    private volatile GridConcurrentHashSet<GridCacheVersion> trackedPreparedTxs = new GridConcurrentHashSet<>();

    /** Transactions fully committed while committed-tracking was switched on. */
    private volatile GridConcurrentHashSet<GridCacheVersion> trackedCommittedTxs = new GridConcurrentHashSet<>();

    /** Key -> transactions that wrote the key while committed-tracking was switched on. */
    private volatile ConcurrentHashMap<KeyCacheObject, Set<GridCacheVersion>> writtenKeysToNearXidVer =
        new ConcurrentHashMap<>();

    /** Transaction -> transactions that later read or wrote a key written by it. */
    private volatile ConcurrentHashMap<GridCacheVersion, Set<GridCacheVersion>> dependentTransactionsGraph =
        new ConcurrentHashMap<>();

    /** Guards switching of tracking state (write lock) against transaction event callbacks (read lock). */
    private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();

    /** Set while prepared transactions are being recorded into {@link #trackedPreparedTxs}. */
    private final AtomicBoolean trackPrepared = new AtomicBoolean(false);

    /** Set while committed transactions and key dependencies are being recorded. */
    private final AtomicBoolean trackCommitted = new AtomicBoolean(false);

    /** Current awaiting session, or {@code null} when nobody waits for pending transactions. */
    private volatile TxFinishAwaiting txFinishAwaiting;

    /**
     * @param cctx Shared cache context.
     */
    public LocalPendingTransactionsTracker(GridCacheSharedContext<?, ?> cctx) {
        this.cctx = cctx;
        this.log = cctx.logger(this.getClass());
    }

    /**
     * Switches the tracker on. Asserts that the kernal gateway is in {@link GridKernalState#STARTING} state.
     */
    void enable() {
        assert cctx.kernalContext().gateway().getState() == GridKernalState.STARTING;

        enabled = true;
    }

    /**
     * @return {@code true} if the tracker is switched on.
     */
    public boolean enabled() {
        return enabled;
    }

    /**
     * Must be called with the state write lock held by the current thread.
     *
     * @return Near xid versions that currently have a prepare counter, passed through {@code U.sealSet}
     *      (presumably an unmodifiable copy - confirm against {@code U.sealSet}).
     */
    public Set<GridCacheVersion> currentlyPreparedTxs() {
        assert stateLock.writeLock().isHeldByCurrentThread();

        return U.sealSet(preparedCommittedTxsCounters.keySet());
    }

    /**
     * Starts recording prepared transactions. Must be called with the state write lock held by the current
     * thread and only when prepared-tracking is not already running.
     */
    public void startTrackingPrepared() {
        assert stateLock.writeLock().isHeldByCurrentThread();
        assert !trackPrepared.get() : "Tracking prepared transactions is already initialized.";

        trackPrepared.set(true);
    }

    /**
     * Stops recording prepared transactions and resets the recorded set. Must be called with the state write
     * lock held by the current thread and only after {@link #startTrackingPrepared()}.
     *
     * @return Transactions recorded as prepared since tracking was started.
     */
    public Set<GridCacheVersion> stopTrackingPrepared() {
        assert stateLock.writeLock().isHeldByCurrentThread();
        assert trackPrepared.get() : "Tracking prepared transactions is not initialized yet.";

        trackPrepared.set(false);

        Set<GridCacheVersion> res = U.sealSet(trackedPreparedTxs);

        trackedPreparedTxs = new GridConcurrentHashSet<>();

        return res;
    }

    /**
     * Starts recording committed transactions and key dependencies. Must be called with the state write lock
     * held by the current thread and only when committed-tracking is not already running.
     */
    public void startTrackingCommitted() {
        assert stateLock.writeLock().isHeldByCurrentThread();
        assert !trackCommitted.get() : "Tracking committed transactions is already initialized.";

        trackCommitted.set(true);
    }

    /**
     * Stops recording committed transactions and resets the recorded set, the written-keys map and the
     * dependency graph. Must be called with the state write lock held by the current thread and only after
     * {@link #startTrackingCommitted()}.
     *
     * @return Committed transactions together with the dependency graph collected since tracking was started.
     */
    public TrackCommittedResult stopTrackingCommitted() {
        assert stateLock.writeLock().isHeldByCurrentThread();
        assert trackCommitted.get() : "Tracking committed transactions is not initialized yet.";

        trackCommitted.set(false);

        // Capture the results before the collections are replaced with fresh ones.
        Set<GridCacheVersion> committedTxs = U.sealSet(trackedCommittedTxs);
        Map<GridCacheVersion, Set<GridCacheVersion>> dependentTxs = U.sealMap(dependentTransactionsGraph);

        trackedCommittedTxs = new GridConcurrentHashSet<>();
        writtenKeysToNearXidVer = new ConcurrentHashMap<>();
        dependentTransactionsGraph = new ConcurrentHashMap<>();

        return new TrackCommittedResult(committedTxs, dependentTxs);
    }

    /**
     * Opens a new awaiting session for transactions that are pending at the moment of the call. Must be called
     * with the state write lock held by the current thread and only when no other session is active.
     *
     * @param preparedTxsTimeout Timeout (positive) after which only committing transactions are still awaited.
     * @param committingTxsTimeout Timeout (positive, not less than {@code preparedTxsTimeout}) after which the
     *      session is finished regardless of remaining transactions.
     * @return The session's set of committing transactions; this is the session's own mutable set, not a copy.
     */
    public Set<GridCacheVersion> startTxFinishAwaiting(long preparedTxsTimeout, long committingTxsTimeout) {
        assert stateLock.writeLock().isHeldByCurrentThread();
        assert txFinishAwaiting == null : txFinishAwaiting;

        TxFinishAwaiting awaiting = new TxFinishAwaiting(preparedTxsTimeout, committingTxsTimeout);

        txFinishAwaiting = awaiting;

        return awaiting.committingTxs;
    }

    /**
     * Supplies the globally collected committing transactions to the active awaiting session. Must be called
     * with the state write lock held by the current thread and only after
     * {@link #startTxFinishAwaiting(long, long)}.
     *
     * @param globalCommittingTxs Committing transactions collected by the caller.
     * @return Future completed with the transactions that were still pending when the session finished
     *      (an empty set if none).
     */
    public IgniteInternalFuture<Set<GridCacheVersion>> awaitPendingTxsFinished(Set<GridCacheVersion> globalCommittingTxs) {
        assert stateLock.writeLock().isHeldByCurrentThread();

        TxFinishAwaiting awaiting = txFinishAwaiting;

        assert awaiting != null;

        awaiting.addGlobalCommittingTxs(globalCommittingTxs);

        return awaiting.fut;
    }

    /**
     * Acquires the state write lock. Must be paired with {@link #writeUnlockState()}.
     */
    public void writeLockState() {
        stateLock.writeLock().lock();
    }

    /**
     * Releases the state write lock acquired by {@link #writeLockState()}.
     */
    public void writeUnlockState() {
        stateLock.writeLock().unlock();
    }

    /**
     * Registers a prepare event: increments the transaction's counter and, if prepared-tracking is on, records
     * the transaction. Does nothing when the tracker is disabled.
     *
     * @param nearXidVer Near xid version of the prepared transaction.
     */
    public void onTxPrepared(GridCacheVersion nearXidVer) {
        if (!enabled)
            return;

        stateLock.readLock().lock();

        try {
            preparedCommittedTxsCounters.compute(nearXidVer, (key, value) -> value == null ? 1 : value + 1);

            if (trackPrepared.get())
                trackedPreparedTxs.add(nearXidVer);
        }
        finally {
            stateLock.readLock().unlock();
        }
    }

    /**
     * Registers a commit event: decrements the transaction's counter. When the counter reaches zero the
     * transaction is removed from the pending state, recorded as committed (if committed-tracking is on) and
     * reported to the active awaiting session. Does nothing when the tracker is disabled.
     *
     * @param nearXidVer Near xid version of the committed transaction.
     * @throws AssertionError If there is no positive counter for the transaction.
     */
    public void onTxCommitted(GridCacheVersion nearXidVer) {
        if (!enabled)
            return;

        stateLock.readLock().lock();

        try {
            Integer newCntr = preparedCommittedTxsCounters.compute(nearXidVer, (key, value) -> {
                if (value == null || value <= 0) {
                    throw new AssertionError("Committing transaction that was rolled back or concurrently committed [nearXidVer=" + nearXidVer + ", currentCntr=" + value + ']');
                }

                // Returning null removes the mapping: the last outstanding prepare has been committed.
                if (value == 1)
                    return null;

                return value - 1;
            });

            if (newCntr == null) {
                currentlyCommittingTxs.remove(nearXidVer);

                if (trackCommitted.get())
                    trackedCommittedTxs.add(nearXidVer);

                checkTxFinishFutureDone(nearXidVer);
            }
        }
        finally {
            stateLock.readLock().unlock();
        }
    }

    /**
     * Registers a rollback event: decrements the transaction's counter, tolerating a missing counter. When no
     * counter remains the transaction is removed from the pending state and reported to the active awaiting
     * session. Does nothing when the tracker is disabled.
     *
     * @param nearXidVer Near xid version of the rolled back transaction.
     */
    public void onTxRolledBack(GridCacheVersion nearXidVer) {
        if (!enabled)
            return;

        stateLock.readLock().lock();

        try {
            Integer newCntr = preparedCommittedTxsCounters.compute(nearXidVer, (key, value) -> {
                // Unlike commit, an absent or exhausted counter is not an error here.
                if (value == null || value <= 1)
                    return null;

                return value - 1;
            });

            if (newCntr == null) {
                currentlyCommittingTxs.remove(nearXidVer);

                checkTxFinishFutureDone(nearXidVer);
            }
        }
        finally {
            stateLock.readLock().unlock();
        }
    }

    /**
     * Registers keys written by a prepared transaction. The transaction is marked as committing; if
     * committed-tracking is on, it is additionally registered as dependent on every transaction already
     * recorded as a writer of each key and then recorded as a writer itself. Does nothing when the tracker is
     * disabled.
     *
     * @param nearXidVer Near xid version of the writing transaction.
     * @param keys Written keys.
     * @throws AssertionError If the transaction has no prepare counter.
     */
    public void onKeysWritten(GridCacheVersion nearXidVer, List<KeyCacheObject> keys) {
        if (!enabled)
            return;

        stateLock.readLock().lock();

        try {
            if (!preparedCommittedTxsCounters.containsKey(nearXidVer))
                throw new AssertionError("Tx should be in PREPARED state when logging data records: " + nearXidVer);

            currentlyCommittingTxs.add(nearXidVer);

            if (!trackCommitted.get())
                return;

            for (KeyCacheObject key : keys) {
                writtenKeysToNearXidVer.compute(key, (keyObj, keyTxsSet) -> {
                    Set<GridCacheVersion> keyTxs = keyTxsSet;

                    if (keyTxs == null)
                        keyTxs = new HashSet<>();

                    // NOTE(review): if nearXidVer is already a recorded writer of this key it becomes
                    // dependent on itself - confirm this is intended.
                    for (GridCacheVersion previousTx : keyTxs)
                        addDependentTx(previousTx, nearXidVer);

                    keyTxs.add(nearXidVer);

                    return keyTxs;
                });
            }
        }
        finally {
            stateLock.readLock().unlock();
        }
    }

    /**
     * Registers keys read by a prepared transaction. The transaction is marked as committing; if
     * committed-tracking is on, it is additionally registered as dependent on every transaction recorded as a
     * writer of each key. Keys without recorded writers are skipped. Does nothing when the tracker is disabled.
     *
     * @param nearXidVer Near xid version of the reading transaction.
     * @param keys Read keys.
     * @throws AssertionError If the transaction has no prepare counter.
     */
    public void onKeysRead(GridCacheVersion nearXidVer, List<KeyCacheObject> keys) {
        if (!enabled)
            return;

        stateLock.readLock().lock();

        try {
            if (!preparedCommittedTxsCounters.containsKey(nearXidVer))
                throw new AssertionError("Tx should be in PREPARED state when logging data records: " + nearXidVer);

            currentlyCommittingTxs.add(nearXidVer);

            if (!trackCommitted.get())
                return;

            for (KeyCacheObject key : keys) {
                writtenKeysToNearXidVer.computeIfPresent(key, (keyObj, keyTxsSet) -> {
                    for (GridCacheVersion previousTx : keyTxsSet)
                        addDependentTx(previousTx, nearXidVer);

                    // The writers set itself is left unchanged by a read.
                    return keyTxsSet;
                });
            }
        }
        finally {
            stateLock.readLock().unlock();
        }
    }

    /**
     * Drops the active awaiting session, switches both tracking modes off and replaces the recorded sets, the
     * written-keys map and the dependency graph with empty ones. Prepare counters and the committing set are
     * not touched. Acquires the state write lock itself.
     */
    public void reset() {
        stateLock.writeLock().lock();

        try {
            txFinishAwaiting = null;

            trackCommitted.set(false);
            trackedCommittedTxs = new GridConcurrentHashSet<>();

            trackPrepared.set(false);
            trackedPreparedTxs = new GridConcurrentHashSet<>();

            writtenKeysToNearXidVer = new ConcurrentHashMap<>();
            dependentTransactionsGraph = new ConcurrentHashMap<>();
        }
        finally {
            stateLock.writeLock().unlock();
        }
    }

    /**
     * Adds {@code dependentTx} to the set of transactions depending on {@code previousTx}, creating the set on
     * first use. The set is modified inside {@code compute} so that the update is atomic per graph key.
     *
     * @param previousTx Transaction that wrote a key earlier.
     * @param dependentTx Transaction that touched the same key later.
     */
    private void addDependentTx(GridCacheVersion previousTx, GridCacheVersion dependentTx) {
        dependentTransactionsGraph.compute(previousTx, (tx, depTxsSet) -> {
            Set<GridCacheVersion> dependentTxs = depTxsSet;

            if (dependentTxs == null)
                dependentTxs = new HashSet<>();

            dependentTxs.add(dependentTx);

            return dependentTxs;
        });
    }

    /**
     * Notifies the active awaiting session, if any, that the given transaction has finished.
     *
     * @param nearXidVer Near xid version of the finished transaction.
     */
    private void checkTxFinishFutureDone(GridCacheVersion nearXidVer) {
        if (!enabled)
            return;

        // Read the volatile field once: the session may be cleared concurrently.
        TxFinishAwaiting awaiting = txFinishAwaiting;

        if (awaiting != null)
            awaiting.onTxFinished(nearXidVer);
    }

    /**
     * A single session of waiting for pending transactions to finish. On creation it copies the currently
     * prepared and currently committing transactions and schedules up to two timeout objects. The future is
     * completed with the transactions that are still pending once the session finishes.
     */
    private class TxFinishAwaiting {
        /** Future completed with the transactions that did not finish. */
        private final GridFutureAdapter<Set<GridCacheVersion>> fut;

        /** Transactions of this session that have not finished yet. */
        private final Set<GridCacheVersion> notCommittedInTimeoutTxs;

        /** Transactions of this session that are known to be committing. */
        private final Set<GridCacheVersion> committingTxs;

        /** Set once {@link #addGlobalCommittingTxs(Set)} has been called. */
        private volatile boolean globalCommittingTxsAdded;

        /** Set once the prepared-transactions timeout has elapsed. */
        private volatile boolean awaitingPreparedIsDone;

        /** Set once the committing-transactions timeout has elapsed. */
        private volatile boolean timeout;

        /**
         * @param preparedTxsTimeout Timeout after which only committing transactions are still awaited.
         * @param committingTxsTimeout Timeout after which the session is finished unconditionally.
         */
        private TxFinishAwaiting(final long preparedTxsTimeout, final long committingTxsTimeout) {
            assert preparedTxsTimeout > 0L : preparedTxsTimeout;
            assert committingTxsTimeout > 0L : committingTxsTimeout;
            assert committingTxsTimeout >= preparedTxsTimeout : committingTxsTimeout + " < " + preparedTxsTimeout;

            fut = new GridFutureAdapter<>();
            notCommittedInTimeoutTxs = new GridConcurrentHashSet<GridCacheVersion>(preparedCommittedTxsCounters.keySet());
            committingTxs = U.newConcurrentHashSet(currentlyCommittingTxs);

            // A separate first-stage timeout is only needed when the two timeouts differ; otherwise the
            // second timeout object raises both flags.
            if (committingTxsTimeout > preparedTxsTimeout) {
                cctx.time().addTimeoutObject(new GridTimeoutObjectAdapter(preparedTxsTimeout) {
                    @Override
                    public void onTimeout() {
                        TxFinishAwaiting.this.awaitingPreparedIsDone = true;

                        // Nothing to do if this session was replaced/cleared or has already completed.
                        if (TxFinishAwaiting.this != LocalPendingTransactionsTracker.this.txFinishAwaiting
                            || TxFinishAwaiting.this.fut.isDone())
                            return;

                        LocalPendingTransactionsTracker.this.stateLock.readLock().lock();

                        try {
                            if (TxFinishAwaiting.this.allCommittingIsFinished())
                                TxFinishAwaiting.this.finish();
                            else
                                LocalPendingTransactionsTracker.this.log.warning("Committing transactions not completed in " + preparedTxsTimeout + " ms: " + TxFinishAwaiting.this.committingTxs);
                        }
                        finally {
                            LocalPendingTransactionsTracker.this.stateLock.readLock().unlock();
                        }
                    }
                });
            }

            cctx.time().addTimeoutObject(new GridTimeoutObjectAdapter(committingTxsTimeout) {
                @Override
                public void onTimeout() {
                    TxFinishAwaiting.this.timeout = true;

                    if (committingTxsTimeout == preparedTxsTimeout)
                        TxFinishAwaiting.this.awaitingPreparedIsDone = true;

                    // Nothing to do if this session was replaced/cleared or has already completed.
                    if (TxFinishAwaiting.this != LocalPendingTransactionsTracker.this.txFinishAwaiting
                        || TxFinishAwaiting.this.fut.isDone())
                        return;

                    LocalPendingTransactionsTracker.this.stateLock.readLock().lock();

                    try {
                        if (!TxFinishAwaiting.this.allCommittingIsFinished())
                            LocalPendingTransactionsTracker.this.log.warning("Committing transactions not completed in " + committingTxsTimeout + " ms: " + TxFinishAwaiting.this.committingTxs);

                        // The final timeout finishes the session whether or not everything completed.
                        TxFinishAwaiting.this.finish();
                    }
                    finally {
                        LocalPendingTransactionsTracker.this.stateLock.readLock().unlock();
                    }
                }
            });
        }

        /**
         * Removes a finished transaction from the pending set and re-evaluates the completion condition.
         *
         * @param nearXidVer Near xid version of the finished transaction.
         */
        void onTxFinished(GridCacheVersion nearXidVer) {
            notCommittedInTimeoutTxs.remove(nearXidVer);

            checkTxsFinished();
        }

        /**
         * Finishes the session if nothing is pending, or if the prepared timeout has elapsed and no committing
         * transaction is pending.
         */
        void checkTxsFinished() {
            if (notCommittedInTimeoutTxs.isEmpty() || (awaitingPreparedIsDone && allCommittingIsFinished()))
                finish();
        }

        /**
         * Clears the tracker's active session and completes the future with the still pending transactions.
         * Has no effect until either the global committing transactions were supplied or the final timeout
         * has elapsed.
         */
        void finish() {
            if (globalCommittingTxsAdded || timeout) {
                txFinishAwaiting = null;

                fut.onDone(notCommittedInTimeoutTxs.isEmpty() ? Collections.emptySet() : U.sealSet(notCommittedInTimeoutTxs));
            }
        }

        /**
         * Drops already finished transactions from the committing set.
         *
         * @return {@code true} if no committing transaction is still pending.
         */
        boolean allCommittingIsFinished() {
            committingTxs.retainAll(notCommittedInTimeoutTxs);

            return committingTxs.isEmpty();
        }

        /**
         * Adds all currently prepared transactions to the pending set, marks those of them that are contained
         * in {@code globalCommittingTxs} as committing and re-evaluates the completion condition. Must be
         * called with the state write lock held by the current thread.
         *
         * @param globalCommittingTxs Committing transactions collected by the caller.
         */
        void addGlobalCommittingTxs(Set<GridCacheVersion> globalCommittingTxs) {
            assert stateLock.writeLock().isHeldByCurrentThread();

            notCommittedInTimeoutTxs.addAll(preparedCommittedTxsCounters.keySet());

            // Only transactions that are still pending locally are added to the committing set.
            Set<GridCacheVersion> pendingTxs = new HashSet<>(notCommittedInTimeoutTxs);

            pendingTxs.retainAll(globalCommittingTxs);

            committingTxs.addAll(pendingTxs);

            globalCommittingTxsAdded = true;

            assert !fut.isDone() || timeout;

            checkTxsFinished();
        }
    }
}

