/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.dr;

import java.io.EOFException;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheStoppedException;
import org.apache.ignite.internal.processors.cache.CacheType;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.gridgain.grid.cache.dr.CacheDrEntryFilter;
import org.gridgain.grid.cache.dr.CacheDrPauseReason;
import org.gridgain.grid.cache.dr.CacheDrSenderConfiguration;
import org.gridgain.grid.cache.dr.CacheDrStateTransfer;
import org.gridgain.grid.events.DrCacheFstEvent;
import org.gridgain.grid.internal.GridGainFeatures;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrEntryImpl;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrPauseInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrResultType;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferKey;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferResultInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferResultKey;
import org.gridgain.grid.internal.processors.cache.dr.EntryBuffer;
import org.gridgain.grid.internal.processors.cache.dr.GridGainCacheDrManager;
import org.gridgain.grid.internal.processors.cache.dr.Permit;
import org.gridgain.grid.internal.processors.dr.DrSenderAttributes;
import org.gridgain.grid.internal.processors.dr.DrUtils;
import org.gridgain.grid.internal.processors.dr.fst.Batch;
import org.gridgain.grid.internal.processors.dr.fst.StateTransferJob;
import org.gridgain.grid.internal.processors.dr.fst.StateTransferTask;
import org.jetbrains.annotations.Nullable;

public class CacheDrStateTransferHandler {
    private final GridCacheContext cctx;
    private final GridGainCacheDrManager mgr;
    private final GridSpinBusyLock busyLock;
    private final CacheDrEntryFilter filter;
    private final IgniteLogger log;
    private IgniteInternalCache<Object, Object> sysCache;
    private final CountDownLatch started = new CountDownLatch(1);
    private final ConcurrentMap<IgniteUuid, StateTransferFuture> futMap = new ConcurrentHashMap<IgniteUuid, StateTransferFuture>();
    private final ConcurrentMap<IgniteUuid, CacheStateTransferTask> tasksMap = new ConcurrentHashMap<IgniteUuid, CacheStateTransferTask>();
    private volatile long stateTransferThrottle;
    private int stateTransferThrottleBytes;
    private final boolean throttleBytes;
    private final int maxSize;
    private final int maxSizeBytes;

    public CacheDrStateTransferHandler(GridCacheContext cctx, CacheDrSenderConfiguration cfg, GridSpinBusyLock busyLock, GridGainCacheDrManager mgr) {
        assert (cctx != null);
        assert (cfg != null);
        assert (mgr != null);
        this.cctx = cctx;
        this.mgr = mgr;
        this.busyLock = busyLock;
        this.log = cctx.logger(CacheDrStateTransferHandler.class);
        this.filter = cfg.getEntryFilter();
        this.stateTransferThrottle = cfg.getStateTransferThrottle();
        this.stateTransferThrottleBytes = cfg.getStateTransferThrottleBytes();
        this.throttleBytes = this.stateTransferThrottleBytes > 0;
        int stateTransferBatchSize = mgr.drProc.ggConfig().getStateTransferBatchSendSizeBytes();
        this.maxSizeBytes = stateTransferBatchSize > 0 ? stateTransferBatchSize : mgr.drProc.ggConfig().getBatchSendSizeBytes();
        this.maxSize = cfg.getBatchSendSize();
        String cacheName = cctx.name();
        assert (CacheType.cacheType(cacheName) == CacheType.USER) : "replication of inner system caches is deprecated, cacheName=" + cacheName;
    }

    long getStateTransferThrottle() {
        return this.stateTransferThrottle;
    }

    void setStateTransferThrottle(long stateTransferThrottle) {
        this.stateTransferThrottle = stateTransferThrottle;
    }

    void onKernalStart(IgniteInternalCache<Object, Object> sysCache) {
        this.sysCache = sysCache;
        this.started.countDown();
    }

    void onStateTransferInfoChanged(CacheDrStateTransferKey key, @Nullable CacheDrStateTransferInfo info) throws IgniteCheckedException {
        if (info != null && this.primaryPartitions(info, this.cctx.affinity().affinityTopologyVersion(), null) != null) {
            this.mgr.enqueueDrMgmtTask(new PartitionAssignmentTask(key, null));
        } else if (info == null) {
            this.stopStateTransfer(key, null);
        }
    }

