/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.cdc.core.failover;

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.gridgain.internal.cdc.api.CdcManager;
import org.gridgain.internal.cdc.api.replication.CdcReplicationInstance;
import org.gridgain.internal.cdc.api.replication.CdcReplicationStatus;

public class CdcFailover {
    private static final IgniteLogger LOG = Loggers.forClass(CdcFailover.class);
    private final LogicalTopologyService logicalTopology;
    private final LogicalTopologyEventListener listener;
    private final Executor failoverExecutor;

    public CdcFailover(String localNodeName, LogicalTopologyService logicalTopology, TopologyService topologyService, CdcManager manager) {
        this.logicalTopology = logicalTopology;
        this.failoverExecutor = Executors.newSingleThreadExecutor(IgniteThreadFactory.create(localNodeName, "cdc-failover", LOG, new ThreadOperation[0]));
        this.listener = CdcFailover.createListener(localNodeName, topologyService, manager, this.failoverExecutor);
    }

    public void start() {
        this.logicalTopology.addEventListener(this.listener);
    }

    public void stop() {
        this.logicalTopology.removeEventListener(this.listener);
    }

    private static LogicalTopologyEventListener createListener(final String localNodeName, final TopologyService topologyService, final CdcManager manager, final Executor failoverExecutor) {
        return new LogicalTopologyEventListener(){

            @Override
            public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
                this.doStart(newTopology);
            }

            @Override
            public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot newTopology) {
                this.doStart(newTopology);
            }

            private void doStart(LogicalTopologySnapshot newTopology) {
                UUID localId = topologyService.localMember().id();
                if (newTopology.nodes().stream().anyMatch(node -> node.id().equals(localId))) {
                    this.doStart();
                }
            }

            private void doStart() {
                failoverExecutor.execute(() -> ((CompletableFuture)manager.listReplicationsByStatus(CdcReplicationStatus.FAILED).thenApplyAsync(allFailedReplications -> {
                    LOG.info("Found {} CDC replications with FAILED state in the cluster.", allFailedReplications.size());
                    this.restoreLocalFailedReplications((Collection<CdcReplicationInstance>)allFailedReplications);
                    return allFailedReplications;
                }, failoverExecutor)).thenAccept(this::restoreNotLocalFailedReplications));
            }

            private void restoreLocalFailedReplications(Collection<CdcReplicationInstance> allFailedReplications) {
                Collection failedReplicationsOnThisNode = allFailedReplications.stream().filter(replication -> localNodeName.equals(replication.runningOnNodeId())).collect(Collectors.toList());
                LOG.info("Found " + failedReplicationsOnThisNode.size() + " CDC replications with FAILED state that were running on this node before, starting these replications.", new Object[0]);
                failedReplicationsOnThisNode.forEach(replication -> manager.startReplication(replication.name()).whenComplete((result, throwable) -> {
                    if (throwable != null) {
                        LOG.warn("Failed to start replication on node start: " + replication.name(), (Throwable)throwable);
                    } else {
                        LOG.info("Successfully started replication on node start: " + replication.name(), new Object[0]);
                    }
                }));
            }

            private void restoreNotLocalFailedReplications(Collection<CdcReplicationInstance> allFailedReplications) {
                Collection failedReplicationsOnOtherNodes = allFailedReplications.stream().filter(replication -> !localNodeName.equals(replication.runningOnNodeId())).collect(Collectors.toList());
                Set<String> remainingNodesSet = CdcFailover.getRemainingLiveNodesSet(topologyService);
                failedReplicationsOnOtherNodes.forEach(replication -> {
                    List<String> failoverCandidates = CdcFailover.extractFailoverCandidatesForReplication(replication, remainingNodesSet);
                    if (failoverCandidates.isEmpty()) {
                        return;
                    }
                    if (failoverCandidates.get(0).equals(localNodeName)) {
                        LOG.info("Performing failover for replication: " + replication.name() + ", updating running node to: " + localNodeName, new Object[0]);
                        ((CompletableFuture)manager.updateReplication(replication.toBuilder().status(CdcReplicationStatus.STOPPED).build()).thenCompose(unused -> manager.startReplication(replication.name()))).whenComplete((result, throwable) -> {
                            if (throwable != null) {
                                LOG.warn("Failed to start replication on failover: " + replication.name(), (Throwable)throwable);
                            } else {
                                LOG.info("Successfully started replication on failover: " + replication.name(), new Object[0]);
                            }
                        });
                        return;
                    }
                    LOG.info("Skipping failover for replication: {}, expected failover node: {}", replication.name(), failoverCandidates.get(0));
                });
            }

