package org.gridgain.grid.kernal.processors.dr.ent;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridUuid;
import org.gridgain.grid.dr.hub.sender.GridDrSenderHubConfiguration;
import org.gridgain.grid.dr.hub.sender.store.memory.GridDrSenderHubInMemoryStore;
import org.gridgain.grid.kernal.GridKernalContext;
import org.gridgain.grid.kernal.processors.dr.GridDrUtils;
import org.gridgain.grid.kernal.processors.dr.messages.external.GridDrExternalBatchResponse;
import org.gridgain.grid.kernal.processors.dr.messages.external.GridDrExternalHandshakeRequest;
import org.gridgain.grid.kernal.processors.dr.messages.external.GridDrExternalHandshakeResponse;
import org.gridgain.grid.kernal.processors.dr.messages.external.GridDrExternalPingResponse;
import org.gridgain.grid.lang.GridBiTuple;
import org.gridgain.grid.logger.GridLogger;
import org.gridgain.grid.marshaller.GridMarshaller;
import org.gridgain.grid.util.nio.GridNioServerBuffer;
import org.gridgain.grid.util.tostring.GridToStringExclude;
import org.gridgain.grid.util.typedef.F;
import org.gridgain.grid.util.typedef.X;
import org.gridgain.grid.util.typedef.internal.LT;
import org.gridgain.grid.util.typedef.internal.S;
import org.gridgain.grid.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/gridgain/grid/kernal/processors/dr/ent/GridDrSenderRemoteDataCenterNode.class */
public class GridDrSenderRemoteDataCenterNode<K, V> {
    /** Owning DR processor; used here to obtain the kernal context and the metrics sink. */
    private final GridEntDrProcessor<K, V> proc;
    /** Logger obtained from the kernal context for this class. */
    private final GridLogger log;

    /** Configured marshaller; its class name is sent in the handshake request. */
    @GridToStringExclude
    private final GridMarshaller marsh;

    /** Remote data center this node belongs to; receives connect/disconnect/response/reject callbacks. */
    @GridToStringExclude
    private final GridDrSenderRemoteDataCenter dataCenter;

    /** Sender hub configuration (timeouts, max errors, max failed connect attempts). */
    @GridToStringExclude
    private final GridDrSenderHubConfiguration cfg;
    /** Local address the outgoing socket is bound to (with an ephemeral port). */
    private final InetAddress locAddr;
    /** Remote address the socket connects to. */
    private final InetSocketAddress remoteAddr;
    /** Data center ID read from the grid configuration; sent in the handshake request. */
    private final byte dataCenterId;
    /** Value of {@code dataCenter.awaitAcknowledge()}; when {@code false}, batches are acknowledged locally once fully written. */
    private final boolean awaitAck;

    /** Buffer currently being written to the channel, or {@code null} when nothing is in flight. */
    @GridToStringExclude
    private ByteBuffer reqBuf;
    /** Request associated with {@link #reqBuf}; {@code null} when the buffer was queued without a request (e.g. ping, resend) or is the handshake. */
    private GridDrSenderHubRequest batchReq;
    /** Time of the last successful read (also set when the connection is established); reset to 0 on disconnect. */
    private long lastReadTime;
    /** Failed connection/handshake attempts; reset to 0 on a successful handshake. */
    private int failedAttempts;
    /** Time of the last disconnect. */
    private long lastDisconnTime;
    /** Time the node was last considered offline; 0 means "not offline". */
    private long lastOfflineTime;
    /** Whether a ping request has been queued and no ping response has arrived yet. */
    private boolean pingRequested;

