/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.dr;

import java.io.IOException;
import java.net.BindException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.cache.configuration.Factory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.thread.IgniteThread;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrManager;
import org.gridgain.grid.internal.processors.dr.DrProcessor;
import org.gridgain.grid.internal.processors.dr.DrSenderHealthCheckScheduler;
import org.gridgain.grid.internal.processors.dr.DrSenderMetadataHolder;
import org.gridgain.grid.internal.processors.dr.DrSenderRemoteDataCenter;
import org.gridgain.grid.internal.processors.dr.DrSenderRemoteDataCenterNodeStateProcessor;
import org.gridgain.grid.internal.processors.dr.DrSenderRequest;
import org.gridgain.grid.internal.processors.dr.DrUtils;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalBatchResponse;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalHandshakeRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalHandshakeResponse;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalMessage;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalMetadataRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalMetadataResponse;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalPingRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalPingResponse;

public class DrSenderRemoteDataCenterNode {
    /** NIO session meta key under which this node instance is stored in the sessions it creates. */
    static final int DR_SENDER_NODE = GridNioSessionMetaKey.nextUniqueKey();
    /** DR processor; used here for configuration, logging, metrics and access to the sender hub. */
    @GridToStringExclude
    private final DrProcessor proc;
    /** Logger obtained from the processor context for this class. */
    @GridToStringExclude
    private final IgniteLogger log;
    /** Class name of the configured marshaller; sent to the receiver in the handshake request. */
    @GridToStringExclude
    private final String marshClsName;
    /** Remote data center this node belongs to; notified about connects, disconnects, responses and rejects. */
    @GridToStringExclude
    private final DrSenderRemoteDataCenter dataCenter;
    /** Holder of the node state (CONNECTING, CONNECTED, PAUSED, CRASHED, ...). */
    @GridToStringExclude
    private final DrSenderRemoteDataCenterNodeStateProcessor stateProcessor;
    /** Sender configuration taken from the processor's GridGain configuration. */
    @GridToStringExclude
    private final DrSenderConfiguration cfg;
    /** {@code true} when the configured marshaller is a {@link BinaryMarshaller}. */
    private final boolean usingBinaryMarshaller;
    /** Local addresses tried, in iteration order, when binding the outgoing socket. */
    private final Collection<SocketAddress> locAddrs;
    /** Address of the remote receiver this node connects to. */
    private final InetSocketAddress rmtAddr;
    /** Local data center ID; sent to the receiver in the handshake request. */
    private final byte localDataCenterId;
    /** Requests waiting to be written to the session. */
    @GridToStringExclude
    private final Deque<DrSenderRequest> sndQueue = new ArrayDeque<DrSenderRequest>();
    /** Enqueued requests by ID; an entry is removed once the request is acknowledged without error. */
    @GridToStringExclude
    private final Map<IgniteUuid, DrSenderRequest> ackMap = new LinkedHashMap<IgniteUuid, DrSenderRequest>();
    /** Number of failed attempts per request ID. */
    @GridToStringExclude
    private final Map<IgniteUuid, Integer> errMap = new HashMap<IgniteUuid, Integer>();
    /** Current session; assigned in {@code handshake()} and reset to {@code null} in {@code disconnect()}. */
    @GridToStringExclude
    private GridNioSession ses;
    /** NIO server used to create and close sessions; assigned in {@code start()}. */
    @GridToStringExclude
    private GridNioServer<DrExternalMessage> nioSrv;
    /** Worker running the current connection attempt; accessed only while synchronized on this node. */
    @GridToStringExclude
    private GridWorker connectWorker;
    /** Metadata version from the last successful metadata response; reset to 0 on disconnect. */
    private long metaVer;
    /** Number of metadata error responses received since the last successful metadata response. */
    private int metaErrorCnt;
    /** Set by {@code stop()}; prevents new connection attempts from being started. */
    private volatile boolean stopping;
    /** Metadata most recently passed to {@code sendMeta()}; re-sent when the receiver reports an error. */
    @GridToStringExclude
    private volatile DrSenderMetadataHolder prevSendMeta;
    /** Future completed when a successful handshake response is processed. */
    private GridFutureAdapter<Void> handShake;

