/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.network.recovery;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
import org.apache.ignite.internal.network.configuration.AckConfiguration;
import org.apache.ignite.internal.network.configuration.AckView;
import org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
import org.apache.ignite.internal.network.handshake.CriticalHandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.netty.ChannelCreationListener;
import org.apache.ignite.internal.network.netty.ChannelKey;
import org.apache.ignite.internal.network.netty.HandshakeHandler;
import org.apache.ignite.internal.network.netty.MessageHandler;
import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.internal.network.netty.NettyUtils;
import org.apache.ignite.internal.network.netty.PipelineUtils;
import org.apache.ignite.internal.network.recovery.DescriptorAcquiry;
import org.apache.ignite.internal.network.recovery.HandshakeManagerUtils;
import org.apache.ignite.internal.network.recovery.HandshakeStartMessageAdapter;
import org.apache.ignite.internal.network.recovery.HandshakeStartMessageV1Adapter;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import org.apache.ignite.internal.network.recovery.StaleIdDetector;
import org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
import org.apache.ignite.internal.network.recovery.message.ProbeMessage;
import org.apache.ignite.internal.version.IgniteProductVersionSource;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
 * {@link HandshakeManager} for the side that initiates a connection (see {@link #isInitiator()}).
 *
 * <p>The flow implemented by this class is:
 * <ol>
 *     <li>when the connection opens, a {@link ProbeMessage} is written to the channel;</li>
 *     <li>an incoming {@link HandshakeStartMessage} is validated, the recovery descriptor for the remote node is
 *     acquired and a handshake start response is sent;</li>
 *     <li>an incoming {@link HandshakeFinishMessage} acknowledges previously sent messages; messages that are still
 *     unacknowledged are re-sent, and the handshake finishes as soon as none remain;</li>
 *     <li>an incoming {@link HandshakeRejectedMessage} fails the handshake or, for
 *     {@link HandshakeRejectionReason#CLINCH}, makes this manager step aside in favour of the competing handshake.</li>
 * </ol>
 */
public class RecoveryInitiatorHandshakeManager
implements HandshakeManager {
    private static final IgniteLogger LOG = Loggers.forClass(RecoveryInitiatorHandshakeManager.class);

    /** Factory used to build all handshake-related messages. */
    protected static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();

    /** Node this manager runs on. */
    protected final InternalClusterNode localNode;

    /** Source of recovery descriptors, looked up by remote node name, remote node id and connection id. */
    private final RecoveryDescriptorProvider recoveryDescriptorProvider;

    /** Used to (possibly) move the channel to another event loop once the remote node is known. */
    private final HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;

    /** Tells whether the acceptor's node id is stale. */
    private final StaleIdDetector staleIdDetector;

    /** Supplies the local cluster id that is compared with the acceptor's cluster id. */
    private final ClusterIdSupplier clusterIdSupplier;

    /** Returns {@code true} while the local node is stopping. */
    private final BooleanSupplier stopping;

    /** Source of the local product name that is compared with the acceptor's product name. */
    private final IgniteProductVersionSource productVersionSource;

    /** Id of the connection this handshake is performed for. */
    protected final short connectionId;

    /** Completed with the sender when this handshake finishes, or exceptionally when it fails. */
    private final CompletableFuture<NettySender> localHandshakeCompleteFuture = new CompletableFuture<>();

    /**
     * Completed with the stage whose outcome is exposed by {@link #finalHandshakeFuture()}: either this manager's own
     * handshake future or the handshake future of a competitor that holds the recovery descriptor.
     */
    private final CompletableFuture<CompletionStage<NettySender>> masterHandshakeCompleteFuture = new CompletableFuture<>();

    /** Acknowledgement configuration whose current value is passed to the post-handshake pipeline setup. */
    private final AckConfiguration ackConfiguration;

    /** Acceptor node; set when a handshake start message passes the first round of validation. */
    private InternalClusterNode remoteNode;

    /** Handler context; set in {@link #onInit(ChannelHandlerContext)}. */
    protected ChannelHandlerContext ctx;

    /** Channel of {@link #ctx}; set in {@link #onInit(ChannelHandlerContext)}. */
    private Channel channel;

    /** Handshake handler taken from {@link #ctx}; set in {@link #onInit(ChannelHandlerContext)}. */
    private HandshakeHandler handler;

    /** Recovery descriptor acquired by this manager; {@code null} until the acquisition succeeded. */
    private RecoveryDescriptor recoveryDescriptor;

    /**
     * Creates the manager and wires the completion callback of the local handshake future.
     *
     * @param localNode Local node.
     * @param connectionId Connection id.
     * @param recoveryDescriptorProvider Provider of recovery descriptors.
     * @param handshakeEventLoopSwitcher Event loop switcher invoked once the remote node is known.
     * @param staleIdDetector Detector of stale node ids.
     * @param clusterIdSupplier Supplier of the local cluster id.
     * @param channelCreationListener Listener notified with the created sender when the local handshake succeeds.
     * @param stopping Returns {@code true} if the local node is stopping.
     * @param productVersionSource Source of the local product name.
     * @param ackConfiguration Acknowledgement configuration.
     */
    public RecoveryInitiatorHandshakeManager(InternalClusterNode localNode, short connectionId, RecoveryDescriptorProvider recoveryDescriptorProvider, HandshakeEventLoopSwitcher handshakeEventLoopSwitcher, StaleIdDetector staleIdDetector, ClusterIdSupplier clusterIdSupplier, ChannelCreationListener channelCreationListener, BooleanSupplier stopping, IgniteProductVersionSource productVersionSource, AckConfiguration ackConfiguration) {
        this.localNode = localNode;
        this.connectionId = connectionId;
        this.recoveryDescriptorProvider = recoveryDescriptorProvider;
        this.handshakeEventLoopSwitcher = handshakeEventLoopSwitcher;
        this.staleIdDetector = staleIdDetector;
        this.clusterIdSupplier = clusterIdSupplier;
        this.stopping = stopping;
        this.productVersionSource = productVersionSource;
        this.ackConfiguration = ackConfiguration;
        this.localHandshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
            if (throwable != null) {
                try {
                    this.releaseResources();
                } finally {
                    // Must happen even if the release fails: otherwise finalHandshakeFuture() would never complete.
                    // It is a no-op when the master future has already been completed with a competitor's future.
                    this.masterHandshakeCompleteFuture.complete(this.localHandshakeCompleteFuture);
                }
                return;
            }
            channelCreationListener.handshakeFinished(nettySender);
        });
    }

    /**
     * Releases the recovery descriptor stored in {@link #recoveryDescriptor}, if there is one.
     * Asserts that the caller runs in the event loop of {@link #ctx}.
     */
    private void releaseResources() {
        assert (this.ctx.executor().inEventLoop()) : "Release resources called outside of event loop";
        RecoveryDescriptor desc = this.recoveryDescriptor;
        if (desc != null) {
            desc.release(this.ctx);
        }
    }

    /** Always returns {@code true}: this manager is the initiator side of the handshake. */
    @Override
    public boolean isInitiator() {
        return true;
    }

    /**
     * Remembers the handler context together with its channel and handler.
     *
     * @param handlerContext Context whose handler is expected to be a {@link HandshakeHandler}.
     */
    @Override
    public void onInit(ChannelHandlerContext handlerContext) {
        this.ctx = handlerContext;
        this.channel = handlerContext.channel();
        this.handler = (HandshakeHandler)this.ctx.handler();
    }

    /** Sends a probe message to the acceptor. */
    @Override
    public void onConnectionOpen() {
        this.sendProbeToAcceptor();
    }

    /**
     * Writes a {@link ProbeMessage} carrying {@link #probePayload()} to the channel. A failed write is only logged:
     * at debug level for {@link IOException}, at info level otherwise.
     */
    private void sendProbeToAcceptor() {
        ProbeMessage probe = MESSAGE_FACTORY.probeMessage().probe(this.probePayload()).build();
        NettyUtils.toCompletableFuture(this.channel.writeAndFlush(new OutNetworkObject(probe, List.of()))).whenComplete((res, ex) -> {
            if (ex != null) {
                if (ex instanceof IOException) {
                    LOG.debug("Could not send a probe message via {}", ex, this.channel);
                } else {
                    LOG.info("Could not send a probe message via {}", ex, this.channel);
                }
            }
        });
    }

    /**
     * Returns the payload byte placed into the probe message.
     *
     * @return {@code 0} in this implementation.
     */
    protected byte probePayload() {
        return 0;
    }

    /**
     * Dispatches an incoming message.
     *
     * <p>Rejection and handshake start messages are handled by dedicated methods. Every other message requires the
     * recovery descriptor to be already acquired by this channel (checked with assertions). A
     * {@link HandshakeFinishMessage} acknowledges sent messages and either finishes the handshake or re-sends what is
     * still unacknowledged. Any other message finishes the handshake if nothing is left unacknowledged and is then
     * passed further down the pipeline.
     *
     * @param message Incoming message.
     */
    @Override
    public void onMessage(NetworkMessage message) {
        if (message instanceof HandshakeRejectedMessage) {
            this.onHandshakeRejectedMessage((HandshakeRejectedMessage)message);
            return;
        }
        if (message instanceof HandshakeStartMessage) {
            this.onHandshakeStartMessage(new HandshakeStartMessageV1Adapter((HandshakeStartMessage)message));
            return;
        }
        assert (this.recoveryDescriptor != null) : "Wrong initiator handshake flow, message is " + message;
        assert (this.recoveryDescriptor.holderChannel() == this.channel) : "Expected " + this.channel + " but was " + this.recoveryDescriptor.holderChannel() + ", message is " + message;
        if (message instanceof HandshakeFinishMessage) {
            HandshakeFinishMessage msg = (HandshakeFinishMessage)message;
            long receivedCount = msg.receivedCount();
            this.recoveryDescriptor.acknowledge(receivedCount);
            if (this.recoveryDescriptor.unacknowledgedCount() == 0) {
                this.finishHandshake();
                return;
            }
            // Something is still unacknowledged by the acceptor: re-send it before the handshake can finish.
            List<OutNetworkObject> networkMessages = this.recoveryDescriptor.unacknowledgedMessages();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Resending on handshake: {}", networkMessages.stream().map(OutNetworkObject::networkMessage).collect(Collectors.toList()));
            }
            for (OutNetworkObject networkMessage : networkMessages) {
                this.channel.write(networkMessage);
            }
            this.channel.flush();
            return;
        }
        if (this.recoveryDescriptor.unacknowledgedCount() == 0) {
            this.finishHandshake();
        }
        this.ctx.fireChannelRead(message);
    }

    /**
     * Handles a handshake start message: validates it, remembers the acceptor as the remote node, lets the event loop
     * switcher act on the channel and then continues with descriptor acquisition.
     *
     * <p>If the event loop switch or the continuation fails, the local handshake future is completed exceptionally
     * so that the handshake does not stay pending forever.
     *
     * @param handshakeStartMessage Handshake start message received from the acceptor.
     */
    protected final void onHandshakeStartMessage(HandshakeStartMessageAdapter handshakeStartMessage) {
        if (this.possiblyRejectHandshakeStart(handshakeStartMessage)) {
            return;
        }
        this.remoteNode = handshakeStartMessage.acceptorNode().asClusterNode();
        ChannelKey channelKey = new ChannelKey(this.remoteNode.name(), this.remoteNode.id(), this.connectionId);
        this.handshakeEventLoopSwitcher.switchEventLoopIfNeeded(this.channel, channelKey)
                .thenRun(() -> this.proceedAfterSavingIds(handshakeStartMessage))
                .whenComplete((unused, ex) -> {
                    if (ex != null) {
                        // No-op if the handshake future has already been completed.
                        this.localHandshakeCompleteFuture.completeExceptionally(new HandshakeException("Failed to process handshake start message: " + ex.getMessage(), ex));
                    }
                });
    }

    /**
     * Acquires the recovery descriptor for the remote node and starts the handshake.
     *
     * <p>While the descriptor cannot be acquired, its current holder is inspected: if there is one, this manager steps
     * aside in favour of that holder's handshake; if there is none, the acquisition is retried. After a successful
     * acquisition the start message is validated once more; on rejection the descriptor is released again.
     *
     * @param handshakeStartMessage Handshake start message received from the acceptor.
     */
    private void proceedAfterSavingIds(HandshakeStartMessageAdapter handshakeStartMessage) {
        RecoveryDescriptor descriptor = this.recoveryDescriptorProvider.getRecoveryDescriptor(this.remoteNode.name(), this.remoteNode.id(), this.connectionId);
        while (!descriptor.tryAcquire(this.ctx, this.localHandshakeCompleteFuture)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Failed to acquire recovery descriptor during handshake, it is held by: {}.", descriptor.holderDescription());
            }
            DescriptorAcquiry competitorAcquiry = descriptor.holder();
            if (competitorAcquiry != null) {
                this.completeMasterFutureWithCompetitorHandshakeFuture(competitorAcquiry);
                return;
            }
            // No holder was observed after the failed attempt, so try to acquire again.
        }
        if (this.possiblyRejectHandshakeStart(handshakeStartMessage)) {
            // recoveryDescriptor is not set yet, so releaseResources() would not release it; do it explicitly.
            descriptor.release(this.ctx);
            return;
        }
        this.recoveryDescriptor = descriptor;
        this.handshake(this.recoveryDescriptor);
    }

    /**
     * Checks, in this order, whether the acceptor is the local node itself, has a stale id, belongs to another
     * cluster, runs another product, or whether the local node is stopping. The first matching condition triggers the
     * corresponding rejection.
     *
     * @param message Handshake start message received from the acceptor.
     * @return {@code true} if the handshake was rejected, {@code false} if it may proceed.
     */
    private boolean possiblyRejectHandshakeStart(HandshakeStartMessageAdapter message) {
        if (message.acceptorNode().id().equals(this.localNode.id())) {
            this.handleLoopConnection(message);
            return true;
        }
        if (this.staleIdDetector.isIdStale(message.acceptorNode().id())) {
            this.handleStaleAcceptorId(message);
            return true;
        }
        if (RecoveryInitiatorHandshakeManager.clusterIdMismatch(message.acceptorClusterId(), this.clusterIdSupplier.clusterId())) {
            this.handleClusterIdMismatch(message);
            return true;
        }
        if (!this.productVersionSource.productName().equals(message.productName())) {
            this.handleProductNameMismatch(message);
            return true;
        }
        if (this.stopping.getAsBoolean()) {
            this.handleRefusalToEstablishConnectionDueToStopping(message);
            return true;
        }
        return false;
    }

    /** Rejects a handshake start that carries the local node's own id, using {@link CriticalHandshakeException}. */
    private void handleLoopConnection(HandshakeStartMessageAdapter msg) {
        String message = String.format("Got handshake start from self, this should never happen; this is a programming error [localNode=%s, acceptorNode=%s]", this.localNode, msg.acceptorNode());
        this.sendRejectionMessageAndFailHandshake(message, HandshakeRejectionReason.LOOP, CriticalHandshakeException::new);
    }

    /**
     * Steps aside in favour of a competing handshake: the master future is bound to the competitor's handshake future
     * first, and only then the local handshake future is failed, so that the failure callback cannot bind the master
     * future to the failed local future.
     *
     * @param competitorAcquiry Acquiry of the competitor that holds the recovery descriptor.
     */
    private void completeMasterFutureWithCompetitorHandshakeFuture(DescriptorAcquiry competitorAcquiry) {
        this.masterHandshakeCompleteFuture.complete(competitorAcquiry.handshakeCompleteFuture());
        this.localHandshakeCompleteFuture.completeExceptionally(new HandshakeException("Stepping aside to allow an incoming handshake from " + this.remoteNode.name() + " to finish."));
    }

    /**
     * Tells whether two cluster ids contradict each other.
     *
     * @param acceptorClusterId Cluster id reported by the acceptor, may be {@code null}.
     * @param initiatorClusterId Local cluster id, may be {@code null}.
     * @return {@code true} only if both ids are non-null and differ.
     */
    private static boolean clusterIdMismatch(@Nullable UUID acceptorClusterId, @Nullable UUID initiatorClusterId) {
        return acceptorClusterId != null && initiatorClusterId != null && !acceptorClusterId.equals(initiatorClusterId);
    }

    /** Rejects the handshake because the acceptor's node id is stale. */
    private void handleStaleAcceptorId(HandshakeStartMessageAdapter msg) {
        String message = String.format("%s:%s is stale, node should be restarted so that other nodes can connect", msg.acceptorNode().name(), msg.acceptorNode().id());
        this.sendRejectionMessageAndFailHandshake(message, HandshakeRejectionReason.STALE_LAUNCH_ID, HandshakeException::new);
    }

    /** Rejects the handshake because the acceptor reports a cluster id different from the local one. */
    private void handleClusterIdMismatch(HandshakeStartMessageAdapter msg) {
        String message = String.format("%s:%s belongs to cluster %s which is different from this one %s, connection rejected. Either another cluster is reachable for this one on the network (in this case make sure they can't connect), or CMG/MG repair was made and then some node that did not participate one is started (in this case, migrate the started node to the repaired cluster using CMG/MG repair tools)", msg.acceptorNode().name(), msg.acceptorNode().id(), msg.acceptorClusterId(), this.clusterIdSupplier.clusterId());
        this.sendRejectionMessageAndFailHandshake(message, HandshakeRejectionReason.CLUSTER_ID_MISMATCH, HandshakeException::new);
    }

    /** Rejects the handshake because the acceptor reports a product name different from the local one. */
    private void handleProductNameMismatch(HandshakeStartMessageAdapter msg) {
        String message = String.format("%s:%s runs product '%s' which is different from this one '%s', connection rejected", msg.acceptorNode().name(), msg.acceptorNode().id(), msg.productName(), this.productVersionSource.productName());
        this.sendRejectionMessageAndFailHandshake(message, HandshakeRejectionReason.PRODUCT_MISMATCH, HandshakeException::new);
    }

    /** Rejects the handshake because the local node is stopping; the handshake fails with {@link NodeStoppingException}. */
    private void handleRefusalToEstablishConnectionDueToStopping(HandshakeStartMessageAdapter msg) {
        String message = String.format("%s:%s tried to establish a connection with %s, but it's stopping", msg.acceptorNode().name(), msg.acceptorNode().id(), this.localNode.name());
        this.sendRejectionMessageAndFailHandshake(message, HandshakeRejectionReason.STOPPING, m -> new NodeStoppingException());
    }

    /**
     * Delegates to {@link HandshakeManagerUtils#sendRejectionMessageAndFailHandshake} with this manager's channel and
     * local handshake future.
     *
     * @param message Rejection description.
     * @param rejectionReason Rejection reason.
     * @param exceptionFactory Creates the exception for the given description.
     */
    private void sendRejectionMessageAndFailHandshake(String message, HandshakeRejectionReason rejectionReason, Function<String, Exception> exceptionFactory) {
        HandshakeManagerUtils.sendRejectionMessageAndFailHandshake(message, rejectionReason, this.channel, this.localHandshakeCompleteFuture, exceptionFactory);
    }

    /**
     * Handles a rejection sent by the acceptor. The rejection is logged at warn level if the local node is not stopping
     * and the reason asks for it, at debug level otherwise. A {@link HandshakeRejectionReason#CLINCH} rejection makes
     * this manager give up the clinch; any other reason fails the local handshake future.
     *
     * @param msg Rejection message.
     */
    private void onHandshakeRejectedMessage(HandshakeRejectedMessage msg) {
        if (!this.stopping.getAsBoolean() && msg.reason().logAsWarn()) {
            LOG.warn("Handshake rejected by acceptor: {}", msg.message());
        } else {
            LOG.debug("Handshake rejected by acceptor: {}", msg.message());
        }
        if (msg.reason() == HandshakeRejectionReason.CLINCH) {
            this.giveUpClinch();
        } else {
            this.localHandshakeCompleteFuture.completeExceptionally(new HandshakeException(msg.message()));
        }
    }

    /**
     * Gives up a clinch: releases the recovery descriptor held by this channel and marks this manager's acquiry as
     * clinch-resolved. If the descriptor has a holder afterwards, this manager steps aside in favour of that holder's
     * handshake; otherwise the local handshake fails with {@link ChannelAlreadyExistsException}.
     */
    private void giveUpClinch() {
        RecoveryDescriptor descriptor = this.recoveryDescriptorProvider.getRecoveryDescriptor(this.remoteNode.name(), this.remoteNode.id(), this.connectionId);
        DescriptorAcquiry myAcquiry = descriptor.holder();
        assert (myAcquiry != null);
        assert (myAcquiry.channel() != null);
        assert (myAcquiry.channel() == this.ctx.channel()) : "Expected the descriptor to be held by current channel " + this.ctx.channel() + ", but it's held by another channel " + myAcquiry.channel();
        descriptor.release(this.ctx);
        myAcquiry.markClinchResolved();
        // Re-read the holder: after the release above it can only be a competitor (or nobody).
        DescriptorAcquiry competitorAcquiry = descriptor.holder();
        if (competitorAcquiry != null) {
            this.completeMasterFutureWithCompetitorHandshakeFuture(competitorAcquiry);
        } else {
            this.localHandshakeCompleteFuture.completeExceptionally(new ChannelAlreadyExistsException(this.remoteNode.id()));
        }
    }

    /** Returns the future of the handshake performed by this very manager. */
    @Override
    public CompletableFuture<NettySender> localHandshakeFuture() {
        return this.localHandshakeCompleteFuture;
    }

    /**
     * Returns a stage that completes with the outcome of whichever handshake the master future was bound to: this
     * manager's own one or a competitor's.
     */
    @Override
    public CompletionStage<NettySender> finalHandshakeFuture() {
        return this.masterHandshakeCompleteFuture.thenCompose(Function.identity());
    }

    /** Returns the product version source passed to the constructor. */
    @Override
    public IgniteProductVersionSource productVersionSource() {
        return this.productVersionSource;
    }

    /**
     * Reconfigures the pipeline for the post-handshake stage and sends the handshake start response. A failed send
     * fails the local handshake future.
     *
     * @param descriptor Recovery descriptor acquired for the remote node.
     */
    private void handshake(RecoveryDescriptor descriptor) {
        PipelineUtils.afterHandshake(this.ctx.pipeline(), descriptor, this.createMessageHandler(), MESSAGE_FACTORY, (AckView)this.ackConfiguration.value());
        NetworkMessage response = this.createHandshakeStartResponseMessage(descriptor);
        ChannelFuture sendFuture = this.ctx.channel().writeAndFlush(new OutNetworkObject(response, Collections.emptyList()));
        NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
            if (throwable != null) {
                this.localHandshakeCompleteFuture.completeExceptionally(new HandshakeException("Failed to send handshake response: " + throwable.getMessage(), throwable));
            }
        });
    }

    /**
     * Builds the handshake start response carrying the local node, the descriptor's received count and the
     * connection id.
     *
     * @param descriptor Recovery descriptor acquired for the remote node.
     * @return Response message to send to the acceptor.
     */
    protected NetworkMessage createHandshakeStartResponseMessage(RecoveryDescriptor descriptor) {
        return MESSAGE_FACTORY.handshakeStartResponseMessage().clientNode(HandshakeManagerUtils.clusterNodeToMessage(this.localNode)).receivedCount(descriptor.receivedCount()).connectionId(this.connectionId).build();
    }

    /** Asks the handshake handler to create a message handler for the remote node and connection id. */
    private MessageHandler createMessageHandler() {
        return this.handler.createMessageHandler(this.remoteNode, this.connectionId);
    }

    /**
     * Finishes the handshake: removes the handshake handler from the pipeline, binds the master future to the local
     * handshake future and completes the latter with a new {@link NettySender}.
     */
    protected void finishHandshake() {
        this.ctx.pipeline().remove(this.handler);
        this.masterHandshakeCompleteFuture.complete(this.localHandshakeCompleteFuture);
        this.localHandshakeCompleteFuture.complete(new NettySender(this.channel, this.remoteNode.id(), this.remoteNode.name(), this.connectionId, this.recoveryDescriptor));
    }

    /**
     * Sets the remote node directly, bypassing the handshake start message.
     *
     * @param remoteNode Remote node.
     */
    @TestOnly
    void setRemoteNode(InternalClusterNode remoteNode) {
        this.remoteNode = remoteNode;
    }
}

