/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.raft.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ExceptionFactory;
import org.apache.ignite.internal.raft.LeaderElectionListener;
import org.apache.ignite.internal.raft.Marshaller;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.ThrottlingContextHolder;
import org.apache.ignite.internal.raft.client.RaftGroupServiceImpl;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.rpc.CliRequests;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.jetbrains.annotations.Nullable;

public class TopologyAwareRaftGroupService
implements RaftGroupService {
    private static final IgniteLogger LOG = Loggers.forClass(TopologyAwareRaftGroupService.class);
    private final RaftMessagesFactory factory;
    private final ClusterService clusterService;
    private final RaftGroupService raftClient;
    private final LogicalTopologyService logicalTopologyService;
    private final ServerEventHandler serverEventHandler;
    private final RaftGroupEventsClientListener eventsClientListener;
    private final ScheduledExecutorService executor;
    private final RaftConfiguration raftConfiguration;
    private final boolean notifyOnSubscription;
    private final Map<Peer, CompletableFuture<?>> subscribersMap = new ConcurrentHashMap();
    private final LogicalTopologyEventListener topologyEventsListener;
    private final Map<UUID, AtomicBoolean> nodeLeftLtDuringSubscriptionMarkers = new ConcurrentHashMap<UUID, AtomicBoolean>();

    private TopologyAwareRaftGroupService(ClusterService cluster, RaftMessagesFactory factory, final ScheduledExecutorService executor, RaftConfiguration raftConfiguration, RaftGroupService raftClient, LogicalTopologyService logicalTopologyService, RaftGroupEventsClientListener eventsClientListener, boolean notifyOnSubscription) {
        this.clusterService = cluster;
        this.factory = factory;
        this.executor = executor;
        this.raftConfiguration = raftConfiguration;
        this.raftClient = raftClient;
        this.logicalTopologyService = logicalTopologyService;
        this.serverEventHandler = new ServerEventHandler();
        this.eventsClientListener = eventsClientListener;
        this.notifyOnSubscription = notifyOnSubscription;
        this.eventsClientListener.addLeaderElectionListener(this.groupId(), (LeaderElectionListener)this.serverEventHandler);
        this.topologyEventsListener = new LogicalTopologyEventListener(){

            public void onNodeJoined(LogicalNode appearedNode, LogicalTopologySnapshot newTopology) {
                Peer peer = new Peer(appearedNode.name(), 0);
                if (TopologyAwareRaftGroupService.this.peers().contains(peer) && TopologyAwareRaftGroupService.this.serverEventHandler.isSubscribed()) {
                    LOG.info("New peer will be sending a leader elected notification [grpId={}, consistentId={}]", new Object[]{TopologyAwareRaftGroupService.this.groupId(), peer.consistentId()});
                    AtomicBoolean leftWhileSubscribing = new AtomicBoolean(false);
                    TopologyAwareRaftGroupService.this.nodeLeftLtDuringSubscriptionMarkers.put(appearedNode.id(), leftWhileSubscribing);
                    ((CompletableFuture)TopologyAwareRaftGroupService.this.subscribeToNode((InternalClusterNode)appearedNode, peer, leftWhileSubscribing).thenComposeAsync(subscribed -> {
                        if (subscribed.booleanValue()) {
                            return TopologyAwareRaftGroupService.this.refreshAndGetLeaderWithTerm().thenAcceptAsync(leaderWithTerm -> {
                                if (!leaderWithTerm.isEmpty() && appearedNode.name().equals(leaderWithTerm.leader().consistentId())) {
                                    TopologyAwareRaftGroupService.this.serverEventHandler.onLeaderElected((InternalClusterNode)appearedNode, leaderWithTerm.term());
                                }
                            }, (Executor)executor);
                        }
                        return CompletableFutures.nullCompletedFuture();
                    }, (Executor)executor)).whenComplete((res, ex) -> TopologyAwareRaftGroupService.this.nodeLeftLtDuringSubscriptionMarkers.remove(appearedNode.id()));
                }
            }

            public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) {
                Peer peerToRemove = new Peer(leftNode.name(), 0);
                TopologyAwareRaftGroupService.this.subscribersMap.remove(peerToRemove);
                AtomicBoolean leftMarker = TopologyAwareRaftGroupService.this.nodeLeftLtDuringSubscriptionMarkers.remove(leftNode.id());
                if (leftMarker != null) {
                    leftMarker.set(true);
                }
            }
        };
        logicalTopologyService.addEventListener(this.topologyEventsListener);
    }

    public static TopologyAwareRaftGroupService start(ReplicationGroupId groupId, ClusterService cluster, RaftMessagesFactory factory, RaftConfiguration raftConfiguration, PeersAndLearners configuration, ScheduledExecutorService executor, LogicalTopologyService logicalTopologyService, RaftGroupEventsClientListener eventsClientListener, boolean notifyOnSubscription, Marshaller cmdMarshaller, ExceptionFactory stoppingExceptionFactory, ThrottlingContextHolder throttlingContextHolder) {
        return new TopologyAwareRaftGroupService(cluster, factory, executor, raftConfiguration, RaftGroupServiceImpl.start((ReplicationGroupId)groupId, (ClusterService)cluster, (RaftMessagesFactory)factory, (RaftConfiguration)raftConfiguration, (PeersAndLearners)configuration, (ScheduledExecutorService)executor, (Marshaller)cmdMarshaller, (ExceptionFactory)stoppingExceptionFactory, (ThrottlingContextHolder)throttlingContextHolder), logicalTopologyService, eventsClientListener, notifyOnSubscription);
    }

    private CompletableFuture<Boolean> sendSubscribeMessage(InternalClusterNode node, CliRequests.SubscriptionLeaderChangeRequest msg, @Nullable AtomicBoolean leftWhileCalling) {
        CompletableFuture<Boolean> msgSendFut = new CompletableFuture<Boolean>();
        this.sendWithRetry(node, msg, msgSendFut, leftWhileCalling);
        return msgSendFut;
    }

    private void sendWithRetry(InternalClusterNode node, CliRequests.SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut, @Nullable AtomicBoolean leftWhileCalling) {
        Long responseTimeout = (Long)this.raftConfiguration.responseTimeoutMillis().value();
        this.clusterService.messagingService().invoke(node, (NetworkMessage)msg, responseTimeout.longValue()).whenCompleteAsync((unused, invokeThrowable) -> {
            if (invokeThrowable == null) {
                msgSendFut.complete(true);
                return;
            }
            Throwable invokeCause = ExceptionUtils.unwrapCause((Throwable)invokeThrowable);
            if (!msg.subscribe()) {
                if (invokeCause instanceof Error) {
                    msgSendFut.completeExceptionally((Throwable)invokeThrowable);
                } else {
                    LOG.debug("An exception while trying to unsubscribe", invokeThrowable);
                    msgSendFut.complete(false);
                }
            } else if (leftWhileCalling != null && leftWhileCalling.get()) {
                LOG.info("Could not subscribe to leader update from a specific node, because the node had left the logical topology: [node={}]", new Object[]{node});
                msgSendFut.complete(false);
            } else if (TopologyAwareRaftGroupService.recoverable(invokeCause)) {
                this.sendWithRetry(node, msg, msgSendFut, leftWhileCalling);
            } else if (invokeCause instanceof RecipientLeftException) {
                LOG.info("Could not subscribe to leader update from a specific node, because the node had left the cluster: [node={}]", new Object[]{node});
                msgSendFut.complete(false);
            } else {
                if (!(invokeCause instanceof NodeStoppingException)) {
                    LOG.error("Could not send the subscribe message to the node: [node={}, msg={}]", invokeThrowable, new Object[]{node, msg});
                }
                msgSendFut.completeExceptionally((Throwable)invokeThrowable);
            }
        }, (Executor)this.executor);
    }

    private static boolean recoverable(Throwable t) {
        return t instanceof TimeoutException || t instanceof IOException;
    }

    public CompletableFuture<Void> subscribeLeader(LeaderElectionListener callback) {
        if (this.serverEventHandler.isSubscribed()) {
            ServerEventHandler wrappedEventHandler = new ServerEventHandler();
            wrappedEventHandler.setOnLeaderElectedCallback(callback);
            this.eventsClientListener.addLeaderElectionListener(this.groupId(), (LeaderElectionListener)wrappedEventHandler);
        } else {
            this.serverEventHandler.setOnLeaderElectedCallback(callback);
        }
        int peers = this.peers().size();
        CompletableFuture[] futs = new CompletableFuture[peers];
        for (int i = 0; i < peers; ++i) {
            Peer peer = this.peers().get(i);
            InternalClusterNode node = this.clusterService.topologyService().getByConsistentId(peer.consistentId());
            futs[i] = node != null ? this.subscribeToNode(node, peer) : CompletableFutures.nullCompletedFuture();
        }
        if (this.notifyOnSubscription) {
            return CompletableFuture.allOf(futs).whenCompleteAsync((unused, throwable) -> {
                if (this.subscribersMap.isEmpty()) {
                    return;
                }
                if (throwable != null) {
                    throw new IgniteException(ErrorGroups.Common.INTERNAL_ERR, throwable);
                }
                this.refreshAndGetLeaderWithTerm().thenAcceptAsync(leaderWithTerm -> {
                    if (!leaderWithTerm.isEmpty()) {
                        InternalClusterNode leaderHost = this.clusterService.topologyService().getByConsistentId(leaderWithTerm.leader().consistentId());
                        if (leaderHost != null) {
                            this.eventsClientListener.onLeaderElected(this.groupId(), leaderHost, leaderWithTerm.term());
                        } else {
                            LOG.warn("Leader host occurred to leave the topology [nodeId = {}].", new Object[]{leaderWithTerm.leader().consistentId()});
                        }
                    }
                }, (Executor)this.executor);
            }, (Executor)this.executor);
        }
        return CompletableFuture.allOf(futs);
    }

    public CompletableFuture<Void> unsubscribeLeader() {
        this.serverEventHandler.setOnLeaderElectedCallback(null);
        this.serverEventHandler.resetLeader();
        return this.sendUnsubscribeLeaderMessageAndClearSubscribersMap();
    }

    public void unsubscribeLeader(LeaderElectionListener callback) {
        this.eventsClientListener.removeLeaderElectionListener(this.groupId(), callback);
    }

    private CompletableFuture<Void> sendUnsubscribeLeaderMessageAndClearSubscribersMap() {
        List<Peer> peers = this.peers();
        ArrayList<CompletableFuture<Boolean>> futs = new ArrayList<CompletableFuture<Boolean>>();
        for (int i = 0; i < peers.size(); ++i) {
            Peer peer = peers.get(i);
            InternalClusterNode node = this.clusterService.topologyService().getByConsistentId(peer.consistentId());
            if (node == null) continue;
            futs.add(this.sendSubscribeMessage(node, this.subscriptionLeaderChangeRequest(false), null));
        }
        this.subscribersMap.clear();
        return CompletableFuture.allOf(futs.toArray(new CompletableFuture[0]));
    }

    public ReplicationGroupId groupId() {
        return this.raftClient.groupId();
    }

    @Nullable
    public Peer leader() {
        Peer leader = this.serverEventHandler.leader();
        return leader == null ? this.raftClient.leader() : leader;
    }

    @Nullable
    public List<Peer> peers() {
        return this.raftClient.peers();
    }

    @Nullable
    public List<Peer> learners() {
        return this.raftClient.learners();
    }

    public CompletableFuture<Void> refreshLeader() {
        return this.raftClient.refreshLeader();
    }

    public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm() {
        return this.raftClient.refreshAndGetLeaderWithTerm();
    }

    public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
        return this.raftClient.refreshMembers(onlyAlive);
    }

    public CompletableFuture<Void> addPeer(Peer peer, long sequenceToken) {
        return this.raftClient.addPeer(peer, sequenceToken);
    }

    public CompletableFuture<Void> removePeer(Peer peer, long sequenceToken) {
        return this.raftClient.removePeer(peer, sequenceToken);
    }

    public CompletableFuture<Void> changePeersAndLearners(PeersAndLearners peersAndLearners, long term, long sequenceToken) {
        return this.raftClient.changePeersAndLearners(peersAndLearners, term, sequenceToken);
    }

    public CompletableFuture<Void> changePeersAndLearnersAsync(PeersAndLearners peersAndLearners, long term, long sequenceToken) {
        return this.raftClient.changePeersAndLearnersAsync(peersAndLearners, term, sequenceToken);
    }

    public CompletableFuture<Void> addLearners(Collection<Peer> learners, long sequenceToken) {
        return this.raftClient.addLearners(learners, sequenceToken);
    }

    public CompletableFuture<Void> removeLearners(Collection<Peer> learners, long sequenceToken) {
        return this.raftClient.removeLearners(learners, sequenceToken);
    }

    public CompletableFuture<Void> resetLearners(Collection<Peer> learners, long sequenceToken) {
        return this.raftClient.resetLearners(learners, sequenceToken);
    }

    public CompletableFuture<Void> snapshot(Peer peer, boolean forced) {
        return this.raftClient.snapshot(peer, forced);
    }

    public CompletableFuture<Void> transferLeadership(Peer newLeader) {
        return this.raftClient.transferLeadership(newLeader);
    }

    public <R> CompletableFuture<R> run(Command cmd) {
        return this.raftClient.run(cmd);
    }

    public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
        return this.raftClient.run(cmd, timeoutMillis);
    }

    public void shutdown() {
        this.logicalTopologyService.removeEventListener(this.topologyEventsListener);
        this.eventsClientListener.removeLeaderElectionListener(this.groupId(), (LeaderElectionListener)this.serverEventHandler);
        this.raftClient.shutdown();
    }

    public CompletableFuture<Long> readIndex() {
        return this.raftClient.readIndex();
    }

    public ClusterService clusterService() {
        return this.raftClient.clusterService();
    }

    public void updateConfiguration(PeersAndLearners configuration) {
        InternalClusterNode node;
        this.raftClient.updateConfiguration(configuration);
        ArrayList<CompletionStage> futures = new ArrayList<CompletionStage>();
        for (Peer peer : this.peers()) {
            if (this.subscribersMap.containsKey(peer) || (node = this.clusterService.topologyService().getByConsistentId(peer.consistentId())) == null) continue;
            futures.add(this.subscribeToNode(node, peer));
        }
        for (Peer peer : this.subscribersMap.keySet()) {
            if (this.peers().contains(peer)) continue;
            node = this.clusterService.topologyService().getByConsistentId(peer.consistentId());
            CompletableFuture<?> fut = this.subscribersMap.remove(peer);
            if (fut == null || node == null) continue;
            futures.add(fut.thenCompose(ignore -> this.sendSubscribeMessage(node, this.subscriptionLeaderChangeRequest(false), null)));
        }
        CompletableFuture.allOf((CompletableFuture[])futures.toArray(CompletableFuture[]::new)).thenAcceptAsync(unused -> {
            if (this.notifyOnSubscription) {
                this.refreshAndGetLeaderWithTerm().thenAcceptAsync(leaderWithTerm -> {
                    InternalClusterNode leaderHost = this.clusterService.topologyService().getByConsistentId(leaderWithTerm.leader().consistentId());
                    if (leaderHost != null) {
                        this.serverEventHandler.onLeaderElected(leaderHost, leaderWithTerm.term());
                    } else {
                        LOG.warn("Leader host occurred to leave the topology [nodeId = {}].", new Object[]{leaderWithTerm.leader().consistentId()});
                    }
                }, (Executor)this.executor);
            }
        }, (Executor)this.executor);
    }

    private CliRequests.SubscriptionLeaderChangeRequest subscriptionLeaderChangeRequest(boolean subscribe) {
        return this.factory.subscriptionLeaderChangeRequest().groupId(this.groupId()).subscribe(subscribe).build();
    }

    private CompletableFuture<Boolean> subscribeToNode(InternalClusterNode node, Peer peer) {
        return this.subscribeToNode(node, peer, null);
    }

    private synchronized CompletableFuture<Boolean> subscribeToNode(InternalClusterNode node, Peer peer, @Nullable AtomicBoolean leftWhileCalling) {
        CompletableFuture<Boolean> fut = this.sendSubscribeMessage(node, this.subscriptionLeaderChangeRequest(true), leftWhileCalling);
        this.subscribersMap.put(peer, fut);
        return fut;
    }

    public void markAsStopping() {
        this.raftClient.markAsStopping();
    }

    private static class ServerEventHandler
    implements LeaderElectionListener {
        private long term = 0L;
        private volatile Peer leaderPeer;
        private LeaderElectionListener onLeaderElectedCallback;

        private ServerEventHandler() {
        }

        public synchronized void onLeaderElected(InternalClusterNode node, long term) {
            if (this.onLeaderElectedCallback != null && term > this.term) {
                this.term = term;
                this.leaderPeer = new Peer(node.name());
                this.onLeaderElectedCallback.onLeaderElected(node, term);
            }
        }

        synchronized void setOnLeaderElectedCallback(LeaderElectionListener onLeaderElectedCallback) {
            this.onLeaderElectedCallback = onLeaderElectedCallback;
        }

        synchronized boolean isSubscribed() {
            return this.onLeaderElectedCallback != null;
        }

        Peer leader() {
            return this.leaderPeer;
        }

        void resetLeader() {
            this.leaderPeer = null;
        }
    }
}

