/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.disaster.system;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite3.internal.cluster.management.ClusterState;
import org.apache.ignite3.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite3.internal.cluster.management.network.messages.SuccessResponseMessage;
import org.apache.ignite3.internal.disaster.system.ServerRestarter;
import org.apache.ignite3.internal.disaster.system.SystemDisasterRecoveryManager;
import org.apache.ignite3.internal.disaster.system.SystemDisasterRecoveryStorage;
import org.apache.ignite3.internal.disaster.system.exception.ClusterResetException;
import org.apache.ignite3.internal.disaster.system.exception.MigrateException;
import org.apache.ignite3.internal.disaster.system.message.BecomeMetastorageLeaderMessage;
import org.apache.ignite3.internal.disaster.system.message.ResetClusterMessage;
import org.apache.ignite3.internal.disaster.system.message.ResetClusterMessageBuilder;
import org.apache.ignite3.internal.disaster.system.message.StartMetastorageRepairRequest;
import org.apache.ignite3.internal.disaster.system.message.StartMetastorageRepairResponse;
import org.apache.ignite3.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup;
import org.apache.ignite3.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.manager.IgniteComponent;
import org.apache.ignite3.internal.metastorage.impl.MetastorageGroupMaintenance;
import org.apache.ignite3.internal.network.ClusterIdSupplier;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.MessagingService;
import org.apache.ignite3.internal.network.NetworkMessage;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.vault.VaultManager;
import org.jetbrains.annotations.Nullable;

