/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.raft.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.network.TopologyEventHandler;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ExceptionFactory;
import org.apache.ignite.internal.raft.GroupOverloadedException;
import org.apache.ignite.internal.raft.Marshaller;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeerUnavailableException;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.StoppingExceptionFactories;
import org.apache.ignite.internal.raft.ThrottlingContextHolder;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.client.RetryContext;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.rebalance.RaftStaleUpdateException;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.rpc.ActionRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
import org.apache.ignite.raft.jraft.rpc.impl.SMCompactedThrowable;
import org.apache.ignite.raft.jraft.rpc.impl.SMFullThrowable;
import org.apache.ignite.raft.jraft.rpc.impl.SMThrowable;
import org.apache.ignite.raft.jraft.util.Utils;
import org.jetbrains.annotations.Nullable;

public class RaftGroupServiceImpl
implements RaftGroupService {
    private static final IgniteLogger LOG = Loggers.forClass(RaftGroupServiceImpl.class);
    private final String groupId;
    private final ReplicationGroupId realGroupId;
    private final RaftMessagesFactory factory;
    private final RaftConfiguration configuration;
    @Nullable
    private volatile Peer leader;
    private volatile List<Peer> peers;
    private volatile List<Peer> learners;
    private final ClusterService cluster;
    private final ScheduledExecutorService executor;
    private final Marshaller commandsMarshaller;
    private final ExceptionFactory stoppingExceptionFactory;
    private final AtomicBoolean peersAreUnavailable = new AtomicBoolean();
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private static final Supplier<String> NO_DESCRIPTION = () -> null;
    private final ThrottlingContextHolder throttlingContextHolder;
    private final TopologyEventHandler topologyEventHandler;
    private volatile boolean stopping;

    /**
     * Creates a client bound to a single Raft group. The constructor is private; instances are created by the static
     * {@code start} factories of this class.
     *
     * @param groupId Replication group id; both the object and its string form are stored.
     * @param cluster Cluster service used for topology lookups and messaging.
     * @param factory Factory of Raft request messages.
     * @param configuration Raft configuration (source of retry timeout and retry delay).
     * @param membersConfiguration Initial peers and learners; both collections are copied.
     * @param leader Initially known leader, or {@code null} if unknown.
     * @param executor Executor used to schedule retries and delayed tasks.
     * @param commandsMarshaller Marshaller applied to write commands.
     * @param stoppingExceptionFactory Factory of the exceptions used to fail requests once the client is stopping.
     * @param throttlingContextHolder Holder providing per-peer throttling contexts.
     */
    private RaftGroupServiceImpl(ReplicationGroupId groupId, ClusterService cluster, RaftMessagesFactory factory, RaftConfiguration configuration, PeersAndLearners membersConfiguration, @Nullable Peer leader, ScheduledExecutorService executor, Marshaller commandsMarshaller, ExceptionFactory stoppingExceptionFactory, ThrottlingContextHolder throttlingContextHolder) {
        // Group identity.
        this.realGroupId = groupId;
        this.groupId = groupId.toString();

        // Collaborators.
        this.cluster = cluster;
        this.factory = factory;
        this.configuration = configuration;
        this.executor = executor;
        this.commandsMarshaller = commandsMarshaller;
        this.stoppingExceptionFactory = stoppingExceptionFactory;
        this.throttlingContextHolder = throttlingContextHolder;

        // Cached group state: immutable snapshots of the initial membership plus the optional leader.
        this.peers = List.copyOf(membersConfiguration.peers());
        this.learners = List.copyOf(membersConfiguration.learners());
        this.leader = leader;

        this.topologyEventHandler = topologyEventHandler();
        this.stopping = false;
    }

    /**
     * Starts a Raft group client that uses {@code StoppingExceptionFactories.indicateComponentStop()} as the factory of
     * the exceptions reported once the client is stopping. All other arguments are forwarded unchanged to the overload
     * that accepts an explicit {@link ExceptionFactory}.
     *
     * @return Started Raft group service.
     */
    public static RaftGroupService start(ReplicationGroupId groupId, ClusterService cluster, RaftMessagesFactory factory, RaftConfiguration configuration, PeersAndLearners membersConfiguration, ScheduledExecutorService executor, Marshaller commandsMarshaller, ThrottlingContextHolder throttlingContextHolder) {
        ExceptionFactory componentStopFactory = StoppingExceptionFactories.indicateComponentStop();

        return start(
                groupId,
                cluster,
                factory,
                configuration,
                membersConfiguration,
                executor,
                commandsMarshaller,
                componentStopFactory,
                throttlingContextHolder
        );
    }

    /**
     * Starts a Raft group client.
     *
     * <p>When the {@code IGNITE_SKIP_REPLICATION_IN_BENCHMARK} system property is enabled, the returned service overrides
     * {@code run(Command)} so that commands whose simple class name contains {@code "UpdateCommand"} complete immediately
     * with {@code null} instead of being sent.
     *
     * <p>The service's topology event handler is registered here, after construction, so that node-left events reach the
     * throttling context; {@code shutdown()} removes the same handler.
     *
     * @param groupId Replication group id.
     * @param cluster Cluster service used for topology lookups and messaging.
     * @param factory Factory of Raft request messages.
     * @param configuration Raft configuration.
     * @param membersConfiguration Initial peers and learners.
     * @param executor Executor used to schedule retries and delayed tasks.
     * @param commandsMarshaller Marshaller applied to write commands.
     * @param stoppingExceptionFactory Factory of the exceptions used to fail requests once the client is stopping.
     * @param throttlingContextHolder Holder providing per-peer throttling contexts.
     * @return Started Raft group service; the leader is initially unknown.
     */
    public static RaftGroupService start(ReplicationGroupId groupId, ClusterService cluster, RaftMessagesFactory factory, RaftConfiguration configuration, PeersAndLearners membersConfiguration, ScheduledExecutorService executor, Marshaller commandsMarshaller, ExceptionFactory stoppingExceptionFactory, ThrottlingContextHolder throttlingContextHolder) {
        boolean inBenchmark = IgniteSystemProperties.getBoolean("IGNITE_SKIP_REPLICATION_IN_BENCHMARK");

        RaftGroupServiceImpl service;

        if (inBenchmark) {
            service = new RaftGroupServiceImpl(groupId, cluster, factory, configuration, membersConfiguration, null, executor, commandsMarshaller, stoppingExceptionFactory, throttlingContextHolder) {
                @Override
                public <R> CompletableFuture<R> run(Command cmd) {
                    return cmd.getClass().getSimpleName().contains("UpdateCommand") ? CompletableFutures.nullCompletedFuture() : super.run(cmd);
                }
            };
        } else {
            service = new RaftGroupServiceImpl(groupId, cluster, factory, configuration, membersConfiguration, null, executor, commandsMarshaller, stoppingExceptionFactory, throttlingContextHolder);
        }

        // The handler was created by the constructor but never registered, so onDisappeared() could not fire.
        // Registering outside the constructor avoids publishing a partially constructed instance.
        service.cluster.topologyService().addEventHandler(service.topologyEventHandler);

        return service;
    }

    /**
     * Returns the replication group id this client was created for.
     *
     * @return Replication group id object (not its string form).
     */
    public ReplicationGroupId groupId() {
        return realGroupId;
    }

    /**
     * Returns the currently cached leader.
     *
     * @return Cached leader; {@code null} when no leader is known (the backing field is nullable).
     */
    public Peer leader() {
        return leader;
    }

    /**
     * Returns the currently cached list of group peers.
     *
     * @return Cached peers.
     */
    public List<Peer> peers() {
        return peers;
    }

    /**
     * Returns the currently cached list of group learners.
     *
     * @return Cached learners.
     */
    public List<Peer> learners() {
        return learners;
    }

    /**
     * Refreshes the cached leader using the default timeout and no origin description.
     *
     * @return Future completed once the cached leader has been updated.
     */
    public CompletableFuture<Void> refreshLeader() {
        Supplier<String> noDescription = NO_DESCRIPTION;

        return refreshLeader(noDescription);
    }

    /**
     * Refreshes the cached leader using the default timeout.
     *
     * @param originDescription Supplier of a description of the operation that triggered the refresh.
     * @return Future completed once the cached leader has been updated.
     */
    private CompletableFuture<Void> refreshLeader(Supplier<String> originDescription) {
        long timeout = defaultTimeout();

        return refreshLeader(timeout, originDescription);
    }

    /**
     * Sends a get-leader request, starting from a randomly chosen peer, and stores the leader parsed from the response
     * in the leader cache.
     *
     * @param timeout Overall timeout, in milliseconds, passed to the retrying send.
     * @param originDescription Supplier of a description of the operation that triggered the refresh.
     * @return Future completed once the cached leader has been updated.
     */
    private CompletableFuture<Void> refreshLeader(long timeout, Supplier<String> originDescription) {
        Function<Peer, CliRequests.GetLeaderRequest> getLeaderRequest = target -> factory.getLeaderRequest()
                .peerId(peerId(target))
                .groupId(groupId)
                .build();

        return sendWithRetry(randomNode(), timeout, originDescription, getLeaderRequest, false)
                .thenAccept(resp -> {
                    this.leader = parsePeer(resp.leaderId());
                });
    }

    /**
     * Sends a get-leader request, starting from a randomly chosen peer, and returns the leader together with the term
     * reported in the response. The leader cache is updated only when the response carries a leader id.
     *
     * @return Future with the leader and term, or with {@code LeaderWithTerm.NO_LEADER} when the response has no
     *     leader id.
     */
    public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm() {
        Function<Peer, CliRequests.GetLeaderRequest> getLeaderRequest = target -> factory.getLeaderRequest()
                .peerId(peerId(target))
                .groupId(groupId)
                .build();

        return sendWithRetry(randomNode(), getLeaderRequest, false).thenApply(resp -> {
            if (resp.leaderId() == null) {
                return LeaderWithTerm.NO_LEADER;
            }

            Peer reportedLeader = parsePeer(resp.leaderId());

            this.leader = reportedLeader;

            return new LeaderWithTerm(reportedLeader, resp.currentTerm());
        });
    }

    /**
     * Requests the group membership from the leader and replaces the cached peers and learners with the lists from the
     * response.
     *
     * <p>If no leader is cached, the leader is refreshed first and the operation is then re-issued.
     *
     * @param onlyAlive Value forwarded in the request's {@code onlyAlive} field.
     * @return Future completed once the cached membership has been replaced.
     */
    public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
        Peer knownLeader = this.leader;

        if (knownLeader == null) {
            return refreshLeader(() -> "refreshMembers").thenCompose(ignored -> refreshMembers(onlyAlive));
        }

        Function<Peer, CliRequests.GetPeersRequest> getPeersRequest = target -> factory.getPeersRequest()
                .leaderId(peerId(target))
                .onlyAlive(onlyAlive)
                .groupId(groupId)
                .build();

        return sendWithRetry(knownLeader, getPeersRequest, false).thenAccept(resp -> {
            this.peers = parsePeerList(resp.peersList());
            this.learners = parsePeerList(resp.learnersList());
        });
    }

    /**
     * Asks the leader to add {@code peer} to the group and replaces the cached peer list with the list returned in the
     * response.
     *
     * <p>If no leader is cached, the leader is refreshed first and the operation is then re-issued.
     *
     * @param peer Peer to add.
     * @param sequenceToken Value forwarded in the request's {@code sequenceToken} field.
     * @return Future completed once the cached peer list has been replaced.
     */
    public CompletableFuture<Void> addPeer(Peer peer, long sequenceToken) {
        Peer knownLeader = this.leader;

        if (knownLeader == null) {
            return refreshLeader(() -> "addPeer").thenCompose(ignored -> addPeer(peer, sequenceToken));
        }

        Function<Peer, CliRequests.AddPeerRequest> addPeerRequest = target -> factory.addPeerRequest()
                .leaderId(peerId(target))
                .groupId(groupId)
                .peerId(peerId(peer))
                .sequenceToken(sequenceToken)
                .build();

        // NOTE(review): unlike removePeer and the learner operations, this call passes throttleOnOverload = true,
        // so it can fail with GroupOverloadedException - confirm this is intentional.
        return sendWithRetry(knownLeader, addPeerRequest, true).thenAccept(resp -> {
            this.peers = parsePeerList(resp.newPeersList());
        });
    }

    /**
     * Asks the leader to remove {@code peer} from the group and replaces the cached peer list with the list returned in
     * the response.
     *
     * <p>If no leader is cached, the leader is refreshed first and the operation is then re-issued.
     *
     * @param peer Peer to remove.
     * @param sequenceToken Value forwarded in the request's {@code sequenceToken} field.
     * @return Future completed once the cached peer list has been replaced.
     */
    public CompletableFuture<Void> removePeer(Peer peer, long sequenceToken) {
        Peer knownLeader = this.leader;

        if (knownLeader == null) {
            return refreshLeader(() -> "removePeer").thenCompose(ignored -> removePeer(peer, sequenceToken));
        }

        Function<Peer, CliRequests.RemovePeerRequest> removePeerRequest = target -> factory.removePeerRequest()
                .leaderId(peerId(target))
                .groupId(groupId)
                .peerId(peerId(peer))
                .sequenceToken(sequenceToken)
                .build();

        return sendWithRetry(knownLeader, removePeerRequest, false).thenAccept(resp -> {
            this.peers = parsePeerList(resp.newPeersList());
        });
    }

    /**
     * Asks the leader to switch the group to the given peers and learners, then replaces the cached peers and learners
     * with the lists returned in the response.
     *
     * <p>If no leader is cached, the leader is refreshed first and the operation is then re-issued.
     *
     * @param peersAndLearners Target configuration.
     * @param term Value forwarded in the request's {@code term} field.
     * @param sequenceToken Value forwarded in the request's {@code sequenceToken} field.
     * @return Future completed once the cached membership has been replaced.
     */
    public CompletableFuture<Void> changePeersAndLearners(PeersAndLearners peersAndLearners, long term, long sequenceToken) {
        Peer knownLeader = this.leader;

        if (knownLeader == null) {
            return refreshLeader(() -> "changePeersAndLearners")
                    .thenCompose(ignored -> changePeersAndLearners(peersAndLearners, term, sequenceToken));
        }

        Function<Peer, CliRequests.ChangePeersAndLearnersRequest> changeRequest = target -> factory.changePeersAndLearnersRequest()
                .leaderId(peerId(target))
                .groupId(groupId)
                .newPeersList(peerIds(peersAndLearners.peers()))
                .newLearnersList(peerIds(peersAndLearners.learners()))
                .term(term)
                .sequenceToken(sequenceToken)
                .build();

        LOG.info(
                "Sending changePeersAndLearners request for group={} to peers={} and learners={} with leader term={}",
                groupId, peersAndLearners.peers(), peersAndLearners.learners(), term
        );

        return sendWithRetry(knownLeader, changeRequest, false).thenAccept(resp -> {
            this.peers = parsePeerList(resp.newPeersList());
            this.learners = parsePeerList(resp.newLearnersList());
        });
    }

    /**
     * Sends the asynchronous variant of the change-peers-and-learners request to the leader. The cached membership is
     * not modified by this method.
     *
     * <p>If no leader is cached, the leader is refreshed first and the operation is then re-issued.
     *
     * @param peersAndLearners Target configuration.
     * @param term Value forwarded in the request's {@code term} field.
     * @param sequenceToken Value forwarded in the request's {@code sequenceToken} field.
     * @return Future completed when a response has been received.
     */
    public CompletableFuture<Void> changePeersAndLearnersAsync(PeersAndLearners peersAndLearners, long term, long sequenceToken) {
        Peer knownLeader = this.leader;

        if (knownLeader == null) {
            return refreshLeader(() -> "changePeersAndLearnersAsync")
                    .thenCompose(ignored -> changePeersAndLearnersAsync(peersAndLearners, term, sequenceToken));
        }

        Function<Peer, CliRequests.ChangePeersAndLearnersAsyncRequest> changeRequest = target -> factory.changePeersAndLearnersAsyncRequest()
                .leaderId(peerId(target))
                .groupId(groupId)
                .term(term)
                .newPeersList(peerIds(peersAndLearners.peers()))
                .newLearnersList(peerIds(peersAndLearners.learners()))
                .sequenceToken(sequenceToken)
                .build();

        LOG.info(
                "Sending changePeersAndLearnersAsync request for group={} to peers={} and learners={} with leader term={}",
                groupId, peersAndLearners.peers(), peersAndLearners.learners(), term
        );

        return sendWithRetry(knownLeader, changeRequest, false).thenAccept(resp -> {
            // Error responses are expected to be handled by the retrying send before reaching this point.
            assert !(resp instanceof RpcRequests.ErrorResponse);
        });
    }

    /**
     * Asks the leader to add the given learners and replaces the cached learner list with the list returned in the
     * response.
     *
     * <p>If no leader is cached, the leader is refreshed first and the operation is then re-issued.
     *
     * @param learners Learners to add.
     * @param sequenceToken Value forwarded in the request's {@code sequenceToken} field.
     * @return Future completed once the cached learner list has been replaced.
     */
    public CompletableFuture<Void> addLearners(Collection<Peer> learners, long sequenceToken) {
        Peer knownLeader = this.leader;

        if (knownLeader == null) {
            return refreshLeader(() -> "addLearners").thenCompose(ignored -> addLearners(learners, sequenceToken));
        }

        Function<Peer, CliRequests.AddLearnersRequest> addLearnersRequest = target -> factory.addLearnersRequest()
                .leaderId(peerId(target))
                .groupId(groupId)
                .learnersList(peerIds(learners))
                .sequenceToken(sequenceToken)
                .build();

        return sendWithRetry(knownLeader, addLearnersRequest, false).thenAccept(resp -> {
            this.learners = parsePeerList(resp.newLearnersList());
        });
    }

    /**
     * Asks the leader to remove the given learners and replaces the cached learner list with the list returned in the
     * response.
     *
     * <p>If no leader is cached, the leader is refreshed first and the operation is then re-issued.
     *
     * @param learners Learners to remove.
     * @param sequenceToken Value forwarded in the request's {@code sequenceToken} field.
     * @return Future completed once the cached learner list has been replaced.
     */
    public CompletableFuture<Void> removeLearners(Collection<Peer> learners, long sequenceToken) {
        Peer knownLeader = this.leader;

        if (knownLeader == null) {
            return refreshLeader(() -> "removeLearners").thenCompose(ignored -> removeLearners(learners, sequenceToken));
        }

        Function<Peer, CliRequests.RemoveLearnersRequest> removeLearnersRequest = target -> factory.removeLearnersRequest()
                .leaderId(peerId(target))
                .groupId(groupId)
                .learnersList(peerIds(learners))
                .sequenceToken(sequenceToken)
                .build();

        return sendWithRetry(knownLeader, removeLearnersRequest, false).thenAccept(resp -> {
            this.learners = parsePeerList(resp.newLearnersList());
        });
    }

    /**
     * Asks the leader to reset the group's learners to the given collection and replaces the cached learner list with
     * the list returned in the response.
     *
     * <p>If no leader is cached, the leader is refreshed first and the operation is then re-issued.
     *
     * @param learners Learners to set.
     * @param sequenceToken Value forwarded in the request's {@code sequenceToken} field.
     * @return Future completed once the cached learner list has been replaced.
     */
    public CompletableFuture<Void> resetLearners(Collection<Peer> learners, long sequenceToken) {
        Peer knownLeader = this.leader;

        if (knownLeader == null) {
            return refreshLeader(() -> "resetLearners").thenCompose(ignored -> resetLearners(learners, sequenceToken));
        }

        Function<Peer, CliRequests.ResetLearnersRequest> resetLearnersRequest = target -> factory.resetLearnersRequest()
                .leaderId(peerId(target))
                .groupId(groupId)
                .learnersList(peerIds(learners))
                .sequenceToken(sequenceToken)
                .build();

        return sendWithRetry(knownLeader, resetLearnersRequest, false).thenAccept(resp -> {
            this.learners = parsePeerList(resp.newLearnersList());
        });
    }

    /**
     * Sends a snapshot request to the given peer.
     *
     * <p>The request is sent with {@code -1} as the overall retry timeout and {@code Long.MAX_VALUE} as the
     * single-request timeout. NOTE(review): presumably this disables the timeouts for long-running snapshots; how
     * {@code RetryContext} interprets {@code -1} is not visible here - confirm.
     *
     * @param peer Peer that should take the snapshot.
     * @param forced Value forwarded in the request's {@code forced} field.
     * @return Future completed when a response has been received.
     */
    public CompletableFuture<Void> snapshot(Peer peer, boolean forced) {
        // The request always addresses the peer passed by the caller; the factory's own argument is not used.
        Function<Peer, NetworkMessage> snapshotRequest = unusedTarget -> factory.snapshotRequest()
                .peerId(peerId(peer))
                .groupId(groupId)
                .forced(forced)
                .build();

        return sendWithRetry(peer, -1L, Long.MAX_VALUE, NO_DESCRIPTION, snapshotRequest, false)
                .thenAccept(unusedResponse -> {});
    }

    /**
     * Asks the current leader to transfer leadership to {@code newLeader} and, on success, stores {@code newLeader} in
     * the leader cache.
     *
     * <p>If no leader is cached, the leader is refreshed first and the operation is then re-issued.
     *
     * @param newLeader Peer that should become the leader.
     * @return Future completed once the leader cache has been updated.
     */
    public CompletableFuture<Void> transferLeadership(Peer newLeader) {
        Peer knownLeader = this.leader;

        if (knownLeader == null) {
            return refreshLeader(() -> "transferLeadership").thenCompose(ignored -> transferLeadership(newLeader));
        }

        Function<Peer, CliRequests.TransferLeaderRequest> transferRequest = target -> factory.transferLeaderRequest()
                .groupId(groupId)
                .leaderId(peerId(target))
                .peerId(peerId(newLeader))
                .build();

        return sendWithRetry(knownLeader, transferRequest, false).thenRun(() -> {
            this.leader = newLeader;
        });
    }

    /**
     * Runs a command on the group using the default timeout.
     *
     * @param cmd Command to run.
     * @param <R> Result type.
     * @return Future with the command result.
     */
    public <R> CompletableFuture<R> run(Command cmd) {
        long timeoutMillis = defaultTimeout();

        return run(cmd, timeoutMillis);
    }

    /**
     * Runs a command on the group leader.
     *
     * <p>A {@link WriteCommand} is sent as a write action request carrying both the marshalled and the original command;
     * any other command is cast to {@link ReadCommand} and sent as a read action request with {@code readOnlySafe} set.
     * The send is performed with throttling on overload enabled.
     *
     * <p>If no leader is cached, the leader is refreshed first (bounded by {@code timeoutMillis}) and the command is then
     * re-issued with the same {@code timeoutMillis}.
     *
     * @param cmd Command to run.
     * @param timeoutMillis Timeout, in milliseconds, passed to the retrying send.
     * @param <R> Result type.
     * @return Future with the result taken from the action response.
     */
    public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
        Peer leader = this.leader;

        if (leader == null) {
            // Re-issue with the caller's timeout; calling run(cmd) here would silently switch to the default timeout.
            return this.refreshLeader(timeoutMillis, () -> cmd.toStringForLightLogging())
                    .thenCompose(res -> this.run(cmd, timeoutMillis));
        }

        Function<Peer, ActionRequest> requestFactory;

        if (cmd instanceof WriteCommand) {
            requestFactory = targetPeer -> this.factory.writeActionRequest()
                    .groupId(this.groupId)
                    .command(this.commandsMarshaller.marshall(cmd))
                    .deserializedCommand((WriteCommand) cmd)
                    .build();
        } else {
            requestFactory = targetPeer -> this.factory.readActionRequest()
                    .groupId(this.groupId)
                    .command((ReadCommand) cmd)
                    .readOnlySafe(true)
                    .build();
        }

        return this.sendWithRetry(leader, timeoutMillis, NO_DESCRIPTION, requestFactory, true).thenApply(resp -> resp.result());
    }

    /**
     * Stops the client. Only the first call has an effect: it blocks the busy lock, after which new sends are failed
     * with an exception from the stopping exception factory, and removes the topology event handler from the topology
     * service.
     */
    public void shutdown() {
        boolean firstCall = stopped.compareAndSet(false, true);

        if (firstCall) {
            busyLock.block();

            clusterService().topologyService().removeEventHandler(topologyEventHandler);
        }
    }

    /**
     * Sends a read-index request to the cached leader, or to a randomly chosen peer when no leader is cached, and
     * returns the index from the response.
     *
     * @return Future with the index reported in the response.
     */
    public CompletableFuture<Long> readIndex() {
        Function<Peer, NetworkMessage> readIndexRequest = target -> factory.readIndexRequest()
                .groupId(groupId)
                .peerId(target.consistentId())
                .serverId(target.consistentId())
                .build();

        Peer knownLeader = leader();

        Peer firstTarget;

        if (knownLeader != null) {
            firstTarget = knownLeader;
        } else {
            firstTarget = randomNode();
        }

        return sendWithRetry(firstTarget, readIndexRequest, false).thenApply(RpcRequests.ReadIndexResponse::index);
    }

    /**
     * Returns the cluster service this client sends its requests through.
     *
     * @return Cluster service.
     */
    public ClusterService clusterService() {
        return cluster;
    }

    /**
     * Replaces the cached peers and learners with copies of the given configuration and clears the cached leader.
     *
     * @param configuration New peers and learners.
     */
    public void updateConfiguration(PeersAndLearners configuration) {
        peers = List.copyOf(configuration.peers());
        learners = List.copyOf(configuration.learners());

        // The leader of the previous configuration is no longer assumed to be valid.
        leader = null;
    }

    /**
     * Reads the default overall timeout from the {@code retryTimeoutMillis} configuration value.
     *
     * @return Timeout in milliseconds.
     */
    private long defaultTimeout() {
        Long configuredTimeout = (Long) configuration.retryTimeoutMillis().value();

        return configuredTimeout;
    }

    /**
     * Sends a request with retries using the default timeout and no origin description.
     *
     * @param peer Initial target peer.
     * @param requestFactory Factory building the request for a target peer.
     * @param throttleOnOverload Whether to fail fast when the target peer is reported as overloaded.
     * @param <R> Response type.
     * @return Future with the response.
     */
    private <R extends NetworkMessage> CompletableFuture<R> sendWithRetry(Peer peer, Function<Peer, ? extends NetworkMessage> requestFactory, boolean throttleOnOverload) {
        long timeoutMillis = defaultTimeout();

        return sendWithRetry(peer, timeoutMillis, NO_DESCRIPTION, requestFactory, throttleOnOverload);
    }

    /**
     * Sends a request with retries, passing {@code -1} as the single-request timeout. With that value the per-peer
     * throttling context supplies the timeout of each individual request.
     *
     * @param peer Initial target peer.
     * @param timeoutMillis Overall timeout in milliseconds.
     * @param originDescription Supplier of a description of the originating operation.
     * @param requestFactory Factory building the request for a target peer.
     * @param throttleOnOverload Whether to fail fast when the target peer is reported as overloaded.
     * @param <R> Response type.
     * @return Future with the response.
     */
    private <R extends NetworkMessage> CompletableFuture<R> sendWithRetry(Peer peer, long timeoutMillis, Supplier<String> originDescription, Function<Peer, ? extends NetworkMessage> requestFactory, boolean throttleOnOverload) {
        long singleRequestTimeoutMillis = -1L;

        return sendWithRetry(peer, timeoutMillis, singleRequestTimeoutMillis, originDescription, requestFactory, throttleOnOverload);
    }

    /**
     * Entry point of the retrying send: creates the result future and starts the first attempt.
     *
     * <p>If the client is already stopping, the future is failed with an exception from the stopping exception factory.
     * If {@code throttleOnOverload} is set and the target peer's throttling context reports overload, no request is
     * sent and the future is failed with {@link GroupOverloadedException} after a 100 ms delay.
     *
     * @param peer Initial target peer.
     * @param sendWithRetryTimeoutMillis Overall timeout passed to the retry context.
     * @param singleRequestTimeoutMillis Single-request timeout passed to the retry context.
     * @param originDescription Supplier of a description of the originating operation.
     * @param requestFactory Factory building the request for a target peer.
     * @param throttleOnOverload Whether to fail fast when the target peer is reported as overloaded.
     * @param <R> Response type.
     * @return Future with the response.
     */
    private <R extends NetworkMessage> CompletableFuture<R> sendWithRetry(Peer peer, long sendWithRetryTimeoutMillis, long singleRequestTimeoutMillis, Supplier<String> originDescription, Function<Peer, ? extends NetworkMessage> requestFactory, boolean throttleOnOverload) {
        CompletableFuture<R> result = new CompletableFuture<>();

        if (!busyLock.enterBusy()) {
            result.completeExceptionally(stoppingExceptionFactory.create("Raft client is stopping [" + groupId + "]."));

            return result;
        }

        try {
            ThrottlingContextHolder peerHolder = throttlingContextHolder.peerContextHolder(peer.consistentId());

            if (throttleOnOverload && peerHolder.isOverloaded()) {
                // Fail after a short delay instead of sending to an overloaded peer.
                executor.schedule(
                        () -> result.completeExceptionally(new GroupOverloadedException(groupId, peer)),
                        100L,
                        TimeUnit.MILLISECONDS
                );
            } else {
                RetryContext context = new RetryContext(
                        groupId,
                        peer,
                        originDescription,
                        requestFactory,
                        sendWithRetryTimeoutMillis,
                        singleRequestTimeoutMillis
                );

                sendWithRetry(result, context, peerHolder);
            }

            return result;
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Performs the next attempt described by {@code retryContext}, looking up the throttling context of the context's
     * current target peer.
     *
     * @param fut Future to complete with the final outcome.
     * @param retryContext State of the retried request.
     * @param <R> Response type.
     */
    private <R extends NetworkMessage> void sendWithRetry(CompletableFuture<R> fut, RetryContext retryContext) {
        String targetConsistentId = retryContext.targetPeer().consistentId();

        sendWithRetry(fut, retryContext, throttlingContextHolder.peerContextHolder(targetConsistentId));
    }

    /**
     * Performs a single attempt of a retried request and dispatches its outcome.
     *
     * <p>The attempt is skipped and {@code fut} is failed when the client is stopping (busy lock cannot be entered) or
     * when the retry context's stop time has been reached. Otherwise the target peer is resolved to a cluster node, the
     * request is invoked through the messaging service, and the completion callback either completes {@code fut} or
     * hands the outcome to one of the error handlers, which may schedule another attempt.
     *
     * @param fut Future to complete with the final outcome.
     * @param retryContext State of the retried request (target peer, request, stop time).
     * @param peerThrottlingContextHolder Throttling context of the target peer; notified before and after the request.
     * @param <R> Response type.
     */
    private <R extends NetworkMessage> void sendWithRetry(CompletableFuture<R> fut, RetryContext retryContext, ThrottlingContextHolder peerThrottlingContextHolder) {
        if (!this.busyLock.enterBusy()) {
            fut.completeExceptionally(this.stoppingExceptionFactory.create("Raft client is stopping [" + this.groupId + "]."));
            return;
        }
        try {
            long requestStartTime = Utils.monotonicMs();
            // Overall deadline reached: report node stop if it is in progress, otherwise a timeout.
            if (requestStartTime >= retryContext.stopTime()) {
                if (this.stopping) {
                    fut.completeExceptionally((Throwable)new NodeStoppingException());
                    return;
                }
                fut.completeExceptionally(retryContext.createTimeoutException());
                return;
            }
            peerThrottlingContextHolder.beforeRequest();
            // A response timeout of -1 in the context means "use the timeout suggested by the throttling context".
            long responseTimeout = retryContext.responseTimeoutMillis() == -1L ? peerThrottlingContextHolder.peerRequestTimeoutMillis() : retryContext.responseTimeoutMillis();
            ((CompletableFuture)this.resolvePeer(retryContext.targetPeer()).thenCompose(node -> {
                // Do not send anything once the node has been marked as stopping.
                if (this.stopping) {
                    throw new IgniteInternalException(ErrorGroups.Common.NODE_STOPPING_ERR, (Throwable)new NodeStoppingException());
                }
                return this.cluster.messagingService().invoke(node, retryContext.request(), responseTimeout);
            })).whenComplete((resp, err) -> {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("sendWithRetry req={} resp={} from={} to={} err={}", new Object[]{retryContext.request(), resp, this.cluster.topologyService().localMember().address(), retryContext.targetPeer().consistentId(), err == null ? null : err.getMessage()});
                }
                // Report the outcome to the throttling context before deciding how to proceed.
                peerThrottlingContextHolder.afterRequest(requestStartTime, RaftGroupServiceImpl.retriableError(err, resp));
                // The callback runs asynchronously, so the busy lock has to be re-entered here.
                if (!this.busyLock.enterBusy()) {
                    fut.completeExceptionally(this.stoppingExceptionFactory.create("Raft client is stopping [" + this.groupId + "]."));
                    return;
                }
                try {
                    if (err != null) {
                        this.handleThrowable(fut, (Throwable)err, retryContext);
                    } else if (resp instanceof RpcRequests.ErrorResponse) {
                        this.handleErrorResponse(fut, (RpcRequests.ErrorResponse)resp, retryContext);
                    } else if (resp instanceof RpcRequests.SMErrorResponse) {
                        RaftGroupServiceImpl.handleSmErrorResponse(fut, (RpcRequests.SMErrorResponse)resp, retryContext);
                    } else {
                        // A regular response: remember the peer that answered as the leader.
                        this.leader = retryContext.targetPeer();
                        fut.complete(resp);
                    }
                }
                catch (Throwable e) {
                    // Any failure in the handlers (including a failed retry scheduling) fails the future.
                    fut.completeExceptionally(e);
                }
                finally {
                    this.busyLock.leaveBusy();
                }
            });
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Handles an exceptional outcome of a request attempt. A non-recoverable cause fails {@code fut}; a recoverable one
     * schedules a retry on a peer chosen by {@code randomNode(retryContext)}.
     *
     * @param fut Future of the retried request.
     * @param err Error of the attempt; unwrapped before being examined.
     * @param retryContext State of the retried request.
     */
    private void handleThrowable(CompletableFuture<? extends NetworkMessage> fut, Throwable err, RetryContext retryContext) {
        Throwable cause = ExceptionUtils.unwrapCause(err);

        if (!recoverable(cause)) {
            fut.completeExceptionally(cause);

            return;
        }

        Peer nextPeer = randomNode(retryContext);

        if (LOG.isDebugEnabled()) {
            String template;

            if (cause instanceof TimeoutException) {
                template = "Recoverable TimeoutException during the request occurred (will be retried on a randomly selected node) [request={}, peer={}, newPeer={}, traceId={}].";
            } else {
                template = "Recoverable error during the request occurred (will be retried on a randomly selected node) [request={}, peer={}, newPeer={}, traceId={}].";
            }

            // The full request is logged only when sensitive data may be included.
            Object requestForLog = IgniteToStringBuilder.includeSensitive()
                    ? retryContext.request()
                    : retryContext.request().toStringForLightLogging();

            LOG.debug(template, new Object[]{requestForLog, retryContext.targetPeer(), nextPeer, retryContext.errorTraceId()});
        }

        String shortReasonMessage = "Peer " + shortPeerString(retryContext.targetPeer()) + " threw " + cause.getClass().getSimpleName();

        scheduleRetry(fut, retryContext.nextAttempt(nextPeer, shortReasonMessage));
    }

    /**
     * Handles an {@code ErrorResponse} of a request attempt according to its Raft error code: completes {@code fut},
     * fails it, or schedules a retry on the same, a random, or the reported leader peer.
     *
     * @param fut Future of the retried request.
     * @param resp Error response received from the target peer.
     * @param retryContext State of the retried request.
     */
    private void handleErrorResponse(CompletableFuture<? extends NetworkMessage> fut, RpcRequests.ErrorResponse resp, RetryContext retryContext) {
        RaftError error = RaftError.forNumber(resp.errorCode());
        switch (error) {
            // An error response with a SUCCESS code: the target is remembered as leader, the result is null.
            case SUCCESS: {
                this.leader = retryContext.targetPeer();
                fut.complete(null);
                break;
            }
            // Retry on the same peer.
            case EBUSY: 
            case EAGAIN: {
                this.scheduleRetry(fut, retryContext.nextAttempt(retryContext.targetPeer(), RaftGroupServiceImpl.getShortReasonMessage(retryContext, error, resp)));
                break;
            }
            // Retry on the same peer, except for get-leader and async change-peers requests, which move to a random peer.
            case UNKNOWN: 
            case EINTERNAL: 
            case ENOENT: {
                NetworkMessage request = retryContext.request();
                Peer newTargetPeer = request instanceof CliRequests.GetLeaderRequest || request instanceof CliRequests.ChangePeersAndLearnersAsyncRequest ? this.randomNode(retryContext) : retryContext.targetPeer();
                this.scheduleRetry(fut, retryContext.nextAttempt(newTargetPeer, RaftGroupServiceImpl.getShortReasonMessage(retryContext, error, resp)));
                break;
            }
            // Retry on another random peer using the "unavailable peer" variant of the next attempt.
            case EHOSTDOWN: 
            case ESHUTDOWN: 
            case ENODESHUTDOWN: 
            case ESTOP: {
                Peer newTargetPeer = this.randomNode(retryContext);
                this.scheduleRetry(fut, retryContext.nextAttemptForUnavailablePeer(newTargetPeer, RaftGroupServiceImpl.getShortReasonMessage(retryContext, error, resp)));
                break;
            }
            // Retry on the leader reported in the response (and cache it), or on a random peer if none is reported.
            case EPERM: {
                Peer newTargetPeer;
                if (resp.leaderId() == null) {
                    newTargetPeer = this.randomNode(retryContext);
                } else {
                    newTargetPeer = RaftGroupServiceImpl.parsePeer(resp.leaderId());
                    assert (newTargetPeer != null);
                    this.leader = newTargetPeer;
                }
                this.scheduleRetry(fut, retryContext.nextAttempt(newTargetPeer, RaftGroupServiceImpl.getShortReasonMessage(retryContext, error, resp)));
                break;
            }
            // Not retried: reported as a stale update.
            case ESTALE: {
                fut.completeExceptionally((Throwable)new RaftStaleUpdateException(resp.errorMsg()));
                break;
            }
            // Any other code is not retried and is reported as a RaftException.
            default: {
                fut.completeExceptionally((Throwable)((Object)new RaftException(error, resp.errorMsg())));
            }
        }
    }

    /**
     * Formats a peer as {@code consistentId:idx} for short log and reason messages.
     *
     * @param peer Peer to format.
     * @return Short string form of the peer.
     */
    private static String shortPeerString(Peer peer) {
        String consistentId = peer.consistentId();

        return consistentId + ":" + peer.idx();
    }

    /**
     * Builds the short retry reason for an error response: which peer answered, with which error code and message.
     *
     * @param retryContext State of the retried request; supplies the target peer.
     * @param error Raft error decoded from the response.
     * @param resp Error response; supplies the error message.
     * @return Formatted reason message.
     */
    private static String getShortReasonMessage(RetryContext retryContext, RaftError error, RpcRequests.ErrorResponse resp) {
        String peer = shortPeerString(retryContext.targetPeer());

        return IgniteStringFormatter.format("Peer {} returned code {}: {}", peer, error, resp.errorMsg());
    }

    /**
     * Handles a state machine error response by failing {@code fut} with the error carried in the response.
     *
     * <p>A compacted throwable is re-created reflectively from its class name and message; if that fails, an
     * {@link IgniteInternalException} with the original message is used instead. A full throwable is used as is. Any
     * other (unexpected) error object trips an assertion and, when assertions are disabled, still fails {@code fut} so
     * that the caller is never left with a future that does not complete.
     *
     * @param fut Future of the retried request.
     * @param resp State machine error response.
     * @param retryContext State of the retried request; supplies the error trace id.
     */
    private static void handleSmErrorResponse(CompletableFuture<? extends NetworkMessage> fut, RpcRequests.SMErrorResponse resp, RetryContext retryContext) {
        SMThrowable th = resp.error();

        if (th instanceof SMCompactedThrowable) {
            SMCompactedThrowable compactedThrowable = (SMCompactedThrowable) th;

            try {
                // NOTE(review): the class name comes from the response message, and the class is loaded and
                // instantiated reflectively - confirm that responses can only originate from trusted cluster nodes.
                Throwable restoredTh = (Throwable) Class.forName(compactedThrowable.throwableClassName())
                        .getConstructor(String.class)
                        .newInstance(compactedThrowable.throwableMessage());

                fut.completeExceptionally(restoredTh);
            } catch (Exception e) {
                LOG.warn("Cannot restore throwable from user's state machine. Check if throwable " + compactedThrowable.throwableClassName() + " is present in the classpath.", new Object[0]);

                fut.completeExceptionally(new IgniteInternalException(retryContext.errorTraceId(), ErrorGroups.Common.INTERNAL_ERR, compactedThrowable.throwableMessage()));
            }
        } else if (th instanceof SMFullThrowable) {
            fut.completeExceptionally(((SMFullThrowable) th).throwable());
        } else {
            assert false : th;

            // Reached only when assertions are disabled: without this the future would never be completed.
            fut.completeExceptionally(new IgniteInternalException(
                    retryContext.errorTraceId(),
                    ErrorGroups.Common.INTERNAL_ERR,
                    "Unexpected state machine error [error=" + th + "]"
            ));
        }
    }

    /**
     * Classifies the outcome of a request attempt for the throttling context.
     *
     * @param e Error of the attempt, or {@code null} if it completed normally.
     * @param raftResponse Response of the attempt; only an {@code ErrorResponse} contributes an error code, anything
     *     else is treated as code {@code 0}.
     * @return {@code null} when there is no error and the code is {@code SUCCESS}; {@code true} when the unwrapped
     *     error is a {@link TimeoutException} or the code is {@code EBUSY} or {@code EAGAIN}; {@code false} otherwise.
     */
    @Nullable
    private static Boolean retriableError(@Nullable Throwable e, NetworkMessage raftResponse) {
        int errorCode = 0;

        if (raftResponse instanceof RpcRequests.ErrorResponse) {
            errorCode = ((RpcRequests.ErrorResponse) raftResponse).errorCode();
        }

        RaftError raftError = RaftError.forNumber(errorCode);

        if (e == null) {
            if (raftError == RaftError.SUCCESS) {
                return null;
            }
        } else if (ExceptionUtils.unwrapCause(e) instanceof TimeoutException) {
            return true;
        }

        return raftError == RaftError.EBUSY || raftError == RaftError.EAGAIN;
    }

    /**
     * Schedules the next attempt of a retried request after the configured {@code retryDelayMillis}.
     *
     * @param fut Future of the retried request.
     * @param retryContext State describing the next attempt.
     */
    private void scheduleRetry(CompletableFuture<? extends NetworkMessage> fut, RetryContext retryContext) {
        Runnable nextAttempt = () -> {
            retryContext.onNewAttempt();

            sendWithRetry(fut, retryContext);
        };

        long delayMillis = (Long) configuration.retryDelayMillis().value();

        executor.schedule(nextAttempt, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Tells whether a failed attempt may be retried.
     *
     * @param t Error of the attempt; unwrapped before being examined.
     * @return {@code true} if the unwrapped error is a {@link TimeoutException}, an {@link IOException}, a
     *     {@code PeerUnavailableException} or a {@code RecipientLeftException}.
     */
    private static boolean recoverable(Throwable t) {
        Throwable cause = ExceptionUtils.unwrapCause(t);

        if (cause instanceof TimeoutException || cause instanceof IOException) {
            return true;
        }

        return cause instanceof PeerUnavailableException || cause instanceof RecipientLeftException;
    }

    /**
     * Picks a random peer from the cached peer list without excluding any peer.
     *
     * @return Chosen peer.
     */
    private Peer randomNode() {
        RetryContext noContext = null;

        return randomNode(noContext);
    }

    /**
     * Picks a random peer, preferring one that is currently present in the physical topology.
     *
     * <p>With a retry context, the context's current target and the peers it has marked unavailable are excluded. If
     * that leaves nothing, a warning is logged (once per "all unavailable" episode), the context's unavailable set is
     * reset and all peers become candidates again.
     *
     * @param retryContext State of the retried request, or {@code null} to consider all peers.
     * @return Chosen peer; if no candidate is present in the topology, the first candidate after shuffling.
     * @throws IgniteInternalException If there are no peers to choose from.
     */
    private Peer randomNode(@Nullable RetryContext retryContext) {
        List<Peer> localPeers = peers;

        List<Peer> candidates = new ArrayList<>(localPeers.size());

        if (retryContext == null) {
            candidates.addAll(localPeers);
        } else {
            for (Peer candidate : localPeers) {
                boolean currentTarget = retryContext.targetPeer().equals(candidate);

                if (!currentTarget && !retryContext.unavailablePeers().contains(candidate)) {
                    candidates.add(candidate);
                }
            }

            if (!candidates.isEmpty()) {
                peersAreUnavailable.set(false);
            } else {
                // Log only on the transition into the "all peers unavailable" state.
                if (!peersAreUnavailable.getAndSet(true)) {
                    LOG.warn("All peers are unavailable, going to keep retrying until timeout [peers = {}, group = {}, trace ID: {}, request {}, origin command {}, instance={}].", new Object[]{localPeers, groupId, retryContext.errorTraceId(), retryContext.request().toStringForLightLogging(), retryContext.originCommandDescription(), this});
                }

                retryContext.resetUnavailablePeers();

                // NOTE(review): this re-reads the volatile field instead of using the snapshot taken above.
                candidates.addAll(peers);
            }
        }

        if (candidates.isEmpty()) {
            throw new IgniteInternalException(ErrorGroups.Common.INTERNAL_ERR, "No peers available [groupId=" + groupId + "]");
        }

        Collections.shuffle(candidates, ThreadLocalRandom.current());

        for (Peer candidate : candidates) {
            if (cluster.topologyService().getByConsistentId(candidate.consistentId()) != null) {
                return candidate;
            }
        }

        return candidates.get(0);
    }

    /**
     * Converts the string form of a peer id into a {@link Peer}.
     *
     * @param peerId String form of the peer id; may be {@code null}.
     * @return Parsed peer, or {@code null} if {@code PeerId.parsePeer} returns {@code null} for the input.
     */
    @Nullable
    private static Peer parsePeer(@Nullable String peerId) {
        PeerId parsed = PeerId.parsePeer(peerId);

        if (parsed == null) {
            return null;
        }

        return new Peer(parsed.getConsistentId(), parsed.getIdx());
    }

    /**
     * Converts a collection of string peer ids into a list of peers, keeping the iteration order.
     *
     * @param peers String forms of peer ids; may be {@code null}.
     * @return Parsed peers, or an empty immutable list if {@code peers} is {@code null}.
     */
    private static List<Peer> parsePeerList(@Nullable Collection<String> peers) {
        if (peers == null) {
            return List.of();
        }

        List<Peer> parsed = new ArrayList<>(peers.size());

        peers.forEach(peerId -> parsed.add(parsePeer(peerId)));

        return parsed;
    }

    /**
     * Converts a peer into the string form of its {@link PeerId}.
     *
     * @param peer Peer to convert.
     * @return String form of the peer id.
     */
    private static String peerId(Peer peer) {
        PeerId id = PeerId.fromPeer(peer);

        return id.toString();
    }

    /**
     * Converts peers into the string forms of their peer ids, keeping the iteration order.
     *
     * @param peers Peers to convert.
     * @return String forms of the peer ids.
     */
    private static List<String> peerIds(Collection<Peer> peers) {
        List<String> ids = new ArrayList<>(peers.size());

        for (Peer peer : peers) {
            ids.add(peerId(peer));
        }

        return ids;
    }

    /**
     * Resolves a peer to the cluster node with the same consistent id.
     *
     * @param peer Peer to resolve.
     * @return Completed future with the node, or a future failed with {@link PeerUnavailableException} if the topology
     *     service does not know a node with that consistent id.
     */
    private CompletableFuture<InternalClusterNode> resolvePeer(Peer peer) {
        InternalClusterNode resolved = cluster.topologyService().getByConsistentId(peer.consistentId());

        return resolved != null
                ? CompletableFuture.completedFuture(resolved)
                : CompletableFuture.failedFuture(new PeerUnavailableException(peer.consistentId()));
    }

    /**
     * Creates the topology event handler of this client. When a node disappears, the handler schedules a notification
     * of the throttling context ({@code onNodeLeft}) after three times the configured retry timeout. Events arriving
     * after the client has been stopped are ignored.
     *
     * @return New handler instance.
     */
    private TopologyEventHandler topologyEventHandler() {
        return new TopologyEventHandler() {
            public void onDisappeared(InternalClusterNode member) {
                if (!busyLock.enterBusy()) {
                    return;
                }

                try {
                    long delayMillis = (Long) configuration.retryTimeoutMillis().value() * 3L;

                    executor.schedule(() -> throttlingContextHolder.onNodeLeft(member.name()), delayMillis, TimeUnit.MILLISECONDS);
                } finally {
                    busyLock.leaveBusy();
                }
            }
        };
    }

    /**
     * Marks the client as stopping. After this call, attempts that are about to be sent fail with a node-stopping error
     * and an expired retry deadline is reported as {@code NodeStoppingException} instead of a timeout.
     */
    public void markAsStopping() {
        stopping = true;
    }
}

