package org.apache.ignite.internal.network.recovery;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.FailureType;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.netty.ChannelCreationListener;
import org.apache.ignite.internal.network.netty.ChannelEventLoopsSource;
import org.apache.ignite.internal.network.netty.ChannelKey;
import org.apache.ignite.internal.network.netty.HandshakeHandler;
import org.apache.ignite.internal.network.netty.MessageHandler;
import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.internal.network.netty.NettyUtils;
import org.apache.ignite.internal.network.netty.PipelineUtils;
import org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
import org.apache.ignite.internal.network.recovery.message.ProbeMessage;
import org.apache.ignite.network.ClusterNode;

/* loaded from: input_file:org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.class */
public class RecoveryServerHandshakeManager implements HandshakeManager {
    private static final IgniteLogger LOG;
    private static final int MAX_CLINCH_TERMINATION_AWAIT_ATTEMPTS = 1000;
    private final ClusterNode localNode;
    private final NetworkMessagesFactory messageFactory;
    private final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
    private ClusterNode remoteNode;
    private short remoteChannelId;
    private ChannelHandlerContext ctx;
    private Channel channel;
    private HandshakeHandler handler;
    private long receivedCount;
    private final RecoveryDescriptorProvider recoveryDescriptorProvider;
    private final ChannelEventLoopsSource channelEventLoopsSource;
    private final StaleIdDetector staleIdDetector;
    private final BooleanSupplier stopping;
    private RecoveryDescriptor recoveryDescriptor;
    private final FailureProcessor failureProcessor;
    static final /* synthetic */ boolean $assertionsDisabled;

    public RecoveryServerHandshakeManager(ClusterNode clusterNode, NetworkMessagesFactory networkMessagesFactory, RecoveryDescriptorProvider recoveryDescriptorProvider, ChannelEventLoopsSource channelEventLoopsSource, StaleIdDetector staleIdDetector, ChannelCreationListener channelCreationListener, BooleanSupplier booleanSupplier, FailureProcessor failureProcessor) {
        this.localNode = clusterNode;
        this.messageFactory = networkMessagesFactory;
        this.recoveryDescriptorProvider = recoveryDescriptorProvider;
        this.channelEventLoopsSource = channelEventLoopsSource;
        this.staleIdDetector = staleIdDetector;
        this.stopping = booleanSupplier;
        this.failureProcessor = failureProcessor;
        this.handshakeCompleteFuture.whenComplete((nettySender, th) -> {
            if (th != null) {
                releaseResources();
            } else {
                channelCreationListener.handshakeFinished(nettySender);
            }
        });
    }

    private void releaseResources() {
        if (!$assertionsDisabled && !this.ctx.executor().inEventLoop()) {
            throw new AssertionError("Release resources called outside of event loop");
        }
        RecoveryDescriptor recoveryDescriptor = this.recoveryDescriptor;
        if (recoveryDescriptor != null) {
            recoveryDescriptor.release(this.ctx);
        }
    }

    @Override // org.apache.ignite.internal.network.handshake.HandshakeManager
    public void onInit(ChannelHandlerContext channelHandlerContext) {
        this.ctx = channelHandlerContext;
        this.channel = channelHandlerContext.channel();
        this.handler = (HandshakeHandler) this.ctx.handler();
    }

    @Override // org.apache.ignite.internal.network.handshake.HandshakeManager
    public void onConnectionOpen() {
        NettyUtils.toCompletableFuture(this.channel.writeAndFlush(new OutNetworkObject(this.messageFactory.handshakeStartMessage().serverNode(HandshakeManagerUtils.clusterNodeToMessage(this.localNode)).build(), Collections.emptyList(), false))).whenComplete((r7, th) -> {
            if (th != null) {
                this.handshakeCompleteFuture.completeExceptionally(new HandshakeException("Failed to send handshake start message: " + th.getMessage(), th));
            }
        });
    }

    @Override // org.apache.ignite.internal.network.handshake.HandshakeManager
    public void onMessage(NetworkMessage networkMessage) {
        if (networkMessage instanceof ProbeMessage) {
            return;
        }
        if (networkMessage instanceof HandshakeRejectedMessage) {
            onHandshakeRejectedMessage((HandshakeRejectedMessage) networkMessage);
            return;
        }
        if (networkMessage instanceof HandshakeStartResponseMessage) {
            onHandshakeStartResponseMessage((HandshakeStartResponseMessage) networkMessage);
            return;
        }
        if (!$assertionsDisabled && this.recoveryDescriptor == null) {
            throw new AssertionError("Wrong server handshake flow, message is " + networkMessage);
        }
        if (!$assertionsDisabled && this.recoveryDescriptor.holderChannel() != this.channel) {
            throw new AssertionError("Expected " + this.channel + " but was " + this.recoveryDescriptor.holderChannel() + ", message is " + networkMessage);
        }
        if (this.recoveryDescriptor.unacknowledgedCount() == 0) {
            finishHandshake();
        }
        this.ctx.fireChannelRead((Object) networkMessage);
    }