    void onStateTransferResultChanged(CacheDrStateTransferResultKey key, CacheDrStateTransferResultInfo info, boolean transferCompleted) throws IgniteCheckedException {
        assert (info != null);
        UUID locNodeId = this.cctx.localNodeId();
        if (info.done()) {
            Collection lsnrs;
            if (transferCompleted) {
                CacheDrPauseInfo pauseInfo = info.pauseInfo();
                if (pauseInfo == null) {
                    if (this.log.isInfoEnabled()) {
                        this.log.info("Full state transfer finished: [fstId=" + key.id() + ", cacheName=" + key.cacheName() + ", dataCenters=" + key.dataCenterIds() + "]");
                    }
                    this.recordCacheFstEvt(1025, key.cacheName(), key.dataCenterIds());
                } else {
                    if (pauseInfo.reason() != CacheDrPauseReason.USER_REQUEST) {
                        this.log.warning("Full state transfer failed: [fstId=" + key.id() + ", reason=" + (Object)((Object)pauseInfo.reason()) + ", cacheName=" + key.cacheName() + ", dataCenters=" + key.dataCenterIds() + "]");
                    }
                    this.recordCacheFstEvt(1026, key.cacheName(), key.dataCenterIds());
                }
            }
            if (!F.isEmpty(lsnrs = info.listeners()) && (lsnrs = (Collection)info.listeners().stream().filter(nodeId -> this.cctx.kernalContext().discovery().node((UUID)nodeId) != null).collect(Collectors.toSet())).contains(locNodeId)) {
                StateTransferFuture fut = (StateTransferFuture)this.futMap.remove(key.id());
                if (fut != null) {
                    this.completeFstFuture(fut, info);
                }
                lsnrs.remove(locNodeId);
            }
            if (lsnrs.isEmpty()) {
                this.sysCache.remove(new CacheDrStateTransferKey(key.cacheName(), key.id(), key.dataCenterIds()));
                this.sysCache.remove(key);
            } else if (lsnrs.size() != info.listeners().size()) {
                this.sysCache.put(key, new CacheDrStateTransferResultInfo(true, info.pauseInfo(), lsnrs));
            }
        } else {
            this.recordCacheFstEvt(1024, key.cacheName(), key.dataCenterIds());
            if (this.log.isInfoEnabled()) {
                this.log.info("Full state transfer started: [fstId=" + key.id() + ", cacheName=" + key.cacheName() + ", dataCenters=" + key.dataCenterIds() + "]");
            }
        }
    }

    void onKernalStop() {
        this.started.countDown();
        for (CacheStateTransferTask value : this.tasksMap.values()) {
            value.cancel();
        }
        this.tasksMap.clear();
    }

