package org.gridgain.grid.kernal.processors.dr;

import java.io.IOException;
import java.io.Serializable;
import java.net.BindException;
import java.net.InetAddress;
import java.nio.ByteOrder;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.gridgain.grid.GridDataLoader;
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridInterruptedException;
import org.gridgain.grid.dr.GridDrReceiveHubConfiguration;
import org.gridgain.grid.kernal.GridKernalContext;
import org.gridgain.grid.kernal.processors.dataload.GridDataLoaderEx;
import org.gridgain.grid.kernal.processors.dr.messages.external.GridDrExternalBatchRequest;
import org.gridgain.grid.kernal.processors.dr.messages.external.GridDrExternalBatchResponse;
import org.gridgain.grid.kernal.processors.dr.messages.external.GridDrExternalHandshakeRequest;
import org.gridgain.grid.kernal.processors.dr.messages.external.GridDrExternalHandshakeResponse;
import org.gridgain.grid.kernal.processors.dr.messages.external.GridDrExternalPingRequest;
import org.gridgain.grid.kernal.processors.dr.messages.external.GridDrExternalPingResponse;
import org.gridgain.grid.lang.utils.GridConcurrentHashMap;
import org.gridgain.grid.logger.GridLogger;
import org.gridgain.grid.marshaller.GridMarshaller;
import org.gridgain.grid.spi.GridPortProtocol;
import org.gridgain.grid.thread.GridThread;
import org.gridgain.grid.typedef.F;
import org.gridgain.grid.typedef.X;
import org.gridgain.grid.typedef.internal.CU;
import org.gridgain.grid.typedef.internal.LT;
import org.gridgain.grid.typedef.internal.U;
import org.gridgain.grid.util.GridBusyLock;
import org.gridgain.grid.util.nio.GridBufferedParser;
import org.gridgain.grid.util.nio.GridConnectionBytesVerifyFilter;
import org.gridgain.grid.util.nio.GridNioAsyncNotifyFilter;
import org.gridgain.grid.util.nio.GridNioCodecFilter;
import org.gridgain.grid.util.nio.GridNioMessageTracker;
import org.gridgain.grid.util.nio.GridNioServer;
import org.gridgain.grid.util.nio.GridNioServerListener;
import org.gridgain.grid.util.nio.GridNioSession;
import org.gridgain.grid.util.nio.GridNioSessionMetaKey;
import org.gridgain.grid.util.worker.GridWorker;
import org.jetbrains.annotations.Nullable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/gridgain/grid/kernal/processors/dr/GridDrReceiveHub.class */
public class GridDrReceiveHub<K, V> {
    public static final int META_DATA_CENTER_ID;
    public static final long REBIND_FREQ = 3000;
    private static final int TRACKER_META;
    private final GridLogger log;
    private final GridKernalContext ctx;
    private GridMarshaller marsh;
    private GridNioServer srvr;
    private ExecutorService execSvc;
    private GridThread binder;
    private GridConcurrentHashMap<String, GridDataLoaderEx<K, Object>> ldrs;
    private byte[] pingRespData;
    private GridDrReceiveHubConfiguration cfg;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final GridDrExternalPingResponse pingResp = new GridDrExternalPingResponse();
    private final GridBusyLock busyLock = new GridBusyLock();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gridgain/grid/kernal/processors/dr/GridDrReceiveHub$NioListener.class */
    public class NioListener implements GridNioServerListener<byte[]> {
        static final /* synthetic */ boolean $assertionsDisabled;

        private NioListener() {
        }

        @Override // org.gridgain.grid.util.nio.GridNioServerListener
        public void onConnected(GridNioSession gridNioSession) {
            if (GridDrReceiveHub.this.log.isDebugEnabled()) {
                GridDrReceiveHub.this.log.debug("Replication send hub connected [remoteAddr=" + gridNioSession.remoteAddress() + "]");
            }
            if (GridDrReceiveHub.this.cfg.getMessageQueueLimit() <= 0 || ((GridNioMessageTracker) gridNioSession.meta(GridDrReceiveHub.TRACKER_META)) != null) {
                return;
            }
            GridNioMessageTracker gridNioMessageTracker = (GridNioMessageTracker) gridNioSession.addMeta(GridDrReceiveHub.TRACKER_META, new GridNioMessageTracker(gridNioSession, GridDrReceiveHub.this.cfg.getMessageQueueLimit()));
            if (!$assertionsDisabled && gridNioMessageTracker != null) {
                throw new AssertionError();
            }
        }