    /** Time recorded by the first channel write after a ping was requested; 0 until then. */
    @GridToStringExclude
    private long pingSndTime;
    /** Set by {@link #mustDisconnect(boolean)}; makes {@link #refreshAfterReplica(SelectionKey)} disconnect the node. */
    private boolean forceDisconnect;
    /** Set by {@link #mustDisconnect(boolean)}; when {@code true} the next disconnect also records the offline time. */
    private boolean forceOffline;
    /** Decompiler rendering of the compiler-generated assertion flag; initialised in the static block. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Outgoing queue of (request, data buffer) pairs; the request part is {@code null} for pings and resends. */
    private final Deque<GridBiTuple<GridDrSenderHubRequest, ByteBuffer>> sndQueue = new ArrayDeque();
    /** Requests handed to this node and not yet acknowledged, in insertion order. */
    private final Map<GridUuid, GridDrSenderHubRequest> ackQueue = new LinkedHashMap();
    /** Number of error responses received per request ID. */
    private final Map<GridUuid, Integer> errQueue = new HashMap();

    /** Scratch buffer for a single channel read. */
    @GridToStringExclude
    private final ByteBuffer readBuf = ByteBuffer.allocate(GridDrSenderHubInMemoryStore.DFLT_MAX_SIZE);

    /** Message assembler fed from {@link #readBuf}; yields a byte array once it has a complete message (framing details not visible here). */
    @GridToStringExclude
    private final GridNioServerBuffer srvBuf = new GridNioServerBuffer();
    /** Current connection state; starts as {@code NOT_CONNECTED}. */
    private DataCenterNodeState nodeState = DataCenterNodeState.NOT_CONNECTED;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gridgain/grid/kernal/processors/dr/ent/GridDrSenderRemoteDataCenterNode$DataCenterNodeState.class */
    /**
     * Connection state of a remote data center node.
     */
    public enum DataCenterNodeState {
        /** No connection; initial state and the state entered on every disconnect. */
        NOT_CONNECTED,

        /** Connection initiated; lasts until a successful handshake response arrives. */
        CONNECTING,

        /** Handshake completed successfully. */
        CONNECTED;

        /** Cached copy of {@link #values()} so lookups do not clone the array each time. */
        private static final DataCenterNodeState[] VALS = values();