    void stop() {
        for (StateTransferFuture fut : this.futMap.values()) {
            fut.onDone(new IgniteCheckedException("State transfer is cancelled", new CacheStoppedException(this.cctx.name())));
        }
        this.futMap.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    IgniteInternalFuture<?> stateTransfer(Collection<Byte> dataCenterIds, boolean syncFst, long timeout) throws IgniteCheckedException {
        if (!this.busyLock.enterBusy()) {
            return new GridFinishedFuture(new IgniteCheckedException("State transfer is cancelled", new CacheStoppedException(this.cctx.name())));
        }
        try {
            Collection<Byte> oldDataCenterIds;
            GridCompoundFuture res = new GridCompoundFuture();
            HashSet<Byte> newDataCenterIds = new HashSet<Byte>(dataCenterIds);
            Iterator<Cache.Entry<Object, Object>> it = this.sysCache.scanIterator(false, null, timeout);
            while (it.hasNext()) {
                Cache.Entry<Object, Object> entry = it.next();
                if (!this.mgr.stateTransferKey(entry.getKey())) continue;
                CacheDrStateTransferKey oldKey = (CacheDrStateTransferKey)entry.getKey();
                CacheDrStateTransferInfo oldVal = (CacheDrStateTransferInfo)entry.getValue();
                if (oldVal != null && oldVal.gridActivationTime() != this.kernalCtx().state().clusterState().activationTime()) {
                    this.sysCache.remove(oldKey);
                    this.sysCache.remove(CacheDrStateTransferResultKey.fromStateTransferKey(oldKey));
                    continue;
                }
                oldDataCenterIds = oldKey.dataCenterIds();
                if (Collections.disjoint(newDataCenterIds, oldDataCenterIds)) continue;
                CacheDrStateTransferResultKey resKey = CacheDrStateTransferResultKey.fromStateTransferKey(oldKey);
                if (this.sysCache.get(oldKey) == null) continue;
                CacheDrStateTransferResultInfo resInfo = (CacheDrStateTransferResultInfo)this.sysCache.get(resKey);
                assert (resInfo != null);
                if (resInfo.pauseInfo() != null) continue;
                if (oldVal.isSyncTransfer() != syncFst) {
                    String msg = "Can't start " + (syncFst ? "synchronous" : "asynchronous") + " full state transfer because a full state transfer of another type " + (syncFst ? "(asynchronous)" : "(synchronous)") + " is in progress.";
                    GridFinishedFuture gridFinishedFuture = new GridFinishedFuture(new IgniteCheckedException(msg));
                    return gridFinishedFuture;
                }
                newDataCenterIds.removeAll(oldDataCenterIds);
                StateTransferFuture oldFut = (StateTransferFuture)this.futMap.get(oldKey.id());
                if (!resInfo.done()) {
                    if (oldFut == null) {
                        oldFut = new StateTransferFuture(oldKey);
                        StateTransferFuture oldFut0 = this.futMap.putIfAbsent(oldKey.id(), oldFut);
                        assert (oldFut0 == null) : "Race condition detected.";
                        HashSet<UUID> resLsnrs = new HashSet<UUID>(resInfo.listeners());
                        resLsnrs.add(this.cctx.localNodeId());
                        CacheDrStateTransferResultInfo old = (CacheDrStateTransferResultInfo)this.sysCache.getAndPut(resKey, new CacheDrStateTransferResultInfo(false, null, resLsnrs));
                        assert (!old.done());
                    }
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Another state transfer for the same cache is already running. Will wait for the old one to finish [fut=" + oldFut + ", resInfo=" + resInfo + ']');
                    }
                } else {
                    if (oldFut == null) {
                        oldFut = new StateTransferFuture(oldKey);
                        this.completeFstFuture(oldFut, resInfo);
                    }
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Another state transfer for the same cache completed concurrently. Use it's result [fut=" + oldFut + ", resInfo=" + resInfo + ']');
                    }
                }
                res.add(oldFut);
            }
            if (!newDataCenterIds.isEmpty()) {
                if (newDataCenterIds.size() != dataCenterIds.size()) {
                    ArrayList<Byte> rmvIds = new ArrayList<Byte>(dataCenterIds);
                    rmvIds.removeAll(newDataCenterIds);
                    this.log.info("Requested state transfer is already running for some data centers [cache=" + this.cctx.name() + ", dataCenterIds=" + rmvIds + ']');
                }
                CacheDrStateTransferKey key = new CacheDrStateTransferKey(this.cctx.name(), IgniteUuid.randomUuid(), newDataCenterIds);
                HashMap<Integer, UUID> parts = new HashMap<Integer, UUID>(this.cctx.affinity().partitions(), 1.0f);
                for (int i = 0; i < this.cctx.affinity().partitions(); ++i) {
                    parts.put(i, null);
                }
                assert (!F.isEmpty(parts));
                AffinityTopologyVersion topVer = new AffinityTopologyVersion(this.cctx.discovery().topologyVersion());
                if (syncFst && !(syncFst = CacheDrStateTransferHandler.isSyncFstSupported(this.kernalCtx(), this.cctx.discovery().nodes(topVer.topologyVersion())))) {
                    this.log.warning("Synchronous full state transfer failed: One of nodes doesn't support synchronous full state transfer.");
                    oldDataCenterIds = new GridFinishedFuture(new IgniteCheckedException("One of nodes doesn't support synchronous full state transfer."));
                    return oldDataCenterIds;
                }
                CacheDrStateTransferInfo info = new CacheDrStateTransferInfo(key.id(), topVer, this.cctx.localNodeId(), this.kernalCtx().state().clusterState().activationTime(), parts, syncFst);
                this.log.info("Starting data center state transfer [transferKey= " + key + ", info=" + info + ']');
                boolean putx = this.sysCache.put(key, info);
                assert (putx);
                CacheDrStateTransferResultKey resKey = new CacheDrStateTransferResultKey(this.cctx.name(), key.id(), new ArrayList<Byte>(newDataCenterIds));
                CacheDrStateTransferResultInfo resInfo = (CacheDrStateTransferResultInfo)this.sysCache.get(resKey);
                assert (resInfo == null);
                this.sysCache.put(resKey, new CacheDrStateTransferResultInfo(false, null, Collections.singleton(this.cctx.localNodeId())));
                StateTransferFuture fut = new StateTransferFuture(key);
                res.add(fut);
                StateTransferFuture oldFut = this.futMap.putIfAbsent(info.id(), fut);
                assert (oldFut == null) : "Race condition detected.";
            } else {
                this.log.info("Requested state transfer is already running [cache=" + this.cctx.name() + ", dataCenterIds=" + dataCenterIds + ']');
            }
            res.markInitialized();
            GridCompoundFuture gridCompoundFuture = res;
            return gridCompoundFuture;
        }
        catch (ClusterTopologyCheckedException e) {
            throw e;
        }
        catch (IgniteCheckedException e) {
            GridFinishedFuture gridFinishedFuture = new GridFinishedFuture(e);
            return gridFinishedFuture;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    public static boolean isSyncFstSupported(GridKernalContext ctx, Collection<ClusterNode> nodes) {
        for (ClusterNode node : nodes) {
            if (node.isClient() && node.attribute("plugins.gg.replication.snd.hub") == null || GridGainFeatures.nodeSupports(ctx, node, GridGainFeatures.SYNCHRONOUS_STATE_TRANSFER)) continue;
            return false;
        }
        return true;
    }

    private void completeFstFuture(StateTransferFuture fut, CacheDrStateTransferResultInfo resInfo) {
        if (resInfo.pauseInfo() == null) {
            fut.onDone();
        } else {
            fut.onDone(new IgniteCheckedException("State transfer is cancelled: " + resInfo.pauseInfo()));
        }
    }

    private void recordCacheFstEvt(int type, String cacheName, Collection<Byte> dcIds) {
        if (!this.kernalCtx().event().isUserRecordable(type)) {
            return;
        }
        ClusterNode node = this.kernalCtx().discovery().localNode();
        String msg = null;
        switch (type) {
            case 1024: {
                msg = "Cache full state transfer started.";
                break;
            }
            case 1025: {
                msg = "Cache full state transfer finished.";
                break;
            }
            case 1026: {
                msg = "Cache full state transfer failed.";
                break;
            }
            default: {
                assert (false) : "Unsupported event type " + type;
                break;
            }
        }
        this.kernalCtx().event().record(new DrCacheFstEvent(node, msg, type, cacheName, dcIds));
    }

    private GridKernalContext kernalCtx() {
        return this.cctx.kernalContext();
    }

    Collection<CacheDrStateTransfer> listStateTransfers() throws IgniteCheckedException {
        U.await(this.started);
        int cacheParts = this.cctx.affinity().partitions();
        LinkedList<CacheDrStateTransfer> res = new LinkedList<CacheDrStateTransfer>();
        Iterator<Cache.Entry<Object, Object>> it = this.sysCache.scanIterator(false, null);
        while (it.hasNext()) {
            Cache.Entry<Object, Object> entry = it.next();
            if (!this.mgr.stateTransferKey(entry.getKey())) continue;
            CacheDrStateTransferKey key = (CacheDrStateTransferKey)entry.getKey();
            CacheDrStateTransferInfo val = (CacheDrStateTransferInfo)entry.getValue();
            CacheDrStateTransferResultInfo info = (CacheDrStateTransferResultInfo)this.sysCache.get(CacheDrStateTransferResultKey.fromStateTransferKey(key));
            if (info == null || info.done()) continue;
            res.add(new CacheDrStateTransfer(key.id(), key.dataCenterIds(), key.cacheName(), val.nodeId(), val.startTime(), val.isSyncTransfer(), cacheParts - val.partitions().size()));
        }
        return res;
    }

    void onDataNodeLeft(AffinityTopologyVersion topVer, UUID nodeId, long timeout) {
        try {
            Iterator<Cache.Entry<Object, Object>> it = this.sysCache.scanIterator(false, null, timeout);
            while (it.hasNext()) {
                CacheDrStateTransferInfo info;
                Cache.Entry<Object, Object> entry = it.next();
                if (!this.mgr.stateTransferKey(entry.getKey()) || (info = (CacheDrStateTransferInfo)entry.getValue()) == null || this.primaryPartitions(info, topVer, nodeId) == null) continue;
                this.mgr.enqueueDrMgmtTask(new PartitionAssignmentTask((CacheDrStateTransferKey)entry.getKey(), nodeId));
            }
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to read system cache entries: " + e, e);
        }
    }

    void onReplicationStop(CacheDrPauseInfo stopInfo) throws IgniteCheckedException {
        assert (stopInfo != null && stopInfo.reason() != null);
        HashSet<CacheDrStateTransferKey> fstKeys = new HashSet<CacheDrStateTransferKey>();
        HashMap<Object, CacheDrStateTransferResultInfo> res = new HashMap<Object, CacheDrStateTransferResultInfo>();
        Iterator<Cache.Entry<Object, Object>> it = this.sysCache.scanIterator(false, null);
        while (it.hasNext()) {
            CacheDrStateTransferResultInfo info;
            Cache.Entry<Object, Object> entry = it.next();
            if (this.mgr.stateTransferKey(entry.getKey())) {
                fstKeys.add((CacheDrStateTransferKey)entry.getKey());
                continue;
            }
            if (!this.mgr.stateTransferResultKey(entry.getKey()) || (info = (CacheDrStateTransferResultInfo)entry.getValue()).done()) continue;
            res.put(entry.getKey(), new CacheDrStateTransferResultInfo(true, stopInfo, info.listeners()));
        }
        if (!fstKeys.isEmpty()) {
            this.sysCache.removeAll(fstKeys);
        }
        if (!res.isEmpty()) {
            this.sysCache.putAll(res);
        }
    }

    @Nullable
    private Set<Integer> primaryPartitions(CacheDrStateTransferInfo info, AffinityTopologyVersion topVer, @Nullable UUID failedNodeId) {
        assert (topVer.topologyVersion() > 0L);
        UUID locNodeId = this.cctx.kernalContext().localNodeId();
        Set<Integer> locParts = this.cctx.affinity().primaryPartitions(locNodeId, topVer);
        HashSet<Integer> parts = null;
        for (Map.Entry<Integer, UUID> e : info.partitions().entrySet()) {
            if (e.getValue() != null && !e.getValue().equals(failedNodeId) || !locParts.contains(e.getKey())) continue;
            if (parts == null) {
                parts = new HashSet<Integer>();
            }
            parts.add(e.getKey());
        }
        return parts;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    private Permit acquirePermit(int size) throws IgniteCheckedException {
        int size0;
        if (!this.throttleBytes) {
            return null;
        }
        CacheDrStateTransferHandler cacheDrStateTransferHandler = this;
        synchronized (cacheDrStateTransferHandler) {
            try {
                long start = U.currentTimeMillis();
                while ((size0 = this.acquire(size)) == 0) {
                    if (this.mgr.stopped()) {
                        throw new IgniteCheckedException("Interrupt state transfer due to cache stopping: " + this.cctx.name());
                    }
                    this.wait(500L);
                    long now = U.currentTimeMillis();
                    long waitDuration = now - start;
                    if (waitDuration <= 0L) continue;
                    this.mgr.metrics0().onFstThrottling(waitDuration);
                    start = now;
                }
            }
            catch (InterruptedException e) {
                throw new IgniteInterruptedCheckedException(e);
            }
        }
        int finalSize = size0;
        return () -> this.release(finalSize);
    }

    private int acquire(int bytes) {
        assert (Thread.holdsLock(this));
        if (this.stateTransferThrottleBytes == 0) {
            return 0;
        }
        int acquired = Math.min(bytes, this.stateTransferThrottleBytes);
        this.stateTransferThrottleBytes -= acquired;
        return acquired;
    }

    private synchronized boolean release(int bytes) {
        this.stateTransferThrottleBytes += bytes;
        this.notifyAll();
        return true;
    }

    void stopStateTransfer(CacheDrStateTransferKey key, CacheDrPauseInfo reason) throws IgniteCheckedException {
        CacheStateTransferTask fstTask = (CacheStateTransferTask)this.tasksMap.remove(key.id());
        if (fstTask != null) {
            fstTask.cancel();
        }
        try {
            CacheDrStateTransferResultKey resKey = CacheDrStateTransferResultKey.fromStateTransferKey(key);
            CacheDrStateTransferResultInfo res = (CacheDrStateTransferResultInfo)this.sysCache.get(resKey);
            if (res != null && !res.done()) {
                this.sysCache.put(resKey, new CacheDrStateTransferResultInfo(true, reason, res.listeners()));
            }
        }
        catch (ClusterTopologyCheckedException e) {
            throw e;
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to set state transfer result.", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void scanPartition(StateTransferJob job, BatchImpl batch, GridCacheContext<?, ?> cctx) throws IgniteCheckedException {
        int part = job.partition();
        GridDhtLocalPartition locPart = cctx.topology().localPartition(part);
        boolean cancelPartition = true;
        if (locPart != null && locPart.reserve()) {
            if (locPart.state() == GridDhtPartitionState.OWNING) {
                cancelPartition = false;
            } else {
                locPart.release();
            }
        }
        if (cancelPartition) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Failed to reserve partition to scan [cache=" + cctx.name() + "part=" + part + ']');
            }
            this.clearPartitionAssignment(batch.key, part, cctx.localNodeId());
            return;
        }
        try {
            if (cctx.topology().stopping()) {
                throw new CacheStoppedException(cctx.name());
            }
            GridIterator<CacheDataRow> iter = cctx.offheap().cachePartitionIterator(cctx.cacheId(), part, null, null);
            long entryCntr = 0L;
            while (!job.isCancelled() && iter.hasNext()) {
                CacheDataRow row = (CacheDataRow)iter.next();
                if (row.key().internal()) continue;
                GridCacheVersion drVer = row.version().conflictVersion();
                batch.replicateIfNeeded(false);
                ++entryCntr;
                GridCacheRawVersionedEntry entry = new GridCacheRawVersionedEntry(row.key(), row.value(), row.expireTime() > 0L ? 1L : 0L, row.expireTime(), drVer);
                if (this.filter != null) {
                    entry.unmarshal(cctx.cacheObjectContext());
                    if (!this.filter.accept(new CacheDrEntryImpl(entry, cctx.cacheObjectContext()))) {
                        this.mgr.metrics0().onSenderCacheEntryFiltered();
                        continue;
                    }
                }
                batch.add(entry);
            }
            if (!job.isCancelled()) {
                batch.partitionHandled(part, entryCntr);
            }
        }
        finally {
            locPart.release();
        }
    }

    private void clearPartitionAssignment(CacheDrStateTransferKey key, int part, UUID nodeId) throws IgniteCheckedException {
        this.mgr.txOp(stopInfo -> {
            if (stopInfo.reason() != null) {
                return;
            }
            IgniteInternalCache<Object, Object> sysCache0 = this.sysCache;
            sysCache0.invoke(key, new ClearPartitionAssignmentClosure(nodeId, part), new Object[0]);
        }, true);
    }

    private class PartitionAssignmentTask
    extends GridGainCacheDrManager.DrTask {
        private final CacheDrStateTransferKey key;
        private final UUID failedNodeId;
        private Set<Integer> parts;
        private boolean syncFst;

        private PartitionAssignmentTask(@Nullable CacheDrStateTransferKey key, UUID failedNodeId) {
            this.key = key;
            this.failedNodeId = failedNodeId;
        }

        @Override
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            if (oldStopInfo.reason() != null) {
                return oldStopInfo;
            }
            IgniteInternalCache sysCache0 = CacheDrStateTransferHandler.this.sysCache;
            CacheDrStateTransferInfo info = (CacheDrStateTransferInfo)sysCache0.get(this.key);
            if (info != null) {
                this.syncFst = info.isSyncTransfer();
                this.parts = CacheDrStateTransferHandler.this.primaryPartitions(info, CacheDrStateTransferHandler.this.cctx.affinity().affinityTopologyVersion(), this.failedNodeId);
                if (this.parts != null) {
                    sysCache0.invoke(this.key, new ReservePartitionsClosure(CacheDrStateTransferHandler.this.cctx.localNodeId(), this.parts), new Object[0]);
                    DrSenderAttributes attrs = CacheDrStateTransferHandler.this.mgr.sendHubAttributes();
                    if (this.failedNodeId == null && attrs != null) {
                        CacheStateTransferTask task = new CacheStateTransferTask(this.key, this.parts, this.syncFst);
                        if (CacheDrStateTransferHandler.this.tasksMap.putIfAbsent(this.key.id(), task) == null) {
                            CacheDrStateTransferHandler.this.mgr.submitStateTransferTask(task);
                        }
                    }
                }
            }
            return oldStopInfo;
        }

        @Override
        public void onDone() {
            if (!F.isEmpty(this.parts)) {
                DrSenderAttributes attrs = CacheDrStateTransferHandler.this.mgr.sendHubAttributes();
                if (attrs == null) {
                    return;
                }
                CacheStateTransferTask task = (CacheStateTransferTask)CacheDrStateTransferHandler.this.tasksMap.get(this.key.id());
                if (!task.scheduleMorePartitions(this.parts) && !task.isCancelled()) {
                    CacheDrStateTransferHandler.this.mgr.submitStateTransferTask(task);
                }
            }
            if (CacheDrStateTransferHandler.this.log.isDebugEnabled()) {
                CacheDrStateTransferHandler.this.log.debug("Created assignments for partitions: " + this.parts);
            }
        }
    }

    private class FlushStateTransferStateTask
    extends GridGainCacheDrManager.DrTask {
        private final CacheDrStateTransferKey key;
        private final EntryProcessor<CacheDrStateTransferKey, CacheDrStateTransferInfo, Void> closure;

        private FlushStateTransferStateTask(CacheDrStateTransferKey key, EntryProcessor<CacheDrStateTransferKey, CacheDrStateTransferInfo, Void> closure) {
            this.key = key;
            this.closure = closure;
        }

        @Override
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            if (oldStopInfo.reason() != null) {
                return oldStopInfo;
            }
            CacheDrStateTransferHandler.this.sysCache.invoke(this.key, this.closure, new Object[0]);
            return oldStopInfo;
        }
    }