    /**
     * Creates the sender-side representation of a single remote receiver node.
     * <p>
     * Configuration, the marshaller class name and the logger are read from {@code proc}; a state
     * processor and a not-yet-completed handshake future are created for this node.
     *
     * @param proc DR processor, must not be {@code null}.
     * @param dataCenter Remote data center this node belongs to, must not be {@code null}.
     * @param locAddrs Local addresses to bind the outgoing socket to, must not be empty.
     * @param rmtAddr Address of the remote receiver, must not be {@code null}.
     */
    DrSenderRemoteDataCenterNode(DrProcessor proc, DrSenderRemoteDataCenter dataCenter, Collection<SocketAddress> locAddrs, InetSocketAddress rmtAddr) {
        assert (proc != null);
        assert (dataCenter != null);
        assert (rmtAddr != null);
        assert (!F.isEmpty(locAddrs));
        this.proc = proc;
        this.dataCenter = dataCenter;
        this.locAddrs = locAddrs;
        this.rmtAddr = rmtAddr;
        this.localDataCenterId = proc.ggConfig().getDataCenterId();
        this.cfg = proc.ggConfig().getDrSenderConfiguration();
        Marshaller marshaller = proc.config().getMarshaller();
        this.usingBinaryMarshaller = marshaller instanceof BinaryMarshaller;
        this.marshClsName = marshaller.getClass().getName();
        this.log = proc.context().log(DrSenderRemoteDataCenterNode.class);
        this.stateProcessor = new DrSenderRemoteDataCenterNodeStateProcessor(this.cfg, this.log, this);
        // Diamond instead of the raw type: avoids an unchecked conversion to GridFutureAdapter<Void>.
        this.handShake = new GridFutureAdapter<>();
    }

    /**
     * Binds this node to the NIO server and registers its state processor with the health check scheduler.
     *
     * @param srv NIO server used later to create and close sessions, must not be {@code null}.
     * @param scheduler Scheduler the state processor is added to.
     */
    void start(GridNioServer<DrExternalMessage> srv, DrSenderHealthCheckScheduler scheduler) {
        assert nioSrv == null;
        assert srv != null;

        nioSrv = srv;

        scheduler.add(stateProcessor);
    }

    /**
     * Initiates an asynchronous connection attempt to the remote node.
     * <p>
     * Does nothing if the node is already in {@code CONNECTING} or {@code CONNECTED} state. Otherwise the
     * state is switched to {@code CONNECTING} and, unless the node is stopping, a connection worker is
     * created under the monitor of this node and started in a new thread outside of it.
     */
    void connect() {
        assert nioSrv != null;

        if (log.isDebugEnabled()) {
            log.debug("Initiated connection on replica node: " + this);
        }

        GridWorker toStart = null;

        synchronized (this) {
            DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState curState = stateProcessor.getNodeState();

            boolean connecting = curState == DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState.CONNECTING;
            boolean connected = curState == DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState.CONNECTED;

            if (connecting || connected) {
                return;
            }

            stateProcessor.changeState(DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState.CONNECTING);

            assert connectWorker == null : curState;

            if (!stopping) {
                connectWorker = new GridWorker(this.proc.igniteInstanceName(), "dr-node-" + this.rmtAddr + "-connection-worker", this.log) {
                    @Override
                    protected void body() {
                        DrSenderRemoteDataCenterNode.this.connectInternal();
                    }
                };

                toStart = connectWorker;
            }
        }

        // The thread is started outside of the synchronized section.
        if (toStart != null) {
            new IgniteThread(toStart).start();
        }
    }

    /**
     * Body of the connection worker: opens the socket, creates the NIO session and schedules the handshake.
     * <p>
     * On any failure the channel is closed and, unless the node is stopping, a non-crash disconnect is
     * submitted to the sender hub. The {@code connectWorker} reference is always cleared at the end.
     */
    private void connectInternal() {
        SocketChannel channel = null;
        boolean established = false;

        try {
            if (stopping) {
                return;
            }

            channel = connectSocket(locAddrs, rmtAddr);

            GridNioSession newSes = createSession(channel);

            established = true;

            if (log.isDebugEnabled()) {
                log.debug("Established a network connection to a remote receiver hub: " + newSes);
            }

            submit(() -> handshake(newSes));
        }
        catch (ConnectException e) {
            // Connection refusals are reported as warnings with the message only.
            U.warn(log, e.getMessage());
        }
        catch (Exception e) {
            U.error(log, "Connection failure: " + this, e);
        }
        finally {
            synchronized (this) {
                if (stopping) {
                    U.closeQuiet(channel);
                }

                connectWorker = null;

                if (!established) {
                    U.closeQuiet(channel);

                    if (!stopping) {
                        submit(() -> disconnect(false));
                    }
                }
            }
        }
    }

