/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.network.recovery;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.netty.ChannelCreationListener;
import org.apache.ignite.internal.network.netty.ChannelKey;
import org.apache.ignite.internal.network.netty.HandshakeHandler;
import org.apache.ignite.internal.network.netty.MessageHandler;
import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.internal.network.netty.NettyUtils;
import org.apache.ignite.internal.network.netty.PipelineUtils;
import org.apache.ignite.internal.network.recovery.DescriptorAcquiry;
import org.apache.ignite.internal.network.recovery.HandshakeManagerUtils;
import org.apache.ignite.internal.network.recovery.HandshakeStartResponseMessageAdapter;
import org.apache.ignite.internal.network.recovery.HandshakeStartResponseMessageV1Adapter;
import org.apache.ignite.internal.network.recovery.HandshakeTieBreaker;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import org.apache.ignite.internal.network.recovery.StaleIdDetector;
import org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
import org.apache.ignite.internal.network.recovery.message.ProbeMessage;
import org.apache.ignite.internal.version.IgniteProductVersionSource;

public class RecoveryAcceptorHandshakeManager
implements HandshakeManager {
    private static final IgniteLogger LOG = Loggers.forClass(RecoveryAcceptorHandshakeManager.class);
    private static final int MAX_CLINCH_TERMINATION_AWAIT_ATTEMPTS = 1000;
    protected final InternalClusterNode localNode;
    protected final NetworkMessagesFactory messageFactory;
    private final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture();
    private InternalClusterNode remoteNode;
    private short remoteChannelId;
    protected ChannelHandlerContext ctx;
    protected Channel channel;
    private HandshakeHandler handler;
    private long receivedCount;
    private final RecoveryDescriptorProvider recoveryDescriptorProvider;
    private final HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;
    private final StaleIdDetector staleIdDetector;
    protected final ClusterIdSupplier clusterIdSupplier;
    private final BooleanSupplier stopping;
    private final IgniteProductVersionSource productVersionSource;
    private RecoveryDescriptor recoveryDescriptor;

    public RecoveryAcceptorHandshakeManager(InternalClusterNode localNode, NetworkMessagesFactory messageFactory, RecoveryDescriptorProvider recoveryDescriptorProvider, HandshakeEventLoopSwitcher handshakeEventLoopSwitcher, StaleIdDetector staleIdDetector, ClusterIdSupplier clusterIdSupplier, ChannelCreationListener channelCreationListener, BooleanSupplier stopping, IgniteProductVersionSource productVersionSource) {
        this.localNode = localNode;
        this.messageFactory = messageFactory;
        this.recoveryDescriptorProvider = recoveryDescriptorProvider;
        this.handshakeEventLoopSwitcher = handshakeEventLoopSwitcher;
        this.staleIdDetector = staleIdDetector;
        this.clusterIdSupplier = clusterIdSupplier;
        this.stopping = stopping;
        this.productVersionSource = productVersionSource;
        this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
            if (throwable != null) {
                this.releaseResources();
                return;
            }
            channelCreationListener.handshakeFinished((NettySender)nettySender);
        });
    }

    private void releaseResources() {
        assert (this.ctx.executor().inEventLoop()) : "Release resources called outside of event loop";
        RecoveryDescriptor desc = this.recoveryDescriptor;
        if (desc != null) {
            desc.release(this.ctx);
        }
    }

    @Override
    public boolean isInitiator() {
        return false;
    }

    @Override
    public void onInit(ChannelHandlerContext handlerContext) {
        this.ctx = handlerContext;
        this.channel = handlerContext.channel();
        this.handler = (HandshakeHandler)this.ctx.handler();
    }

    @Override
    public void onConnectionOpen() {
        this.sendHandshakeStartMessage();
    }

    final void sendHandshakeStartMessage() {
        NetworkMessage handshakeStartMessage = this.createHandshakeStartMessage();
        ChannelFuture sendFuture = this.channel.writeAndFlush(new OutNetworkObject(handshakeStartMessage, Collections.emptyList()));
        NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
            if (throwable != null) {
                this.handshakeCompleteFuture.completeExceptionally(new HandshakeException("Failed to send handshake start message: " + throwable.getMessage(), (Throwable)throwable));
            }
        });
    }

    protected NetworkMessage createHandshakeStartMessage() {
        return this.messageFactory.handshakeStartMessage().serverNode(HandshakeManagerUtils.clusterNodeToMessage(this.localNode)).serverClusterId(this.clusterIdSupplier.clusterId()).productName(this.productVersionSource.productName()).productVersion(this.productVersionSource.productVersion().toString()).build();
    }

    @Override
    public void onMessage(NetworkMessage message) {
        if (message instanceof ProbeMessage) {
            this.onProbeMessage((ProbeMessage)message);
            return;
        }
        if (message instanceof HandshakeRejectedMessage) {
            this.onHandshakeRejectedMessage((HandshakeRejectedMessage)message);
            return;
        }
        if (message instanceof HandshakeStartResponseMessage) {
            this.onHandshakeStartResponseMessage(new HandshakeStartResponseMessageV1Adapter((HandshakeStartResponseMessage)message));
            return;
        }
        assert (this.recoveryDescriptor != null) : "Wrong acceptor handshake flow, message is " + message;
        assert (this.recoveryDescriptor.holderChannel() == this.channel) : "Expected " + this.channel + " but was " + this.recoveryDescriptor.holderChannel() + ", message is " + message;
        if (this.recoveryDescriptor.unacknowledgedCount() == 0) {
            this.finishHandshake();
        }
        this.ctx.fireChannelRead(message);
    }

    protected void onProbeMessage(ProbeMessage message) {
    }

    protected final void onHandshakeStartResponseMessage(HandshakeStartResponseMessageAdapter message) {
        if (this.possiblyRejectHandshakeStartResponse(message)) {
            return;
        }
        this.remoteNode = message.initiatorNode().asClusterNode();
        this.receivedCount = message.receivedCount();
        this.remoteChannelId = message.connectionId();
        ChannelKey channelKey = new ChannelKey(this.remoteNode.name(), this.remoteNode.id(), this.remoteChannelId);
        this.handshakeEventLoopSwitcher.switchEventLoopIfNeeded(this.channel, channelKey).thenRun(() -> this.tryAcquireDescriptorAndFinishHandshake(message));
    }

    private boolean possiblyRejectHandshakeStartResponse(HandshakeStartResponseMessageAdapter message) {
        if (this.staleIdDetector.isIdStale(message.initiatorNode().id())) {
            this.handleStaleInitiatorId(message);
            return true;
        }
        if (this.stopping.getAsBoolean()) {
            this.handleRefusalToEstablishConnectionDueToStopping(message);
            return true;
        }
        return false;
    }

    private void handleStaleInitiatorId(HandshakeStartResponseMessageAdapter msg) {
        String message = String.format("%s:%s is stale, it should be restarted to be allowed to connect", msg.initiatorNode().name(), msg.initiatorNode().id());
        this.sendRejectionMessageAndFailHandshake(message, HandshakeRejectionReason.STALE_LAUNCH_ID, HandshakeException::new);
    }

    private void handleRefusalToEstablishConnectionDueToStopping(HandshakeStartResponseMessageAdapter msg) {
        String message = String.format("%s:%s tried to establish a connection with %s, but it's stopping", msg.initiatorNode().name(), msg.initiatorNode().id(), this.localNode.name());
        this.sendRejectionMessageAndFailHandshake(message, HandshakeRejectionReason.STOPPING, m -> new NodeStoppingException());
    }

    private void sendRejectionMessageAndFailHandshake(String message, HandshakeRejectionReason rejectionReason, Function<String, Exception> exceptionFactory) {
        HandshakeManagerUtils.sendRejectionMessageAndFailHandshake(message, rejectionReason, this.channel, this.handshakeCompleteFuture, exceptionFactory);
    }

    private void tryAcquireDescriptorAndFinishHandshake(HandshakeStartResponseMessageAdapter message) {
        this.tryAcquireDescriptorAndFinishHandshake(message, 0);
    }

    private void tryAcquireDescriptorAndFinishHandshake(HandshakeStartResponseMessageAdapter handshakeStartResponse, int attempt) {
        if (attempt > 1000) {
            throw new IllegalStateException("Too many attempts during handshake from " + this.remoteNode.name() + "(" + this.remoteNode.id() + ":" + this.remoteChannelId + ") via " + this.ctx.channel());
        }
        RecoveryDescriptor descriptor = this.recoveryDescriptorProvider.getRecoveryDescriptor(this.remoteNode.name(), this.remoteNode.id(), this.remoteChannelId);
        while (!descriptor.tryAcquire(this.ctx, this.handshakeCompleteFuture)) {
            if (HandshakeTieBreaker.shouldCloseChannel(this.localNode.id(), this.remoteNode.id())) {
                DescriptorAcquiry competitorAcquiry = descriptor.holder();
                if (competitorAcquiry == null) continue;
                if (this.possiblyRejectHandshakeStartResponse(handshakeStartResponse)) {
                    return;
                }
                competitorAcquiry.clinchResolved().whenComplete((res, ex) -> {
                    if (this.ctx.executor().inEventLoop()) {
                        this.tryAcquireDescriptorAndFinishHandshake(handshakeStartResponse, attempt + 1);
                    } else {
                        this.ctx.executor().execute(() -> this.tryAcquireDescriptorAndFinishHandshake(handshakeStartResponse, attempt + 1));
                    }
                });
                return;
            }
            this.rejectHandshakeDueToLosingClinch(descriptor);
            return;
        }
        if (this.possiblyRejectHandshakeStartResponse(handshakeStartResponse)) {
            descriptor.release(this.ctx);
            return;
        }
        this.recoveryDescriptor = descriptor;
        this.handshake(descriptor);
    }

    private void onHandshakeRejectedMessage(HandshakeRejectedMessage msg) {
        msg.reason().print(this.stopping.getAsBoolean(), LOG, "Handshake rejected by initiator: {}", msg.message());
        this.handshakeCompleteFuture.completeExceptionally(HandshakeManagerUtils.createExceptionFromRejectionMessage(msg));
    }

    private void rejectHandshakeDueToLosingClinch(RecoveryDescriptor descriptor) {
        String localErrorMessage = "Failed to acquire recovery descriptor during handshake, it is held by: " + descriptor.holderDescription();
        LOG.debug(localErrorMessage, new Object[0]);
        HandshakeManagerUtils.sendRejectionMessageAndFailHandshake(localErrorMessage, "Handshake clinch detected, this handshake will be terminated, winning channel is " + descriptor.holderDescription(), HandshakeRejectionReason.CLINCH, this.channel, this.handshakeCompleteFuture, HandshakeException::new);
    }

    private void handshake(RecoveryDescriptor descriptor) {
        PipelineUtils.afterHandshake(this.ctx.pipeline(), descriptor, this.createMessageHandler(), this.messageFactory);
        HandshakeFinishMessage response = this.messageFactory.handshakeFinishMessage().receivedCount(descriptor.receivedCount()).build();
        CompletableFuture<Void> sendFuture = NettyUtils.toCompletableFuture(this.channel.write(new OutNetworkObject(response, Collections.emptyList())));
        descriptor.acknowledge(this.receivedCount);
        int unacknowledgedCount = descriptor.unacknowledgedCount();
        if (unacknowledgedCount > 0) {
            CompletableFuture[] futs = new CompletableFuture[unacknowledgedCount + 1];
            futs[0] = sendFuture;
            List<OutNetworkObject> networkMessages = descriptor.unacknowledgedMessages();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Resending on handshake: {}", networkMessages.stream().map(OutNetworkObject::networkMessage).collect(Collectors.toList()));
            }
            for (int i = 0; i < networkMessages.size(); ++i) {
                OutNetworkObject networkMessage = networkMessages.get(i);
                futs[i + 1] = NettyUtils.toCompletableFuture(this.channel.write(networkMessage));
            }
            sendFuture = CompletableFuture.allOf(futs);
        }
        this.channel.flush();
        boolean hasUnacknowledgedMessages = unacknowledgedCount > 0;
        sendFuture.whenComplete((unused, throwable) -> {
            if (throwable != null) {
                this.handshakeCompleteFuture.completeExceptionally(new HandshakeException("Failed to send handshake response: " + throwable.getMessage(), (Throwable)throwable));
            } else if (!hasUnacknowledgedMessages) {
                this.finishHandshake();
            }
        });
    }

    private MessageHandler createMessageHandler() {
        return this.handler.createMessageHandler(this.remoteNode, this.remoteChannelId);
    }

    @Override
    public CompletableFuture<NettySender> localHandshakeFuture() {
        return this.handshakeCompleteFuture;
    }

    @Override
    public CompletionStage<NettySender> finalHandshakeFuture() {
        return this.handshakeCompleteFuture;
    }

    @Override
    public IgniteProductVersionSource productVersionSource() {
        return this.productVersionSource;
    }

    private void finishHandshake() {
        this.ctx.pipeline().remove(this.handler);
        this.handshakeCompleteFuture.complete(new NettySender(this.channel, this.remoteNode.id(), this.remoteNode.name(), this.remoteChannelId, this.recoveryDescriptor));
    }
}

