/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.distributed.dht;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import javax.cache.Cache;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.EntryGetWithTtlResult;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheClearAllRunnable;
import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCachePreloader;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.ReaderArguments;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedConcurrentMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtGetSingleFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.SpanType;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CI3;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public abstract class GridDhtCacheAdapter<K, V>
extends GridDistributedCacheAdapter<K, V> {
    /** Serialization version identifier. */
    private static final long serialVersionUID = 0L;
    /** Force-key futures currently registered with this cache, keyed by their future ID. */
    private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = GridConcurrentFactory.newMap();
    /** Set to {@code true} by {@link #onKernalStop()}; {@link #addFuture} fails futures registered after that. */
    private volatile boolean stopping;
    /**
     * Discovery listener that passes every received discovery event on to each registered force-key future.
     * It is subscribed in {@link #onKernalStart()} and unsubscribed in {@link #onKernalStop()}.
     */
    private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
        /**
         * @param evt Event; cast to {@link DiscoveryEvent} unconditionally.
         */
        @Override
        public void onEvent(Event evt) {
            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
            ClusterNode locNode = ctx.localNode();

            // Only the two event types this listener subscribes to are expected here.
            assert discoEvt.type() == 11 || discoEvt.type() == 12 : discoEvt;

            ClusterNode evtNode = discoEvt.eventNode();

            // The event is expected to concern a node other than the local one.
            assert !locNode.id().equals(evtNode.id());

            for (GridDhtForceKeysFuture<?, ?> fut : forceKeyFuts.values()) {
                fut.onDiscoveryEvent(discoEvt);
            }
        }
    };

    /**
     * Constructor with an empty body.
     * NOTE(review): presumably exists for deserialization support - confirm against the superclass.
     */
    protected GridDhtCacheAdapter() {
    }

    /**
     * Registers a force-key future under its future ID.
     * <p>
     * The future is stored first and the stopping flag is checked afterwards; when the flag is set the
     * future is completed with the stop error.
     *
     * @param fut Future to register.
     * @return {@code true} if the future was registered while the cache is running, {@code false} if the
     *      cache is stopping and the future has been completed with an error.
     */
    public boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) {
        forceKeyFuts.put(fut.futureId(), fut);

        if (!stopping) {
            return true;
        }

        fut.onDone(stopError());

        return false;
    }

    /**
     * Unregisters a force-key future. The mapping is removed only if the given future is the one
     * currently stored under its ID (two-argument {@code remove}).
     *
     * @param fut Future to remove.
     */
    public void removeFuture(GridDhtForceKeysFuture<?, ?> fut) {
        IgniteUuid futId = fut.futureId();

        forceKeyFuts.remove(futId, fut);
    }

    /**
     * Routes a force-keys response to the future it belongs to.
     * If no future is registered under the response's future ID, the response is only logged at debug level.
     *
     * @param node Node that sent the response.
     * @param msg Force-keys response.
     */
    protected final void processForceKeyResponse(ClusterNode node, GridDhtForceKeysResponse msg) {
        GridDhtForceKeysFuture<?, ?> fut = forceKeyFuts.get(msg.futureId());

        if (fut == null) {
            if (log.isDebugEnabled()) {
                log.debug("Receive force key response for unknown future (is it duplicate?) [nodeId=" + node.id() + ", res=" + msg + ']');
            }

            return;
        }

        fut.onResult(msg);
    }

    /**
     * Handles a force-keys request once the future returned by {@code ctx.mvcc().finishKeys(..)} for the
     * requested keys is done: immediately if it is already done, otherwise from a listener on that future.
     *
     * @param node Node that sent the request.
     * @param msg Force-keys request.
     */
    protected final void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) {
        IgniteInternalFuture<?> keysFinishFut = ctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion());

        if (!keysFinishFut.isDone()) {
            // Defer the actual processing until the future completes.
            keysFinishFut.listen(new CI1<IgniteInternalFuture<?>>() {
                @Override
                public void apply(IgniteInternalFuture<?> doneFut) {
                    GridDhtCacheAdapter.this.processForceKeysRequest0(node, msg);
                }
            });

            return;
        }

        processForceKeysRequest0(node, msg);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Builds a {@link GridDhtForceKeysResponse} for the requested keys and sends it to the requesting node.
     * <p>
     * For every requested key:
     * <ul>
     *     <li>if there is no local partition for the key and the local node is not among the partition
     *     owners, the key is reported as missed;</li>
     *     <li>otherwise the entry is read under the checkpoint read lock and its info (all versions when
     *     MVCC is enabled) is added to the response. A concurrently removed or obsolete entry is retried;
     *     an invalid partition makes the key missed.</li>
     * </ul>
     * A {@link ClusterTopologyCheckedException} is logged at debug level, any other
     * {@link IgniteCheckedException} is logged as an error; neither is rethrown.
     *
     * @param node Node that sent the request.
     * @param msg Force-keys request.
     */
    private void processForceKeysRequest0(ClusterNode node, GridDhtForceKeysRequest msg) {
        try {
            ClusterNode loc = this.ctx.localNode();
            GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(this.ctx.cacheId(), msg.futureId(), msg.miniId(), this.ctx.deploymentEnabled());
            GridDhtPartitionTopology top = this.ctx.topology();

            for (KeyCacheObject k : msg.keys()) {
                int p = this.ctx.affinity().partition(k);
                GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false);

                // Local node holds nothing for this partition: report the key as missed.
                if (locPart == null && !top.owners(p).contains(loc)) {
                    res.addMissed(k);

                    continue;
                }

                // Retry loop: repeated only when the entry turns out to be removed/obsolete.
                while (true) {
                    this.ctx.shared().database().checkpointReadLock();

                    try {
                        GridCacheEntryEx entry = this.ctx.dht().entryEx(k);

                        entry.unswap();

                        if (this.ctx.mvccEnabled()) {
                            List<GridCacheEntryInfo> infos = entry.allVersionsInfo();

                            if (infos == null) {
                                assert entry.obsolete() : entry;

                                continue; // Retry with a fresh entry.
                            }

                            for (int i = 0; i < infos.size(); ++i) {
                                res.addInfo(infos.get(i));
                            }
                        }
                        else {
                            GridCacheEntryInfo info = entry.info();

                            if (info == null) {
                                assert entry.obsolete() : entry;

                                continue; // Retry with a fresh entry.
                            }

                            if (!info.isNew()) {
                                res.addInfo(info);
                            }
                        }

                        entry.touch();

                        break; // Key processed, move on to the next one.
                    }
                    catch (GridCacheEntryRemovedException ignore) {
                        // Entry was removed concurrently: fall through and retry.
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Got removed entry: " + k);
                        }
                    }
                    catch (GridDhtInvalidPartitionException ignore) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Local node is no longer an owner: " + p);
                        }

                        res.addMissed(k);

                        break;
                    }
                    finally {
                        // Must not transfer control from here: doing so would discard the pending
                        // break/continue (or exception) of the try/catch blocks above.
                        this.ctx.shared().database().checkpointReadUnlock();
                    }
                }
            }

            if (this.log.isDebugEnabled()) {
                this.log.debug("Sending force key response [node=" + node.id() + ", res=" + res + ']');
            }

            this.ctx.io().send(node, (GridCacheMessage)res, this.ctx.ioPolicy());
        }
        catch (ClusterTopologyCheckedException ignore) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Received force key request form failed node (will ignore) [nodeId=" + node.id() + ", req=" + msg + ']');
            }
        }
        catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to reply to force key request [nodeId=" + node.id() + ", req=" + msg + ']', e);
        }
    }

    /**
     * Logs, at warning level, every force-key future that is still registered.
     * Nothing is logged when there are no registered futures.
     */
    public void dumpDebugInfo() {
        if (forceKeyFuts.isEmpty()) {
            return;
        }

        U.warn(log, "Pending force key futures [cache=" + ctx.name() + "]:");

        for (GridDhtForceKeysFuture<?, ?> pending : forceKeyFuts.values()) {
            U.warn(log, ">>> " + pending);
        }
    }

    /**
     * Stops the adapter: after the superclass handling, marks the cache as stopping, completes every
     * registered force-key future with the stop error and unsubscribes the discovery listener.
     */
    @Override
    public void onKernalStop() {
        super.onKernalStop();

        // Raise the flag before failing the futures so that addFuture() rejects late registrations.
        stopping = true;

        IgniteCheckedException stopErr = stopError();

        for (GridDhtForceKeysFuture<?, ?> pending : forceKeyFuts.values()) {
            pending.onDone(stopErr);
        }

        ctx.gridEvents().removeLocalEventListener(discoLsnr, new int[0]);
    }

    /**
     * Creates the error used to complete operations that are cancelled because of a stop.
     *
     * @return New {@link NodeStoppingException} instance.
     */
    private IgniteCheckedException stopError() {
        String errMsg = "Operation has been cancelled (cache or node is stopping).";

        return new NodeStoppingException(errMsg);
    }

    /**
     * Delivers a near get response to the get future registered under the response's future ID.
     * The work is wrapped in a tracing span; when no future is found the response is only logged at
     * debug level.
     *
     * @param nodeId Sender node ID.
     * @param res Near get response.
     */
    protected final void processNearGetResponse(UUID nodeId, GridNearGetResponse res) {
        try (MTC.TraceSurroundings ignored = MTC.support(ctx.kernalContext().tracing().create(SpanType.CACHE_API_NEAR_PROCESS_ATOMIC_GET_RESPONSE, MTC.span()))) {
            if (log.isDebugEnabled()) {
                log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']');
            }

            CacheGetFuture getFut = (CacheGetFuture)((Object)ctx.mvcc().future(res.futureId()));

            if (getFut != null) {
                getFut.onResult(nodeId, res);
            }
            else if (log.isDebugEnabled()) {
                log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
            }
        }
    }

    /**
     * Delivers a near single-get response to its future. The future is looked up by an
     * {@link IgniteUuid} composed of {@code IgniteUuid.VM_ID} and the response's future ID.
     * The work is wrapped in a tracing span; when no future is found the response is only logged at
     * debug level.
     *
     * @param nodeId Sender node ID.
     * @param res Near single-get response.
     */
    protected void processNearSingleGetResponse(UUID nodeId, GridNearSingleGetResponse res) {
        try (MTC.TraceSurroundings ignored = MTC.support(ctx.kernalContext().tracing().create(SpanType.CACHE_API_NEAR_PROCESS_ATOMIC_SINGLE_GET_RESPONSE, MTC.span()))) {
            if (log.isDebugEnabled()) {
                log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']');
            }

            IgniteUuid futId = new IgniteUuid(IgniteUuid.VM_ID, res.futureId());

            GridPartitionedSingleGetFuture getFut = (GridPartitionedSingleGetFuture)ctx.mvcc().future(futId);

            if (getFut != null) {
                getFut.onResult(nodeId, res);
            }
            else if (log.isDebugEnabled()) {
                log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
            }
        }
    }

    /**
     * Creates the adapter with a new {@link GridCachePartitionedConcurrentMap} built from the
     * context's cache group.
     *
     * @param ctx Cache context.
     */
    protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) {
        this(ctx, new GridCachePartitionedConcurrentMap(ctx.group()));
    }

    /**
     * Creates the adapter with an explicitly supplied entry map; both arguments are passed to the
     * superclass constructor unchanged.
     *
     * @param ctx Cache context.
     * @param map Cache entry map.
     */
    protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) {
        super(ctx, map);
    }

    /**
     * Starts the adapter: after the superclass handling, registers the handler for
     * {@link GridCacheTtlUpdateRequest} messages and subscribes the discovery listener.
     *
     * @throws IgniteCheckedException Declared by the overridden method.
     */
    @Override
    public void onKernalStart() throws IgniteCheckedException {
        super.onKernalStart();

        assert !ctx.isRecoveryMode() : "Registering message handlers in recovery mode [cacheName=" + name() + ']';

        ctx.io().addCacheHandler(
            ctx.cacheId(),
            ctx.startTopologyVersion(),
            GridCacheTtlUpdateRequest.class,
            this::processTtlUpdateRequest);

        // NOTE(review): 11 and 12 are presumably the node-left / node-failed discovery event codes - confirm.
        ctx.gridEvents().addLocalEventListener(discoLsnr, 11, 12);
    }

    /**
     * Prints memory statistics of the superclass and then of the cache group's partition topology
     * (invoked with the argument {@code 1024}).
     */
    @Override
    public void printMemoryStats() {
        super.printMemoryStats();

        GridDhtPartitionTopology top = ctx.group().topology();

        top.printMemoryStats(1024);
    }

    /**
     * @return Near cache adapter supplied by the concrete subclass.
     */
    public abstract GridNearCacheAdapter<K, V> near();

    /**
     * @return Partition topology of the cache group this cache belongs to.
     */
    public GridDhtPartitionTopology topology() {
        GridDhtPartitionTopology grpTop = ctx.group().topology();

        return grpTop;
    }

    /**
     * @return Preloader of the cache group this cache belongs to.
     */
    @Override
    public GridCachePreloader preloader() {
        GridCachePreloader grpPreloader = ctx.group().preloader();

        return grpPreloader;
    }

    /**
     * Same as {@code peekEx(key)}, with the result cast to {@link GridDhtCacheEntry}.
     *
     * @param key Key.
     * @return Result of {@code peekEx(key)} as a DHT entry; may be {@code null}.
     */
    @Nullable
    public GridDhtCacheEntry peekExx(KeyCacheObject key) {
        GridCacheEntryEx peeked = peekEx(key);

        return (GridDhtCacheEntry)peeked;
    }

    /**
     * Delegates to the superclass implementation; this override declares
     * {@link GridDhtInvalidPartitionException} in its {@code throws} clause.
     *
     * @param key Key.
     * @param topVer Topology version.
     * @return Entry returned by the superclass.
     * @throws GridDhtInvalidPartitionException Declared by this override; raised by the delegate, if at all.
     */
    @Override
    public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException {
        GridCacheEntryEx entry = super.entryEx(key, topVer);

        return entry;
    }

    /**
     * Same as {@code entryEx(key)}, with the result cast to {@link GridDhtCacheEntry}.
     *
     * @param key Key.
     * @return DHT entry.
     * @throws GridDhtInvalidPartitionException Propagated from {@code entryEx(key)}.
     */
    public GridDhtCacheEntry entryExx(KeyCacheObject key) throws GridDhtInvalidPartitionException {
        GridCacheEntryEx entry = entryEx(key);

        return (GridDhtCacheEntry)entry;
    }

    /**
     * Same as {@link #entryEx(KeyCacheObject, AffinityTopologyVersion)}, with the result cast to
     * {@link GridDhtCacheEntry}.
     *
     * @param key Key.
     * @param topVer Topology version.
     * @return DHT entry.
     * @throws GridDhtInvalidPartitionException Propagated from {@code entryEx(key, topVer)}.
     */
    public GridDhtCacheEntry entryExx(KeyCacheObject key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException {
        GridCacheEntryEx entry = entryEx(key, topVer);

        return (GridDhtCacheEntry)entry;
    }

    /**
     * Creates a new {@link GridDhtDetachedCacheEntry} for the given key, bound to this cache's context.
     *
     * @param key Key.
     * @return Newly created detached entry.
     */
    protected GridDistributedCacheEntry createEntry(KeyCacheObject key) {
        GridDistributedCacheEntry detached = new GridDhtDetachedCacheEntry(ctx, key);

        return detached;
    }

    /**
     * Loads the given keys from the cache store into this cache.
     * <p>
     * When the store is local the call is handed over to the superclass. Otherwise every key/value pair
     * reported by the store is stored through {@code loadEntry(..)} with a single load version obtained
     * for the current affinity topology version.
     *
     * @param keys Keys to load.
     * @param plc Expiry policy; when {@code null} the context's expiry policy is used.
     * @param keepBinary Keep-binary flag; only forwarded to the superclass in the local-store case.
     * @throws IgniteCheckedException If loading fails.
     */
    @Override
    public void localLoad(Collection<? extends K> keys, ExpiryPolicy plc, boolean keepBinary) throws IgniteCheckedException {
        if (ctx.store().isLocal()) {
            super.localLoad(keys, plc, keepBinary);

            return;
        }

        final AffinityTopologyVersion loadTopVer = ctx.affinity().affinityTopologyVersion();
        final GridCacheVersion loadVer = ctx.shared().versions().nextForLoad(loadTopVer.topologyVersion());
        final boolean drEnabled = ctx.isDrEnabled();

        final ExpiryPolicy effectivePlc;

        if (plc == null) {
            effectivePlc = ctx.expiry();
        }
        else {
            effectivePlc = plc;
        }

        Collection<KeyCacheObject> cacheKeys = ctx.cacheKeysView(keys);

        ctx.store().loadAll(null, cacheKeys, (IgniteBiInClosure<KeyCacheObject, Object>)new CI2<KeyCacheObject, Object>() {
            @Override
            public void apply(KeyCacheObject key, Object val) {
                // No filter is applied on this path, hence the null predicate.
                GridDhtCacheAdapter.this.loadEntry(key, val, loadVer, null, loadTopVer, drEnabled, effectivePlc);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Loads the whole cache from the cache store.
     * <p>
     * When the store is local the call is handed over to the superclass. Otherwise MVCC support for the
     * operation is verified, the optional filter gets its resources injected, and every key/value pair
     * reported by the store is stored through {@code loadEntry(..)}. The expiry policy is taken from the
     * per-call operation context when present, else from the cache context. A
     * {@link PlatformCacheEntryFilter} is closed once loading finishes, whether or not it succeeded.
     *
     * @param p Optional filter applied to loaded pairs; may be {@code null}.
     * @param args Arguments passed through to the store.
     * @throws IgniteCheckedException If loading fails.
     */
    @Override
    public void localLoadCache(final IgniteBiPredicate<K, V> p, Object[] args) throws IgniteCheckedException {
        if (ctx.store().isLocal()) {
            super.localLoadCache(p, args);

            return;
        }

        MvccUtils.verifyMvccOperationSupport(ctx, "Load");

        final AffinityTopologyVersion loadTopVer = ctx.affinity().affinityTopologyVersion();
        final GridCacheVersion loadVer = ctx.shared().versions().nextForLoad(loadTopVer.topologyVersion());
        final boolean drEnabled = ctx.isDrEnabled();

        CacheOperationContext opCtx = ctx.operationContextPerCall();

        ExpiryPolicy callPlc = opCtx == null ? null : opCtx.expiry();

        final ExpiryPolicy effectivePlc = callPlc == null ? ctx.expiry() : callPlc;

        if (p != null) {
            ctx.kernalContext().resource().injectGeneric(p);
        }

        try {
            ctx.store().loadCache((GridInClosure3<KeyCacheObject, Object, GridCacheVersion>)new CI3<KeyCacheObject, Object, GridCacheVersion>() {
                @Override
                public void apply(KeyCacheObject key, Object val, @Nullable GridCacheVersion ver) {
                    // The store is not expected to supply a version on this path.
                    assert ver == null;

                    GridDhtCacheAdapter.this.loadEntry(key, val, loadVer, p, loadTopVer, drEnabled, effectivePlc);
                }
            }, args);
        }
        finally {
            if (p instanceof PlatformCacheEntryFilter) {
                ((PlatformCacheEntryFilter)p).onClose();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores a single key/value pair supplied by the cache store into this cache.
     * <p>
     * The pair is skipped when the optional filter rejects it, when the key's local partition cannot be
     * reserved, when {@code CU.ttlForLoad(plc)} returns {@code -2}, or when the partition turns out to be
     * invalid. Otherwise the value is set as the entry's initial value under the checkpoint read lock.
     * The entry (if one was obtained) is touched, and the partition reservation and the lock are
     * released, on every exit path.
     *
     * @param key Key.
     * @param val Value as supplied by the store.
     * @param ver Version to assign to the loaded value.
     * @param p Optional filter; may be {@code null}.
     * @param topVer Topology version passed to {@code initialValue(..)}.
     * @param replicate Selects {@code GridDrType.DR_LOAD} when {@code true}, {@code GridDrType.DR_NONE} otherwise.
     * @param plc Expiry policy used to compute the TTL; may be {@code null}.
     * @throws IgniteException If setting the initial value fails with an {@link IgniteCheckedException}.
     */
    private void loadEntry(KeyCacheObject key, Object val, GridCacheVersion ver, @Nullable IgniteBiPredicate<K, V> p, AffinityTopologyVersion topVer, boolean replicate, @Nullable ExpiryPolicy plc) {
        if (p != null) {
            K userKey = key.value(ctx.cacheObjectContext(), false);

            // The store hands values over as plain Object while the filter is typed by the cache's
            // generics; the cast is erased at run time and only restores static type-correctness.
            @SuppressWarnings("unchecked")
            V userVal = (V)val;

            if (!p.apply(userKey, userVal)) {
                return;
            }
        }

        try {
            GridDhtLocalPartition part = ctx.group().topology().localPartition(ctx.affinity().partition(key), AffinityTopologyVersion.NONE, true);

            if (!part.reserve()) {
                if (log.isDebugEnabled()) {
                    log.debug("Will node load entry into cache (partition is invalid): " + part);
                }

                return;
            }

            GridCacheEntryEx entry = null;

            ctx.shared().database().checkpointReadLock();

            try {
                long ttl = CU.ttlForLoad(plc);

                // NOTE(review): -2 is presumably the "zero TTL, do not store" marker - confirm against CU.
                if (ttl == -2L) {
                    return;
                }

                CacheObject cacheVal = ctx.toCacheObject(val);

                entry = entryEx(key);

                entry.initialValue(cacheVal, ver, ttl, -1L, false, topVer, replicate ? GridDrType.DR_LOAD : GridDrType.DR_NONE, true, false);
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException("Failed to put cache value: " + entry, e);
            }
            catch (GridCacheEntryRemovedException ignore) {
                // Deliberately ignored: a concurrently removed entry is simply not loaded.
                if (log.isDebugEnabled()) {
                    log.debug("Got removed entry during loadCache (will ignore): " + entry);
                }
            }
            finally {
                if (entry != null) {
                    entry.touch();
                }

                part.release();

                ctx.shared().database().checkpointReadUnlock();
            }
        }
        catch (GridDhtInvalidPartitionException e) {
            if (log.isDebugEnabled()) {
                log.debug(S.toString("Ignoring entry for partition that does not belong", "key", (Object)key, true, "val", val, true, "err", (Object)e, false));
            }
        }
    }

    /**
     * @return {@link #sizeLong()} narrowed to {@code int} with a plain cast.
     */
    @Override
    public int size() {
        long total = sizeLong();

        return (int)total;
    }

    /**
     * @return Sum of this cache's sizes over the data stores of all current local partitions.
     */
    @Override
    public long sizeLong() {
        long total = 0L;

        for (GridDhtLocalPartition part : topology().currentLocalPartitions()) {
            total = total + part.dataStore().cacheSize(ctx.cacheId());
        }

        return total;
    }

    /**
     * @return {@link #primarySizeLong()} narrowed to {@code int} with a plain cast.
     */
    @Override
    public int primarySize() {
        long total = primarySizeLong();

        return (int)total;
    }

    /**
     * @return Sum of this cache's sizes over the data stores of those current local partitions that
     *      report themselves primary for the current affinity topology version.
     */
    @Override
    public long primarySizeLong() {
        long total = 0L;

        AffinityTopologyVersion curTopVer = ctx.affinity().affinityTopologyVersion();

        for (GridDhtLocalPartition part : topology().currentLocalPartitions()) {
            if (part.primary(curTopVer)) {
                total = total + part.dataStore().cacheSize(ctx.cacheId());
            }
        }

        return total;
    }

    /**
     * DHT-side bulk get. Delegates to {@code getAllAsync0(..)} with binary deserialization switched off
     * and with cache objects and versions kept in the result.
     *
     * @param keys Keys to get.
     * @param readerArgs Reader arguments; may be {@code null}.
     * @param readThrough Read-through flag.
     * @param subjId Subject ID; may be {@code null}.
     * @param taskName Task name.
     * @param expiry Expiry policy; may be {@code null}.
     * @param skipVals Skip-values flag.
     * @param recovery Recovery flag.
     * @param txLbl Transaction label; may be {@code null}.
     * @param mvccSnapshot MVCC snapshot.
     * @param touchTtl Touch-TTL flag.
     * @return Future for the map of keys to versioned get results.
     */
    IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> getDhtAllAsync(Collection<KeyCacheObject> keys, @Nullable ReaderArguments readerArgs, boolean readThrough, @Nullable UUID subjId, String taskName, @Nullable IgniteCacheExpiryPolicy expiry, boolean skipVals, boolean recovery, @Nullable String txLbl, MvccSnapshot mvccSnapshot, boolean touchTtl) {
        final boolean deserializeBinary = false;
        final boolean keepCacheObjects = true;
        final boolean needVer = true;

        return getAllAsync0(
            keys,
            readerArgs,
            readThrough,
            subjId,
            taskName,
            deserializeBinary,
            expiry,
            skipVals,
            keepCacheObjects,
            recovery,
            needVer,
            txLbl,
            mvccSnapshot,
            touchTtl);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable Collection<KeyCacheObject> keys, final @Nullable ReaderArguments readerArgs, boolean readThrough, @Nullable UUID subjId, String taskName, final boolean deserializeBinary, final @Nullable IgniteCacheExpiryPolicy expiry, final boolean skipVals, final boolean keepCacheObjects, boolean recovery, final boolean needVer, @Nullable String txLbl, MvccSnapshot mvccSnapshot, boolean touchTtl) {
        Object object;
        if (F.isEmpty(keys)) {
            return new GridFinishedFuture<Map<K1, V1>>(Collections.emptyMap());
        }
        assert (mvccSnapshot == null == !this.ctx.mvccEnabled());
        HashMap<KeyCacheObject, EntryGetResult> misses = null;
        HashSet<GridCacheEntryEx> newLocalEntries = null;
        this.ctx.shared().database().checkpointReadLock();
        try {
            boolean storeEnabled;
            int keysSize = keys.size();
            final Map<Object, Object> map = keysSize == 1 ? new IgniteBiTuple() : U.newHashMap(keysSize);
            boolean bl = storeEnabled = !skipVals && readThrough && this.ctx.readThrough();
            boolean readNoEntry = !touchTtl && this.ctx.readNoEntry(expiry, readerArgs != null);
            block11: for (KeyCacheObject key : keys) {
                while (true) {
                    try {
                        EntryGetResult res = null;
                        boolean evt = !skipVals;
                        boolean updateMetrics = !skipVals;
                        GridCacheEntryEx entry = null;
                        boolean skipEntry = readNoEntry;
                        if (readNoEntry) {
                            CacheDataRow row;
                            if (mvccSnapshot != null) {
                                row = this.ctx.offheap().mvccRead(this.ctx, key, mvccSnapshot);
                            } else {
                                CacheDataRow cacheDataRow = row = skipVals ? this.ctx.offheap().find(this.ctx, key) : this.ctx.offheap().read(this.ctx, key);
                            }
                            if (row != null) {
                                long expireTime = row.expireTime();
                                if (expireTime != 0L) {
                                    if (expireTime > U.currentTimeMillis()) {
                                        res = new EntryGetWithTtlResult(row.value(), row.version(), false, expireTime, 0L);
                                    } else if (!skipVals) {
                                        skipEntry = false;
                                    }
                                } else {
                                    res = new EntryGetResult(row.value(), row.version(), false);
                                }
                            }
                            if (res != null) {
                                if (evt) {
                                    this.ctx.events().readEvent(key, null, txLbl, row.value(), subjId, taskName, !deserializeBinary);
                                }
                                if (updateMetrics && this.ctx.statisticsEnabled()) {
                                    this.ctx.cache().metrics0().onRead(true);
                                }
                            } else if (storeEnabled) {
                                skipEntry = false;
                            }
                        }
                        if (!skipEntry) {
                            boolean isNewLocalEntry = this.map.getEntry(this.ctx, key) == null;
                            entry = this.entryEx(key);
                            if (entry == null) {
                                if (skipVals || !this.ctx.statisticsEnabled()) continue block11;
                                this.ctx.cache().metrics0().onRead(false);
                                continue block11;
                            }
                            if (isNewLocalEntry) {
                                if (newLocalEntries == null) {
                                    newLocalEntries = new HashSet<GridCacheEntryEx>();
                                }
                                newLocalEntries.add(entry);
                            }
                            if (storeEnabled) {
                                res = entry.innerGetAndReserveForLoad(updateMetrics, evt, subjId, taskName, expiry, !deserializeBinary, readerArgs);
                                assert (res != null);
                                if (res.value() == null) {
                                    if (misses == null) {
                                        misses = new HashMap<KeyCacheObject, EntryGetResult>();
                                    }
                                    misses.put(key, res);
                                    res = null;
                                }
                            } else if (touchTtl) {
                                res = entry.touchTtlVersioned(expiry);
                                if (res == null) {
                                    entry.touch();
                                }
                            } else {
                                res = entry.innerGetVersioned(null, null, updateMetrics, evt, subjId, null, taskName, expiry, !deserializeBinary, readerArgs);
                                if (res == null) {
                                    entry.touch();
                                }
                            }
                        }
                        if (res == null) continue block11;
                        this.ctx.addResult(map, key, res, skipVals, keepCacheObjects, deserializeBinary, true, needVer);
                        if (entry != null) {
                            entry.touch();
                        }
                        if (keysSize != 1) continue block11;
                        GridFinishedFuture<Map<K1, V1>> gridFinishedFuture = new GridFinishedFuture<Map<K1, V1>>(map);
                        return gridFinishedFuture;
                    }
                    catch (GridCacheEntryRemovedException ignored) {
                        if (!this.log.isDebugEnabled()) continue;
                        this.log.debug("Got removed entry in getAllAsync(..) method (will retry): " + key);
                        continue;
                    }
                    break;
                }
            }
            if (storeEnabled && misses != null) {
                final HashMap<KeyCacheObject, EntryGetResult> loadKeys = misses;
                final HashSet loaded = new HashSet();
                GridEmbeddedFuture gridEmbeddedFuture = new GridEmbeddedFuture(this.ctx.closures().callLocalSafe(this.ctx.projectSafe(new GPC<Map<K1, V1>>(){

                    @Override
                    public Map<K1, V1> call() throws Exception {
                        GridDhtCacheAdapter.this.ctx.store().loadAll(null, loadKeys.keySet(), (IgniteBiInClosure<KeyCacheObject, Object>)new CI2<KeyCacheObject, Object>(){

                            @Override
                            public void apply(KeyCacheObject key, Object val) {
                                EntryGetResult res = (EntryGetResult)loadKeys.get(key);
                                if (res == null || val == null) {
                                    return;
                                }
                                loaded.add(key);
                                CacheObject cacheVal = GridDhtCacheAdapter.this.ctx.toCacheObject(val);
                                while (true) {
                                    GridCacheEntryEx entry = null;
                                    try {
                                        GridDhtCacheAdapter.this.ctx.shared().database().ensureFreeSpace(GridDhtCacheAdapter.this.ctx.dataRegion());
                                    }
                                    catch (IgniteCheckedException e) {
                                        throw new GridClosureException(e);
                                    }
                                    GridDhtCacheAdapter.this.ctx.shared().database().checkpointReadLock();
                                    try {
                                        entry = GridDhtCacheAdapter.this.entryEx(key);
                                        entry.unswap();
                                        GridCacheVersion newVer = GridDhtCacheAdapter.this.nextVersion();
                                        EntryGetResult verVal = entry.versionedValue(cacheVal, res.version(), newVer, expiry, readerArgs);
                                        if (GridDhtCacheAdapter.this.log.isDebugEnabled()) {
                                            GridDhtCacheAdapter.this.log.debug("Set value loaded from store into entry [oldVer=" + res.version() + ", newVer=" + verVal.version() + ", entry=" + entry + ']');
                                        }
                                        if (verVal.value() != null) {
                                            GridDhtCacheAdapter.this.ctx.addResult(map, key, verVal, skipVals, keepCacheObjects, deserializeBinary, true, needVer);
                                        } else {
                                            GridDhtCacheAdapter.this.ctx.addResult(map, key, new EntryGetResult(cacheVal, res.version()), skipVals, keepCacheObjects, deserializeBinary, false, needVer);
                                        }
                                        entry.touch();
                                    }
                                    catch (GridCacheEntryRemovedException ignore) {
                                        if (!GridDhtCacheAdapter.this.log.isDebugEnabled()) continue;
                                        GridDhtCacheAdapter.this.log.debug("Got removed entry during getAllAsync (will retry): " + entry);
                                        continue;
                                    }
                                    catch (IgniteCheckedException e) {
                                        throw new GridClosureException(e);
                                    }
                                    finally {
                                        GridDhtCacheAdapter.this.ctx.shared().database().checkpointReadUnlock();
                                        continue;
                                    }
                                    break;
                                }
                            }
                        });
                        GridDhtCacheAdapter.this.clearReservationsIfNeeded(loadKeys, loaded, null);
                        return map;
                    }
                }), true), new C2<Map<K, V>, Exception, IgniteInternalFuture<Map<K, V>>>(){

                    @Override
                    public IgniteInternalFuture<Map<K, V>> apply(Map<K, V> map, Exception e) {
                        if (e != null) {
                            GridDhtCacheAdapter.this.clearReservationsIfNeeded(loadKeys, loaded, null);
                            return new GridFinishedFuture(e);
                        }
                        HashSet notFound = new HashSet(loadKeys.keySet());
                        notFound.removeAll(loaded);
                        for (KeyCacheObject key : notFound) {
                            GridCacheEntryEx entry = GridDhtCacheAdapter.this.peekEx(key);
                            if (entry == null) continue;
                            entry.touch();
                        }
                        return new GridFinishedFuture(Collections.emptyMap());
                    }
                }, new C2<Map<K1, V1>, Exception, Map<K1, V1>>(){

                    @Override
                    public Map<K1, V1> apply(Map<K1, V1> loaded, Exception e) {
                        if (e == null) {
                            map.putAll(loaded);
                        }
                        return map;
                    }
                });
                return gridEmbeddedFuture;
            }
            assert (misses == null);
            GridFinishedFuture gridFinishedFuture = new GridFinishedFuture(map);
            return gridFinishedFuture;
        }
        catch (AssertionError | RuntimeException e) {
            if (misses != null) {
                for (KeyCacheObject key0 : misses.keySet()) {
                    GridCacheEntryEx entry = this.peekEx(key0);
                    if (entry == null) continue;
                    entry.touch();
                }
            }
            if (newLocalEntries != null) {
                for (GridCacheEntryEx entry : newLocalEntries) {
                    this.removeEntry(entry);
                }
            }
            object = new GridFinishedFuture((Throwable)e);
            return object;
        }
        catch (IgniteCheckedException e) {
            object = new GridFinishedFuture(e);
            return object;
        }
        finally {
            this.ctx.shared().database().checkpointReadUnlock();
        }
    }

    /**
     * Builds a {@link GridDhtGetFuture} from the supplied arguments, calls {@code init()} on it
     * and returns it.
     *
     * <p>All arguments are handed to the future's constructor unchanged (together with this
     * cache's context); note that the constructor takes them in a different order than this
     * method does.
     *
     * @param reader Passed through to the future.
     * @param msgId Passed through to the future.
     * @param keys Passed through to the future.
     * @param addReaders Passed through to the future.
     * @param readThrough Passed through to the future.
     * @param topVer Passed through to the future.
     * @param subjId Passed through to the future, may be {@code null}.
     * @param taskNameHash Passed through to the future.
     * @param expiry Passed through to the future, may be {@code null}.
     * @param skipVals Passed through to the future.
     * @param recovery Passed through to the future.
     * @param txLbl Passed through to the future, may be {@code null}.
     * @param mvccSnapshot Passed through to the future.
     * @return The already initialized future.
     */
    public GridDhtFuture<Collection<GridCacheEntryInfo>> getDhtAsync(UUID reader, long msgId, Map<KeyCacheObject, Boolean> keys, boolean addReaders, boolean readThrough, AffinityTopologyVersion topVer, @Nullable UUID subjId, int taskNameHash, @Nullable IgniteCacheExpiryPolicy expiry, boolean skipVals, boolean recovery, @Nullable String txLbl, MvccSnapshot mvccSnapshot) {
        GridDhtGetFuture getFut = new GridDhtGetFuture(
            this.ctx,
            msgId,
            reader,
            keys,
            readThrough,
            topVer,
            subjId,
            taskNameHash,
            expiry,
            skipVals,
            recovery,
            addReaders,
            txLbl,
            mvccSnapshot);

        getFut.init();

        return getFut;
    }

    /**
     * Builds a {@link GridDhtGetSingleFuture} from the supplied arguments, calls {@code init()}
     * on it and returns it.
     *
     * <p>All arguments are handed to the future's constructor unchanged (together with this
     * cache's context); the constructor takes {@code msgId} before {@code nodeId}.
     *
     * @param nodeId Passed through to the future.
     * @param msgId Passed through to the future.
     * @param key Passed through to the future.
     * @param addRdr Passed through to the future.
     * @param readThrough Passed through to the future.
     * @param topVer Passed through to the future.
     * @param subjId Passed through to the future, may be {@code null}.
     * @param taskNameHash Passed through to the future.
     * @param expiry Passed through to the future, may be {@code null}.
     * @param skipVals Passed through to the future.
     * @param recovery Passed through to the future.
     * @param txLbl Passed through to the future.
     * @param mvccSnapshot Passed through to the future.
     * @param touchTtl Passed through to the future.
     * @return The already initialized future.
     */
    GridDhtGetSingleFuture getDhtSingleAsync(UUID nodeId, long msgId, KeyCacheObject key, boolean addRdr, boolean readThrough, AffinityTopologyVersion topVer, @Nullable UUID subjId, int taskNameHash, @Nullable IgniteCacheExpiryPolicy expiry, boolean skipVals, boolean recovery, String txLbl, MvccSnapshot mvccSnapshot, boolean touchTtl) {
        GridDhtGetSingleFuture singleFut = new GridDhtGetSingleFuture(
            this.ctx,
            msgId,
            nodeId,
            key,
            addRdr,
            readThrough,
            topVer,
            subjId,
            taskNameHash,
            expiry,
            skipVals,
            recovery,
            txLbl,
            mvccSnapshot,
            touchTtl);

        singleFut.init();

        return singleFut;
    }

    /**
     * Handles a single-key get request: starts a DHT single get and, once it completes, sends a
     * {@link GridNearSingleGetResponse} back to {@code nodeId} and triggers a TTL update for the
     * expiry policy built from the request.
     *
     * <p>The whole method body runs inside a tracing span that is closed when the method returns
     * (the listener itself may run later).
     *
     * @param nodeId Node the response is sent to.
     * @param req Get request.
     */
    protected void processNearSingleGetRequest(final UUID nodeId, final GridNearSingleGetRequest req) {
        try (MTC.TraceSurroundings ignored = MTC.support(this.ctx.kernalContext().tracing().create(SpanType.CACHE_API_NEAR_PROCESS_ATOMIC_SINGLE_GET_REQUEST, MTC.span()))) {
            assert this.ctx.affinityNode();

            final GridCacheAdapter.CacheExpiryPolicy expiryPlc =
                GridCacheAdapter.CacheExpiryPolicy.fromRemote(req.createTtl(), req.accessTtl());

            GridDhtGetSingleFuture getFut = this.getDhtSingleAsync(
                nodeId,
                req.messageId(),
                req.key(),
                req.addReader(),
                req.readThrough(),
                req.topologyVersion(),
                req.subjectId(),
                req.taskNameHash(),
                expiryPlc,
                req.skipValues(),
                req.recovery(),
                req.txLabel(),
                req.mvccSnapshot(),
                req.touchTtl());

            getFut.listen(new CI1<IgniteInternalFuture<GridCacheEntryInfo>>() {
                @Override
                public void apply(IgniteInternalFuture<GridCacheEntryInfo> f) {
                    GridNearSingleGetResponse res;
                    GridDhtFuture dhtFut = (GridDhtFuture)f;

                    try {
                        GridCacheEntryInfo info = (GridCacheEntryInfo)dhtFut.get();

                        if (!F.isEmpty(dhtFut.invalidPartitions())) {
                            // Some partitions were invalid: answer with the topology version only.
                            AffinityTopologyVersion topVer = GridDhtCacheAdapter.this.ctx.shared().exchange().lastTopologyFuture().initialVersion();

                            res = new GridNearSingleGetResponse(GridDhtCacheAdapter.this.ctx.cacheId(), req.futureId(), topVer, null, true, req.addDeploymentInfo());
                        } else {
                            // Pick the payload shape the requester asked for.
                            Message payload = null;

                            if (info != null) {
                                if (req.needEntryInfo()) {
                                    info.key(null);
                                    payload = info;
                                } else if (req.needVersion()) {
                                    payload = new CacheVersionedValue(info.value(), info.version());
                                } else {
                                    payload = info.value();
                                }
                            }

                            res = new GridNearSingleGetResponse(GridDhtCacheAdapter.this.ctx.cacheId(), req.futureId(), null, payload, false, req.addDeploymentInfo());

                            if (info != null && req.skipValues()) {
                                res.setContainsValue();
                            }
                        }
                    }
                    catch (NodeStoppingException ignored) {
                        // No response is sent when the get failed with NodeStoppingException.
                        return;
                    }
                    catch (IgniteCheckedException e) {
                        U.error(GridDhtCacheAdapter.this.log, "Failed processing get request: " + req, e);

                        res = new GridNearSingleGetResponse(GridDhtCacheAdapter.this.ctx.cacheId(), req.futureId(), req.topologyVersion(), null, false, req.addDeploymentInfo());
                        res.error(e);
                    }

                    try {
                        GridDhtCacheAdapter.this.ctx.io().send(nodeId, (GridCacheMessage)res, GridDhtCacheAdapter.this.ctx.ioPolicy());
                    }
                    catch (ClusterTopologyCheckedException e) {
                        if (GridDhtCacheAdapter.this.log.isDebugEnabled()) {
                            GridDhtCacheAdapter.this.log.debug("Failed to send get response to node, node failed: " + nodeId);
                        }
                    }
                    catch (IgniteCheckedException e) {
                        U.error(GridDhtCacheAdapter.this.log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + ",req=" + req + ", res=" + res + ']', e);
                    }

                    // Runs whether or not the send succeeded.
                    GridDhtCacheAdapter.this.sendTtlUpdateRequest(expiryPlc);
                }
            });
        }
    }

    /**
     * Handles a multi-key get request: starts a DHT get and, once it completes, sends a
     * {@link GridNearGetResponse} back to {@code nodeId} and triggers a TTL update for the
     * expiry policy built from the request.
     *
     * <p>The whole method body runs inside a tracing span that is closed when the method returns
     * (the listener itself may run later).
     *
     * @param nodeId Node the response is sent to.
     * @param req Get request.
     */
    protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) {
        try (MTC.TraceSurroundings ignored = MTC.support(this.ctx.kernalContext().tracing().create(SpanType.CACHE_API_NEAR_PROCESS_ATOMIC_GET_REQUEST, MTC.span()))) {
            assert this.ctx.affinityNode();

            final GridCacheAdapter.CacheExpiryPolicy expiryPlc =
                GridCacheAdapter.CacheExpiryPolicy.fromRemote(req.createTtl(), req.accessTtl());

            GridDhtFuture<Collection<GridCacheEntryInfo>> getFut = this.getDhtAsync(
                nodeId,
                req.messageId(),
                req.keys(),
                req.addReaders(),
                req.readThrough(),
                req.topologyVersion(),
                req.subjectId(),
                req.taskNameHash(),
                expiryPlc,
                req.skipValues(),
                req.recovery(),
                req.txLabel(),
                req.mvccSnapshot());

            getFut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() {
                @Override
                public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> f) {
                    GridNearGetResponse res = new GridNearGetResponse(GridDhtCacheAdapter.this.ctx.cacheId(), req.futureId(), req.miniId(), req.version(), req.deployInfo() != null);
                    GridDhtFuture dhtFut = (GridDhtFuture)f;

                    try {
                        Collection loadedEntries = (Collection)dhtFut.get();

                        res.entries(loadedEntries);
                    }
                    catch (NodeStoppingException ignored) {
                        // No response is sent when the get failed with NodeStoppingException.
                        return;
                    }
                    catch (IgniteCheckedException e) {
                        U.error(GridDhtCacheAdapter.this.log, "Failed processing get request: " + req, e);

                        res.error(e);
                    }

                    // Invalid partitions are reported even when the get itself failed.
                    if (!F.isEmpty(dhtFut.invalidPartitions())) {
                        AffinityTopologyVersion topVer = GridDhtCacheAdapter.this.ctx.shared().exchange().lastTopologyFuture().initialVersion();

                        res.invalidPartitions(dhtFut.invalidPartitions(), topVer);
                    }

                    try {
                        GridDhtCacheAdapter.this.ctx.io().send(nodeId, (GridCacheMessage)res, GridDhtCacheAdapter.this.ctx.ioPolicy());
                    }
                    catch (IgniteCheckedException e) {
                        U.error(GridDhtCacheAdapter.this.log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + ",req=" + req + ", res=" + res + ']', e);
                    }

                    // Runs whether or not the send succeeded.
                    GridDhtCacheAdapter.this.sendTtlUpdateRequest(expiryPlc);
                }
            });
        }
    }

    /**
     * Schedules (via {@code ctx.closures().runLocalSafe}) a task that turns the entries and
     * readers collected by the expiry policy into {@link GridCacheTtlUpdateRequest}s, one per
     * destination node, and sends them.
     *
     * <p>Does nothing when the policy is {@code null} or has no entries. For every entry whose
     * primary node is local the entry is added to the requests of all its backups; otherwise it
     * is added to the request of its primary. Reader entries are added as near entries to the
     * request of the reader node, skipping readers whose node can no longer be resolved.
     *
     * @param expiryPlc Expiry policy holding the entries to propagate, may be {@code null}.
     */
    public void sendTtlUpdateRequest(final @Nullable IgniteCacheExpiryPolicy expiryPlc) {
        if (expiryPlc == null || F.isEmpty(expiryPlc.entries())) {
            return;
        }

        this.ctx.closures().runLocalSafe(new GridPlainRunnable() {
            @Override
            public void run() {
                Map<KeyCacheObject, GridCacheVersion> entries = expiryPlc.entries();

                assert entries != null && !entries.isEmpty();

                Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new HashMap<>();
                AffinityTopologyVersion topVer = GridDhtCacheAdapter.this.ctx.shared().exchange().readyAffinityVersion();

                for (Map.Entry<KeyCacheObject, GridCacheVersion> e : entries.entrySet()) {
                    ClusterNode primaryNode = GridDhtCacheAdapter.this.ctx.affinity().primaryByKey(e.getKey(), topVer);

                    if (primaryNode.isLocal()) {
                        // Local node is primary: fan the update out to every backup.
                        Collection<ClusterNode> backups = GridDhtCacheAdapter.this.ctx.affinity().backupsByKey(e.getKey(), topVer);

                        for (ClusterNode backup : backups) {
                            this.requestFor(reqMap, backup, topVer).addEntry(e.getKey(), e.getValue());
                        }
                    } else {
                        this.requestFor(reqMap, primaryNode, topVer).addEntry(e.getKey(), e.getValue());
                    }
                }

                Map<UUID, Collection<IgniteBiTuple<KeyCacheObject, GridCacheVersion>>> rdrs = expiryPlc.readers();

                if (rdrs != null) {
                    assert !rdrs.isEmpty();

                    for (Map.Entry<UUID, Collection<IgniteBiTuple<KeyCacheObject, GridCacheVersion>>> rdr : rdrs.entrySet()) {
                        ClusterNode rdrNode = GridDhtCacheAdapter.this.ctx.node(rdr.getKey());

                        if (rdrNode == null) {
                            continue;
                        }

                        // The request is created even if the reader has no tuples, as before.
                        GridCacheTtlUpdateRequest req = this.requestFor(reqMap, rdrNode, topVer);

                        for (IgniteBiTuple<KeyCacheObject, GridCacheVersion> t : rdr.getValue()) {
                            req.addNearEntry(t.get1(), t.get2());
                        }
                    }
                }

                for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest> out : reqMap.entrySet()) {
                    try {
                        // Upcast kept so the same send(...) overload is selected as before.
                        GridDhtCacheAdapter.this.ctx.io().send(out.getKey(), (GridCacheMessage)out.getValue(), GridDhtCacheAdapter.this.ctx.ioPolicy());
                    }
                    catch (ClusterTopologyCheckedException e) {
                        if (GridDhtCacheAdapter.this.log.isDebugEnabled()) {
                            GridDhtCacheAdapter.this.log.debug("Failed to send TTL update request, node left: " + out.getKey());
                        }
                    }
                    catch (IgniteCheckedException e) {
                        U.error(GridDhtCacheAdapter.this.log, "Failed to send TTL update request.", e);
                    }
                }
            }

            /** Returns the request already mapped to {@code node}, creating and registering it on first use. */
            private GridCacheTtlUpdateRequest requestFor(Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap, ClusterNode node, AffinityTopologyVersion topVer) {
                GridCacheTtlUpdateRequest req = reqMap.get(node);

                if (req == null) {
                    req = new GridCacheTtlUpdateRequest(GridDhtCacheAdapter.this.ctx.cacheId(), topVer, expiryPlc.forAccess());

                    reqMap.put(node, req);
                }

                return req;
            }
        });
    }

    /**
     * Schedules (via {@code ctx.closures().runLocalSafe}) a task that forwards an incoming TTL
     * update to other nodes for every key of the request whose primary node is the local node.
     *
     * <p>For such a key the update is added to the request of each backup node and, as a near
     * entry, to the request of each reader of the local DHT entry. The node the incoming request
     * came from is never targeted, and readers whose node cannot be resolved are skipped.
     *
     * <p>A request without DHT keys is ignored. If the readers of an entry cannot be obtained
     * because the entry was removed, the failure is logged and reader propagation is skipped for
     * that key (backups collected for it are still notified).
     *
     * @param srcNodeId ID of the node the incoming request was received from.
     * @param incomingReq Incoming TTL update request.
     */
    private void propagateTtlUpdateRequestFromPrimary(final UUID srcNodeId, final GridCacheTtlUpdateRequest incomingReq) {
        this.ctx.closures().runLocalSafe(new Runnable() {
            @Override
            public void run() {
                List<KeyCacheObject> keys = incomingReq.keys();

                // The caller treats keys() as nullable (near-only request); nothing to forward then.
                if (keys == null || keys.isEmpty()) {
                    return;
                }

                AffinityTopologyVersion topVer = incomingReq.topologyVersion();
                Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new HashMap<>();

                for (int i = 0; i < keys.size(); ++i) {
                    KeyCacheObject key = keys.get(i);

                    if (!GridDhtCacheAdapter.this.ctx.affinity().primaryByKey(key, topVer).isLocal()) {
                        continue;
                    }

                    Collection<ClusterNode> backups = GridDhtCacheAdapter.this.ctx.affinity().backupsByKey(key, topVer);

                    for (ClusterNode backup : backups) {
                        if (backup.id().equals(srcNodeId)) {
                            continue;
                        }

                        this.requestFor(reqMap, backup, topVer).addEntry(key, incomingReq.version(i));
                    }

                    GridDhtCacheEntry entry = GridDhtCacheAdapter.this.ctx.dht().entryExx(key, topVer);
                    Collection<UUID> readers;

                    try {
                        readers = entry.readers();
                    }
                    catch (GridCacheEntryRemovedException e) {
                        U.error(GridDhtCacheAdapter.this.log, "Failed to send TTL update request.", e);

                        // No reader list available; previously this fell through and hit a null.
                        continue;
                    }

                    for (UUID reader : readers) {
                        if (reader.equals(srcNodeId)) {
                            continue;
                        }

                        ClusterNode rdrNode = GridDhtCacheAdapter.this.ctx.node(reader);

                        if (rdrNode == null) {
                            continue;
                        }

                        this.requestFor(reqMap, rdrNode, topVer).addNearEntry(key, incomingReq.version(i));
                    }
                }

                for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest> out : reqMap.entrySet()) {
                    try {
                        // Upcast kept so the same send(...) overload is selected as before.
                        GridDhtCacheAdapter.this.ctx.io().send(out.getKey(), (GridCacheMessage)out.getValue(), GridDhtCacheAdapter.this.ctx.ioPolicy());
                    }
                    catch (ClusterTopologyCheckedException e) {
                        if (GridDhtCacheAdapter.this.log.isDebugEnabled()) {
                            GridDhtCacheAdapter.this.log.debug("Failed to send TTL update request, node left: " + out.getKey());
                        }
                    }
                    catch (IgniteCheckedException e) {
                        U.error(GridDhtCacheAdapter.this.log, "Failed to send TTL update request.", e);
                    }
                }
            }

            /** Returns the request already mapped to {@code node}, creating and registering it on first use. */
            private GridCacheTtlUpdateRequest requestFor(Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap, ClusterNode node, AffinityTopologyVersion topVer) {
                GridCacheTtlUpdateRequest req = reqMap.get(node);

                if (req == null) {
                    req = new GridCacheTtlUpdateRequest(GridDhtCacheAdapter.this.ctx.cacheId(), topVer, incomingReq.ttl());

                    reqMap.put(node, req);
                }

                return req;
            }
        });
    }

    /**
     * Applies a TTL update request locally and then forwards it.
     *
     * <p>DHT keys (if present) are applied to this cache, near keys (if present) to the near
     * cache, which must exist in that case. Afterwards the request is handed to
     * {@code propagateTtlUpdateRequestFromPrimary}.
     *
     * @param srcNodeId ID of the node the request came from.
     * @param req TTL update request.
     */
    private void processTtlUpdateRequest(UUID srcNodeId, GridCacheTtlUpdateRequest req) {
        // DHT part of the request.
        if (req.keys() != null) {
            this.updateTtl(this, req.keys(), req.versions(), req.ttl());
        }

        // Near part of the request.
        if (req.nearKeys() != null) {
            GridNearCacheAdapter<K, V> nearCache = this.near();

            assert nearCache != null;

            this.updateTtl(nearCache, req.nearKeys(), req.nearVersions(), req.ttl());
        }

        this.propagateTtlUpdateRequestFromPrimary(srcNodeId, req);
    }

    /**
     * Updates the time-to-live of the given keys in {@code cache}.
     *
     * <p>Each key is processed under the checkpoint read lock. If the entry turns out to be
     * removed while it is being updated, a fresh entry is obtained and the update is retried;
     * a {@link GridDhtInvalidPartitionException} ends processing of that key. The last entry
     * obtained for a key (if any) is touched once the key is done.
     *
     * @param cache Cache whose entries are updated.
     * @param keys Keys to update, must not be empty.
     * @param vers Versions, one per key and in the same order as {@code keys}.
     * @param ttl New time-to-live value.
     */
    private void updateTtl(GridCacheAdapter<K, V> cache, List<KeyCacheObject> keys, List<GridCacheVersion> vers, long ttl) {
        assert !F.isEmpty(keys);
        assert keys.size() == vers.size();

        int size = keys.size();

        for (int i = 0; i < size; ++i) {
            GridCacheEntryEx entry = null;

            try {
                while (true) {
                    this.ctx.shared().database().checkpointReadLock();

                    try {
                        entry = cache.entryEx(keys.get(i));
                        entry.updateTimeToLiveOnTtlUpdateRequest(vers.get(i), ttl);

                        // Updated successfully: done with this key.
                        break;
                    }
                    catch (GridCacheEntryRemovedException ignore) {
                        // Entry became obsolete: loop around and retry with a fresh entry.
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Got removed entry: " + entry);
                        }
                    }
                    catch (GridDhtInvalidPartitionException e) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Got GridDhtInvalidPartitionException: " + e);
                        }

                        // Give up on this key.
                        break;
                    }
                    finally {
                        // Must not jump out of this block: a jump here would override the
                        // break above and keep the retry loop spinning forever.
                        this.ctx.shared().database().checkpointReadUnlock();
                    }
                }
            }
            finally {
                if (entry != null) {
                    entry.touch();
                }
            }
        }
    }

    /**
     * Not expected to be called on this cache: the body is a failing assertion only, so with
     * assertions disabled the call is a no-op.
     *
     * @param keys Ignored.
     */
    @Override
    public void unlockAll(Collection<? extends K> keys) {
        assert false;
    }

    /**
     * Builds the string form through {@code S.toString}, passing the superclass representation
     * along as the third argument.
     *
     * @return String representation of this cache adapter.
     */
    @Override
    public String toString() {
        String parentStr = super.toString();

        return S.toString(GridDhtCacheAdapter.class, this, parentStr);
    }

    /**
     * Delegates to the superclass on affinity nodes; on any other node there is nothing to clear
     * and an empty list is returned.
     *
     * @param srv Passed through to the superclass.
     * @param near Passed through to the superclass.
     * @param readers Passed through to the superclass.
     * @return Clear runnables from the superclass, or an empty list on a non-affinity node.
     */
    @Override
    public List<GridCacheClearAllRunnable<K, V>> splitClearLocally(boolean srv, boolean near, boolean readers) {
        if (!this.ctx.affinityNode()) {
            return Collections.emptyList();
        }

        return super.splitClearLocally(srv, near, readers);
    }

    /**
     * Decides whether something mapped on {@code mapVer} has to be remapped for {@code curVer}.
     *
     * <p>No remap is needed when both versions are equal, or when {@code curVer} lies between the
     * last affinity-changing version reported for {@code mapVer} and {@code mapVer} (as decided
     * by {@code isBetween}). A remap is needed when the cache group's affinity nodes differ
     * between the two versions, when the current affinity topology version is before
     * {@code curVer}, when the affinity assignments differ, or when fetching the assignments
     * fails with {@link IllegalStateException}.
     *
     * @param mapVer Topology version the mapping was made on.
     * @param curVer Topology version to compare against.
     * @return {@code true} if a remap is required.
     */
    protected final boolean needRemap(AffinityTopologyVersion mapVer, AffinityTopologyVersion curVer) {
        if (curVer.equals(mapVer)) {
            return false;
        }

        AffinityTopologyVersion lastAffChangeVer = this.ctx.shared().exchange().lastAffinityChangedTopologyVersion(mapVer);

        if (curVer.isBetween(lastAffChangeVer, mapVer)) {
            return false;
        }

        Collection<ClusterNode> mapNodes = this.ctx.discovery().cacheGroupAffinityNodes(this.ctx.groupId(), mapVer);
        Collection<ClusterNode> curNodes = this.ctx.discovery().cacheGroupAffinityNodes(this.ctx.groupId(), curVer);

        if (!mapNodes.equals(curNodes)) {
            return true;
        }

        if (this.ctx.affinity().affinityTopologyVersion().before(curVer)) {
            return true;
        }

        try {
            List<List<ClusterNode>> mapAssignment = this.ctx.affinity().assignments(mapVer);
            List<List<ClusterNode>> curAssignment = this.ctx.affinity().assignments(curVer);

            return !mapAssignment.equals(curAssignment);
        }
        catch (IllegalStateException ignored) {
            // Assignments could not be obtained: be conservative and remap.
            return true;
        }
    }

    /**
     * Returns an iterator over local entries, evaluated against the affinity topology version
     * that is current at the time of the call.
     *
     * @param primary Passed through to the four-argument overload.
     * @param backup Passed through to the four-argument overload.
     * @param keepBinary Passed through to the four-argument overload.
     * @return Iterator over local cache entries.
     */
    public Iterator<Cache.Entry<K, V>> localEntriesIterator(boolean primary, boolean backup, boolean keepBinary) {
        AffinityTopologyVersion curTopVer = this.ctx.affinity().affinityTopologyVersion();

        return this.localEntriesIterator(primary, backup, keepBinary, curTopVer);
    }

    /**
     * Wraps the internal entry iterator from {@code localEntriesIteratorEx} with
     * {@code iterator(...)}, passing the negation of {@code keepBinary} as its second argument.
     *
     * @param primary Passed through to {@code localEntriesIteratorEx}.
     * @param backup Passed through to {@code localEntriesIteratorEx}.
     * @param keepBinary Negated and passed to {@code iterator(...)}.
     * @param topVer Passed through to {@code localEntriesIteratorEx}.
     * @return Iterator over local cache entries.
     */
    private Iterator<Cache.Entry<K, V>> localEntriesIterator(boolean primary, boolean backup, boolean keepBinary, AffinityTopologyVersion topVer) {
        Iterator<? extends GridCacheEntryEx> internalIt = this.localEntriesIteratorEx(primary, backup, topVer);

        return this.iterator(internalIt, !keepBinary);
    }

    /**
     * Returns an iterator over the internal entries held locally.
     *
     * <p>When both flags are set, all entries of this cache are iterated. Otherwise the current
     * local partitions are walked and only partitions whose {@code primary(topVer)} result equals
     * the {@code primary} flag contribute their entries. The returned iterator does not support
     * {@code remove()}.
     *
     * @param primary Whether entries of primary partitions are wanted.
     * @param backup Whether entries of backup partitions are wanted; at least one of
     *      {@code primary} and {@code backup} must be set.
     * @param topVer Topology version the primary check is evaluated on.
     * @return Iterator over the matching local entries.
     */
    private Iterator<? extends GridCacheEntryEx> localEntriesIteratorEx(final boolean primary, boolean backup, final AffinityTopologyVersion topVer) {
        assert primary || backup;

        if (primary && backup) {
            return this.entries().iterator();
        }

        final Iterator<GridDhtLocalPartition> partIt = this.topology().currentLocalPartitions().iterator();

        return new Iterator<GridCacheMapEntry>() {
            /** Entry handed out by the next call to {@code next()}, {@code null} when exhausted. */
            private GridCacheMapEntry nextEntry;

            /** Iterator over the partition currently being drained, {@code null} if none is open. */
            private Iterator<GridCacheMapEntry> entryIt;

            {
                // Pre-fetch the first entry so hasNext() is a plain null check.
                this.advance();
            }

            @Override
            public boolean hasNext() {
                return this.nextEntry != null;
            }

            @Override
            public GridCacheMapEntry next() {
                GridCacheMapEntry res = this.nextEntry;

                if (res == null) {
                    throw new NoSuchElementException();
                }

                this.advance();

                return res;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }

            /** Moves to the next available entry, leaving {@code nextEntry} as {@code null} if there is none. */
            private void advance() {
                this.nextEntry = null;

                do {
                    if (this.entryIt == null) {
                        this.entryIt = this.openNextPartition();
                    }

                    if (this.entryIt != null) {
                        if (this.entryIt.hasNext()) {
                            this.nextEntry = this.entryIt.next();

                            return;
                        }

                        // Current partition is drained; look for another one on the next pass.
                        this.entryIt = null;
                    }
                } while (partIt.hasNext());
            }

            /** Skips ahead to the next partition matching the primary flag; {@code null} if none is left. */
            private Iterator<GridCacheMapEntry> openNextPartition() {
                while (partIt.hasNext()) {
                    GridDhtLocalPartition part = partIt.next();

                    if (primary == part.primary(topVer)) {
                        return part.entries(GridDhtCacheAdapter.this.ctx.cacheId(), new CacheEntryPredicate[0]).iterator();
                    }
                }

                return null;
            }
        };
    }

    /**
     * Base class for message closures of this cache: resolves the sender by node ID and hands the
     * message to {@link #onMessage} only when the sender node can still be resolved.
     *
     * @param <M> Message type.
     */
    protected abstract class MessageHandler<M>
    implements IgniteBiInClosure<UUID, M> {
        private static final long serialVersionUID = 0L;

        protected MessageHandler() {
        }

        /**
         * Looks the sender up through the cache context; a message whose sender cannot be
         * resolved is only logged (at debug level) and dropped.
         *
         * @param nodeId Sender node ID.
         * @param msg Received message.
         */
        @Override
        public void apply(UUID nodeId, M msg) {
            ClusterNode sender = GridDhtCacheAdapter.this.ctx.node(nodeId);
            boolean senderKnown = sender != null;

            if (GridDhtCacheAdapter.this.log.isDebugEnabled()) {
                String prefix = senderKnown ? "Received message from node [node=" : "Received message from failed node [node=";

                GridDhtCacheAdapter.this.log.debug(prefix + nodeId + ", msg=" + msg + ']');
            }

            if (senderKnown) {
                this.onMessage(sender, msg);
            }
        }

        /**
         * Processes a message from a sender that could be resolved.
         *
         * @param var1 Sender node.
         * @param var2 Received message.
         */
        protected abstract void onMessage(ClusterNode var1, M var2);
    }

    /**
     * Future that additionally carries the topology version it was created with.
     */
    private static class MultiUpdateFuture
    extends GridFutureAdapter<IgniteUuid> {
        /** Topology version supplied at construction. */
        private AffinityTopologyVersion topVer;

        /**
         * @param topVer Topology version to remember.
         */
        private MultiUpdateFuture(@NotNull AffinityTopologyVersion topVer) {
            this.topVer = topVer;
        }

        /**
         * @return Topology version supplied at construction.
         */
        private AffinityTopologyVersion topologyVersion() {
            return topVer;
        }
    }
}

