/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.database.recovery;

import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord;
import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
import org.apache.ignite.internal.pagemem.wal.record.TimeStampRecord;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.transactions.TransactionState;
import org.gridgain.grid.internal.processors.cache.database.recovery.Sender;
import org.gridgain.grid.internal.processors.cache.database.recovery.TxConvergenceMessage;
import org.gridgain.grid.internal.processors.cache.database.recovery.TxStateRequest;
import org.gridgain.grid.internal.processors.cache.database.recovery.TxStateResponse;
import org.gridgain.grid.internal.processors.cache.database.snapshot.SnapshotOperationContext;
import org.jetbrains.annotations.Nullable;

public abstract class PITRFuture
extends GridFutureAdapter<Result> {
    /** Formats the timestamp of the last scanned record for log output (time of day only, UTC). */
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("HH:mm:ss.SSS").withZone(ZoneId.of("UTC"));
    /** Logger; may be {@code null}, so every use has to be null-checked. */
    @Nullable
    private final IgniteLogger log;
    /** Completed by {@code continueTxStateCommunication} (or {@code onDone}); incoming tx state requests wait on it. */
    private final GridFutureAdapter<?> initFut = new GridFutureAdapter();
    /** Consistent id of the node this future works for; asserted non-null in the constructor. */
    private final Object originalConsistentId;
    /** Consistent ids of peers whose tx state response is still pending. Guarded by synchronizing on the set itself. */
    private final Set<Object> remainingOrigCIds = new HashSet<Object>();
    /** Transport used by {@code sendMessage}. */
    private final Sender snd;
    /** State accumulated while scanning the WAL. */
    private final WalState walState;
    /** Consistent ids of the nodes taking part in the recovery, as passed to the constructor. */
    private final Set<Object> originalNodesCIds;
    /** Transaction versions collected from the local WAL state and from peers; handed over to {@link Result}. */
    private final Set<GridCacheVersion> skipTxs = new GridConcurrentHashSet();
    /** Nodes reported as unavailable for recovery cluster wide; filled by {@code generateRequestsAndGetSize}. */
    private final Set<Object> clusterWideNotAliveNodes = new GridConcurrentHashSet();
    /** Baseline topology; used to map compact node ids to consistent ids. */
    private final BaselineTopology snpBlt;
    /** Consistent id of the coordinator; destination of convergence messages on non-coordinator nodes. */
    private final Object crd;
    /** Current convergence session; replaced by the next step's session as the algorithm advances. */
    private final AtomicReference<ConvergenceSession> convSes = new AtomicReference();
    /** NOTE(review): not referenced anywhere in the visible part of this file. */
    private static final String WAL_KEY_PREFIX = "grp-wal-";
    /** Metastore key prefix recognised by {@code walKeyToGroupIdAndLocalFlag} (yields local flag {@code false}). */
    private static final String WAL_GLOBAL_KEY_PREFIX = "grp-wal-disabled-";
    /** Metastore key prefix recognised by {@code walKeyToGroupIdAndLocalFlag} (yields local flag {@code true}). */
    private static final String WAL_LOCAL_KEY_PREFIX = "grp-wal-local-disabled-";
    /** Tx state requests keyed by destination consistent id; assigned by {@code generateRequestsAndGetSize}. */
    private volatile Map<Object, TxStateRequest> originalCId2Req;
    /** Pointer of the first WAL record seen by {@code scan}. */
    private volatile WALPointer initialWalPointer;
    /** Assigned by {@code continueTxStateCommunication}; used to report progress for each processed response. */
    private volatile SnapshotOperationContext snapshotOperationContext;

    protected PITRFuture(long time, long snpId, BaselineTopology snpBlt, boolean crdLocal, Object crdConstId, Set<Object> originalNodesCIds, Object originalConsistentId, Sender snd, @Nullable IgniteLogger log) {
        this.log = log;
        this.snd = snd;
        this.originalConsistentId = originalConsistentId;
        this.crd = crdConstId;
        assert (this.originalConsistentId != null);
        this.snpBlt = snpBlt;
        this.originalNodesCIds = originalNodesCIds;
        this.walState = new WalState(time, snpId, snpBlt, log);
        if (crdLocal) {
            this.convSes.set(new CoordinatorConvergenceSession(0));
        } else {
            this.convSes.set(new CoordinatedConvergenceSession(0));
        }
    }

    /**
     * Scans the WAL with a filter that accepts every pointer/version pair.
     *
     * @param it WAL iterator to read from.
     * @return Consistent ids of the nodes found unavailable for recovery by the local scan.
     */
    public Set<Object> scan(WALIterator it) {
        IgniteBiPredicate<WALPointer, GridCacheVersion> acceptAll = (ptr, ver) -> true;

        return scan(it, acceptAll);
    }

    /**
     * Reads WAL records from the iterator and feeds them into the {@link WalState} until the iterator
     * is exhausted, this future has failed, or the state reports that the recovery scan can stop.
     *
     * @param it WAL iterator to read from.
     * @param pred Filter over (WAL pointer, tx version); tx records rejected by it are skipped, and it is
     *      also handed to the data record handler.
     * @return Union of the nodes that are not in topology and the nodes with disabled WAL, as seen locally.
     */
    public Set<Object> scan(WALIterator it, IgniteBiPredicate<WALPointer, GridCacheVersion> pred) {
        IgniteBiTuple<WALPointer, WALRecord> lastRead = null;

        while (it.hasNext() && error() == null) {
            IgniteBiTuple<WALPointer, WALRecord> tup = it.next();

            // Remember where the scan started; this pointer ends up in the Result.
            if (initialWalPointer == null) {
                initialWalPointer = tup.get1();
            }

            WALRecord rec = tup.get2();
            lastRead = tup;

            walState.checkTimeReached(tup);

            if (rec instanceof TxRecord) {
                TxRecord txRec = (TxRecord)rec;

                // A filtered-out tx record also skips the finish check below (as before).
                if (!pred.apply(txRec.position(), txRec.nearXidVersion())) {
                    continue;
                }

                switch (txRec.state()) {
                    case PREPARED: {
                        walState.preparedTx(txRec);
                        break;
                    }
                    case COMMITTED: {
                        walState.commitedTx(txRec);
                        break;
                    }
                    case ROLLED_BACK: {
                        walState.rollBackedTx(txRec);
                        break;
                    }
                    default: {
                        // Other transaction states are not tracked.
                        break;
                    }
                }
            } else if (rec instanceof DataRecord) {
                walState.entryUpdate((DataRecord)rec, pred);
            } else if (rec instanceof ExchangeRecord) {
                walState.exchange((ExchangeRecord)rec);
            } else if (rec instanceof CheckpointRecord) {
                walState.checkpoint((CheckpointRecord)rec);
            } else if (rec instanceof PartitionMetaStateRecord) {
                walState.partitionUpdateState((PartitionMetaStateRecord)rec);
            } else if (rec instanceof MetastoreDataRecord) {
                MetastoreDataRecord metaRec = (MetastoreDataRecord)rec;
                T2<Integer, Boolean> grpAndFlag = walKeyToGroupIdAndLocalFlag(metaRec.key());

                // A WAL-disabled key written with a non-null value marks this node as having WAL disabled.
                if (grpAndFlag != null && grpAndFlag.getValue() != null && metaRec.value() != null) {
                    walState.nodesWithDisabledWal.add(originalConsistentId);
                }
            }

            if (walState.checkFinishRecoveryScan(tup)) {
                break;
            }
        }

        // The logger is nullable, so every use is guarded.
        if (log != null && log.isInfoEnabled()) {
            if (lastRead != null) {
                WALRecord rec = lastRead.get2();
                String time = "";

                if (rec instanceof TimeStampRecord) {
                    time = DATE_FORMAT.format(Instant.ofEpochMilli(((TimeStampRecord)rec).timestamp()));
                }

                log.info("Last read ptr:" + lastRead.get1() + " " + time + " rec:" + rec);
            } else {
                log.info("No records were read.");
            }
        }

        // NOTE(review): logScanResult is defined outside this view; confirm it tolerates a null logger.
        walState.logScanResult(log);

        Set<Object> unavailableNodes = new HashSet<Object>();
        unavailableNodes.addAll(walState.nodesIsNotInTopology);
        unavailableNodes.addAll(walState.nodesWithDisabledWal);

        if (log != null && log.isInfoEnabled()) {
            log.info("Set of nodes unavailable for recovery locally calculated: " + unavailableNodes);
        }

        return unavailableNodes;
    }

    /**
     * Marks the local scan as finished and starts the tx state exchange: registers every request
     * destination as pending, completes the init future, and then either finishes the scan phase
     * right away (nothing pending) or sends the prepared requests.
     *
     * @param snapshotOperationContext Context used later to report progress per processed response.
     */
    public void continueTxStateCommunication(SnapshotOperationContext snapshotOperationContext) {
        this.snapshotOperationContext = snapshotOperationContext;

        boolean nothingPending;

        synchronized (remainingOrigCIds) {
            remainingOrigCIds.addAll(originalCId2Req.keySet());

            nothingPending = remainingOrigCIds.isEmpty();
        }

        if (log != null && log.isInfoEnabled()) {
            log.info("Local scan completed locRestoringNodeOrigCId=" + originalConsistentId);
        }

        // Unblocks requests from peers that arrived before the local scan completed.
        initFut.onDone();

        if (nothingPending) {
            scanDone();

            return;
        }

        for (Map.Entry<Object, TxStateRequest> dest : originalCId2Req.entrySet()) {
            Object destCId = dest.getKey();
            TxStateRequest txReq = dest.getValue();

            if (log != null && log.isInfoEnabled()) {
                log.info("Send tx state request [" + originalConsistentId + " -> " + destCId + "] txsToCheck " + txReq.getPreparedTxs().size() + " skipTxs " + txReq.getSkipTxs().size());
            }

            sendMessage(destCId, txReq);
        }
    }

    /**
     * Stops waiting for a response from the given node; finishes the scan phase if that was the
     * last pending node and the init future is already done.
     *
     * @param originalCId Consistent id of the node that left; {@code null} is ignored.
     */
    public void onNodeLeft(Object originalCId) {
        if (originalCId == null) {
            return;
        }

        boolean nonePending;

        synchronized (remainingOrigCIds) {
            remainingOrigCIds.remove(originalCId);

            nonePending = remainingOrigCIds.isEmpty();
        }

        if (!nonePending) {
            return;
        }

        if (initFut.isDone()) {
            scanDone();
        }
    }

    /**
     * @return Consistent id of the node this future works for.
     */
    public Object originalConsistentId() {
        return originalConsistentId;
    }

    /**
     * Builds one tx state request per remote node.
     * <p>
     * Prepared, rolled back and "future rolled back" transactions are added to {@code skipTxs} and
     * attached to every request as the skip list. Each "future committed" transaction is added to
     * the request of every node participating in it. The local node and nodes that are not alive
     * (locally or cluster wide) get no request.
     *
     * @return Mutable map: destination consistent id to request.
     */
    private Map<Object, TxStateRequest> generateRequests() {
        Map<Object, TxStateRequest> reqs = new HashMap<>();

        int sizeHint = walState.prepared.keySet().size() + walState.futureRollbacked.size();

        Set<GridCacheVersion> excluded = new HashSet<>(sizeHint);
        excluded.addAll(walState.prepared.keySet());
        excluded.addAll(walState.futureRollbacked.keySet());
        excluded.addAll(walState.rollbacked.keySet());

        skipTxs.addAll(excluded);

        for (Map.Entry<GridCacheVersion, WalState.TxHolder> tx : walState.futureCommited.entrySet()) {
            GridCacheVersion txVer = tx.getKey();

            for (Short compactId : tx.getValue().constIds()) {
                Object destCId = snpBlt.compactIdMapping().get(compactId);

                TxStateRequest txReq = reqs.computeIfAbsent(destCId,
                    k -> new TxStateRequest(new ArrayList<GridCacheVersion>(), new ArrayList<GridCacheVersion>(excluded)));

                txReq.preparedTxs.add(txVer);
            }
        }

        // Every participating node gets a request, even one with nothing to check.
        for (Object nodeCId : originalNodesCIds) {
            reqs.computeIfAbsent(nodeCId,
                k -> new TxStateRequest(new ArrayList<GridCacheVersion>(), new ArrayList<GridCacheVersion>(excluded)));
        }

        reqs.remove(originalConsistentId);

        reqs.keySet().removeIf(nodeCId -> !isAlive(nodeCId) || clusterWideNotAliveNodes.contains(nodeCId));

        return reqs;
    }

    /**
     * Records the cluster-wide set of unavailable nodes, then builds and publishes the tx state requests.
     *
     * @param allLeftNodes Consistent ids of nodes unavailable for recovery cluster wide.
     * @return Number of generated requests.
     */
    public int generateRequestsAndGetSize(Set<Object> allLeftNodes) {
        clusterWideNotAliveNodes.addAll(allLeftNodes);

        // The logger is nullable; guard it like the other logging sites in this class.
        if (log != null && log.isInfoEnabled()) {
            log.info("Set of nodes unavailable for recovery cluster wide, localNodeConstId=" + originalConsistentId + " nodes=" + clusterWideNotAliveNodes);
        }

        Map<Object, TxStateRequest> reqs = Collections.unmodifiableMap(generateRequests());

        originalCId2Req = reqs;

        // Return the size of the map just built rather than re-reading the volatile field.
        return reqs.size();
    }

    /**
     * Liveness check supplied by the subclass. {@code generateRequests} drops the request for any
     * node for which this returns {@code false}.
     *
     * @param var1 Consistent id of the node to check.
     * @return {@code true} if the node is considered alive.
     */
    protected abstract boolean isAlive(Object var1);

    /**
     * Handles a tx state response: merges the sender's skip list into {@code skipTxs}, marks the
     * sender as answered and finishes the scan phase once all answers are in and the init future is done.
     * A response from a node that is not pending is logged and otherwise ignored (its skip list is
     * still merged).
     *
     * @param originalSndNodeCId Consistent id of the responding node.
     * @param res Response.
     */
    public void processResponse(Object originalSndNodeCId, TxStateResponse res) {
        assert originalSndNodeCId != null;
        assert res != null;

        if (log != null && log.isInfoEnabled()) {
            log.info("Receive tx state response [" + originalConsistentId + " <- " + originalSndNodeCId + "] skipTxs " + res.getSkipTxs().size());
        }

        skipTxs.addAll(res.getSkipTxs());

        boolean nonePending;

        synchronized (remainingOrigCIds) {
            boolean wasPending = remainingOrigCIds.remove(originalSndNodeCId);

            if (!wasPending) {
                U.warn(log, "Received TxStateResponse from node which was not expected, originalSndNodeCId = " + originalSndNodeCId);

                return;
            }

            nonePending = remainingOrigCIds.isEmpty();

            snapshotOperationContext.reportWork(1L);
        }

        if (!nonePending) {
            return;
        }

        if (initFut.isDone()) {
            scanDone();
        }
    }

    /**
     * Answers a tx state request once the local scan is finished (init future done).
     * <p>
     * For every transaction the sender asks about, the version is reported back as "do not apply"
     * unless it is committed (or will be committed) locally and is not in the local skip set.
     * The sender's skip list is then merged into the local one.
     *
     * @param originalSndNodeCId Consistent id of the requesting node.
     * @param req Request.
     */
    public void processRequest(final Object originalSndNodeCId, final TxStateRequest req) {
        initFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
            @Override
            public void apply(IgniteInternalFuture<?> initRes) {
                try {
                    // Rethrows the failure of the local scan, if any.
                    initRes.get();

                    if (PITRFuture.this.log != null && PITRFuture.this.log.isInfoEnabled()) {
                        PITRFuture.this.log.info("Receive tx state request [" + originalSndNodeCId + " -> " + PITRFuture.this.originalConsistentId + "] txsToCheck " + req.getPreparedTxs().size() + " skipTxs " + req.getSkipTxs().size());
                    }

                    ArrayList<GridCacheVersion> rejected = new ArrayList<GridCacheVersion>();

                    for (GridCacheVersion ver : req.getPreparedTxs()) {
                        boolean committedBefore = PITRFuture.this.walState.commited.containsKey(ver);
                        boolean committedLater = PITRFuture.this.walState.futureCommited.containsKey(ver);
                        boolean knownCommit = committedBefore || committedLater;

                        if (!knownCommit || PITRFuture.this.skipTxs.contains(ver)) {
                            rejected.add(ver);
                        }
                    }

                    PITRFuture.this.skipTxs.addAll(req.getSkipTxs());

                    TxStateResponse reply = new TxStateResponse(rejected);

                    if (PITRFuture.this.log != null && PITRFuture.this.log.isInfoEnabled()) {
                        PITRFuture.this.log.info("Send tx state response [" + originalSndNodeCId + " <- " + PITRFuture.this.originalConsistentId + "] txsToNotApply " + reply.getSkipTxs().size());
                    }

                    PITRFuture.this.sendMessage(originalSndNodeCId, reply);
                }
                catch (IgniteCheckedException e) {
                    U.error(PITRFuture.this.log, "Local scan failure with exception", e);
                }
            }
        });
    }

    /**
     * Passes a convergence message to the current convergence session.
     *
     * @param nodeConstId Consistent id of the sender.
     * @param msg Convergence message.
     */
    public void processConvergenceMessage(Object nodeConstId, TxConvergenceMessage msg) {
        if (log != null && log.isInfoEnabled()) {
            log.info("Receive tx convergence message [" + originalConsistentId + " <- " + nodeConstId + "] skipped " + msg.skipped().size() + " step " + msg.step());
        }

        ConvergenceSession curSes = convSes.get();

        curSes.onReceived(nodeConstId, msg.skipped(), msg.step());
    }

    /**
     * Sends a message through the sender. A send failure is logged and the destination is then
     * treated as a node that has left.
     *
     * @param originalCId Consistent id of the destination node.
     * @param req Message to send.
     */
    private void sendMessage(Object originalCId, Message req) {
        try {
            snd.sendRequest(originalCId, req);
        }
        catch (IgniteCheckedException e) {
            String errMsg = "Fail send msg [" + originalConsistentId + " -> " + originalCId + "], msg=" + req;

            U.error(log, errMsg, e);

            onNodeLeft(originalCId);
        }
    }

    /**
     * Starts the convergence algorithm after the tx state exchange is complete.
     * <p>
     * The local skip set is submitted to the current session. Each time a session's future completes
     * with the globally skipped transactions, either the algorithm ends (empty set, see
     * {@code finalDone}) or the next local contribution is computed by
     * {@code walState.skipNext(..)} and submitted to the next step's session, with the same listener
     * re-registered on it.
     */
    private void scanDone() {
        Set<GridCacheVersion> localSkipped = new HashSet<GridCacheVersion>(skipTxs);
        ConvergenceSession session = convSes.get();

        // The logger is nullable; guard it like the other logging sites in this class.
        if (log != null && log.isInfoEnabled()) {
            log.info("TX convergence algorithm: node=" + originalConsistentId + ", step=" + session.step());
        }

        session.future().listen(new IgniteInClosure<IgniteInternalFuture<Set<GridCacheVersion>>>() {
            @Override
            public void apply(IgniteInternalFuture<Set<GridCacheVersion>> f) {
                try {
                    Set<GridCacheVersion> globalSkipped = f.get();

                    // Nobody skipped anything new on this step: the cluster has converged.
                    if (globalSkipped.isEmpty()) {
                        PITRFuture.this.finalDone();

                        return;
                    }

                    Set<GridCacheVersion> nextLocalSkipped = PITRFuture.this.walState.skipNext(globalSkipped, PITRFuture.this.skipTxs);

                    if (nextLocalSkipped == null) {
                        throw new UnsupportedOperationException("Not implemented");
                    }

                    ConvergenceSession curSession = PITRFuture.this.convSes.get();
                    ConvergenceSession nextSession = curSession.next();

                    boolean advanced = PITRFuture.this.convSes.compareAndSet(curSession, nextSession);

                    nextSession.future().listen(this);
                    nextSession.onLocalDone(nextLocalSkipped);

                    assert advanced : PITRFuture.this.convSes.get() + ", " + curSession + ", " + nextSession;
                }
                catch (IgniteCheckedException e) {
                    U.error(PITRFuture.this.log, "Error during convergence algorithm:", e);

                    PITRFuture.this.onDone(e);
                }
            }
        });

        session.onLocalDone(localSkipped);
    }

    /**
     * Completes this future with the recovery {@link Result}.
     * <p>
     * The end pointer is the later of the last committed tx pointer and the last atomic update
     * pointer ({@code null} if neither is known). If the local node is among the cluster-wide
     * unavailable nodes, every partition recorded in the WAL state is reported for rebalance;
     * otherwise the set comes from {@code walState.notOwnerParts()}.
     */
    private void finalDone() {
        // The logger is nullable; guard it like the other logging sites in this class.
        boolean infoEnabled = log != null && log.isInfoEnabled();

        if (infoEnabled) {
            log.info("TX convergence algorithm [node=" + originalConsistentId + ", step=DONE]");
        }

        FileWALPointer txPoint = (FileWALPointer)walState.lastCommitedTxPoint;
        FileWALPointer atomicPoint = (FileWALPointer)walState.lastAtomicUpdatePoint;

        FileWALPointer point;

        if (txPoint == null) {
            point = atomicPoint;
        } else if (atomicPoint == null) {
            point = txPoint;
        } else {
            point = txPoint.compareTo(atomicPoint) > 0 ? txPoint : atomicPoint;
        }

        if (infoEnabled) {
            log.info("Not alive nodes: " + clusterWideNotAliveNodes);
        }

        Map<Integer, Set<Integer>> parts;

        if (clusterWideNotAliveNodes.contains(originalConsistentId)) {
            parts = new HashMap<Integer, Set<Integer>>();

            for (Map.Entry<Integer, Map<Integer, Byte>> grp : walState.parts.entrySet()) {
                parts.put(grp.getKey(), grp.getValue().keySet());
            }
        } else {
            parts = walState.notOwnerParts();
        }

        Result res = new Result(originalConsistentId, initialWalPointer, point, skipTxs, parts);

        res.log(log);

        onDone(res);
    }

    /**
     * Parses a metastore key carrying one of the WAL-disabled prefixes.
     *
     * @param key Metastore key.
     * @return Pair of (group id parsed from the key tail, {@code true} for the local prefix or
     *      {@code false} for the global one), or {@code null} if the key has neither prefix.
     */
    @Nullable
    private static T2<Integer, Boolean> walKeyToGroupIdAndLocalFlag(String key) {
        boolean loc = key.startsWith(WAL_LOCAL_KEY_PREFIX);

        if (!loc && !key.startsWith(WAL_GLOBAL_KEY_PREFIX)) {
            return null;
        }

        String prefix = loc ? WAL_LOCAL_KEY_PREFIX : WAL_GLOBAL_KEY_PREFIX;
        int grpId = Integer.parseInt(key.substring(prefix.length()));

        return new T2<>(grpId, loc);
    }

    /**
     * Completes this future and, before that, the init future and the current convergence
     * session's future if they are still pending, passing them the same error.
     *
     * @param res Result, may be {@code null}.
     * @param err Error, may be {@code null}.
     * @return Value returned by the superclass implementation.
     */
    @Override
    public boolean onDone(@Nullable Result res, @Nullable Throwable err) {
        if (!initFut.isDone()) {
            initFut.onDone(err);
        }

        ConvergenceSession curSes = convSes.get();

        if (curSes != null) {
            GridFutureAdapter<Set<GridCacheVersion>> sesFut = curSes.future();

            if (!sesFut.isDone()) {
                curSes.future().onDone(err);
            }
        }

        return super.onDone(res, err);
    }

    /**
     * Convergence session of a non-coordinator node: the local contribution is sent to the
     * coordinator and the step completes when the coordinator's answer arrives.
     */
    private class CoordinatedConvergenceSession
    implements ConvergenceSession {
        /** Completed with the set received from the coordinator. */
        private final GridFutureAdapter<Set<GridCacheVersion>> fut = new GridFutureAdapter<>();
        /** Step number of this session. */
        private final int step;

        /**
         * @param step Step number.
         */
        private CoordinatedConvergenceSession(int step) {
            this.step = step;
        }

        /** Sends the locally skipped transactions for this step to the coordinator. */
        @Override
        public void onLocalDone(Collection<GridCacheVersion> skipped) {
            TxConvergenceMessage msg = new TxConvergenceMessage(new ArrayList<GridCacheVersion>(skipped), step);

            PITRFuture.this.sendMessage(PITRFuture.this.crd, msg);
        }

        /** Completes the step with a copy of the set received from the coordinator. */
        @Override
        public void onReceived(Object consId, Collection<GridCacheVersion> skipped, int step) {
            assert consId.equals(PITRFuture.this.crd);
            assert step == this.step : "rcvdStep=" + step + ", curStep=" + this.step + ", from=" + consId;

            Set<GridCacheVersion> globalSkipped = new HashSet<GridCacheVersion>(skipped);

            fut.onDone(globalSkipped);
        }

        @Override
        public GridFutureAdapter<Set<GridCacheVersion>> future() {
            return fut;
        }

        /** Creates a fresh session for the following step on every call. */
        @Override
        public ConvergenceSession next() {
            int nextStep = step + 1;

            return new CoordinatedConvergenceSession(nextStep);
        }

        @Override
        public int step() {
            return step;
        }
    }

    /**
     * Convergence session of the coordinator: collects the skipped transactions of every
     * participating node (including the local one) for a single step and, once all contributions
     * have arrived, completes the step and broadcasts the aggregated set to the other nodes.
     * <p>
     * Messages addressed to the following step are forwarded to a lazily created next session.
     */
    private class CoordinatorConvergenceSession
    implements ConvergenceSession {
        /** Nodes whose contribution for this step is still missing. */
        private final Set<Object> remaining = new GridConcurrentHashSet<>();
        /** Contributions collected for this step so far. */
        private final Set<GridCacheVersion> skippedTxIds = new GridConcurrentHashSet<>();
        /** Completed with a snapshot of {@link #skippedTxIds} once every node has contributed. */
        private final GridFutureAdapter<Set<GridCacheVersion>> fut = new GridFutureAdapter<>();
        /** Ensures the step is completed and broadcast at most once. */
        private final AtomicBoolean done = new AtomicBoolean();
        /** Step number of this session. */
        private final int step;
        /** Session of the following step; created on first demand. */
        private final AtomicReference<CoordinatorConvergenceSession> next = new AtomicReference<>();

        /**
         * @param step Step number.
         */
        private CoordinatorConvergenceSession(int step) {
            this.remaining.addAll(PITRFuture.this.originalNodesCIds);
            this.step = step;
        }

        /** Records the coordinator's own contribution for this step. */
        @Override
        public void onLocalDone(Collection<GridCacheVersion> skipped) {
            skippedTxIds.addAll(skipped);

            onContributed(PITRFuture.this.originalConsistentId);
        }

        /**
         * Records a remote contribution. A message for the following step is forwarded to the next
         * session without being merged into this step's set.
         */
        @Override
        public void onReceived(Object consId, Collection<GridCacheVersion> skipped, int step) {
            if (step != this.step) {
                assert step == this.step + 1 : "rcvdStep=" + step + ", curStep=" + this.step + ", from=" + consId;

                nextSession().onReceived(consId, skipped, step);

                return;
            }

            skippedTxIds.addAll(skipped);

            onContributed(consId);
        }

        @Override
        public GridFutureAdapter<Set<GridCacheVersion>> future() {
            return fut;
        }

        @Override
        public ConvergenceSession next() {
            return nextSession();
        }

        @Override
        public int step() {
            return step;
        }

        /**
         * Marks the node as having contributed; the call that removes the last pending node
         * completes the step and broadcasts the result.
         *
         * @param consId Consistent id of the contributing node.
         */
        private void onContributed(Object consId) {
            if (!remaining.remove(consId) || !remaining.isEmpty() || !done.compareAndSet(false, true)) {
                return;
            }

            // Freeze the result so the local listener and every remote node see the same set even
            // if late messages keep touching skippedTxIds.
            Set<GridCacheVersion> res = new HashSet<GridCacheVersion>(skippedTxIds);

            fut.onDone(res);

            for (Object nodeCId : PITRFuture.this.originalNodesCIds) {
                if (nodeCId.equals(PITRFuture.this.originalConsistentId)) {
                    continue;
                }

                PITRFuture.this.sendMessage(nodeCId, new TxConvergenceMessage(new ArrayList<GridCacheVersion>(res), step));
            }
        }

        /**
         * @return Session of the following step, created on first call; concurrent callers get the
         *      same instance.
         */
        private CoordinatorConvergenceSession nextSession() {
            CoordinatorConvergenceSession next0 = next.get();

            if (next0 == null && !next.compareAndSet(null, next0 = new CoordinatorConvergenceSession(step + 1))) {
                // Lost the race: use the session installed by the other thread.
                next0 = next.get();

                assert next0 != null;
            }

            return next0;
        }
    }

    private static interface ConvergenceSession {
        public void onLocalDone(Collection<GridCacheVersion> var1);

        public void onReceived(Object var1, Collection<GridCacheVersion> var2, int var3);

        public GridFutureAdapter<Set<GridCacheVersion>> future();

        public ConvergenceSession next();

        public int step();
    }

    /**
     * Outcome of the scan: the WAL range found, the transactions to skip and the partitions to rebalance.
     */
    public static class Result {
        /** Pointer of the first scanned WAL record. */
        @GridToStringInclude
        private final WALPointer beginWalPointer;

        /** End pointer chosen by the scan; may be {@code null}. */
        @GridToStringInclude
        private final WALPointer endWalPointer;

        /** Versions of the transactions to skip. */
        @GridToStringInclude
        private final Set<GridCacheVersion> skipTxs;

        /** Group id to ids of the partitions to rebalance. */
        @GridToStringInclude
        private final Map<Integer, Set<Integer>> partToRebalance;

        /** Consistent id of the node the result belongs to. */
        private final Object originalConsistentId;

        /**
         * @param originalConsistentId Consistent id of the node.
         * @param beginWalPointer Begin pointer.
         * @param endWalPointer End pointer.
         * @param txs Transactions to skip (stored by reference).
         * @param parts Partitions to rebalance (stored by reference).
         */
        Result(Object originalConsistentId, WALPointer beginWalPointer, WALPointer endWalPointer, Set<GridCacheVersion> txs, Map<Integer, Set<Integer>> parts) {
            this.originalConsistentId = originalConsistentId;
            this.beginWalPointer = beginWalPointer;
            this.endWalPointer = endWalPointer;
            this.skipTxs = txs;
            this.partToRebalance = parts;
        }

        /** @return Begin WAL pointer. */
        public WALPointer getBeginWalPointer() {
            return beginWalPointer;
        }

        /** @return End WAL pointer. */
        public WALPointer getEndWalPointer() {
            return endWalPointer;
        }

        /** @return Transactions to skip. */
        public Set<GridCacheVersion> getSkipTxs() {
            return skipTxs;
        }

        /** @return Partitions to rebalance, by group id. */
        public Map<Integer, Set<Integer>> getPartToRebalance() {
            return partToRebalance;
        }

        /** @return Consistent id of the node. */
        public Object originalConsistentId() {
            return originalConsistentId;
        }

        /**
         * Currently does nothing.
         *
         * @param log Logger.
         */
        private void log(IgniteLogger log) {
            // No-op.
        }

        @Override
        public String toString() {
            return S.toString(Result.class, (Object)this);
        }
    }

    public static class WalState {
        /** Logger passed to the constructor. */
        private final IgniteLogger log;
        /** Largest timestamp gap kept by {@code moveWindow}; system property with default 20000 (presumably milliseconds, confirm). */
        private final long storeTxInterval = IgniteSystemProperties.getLong((String)"GG_POINT_IN_TIME_STORED_TX_INTERVAL", (long)20000L);
        /** Transactions finishing within this distance of the target time are put into the "future" maps; default 1000 (presumably milliseconds, confirm). */
        private final long doubleCheckInterval = IgniteSystemProperties.getLong((String)"GG_POINT_IN_TIME_DOUBLE_CHECK_INTERVAL", (long)1000L);
        /** Transactions with prepare markers seen whose commit/rollback markers have not all arrived yet. */
        private final Map<GridCacheVersion, TxHolder> prepared = new ConcurrentHashMap<GridCacheVersion, TxHolder>();
        /** Transactions whose commit was recorded outside the double-check window, before the target time was reached. */
        private final ConcurrentSkipListMap<GridCacheVersion, TxHolder> commited = new ConcurrentSkipListMap();
        /** Transactions whose rollback was recorded outside the double-check window, before the target time was reached. */
        private final ConcurrentSkipListMap<GridCacheVersion, TxHolder> rollbacked = new ConcurrentSkipListMap();
        /** Target time passed to the constructor. */
        private final long time;
        /** Snapshot id passed to the constructor. */
        private final long snpId;
        /** Read by the tx handlers; NOTE(review): assigned outside the visible part of the file (presumably by checkTimeReached). */
        private boolean expTimeReached;
        /** NOTE(review): not used in the visible part of the file. */
        private WALPointer timePtr;
        /** Transactions committed after the target time was reached or within the double-check window. */
        private final Map<GridCacheVersion, TxHolder> futureCommited = new ConcurrentSkipListMap<GridCacheVersion, TxHolder>();
        /** Transactions rolled back (or first prepared) after the target time was reached, or rolled back within the double-check window. */
        private final Map<GridCacheVersion, TxHolder> futureRollbacked = new ConcurrentSkipListMap<GridCacheVersion, TxHolder>();
        /** Position of the last COMMITTED record that completed a transaction. */
        private volatile WALPointer lastCommitedTxPoint;
        /** Read by {@code PITRFuture.finalDone}; NOTE(review): assigned outside the visible part of the file. */
        private volatile WALPointer lastAtomicUpdatePoint;
        /** Nodes for which the latest exchange record seen was LEFT. */
        private final Set<Object> nodesIsNotInTopology = new GridConcurrentHashSet();
        /** Nodes found to have WAL disabled; filled by {@code PITRFuture.scan}. */
        private final Set<Object> nodesWithDisabledWal = new GridConcurrentHashSet();
        /** Group id to (partition id to state byte), collected from checkpoint records. */
        private final Map<Integer, Map<Integer, Byte>> parts = new HashMap<Integer, Map<Integer, Byte>>();
        /** NOTE(review): not used in the visible part of the file. */
        private final ConcurrentLinkedQueue<TxHolder> txSequence = new ConcurrentLinkedQueue();
        /** Baseline topology; used to resolve consistent ids and to create tx holders. */
        private final BaselineTopology blt;

        /**
         * @param time Target time.
         * @param snpId Snapshot id.
         * @param blt Baseline topology.
         * @param log Logger.
         */
        protected WalState(long time, long snpId, BaselineTopology blt, IgniteLogger log) {
            this.time = time;
            this.snpId = snpId;
            this.blt = blt;
            this.log = log;
        }

        /**
         * Registers a PREPARED tx record.
         * <p>
         * A transaction first seen after the target time was reached (and not stamped exactly with
         * the target time) goes straight to the "future rolled back" map. Otherwise its prepare
         * counter is incremented, creating the holder on first sight.
         *
         * @param txRec Tx record in PREPARED state.
         */
        public void preparedTx(TxRecord txRec) {
            GridCacheVersion ver = txRec.nearXidVersion();

            boolean startedAfterTarget = expTimeReached && !prepared.containsKey(ver) && time != txRec.timestamp();

            if (startedAfterTarget) {
                TxHolder late = TxHolder.create(ver, txRec.participatingNodes(), blt, txRec.timestamp());

                futureRollbacked.put(ver, late);
                late.begin = txRec.position();

                return;
            }

            TxHolder holder = prepared.get(ver);

            if (holder == null) {
                holder = TxHolder.create(ver, txRec.participatingNodes(), blt, txRec.timestamp());

                prepared.put(ver, holder);
                holder.begin = txRec.position();
            }

            holder.prepared++;
        }

        /**
         * Registers a COMMITTED tx record.
         *
         * @param txRec Tx record in COMMITTED state.
         */
        public void commitedTx(TxRecord txRec) {
            onTxCommitOrRollback(txRec, futureCommited, commited);
        }

        /**
         * Registers a ROLLED_BACK tx record.
         *
         * @param txRec Tx record in ROLLED_BACK state.
         */
        public void rollBackedTx(TxRecord txRec) {
            onTxCommitOrRollback(txRec, futureRollbacked, rollbacked);
        }

        /**
         * Handles a commit or rollback marker of a previously prepared transaction.
         * <p>
         * Each marker decrements the holder's prepare counter. When it reaches zero the transaction
         * leaves {@code prepared} and is stored either in {@code future} (target time already
         * reached, or the marker is within the double-check interval of the target time) or in
         * {@code txHolders}. A marker for an unknown transaction is ignored; before the target time
         * is reached it is also reported as a warning.
         *
         * @param txRec Tx record in COMMITTED or ROLLED_BACK state.
         * @param future Destination for transactions finishing at or after the target time.
         * @param txHolders Destination for transactions finishing before the target time.
         */
        public void onTxCommitOrRollback(TxRecord txRec, Map<GridCacheVersion, TxHolder> future, ConcurrentSkipListMap<GridCacheVersion, TxHolder> txHolders) {
            GridCacheVersion txVer = txRec.nearXidVersion();
            TxHolder txHold = prepared.get(txVer);

            if (txHold == null) {
                // The logger originates from PITRFuture, where it is nullable.
                if (!expTimeReached && log != null) {
                    log.warning("Unexpected transaction commit marker. " + txRec);
                }

                return;
            }

            txHold.merge(txRec.participatingNodes());

            if (--txHold.prepared != 0) {
                // Still waiting for more markers of this transaction.
                return;
            }

            boolean committed = txRec.state() == TransactionState.COMMITTED;

            if (committed) {
                lastCommitedTxPoint = txRec.position();
            }

            prepared.remove(txVer);
            txHold.end = txRec.position();

            if (expTimeReached || Math.abs(time - txRec.timestamp()) <= doubleCheckInterval) {
                future.put(txVer, txHold);
            } else {
                // Evict entries that fell out of the stored-tx window before adding a commit.
                if (committed) {
                    moveWindow(txHolders, txHold);
                }

                txHolders.put(txVer, txHold);
            }
        }

        /**
         * Evicts entries from the head of {@code txs} while the timestamp of {@code txHolder}
         * exceeds the head entry's timestamp by more than {@code storeTxInterval}.
         *
         * @param txs Map to trim; entries are taken via {@code firstEntry()}.
         * @param txHolder Holder whose timestamp is the reference point of the window.
         */
        private void moveWindow(ConcurrentSkipListMap<GridCacheVersion, TxHolder> txs, TxHolder txHolder) {
            for (Map.Entry<GridCacheVersion, TxHolder> head = txs.firstEntry(); head != null; head = txs.firstEntry()) {
                long age = txHolder.timestamp - head.getValue().timestamp;
                if (age <= this.storeTxInterval) {
                    break;
                }
                // Conditional remove: only drops the mapping if it still points to the same holder.
                txs.remove(head.getKey(), head.getValue());
            }
        }

        /**
         * Updates {@code nodesIsNotInTopology} from an exchange record.
         * <p>
         * The record's compact id is resolved to a consistent id through the baseline topology;
         * a {@code LEFT} record adds that id to the set and a {@code JOIN} record removes it.
         * Other record types are ignored.
         *
         * @param exchRec Exchange WAL record.
         */
        public void exchange(ExchangeRecord exchRec) {
            Object nodeConstId = this.blt.resolveConsistentId(exchRec.getConstId());

            assert nodeConstId != null;

            switch (exchRec.getType()) {
                case LEFT:
                    this.nodesIsNotInTopology.add(nodeConstId);
                    break;

                case JOIN:
                    this.nodesIsNotInTopology.remove(nodeConstId);
                    break;

                default:
                    break;
            }
        }

        /**
         * Applies the partition states carried by a checkpoint record to {@code parts}.
         * <p>
         * For every cache group in the record, the per-group map is created on demand and each
         * (partition id, state) pair reported by the group's {@link CacheState} overwrites the
         * previously stored state of that partition.
         *
         * @param chpRec Checkpoint WAL record.
         */
        public void checkpoint(CheckpointRecord chpRec) {
            for (Map.Entry<Integer, CacheState> grpEntry : chpRec.cacheGroupStates().entrySet()) {
                Map<Integer, Byte> grpStates = this.parts.computeIfAbsent(grpEntry.getKey(), k -> new HashMap<>());

                CacheState state = grpEntry.getValue();
                for (int i = 0; i < state.size(); ++i) {
                    grpStates.put(state.partitionByIndex(i), state.stateByIndex(i));
                }
            }
        }

        /**
         * Records the partition state carried by a partition meta-state record in {@code parts},
         * creating the per-group map when the group has not been seen yet.
         *
         * @param partRec Partition meta-state WAL record.
         */
        public void partitionUpdateState(PartitionMetaStateRecord partRec) {
            Map<Integer, Byte> grpStates = this.parts.computeIfAbsent(partRec.groupId(), id -> new HashMap<>());

            grpStates.put(partRec.partitionId(), partRec.state());
        }

        /**
         * Processes the write entries of a data record; does nothing once the expected time is reached.
         * <p>
         * Entries without a near transaction version only update {@code lastAtomicUpdatePoint}.
         * For the remaining entries, the holder of the owning transaction is appended to
         * {@code txSequence} when the filter accepts the (record position, version) pair, the
         * transaction is currently tracked in {@code prepared}, and its timestamp lies within
         * {@code doubleCheckInterval} of the target time.
         *
         * @param dataRecord Data WAL record.
         * @param filter Predicate over (record position, near transaction version).
         */
        private void entryUpdate(DataRecord dataRecord, IgniteBiPredicate<WALPointer, GridCacheVersion> filter) {
            if (this.expTimeReached) {
                return;
            }

            for (DataEntry entry : dataRecord.writeEntries()) {
                GridCacheVersion xid = entry.nearXidVersion();

                if (xid == null) {
                    this.lastAtomicUpdatePoint = dataRecord.position();
                    continue;
                }

                if (!filter.apply(dataRecord.position(), xid)) {
                    continue;
                }

                TxHolder tx = this.prepared.get(xid);
                if (tx == null) {
                    continue;
                }

                // NOTE(review): the expTimeReached check looks redundant after the early return above
                // (unless the flag can change concurrently) - kept to preserve the original condition.
                if (!this.expTimeReached && Math.abs(this.time - tx.timestamp) > this.doubleCheckInterval) {
                    continue;
                }

                this.txSequence.add(tx);
            }
        }

        /**
         * Expands the set of skipped transactions by one step.
         * <p>
         * For every version in {@code prevSkipped} that is present in {@code commited} or
         * {@code futureCommited}, the version is added to {@code skipTxs} and {@code txSequence} is
         * walked: every holder positioned after the first occurrence of that version is added to
         * {@code skipTxs} as well. Holders that were not already contained in {@code skipTxs},
         * {@code rollbacked} or {@code futureRollbacked} are additionally collected into the result.
         *
         * @param prevSkipped Versions skipped on the previous step.
         * @param skipTxs Accumulated set of versions to skip; modified by this method.
         * @return Newly collected versions, or an empty set when there are none.
         */
        private Set<GridCacheVersion> skipNext(Set<GridCacheVersion> prevSkipped, Set<GridCacheVersion> skipTxs) {
            Set<GridCacheVersion> res = null;

            for (GridCacheVersion txId : prevSkipped) {
                if (!this.commited.containsKey(txId) && !this.futureCommited.containsKey(txId)) {
                    continue;
                }

                skipTxs.add(txId);

                // Becomes true once txId has been met in the sequence; everything after it is affected.
                boolean rollback = false;

                for (TxHolder tx0 : this.txSequence) {
                    if (!rollback) {
                        rollback = tx0.txVer.equals(txId);
                        continue;
                    }

                    boolean alreadyRolledBack = skipTxs.contains(tx0.txVer)
                        || this.rollbacked.containsKey(tx0.txVer)
                        || this.futureRollbacked.containsKey(tx0.txVer);

                    // NOTE(review): the second check tests the outer txId, not tx0.txVer, so it is
                    // invariant for the whole inner loop - confirm this is intended.
                    if (!alreadyRolledBack && !this.commited.containsKey(txId)) {
                        if (res == null) {
                            res = new HashSet<>();
                        }
                        res.add(tx0.txVer);
                    }

                    skipTxs.add(tx0.txVer);
                }
            }

            if (res == null) {
                return Collections.emptySet();
            }
            return res;
        }

        /**
         * Collects, per cache group, the ids of partitions whose stored state byte differs from
         * the ordinal of {@link GridDhtPartitionState#OWNING}.
         *
         * @return Map of group id to the set of such partition ids; groups without any are omitted.
         */
        private Map<Integer, Set<Integer>> notOwnerParts() {
            byte owningOrd = (byte)GridDhtPartitionState.OWNING.ordinal();

            Map<Integer, Set<Integer>> notOwned = new HashMap<>();

            for (Map.Entry<Integer, Map<Integer, Byte>> grpEntry : this.parts.entrySet()) {
                Set<Integer> partIds = new HashSet<>();

                for (Map.Entry<Integer, Byte> partEntry : grpEntry.getValue().entrySet()) {
                    if (partEntry.getValue().byteValue() != owningOrd) {
                        partIds.add(partEntry.getKey());
                    }
                }

                if (!partIds.isEmpty()) {
                    notOwned.put(grpEntry.getKey(), partIds);
                }
            }

            return notOwned;
        }

        /**
         * Inspects a WAL record and reports whether the expected point has been reached.
         * <p>
         * {@link TimeStampRecord} and {@link MemoryRecoveryRecord} instances are checked by their
         * time through {@code checkTime}. A {@link SnapshotRecord} marks the point as reached when
         * {@code snpId} is set (not {@code -1}) and differs from the record's snapshot id. Any other
         * record leaves the state untouched.
         *
         * @param tup Pointer/record pair; must not be {@code null}.
         * @return Result of the time check for time-carrying records, otherwise the current
         *      {@code expTimeReached} flag.
         */
        private boolean checkTimeReached(IgniteBiTuple<WALPointer, WALRecord> tup) {
            assert tup != null;

            WALRecord rec = tup.get2();

            if (rec instanceof TimeStampRecord) {
                TimeStampRecord tsRec = (TimeStampRecord)rec;

                return this.checkTime(tsRec.timestamp(), tsRec.position());
            }

            if (rec instanceof MemoryRecoveryRecord) {
                MemoryRecoveryRecord recoveryRec = (MemoryRecoveryRecord)rec;

                return this.checkTime(recoveryRec.time(), recoveryRec.position());
            }

            if (rec instanceof SnapshotRecord) {
                boolean otherSnapshot = this.snpId != -1L && ((SnapshotRecord)rec).getSnapshotId() != this.snpId;

                if (otherSnapshot) {
                    this.timePtr = rec.position();
                    this.expTimeReached = true;
                }
            }

            return this.expTimeReached;
        }

        /**
         * Compares a record time with the target time.
         * <p>
         * The first time a record time strictly greater than {@code time} is seen, the pointer is
         * stored in {@code timePtr}, {@code expTimeReached} is set and the event is logged at info level.
         *
         * @param recTime Time carried by the record.
         * @param ptr WAL pointer of the record.
         * @return {@code true} if {@code recTime} is strictly greater than the target time.
         */
        private boolean checkTime(long recTime, WALPointer ptr) {
            boolean pastTarget = recTime > this.time;

            if (pastTarget && !this.expTimeReached) {
                if (this.log.isInfoEnabled()) {
                    this.log.info("Expected time reached " + this.time + " " + DATE_FORMAT.format(Instant.ofEpochMilli(this.time)) + " ptr=" + ptr);
                }

                this.timePtr = ptr;
                this.expTimeReached = true;
            }

            return pastTarget;
        }

        /**
         * Tells whether the scan may stop at the given record: the expected point must be
         * reported as reached by {@code checkTimeReached} and no transaction may remain in {@code prepared}.
         *
         * @param tup Pointer/record pair.
         * @return {@code true} if both conditions hold.
         */
        private boolean checkFinishRecoveryScan(IgniteBiTuple<WALPointer, WALRecord> tup) {
            if (!this.checkTimeReached(tup)) {
                return false;
            }

            return this.prepared.isEmpty();
        }

        /**
         * Dumps the collected scan state at info level: configured intervals, remembered WAL points,
         * every transaction map (size followed by its holders) and, per cache group, the partitions
         * whose stored state is not {@code OWNING}. Does nothing when info logging is disabled.
         *
         * @param log Logger to write to.
         */
        public void logScanResult(IgniteLogger log) {
            if (!log.isInfoEnabled()) {
                return;
            }

            // Settings and remembered WAL positions.
            log.info("storeCommitedInterval:" + this.storeTxInterval);
            log.info("doubleCheckInterval:" + this.doubleCheckInterval);
            log.info("timeReachedPoint:" + this.timePtr);
            log.info("lastCommitedTxPoint:" + this.lastCommitedTxPoint);
            log.info("lastAtomicUpdatePoint:" + this.lastAtomicUpdatePoint);

            // Transaction maps: size line followed by the holders.
            log.info("prepared:" + this.prepared.size());
            this.printTxInfo(log, this.prepared);

            log.info("commited:" + this.commited.size());
            this.printTxInfo(log, this.commited);

            log.info("futureCommited:" + this.futureCommited.size());
            this.printTxInfo(log, this.futureCommited);

            log.info("rollBacked:" + this.rollbacked.size());
            this.printTxInfo(log, this.rollbacked);

            log.info("futureRollBacked:" + this.futureRollbacked.size());
            this.printTxInfo(log, this.futureRollbacked);

            // Partitions that are not in OWNING state, grouped by cache group.
            log.info("partition state:");

            int owningOrd = GridDhtPartitionState.OWNING.ordinal();

            for (Map.Entry<Integer, Map<Integer, Byte>> grpEntry : this.parts.entrySet()) {
                log.info("\tgrpId=" + grpEntry.getKey());

                for (Map.Entry<Integer, Byte> partEntry : grpEntry.getValue().entrySet()) {
                    byte stateOrd = partEntry.getValue().byteValue();

                    if (stateOrd != owningOrd) {
                        log.info("\t\t" + partEntry.getKey() + " - " + GridDhtPartitionState.fromOrdinal((int)stateOrd));
                    }
                }
            }
        }

        /**
         * Logs all holders of the given map in a single info message, one holder per line,
         * preceded by a line break. Nothing is logged for an empty map.
         *
         * @param log Logger to write to.
         * @param txs Transactions to print.
         */
        private void printTxInfo(IgniteLogger log, Map<GridCacheVersion, TxHolder> txs) {
            if (txs.isEmpty()) {
                return;
            }

            SB buf = new SB();

            buf.a("\n");

            for (TxHolder holder : txs.values()) {
                buf.a((Object)holder).a("\n");
            }

            log.info(buf.toString());
        }

        /**
         * Bookkeeping for a single transaction observed in the WAL: its version, the node ids
         * collected from its records, its timestamp and the WAL pointers assigned by the enclosing class.
         */
        private static class TxHolder {
            /** Counter maintained by the enclosing class: incremented per prepare, decremented per finish marker. */
            private int prepared;

            /** Near transaction version identifying the transaction. */
            private final GridCacheVersion txVer;

            /** Node id to set of related node ids (presumably primary to backups - confirm against TxRecord). */
            private final Map<Short, Set<Short>> nodes;

            /** Baseline topology used to resolve compact node ids. */
            private final BaselineTopology blt;

            /** Timestamp passed at creation. */
            private final long timestamp;

            /** WAL pointer assigned by the enclosing class when the holder is created. */
            private WALPointer begin;

            /** WAL pointer assigned by the enclosing class when the transaction is finished. */
            private WALPointer end;

            /**
             * @param txVer Transaction version.
             * @param nodes Node mapping; stored as is, not copied.
             * @param blt Baseline topology.
             * @param timestamp Transaction timestamp.
             */
            private TxHolder(GridCacheVersion txVer, Map<Short, Set<Short>> nodes, BaselineTopology blt, long timestamp) {
                this.txVer = txVer;
                this.nodes = nodes;
                this.blt = blt;
                this.timestamp = timestamp;
            }

            /**
             * Creates a holder with its own mutable copy of the node mapping, so that later
             * {@link #merge} calls do not touch the collections of the source record.
             *
             * @param txVer Transaction version.
             * @param nodes Node mapping to copy.
             * @param blt Baseline topology.
             * @param timestamp Transaction timestamp.
             * @return New holder.
             */
            private static TxHolder create(GridCacheVersion txVer, Map<Short, Collection<Short>> nodes, BaselineTopology blt, long timestamp) {
                Map<Short, Set<Short>> copy = new HashMap<>();

                for (Map.Entry<Short, Collection<Short>> entry : nodes.entrySet()) {
                    copy.put(entry.getKey(), new HashSet<>(entry.getValue()));
                }

                return new TxHolder(txVer, copy, blt, timestamp);
            }

            /**
             * Adds the given node mapping to this holder: unknown keys get a fresh copy of their ids,
             * known keys have the ids added to the existing set.
             *
             * @param nodes Node mapping to merge in.
             */
            private void merge(Map<Short, Collection<Short>> nodes) {
                for (Map.Entry<Short, Collection<Short>> entry : nodes.entrySet()) {
                    Set<Short> known = this.nodes.get(entry.getKey());

                    if (known == null) {
                        this.nodes.put(entry.getKey(), new HashSet<>(entry.getValue()));
                    }
                    else {
                        known.addAll(entry.getValue());
                    }
                }
            }

            /**
             * Flattens the node mapping into one set of ids (keys and values).
             * <p>
             * When a value equals {@link Short#MAX_VALUE}, all compact ids of the baseline topology
             * are added instead and the remaining values of that key are not visited
             * (presumably a "all nodes" marker - confirm).
             *
             * @return Set of all collected node ids.
             */
            private Set<Short> constIds() {
                Set<Short> ids = new HashSet<>();

                for (Map.Entry<Short, Set<Short>> entry : this.nodes.entrySet()) {
                    ids.add(entry.getKey());

                    for (Short id : entry.getValue()) {
                        if (id.shortValue() == Short.MAX_VALUE) {
                            ids.addAll(this.blt.compactIdMapping().keySet());

                            // Move on to the next key, as the original labeled continue did.
                            break;
                        }

                        ids.add(id);
                    }
                }

                return ids;
            }

            /**
             * Renders a node mapping as {@code key->[v1,v2] } groups, resolving every compact id
             * through the baseline topology's compact id mapping.
             *
             * @param nodes Node mapping to render.
             * @return {@code "[]"} for an empty mapping, otherwise the concatenated groups.
             */
            private String constIdsToString(Map<Short, Set<Short>> nodes) {
                if (nodes.isEmpty()) {
                    return "[]";
                }

                StringBuilder sb = new StringBuilder();

                for (Map.Entry<Short, Set<Short>> entry : nodes.entrySet()) {
                    sb.append(this.blt.compactIdMapping().get(entry.getKey())).append("->");

                    // An empty set naturally renders as "[] ", so no special case is needed.
                    sb.append("[");

                    boolean first = true;
                    for (Short node : entry.getValue()) {
                        if (!first) {
                            sb.append(",");
                        }
                        sb.append(this.blt.compactIdMapping().get(node));
                        first = false;
                    }

                    sb.append("] ");
                }

                return sb.toString();
            }

            /**
             * @return Short textual form of the transaction version (topology version, order, node order).
             */
            private String txVer() {
                return "ver[topVer=" + this.txVer.topologyVersion() + ", order=" + this.txVer.order() + ", nodeOrder=" + this.txVer.nodeOrder() + ']';
            }

            /**
             * @return Timestamp formatted with the enclosing class's {@code DATE_FORMAT}.
             */
            private String time() {
                return DATE_FORMAT.format(Instant.ofEpochMilli(this.timestamp));
            }

            /** {@inheritDoc} */
            @Override
            public String toString() {
                return "Tx time=" + this.timestamp + " " + this.time() + " (" + this.begin + " - " + this.end + ") " + this.txVer() + " nodes=" + this.constIdsToString(this.nodes) + " prepared=" + this.prepared;
            }
        }
    }
}