    private static class RemoveHandledPartitionClosure
    implements EntryProcessor<CacheDrStateTransferKey, CacheDrStateTransferInfo, Void>,
    Externalizable {
        private static final long serialVersionUID = 5349841260224337573L;
        private List<Integer> parts;
        private long entryCnt;

        public RemoveHandledPartitionClosure() {
        }

        private RemoveHandledPartitionClosure(List<Integer> parts, long entryCnt) {
            this.parts = parts;
            this.entryCnt = entryCnt;
        }

        @Override
        public Void process(MutableEntry<CacheDrStateTransferKey, CacheDrStateTransferInfo> e, Object ... args) {
            CacheDrStateTransferInfo info = (CacheDrStateTransferInfo)e.getValue();
            if (info == null) {
                return null;
            }
            HashMap<Integer, UUID> partsMap = new HashMap<Integer, UUID>(info.partitions());
            for (Integer part : this.parts) {
                partsMap.remove(part);
            }
            long totalSentEntriesCnt = info.entryCount() + this.entryCnt;
            if (partsMap.isEmpty()) {
                e.remove();
            } else {
                e.setValue(new CacheDrStateTransferInfo(info).partitions(partsMap).entryCount(totalSentEntriesCnt));
            }
            return null;
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            U.writeIntCollection(out, this.parts);
            out.writeLong(this.entryCnt);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            this.parts = U.readIntList(in);
            try {
                this.entryCnt = in.readLong();
            }
            catch (EOFException eOFException) {
                // empty catch block
            }
        }

        public String toString() {
            return "RemoveHandledPartitionClosure [parts=" + this.parts + ']';
        }
    }

