/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.dr.ist.distributed;

import java.io.EOFException;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheStoppedException;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridPlainInClosure;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.gridgain.grid.cache.dr.CacheDrPauseReason;
import org.gridgain.grid.events.DrCacheFstEvent;
import org.gridgain.grid.events.DrCacheReplicationEvent;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrPauseInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrPauseKey;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferKey;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferResultInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferResultKey;
import org.gridgain.grid.internal.processors.cache.dr.ist.DrControlTask;
import org.gridgain.grid.internal.processors.cache.dr.ist.StateTransferFuture;
import org.gridgain.grid.internal.processors.cache.dr.ist.StateTransferInfo;
import org.gridgain.grid.internal.processors.cache.dr.ist.distributed.DistributedDrStateManager;
import org.gridgain.grid.internal.processors.cache.dr.ist.distributed.DistributedStateTransferManager;
import org.gridgain.grid.internal.processors.dr.DrUtils;
import org.jetbrains.annotations.Nullable;

public class SysCacheDrStateManager
implements DistributedStateTransferManager,
DistributedDrStateManager {
    /** Fallback for the {@code GG_DR_SYSTEM_CACHE_SCAN_TIMEOUT} system property. NOTE(review): units are not visible here (presumably milliseconds) - confirm. */
    private static final long DFLT_GG_DR_SYSTEM_CACHE_SCAN_TIMEOUT = 60000L;
    /** Timeout passed to system cache scans; read from the {@code GG_DR_SYSTEM_CACHE_SCAN_TIMEOUT} system property with 60000 supplied as the default. */
    private final long timeout = IgniteSystemProperties.getLong("GG_DR_SYSTEM_CACHE_SCAN_TIMEOUT", 60000L);
    /** NOTE(review): not referenced in the visible part of this class; presumably consumed by {@code txOp}, whose body failed to decompile. */
    private static final int TX_RETRIES_WITH_NO_THROTTLING = 10;
    /** Context of the cache whose DR state is managed (source of cache name, cache id, local node, topology). */
    private final GridCacheContext cctx;
    /** Logger obtained from the cache context for this class. */
    private final IgniteLogger log;
    /** Busy lock; blocked in {@link #stop()}. */
    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
    /** Sink that receives control tasks submitted through {@code executeOrdered}. */
    private final Consumer<DrControlTask> ctrlTaskExec;
    /** Executor that runs tasks submitted through {@code executeAsync}. */
    private final Executor exec;
    /** Utility (system) cache holding DR pause / state transfer entries; assigned in {@link #start()}. */
    private IgniteInternalCache sysCache;
    /** Remote filter handed to the continuous query on the system cache. */
    private final CacheEntryEventSerializableFilter filter;
    /** State transfer futures keyed by transfer id; completed on task completion or on {@link #stop()}. */
    private final ConcurrentMap<IgniteUuid, StateTransferFuture> stateTransferFuts = new ConcurrentHashMap<IgniteUuid, StateTransferFuture>();
    /** Last known global pause info; written in {@link #init()} and when a control task completes. */
    private volatile CacheDrPauseInfo globalStopInfo;
    /** Listener notified when the global DR pause reason changes. */
    private DistributedDrStateManager.DrStateListener stateLsnr;
    /** Listener notified when a state transfer entry changes. */
    private DistributedStateTransferManager.StateTransferListener stateTransferLsnr;
    /** Id of the internal continuous query on the system cache; {@code null} until the query is started. */
    private UUID sysCacheQryId;
    /** Set to {@code true} at the beginning of {@link #stop()}. */
    private volatile boolean stopping;

    /**
     * Creates a manager bound to the given cache context.
     *
     * @param cctx Context of the replicated cache; also supplies the logger.
     * @param ctrlTaskExec Consumer that accepts control tasks submitted for ordered execution.
     * @param filter Filter passed to the continuous query on the system cache.
     * @param exec Executor used for asynchronously executed tasks.
     */
    public SysCacheDrStateManager(GridCacheContext cctx, Consumer<DrControlTask> ctrlTaskExec, CacheEntryEventSerializableFilter filter, Executor exec) {
        this.cctx = cctx;
        this.log = cctx.logger(SysCacheDrStateManager.class);

        this.ctrlTaskExec = ctrlTaskExec;
        this.exec = exec;
        this.filter = filter;
    }

    /**
     * Registers the listener that is told about global DR status changes.
     *
     * @param evtLsnr Listener to store; replaces any previously stored one.
     */
    @Override
    public void listen(DistributedDrStateManager.DrStateListener evtLsnr) {
        stateLsnr = evtLsnr;
    }

    /**
     * Registers the listener that is told about state transfer info changes.
     *
     * @param evtLsnr Listener to store; replaces any previously stored one.
     */
    @Override
    public void listen(DistributedStateTransferManager.StateTransferListener evtLsnr) {
        stateTransferLsnr = evtLsnr;
    }

    /**
     * Resolves the utility cache and, on non-client nodes, starts the continuous query on it.
     *
     * @throws IgniteCheckedException If the continuous query cannot be started.
     */
    @Override
    public void start() throws IgniteCheckedException {
        sysCache = cctx.kernalContext().cache().utilityCache();

        GridDhtPartitionsExchangeFuture lastExchFut = cctx.shared().exchange().lastTopologyFuture();
        assert lastExchFut != null : "DR Worker should start after join to topology (last exchange future is null)";

        boolean clientMode = cctx.gridConfig().isClientMode().booleanValue();
        if (clientMode) {
            // On client nodes the query is started from init() instead.
            return;
        }
        startContinuousQry();
    }

    /**
     * On client nodes starts the continuous query, then initialises {@code globalStopInfo} inside {@code txOp}:
     * the existing pause info is reused when present, otherwise a fresh one with no reason is created
     * for the local node.
     *
     * @return Result of {@code txOp}.
     * @throws IgniteCheckedException If the query start or the operation fails.
     */
    @Override
    public boolean init() throws IgniteCheckedException {
        boolean clientMode = cctx.gridConfig().isClientMode().booleanValue();
        if (clientMode) {
            startContinuousQry();
        }
        return txOp(oldStopInfo -> {
            if (oldStopInfo != null) {
                globalStopInfo = oldStopInfo;
                return;
            }
            globalStopInfo = new CacheDrPauseInfo(cctx.localNodeId(), null, null, cctx.shared().cache().cacheDescriptor(cctx.cacheId()).receivedFrom());
        }, true);
    }

    /**
     * Marks the manager as stopping, cancels the system cache continuous query (if one was started),
     * blocks the busy lock and fails every pending state transfer future with {@link CacheStoppedException}.
     */
    @Override
    public void stop() {
        stopping = true;

        if (sysCacheQryId != null) {
            cctx.continuousQueries().cancelInternalQuery(sysCacheQryId);
        }

        busyLock.block();

        for (StateTransferFuture pending : stateTransferFuts.values()) {
            pending.onDone(new CacheStoppedException("Cache has been stopped on node."));
        }
    }

    /**
     * Submits a state transfer start task (without a snapshot id) for ordered execution.
     *
     * @param targetDCs Target data center ids.
     * @return Future handed to the start task.
     */
    @Override
    public IgniteInternalFuture<? extends IgniteInternalFuture<?>> startStateTransfer(Collection<Byte> targetDCs) {
        GridFutureAdapter<GridFutureAdapter<?>> startFut = new GridFutureAdapter<>();
        StateTransferStartTask startTask = new StateTransferStartTask(startFut, null, targetDCs);
        executeOrdered(startTask);
        return startFut;
    }

    /**
     * Submits a state transfer start task bound to the given snapshot id for ordered execution.
     *
     * @param snapshotId Snapshot id passed to the start task.
     * @param targetDCs Target data center ids.
     * @return Future handed to the start task.
     */
    @Override
    public IgniteInternalFuture<? extends IgniteInternalFuture<?>> incrementalStateTransfer(long snapshotId, Collection<Byte> targetDCs) {
        GridFutureAdapter<GridFutureAdapter<?>> startFut = new GridFutureAdapter<>();
        StateTransferStartTask startTask = new StateTransferStartTask(startFut, snapshotId, targetDCs);
        executeOrdered(startTask);
        return startFut;
    }

    /**
     * Submits a state transfer stop task for ordered execution.
     *
     * @param fstKey Key of the state transfer to stop.
     * @param failureReason Failure description, or {@code null}; the stop task treats {@code null}
     *      as a user request and anything else as a batch failure.
     * @return Future completed by the stop task.
     */
    @Override
    public IgniteInternalFuture<?> stopStateTransfer(CacheDrStateTransferKey fstKey, @Nullable String failureReason) {
        GridFutureAdapter<?> stopFut = new GridFutureAdapter<>();
        StateTransferStopTask stopTask = new StateTransferStopTask(stopFut, fstKey, failureReason);
        executeOrdered(stopTask);
        return stopFut;
    }

    /**
     * Submits a {@code DrStopTask} carrying the requested pause reason and waits for its future.
     *
     * @param drState Requested pause reason.
     * @return The already awaited future of the task.
     * @throws IllegalStateException If waiting on the future fails with {@link IgniteCheckedException}.
     */
    @Override
    public IgniteInternalFuture<?> changeState(CacheDrPauseReason drState) {
        GridFutureAdapter fut = new GridFutureAdapter();
        executeOrdered(new DrStopTask(fut, drState, null, null));
        try {
            // Block the caller until the task has completed the future.
            fut.get();
        }
        catch (IgniteCheckedException e) {
            throw new IllegalStateException("Failed to stop state transfer.", e);
        }
        return fut;
    }

    /**
     * Asynchronously removes a single partition from the state transfer entry stored under {@code fstKey}.
     *
     * @param fstKey State transfer key.
     * @param part Partition that has been transferred.
     */
    @Override
    public void markPartitionTransferred(CacheDrStateTransferKey fstKey, int part) {
        List<Integer> handledParts = Collections.singletonList(part);
        RemoveHandledPartitionClosure removeClo = new RemoveHandledPartitionClosure(handledParts, 0L);
        executeAsync(new FlushStateTransferStateTask(fstKey, removeClo));
    }

    /**
     * Iterates over the system cache entries and removes every pause, state transfer and
     * state transfer result entry that belongs to the given cache.
     *
     * @param cacheName Name of the cache whose DR state entries are removed.
     */
    @Override
    public void destroyState(String cacheName) {
        try (GridCloseableIterator entries = this.sysCache.context().offheap().cacheEntriesIterator(
            this.sysCache.context(),
            true,
            false,
            this.sysCache.context().topology().readyTopologyVersion(),
            false,
            null,
            false)) {
            while (entries.hasNext()) {
                Cache.Entry cur = (Cache.Entry)entries.next();

                boolean drEntry = SysCacheDrStateManager.pauseKey(cur.getKey(), cacheName)
                    || SysCacheDrStateManager.stateTransferKey(cur.getKey(), cacheName)
                    || SysCacheDrStateManager.stateTransferResultKey(cur.getKey(), cacheName);

                if (drEntry) {
                    entries.remove();
                }
            }
        }
        catch (IgniteCheckedException ex) {
            // NOTE(review): the warning is emitted only when debug logging is enabled - confirm this is intended.
            if (this.log.isDebugEnabled()) {
                this.log.warning("Failed to cleanup DR states on cache destroy", ex);
            }
        }
    }

    /**
     * Lists state transfers of this cache that are still in progress.
     * <p>
     * Scans the system cache for state transfer entries of this cache and keeps those whose result
     * entry exists and is not yet marked as done.
     *
     * @return Infos of the active state transfers (possibly empty).
     * @throws IllegalStateException If a system cache operation fails; the original
     *      {@link IgniteCheckedException} is attached as the cause.
     */
    @Override
    public List<StateTransferInfo> activeStateTransfers() {
        try {
            List<StateTransferInfo> res = new ArrayList<>();
            Iterator it = this.sysCache.scanIterator(false, null);
            while (it.hasNext()) {
                Cache.Entry entry = (Cache.Entry)it.next();
                if (!SysCacheDrStateManager.stateTransferKey(entry.getKey(), this.cctx.name())) {
                    continue;
                }
                CacheDrStateTransferKey key = (CacheDrStateTransferKey)entry.getKey();
                CacheDrStateTransferInfo val = (CacheDrStateTransferInfo)entry.getValue();
                CacheDrStateTransferResultInfo resInfo = (CacheDrStateTransferResultInfo)this.sysCache.get(CacheDrStateTransferResultKey.fromStateTransferKey(key));
                // A transfer without a result entry, or with a finished one, is not active.
                if (resInfo == null || resInfo.done()) {
                    continue;
                }
                res.add(this.toStateTransferInfo(key, val));
            }
            return res;
        }
        catch (IgniteCheckedException e) {
            // Keep the cause so the real failure is not lost behind the generic message.
            throw new IllegalStateException("Failed to list state transfers because grid is stopping.", e);
        }
    }

    /**
     * Registers {@link SystemCacheUpdatedListener} as an internal continuous query on the system cache
     * and remembers the query id. The third argument is {@code true} on non-client nodes only.
     *
     * @throws IgniteCheckedException If the query cannot be registered.
     */
    private void startContinuousQry() throws IgniteCheckedException {
        sysCacheQryId = sysCache.context().continuousQueries().executeInternalQuery(
            new SystemCacheUpdatedListener(),
            filter,
            !cctx.gridConfig().isClientMode(),
            true,
            false,
            false);
    }

    /**
     * @return {@code true} if the local node equals the oldest of the cache affinity nodes.
     */
    private boolean oldestNode() {
        ClusterNode oldest = CU.oldest(CU.affinityNodes(cctx, AffinityTopologyVersion.NONE));
        return F.eq(cctx.localNode(), oldest);
    }

    /**
     * @param key System cache key.
     * @param cacheName Cache name to match.
     * @return {@code true} if {@code key} is a {@link CacheDrPauseKey} of the given cache.
     */
    static boolean pauseKey(Object key, String cacheName) {
        if (!(key instanceof CacheDrPauseKey)) {
            return false;
        }
        CacheDrPauseKey typed = (CacheDrPauseKey)key;
        return F.eq(cacheName, typed.cacheName());
    }

    /**
     * @param key System cache key.
     * @param cacheName Cache name to match.
     * @return {@code true} if {@code key} is a {@link CacheDrStateTransferResultKey} of the given cache.
     */
    static boolean stateTransferResultKey(Object key, String cacheName) {
        if (!(key instanceof CacheDrStateTransferResultKey)) {
            return false;
        }
        CacheDrStateTransferResultKey typed = (CacheDrStateTransferResultKey)key;
        return F.eq(cacheName, typed.cacheName());
    }

    /**
     * @param key System cache key.
     * @param cacheName Cache name to match.
     * @return {@code true} if {@code key} is a {@link CacheDrStateTransferKey} of the given cache.
     */
    static boolean stateTransferKey(Object key, String cacheName) {
        if (!(key instanceof CacheDrStateTransferKey)) {
            return false;
        }
        CacheDrStateTransferKey typed = (CacheDrStateTransferKey)key;
        return F.eq(cacheName, typed.cacheName());
    }

    /**
     * Completes a state transfer future: normally when there is no pause info, otherwise with an
     * {@link IgniteCheckedException} describing the cancellation.
     *
     * @param fut Future to complete.
     * @param stopInfo Pause info that cancelled the transfer, or {@code null} on success.
     */
    private void completeFstFuture(StateTransferFuture fut, @Nullable CacheDrPauseInfo stopInfo) {
        if (stopInfo != null) {
            fut.onDone(new IgniteCheckedException("State transfer is cancelled: " + stopInfo));
            return;
        }
        fut.onDone();
    }

    /**
     * @return {@code true} if the current global pause reason is {@code USER_REQUEST} or {@code BATCH_FAILED}.
     */
    private boolean isDrStopped() {
        CacheDrPauseReason cur = globalStopInfo.reason();
        if (cur == CacheDrPauseReason.USER_REQUEST) {
            return true;
        }
        return cur == CacheDrPauseReason.BATCH_FAILED;
    }

    /**
     * Records a replication started (type 1023, pause reason is {@code null}) or replication stopped
     * (type 1022) event, provided the event type is user-recordable.
     *
     * @param info Pause info attached to the event; must not be {@code null}.
     */
    private void recordCacheReplicationStateChangedEvt(CacheDrPauseInfo info) {
        assert info != null : "info";

        boolean started = info.reason() == null;
        int evtType = started ? 1023 : 1022;

        if (!cctx.kernalContext().event().isUserRecordable(evtType)) {
            return;
        }

        String evtMsg = started ? "Replication started." : "Replication stopped.";
        ClusterNode locNode = cctx.localNode();
        cctx.kernalContext().event().record(new DrCacheReplicationEvent(locNode, evtMsg, evtType, cctx.name(), info));
    }

    /**
     * Records a full state transfer event if the event type is user-recordable.
     * Types 1024, 1025 and 1026 map to the started, finished and failed messages respectively;
     * any other type trips an assertion and is recorded with a {@code null} message.
     *
     * @param type Event type.
     * @param cacheName Cache name attached to the event.
     * @param dcIds Data center ids attached to the event.
     */
    private void recordCacheFstEvt(int type, String cacheName, Collection<Byte> dcIds) {
        if (!cctx.kernalContext().event().isUserRecordable(type)) {
            return;
        }

        ClusterNode locNode = cctx.kernalContext().discovery().localNode();

        String evtMsg;
        if (type == 1024) {
            evtMsg = "Cache full state transfer started.";
        }
        else if (type == 1025) {
            evtMsg = "Cache full state transfer finished.";
        }
        else if (type == 1026) {
            evtMsg = "Cache full state transfer failed.";
        }
        else {
            assert false : "Unsupported event type " + type;
            evtMsg = null;
        }

        cctx.kernalContext().event().record(new DrCacheFstEvent(locNode, evtMsg, type, cacheName, dcIds));
    }

    /**
     * Hands the task to the control task consumer supplied at construction time.
     *
     * @param task Task to submit.
     */
    private void executeOrdered(DrControlTask task) {
        ctrlTaskExec.accept(task);
    }

    /**
     * Runs the task on the executor: {@code run()} followed by {@code onDone()}, with any
     * {@link Exception} routed to {@code onError}. The submitted runnable reports the task's
     * own {@code toString()}.
     *
     * @param task Task to execute.
     */
    private void executeAsync(final DrControlTask task) {
        Runnable job = new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                    task.onDone();
                }
                catch (Exception failure) {
                    task.onError(failure);
                }
            }

            @Override
            public String toString() {
                return task.toString();
            }
        };

        this.exec.execute(job);
    }

    /**
     * NOTE(review): the real body of this method could not be decompiled (CFR failure, see the trace
     * below); as it stands the method always throws {@link IllegalStateException}. Restore the
     * implementation from the original class file or sources before relying on this file.
     * <p>
     * What the visible callers show: the closure is invoked with the previously stored pause info
     * (possibly {@code null}), and a {@code false} result is treated by {@code DrAbstractControlTask}
     * as "task cancelled due to cache stop". The meaning of {@code globalSync} is not visible here.
     *
     * @param clo Closure receiving the old pause info.
     * @param globalSync Flag passed as {@code true} by all visible callers; semantics unknown - TODO confirm.
     * @return See the note above; the current body never returns.
     * @throws IgniteCheckedException Declared by the original signature.
     */
    public boolean txOp(GridPlainInClosure<CacheDrPauseInfo> clo, boolean globalSync) throws IgniteCheckedException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Converts a system cache state transfer entry into a {@link StateTransferInfo}, marking every
     * partition key of {@code val.partitions()} in the pending partitions of the result.
     *
     * @param key State transfer key.
     * @param val State transfer value, may be {@code null}.
     * @return Converted info, or {@code null} when {@code val} is {@code null}.
     */
    StateTransferInfo toStateTransferInfo(CacheDrStateTransferKey key, CacheDrStateTransferInfo val) {
        if (val != null) {
            StateTransferInfo res = new StateTransferInfo(key, cctx.cacheId(), cctx.affinity().partitions(), val.startTime(), val.snapshotId());
            val.partitions().keySet().forEach(part -> res.pendingParts().set((int)part));
            return res;
        }
        return null;
    }

    /**
     * Marks the result entry of a state transfer as done with the given reason, unless the entry is
     * missing or already done.
     *
     * @param key State transfer key.
     * @param reason Pause info stored in the result entry (may be {@code null}).
     * @throws IgniteCheckedException Only {@link ClusterTopologyCheckedException} is propagated;
     *      other checked failures are logged and suppressed.
     */
    void stopStateTransfer0(CacheDrStateTransferKey key, CacheDrPauseInfo reason) throws IgniteCheckedException {
        try {
            CacheDrStateTransferResultKey resultKey = CacheDrStateTransferResultKey.fromStateTransferKey(key);
            CacheDrStateTransferResultInfo current = (CacheDrStateTransferResultInfo)sysCache.get(resultKey);

            if (current == null || current.done()) {
                return;
            }

            sysCache.put(resultKey, new CacheDrStateTransferResultInfo(true, reason, current.listeners()));
        }
        catch (ClusterTopologyCheckedException topErr) {
            // Topology failures are the caller's business.
            throw topErr;
        }
        catch (IgniteCheckedException err) {
            U.error(log, "Failed to set state transfer result.", err);
        }
    }

    private class SystemCacheUpdatedListener
    implements CacheEntryUpdatedListener<Object, Object> {
        private SystemCacheUpdatedListener() {
        }

        @Override
        public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> e : evts) {
                Object key = e.getKey();
                if (SysCacheDrStateManager.stateTransferKey(key, SysCacheDrStateManager.this.cctx.name()) && !SysCacheDrStateManager.this.cctx.kernalContext().clientNode()) {
                    SysCacheDrStateManager.this.executeOrdered(new StateTransferChangeTask((CacheDrStateTransferKey)key, (CacheDrStateTransferInfo)e.getOldValue(), (CacheDrStateTransferInfo)e.getValue()));
                    continue;
                }
                if (SysCacheDrStateManager.stateTransferResultKey(key, SysCacheDrStateManager.this.cctx.name())) {
                    CacheDrStateTransferResultInfo oldInfo = (CacheDrStateTransferResultInfo)e.getOldValue();
                    CacheDrStateTransferResultInfo newInfo = (CacheDrStateTransferResultInfo)e.getValue();
                    boolean transferCompleted = oldInfo != null && !oldInfo.done() && newInfo != null && newInfo.done();
                    if (!transferCompleted) continue;
                    SysCacheDrStateManager.this.executeOrdered(new StateTransferCompleteTask((CacheDrStateTransferResultKey)key, newInfo));
                    continue;
                }
                if (!SysCacheDrStateManager.pauseKey(key, SysCacheDrStateManager.this.cctx.name())) continue;
                assert (key instanceof CacheDrPauseKey) : key;
                CacheDrPauseInfo info = (CacheDrPauseInfo)e.getValue();
                if (info == null) continue;
                if (info.reason() == CacheDrPauseReason.NO_SND_HUBS) {
                    SysCacheDrStateManager.this.log.error("Got DR stop event from node with incremental DR mode disabled. Ignoring.");
                    return;
                }
                SysCacheDrStateManager.this.recordCacheReplicationStateChangedEvt(info);
                if (F.eq(SysCacheDrStateManager.this.cctx.localNodeId(), info.nodeId())) continue;
                SysCacheDrStateManager.this.executeOrdered(new DrStopTask(null, null, null, info));
            }
        }
    }

    abstract class DrAbstractControlTask
    implements DrControlTask {
        final GridTuple<CacheDrPauseInfo> newStopInfo = F.t(null);

        DrAbstractControlTask() {
        }

        @Nullable
        public abstract CacheDrPauseInfo run0(@Nullable CacheDrPauseInfo var1) throws IgniteCheckedException;

        @Nullable
        public AffinityTopologyVersion topologyVersion() {
            return null;
        }

        @Override
        public void run() throws Exception {
            block7: {
                try {
                    boolean res;
                    if (this.topologyVersion() != null) {
                        SysCacheDrStateManager.this.cctx.affinity().affinityReadyFuture(this.topologyVersion()).get();
                    }
                    if (!(res = SysCacheDrStateManager.this.txOp(stopInfo -> this.newStopInfo.set(this.run0((CacheDrPauseInfo)stopInfo)), true))) {
                        this.onError(new CacheStoppedException("Dr task is cancelled due to cache stop: task=" + this.getClass().getSimpleName() + ", cache=" + SysCacheDrStateManager.this.cctx.name()));
                        return;
                    }
                    this.onDone();
                }
                catch (Throwable e) {
                    if (!SysCacheDrStateManager.this.stopping && !SysCacheDrStateManager.this.cctx.topology().stopping()) {
                        U.error(SysCacheDrStateManager.this.log, "An exception occurred during DR task processing.", e);
                    } else if (SysCacheDrStateManager.this.log.isDebugEnabled()) {
                        SysCacheDrStateManager.this.log.debug("An exception occurred during DR task processing: " + e);
                    }
                    this.onError(e);
                    if (!(e instanceof Error)) break block7;
                    throw (Error)e;
                }
            }
        }

        @Override
        public void onDone() {
            CacheDrPauseInfo newInfo = this.newStopInfo.get();
            if (!F.eq(SysCacheDrStateManager.this.globalStopInfo, newInfo)) {
                SysCacheDrStateManager.this.globalStopInfo = newInfo;
                SysCacheDrStateManager.this.stateLsnr.onGlobalDrStatusChanged(SysCacheDrStateManager.this.globalStopInfo.reason());
            }
        }
    }

    /**
     * Entry processor that removes handled partitions from a state transfer entry and adds to its
     * entry counter. The entry itself is removed once no partitions remain.
     */
    private static class RemoveHandledPartitionClosure
    implements EntryProcessor<CacheDrStateTransferKey, CacheDrStateTransferInfo, Void>,
    Externalizable {
        private static final long serialVersionUID = 5349841260224337573L;
        /** Partitions to remove from the entry. */
        private List<Integer> parts;
        /** Number of entries added to the entry count of the stored info. */
        private long entryCnt;

        /** Required by {@link Externalizable}. */
        public RemoveHandledPartitionClosure() {
        }

        /**
         * @param parts Partitions to remove.
         * @param entryCnt Entry count increment.
         */
        private RemoveHandledPartitionClosure(List<Integer> parts, long entryCnt) {
            this.parts = parts;
            this.entryCnt = entryCnt;
        }

        /**
         * Applies the removal to the entry; does nothing when the entry has no value.
         *
         * @param e Entry to update.
         * @param args Ignored.
         * @return Always {@code null}.
         */
        @Override
        public Void process(MutableEntry<CacheDrStateTransferKey, CacheDrStateTransferInfo> e, Object ... args) {
            CacheDrStateTransferInfo stored = e.getValue();
            if (stored == null) {
                return null;
            }

            // Work on a copy of the partition map of the stored value.
            HashMap<Integer, UUID> remaining = new HashMap<Integer, UUID>(stored.partitions());
            this.parts.forEach(remaining::remove);

            long updatedCnt = stored.entryCount() + this.entryCnt;

            if (!remaining.isEmpty()) {
                e.setValue(new CacheDrStateTransferInfo(stored).partitions(remaining).entryCount(updatedCnt));
            }
            else {
                e.remove();
            }
            return null;
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            U.writeIntCollection(out, this.parts);
            out.writeLong(this.entryCnt);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            this.parts = U.readIntList(in);
            try {
                this.entryCnt = in.readLong();
            }
            catch (EOFException ignored) {
                // Deliberately tolerated: the counter keeps its default of 0 when the stream ends here
                // (presumably a serialized form written without the counter - confirm).
            }
        }

        @Override
        public String toString() {
            return "RemoveHandledPartitionClosure [parts=" + this.parts + ']';
        }
    }

    /**
     * Task that applies an entry processor to a state transfer entry of the system cache after the
     * affinity of the last topology exchange is ready.
     */
    private class FlushStateTransferStateTask
    implements DrControlTask {
        /** Key of the entry to update. */
        private final CacheDrStateTransferKey key;
        /** Processor invoked on the entry. */
        private final EntryProcessor<CacheDrStateTransferKey, CacheDrStateTransferInfo, Void> closure;

        /**
         * @param key Key of the entry to update.
         * @param closure Processor invoked on the entry.
         */
        private FlushStateTransferStateTask(CacheDrStateTransferKey key, EntryProcessor<CacheDrStateTransferKey, CacheDrStateTransferInfo, Void> closure) {
            this.key = key;
            this.closure = closure;
        }

        /**
         * Waits for the affinity of the last exchange future's initial version, then invokes the
         * processor on the system cache.
         *
         * @throws IgniteCheckedException If waiting or the invoke fails.
         */
        @Override
        public void run() throws IgniteCheckedException {
            SysCacheDrStateManager mgr = SysCacheDrStateManager.this;
            GridCacheSharedContext shared = mgr.cctx.shared();

            GridDhtPartitionsExchangeFuture lastExchFut = shared.exchange().lastTopologyFuture();
            assert lastExchFut != null : "DR Worker should start after join to topology (last exchange future is null)";

            shared.exchange().affinityReadyFuture(lastExchFut.initialVersion()).get();

            mgr.sysCache.invoke(this.key, this.closure, new Object[0]);
        }
    }

    /**
     * Task scheduled when a state transfer result entry becomes "done". It prunes the listener set of
     * the stored result (or removes the transfer entries when nobody is left to wait for), records the
     * finished/failed event and completes the local state transfer future.
     */
    private class StateTransferCompleteTask
    extends DrAbstractControlTask {
        /** Key of the result entry. */
        private final CacheDrStateTransferResultKey key;
        /** Result info received with the update event. */
        private final CacheDrStateTransferResultInfo info;

        /**
         * @param key Key of the result entry.
         * @param newInfo Result info received with the update event.
         */
        StateTransferCompleteTask(CacheDrStateTransferResultKey key, CacheDrStateTransferResultInfo newInfo) {
            this.key = key;
            this.info = newInfo;
        }

        /**
         * Re-reads the result entry and drops listeners that are the local node or no longer known to
         * discovery. If no listeners remain (or there were none at all), both the state transfer entry
         * and the result entry are removed; if only some were dropped, the result entry is rewritten
         * with the remaining ones.
         *
         * @param oldStopInfo Current pause info.
         * @return {@code oldStopInfo}, unchanged.
         * @throws IgniteCheckedException If a system cache operation fails.
         */
        @Override
        @Nullable
        public CacheDrPauseInfo run0(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            CacheDrStateTransferResultInfo stored = (CacheDrStateTransferResultInfo)SysCacheDrStateManager.this.sysCache.get(this.key);
            if (stored == null) {
                return oldStopInfo;
            }
            Collection lsnrs = stored.listeners();
            // Remember the original size up front; a null listener collection counts as empty.
            int origCnt = lsnrs == null ? 0 : lsnrs.size();
            if (!F.isEmpty(lsnrs)) {
                lsnrs = stored.listeners().stream().filter(nodeId -> SysCacheDrStateManager.this.cctx.kernalContext().discovery().node((UUID)nodeId) != null).collect(Collectors.toSet());
                lsnrs.remove(SysCacheDrStateManager.this.cctx.localNodeId());
            }
            // Null-safe check: the original dereferenced lsnrs here even though it may be null.
            if (F.isEmpty(lsnrs)) {
                SysCacheDrStateManager.this.sysCache.remove(new CacheDrStateTransferKey(this.key.cacheName(), this.key.id(), this.key.dataCenterIds()));
                SysCacheDrStateManager.this.sysCache.remove(this.key);
            } else if (lsnrs.size() != origCnt) {
                SysCacheDrStateManager.this.sysCache.put(this.key, new CacheDrStateTransferResultInfo(true, stored.pauseInfo(), lsnrs));
            }
            return oldStopInfo;
        }

        /**
         * Logs and records the outcome of the transfer (finished when the result carries no pause
         * info, failed otherwise) and completes the matching local future, if any.
         */
        @Override
        public void onDone() {
            super.onDone();
            CacheDrPauseInfo pauseInfo = this.info.pauseInfo();
            if (pauseInfo == null) {
                if (SysCacheDrStateManager.this.log.isInfoEnabled()) {
                    SysCacheDrStateManager.this.log.info("Full state transfer finished: [fstId=" + this.key.id() + ", cacheName=" + this.key.cacheName() + ", dataCenters=" + this.key.dataCenterIds() + "]");
                }
                SysCacheDrStateManager.this.recordCacheFstEvt(1025, this.key.cacheName(), this.key.dataCenterIds());
            } else {
                // A user-requested stop is not worth a warning.
                if (pauseInfo.reason() != CacheDrPauseReason.USER_REQUEST) {
                    SysCacheDrStateManager.this.log.warning("Full state transfer failed: [fstId=" + this.key.id() + ", reason=" + pauseInfo.reason() + ", cacheName=" + this.key.cacheName() + ", dataCenters=" + this.key.dataCenterIds() + "]");
                }
                SysCacheDrStateManager.this.recordCacheFstEvt(1026, this.key.cacheName(), this.key.dataCenterIds());
            }
            StateTransferFuture fut = (StateTransferFuture)SysCacheDrStateManager.this.stateTransferFuts.remove(this.key.id());
            if (fut != null) {
                SysCacheDrStateManager.this.completeFstFuture(fut, pauseInfo);
            }
        }
    }

    /**
     * Task scheduled for every update of a state transfer entry of this cache. When the entry was
     * removed it finalises the transfer result; afterwards the state transfer listener is notified.
     */
    private class StateTransferChangeTask
    extends DrAbstractControlTask {
        /** Key of the changed entry. */
        private final CacheDrStateTransferKey key;
        /** Value after the update, {@code null} if the entry was removed. */
        private final CacheDrStateTransferInfo newInfo;
        /** Value before the update, {@code null} if the entry was created. */
        private final CacheDrStateTransferInfo oldInfo;

        /**
         * @param key Key of the changed entry; must belong to this cache.
         * @param oldInfo Value before the update.
         * @param newInfo Value after the update.
         */
        StateTransferChangeTask(CacheDrStateTransferKey key, CacheDrStateTransferInfo oldInfo, CacheDrStateTransferInfo newInfo) {
            assert SysCacheDrStateManager.this.cctx.cacheId() == CU.cacheId(key.cacheName());
            this.key = key;
            this.oldInfo = oldInfo;
            this.newInfo = newInfo;
        }

        /**
         * Marks the transfer result as done (with no pause info) when an existing entry was removed.
         *
         * @param oldStopInfo Current pause info.
         * @return {@code oldStopInfo}, unchanged.
         * @throws IgniteCheckedException If stopping the transfer fails with a topology error.
         */
        @Override
        @Nullable
        public CacheDrPauseInfo run0(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            boolean entryRemoved = this.oldInfo != null && this.newInfo == null;
            if (entryRemoved) {
                SysCacheDrStateManager.this.stopStateTransfer0(this.key, null);
            }
            return oldStopInfo;
        }

        /**
         * Notifies the state transfer listener about the change and, for a newly created entry,
         * records and logs the "transfer started" event.
         */
        @Override
        public void onDone() {
            SysCacheDrStateManager mgr = SysCacheDrStateManager.this;

            StateTransferInfo after = mgr.toStateTransferInfo(this.key, this.newInfo);
            StateTransferInfo before = mgr.toStateTransferInfo(this.key, this.oldInfo);

            mgr.stateTransferLsnr.onStateTransferInfoChanged(before, after);

            if (before == null) {
                mgr.recordCacheFstEvt(1024, this.key.cacheName(), this.key.dataCenterIds());
                if (mgr.log.isInfoEnabled()) {
                    mgr.log.info("Full state transfer started: [fstId=" + this.key.id() + ", cacheName=" + this.key.cacheName() + ", dataCenters=" + this.key.dataCenterIds() + "]");
                }
            }

            super.onDone();
        }
    }

    private class StateTransferStopTask
    extends DrAbstractControlTask {
        private final GridFutureAdapter<?> fut;
        private final CacheDrStateTransferKey key;
        private final CacheDrPauseInfo stopInfo;

        StateTransferStopTask(GridFutureAdapter<?> fut, @Nullable CacheDrStateTransferKey key, String errMsg) {
            this.fut = fut;
            this.key = key;
            DynamicCacheDescriptor cacheDesc = SysCacheDrStateManager.this.cctx.shared().cache().cacheDescriptor(SysCacheDrStateManager.this.cctx.cacheId());
            this.stopInfo = new CacheDrPauseInfo(SysCacheDrStateManager.this.cctx.localNodeId(), errMsg == null ? CacheDrPauseReason.USER_REQUEST : CacheDrPauseReason.BATCH_FAILED, errMsg == null ? "State transfer cancelled." : errMsg, cacheDesc.receivedFrom());
        }

        @Override
        @Nullable
        public CacheDrPauseInfo run0(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            CacheDrStateTransferResultKey resKey = CacheDrStateTransferResultKey.fromStateTransferKey(this.key);
            CacheDrStateTransferResultInfo res = (CacheDrStateTransferResultInfo)SysCacheDrStateManager.this.sysCache.get(resKey);
            if (res != null && !res.done()) {
                SysCacheDrStateManager.this.sysCache.put(resKey, new CacheDrStateTransferResultInfo(true, this.stopInfo, res.listeners()));
            }
            return oldStopInfo;
        }

        @Override
        public void onDone() {
            super.onDone();
            StateTransferFuture fstFut = (StateTransferFuture)SysCacheDrStateManager.this.stateTransferFuts.remove(this.key.id());
            if (fstFut != null) {
                SysCacheDrStateManager.this.completeFstFuture(fstFut, this.stopInfo);
            }
            this.fut.onDone();
        }

        @Override
        public void onError(Throwable err) {
            StateTransferFuture fstFut = (StateTransferFuture)SysCacheDrStateManager.this.stateTransferFuts.remove(this.key.id());
            if (fstFut != null) {
                fstFut.onDone(err);
            }
            this.fut.onDone(err);
        }
    }

    /**
     * Control task that starts a data center state transfer for the cache, or joins one that is
     * already registered in the system cache for the same set of data center ids.
     * <p>
     * Two futures are involved: the outer {@link #fut} is completed when the task itself finishes and
     * receives {@link #innerFut}, which in turn is completed with the outcome of the transfer.
     */
    private class StateTransferStartTask
    extends DrAbstractControlTask {
        /** Receives {@link #innerFut} when the task finishes, or the task's error. May be {@code null}. */
        private final GridFutureAdapter<GridFutureAdapter<?>> fut;
        /** Completed with the outcome of the state transfer itself. */
        private final GridFutureAdapter<?> innerFut = new GridFutureAdapter<>();
        /** Target data center ids of the requested transfer. */
        private final Collection<Byte> dataCenterIds;
        /** Snapshot id stored in the transfer info record. */
        private final Long snapshotId;

        /**
         * @param fut Future to notify about task completion; declared nullable and therefore
         *      null-checked before use.
         * @param snapshotId Snapshot id recorded in the transfer info.
         * @param dataCenterIds Target data center ids.
         */
        private StateTransferStartTask(@Nullable GridFutureAdapter<GridFutureAdapter<?>> fut, Long snapshotId, Collection<Byte> dataCenterIds) {
            this.fut = fut;
            this.snapshotId = snapshotId;
            this.dataCenterIds = dataCenterIds;
        }

        /**
         * Starts (or joins) the state transfer and wires its outcome into {@link #innerFut}.
         * If replication is stopped, {@link #innerFut} is failed immediately and nothing is started.
         *
         * @param oldStopInfo Current pause info; asserted to be non-null.
         * @return {@code oldStopInfo}, unchanged.
         * @throws IgniteCheckedException If {@link #stateTransfer()} fails.
         */
        @Override
        @Nullable
        public CacheDrPauseInfo run0(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            assert oldStopInfo != null;
            if (SysCacheDrStateManager.this.isDrStopped()) {
                this.innerFut.onDone(new IgniteCheckedException("State transfer was not started due to date center replication is stopped: " + oldStopInfo));
                return oldStopInfo;
            }
            this.stateTransfer().listen(new IgniteInClosure<IgniteInternalFuture<?>>(){
                private static final long serialVersionUID = 0L;

                @Override
                public void apply(IgniteInternalFuture<?> transferFut) {
                    try {
                        // get() rethrows the transfer failure, if any, so it lands in the catch below.
                        transferFut.get();
                        StateTransferStartTask.this.innerFut.onDone();
                    }
                    catch (Throwable e) {
                        StateTransferStartTask.this.innerFut.onDone(e);
                    }
                }
            });
            return oldStopInfo;
        }

        /**
         * Scans the system cache for state transfer records of this cache and decides what to do:
         * <ul>
         * <li>records whose activation time differs from the current one are removed together with
         *     their result record;</li>
         * <li>records that vanished, have no result record, or whose result carries pause info are
         *     skipped;</li>
         * <li>a remaining record for different data center ids produces a failed future;</li>
         * <li>a remaining record for the same ids is joined (running) or reused (completed).</li>
         * </ul>
         * If no record qualifies, a new transfer is started.
         *
         * @return Future of the joined, reused or newly started transfer, or a finished future
         *      holding an {@link IllegalStateException}.
         * @throws IgniteCheckedException If a system cache operation fails.
         */
        IgniteInternalFuture<?> stateTransfer() throws IgniteCheckedException {
            long gridActivationTime = SysCacheDrStateManager.this.cctx.kernalContext().state().clusterState().activationTime();
            String cacheName = SysCacheDrStateManager.this.cctx.name();
            // NOTE(review): the iterator is abandoned on early return; if scanIterator hands back a
            // closeable iterator it may need an explicit close - confirm against its contract.
            Iterator<? extends Cache.Entry<?, ?>> it = SysCacheDrStateManager.this.sysCache.scanIterator(false, (k, v) -> SysCacheDrStateManager.stateTransferKey(k, cacheName), SysCacheDrStateManager.this.timeout);
            while (it.hasNext()) {
                Cache.Entry<?, ?> entry = it.next();
                CacheDrStateTransferKey oldKey = (CacheDrStateTransferKey)entry.getKey();
                CacheDrStateTransferInfo oldVal = (CacheDrStateTransferInfo)entry.getValue();
                // Record left over from a different activation time: drop it with its result.
                if (oldVal != null && oldVal.gridActivationTime() != gridActivationTime && SysCacheDrStateManager.this.sysCache.remove(oldKey, oldVal)) {
                    SysCacheDrStateManager.this.sysCache.remove(CacheDrStateTransferResultKey.fromStateTransferKey(oldKey));
                    continue;
                }
                // The scan result may be outdated; re-check that the record still exists.
                if (SysCacheDrStateManager.this.sysCache.get(oldKey) == null) {
                    continue;
                }
                CacheDrStateTransferResultKey resKey = CacheDrStateTransferResultKey.fromStateTransferKey(oldKey);
                CacheDrStateTransferResultInfo resInfo = (CacheDrStateTransferResultInfo)SysCacheDrStateManager.this.sysCache.get(resKey);
                if (resInfo == null) {
                    // startNewStateTransfer() writes the transfer record before the result record and
                    // does not roll the former back on failure, so a record without a result is
                    // possible. Skip it instead of failing every subsequent start with an NPE.
                    if (SysCacheDrStateManager.this.log.isInfoEnabled()) {
                        SysCacheDrStateManager.this.log.info("Skipping state transfer record without result info [transferKey=" + oldKey + ']');
                    }
                    continue;
                }
                if (resInfo.pauseInfo() != null) {
                    continue;
                }
                if (!this.dataCenterIds.equals(oldKey.dataCenterIds())) {
                    String msg = "Multiple state transfers are not supported. Another state transfer is active: " + oldKey.id();
                    return new GridFinishedFuture<>(new IllegalStateException(msg));
                }
                StateTransferFuture fstFut;
                if (!resInfo.done()) {
                    fstFut = SysCacheDrStateManager.this.stateTransferFuts.computeIfAbsent(oldKey.id(), id -> new StateTransferFuture((IgniteUuid)id));
                    // Register the local node as a listener of the running transfer's result.
                    if (!resInfo.listeners().contains(SysCacheDrStateManager.this.cctx.localNodeId())) {
                        HashSet<UUID> resLsnrs = new HashSet<UUID>(resInfo.listeners());
                        resLsnrs.add(SysCacheDrStateManager.this.cctx.localNodeId());
                        CacheDrStateTransferResultInfo old = (CacheDrStateTransferResultInfo)SysCacheDrStateManager.this.sysCache.getAndPut(resKey, new CacheDrStateTransferResultInfo(false, null, resLsnrs));
                        assert !old.done();
                    }
                    if (SysCacheDrStateManager.this.log.isInfoEnabled()) {
                        SysCacheDrStateManager.this.log.info("Requested state transfer is already running [cache=" + SysCacheDrStateManager.this.cctx.name() + ", dataCenterIds=" + this.dataCenterIds + ']');
                    }
                } else {
                    fstFut = (StateTransferFuture)SysCacheDrStateManager.this.stateTransferFuts.get(oldKey.id());
                    if (fstFut == null) {
                        // No local future is tracked for the finished transfer: build a completed one.
                        fstFut = new StateTransferFuture(oldKey.id());
                        SysCacheDrStateManager.this.completeFstFuture(fstFut, resInfo.pauseInfo());
                    }
                    if (SysCacheDrStateManager.this.log.isInfoEnabled()) {
                        SysCacheDrStateManager.this.log.info("Another state transfer for the same cache completed concurrently. Use it's result [fut=" + fstFut + ", resInfo=" + resInfo + ']');
                    }
                }
                return fstFut;
            }
            return this.startNewStateTransfer();
        }

        /**
         * Registers a new state transfer: creates its key and info record (every partition mapped to
         * {@code null}), tracks a local future for it and writes the transfer and result records to
         * the system cache.
         *
         * @return Future tracking the new transfer; already failed if writing the records threw.
         */
        private StateTransferFuture startNewStateTransfer() {
            long gridActivationTime = SysCacheDrStateManager.this.cctx.kernalContext().state().clusterState().activationTime();
            CacheDrStateTransferKey key = new CacheDrStateTransferKey(SysCacheDrStateManager.this.cctx.name(), DrUtils.generateFstId(), this.dataCenterIds);
            int partsCnt = SysCacheDrStateManager.this.cctx.affinity().partitions();
            HashMap<Integer, UUID> parts = new HashMap<Integer, UUID>(partsCnt, 1.0f);
            for (int i = 0; i < partsCnt; ++i) {
                parts.put(i, null);
            }
            AffinityTopologyVersion topVer = new AffinityTopologyVersion(SysCacheDrStateManager.this.cctx.discovery().topologyVersion());
            CacheDrStateTransferInfo info = new CacheDrStateTransferInfo(key.id(), topVer, SysCacheDrStateManager.this.cctx.localNodeId(), gridActivationTime, parts, this.snapshotId);
            if (SysCacheDrStateManager.this.log.isInfoEnabled()) {
                SysCacheDrStateManager.this.log.info("Started data center state transfer [transferKey= " + key + ", info=" + info + ']');
            }
            // Track the future before writing the records, so it exists once they become visible.
            StateTransferFuture fstFut = SysCacheDrStateManager.this.stateTransferFuts.computeIfAbsent(info.id(), id -> new StateTransferFuture((IgniteUuid)id));
            try {
                boolean putx = SysCacheDrStateManager.this.sysCache.put(key, info);
                assert putx;
                CacheDrStateTransferResultKey resKey = CacheDrStateTransferResultKey.fromStateTransferKey(key);
                CacheDrStateTransferResultInfo resInfo = (CacheDrStateTransferResultInfo)SysCacheDrStateManager.this.sysCache.get(resKey);
                assert resInfo == null;
                SysCacheDrStateManager.this.sysCache.put(resKey, new CacheDrStateTransferResultInfo(false, null, Collections.singleton(SysCacheDrStateManager.this.cctx.localNodeId())));
                return fstFut;
            }
            catch (Exception ex) {
                if (SysCacheDrStateManager.this.log.isInfoEnabled()) {
                    SysCacheDrStateManager.this.log.info("Starting state transfer failed [transferKey= " + key + ", info=" + info + ']');
                }
                // Untrack the future and hand the failure to whoever already holds it.
                SysCacheDrStateManager.this.stateTransferFuts.remove(info.id());
                fstFut.onDone(ex);
                return fstFut;
            }
        }

        /** Success callback: hands {@link #innerFut} to the outer future, if one was supplied. */
        @Override
        public void onDone() {
            super.onDone();
            if (this.fut != null) {
                this.fut.onDone(this.innerFut);
            }
        }

        /**
         * Failure callback: fails the outer future, if one was supplied.
         *
         * @param err Error the task failed with.
         */
        @Override
        public void onError(@Nullable Throwable err) {
            if (this.fut != null) {
                this.fut.onDone(err);
            }
        }
    }

    /**
     * Control task that records a change of the replication pause state for the cache.
     * <p>
     * A non-null {@link #reason} is logged as replication being stopped, a {@code null} reason as
     * replication being started. When {@link #rmtStopInfo} is supplied, the task only applies it as
     * the global stop info and notifies the state listener, bypassing the regular task flow.
     */
    private class DrStopTask
    extends DrAbstractControlTask {
        /** Pause reason to record; {@code null} is logged as "replication is started". */
        private final CacheDrPauseReason reason;
        /** Message stored in the new pause info. */
        private final String errMsg;
        /** Stop info to apply directly as the global stop info, or {@code null}. */
        private final CacheDrPauseInfo rmtStopInfo;
        /** Future completed when the task finishes or fails, or {@code null}. */
        private final GridFutureAdapter<?> fut;

        /**
         * @param fut Future to complete on task completion, or {@code null}.
         * @param reason Pause reason, or {@code null}.
         * @param errMsg Message for the pause info, or {@code null}.
         * @param rmtStopInfo Stop info to apply directly instead of running the task, or {@code null}.
         */
        private DrStopTask(@Nullable GridFutureAdapter<?> fut, @Nullable CacheDrPauseReason reason, @Nullable String errMsg, @Nullable CacheDrPauseInfo rmtStopInfo) {
            this.fut = fut;
            this.reason = reason;
            this.errMsg = errMsg;
            this.rmtStopInfo = rmtStopInfo;
        }

        /**
         * Applies {@link #rmtStopInfo} as the global stop info and notifies the state listener, or,
         * when no such info was supplied, delegates to the regular task flow of the superclass.
         *
         * @throws Exception If the superclass flow fails.
         */
        @Override
        public void run() throws Exception {
            if (this.rmtStopInfo == null) {
                super.run();
                return;
            }
            SysCacheDrStateManager.this.globalStopInfo = this.rmtStopInfo;
            // Notify with the reason of the info just applied rather than re-reading the shared field.
            SysCacheDrStateManager.this.stateLsnr.onGlobalDrStatusChanged(this.rmtStopInfo.reason());
        }

        /**
         * Writes a new pause record when there is no previous one, or when exactly one of the previous
         * and the requested reasons is {@code null}; otherwise leaves the stored state untouched.
         *
         * @param oldStopInfo Previously stored pause info, or {@code null}.
         * @return The newly written pause info, or {@code oldStopInfo} if nothing was written.
         * @throws IgniteCheckedException If the system cache update fails.
         */
        @Override
        @Nullable
        public CacheDrPauseInfo run0(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            boolean save = oldStopInfo == null || (oldStopInfo.reason() == null) != (this.reason == null);
            if (!save) {
                return oldStopInfo;
            }
            DynamicCacheDescriptor cacheDesc = SysCacheDrStateManager.this.cctx.shared().cache().cacheDescriptor(SysCacheDrStateManager.this.cctx.cacheId());
            CacheDrPauseInfo newStopInfo = new CacheDrPauseInfo(SysCacheDrStateManager.this.cctx.localNodeId(), this.reason, this.errMsg, cacheDesc.receivedFrom());
            SysCacheDrStateManager.this.sysCache.put(new CacheDrPauseKey(SysCacheDrStateManager.this.cctx.name()), newStopInfo);
            if (SysCacheDrStateManager.this.log.isInfoEnabled()) {
                if (this.reason != null) {
                    SysCacheDrStateManager.this.log.info("Data center replication is stopped [cache=" + SysCacheDrStateManager.this.cctx.name() + ", info=" + newStopInfo + ", reason=" + this.reason + "]");
                } else {
                    SysCacheDrStateManager.this.log.info("Data center replication is started [cache=" + SysCacheDrStateManager.this.cctx.name() + ']');
                }
            }
            return newStopInfo;
        }

        /** Success callback: completes the task future, if one was supplied. */
        @Override
        public void onDone() {
            super.onDone();
            if (this.fut != null) {
                this.fut.onDone();
            }
        }

        /**
         * Failure callback: fails the task future, if one was supplied.
         *
         * @param err Error the task failed with.
         */
        @Override
        public void onError(Throwable err) {
            if (this.fut != null) {
                this.fut.onDone(err);
            }
        }
    }
}

