package org.apache.ignite3.internal.cluster.management;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite3.internal.cluster.management.network.messages.NodeStopMessage;
import org.apache.ignite3.internal.cluster.management.network.messages.SuccessResponseMessage;
import org.apache.ignite3.internal.failure.NodeStopper;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.network.ClusterService;
import org.apache.ignite3.internal.network.NetworkMessage;
import org.apache.ignite3.network.ClusterNode;
import org.jetbrains.annotations.VisibleForTesting;

/* loaded from: input_file:org/apache/ignite3/internal/cluster/management/ClusterStopper.class */
/**
 * Stops the cluster by sending a {@link NodeStopMessage} to every other topology member and, once all of them
 * have acknowledged it with a {@link SuccessResponseMessage}, stopping the local node through the supplied
 * {@link NodeStopper}.
 *
 * <p>Delivery of the stop message to each node is retried up to {@link #RETRY_CNT} times with a fixed delay
 * between attempts.
 */
public class ClusterStopper {
    private static final IgniteLogger LOG = Loggers.forClass(ClusterStopper.class);

    /** Timeout of a single stop-message invocation, in milliseconds. */
    private static final int STOP_MESSAGE_SEND_TIMEOUT_MILLIS = 10000;

    /** Maximum number of attempts to deliver the stop message to one node. */
    private static final int RETRY_CNT = 5;

    /** Delay between two consecutive delivery attempts, in milliseconds. */
    private static final long RETRY_DELAY_MILLIS = 500L;

    private final ClusterService clusterService;
    private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
    private final ScheduledExecutorService retryExecutor;
    private final NodeStopper localNodeStopper;

    /**
     * Creates a stopper.
     *
     * @param clusterService Cluster service used to read the topology and to send messages.
     * @param scheduledExecutorService Executor on which delayed retries are scheduled.
     * @param nodeStopper Component that stops the local node.
     */
    public ClusterStopper(ClusterService clusterService, ScheduledExecutorService scheduledExecutorService, NodeStopper nodeStopper) {
        this.clusterService = clusterService;
        this.retryExecutor = scheduledExecutorService;
        this.localNodeStopper = nodeStopper;
    }

    /**
     * Sends a stop message to all topology members except the local node and then stops the local node.
     *
     * @param str Human-readable reason of the stop; it is put into the stop message.
     * @return Future that completes when the local stop task has finished, or completes exceptionally if any
     *     remote node could not be reached or answered with an unexpected response.
     */
    public CompletableFuture<Void> stopCluster(String str) {
        String localNodeName = this.clusterService.nodeName();

        List<ClusterNode> targets = remoteNodes().stream()
                .filter(node -> !localNodeName.equals(node.name()))
                .collect(Collectors.toList());

        NodeStopMessage stopMessage = this.msgFactory.nodeStopMessage().reason(str).build();

        // Nothing has been stopped yet at this point, so do not claim otherwise in the log.
        LOG.info("Cluster stop initiated [reason={}].", str);

        return invokeMessage(targets, stopMessage)
                .thenCompose(ignored -> {
                    LOG.info("Cluster stopped successfully.");

                    return doLocalNodeStop(stopMessage);
                })
                .whenComplete((ignored, th) -> {
                    if (th == null) {
                        return;
                    }

                    // Failures propagated through allOf/thenCompose arrive wrapped in CompletionException.
                    Throwable cause = unwrapCompletionCause(th);

                    if (cause instanceof InternalStopException) {
                        return;
                    }

                    LOG.error("Critical error encountered during cluster stopping.", cause);
                });
    }

    /**
     * Asynchronously stops the local node.
     *
     * @param nodeStopMessage Stop message whose reason is logged.
     * @return Future of the asynchronous stop task.
     */
    public CompletableFuture<Void> doLocalNodeStop(NodeStopMessage nodeStopMessage) {
        LOG.info("Node will be stopped due to: {}.", nodeStopMessage.reason());

        NodeStopper stopper = Objects.requireNonNull(this.localNodeStopper);

        return CompletableFuture.runAsync(stopper::stopNode);
    }

