/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.network.wrapper;

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.thread.ExecutorChooser;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.network.NetworkAddress;

/**
 * {@link MessagingService} decorator that delegates every operation to a wrapped service and,
 * for the futures returned by {@code send}, {@code respond} and {@code invoke}, re-dispatches
 * completion through an executor picked by an {@link ExecutorChooser} for the message.
 *
 * <p>The hop is skipped when the delegate's future is already done at the moment it is returned,
 * or when the recipient's consistent ID equals the local consistent ID. In both cases the
 * delegate's future is returned as is.
 *
 * <p>NOTE(review): presumably the hop exists so that continuations attached by callers do not run
 * on whichever thread completed the delegate's future - confirm against the callers.
 */
public class JumpToExecutorByConsistentIdAfterSend implements MessagingService {
    /** Service that actually performs the messaging operations. */
    private final MessagingService messagingService;

    /** Consistent ID compared against the recipient's ID to detect sends to self. */
    private final String localConsistentId;

    /** Supplies, per message, the executor on which the returned future is completed. */
    private final ExecutorChooser<NetworkMessage> executorChooser;

    /**
     * Creates the decorator.
     *
     * @param messagingService Service to delegate to.
     * @param localConsistentId Consistent ID treated as "self"; recipients with this ID bypass the executor hop.
     * @param executorChooser Chooser of the executor used to complete returned futures.
     */
    public JumpToExecutorByConsistentIdAfterSend(
            MessagingService messagingService,
            String localConsistentId,
            ExecutorChooser<NetworkMessage> executorChooser
    ) {
        this.messagingService = messagingService;
        this.localConsistentId = localConsistentId;
        this.executorChooser = executorChooser;
    }

    /** Passes the call straight to the wrapped service; there is no future to re-dispatch. */
    @Override
    public void weakSend(InternalClusterNode recipient, NetworkMessage msg) {
        messagingService.weakSend(recipient, msg);
    }

    /**
     * Sends via the wrapped service, then applies the executor hop to the resulting future.
     * The recipient's {@code name()} is used as its consistent ID for the self check.
     */
    @Override
    public CompletableFuture<Void> send(InternalClusterNode recipient, ChannelType channelType, NetworkMessage msg) {
        CompletableFuture<Void> future = messagingService.send(recipient, channelType, msg);

        return switchResponseHandlingToAnotherThreadIfNeeded(msg, future, recipient.name());
    }

    /** Sends via the wrapped service, then applies the executor hop to the resulting future. */
    @Override
    public CompletableFuture<Void> send(String recipientConsistentId, ChannelType channelType, NetworkMessage msg) {
        CompletableFuture<Void> future = messagingService.send(recipientConsistentId, channelType, msg);

        return switchResponseHandlingToAnotherThreadIfNeeded(msg, future, recipientConsistentId);
    }

    /**
     * Not supported by this decorator.
     *
     * @throws UnsupportedOperationException Always.
     */
    @Override
    public CompletableFuture<Void> send(NetworkAddress recipientNetworkAddress, ChannelType channelType, NetworkMessage msg) {
        throw new UnsupportedOperationException("Sending by network address is not supported by this implementation.");
    }

    /**
     * Responds via the wrapped service, then applies the executor hop to the resulting future.
     * The recipient's {@code name()} is used as its consistent ID for the self check.
     */
    @Override
    public CompletableFuture<Void> respond(
            InternalClusterNode recipient,
            ChannelType channelType,
            NetworkMessage msg,
            long correlationId
    ) {
        CompletableFuture<Void> future = messagingService.respond(recipient, channelType, msg, correlationId);

        return switchResponseHandlingToAnotherThreadIfNeeded(msg, future, recipient.name());
    }

    /** Responds via the wrapped service, then applies the executor hop to the resulting future. */
    @Override
    public CompletableFuture<Void> respond(
            String recipientConsistentId,
            ChannelType channelType,
            NetworkMessage msg,
            long correlationId
    ) {
        CompletableFuture<Void> future = messagingService.respond(recipientConsistentId, channelType, msg, correlationId);

        return switchResponseHandlingToAnotherThreadIfNeeded(msg, future, recipientConsistentId);
    }

    /**
     * Invokes via the wrapped service, then applies the executor hop to the resulting future.
     * The recipient's {@code name()} is used as its consistent ID for the self check.
     */
    @Override
    public CompletableFuture<NetworkMessage> invoke(
            InternalClusterNode recipient,
            ChannelType channelType,
            NetworkMessage msg,
            long timeout
    ) {
        CompletableFuture<NetworkMessage> future = messagingService.invoke(recipient, channelType, msg, timeout);

        return switchResponseHandlingToAnotherThreadIfNeeded(msg, future, recipient.name());
    }

    /** Invokes via the wrapped service, then applies the executor hop to the resulting future. */
    @Override
    public CompletableFuture<NetworkMessage> invoke(
            String recipientConsistentId,
            ChannelType channelType,
            NetworkMessage msg,
            long timeout
    ) {
        CompletableFuture<NetworkMessage> future = messagingService.invoke(recipientConsistentId, channelType, msg, timeout);

        return switchResponseHandlingToAnotherThreadIfNeeded(msg, future, recipientConsistentId);
    }

    /**
     * Returns either the given future unchanged or a dependent future whose completion is routed
     * through the executor chosen for {@code msg}.
     *
     * @param msg Message that was sent; passed to the executor chooser.
     * @param future Future returned by the wrapped service.
     * @param recipientConsistentId Consistent ID of the recipient, compared with the local one.
     * @param <T> Result type of the future.
     * @return {@code future} itself if it is already done or the recipient is the local node;
     *     otherwise a new future derived from it.
     */
    private <T> CompletableFuture<T> switchResponseHandlingToAnotherThreadIfNeeded(
            NetworkMessage msg,
            CompletableFuture<T> future,
            String recipientConsistentId
    ) {
        // No hop for an already finished future or for a message addressed to the local node.
        if (future.isDone() || isSelf(recipientConsistentId)) {
            return future;
        }

        // handleAsync() runs the helper on the chosen executor once the delegate's future completes.
        // The helper (judging by its name - TODO confirm) turns the (result, error) pair back into a
        // future, which thenCompose(identity) then flattens. The explicit type witness keeps the
        // chain fully typed instead of going through a raw CompletableFuture.
        return future
                .<CompletableFuture<T>>handleAsync(CompletableFutures::completedOrFailedFuture, executorChooser.choose(msg))
                .thenCompose(Function.identity());
    }

    /** Tells whether the given consistent ID equals the local one. */
    private boolean isSelf(String recipientConsistentId) {
        return localConsistentId.equals(recipientConsistentId);
    }

    /** Registers the handler on the wrapped service. */
    @Override
    public void addMessageHandler(Class<?> messageGroup, NetworkMessageHandler handler) {
        messagingService.addMessageHandler(messageGroup, handler);
    }

    /** Registers the handler, together with the supplied executor chooser, on the wrapped service. */
    @Override
    public void addMessageHandler(
            Class<?> messageGroup,
            ExecutorChooser<NetworkMessage> executorChooser,
            NetworkMessageHandler handler
    ) {
        // The parameter shadows the field on purpose: the caller's chooser is forwarded, not this decorator's.
        messagingService.addMessageHandler(messageGroup, executorChooser, handler);
    }
}