    private static class ClearPartitionAssignmentClosure
    implements EntryProcessor<CacheDrStateTransferKey, CacheDrStateTransferInfo, Void>,
    Externalizable {
        private static final long serialVersionUID = 8197706140589104476L;
        private UUID nodeId;
        private int part;

        public ClearPartitionAssignmentClosure() {
        }

        private ClearPartitionAssignmentClosure(UUID nodeId, int part) {
            this.nodeId = nodeId;
            this.part = part;
        }

        @Override
        public Void process(MutableEntry<CacheDrStateTransferKey, CacheDrStateTransferInfo> e, Object ... args) {
            CacheDrStateTransferInfo info = (CacheDrStateTransferInfo)e.getValue();
            if (info == null) {
                return null;
            }
            HashMap<Integer, UUID> parts = new HashMap<Integer, UUID>(info.partitions());
            if (this.nodeId.equals(parts.get(this.part))) {
                parts.put(this.part, null);
            }
            e.setValue(new CacheDrStateTransferInfo(info).partitions(parts));
            return null;
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            U.writeUuid(out, this.nodeId);
            out.writeInt(this.part);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            this.nodeId = U.readUuid(in);
            this.part = in.readInt();
        }

        public String toString() {
            return "ClearPartitionAssignmentClosure [part=" + this.part + ", nodeId=" + this.nodeId + ']';
        }
    }

