/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.raft.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.network.TopologyEventHandler;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ExceptionFactory;
import org.apache.ignite.internal.raft.LeaderElectionListener;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Marshaller;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeerUnavailableException;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
import org.apache.ignite.internal.raft.ThrottlingContextHolder;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.rpc.CliRequests;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.jetbrains.annotations.Nullable;

public class PhysicalTopologyAwareRaftGroupService
implements RaftGroupService {
    private static final IgniteLogger LOG = Loggers.forClass(TopologyAwareRaftGroupService.class);
    private static final RaftMessagesFactory MESSAGES_FACTORY = Loza.FACTORY;
    private final ServerEventHandler generalLeaderElectionListener;
    private final FailureManager failureManager;
    private final ClusterService clusterService;
    private final RaftGroupService raftClient;
    private final Executor executor;
    private final RaftConfiguration raftConfiguration;

    private PhysicalTopologyAwareRaftGroupService(FailureManager failureManager, final ClusterService clusterService, final Executor executor, RaftConfiguration raftConfiguration, final RaftGroupService raftClient, RaftGroupEventsClientListener eventsClientListener) {
        this.failureManager = failureManager;
        this.clusterService = clusterService;
        this.executor = executor;
        this.raftConfiguration = raftConfiguration;
        this.raftClient = raftClient;
        this.generalLeaderElectionListener = new ServerEventHandler(executor);
        eventsClientListener.addLeaderElectionListener(raftClient.groupId(), (LeaderElectionListener)this.generalLeaderElectionListener);
        clusterService.topologyService().addEventHandler(new TopologyEventHandler(){

            public void onAppeared(InternalClusterNode member) {
                CompletableFuture<Boolean> fut = PhysicalTopologyAwareRaftGroupService.this.changeNodeSubscriptionIfNeed(member, true);
                PhysicalTopologyAwareRaftGroupService.this.requestLeaderManually(clusterService, executor, raftClient, fut);
            }
        });
        ArrayList<CompletableFuture<Boolean>> futures = new ArrayList<CompletableFuture<Boolean>>();
        for (InternalClusterNode member : clusterService.topologyService().allMembers()) {
            futures.add(this.changeNodeSubscriptionIfNeed(member, true));
        }
        this.requestLeaderManually(clusterService, executor, raftClient, CompletableFutures.allOf(futures));
    }

    private void requestLeaderManually(ClusterService clusterService, Executor executor, RaftGroupService raftClient, CompletableFuture<?> subscriptionsFut) {
        subscriptionsFut.thenRunAsync(() -> raftClient.refreshAndGetLeaderWithTerm().whenCompleteAsync((leaderWithTerm, throwable) -> {
            InternalClusterNode leaderHost;
            if (throwable != null) {
                LOG.warn("Could not refresh and get leader with term [grp={}].", new Object[]{this.groupId(), throwable});
            }
            if ((leaderHost = clusterService.topologyService().getByConsistentId(leaderWithTerm.leader().consistentId())) != null) {
                this.generalLeaderElectionListener.onLeaderElected(leaderHost, leaderWithTerm.term());
            } else {
                LOG.warn("Leader host occurred to leave the topology [nodeId = {}].", new Object[]{leaderWithTerm.leader().consistentId()});
            }
        }, executor), executor);
    }

    private CompletableFuture<Boolean> changeNodeSubscriptionIfNeed(InternalClusterNode member, boolean subscribe) {
        Peer peer = new Peer(member.name());
        if (this.peers().contains(peer)) {
            CliRequests.SubscriptionLeaderChangeRequest msg = this.subscriptionLeaderChangeRequest(subscribe);
            return this.sendMessage(member, msg).whenComplete((isSent, err) -> {
                if (err != null) {
                    this.failureManager.process(new FailureContext(err, "Could not change subscription to leader updates [grp=" + this.groupId() + "]."));
                }
                LOG.info("Subscription status changed for the peer [grp={}, consistentId={}, subscribe={}, isSent={}].", new Object[]{this.groupId(), member.name(), subscribe, isSent});
            });
        }
        return CompletableFutures.booleanCompletedFuture((boolean)false);
    }

    public static PhysicalTopologyAwareRaftGroupService start(ReplicationGroupId groupId, ClusterService cluster, RaftMessagesFactory factory, RaftConfiguration raftConfiguration, PeersAndLearners configuration, ScheduledExecutorService executor, RaftGroupEventsClientListener eventsClientListener, Marshaller cmdMarshaller, ExceptionFactory stoppingExceptionFactory, ThrottlingContextHolder throttlingContextHolder, FailureManager failureManager) {
        return new PhysicalTopologyAwareRaftGroupService(failureManager, cluster, executor, raftConfiguration, RaftGroupServiceImpl.start((ReplicationGroupId)groupId, (ClusterService)cluster, (RaftMessagesFactory)factory, (RaftConfiguration)raftConfiguration, (PeersAndLearners)configuration, (ScheduledExecutorService)executor, (Marshaller)cmdMarshaller, (ExceptionFactory)stoppingExceptionFactory, (ThrottlingContextHolder)throttlingContextHolder), eventsClientListener);
    }

    private void finishSubscriptions() {
        for (InternalClusterNode member : this.clusterService.topologyService().allMembers()) {
            this.changeNodeSubscriptionIfNeed(member, false);
        }
    }

    public void subscribeLeader(LeaderElectionListener callback) {
        this.generalLeaderElectionListener.addCallbackAndNotify(callback);
    }

    public void unsubscribeLeader(LeaderElectionListener callback) {
        this.generalLeaderElectionListener.removeCallbackAndNotify(callback);
    }

    private CliRequests.SubscriptionLeaderChangeRequest subscriptionLeaderChangeRequest(boolean subscribe) {
        return MESSAGES_FACTORY.subscriptionLeaderChangeRequest().groupId(this.groupId()).subscribe(subscribe).build();
    }

    private CompletableFuture<Boolean> sendMessage(InternalClusterNode node, CliRequests.SubscriptionLeaderChangeRequest msg) {
        CompletableFuture<Boolean> msgSendFut = new CompletableFuture<Boolean>();
        this.sendWithRetry(node, msg, msgSendFut);
        return msgSendFut;
    }

    private void sendWithRetry(InternalClusterNode node, CliRequests.SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
        Long responseTimeout = (Long)this.raftConfiguration.responseTimeoutMillis().value();
        this.clusterService.messagingService().invoke(node, (NetworkMessage)msg, responseTimeout.longValue()).whenCompleteAsync((unused, invokeThrowable) -> {
            if (invokeThrowable == null) {
                msgSendFut.complete(true);
                return;
            }
            Throwable invokeCause = ExceptionUtils.unwrapCause((Throwable)invokeThrowable);
            if (!msg.subscribe()) {
                if (invokeCause instanceof Error) {
                    msgSendFut.completeExceptionally((Throwable)invokeThrowable);
                } else {
                    LOG.debug("An exception while trying to unsubscribe.", invokeThrowable);
                    msgSendFut.complete(false);
                }
            } else if (PhysicalTopologyAwareRaftGroupService.recoverable(invokeCause)) {
                this.sendWithRetry(node, msg, msgSendFut);
            } else if (invokeCause instanceof RecipientLeftException) {
                LOG.info("Could not subscribe to leader update from a specific node, because the node had left the cluster: [node={}].", new Object[]{node});
                msgSendFut.complete(false);
            } else if (invokeCause instanceof NodeStoppingException) {
                msgSendFut.complete(false);
            } else {
                LOG.error("Could not send the subscribe message to the node: [node={}, msg={}].", invokeThrowable, new Object[]{node, msg});
                msgSendFut.completeExceptionally((Throwable)invokeThrowable);
            }
        }, this.executor);
    }

    private static boolean recoverable(Throwable t) {
        return (t = ExceptionUtils.unwrapCause((Throwable)t)) instanceof TimeoutException || t instanceof IOException || t instanceof PeerUnavailableException || t instanceof RecipientLeftException;
    }

    public ReplicationGroupId groupId() {
        return this.raftClient.groupId();
    }

    @Nullable
    public Peer leader() {
        return this.raftClient.leader();
    }

    public List<Peer> peers() {
        return this.raftClient.peers();
    }

    @Nullable
    public List<Peer> learners() {
        return this.raftClient.learners();
    }

    public CompletableFuture<Void> refreshLeader() {
        return this.raftClient.refreshLeader();
    }

    public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm() {
        return this.raftClient.refreshAndGetLeaderWithTerm();
    }

    public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
        return this.raftClient.refreshMembers(onlyAlive);
    }

    public CompletableFuture<Void> addPeer(Peer peer, long sequenceToken) {
        return this.raftClient.addPeer(peer, sequenceToken);
    }

    public CompletableFuture<Void> removePeer(Peer peer, long sequenceToken) {
        return this.raftClient.removePeer(peer, sequenceToken);
    }

    public CompletableFuture<Void> changePeersAndLearners(PeersAndLearners peersAndLearners, long term, long sequenceToken) {
        return this.raftClient.changePeersAndLearners(peersAndLearners, term, sequenceToken);
    }

    public CompletableFuture<Void> changePeersAndLearnersAsync(PeersAndLearners peersAndLearners, long term, long sequenceToken) {
        return this.raftClient.changePeersAndLearnersAsync(peersAndLearners, term, sequenceToken);
    }

    public CompletableFuture<Void> addLearners(Collection<Peer> learners, long sequenceToken) {
        return this.raftClient.addLearners(learners, sequenceToken);
    }

    public CompletableFuture<Void> removeLearners(Collection<Peer> learners, long sequenceToken) {
        return this.raftClient.removeLearners(learners, sequenceToken);
    }

    public CompletableFuture<Void> resetLearners(Collection<Peer> learners, long sequenceToken) {
        return this.raftClient.resetLearners(learners, sequenceToken);
    }

    public CompletableFuture<Void> snapshot(Peer peer, boolean forced) {
        return this.raftClient.snapshot(peer, forced);
    }

    public CompletableFuture<Void> transferLeadership(Peer newLeader) {
        return this.raftClient.transferLeadership(newLeader);
    }

    public void shutdown() {
        this.finishSubscriptions();
        this.raftClient.shutdown();
    }

    public CompletableFuture<Long> readIndex() {
        return this.raftClient.readIndex();
    }

    public ClusterService clusterService() {
        return this.raftClient.clusterService();
    }

    public void updateConfiguration(PeersAndLearners configuration) {
        this.raftClient.updateConfiguration(configuration);
    }

    public <R> CompletableFuture<R> run(Command cmd) {
        return this.raftClient.run(cmd);
    }

    public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
        return this.raftClient.run(cmd, timeoutMillis);
    }

    public void markAsStopping() {
        this.raftClient.markAsStopping();
    }

    private static class ServerEventHandler
    implements LeaderElectionListener {
        private final Executor executor;
        CompletableFuture<Void> fut = CompletableFutures.nullCompletedFuture();
        private final ArrayList<LeaderElectionListener> callbacks = new ArrayList();
        private InternalClusterNode leaderNode = null;
        private long leaderTerm = -1L;

        ServerEventHandler(Executor executor) {
            this.executor = executor;
        }

        public synchronized void onLeaderElected(InternalClusterNode node, long term) {
            if (term > this.leaderTerm) {
                this.leaderTerm = term;
                this.leaderNode = node;
                if (this.callbacks.isEmpty()) {
                    return;
                }
                ArrayList<LeaderElectionListener> listeners = new ArrayList<LeaderElectionListener>(this.callbacks);
                this.fut = this.fut.isDone() ? CompletableFuture.runAsync(() -> {
                    for (LeaderElectionListener listener : listeners) {
                        listener.onLeaderElected(node, term);
                    }
                }, this.executor) : this.fut.thenRunAsync(() -> {
                    for (LeaderElectionListener listener : listeners) {
                        listener.onLeaderElected(node, term);
                    }
                }, this.executor);
            }
        }

        synchronized void addCallbackAndNotify(LeaderElectionListener callback) {
            this.callbacks.add(callback);
            if (this.leaderTerm != -1L) {
                long finalLeaderTerm = this.leaderTerm;
                InternalClusterNode finalLeaderNode = this.leaderNode;
                this.fut = this.fut.isDone() ? CompletableFuture.runAsync(() -> callback.onLeaderElected(finalLeaderNode, finalLeaderTerm), this.executor) : this.fut.thenRunAsync(() -> callback.onLeaderElected(finalLeaderNode, finalLeaderTerm), this.executor);
            }
        }

        synchronized void removeCallbackAndNotify(LeaderElectionListener callback) {
            this.callbacks.remove(callback);
        }
    }
}