    /**
     * Opens a blocking socket channel to the remote address, trying every combination of local bind
     * address and resolved remote IP until one succeeds.
     * <p>
     * A {@link BindException} for a local address skips the remaining remote IPs for that address and
     * moves on to the next local address. Any other {@link IOException} moves on to the next remote IP.
     * Channels of failed attempts are closed.
     *
     * @param locAddrs Local addresses to bind to, tried in iteration order.
     * @param rmtAddr Remote host and port.
     * @return Connected channel in blocking mode.
     * @throws UnknownHostException If the remote host name cannot be resolved (the original resolution
     *      failure is attached as the cause).
     * @throws BindException If only bind failures occurred.
     * @throws ConnectException If no connection could be established otherwise. All collected failures
     *      are attached as suppressed exceptions.
     * @throws IOException Declared supertype of the exceptions above.
     */
    public static SocketChannel connectSocket(Collection<SocketAddress> locAddrs, InetSocketAddress rmtAddr) throws IOException {
        InetAddress[] resolvedRmtAddrs;
        try {
            resolvedRmtAddrs = InetAddress.getAllByName(rmtAddr.getHostString());
        }
        catch (UnknownHostException e) {
            UnknownHostException unresolved = new UnknownHostException("Could not resolve replica hostname: " + rmtAddr);

            // UnknownHostException has no cause-accepting constructor; keep the original failure reachable.
            unresolved.initCause(e);

            throw unresolved;
        }

        ArrayList<IOException> connectErrs = new ArrayList<>();
        ArrayList<BindException> bindErrs = new ArrayList<>();
        ArrayList<String> msgs = new ArrayList<>();

        nextLocalAddr:
        for (SocketAddress addr : locAddrs) {
            for (InetAddress rmtIp : resolvedRmtAddrs) {
                SocketChannel ch = null;

                try {
                    ch = SocketChannel.open();
                    ch.configureBlocking(true);
                    ch.socket().setReuseAddress(true);
                    ch.socket().bind(addr);
                    ch.connect(new InetSocketAddress(rmtIp, rmtAddr.getPort()));

                    return ch;
                }
                catch (BindException e) {
                    U.closeQuiet(ch);

                    bindErrs.add(e);
                    msgs.add("[addr=" + addr + ", err=" + e.getMessage() + "]");

                    // This local address is unusable: do not try the remaining remote IPs with it.
                    continue nextLocalAddr;
                }
                catch (IOException e) {
                    U.closeQuiet(ch);

                    connectErrs.add(e);
                    msgs.add("[rmtIp=" + rmtIp.getHostAddress() + ", err=" + e.getMessage() + "]");
                }
            }
        }

        boolean onlyBindFailures = !bindErrs.isEmpty() && connectErrs.isEmpty();

        SocketException failure = onlyBindFailures
            ? new BindException("Could not bind to local addresses [locAddrs=" + Arrays.toString(locAddrs.toArray()) + ", errs=" + msgs + "]")
            : new ConnectException("Could not connect to replica [locAddrs=" + Arrays.toString(locAddrs.toArray()) + ", rmtAddr=" + rmtAddr.getHostString() + ":" + rmtAddr.getPort() + ", errs=" + msgs + "]");

        for (IOException connectErr : connectErrs) {
            failure.addSuppressed(connectErr);
        }

        for (BindException bindErr : bindErrs) {
            failure.addSuppressed(bindErr);
        }

        throw failure;
    }