        @Override // org.gridgain.grid.util.nio.GridNioServerListener
        public void onDisconnected(GridNioSession gridNioSession, @Nullable Exception exc) {
            if (GridDrReceiveHub.this.log.isDebugEnabled()) {
                GridDrReceiveHub.this.log.debug("Replication send hub disconnected [remoteAddr=" + gridNioSession.remoteAddress() + ", dataCenterId=" + gridNioSession.meta(GridDrReceiveHub.META_DATA_CENTER_ID) + ", e=" + exc + ']');
            }
        }

        @Override // org.gridgain.grid.util.nio.GridNioServerListener
        public void onMessage(GridNioSession gridNioSession, byte[] bArr) {
            if (!GridDrReceiveHub.this.busyLock.enterBusy()) {
                if (GridDrReceiveHub.this.log.isDebugEnabled()) {
                    GridDrReceiveHub.this.log.debug("Ignoring message (grid is stopping): " + gridNioSession);
                    return;
                }
                return;
            }
            GridNioMessageTracker gridNioMessageTracker = (GridNioMessageTracker) gridNioSession.meta(GridDrReceiveHub.TRACKER_META);
            if (gridNioMessageTracker != null) {
                gridNioMessageTracker.onMessageReceived();
            }
            try {
                onMessage0(gridNioSession, bArr);
                if (gridNioMessageTracker != null) {
                    gridNioMessageTracker.apply();
                }
                GridDrReceiveHub.this.busyLock.leaveBusy();
            } catch (Throwable th) {
                if (gridNioMessageTracker != null) {
                    gridNioMessageTracker.apply();
                }
                GridDrReceiveHub.this.busyLock.leaveBusy();
                throw th;
            }
        }

        private void onMessage0(GridNioSession gridNioSession, byte[] bArr) {
            try {
                Serializable serializable = (Serializable) GridDrReceiveHub.this.marsh.unmarshal(bArr, (ClassLoader) null);
                if (GridDrReceiveHub.this.log.isDebugEnabled()) {
                    GridDrReceiveHub.this.log.debug("Incoming message: " + serializable);
                }
                if (!(serializable instanceof GridDrExternalBatchRequest)) {
                    if (!(serializable instanceof GridDrExternalHandshakeRequest)) {
                        if (serializable instanceof GridDrExternalPingRequest) {
                            sendPreparedResponse(gridNioSession, serializable, GridDrReceiveHub.this.pingResp, GridDrReceiveHub.this.pingRespData);
                            return;
                        }
                        return;
                    } else {
                        GridDrExternalHandshakeRequest gridDrExternalHandshakeRequest = (GridDrExternalHandshakeRequest) serializable;
                        if (GridDrReceiveHub.this.log.isDebugEnabled()) {
                            GridDrReceiveHub.this.log.debug("Received sender hub handshake request [remoteAddr=" + gridNioSession.remoteAddress() + ", dataCenterId=" + ((int) gridDrExternalHandshakeRequest.dataCenterId()) + "]");
                        }
                        gridNioSession.addMeta(GridDrReceiveHub.META_DATA_CENTER_ID, Byte.valueOf(gridDrExternalHandshakeRequest.dataCenterId()));
                        sendResponse(gridNioSession, gridDrExternalHandshakeRequest, new GridDrExternalHandshakeResponse());
                        return;
                    }
                }
                GridDrExternalBatchRequest gridDrExternalBatchRequest = (GridDrExternalBatchRequest) serializable;
                try {
                    try {
                        if (((Byte) gridNioSession.meta(GridDrReceiveHub.META_DATA_CENTER_ID)) == null) {
                            throw new GridException("Cannot determine data center ID.");
                        }
                        GridDataLoader dataLoader = GridDrReceiveHub.this.dataLoader(gridDrExternalBatchRequest.cacheName());
                        if (!$assertionsDisabled && dataLoader == null) {
                            throw new AssertionError();
                        }
                        gridDrExternalBatchRequest.prepare(GridDrReceiveHub.this.marsh);
                        for (GridDrEntryInfo<K, V> gridDrEntryInfo : gridDrExternalBatchRequest.data()) {
                            gridDrEntryInfo.unmarshall(GridDrReceiveHub.this.marsh);
                            dataLoader.addData((GridDataLoader) gridDrEntryInfo.key(), (K) F.t(gridDrEntryInfo.value(), gridDrEntryInfo.version()));
                        }
                        dataLoader.flush();
                        sendResponse(gridNioSession, gridDrExternalBatchRequest, new GridDrExternalBatchResponse(gridDrExternalBatchRequest.requestId(), null));
                    } catch (Throwable th) {
                        sendResponse(gridNioSession, gridDrExternalBatchRequest, new GridDrExternalBatchResponse(gridDrExternalBatchRequest.requestId(), null));
                        throw th;
                    }
                } catch (Throwable th2) {
                    U.error(GridDrReceiveHub.this.log, "Failed to process replication request.", th2);
                    sendResponse(gridNioSession, gridDrExternalBatchRequest, new GridDrExternalBatchResponse(gridDrExternalBatchRequest.requestId(), th2));
                }
            } catch (GridException e) {
                U.error(GridDrReceiveHub.this.log, "Failed to unmarshall message with default class loader.", e);
            }
        }

