/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.dr;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.util.CompletableFutures;
import org.gridgain.dr.configuration.DrReceiverServerView;
import org.gridgain.internal.dr.messages.DrExternalBatchRequest;
import org.gridgain.internal.dr.messages.DrExternalBatchResponse;
import org.gridgain.internal.dr.messages.DrExternalHandshakeRequest;
import org.gridgain.internal.dr.messages.DrExternalHandshakeResponse;
import org.gridgain.internal.dr.messages.DrExternalMetadataRequest;
import org.gridgain.internal.dr.messages.DrExternalMetadataResponse;
import org.gridgain.internal.dr.messages.DrExternalPingRequest;
import org.gridgain.internal.dr.messages.DrExternalPingResponse;
import org.gridgain.internal.dr.nio.HandshakeFailedException;

@ChannelHandler.Sharable
public class DrMessageHandler
extends ChannelInboundHandlerAdapter {
    private static final IgniteLogger LOG = Loggers.forClass(DrMessageHandler.class);
    private static final AttributeKey<Byte> META_DATA_CENTER_ID = AttributeKey.newInstance("DataCenterId");
    private static final AttributeKey<Boolean> META_AWAIT_ACK = AttributeKey.newInstance("AwaitAck");
    private final byte dcId;
    private final long tombstoneTtl;

    public DrMessageHandler(DrReceiverServerView cfg) {
        this.dcId = cfg.dataCenterId();
        if (cfg.tombstoneTtl() > 0L) {
            this.tombstoneTtl = cfg.tombstoneTtl();
        } else {
            this.tombstoneTtl = 1800000L;
            LOG.warn("Tombstone TTL is not configured. Default TTL will be used: {}ms", this.tombstoneTtl);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof DrExternalHandshakeRequest) {
            this.processHandshakeRequest(ctx, (DrExternalHandshakeRequest)msg);
        } else if (msg instanceof DrExternalPingRequest) {
            DrMessageHandler.processPingRequest(ctx, (DrExternalPingRequest)msg);
        } else if (msg instanceof DrExternalMetadataRequest) {
            this.processMetadataRequest(ctx, (DrExternalMetadataRequest)msg);
        } else if (msg instanceof DrExternalBatchRequest) {
            this.processBatchRequest(ctx, (DrExternalBatchRequest)msg);
        } else {
            LOG.warn("Ignoring message of unknown type: msg={}", msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (cause instanceof IOException) {
            LOG.debug(cause.getMessage(), cause);
        } else {
            LOG.info(cause.getMessage(), cause);
        }
        ctx.close();
    }

    private void processHandshakeRequest(ChannelHandlerContext ctx, DrExternalHandshakeRequest req) {
        Channel channel = ctx.channel();
        try {
            this.onHandshake(channel, req);
        }
        catch (HandshakeFailedException ex) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Incoming connection rejected: remoteAddress={}", ex.getMessage(), channel.remoteAddress());
            } else {
                LOG.info("Incoming connection rejected: remoteAddress={}", channel.remoteAddress());
            }
            channel.writeAndFlush(new DrExternalHandshakeResponse(ex.getMessage()));
            channel.close();
            return;
        }
        catch (Exception ex) {
            ctx.fireExceptionCaught(ex);
            return;
        }
        channel.writeAndFlush(new DrExternalHandshakeResponse(null));
    }

    private static void processPingRequest(ChannelHandlerContext ctx, DrExternalPingRequest req) {
        Channel channel = ctx.channel();
        if (!channel.hasAttr(META_DATA_CENTER_ID)) {
            LOG.trace("Unexpected DR metadata request: remoteAddress={}", channel.remoteAddress());
            channel.close();
            return;
        }
        if (LOG.isTraceEnabled()) {
            Byte dataCenterId = channel.attr(META_DATA_CENTER_ID).get();
            LOG.trace("Incoming DR ping request [remoteAddress={}, dataCenterId={}]", channel.remoteAddress(), dataCenterId);
        }
        channel.writeAndFlush(DrExternalPingResponse.INSTANCE);
    }

    private void processMetadataRequest(ChannelHandlerContext ctx, DrExternalMetadataRequest req) {
        Channel channel = ctx.channel();
        if (!channel.hasAttr(META_DATA_CENTER_ID)) {
            LOG.trace("Unexpected DR metadata request: remoteAddress={}", channel.remoteAddress());
            channel.writeAndFlush(new DrExternalMetadataResponse(req.version(), "Unexpected DR metadata request."));
            channel.close();
            return;
        }
        if (LOG.isTraceEnabled()) {
            Byte dataCenterId = channel.attr(META_DATA_CENTER_ID).get();
            LOG.trace("Incoming DR metadata request [remoteAddress={}, dataCenterId={}, version={}]", channel.remoteAddress(), dataCenterId, req.version());
        }
        try {
            this.onMetadataRequest(channel, req);
        }
        catch (Exception err) {
            String errMsg = "Failed to process DR metadata request: " + err.getMessage();
            channel.writeAndFlush(new DrExternalMetadataResponse(req.version(), errMsg));
            ctx.fireExceptionCaught(err);
            return;
        }
        channel.writeAndFlush(new DrExternalMetadataResponse(req.version(), null));
    }

    private void processBatchRequest(ChannelHandlerContext ctx, DrExternalBatchRequest req) {
        Channel channel = ctx.channel();
        if (!channel.hasAttr(META_DATA_CENTER_ID)) {
            LOG.trace("Unexpected DR batch request: remoteAddress={}", channel.remoteAddress());
            channel.writeAndFlush(new DrExternalBatchResponse(req.requestId(), "Unexpected DR batch request."));
            channel.close();
            return;
        }
        Byte dataCenterId = channel.attr(META_DATA_CENTER_ID).get();
        LOG.trace("Incoming DR batch request [remoteAddress={}, dataCenterId={}, reqId={}]", channel.remoteAddress(), dataCenterId, req.requestId());
        try {
            this.onBatchRequest(channel, req).handle((ignore, err) -> {
                if (err != null) {
                    channel.writeAndFlush(new DrExternalBatchResponse(req.requestId(), "Failed to process DR batch request: " + err.getMessage()));
                    ctx.fireExceptionCaught((Throwable)err);
                } else if (channel.attr(META_AWAIT_ACK).get() == Boolean.TRUE) {
                    channel.writeAndFlush(new DrExternalBatchResponse(req.requestId(), null));
                }
                return null;
            });
        }
        catch (Throwable err2) {
            channel.writeAndFlush(new DrExternalBatchResponse(req.requestId(), "Failed to process DR batch request: " + err2.getMessage()));
            ctx.fireExceptionCaught(err2);
            return;
        }
        if (channel.attr(META_AWAIT_ACK).get() != Boolean.TRUE) {
            channel.writeAndFlush(new DrExternalBatchResponse(req.requestId(), null));
        }
    }

    protected void onHandshake(Channel channel, DrExternalHandshakeRequest req) throws HandshakeFailedException {
        LOG.debug("Incoming DR handshake request [remoteAddress={}, dataCenterId={}]", channel.remoteAddress(), req.dataCenterId());
        String errMsg = null;
        if (!"1.0-20140117".equals(req.protocolVersion())) {
            errMsg = IgniteStringFormatter.format("DR handshake failed because of different protocol versions [remoteAddress={}, dataCenterId={}, remoteProtocolVersion={}, localProtocolVersion={}]", channel.remoteAddress(), req.dataCenterId(), req.protocolVersion(), "1.0-20140117");
        } else if (!"org.apache.ignite.internal.binary.BinaryMarshaller".equals(req.marshallerClassName())) {
            errMsg = IgniteStringFormatter.format("DR handshake failed because of different marshaller implementations (please fix configuration and restart) [remoteAddress={}, dataCenterId={}, remoteMarshaller={}, localMarshaller={}]", channel.remoteAddress(), req.dataCenterId(), req.marshallerClassName(), "org.apache.ignite.internal.binary.BinaryMarshaller");
        } else if (req.dataCenterId() != this.dcId) {
            errMsg = IgniteStringFormatter.format("DR handshake failed because of incoming connection from non-allowed datacenter [remoteAddress={}, dataCenterId={}, allowedDataCenterId={}]", channel.remoteAddress(), req.dataCenterId(), this.dcId);
        } else {
            channel.attr(META_DATA_CENTER_ID).set(req.dataCenterId());
            channel.attr(META_AWAIT_ACK).set(req.awaitAcknowledge());
        }
        if (req.getTombstoneTtl() > this.tombstoneTtl) {
            LOG.warn(IgniteStringFormatter.format("Sending DataCenter has higher tombstone TTL than Receiving one (remoteTombstoneTTL > localTombstoneTTL)[remoteAddress={}, dataCenterId={}, remoteTombstoneTTL={}, localTombstoneTTL={}]", channel.remoteAddress(), req.dataCenterId(), req.getTombstoneTtl(), this.tombstoneTtl), new Object[0]);
        }
        if (errMsg != null) {
            throw new HandshakeFailedException(errMsg);
        }
    }

    protected void onMetadataRequest(Channel channel, DrExternalMetadataRequest req) {
    }

    protected CompletableFuture<Void> onBatchRequest(Channel channel, DrExternalBatchRequest req) {
        return CompletableFutures.nullCompletedFuture();
    }
}