    /**
     * Writes the Ignite header to the channel (encrypted when SSL is configured) and registers the channel
     * with the NIO server as a new session.
     * <p>
     * The session meta always contains this node under {@link #DR_SENDER_NODE}; with SSL it additionally
     * carries the SSL engine under the {@code SSL_META} key.
     *
     * @param ch Connected channel, must not be {@code null}.
     * @return Created session.
     * @throws IgniteCheckedException If the SSL handshake is not successful or session creation fails.
     * @throws IOException If writing to the channel fails.
     */
    private GridNioSession createSession(SocketChannel ch) throws IgniteCheckedException, IOException {
        assert ch != null;

        Factory<SSLContext> sslCtxFactory = proc.getSslContextFactory(cfg);
        Map<Integer, Object> sesMeta = new HashMap<>();

        if (sslCtxFactory != null) {
            SSLEngine engine = sslCtxFactory.create().createSSLEngine();

            assert engine != null;

            engine.setUseClientMode(true);

            BlockingSslHandler sslHnd = new BlockingSslHandler(engine, ch, true, DrUtils.DR_BYTE_ORDER, log);

            if (!sslHnd.handshake()) {
                throw new IgniteCheckedException("Failed to perform SSL handshake.");
            }

            ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));

            GridSslMeta sslMeta = new GridSslMeta();
            sslMeta.sslEngine(engine);

