/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.cluster.management;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.cluster.management.InternalStopException;
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite.internal.cluster.management.network.messages.NodeStopMessage;
import org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
import org.apache.ignite.internal.failure.NodeStopper;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessage;
import org.jetbrains.annotations.VisibleForTesting;

/**
 * Stops the cluster: sends a {@link NodeStopMessage} to every topology member except the local node and, once every
 * one of them has answered with a {@link SuccessResponseMessage}, stops the local node through the supplied
 * {@link NodeStopper}.
 */
public class ClusterStopper {
    private static final IgniteLogger LOG = Loggers.forClass(ClusterStopper.class);

    /** Timeout of a single stop message invocation, in milliseconds. */
    private static final int STOP_MESSAGE_SEND_TIMEOUT_MILLIS = 10000;

    /** Total number of attempts made to deliver the stop message to a single node. */
    private static final int RETRY_CNT = 5;

    /** Delay before the next delivery attempt, in milliseconds. */
    private static final long RETRY_DELAY_MILLIS = 500L;

    private final ClusterService clusterService;

    private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();

    /** Executor on which delayed delivery retries are scheduled. */
    private final ScheduledExecutorService retryExecutor;

    private final NodeStopper localNodeStopper;

    /**
     * Creates a cluster stopper.
     *
     * @param clusterService Cluster service providing the topology, the local node name and messaging.
     * @param retryExecutor Executor used to schedule stop message delivery retries.
     * @param localNodeStopper Stopper invoked to stop the local node.
     */
    public ClusterStopper(ClusterService clusterService, ScheduledExecutorService retryExecutor, NodeStopper localNodeStopper) {
        this.clusterService = clusterService;
        this.retryExecutor = retryExecutor;
        this.localNodeStopper = localNodeStopper;
    }

    /**
     * Sends a stop message to every node returned by {@link #remoteNodes()} whose name differs from the local node name
     * and, after all of them have responded successfully, stops the local node.
     *
     * @param reason Reason of the stop; carried in the stop message.
     * @return Future that completes when the local node stop has been executed, or completes exceptionally if a remote
     *         node could not be stopped (in which case the local node is not stopped).
     */
    public CompletableFuture<Void> stopCluster(String reason) {
        String localNodeName = clusterService.nodeName();

        List<InternalClusterNode> nodesToStop = remoteNodes().stream()
                .filter(node -> !localNodeName.equals(node.name()))
                .collect(Collectors.toList());

        NodeStopMessage stopMessage = msgFactory.nodeStopMessage().reason(reason).build();

        // Nothing has been sent yet at this point, so report the start of the procedure rather than its completion.
        LOG.info("Stopping cluster [reason={}].", reason);

        return invokeMessage(nodesToStop, stopMessage)
                .thenCompose(v -> {
                    LOG.info("Cluster stopped successfully.");

                    return doLocalNodeStop(stopMessage);
                })
                .whenComplete((v, e) -> {
                    // 'e' is null on success and is wrapped into a CompletionException when it comes from an upstream
                    // stage, hence the explicit null check and the unwrapping.
                    if (e != null && !(unwrapCause(e) instanceof InternalStopException)) {
                        LOG.error("Critical error encountered during cluster stopping.", e);
                    }
                });
    }

    /**
     * Logs the stop reason and asynchronously invokes the local {@link NodeStopper}.
     *
     * <p>The stop is run through {@link CompletableFuture#runAsync(Runnable)}, i.e. without an explicitly supplied
     * executor.
     *
     * @param msg Stop message whose reason is logged.
     * @return Future that completes when {@code stopNode()} returns.
     */
    public CompletableFuture<Void> doLocalNodeStop(NodeStopMessage msg) {
        LOG.info("Node will be stopped due to: {}.", msg.reason());

        return CompletableFuture.runAsync(() -> localNodeStopper.stopNode());
    }