    private static class ReservePartitionsClosure
    implements EntryProcessor<CacheDrStateTransferKey, CacheDrStateTransferInfo, Void>,
    Externalizable {
        private static final long serialVersionUID = 417469116208546150L;
        private UUID nodeId;
        private Collection<Integer> parts;

        public ReservePartitionsClosure() {
        }

        private ReservePartitionsClosure(UUID nodeId, Collection<Integer> parts) {
            this.nodeId = nodeId;
            this.parts = parts;
        }

        @Override
        public Void process(MutableEntry<CacheDrStateTransferKey, CacheDrStateTransferInfo> e, Object ... args) {
            CacheDrStateTransferInfo info = (CacheDrStateTransferInfo)e.getValue();
            if (info == null) {
                return null;
            }
            HashMap<Integer, UUID> partsMap = new HashMap<Integer, UUID>(info.partitions());
            for (Integer part : this.parts) {
                partsMap.put(part, this.nodeId);
            }
            e.setValue(new CacheDrStateTransferInfo(info).partitions(partsMap));
            return null;
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            U.writeUuid(out, this.nodeId);
            U.writeIntCollection(out, this.parts);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            this.nodeId = U.readUuid(in);
            this.parts = U.readIntCollection(in);
        }

        public String toString() {
            return "ReservePartitionsClosure [parts=" + this.parts + ", nodeId=" + this.nodeId + ']';
        }
    }

