package org.apache.ignite.internal.network.recovery;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.FailureType;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
import org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.netty.ChannelCreationListener;
import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
import org.apache.ignite.internal.network.netty.ChannelKey;
import org.apache.ignite.internal.network.netty.HandshakeHandler;
import org.apache.ignite.internal.network.netty.MessageHandler;
import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.internal.network.netty.NettyUtils;
import org.apache.ignite.internal.network.netty.PipelineUtils;
import org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/* loaded from: input_file:org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.class */
public class RecoveryClientHandshakeManager implements HandshakeManager {
    private static final IgniteLogger LOG;
    private static final NetworkMessagesFactory MESSAGE_FACTORY;
    private final ClusterNode localNode;
    private final RecoveryDescriptorProvider recoveryDescriptorProvider;
    private final ChannelEventLoopsSource channelEventLoopsSource;
    private final StaleIdDetector staleIdDetector;
    private final ClusterIdSupplier clusterIdSupplier;
    private final BooleanSupplier stopping;
    private final short connectionId;
    private final CompletableFuture<NettySender> localHandshakeCompleteFuture = new CompletableFuture<>();
    private final CompletableFuture<CompletionStage<NettySender>> masterHandshakeCompleteFuture = new CompletableFuture<>();
    private ClusterNode remoteNode;
    private ChannelHandlerContext ctx;
    private Channel channel;
    private HandshakeHandler handler;
    private RecoveryDescriptor recoveryDescriptor;
    private final FailureProcessor failureProcessor;
    static final /* synthetic */ boolean $assertionsDisabled;

    public RecoveryClientHandshakeManager(ClusterNode clusterNode, short s, RecoveryDescriptorProvider recoveryDescriptorProvider, ChannelEventLoopsSource channelEventLoopsSource, StaleIdDetector staleIdDetector, ClusterIdSupplier clusterIdSupplier, ChannelCreationListener channelCreationListener, BooleanSupplier booleanSupplier, FailureProcessor failureProcessor) {
        this.localNode = clusterNode;
        this.connectionId = s;
        this.recoveryDescriptorProvider = recoveryDescriptorProvider;
        this.channelEventLoopsSource = channelEventLoopsSource;
        this.staleIdDetector = staleIdDetector;
        this.clusterIdSupplier = clusterIdSupplier;
        this.stopping = booleanSupplier;
        this.failureProcessor = failureProcessor;
        this.localHandshakeCompleteFuture.whenComplete((nettySender, th) -> {
            if (th == null) {
                channelCreationListener.handshakeFinished(nettySender);
            } else {
                releaseResources();
                this.masterHandshakeCompleteFuture.complete(this.localHandshakeCompleteFuture);
            }
        });
    }

    private void releaseResources() {
        if (!$assertionsDisabled && !this.ctx.executor().inEventLoop()) {
            throw new AssertionError("Release resources called outside of event loop");
        }
        RecoveryDescriptor recoveryDescriptor = this.recoveryDescriptor;
        if (recoveryDescriptor != null) {
            recoveryDescriptor.release(this.ctx);
        }
    }

    @Override // org.apache.ignite.internal.network.handshake.HandshakeManager
    public void onInit(ChannelHandlerContext channelHandlerContext) {
        this.ctx = channelHandlerContext;
        this.channel = channelHandlerContext.channel();
        this.handler = this.ctx.handler();
    }

    @Override // org.apache.ignite.internal.network.handshake.HandshakeManager
    public void onConnectionOpen() {
        sendProbeToServer();
    }

    private void sendProbeToServer() {
        NettyUtils.toCompletableFuture(this.channel.writeAndFlush(new OutNetworkObject(MESSAGE_FACTORY.probeMessage().build(), List.of(), false))).whenComplete((r9, th) -> {
            if (th != null) {
                if (th instanceof IOException) {
                    LOG.debug("Could not send a probe message via {}", th, new Object[]{this.channel});
                } else {
                    LOG.info("Could not send a probe message via {}", th, new Object[]{this.channel});
                }
            }
        });
    }

