/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.database.txdr;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
import org.apache.ignite.internal.pagemem.wal.record.TimeStampedConsistentCutRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.IterationReason;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.transactions.LocalPendingTransactionsTracker;
import org.apache.ignite.internal.processors.cache.transactions.TrackCommittedResult;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.gridgain.grid.internal.processors.cache.database.snapshot.SnapshotCreateFuture;
import org.gridgain.grid.internal.processors.cache.database.txdr.ConsistentCut;
import org.gridgain.grid.internal.processors.cache.database.txdr.GlobalConsistentCutData;
import org.gridgain.grid.internal.processors.cache.database.txdr.LocalConsistentCutData;
import org.gridgain.grid.internal.processors.cache.database.txdr.LocalConsistentCutMetrics;

public class ConsistentCutContext {
    private final long WAITING_FOR_REORDERED_ATOMIC_UPDATES_INTERVAL = Long.getLong("GG_WAITING_FOR_REORDERED_ATOMIC_UPDATES_INTERVAL", 3000L);
    private static final long NOT_USED_IDENTIFIER = -1L;
    static final long PREPARED_TXS_TIMEOUT = 5000L;
    private static final long COMMITTING_TXS_TIMEOUT = 600000L;
    private final IgniteWriteAheadLogManager walMgr;
    private final IgniteTxManager txMgr;
    private final IgniteCacheDatabaseSharedManager dbMgr;
    private final GridCacheSharedContext<?, ?> cacheSharedCtx;
    private final long cutId;
    private final long snapshotId;
    private final long spawnId;
    private final Set<GridCacheVersion> locSkipTxs;
    private final Set<GridCacheVersion> possibleRollbackTxs;
    private final Map<GridCacheVersion, Set<GridCacheVersion>> dependentTxsGraph;
    private final LocalConsistentCutMetrics metrics;
    private volatile WALPointer fuzzyBorderStartPtr;
    private volatile WALPointer consistentCutPtr;
    private volatile long consistentCutPtrTimestamp;
    final SnapshotCreateFuture.ExchangelessSnapshotContext exchangelessCtx;
    private Map<Integer, Map<Integer, Long>> locAtomicUpdCntrs;

    private ConsistentCutContext(long cutId, long snapshotId, long spawnId, SnapshotCreateFuture.ExchangelessSnapshotContext exchangelessCtx, GridCacheSharedContext<?, ?> cctx) {
        if (!cctx.tm().pendingTxsTracker().enabled()) {
            throw new IgniteException("Local transaction tracker is not enabled. Consider changing 'IGNITE_PENDING_TX_TRACKER_ENABLED' environmet variable.");
        }
        this.cutId = cutId;
        this.snapshotId = snapshotId;
        this.spawnId = spawnId;
        this.walMgr = cctx.wal();
        this.txMgr = cctx.tm();
        this.dbMgr = cctx.database();
        this.cacheSharedCtx = cctx;
        this.locSkipTxs = new GridConcurrentHashSet();
        this.possibleRollbackTxs = new GridConcurrentHashSet();
        this.dependentTxsGraph = new ConcurrentHashMap<GridCacheVersion, Set<GridCacheVersion>>();
        this.locAtomicUpdCntrs = exchangelessCtx == null ? Collections.emptyMap() : new ConcurrentHashMap();
        this.exchangelessCtx = exchangelessCtx;
        this.metrics = new LocalConsistentCutMetrics();
    }

    public ConsistentCutContext(long cutId, long spawnId, GridCacheSharedContext<?, ?> cctx) {
        this(cutId, -1L, spawnId, null, cctx);
    }

    public ConsistentCutContext(long cutId, long snapshotId, SnapshotCreateFuture.ExchangelessSnapshotContext exchangelessCtx, GridCacheSharedContext<?, ?> cctx) {
        this(cutId, snapshotId, -1L, exchangelessCtx, cctx);
    }

    public long cutId() {
        return this.cutId;
    }

    public LocalConsistentCutMetrics metrics() {
        return this.metrics;
    }

