package org.gridgain.internal.dr;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.util.CompletableFutures;
import org.gridgain.dr.configuration.DrReceiverServerView;
import org.gridgain.internal.dr.messages.DrExternalBatchRequest;
import org.gridgain.internal.dr.messages.DrExternalBatchResponse;
import org.gridgain.internal.dr.messages.DrExternalHandshakeRequest;
import org.gridgain.internal.dr.messages.DrExternalHandshakeResponse;
import org.gridgain.internal.dr.messages.DrExternalMetadataRequest;
import org.gridgain.internal.dr.messages.DrExternalMetadataResponse;
import org.gridgain.internal.dr.messages.DrExternalPingRequest;
import org.gridgain.internal.dr.messages.DrExternalPingResponse;
import org.gridgain.internal.dr.nio.HandshakeFailedException;

/**
 * Inbound Netty handler for the receiving side of external data-center replication (DR) connections.
 *
 * <p>Dispatches incoming handshake, ping, metadata and batch requests. A successful handshake stores the sender's
 * data center id and its "await acknowledge" flag in channel attributes; ping, metadata and batch requests are
 * only accepted on channels where that data center id has been stored.
 *
 * <p>The handler keeps per-connection state in channel attributes only; its own fields are final.
 */
@ChannelHandler.Sharable
public class DrMessageHandler extends ChannelInboundHandlerAdapter {
    private static final IgniteLogger LOG = Loggers.forClass(DrMessageHandler.class);

    /** Channel attribute holding the sender data center id; a non-null value marks a completed handshake. */
    private static final AttributeKey<Byte> META_DATA_CENTER_ID = AttributeKey.newInstance("DataCenterId");

    /** Channel attribute holding the sender's "await acknowledge" flag taken from the handshake request. */
    private static final AttributeKey<Boolean> META_AWAIT_ACK = AttributeKey.newInstance("AwaitAck");

    /** The only marshaller class name accepted in a handshake request. */
    private static final String EXPECTED_MARSHALLER = "org.apache.ignite.internal.binary.BinaryMarshaller";

    /** The only data center id allowed to connect to this receiver. */
    private final byte dcId;

    /** Receiver-side tombstone TTL used for comparison with the sender's value during handshake. */
    private final long tombstoneTtl;

    /**
     * Creates a handler from the receiver server configuration.
     *
     * <p>If the configured tombstone TTL is not positive, {@link DrUtils#DEFAULT_TOMBSTONE_TTL} is used and a
     * warning is logged.
     *
     * @param drReceiverServerView Receiver configuration view providing the allowed data center id and tombstone TTL.
     */
    public DrMessageHandler(DrReceiverServerView drReceiverServerView) {
        this.dcId = drReceiverServerView.dataCenterId();

        long configuredTtl = drReceiverServerView.tombstoneTtl();

        if (configuredTtl > 0) {
            this.tombstoneTtl = configuredTtl;
        } else {
            this.tombstoneTtl = DrUtils.DEFAULT_TOMBSTONE_TTL;
            LOG.warn("Tombstone TTL is not configured. Default TTL will be used: " + this.tombstoneTtl + "ms");
        }
    }

    /**
     * Routes an incoming message to the matching request processor. Messages of any other type are logged
     * at warn level and ignored.
     *
     * @param channelHandlerContext Handler context of the channel the message arrived on.
     * @param obj Incoming message.
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        if (obj instanceof DrExternalHandshakeRequest) {
            processHandshakeRequest(channelHandlerContext, (DrExternalHandshakeRequest) obj);
        } else if (obj instanceof DrExternalPingRequest) {
            processPingRequest(channelHandlerContext, (DrExternalPingRequest) obj);
        } else if (obj instanceof DrExternalMetadataRequest) {
            processMetadataRequest(channelHandlerContext, (DrExternalMetadataRequest) obj);
        } else if (obj instanceof DrExternalBatchRequest) {
            processBatchRequest(channelHandlerContext, (DrExternalBatchRequest) obj);
        } else {
            LOG.warn("Ignoring message of unknown type [msg=" + obj + "]");
        }
    }

    /**
     * Logs the error at debug level (when enabled) and closes the connection.
     *
     * @param channelHandlerContext Handler context of the failed channel.
     * @param th Error that reached this handler.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        if (LOG.isDebugEnabled()) {
            // Log at the level that is actually being checked and keep the throwable so the stack trace is not lost.
            LOG.debug("DR connection error, closing channel [remoteAddress="
                    + channelHandlerContext.channel().remoteAddress() + ", err=" + th.getMessage() + "]", th);
        }

        channelHandlerContext.close();
    }

    /** Runs the handshake hook and replies with a response carrying {@code null} on success or the error message. */
    private void processHandshakeRequest(ChannelHandlerContext ctx, DrExternalHandshakeRequest req) {
        Channel channel = ctx.channel();

        try {
            onHandshake(channel, req);

            channel.writeAndFlush(new DrExternalHandshakeResponse(null));
        } catch (Throwable th) {
            channel.writeAndFlush(new DrExternalHandshakeResponse(th.getMessage()));
            ctx.fireExceptionCaught(th);
        }
    }