    private void onHandshakeStartResponseMessage(HandshakeStartResponseMessage handshakeStartResponseMessage) {
        if (possiblyRejectHandshakeStartResponse(handshakeStartResponseMessage)) {
            return;
        }
        this.remoteNode = handshakeStartResponseMessage.clientNode().asClusterNode();
        this.receivedCount = handshakeStartResponseMessage.receivedCount();
        this.remoteChannelId = handshakeStartResponseMessage.connectionId();
        HandshakeManagerUtils.switchEventLoopIfNeeded(this.channel, new ChannelKey(this.remoteNode.name(), UUID.fromString(this.remoteNode.id()), this.remoteChannelId), this.channelEventLoopsSource, () -> {
            tryAcquireDescriptorAndFinishHandshake(handshakeStartResponseMessage);
        });
    }

    private boolean possiblyRejectHandshakeStartResponse(HandshakeStartResponseMessage handshakeStartResponseMessage) {
        if (this.staleIdDetector.isIdStale(handshakeStartResponseMessage.clientNode().id())) {
            handleStaleClientId(handshakeStartResponseMessage);
            return true;
        }
        if (!this.stopping.getAsBoolean()) {
            return false;
        }
        handleRefusalToEstablishConnectionDueToStopping(handshakeStartResponseMessage);
        return true;
    }

    private void handleStaleClientId(HandshakeStartResponseMessage handshakeStartResponseMessage) {
        sendRejectionMessageAndFailHandshake(handshakeStartResponseMessage.clientNode().name() + ":" + handshakeStartResponseMessage.clientNode().id() + " is stale, client should be restarted to be allowed to connect", HandshakeRejectionReason.STALE_LAUNCH_ID, HandshakeException::new);
    }

    private void handleRefusalToEstablishConnectionDueToStopping(HandshakeStartResponseMessage handshakeStartResponseMessage) {
        sendRejectionMessageAndFailHandshake(handshakeStartResponseMessage.clientNode().name() + ":" + handshakeStartResponseMessage.clientNode().id() + " tried to establish a connection with " + this.localNode.name() + ", but it's stopping", HandshakeRejectionReason.STOPPING, str -> {
            return new NodeStoppingException();
        });
    }

    private void sendRejectionMessageAndFailHandshake(String str, HandshakeRejectionReason handshakeRejectionReason, Function<String, Exception> function) {
        HandshakeManagerUtils.sendRejectionMessageAndFailHandshake(str, handshakeRejectionReason, this.channel, this.handshakeCompleteFuture, function);
    }

    private void tryAcquireDescriptorAndFinishHandshake(HandshakeStartResponseMessage handshakeStartResponseMessage) {
        tryAcquireDescriptorAndFinishHandshake(handshakeStartResponseMessage, 0);
    }

    private void tryAcquireDescriptorAndFinishHandshake(HandshakeStartResponseMessage handshakeStartResponseMessage, int i) {
        if (i > 1000) {
            throw new IllegalStateException("Too many attempts during handshake from " + this.remoteNode.name() + "(" + this.remoteNode.id() + ":" + this.remoteChannelId + ") via " + this.ctx.channel());
        }
        RecoveryDescriptor recoveryDescriptor = this.recoveryDescriptorProvider.getRecoveryDescriptor(this.remoteNode.name(), UUID.fromString(this.remoteNode.id()), this.remoteChannelId);
        while (!recoveryDescriptor.tryAcquire(this.ctx, this.handshakeCompleteFuture)) {
            if (!HandshakeTieBreaker.shouldCloseChannel(UUID.fromString(this.localNode.id()), UUID.fromString(this.remoteNode.id()))) {
                rejectHandshakeDueToLosingClinch(recoveryDescriptor);
                return;
            }
            DescriptorAcquiry holder = recoveryDescriptor.holder();
            if (holder != null) {
                if (possiblyRejectHandshakeStartResponse(handshakeStartResponseMessage)) {
                    return;
                }
                holder.clinchResolved().whenComplete((r8, th) -> {
                    if (this.ctx.executor().inEventLoop()) {
                        tryAcquireDescriptorAndFinishHandshake(handshakeStartResponseMessage, i + 1);
                    } else {
                        this.ctx.executor().execute(() -> {
                            tryAcquireDescriptorAndFinishHandshake(handshakeStartResponseMessage, i + 1);
                        });
                    }
                });
                return;
            }
        }
        if (possiblyRejectHandshakeStartResponse(handshakeStartResponseMessage)) {
            recoveryDescriptor.release(this.ctx);
        } else {
            this.recoveryDescriptor = recoveryDescriptor;
            handshake(recoveryDescriptor);
        }
    }