        /**
         * Looks up a state by its ordinal.
         *
         * @param i Ordinal value.
         * @return Matching state, or {@code null} if the ordinal is out of range.
         */
        @Nullable
        public static DataCenterNodeState fromOrdinal(int i) {
            boolean inRange = i >= 0 && i < VALS.length;

            return inRange ? VALS[i] : null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a node descriptor in the {@code NOT_CONNECTED} state. No socket is opened here.
     *
     * @param gridEntDrProcessor DR processor that owns this node (source of context and metrics).
     * @param gridDrSenderRemoteDataCenter Remote data center the node belongs to.
     * @param inetAddress Local address outgoing sockets are bound to.
     * @param inetSocketAddress Remote address to connect to.
     * @throws GridException Declared by the original signature; nothing in this body throws it explicitly.
     */
    public GridDrSenderRemoteDataCenterNode(GridEntDrProcessor<K, V> gridEntDrProcessor, GridDrSenderRemoteDataCenter gridDrSenderRemoteDataCenter, InetAddress inetAddress, InetSocketAddress inetSocketAddress) throws GridException {
        // Decompiled assertions: all four arguments must be non-null (checked in the original order).
        if (!$assertionsDisabled) {
            if (gridEntDrProcessor == null) {
                throw new AssertionError();
            }
            if (gridDrSenderRemoteDataCenter == null) {
                throw new AssertionError();
            }
            if (inetSocketAddress == null) {
                throw new AssertionError();
            }
            if (inetAddress == null) {
                throw new AssertionError();
            }
        }

        this.proc = gridEntDrProcessor;
        this.dataCenter = gridDrSenderRemoteDataCenter;
        this.locAddr = inetAddress;
        this.remoteAddr = inetSocketAddress;

        GridKernalContext ctx = gridEntDrProcessor.context();

        this.log = ctx.log(GridDrSenderRemoteDataCenterNode.class);
        this.marsh = ctx.config().getMarshaller();
        this.cfg = ctx.config().getDrSenderHubConfiguration();
        this.dataCenterId = ctx.config().getDataCenterId();
        this.awaitAck = gridDrSenderRemoteDataCenter.awaitAcknowledge();
    }

    /**
     * Starts a non-blocking connection attempt to the remote address.
     * <p>
     * Moves the node to {@code CONNECTING}, opens a channel bound to the local address with an
     * ephemeral port, registers it with the selector for connect events (attaching this node to
     * the key) and initiates the connect. On an I/O failure the attempt is counted as failed and
     * the node is disconnected.
     *
     * @param selector Selector to register the new channel with.
     */
    private void connect(Selector selector) {
        if (!$assertionsDisabled && selector == null) {
            throw new AssertionError();
        }
        this.nodeState = DataCenterNodeState.CONNECTING;
        if (this.log.isDebugEnabled()) {
            this.log.debug("Initiated connection on replica node: " + this);
        }
        SocketChannel ch = null;
        SelectionKey key = null;
        try {
            ch = SocketChannel.open();
            ch.configureBlocking(false);
            ch.socket().bind(new InetSocketAddress(this.locAddr, 0));
            key = ch.register(selector, SelectionKey.OP_CONNECT);
            key.attach(this);
            ch.connect(this.remoteAddr);
        } catch (IOException e) {
            U.error(this.log, "Connection failure: " + this, e);
            this.failedAttempts++;
            // disconnect() reaches the channel only through the selection key. If the failure
            // happened before registration there is no key yet, so close the channel here to
            // avoid leaking the socket.
            if (key == null && ch != null) {
                U.closeQuiet((AbstractInterruptibleChannel) ch);
            }
            disconnect(key);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tears down the connection (if any) and resets all per-connection state.
     * <p>
     * Steps, in order: the node is moved to {@code NOT_CONNECTED}; the socket, channel and key
     * are closed quietly; every unacknowledged request is reported to the data center via
     * {@code onReject}; disconnect/offline timestamps are recorded; queues, buffers and flags
     * are reset; finally the data center is told about the disconnect and whether the node
     * had been fully connected.
     *
     * @param selectionKey Key of the channel to close, or {@code null} if no channel was registered.
     */
    public void disconnect(@Nullable SelectionKey selectionKey) {
        DataCenterNodeState prevState = this.nodeState;
        this.nodeState = DataCenterNodeState.NOT_CONNECTED;
        SocketChannel socketChannel = selectionKey != null ? (SocketChannel) selectionKey.channel() : null;
        if (socketChannel != null) {
            Socket socket = socketChannel.socket();
            if (socket.isConnected() && this.log.isDebugEnabled()) {
                this.log.debug("Replica node disconnected: " + this);
            }
            U.closeQuiet(socket);
            U.closeQuiet((AbstractInterruptibleChannel) socketChannel);
        }
        U.closeQuiet(selectionKey);
        // Hand every request that was sent (or queued) but never acknowledged back to the data center.
        for (Map.Entry<GridUuid, GridDrSenderHubRequest> entry : this.ackQueue.entrySet()) {
            this.dataCenter.onReject(entry.getKey(), entry.getValue());
        }
        long now = U.currentTimeMillis();
        this.lastDisconnTime = now;
        // The node is considered offline when explicitly forced or after too many failed attempts.
        if (this.forceOffline || this.failedAttempts >= this.cfg.getMaxFailedConnectAttempts()) {
            this.lastOfflineTime = now;
        }
        this.ackQueue.clear();
        this.sndQueue.clear();
        this.errQueue.clear();
        this.readBuf.clear();
        this.srvBuf.reset();
        this.reqBuf = null;
        // Must be cleared together with reqBuf: otherwise a batch interrupted mid-write would still
        // be referenced when the next connection finishes writing its handshake, and onWrite()
        // would report it as sent.
        this.batchReq = null;
        this.pingRequested = false;
        this.pingSndTime = 0L;
        this.lastReadTime = 0L;
        this.forceDisconnect = false;
        this.forceOffline = false;
        this.dataCenter.onNodeDisconnect(prevState == DataCenterNodeState.CONNECTED);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Completes a pending connection and prepares the handshake request for sending.
     * <p>
     * On success the read timestamp is refreshed and the marshalled handshake request becomes the
     * current outgoing buffer. Any failure is logged, counted as a failed attempt and followed by
     * a disconnect.
     *
     * @param selectionKey Key whose channel became connectable.
     */
    public void onConnect(SelectionKey selectionKey) {
        if (!$assertionsDisabled && this.nodeState != DataCenterNodeState.CONNECTING) {
            throw new AssertionError();
        }

        try {
            SocketChannel ch = (SocketChannel) selectionKey.channel();

            ch.finishConnect();

            this.lastReadTime = U.currentTimeMillis();

            GridDrExternalHandshakeRequest handshake = new GridDrExternalHandshakeRequest(
                this.dataCenterId,
                GridEntDrProcessor.DR_PROTO_VER,
                this.marsh.getClass().getName(),
                this.awaitAck);

            this.reqBuf = ByteBuffer.wrap(GridDrUtils.marshal(handshake));

            if (this.log.isDebugEnabled()) {
                this.log.debug("Connection has been established, will send handshake request: " + this);
            }
        } catch (ConnectException refused) {
            // Full stack trace only in debug mode; a throttled warning is always produced.
            if (this.log.isDebugEnabled()) {
                U.error(this.log, "Connection failure (connection refused?) [node=" + this + ']', refused);
            }
            LT.warn(this.log, refused, "Connection to receiver hub of remote data center failed (is it online?) [node=" + this + ", err=" + refused + ']');
            onConnectFailed(selectionKey);
        } catch (IOException ioErr) {
            U.error(this.log, "Connection failure [node=" + this + ']', ioErr);
            onConnectFailed(selectionKey);
        } catch (GridException marshalErr) {
            U.error(this.log, "Failed to marshal handshake request: " + this, marshalErr);
            onConnectFailed(selectionKey);
        }
    }

    /**
     * Records a failed connection attempt and disconnects the node.
     *
     * @param selectionKey Key of the channel whose connection attempt failed.
     */
    private void onConnectFailed(SelectionKey selectionKey) {
        this.failedAttempts++;
        disconnect(selectionKey);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reads available bytes from the channel and dispatches every complete message.
     * <p>
     * End-of-stream causes a disconnect. Otherwise the read timestamp is refreshed and the
     * bytes are fed to the message assembler until the read buffer is drained.
     *
     * @param selectionKey Key whose channel is readable.
     * @throws GridException If an incoming message cannot be processed.
     * @throws IOException If reading from the channel fails.
     */
    public void onRead(SelectionKey selectionKey) throws GridException, IOException {
        if (!$assertionsDisabled && selectionKey == null) {
            throw new AssertionError();
        }

        this.readBuf.clear();

        ReadableByteChannel ch = (ReadableByteChannel) selectionKey.channel();
        int bytesRead = ch.read(this.readBuf);

        if (bytesRead == -1) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Socket channel has reached end-of-stream, will disconnect: " + this);
            }
            disconnect(selectionKey);

            return;
        }

        this.lastReadTime = U.currentTimeMillis();
        this.readBuf.flip();

        while (this.readBuf.hasRemaining()) {
            // The assembler returns null until it has accumulated a complete message.
            byte[] msgBytes = this.srvBuf.read(this.readBuf);

            if (msgBytes == null) {
                continue;
            }

            processInMessage(msgBytes);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Writes as much pending outgoing data as the channel accepts.
     * <p>
     * While connected and idle, the next queued entry becomes the current buffer. The loop stops
     * when there is nothing to write or the channel did not take the whole buffer. When a buffer
     * with an associated batch request is fully written, the "batch sent" metric is recorded and,
     * if acknowledgements are not awaited, the batch is acknowledged immediately.
     *
     * @param selectionKey Key whose channel is writable.
     * @throws IOException If writing to the channel fails.
     */
    public void onWrite(SelectionKey selectionKey) throws IOException {
        for (;;) {
            // New work is pulled from the queue only after the handshake has completed.
            if (this.reqBuf == null && this.nodeState == DataCenterNodeState.CONNECTED) {
                GridBiTuple<GridDrSenderHubRequest, ByteBuffer> next = this.sndQueue.poll();

                if (next == null) {
                    this.batchReq = null;
                    this.reqBuf = null;
                } else {
                    this.batchReq = next.get1();
                    if (this.batchReq != null) {
                        this.batchReq.sendTime(System.nanoTime());
                    }
                    this.reqBuf = next.get2();
                }
            }

            if (this.reqBuf == null) {
                return;
            }

            ((WritableByteChannel) selectionKey.channel()).write(this.reqBuf);

            // The ping send time is stamped by the first write performed after a ping was requested.
            if (this.pingRequested && this.pingSndTime == 0) {
                this.pingSndTime = U.currentTimeMillis();
            }

            // Partial write: keep the buffer and wait for the next writable event.
            if (this.reqBuf.hasRemaining()) {
                return;
            }

            GridDrSenderHubRequest sent = this.batchReq;

            if (sent != null) {
                this.proc.metrics0().onSenderHubBatchSent(this.dataCenter.id(), sent.cacheName(), sent.entryCount(), sent.byteCount());
                if (!this.awaitAck) {
                    acknowledge(sent.id(), null);
                }
            }

            this.batchReq = null;
            this.reqBuf = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Accepts a request for sending: registers it as awaiting acknowledgement and appends it,
     * together with its data buffer, to the send queue.
     *
     * @param gridUuid Request ID used as the acknowledgement key.
     * @param gridDrSenderHubRequest Request to send.
     */
    public void processOutMessage(GridUuid gridUuid, GridDrSenderHubRequest gridDrSenderHubRequest) {
        if (!$assertionsDisabled && this.nodeState != DataCenterNodeState.CONNECTED) {
            throw new AssertionError();
        }

        this.ackQueue.put(gridUuid, gridDrSenderHubRequest);

        GridBiTuple<GridDrSenderHubRequest, ByteBuffer> queued = F.t(gridDrSenderHubRequest, gridDrSenderHubRequest.data());

        this.sndQueue.add(queued);
    }

    /**
     * Unmarshals and handles one incoming message.
     * <ul>
     * <li>Batch response: passed to {@link #acknowledge(GridUuid, String)}.</li>
     * <li>Handshake response: an error schedules a forced offline disconnect; success moves the
     *     node to {@code CONNECTED}, clears failure counters and notifies the data center.</li>
     * <li>Ping response: clears the pending-ping state.</li>
     * <li>Anything else: logged as a warning and ignored.</li>
     * </ul>
     *
     * @param bArr Marshalled message bytes.
     * @throws GridException If unmarshalling fails.
     */
    private void processInMessage(byte[] bArr) throws GridException {
        Object msg = GridDrUtils.unmarshal(bArr);

        if (msg instanceof GridDrExternalBatchResponse) {
            // Batch responses are only expected when acknowledgements are awaited.
            if (!$assertionsDisabled && !this.awaitAck) {
                throw new AssertionError();
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Incoming batch response [msg=" + msg + ", replicaNode=" + this + ']');
            }

            GridDrExternalBatchResponse batchRes = (GridDrExternalBatchResponse) msg;

            acknowledge(batchRes.requestId(), batchRes.errorMessage());
        } else if (msg instanceof GridDrExternalHandshakeResponse) {
            GridDrExternalHandshakeResponse handshakeRes = (GridDrExternalHandshakeResponse) msg;

            if (this.log.isDebugEnabled()) {
                this.log.debug("Incoming handshake response [msg=" + msg + ", replicaNode=" + this + ']');
            }

            if (handshakeRes.errorMessage() != null) {
                LT.error(this.log, null, handshakeRes.errorMessage());
                mustDisconnect(true);

                return;
            }

            this.nodeState = DataCenterNodeState.CONNECTED;
            this.failedAttempts = 0;
            this.lastOfflineTime = 0L;

            if (this.log.isDebugEnabled()) {
                this.log.debug("Replica node connected: " + this);
            }

            this.dataCenter.onNodeConnect();
        } else if (msg instanceof GridDrExternalPingResponse) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Incoming ping response [msg=" + msg + ", replicaNode=" + this + ']');
            }

            this.pingRequested = false;
            this.pingSndTime = 0L;
        } else {
            U.warn(this.log, "Ignoring message of unknown type [msg=" + msg + ", replicaNode=" + this + ']');
        }
    }

    /**
     * Handles an acknowledgement (successful or erroneous) for a previously accepted request.
     * <p>
     * Unknown IDs are ignored. A successful acknowledgement removes the request from the pending
     * map, records the "batch acked" metric when acknowledgements are awaited, and notifies the
     * data center. An error increments the per-request error counter; if the total number of
     * errors across all requests exceeds the configured maximum, a forced offline disconnect is
     * scheduled, otherwise the request is moved to the end of the pending map and its data is
     * queued for sending again.
     *
     * @param gridUuid Request ID.
     * @param str Error message, or {@code null} on success.
     */
    private void acknowledge(GridUuid gridUuid, @Nullable String str) {
        if (!this.ackQueue.containsKey(gridUuid)) {
            // A request that is not pending must not have an error counter either.
            if (!$assertionsDisabled && this.errQueue.containsKey(gridUuid)) {
                throw new AssertionError();
            }

            return;
        }

        if (str != null) {
            Integer prevErrs = this.errQueue.get(gridUuid);

            this.errQueue.put(gridUuid, prevErrs == null ? 1 : prevErrs + 1);

            int totalErrs = 0;

            for (Integer errCnt : this.errQueue.values()) {
                totalErrs += errCnt;
            }

            if (totalErrs > this.cfg.getMaxErrors()) {
                mustDisconnect(true);

                return;
            }

            // Remove + put moves the entry to the end of the insertion-ordered pending map.
            GridDrSenderHubRequest retried = this.ackQueue.remove(gridUuid);

            this.ackQueue.put(gridUuid, retried);
            this.sndQueue.add(F.t((GridDrSenderHubRequest) null, retried.data()));

            return;
        }

        GridDrSenderHubRequest acked = this.ackQueue.remove(gridUuid);

        if (this.awaitAck) {
            this.proc.metrics0().onSenderHubBatchAcked(
                this.dataCenter.id(),
                acked.cacheName(),
                acked.entryCount(),
                acked.data().array().length,
                (System.nanoTime() - acked.sendTime()) / 1000000);
        }

        this.errQueue.remove(gridUuid);
        this.dataCenter.onResponse(gridUuid, acked);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Periodic housekeeping, dispatched on the current node state.
     * <ul>
     * <li>{@code CONNECTED}: schedules a disconnect if a sent ping was not answered within the
     *     system request timeout; otherwise queues a ping at the head of the send queue when
     *     nothing has been read for longer than the (non-zero) read timeout.</li>
     * <li>{@code CONNECTING}: counts a failed attempt and schedules a disconnect if the
     *     handshake response did not arrive within the system request timeout.</li>
     * <li>{@code NOT_CONNECTED}: initiates a new connection once the applicable back-off period
     *     has elapsed.</li>
     * </ul>
     *
     * @param selector Selector used if a new connection has to be initiated.
     */
    public void refreshBeforeReplica(Selector selector) {
        long now = U.currentTimeMillis();

        switch (this.nodeState) {
            case CONNECTED: {
                long readTimeout = this.cfg.getReadTimeout();
                boolean pingSent = this.pingRequested && this.pingSndTime != 0;

                if (pingSent && now - this.pingSndTime > this.cfg.getSystemRequestTimeout()) {
                    U.warn(this.log, "Ping response was not obtained in a timely manner. Node will disconnect: " + this);
                    mustDisconnect(false);
                } else if (!this.pingRequested && readTimeout != 0 && now - this.lastReadTime > readTimeout) {
                    // The ping goes to the head of the queue, ahead of any queued batches.
                    this.sndQueue.addFirst(F.t((GridDrSenderHubRequest) null, ByteBuffer.wrap(GridDrUtils.PING_REQ_BYTES)));
                    this.pingRequested = true;

                    if (this.log.isTraceEnabled()) {
                        this.log.trace("Sender hub will send ping request: " + this);
                    }
                }

                return;
            }

            case CONNECTING: {
                // lastReadTime stays 0 until the socket connection itself has been established.
                boolean handshakeTimedOut = this.lastReadTime != 0
                    && now - this.lastReadTime > this.cfg.getSystemRequestTimeout();

                if (handshakeTimedOut) {
                    if (this.log.isDebugEnabled()) {
                        U.warn(this.log, "Handshake response was not obtained in a timely manner. Node will disconnect: " + this);
                    }

                    this.failedAttempts++;
                    mustDisconnect(false);
                }

                return;
            }

            case NOT_CONNECTED: {
                long reconnectTimeout = this.cfg.getReconnectOnFailureTimeout();
                boolean reconnectDue;

                if (this.lastOfflineTime != 0 && reconnectTimeout > 0) {
                    // Offline node with a positive reconnect timeout: wait for that timeout.
                    reconnectDue = now - this.lastOfflineTime > reconnectTimeout;
                } else {
                    // Otherwise retry at the health-check frequency since the last disconnect.
                    reconnectDue = now - this.lastDisconnTime > this.cfg.getHealthCheckFrequency();
                }

                if (reconnectDue) {
                    connect(selector);
                }

                return;
            }

            default:
                if (!$assertionsDisabled) {
                    throw new AssertionError("Unknown node state.");
                }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Applies a pending forced disconnect, or otherwise updates the key's interest set.
     * <p>
     * Write interest is requested (in addition to read interest) only while there is something
     * to write: for a connected node, a current buffer or a non-empty send queue; for a
     * connecting node whose socket is already connected, a current (handshake) buffer.
     *
     * @param selectionKey Selection key of this node's channel.
     */
    public void refreshAfterReplica(SelectionKey selectionKey) {
        if (this.forceDisconnect) {
            disconnect(selectionKey);

            return;
        }

        final int readOnly = SelectionKey.OP_READ;
        final int readWrite = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

        if (this.nodeState == DataCenterNodeState.CONNECTED) {
            boolean hasOutgoing = this.reqBuf != null || !this.sndQueue.isEmpty();

            selectionKey.interestOps(hasOutgoing ? readWrite : readOnly);

            return;
        }

        if (this.nodeState == DataCenterNodeState.CONNECTING) {
            SocketChannel ch = (SocketChannel) selectionKey.channel();

            if (ch != null && ch.isConnected()) {
                selectionKey.interestOps(this.reqBuf != null ? readWrite : readOnly);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Prints the sizes of the send queue and the error map for this node.
     */
    public void printMemoryStats() {
        int sndSize = this.sndQueue.size();
        int errSize = this.errQueue.size();

        X.println(">>>", new Object[0]);
        X.println(">>> DR replica node memory stats [node=" + this + ']', new Object[0]);
        X.println(">>>   sndQueueSize: " + sndSize, new Object[0]);
        X.println(">>>   errQueueSize: " + errSize, new Object[0]);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return {@code true} if the node is connected and no forced disconnect is pending.
     */
    public boolean operational() {
        if (this.forceDisconnect) {
            return false;
        }

        return DataCenterNodeState.CONNECTED == this.nodeState;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return {@code true} if no offline time is currently recorded for this node.
     */
    public boolean notOffline() {
        boolean offlineRecorded = this.lastOfflineTime != 0;

        return !offlineRecorded;
    }

    /**
     * Requests a disconnect; it is carried out by the next {@link #refreshAfterReplica(SelectionKey)} call.
     *
     * @param z Whether that disconnect should also record the node as offline.
     */
    void mustDisconnect(boolean z) {
        this.forceOffline = z;
        this.forceDisconnect = true;
    }

    /**
     * @return String form built by {@code S.toString} for this class.
     */
    @Override
    public String toString() {
        return S.toString(GridDrSenderRemoteDataCenterNode.class, this);
    }

    // Decompiler rendering of the compiler-generated assertion bootstrap: the flag is true
    // when assertions are disabled for this class, making the manual assertion checks no-ops.
    static {
        $assertionsDisabled = !GridDrSenderRemoteDataCenterNode.class.desiredAssertionStatus();
    }
}