        private <R, S> void sendResponse(GridNioSession gridNioSession, R r, S s) {
            try {
                sendPreparedResponse(gridNioSession, r, s, GridDrReceiveHub.this.marsh.marshal(s));
            } catch (GridException e) {
                U.error(GridDrReceiveHub.this.log, "Failed to send response for request: " + r, e);
            }
        }

        private <R, S> void sendPreparedResponse(GridNioSession gridNioSession, R r, S s, byte[] bArr) {
            gridNioSession.send(bArr);
            if (GridDrReceiveHub.this.log.isDebugEnabled()) {
                GridDrReceiveHub.this.log.debug("Response has been sent [req=" + r + ", res=" + s + ']');
            }
        }

        @Override // org.gridgain.grid.util.nio.GridNioServerListener
        public void onSessionWriteTimeout(GridNioSession gridNioSession) {
            LT.warn(GridDrReceiveHub.this.log, null, "NIO session write timed out (consider increasing 'writeTimeout' configuration property) [writeTimeout=" + GridDrReceiveHub.this.cfg.getWriteTimeout() + "]");
            if (GridDrReceiveHub.this.log.isDebugEnabled()) {
                GridDrReceiveHub.this.log.debug("Closing NIO session on write timeout: " + gridNioSession);
            }
            gridNioSession.close();
        }

        @Override // org.gridgain.grid.util.nio.GridNioServerListener
        public void onSessionIdleTimeout(GridNioSession gridNioSession) {
            if (GridDrReceiveHub.this.log.isDebugEnabled()) {
                GridDrReceiveHub.this.log.debug("Closing idle NIO session: " + gridNioSession);
            }
            gridNioSession.close();
        }

