package org.gridgain.grid.internal.processors.dr;

import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.cache.CacheException;
import javax.cache.configuration.Factory;
import javax.management.JMException;
import javax.management.ObjectName;
import javax.net.ssl.SSLContext;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.MarshallerContextImpl;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
import org.apache.ignite.internal.processors.dr.IgniteDrDataStreamerCacheUpdater;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.nio.GridBufferedParser;
import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
import org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.spi.IgnitePortProtocol;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.IgniteThreadFactory;
import org.gridgain.grid.GridSystemProperties;
import org.gridgain.grid.configuration.DrReceiverConfiguration;
import org.gridgain.grid.dr.DrReceiverMBean;
import org.gridgain.grid.internal.processors.dr.DrSenderMetadataHolder;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalBatchRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalBatchResponse;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalHandshakeRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalHandshakeResponse;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalMetadataRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalMetadataResponse;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalPingRequest;
import org.jetbrains.annotations.Nullable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/gridgain/grid/internal/processors/dr/DrReceiver.class */
public class DrReceiver {
    /** NIO session meta key under which the remote sender's data center ID (a {@code Byte}) is looked up. */
    private static final int META_DATA_CENTER_ID;
    /**
     * NIO session meta key. It is not referenced in the visible part of this class.
     * NOTE(review): presumably marks sessions awaiting an acknowledgement - confirm against its usages.
     */
    private static final int META_AWAIT_ACK;
    /** Pause, in milliseconds, between attempts of the {@link Binder} worker to start the TCP server. */
    private static final long REBIND_FREQ = 3000;
    /** Delay handed to the timeout object that removes and closes a failed data streamer (logged as milliseconds). */
    private static final long STREAMER_RECREATING_TIMEOUT;
    /** Owning DR processor; source of configuration, metrics and kernal context. */
    private final DrProcessor proc;
    /** Receiver hub configuration obtained from {@code proc.ggConfig()}. */
    private final DrReceiverConfiguration cfg;
    /** Logger created for this class via the plugin context. */
    private final IgniteLogger log;
    /** Marshaller from the node configuration; used to unmarshal keys of incoming entries. */
    private final Marshaller marsh;
    /** NIO server accepting sender hub connections; assigned by {@code startServer}. */
    private GridNioServer<byte[]> srvr;
    /** Thread pool passed to the asynchronous notification filter; created by {@code startServer}. */
    private ExecutorService execSvc;
    /** Background worker that keeps retrying to start the server; created only when the first attempt fails to bind. */
    private GridWorker binder;
    /** Name of the registered receiver hub MBean, or {@code null} when none was registered. */
    private ObjectName rcvHubMBean;
    /** Set to {@code true} by {@code onKernalStop}; incoming messages and start attempts are ignored afterwards. */
    private volatile boolean stopped;
    /** Whether {@link #marsh} is a {@link BinaryMarshaller}. */
    private final boolean usingBinaryMarshaller;
    /** Decompiler rendering of the assertion switch: assertion checks in this class are skipped when it is {@code true}. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Value of the {@code DR_ACK_MISSING_CACHES} system property, read once per receiver instance. */
    private final boolean DR_ACK_MISSING_CACHES = IgniteSystemProperties.getBoolean("DR_ACK_MISSING_CACHES");
    /** Data streamer wrappers, inserted under the masked cache name (see {@code streamer(String)}). */
    private final ConcurrentMap<String, DataStreamWrapper> ldrs = new ConcurrentHashMap();
    /** Guards the start, rebind and stop transitions of the receiver hub. */
    private final Lock stateLock = new ReentrantLock();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gridgain/grid/internal/processors/dr/DrReceiver$Binder.class */
    /**
     * Background worker that periodically retries to start the receiver hub TCP server
     * after the initial attempt could not bind to the configured address.
     */
    public class Binder extends GridWorker {
        /** Address the server should listen on. */
        private final InetAddress host;
        /** Port the server should listen on. */
        private final int port;

        /**
         * @param instanceName First argument forwarded to the {@link GridWorker} constructor.
         * @param bindHost Address to bind to.
         * @param bindPort Port to bind to.
         */
        private Binder(String instanceName, InetAddress bindHost, int bindPort) {
            super(instanceName, "rcv-hub-binder", DrReceiver.this.log);
            this.host = bindHost;
            this.port = bindPort;
        }

        /**
         * Sleeps for {@code REBIND_FREQ} and then tries to start the server, repeating until
         * the server starts, the receiver is stopped, an error occurs or the worker is cancelled.
         */
        protected void body() {
            while (!isCancelled()) {
                try {
                    U.sleep(DrReceiver.REBIND_FREQ);
                } catch (IgniteInterruptedCheckedException ignored) {
                    // Interrupted while waiting for the next attempt: just leave.
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("DR receiver hub TCP server binder thread was interrupted.");
                    }
                    return;
                }
                if (attemptBind()) {
                    return;
                }
            }
        }