    public LocalConsistentCutData startTrackingTransactionsLocally() throws IgniteCheckedException {
        Set committingTxs;
        if (this.exchangelessCtx != null) {
            long start = U.currentTimeMillis();
            while (U.currentTimeMillis() - start < this.WAITING_FOR_REORDERED_ATOMIC_UPDATES_INTERVAL) {
                U.sleep((long)this.WAITING_FOR_REORDERED_ATOMIC_UPDATES_INTERVAL);
            }
        }
        LocalPendingTransactionsTracker tracker = this.txMgr.pendingTxsTracker();
        tracker.writeLockState();
        try {
            this.fuzzyBorderStartPtr = this.walMgr.log((WALRecord)new TimeStampedConsistentCutRecord(this.snapshotId));
            tracker.startTrackingCommitted();
            committingTxs = U.sealSet((Collection)tracker.startTxFinishAwaiting(5000L, 600000L));
        }
        finally {
            tracker.writeUnlockState();
        }
        return new LocalConsistentCutData(committingTxs, Collections.emptyMap());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void awaitPendingTransactionsLocally(Set<GridCacheVersion> globalCommittingTxs) throws IgniteCheckedException {
        IgniteInternalFuture fut;
        LocalPendingTransactionsTracker tracker = this.txMgr.pendingTxsTracker();
        tracker.writeLockState();
        long ts = System.currentTimeMillis();
        try {
            fut = tracker.awaitPendingTxsFinished(globalCommittingTxs);
        }
        finally {
            tracker.writeUnlockState();
        }
        Set failedToAwait = (Set)fut.get();
        this.metrics.awaitFinishOfPreparedMs = System.currentTimeMillis() - ts;
        this.locSkipTxs.addAll(failedToAwait);
        this.metrics.failedToAwaitFinishOfPreparedSize = failedToAwait.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void markConsistentCutPoint() throws IgniteCheckedException {
        Set preparedAt2;
        TrackCommittedResult res;
        LocalPendingTransactionsTracker tracker = this.txMgr.pendingTxsTracker();
        this.dbMgr.checkpointReadLock();
        try {
            tracker.writeLockState();
            try {
                RolloverType rolloverType = this.exchangelessCtx == null ? RolloverType.CURRENT_SEGMENT : RolloverType.NONE;
                TimeStampedConsistentCutRecord ts = new TimeStampedConsistentCutRecord(this.snapshotId);
                this.consistentCutPtr = this.walMgr.log((WALRecord)ts, rolloverType);
                this.consistentCutPtrTimestamp = ts.timestamp();
                res = tracker.stopTrackingCommitted();
                preparedAt2 = tracker.currentlyPreparedTxs();
                tracker.startTrackingPrepared();
            }
            finally {
                tracker.writeUnlockState();
            }
        }
        finally {
            this.dbMgr.checkpointReadUnlock();
        }
        this.locSkipTxs.addAll(preparedAt2);
        this.possibleRollbackTxs.addAll(res.committedTxs());
        this.metrics.commitedFrom1to2size = res.committedTxs().size();
        this.metrics.preparedAt2size = preparedAt2.size();
        Map depTxsGraph = res.dependentTxsGraph();
        this.metrics.dependentTransactionsGraphVertices = depTxsGraph.size();
        this.metrics.dependentTransactionsGraphEdges = depTxsGraph.values().stream().mapToInt(Set::size).sum();
        this.dependentTxsGraph.putAll(depTxsGraph);
        this.collectUpdateCounters();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void collectUpdateCounters() throws IgniteCheckedException {
        if (this.exchangelessCtx == null) {
            return;
        }
        WALPointer atomicStartTrackingPtr = this.exchangelessCtx.snapshotRecordPointer();
        FileWALPointer cutPtr = (FileWALPointer)this.consistentCutPtr;
        if (!this.walMgr.reserve(atomicStartTrackingPtr)) {
            throw new IgniteCheckedException("Cannot reserve WAL starting from start tracking pointer [ptr=" + atomicStartTrackingPtr + ']');
        }
        HashMap<Integer, Map> trackers = new HashMap<Integer, Map>();
        try (WALIterator it = this.walMgr.replay(atomicStartTrackingPtr, IterationReason.UPDATE_COUNTERS);){
            while (it.hasNext()) {
                IgniteBiTuple rec = (IgniteBiTuple)it.next();
                FileWALPointer recPtr = (FileWALPointer)((WALRecord)rec.get2()).position();
                if (recPtr.compareTo(cutPtr) >= 0) {
                    break;
                }
                if (((WALRecord)rec.get2()).type() != WALRecord.RecordType.DATA_RECORD && ((WALRecord)rec.get2()).type() != WALRecord.RecordType.DATA_RECORD_V2 && ((WALRecord)rec.get2()).type() != WALRecord.RecordType.OUT_OF_ORDER_UPDATE) continue;
                DataRecord dataRecord = (DataRecord)rec.get2();
                for (DataEntry e : dataRecord.writeEntries()) {
                    GridCacheContext cacheCtx;
                    if (e.nearXidVersion() != null || (cacheCtx = this.cacheSharedCtx.cacheContext(e.cacheId())) == null || !this.exchangelessCtx.isTracked(cacheCtx.groupId(), e.partitionId())) continue;
                    trackers.computeIfAbsent(cacheCtx.groupId(), grp -> new HashMap()).computeIfAbsent(e.partitionId(), part -> new PartitionCounterTracker(this.exchangelessCtx.initialUpdateCounter(cacheCtx.groupId(), (int)part))).apply(e);
                }
            }
        }
        finally {
            this.walMgr.release(atomicStartTrackingPtr);
        }
        this.locAtomicUpdCntrs = trackers.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, val -> ((Map)val.getValue()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> ((PartitionCounterTracker)v.getValue()).applicableUpdateCounter()))));
    }