        static {
            $assertionsDisabled = !GridDrReceiveHub.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public GridDrReceiveHub(GridKernalContext gridKernalContext) {
        this.ctx = gridKernalContext;
        this.log = gridKernalContext.log(GridDrReceiveHub.class);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() throws GridException {
        this.cfg = this.ctx.config().getReplicationReceiveHubConfiguration();
        if (!$assertionsDisabled && this.cfg == null) {
            throw new AssertionError();
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Starting replication server.");
        }
        final int localInboundPort = this.cfg.getLocalInboundPort();
        String localInboundHost = this.cfg.getLocalInboundHost();
        if (localInboundHost == null) {
            localInboundHost = this.ctx.config().getLocalHost();
        }
        try {
            final InetAddress byName = localInboundHost != null ? InetAddress.getByName(localInboundHost) : U.getLocalHost();
            this.ldrs = new GridConcurrentHashMap<>();
            this.marsh = this.ctx.config().getMarshaller();
            this.pingRespData = this.marsh.marshal(this.pingResp);
            if (startServer(byName, localInboundPort, this.cfg)) {
                return;
            }
            U.warn(this.log, "Failed to start replication TCP server (retrying every 3000 ms). Another node on this host? [locHost=" + byName + ", port=" + localInboundPort + ']');
            this.binder = new GridThread(new GridWorker(this.ctx.gridName(), "grid-replication-binder", this.log) { // from class: org.gridgain.grid.kernal.processors.dr.GridDrReceiveHub.1
                @Override // org.gridgain.grid.util.worker.GridWorker
                public void body() {
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            U.sleep(3000L);
                            try {
                                if (GridDrReceiveHub.this.startServer(byName, localInboundPort, GridDrReceiveHub.this.cfg)) {
                                    return;
                                }
                            } catch (GridException e) {
                                U.error(GridDrReceiveHub.this.log, "Failed to start TCP replication server.", e);
                                return;
                            }
                        } catch (GridInterruptedException e2) {
                            if (GridDrReceiveHub.this.log.isDebugEnabled()) {
                                GridDrReceiveHub.this.log.debug("Binder thread was interrupted.");
                                return;
                            }
                            return;
                        }
                    }
                }
            });
            this.binder.start();
        } catch (IOException e) {
            throw new GridException("Configuration parameter 'localHost' cannot be resolved to local address.", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop(boolean z, boolean z2) {
        if (z) {
            stopLoaders(true);
        }
        this.busyLock.block();
        if (!z) {
            stopLoaders(false);
        }
        U.interrupt(this.binder);
        U.join(this.binder, this.log);
        GridNioServer gridNioServer = this.srvr;
        if (gridNioServer != null) {
            gridNioServer.stop();
        }
        U.shutdownNow(GridDrReceiveHub.class, this.execSvc, this.log);
        this.ctx.ports().deregisterPorts(getClass());
    }

    private void stopLoaders(boolean z) {
        Iterator<GridDataLoaderEx<K, Object>> it = this.ldrs.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().close(z);
            } catch (GridException e) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean startServer(InetAddress inetAddress, int i, GridDrReceiveHubConfiguration gridDrReceiveHubConfiguration) throws GridException {
        try {
            this.execSvc = Executors.newFixedThreadPool(gridDrReceiveHubConfiguration.getWorkerThreads());
            this.srvr = GridNioServer.builder().address(inetAddress).port(i).listener(new NioListener()).filters(new GridNioAsyncNotifyFilter(this.ctx.gridName(), this.execSvc, this.log), new GridNioCodecFilter(new GridBufferedParser(gridDrReceiveHubConfiguration.isDirectBuffer(), ByteOrder.nativeOrder()), this.log), new GridConnectionBytesVerifyFilter(this.log)).logger(this.log).selectorCount(gridDrReceiveHubConfiguration.getSelectorCount()).sendQueueLimit(gridDrReceiveHubConfiguration.getMessageQueueLimit()).gridName(this.ctx.gridName()).byteOrder(ByteOrder.nativeOrder()).tcpNoDelay(gridDrReceiveHubConfiguration.isTcpNodelay()).directBuffer(gridDrReceiveHubConfiguration.isDirectBuffer()).idleTimeout(gridDrReceiveHubConfiguration.getIdleTimeout()).writeTimeout(gridDrReceiveHubConfiguration.getWriteTimeout()).build();
            this.srvr.start();
            this.ctx.ports().registerPort(i, GridPortProtocol.TCP, getClass());
            if (this.log.isDebugEnabled()) {
                this.log.debug("Started replication server [addr=" + inetAddress + ", port=" + i + ']');
            }
            return true;
        } catch (GridException e) {
            if (e.hasCause(BindException.class)) {
                return false;
            }
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public GridDataLoader<K, Object> dataLoader(String str) {
        String mask = CU.mask(str);
        GridDataLoaderEx<K, Object> gridDataLoaderEx = this.ldrs.get(mask);
        if (gridDataLoaderEx == null) {
            gridDataLoaderEx = (GridDataLoaderEx) this.ctx.dataLoad().dataLoader(CU.unmask(mask));
            gridDataLoaderEx.jobFactory(new GridDrDataLoaderJobFactory());
            gridDataLoaderEx.perNodeParallelLoadOperations(Runtime.getRuntime().availableProcessors());
            gridDataLoaderEx.autoFlushFrequency(this.cfg.getFlushFrequency());
            GridDataLoaderEx<K, Object> putIfAbsent = this.ldrs.putIfAbsent(mask, gridDataLoaderEx);
            if (putIfAbsent != null) {
                try {
                    gridDataLoaderEx.close(true);
                } catch (GridException e) {
                }
                gridDataLoaderEx = putIfAbsent;
            }
        }
        return gridDataLoaderEx;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void printMemoryStats() {
        X.println(">>>", new Object[0]);
        X.println(">>> DR receive hub memory stats [grid=" + this.ctx.gridName() + ']', new Object[0]);
        X.println(">>>   dataLoadersSize: " + this.ldrs.size(), new Object[0]);
        X.println(">>>   pendingMessagesCount: " + ((ThreadPoolExecutor) this.execSvc).getQueue().size(), new Object[0]);
    }

    static {
        $assertionsDisabled = !GridDrReceiveHub.class.desiredAssertionStatus();
        META_DATA_CENTER_ID = GridNioSessionMetaKey.nextUniqueKey();
        TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
    }
}