    private class BatchImpl
    implements Batch {
        private final CacheDrStateTransferKey key;
        private final boolean syncFst;
        private final Map<Byte, EntryBuffer> bufferMap = new HashMap<Byte, EntryBuffer>();
        private final List<Integer> handledParts = new ArrayList<Integer>();
        private final Collection<IgniteInternalFuture<CacheDrResultType>> futs = new ArrayList<IgniteInternalFuture<CacheDrResultType>>();
        private IgniteInternalFuture<CacheDrResultType> lastFut;
        private long handledPartEntryCnt;
        private int batchEntries;
        private int batchSizeBytes;

        BatchImpl(CacheDrStateTransferKey key, boolean syncFst) {
            this.key = key;
            this.syncFst = syncFst;
        }

        <K, V> void add(GridCacheRawVersionedEntry<K, V> entry) {
            this.writeToBuffer(entry);
        }

        private <K, V> void writeToBuffer(GridCacheRawVersionedEntry<K, V> entry) {
            EntryBuffer buffer = this.bufferMap.computeIfAbsent(entry.version().dataCenterId(), k -> new EntryBuffer(CacheDrStateTransferHandler.this.cctx));
            buffer.writeEntry(entry);
            this.batchSizeBytes += DrUtils.drEntrySize(entry);
            ++this.batchEntries;
        }

        void partitionHandled(int part, long entryCnt) {
            this.handledParts.add(part);
            this.handledPartEntryCnt += entryCnt;
        }

        void removeHandledPartitions(boolean sync) throws IgniteCheckedException {
            GridCompoundFuture partFinishFut = new GridCompoundFuture();
            if (this.lastFut != null) {
                partFinishFut.add(this.lastFut);
                this.lastFut = null;
            }
            for (IgniteInternalFuture<CacheDrResultType> fut : this.futs) {
                partFinishFut.add(fut);
            }
            ArrayList<Integer> handledParts0 = new ArrayList<Integer>(this.handledParts);
            long entryCnt0 = this.handledPartEntryCnt;
            CacheDrStateTransferKey key0 = this.key;
            this.futs.clear();
            this.handledParts.clear();
            this.handledPartEntryCnt = 0L;
            partFinishFut.listen(future -> {
                try {
                    future.get();
                    CacheDrStateTransferHandler.this.mgr.runAsync(new FlushStateTransferStateTask(key0, new RemoveHandledPartitionClosure(handledParts0, entryCnt0)));
                    if (CacheDrStateTransferHandler.this.log.isDebugEnabled()) {
                        CacheDrStateTransferHandler.this.log.debug("Handled partitions: cache=" + CacheDrStateTransferHandler.this.cctx.name() + ", parts=" + handledParts0 + ", entriesProcessed=" + entryCnt0);
                    }
                }
                catch (IgniteCheckedException e) {
                    U.error(CacheDrStateTransferHandler.this.log, "Failed to save DR state transfer state for transferred partitions: " + Arrays.toString(handledParts0.toArray()), e);
                }
            });
            partFinishFut.markInitialized();
            if (sync) {
                partFinishFut.get();
            }
        }

        @Override
        public void flush() {
            block3: {
                try {
                    this.replicateIfNeeded(true);
                }
                catch (Exception e) {
                    if (!CacheDrStateTransferHandler.this.cctx.topology().stopping() && !CacheDrStateTransferHandler.this.kernalCtx().isStopping()) {
                        U.error(CacheDrStateTransferHandler.this.log, "Failed to flush full state transfer batch.", e);
                    }
                    if (!CacheDrStateTransferHandler.this.log.isTraceEnabled()) break block3;
                    CacheDrStateTransferHandler.this.log.trace("State transfer task has been stopped for cache: name=" + CacheDrStateTransferHandler.this.cctx.name() + ", cause=" + e.getMessage());
                }
            }
        }

        void replicateIfNeeded(boolean force) throws IgniteCheckedException {
            if (this.batchOverflowed() || force && this.batchEntries > 0) {
                IgniteInternalFuture<CacheDrResultType> fut = CacheDrStateTransferHandler.this.mgr.fullStateTransferReplicate(this.key.dataCenterIds(), this.bufferMap, x$0 -> CacheDrStateTransferHandler.this.acquirePermit(x$0), this.syncFst, this.syncFst ? this.key.id() : null);
                this.futs.add(fut);
                this.bufferMap.clear();
                this.batchEntries = 0;
                this.batchSizeBytes = 0;
                if (!this.handledParts.isEmpty()) {
                    this.removeHandledPartitions(force);
                    this.lastFut = fut;
                }
                long stateTransferThrottle = CacheDrStateTransferHandler.this.getStateTransferThrottle();
                if (!CacheDrStateTransferHandler.this.throttleBytes && stateTransferThrottle > 0L) {
                    U.sleep(stateTransferThrottle);
                }
            } else if (force && !this.handledParts.isEmpty()) {
                this.removeHandledPartitions(force);
            }
        }

        private boolean batchOverflowed() {
            return CacheDrStateTransferHandler.this.maxSizeBytes > 0 && this.batchSizeBytes >= CacheDrStateTransferHandler.this.maxSizeBytes || CacheDrStateTransferHandler.this.maxSize > 0 && this.batchEntries >= CacheDrStateTransferHandler.this.maxSize;
        }
    }