            @Override
            public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) {
                failoverExecutor.execute(() -> ((CompletableFuture)manager.listReplicationsByStatus(CdcReplicationStatus.RUNNING).thenAcceptAsync(allRunningReplications -> {
                    LOG.info("Found {} CDC replications running in the cluster.", allRunningReplications.size());
                    Collection runningReplicationsOnLeftNode = allRunningReplications.stream().filter(replication -> leftNode.name().equals(replication.runningOnNodeId())).collect(Collectors.toList());
                    LOG.info("Found {} CDC replications running on the node that has left the topology.", allRunningReplications.size());
                    if (runningReplicationsOnLeftNode.isEmpty()) {
                        return;
                    }
                    if (leftNode.name().equals(localNodeName)) {
                        LOG.info("The node that has left the topology is this node, stopping all local replications.", new Object[0]);
                        runningReplicationsOnLeftNode.forEach(replication -> manager.stopReplication(replication.name()).whenComplete((result, throwable) -> {
                            if (throwable != null) {
                                LOG.warn("Failed to stop replication on node leave: {}", (Throwable)throwable, (Object)replication.name());
                            } else {
                                LOG.info("Successfully stopped replication on node leave: {}", replication.name());
                            }
                        }));
                        return;
                    }
                    Set<String> remainingNodes = CdcFailover.getRemainingLiveNodesSet(topologyService);
                    runningReplicationsOnLeftNode.forEach(replication -> {
                        List<String> failoverCandidates = CdcFailover.extractFailoverCandidatesForReplication(replication, remainingNodes);
                        if (failoverCandidates.isEmpty()) {
                            LOG.info("Updating replication from RUNNING to FAILED: ", replication.name());
                            manager.updateReplication(replication.toBuilder().status(CdcReplicationStatus.FAILED).errorContext("No available nodes the replication restart. Topology has " + remainingNodes.size() + " nodes and none of them is a member of `executionNodes`: " + replication.executionNodes()).build()).whenComplete((result, throwable) -> {
                                if (throwable != null) {
                                    LOG.warn("Failed to update replication status for: ", (Throwable)throwable, (Object)replication.name());
                                } else {
                                    LOG.info("Successfully marked replication as FAILED: {}", replication.name());
                                }
                            });
                            return;
                        }
                        if (failoverCandidates.get(0).equals(localNodeName)) {
                            LOG.info("Performing failover for replication: {}, updating running node to: {}", replication.name(), localNodeName);
                            ((CompletableFuture)manager.updateReplication(replication.toBuilder().status(CdcReplicationStatus.STOPPED).errorContext(null).build()).thenCompose(unused -> manager.startReplication(replication.name()))).whenComplete((result, throwable) -> {
                                if (throwable != null) {
                                    LOG.warn("Failed to start replication on failover: {}", (Throwable)throwable, (Object)replication.name());
                                } else {
                                    LOG.info("Successfully started replication on failover: {}", replication.name());
                                }
                            });
                            return;
                        }
                        LOG.info("Skipping failover for replication: {}, expected failover node: {}", replication.name(), failoverCandidates.get(0));
                    });
                }, failoverExecutor)).exceptionally(th -> {
                    LOG.warn("Got error during CDC Failover", (Throwable)th);
                    return null;
                }));
            }
        };
    }

    private static Set<String> getRemainingLiveNodesSet(TopologyService topologyService) {
        return topologyService.logicalTopologyMembers().stream().map(InternalClusterNode::name).collect(Collectors.toSet());
    }

    private static List<String> extractFailoverCandidatesForReplication(CdcReplicationInstance replication, Set<String> remainingNodesSet) {
        List<String> executionNodes = replication.executionNodes().nodes();
        if (executionNodes == null || executionNodes.isEmpty()) {
            return List.of();
        }
        return executionNodes.stream().filter(remainingNodesSet::contains).sorted().collect(Collectors.toList());
    }
}