    /**
     * Sends the message to all given nodes and verifies that every response is a {@link SuccessResponseMessage}.
     *
     * @param nodes Recipients.
     * @param message Message to send.
     * @return Future that completes when all nodes have responded successfully; fails with
     *         {@link InternalStopException} as the cause if any response is of an unexpected type.
     */
    private CompletableFuture<Void> invokeMessage(Collection<InternalClusterNode> nodes, NetworkMessage message) {
        return allOf(nodes, node -> sendWithRetry(node, message).thenAccept(response -> {
            if (!(response instanceof SuccessResponseMessage)) {
                // A null response also lands here, so do not dereference it blindly.
                Object responseType = response == null ? null : response.getClass();

                throw new InternalStopException(
                        String.format("Unexpected response from node \"%s\": %s", node.name(), responseType)
                );
            }
        }));
    }

    /**
     * Applies the producer to every node and combines the resulting futures.
     *
     * @param nodes Nodes to process.
     * @param futureProducer Function producing a future per node.
     * @return Future that completes when all produced futures complete.
     */
    private static CompletableFuture<Void> allOf(
            Collection<InternalClusterNode> nodes,
            Function<InternalClusterNode, CompletableFuture<?>> futureProducer
    ) {
        CompletableFuture<?>[] futures = nodes.stream().map(futureProducer).toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(futures);
    }

    /**
     * Sends the message to the node, making up to {@link #RETRY_CNT} attempts.
     *
     * @param node Recipient.
     * @param msg Message to send.
     * @return Future with the response; a failure is logged as a warning before being propagated.
     */
    private CompletableFuture<NetworkMessage> sendWithRetry(InternalClusterNode node, NetworkMessage msg) {
        CompletableFuture<NetworkMessage> result = new CompletableFuture<>();

        sendWithRetry(node, msg, result, RETRY_CNT);

        return result.whenComplete((v, e) -> {
            if (e != null) {
                LOG.warn("Unable to send message [msg={}, target={}]", e, msg.getClass(), node);
            }
        });
    }

    /**
     * Performs a single delivery attempt and, on failure, schedules the next one on the retry executor.
     *
     * <p>No retry is made when the failure is a {@link NodeStoppingException} or when no attempts are left.
     *
     * @param node Recipient.
     * @param msg Message to send.
     * @param result Future to complete with the response or with the final failure.
     * @param attempts Number of attempts left, including the current one.
     */
    private void sendWithRetry(InternalClusterNode node, NetworkMessage msg, CompletableFuture<NetworkMessage> result, int attempts) {
        clusterService.messagingService().invoke(node, msg, STOP_MESSAGE_SEND_TIMEOUT_MILLIS).whenComplete((response, e) -> {
            if (e == null) {
                LOG.info("Remote node \"{}\" has been successfully stopped.", node.name());

                result.complete(response);
            } else if (unwrapCause(e) instanceof NodeStoppingException || attempts <= 1) {
                result.completeExceptionally(e);
            } else {
                LOG.error("Remote node \"{}\" stop failed [reason={}]", e, node.name(), e.getMessage());

                try {
                    retryExecutor.schedule(
                            () -> sendWithRetry(node, msg, result, attempts - 1),
                            RETRY_DELAY_MILLIS,
                            TimeUnit.MILLISECONDS
                    );
                } catch (RejectedExecutionException rejected) {
                    // The retry can not be scheduled any more; fail the result instead of leaving it incomplete.
                    e.addSuppressed(rejected);

                    result.completeExceptionally(e);
                }
            }
        });
    }

    /**
     * Returns the nodes considered for stopping: all members reported by the topology service. {@link #stopCluster}
     * additionally filters out the node whose name equals the local node name.
     *
     * @return All members of the topology.
     */
    @VisibleForTesting
    Collection<InternalClusterNode> remoteNodes() {
        return clusterService.topologyService().allMembers();
    }

    /**
     * Strips {@link CompletionException} wrappers added by {@link CompletableFuture} stages.
     *
     * @param t Throwable to unwrap.
     * @return The first throwable in the cause chain that is not a {@link CompletionException}, or the innermost
     *         wrapper if it has no cause.
     */
    private static Throwable unwrapCause(Throwable t) {
        Throwable cause = t;

        while (cause instanceof CompletionException && cause.getCause() != null) {
            cause = cause.getCause();
        }

        return cause;
    }
}