    private class CacheStateTransferTask
    implements StateTransferTask<BatchImpl> {
        private final CacheDrStateTransferKey key;
        private final Queue<Integer> partQ;
        private final BitSet partsQueued = new BitSet();
        private final boolean syncFst;
        private volatile boolean cancelled;

        CacheStateTransferTask(CacheDrStateTransferKey key, Collection<Integer> partitions, boolean syncFst) {
            this.key = key;
            this.syncFst = syncFst;
            this.partQ = new LinkedList<Integer>(partitions);
            for (Integer part : partitions) {
                this.partsQueued.set(part);
            }
        }

        @Override
        @Nullable
        public synchronized StateTransferJob<BatchImpl> nextJob() {
            Integer part = this.partQ.poll();
            return part == null ? null : new CacheStateTransferJob(part, this.partQ.isEmpty());
        }

        @Override
        public BatchImpl createBatch() {
            return new BatchImpl(this.key, this.syncFst);
        }

        synchronized boolean scheduleMorePartitions(Collection<Integer> newParts) {
            if (F.isEmpty(newParts)) {
                return true;
            }
            boolean isTaskActive = !this.partQ.isEmpty();
            for (Integer part : newParts) {
                if (this.partsQueued.get(part)) continue;
                this.partsQueued.set(part);
                this.partQ.add(part);
            }
            return isTaskActive || this.partQ.isEmpty();
        }

        @Override
        public void cancel() {
            this.cancelled = true;
        }

        @Override
        public boolean isCancelled() {
            return this.cancelled;
        }

        private class CacheStateTransferJob
        implements StateTransferJob<BatchImpl> {
            private final int part;
            private final boolean last;

            CacheStateTransferJob(int part, boolean last) {
                this.part = part;
                this.last = last;
            }

            @Override
            public boolean isCancelled() {
                return CacheStateTransferTask.this.isCancelled();
            }

            @Override
            public int partition() {
                return this.part;
            }

            @Override
            public void runWithBatch(BatchImpl batch) {
                block3: {
                    try {
                        CacheDrStateTransferHandler.this.scanPartition(this, batch, CacheDrStateTransferHandler.this.cctx);
                    }
                    catch (Exception e) {
                        if (!CacheDrStateTransferHandler.this.cctx.topology().stopping() && !CacheDrStateTransferHandler.this.cctx.kernalContext().isStopping()) {
                            U.error(CacheDrStateTransferHandler.this.log, "Failed to run state transfer task.", e);
                        }
                        if (!CacheDrStateTransferHandler.this.log.isTraceEnabled()) break block3;
                        CacheDrStateTransferHandler.this.log.trace("State transfer task has been stopped for cache: name=" + CacheDrStateTransferHandler.this.cctx.name() + ", cause=" + e.getMessage());
                    }
                }
            }
        }
    }

    private static class StateTransferFuture
    extends GridFutureAdapter<Void> {
        private CacheDrStateTransferKey key;

        private StateTransferFuture(CacheDrStateTransferKey key) {
            this.key = key;
        }

        CacheDrStateTransferKey key() {
            return this.key;
        }

        @Override
        public String toString() {
            return S.toString(StateTransferFuture.class, this);
        }
    }
}