    /**
     * Sends the message to every given node and verifies that each of them answers with
     * a {@link SuccessResponseMessage}.
     *
     * @param collection Target nodes.
     * @param networkMessage Message to send.
     * @return Future that completes when all nodes have answered successfully.
     */
    private CompletableFuture<Void> invokeMessage(Collection<ClusterNode> collection, NetworkMessage networkMessage) {
        return allOf(collection, node -> sendWithRetry(node, networkMessage).thenAccept(response -> {
            if (!(response instanceof SuccessResponseMessage)) {
                throw new InternalStopException(
                        String.format("Unexpected response from node \"%s\": %s", node.name(), response.getClass()));
            }
        }));
    }

    /**
     * Applies the function to every node and combines the resulting futures.
     *
     * @param collection Nodes to process.
     * @param function Produces a future for a node.
     * @return Future that completes when all produced futures complete.
     */
    private static CompletableFuture<Void> allOf(Collection<ClusterNode> collection, Function<ClusterNode, CompletableFuture<?>> function) {
        CompletableFuture<?>[] futures = collection.stream()
                .map(function)
                .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(futures);
    }

    /**
     * Sends the message to the node, retrying on failure up to {@link #RETRY_CNT} attempts in total.
     *
     * @param clusterNode Target node.
     * @param networkMessage Message to send.
     * @return Future with the node's response; a warning is logged if it completes exceptionally.
     */
    private CompletableFuture<NetworkMessage> sendWithRetry(ClusterNode clusterNode, NetworkMessage networkMessage) {
        CompletableFuture<NetworkMessage> result = new CompletableFuture<>();

        sendWithRetry(clusterNode, networkMessage, result, RETRY_CNT);

        return result.whenComplete((response, th) -> {
            if (th != null) {
                LOG.warn("Unable to send message [msg={}, target={}]", th, networkMessage.getClass(), clusterNode);
            }
        });
    }

    /**
     * Performs one delivery attempt and, if it fails and attempts remain, schedules the next one.
     *
     * @param clusterNode Target node.
     * @param networkMessage Message to send.
     * @param completableFuture Future to complete with the final outcome.
     * @param attemptsLeft Number of attempts left, including the current one.
     */
    private void sendWithRetry(
            ClusterNode clusterNode,
            NetworkMessage networkMessage,
            CompletableFuture<NetworkMessage> completableFuture,
            int attemptsLeft
    ) {
        this.clusterService.messagingService()
                .invoke(clusterNode, networkMessage, STOP_MESSAGE_SEND_TIMEOUT_MILLIS)
                .whenComplete((response, th) -> {
                    if (th == null) {
                        LOG.info("Remote node \"{}\" has been successfully stopped.", clusterNode.name());

                        completableFuture.complete(response);

                        return;
                    }

                    // Do not retry when a node-stopping failure is reported or when attempts are exhausted.
                    if (unwrapCompletionCause(th) instanceof NodeStoppingException || attemptsLeft <= 1) {
                        completableFuture.completeExceptionally(th);

                        return;
                    }

                    LOG.error("Remote node \"{}\" stop failed [reason={}]", th, clusterNode.name(), th.getMessage());

                    try {
                        this.retryExecutor.schedule(
                                () -> sendWithRetry(clusterNode, networkMessage, completableFuture, attemptsLeft - 1),
                                RETRY_DELAY_MILLIS,
                                TimeUnit.MILLISECONDS
                        );
                    } catch (RejectedExecutionException e) {
                        // Without this the exception would be swallowed by whenComplete and the result
                        // future would never complete.
                        e.addSuppressed(th);

                        completableFuture.completeExceptionally(e);
                    }
                });
    }

    /**
     * Returns all members reported by the topology service. The result may include the local node;
     * {@link #stopCluster(String)} filters it out by name.
     *
     * @return Topology members.
     */
    @VisibleForTesting
    Collection<ClusterNode> remoteNodes() {
        return this.clusterService.topologyService().allMembers();
    }

    /**
     * Strips {@link CompletionException} wrappers added by {@link CompletableFuture} composition.
     *
     * @param th Throwable to unwrap.
     * @return The first cause that is not a {@link CompletionException}, or the last wrapper if it has no cause.
     */
    private static Throwable unwrapCompletionCause(Throwable th) {
        Throwable current = th;

        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }

        return current;
    }
}