public class SystemDisasterRecoveryManagerImpl
implements SystemDisasterRecoveryManager,
IgniteComponent {
    private static final IgniteLogger LOG = Loggers.forClass(SystemDisasterRecoveryManagerImpl.class);
    private final String thisNodeName;
    private final TopologyService topologyService;
    private final MessagingService messagingService;
    private final ServerRestarter restarter;
    private final MetastorageGroupMaintenance metastorageGroupMaintenance;
    private final ClusterIdSupplier clusterIdSupplier;
    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new SystemDisasterRecoveryMessagesFactory();
    private static final CmgMessagesFactory cmgMessagesFactory = new CmgMessagesFactory();
    private final SystemDisasterRecoveryStorage storage;
    private final Executor restartExecutor;
    private final ClusterManagementGroupManager cmgManager;

    public SystemDisasterRecoveryManagerImpl(String thisNodeName, TopologyService topologyService, MessagingService messagingService, VaultManager vaultManager, ServerRestarter restarter, MetastorageGroupMaintenance metastorageGroupMaintenance, ClusterManagementGroupManager cmgManager, ClusterIdSupplier clusterIdSupplier) {
        this.thisNodeName = thisNodeName;
        this.topologyService = topologyService;
        this.messagingService = messagingService;
        this.restarter = restarter;
        this.metastorageGroupMaintenance = metastorageGroupMaintenance;
        this.cmgManager = cmgManager;
        this.clusterIdSupplier = clusterIdSupplier;
        this.storage = new SystemDisasterRecoveryStorage(vaultManager);
        this.restartExecutor = new ThreadPerTaskExecutor(thisNodeName + "-restart-");
    }

    @Override
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        this.messagingService.addMessageHandler(SystemDisasterRecoveryMessageGroup.class, (message, sender, correlationId) -> {
            if (message instanceof ResetClusterMessage) {
                assert (correlationId != null);
                this.handleResetClusterMessage((ResetClusterMessage)message, sender, correlationId);
            } else if (message instanceof StartMetastorageRepairRequest) {
                assert (correlationId != null);
                this.handleStartMetastorageRepairRequest(sender, correlationId);
            } else if (message instanceof BecomeMetastorageLeaderMessage) {
                assert (correlationId != null);
                this.handleBecomeMetastorageLeaderMessage((BecomeMetastorageLeaderMessage)message, sender, correlationId);
            }
        });
        return CompletableFutures.nullCompletedFuture();
    }

    private void handleResetClusterMessage(ResetClusterMessage message, InternalClusterNode sender, long correlationId) {
        this.restartExecutor.execute(() -> {
            this.storage.saveResetClusterMessage(message);
            ((CompletableFuture)this.messagingService.respond(sender, (NetworkMessage)SystemDisasterRecoveryManagerImpl.successResponseMessage(), correlationId).thenRunAsync(() -> {
                if (!this.thisNodeName.equals(sender.name())) {
                    this.restarter.initiateRestart();
                }
            }, this.restartExecutor)).whenComplete((res, ex) -> {
                if (ex != null) {
                    LOG.error("Error when handling a ResetClusterMessage", (Throwable)ex);
                }
            });
        });
    }

    private static SuccessResponseMessage successResponseMessage() {
        return cmgMessagesFactory.successResponseMessage().build();
    }

    private void handleStartMetastorageRepairRequest(InternalClusterNode sender, long correlationId) {
        ((CompletableFuture)this.metastorageGroupMaintenance.raftNodeIndex().thenAccept(indexWithTerm -> {
            this.storage.saveWitnessedMetastorageRepairClusterId(this.requiredClusterId());
            StartMetastorageRepairResponse response = this.messagesFactory.startMetastorageRepairResponse().raftIndex(indexWithTerm.index()).raftTerm(indexWithTerm.term()).build();
            this.messagingService.respond(sender, (NetworkMessage)response, correlationId);
        })).whenComplete((res, ex) -> {
            if (ex != null) {
                LOG.error("Error when handling a StartMetastorageRepairRequest", (Throwable)ex);
            }
        });
    }

    private UUID requiredClusterId() {
        return Objects.requireNonNull(this.clusterIdSupplier.clusterId(), "No clusterId yet");
    }

    private void handleBecomeMetastorageLeaderMessage(BecomeMetastorageLeaderMessage message, InternalClusterNode sender, long correlationId) {
        ((CompletableFuture)CompletableFutures.nullCompletedFuture().thenRun(() -> {
            this.metastorageGroupMaintenance.initiateForcefulVotersChange(message.termBeforeChange(), message.targetVotingSet());
            this.messagingService.respond(sender, (NetworkMessage)SystemDisasterRecoveryManagerImpl.successResponseMessage(), correlationId);
        })).whenComplete((res, ex) -> {
            if (ex != null) {
                LOG.error("Error when handling a BecomeMetastorageLeaderMessage", (Throwable)ex);
            }
        });
    }

    @Override
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        return CompletableFutures.nullCompletedFuture();
    }

    @Override
    public void saveClusterState(ClusterState clusterState) {
        this.storage.saveClusterState(clusterState);
    }

    @Override
    public void markInitConfigApplied() {
        this.storage.markInitConfigApplied();
    }

    @Override
    public CompletableFuture<Void> resetCluster(List<String> proposedCmgNodeNames) {
        if (proposedCmgNodeNames == null) {
            return CompletableFuture.failedFuture(new ClusterResetException("Proposed CMG node names can't be null."));
        }
        return this.resetClusterInternal(proposedCmgNodeNames, null);
    }

    @Override
    public CompletableFuture<Void> resetClusterRepairingMetastorage(@Nullable List<String> proposedCmgNodeNames, int metastorageReplicationFactor) {
        return this.proposedCmgNodeNamesOrCurrentIfNull(proposedCmgNodeNames).thenCompose(cmgNodeNames -> this.resetClusterInternal((Collection<String>)cmgNodeNames, metastorageReplicationFactor));
    }

    private CompletableFuture<Void> resetClusterInternal(Collection<String> proposedCmgNodeNames, @Nullable Integer metastorageReplicationFactor) {
        try {
            return this.doResetCluster(proposedCmgNodeNames, metastorageReplicationFactor);
        }
        catch (ClusterResetException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    private CompletableFuture<Void> doResetCluster(Collection<String> proposedCmgNodeNames, @Nullable Integer metastorageReplicationFactor) {
        Collection<InternalClusterNode> nodesInTopology = this.topologyService.allMembers();
        SystemDisasterRecoveryManagerImpl.ensureNoRepetitions(proposedCmgNodeNames);
        this.ensureContainsThisNodeName(proposedCmgNodeNames);
        SystemDisasterRecoveryManagerImpl.ensureAllProposedCmgNodesAreInTopology(proposedCmgNodeNames, nodesInTopology);
        SystemDisasterRecoveryManagerImpl.ensureReplicationFactorIsPositiveIfGiven(metastorageReplicationFactor);
        SystemDisasterRecoveryManagerImpl.ensureReplicationFactorFitsTopologyIfGiven(metastorageReplicationFactor, nodesInTopology);
        this.ensureInitConfigApplied();
        ClusterState clusterState = this.ensureClusterStateIsPresent();
        if (metastorageReplicationFactor != null) {
            this.ensureNoMgMajorityIsOnline(clusterState, nodesInTopology);
        }
        ResetClusterMessage message = this.buildResetClusterMessageForReset(proposedCmgNodeNames, clusterState, metastorageReplicationFactor, nodesInTopology);
        Map<String, CompletableFuture<NetworkMessage>> responseFutures = this.sendResetClusterMessageTo(nodesInTopology, message);
        return CompletableFutures.allOf(responseFutures.values()).handleAsync((res, ex) -> {
            boolean repairMg;
            SystemDisasterRecoveryManagerImpl.rethrowIfError(ex);
            boolean bl = repairMg = metastorageReplicationFactor != null;
            if (SystemDisasterRecoveryManagerImpl.enoughResponsesAreSuccesses(repairMg, proposedCmgNodeNames, responseFutures)) {
                this.restarter.initiateRestart();
                return null;
            }
            throw new ClusterResetException(SystemDisasterRecoveryManagerImpl.errorMessageForNotEnoughSuccesses(repairMg, responseFutures));
        }, this.restartExecutor);
    }

    private CompletableFuture<Collection<String>> proposedCmgNodeNamesOrCurrentIfNull(@Nullable List<String> proposedCmgNodeNames) {
        return proposedCmgNodeNames != null ? CompletableFuture.completedFuture(proposedCmgNodeNames) : this.cmgManager.clusterState().thenApply(ClusterState::cmgNodes);
    }

    private static void ensureReplicationFactorIsPositiveIfGiven(@Nullable Integer metastorageReplicationFactor) {
        if (metastorageReplicationFactor != null && metastorageReplicationFactor <= 0) {
            throw new ClusterResetException("Metastorage replication factor must be positive.");
        }
    }

    private static void ensureNoRepetitions(Collection<String> proposedCmgNodeNames) {
        if (new HashSet<String>(proposedCmgNodeNames).size() != proposedCmgNodeNames.size()) {
            throw new ClusterResetException("New CMG node names have repetitions: " + String.valueOf(proposedCmgNodeNames) + ".");
        }
    }

    private void ensureContainsThisNodeName(Collection<String> proposedCmgNodeNames) {
        if (!proposedCmgNodeNames.contains(this.thisNodeName)) {
            throw new ClusterResetException("Current node is not contained in the new CMG, so it cannot conduct a cluster reset.");
        }
    }

    private void ensureInitConfigApplied() {
        if (!this.storage.isInitConfigApplied()) {
            throw new ClusterResetException("Initial configuration is not applied, so the node cannot serve as a cluster reset conductor.");
        }
    }

    private static void ensureAllProposedCmgNodesAreInTopology(Collection<String> proposedCmgNodeNames, Collection<InternalClusterNode> nodesInTopology) {
        Set namesOfNodesInTopology = nodesInTopology.stream().map(InternalClusterNode::name).collect(Collectors.toSet());
        HashSet<String> notInTopology = new HashSet<String>(proposedCmgNodeNames);
        notInTopology.removeAll(namesOfNodesInTopology);
        if (!notInTopology.isEmpty()) {
            throw new ClusterResetException("Some of proposed CMG nodes are not online: " + String.valueOf(notInTopology) + ".");
        }
    }

    private static void ensureReplicationFactorFitsTopologyIfGiven(@Nullable Integer metastorageReplicationFactor, Collection<InternalClusterNode> nodesInTopology) {
        if (metastorageReplicationFactor != null && metastorageReplicationFactor > nodesInTopology.size()) {
            throw new ClusterResetException("Metastorage replication factor cannot exceed size of current physical topology (" + nodesInTopology.size() + ").");
        }
    }

    private ClusterState ensureClusterStateIsPresent() {
        ClusterState clusterState = this.storage.readClusterState();
        if (clusterState == null) {
            throw new ClusterResetException("Node does not have cluster state.");
        }
        return clusterState;
    }

    private void ensureNoMgMajorityIsOnline(ClusterState clusterState, Collection<InternalClusterNode> nodesInTopology) {
        Set namesOfNodesInTopology = nodesInTopology.stream().map(InternalClusterNode::name).collect(Collectors.toSet());
        HashSet<String> nodes = new HashSet<String>(clusterState.metaStorageNodes());
        nodes.retainAll(namesOfNodesInTopology);
        int majority = SystemDisasterRecoveryManagerImpl.majoritySizeFor(clusterState.metaStorageNodes().size());
        if (nodes.size() >= majority) {
            throw new ClusterResetException(String.format("Majority repair is rejected because majority of Metastorage nodes are online [metastorageNodes=%s, onlineNodes=%s].", clusterState.metaStorageNodes(), namesOfNodesInTopology));
        }
    }

    private ResetClusterMessage buildResetClusterMessageForReset(Collection<String> proposedCmgNodeNames, ClusterState clusterState, @Nullable Integer metastorageReplicationFactor, Collection<InternalClusterNode> nodesInTopology) {
        ArrayList<UUID> formerClusterIds = new ArrayList<UUID>(Objects.requireNonNullElse(clusterState.formerClusterIds(), new ArrayList()));
        formerClusterIds.add(clusterState.clusterTag().clusterId());
        ResetClusterMessageBuilder builder = this.messagesFactory.resetClusterMessage().newCmgNodes(new HashSet<String>(proposedCmgNodeNames)).currentMetaStorageNodes(clusterState.metaStorageNodes()).clusterName(clusterState.clusterTag().clusterName()).clusterId(UUID.randomUUID()).formerClusterIds(formerClusterIds).initialClusterConfiguration(clusterState.initialClusterConfiguration());
        if (metastorageReplicationFactor != null) {
            builder.metastorageReplicationFactor(metastorageReplicationFactor);
            builder.conductor(this.thisNodeName);
            builder.participatingNodes(nodesInTopology.stream().map(InternalClusterNode::name).collect(Collectors.toSet()));
        }
        return builder.build();
    }

    private Map<String, CompletableFuture<NetworkMessage>> sendResetClusterMessageTo(Collection<InternalClusterNode> nodesInTopology, ResetClusterMessage message) {
        HashMap<String, CompletableFuture<NetworkMessage>> responseFutures = new HashMap<String, CompletableFuture<NetworkMessage>>();
        for (InternalClusterNode node : nodesInTopology) {
            responseFutures.put(node.name(), this.messagingService.invoke(node, (NetworkMessage)message, 10000L));
        }
        return responseFutures;
    }

    private static void rethrowIfError(Throwable ex) {
        if (ex instanceof Error) {
            throw (Error)ex;
        }
    }

    private static boolean enoughResponsesAreSuccesses(boolean repairMg, Collection<String> proposedCmgNodeNames, Map<String, CompletableFuture<NetworkMessage>> responseFutures) {
        if (repairMg) {
            return responseFutures.values().stream().allMatch(CompletableFutures::isCompletedSuccessfully);
        }
        return SystemDisasterRecoveryManagerImpl.isMajorityOfCmgAreSuccesses(proposedCmgNodeNames, responseFutures);
    }

    private static boolean isMajorityOfCmgAreSuccesses(Collection<String> proposedCmgNodeNames, Map<String, CompletableFuture<NetworkMessage>> responseFutures) {
        HashSet<String> newCmgNodesSet = new HashSet<String>(proposedCmgNodeNames);
        List futuresFromNewCmg = responseFutures.entrySet().stream().filter(entry -> newCmgNodesSet.contains(entry.getKey())).map(Map.Entry::getValue).collect(Collectors.toList());
        assert (futuresFromNewCmg.size() == proposedCmgNodeNames.size()) : futuresFromNewCmg.size() + " futures, but " + proposedCmgNodeNames.size() + " nodes";
        long successes = futuresFromNewCmg.stream().filter(CompletableFutures::isCompletedSuccessfully).count();
        return successes >= (long)SystemDisasterRecoveryManagerImpl.majoritySizeFor(futuresFromNewCmg.size());
    }

    private static int majoritySizeFor(int votingMembersCount) {
        return votingMembersCount / 2 + 1;
    }

    private static String errorMessageForNotEnoughSuccesses(boolean repairMg, Map<String, CompletableFuture<NetworkMessage>> responseFutures) {
        if (repairMg) {
            return String.format("Did not get successful response from at least one node, failing cluster reset [failedNode=%s].", SystemDisasterRecoveryManagerImpl.findAnyFailedNodeName(responseFutures));
        }
        return "Did not get successful responses from new CMG majority, failing cluster reset.";
    }

    private static String findAnyFailedNodeName(Map<String, CompletableFuture<NetworkMessage>> responseFutures) {
        return (String)responseFutures.entrySet().stream().filter(entry -> ((CompletableFuture)entry.getValue()).isCompletedExceptionally()).findAny().orElseThrow(() -> new AssertionError((Object)"At least one failed future must be present")).getKey();
    }

    @Override
    public CompletableFuture<Void> migrate(ClusterState targetClusterState) {
        try {
            return this.doMigrate(targetClusterState);
        }
        catch (MigrateException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    private CompletableFuture<Void> doMigrate(ClusterState targetClusterState) {
        if (targetClusterState.formerClusterIds() == null) {
            throw new MigrateException("Migration can only happen using cluster state from a node that saw a cluster reset");
        }
        ClusterState clusterState = this.ensureClusterStateIsPresent();
        if (SystemDisasterRecoveryManagerImpl.isDescendantOrSame(clusterState, targetClusterState)) {
            throw new MigrateException("Migration can only happen from old cluster to new one, not the other way around");
        }
        Collection<InternalClusterNode> nodesInTopology = this.topologyService.allMembers();
        ResetClusterMessage message = this.buildResetClusterMessageForMigrate(targetClusterState);
        Map<String, CompletableFuture<NetworkMessage>> responseFutures = this.sendResetClusterMessageTo(nodesInTopology, message);
        return CompletableFutures.allOf(responseFutures.values()).handleAsync((res, ex) -> {
            SystemDisasterRecoveryManagerImpl.rethrowIfError(ex);
            this.restarter.initiateRestart();
            return null;
        }, this.restartExecutor);
    }

    private static boolean isDescendantOrSame(ClusterState potencialDescendant, ClusterState potentialAncestor) {
        List<UUID> descendantLineage = SystemDisasterRecoveryManagerImpl.extractClusterIdsLineage(potencialDescendant);
        List<UUID> ancestorLineage = SystemDisasterRecoveryManagerImpl.extractClusterIdsLineage(potentialAncestor);
        return descendantLineage.size() >= ancestorLineage.size() && descendantLineage.subList(0, ancestorLineage.size()).equals(ancestorLineage);
    }

    private static List<UUID> extractClusterIdsLineage(ClusterState state) {
        ArrayList<UUID> lineage = new ArrayList<UUID>();
        List<UUID> formerClusterIds = state.formerClusterIds();
        if (formerClusterIds != null) {
            lineage.addAll(formerClusterIds);
        }
        lineage.add(state.clusterTag().clusterId());
        return lineage;
    }

    private ResetClusterMessage buildResetClusterMessageForMigrate(ClusterState clusterState) {
        List<UUID> formerClusterIds = clusterState.formerClusterIds();
        assert (formerClusterIds != null) : "formerClusterIds is null, but it must never be here as it's from a node that saw a CMG reset; current node is " + this.thisNodeName;
        return this.messagesFactory.resetClusterMessage().newCmgNodes(clusterState.cmgNodes()).currentMetaStorageNodes(clusterState.metaStorageNodes()).clusterName(clusterState.clusterTag().clusterName()).clusterId(clusterState.clusterTag().clusterId()).formerClusterIds(formerClusterIds).initialClusterConfiguration(clusterState.initialClusterConfiguration()).build();
    }

    private static class ThreadPerTaskExecutor
    implements Executor {
        private final String threadNamePrefix;

        private ThreadPerTaskExecutor(String threadNamePrefix) {
            this.threadNamePrefix = threadNamePrefix;
        }

        @Override
        public void execute(Runnable command) {
            Thread thread = new Thread(command);
            thread.setName(this.threadNamePrefix + thread.getId());
            thread.setDaemon(true);
            thread.start();
        }
    }
}