        /**
         * Performs one start attempt while holding the receiver state lock.
         *
         * @return {@code true} when the worker must finish (receiver stopped, server started
         *      or start failed with an error), {@code false} when another attempt is needed.
         */
        private boolean attemptBind() {
            DrReceiver.this.stateLock.lock();
            try {
                return DrReceiver.this.stopped
                    || DrReceiver.this.startServer(this.host, this.port, DrReceiver.this.cfg);
            } catch (IgniteCheckedException e) {
                if (this.isCancelled) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("DR receiver hub TCP server binder thread was interrupted.");
                    }
                } else {
                    U.error(this.log, "Failed to start DR receiver hub TCP server.", e);
                }
                return true;
            } finally {
                DrReceiver.this.stateLock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gridgain/grid/internal/processors/dr/DrReceiver$DataStreamWrapper.class */
    /**
     * Pairs a DR data streamer with the first error that made it unusable.
     */
    public static class DataStreamWrapper {
        /** Wrapped data streamer; never {@code null}. */
        private final DataStreamerImpl<KeyCacheObject, CacheObject> streamer;
        /** Recorded failure, or {@code null} while the streamer is considered healthy. */
        private volatile Exception error;

        /**
         * @param wrapped Data streamer to wrap.
         */
        private DataStreamWrapper(DataStreamerImpl<KeyCacheObject, CacheObject> wrapped) {
            assert wrapped != null;
            this.streamer = wrapped;
        }

        /**
         * @return Wrapped data streamer.
         */
        public DataStreamerImpl<KeyCacheObject, CacheObject> streamer() {
            return streamer;
        }

        /**
         * @return Recorded failure or {@code null}.
         */
        public Exception error() {
            return error;
        }

        /**
         * @return {@code true} if a failure has been recorded.
         */
        public boolean isFailed() {
            return error != null;
        }

        /**
         * Records the given failure unless one is already recorded.
         *
         * @param exc Failure to record.
         * @return {@code true} if this call recorded the failure, {@code false} if one was already present.
         */
        public boolean fail(Exception exc) {
            boolean firstFailure = error == null;
            if (firstFailure) {
                error = exc;
            }
            return firstFailure;
        }

        /**
         * Closes the wrapped streamer, logging (not propagating) Ignite and cache exceptions.
         *
         * @param z Flag forwarded to {@code DataStreamerImpl.close(boolean)}.
         * @param igniteLogger Logger for close failures; may be {@code null} to suppress logging.
         */
        public void close(boolean z, IgniteLogger igniteLogger) {
            try {
                streamer.close(z);
            } catch (IgniteException | CacheException e) {
                // An already failed streamer is reported only when debug logging is enabled.
                boolean report = igniteLogger != null && (error == null || igniteLogger.isDebugEnabled());
                if (report) {
                    igniteLogger.error("Failed to close DR data streamer [cache=" + streamer.cacheName() + ']', e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gridgain/grid/internal/processors/dr/DrReceiver$NioAsyncNotifyFilter.class */
    /**
     * Asynchronous notification filter that accounts the size of in-flight messages in the
     * receiver metrics and, when a queue limit is set, tracks them per session.
     */
    public static final class NioAsyncNotifyFilter extends GridNioAsyncNotifyFilter {
        /** Session meta key holding the per-session message tracker. */
        private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
        /** Limit handed to each per-session tracker; values {@code <= 0} disable tracking. */
        private final int messageQueueLimit;
        /** Receiver metrics updated with message sizes. */
        private final DrReceiverInMetricsAdapter metrics;

        /**
         * @param str First argument forwarded to the parent filter constructor.
         * @param executor Executor forwarded to the parent filter constructor.
         * @param i Message queue limit; must not be negative.
         * @param drReceiverInMetricsAdapter Receiver metrics; must not be {@code null}.
         * @param igniteLogger Logger forwarded to the parent filter constructor.
         */
        NioAsyncNotifyFilter(String str, Executor executor, int i, DrReceiverInMetricsAdapter drReceiverInMetricsAdapter, IgniteLogger igniteLogger) {
            super(str, executor, igniteLogger);
            assert drReceiverInMetricsAdapter != null;
            assert i >= 0;
            this.messageQueueLimit = i;
            this.metrics = drReceiverInMetricsAdapter;
        }

        /**
         * Registers the message with the metrics/tracker and delegates to the three-argument
         * overload, passing the callback that undoes the registration.
         */
        public void onMessageReceived(GridNioSession gridNioSession, Object obj) throws IgniteCheckedException {
            Runnable onProcessed = track(gridNioSession, (byte[]) obj);
            onMessageReceived(gridNioSession, obj, onProcessed);
        }

        /**
         * Accounts a received message.
         *
         * @param ses Session the message arrived on.
         * @param payload Raw message bytes.
         * @return Callback that subtracts the message size from the metrics and, when tracking
         *      is active, notifies the session tracker that the message was processed.
         */
        private Runnable track(GridNioSession ses, byte[] payload) {
            final int size = payload.length;

            if (messageQueueLimit <= 0) {
                metrics.addMessageSize(size);
                return () -> metrics.addMessageSize(-size);
            }

            GridNioMessageTracker found = (GridNioMessageTracker) ses.meta(TRACKER_META);
            if (found == null) {
                found = new GridNioMessageTracker(ses, messageQueueLimit);
                ses.addMeta(TRACKER_META, found);
            }
            final GridNioMessageTracker tracker = found;

            tracker.onMessageReceived();
            metrics.addMessageSize(size);

            return () -> {
                metrics.addMessageSize(-size);
                tracker.onMessageProcessed();
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gridgain/grid/internal/processors/dr/DrReceiver$NioListener.class */
    /**
     * NIO server listener of the receiver hub: logs session lifecycle events, hands incoming
     * messages over to {@code processInMessage} and closes sessions that time out.
     */
    public class NioListener extends GridNioServerListenerAdapter<byte[]> {
        private NioListener() {
        }

        /** Logs the address of a newly connected sender hub. */
        public void onConnected(GridNioSession gridNioSession) {
            String msg = "Remote DR sender hub connected [remoteAddr=" + gridNioSession.remoteAddress() + "]";
            DrReceiver.this.log.info(msg);
        }

        /** Logs the address, data center ID and error (if any) of a disconnected sender hub. */
        public void onDisconnected(GridNioSession gridNioSession, @Nullable Exception exc) {
            String msg = "Remote DR sender hub disconnected [remoteAddr=" + gridNioSession.remoteAddress()
                + ", dataCenterId=" + gridNioSession.meta(DrReceiver.META_DATA_CENTER_ID)
                + ", err=" + exc + ']';
            DrReceiver.this.log.info(msg);
        }

        /** Processes the message unless the receiver has been stopped. */
        public void onMessage(GridNioSession gridNioSession, byte[] bArr) {
            if (DrReceiver.this.stopped) {
                if (DrReceiver.this.log.isDebugEnabled()) {
                    DrReceiver.this.log.debug("Ignoring DR message (grid is stopping): " + gridNioSession);
                }
                return;
            }
            DrReceiver.this.processInMessage(gridNioSession, bArr);
        }

        /** Warns about the write timeout and closes the session. */
        public void onSessionWriteTimeout(GridNioSession gridNioSession) {
            Byte dataCenterId = (Byte) gridNioSession.meta(DrReceiver.META_DATA_CENTER_ID);
            IgniteLogger logger = DrReceiver.this.log;

            LT.warn(logger, "DR sender hub session write timed out (consider increasing 'writeTimeout' configuration property) [remoteAddr=" + gridNioSession.remoteAddress() + ", dataCenterId=" + dataCenterId + ", writeTimeout=" + DrReceiver.this.cfg.getWriteTimeout() + ']');

            if (logger.isDebugEnabled()) {
                logger.debug("Closing DR sender hub session on write timeout [remoteAddr=" + gridNioSession.remoteAddress() + ", dataCenterId=" + dataCenterId + ", writeTimeout=" + DrReceiver.this.cfg.getWriteTimeout() + ']');
            }

            gridNioSession.close();
        }

        /** Closes the idle session, logging the details at debug level. */
        public void onSessionIdleTimeout(GridNioSession gridNioSession) {
            IgniteLogger logger = DrReceiver.this.log;

            if (logger.isDebugEnabled()) {
                logger.debug("Closing DR sender hub session on idle timeout [remoteAddr=" + gridNioSession.remoteAddress() + ", dataCenterId=" + gridNioSession.meta(DrReceiver.META_DATA_CENTER_ID) + ", idleTimeout=" + DrReceiver.this.cfg.getIdleTimeout() + ']');
            }

            gridNioSession.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gridgain/grid/internal/processors/dr/DrReceiver$StreamerIoPolicyResolver.class */
    /**
     * IO policy resolver installed on DR data streamers; it yields the same policy byte for every node.
     */
    public static final class StreamerIoPolicyResolver implements IgniteClosure<ClusterNode, Byte> {
        private static final long serialVersionUID = 0;

        private StreamerIoPolicyResolver() {
        }

        /**
         * @param clusterNode Node the policy is requested for; not inspected.
         * @return The constant policy value {@code 34}.
         */
        public Byte apply(ClusterNode clusterNode) {
            final byte policy = 34;
            return Byte.valueOf(policy);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DrReceiver(DrProcessor drProcessor) {
        if (!$assertionsDisabled && drProcessor == null) {
            throw new AssertionError();
        }
        this.proc = drProcessor;
        this.log = drProcessor.context().log(DrReceiver.class);
        this.cfg = drProcessor.ggConfig().getDrReceiverConfiguration();
        if (!$assertionsDisabled && this.cfg == null) {
            throw new AssertionError();
        }
        this.marsh = drProcessor.config().getMarshaller();
        this.usingBinaryMarshaller = this.marsh instanceof BinaryMarshaller;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * No-op lifecycle hook: nothing is initialised here, the TCP server is started
     * from {@code onKernalStart()}.
     */
    public void start() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts the receiver hub: resolves the inbound address, registers the MBean and starts
     * the TCP server. If the server cannot bind, a {@link Binder} worker is launched that keeps
     * retrying every {@code REBIND_FREQ} milliseconds. Does nothing if the receiver was
     * already stopped.
     *
     * @throws IgniteCheckedException If the inbound host cannot be resolved or the server
     *      fails to start for a reason other than a bind failure.
     */
    public void onKernalStart() throws IgniteCheckedException {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Starting DR receiver hub: " + this.cfg);
        }
        this.stateLock.lock();
        try {
            if (this.stopped) {
                return;
            }
            int localInboundPort = this.cfg.getLocalInboundPort();
            String localInboundHost = this.cfg.getLocalInboundHost();
            if (localInboundHost == null) {
                // Fall back to the node-wide local host setting.
                localInboundHost = this.proc.config().getLocalHost();
            }
            InetAddress bindAddr;
            try {
                bindAddr = localInboundHost != null ? InetAddress.getByName(localInboundHost) : U.getLocalHost();
            } catch (IOException e) {
                throw new IgniteCheckedException("Failed to resolve DR receiver hub TCP server local inbound host: " + localInboundHost, e);
            }
            registerMBean(bindAddr);
            if (!startServer(bindAddr, localInboundPort, this.cfg)) {
                U.warn(this.log, "Failed to start DR receiver hub TCP server (retrying every " + REBIND_FREQ + " ms). Another node on this host? [host=" + bindAddr + ", port=" + localInboundPort + ']');
                this.binder = new Binder(this.proc.igniteInstanceName(), bindAddr, localInboundPort);
                new IgniteThread(this.binder).start();
            }
        } finally {
            // The lock is released exactly once, here. Releasing it in the try body as well
            // makes this second unlock throw IllegalMonitorStateException on the success path.
            this.stateLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stops the receiver hub: cancels the rebind worker, stops the TCP server, shuts down the
     * worker pool, closes all data streamers and unregisters the MBean. Only the first call
     * performs the shutdown; subsequent calls return immediately.
     *
     * @param z Flag forwarded to the close call of every data streamer wrapper.
     */
    public void onKernalStop(boolean z) {
        PluginContext context = this.proc.context();
        GridWorker binderToStop;

        this.stateLock.lock();
        try {
            if (this.stopped) {
                return;
            }
            this.stopped = true;
            this.log.info("Stopping a DR receiver hub: " + this.cfg);
            binderToStop = this.binder;
        } finally {
            this.stateLock.unlock();
        }

        // The binder acquires stateLock (uninterruptibly) before each start attempt, so it must
        // be joined WITHOUT holding the lock; otherwise shutdown can deadlock with a binder that
        // is waiting for the lock. Once it sees 'stopped' it exits without touching any state.
        if (binderToStop != null) {
            U.cancel(binderToStop);
            U.join(binderToStop, this.log);
        }

        this.stateLock.lock();
        try {
            if (this.srvr != null) {
                this.srvr.stop();
                context.deregisterPorts(getClass());
            }
            if (this.execSvc != null) {
                U.shutdownNow(DrReceiver.class, this.execSvc, this.log);
            }
            for (DataStreamWrapper wrapper : this.ldrs.values()) {
                wrapper.close(z, this.log);
            }
            this.ldrs.clear();
            unregisterMBean();
        } finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Registers the receiver hub MBean unless MBeans are disabled. Registration failures
     * are logged and otherwise ignored.
     *
     * @param inetAddress Address passed to the MBean implementation.
     */
    private void registerMBean(InetAddress inetAddress) {
        if (!U.IGNITE_MBEANS_DISABLED) {
            try {
                DrReceiverMBeanImpl mbean = new DrReceiverMBeanImpl(this.proc, this.cfg, inetAddress);
                this.rcvHubMBean = U.registerMBean(
                    this.proc.config().getMBeanServer(),
                    this.proc.igniteInstanceName(),
                    "Data center replication",
                    "receiver hub",
                    mbean,
                    DrReceiverMBean.class);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Registered DR receiver hub MBean: " + this.rcvHubMBean);
                }
            } catch (JMException e) {
                U.error(this.log, "Failed to register DR receiver hub MBean.", e);
            }
        }
    }

    /**
     * Unregisters the receiver hub MBean if one was registered. Failures are logged and
     * otherwise ignored.
     */
    private void unregisterMBean() {
        ObjectName mbeanName = this.rcvHubMBean;
        if (mbeanName != null) {
            // A registered MBean implies that MBeans were enabled at registration time.
            if (U.IGNITE_MBEANS_DISABLED && !$assertionsDisabled) {
                throw new AssertionError();
            }
            try {
                this.proc.config().getMBeanServer().unregisterMBean(mbeanName);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Unregistered DR receiver hub MBean: " + this.rcvHubMBean);
                }
            } catch (JMException e) {
                U.error(this.log, "Failed to unregister DR receiver hub MBean: " + this.rcvHubMBean, e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the filter chain and the NIO server for the receiver hub, starts the server and
     * registers its port with the plugin context.
     *
     * @param inetAddress Address to listen on.
     * @param i Port to listen on.
     * @param drReceiverConfiguration Receiver configuration supplying pool sizes, limits and socket options.
     * @return {@code true} if the server was started, {@code false} if startup failed because
     *      of a {@link BindException}.
     * @throws IgniteCheckedException If startup failed for any other reason.
     */
    public boolean startServer(InetAddress inetAddress, int i, DrReceiverConfiguration drReceiverConfiguration) throws IgniteCheckedException {
        PluginContext pluginCtx = this.proc.context();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Trying to start a DR receiver hub [host=" + inetAddress + ", port=" + i + ']');
        }
        DrReceiverInMetricsAdapter inMetrics = this.proc.metrics().registerReceiverMetrics();
        try {
            this.execSvc = Executors.newFixedThreadPool(
                drReceiverConfiguration.getWorkerThreads(),
                new IgniteThreadFactory(this.proc.igniteInstanceName(), "gridgain-receive-hub"));

            ArrayList<GridNioFilter> filters = new ArrayList<>();

            // A zero queue limit selects the plain asynchronous filter, otherwise the tracking one.
            GridNioFilter notifyFilter;
            if (drReceiverConfiguration.getMessageQueueLimit() == 0) {
                notifyFilter = new GridNioAsyncNotifyFilter(this.proc.igniteInstanceName(), this.execSvc, this.log);
            } else {
                notifyFilter = new NioAsyncNotifyFilter(this.proc.igniteInstanceName(), this.execSvc, drReceiverConfiguration.getMessageQueueLimit(), inMetrics, this.log);
            }
            filters.add(notifyFilter);

            GridBufferedParser parser = new GridBufferedParser(drReceiverConfiguration.isDirectBuffer(), DrUtils.DR_BYTE_ORDER);
            filters.add(new GridNioCodecFilter(parser, this.log, false));
            filters.add(new GridConnectionBytesVerifyFilter(this.log));

            Factory<SSLContext> sslFactory = this.proc.getSslContextFactory(drReceiverConfiguration);
            if (sslFactory != null) {
                GridNioSslFilter sslFilter = new GridNioSslFilter((SSLContext) sslFactory.create(), drReceiverConfiguration.isDirectBuffer(), DrUtils.DR_BYTE_ORDER, this.log);
                sslFilter.needClientAuth(true);
                sslFilter.wantClientAuth(true);
                filters.add(sslFilter);
            }

            this.srvr = GridNioServer.<byte[]>builder()
                .address(inetAddress)
                .port(i)
                .listener(new NioListener())
                .filters(filters.toArray(new GridNioFilter[filters.size()]))
                .logger(this.log)
                .selectorCount(drReceiverConfiguration.getSelectorCount())
                .sendQueueLimit(drReceiverConfiguration.getMessageQueueLimit())
                .igniteInstanceName(this.proc.igniteInstanceName())
                .serverName("dr-rcv")
                .byteOrder(DrUtils.DR_BYTE_ORDER)
                .tcpNoDelay(drReceiverConfiguration.isTcpNodelay())
                .directBuffer(drReceiverConfiguration.isDirectBuffer())
                .idleTimeout(drReceiverConfiguration.getIdleTimeout())
                .writeTimeout(drReceiverConfiguration.getWriteTimeout())
                .socketSendBufferSize(drReceiverConfiguration.getSocketSendBufferSize())
                .socketReceiveBufferSize(drReceiverConfiguration.getSocketReceiveBufferSize())
                .serverName("dr-rcv")
                .build();
            this.srvr.start();

            pluginCtx.registerPort(i, IgnitePortProtocol.TCP, getClass());
            this.log.info("Started DR receiver hub [addr=" + inetAddress + ", port=" + i + ", cfg=" + drReceiverConfiguration + ']');
            return true;
        } catch (IgniteCheckedException e) {
            // Only a bind failure is reported as "not started"; everything else propagates.
            if (!e.hasCause(BindException.class)) {
                throw e;
            }
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Prints the number of data streamers and of queued (not yet processed) messages to the
     * console. When the worker pool has not been created yet (the server was never started),
     * or is not a {@link ThreadPoolExecutor}, the pending message count is reported as 0
     * instead of failing with an exception.
     */
    public void printMemoryStats() {
        ExecutorService pool = this.execSvc;
        int pendingMessages = pool instanceof ThreadPoolExecutor
            ? ((ThreadPoolExecutor) pool).getQueue().size()
            : 0;

        X.println(">>>", new Object[0]);
        X.println(">>> DR receiver hub memory stats [igniteInstanceName=" + this.proc.igniteInstanceName() + ']', new Object[0]);
        X.println(">>>   streamersSize: " + this.ldrs.size(), new Object[0]);
        X.println(">>>   pendingMessagesCount: " + pendingMessages, new Object[0]);
    }

    /**
     * Returns the data streamer wrapper for the given cache, creating and configuring a new
     * streamer on first use. The map is read and written under the same masked key
     * ({@code CU.mask(str)}), so a lookup can never miss an entry stored by an earlier call
     * and a name that {@code ConcurrentHashMap} would reject as a raw key is never used for
     * the lookup.
     *
     * @param str Cache name as received in the batch request.
     * @return Existing or newly created wrapper; if another thread registered a wrapper
     *      concurrently, the freshly created streamer is closed and the registered one returned.
     */
    private DataStreamWrapper streamer(String str) {
        String key = CU.mask(str);
        DataStreamWrapper cached = this.ldrs.get(key);
        if (cached != null) {
            return cached;
        }

        DataStreamerImpl dataStreamer = this.proc.kernalContext().dataStream().dataStreamer(str);
        dataStreamer.perNodeBufferSize(this.cfg.getPerNodeBufferSize());
        dataStreamer.receiver(new IgniteDrDataStreamerCacheUpdater());
        dataStreamer.perNodeParallelOperations(this.cfg.getPerNodeParallelLoadOperations());
        dataStreamer.ioPolicyResolver(new StreamerIoPolicyResolver());
        if (this.cfg.getFlushFrequency() > 0) {
            dataStreamer.autoFlushFrequency(this.cfg.getFlushFrequency());
        }

        DataStreamWrapper created = new DataStreamWrapper(dataStreamer);
        DataStreamWrapper raced = this.ldrs.putIfAbsent(key, created);
        if (raced == null) {
            return created;
        }

        // Lost the race: discard the streamer created here and use the registered one.
        try {
            dataStreamer.close();
        } catch (IgniteException e) {
            if (!$assertionsDisabled) {
                throw new AssertionError("Exception should not be thrown on empty data streamer closing: " + e);
            }
        }
        return raced;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Unmarshals an incoming message and dispatches it to the handler matching its type.
     * Unmarshalling failures are logged; messages of an unknown type are logged and, with
     * assertions enabled, trigger an {@link AssertionError}.
     *
     * @param gridNioSession Session the message arrived on.
     * @param bArr Raw message bytes.
     */
    public void processInMessage(GridNioSession gridNioSession, byte[] bArr) {
        final Object msg;
        try {
            msg = DrUtils.unmarshal(bArr, this.usingBinaryMarshaller);
        } catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to process incoming message.", e);
            return;
        }

        if (msg instanceof DrExternalBatchRequest) {
            processBatchRequest(gridNioSession, (DrExternalBatchRequest) msg);
            return;
        }
        if (msg instanceof DrExternalHandshakeRequest) {
            processHandshakeRequest(gridNioSession, (DrExternalHandshakeRequest) msg);
            return;
        }
        if (msg instanceof DrExternalPingRequest) {
            processPingRequest(gridNioSession, (DrExternalPingRequest) msg);
            return;
        }
        if (msg instanceof DrExternalMetadataRequest) {
            processMetadataRequest(gridNioSession, (DrExternalMetadataRequest) msg);
            return;
        }

        U.warn(this.log, "Ignoring message of unknown type [msg=" + msg + ']');
        if (!$assertionsDisabled) {
            throw new AssertionError();
        }
    }

    /**
     * Handles a batch of replicated entries: unmarshals the entry keys, feeds the batch into
     * the data streamer of the target cache and acknowledges the request once the streamer
     * future completes. Any failure results in the request being rejected.
     *
     * <p>If adding data to the streamer fails, the streamer is marked as failed and scheduled
     * for removal after {@code STREAMER_RECREATING_TIMEOUT}, so that a later request creates a
     * fresh one. The removal uses the masked cache name, i.e. the key under which
     * {@code streamer(String)} stores wrappers.
     *
     * @param gridNioSession Session the request arrived on.
     * @param drExternalBatchRequest Batch request.
     */
    private void processBatchRequest(final GridNioSession gridNioSession, DrExternalBatchRequest drExternalBatchRequest) {
        final Byte dataCenterId = (Byte) gridNioSession.meta(META_DATA_CENTER_ID);
        if (!$assertionsDisabled && dataCenterId == null) {
            throw new AssertionError();
        }
        final String cacheName = drExternalBatchRequest.cacheName();
        final int entryCount = drExternalBatchRequest.entryCount();
        final int dataSize = drExternalBatchRequest.dataSize();
        final IgniteUuid requestId = drExternalBatchRequest.requestId();
        if (this.log.isTraceEnabled()) {
            this.log.trace("Incoming DR sender hub batch request [remoteAddr=" + gridNioSession.remoteAddress() + ", dataCenterId=" + dataCenterId + ", reqId=" + requestId + ", cacheName=" + cacheName + ", entryNum=" + entryCount + ", dataLen=" + dataSize + ']');
        }
        try {
            final DataStreamWrapper streamer = streamer(cacheName);
            if (streamer.isFailed()) {
                rejectRequest(gridNioSession, requestId, streamer.error().getMessage());
                return;
            }
            try {
                Collection<GridCacheRawVersionedEntry> data = drExternalBatchRequest.data();
                CacheObjectContext cacheObjectContext = streamer.streamer().cacheObjectContext();
                for (GridCacheRawVersionedEntry entry : data) {
                    entry.unmarshalKey(cacheObjectContext, this.marsh);
                }
                this.proc.metrics().onReceiverHubBatchReceived(dataCenterId.byteValue(), cacheName, entryCount, dataSize);
                final long startNanos = System.nanoTime();
                try {
                    streamer.streamer().addDataInternal(data).listen(new IgniteInClosure<IgniteFuture<?>>() {
                        private static final long serialVersionUID = 0;

                        public void apply(IgniteFuture<?> igniteFuture) {
                            if (DrReceiver.this.stopped) {
                                return;
                            }
                            String errMsg = null;
                            try {
                                igniteFuture.get();
                                DrReceiver.this.proc.metrics().onReceiverHubBatchAcked(cacheName, entryCount, dataSize, (System.nanoTime() - startNanos) / 1000000);
                            } catch (IgniteException | CacheException | NullPointerException e) {
                                U.error(DrReceiver.this.log, "Failed to process DR sender hub batch request [remoteAddr=" + gridNioSession.remoteAddress() + ", reqId=" + requestId + ", cacheName=" + cacheName + ']', e);
                                errMsg = "Failed to process DR sender hub batch request: " + e.getMessage();
                            } finally {
                                // Exactly one acknowledgement per request, whatever the outcome;
                                // a failing send is not retried with a second acknowledgement.
                                DrReceiver.this.sendAcknowledge(gridNioSession, requestId, errMsg);
                            }
                        }
                    });
                    this.proc.metrics().onReceiverHubBatchSent(cacheName, entryCount, dataSize);
                } catch (Exception e) {
                    if (streamer.fail(e)) {
                        // Wrappers are stored under the masked name, so remove by the same key.
                        final String streamerKey = CU.mask(cacheName);
                        this.proc.kernalContext().timeout().addTimeoutObject(new GridTimeoutObjectAdapter(STREAMER_RECREATING_TIMEOUT) {
                            public void onTimeout() {
                                if (DrReceiver.this.ldrs.remove(streamerKey, streamer)) {
                                    streamer.close(false, DrReceiver.this.log);
                                }
                            }
                        });
                        String warnMsg = "Failed to add data to streamer. It will be recreated in " + STREAMER_RECREATING_TIMEOUT + " ms [dataCenterId=" + dataCenterId + ", cacheName=" + cacheName + ", reqId=" + requestId + ']';
                        if (this.log.isDebugEnabled()) {
                            this.log.error(warnMsg, e);
                        } else {
                            this.log.warning(warnMsg);
                        }
                    }
                    rejectRequest(gridNioSession, requestId, streamer.error().getMessage());
                }
            } catch (IgniteCheckedException e2) {
                U.error(this.log, "Failed to prepare DR sender hub batch request [remoteAddr=" + gridNioSession.remoteAddress() + ", dataCenterId=" + dataCenterId + ", reqId=" + requestId + ']', e2);
                rejectRequest(gridNioSession, requestId, "Failed to prepare DR sender hub batch request: " + e2.getMessage());
            }
        } catch (Exception e3) {
            // NOTE(review): the request is rejected in both branches; only the log level differs.
            // Confirm this is what the DR_ACK_MISSING_CACHES switch is meant to do.
            if (this.DR_ACK_MISSING_CACHES && (e3 instanceof IllegalStateException)) {
                LT.warn(this.log, "Failed to apply DR batch for non-started cache: " + cacheName);
            } else {
                U.error(this.log, "Failed to process DR batch request [remoteAddr=" + gridNioSession.remoteAddress() + ", dataCenterId=" + dataCenterId + ", reqId=" + requestId + ']', e3);
            }
            rejectRequest(gridNioSession, requestId, e3.getMessage());
        }
    }

    /**
     * Registers the class names and, when the binary marshaller is in use, the binary metadata
     * carried by the request, then replies with a metadata response. A registration failure is
     * not propagated: its message is returned to the sender in the response.
     *
     * @param gridNioSession Session the request arrived on.
     * @param drExternalMetadataRequest Metadata request.
     */
    private void processMetadataRequest(GridNioSession gridNioSession, DrExternalMetadataRequest drExternalMetadataRequest) {
        Exception failure = null;
        MarshallerContextImpl marshCtx = this.proc.kernalContext().marshallerContext();

        try {
            for (Map.Entry<String, DrSenderMetadataHolder.Versioned<IgniteBiTuple<Byte, Integer>>> mapping : drExternalMetadataRequest.metadata().entrySet()) {
                IgniteBiTuple<Byte, Integer> ids = mapping.getValue().value();
                byte firstId = ids.get1();
                int secondId = ids.get2();
                marshCtx.registerClassName(firstId, secondId, mapping.getKey());
            }

            if (this.usingBinaryMarshaller) {
                CacheObjectBinaryProcessorImpl binaryProc = this.proc.kernalContext().cacheObjects();
                for (DrSenderMetadataHolder.Versioned<BinaryMetadata> meta : drExternalMetadataRequest.binaryMetadata()) {
                    binaryProc.addMeta(meta.value().typeId(), meta.value().wrap(binaryProc.binaryContext()), false);
                }
            }
        } catch (Exception e) {
            failure = e;
            if (this.log.isDebugEnabled()) {
                U.warn(this.log, e);
            }
        }

        String errMsg = failure == null ? null : failure.getMessage();
        try {
            DrExternalMetadataResponse resp = new DrExternalMetadataResponse(drExternalMetadataRequest.version(), errMsg);
            gridNioSession.send(DrUtils.marshal(resp));
        } catch (IgniteCheckedException sendErr) {
            U.error(this.log, sendErr);
        }
    }

    /**
     * Answers a ping from a sender hub by sending {@code DrUtils.PING_RESP_BYTES} back on the same session.
     * At trace level the remote address and the data center ID stored in the session meta are logged first.
     *
     * @param gridNioSession Session the ping arrived on.
     * @param drExternalPingRequest Ping request; its content is not inspected.
     */
    private void processPingRequest(GridNioSession gridNioSession, DrExternalPingRequest drExternalPingRequest) {
        // Looked up unconditionally, exactly as before, even though only the trace message uses it.
        Byte sndDataCenterId = (Byte) gridNioSession.meta(META_DATA_CENTER_ID);

        if (this.log.isTraceEnabled()) {
            String traceMsg = "Incoming DR sender hub ping request [remoteAddr=" + gridNioSession.remoteAddress()
                + ", dataCenterId=" + sndDataCenterId + "]";

            this.log.trace(traceMsg);
        }

        gridNioSession.send(DrUtils.PING_RESP_BYTES);
    }

    /**
     * Validates a handshake from a sender hub and replies with a {@code DrExternalHandshakeResponse}.
     * <p>
     * The protocol version is checked first, then the marshaller class name. Only when both match are the
     * sender's data center ID and its await-acknowledge flag stored in the session meta. On a mismatch the
     * reason is logged as an error and sent back in the response; on success the response carries
     * {@code null}.
     *
     * @param gridNioSession Session the handshake arrived on.
     * @param drExternalHandshakeRequest Handshake request describing the sender.
     */
    private void processHandshakeRequest(GridNioSession gridNioSession, DrExternalHandshakeRequest drExternalHandshakeRequest) {
        byte sndDataCenterId = drExternalHandshakeRequest.dataCenterId();

        if (this.log.isDebugEnabled()) {
            this.log.debug("Incoming DR sender hub handshake request [remoteAddr=" + gridNioSession.remoteAddress()
                + ", dataCenterId=" + sndDataCenterId + "]");
        }

        String errMsg;

        if (DrProcessor.DR_PROTO_VER.equals(drExternalHandshakeRequest.protocolVersion())) {
            String rcvMarshCls = this.marsh.getClass().getName();

            if (rcvMarshCls.equals(drExternalHandshakeRequest.marshallerClassName())) {
                // Handshake accepted: remember who the sender is and whether it expects acknowledgements.
                gridNioSession.addMeta(META_DATA_CENTER_ID, Byte.valueOf(sndDataCenterId));
                gridNioSession.addMeta(META_AWAIT_ACK, Boolean.valueOf(drExternalHandshakeRequest.awaitAcknowledge()));

                errMsg = null;
            } else {
                errMsg = "DR handshake failed because of different marshaller implementations (please fix configuration and restart) [addr="
                    + gridNioSession.remoteAddress()
                    + ", sndDataCenterId=" + sndDataCenterId
                    + ", sndMarsh=" + drExternalHandshakeRequest.marshallerClassName()
                    + ", rcvMarsh=" + rcvMarshCls + ']';
            }
        } else {
            errMsg = "DR handshake failed because of different protocol versions [addr="
                + gridNioSession.remoteAddress()
                + ", sndDataCenterId=" + sndDataCenterId
                + ", sndProtoVer=" + drExternalHandshakeRequest.protocolVersion()
                + ", rcvProtocolVer=" + DrProcessor.DR_PROTO_VER + ']';
        }

        if (errMsg != null) {
            LT.error(this.log, (Throwable) null, errMsg);
        }

        try {
            gridNioSession.send(DrUtils.marshal(new DrExternalHandshakeResponse(errMsg)));
        } catch (IgniteCheckedException e) {
            U.error(this.log, e);
        }
    }

    /**
     * Rejects a request: delegates to {@code sendAcknowledge} with the given error text and then, at debug
     * level, logs that the request was skipped.
     *
     * @param gridNioSession Session the request arrived on.
     * @param igniteUuid ID of the rejected request.
     * @param str Error text describing why the request is rejected.
     */
    private void rejectRequest(GridNioSession gridNioSession, IgniteUuid igniteUuid, String str) {
        // Acknowledge first; the debug record is written only if the acknowledge call returns normally.
        sendAcknowledge(gridNioSession, igniteUuid, str);

        if (!this.log.isDebugEnabled()) {
            return;
        }

        String dbgMsg = "Incoming DR request was skipped due to previous streamer exception [reqId=" + igniteUuid
            + ", errMsg=" + str + ']';

        this.log.debug(dbgMsg);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends a {@code DrExternalBatchResponse} for the given request if the session's await-acknowledge
     * meta flag is {@code true}.
     * <p>
     * When {@code DR_ACK_MISSING_CACHES} is set the error text is dropped, so the response always carries
     * {@code null}. An {@code IgniteCheckedException} raised while sending is logged and not rethrown.
     *
     * @param gridNioSession Session to reply on; it must hold a non-null {@code META_AWAIT_ACK} meta value.
     * @param igniteUuid ID of the request being acknowledged.
     * @param str Error text to report, or {@code null} for none.
     * @throws NullPointerException If the session has no {@code META_AWAIT_ACK} meta value.
     */
    public void sendAcknowledge(GridNioSession gridNioSession, IgniteUuid igniteUuid, String str) {
        try {
            // With the flag set, any error text is suppressed before the response is built.
            String errMsg = this.DR_ACK_MISSING_CACHES ? null : str;

            Boolean awaitAck = (Boolean) Objects.requireNonNull(gridNioSession.meta(META_AWAIT_ACK));

            if (awaitAck.booleanValue()) {
                DrExternalBatchResponse resp = new DrExternalBatchResponse(igniteUuid, errMsg);

                gridNioSession.send(DrUtils.marshal(resp));
            }
        } catch (IgniteCheckedException e) {
            U.error(this.log, e);
        }
    }

    // Class-level initialization of the assertion flag, the session meta keys and the streamer recreate timeout.
    static {
        // NOTE(review): `$assertionsDisabled` looks like the compiler-generated assertion flag surfaced by
        // decompilation — confirm it is not a hand-written field.
        $assertionsDisabled = !DrReceiver.class.desiredAssertionStatus();
        // Session meta key under which the handshake stores the sender's data center ID.
        // Presumably each nextUniqueKey() call yields a distinct key, so keep the call order stable — verify.
        META_DATA_CENTER_ID = GridNioSessionMetaKey.nextUniqueKey();
        // Session meta key under which the handshake stores the sender's await-acknowledge flag.
        META_AWAIT_ACK = GridNioSessionMetaKey.nextUniqueKey();
        // Read from the GG_RECEIVER_DR_STREAMER_RECREATE_TIMEOUT system property, defaulting to 5000;
        // log messages in this class report the value in milliseconds.
        STREAMER_RECREATING_TIMEOUT = IgniteSystemProperties.getLong(GridSystemProperties.GG_RECEIVER_DR_STREAMER_RECREATE_TIMEOUT, 5000L);
    }
}
