/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.dr;

import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.cache.configuration.Factory;
import javax.management.JMException;
import javax.management.ObjectName;
import javax.net.ssl.SSLContext;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.MarshallerContextImpl;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
import org.apache.ignite.internal.processors.dr.IgniteDrDataStreamerCacheUpdater;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
import org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.spi.IgnitePortProtocol;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.IgniteThreadFactory;
import org.gridgain.grid.configuration.DrReceiverConfiguration;
import org.gridgain.grid.configuration.GridGainCacheConfiguration;
import org.gridgain.grid.dr.DrReceiverMBean;
import org.gridgain.grid.internal.processors.dr.DrProcessor;
import org.gridgain.grid.internal.processors.dr.DrReceiverInMetricsAdapter;
import org.gridgain.grid.internal.processors.dr.DrReceiverMBeanImpl;
import org.gridgain.grid.internal.processors.dr.DrReceiverMetadataManager;
import org.gridgain.grid.internal.processors.dr.DrSenderMetadataHolder;
import org.gridgain.grid.internal.processors.dr.DrUtils;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalBatchRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalBatchResponse;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalHandshakeRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalHandshakeResponse;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalMessage;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalMetadataRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalMetadataResponse;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalPingRequest;
import org.gridgain.grid.internal.processors.dr.nio.DrBufferedParserAdapter;
import org.gridgain.grid.internal.processors.dr.nio.DrExternalMessageMarshaller;
import org.gridgain.grid.internal.processors.dr.nio.DrNioMessageMarshaller;
import org.gridgain.grid.internal.processors.dr.nio.DrNioParser;
import org.jetbrains.annotations.Nullable;