    private void onHandshakeRejectedMessage(HandshakeRejectedMessage handshakeRejectedMessage) {
        boolean z = this.stopping.getAsBoolean() || !handshakeRejectedMessage.reason().critical();
        if (z) {
            LOG.debug("Handshake rejected by client: {}", handshakeRejectedMessage.message());
        } else {
            LOG.warn("Handshake rejected by client: {}", handshakeRejectedMessage.message());
        }
        HandshakeException handshakeException = new HandshakeException(handshakeRejectedMessage.message());
        this.handshakeCompleteFuture.completeExceptionally(handshakeException);
        if (z) {
            return;
        }
        this.failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, handshakeException));
    }

    private void rejectHandshakeDueToLosingClinch(RecoveryDescriptor recoveryDescriptor) {
        String str = "Failed to acquire recovery descriptor during handshake, it is held by: " + recoveryDescriptor.holderDescription();
        LOG.debug(str, new Object[0]);
        HandshakeManagerUtils.sendRejectionMessageAndFailHandshake(str, "Handshake clinch detected, this handshake will be terminated, winning channel is " + recoveryDescriptor.holderDescription(), HandshakeRejectionReason.CLINCH, this.channel, this.handshakeCompleteFuture, HandshakeException::new);
    }

    private void handshake(RecoveryDescriptor recoveryDescriptor) {
        PipelineUtils.afterHandshake(this.ctx.pipeline(), recoveryDescriptor, createMessageHandler(), this.messageFactory);
        CompletableFuture<Void> completableFuture = NettyUtils.toCompletableFuture(this.channel.write(new OutNetworkObject(this.messageFactory.handshakeFinishMessage().receivedCount(recoveryDescriptor.receivedCount()).build(), Collections.emptyList(), false)));
        recoveryDescriptor.acknowledge(this.receivedCount);
        int unacknowledgedCount = recoveryDescriptor.unacknowledgedCount();
        if (unacknowledgedCount > 0) {
            CompletableFuture[] completableFutureArr = new CompletableFuture[unacknowledgedCount + 1];
            completableFutureArr[0] = completableFuture;
            List<OutNetworkObject> unacknowledgedMessages = recoveryDescriptor.unacknowledgedMessages();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Resending on handshake: {}", unacknowledgedMessages.stream().map((v0) -> {
                    return v0.networkMessage();
                }).collect(Collectors.toList()));
            }
            for (int i = 0; i < unacknowledgedMessages.size(); i++) {
                completableFutureArr[i + 1] = NettyUtils.toCompletableFuture(this.channel.write(unacknowledgedMessages.get(i)));
            }
            completableFuture = CompletableFuture.allOf(completableFutureArr);
        }
        this.channel.flush();
        boolean z = unacknowledgedCount > 0;
        completableFuture.whenComplete((r8, th) -> {
            if (th != null) {
                this.handshakeCompleteFuture.completeExceptionally(new HandshakeException("Failed to send handshake response: " + th.getMessage(), th));
            } else {
                if (z) {
                    return;
                }
                finishHandshake();
            }
        });
    }

    private MessageHandler createMessageHandler() {
        return this.handler.createMessageHandler(this.remoteNode, this.remoteChannelId);
    }

    @Override // org.apache.ignite.internal.network.handshake.HandshakeManager
    public CompletableFuture<NettySender> localHandshakeFuture() {
        return this.handshakeCompleteFuture;
    }

    @Override // org.apache.ignite.internal.network.handshake.HandshakeManager
    public CompletionStage<NettySender> finalHandshakeFuture() {
        return this.handshakeCompleteFuture;
    }

    private void finishHandshake() {
        this.ctx.pipeline().remove(this.handler);
        this.handshakeCompleteFuture.complete(new NettySender(this.channel, this.remoteNode.id(), this.remoteNode.name(), this.remoteChannelId, this.recoveryDescriptor));
    }

    static {
        $assertionsDisabled = !RecoveryServerHandshakeManager.class.desiredAssertionStatus();
        LOG = Loggers.forClass(RecoveryServerHandshakeManager.class);
    }
}