            sesMeta.put(GridNioSessionMetaKey.SSL_META.ordinal(), sslMeta);
        } else {
            ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
        }

        sesMeta.put(DR_SENDER_NODE, this);

        return (GridNioSession)nioSrv.createSession(ch, sesMeta, false, null).get();
    }

    /**
     * Remembers the given session as the current one and sends the handshake request through it.
     * <p>
     * Exceptions are logged and not propagated.
     *
     * @param ses0 Newly created session, must not be {@code null}.
     */
    private void handshake(GridNioSession ses0) {
        try {
            assert ses0 != null;
            assert ses == null;

            ses = ses0;

            DrExternalHandshakeRequest handshakeReq = new DrExternalHandshakeRequest(
                localDataCenterId,
                "1.0-20140117",
                marshClsName,
                dataCenter.awaitAcknowledge(),
                proc.senderHub().tomstoneTtl());

            ses.send(handshakeReq);

            stateProcessor.notifyHandshakeSent();
        }
        catch (Exception e) {
            U.error(log, "Failed to make handshake. [node=" + this + ']', e);
        }
    }

    /**
     * Closes the current session (if any), rejects every request still awaiting acknowledgement, clears
     * all queues and error counters, and notifies the state processor and the data center.
     *
     * @param crashed Flag forwarded to the state processor's disconnect notification.
     */
    void disconnect(boolean crashed) {
        if (ses != null) {
            nioSrv.close(ses);

            ses = null;
        }

        ackMap.values().forEach(this.dataCenter::onReject);

        ackMap.clear();
        sndQueue.clear();
        errMap.clear();

        // The state has to be captured before the state processor is told about the disconnect.
        DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState stateBefore = stateProcessor.getNodeState();
        boolean wasConnected = stateBefore == DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState.CONNECTED;

        stateProcessor.notifyDisconnect(crashed);

        metaVer = 0L;

        dataCenter.onNodeDisconnect(this, wasConnected);
    }

    /**
     * @return Address of the remote receiver this node connects to.
     */
    InetSocketAddress rmtAddr() {
        return rmtAddr;
    }

    /**
     * Appends the request to the send queue and registers it as awaiting acknowledgement.
     *
     * @param msg Request to enqueue.
     */
    void enqueueOutMessage(DrSenderRequest msg) {
        sndQueue.add(msg);

        ackMap.put(msg.id(), msg);
    }

    /**
     * Schedules sending of metadata to the receiver once the handshake future completes.
     * <p>
     * When {@code meta} is {@code null}, the metadata stored in the data center is used, but only if its
     * version is greater than the version last confirmed by the receiver; otherwise nothing is sent.
     *
     * @param meta Metadata to send, or {@code null} to look up the stored metadata.
     */
    void sendMetaIfNeeded(DrSenderMetadataHolder meta) {
        DrSenderMetadataHolder candidate = meta;

        if (candidate == null) {
            DrSenderMetadataHolder stored = dataCenter.getMetadata(metaVer);

            if (stored == null || stored.version() <= metaVer) {
                return;
            }

            candidate = stored;
        }

        final DrSenderMetadataHolder metaToSend = candidate;

        handShake.listen(f -> {
            if (log.isTraceEnabled()) {
                log.trace("Send meta called with node state=" + stateProcessor.getNodeState());
            }

            submit(() -> sendMeta(metaToSend));
        });
    }

    /**
     * Sends a metadata request built from the given holder, but only while the node is {@code CONNECTED}.
     * The holder is remembered so it can be re-sent if the receiver reports an error.
     *
     * @param meta Metadata to send.
     */
    private void sendMeta(DrSenderMetadataHolder meta) {
        boolean connected = stateProcessor.getNodeState() == DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState.CONNECTED;

        if (!connected) {
            return;
        }

        DrExternalMetadataRequest req = new DrExternalMetadataRequest(meta.version(), meta.metadata(), meta.binaryMetadata().values());

        prevSendMeta = meta;

        if (log.isTraceEnabled()) {
            log.trace("Send updated meta to receiver.");
        }

        ses.send(req);
    }

    /**
     * Drains the send queue, sending every queued request, provided the node is {@code CONNECTED}.
     */
    void sendRequests() {
        if (stateProcessor.getNodeState() != DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState.CONNECTED) {
            return;
        }

        for (DrSenderRequest next = sndQueue.poll(); next != null; next = sndQueue.poll()) {
            sendRequest(next);
        }
    }

    /**
     * Writes a single request to the current session and records the send time on it.
     * <p>
     * The chained callback runs via the sender hub's {@code submit}. When the data center does not await
     * acknowledgements, the outcome of the write itself is used to acknowledge the request. The
     * "batch sent" metric is updated in every case.
     *
     * @param req Request to send.
     */
    private void sendRequest(DrSenderRequest req) {
        // Values needed by the callback are captured before the request is written.
        final IgniteUuid reqId = req.id();
        final String cache = req.cacheName();
        final int bytes = req.byteCount();
        final int entries = req.entryCount();

        req.sendTime(System.nanoTime());

        GridNioFuture<?> sndFut = ses.send(req.data());

        sndFut.chain(f -> {
            if (!dataCenter.awaitAcknowledge()) {
                boolean sndFailed = false;

                try {
                    f.get();
                }
                catch (Throwable err) {
                    LT.error(log, err, "Failed to send batch [rmtAddr=" + rmtAddr + ", err=" + err + ']');

                    sndFailed = true;
                }

                acknowledge(reqId, sndFailed);
            }

            proc.metrics().onSenderHubBatchSent(dataCenter.dataCenterId(), cache, entries, bytes);

            return f;
        }, this.proc.senderHub()::submit);
    }

    /**
     * Entry point for messages received from the remote node.
     * <p>
     * Messages arriving on a session other than the current one are dropped. Ping requests are handled
     * inline; all other messages are dispatched by type on the sender hub, after re-checking that the
     * session is still current. Any throwable raised while processing leads to a crash-disconnect.
     *
     * @param msgSes Session the message arrived on.
     * @param msg Received message.
     */
    void processInMessage(GridNioSession msgSes, DrExternalMessage msg) {
        if (msgSes != ses) {
            if (log.isTraceEnabled()) {
                LT.error(log, null, "Message from different session won't to be processed, rejected.");
            }

            return;
        }

        stateProcessor.notifyDataReceived();

        if (msg instanceof DrExternalPingRequest) {
            processPingResponse(DrUtils.PING_RESP);

            return;
        }

        submit(() -> {
            // The session may have been replaced between receipt and execution of this task.
            if (msgSes != ses) {
                return;
            }

            try {
                if (msg instanceof DrExternalBatchResponse) {
                    processBatchResponse((DrExternalBatchResponse)msg);
                }
                else if (msg instanceof DrExternalHandshakeResponse) {
                    processHandshakeResponse((DrExternalHandshakeResponse)msg);
                }
                else if (msg instanceof DrExternalPingResponse) {
                    processPingResponse((DrExternalPingResponse)msg);
                }
                else if (msg instanceof DrExternalMetadataResponse) {
                    processMetaResponse((DrExternalMetadataResponse)msg);
                }
                else {
                    U.warn(log, "Ignoring message of unknown type [msg=" + msg + ", replicaNode=" + this + ']');

                    assert false;
                }
            }
            catch (Throwable th) {
                log.error("Failed to process incoming message.", th);

                disconnect(true);
            }
        });
    }

    /**
     * Handles the receiver's reply to a metadata request.
     * <p>
     * On success the error counter is reset, the confirmed metadata version is stored and queued requests
     * are sent. On error the counter is incremented and the node is either crash-disconnected (too many
     * errors) or the previously sent metadata is sent again.
     *
     * @param msg Metadata response.
     */
    void processMetaResponse(DrExternalMetadataResponse msg) {
        if (msg.errorMessage() != null) {
            metaErrorCnt++;

            LT.error(log, null, msg.errorMessage());

            if (tooMuchErrors()) {
                disconnect(true);
            } else {
                sendMeta(prevSendMeta);
            }

            return;
        }

        metaErrorCnt = 0;
        metaVer = msg.version();

        sendRequests();
    }

    /**
     * Logs the ping response at debug level and informs the state processor that it arrived.
     *
     * @param msg Ping response.
     */
    private void processPingResponse(DrExternalPingResponse msg) {
        if (log.isDebugEnabled()) {
            log.debug("Incoming ping response [msg=" + msg + ", replicaNode=" + this + ']');
        }

        stateProcessor.notifyPingResponseArrived();
    }

    /**
     * Handles the receiver's reply to a batch.
     * <p>
     * If the error message says the cache does not exist at the receiver, the local cache's DR manager is
     * told that the batch failed (unless its DR status is already stopped). Any other error is only
     * logged. In all cases the request is acknowledged with the success/failure outcome.
     *
     * @param msg Batch response.
     */
    private void processBatchResponse(DrExternalBatchResponse msg) {
        assert dataCenter.awaitAcknowledge();

        if (log.isDebugEnabled()) {
            log.debug("Incoming batch response [msg=" + msg + ", replicaNode=" + this + ']');
        }

        final String errMsg = msg.errorMessage();
        final boolean failed = errMsg != null;

        if (failed) {
            final String noCachePrefix = "Cache doesn't exist: ";

            if (!errMsg.startsWith(noCachePrefix)) {
                LT.error(log, null, "Replica node failed to apply batch [msg=" + msg + ", replicaNode=" + this + ']');
            } else {
                // The cache name is everything that follows the prefix.
                String cacheName = errMsg.substring(noCachePrefix.length());
                String reason = "Replica node failed to apply batch because cache doesn't exist at the receiver. Replication of the cache will be stopped. [cacheName=" + cacheName + ']';

                LT.error(log, null, reason);

                try {
                    IgniteInternalCache cache = proc.kernalContext().cache().getOrStartCache(cacheName);
                    CacheDrManager mgr = DrUtils.drManagerSafe(cache.context().dr(), cacheName);

                    if (!mgr.drStatus().stopped()) {
                        mgr.onBatchFailed(Collections.singletonMap(proc.kernalContext().localNodeId(), new Exception(reason)));
                    }
                }
                catch (IgniteCheckedException e) {
                    LT.error(log, e, "Cannot start cache that doesn't exist on receiver. [cacheName=" + cacheName + ']');
                }
            }
        }

        acknowledge(msg.requestId(), failed);
    }

    /**
     * Handles the receiver's reply to the handshake request.
     * <p>
     * An error reply crash-disconnects the node. A successful reply switches the node to
     * {@code CONNECTED}, completes the handshake future and notifies the data center.
     *
     * @param res Handshake response.
     */
    private void processHandshakeResponse(DrExternalHandshakeResponse res) {
        if (log.isDebugEnabled()) {
            log.debug("Incoming handshake response [msg=" + res + ", replicaNode=" + this + ']');
        }

        assert stateProcessor.getNodeState() == DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState.CONNECTING;

        boolean rejected = res.errorMessage() != null;

        if (rejected) {
            LT.error(log, null, "Handshake failed [rmtAddr=" + rmtAddr + ", err=" + res.errorMessage() + ']');

            disconnect(true);

            return;
        }

        stateProcessor.changeState(DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState.CONNECTED);

        handShake.onDone();

        dataCenter.onNodeConnect(this);
    }

    /**
     * @return {@code true} if the metadata error count plus all per-request error counts has reached the
     *      configured maximum.
     */
    private boolean tooMuchErrors() {
        int batchErrCnt = errMap.values().stream().mapToInt(Integer::intValue).sum();

        return metaErrorCnt + batchErrCnt >= cfg.getMaxErrors();
    }

    /**
     * Records the outcome of a request.
     * <p>
     * Unknown request IDs are ignored. On success the request is removed from the bookkeeping maps and
     * reported to the data center. On failure its error count is incremented; the node is then either
     * crash-disconnected (too many errors) or the request is put back at the end of the send queue.
     *
     * @param id Request ID.
     * @param err {@code true} if the request failed.
     */
    private void acknowledge(IgniteUuid id, boolean err) {
        DrSenderRequest pending = ackMap.get(id);

        if (pending == null) {
            assert !errMap.containsKey(id);

            return;
        }

        if (err) {
            proc.metrics().onSenderHubBatchFailed(dataCenter.dataCenterId(), pending.cacheName(), pending.entryCount(), pending.data().length);

            errMap.merge(id, 1, Integer::sum);

            if (tooMuchErrors()) {
                disconnect(true);
            } else {
                sndQueue.addLast(pending);
            }

            return;
        }

        ackMap.remove(id);
        errMap.remove(id);

        dataCenter.onResponse(id, pending);

        if (dataCenter.awaitAcknowledge()) {
            long elapsedMs = (System.nanoTime() - pending.sendTime()) / 1000000L;

            proc.metrics().onSenderHubBatchAcked(dataCenter.dataCenterId(), pending.cacheName(), pending.entryCount(), pending.data().length, elapsedMs);
        }
    }

    /**
     * Hands the task over to the sender hub.
     *
     * @param runnable Task to run.
     * @return Future returned by the sender hub for the submitted task.
     */
    private IgniteInternalFuture<Void> submit(Runnable runnable) {
        return proc.senderHub().submit(runnable);
    }

    /**
     * @return {@code true} if the node state is {@code CONNECTED}.
     */
    public boolean isConnected() {
        DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState cur = stateProcessor.getNodeState();

        return cur == DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState.CONNECTED;
    }

    /**
     * @return {@code true} if the node state is {@code CRASHED}.
     */
    boolean isCrashed() {
        DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState cur = stateProcessor.getNodeState();

        return cur == DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState.CRASHED;
    }

    /**
     * @return {@code true} if the node state is {@code PAUSED}.
     */
    boolean isPaused() {
        DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState cur = stateProcessor.getNodeState();

        return cur == DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState.PAUSED;
    }

    /**
     * Sends the shared ping request through the current session.
     */
    void sendPing() {
        ses.send(DrUtils.PING_REQ);
    }

    /**
     * Marks the node as stopping and cancels the connection worker, if one is running.
     */
    void stop() {
        synchronized (this) {
            stopping = true;

            GridWorker running = connectWorker;

            if (running != null) {
                running.cancel();
            }

            connectWorker = null;
        }
    }

    /**
     * Cancels the connection worker, if one is running, and submits a task to the sender hub that
     * disconnects the node (non-crash) and switches it to {@code PAUSED}.
     *
     * @return Future of the submitted task.
     */
    IgniteInternalFuture<Void> pause() {
        synchronized (this) {
            GridWorker running = connectWorker;

            if (running != null) {
                running.cancel();
            }

            connectWorker = null;
        }

        return submit(() -> {
            disconnect(false);

            stateProcessor.changeState(DrSenderRemoteDataCenterNodeStateProcessor.DataCenterNodeState.PAUSED);
        });
    }

    /**
     * Prints the sizes of the send queue and the error map for this node.
     */
    void printMemoryStats() {
        X.println(">>>");
        X.println(">>> DR replica node memory stats [node=" + this + ']');
        X.println(">>>   sndQueueSize: " + sndQueue.size());
        X.println(">>>   errQueueSize: " + errMap.size());
    }

    /**
     * @return String representation built by {@code S.toString} for this class.
     */
    @Override
    public String toString() {
        return S.toString(DrSenderRemoteDataCenterNode.class, this);
    }
}