class DrReceiver
implements PartitionsExchangeAware {
    private static final int META_DATA_CENTER_ID = GridNioSessionMetaKey.nextUniqueKey();
    private static final int META_AWAIT_ACK = GridNioSessionMetaKey.nextUniqueKey();
    private static final long REBIND_FREQ = 3000L;
    private static final long STREAMER_RECREATING_TIMEOUT = IgniteSystemProperties.getLong("GG_RECEIVER_DR_STREAMER_RECREATE_TIMEOUT", 5000L);
    private final boolean DR_ACK_MISSING_CACHES = IgniteSystemProperties.getBoolean("DR_ACK_MISSING_CACHES");
    private final boolean isIncrementalDrEnabled = DrUtils.isIncrementalDrEnabled();
    private final DrProcessor proc;
    private final DrReceiverConfiguration cfg;
    private final IgniteLogger log;
    private final Marshaller marsh;
    private final ConcurrentMap<String, DataStreamWrapper> ldrs = new ConcurrentHashMap<String, DataStreamWrapper>();
    private final Lock stateLock = new ReentrantLock();
    private GridNioServer<DrExternalMessage> srvr;
    private ExecutorService execSvc;
    private GridWorker binder;
    private ObjectName rcvHubMBean;
    private volatile boolean stopped;
    private final boolean usingBinaryMarshaller;
    private final DrReceiverMetadataManager metaMgr;
    private DistributedLongProperty tsTtl;
    private long defaultTombstoneTtl;

    DrReceiver(DrProcessor proc) {
        assert (proc != null);
        this.proc = proc;
        this.log = proc.context().log(DrReceiver.class);
        this.cfg = proc.ggConfig().getDrReceiverConfiguration();
        assert (this.cfg != null);
        this.marsh = proc.config().getMarshaller();
        this.usingBinaryMarshaller = this.marsh instanceof BinaryMarshaller;
        this.metaMgr = this.cfg.isLazyBinaryMetadataRegistration() ? new DrReceiverMetadataManager(proc.kernalContext()) : null;
    }

    void start() {
        this.tsTtl = this.proc.kernalContext().cache().context().ttl().tobmstoneTtlProperty();
        this.tsTtl.addListener((name, oldVal, newVal) -> {
            if (oldVal == null && newVal == null) {
                return;
            }
            if (oldVal == null && newVal >= this.defaultTombstoneTtl) {
                return;
            }
            if (newVal == null && this.defaultTombstoneTtl >= oldVal) {
                return;
            }
            U.warn(this.log, "Tombstones time to live has been changed [oldVal=" + oldVal + ", newVal=" + newVal + "], ");
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void onKernalStart() throws IgniteCheckedException {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Starting DR receiver hub: " + this.cfg);
        }
        this.stateLock.lock();
        try {
            InetAddress locHost;
            if (this.stopped) {
                return;
            }
            this.initTombstoneTTL();
            int port = this.cfg.getLocalInboundPort();
            String hostStr = this.cfg.getLocalInboundHost();
            if (hostStr == null) {
                hostStr = this.proc.config().getLocalHost();
            }
            try {
                locHost = hostStr != null ? InetAddress.getByName(hostStr) : U.getLocalHost();
            }
            catch (IOException e) {
                throw new IgniteCheckedException("Failed to resolve DR receiver hub TCP server local inbound host: " + hostStr, e);
            }
            this.registerMBean(locHost);
            DrReceiverInMetricsAdapter metrics = this.proc.metrics().registerReceiverMetrics();
            if (!this.startServer(locHost, port, this.cfg, metrics)) {
                U.warn(this.log, "Failed to start DR receiver hub TCP server (retrying every 3000 ms). Another node on this host? [host=" + locHost + ", port=" + port + ']');
                this.binder = new Binder(this.proc.igniteInstanceName(), locHost, port, metrics);
                new IgniteThread(this.binder).start();
            }
        }
        finally {
            this.stateLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void onKernalStop(boolean cancel) {
        PluginContext ctx = this.proc.context();
        this.stateLock.lock();
        try {
            if (this.stopped) {
                return;
            }
            this.stopped = true;
        }
        finally {
            this.stateLock.unlock();
        }
        this.log.info("Stopping a DR receiver hub: " + this.cfg);
        if (this.binder != null) {
            U.cancel(this.binder);
            U.join(this.binder, this.log);
        }
        if (this.srvr != null) {
            this.srvr.stop();
            ctx.deregisterPorts(this.getClass());
        }
        if (this.execSvc != null) {
            U.shutdownNow(DrReceiver.class, this.execSvc, this.log);
        }
        for (DataStreamWrapper wrapper : this.ldrs.values()) {
            wrapper.close(cancel, this.log);
        }
        this.ldrs.clear();
        this.unregisterMBean();
        this.stateLock.lock();
        try {
            this.stopped = false;
        }
        finally {
            this.stateLock.unlock();
        }
    }

    @Override
    public void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
        if (fut.exchangeActions() != null && fut.exchangeActions().deactivate()) {
            this.onKernalStop(false);
        }
    }

    private void registerMBean(InetAddress locHost) {
        if (U.IGNITE_MBEANS_DISABLED) {
            return;
        }
        assert (this.rcvHubMBean == null);
        try {
            this.rcvHubMBean = U.registerMBean(this.proc.config().getMBeanServer(), this.proc.igniteInstanceName(), "Data center replication", "receiver hub", new DrReceiverMBeanImpl(this.proc, this.cfg, locHost), DrReceiverMBean.class);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Registered DR receiver hub MBean: " + this.rcvHubMBean);
            }
        }
        catch (JMException e) {
            U.error(this.log, "Failed to register DR receiver hub MBean.", e);
        }
    }

    private void unregisterMBean() {
        if (this.rcvHubMBean == null) {
            return;
        }
        assert (!U.IGNITE_MBEANS_DISABLED);
        try {
            this.proc.config().getMBeanServer().unregisterMBean(this.rcvHubMBean);
            this.rcvHubMBean = null;
            if (this.log.isDebugEnabled()) {
                this.log.debug("Unregistered DR receiver hub MBean: " + this.rcvHubMBean);
            }
        }
        catch (JMException e) {
            U.error(this.log, "Failed to unregister DR receiver hub MBean: " + this.rcvHubMBean, e);
        }
    }

    private boolean startServer(InetAddress addr, int port, DrReceiverConfiguration cfg, DrReceiverInMetricsAdapter metrics) throws IgniteCheckedException {
        PluginContext ctx = this.proc.context();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Trying to start a DR receiver hub [host=" + addr + ", port=" + port + ']');
        }
        try {
            this.execSvc = Executors.newFixedThreadPool(cfg.getWorkerThreads(), new IgniteThreadFactory(this.proc.igniteInstanceName(), "gridgain-receive-hub"));
            ArrayList<GridNioFilterAdapter> filterList = new ArrayList<GridNioFilterAdapter>();
            NioAsyncNotifyFilter notifyFilter = new NioAsyncNotifyFilter(this, this.proc.igniteInstanceName(), this.execSvc, cfg.getMessageQueueLimit(), metrics, this.log);
            filterList.add(notifyFilter);
            if (DrUtils.isNioMarshallerEnabled()) {
                filterList.add(new GridNioCodecFilter(new DrNioParser(new DrNioMessageMarshaller(this.usingBinaryMarshaller)), this.log, false));
            } else {
                filterList.add(new GridNioCodecFilter(new DrBufferedParserAdapter(cfg.isDirectBuffer(), new DrExternalMessageMarshaller(this.usingBinaryMarshaller)), this.log, false));
            }
            filterList.add(new GridConnectionBytesVerifyFilter(this.log));
            Factory<SSLContext> sslCtxFactory = this.proc.getSslContextFactory(cfg);
            if (sslCtxFactory != null) {
                GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtxFactory.create(), cfg.isDirectBuffer(), DrUtils.DR_BYTE_ORDER, this.log, null);
                filterList.add(sslFilter);
            }
            this.srvr = GridNioServer.builder().address(addr).port(port).listener(new NioListener()).filters(filterList.toArray(new GridNioFilter[filterList.size()])).logger(this.log).selectorCount(cfg.getSelectorCount()).sendQueueLimit(cfg.getMessageQueueLimit()).igniteInstanceName(this.proc.igniteInstanceName()).serverName("dr-rcv").byteOrder(DrUtils.DR_BYTE_ORDER).tcpNoDelay(cfg.isTcpNodelay()).directBuffer(cfg.isDirectBuffer()).idleTimeout(cfg.getIdleTimeout()).writeTimeout(cfg.getWriteTimeout()).socketSendBufferSize(cfg.getSocketSendBufferSize()).socketReceiveBufferSize(cfg.getSocketReceiveBufferSize()).serverName("dr-rcv").build();
            this.srvr.start();
            ctx.registerPort(port, IgnitePortProtocol.TCP, this.getClass());
            if (sslCtxFactory != null) {
                this.log.info("Started DR receiver hub [addr=" + addr + ", port=" + port + ", sslEnabled=true, proto=" + sslCtxFactory.create().getProtocol() + "useIgniteSslFactory=" + cfg.isUseIgniteSslContextFactory() + ", cfg=" + cfg + ']');
            } else {
                this.log.info("Started DR receiver hub [addr=" + addr + ", port=" + port + ", sslEnabled=false, cfg=" + cfg + ']');
            }
        }
        catch (IgniteCheckedException e) {
            if (e.hasCause(BindException.class)) {
                return false;
            }
            throw e;
        }
        return true;
    }

    void printMemoryStats() {
        X.println(">>>", new Object[0]);
        X.println(">>> DR receiver hub memory stats [igniteInstanceName=" + this.proc.igniteInstanceName() + ']', new Object[0]);
        X.println(">>>   streamersSize: " + this.ldrs.size(), new Object[0]);
        X.println(">>>   pendingMessagesCount: " + ((ThreadPoolExecutor)this.execSvc).getQueue().size(), new Object[0]);
    }

    private IgniteDataStreamer<KeyCacheObject, CacheObject> streamer(String cacheName) throws IgniteCheckedException {
        return this.wrappedStreamer(cacheName).streamer;
    }

    private DataStreamWrapper wrappedStreamer(String cacheName) throws IgniteCheckedException {
        String cacheName0 = CU.mask(cacheName);
        DataStreamWrapper wrapper = (DataStreamWrapper)this.ldrs.get(cacheName);
        if (wrapper == null) {
            DataStreamWrapper old;
            IgniteInternalCache<?, ?> cache = this.internalCache(cacheName);
            if (cache != null && cache.context() != null && !cache.context().conflictNeedResolve()) {
                LT.warn(this.log, "Conflict resolver is not configured for DR cache \"" + cacheName + "\". Please, check " + GridGainCacheConfiguration.class.getName() + ".");
            }
            DataStreamerImpl<KeyCacheObject, CacheObject> ldr = this.proc.kernalContext().dataStream().dataStreamer(cacheName);
            ldr.perNodeBufferSize(this.cfg.getPerNodeBufferSize());
            ldr.receiver(new IgniteDrDataStreamerCacheUpdater());
            ldr.perNodeParallelOperations(this.cfg.getPerNodeParallelLoadOperations());
            ldr.ioPolicyResolver(new StreamerIoPolicyResolver());
            if (this.cfg.getFlushFrequency() > 0L) {
                ldr.autoFlushFrequency(this.cfg.getFlushFrequency());
            }
            if ((old = this.ldrs.putIfAbsent(cacheName0, wrapper = new DataStreamWrapper(ldr))) != null) {
                block6: {
                    try {
                        ldr.close();
                    }
                    catch (IgniteException e) {
                        if ($assertionsDisabled) break block6;
                        throw new AssertionError((Object)("Exception should not be thrown on empty data streamer closing: " + e));
                    }
                }
                wrapper = old;
            }
        }
        return wrapper;
    }

    @Nullable
    private IgniteInternalCache<?, ?> internalCache(String cacheName) throws IgniteCheckedException {
        GridCacheAdapter cache = this.proc.kernalContext().cache().internalCache(cacheName);
        if (cache != null) {
            return cache;
        }
        DynamicCacheDescriptor desc = this.proc.kernalContext().cache().cacheDescriptor(cacheName);
        if (desc == null) {
            return null;
        }
        return this.proc.kernalContext().cache().getOrStartCache(cacheName);
    }

    private void processInMessage(GridNioSession ses, DrExternalMessage msg) {
        if (msg instanceof DrExternalBatchRequest) {
            this.processBatchRequest(ses, (DrExternalBatchRequest)msg);
        } else if (msg instanceof DrExternalHandshakeRequest) {
            this.processHandshakeRequest(ses, (DrExternalHandshakeRequest)msg);
        } else if (msg instanceof DrExternalPingRequest) {
            this.processPingRequest(ses, (DrExternalPingRequest)msg);
        } else if (msg instanceof DrExternalMetadataRequest) {
            this.processMetadataRequest(ses, (DrExternalMetadataRequest)msg);
        } else {
            U.warn(this.log, "Ignoring message of unknown type [msg=" + msg + ']');
            assert (false);
        }
    }

    private void processBatchRequest(final GridNioSession ses, DrExternalBatchRequest msg) {
        IgniteFuture<?> addDataFut;
        Collection<GridCacheRawVersionedEntry> reqEntries;
        DataStreamWrapper ldr;
        final Byte dataCenterId = (Byte)ses.meta(META_DATA_CENTER_ID);
        assert (dataCenterId != null);
        final String cacheName = msg.cacheName();
        final int reqEntryCnt = msg.entryCount();
        final int reqDataSize = msg.dataSize();
        final IgniteUuid reqId = msg.requestId();
        if (this.log.isTraceEnabled()) {
            this.log.trace("Incoming DR sender hub batch request [remoteAddr=" + ses.remoteAddress() + ", dataCenterId=" + dataCenterId + ", reqId=" + reqId + ", cacheName=" + cacheName + ", entryNum=" + reqEntryCnt + ", dataLen=" + reqDataSize + ']');
        }
        try {
            ldr = this.wrappedStreamer(cacheName);
        }
        catch (Throwable e) {
            if (this.DR_ACK_MISSING_CACHES && e instanceof IllegalStateException) {
                LT.warn(this.log, "Failed to apply DR batch for non-started cache: " + cacheName);
            } else {
                LT.error(this.log, e, "Failed to process DR batch request [remoteAddr=" + ses.remoteAddress() + ", dataCenterId=" + dataCenterId + ", reqId=" + reqId + ']');
            }
            this.rejectRequest(ses, reqId, e.getMessage());
            return;
        }
        if (ldr.isFailed()) {
            this.rejectRequest(ses, reqId, ldr.error().getMessage());
            return;
        }
        try {
            reqEntries = msg.data();
            CacheObjectContext cacheObjCtx = ldr.streamer().cacheObjectContext();
            for (GridCacheRawVersionedEntry entry : reqEntries) {
                entry.unmarshalKey(cacheObjCtx, this.marsh);
                if (this.metaMgr == null) continue;
                entry.unmarshal(cacheObjCtx, this.marsh);
                this.metaMgr.registerMetadataIfNeed(entry.getKey());
                this.metaMgr.registerMetadataIfNeed(entry.getValue());
            }
        }
        catch (Throwable e) {
            U.error(this.log, "Failed to prepare DR sender hub batch request [remoteAddr=" + ses.remoteAddress() + ", dataCenterId=" + dataCenterId + ", reqId=" + reqId + ']', e);
            this.rejectRequest(ses, reqId, "Failed to prepare DR sender hub batch request: " + e.getMessage());
            return;
        }
        this.proc.metrics().onReceiverHubBatchReceived(dataCenterId, cacheName, reqEntryCnt, reqDataSize);
        final long sndTime = System.nanoTime();
        try {
            addDataFut = ldr.streamer().addDataInternal(reqEntries, false);
        }
        catch (Exception e) {
            if (ldr.fail(e)) {
                this.proc.kernalContext().timeout().addTimeoutObject(new GridTimeoutObjectAdapter(STREAMER_RECREATING_TIMEOUT){

                    @Override
                    public void onTimeout() {
                        if (DrReceiver.this.ldrs.remove(cacheName, ldr)) {
                            ldr.close(false, DrReceiver.this.log);
                        }
                    }
                });
                String logMsg = "Failed to add data to streamer. It will be recreated in " + STREAMER_RECREATING_TIMEOUT + " ms [dataCenterId=" + dataCenterId + ", cacheName=" + cacheName + ", reqId=" + reqId + ']';
                if (this.log.isDebugEnabled()) {
                    this.log.error(logMsg, e);
                } else {
                    this.log.warning(logMsg);
                }
            }
            this.rejectRequest(ses, reqId, ldr.error().getMessage());
            return;
        }
        addDataFut.listen(new IgniteInClosure<IgniteFuture<?>>(){
            private static final long serialVersionUID = 0L;

            @Override
            public void apply(IgniteFuture<?> t2) {
                if (DrReceiver.this.stopped) {
                    return;
                }
                try {
                    t2.get();
                    DrReceiver.this.proc.metrics().onReceiverHubBatchAcked(dataCenterId, cacheName, reqEntryCnt, reqDataSize, () -> (System.nanoTime() - sndTime) / 1000000L);
                }
                catch (Throwable e) {
                    if (DrReceiver.this.stopped) {
                        return;
                    }
                    U.error(DrReceiver.this.log, "Failed to process DR sender hub batch request [remoteAddr=" + ses.remoteAddress() + ", reqId=" + reqId + ", cacheName=" + cacheName + ']', e);
                    DrReceiver.this.sendAcknowledge(ses, reqId, "Failed to process DR sender hub batch request: " + e.getMessage());
                    return;
                }
                DrReceiver.this.sendAcknowledge(ses, reqId, null);
            }
        });
        this.proc.metrics().onReceiverHubBatchSent(dataCenterId, cacheName, reqEntryCnt, reqDataSize);
    }

    private void processMetadataRequest(GridNioSession ses, DrExternalMetadataRequest req) {
        Exception err = null;
        MarshallerContextImpl marshCtx = this.proc.kernalContext().marshallerContext();
        try {
            for (Map.Entry<String, DrSenderMetadataHolder.Versioned<IgniteBiTuple<Byte, Integer>>> entry : req.metadata().entrySet()) {
                IgniteBiTuple<Byte, Integer> tup = entry.getValue().value();
                marshCtx.registerClassName(tup.get1(), tup.get2(), entry.getKey());
            }
            if (this.usingBinaryMarshaller) {
                CacheObjectBinaryProcessorImpl binaryProcessor = (CacheObjectBinaryProcessorImpl)this.proc.kernalContext().cacheObjects();
                for (DrSenderMetadataHolder.Versioned<BinaryMetadata> entry : req.binaryMetadata()) {
                    if (this.metaMgr != null) {
                        this.metaMgr.updateMetadata(entry.value());
                        continue;
                    }
                    binaryProcessor.addMeta(entry.value().typeId(), entry.value().wrap(binaryProcessor.binaryContext()), false);
                }
            }
        }
        catch (Exception e) {
            if (this.log.isDebugEnabled()) {
                U.warn(this.log, e);
            }
            err = e;
        }
        DrExternalMetadataResponse resp = new DrExternalMetadataResponse(req.version(), err != null ? err.getMessage() : null);
        ses.send(resp);
    }

    private void processPingRequest(GridNioSession ses, DrExternalPingRequest req) {
        Byte dataCenterId = (Byte)ses.meta(META_DATA_CENTER_ID);
        if (this.log.isTraceEnabled()) {
            this.log.trace("Incoming DR sender hub ping request [remoteAddr=" + ses.remoteAddress() + ", dataCenterId=" + dataCenterId + "]");
        }
        ses.send(DrUtils.PING_RESP);
    }

    private void processHandshakeRequest(GridNioSession ses, DrExternalHandshakeRequest req) {
        if (this.log.isInfoEnabled()) {
            this.log.info("Incoming DR sender hub handshake request [remoteAddr=" + ses.remoteAddress() + ", dataCenterId=" + req.dataCenterId() + "]");
        }
        String errMsg = null;
        if (!"1.0-20140117".equals(req.protocolVersion())) {
            errMsg = "DR handshake failed because of different protocol versions [addr=" + ses.remoteAddress() + ", sndDataCenterId=" + req.dataCenterId() + ", sndProtoVer=" + req.protocolVersion() + ", rcvProtocolVer=" + "1.0-20140117" + ']';
        } else if (!this.marsh.getClass().getName().equals(req.marshallerClassName())) {
            errMsg = "DR handshake failed because of different marshaller implementations (please fix configuration and restart) [addr=" + ses.remoteAddress() + ", sndDataCenterId=" + req.dataCenterId() + ", sndMarsh=" + req.marshallerClassName() + ", rcvMarsh=" + this.marsh.getClass().getName() + ']';
        } else {
            ses.addMeta(META_DATA_CENTER_ID, req.dataCenterId());
            ses.addMeta(META_AWAIT_ACK, req.awaitAcknowledge());
            if (req.getTombstoneTtl() > this.tomstoneTtl()) {
                LT.warn(this.log, "Sender DataCenter has different tombstone TTL (sndTombstoneTTL > rcvTombstoneTTL) [addr=" + ses.remoteAddress() + ", sndDataCenterId=" + req.dataCenterId() + ", sndTombstoneTTL=" + req.getTombstoneTtl() + ", rcvTombstoneTTL=" + this.tomstoneTtl() + ']');
            }
        }
        if (errMsg != null) {
            LT.error(this.log, null, errMsg);
        }
        ses.send(new DrExternalHandshakeResponse(errMsg));
    }

    private void rejectRequest(GridNioSession ses, IgniteUuid reqId, String errMsg) {
        this.sendAcknowledge(ses, reqId, errMsg == null ? "Batch rejected." : errMsg);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Incoming DR request was skipped due to previous streamer exception [reqId=" + reqId + ", errMsg=" + errMsg + ']');
        }
    }

    private void sendAcknowledge(GridNioSession ses, IgniteUuid reqId, @Nullable String errMsg) {
        if (this.DR_ACK_MISSING_CACHES && errMsg != null) {
            errMsg = null;
        }
        if (((Boolean)Objects.requireNonNull(ses.meta(META_AWAIT_ACK))).booleanValue()) {
            ses.send(new DrExternalBatchResponse(reqId, errMsg));
        } else {
            ses.send(DrUtils.PING_RESP);
        }
    }

    private long tomstoneTtl() {
        return this.proc.kernalContext().cache().context().ttl().tombstoneTTL();
    }

    private void initTombstoneTTL() {
        this.defaultTombstoneTtl = this.proc.kernalContext().cache().context().ttl().tombstoneTTL();
        if (this.isIncrementalDrEnabled && this.tsTtl.get() == null && IgniteSystemProperties.getString("DEFAULT_TOMBSTONE_TTL") == null) {
            this.log.warning("Tombstone TTL is not configured whereas an incremental DR is enabled. Default TTL: " + this.defaultTombstoneTtl);
        }
    }

    private static final class NioAsyncNotifyFilter
    extends GridNioAsyncNotifyFilter {
        private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
        private final int messageQueueLimit;
        private final DrReceiverInMetricsAdapter metrics;
        private final DrReceiver receiver;

        NioAsyncNotifyFilter(DrReceiver receiver, String igniteInstanceName, Executor exec, int messageQueueLimit, DrReceiverInMetricsAdapter metrics, IgniteLogger log) {
            super(igniteInstanceName, exec, log);
            assert (metrics != null);
            assert (messageQueueLimit >= 0);
            this.messageQueueLimit = messageQueueLimit;
            this.metrics = metrics;
            this.receiver = receiver;
        }

        @Override
        public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
            if (msg instanceof DrExternalPingRequest) {
                this.receiver.processInMessage(ses, (DrExternalPingRequest)msg);
                return;
            }
            if (this.messageQueueLimit == 0) {
                super.onMessageReceived(ses, msg);
            } else {
                this.onMessageReceived(ses, msg, this.track(ses, (DrExternalMessage)msg));
            }
        }

        private Runnable track(GridNioSession ses, DrExternalMessage msg) {
            int msgSize = msg.size();
            if (this.messageQueueLimit > 0) {
                GridNioMessageTracker tracker = (GridNioMessageTracker)ses.meta(TRACKER_META);
                if (tracker == null) {
                    tracker = new GridNioMessageTracker(ses, this.messageQueueLimit);
                    ses.addMeta(TRACKER_META, tracker);
                }
                tracker.onMessageReceived();
                this.metrics.addMessageSize(msgSize);
                GridNioMessageTracker tracker0 = tracker;
                return () -> {
                    this.metrics.addMessageSize(-msgSize);
                    tracker0.onMessageProcessed();
                };
            }
            this.metrics.addMessageSize(msgSize);
            return () -> this.metrics.addMessageSize(-msgSize);
        }
    }

    private static final class StreamerIoPolicyResolver
    implements IgniteClosure<ClusterNode, Byte> {
        private static final long serialVersionUID = 0L;

        private StreamerIoPolicyResolver() {
        }

        @Override
        public Byte apply(ClusterNode node) {
            return (byte)34;
        }
    }

    private class Binder
    extends GridWorker {
        private final InetAddress host;
        private final int port;
        private final DrReceiverInMetricsAdapter metrics;

        private Binder(String igniteInstanceName, InetAddress host, int port, DrReceiverInMetricsAdapter metrics) {
            super(igniteInstanceName, "rcv-hub-binder", DrReceiver.this.log);
            this.host = host;
            this.port = port;
            this.metrics = metrics;
        }

        @Override
        protected void body() {
            while (!this.isCancelled()) {
                try {
                    U.sleep(3000L);
                }
                catch (IgniteInterruptedCheckedException ignore) {
                    if (!this.log.isDebugEnabled()) break;
                    this.log.debug("DR receiver hub TCP server binder thread was interrupted.");
                    break;
                }
                DrReceiver.this.stateLock.lock();
                try {
                    if (DrReceiver.this.stopped || DrReceiver.this.startServer(this.host, this.port, DrReceiver.this.cfg, this.metrics)) break;
                }
                finally {
                    DrReceiver.this.stateLock.unlock();
                }
            }
        }
    }

    private class NioListener
    extends GridNioServerListenerAdapter<DrExternalMessage> {
        private NioListener() {
        }

        @Override
        public void onConnected(GridNioSession ses) {
            if (DrReceiver.this.log.isDebugEnabled()) {
                DrReceiver.this.log.debug("Remote host has connected to receiver port [remoteAddr=" + ses.remoteAddress() + ", localAddr=" + ses.localAddress() + ']');
            }
        }

        @Override
        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
            if (ses.meta(META_DATA_CENTER_ID) != null) {
                if (DrReceiver.this.log.isInfoEnabled()) {
                    DrReceiver.this.log.info("Remote sender hub has disconnected [remoteAddr=" + ses.remoteAddress() + ", dataCenterId=" + ses.meta(META_DATA_CENTER_ID) + ", err=" + e + ']');
                }
            } else if (DrReceiver.this.log.isDebugEnabled()) {
                DrReceiver.this.log.debug("Remote host has disconnected [remoteAddr=" + ses.remoteAddress() + ", err=" + e + ']');
            }
        }

        @Override
        public void onMessage(GridNioSession ses, DrExternalMessage msg) {
            if (DrReceiver.this.stopped) {
                if (DrReceiver.this.log.isDebugEnabled()) {
                    DrReceiver.this.log.debug("Ignoring DR message (grid is stopping): " + ses);
                }
                return;
            }
            DrReceiver.this.processInMessage(ses, msg);
        }

        @Override
        public void onSessionWriteTimeout(GridNioSession ses) {
            Byte dataCenterId = (Byte)ses.meta(META_DATA_CENTER_ID);
            LT.warn(DrReceiver.this.log, "DR sender hub session write timed out (consider increasing 'writeTimeout' configuration property) [remoteAddr=" + ses.remoteAddress() + ", dataCenterId=" + dataCenterId + ", writeTimeout=" + DrReceiver.this.cfg.getWriteTimeout() + ']');
            if (DrReceiver.this.log.isDebugEnabled()) {
                DrReceiver.this.log.debug("Closing DR sender hub session on write timeout [remoteAddr=" + ses.remoteAddress() + ", dataCenterId=" + dataCenterId + ", writeTimeout=" + DrReceiver.this.cfg.getWriteTimeout() + ']');
            }
            ses.close();
        }

        @Override
        public void onSessionIdleTimeout(GridNioSession ses) {
            if (DrReceiver.this.log.isDebugEnabled()) {
                DrReceiver.this.log.debug("Closing DR sender hub session on idle timeout [remoteAddr=" + ses.remoteAddress() + ", dataCenterId=" + ses.meta(META_DATA_CENTER_ID) + ", idleTimeout=" + DrReceiver.this.cfg.getIdleTimeout() + ']');
            }
            ses.close();
        }
    }

    private static class DataStreamWrapper {
        private final DataStreamerImpl<KeyCacheObject, CacheObject> streamer;
        private volatile Exception error;

        private DataStreamWrapper(DataStreamerImpl<KeyCacheObject, CacheObject> streamer) {
            assert (streamer != null);
            this.streamer = streamer;
        }

        private DataStreamerImpl<KeyCacheObject, CacheObject> streamer() {
            return this.streamer;
        }

        private Exception error() {
            return this.error;
        }

        private boolean isFailed() {
            return this.error != null;
        }

        private boolean fail(Exception err) {
            if (this.error == null) {
                this.error = err;
                return true;
            }
            return false;
        }

        private void close(boolean cancel, IgniteLogger log) {
            block2: {
                try {
                    this.streamer.closeEx(cancel);
                }
                catch (IgniteCheckedException e) {
                    if (log == null || this.error != null && !log.isDebugEnabled()) break block2;
                    log.error("Failed to close DR data streamer [cache=" + this.streamer.cacheName() + ']', e);
                }
            }
        }
    }
}