    public LocalConsistentCutData finishTrackingTransactionsLocally() {
        Set preparedFrom2to3;
        LocalPendingTransactionsTracker tracker = this.txMgr.pendingTxsTracker();
        tracker.writeLockState();
        try {
            preparedFrom2to3 = tracker.stopTrackingPrepared();
        }
        finally {
            tracker.writeUnlockState();
        }
        this.locSkipTxs.addAll(preparedFrom2to3);
        this.metrics.preparedFrom2to3size = preparedFrom2to3.size();
        return new LocalConsistentCutData(this.locSkipTxs, this.dependentTxsGraph, this.locAtomicUpdCntrs);
    }

    public ConsistentCut completeConsistentCutCreation(GlobalConsistentCutData globalData, Collection<BinaryMetadata> metas) {
        for (GridCacheVersion gridCacheVersion : this.possibleRollbackTxs) {
            if (!globalData.globalTxs().contains(gridCacheVersion)) continue;
            this.locSkipTxs.add(gridCacheVersion);
        }
        for (Map.Entry entry : this.locAtomicUpdCntrs.entrySet()) {
            Map<Integer, Long> globalParts = globalData.atomicUpdateCounters().get(entry.getKey());
            assert (globalParts != null) : "Global partitions map must not be null [grpId=" + entry.getKey() + ']';
            for (Map.Entry entry2 : ((Map)entry.getValue()).entrySet()) {
                entry2.setValue(globalParts.get(entry2.getKey()));
            }
        }
        this.metrics.globalSkipTxsSize = globalData.globalTxs().size();
        this.metrics.locSkipTxsSize = this.locSkipTxs.size();
        return new ConsistentCut(this.cutId, this.spawnId, this.fuzzyBorderStartPtr, this.consistentCutPtr, this.consistentCutPtrTimestamp, this.locSkipTxs, this.locAtomicUpdCntrs, metas, false, globalData.eventLogSnapshot().nodeLastEvents(), null);
    }

    public void reset() {
        this.locSkipTxs.clear();
        this.possibleRollbackTxs.clear();
        this.dependentTxsGraph.clear();
        this.locAtomicUpdCntrs.clear();
        this.txMgr.pendingTxsTracker().reset();
    }

    private static class PartitionCounterTracker {
        private final long initCntr;
        private long lwm;
        private NavigableMap<Long, DataEntry> pendingUpdates;

        public PartitionCounterTracker(long initCntr) {
            this.initCntr = initCntr;
            this.lwm = initCntr;
        }

        public long applicableUpdateCounter() {
            return this.lwm;
        }

        public void apply(DataEntry entry) {
            if (entry.partitionCounter() < this.initCntr) {
                return;
            }
            if (entry.partitionCounter() == this.lwm + 1L) {
                this.lwm = entry.partitionCounter();
                if (!F.isEmpty(this.pendingUpdates)) {
                    this.tryCloseGaps();
                }
            } else {
                if (this.pendingUpdates == null) {
                    this.pendingUpdates = new TreeMap<Long, DataEntry>();
                }
                this.pendingUpdates.put(entry.partitionCounter(), entry);
                this.tryCloseGaps();
            }
        }

        private void tryCloseGaps() {
            boolean closedGap = false;
            for (Map.Entry pendingUpd : this.pendingUpdates.entrySet()) {
                assert ((Long)pendingUpd.getKey() > this.lwm);
                if (this.lwm + 1L != (Long)pendingUpd.getKey()) break;
                this.lwm = (Long)pendingUpd.getKey();
                closedGap = true;
            }
            if (closedGap) {
                this.pendingUpdates.headMap(this.lwm, true).clear();
            }
        }
    }
}

