/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.dr;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.management.JMException;
import javax.management.ObjectName;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheStoppedException;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.dr.GridCacheReplicationManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainInClosure;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.CachePluginConfiguration;
import org.apache.ignite.plugin.CachePluginContext;
import org.gridgain.grid.cache.dr.CacheDrEntryFilter;
import org.gridgain.grid.cache.dr.CacheDrMBean;
import org.gridgain.grid.cache.dr.CacheDrPauseReason;
import org.gridgain.grid.cache.dr.CacheDrSenderConfiguration;
import org.gridgain.grid.cache.dr.CacheDrStateTransfer;
import org.gridgain.grid.cache.dr.CacheDrStatus;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainCacheConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.grid.dr.DrSenderLoadBalancingMode;
import org.gridgain.grid.events.DrCacheReplicationEvent;
import org.gridgain.grid.internal.GridPluginUtils;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrEntryImpl;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrHandler;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrMBeanAdapter;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrManager;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrMetrics;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrPauseInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrPauseKey;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrResultType;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrSenderHubStopInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrSenderHubStopKey;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrSenderMetricsAdapter;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferHandler;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferKey;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferResultInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferResultKey;
import org.gridgain.grid.internal.processors.cache.dr.DrSenderGroupNodeStopKey;
import org.gridgain.grid.internal.processors.cache.dr.EntryBuffer;
import org.gridgain.grid.internal.processors.cache.dr.Permit;
import org.gridgain.grid.internal.processors.dr.DrProcessor;
import org.gridgain.grid.internal.processors.dr.DrSenderAttributes;
import org.gridgain.grid.internal.processors.dr.DrUtils;
import org.gridgain.grid.internal.processors.dr.fst.StateTransferTask;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class GridGainCacheDrManager
extends GridCacheManagerAdapter
implements GridCacheReplicationManager,
CacheDrManager {
    /** Default value of the {@code GG_DR_SYSTEM_CACHE_SCAN_TIMEOUT} system property (presumably milliseconds - TODO confirm). */
    private static final long DFLT_GG_DR_SYSTEM_CACHE_SCAN_TIMEOUT = 60000L;

    /** Value of the {@code GG_DR_SYSTEM_CACHE_SCAN_TIMEOUT} system property, or the default when it is not set. */
    private final long timeout = IgniteSystemProperties.getLong("GG_DR_SYSTEM_CACHE_SCAN_TIMEOUT", DFLT_GG_DR_SYSTEM_CACHE_SCAN_TIMEOUT);

    /** NOTE(review): not referenced in the visible code; presumably a retry budget used by {@code txOp} - confirm. */
    private static final int TX_RETRIES_WITH_NO_THROTTLING = 10;

    /** Sender configuration (sender group {@code "<default>"}) applied by {@link #overrideCacheDrConfiguration()} when the cache has none. */
    private static final CacheDrSenderConfiguration DFLT_CACHE_SND_CFG = new CacheDrSenderConfiguration().setSenderGroup("<default>");

    /** Cache plugin context this manager was created with. */
    private final CachePluginContext<GridGainCacheConfiguration> ggCctx;

    /** DR processor passed to the constructor. */
    final DrProcessor drProc;

    /** GridGain cache plugin configuration; {@code null} when the cache has none. */
    private GridGainCacheConfiguration ccfg;

    /** Sender configuration taken from {@link #ccfg}; {@code null} when sending is not configured. */
    private final CacheDrSenderConfiguration sndCfg;

    /** Data center ID read from the GridGain plugin configuration. */
    private final byte dataCenterId;

    /** Utility (system) cache; assigned in {@code onKernalStart0()}. */
    private IgniteInternalCache<Object, Object> sysCache;

    /** ID of the internal continuous query registered on {@link #sysCache}; {@code null} until a query is registered. */
    private UUID sysCacheQryId;

    /** Replication handler; created in {@code start0()} when sending is enabled. */
    private CacheDrHandler drHnd;

    /** State transfer handler; created in {@code start0()} when sending is enabled. */
    private CacheDrStateTransferHandler fstHnd;

    /** Whether this cache sends DR updates (sender configuration present and cache is not near). */
    private boolean sndEnabled;

    /** Local event listener registered in {@code start0()} and removed in {@code onKernalStop0()}. */
    private final GridLocalEventListener discoLsnr = new DiscoveryListener();

    /** Last known pause info; a {@code null} value or a {@code null} reason is treated as "active". */
    private volatile CacheDrPauseInfo stopInfo;

    /** Set to {@code true} in {@code onKernalStop0()} when sending is enabled. */
    private volatile boolean stopping;

    /** Known sender hub nodes. */
    private final CopyOnWriteArrayList<ClusterNode> sndHubs = new CopyOnWriteArrayList<>();

    /**
     * Random source obtained through {@link ThreadLocalRandom#current()} while this manager is constructed.
     * NOTE(review): {@code ThreadLocalRandom} is meant to be obtained via {@code current()} on the thread
     * that uses it; sharing one reference across threads is discouraged by its documentation.
     */
    private final ThreadLocalRandom sndHubsRnd = ThreadLocalRandom.current();

    /** Monotonic counter that drives round-robin sender hub selection. */
    private final AtomicLong sndHubIdx = new AtomicLong();

    /** Set to {@code true} once {@code initializeSenderHubs()} has scanned the topology. */
    private volatile boolean sndHubInit;

    /** Completed when sender hubs are initialized, or completed with an error on stop / failed initialization. */
    private final GridFutureAdapter<?> sndHubInitFut = new GridFutureAdapter<>();

    /** Sender attributes of one of the known hubs; {@code null} when no hub is known. */
    private volatile DrSenderAttributes sndHubAttrs;

    /** Busy lock shared with the handlers; blocked in {@code onKernalStop0()}. */
    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();

    /** DR metrics for this cache; created in {@code start0()}. */
    protected volatile CacheDrMetrics metrics;

    /** Name of the registered cache DR MBean; {@code null} when no MBean is registered. */
    private ObjectName cacheMBean;

    /** Local node, set when the sender configuration prefers a local sender and this node hosts a matching one. */
    private ClusterNode locSnd;

    /** Value of {@code isDrUseCacheNames()} from the GridGain plugin configuration. */
    private final boolean useCacheNames;

    /** {@code true} when the {@code GG_DR_FORCE_DC_ID} system property is greater than zero. */
    private final boolean forceOverrideCacheSndCfg;

    /** Control task created in {@code start0()} and submitted to the DR processor from {@code onKernalStart0()}. */
    private DrControlTaskExecutor controlTask;

    /**
     * Creates the DR manager for a single cache.
     *
     * @param ggCctx Cache plugin context; must not be {@code null}.
     * @param drProc DR processor.
     * @throws CacheException If the GridGain plugin configuration is not present on the node.
     */
    public GridGainCacheDrManager(CachePluginContext<GridGainCacheConfiguration> ggCctx, DrProcessor drProc) {
        this.forceOverrideCacheSndCfg = IgniteSystemProperties.getInteger("GG_DR_FORCE_DC_ID", 0) > 0;
        assert (ggCctx != null);
        this.ggCctx = ggCctx;
        this.drProc = drProc;

        GridGainConfiguration pluginCfg = GridPluginUtils.gridPluginConfiguration(ggCctx.grid().configuration());
        if (pluginCfg == null) {
            throw new CacheException("GridGain plugin configuration was not found. GridGain plugin has to be configured if DR feature is used.");
        }
        this.dataCenterId = pluginCfg.getDataCenterId();
        this.useCacheNames = pluginCfg.isDrUseCacheNames();
        assert (this.dataCenterId >= 0);

        this.ccfg = GridCacheUtils.cachePluginConfiguration(ggCctx.igniteCacheConfiguration(), GridGainCacheConfiguration.class);

        // Forced override only applies to non-system caches.
        if (this.forceOverrideCacheSndCfg) {
            if (!CU.isSystemCache(ggCctx.igniteCacheConfiguration().getName())) {
                this.overrideCacheDrConfiguration();
            }
        }

        if (this.ccfg == null) {
            this.sndCfg = null;
        } else {
            this.sndCfg = this.ccfg.getDrSenderConfiguration();
        }
    }

    /**
     * Creates the metrics object and, when sending is enabled, registers the MBean, prepares the
     * entry filter, subscribes the discovery listener and creates the replication handlers.
     *
     * @throws IgniteCheckedException If resource injection or lifecycle start of the entry filter fails.
     */
    @Override
    protected void start0() throws IgniteCheckedException {
        this.sndEnabled = this.sndCfg != null && !this.cctx.isNear();

        boolean rcvCache = this.ccfg != null;
        boolean sndCache = rcvCache && this.ccfg.getDrSenderConfiguration() != null;
        this.metrics = new CacheDrMetrics(sndCache, rcvCache);

        if (!this.sndEnabled) {
            return;
        }

        assert (this.sndCfg != null);
        this.registerMBean();
        this.injectResources(this.sndCfg.getEntryFilter());
        U.startLifecycleAware(Collections.singleton(this.sndCfg.getEntryFilter()));
        this.controlTask = new DrControlTaskExecutor();

        // Event types 10, 12 and 11 - presumably node joined / failed / left (TODO confirm against EventType).
        this.cctx.events().addListener(this.discoLsnr, 10, 12, 11);

        this.drHnd = new CacheDrHandler(this.cctx, this, this.drProc, this.busyLock, this.sndCfg);
        this.drHnd.onStart();
        this.fstHnd = new CacheDrStateTransferHandler(this.cctx, this.sndCfg, this.busyLock, this);

        if (this.log.isDebugEnabled()) {
            this.log.debug("Started cache data center replication manager [cache=" + this.cctx.name() + ", configuration=" + this.sndCfg + ']');
        }

        boolean preferLocal = this.sndCfg.isPreferLocalSender() && this.hasLocalSender();
        if (preferLocal) {
            this.locSnd = this.ggCctx.localNode();
        }
    }

    /**
     * Checks whether the local node is configured as a sender hub that serves this cache, matching
     * either by cache name or by sender group depending on {@code useCacheNames}.
     *
     * @return {@code true} if the local sender hub configuration covers this cache.
     */
    private boolean hasLocalSender() {
        GridGainConfiguration pluginCfg = GridPluginUtils.gridPluginConfiguration(this.cctx.kernalContext().config());
        if (pluginCfg == null || pluginCfg.getDrSenderConfiguration() == null) {
            return false;
        }
        DrSenderConfiguration hubCfg = pluginCfg.getDrSenderConfiguration();

        if (!this.useCacheNames) {
            assert (F.isEmpty(hubCfg.getCacheNames())) : "cache names are not allowed.";
            String cacheGrp = DrUtils.effectiveSenderGroup(this.sndCfg);
            for (String hubGrp : DrUtils.effectiveSenderGroups(hubCfg)) {
                if (F.eq(hubGrp, cacheGrp)) {
                    return true;
                }
            }
            return false;
        }

        assert (!F.isEmpty(hubCfg.getCacheNames()));
        for (String cacheName : hubCfg.getCacheNames()) {
            if (F.eq(cacheName, this.cctx.name())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Makes sure the cache has a GridGain plugin configuration with a sender configuration.
     * A missing plugin configuration is created and appended to the cache's plugin configurations;
     * a missing sender configuration is replaced with {@code DFLT_CACHE_SND_CFG}.
     */
    private void overrideCacheDrConfiguration() {
        if (this.ccfg == null) {
            this.ccfg = new GridGainCacheConfiguration();
            CachePluginConfiguration[] existing = this.ggCctx.igniteCacheConfiguration().getPluginConfigurations();
            if (!F.isEmpty(existing)) {
                // Append the new configuration after the ones that were already there.
                int oldLen = existing.length;
                CachePluginConfiguration[] extended = new CachePluginConfiguration[oldLen + 1];
                System.arraycopy(existing, 0, extended, 0, oldLen);
                extended[oldLen] = this.ccfg;
                this.ggCctx.igniteCacheConfiguration().setPluginConfigurations(extended);
            } else {
                this.ggCctx.igniteCacheConfiguration().setPluginConfigurations(this.ccfg);
            }
        }

        if (this.ccfg.getDrSenderConfiguration() != null) {
            return;
        }
        this.ccfg.setDrSenderConfiguration(DFLT_CACHE_SND_CFG);
    }

    /**
     * Registers the cache DR MBean unless MBeans are disabled. A registration failure is logged
     * and otherwise ignored.
     */
    private void registerMBean() {
        if (!U.IGNITE_MBEANS_DISABLED) {
            try {
                this.cacheMBean = U.registerMBean(
                    this.cctx.gridConfig().getMBeanServer(),
                    this.cctx.igniteInstanceName(),
                    this.cctx.cache().name(),
                    "Cache data replication",
                    new CacheDrMBeanAdapter(this),
                    CacheDrMBean.class);
            }
            catch (JMException ex) {
                U.error(this.log, "Failed to register cache MBean.", ex);
            }
        }
    }

    /**
     * Unregisters the cache DR MBean if one was registered. A failure is logged and otherwise ignored.
     */
    private void unregisterMBean() {
        if (this.cacheMBean != null) {
            assert (!U.IGNITE_MBEANS_DISABLED);
            try {
                this.cctx.gridConfig().getMBeanServer().unregisterMBean(this.cacheMBean);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Unregistered cache MBean: " + this.cacheMBean);
                }
            }
            catch (JMException ex) {
                U.error(this.log, "Failed to unregister cache MBean: " + this.cacheMBean, ex);
            }
        }
    }

    /**
     * Injects generic resources and the cache name into the given object.
     *
     * @param obj Target object; nothing is done when it is {@code null}.
     * @throws IgniteCheckedException If injection fails.
     */
    private void injectResources(@Nullable Object obj) throws IgniteCheckedException {
        if (obj == null) {
            return;
        }
        this.cctx.kernalContext().resource().injectGeneric(obj);
        this.cctx.kernalContext().resource().injectCacheName(obj, this.cctx.config().getName());
    }

    /**
     * Resolves the utility cache and, when sending is enabled, starts the state transfer handler,
     * registers the system cache continuous query (non-client nodes only) and schedules the control
     * task to be submitted once affinity is ready and the utility cache preloader sync future completes.
     *
     * @throws IgniteCheckedException If registering the continuous query or starting the handler fails.
     */
    @Override
    protected void onKernalStart0() throws IgniteCheckedException {
        this.sysCache = this.cctx.kernalContext().cache().utilityCache();
        if (this.sndEnabled) {
            assert (this.sysCache.configuration().getCacheMode() == CacheMode.REPLICATED);
            this.fstHnd.onKernalStart(this.sysCache);
            // On client nodes the query is registered later, in initDr().
            if (!this.cctx.gridConfig().isClientMode().booleanValue()) {
                // The event filter is chosen by mode: per cache name, or per sender group plus cache name.
                this.sysCacheQryId = this.sysCache.context().continuousQueries().executeInternalQuery(new SystemCacheUpdatedListener(), this.useCacheNames ? new DrEntryEventFilter(this.ggCctx.igniteCacheConfiguration().getName()) : new DrGroupControlEventFilter(DrUtils.effectiveSenderGroup(this.sndCfg), this.ggCctx.igniteCacheConfiguration().getName()), true, true, false, false);
            }
            GridCacheSharedContext sctx = this.cctx.shared();
            GridDhtPartitionsExchangeFuture topFuture = sctx.exchange().lastTopologyFuture();
            assert (topFuture != null) : "DR Worker should start after join to topology (last exchange future is null)";
            IgniteInternalFuture<AffinityTopologyVersion> affinityReadyFuture = sctx.exchange().affinityReadyFuture(topFuture.initialVersion());
            // Chain: affinity ready -> utility cache preloader sync future -> submit the control task.
            affinityReadyFuture.chainCompose(ignore -> this.sysCache.context().preloader().syncFuture()).listen(f -> {
                try {
                    f.get();
                    this.drProc.submit(this.controlTask);
                }
                catch (NodeStoppingException nodeStoppingException) {
                    // Deliberately ignored: the node is stopping, so the control task is not submitted.
                }
                catch (Exception e) {
                    throw new IgniteException("Failed to wait for affinity ready future [topVer=" + topFuture.initialVersion() + "]", e);
                }
            });
        }
    }

    /**
     * Unregisters the MBean and, when sending is enabled, shuts down replication: marks the manager
     * as stopping, blocks the busy lock, removes listeners and queries, stops the control task and
     * handlers, and fails the sender hub initialization future if it is still pending.
     *
     * @param cancel Cancel flag (not used by this method).
     */
    @Override
    protected void onKernalStop0(boolean cancel) {
        this.unregisterMBean();
        if (this.sndEnabled) {
            // From this point stopped() returns true.
            this.stopping = true;
            this.busyLock.block();
            this.cctx.events().removeListener(this.discoLsnr);
            if (this.sysCacheQryId != null) {
                this.sysCache.context().continuousQueries().cancelInternalQuery(this.sysCacheQryId);
            }
            if (this.controlTask != null) {
                this.controlTask.stop();
            }
            if (this.fstHnd != null) {
                this.fstHnd.onKernalStop();
            }
            if (this.drHnd != null) {
                this.drHnd.onKernalStop();
            }
            // Release anyone blocked in awaitDrInitialization().
            if (!this.sndHubInitFut.isDone()) {
                this.sndHubInitFut.onDone(new IgniteCheckedException("Failed to initialize send hubs because grid is stopping."));
            }
            if (this.fstHnd != null) {
                this.fstHnd.stop();
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Stopped cache replication manager.");
            }
        }
    }

    /**
     * Stops the lifecycle-aware entry filter when sending is enabled.
     *
     * @param cancel Cancel flag (not used by this method).
     * @param destroy Destroy flag (not used by this method).
     */
    @Override
    protected void stop0(boolean cancel, boolean destroy) {
        if (!this.sndEnabled) {
            return;
        }
        U.stopLifecycleAware(this.log, Collections.singleton(this.sndCfg.getEntryFilter()));
    }

    /**
     * @return Data center ID read from the GridGain plugin configuration at construction time.
     */
    @Override
    public byte dataCenterId() {
        return this.dataCenterId;
    }

    /**
     * Wraps a cache update into a raw versioned entry, using the conflict version of {@code ver},
     * and passes it to the internal replication path.
     *
     * @param key Entry key.
     * @param val Entry value, possibly {@code null}.
     * @param ttl Time to live.
     * @param expireTime Expire time.
     * @param ver Entry version; its conflict version is used.
     * @param drType DR type of the update.
     * @param topVer Topology version of the update.
     * @throws IgniteCheckedException If replication fails.
     */
    @Override
    public void replicate(KeyCacheObject key, @Nullable CacheObject val, long ttl, long expireTime, GridCacheVersion ver, GridDrType drType, AffinityTopologyVersion topVer) throws IgniteCheckedException {
        assert (this.sndEnabled);
        assert (!this.cctx.localNode().isClient());
        this.replicate(new GridCacheRawVersionedEntry(key, val, ttl, expireTime, ver.conflictVersion()), drType, topVer);
    }

    /**
     * Applies the configured entry filter (if any) and passes accepted entries to the replication
     * handler, provided sender hubs are available and replication is not stopped.
     *
     * @param entry Entry to replicate.
     * @param drType DR type of the update.
     * @param topVer Topology version of the update.
     * @throws IgniteCheckedException If unmarshalling or replication fails.
     */
    private void replicate(GridCacheRawVersionedEntry entry, GridDrType drType, AffinityTopologyVersion topVer) throws IgniteCheckedException {
        CacheDrEntryFilter filter = this.sndCfg.getEntryFilter();
        if (filter != null) {
            // The entry is unmarshalled before it is handed to the user filter.
            entry.unmarshal(this.cctx.cacheObjectContext());
            boolean accepted = filter.accept(new CacheDrEntryImpl(entry, this.cctx.cacheObjectContext()));
            if (!accepted) {
                this.metrics.onSenderCacheEntryFiltered();
                return;
            }
        }

        if (this.initializeSenderHubs() && !this.stopped()) {
            this.drHnd.onReplicate(entry, drType, topVer);
        }
    }

    /**
     * Enqueues a state transfer start task and returns the future that task produces.
     * The calling thread blocks until the enqueued task has completed the intermediate future.
     *
     * @param dataCenterIds Target data center IDs.
     * @param sync Synchronous transfer flag, passed through to the start task.
     * @return Future produced by the start task, or a failed future when DR is disabled or sender
     *      hub initialization did not complete successfully.
     * @throws IllegalStateException If waiting for the start task fails.
     */
    private IgniteInternalFuture<?> stateTransfer(Collection<Byte> dataCenterIds, boolean sync) {
        if (!this.sndEnabled) {
            IgniteCheckedException disabled = new IgniteCheckedException("Failed to initiate state transfer because data center replication is disabled in cache: " + this.cctx.name());
            return new GridFinishedFuture(disabled);
        }
        if (!this.awaitDrInitialization()) {
            IgniteCheckedException notInit = new IgniteCheckedException("Failed to initiate state transfer because sender hubs are not initialized.");
            return new GridFinishedFuture(notInit);
        }

        GridFutureAdapter startFut = new GridFutureAdapter();
        this.enqueueDrMgmtTask(new StateTransferStartTask(startFut, dataCenterIds, sync));
        try {
            return (IgniteInternalFuture)startFut.get();
        }
        catch (IgniteCheckedException ex) {
            throw new IllegalStateException("Failed to initiate state transfer.", ex);
        }
    }

    /**
     * @return {@code true} if this cache sends DR updates (the flag is computed in {@code start0()}).
     */
    @Override
    public boolean enabled() {
        return this.sndEnabled;
    }

    /**
     * @return {@code true} if the cache has a GridGain cache plugin configuration.
     */
    @Override
    public boolean receiveEnabled() {
        return this.ccfg != null;
    }

    /**
     * Returns the current replication status. For a near cache the call is delegated to the DR
     * manager of the underlying DHT cache.
     *
     * @return {@link CacheDrStatus#ACTIVE} when there is no pause info or its reason is {@code null};
     *      otherwise a status built from the pause reason and error.
     */
    @Override
    public CacheDrStatus drStatus() {
        if (this.cctx.isNear()) {
            return ((GridGainCacheDrManager)this.cctx.near().dht().context().dr()).drStatus();
        }
        this.checkDrEnabled();
        this.awaitDrInitialization();

        // Read the volatile field once so the null check and the getters see the same object.
        CacheDrPauseInfo info = this.stopInfo;
        if (info == null || info.reason() == null) {
            return CacheDrStatus.ACTIVE;
        }
        return new CacheDrStatus(info.reason(), info.error());
    }

    /**
     * Forwards a partition exchange notification to the replication handler together with the
     * current stopped state.
     *
     * @param topVer Topology version of the exchange.
     * @param left Flag passed through to the handler (presumably "node left" - TODO confirm).
     * @throws IgniteCheckedException If the handler fails.
     */
    @Override
    public void onExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException {
        assert (this.cctx.isDrEnabled());
        this.drHnd.onExchange(topVer, left, this.stopped());
    }

    /**
     * Notifies the replication handler that a partition was evicted; ignored while replication is stopped.
     *
     * @param part Partition number, must be non-negative.
     */
    @Override
    public void partitionEvicted(int part) {
        assert (this.cctx.isDrEnabled());
        assert (part >= 0);
        if (this.stopped()) {
            return;
        }
        this.drHnd.onPartitionEvicted(part);
    }

    /**
     * Pauses or resumes replication on user request and waits for the enqueued task to finish.
     *
     * @param stopped {@code true} to pause (reason {@code USER_REQUEST}), {@code false} to resume
     *      ({@code null} reason).
     * @throws IgniteCheckedException If DR is disabled for the cache, sender hub initialization did
     *      not complete successfully, or the task fails.
     */
    private void userStateChange(boolean stopped) throws IgniteCheckedException {
        if (!this.sndEnabled) {
            // NOTE(review): the message mentions "state transfer" although this method changes the
            // pause/resume state - looks copy-pasted; confirm before changing the text.
            throw new IgniteCheckedException("Failed to initiate state transfer because data center replication is disabled in cache: " + this.cctx.name());
        }
        if (!this.awaitDrInitialization()) {
            throw new IgniteCheckedException("Failed to change start/stop DR because grid is stopping.");
        }

        CacheDrPauseReason reason = stopped ? CacheDrPauseReason.USER_REQUEST : null;
        DrStopTask stopTask = new DrStopTask(reason, null, null);
        this.enqueueDrMgmtTask(stopTask);
        stopTask.fut.get();
    }

    /**
     * @return Kernal context of the owning cache context.
     */
    private GridKernalContext kernalCtx() {
        return this.cctx.kernalContext();
    }

    /**
     * Records a replication started/stopped event for this cache if the event type is user-recordable.
     *
     * @param info Pause info; a {@code null} reason means replication was started.
     */
    private void recordCacheReplicationStateChangedEvt(CacheDrPauseInfo info) {
        assert (info != null) : "info";

        // 1023 / 1022 are inlined event type constants (presumably the DR "replication started" /
        // "replication stopped" event types - TODO confirm).
        boolean started = info.reason() == null;
        int evtType = started ? 1023 : 1022;
        String evtMsg = started ? "Replication started." : "Replication stopped.";

        if (this.kernalCtx().event().isUserRecordable(evtType)) {
            ClusterNode locNode = this.kernalCtx().discovery().localNode();
            this.kernalCtx().event().record(new DrCacheReplicationEvent(locNode, evtMsg, evtType, this.cctx.name(), info));
        }
    }

    /**
     * Handles a DR batch failure by enqueuing a task that pauses replication with reason
     * {@code BATCH_FAILED}.
     *
     * @param err Non-empty map from UUID to the error reported for it (presumably keyed by sender
     *      hub node ID - TODO confirm).
     */
    @Override
    public void onBatchFailed(Map<UUID, Throwable> err) {
        assert (!F.isEmpty(err));
        if (this.log.isTraceEnabled()) {
            StringBuilder details = new StringBuilder("DR batch failed: ");
            for (Map.Entry<UUID, Throwable> failure : err.entrySet()) {
                details.append('(')
                    .append(failure.getKey().toString())
                    .append(", ")
                    .append(failure.getValue().getMessage())
                    .append(')');
            }
            this.log.trace(details.toString());
        }
        this.enqueueDrMgmtTask(new DrStopTask(CacheDrPauseReason.BATCH_FAILED, "All available sender hubs failed to process data center replication batch.", null));
    }

    /**
     * Replicates full state transfer buffers through the replication handler.
     *
     * @param dataCenterIds Target data center IDs.
     * @param entryBuffers Entry buffers keyed by a {@code Byte} (presumably the data center ID - TODO confirm).
     * @param permitFunction Function that supplies permits.
     * @param syncFst Synchronous transfer flag.
     * @param fstId State transfer ID, possibly {@code null}.
     * @return Handler future, or a finished future with {@code IGNORED} when replication is stopped.
     * @throws IgniteCheckedException If no sender hub is available.
     */
    IgniteInternalFuture<CacheDrResultType> fullStateTransferReplicate(Collection<Byte> dataCenterIds, Map<Byte, EntryBuffer> entryBuffers, IgniteThrowableFunction<Integer, Permit> permitFunction, boolean syncFst, @Nullable IgniteUuid fstId) throws IgniteCheckedException {
        if (this.initializeSenderHubs()) {
            if (!this.stopped()) {
                return this.drHnd.fullStateTransferReplicate(dataCenterIds, entryBuffers, permitFunction, syncFst, fstId);
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Data center replication is stopped, ignoring full state transfer [cache=" + this.cctx.name() + ']');
            }
            return new GridFinishedFuture<CacheDrResultType>(CacheDrResultType.IGNORED);
        }
        throw new IgniteCheckedException("No sender hub found. Full state transfer will be stopped: [cache=" + this.cctx.name() + ", fstId=" + fstId + ']');
    }

    /**
     * Stores a new pause info in the system cache when the paused/active state actually changes,
     * i.e. when there is no previous info or exactly one of the old and new reasons is {@code null}.
     *
     * @param oldStopInfo Previously stored pause info, possibly {@code null}.
     * @param reason New pause reason; {@code null} means replication is being started.
     * @param errMsg Optional error message stored with the pause info.
     * @return The newly stored pause info, or {@code oldStopInfo} when nothing changed.
     * @throws IgniteCheckedException If replication is being started while no sender hubs are known,
     *      or the system cache update fails.
     */
    private CacheDrPauseInfo stopReplication(@Nullable CacheDrPauseInfo oldStopInfo, @Nullable CacheDrPauseReason reason, @Nullable String errMsg) throws IgniteCheckedException {
        boolean stateChanged = oldStopInfo == null || (oldStopInfo.reason() == null) != (reason == null);
        if (!stateChanged) {
            return oldStopInfo;
        }

        DynamicCacheDescriptor desc = this.cctx.shared().cache().cacheDescriptor(this.cctx.cacheId());
        CacheDrPauseInfo updated = new CacheDrPauseInfo(this.cctx.localNodeId(), reason, errMsg, desc.receivedFrom());
        this.sysCache.put(new CacheDrPauseKey(this.cctx.name()), updated);

        if (reason == null) {
            // Note: this check runs after the new info has already been put into the system cache.
            if (this.sndHubs.isEmpty()) {
                throw new IgniteCheckedException("Failed to start replication because there are no sender hubs available.");
            }
            if (this.log.isInfoEnabled()) {
                this.log.info("Data center replication is started [cache=" + this.cctx.name() + ']');
            }
        } else {
            this.fstHnd.onReplicationStop(updated);
            if (this.log.isInfoEnabled()) {
                this.log.info("Data center replication is stopped [cache=" + this.cctx.name() + ", info=" + updated + ", reason=" + reason + "]");
            }
        }
        return updated;
    }

    /*
     * NOTE: the body of this method could not be recovered by the decompiler (CFR failed while
     * rebuilding nested try/catch blocks), so the stub below unconditionally throws
     * IllegalStateException. The real logic must be taken from the original class file or sources.
     * Within this file the method is called from initDr() with a closure over CacheDrPauseInfo and
     * its boolean result is treated as a success flag.
     */
    public boolean txOp(GridPlainInClosure<CacheDrPauseInfo> clo, boolean globalSync) throws IgniteCheckedException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [22[CATCHBLOCK]], but top level block is 10[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * @return Queued keys count reported by the replication handler, or {@code 0} when sending is disabled.
     */
    public int queuedKeysCount() {
        if (!this.sndEnabled) {
            return 0;
        }
        return this.drHnd.queuedKeysCount();
    }

    /**
     * @return Count of batches waiting to be sent as reported by the replication handler, or
     *      {@code 0} when sending is disabled.
     */
    public int batchWaitingSendCount() {
        if (!this.sndEnabled) {
            return 0;
        }
        return this.drHnd.batchWaitingSendCount();
    }

    /**
     * @return Count of batches waiting for acknowledgement as reported by the replication handler,
     *      or {@code 0} when sending is disabled.
     */
    public int batchWaitingAcknowledgeCount() {
        if (!this.sndEnabled) {
            return 0;
        }
        return this.drHnd.batchWaitingAcknowledgeCount();
    }

    /**
     * @return Number of sender hub nodes currently known to this manager.
     */
    public int senderHubsCount() {
        return this.sndHubs.size();
    }

    /**
     * @return Effective sender group of this cache, or {@code null} when there is no sender
     *      configuration or hubs are matched by cache name.
     */
    @Nullable
    public String getDrSenderGroup() {
        if (this.sndCfg == null || this.useCacheNames) {
            return null;
        }
        return DrUtils.effectiveSenderGroup(this.sndCfg);
    }

    /**
     * Passes the throttle value to the state transfer handler; does nothing when no handler exists.
     *
     * @param stateTransferThrottle New throttle value.
     */
    void setStateTransferThrottle(long stateTransferThrottle) {
        CacheDrStateTransferHandler hnd = this.fstHnd;
        if (hnd == null) {
            return;
        }
        hnd.setStateTransferThrottle(stateTransferThrottle);
    }

    /**
     * @return Throttle value reported by the state transfer handler, or {@code 0} when no handler exists.
     */
    long getStateTransferThrottle() {
        CacheDrStateTransferHandler hnd = this.fstHnd;
        return hnd == null ? 0L : hnd.getStateTransferThrottle();
    }

    /**
     * Scans the topology once for sender hub nodes serving this cache and completes the
     * initialization future. Later calls only report whether hub attributes are currently known.
     * <p>
     * NOTE: the decompiler reported "Removed try catching itself - possible behaviour change" for
     * this method, so the original byte code may have contained an additional try block.
     *
     * @return {@code true} if sender hub attributes are known.
     */
    private boolean initializeSenderHubs() {
        if (this.sndHubInit) {
            return this.sndHubAttrs != null;
        }
        synchronized (this) {
            if (this.sndHubInit) {
                return this.sndHubAttrs != null;
            }
            for (ClusterNode node : this.cctx.discovery().allNodes()) {
                if (this.isSenderHub(node)) {
                    this.sndHubs.add(node);
                }
            }
            if (!this.sndHubs.isEmpty()) {
                ClusterNode firstHub = this.sndHubs.get(0);
                this.sndHubAttrs = (DrSenderAttributes)firstHub.attribute("plugins.gg.replication.snd.hub");
            }
            this.sndHubInit = true;
            this.sndHubInitFut.onDone();
            return this.sndHubAttrs != null;
        }
    }

    /**
     * Blocks until sender hub initialization has finished.
     *
     * @return {@code true} if initialization completed; {@code false} if the initialization future
     *      completed with an error (the error is logged).
     */
    private boolean awaitDrInitialization() {
        if (!this.sndHubInit) {
            try {
                this.sndHubInitFut.get();
            }
            catch (IgniteCheckedException ex) {
                U.error(this.log, "Failed to wait for sender hubs initialization.", ex);
                return false;
            }
        }
        return true;
    }

    /**
     * Initializes the replication state of this cache: on client nodes registers the system cache
     * continuous query, then runs {@code txOp} and publishes the resulting pause info and metrics.
     * <p>
     * NOTE(review): the {@code txOp} argument is a decompiler artifact - it calls the synthetic
     * {@code lambda$initDr$1} and assigns locals inside the lambda expression, so this method does
     * not compile as written and the original closure body is not visible here.
     *
     * @return {@code true} on success; {@code false} if {@code txOp} returned {@code false} or a
     *      non-{@link Error} throwable was thrown.
     */
    public boolean initDr() {
        try {
            GridTuple<Object> newStopInfo;
            AtomicBoolean infoChanged;
            boolean shouldStopReplication;
            boolean res;
            // Client nodes register the continuous query here rather than in onKernalStart0().
            if (this.cctx.gridConfig().isClientMode().booleanValue()) {
                this.sysCacheQryId = this.sysCache.context().continuousQueries().executeInternalQuery(new SystemCacheUpdatedListener(), this.useCacheNames ? new DrEntryEventFilter(this.ggCctx.igniteCacheConfiguration().getName()) : new DrGroupControlEventFilter(DrUtils.effectiveSenderGroup(this.sndCfg), this.ggCctx.igniteCacheConfiguration().getName()), false, true, false, false);
            }
            if (!(res = this.txOp(arg_0 -> this.lambda$initDr$1(shouldStopReplication = !this.initializeSenderHubs() && !this.cctx.localNode().isClient(), infoChanged = new AtomicBoolean(false), newStopInfo = F.t(null), arg_0), true))) {
                return false;
            }
            this.stopInfo = newStopInfo.get();
            assert (this.stopInfo != null);
            this.metrics.onStopStateChanged(this.stopInfo.reason(), this.stopInfo.error());
        }
        catch (Throwable e) {
            // Failures while the manager is stopping are not logged.
            if (!this.stopping) {
                U.error(this.log, "Failed to initialize data center replication state.", e);
            }
            // Propagate the failure to anyone blocked in awaitDrInitialization().
            this.sndHubInitFut.onDone(e);
            if (e instanceof Error) {
                throw (Error)e;
            }
            return false;
        }
        return true;
    }

    /**
     * Adds a sender hub node if it is not already known and, in that case, takes its sender
     * attributes as the current hub attributes.
     *
     * @param sndHub Sender hub node.
     */
    private void addHub(ClusterNode sndHub) {
        boolean added = this.sndHubs.addIfAbsent(sndHub);
        if (!added) {
            return;
        }
        this.sndHubAttrs = (DrSenderAttributes)sndHub.attribute("plugins.gg.replication.snd.hub");
    }

    /**
     * Removes the sender hub with the given node ID and refreshes the current hub attributes from
     * the last remaining hub, or clears them when no hubs remain.
     *
     * @param nodeId ID of the node to remove.
     * @return The removed node, or {@code null} if no known hub has that ID.
     */
    private ClusterNode removeHub(UUID nodeId) {
        ClusterNode found = null;
        for (ClusterNode candidate : this.sndHubs) {
            if (candidate.id().equals(nodeId)) {
                found = candidate;
                break;
            }
        }
        if (found == null) {
            return null;
        }

        this.sndHubs.remove(found);
        if (!this.sndHubs.isEmpty()) {
            ClusterNode last = this.sndHubs.get(this.sndHubs.size() - 1);
            this.sndHubAttrs = (DrSenderAttributes)last.attribute("plugins.gg.replication.snd.hub");
            assert (this.sndHubAttrs != null);
        } else {
            this.sndHubAttrs = null;
        }
        return found;
    }

    /**
     * Picks the sender hub to use for the next batch.
     * <p>
     * The preferred local sender (if any) is returned unless it is listed in {@code failedHubs}.
     * Otherwise a snapshot of the known hubs is filtered through {@code F.filterList} with a
     * predicate matching the failed hubs (presumably dropping them - confirm against
     * {@code F.filterList}), and one of the remaining hubs is chosen according to the configured
     * load balancing mode.
     * <p>
     * Round-robin selection takes the counter modulo the number of candidates. The earlier
     * {@code counter & (size - 1)} form is only a valid modulo for power-of-two sizes and skipped
     * hubs otherwise (with three hubs, index 1 was never chosen). The random index is drawn from
     * {@link ThreadLocalRandom#current()} on the calling thread.
     *
     * @param failedHubs IDs of hubs that already failed for the current batch; may be empty.
     *      A {@code null} value is treated as empty.
     * @return Selected hub, or {@code null} if sender hubs are not available.
     */
    @Nullable
    public ClusterNode nextHub(Collection<UUID> failedHubs) {
        if (!this.initializeSenderHubs()) {
            return null;
        }
        if (this.locSnd != null && (failedHubs == null || !failedHubs.contains(this.locSnd.id()))) {
            return this.locSnd;
        }

        // Work on a private snapshot so concurrent hub changes cannot affect the selection.
        List<ClusterNode> candidates = new ArrayList<ClusterNode>(this.sndHubs);
        if (!F.isEmpty(failedHubs)) {
            candidates = F.filterList(candidates, false, node -> failedHubs.contains(node.id()));
        }
        if (candidates.isEmpty()) {
            return null;
        }

        assert (this.sndCfg.getLoadBalancingMode() != null);
        int idx;
        if (this.sndCfg.getLoadBalancingMode() == DrSenderLoadBalancingMode.DR_RANDOM) {
            idx = ThreadLocalRandom.current().nextInt(candidates.size());
        } else {
            assert (this.sndCfg.getLoadBalancingMode() == DrSenderLoadBalancingMode.DR_ROUND_ROBIN);
            // floorMod keeps the index in [0, size) for any size, also after the counter overflows.
            idx = (int)Math.floorMod(this.sndHubIdx.incrementAndGet(), (long)candidates.size());
        }
        return candidates.get(idx);
    }

    /**
     * Tells whether the given node acts as a sender hub for this cache.
     * <p>
     * When {@code useCacheNames} is set the decision is made from the node's
     * {@code plugins.gg.replication.snd.hub} attribute; otherwise the node's
     * {@code plugins.gg.replication.snd.groups} attribute must contain the effective sender group
     * of this cache's sender configuration.
     *
     * @param node Node to check.
     * @return {@code true} if the node is a sender hub for this cache.
     */
    boolean isSenderHub(ClusterNode node) {
        if (!this.useCacheNames) {
            String[] nodeGrps = (String[])node.attribute("plugins.gg.replication.snd.groups");
            if (F.isEmpty(nodeGrps)) {
                return false;
            }
            String locGrp = DrUtils.effectiveSenderGroup(this.sndCfg);
            for (String nodeGrp : nodeGrps) {
                if (locGrp.equals(nodeGrp)) {
                    return true;
                }
            }
            return false;
        }
        DrSenderAttributes hubAttr = (DrSenderAttributes)node.attribute("plugins.gg.replication.snd.hub");
        return this.isSenderHubAttribute(hubAttr);
    }

    /**
     * Checks whether the given sender attributes list this cache among their cache names.
     *
     * @param attr Sender attributes of a node, possibly {@code null}.
     * @return {@code true} if {@code attr} is non-null and contains the masked name of this cache.
     */
    private boolean isSenderHubAttribute(@Nullable DrSenderAttributes attr) {
        if (attr == null) {
            return false;
        }
        assert (attr.getCacheNames() != null);
        assert (this.sndEnabled);
        for (String hubCacheName : attr.getCacheNames()) {
            if (F.eq(CU.mask(this.cctx.name()), hubCacheName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Tells whether replication is currently stopped for this cache.
     *
     * @return {@code true} if the manager is stopping or the current stop info carries a
     *      non-null pause reason.
     */
    public boolean stopped() {
        if (this.stopping) {
            return true;
        }
        // Read the field once: the DR control task executor may replace it (including with null)
        // concurrently, so a second read after the null check could throw NullPointerException.
        CacheDrPauseInfo info = this.stopInfo;
        return info != null && info.reason() != null;
    }

    /**
     * Returns a copy of the current DR metrics. If the copy exposes sender metrics, their backup
     * queue size is filled in from the DR handler (or set to zero when sending is disabled).
     *
     * @return Metrics copy as produced by {@code CacheDrMetrics.copyOf}.
     */
    @Override
    public CacheDrMetrics metrics() {
        CacheDrMetrics snapshot = CacheDrMetrics.copyOf(this.metrics);
        if (snapshot == null) {
            return null;
        }
        CacheDrSenderMetricsAdapter sndMetrics = snapshot.drSendMetrics0();
        if (sndMetrics != null) {
            long backupQueueSize = this.sndEnabled ? (long)this.drHnd.backupQueueSize() : 0L;
            sndMetrics.backupQueueSize(backupQueueSize);
        }
        return snapshot;
    }

    /**
     * Gives direct access to the live metrics object (no copy is made).
     *
     * @return The metrics instance held by this manager.
     */
    CacheDrMetrics metrics0() {
        return metrics;
    }

    /**
     * Starts a state transfer to the given data centers. For a near cache the request is
     * forwarded to the DR manager of the underlying DHT cache; otherwise DR must be enabled for
     * this cache.
     *
     * @param dataCenterIds Target data center ids.
     * @param sync Passed through to {@code stateTransfer}.
     * @return Future returned by {@code stateTransfer}.
     */
    @Override
    public IgniteInternalFuture<?> startStateTransfer(Collection<Byte> dataCenterIds, boolean sync) {
        if (!this.cctx.isNear()) {
            this.checkDrEnabled();
            return this.stateTransfer(dataCenterIds, sync);
        }
        GridGainCacheDrManager dhtDr = (GridGainCacheDrManager)this.cctx.near().dht().context().dr();
        return dhtDr.stateTransfer(dataCenterIds, sync);
    }

    /**
     * Requests cancellation of a state transfer by enqueueing a {@link StateTransferStopTask}
     * with reason {@code USER_REQUEST}.
     *
     * @param stateTransferKey Key of the state transfer to stop.
     * @return Future completed by the enqueued task, or an already failed future when sending is
     *      disabled for this cache.
     */
    @Override
    public IgniteInternalFuture<?> stopStateTransfer(CacheDrStateTransferKey stateTransferKey) {
        this.checkDrEnabled();
        if (this.sndEnabled) {
            GridFutureAdapter stopFut = new GridFutureAdapter();
            this.enqueueDrMgmtTask(new StateTransferStopTask(stopFut, stateTransferKey, CacheDrPauseReason.USER_REQUEST));
            return stopFut;
        }
        return new GridFinishedFuture(new IgniteCheckedException("Failed to stop state transfer because data center replication is disabled in cache: " + this.cctx.name()));
    }

    /**
     * Lists the state transfers known to the state transfer handler. For a near cache the call is
     * forwarded to the DR manager of the underlying DHT cache.
     *
     * @return State transfers reported by {@code fstHnd.listStateTransfers()}.
     * @throws IllegalStateException If DR is not configured for this cache or listing fails.
     */
    @Override
    public Collection<CacheDrStateTransfer> activeStateTransferTasks() {
        if (this.cctx.isNear()) {
            return ((CacheDrManager)((Object)this.cctx.near().dht().context().dr())).activeStateTransferTasks();
        }
        this.checkDrEnabled();
        try {
            return this.fstHnd.listStateTransfers();
        }
        catch (IgniteCheckedException e) {
            // Keep the original failure attached so the real reason is not lost in logs.
            throw new IllegalStateException("Failed to list state transfers because grid is stopping.", e);
        }
    }

    /**
     * Stops replication on user request. For a near cache the call is forwarded to the DR manager
     * of the underlying DHT cache.
     *
     * @throws IllegalStateException If DR is not configured for this cache or the state change
     *      fails (the checked exception is attached as the cause).
     */
    @Override
    public void stopReplication() {
        if (!this.cctx.isNear()) {
            this.checkDrEnabled();
            try {
                this.userStateChange(true);
            }
            catch (IgniteCheckedException ex) {
                throw new IllegalStateException(ex.getMessage(), ex);
            }
            return;
        }
        CacheDrManager dhtDr = (CacheDrManager)((Object)this.cctx.near().dht().context().dr());
        dhtDr.stopReplication();
    }

    /**
     * Starts (resumes) replication on user request. For a near cache the call is forwarded to the
     * DR manager of the underlying DHT cache.
     *
     * @throws IllegalStateException If DR is not configured for this cache or the state change
     *      fails (the checked exception is attached as the cause).
     */
    @Override
    public void startReplication() {
        if (this.cctx.isNear()) {
            ((CacheDrManager)((Object)this.cctx.near().dht().context().dr())).startReplication();
            return;
        }
        this.checkDrEnabled();
        try {
            this.userStateChange(false);
        }
        catch (IgniteCheckedException e) {
            // Preserve the cause, as stopReplication() does, so the stack trace is not lost.
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Fails fast when data center replication is not enabled for this cache.
     *
     * @throws IllegalStateException If {@code enabled()} returns {@code false}.
     */
    private void checkDrEnabled() {
        if (this.enabled()) {
            return;
        }
        throw new IllegalStateException("Data center replication is not configured for cache: " + this.cctx.name());
    }

    /**
     * Forwards the number of received cache entries to the metrics object.
     *
     * @param entriesCnt Number of entries received.
     */
    @Override
    public void onReceiveCacheEntriesReceived(int entriesCnt) {
        metrics.onReceiveCacheEntriesReceived(entriesCnt);
    }

    /**
     * Forwards the outcome of a conflict resolution to the metrics object.
     *
     * @param useNew Passed through to the metrics.
     * @param useOld Passed through to the metrics.
     * @param merge Passed through to the metrics.
     */
    @Override
    public void onReceiveCacheConflictResolved(boolean useNew, boolean useOld, boolean merge) {
        metrics.onReceiveCacheConflictResolved(useNew, useOld, merge);
    }

    /**
     * Replaces the metrics object with a fresh one. The sender flag is set when the cache
     * configuration has a DR sender configuration, the receiver flag when a cache configuration is
     * present at all. For a non-colocated cache with a near cache the new metrics are additionally
     * given the near cache's DR metrics as delegate.
     */
    @Override
    public void resetMetrics() {
        boolean rcvCache = this.ccfg != null;
        boolean sndCache = rcvCache && this.ccfg.getDrSenderConfiguration() != null;
        this.metrics = new CacheDrMetrics(sndCache, rcvCache);
        if (this.cctx.isColocated()) {
            return;
        }
        if (this.cctx.dht().near() != null) {
            CacheDrManager nearDr = (CacheDrManager)((Object)this.cctx.dht().near().context().dr());
            this.metrics.delegate(nearDr.metrics());
        }
    }

    /**
     * Returns the cached sender hub attributes.
     *
     * @return Current value of {@code sndHubAttrs}; {@code null} when no sender hub is registered.
     */
    DrSenderAttributes sendHubAttributes() {
        return sndHubAttrs;
    }

    /**
     * Checks whether the given system cache key is a sender stop key that concerns this cache.
     * In cache-name mode a {@code CacheDrSenderHubStopKey} is matched by cache name; otherwise a
     * {@code DrSenderGroupNodeStopKey} is matched by the effective sender group.
     *
     * @param key System cache key.
     * @return {@code true} if the key is a matching stop key.
     */
    boolean stopKey(Object key) {
        if (this.useCacheNames) {
            if (!(key instanceof CacheDrSenderHubStopKey)) {
                return false;
            }
            CacheDrSenderHubStopKey hubKey = (CacheDrSenderHubStopKey)key;
            String locCacheName = this.ggCctx.igniteCacheConfiguration().getName();
            return F.eq(locCacheName, hubKey.cacheName());
        }
        if (!(key instanceof DrSenderGroupNodeStopKey)) {
            return false;
        }
        DrSenderGroupNodeStopKey grpKey = (DrSenderGroupNodeStopKey)key;
        String locGrpName = DrUtils.effectiveSenderGroup(this.sndCfg);
        return F.eq(locGrpName, grpKey.groupName());
    }

    /**
     * Checks whether the key is a {@code CacheDrPauseKey} that refers to this cache.
     *
     * @param key System cache key.
     * @return {@code true} if the key is a pause key with this cache's configured name.
     */
    boolean pauseKey(Object key) {
        if (!(key instanceof CacheDrPauseKey)) {
            return false;
        }
        String locCacheName = this.ggCctx.igniteCacheConfiguration().getName();
        return F.eq(locCacheName, ((CacheDrPauseKey)key).cacheName());
    }

    /**
     * Checks whether the key is a {@code CacheDrStateTransferResultKey} that refers to this cache.
     *
     * @param key System cache key.
     * @return {@code true} if the key is a state transfer result key with this cache's configured name.
     */
    boolean stateTransferResultKey(Object key) {
        if (!(key instanceof CacheDrStateTransferResultKey)) {
            return false;
        }
        String locCacheName = this.ggCctx.igniteCacheConfiguration().getName();
        return F.eq(locCacheName, ((CacheDrStateTransferResultKey)key).cacheName());
    }

    /**
     * Checks whether the key is a {@code CacheDrStateTransferKey} that refers to this cache.
     *
     * @param key System cache key.
     * @return {@code true} if the key is a state transfer key with this cache's configured name.
     */
    boolean stateTransferKey(Object key) {
        if (!(key instanceof CacheDrStateTransferKey)) {
            return false;
        }
        String locCacheName = this.ggCctx.igniteCacheConfiguration().getName();
        return F.eq(locCacheName, ((CacheDrStateTransferKey)key).cacheName());
    }

    /**
     * Turns throttling off on the DR handler, if a handler is present.
     */
    void disableAdaptiveThrottling() {
        // Snapshot the field so the null check and the call see the same handler.
        CacheDrHandler hnd = this.drHnd;
        if (hnd == null) {
            return;
        }
        hnd.setDisableThrottling(true);
    }

    /**
     * Turns throttling back on for the DR handler, if a handler is present.
     */
    void enableAdaptiveThrottling() {
        // Snapshot the field so the null check and the call see the same handler.
        CacheDrHandler hnd = this.drHnd;
        if (hnd == null) {
            return;
        }
        hnd.setDisableThrottling(false);
    }

    /**
     * Reacts to a sender hub leaving or stopping. The hub is unregistered and the DR handler is
     * notified; on server nodes replication is additionally stopped when no hubs remain, or when
     * the departed hub reported a non-persistent sender store.
     *
     * @param nodeId Id of the hub node that left.
     * @param errMsg Optional error text appended to the stop message.
     */
    private void onSenderHubLeave(UUID nodeId, @Nullable String errMsg) {
        ClusterNode leftHub = this.removeHub(nodeId);
        if (leftHub == null) {
            // Not a registered hub - nothing to do.
            return;
        }
        this.drHnd.onSenderHubsLeave(Collections.singletonList(nodeId));
        if (this.cctx.kernalContext().clientNode()) {
            return;
        }
        String causeSuffix = errMsg == null ? "" : " " + errMsg;
        if (this.sndHubs.isEmpty()) {
            this.enqueueDrMgmtTask(new DrStopTask(CacheDrPauseReason.NO_SND_HUBS, "No sender hubs." + causeSuffix, null));
            return;
        }
        Boolean persistentAttr = (Boolean)leftHub.attribute("plugins.gg.replication.snd.store.persistent");
        // A missing attribute is treated as a persistent store.
        boolean storePersistent = persistentAttr == null || persistentAttr.booleanValue();
        if (!storePersistent) {
            this.enqueueDrMgmtTask(new DrStopTask(CacheDrPauseReason.BATCH_FAILED, "Sender with non-persistent sender store has gone." + causeSuffix, null));
        }
    }

    /**
     * Hands a DR management task over to the control task consumer.
     *
     * @param task Task to enqueue.
     */
    void enqueueDrMgmtTask(DrTask task) {
        controlTask.accept(task);
    }

    /**
     * Submits a state transfer task to the DR processor.
     *
     * @param task State transfer task.
     */
    void submitStateTransferTask(StateTransferTask task) {
        drProc.submitStateTransferTask(task);
    }

    /**
     * Submits the task to the DR processor for asynchronous execution.
     * <p>
     * The submitted runnable waits for affinity of the last topology exchange to become ready,
     * runs the task through {@code txOp}, and then reports the outcome via
     * {@link DrTask#onDone()} or {@link DrTask#onError(Throwable)}. Any {@link Error} is rethrown
     * after being reported.
     *
     * @param task Task to run.
     */
    void runAsync(final DrTask task) {
        this.drProc.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    GridCacheSharedContext shared = GridGainCacheDrManager.this.cctx.shared();
                    GridDhtPartitionsExchangeFuture lastExchFut = shared.exchange().lastTopologyFuture();
                    assert (lastExchFut != null) : "DR Worker should start after join to topology (last exchange future is null)";
                    IgniteInternalFuture<AffinityTopologyVersion> affReadyFut = shared.exchange().affinityReadyFuture(lastExchFut.initialVersion());
                    affReadyFut.get();
                    boolean executed = GridGainCacheDrManager.this.txOp(info -> task.run((CacheDrPauseInfo)info), true);
                    if (executed) {
                        task.onDone();
                    } else {
                        task.onError(new CacheStoppedException("Dr task is cancelled due to cache stop: task=" + task + ", cache=" + GridGainCacheDrManager.this.cctx.name()));
                    }
                }
                catch (Throwable err) {
                    // Failures while the topology is stopping are expected, so only log them at debug level.
                    if (!GridGainCacheDrManager.this.cctx.topology().stopping()) {
                        U.error(GridGainCacheDrManager.this.log, "An exception occurred during DR task processing.", err);
                    } else if (GridGainCacheDrManager.this.log.isDebugEnabled()) {
                        GridGainCacheDrManager.this.log.debug("An exception occurred during DR task processing: " + err);
                    }
                    task.onError(err);
                    if (err instanceof Error) {
                        throw (Error)err;
                    }
                }
            }

            public String toString() {
                return task.toString();
            }
        });
    }

    /**
     * Decompiler-extracted body of a lambda used by {@code initDr}. Computes the stop info to use
     * after initialization and reports both the value and whether it differs (by reference) from
     * the previous one.
     *
     * @param shouldStopReplication Whether replication must be stopped with reason {@code NO_SND_HUBS}.
     * @param infoChanged Receives {@code true} if the resulting info is a different object than {@code oldStopInfo}.
     * @param newStopInfo Receives the resulting stop info.
     * @param oldStopInfo Previous stop info, possibly {@code null}.
     * @throws IgniteCheckedException If {@code stopReplication} fails.
     */
    private /* synthetic */ void lambda$initDr$1(boolean shouldStopReplication, AtomicBoolean infoChanged, GridTuple newStopInfo, CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
        CacheDrPauseInfo resolved;
        if (shouldStopReplication) {
            resolved = this.stopReplication(oldStopInfo, CacheDrPauseReason.NO_SND_HUBS, null);
        } else if (oldStopInfo == null) {
            resolved = this.stopReplication(null, null, null);
        } else {
            resolved = oldStopInfo;
        }
        infoChanged.set(oldStopInfo != resolved);
        newStopInfo.set(resolved);
    }

    /**
     * Listener for system cache updates that translates DR control entries concerning this cache
     * into DR management tasks or direct hub-leave handling.
     */
    private class SystemCacheUpdatedListener
    implements CacheEntryUpdatedListener<Object, Object> {
        private SystemCacheUpdatedListener() {
        }

        /**
         * Dispatches every event by the kind of its key: state transfer key, state transfer result
         * key, sender stop key or pause key. Events with any other key are ignored.
         *
         * @param evts Updated entries.
         */
        @Override
        public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> evt : evts) {
                Object key = evt.getKey();
                if (GridGainCacheDrManager.this.stateTransferKey(key)) {
                    CacheDrStateTransferInfo transferInfo = (CacheDrStateTransferInfo)evt.getValue();
                    GridGainCacheDrManager.this.enqueueDrMgmtTask(new StateTransferChangeTask((CacheDrStateTransferKey)key, transferInfo));
                } else if (GridGainCacheDrManager.this.stateTransferResultKey(key)) {
                    CacheDrStateTransferResultInfo prev = (CacheDrStateTransferResultInfo)evt.getOldValue();
                    CacheDrStateTransferResultInfo cur = (CacheDrStateTransferResultInfo)evt.getValue();
                    // Completed means: a not-yet-done result has just turned into a done one.
                    boolean transferCompleted = prev != null && !prev.done() && cur != null && cur.done();
                    GridGainCacheDrManager.this.enqueueDrMgmtTask(new StateTransferResultTask((CacheDrStateTransferResultKey)key, transferCompleted));
                } else if (GridGainCacheDrManager.this.stopKey(key)) {
                    CacheDrSenderHubStopInfo hubStopInfo = (CacheDrSenderHubStopInfo)evt.getValue();
                    if (hubStopInfo != null) {
                        UUID stoppedNodeId;
                        if (key instanceof CacheDrSenderHubStopKey) {
                            stoppedNodeId = ((CacheDrSenderHubStopKey)key).nodeId();
                        } else {
                            stoppedNodeId = ((DrSenderGroupNodeStopKey)key).nodeId();
                        }
                        GridGainCacheDrManager.this.onSenderHubLeave(stoppedNodeId, hubStopInfo.errorMessage());
                    }
                } else if (GridGainCacheDrManager.this.pauseKey(key)) {
                    assert (key instanceof CacheDrPauseKey) : key;
                    CacheDrPauseInfo pauseInfo = (CacheDrPauseInfo)evt.getValue();
                    if (pauseInfo != null) {
                        GridGainCacheDrManager.this.recordCacheReplicationStateChangedEvt(pauseInfo);
                        // State changes made by the local node are not re-applied via a task.
                        if (!F.eq(GridGainCacheDrManager.this.cctx.localNodeId(), pauseInfo.nodeId())) {
                            GridGainCacheDrManager.this.enqueueDrMgmtTask(new DrStopTask(null, null, pauseInfo));
                        }
                    }
                }
            }
        }
    }

    /**
     * Base class for DR management tasks. A task receives the current pause (stop) info and
     * returns the one that should be in effect afterwards.
     */
    public static abstract class DrTask {
        /**
         * Executes the task.
         *
         * @param oldStopInfo Pause info in effect before the task runs, possibly {@code null}.
         * @return Pause info to use after the task, possibly {@code null}.
         * @throws IgniteCheckedException If the task fails.
         */
        @Nullable
        public abstract CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException;

        /**
         * Topology version associated with the task.
         *
         * @return {@code null} by default; subclasses may return a specific version.
         */
        @Nullable
        public AffinityTopologyVersion topologyVersion() {
            return null;
        }

        /**
         * Callback for successful completion. Does nothing by default.
         */
        public void onDone() {
            // No-op.
        }

        /**
         * Callback for failed or cancelled execution. Does nothing by default.
         *
         * @param err Failure reason.
         */
        public void onError(Throwable err) {
            // No-op.
        }
    }

    /**
     * Task executed after a node has left the topology. On the oldest alive server node it cleans
     * DR control entries in the system cache; on completion it notifies the state transfer handler
     * if the departed node was a data node.
     */
    private class NodeLeaveTask
    extends DrTask {
        private final AffinityTopologyVersion topVer;
        private final UUID nodeId;
        private final boolean dataNode;
        private final boolean sndHubNode;

        /**
         * @param topVer Topology version of the discovery event.
         * @param nodeId Id of the node that left.
         * @param dataNode Whether the node was a data node of this cache.
         * @param sndHubNode Whether the node was a sender hub.
         */
        private NodeLeaveTask(long topVer, UUID nodeId, boolean dataNode, boolean sndHubNode) {
            this.topVer = new AffinityTopologyVersion(topVer);
            this.nodeId = nodeId;
            this.dataNode = dataNode;
            this.sndHubNode = sndHubNode;
        }

        @Override
        @Nullable
        public AffinityTopologyVersion topologyVersion() {
            return this.topVer;
        }

        /**
         * Scans the system cache and, only when the local node is the oldest alive server node for
         * the task's topology version:
         * removes stop keys of nodes that are no longer alive (or of the departed node),
         * drops listeners of dead nodes from state transfer results,
         * removes finished results that have no listeners left together with their transfer keys,
         * and writes back the results whose listener set changed.
         *
         * @param oldStopInfo Current pause info.
         * @return {@code oldStopInfo} unchanged.
         * @throws IgniteCheckedException If a system cache operation fails.
         */
        @Override
        @Nullable
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            ClusterNode oldest = GridGainCacheDrManager.this.cctx.kernalContext().discovery().oldestAliveServerNode(this.topVer);
            if (!F.eq(GridGainCacheDrManager.this.cctx.localNode(), oldest)) {
                return oldStopInfo;
            }
            HashSet<GridCacheInternal> staleStopKeys = new HashSet<GridCacheInternal>();
            HashSet<CacheDrStateTransferResultKey> orphanFstResults = new HashSet<CacheDrStateTransferResultKey>();
            HashMap<CacheDrStateTransferResultKey, CacheDrStateTransferResultInfo> changedFstResults = new HashMap<CacheDrStateTransferResultKey, CacheDrStateTransferResultInfo>();
            Iterator entries = GridGainCacheDrManager.this.sysCache.scanIterator(false, null, GridGainCacheDrManager.this.timeout);
            while (entries.hasNext()) {
                Cache.Entry entry = (Cache.Entry)entries.next();
                Object key = entry.getKey();
                if (GridGainCacheDrManager.this.stopKey(key)) {
                    UUID stoppedNodeId;
                    if (key instanceof CacheDrSenderHubStopKey) {
                        stoppedNodeId = ((CacheDrSenderHubStopKey)key).nodeId();
                    } else {
                        stoppedNodeId = ((DrSenderGroupNodeStopKey)key).nodeId();
                    }
                    boolean stillValid = GridGainCacheDrManager.this.cctx.discovery().alive(stoppedNodeId) && !F.eq(this.nodeId, stoppedNodeId);
                    if (!stillValid) {
                        staleStopKeys.add((GridCacheInternal)key);
                    }
                    continue;
                }
                if (!GridGainCacheDrManager.this.stateTransferResultKey(entry.getKey())) {
                    continue;
                }
                CacheDrStateTransferResultInfo resInfo = (CacheDrStateTransferResultInfo)entry.getValue();
                boolean listenersChanged = resInfo.listeners().removeIf(uuid -> !GridGainCacheDrManager.this.cctx.discovery().alive((UUID)uuid));
                if (!listenersChanged) {
                    continue;
                }
                CacheDrStateTransferResultKey resKey = (CacheDrStateTransferResultKey)entry.getKey();
                if (resInfo.done() && resInfo.listeners().isEmpty()) {
                    orphanFstResults.add(resKey);
                } else {
                    changedFstResults.put(resKey, resInfo);
                }
            }
            if (!staleStopKeys.isEmpty()) {
                GridGainCacheDrManager.this.sysCache.removeAll(staleStopKeys);
            }
            if (!orphanFstResults.isEmpty()) {
                // Remove the transfer keys first, then the result entries themselves.
                GridGainCacheDrManager.this.sysCache.removeAll(orphanFstResults.stream().map(r -> new CacheDrStateTransferKey(r.cacheName(), r.id(), r.dataCenterIds())).collect(Collectors.toSet()));
                GridGainCacheDrManager.this.sysCache.removeAll(orphanFstResults);
            }
            if (!changedFstResults.isEmpty()) {
                GridGainCacheDrManager.this.sysCache.putAll(changedFstResults);
            }
            return oldStopInfo;
        }

        /**
         * Notifies the state transfer handler when the departed node was a data node.
         */
        @Override
        public void onDone() {
            if (!this.dataNode) {
                return;
            }
            GridGainCacheDrManager.this.fstHnd.onDataNodeLeft(this.topVer, this.nodeId, GridGainCacheDrManager.this.timeout);
        }
    }

    /**
     * Task that propagates a change of a state transfer result entry to the state transfer handler.
     */
    private class StateTransferResultTask
    extends DrTask {
        private final CacheDrStateTransferResultKey key;
        private final boolean transferCompleted;

        /**
         * @param key Key of the changed result entry.
         * @param transferCompleted Whether the update marked the transfer as completed.
         */
        private StateTransferResultTask(CacheDrStateTransferResultKey key, boolean transferCompleted) {
            this.key = key;
            this.transferCompleted = transferCompleted;
        }

        /**
         * Re-reads the result from the system cache and, if it still exists, passes it to the
         * state transfer handler.
         *
         * @param oldStopInfo Current pause info.
         * @return {@code oldStopInfo} unchanged.
         * @throws IgniteCheckedException If reading the system cache or the handler fails.
         */
        @Override
        @Nullable
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            Object curInfo = GridGainCacheDrManager.this.sysCache.get(this.key);
            if (curInfo == null) {
                return oldStopInfo;
            }
            GridGainCacheDrManager.this.fstHnd.onStateTransferResultChanged(this.key, (CacheDrStateTransferResultInfo)curInfo, this.transferCompleted);
            return oldStopInfo;
        }
    }

    /**
     * Task that forwards a changed state transfer entry to the state transfer handler.
     */
    private class StateTransferChangeTask
    extends DrTask {
        private final CacheDrStateTransferKey key;
        private final CacheDrStateTransferInfo info;

        /**
         * @param key Key of the changed state transfer entry.
         * @param info New value of the entry as delivered with the update event.
         */
        StateTransferChangeTask(CacheDrStateTransferKey key, CacheDrStateTransferInfo info) {
            this.key = key;
            this.info = info;
        }

        /**
         * Passes the captured key and info to the state transfer handler.
         *
         * @param oldStopInfo Current pause info.
         * @return {@code oldStopInfo} unchanged.
         * @throws IgniteCheckedException If the handler fails.
         */
        @Override
        @Nullable
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            CacheDrPauseInfo unchanged = oldStopInfo;
            GridGainCacheDrManager.this.fstHnd.onStateTransferInfoChanged(this.key, this.info);
            return unchanged;
        }
    }

    /**
     * Task that stops a state transfer and completes the supplied future with the outcome.
     */
    private class StateTransferStopTask
    extends DrTask {
        private final GridFutureAdapter<GridFutureAdapter<?>> fut;
        private final CacheDrStateTransferKey key;
        private final CacheDrPauseInfo stopInfo;

        /**
         * @param fut Future completed when the task finishes or fails.
         * @param key Key of the state transfer to stop.
         * @param reason Stop reason; when {@code null} no pause info is passed to the handler.
         */
        StateTransferStopTask(GridFutureAdapter<GridFutureAdapter<?>> fut, CacheDrStateTransferKey key, CacheDrPauseReason reason) {
            this.fut = fut;
            this.key = key;
            DynamicCacheDescriptor cacheDesc = GridGainCacheDrManager.this.cctx.shared().cache().cacheDescriptor(GridGainCacheDrManager.this.cctx.cacheId());
            if (reason == null) {
                this.stopInfo = null;
            } else {
                this.stopInfo = new CacheDrPauseInfo(GridGainCacheDrManager.this.kernalCtx().localNodeId(), reason, "State transfer cancelled.", cacheDesc.receivedFrom());
            }
        }

        /**
         * Asks the state transfer handler to stop the transfer.
         *
         * @param oldStopInfo Current pause info.
         * @return {@code oldStopInfo} unchanged.
         * @throws IgniteCheckedException If the handler fails.
         */
        @Override
        @Nullable
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            CacheDrPauseInfo unchanged = oldStopInfo;
            GridGainCacheDrManager.this.fstHnd.stopStateTransfer(this.key, this.stopInfo);
            return unchanged;
        }

        /** Completes the future successfully. */
        @Override
        public void onDone() {
            fut.onDone();
        }

        /**
         * Completes the future with the given error.
         *
         * @param err Failure reason.
         */
        @Override
        public void onError(Throwable err) {
            fut.onDone(err);
        }
    }

    /**
     * Task that initiates a state transfer. The outer future is completed when the task itself has
     * run; its result is an inner future that completes when the transfer finishes or fails.
     */
    private class StateTransferStartTask
    extends DrTask {
        private final GridFutureAdapter<GridFutureAdapter<?>> fut;
        private final GridFutureAdapter<?> innerFut = new GridFutureAdapter();
        private final Collection<Byte> dataCenterIds;
        private final boolean syncFst;

        /**
         * @param fut Outer future, completed with the inner future once the task has run.
         * @param dataCenterIds Target data center ids.
         * @param syncFst Passed through to the state transfer handler.
         */
        private StateTransferStartTask(GridFutureAdapter<GridFutureAdapter<?>> fut, Collection<Byte> dataCenterIds, boolean syncFst) {
            this.fut = fut;
            this.dataCenterIds = dataCenterIds;
            this.syncFst = syncFst;
        }

        /**
         * Starts the transfer unless replication is stopped, in which case the inner future is
         * failed immediately.
         *
         * @param oldStopInfo Current pause info; must not be {@code null}.
         * @return {@code oldStopInfo} unchanged.
         * @throws IgniteCheckedException If the handler fails to start the transfer.
         */
        @Override
        @Nullable
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            assert (oldStopInfo != null);
            if (oldStopInfo.reason() == null) {
                IgniteInClosure<IgniteInternalFuture<?>> completion = new IgniteInClosure<IgniteInternalFuture<?>>(){
                    private static final long serialVersionUID = 0L;

                    @Override
                    public void apply(IgniteInternalFuture<?> transferFut) {
                        // Mirror the transfer outcome onto the inner future.
                        try {
                            transferFut.get();
                            StateTransferStartTask.this.innerFut.onDone();
                        }
                        catch (Throwable err) {
                            StateTransferStartTask.this.innerFut.onDone(err);
                        }
                    }
                };
                GridGainCacheDrManager.this.fstHnd.stateTransfer(this.dataCenterIds, this.syncFst, GridGainCacheDrManager.this.timeout).listen(completion);
            } else {
                this.innerFut.onDone(new IgniteCheckedException("Failed to initiate state transfer because data center replication is stopped: " + oldStopInfo));
            }
            return oldStopInfo;
        }

        /** Completes the outer future with the inner future as its result. */
        @Override
        public void onDone() {
            fut.onDone(innerFut);
        }

        /**
         * Fails the outer future.
         *
         * @param err Failure reason.
         */
        @Override
        public void onError(@Nullable Throwable err) {
            fut.onDone(err);
        }
    }

    /**
     * Task that changes the replication state, either from a local request (reason and message)
     * or from a pause info that arrived from elsewhere ({@code rmtStopInfo}).
     */
    private class DrStopTask
    extends DrTask {
        private final CacheDrPauseReason reason;
        private final String errMsg;
        private final CacheDrPauseInfo rmtStopInfo;
        private final GridFutureAdapter<?> fut = new GridFutureAdapter();

        /**
         * @param reason Local pause reason, {@code null} when none.
         * @param errMsg Optional message passed to {@code stopReplication}.
         * @param rmtStopInfo Pause info received from elsewhere, {@code null} for a local request.
         */
        private DrStopTask(@Nullable CacheDrPauseReason reason, @Nullable String errMsg, CacheDrPauseInfo rmtStopInfo) {
            this.reason = reason;
            this.errMsg = errMsg;
            this.rmtStopInfo = rmtStopInfo;
        }

        /**
         * Applies the state change.
         * Without a remote info the local reason is applied through {@code stopReplication}.
         * A remote info without a reason is turned into a {@code NO_SND_HUBS} stop when this
         * non-client node knows no sender hubs; otherwise the remote info is adopted as is.
         *
         * @param oldStopInfo Current pause info.
         * @return Pause info to use from now on.
         * @throws IgniteCheckedException If {@code stopReplication} fails.
         */
        @Override
        @Nullable
        public CacheDrPauseInfo run(@Nullable CacheDrPauseInfo oldStopInfo) throws IgniteCheckedException {
            if (GridGainCacheDrManager.this.log.isInfoEnabled()) {
                GridGainCacheDrManager.this.log.info("Data center replication executes a " + this.taskPurpose() + " task [reason=" + this.reason + ", rmtStopInfo=" + this.rmtStopInfo + ", oldStopInfo=" + oldStopInfo + ", sndHubs=" + GridGainCacheDrManager.this.sndHubs + "]");
            }
            if (this.rmtStopInfo == null) {
                return GridGainCacheDrManager.this.stopReplication(oldStopInfo, this.reason, this.errMsg);
            }
            boolean cannotRun = this.rmtStopInfo.reason() == null && GridGainCacheDrManager.this.sndHubs.isEmpty() && !GridGainCacheDrManager.this.cctx.localNode().isClient();
            if (cannotRun) {
                return GridGainCacheDrManager.this.stopReplication(this.rmtStopInfo, CacheDrPauseReason.NO_SND_HUBS, null);
            }
            return this.rmtStopInfo;
        }

        /**
         * @return {@code "start"} when neither the local reason nor the remote info carries a pause
         *      reason, {@code "stop"} otherwise. Used for logging only.
         */
        @NotNull
        private String taskPurpose() {
            if (this.reason != null) {
                return "stop";
            }
            if (this.rmtStopInfo == null || this.rmtStopInfo.reason() == null) {
                return "start";
            }
            return "stop";
        }

        /** Completes the task future successfully. */
        @Override
        public void onDone() {
            fut.onDone();
        }

        /**
         * Fails the task future.
         *
         * @param err Failure reason.
         */
        @Override
        public void onError(Throwable err) {
            fut.onDone(err);
        }
    }

    /**
     * Executor of DR control tasks. Tasks are handed in through {@link #accept(DrTask)}, kept in
     * a queue and processed one after another by {@link #run()}, which is submitted to the DR
     * processor whenever the queue goes from "no tasks" to "has tasks" after initialization.
     * <p>
     * State transitions and the {@code hasTasks} flag are updated while holding this object's
     * monitor.
     */
    private class DrControlTaskExecutor
    implements Consumer<DrTask>,
    Runnable {
        /** Pending tasks. */
        private final BlockingQueue<DrTask> tasksQ = new LinkedBlockingDeque<DrTask>();
        /** Set in {@link #accept(DrTask)} when a task is queued; cleared in {@link #run()} once the queue is found empty. */
        private boolean hasTasks;
        /** Lifecycle state; starts as {@code NOT_STARTED}. */
        private volatile TaskExecutorState state = TaskExecutorState.NOT_STARTED;
        /** Thread currently draining the queue in {@link #run()}, or {@code null}. */
        private volatile Thread thread;

        private DrControlTaskExecutor() {
        }

        /**
         * Initializes DR on first use and then drains the task queue.
         * <p>
         * Returns immediately when cancelled. In state {@code NOT_STARTED}, if {@code initDr()}
         * succeeds, the state becomes {@code INITIALIZED} and the method returns if no task has
         * been queued meanwhile. Otherwise queued tasks are processed until the queue is observed
         * empty under the monitor, at which point {@code hasTasks} is cleared.
         * <p>
         * NOTE: the decompiler (CFR) reported that it removed a try/catch while reconstructing
         * this method, so the original exception handling may differ from what is shown.
         */
        @Override
        public void run() {
            if (this.isCancelled()) {
                return;
            }
            if (this.state == TaskExecutorState.NOT_STARTED && GridGainCacheDrManager.this.initDr()) {
                DrControlTaskExecutor drControlTaskExecutor = this;
                synchronized (drControlTaskExecutor) {
                    if (this.isCancelled()) {
                        return;
                    }
                    this.state = TaskExecutorState.INITIALIZED;
                    // Nothing was queued during initialization; accept() will submit us later.
                    if (!this.hasTasks) {
                        return;
                    }
                }
            }
            // Remember the worker thread so that stop() can interrupt it.
            this.thread = Thread.currentThread();
            try {
                while (!this.isCancelled()) {
                    DrTask task;
                    while (!this.isCancelled() && (task = (DrTask)this.tasksQ.poll()) != null) {
                        this.processDrTask(task);
                    }
                    DrControlTaskExecutor drControlTaskExecutor = this;
                    synchronized (drControlTaskExecutor) {
                        // Re-check under the monitor: accept() offers tasks while holding it, so
                        // an empty queue here means no task can be left behind unprocessed.
                        if (this.tasksQ.isEmpty()) {
                            this.hasTasks = false;
                            return;
                        }
                    }
                }
                return;
            }
            finally {
                DrControlTaskExecutor drControlTaskExecutor = this;
                synchronized (drControlTaskExecutor) {
                    this.thread = null;
                }
            }
        }

        /**
         * Queues a task. If the executor is cancelled the task is failed right away with a
         * {@code CacheStoppedException}. When this is the first pending task and the executor is
         * initialized, the executor submits itself to the DR processor.
         * <p>
         * NOTE: the decompiler (CFR) reported that it removed a try/catch while reconstructing
         * this method.
         *
         * @param drTask Task to queue.
         */
        @Override
        public void accept(DrTask drTask) {
            DrControlTaskExecutor drControlTaskExecutor = this;
            synchronized (drControlTaskExecutor) {
                if (this.isCancelled()) {
                    drTask.onError(new CacheStoppedException("Dr task is cancelled due to cache stop: task=" + drTask + ", cache=" + GridGainCacheDrManager.this.cctx.name()));
                    return;
                }
                this.tasksQ.offer(drTask);
                if (!this.hasTasks) {
                    this.hasTasks = true;
                    // Before initialization the pending tasks are picked up by run() itself.
                    if (this.isInitialized()) {
                        GridGainCacheDrManager.this.drProc.submit(this);
                    }
                }
            }
        }

        /**
         * @return {@code true} if the state is {@code INITIALIZED}.
         */
        private boolean isInitialized() {
            return this.state == TaskExecutorState.INITIALIZED;
        }

        /**
         * @return {@code true} if the state is {@code CANCELLED}.
         */
        private boolean isCancelled() {
            return this.state == TaskExecutorState.CANCELLED;
        }

        /**
         * Cancels the executor: switches the state to {@code CANCELLED}, interrupts the worker
         * thread if one is recorded, and fails every task still in the queue.
         * <p>
         * NOTE: the decompiler (CFR) reported that it removed a try/catch while reconstructing
         * this method.
         */
        void stop() {
            DrTask t2;
            assert (GridGainCacheDrManager.this.stopping);
            DrControlTaskExecutor drControlTaskExecutor = this;
            synchronized (drControlTaskExecutor) {
                this.state = TaskExecutorState.CANCELLED;
                Thread thread0 = this.thread;
                if (thread0 != null) {
                    thread0.interrupt();
                }
            }
            IgniteCheckedException reason = new IgniteCheckedException("Failed to perform DR task because grid is stopping.");
            // All remaining tasks share the same failure instance.
            while ((t2 = (DrTask)this.tasksQ.poll()) != null) {
                t2.onError(reason);
            }
        }

        /**
         * Runs a single task. Waits for affinity of the task's topology version (if it has one),
         * executes the task through {@code txOp}, publishes the resulting stop info to the manager
         * and the metrics when it changed, and finally reports success or failure to the task.
         * An {@link Error} is rethrown after being reported.
         *
         * @param task Task to process.
         */
        private void processDrTask(DrTask task) {
            block10: {
                try {
                    GridTuple<Object> newStopInfo;
                    boolean res;
                    AffinityTopologyVersion topVer = task.topologyVersion();
                    if (topVer != null) {
                        GridGainCacheDrManager.this.cctx.affinity().affinityReadyFuture(topVer).get();
                    }
                    // NOTE(review): the assignment to newStopInfo inside the lambda argument list
                    // is a decompiler artifact; the tuple is meant to receive the task's result.
                    if (!(res = GridGainCacheDrManager.this.txOp(arg_0 -> DrControlTaskExecutor.lambda$processDrTask$0(newStopInfo = F.t(null), task, arg_0), true))) {
                        task.onError(new CacheStoppedException("Dr task is cancelled due to cache stop: task=" + task + ", cache=" + GridGainCacheDrManager.this.cctx.name()));
                        return;
                    }
                    CacheDrPauseInfo newInfo = newStopInfo.get();
                    // Only touch the manager state and metrics when the stop info really changed.
                    if (!F.eq(GridGainCacheDrManager.this.stopInfo, newInfo)) {
                        GridGainCacheDrManager.this.stopInfo = newInfo;
                        if (newInfo != null) {
                            GridGainCacheDrManager.this.metrics.onStopStateChanged(newInfo.reason(), newInfo.error());
                        } else {
                            GridGainCacheDrManager.this.metrics.onStopStateChanged(null, null);
                        }
                    }
                    task.onDone();
                }
                catch (Throwable e) {
                    // Failures during shutdown are expected, so they are logged at debug level only.
                    if (!GridGainCacheDrManager.this.stopping && !GridGainCacheDrManager.this.cctx.topology().stopping()) {
                        U.error(GridGainCacheDrManager.this.log, "An exception occurred during DR task processing.", e);
                    } else if (GridGainCacheDrManager.this.log.isDebugEnabled()) {
                        GridGainCacheDrManager.this.log.debug("An exception occurred during DR task processing: " + e);
                    }
                    task.onError(e);
                    if (!(e instanceof Error)) break block10;
                    throw (Error)e;
                }
            }
        }

        /**
         * @return Short description with the {@code hasTasks} flag and the current state.
         */
        public String toString() {
            return "DrControlTaskExecutor{hasTasks=" + this.hasTasks + ", state=" + (Object)((Object)this.state) + '}';
        }

        /**
         * Decompiler-extracted lambda body: runs the task with the given stop info and stores the
         * result in the tuple.
         */
        private static /* synthetic */ void lambda$processDrTask$0(GridTuple newStopInfo, DrTask task, CacheDrPauseInfo stopInfo) throws IgniteCheckedException {
            newStopInfo.set(task.run(stopInfo));
        }
    }

    private static enum TaskExecutorState {
        NOT_STARTED,
        INITIALIZED,
        CANCELLED;

    }

    /**
     * Local discovery event listener that keeps the sender hub list up to date and schedules
     * clean-up work when nodes leave.
     */
    private class DiscoveryListener
    implements GridLocalEventListener {
        private DiscoveryListener() {
        }

        /**
         * Handles a discovery event.
         * For event types 11 and 12 (presumably node left / node failed - confirm against
         * Ignite's {@code EventType}) a departed sender hub is unregistered and, on non-client
         * nodes, a {@link NodeLeaveTask} is enqueued if the node was a cache node or a sender hub.
         * Any other event is asserted to be of type 10 (presumably node joined) and registers the
         * node as a sender hub when applicable.
         *
         * @param evt Discovery event.
         */
        @Override
        public void onEvent(Event evt) {
            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
            int type = discoEvt.type();
            if (type == 11 || type == 12) {
                ClusterNode leftNode = discoEvt.eventNode();
                boolean cacheNode = GridGainCacheDrManager.this.cctx.discovery().cacheNode(leftNode, GridGainCacheDrManager.this.cctx.name());
                boolean sndHubNode = GridGainCacheDrManager.this.isSenderHub(leftNode);
                if (sndHubNode) {
                    GridGainCacheDrManager.this.onSenderHubLeave(leftNode.id(), null);
                }
                // Clean-up is scheduled on non-client nodes only, and only for relevant nodes.
                if (!GridGainCacheDrManager.this.cctx.kernalContext().clientNode() && (cacheNode || sndHubNode)) {
                    GridGainCacheDrManager.this.enqueueDrMgmtTask(new NodeLeaveTask(discoEvt.topologyVersion(), leftNode.id(), cacheNode, sndHubNode));
                }
                return;
            }
            assert (type == 10);
            ClusterNode joined = GridGainCacheDrManager.this.cctx.discovery().node(discoEvt.eventNode().id());
            if (joined == null) {
                // The node is already gone from discovery.
                return;
            }
            if (GridGainCacheDrManager.this.isSenderHub(joined)) {
                GridGainCacheDrManager.this.addHub(joined);
            }
        }
    }

    /**
     * Event filter for DR control entries that matches state transfer, state transfer result and
     * pause keys by cache name, and sender group node stop keys by sender group name.
     */
    public static class DrGroupControlEventFilter
    extends DrEntryEventFilter {
        private static final long serialVersionUID = 0L;
        /** Sender group name used to match {@code DrSenderGroupNodeStopKey} events. */
        private String grpName;

        /**
         * No-arg constructor; the group name is filled in by {@link #readExternal(ObjectInput)}.
         */
        public DrGroupControlEventFilter() {
        }

        /**
         * @param grpName Sender group name.
         * @param cacheName Cache name, handed to the parent filter.
         */
        public DrGroupControlEventFilter(String grpName, String cacheName) {
            super(cacheName);
            this.grpName = grpName;
        }

        /**
         * Accepts events whose key is one of the DR control key types and refers to the configured
         * cache (or, for group stop keys, to the configured sender group).
         *
         * @param evt Cache entry event.
         * @return {@code true} if the event passes the filter.
         * @throws CacheEntryListenerException Declared by the filter contract.
         */
        @Override
        public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException {
            Object key = evt.getKey();
            boolean accepted;
            if (key instanceof CacheDrStateTransferKey) {
                accepted = F.eq(this.cacheName, ((CacheDrStateTransferKey)key).cacheName());
            } else if (key instanceof CacheDrStateTransferResultKey) {
                accepted = F.eq(this.cacheName, ((CacheDrStateTransferResultKey)key).cacheName());
            } else if (key instanceof CacheDrPauseKey) {
                accepted = F.eq(this.cacheName, ((CacheDrPauseKey)key).cacheName());
            } else if (key instanceof DrSenderGroupNodeStopKey) {
                accepted = F.eq(this.grpName, ((DrSenderGroupNodeStopKey)key).groupName());
            } else {
                accepted = false;
            }
            return accepted;
        }

        /**
         * Reads the parent state followed by the group name.
         *
         * @param in Input to read from.
         * @throws IOException If reading fails.
         * @throws ClassNotFoundException Declared by the parent implementation.
         */
        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            super.readExternal(in);
            grpName = U.readString(in);
        }

        /**
         * Writes the parent state followed by the group name.
         *
         * @param out Output to write to.
         * @throws IOException If writing fails.
         */
        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            super.writeExternal(out);
            U.writeString(out, grpName);
        }
    }

    /**
     * Entry event filter that matches events by cache name.
     * <p>
     * An event passes when its key is a {@code CacheDrStateTransferKey},
     * {@code CacheDrStateTransferResultKey}, {@code CacheDrPauseKey} or {@code CacheDrSenderHubStopKey}
     * whose cache name equals this filter's cache name. Every other key is rejected.
     */
    public static class DrEntryEventFilter implements CacheEntryEventSerializableFilter, Externalizable {
        private static final long serialVersionUID = 0L;

        /** Cache name that event keys are compared against. */
        protected String cacheName;

        /**
         * No-arg constructor; {@link Externalizable} classes need one for deserialization.
         */
        public DrEntryEventFilter() {
        }

        /**
         * @param cacheName Cache name to match.
         */
        public DrEntryEventFilter(String cacheName) {
            this.cacheName = cacheName;
        }

        /**
         * @param evt Cache entry event.
         * @return {@code true} if the event key is of an accepted type and carries this filter's cache name.
         */
        @Override
        public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException {
            Object evtKey = evt.getKey();
            Object keyCacheName;

            if (evtKey instanceof CacheDrStateTransferKey) {
                keyCacheName = ((CacheDrStateTransferKey)evtKey).cacheName();
            }
            else if (evtKey instanceof CacheDrStateTransferResultKey) {
                keyCacheName = ((CacheDrStateTransferResultKey)evtKey).cacheName();
            }
            else if (evtKey instanceof CacheDrPauseKey) {
                keyCacheName = ((CacheDrPauseKey)evtKey).cacheName();
            }
            else if (evtKey instanceof CacheDrSenderHubStopKey) {
                keyCacheName = ((CacheDrSenderHubStopKey)evtKey).cacheName();
            }
            else {
                // Unknown key type: never passes the filter.
                return false;
            }

            return F.eq(this.cacheName, keyCacheName);
        }

        /**
         * Writes the cache name.
         */
        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            U.writeString(out, this.cacheName);
        }

        /**
         * Reads the cache name.
         */
        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            String restoredCacheName = U.readString(in);

            this.cacheName = restoredCacheName;
        }
    }
}