    /**
     * Replies to a ping request.
     *
     * @throws IllegalStateException If the channel has not completed a handshake.
     */
    private static void processPingRequest(ChannelHandlerContext ctx, DrExternalPingRequest req) {
        Channel channel = ctx.channel();
        Byte senderDcId = senderDataCenterId(channel);

        if (LOG.isTraceEnabled()) {
            LOG.trace("Incoming DR ping request [remoteAddress=" + channel.remoteAddress()
                    + ", dataCenterId=" + senderDcId + "]");
        }

        if (senderDcId == null) {
            throw new IllegalStateException("Unexpected ping request.");
        }

        channel.writeAndFlush(DrExternalPingResponse.INSTANCE);
    }

    /**
     * Handles a metadata request: rejects it and closes the channel if no handshake was completed, otherwise
     * runs the metadata hook and replies with a success or error response.
     */
    private void processMetadataRequest(ChannelHandlerContext ctx, DrExternalMetadataRequest req) {
        Channel channel = ctx.channel();
        Byte senderDcId = senderDataCenterId(channel);

        if (LOG.isTraceEnabled()) {
            LOG.trace("Incoming DR metadata request [remoteAddress=" + channel.remoteAddress()
                    + ", dataCenterId=" + senderDcId + ", version=" + req.version() + "]");
        }

        if (senderDcId == null) {
            channel.writeAndFlush(new DrExternalMetadataResponse(req.version(), "Unexpected DR metadata request."));
            channel.close();

            return;
        }

        try {
            onMetadataRequest(channel, req);

            channel.writeAndFlush(new DrExternalMetadataResponse(req.version(), null));
        } catch (Throwable th) {
            channel.writeAndFlush(new DrExternalMetadataResponse(
                    req.version(), "Failed to process DR metadata request: " + th.getMessage()));
            ctx.fireExceptionCaught(th);
        }
    }

    /**
     * Handles a batch request: rejects it and closes the channel if no handshake was completed, otherwise
     * runs the batch hook.
     *
     * <p>When the sender asked to await acknowledgement, the success response is written only after the future
     * returned by the hook completes; otherwise it is written right after the hook returns. An error response
     * is written whenever the future completes exceptionally or the hook throws.
     */
    private void processBatchRequest(ChannelHandlerContext ctx, DrExternalBatchRequest req) {
        Channel channel = ctx.channel();
        Byte senderDcId = senderDataCenterId(channel);

        if (LOG.isTraceEnabled()) {
            LOG.trace("Incoming DR batch request [remoteAddress=" + channel.remoteAddress()
                    + ", dataCenterId=" + senderDcId + ", reqId=" + req.requestId() + "]");
        }

        if (senderDcId == null) {
            channel.writeAndFlush(new DrExternalBatchResponse(req.requestId(), "Unexpected DR batch request."));
            channel.close();

            return;
        }

        try {
            onBatchRequest(channel, req).whenComplete((ignored, err) -> {
                if (err != null) {
                    channel.writeAndFlush(new DrExternalBatchResponse(
                            req.requestId(), "Failed to process DR batch request: " + err.getMessage()));
                } else if (awaitsAck(channel)) {
                    // Deferred acknowledgement: the sender is waiting for the batch to be fully processed.
                    channel.writeAndFlush(new DrExternalBatchResponse(req.requestId(), null));
                }
            });

            if (!awaitsAck(channel)) {
                // Immediate acknowledgement: the sender did not ask to wait for processing to finish.
                channel.writeAndFlush(new DrExternalBatchResponse(req.requestId(), null));
            }
        } catch (Throwable th) {
            channel.writeAndFlush(new DrExternalBatchResponse(
                    req.requestId(), "Failed to process DR batch request: " + th.getMessage()));
            ctx.fireExceptionCaught(th);
        }
    }