    @Override // org.apache.ignite.internal.network.handshake.HandshakeManager
    public void onMessage(NetworkMessage networkMessage) {
        if (networkMessage instanceof HandshakeRejectedMessage) {
            onHandshakeRejectedMessage((HandshakeRejectedMessage) networkMessage);
            return;
        }
        if (networkMessage instanceof HandshakeStartMessage) {
            onHandshakeStartMessage((HandshakeStartMessage) networkMessage);
            return;
        }
        if (!$assertionsDisabled && this.recoveryDescriptor == null) {
            throw new AssertionError("Wrong client handshake flow, message is " + networkMessage);
        }
        if (!$assertionsDisabled && this.recoveryDescriptor.holderChannel() != this.channel) {
            throw new AssertionError("Expected " + this.channel + " but was " + this.recoveryDescriptor.holderChannel() + ", message is " + networkMessage);
        }
        if (!(networkMessage instanceof HandshakeFinishMessage)) {
            if (!$assertionsDisabled && this.recoveryDescriptor.holderChannel() != this.channel) {
                throw new AssertionError("Expected " + this.channel + " but was " + this.recoveryDescriptor.holderChannel() + ", message is " + networkMessage);
            }
            if (this.recoveryDescriptor.unacknowledgedCount() == 0) {
                finishHandshake();
            }
            this.ctx.fireChannelRead(networkMessage);
            return;
        }
        this.recoveryDescriptor.acknowledge(((HandshakeFinishMessage) networkMessage).receivedCount());
        if (this.recoveryDescriptor.unacknowledgedCount() == 0) {
            finishHandshake();
            return;
        }
        List<OutNetworkObject> unacknowledgedMessages = this.recoveryDescriptor.unacknowledgedMessages();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Resending on handshake: {}", new Object[]{unacknowledgedMessages.stream().map((v0) -> {
                return v0.networkMessage();
            }).collect(Collectors.toList())});
        }
        Iterator<OutNetworkObject> it = unacknowledgedMessages.iterator();
        while (it.hasNext()) {
            this.channel.write(it.next());
        }
        this.channel.flush();
    }

    private void onHandshakeStartMessage(HandshakeStartMessage handshakeStartMessage) {
        if (possiblyRejectHandshakeStart(handshakeStartMessage)) {
            return;
        }
        this.remoteNode = handshakeStartMessage.serverNode().asClusterNode();
        HandshakeManagerUtils.switchEventLoopIfNeeded(this.channel, new ChannelKey(this.remoteNode.name(), UUID.fromString(this.remoteNode.id()), this.connectionId), this.channelEventLoopsSource, () -> {
            proceedAfterSavingIds(handshakeStartMessage);
        });
    }

    private void proceedAfterSavingIds(HandshakeStartMessage handshakeStartMessage) {
        RecoveryDescriptor recoveryDescriptor = this.recoveryDescriptorProvider.getRecoveryDescriptor(this.remoteNode.name(), UUID.fromString(this.remoteNode.id()), this.connectionId);
        while (!recoveryDescriptor.tryAcquire(this.ctx, this.localHandshakeCompleteFuture)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Failed to acquire recovery descriptor during handshake, it is held by: {}.", new Object[]{recoveryDescriptor.holderDescription()});
            }
            DescriptorAcquiry holder = recoveryDescriptor.holder();
            if (holder != null) {
                completeMasterFutureWithCompetitorHandshakeFuture(holder);
                return;
            }
        }
        if (possiblyRejectHandshakeStart(handshakeStartMessage)) {
            recoveryDescriptor.release(this.ctx);
        } else {
            this.recoveryDescriptor = recoveryDescriptor;
            handshake(this.recoveryDescriptor);
        }
    }

    private boolean possiblyRejectHandshakeStart(HandshakeStartMessage handshakeStartMessage) {
        if (this.staleIdDetector.isIdStale(handshakeStartMessage.serverNode().id())) {
            handleStaleServerId(handshakeStartMessage);
            return true;
        }
        if (clusterIdMismatch(handshakeStartMessage.serverClusterId(), this.clusterIdSupplier.clusterId())) {
            handleClusterIdMismatch(handshakeStartMessage);
            return true;
        }
        if (!this.stopping.getAsBoolean()) {
            return false;
        }
        handleRefusalToEstablishConnectionDueToStopping(handshakeStartMessage);
        return true;
    }

    private void completeMasterFutureWithCompetitorHandshakeFuture(DescriptorAcquiry descriptorAcquiry) {
        this.masterHandshakeCompleteFuture.complete(descriptorAcquiry.handshakeCompleteFuture());
        this.localHandshakeCompleteFuture.completeExceptionally(new HandshakeException("Stepping aside to allow an incoming handshake from " + this.remoteNode.name() + " to finish."));
    }

    private static boolean clusterIdMismatch(@Nullable UUID uuid, @Nullable UUID uuid2) {
        return (uuid == null || uuid2 == null || uuid.equals(uuid2)) ? false : true;
    }

    private void handleStaleServerId(HandshakeStartMessage handshakeStartMessage) {
        sendRejectionMessageAndFailHandshake(handshakeStartMessage.serverNode().name() + ":" + handshakeStartMessage.serverNode().id() + " is stale, server should be restarted so that clients can connect", HandshakeRejectionReason.STALE_LAUNCH_ID, HandshakeException::new);
    }

    private void handleClusterIdMismatch(HandshakeStartMessage handshakeStartMessage) {
        sendRejectionMessageAndFailHandshake(handshakeStartMessage.serverNode().name() + ":" + handshakeStartMessage.serverNode().id() + " belongs to cluster " + handshakeStartMessage.serverClusterId() + " which is different from this one " + this.clusterIdSupplier.clusterId() + ", connection rejected; should CMG/MG repair be finished?", HandshakeRejectionReason.CLUSTER_ID_MISMATCH, HandshakeException::new);
    }

    private void handleRefusalToEstablishConnectionDueToStopping(HandshakeStartMessage handshakeStartMessage) {
        sendRejectionMessageAndFailHandshake(handshakeStartMessage.serverNode().name() + ":" + handshakeStartMessage.serverNode().id() + " tried to establish a connection with " + this.localNode.name() + ", but it's stopping", HandshakeRejectionReason.STOPPING, str -> {
            return new NodeStoppingException();
        });
    }

    private void sendRejectionMessageAndFailHandshake(String str, HandshakeRejectionReason handshakeRejectionReason, Function<String, Exception> function) {
        HandshakeManagerUtils.sendRejectionMessageAndFailHandshake(str, handshakeRejectionReason, this.channel, this.localHandshakeCompleteFuture, function);
    }

    private void onHandshakeRejectedMessage(HandshakeRejectedMessage handshakeRejectedMessage) {
        boolean z = this.stopping.getAsBoolean() || !handshakeRejectedMessage.reason().critical();
        if (z) {
            LOG.debug("Handshake rejected by server: {}", new Object[]{handshakeRejectedMessage.message()});
        } else {
            LOG.warn("Handshake rejected by server: {}", new Object[]{handshakeRejectedMessage.message()});
        }
        if (handshakeRejectedMessage.reason() == HandshakeRejectionReason.CLINCH) {
            giveUpClinch();
        } else {
            this.localHandshakeCompleteFuture.completeExceptionally(new HandshakeException(handshakeRejectedMessage.message()));
        }
        if (z) {
            return;
        }
        this.failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, new HandshakeException("Handshake rejected by server: " + handshakeRejectedMessage.message())));
    }

    private void giveUpClinch() {
        RecoveryDescriptor recoveryDescriptor = this.recoveryDescriptorProvider.getRecoveryDescriptor(this.remoteNode.name(), UUID.fromString(this.remoteNode.id()), this.connectionId);
        DescriptorAcquiry holder = recoveryDescriptor.holder();
        if (!$assertionsDisabled && holder == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && holder.channel() == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && holder.channel() != this.ctx.channel()) {
            throw new AssertionError("Expected the descriptor to be held by current channel " + this.ctx.channel() + ", but it's held by another channel " + holder.channel());
        }
        recoveryDescriptor.release(this.ctx);
        holder.markClinchResolved();
        DescriptorAcquiry holder2 = recoveryDescriptor.holder();
        if (holder2 != null) {
            completeMasterFutureWithCompetitorHandshakeFuture(holder2);
        } else {
            this.localHandshakeCompleteFuture.completeExceptionally(new ChannelAlreadyExistsException(this.remoteNode.name()));
        }
    }

    @Override // org.apache.ignite.internal.network.handshake.HandshakeManager
    public CompletableFuture<NettySender> localHandshakeFuture() {
        return this.localHandshakeCompleteFuture;
    }

    @Override // org.apache.ignite.internal.network.handshake.HandshakeManager
    public CompletionStage<NettySender> finalHandshakeFuture() {
        return this.masterHandshakeCompleteFuture.thenCompose(Function.identity());
    }

    private void handshake(RecoveryDescriptor recoveryDescriptor) {
        PipelineUtils.afterHandshake(this.ctx.pipeline(), recoveryDescriptor, createMessageHandler(), MESSAGE_FACTORY);
        NettyUtils.toCompletableFuture(this.ctx.channel().writeAndFlush(new OutNetworkObject(MESSAGE_FACTORY.handshakeStartResponseMessage().clientNode(HandshakeManagerUtils.clusterNodeToMessage(this.localNode)).receivedCount(recoveryDescriptor.receivedCount()).connectionId(this.connectionId).build(), Collections.emptyList(), false))).whenComplete((r7, th) -> {
            if (th != null) {
                this.localHandshakeCompleteFuture.completeExceptionally(new HandshakeException("Failed to send handshake response: " + th.getMessage(), th));
            }
        });
    }

    private MessageHandler createMessageHandler() {
        return this.handler.createMessageHandler(this.remoteNode, this.connectionId);
    }

    protected void finishHandshake() {
        this.ctx.pipeline().remove(this.handler);
        this.masterHandshakeCompleteFuture.complete(this.localHandshakeCompleteFuture);
        this.localHandshakeCompleteFuture.complete(new NettySender(this.channel, this.remoteNode.id(), this.remoteNode.name(), this.connectionId, this.recoveryDescriptor));
    }

    @TestOnly
    void setRemoteNode(ClusterNode clusterNode) {
        this.remoteNode = clusterNode;
    }

    static {
        $assertionsDisabled = !RecoveryClientHandshakeManager.class.desiredAssertionStatus();
        LOG = Loggers.forClass(RecoveryClientHandshakeManager.class);
        MESSAGE_FACTORY = new NetworkMessagesFactory();
    }
}
