/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.dcr;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.gridgain.internal.dcr.ReplicationStatus;
import org.gridgain.internal.dcr.metastorage.DcrStorage;
import org.gridgain.internal.dcr.metastorage.ReplicationEntry;

/**
 * Reacts to logical topology changes by updating DCR replication entries kept in {@link DcrStorage}.
 *
 * <p>The registered topology listener does the following:
 * <ul>
 *   <li>on node join or topology leap, if the local node is part of the new topology, every replication returned by
 *       {@code DcrStorage.findByNode} for this node's name that is in {@link ReplicationStatus#WORKER_NODE_OUT}
 *       is switched to {@link ReplicationStatus#REPLICATING};</li>
 *   <li>on node left, for every replication returned by {@code DcrStorage.findByNode} for the left node's name:
 *       if no node of the new topology is among the replication nodes, the replication is switched to
 *       {@link ReplicationStatus#WORKER_NODE_OUT}; otherwise, if this node is among the replication nodes,
 *       this node is written as the replication's worker node.</li>
 * </ul>
 *
 * <p>Lookup results are processed on a dedicated single-threaded executor that is owned by this instance
 * and shut down by {@link #stop()}.
 */
public class DcrFailover {
    private static final IgniteLogger LOG = Loggers.forClass(DcrFailover.class);

    private final LogicalTopologyService logicalTopology;

    private final LogicalTopologyEventListener listener;

    /** Executor that runs the listener's storage callbacks; created in the constructor, released in {@link #stop()}. */
    private final ExecutorService failoverExecutor;

    /**
     * Creates the failover handler. No listener is registered until {@link #start()} is called.
     *
     * @param nodeName Name of the local node; used for storage lookups, as the new worker node name and as the
     *     thread name prefix of the executor.
     * @param logicalTopology Logical topology service the listener is registered with.
     * @param topologyService Topology service used to obtain the local member id.
     * @param store Storage holding the replication entries.
     */
    public DcrFailover(String nodeName, LogicalTopologyService logicalTopology, TopologyService topologyService, DcrStorage store) {
        this.logicalTopology = logicalTopology;
        this.failoverExecutor = Executors.newSingleThreadExecutor(
                IgniteThreadFactory.create(nodeName, "dcr-failover", LOG, new ThreadOperation[0]));
        this.listener = DcrFailover.createListener(nodeName, topologyService, store, this.failoverExecutor);
    }

    /** Registers the failover listener with the logical topology service. */
    public void start() {
        this.logicalTopology.addEventListener(this.listener);
    }

    /**
     * Unregisters the failover listener and shuts down the failover executor.
     *
     * <p>The listener is removed first so that no new work is scheduled, then the executor is shut down so its
     * worker thread does not outlive this component. Tasks already submitted are still executed. After this call
     * the instance is not meant to be started again: callbacks scheduled on the executor would be rejected.
     */
    public void stop() {
        this.logicalTopology.removeEventListener(this.listener);
        this.failoverExecutor.shutdown();
    }

    /**
     * Builds the topology listener that performs the storage updates described in the class documentation.
     *
     * @param nodeName Name of the local node.
     * @param topologyService Topology service used to obtain the local member id.
     * @param store Storage holding the replication entries.
     * @param failoverExecutor Executor on which lookup results are processed.
     * @return Listener to be registered with the logical topology service.
     */
    private static LogicalTopologyEventListener createListener(final String nodeName, final TopologyService topologyService, final DcrStorage store, final ExecutorService failoverExecutor) {
        return new LogicalTopologyEventListener() {

            @Override
            public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
                this.start(newTopology);
            }

            @Override
            public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot newTopology) {
                this.start(newTopology);
            }

            /** Resumes replications only if the local node is a member of the given topology. */
            private void start(LogicalTopologySnapshot newTopology) {
                UUID localId = topologyService.localMember().id();
                if (newTopology.nodes().stream().anyMatch(node -> node.id().equals(localId))) {
                    this.doStart();
                }
            }

            /** Switches this node's replications from WORKER_NODE_OUT back to REPLICATING. */
            private void doStart() {
                store.findByNode(nodeName).thenAcceptAsync(replicationMap -> {
                    for (Map.Entry<String, ReplicationEntry> entry : replicationMap.entrySet()) {
                        String key = entry.getKey();
                        ReplicationEntry replicationEntry = entry.getValue();
                        if (replicationEntry.replicationStatus() != ReplicationStatus.WORKER_NODE_OUT) {
                            continue;
                        }
                        LOG.debug("Replication {} started on node {}", key, nodeName);
                        store.update(key, replEntry -> replEntry.toBuilder().status(ReplicationStatus.REPLICATING).build(), false);
                    }
                }, failoverExecutor);
            }

            @Override
            public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) {
                store.findByNode(leftNode.name()).thenAcceptAsync(replicationMap -> {
                    for (Map.Entry<String, ReplicationEntry> entry : replicationMap.entrySet()) {
                        String key = entry.getKey();
                        ReplicationEntry replicationEntry = entry.getValue();
                        // No remaining topology node can host this replication: mark it as having lost its worker.
                        if (newTopology.nodes().stream().noneMatch(node -> replicationEntry.replicationNodes().contains(node.name()))) {
                            List<String> newNames = newTopology.nodes().stream().map(InternalClusterNode::name).collect(Collectors.toList());
                            LOG.debug("New topology doesn't intersect with replication nodes. Replication {} is in WORKER_NODE_OUT state." + System.lineSeparator() + "New topology nodes {}" + System.lineSeparator() + "Replication nodes {}", key, newNames, replicationEntry.replicationNodes());
                            store.update(key, replEntry -> replEntry.toBuilder().status(ReplicationStatus.WORKER_NODE_OUT).build(), false);
                            continue;
                        }
                        // Only a node listed among the replication nodes may take over as the worker.
                        if (!replicationEntry.replicationNodes().contains(nodeName)) {
                            continue;
                        }
                        LOG.debug("Change worker node from {} to {} for replication {}", replicationEntry.workerNode(), nodeName, key);
                        store.update(key, replEntry -> replEntry.toBuilder().workerNode(nodeName).build(), false);
                    }
                }, failoverExecutor);
            }
        };
    }
}