    /**
     * Returns the sender data center id stored on the channel by a successful handshake, or {@code null} if none.
     *
     * <p>The stored value is checked rather than {@code Channel#hasAttr}, because {@code Channel#attr} creates
     * the attribute on first access and would make a later {@code hasAttr} check pass on a channel that never
     * completed a handshake.
     */
    private static Byte senderDataCenterId(Channel channel) {
        return channel.attr(META_DATA_CENTER_ID).get();
    }

    /** Returns {@code true} if the handshake on this channel requested acknowledgement after batch processing. */
    private static boolean awaitsAck(Channel channel) {
        return Boolean.TRUE.equals(channel.attr(META_AWAIT_ACK).get());
    }

    /**
     * Validates a handshake request and, on success, stores the sender data center id and the "await acknowledge"
     * flag in the channel attributes.
     *
     * <p>Checks are applied in order: protocol version, marshaller class name, allowed data center id.
     * A warning is logged when the sender's tombstone TTL is greater than the receiver's, whether or not the
     * handshake is accepted.
     *
     * @param channel Channel the request arrived on.
     * @param drExternalHandshakeRequest Handshake request.
     * @throws HandshakeFailedException If any of the checks fails.
     */
    protected void onHandshake(Channel channel, DrExternalHandshakeRequest drExternalHandshakeRequest) throws HandshakeFailedException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Incoming DR handshake request [remoteAddress=" + channel.remoteAddress()
                    + ", dataCenterId=" + drExternalHandshakeRequest.dataCenterId() + "]");
        }

        String failure = handshakeFailure(channel, drExternalHandshakeRequest);

        if (failure == null) {
            channel.attr(META_DATA_CENTER_ID).set(drExternalHandshakeRequest.dataCenterId());
            channel.attr(META_AWAIT_ACK).set(drExternalHandshakeRequest.awaitAcknowledge());
        }

        if (drExternalHandshakeRequest.getTombstoneTtl() > this.tombstoneTtl) {
            LOG.warn(IgniteStringFormatter.format(
                    "Sender DataCenter has different tombstone TTL (sndTombstoneTTL > rcvTombstoneTTL) [addr={}, sndDataCenterId={}, sndTombstoneTTL={}, rcvTombstoneTTL={}]",
                    channel.remoteAddress(),
                    drExternalHandshakeRequest.dataCenterId(),
                    drExternalHandshakeRequest.getTombstoneTtl(),
                    this.tombstoneTtl));
        }

        if (failure != null) {
            throw new HandshakeFailedException(failure);
        }
    }

    /** Returns the reason the handshake request must be rejected, or {@code null} if it is acceptable. */
    private String handshakeFailure(Channel channel, DrExternalHandshakeRequest req) {
        if (!DrUtils.DR_PROTO_VER.equals(req.protocolVersion())) {
            return "DR handshake failed because of different protocol versions [remoteAddress=" + channel.remoteAddress()
                    + ", sndDataCenterId=" + req.dataCenterId()
                    + ", sndProtoVer=" + req.protocolVersion()
                    + ", rcvProtocolVer=" + DrUtils.DR_PROTO_VER + "]";
        }

        if (!EXPECTED_MARSHALLER.equals(req.marshallerClassName())) {
            return "DR handshake failed because of different marshaller implementations (please fix configuration and restart) [remoteAddress="
                    + channel.remoteAddress()
                    + ", sndDataCenterId=" + req.dataCenterId()
                    + ", sndMarsh=" + req.marshallerClassName()
                    + ", rcvMarsh=" + EXPECTED_MARSHALLER + "]";
        }

        if (req.dataCenterId() != this.dcId) {
            return "DR handshake failed because of incoming connection from non-allowed datacenter [remoteAddress="
                    + channel.remoteAddress()
                    + ", sndDataCenterId=" + req.dataCenterId()
                    + ", allowedDataCenterId=" + this.dcId + "]";
        }

        return null;
    }

    /**
     * Extension hook invoked for a metadata request on a channel that has completed a handshake.
     * The default implementation does nothing.
     *
     * @param channel Channel the request arrived on.
     * @param drExternalMetadataRequest Metadata request.
     */
    protected void onMetadataRequest(Channel channel, DrExternalMetadataRequest drExternalMetadataRequest) {
        // No-op by default.
    }

    /**
     * Extension hook invoked for a batch request on a channel that has completed a handshake.
     * The default implementation returns an already completed future.
     *
     * @param channel Channel the request arrived on.
     * @param drExternalBatchRequest Batch request.
     * @return Future that completes when the batch has been processed.
     */
    protected CompletableFuture<Void> onBatchRequest(Channel channel, DrExternalBatchRequest drExternalBatchRequest) {
        return CompletableFutures.nullCompletedFuture();
    }
}
