/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.raft.jraft.core;

import com.codahale.metrics.Metric;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metrics.sources.RaftMetricSource;
import org.apache.ignite.internal.raft.JraftGroupEventsListener;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.SafeTimeAwareCommandClosure;
import org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorage;
import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.ThreadOperation;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.JRaftServiceFactory;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.ReadOnlyService;
import org.apache.ignite.raft.jraft.ReplicatorGroup;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.closure.CatchUpClosure;
import org.apache.ignite.raft.jraft.closure.ClosureQueue;
import org.apache.ignite.raft.jraft.closure.ClosureQueueImpl;
import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
import org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
import org.apache.ignite.raft.jraft.core.BallotBox;
import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
import org.apache.ignite.raft.jraft.core.NodeMetrics;
import org.apache.ignite.raft.jraft.core.NotLeaderException;
import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
import org.apache.ignite.raft.jraft.core.Replicator;
import org.apache.ignite.raft.jraft.core.ReplicatorGroupImpl;
import org.apache.ignite.raft.jraft.core.ReplicatorType;
import org.apache.ignite.raft.jraft.core.State;
import org.apache.ignite.raft.jraft.disruptor.DisruptorEventType;
import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.Ballot;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.LeaderChangeContext;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.RaftOutter;
import org.apache.ignite.raft.jraft.entity.Task;
import org.apache.ignite.raft.jraft.entity.UserLog;
import org.apache.ignite.raft.jraft.error.LogIndexOutOfBoundsException;
import org.apache.ignite.raft.jraft.error.LogNotFoundException;
import org.apache.ignite.raft.jraft.error.OverloadException;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.error.RaftException;
import org.apache.ignite.raft.jraft.option.BallotBoxOptions;
import org.apache.ignite.raft.jraft.option.BootstrapOptions;
import org.apache.ignite.raft.jraft.option.FSMCallerOptions;
import org.apache.ignite.raft.jraft.option.LogManagerOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftMetaStorageOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.option.ReadOnlyOption;
import org.apache.ignite.raft.jraft.option.ReadOnlyServiceOptions;
import org.apache.ignite.raft.jraft.option.ReplicatorGroupOptions;
import org.apache.ignite.raft.jraft.option.SnapshotExecutorOptions;
import org.apache.ignite.raft.jraft.rpc.AppendEntriesResponseBuilder;
import org.apache.ignite.raft.jraft.rpc.CliRequests;
import org.apache.ignite.raft.jraft.rpc.GetLeaderResponseBuilder;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftClientService;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
import org.apache.ignite.raft.jraft.rpc.RaftServerService;
import org.apache.ignite.raft.jraft.rpc.ReadIndexResponseBuilder;
import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.RpcResponseClosure;
import org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter;
import org.apache.ignite.raft.jraft.rpc.impl.core.DefaultRaftClientService;
import org.apache.ignite.raft.jraft.storage.LogManager;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.apache.ignite.raft.jraft.storage.RaftMetaStorage;
import org.apache.ignite.raft.jraft.storage.SnapshotExecutor;
import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
import org.apache.ignite.raft.jraft.util.Describer;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
import org.apache.ignite.raft.jraft.util.RepeatedTimer;
import org.apache.ignite.raft.jraft.util.Requires;
import org.apache.ignite.raft.jraft.util.StringUtils;
import org.apache.ignite.raft.jraft.util.SystemPropertyUtil;
import org.apache.ignite.raft.jraft.util.ThreadId;
import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.concurrent.LongHeldDetectingReadWriteLock;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class NodeImpl
implements Node,
RaftServerService {
    /** Logger for this class. */
    private static final IgniteLogger LOG = Loggers.forClass(NodeImpl.class);
    /** Shared {@link RaftError#EPERM} status carrying the message "Leader stepped down.". */
    public static final Status LEADER_STEPPED_DOWN = new Status(RaftError.EPERM, "Leader stepped down.", new Object[0]);
    /** Hybrid clock; assigned in init/bootstrap from the node options, or a fresh {@link HybridClockImpl} when the options carry none. */
    private volatile HybridClock clock;
    /** Read-write lock of this node; {@link #writeLock} and {@link #readLock} below are its two halves. */
    private final ReadWriteLock readWriteLock = new NodeReadWriteLock(this);
    protected final Lock writeLock = this.readWriteLock.writeLock();
    protected final Lock readLock = this.readWriteLock.readLock();
    /** Current node state; {@code STATE_UNINITIALIZED} after construction, {@code STATE_FOLLOWER} once init succeeds. */
    private volatile State state;
    private volatile CountDownLatch shutdownLatch;
    /** Current term; loaded from the meta storage during initialization. */
    private long currTerm;
    private volatile long lastLeaderTimestamp;
    private PeerId leaderId = new PeerId();
    /** Peer this node voted for; loaded from the meta storage during initialization. */
    private PeerId votedId;
    private final Ballot voteCtx = new Ballot();
    private final Ballot prevVoteCtx = new Ballot();
    /** Current configuration entry of the group as seen by this node. */
    private ConfigurationEntry conf;
    private StopTransferArg stopTransferArg;
    /** {@code true} while the election timeout differs from {@link #initialElectionTimeout} because of an adjustment. */
    private boolean electionAdjusted;
    /** Number of consecutive election rounds started since the last reset. */
    private long electionRound;
    /** Election timeout (ms) remembered before the first adjustment, used to restore it later. */
    private int initialElectionTimeout;
    private final String groupId;
    private NodeOptions options;
    private RaftOptions raftOptions;
    private final PeerId serverId;
    private final ConfigurationCtx confCtx;
    // Components below are created by the init*/bootstrap methods.
    private LogStorage logStorage;
    private RaftMetaStorage metaStorage;
    private ClosureQueue closureQueue;
    private ConfigurationManager configManager;
    private LogManager logManager;
    private FSMCaller fsmCaller;
    private BallotBox ballotBox;
    /** Snapshot executor; stays {@code null} when no snapshot URI is configured. */
    private SnapshotExecutor snapshotExecutor;
    private ReplicatorGroup replicatorGroup;
    private RaftClientService rpcClientService;
    private ReadOnlyService readOnlyService;
    // Timers created in initTimers().
    private RepeatedTimer electionTimer;
    private RepeatedTimer voteTimer;
    private RepeatedTimer stepDownTimer;
    private RepeatedTimer snapshotTimer;
    private ScheduledFuture<?> transferTimer;
    private ThreadId wakingCandidate;
    private StripedDisruptor<LogEntryAndClosure> applyDisruptor;
    private RingBuffer<LogEntryAndClosure> applyQueue;
    private NodeMetrics metrics;
    private NodeId nodeId;
    private JRaftServiceFactory serviceFactory;
    private final CopyOnWriteArrayList<Replicator.ReplicatorStateListener> replicatorStateListeners = new CopyOnWriteArrayList();
    /** Highest priority among the peers of the current configuration, possibly lowered by priority decay. */
    private volatile int targetPriority;
    /** Counts election timeouts skipped because this node's priority is below {@link #targetPriority}. */
    private volatile int electionTimeoutCounter;

    public NodeImpl(String groupId, PeerId serverId) {
        if (groupId != null) {
            Utils.verifyGroupId(groupId);
        }
        this.groupId = groupId;
        this.serverId = serverId != null ? serverId.copy() : null;
        this.state = State.STATE_UNINITIALIZED;
        this.currTerm = 0L;
        this.updateLastLeaderTimestamp(Utils.monotonicMs());
        this.confCtx = new ConfigurationCtx(this);
        this.wakingCandidate = null;
    }

    /**
     * Returns the hybrid clock used by this node.
     *
     * @return The clock assigned during initialization, or {@code null} if the node has not been initialized yet.
     */
    public HybridClock clock() {
        return clock;
    }

    /**
     * Creates and initializes the snapshot executor, unless no snapshot URI is configured.
     *
     * @return {@code true} when snapshots are not configured or the executor initialized successfully.
     */
    private boolean initSnapshotStorage() {
        final String snapshotUri = this.options.getSnapshotUri();
        if (StringUtils.isEmpty(snapshotUri)) {
            // Snapshots are optional: a missing URI is not an error.
            LOG.warn("Do not set snapshot uri, ignore initSnapshotStorage [node={}].", this.getNodeId());
            return true;
        }

        final SnapshotExecutor executor = new SnapshotExecutorImpl();
        this.snapshotExecutor = executor;

        final SnapshotExecutorOptions executorOpts = new SnapshotExecutorOptions();
        executorOpts.setUri(snapshotUri);
        executorOpts.setFsmCaller(this.fsmCaller);
        executorOpts.setNode(this);
        executorOpts.setLogManager(this.logManager);
        executorOpts.setPeerId(this.serverId);
        executorOpts.setInitTerm(this.currTerm);
        executorOpts.setFilterBeforeCopyRemote(this.options.isFilterBeforeCopyRemote());
        executorOpts.setSnapshotThrottle(this.options.getSnapshotThrottle());

        return executor.init(executorOpts);
    }

    /**
     * Creates the log storage through the service factory and initializes a {@link StripeAwareLogManager} on top of it.
     * The FSM caller must already be assigned.
     *
     * @return Result of the log manager initialization.
     */
    private boolean initLogStorage() {
        Requires.requireNonNull(this.fsmCaller, "Null fsm caller");

        final String logUri = this.options.getLogUri();
        this.logStorage = this.serviceFactory.createLogStorage(logUri, this.raftOptions);
        this.logManager = new StripeAwareLogManager();

        final LogManagerOptions managerOpts = new LogManagerOptions();
        managerOpts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory());
        managerOpts.setLogStorage(this.logStorage);
        managerOpts.setConfigurationManager(this.configManager);
        managerOpts.setNode(this);
        managerOpts.setFsmCaller(this.fsmCaller);
        managerOpts.setNodeMetrics(this.metrics);
        managerOpts.setRaftOptions(this.raftOptions);
        managerOpts.setLogManagerDisruptor(this.options.getLogManagerDisruptor());
        managerOpts.setLogStripes(this.options.getLogStripes());

        final boolean initialized = this.logManager.init(managerOpts);
        return initialized;
    }

    /**
     * Creates and initializes the Raft meta storage, then loads the persisted term and vote from it.
     *
     * @return {@code false} if the meta storage failed to initialize, {@code true} otherwise.
     */
    private boolean initMetaStorage() {
        final String metaUri = this.options.getRaftMetaUri();
        this.metaStorage = this.serviceFactory.createRaftMetaStorage(metaUri, this.raftOptions);

        final RaftMetaStorageOptions metaOpts = new RaftMetaStorageOptions();
        metaOpts.setNode(this);

        if (this.metaStorage.init(metaOpts)) {
            // Restore the durable election state.
            this.currTerm = this.metaStorage.getTerm();
            this.votedId = this.metaStorage.getVotedFor().copy();
            return true;
        }

        LOG.error("Node {} init meta storage failed, uri={}.", this.getNodeId(), this.options.getRaftMetaUri());
        return false;
    }

    /**
     * Snapshot timer callback: if the node is active, schedules a snapshot on the common executor.
     * The state is read under the write lock; the snapshot itself is started outside of it.
     */
    private void handleSnapshotTimeout() {
        final boolean active;

        this.writeLock.lock();
        try {
            active = this.state.isActive();
        }
        finally {
            this.writeLock.unlock();
        }

        if (active) {
            Utils.runInThread(this.getOptions().getCommonExecutor(), () -> this.doSnapshot(null, false));
        }
    }

    /**
     * Election timer callback.
     *
     * <p>Under the write lock: does nothing unless this node is a follower whose current leader is no longer
     * valid. Otherwise clears the leader id, checks whether this node may launch an election, adjusts the
     * election timeout and starts a pre-vote.
     *
     * <p>Lock hand-off: the {@code finally} block skips the unlock once {@code preVote()} has been invoked, so
     * {@code preVote()} is expected to release the write lock itself (NOTE(review): confirm against
     * {@code preVote()}).
     */
    private void handleElectionTimeout() {
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (this.state != State.STATE_FOLLOWER) {
                return;
            }
            if (this.isCurrentLeaderValid()) {
                return;
            }
            // The leader is uncertain on election timeout, so forget it (the message still names the old leader).
            this.resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.", this.leaderId));
            if (!this.allowLaunchElection()) {
                return;
            }
            // Adjust the timeout while this method still owns the unlock, so that a failure here
            // releases the write lock through the finally block instead of leaking it.
            this.adjustElectionTimeout();
            // From here on the lock ownership is handed over to preVote().
            doUnlock = false;
            this.preVote();
        }
        finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }

    /**
     * Counts one more election round and asks the configured election timeout strategy for the next timeout.
     * When the strategy returns a value different from the current one, the timeout is changed and the
     * adjustment is remembered so it can be undone later.
     */
    private void adjustElectionTimeout() {
        this.electionRound++;

        if (this.electionRound > 1L) {
            LOG.info("Unsuccessful election round number {}, group '{}'", this.electionRound, this.groupId);
        }

        // Remember the original timeout only before the first adjustment.
        if (!this.electionAdjusted) {
            this.initialElectionTimeout = this.options.getElectionTimeoutMs();
        }

        final long nextTimeout = (long)this.options.getElectionTimeoutStrategy().nextTimeout(this.options.getElectionTimeoutMs(), this.electionRound);
        final long currentTimeout = (long)this.options.getElectionTimeoutMs();
        if (nextTimeout == currentTimeout) {
            return;
        }

        this.resetElectionTimeoutMs((int)nextTimeout);
        LOG.info("Election timeout was adjusted according to [node={}, electionTimeoutStrategy={}].", this.getNodeId(), this.options.getElectionTimeoutStrategy());
        this.electionAdjusted = true;
    }

    /**
     * Resets the election round counter and, if the election timeout had been adjusted, restores the
     * timeout that was in effect before the first adjustment.
     */
    private void resetElectionTimeoutToInitial() {
        this.electionRound = 0L;

        if (!this.electionAdjusted) {
            // Nothing was changed, nothing to restore.
            return;
        }

        LOG.info("Election timeout was reset to initial value [node={}].", this.getNodeId());
        this.resetElectionTimeoutMs(this.initialElectionTimeout);
        this.electionAdjusted = false;
    }

    /**
     * Decides, based on election priority, whether this node may start an election now.
     *
     * <p>A node whose priority is below the target priority sits out the first election timeout; on the
     * following one the target priority is decayed and the comparison is repeated.
     *
     * @return {@code true} if an election may be launched.
     */
    private boolean allowLaunchElection() {
        if (this.serverId.isPriorityNotElected()) {
            LOG.warn("Node {} will never participate in election, because it's priority={}.", this.getNodeId(), this.serverId.getPriority());
            return false;
        }

        // Priority election is switched off: always allowed.
        if (this.serverId.isPriorityDisabled()) {
            return true;
        }

        if (this.serverId.getPriority() >= this.targetPriority) {
            return true;
        }

        // Our priority is below the target: count this timeout.
        this.electionTimeoutCounter++;
        if (this.electionTimeoutCounter > 1) {
            this.decayTargetPriority();
            this.electionTimeoutCounter = 0;
        }
        else if (this.electionTimeoutCounter == 1) {
            LOG.debug("Node {} does not initiate leader election and waits for the next election timeout.", this.getNodeId());
            return false;
        }

        // Re-check against the (possibly decayed) target priority.
        return this.serverId.getPriority() >= this.targetPriority;
    }

    /**
     * Lowers the target priority by the larger of the configured decay gap (at least 10) and one fifth of
     * the current target priority, never going below 1.
     */
    private void decayTargetPriority() {
        final int before = this.targetPriority;

        final int configuredGap = Math.max(this.options.getDecayPriorityGap(), 10);
        final int step = Math.max(configuredGap, before / 5);

        this.targetPriority = Math.max(1, before - step);

        LOG.info("Node {} priority decay, from: {}, to: {}.", this.getNodeId(), before, this.targetPriority);
    }

    /**
     * Refreshes the current configuration from the log manager and, if it changed, recomputes the target
     * priority and resets the election timeout counter.
     *
     * @param inLock {@code true} if the caller already holds the write lock; otherwise the lock is taken
     *      (and released) by this method.
     */
    private void checkAndSetConfiguration(boolean inLock) {
        final boolean lockHere = !inLock;
        if (lockHere) {
            this.writeLock.lock();
        }
        try {
            final ConfigurationEntry previous = this.conf;
            this.conf = this.logManager.checkAndSetConfiguration(previous);
            this.refreshLeadershipAbstaining();

            if (this.conf == previous) {
                // Same entry instance: nothing else to update.
                return;
            }

            final int oldTarget = this.targetPriority;
            this.targetPriority = this.getMaxPriorityOfNodes(this.conf.getConf().getPeers());
            if (oldTarget != this.targetPriority) {
                LOG.info("Node {} target priority value has changed from: {}, to: {}.", this.getNodeId(), oldTarget, this.targetPriority);
            }
            this.electionTimeoutCounter = 0;
        }
        finally {
            if (lockHere) {
                this.writeLock.unlock();
            }
        }
    }

    /**
     * Returns the highest priority among the given peers.
     *
     * @param peerIds Peers to inspect; must not be {@code null}.
     * @return The maximum priority, or {@link Integer#MIN_VALUE} for an empty list.
     */
    private int getMaxPriorityOfNodes(List<PeerId> peerIds) {
        Requires.requireNonNull(peerIds, "Null peer list");

        return peerIds.stream()
            .mapToInt(PeerId::getPriority)
            .max()
            .orElse(Integer.MIN_VALUE);
    }

    /**
     * Creates the closure queue and initializes the FSM caller with it.
     *
     * @param bootstrapId Log id passed to the FSM caller as its bootstrap id.
     * @return {@code false} if no FSM caller instance exists or its initialization failed.
     */
    private boolean initFSMCaller(LogId bootstrapId) {
        final FSMCaller caller = this.fsmCaller;
        if (caller == null) {
            LOG.error("Fail to init fsm caller, null instance, [node={}, bootstrapId={}].", this.getNodeId(), bootstrapId);
            return false;
        }

        this.closureQueue = new ClosureQueueImpl(this.getOptions());

        final FSMCallerOptions callerOpts = new FSMCallerOptions();
        callerOpts.setAfterShutdown(status -> this.afterShutdown());
        callerOpts.setLogManager(this.logManager);
        callerOpts.setFsm(this.options.getFsm());
        callerOpts.setClosureQueue(this.closureQueue);
        callerOpts.setNode(this);
        callerOpts.setBootstrapId(bootstrapId);
        callerOpts.setRaftMessagesFactory(this.raftOptions.getRaftMessagesFactory());
        callerOpts.setfSMCallerExecutorDisruptor(this.options.getfSMCallerExecutorDisruptor());

        return caller.init(callerOpts);
    }

    /**
     * Bootstraps the storages of a not-yet-started node: initializes log and meta storage, persists an
     * initial term, appends the initial configuration entry and, when a last log index is given, takes an
     * initial snapshot.
     *
     * @param opts Bootstrap options; the group configuration must be non-empty, and a state machine is
     *      required when {@code lastLogIndex > 0}.
     * @return {@code true} on success, {@code false} if validation or any initialization step fails.
     * @throws InterruptedException Declared by this method; presumably raised while waiting on the closures.
     * @throws IllegalStateException If the resulting first/last log indexes do not match the expected values.
     */
    public boolean bootstrap(BootstrapOptions opts) throws InterruptedException {
        // A non-zero last log index needs both a configuration and a state machine.
        if (opts.getLastLogIndex() > 0L && (opts.getGroupConf().isEmpty() || opts.getFsm() == null)) {
            LOG.error("Invalid arguments for bootstrap [node={}, groupConf={}, fsm={}, lastLogIndex={}].", new Object[]{this.getNodeId(), opts.getGroupConf(), opts.getFsm(), opts.getLastLogIndex()});
            return false;
        }
        if (opts.getGroupConf().isEmpty()) {
            LOG.error("Bootstrapping an empty node makes no sense [node={}].", new Object[]{this.getNodeId()});
            return false;
        }
        Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory");
        this.serviceFactory = opts.getServiceFactory();
        // Term of the bootstrap log id: 1 when there is a last log index, 0 otherwise.
        long bootstrapLogTerm = opts.getLastLogIndex() > 0L ? 1L : 0L;
        LogId bootstrapId = new LogId(opts.getLastLogIndex(), bootstrapLogTerm);
        // Fall back to default node options and a fresh clock when none are supplied.
        this.options = opts.getNodeOptions() == null ? new NodeOptions() : opts.getNodeOptions();
        this.clock = this.options.getClock() == null ? new HybridClockImpl() : this.options.getClock();
        this.raftOptions = this.options.getRaftOptions();
        this.metrics = new NodeMetrics(opts.isEnableMetrics());
        // Copy the bootstrap-specific settings into the node options used by the init* helpers.
        this.options.setFsm(opts.getFsm());
        this.options.setLogUri(opts.getLogUri());
        this.options.setRaftMetaUri(opts.getRaftMetaUri());
        this.options.setSnapshotUri(opts.getSnapshotUri());
        this.configManager = new ConfigurationManager();
        this.fsmCaller = new FSMCallerImpl();
        this.initPools(this.options);
        if (!this.initLogStorage()) {
            LOG.error("Fail to init log storage [node={}].", new Object[]{this.getNodeId()});
            return false;
        }
        if (!this.initMetaStorage()) {
            LOG.error("Fail to init meta storage [node={}].", new Object[]{this.getNodeId()});
            return false;
        }
        // A fresh meta storage reports term 0: start from term 1 with an empty vote and persist it.
        if (this.currTerm == 0L) {
            this.currTerm = 1L;
            if (!this.metaStorage.setTermAndVotedFor(1L, new PeerId())) {
                LOG.error("Fail to set term [node={}].", new Object[]{this.getNodeId()});
                return false;
            }
        }
        // The FSM caller is only initialized when a state machine was provided.
        if (opts.getFsm() != null && !this.initFSMCaller(bootstrapId)) {
            LOG.error("Fail to init fsm caller [node={}].", new Object[]{this.getNodeId()});
            return false;
        }
        // Append the initial configuration entry and wait until the append completes.
        LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION);
        entry.getId().setTerm(this.currTerm);
        entry.setPeers(opts.getGroupConf().listPeers());
        entry.setLearners(opts.getGroupConf().listLearners());
        ArrayList<LogEntry> entries = new ArrayList<LogEntry>();
        entries.add(entry);
        BootstrapStableClosure bootstrapDone = new BootstrapStableClosure();
        this.logManager.appendEntries(entries, bootstrapDone);
        if (!bootstrapDone.await().isOk()) {
            LOG.error("Fail to append configuration [node={}].", new Object[]{this.getNodeId()});
            return false;
        }
        // With a non-zero last log index, take a snapshot synchronously.
        if (opts.getLastLogIndex() > 0L) {
            if (!this.initSnapshotStorage()) {
                LOG.error("Fail to init snapshot storage [node={}].", new Object[]{this.getNodeId()});
                return false;
            }
            SynchronizedClosure snapshotDone = new SynchronizedClosure();
            this.snapshotExecutor.doSnapshot(snapshotDone, false);
            if (!snapshotDone.await().isOk()) {
                LOG.error("Fail to save snapshot [node={}, status={}].", new Object[]{this.getNodeId(), snapshotDone.getStatus()});
                return false;
            }
        }
        // Sanity checks: the log must start right after the bootstrap index...
        if (this.logManager.getFirstLogIndex() != opts.getLastLogIndex() + 1L) {
            throw new IllegalStateException("First and last log index mismatch");
        }
        // ...and end at the bootstrap index (snapshot case) or one entry after it (configuration entry only).
        if (opts.getLastLogIndex() > 0L ? this.logManager.getLastLogIndex() != opts.getLastLogIndex() : this.logManager.getLastLogIndex() != opts.getLastLogIndex() + 1L) {
            throw new IllegalStateException("Last log index mismatch");
        }
        return true;
    }

    /**
     * Derives the heartbeat timeout from the election timeout.
     *
     * @param electionTimeout Election timeout in milliseconds.
     * @return The election timeout divided by the configured heartbeat factor, but not less than 10.
     */
    private int heartbeatTimeout(int electionTimeout) {
        final int factor = this.raftOptions.getElectionHeartbeatFactor();
        final int scaled = electionTimeout / factor;
        return Math.max(scaled, 10);
    }

    /**
     * Adds a random delay to the given timeout.
     *
     * @param timeoutMs Base timeout in milliseconds.
     * @return A random value in {@code [timeoutMs, timeoutMs + maxElectionDelayMs)}.
     */
    private int randomTimeout(int timeoutMs) {
        final int upperBoundExclusive = timeoutMs + this.raftOptions.getMaxElectionDelayMs();
        final ThreadLocalRandom random = ThreadLocalRandom.current();
        return random.nextInt(timeoutMs, upperBoundExclusive);
    }

    /**
     * Initializes the node: timers, pools, apply queue, log/meta/snapshot storage, FSM caller, ballot box,
     * replicator group, RPC client and read-only service. On success the node becomes a follower; a
     * single-node stable configuration containing this server triggers an immediate self election.
     *
     * @param opts Node options; raft options, service factory and commands marshaller must be non-null.
     * @return {@code true} on success, {@code false} if the server id is empty or any component fails to
     *      initialize.
     */
    @Override
    public boolean init(NodeOptions opts) {
        Requires.requireNonNull(opts, "Null node options");
        Requires.requireNonNull(opts.getRaftOptions(), "Null raft options");
        Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory");
        Requires.requireNonNull(opts.getCommandsMarshaller(), "Null commands marshaller");
        this.serviceFactory = opts.getServiceFactory();
        this.options = opts;
        // Use the clock from the options when present, otherwise create a new one.
        this.clock = this.options.getClock() == null ? new HybridClockImpl() : this.options.getClock();
        this.raftOptions = opts.getRaftOptions();
        this.metrics = new NodeMetrics(opts.isEnableMetrics());
        this.serverId.setPriority(opts.getElectionPriority());
        this.electionTimeoutCounter = 0;
        if (opts.getReplicationStateListeners() != null) {
            this.replicatorStateListeners.addAll(opts.getReplicationStateListeners());
        }
        if (this.serverId.isEmpty()) {
            LOG.error("Server ID is empty.", new Object[0]);
            return false;
        }
        this.initTimers(opts);
        this.initPools(opts);
        this.configManager = new ConfigurationManager();
        // Subscribe this node to the apply disruptor; the returned ring buffer is the apply queue.
        this.applyDisruptor = opts.getNodeApplyDisruptor();
        this.applyQueue = this.applyDisruptor.subscribe(this.getNodeId(), new LogEntryAndClosureHandler());
        if (this.metrics.getMetricRegistry() != null) {
            this.metrics.getMetricRegistry().register("jraft-node-impl-disruptor", (Metric)new DisruptorMetricSet(this.applyQueue));
        }
        // The FSM caller instance must exist before initLogStorage(), which requires it to be non-null.
        this.fsmCaller = new FSMCallerImpl();
        if (!this.initLogStorage()) {
            LOG.error("Node {} initLogStorage failed.", new Object[]{this.getNodeId()});
            return false;
        }
        if (!this.initMetaStorage()) {
            LOG.error("Node {} initMetaStorage failed.", new Object[]{this.getNodeId()});
            return false;
        }
        if (!this.initFSMCaller(new LogId(0L, 0L))) {
            LOG.error("Node {} initFSMCaller failed.", new Object[]{this.getNodeId()});
            return false;
        }
        if (!this.initSnapshotStorage()) {
            LOG.error("Node {} initSnapshotStorage failed.", new Object[]{this.getNodeId()});
            return false;
        }
        Status st = this.logManager.checkConsistency();
        if (!st.isOk()) {
            LOG.error("Node {} is initialized with inconsistent log, status={}.", new Object[]{this.getNodeId(), st});
            return false;
        }
        // Start from an empty configuration entry; take it from the log if the log is non-empty,
        // otherwise from the initial configuration in the options.
        this.conf = new ConfigurationEntry();
        this.conf.setId(new LogId());
        if (this.logManager.getLastLogIndex() > 0L) {
            this.checkAndSetConfiguration(false);
        } else {
            this.conf.setConf(this.options.getInitialConf());
            this.targetPriority = this.getMaxPriorityOfNodes(this.conf.getConf().getPeers());
        }
        if (!this.initBallotBox()) {
            LOG.error("Node {} init ballotBox failed.", new Object[]{this.getNodeId()});
            return false;
        }
        if (!this.conf.isEmpty()) {
            Requires.requireTrue(this.conf.isValid(), "Invalid conf: %s", this.conf);
        } else {
            LOG.info("Init node with empty conf [node={}].", new Object[]{this.getNodeId()});
        }
        this.replicatorGroup = new ReplicatorGroupImpl();
        this.rpcClientService = new DefaultRaftClientService();
        ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions();
        rgOpts.setHeartbeatTimeoutMs(this.heartbeatTimeout(this.options.getElectionTimeoutMs()));
        rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs());
        rgOpts.setLogManager(this.logManager);
        rgOpts.setBallotBox(this.ballotBox);
        rgOpts.setNode(this);
        rgOpts.setRaftRpcClientService(this.rpcClientService);
        // No snapshot storage when snapshots are not configured.
        rgOpts.setSnapshotStorage(this.snapshotExecutor != null ? this.snapshotExecutor.getSnapshotStorage() : null);
        rgOpts.setRaftOptions(this.raftOptions);
        rgOpts.setTimerManager(this.options.getScheduler());
        this.options.setMetricRegistry(this.metrics.getMetricRegistry());
        if (!this.rpcClientService.init(this.options)) {
            LOG.error("Fail to init rpc service [node={}].", new Object[]{this.getNodeId()});
            return false;
        }
        this.replicatorGroup.init(new NodeId(this.groupId, this.serverId), rgOpts);
        this.readOnlyService = new ReadOnlyServiceImpl();
        ReadOnlyServiceOptions rosOpts = new ReadOnlyServiceOptions();
        rosOpts.setFsmCaller(this.fsmCaller);
        rosOpts.setNode(this);
        rosOpts.setRaftOptions(this.raftOptions);
        rosOpts.setReadOnlyServiceDisruptor(opts.getReadOnlyServiceDisruptor());
        if (!this.readOnlyService.init(rosOpts)) {
            LOG.error("Fail to init readOnlyService [node={}].", new Object[]{this.getNodeId()});
            return false;
        }
        // All components are ready: the node starts as a follower.
        this.state = State.STATE_FOLLOWER;
        if (LOG.isInfoEnabled()) {
            LOG.info("Node {} init, term={}, lastLogId={}, conf={}, oldConf={}.", new Object[]{this.getNodeId(), this.currTerm, this.logManager.getLastLogId(false), this.conf.getConf(), this.conf.getOldConf()});
        }
        // The snapshot timer only runs when snapshots are configured and the interval is positive.
        if (this.snapshotExecutor != null && this.options.getSnapshotIntervalSecs() > 0) {
            LOG.debug("Node {} start snapshot timer, term={}.", new Object[]{this.getNodeId(), this.currTerm});
            this.snapshotTimer.start();
        }
        if (!this.conf.isEmpty()) {
            this.stepDown(this.currTerm, false, new Status());
        }
        // Single-node stable group consisting of this server: elect self right away.
        // NOTE(review): on that branch the write lock is not released here, so electSelf() is
        // expected to unlock it - confirm against electSelf().
        this.writeLock.lock();
        if (this.conf.isStable() && this.conf.getConf().size() == 1 && this.conf.getConf().contains(this.serverId)) {
            this.electSelf();
        } else {
            this.writeLock.unlock();
        }
        return true;
    }

    /**
     * If an externally enforced configuration index is set and the current configuration is older than it,
     * replaces the current configuration with a pseudo configuration in which this node is only a learner,
     * so that it abstains from becoming a leader.
     */
    private void refreshLeadershipAbstaining() {
        final Long enforcedIndex = this.options.getExternallyEnforcedConfigIndex();

        // No enforced index, or the enforced configuration is already reached: nothing to do.
        if (enforcedIndex == null || this.conf.getId().getIndex() >= enforcedIndex) {
            return;
        }

        LOG.info("Will abstain from becoming a leader until a configuration with target index gets applied [node={}, externallyEnforcedConfigIndex={}].", this.getNodeId(), enforcedIndex);

        final Configuration abstainingConf = this.pseudoConfigToAbstainFromBecomingLeader();
        LOG.info("Node {} set config from {} to {} to abstain from becoming a leader.", this.getNodeId(), this.conf.getConf(), abstainingConf);

        this.conf.setConf(abstainingConf);
        this.conf.getOldConf().reset();
    }

    /**
     * Builds a configuration whose only voting peer is a placeholder named {@code "not-me-" + consistentId}
     * and whose only learner is this node.
     *
     * @return The pseudo configuration.
     */
    private Configuration pseudoConfigToAbstainFromBecomingLeader() {
        final String placeholderName = "not-me-" + this.serverId.getConsistentId();
        final PeerId placeholderPeer = new PeerId(placeholderName);

        final List<PeerId> voters = List.of(placeholderPeer);
        final List<PeerId> learners = List.of(this.serverId);

        return new Configuration(voters, learners);
    }

    /**
     * Creates and initializes the ballot box. The initial last committed index is the last snapshot index
     * (0 without a snapshot executor), raised to the last log index when the quorum is 1.
     *
     * @return Result of the ballot box initialization.
     */
    private boolean initBallotBox() {
        this.ballotBox = new BallotBox();

        final BallotBoxOptions boxOpts = new BallotBoxOptions();
        boxOpts.setWaiter(this.fsmCaller);
        boxOpts.setClosureQueue(this.closureQueue);

        long committedIndex = this.snapshotExecutor == null ? 0L : this.snapshotExecutor.getLastSnapshotIndex();
        if (this.getQuorum() == 1) {
            // With a quorum of one, everything in the log counts as committed.
            committedIndex = Math.max(committedIndex, this.logManager.getLastLogIndex());
        }
        boxOpts.setLastCommittedIndex(committedIndex);

        LOG.info("Node {} init ballot box's lastCommittedIndex={}.", this.getNodeId(), committedIndex);

        return this.ballotBox.init(boxOpts);
    }

    /**
     * Guards the creation of a default for a missing option: with shared pools enabled the option must be
     * supplied by the caller.
     *
     * @param opts Node options.
     * @param name Name of the missing option, used in the error message.
     * @return Always {@code true} when it returns.
     * @throws IllegalArgumentException If shared pools are enabled.
     */
    private boolean validateOption(NodeOptions opts, String name) {
        if (!opts.isSharedPools()) {
            return true;
        }

        throw new IllegalArgumentException(name + " is required if shared pools are enabled");
    }

    /**
     * Creates the vote, election, step-down and snapshot timers. For the scheduler and for each underlying
     * timer missing from {@code opts}, a default is created and stored back into {@code opts}; when shared
     * pools are enabled a missing one makes {@code validateOption} throw instead.
     *
     * @param opts Node options; may be updated with the created scheduler and timers.
     * @throws IllegalArgumentException If shared pools are enabled and the scheduler or a timer is missing.
     */
    private void initTimers(NodeOptions opts) {
        if (opts.getScheduler() == null && this.validateOption(opts, "scheduler")) {
            opts.setScheduler(JRaftUtils.createScheduler(opts));
        }
        String name = "JRaft-VoteTimer";
        if (opts.getVoteTimer() == null && this.validateOption(opts, "voteTimer")) {
            opts.setVoteTimer(JRaftUtils.createTimer(opts, name));
        }
        // Vote timer: election timeout plus a random delay.
        this.voteTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(), opts.getVoteTimer()){

            @Override
            protected void onTrigger() {
                NodeImpl.this.handleVoteTimeout();
            }

            @Override
            protected int adjustTimeout(int timeoutMs) {
                return NodeImpl.this.randomTimeout(timeoutMs);
            }
        };
        name = "JRaft-ElectionTimer";
        if (opts.getElectionTimer() == null && this.validateOption(opts, "electionTimer")) {
            opts.setElectionTimer(JRaftUtils.createTimer(opts, name));
        }
        // Election timer: election timeout plus a random delay.
        this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(), opts.getElectionTimer()){

            @Override
            protected void onTrigger() {
                NodeImpl.this.handleElectionTimeout();
            }

            @Override
            protected int adjustTimeout(int timeoutMs) {
                return NodeImpl.this.randomTimeout(timeoutMs);
            }
        };
        name = "JRaft-StepDownTimer";
        if (opts.getStepDownTimer() == null && this.validateOption(opts, "stepDownTimer")) {
            opts.setStepDownTimer(JRaftUtils.createTimer(opts, name));
        }
        // Step-down timer: half of the election timeout, no randomization.
        this.stepDownTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs() >> 1, opts.getStepDownTimer()){

            @Override
            protected void onTrigger() {
                NodeImpl.this.handleStepDownTimeout();
            }
        };
        name = "JRaft-SnapshotTimer";
        if (opts.getSnapshotTimer() == null && this.validateOption(opts, "snapshotTimer")) {
            opts.setSnapshotTimer(JRaftUtils.createTimer(opts, name));
        }
        // Snapshot timer: the configured interval converted from seconds to milliseconds.
        this.snapshotTimer = new RepeatedTimer(name, this.options.getSnapshotIntervalSecs() * 1000, opts.getSnapshotTimer()){
            // True until the first timeout adjustment has been made.
            private volatile boolean firstSchedule;
            {
                this.firstSchedule = true;
            }

            @Override
            protected void onTrigger() {
                NodeImpl.this.handleSnapshotTimeout();
            }

            @Override
            protected int adjustTimeout(int timeoutMs) {
                if (!this.firstSchedule) {
                    return timeoutMs;
                }
                this.firstSchedule = false;
                // Only the first schedule is randomized: half the timeout plus a random part below half.
                if (timeoutMs > 0) {
                    int half = timeoutMs / 2;
                    return half + ThreadLocalRandom.current().nextInt(half);
                }
                return timeoutMs;
            }
        };
    }

    /**
     * Fills in every executor, metric source and disruptor that is still unset in {@code opts}.
     *
     * <p>The three executors are created only after {@link #validateOption(NodeOptions, String)} passes, i.e.
     * a missing executor is an error when shared pools are enabled. The metric source and the disruptors are
     * created without that check.
     *
     * @param opts node options that supply (and receive) the pools
     * @throws IllegalArgumentException if an executor is missing while shared pools are enabled
     */
    private void initPools(NodeOptions opts) {
        if (opts.getCommonExecutor() == null && this.validateOption(opts, "commonExecutor")) {
            opts.setCommonExecutor(JRaftUtils.createCommonExecutor(opts));
        }
        if (opts.getStripedExecutor() == null && this.validateOption(opts, "stripedExecutor")) {
            opts.setStripedExecutor(JRaftUtils.createAppendEntriesExecutor(opts));
        }
        if (opts.getClientExecutor() == null && this.validateOption(opts, "clientExecutor")) {
            opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, opts.getServerName()));
        }
        // The metric source must exist before the disruptors below, which ask it for their metrics.
        if (opts.getRaftMetrics() == null) {
            opts.setRaftMetrics(new RaftMetricSource(opts.getStripes(), opts.getLogStripesCount()));
        }
        // System groups always get their own serial FSM-caller disruptor named after the group,
        // replacing whatever was set; other groups create a striped one only when none was provided.
        if (opts.isSystemGroup()) {
            opts.setfSMCallerExecutorDisruptor(StripedDisruptor.createSerialDisruptor(opts.getServerName(), "JRaft-FSMCaller-Disruptor-" + this.groupId, (stripeName, logger) -> IgniteThreadFactory.create((String)opts.getServerName(), (String)stripeName, (boolean)true, (IgniteLogger)logger, (ThreadOperation[])new ThreadOperation[0]), opts.getRaftOptions().getDisruptorBufferSize(), () -> new FSMCallerImpl.ApplyTask(), false, null));
        } else if (opts.getfSMCallerExecutorDisruptor() == null) {
            // NOTE(review): this thread factory uses the class-level LOG instead of the supplied logger
            // and allows storage read/write operations, unlike the other factories here - confirm intended.
            opts.setfSMCallerExecutorDisruptor(new StripedDisruptor<FSMCallerImpl.ApplyTask>(opts.getServerName(), "JRaft-FSMCaller-Disruptor", (stripeName, logger) -> IgniteThreadFactory.create((String)opts.getServerName(), (String)stripeName, (boolean)true, (IgniteLogger)LOG, (ThreadOperation[])new ThreadOperation[]{ThreadOperation.STORAGE_READ, ThreadOperation.STORAGE_WRITE}), opts.getRaftOptions().getDisruptorBufferSize(), () -> new FSMCallerImpl.ApplyTask(), opts.getStripes(), false, false, opts.getRaftMetrics().disruptorMetrics("raft.fsmcaller.disruptor")));
        }
        if (opts.getNodeApplyDisruptor() == null) {
            opts.setNodeApplyDisruptor(new StripedDisruptor<LogEntryAndClosure>(opts.getServerName(), "JRaft-NodeImpl-Disruptor", (stripeName, logger) -> IgniteThreadFactory.create((String)opts.getServerName(), (String)stripeName, (boolean)true, (IgniteLogger)logger, (ThreadOperation[])new ThreadOperation[0]), opts.getRaftOptions().getDisruptorBufferSize(), () -> new LogEntryAndClosure(), opts.getStripes(), false, false, opts.getRaftMetrics().disruptorMetrics("raft.nodeimpl.disruptor")));
        }
        if (opts.getReadOnlyServiceDisruptor() == null) {
            opts.setReadOnlyServiceDisruptor(new StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent>(opts.getServerName(), "JRaft-ReadOnlyService-Disruptor", (stripeName, logger) -> IgniteThreadFactory.create((String)opts.getServerName(), (String)stripeName, (boolean)true, (IgniteLogger)logger, (ThreadOperation[])new ThreadOperation[0]), opts.getRaftOptions().getDisruptorBufferSize(), () -> new ReadOnlyServiceImpl.ReadIndexEvent(), opts.getStripes(), false, false, opts.getRaftMetrics().disruptorMetrics("raft.readonlyservice.disruptor")));
        }
        if (opts.getLogManagerDisruptor() == null) {
            // The log-manager disruptor is sized by the log stripe count (not the general stripe count) and
            // takes two extra flags: whether the log storage is a RocksDbSharedLogStorage, and the yield-strategy option.
            opts.setLogManagerDisruptor(new StripedDisruptor<LogManagerImpl.StableClosureEvent>(opts.getServerName(), "JRaft-LogManager-Disruptor", (stripeName, logger) -> IgniteThreadFactory.create((String)opts.getServerName(), (String)stripeName, (boolean)true, (IgniteLogger)logger, (ThreadOperation[])new ThreadOperation[0]), opts.getRaftOptions().getDisruptorBufferSize(), () -> new LogManagerImpl.StableClosureEvent(), opts.getLogStripesCount(), this.logStorage instanceof RocksDbSharedLogStorage, opts.isLogYieldStrategy(), opts.getRaftMetrics().disruptorMetrics("raft.logmanager.disruptor")));
            // One Stripe object per log stripe, created together with the disruptor.
            opts.setLogStripes(IntStream.range(0, opts.getLogStripesCount()).mapToObj(i -> new StripeAwareLogManager.Stripe()).collect(Collectors.toList()));
        }
    }

    /**
     * Test-only entry point that starts a self-election.
     *
     * <p>The write lock is acquired here and deliberately not released in this method:
     * {@code electSelf()} expects to be entered with the lock held and unlocks it itself.
     */
    @OnlyForTest
    void tryElectSelf() {
        this.writeLock.lock();
        // No unlock here: electSelf() releases the write lock in its finally blocks.
        this.electSelf();
    }

    /**
     * Turns this node into a candidate for a new term and requests votes from the other peers.
     *
     * <p>Locking contract: the caller must already hold {@code writeLock}. This method releases it (first
     * {@code finally}), reads the last log id without the lock, then re-acquires and finally releases the
     * lock again. The lock is therefore NOT held when this method returns.
     *
     * <p>Steps: bail out if this node is not in the current configuration; stop the election timer when
     * coming from follower state; clear the leader id; switch to candidate, increment the term and vote for
     * self; start the vote timer; after re-locking, verify that neither the term nor the candidate state
     * changed; send a vote request to each other peer once its connection is established; persist term and
     * vote; grant the own vote and become leader if that already satisfies the vote context.
     */
    private void electSelf() {
        long electSelfTerm;
        try {
            LOG.info("Node {} start vote and grant vote self, term={}.", new Object[]{this.getNodeId(), this.currTerm});
            if (!this.conf.contains(this.serverId)) {
                LOG.warn("Node {} can't do electSelf as it is not in {}.", new Object[]{this.getNodeId(), this.conf});
                return;
            }
            if (this.state == State.STATE_FOLLOWER) {
                LOG.debug("Node {} stop election timer, term={}.", new Object[]{this.getNodeId(), this.currTerm});
                this.electionTimer.stop();
            }
            this.resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "A follower's leader_id is reset to NULL as it begins to request_vote.", new Object[0]));
            this.state = State.STATE_CANDIDATE;
            ++this.currTerm;
            this.votedId = this.serverId.copy();
            LOG.debug("Node {} start vote timer, term={} .", new Object[]{this.getNodeId(), this.currTerm});
            this.voteTimer.start();
            // While the configuration is not stable, the old configuration is passed to the vote context too.
            this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
            // Remember the term of this election so a concurrent term change can be detected after re-locking.
            electSelfTerm = this.currTerm;
        }
        finally {
            // Releases the lock taken by the caller.
            this.writeLock.unlock();
        }
        // Read without holding the write lock.
        LogId lastLogId = this.logManager.getLastLogId(true);
        this.writeLock.lock();
        try {
            // State may have changed while the lock was released: abandon this election if so.
            if (electSelfTerm != this.currTerm) {
                LOG.warn("Node {} raise term {} when getLastLogId.", new Object[]{this.getNodeId(), this.currTerm});
                return;
            }
            if (this.state != State.STATE_CANDIDATE) {
                LOG.warn("Node {} state changed from CANDIDATE to {} during election.", new Object[]{this.getNodeId(), this.state});
                return;
            }
            for (PeerId peer : this.conf.listPeers()) {
                if (peer.equals(this.serverId)) continue;
                // The vote request is sent from the connect callback, i.e. only after the connection attempt completes.
                this.rpcClientService.connectAsync(peer).thenAccept(ok -> {
                    if (!ok.booleanValue()) {
                        LOG.warn("Node {} failed to init channel, address={}.", new Object[]{this.getNodeId(), peer});
                        return;
                    }
                    OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, electSelfTerm, this);
                    done.request = this.raftOptions.getRaftMessagesFactory().requestVoteRequest().preVote(false).groupId(this.groupId).serverId(this.serverId.toString()).peerId(peer.toString()).term(electSelfTerm).lastLogIndex(lastLogId.getIndex()).lastLogTerm(lastLogId.getTerm()).build();
                    this.rpcClientService.requestVote(peer, done.request, done);
                });
            }
            // Persist the new term and the self-vote, then count the own vote.
            this.metaStorage.setTermAndVotedFor(electSelfTerm, this.serverId);
            this.voteCtx.grant(this.serverId);
            if (this.voteCtx.isGranted()) {
                this.becomeLeader();
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Replaces the locally known leader and notifies the state machine caller about the transition.
     *
     * <p>Setting a non-empty leader emits {@code onStartFollowing} when no leader was known before and
     * resets the election timeout to its initial value. Setting an empty leader emits
     * {@code onStopFollowing} when a leader was known and the current state orders after
     * {@code STATE_TRANSFERRING}.
     *
     * @param newLeaderId the new leader, or an empty peer to clear the leader
     * @param status status attached to the leader change notification
     */
    private void resetLeaderId(PeerId newLeaderId, Status status) {
        if (!newLeaderId.isEmpty()) {
            boolean noLeaderKnown = this.leaderId == null || this.leaderId.isEmpty();
            if (noLeaderKnown) {
                LeaderChangeContext startCtx = new LeaderChangeContext(newLeaderId, this.currTerm, status);
                this.fsmCaller.onStartFollowing(startCtx);
            }
            this.leaderId = newLeaderId.copy();
            this.resetElectionTimeoutToInitial();
            return;
        }

        // Clearing the leader.
        boolean wasFollowing = !this.leaderId.isEmpty() && this.state.compareTo(State.STATE_TRANSFERRING) > 0;
        if (wasFollowing) {
            LeaderChangeContext stopCtx = new LeaderChangeContext(this.leaderId.copy(), this.currTerm, status);
            this.fsmCaller.onStopFollowing(stopCtx);
        }
        this.leaderId = PeerId.emptyPeer();
    }

    /**
     * Steps down, if necessary, on receiving a message from a leader, and records that leader.
     *
     * <p>A step-down with error {@code ENEWLEADER} happens when the request term is higher than the current
     * term, when this node is not a follower, or when it is a follower without a known leader. Afterwards,
     * if no leader is known, {@code serverId} is stored as the leader.
     *
     * @param requestTerm term carried by the incoming request
     * @param serverId peer that sent the request
     */
    private void checkStepDown(long requestTerm, PeerId serverId) {
        Status status = new Status();

        // Pick the reason first; the step-down itself is identical for all three cases.
        String reason = null;
        if (requestTerm > this.currTerm) {
            reason = "Raft node receives message from new leader with higher term.";
        } else if (this.state != State.STATE_FOLLOWER) {
            reason = "Candidate receives message from new leader with the same term.";
        } else if (this.leaderId.isEmpty()) {
            reason = "Follower receives message from new leader with the same term.";
        }

        if (reason != null) {
            status.setError(RaftError.ENEWLEADER, reason, new Object[0]);
            this.stepDown(requestTerm, false, status);
        }

        boolean noLeaderKnown = this.leaderId == null || this.leaderId.isEmpty();
        if (noLeaderKnown) {
            this.resetLeaderId(serverId, status);
        }
    }

    /**
     * Promotes this candidate to leader for the current term.
     *
     * <p>Stops the vote timer, switches to leader state, registers a replicator for every other peer and
     * every learner, resets the ballot box to start right after the last log index, flushes the current
     * configuration through the configuration context and starts the step-down timer.
     *
     * @throws IllegalStateException if the configuration context is busy
     */
    private void becomeLeader() {
        Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state);
        LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.", new Object[]{this.getNodeId(), this.currTerm, this.conf.getConf(), this.conf.getOldConf()});

        this.stopVoteTimer();
        this.state = State.STATE_LEADER;
        this.leaderId = this.serverId.copy();
        this.replicatorGroup.resetTerm(this.currTerm);

        // Replicators for voting peers (excluding this node).
        for (PeerId follower : this.conf.listPeers()) {
            if (follower.equals(this.serverId)) {
                continue;
            }
            LOG.debug("Node {} add a replicator, term={}, peer={}.", new Object[]{this.getNodeId(), this.currTerm, follower});
            boolean added = this.replicatorGroup.addReplicator(follower);
            if (!added) {
                LOG.error("Fail to add a replicator [node={}, peer={}].", new Object[]{this.getNodeId(), follower});
            }
        }

        // Replicators for learners.
        for (PeerId learner : this.conf.listLearners()) {
            LOG.debug("Node {} add a learner replicator, term={}, peer={}.", new Object[]{this.getNodeId(), this.currTerm, learner});
            boolean added = this.replicatorGroup.addReplicator(learner, ReplicatorType.Learner);
            if (!added) {
                LOG.error("Fail to add a learner replicator [node={}, peer={}].", new Object[]{this.getNodeId(), learner});
            }
        }

        long firstPendingIndex = this.logManager.getLastLogIndex() + 1L;
        this.ballotBox.resetPendingIndex(firstPendingIndex);

        if (this.confCtx.isBusy()) {
            throw new IllegalStateException();
        }
        this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
        this.resetElectionTimeoutToInitial();
        this.stepDownTimer.start();
    }

    /**
     * Moves this node to follower state.
     *
     * <p>Does nothing when the node state is not active. Otherwise stops the timer belonging to the
     * previous role, clears the leader id, resets in-memory state, adopts {@code term} (clearing and
     * persisting the vote) when it is higher than the current term, stops the replicators, cancels a
     * pending leadership-transfer timer and restarts the election timer unless this node is a learner.
     *
     * @param term term to adopt if it is greater than the current term
     * @param wakeupCandidate whether to pick a next candidate among the replicators and send it a
     *     timeout-now request while stopping them
     * @param status reason for stepping down, forwarded to leader-stop and leader-change notifications
     */
    private void stepDown(long term, boolean wakeupCandidate, Status status) {
        LOG.debug("Node {} stepDown, term={}, newTerm={}, wakeupCandidate={}.", new Object[]{this.getNodeId(), this.currTerm, term, wakeupCandidate});
        if (!this.state.isActive()) {
            return;
        }
        if (this.state == State.STATE_CANDIDATE) {
            this.stopVoteTimer();
        } else if (this.state.compareTo(State.STATE_TRANSFERRING) <= 0) {
            // States ordered at or before STATE_TRANSFERRING: stop leader-side machinery.
            this.stopStepDownTimer();
            this.ballotBox.clearPendingTasks();
            if (this.state == State.STATE_LEADER) {
                this.onLeaderStop(status);
            }
        }
        // Must run before the state is switched: resetLeaderId inspects the current state.
        this.resetLeaderId(PeerId.emptyPeer(), status);
        // In-memory state.
        this.state = State.STATE_FOLLOWER;
        this.confCtx.reset();
        this.updateLastLeaderTimestamp(Utils.monotonicMs());
        if (this.snapshotExecutor != null) {
            this.snapshotExecutor.interruptDownloadingSnapshots(term);
        }
        // Persistent state: only touched when the term actually advances.
        if (term > this.currTerm) {
            this.currTerm = term;
            this.votedId = PeerId.emptyPeer();
            this.metaStorage.setTermAndVotedFor(term, this.votedId);
        }
        if (wakeupCandidate) {
            this.wakingCandidate = this.replicatorGroup.stopAllAndFindTheNextCandidate(this.conf);
            if (this.wakingCandidate != null) {
                Replicator.sendTimeoutNowAndStop(this.wakingCandidate, this.options.getElectionTimeoutMs());
            }
        } else {
            this.replicatorGroup.stopAll();
        }
        // Cancel an in-flight leadership transfer, if any.
        if (this.stopTransferArg != null) {
            if (this.transferTimer != null) {
                this.transferTimer.cancel(true);
            }
            this.stopTransferArg = null;
        }
        // Non-blocking learner check (no read lock is taken inside isLearner(false)).
        if (!this.isLearner(false)) {
            this.electionTimer.restart();
        } else {
            LOG.info("Node {} is a learner, election timer is not started.", new Object[]{this.getNodeId()});
        }
    }

    /**
     * Tells whether this node is listed among the learners of the current configuration.
     *
     * @param blocking when {@code true} the check runs under the read lock; when {@code false} no lock is
     *     taken by this method
     * @return {@code true} if the learner list contains this node's id
     */
    private boolean isLearner(boolean blocking) {
        if (blocking) {
            this.readLock.lock();
            try {
                return this.conf.listLearners().contains(this.serverId);
            }
            finally {
                this.readLock.unlock();
            }
        }
        return this.conf.listLearners().contains(this.serverId);
    }

    /** Stops the step-down timer; a no-op when the timer has not been created. */
    private void stopStepDownTimer() {
        if (this.stepDownTimer == null) {
            return;
        }
        this.stepDownTimer.stop();
    }

    /** Stops the vote timer; a no-op when the timer has not been created. */
    private void stopVoteTimer() {
        if (this.voteTimer == null) {
            return;
        }
        this.voteTimer.stop();
    }

    /**
     * Appends a batch of client tasks to the log on the leader.
     *
     * <p>The batch is rejected as a whole with {@code EBUSY} when the log manager reports no capacity, and
     * with the status from {@link #cannotApplyBecauseNotLeaderStatus(State)} when this node is not the
     * leader. In both cases the non-null closures are completed on the common executor. Otherwise each
     * task is checked individually, registered in the ballot box, stamped with the current term and
     * entry type, and the accepted entries are handed to the log manager in one call.
     *
     * @param tasks batch of entries with their completion closures
     */
    private void executeApplyingTasks(List<LogEntryAndClosure> tasks) {
        // Fail fast, before taking the write lock, when the log manager cannot accept more entries.
        if (!this.logManager.hasAvailableCapacityToAppendEntries(1)) {
            List dones = tasks.stream().map(ele -> ele.done).filter(Objects::nonNull).collect(Collectors.toList());
            Utils.runInThread(this.getOptions().getCommonExecutor(), () -> {
                for (Closure done : dones) {
                    done.run(new Status(RaftError.EBUSY, "Node %s log manager is busy.", this.getNodeId()));
                }
            });
            return;
        }
        this.writeLock.lock();
        try {
            int size = tasks.size();
            State nodeState = this.state;
            if (nodeState != State.STATE_LEADER) {
                Status st = NodeImpl.cannotApplyBecauseNotLeaderStatus(nodeState);
                LOG.debug("Node {} can't apply, status={}.", new Object[]{this.getNodeId(), st});
                List dones = tasks.stream().map(ele -> ele.done).filter(Objects::nonNull).collect(Collectors.toList());
                Utils.runInThread(this.getOptions().getCommonExecutor(), () -> {
                    for (Closure done : dones) {
                        done.run(st);
                    }
                });
                return;
            }
            ArrayList<LogEntry> entries = new ArrayList<LogEntry>(size);
            for (int i = 0; i < size; ++i) {
                LogEntryAndClosure task = tasks.get(i);
                // An expectedTerm of -1 means "no expectation"; any other value must equal the current term.
                if (task.expectedTerm != -1L && task.expectedTerm != this.currTerm) {
                    LOG.debug("Node {} can't apply task whose expectedTerm={} doesn't match currTerm={}.", new Object[]{this.getNodeId(), task.expectedTerm, this.currTerm});
                    // NOTE(review): a task without a closure is skipped here without task.reset() - confirm intended.
                    if (task.done == null) continue;
                    Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d", task.expectedTerm, this.currTerm);
                    Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.done, st);
                    task.reset();
                    continue;
                }
                // While the configuration is not stable, the old configuration is passed to the ballot box too.
                if (!this.ballotBox.appendPendingTask(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) {
                    Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.done, new Status(RaftError.EINTERNAL, "Fail to append task.", new Object[0]));
                    task.reset();
                    continue;
                }
                // Entry metadata must be set before the entry is added to the batch.
                task.entry.getId().setTerm(this.currTerm);
                task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
                entries.add(task.entry);
                // The event object is reset once its entry has been taken over.
                task.reset();
            }
            this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
            this.checkAndSetConfiguration(true);
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Builds the error status returned to clients whose task cannot be applied because the node is not
     * the leader.
     *
     * @param nodeState state the node was in when the task was rejected
     * @return {@code EBUSY} when the node is transferring leadership, {@code EPERM} for any other state
     */
    public static Status cannotApplyBecauseNotLeaderStatus(State nodeState) {
        Status result = new Status();
        if (nodeState == State.STATE_TRANSFERRING) {
            result.setError(RaftError.EBUSY, "Is transferring leadership.", new Object[0]);
        } else {
            result.setError(RaftError.EPERM, "Is not leader.", new Object[0]);
        }
        return result;
    }

    /**
     * Returns the metrics object of this node.
     *
     * @return the {@code metrics} field
     */
    @Override
    public NodeMetrics getNodeMetrics() {
        return this.metrics;
    }

    /**
     * Returns the service factory held by this node.
     *
     * @return the {@code serviceFactory} field
     */
    public JRaftServiceFactory getServiceFactory() {
        return this.serviceFactory;
    }

    /**
     * Submits a read-index request to the read-only service.
     *
     * <p>When the node is shutting down (the shutdown latch is set) the closure is completed with
     * {@code ENODESHUTDOWN} on the common executor and the call additionally fails with an exception.
     *
     * @param requestContext opaque request context forwarded to the read-only service
     * @param done closure to complete; must not be {@code null} for a request that is accepted
     * @throws IllegalStateException if the node is shutting down
     */
    @Override
    public void readIndex(byte[] requestContext, ReadIndexClosure done) {
        if (this.shutdownLatch == null) {
            Requires.requireNonNull(done, "Null closure");
            this.readOnlyService.addRequest(requestContext, done);
            return;
        }

        // Shutting down: notify the closure asynchronously, then fail the call.
        Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down.", new Object[0]));
        throw new IllegalStateException("Node is shutting down");
    }

    /**
     * Dispatches an incoming read-index request according to the current node state, under the read lock.
     *
     * <p>A leader handles the request itself, a follower forwards it, a node transferring leadership
     * answers {@code EBUSY} and any other state answers {@code EPERM}. Latency and the number of request
     * entries are recorded in the metrics after the lock is released, whatever the outcome.
     *
     * @param request the read-index request
     * @param done closure that receives the response or the error status
     */
    @Override
    public void handleReadIndexRequest(RpcRequests.ReadIndexRequest request, RpcResponseClosure<RpcRequests.ReadIndexResponse> done) {
        long startMs = Utils.monotonicMs();
        this.readLock.lock();
        try {
            switch (this.state) {
                case STATE_LEADER:
                    this.readLeader(request, done);
                    break;
                case STATE_FOLLOWER:
                    this.readFollower(request, done);
                    break;
                case STATE_TRANSFERRING:
                    done.run(new Status(RaftError.EBUSY, "Is transferring leadership.", new Object[0]));
                    break;
                default:
                    done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", new Object[]{this.state}));
                    break;
            }
        }
        finally {
            this.readLock.unlock();
            long elapsedMs = Utils.monotonicMs() - startMs;
            this.metrics.recordLatency("handle-read-index", elapsedMs);
            this.metrics.recordSize("handle-read-index-entries", Utils.size(request.entriesList()));
        }
    }

    /**
     * Dispatches an incoming get-leader request according to the current node state, under the read lock.
     *
     * <p>A leader answers itself, a follower forwards the request, a node transferring leadership answers
     * {@code EBUSY} and any other state answers {@code UNKNOWN}. The handling latency is recorded in the
     * metrics after the lock is released, whatever the outcome.
     *
     * @param request the get-leader request
     * @param done closure that receives the response or the error status
     */
    @Override
    public void handleGetLeaderAndTermRequest(CliRequests.GetLeaderRequest request, RpcResponseClosure<CliRequests.GetLeaderResponse> done) {
        long startMs = Utils.monotonicMs();
        this.readLock.lock();
        try {
            switch (this.state) {
                case STATE_LEADER:
                    this.getLeaderFromLeader(done);
                    break;
                case STATE_FOLLOWER:
                    this.getLeaderFromFollower(request, done);
                    break;
                case STATE_TRANSFERRING:
                    done.run(new Status(RaftError.EBUSY, "Is transferring leadership.", new Object[0]));
                    break;
                default:
                    done.run(new Status(RaftError.UNKNOWN, "Invalid state for getLeaderAndTerm: %s.", new Object[]{this.state}));
                    break;
            }
        }
        finally {
            this.readLock.unlock();
            long elapsedMs = Utils.monotonicMs() - startMs;
            this.metrics.recordLatency("handle-get-leader", elapsedMs);
        }
    }

    /**
     * Follower-side handling of a get-leader request: forwards it to the leader known to this node.
     *
     * @param request the original request; only its group id is reused
     * @param closure closure completed with {@code UNKNOWN} when no leader is known, otherwise passed on
     *     to the RPC client together with the forwarded request
     */
    private void getLeaderFromFollower(CliRequests.GetLeaderRequest request, RpcResponseClosure<CliRequests.GetLeaderResponse> closure) {
        // Read the field once so the check and the forwarding use the same value.
        PeerId knownLeader = this.leaderId;
        boolean leaderUnknown = knownLeader == null || knownLeader.isEmpty();
        if (leaderUnknown) {
            closure.run(new Status(RaftError.UNKNOWN, "No leader at term %d.", this.currTerm));
            return;
        }

        CliRequests.GetLeaderRequest forwarded = this.raftOptions.getRaftMessagesFactory()
            .getLeaderRequest()
            .groupId(request.groupId())
            .peerId(knownLeader.toString())
            .build();
        this.rpcClientService.getLeaderAndTerm(knownLeader, forwarded, -1, closure);
    }

    /**
     * Leader-side handling of a get-leader request.
     *
     * <p>With a quorum of at most one the response is sent immediately. Otherwise the configured read-only
     * option decides: in lease-based mode with a valid lease the response is sent immediately; in safe mode
     * (also used when the lease is not valid) a heartbeat is sent to every other peer and the response is
     * produced through a {@code QuorumConfirmedHeartbeatResponseClosure}.
     *
     * @param closure closure that receives the response, or {@code UNKNOWN} when no leader id is set
     */
    private void getLeaderFromLeader(RpcResponseClosure<CliRequests.GetLeaderResponse> closure) {
        PeerId knownLeader = this.leaderId;
        if (knownLeader == null || knownLeader.isEmpty()) {
            closure.run(new Status(RaftError.UNKNOWN, "No leader at term %d.", this.currTerm));
            return;
        }

        GetLeaderResponseBuilder respBuilder = this.raftOptions.getRaftMessagesFactory()
            .getLeaderResponse()
            .leaderId(knownLeader.toString())
            .currentTerm(this.getCurrentTerm());

        int quorum = this.getQuorum();
        if (quorum <= 1) {
            closure.setResponse(respBuilder.build());
            closure.run(Status.OK());
            return;
        }

        ReadOnlyOption mode = this.raftOptions.getReadOnlyOptions();
        if (mode == ReadOnlyOption.ReadOnlyLeaseBased && !this.isLeaderLeaseValid()) {
            // Lease cannot be trusted: fall back to heartbeat confirmation.
            mode = ReadOnlyOption.ReadOnlySafe;
        }

        switch (mode) {
            case ReadOnlyLeaseBased: {
                closure.setResponse(respBuilder.build());
                closure.run(Status.OK());
                break;
            }
            case ReadOnlySafe: {
                List<PeerId> peers = this.conf.getConf().getPeers();
                Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
                QuorumConfirmedHeartbeatResponseClosure<CliRequests.GetLeaderResponse> heartbeatDone = new QuorumConfirmedHeartbeatResponseClosure<CliRequests.GetLeaderResponse>(
                    response -> {
                        closure.setResponse((CliRequests.GetLeaderResponse)response);
                        closure.run(Status.OK());
                    },
                    success -> respBuilder.build(),
                    quorum,
                    peers.size());
                for (PeerId peer : peers) {
                    if (!peer.equals(this.serverId)) {
                        this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
                    }
                }
                break;
            }
        }
    }

    /**
     * Computes the majority size of the current configuration.
     *
     * @return {@code 0} for an empty configuration, otherwise {@code peers / 2 + 1}
     */
    private int getQuorum() {
        Configuration current = this.conf.getConf();
        return current.isEmpty() ? 0 : current.getPeers().size() / 2 + 1;
    }

    /**
     * Follower-side handling of a read-index request: forwards it to the known leader.
     *
     * <p>The forwarded request copies group id, server id and entries from the original and carries the
     * leader's id as peer id.
     *
     * @param request the original read-index request
     * @param closure closure completed with {@code EPERM} when no leader is known, otherwise passed on to
     *     the RPC client together with the forwarded request
     */
    private void readFollower(RpcRequests.ReadIndexRequest request, RpcResponseClosure<RpcRequests.ReadIndexResponse> closure) {
        boolean leaderKnown = this.leaderId != null && !this.leaderId.isEmpty();
        if (!leaderKnown) {
            closure.run(new Status(RaftError.EPERM, "No leader at term %d.", this.currTerm));
            return;
        }

        RpcRequests.ReadIndexRequest forwarded = this.raftOptions.getRaftMessagesFactory()
            .readIndexRequest()
            .groupId(request.groupId())
            .serverId(request.serverId())
            .peerId(request.peerId())
            .entriesList(request.entriesList())
            // The second peerId call replaces the copied value with the leader's id.
            .peerId(this.leaderId.toString())
            .build();
        this.rpcClientService.readIndex(this.leaderId, forwarded, -1, closure);
    }

    /**
     * Leader-side handling of a read-index request.
     *
     * <p>With a quorum of at most one the last committed index is returned immediately. Otherwise the
     * request is rejected with {@code EAGAIN} when the entry at the last committed index does not belong
     * to the current term, and with {@code EPERM} when the request carries a peer id and its server id is
     * neither a peer nor a learner of the current configuration. The remaining handling depends on the
     * read-only option: lease-based answers immediately, safe mode sends heartbeats to the other peers.
     *
     * @param request the read-index request
     * @param closure closure that receives the response or the error status
     */
    private void readLeader(RpcRequests.ReadIndexRequest request, RpcResponseClosure<RpcRequests.ReadIndexResponse> closure) {
        ReadOnlyOption readOnlyOpt;
        ReadIndexResponseBuilder respBuilder = this.raftOptions.getRaftMessagesFactory().readIndexResponse();
        int quorum = this.getQuorum();
        // Fast path: nobody else has to confirm leadership.
        if (quorum <= 1) {
            respBuilder.success(true).index(this.ballotBox.getLastCommittedIndex());
            closure.setResponse(respBuilder.build());
            closure.run(Status.OK());
            return;
        }
        long lastCommittedIndex = this.ballotBox.getLastCommittedIndex();
        // Reject until an entry of the current term has been committed.
        if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
            closure.run(new Status(RaftError.EAGAIN, "ReadIndex request rejected because leader has not committed any log entry at its term, logIndex=%d, currTerm=%d.", lastCommittedIndex, this.currTerm));
            return;
        }
        respBuilder.index(lastCommittedIndex);
        // Presence of a peer id triggers the check, but the id that is validated is the request's server id.
        if (request.peerId() != null) {
            PeerId peer = new PeerId();
            peer.parse(request.serverId());
            if (!this.conf.contains(peer) && !this.conf.containsLearner(peer)) {
                closure.run(new Status(RaftError.EPERM, "Peer %s is not in current configuration: %s.", peer, this.conf));
                return;
            }
        }
        // Lease-based reads fall back to the safe mode when the leader lease is not valid.
        if ((readOnlyOpt = this.raftOptions.getReadOnlyOptions()) == ReadOnlyOption.ReadOnlyLeaseBased && !this.isLeaderLeaseValid()) {
            readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
        }
        switch (readOnlyOpt) {
            case ReadOnlySafe: {
                List<PeerId> peers = this.conf.getConf().getPeers();
                Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
                // One closure is shared by all heartbeats; it is constructed with the quorum and the peer count.
                // Presumably it completes once enough heartbeat responses arrived - confirm in its implementation.
                QuorumConfirmedHeartbeatResponseClosure<RpcRequests.ReadIndexResponse> heartbeatDone = new QuorumConfirmedHeartbeatResponseClosure<RpcRequests.ReadIndexResponse>(response -> {
                    closure.setResponse((RpcRequests.ReadIndexResponse)response);
                    closure.run(Status.OK());
                }, success -> {
                    respBuilder.success((boolean)success);
                    return respBuilder.build();
                }, quorum, peers.size());
                for (PeerId peer : peers) {
                    if (peer.equals(this.serverId)) continue;
                    this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
                }
                break;
            }
            case ReadOnlyLeaseBased: {
                // The lease was validated above, so the answer can be given without contacting peers.
                respBuilder.success(true);
                closure.setResponse(respBuilder.build());
                closure.run(Status.OK());
            }
        }
    }

    /**
     * Submits a task to the apply queue.
     *
     * <p>The task data is wrapped in a new log entry and published to {@code applyQueue} together with the
     * task's closure and expected term. In {@code Blocking} mode publishing goes through
     * {@code publishEvent}; in any other mode {@code tryPublishEvent} is used and a rejected task is
     * reported as {@code EBUSY}.
     *
     * @param task the task to apply
     * @throws IllegalStateException if the node is shutting down
     * @throws OverloadException if the queue rejects the task in non-blocking mode and the task has no closure
     */
    @Override
    public void apply(Task task) {
        if (this.shutdownLatch != null) {
            // NOTE(review): task is dereferenced here before the null check below - confirm intended.
            Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down.", new Object[0]));
            throw new IllegalStateException("Node is shutting down");
        }
        Requires.requireNonNull(task, "Null task");
        LogEntry entry = new LogEntry();
        entry.setData(task.getData());
        // Fills the ring-buffer event in place with the node id, closure, entry and expected term.
        EventTranslator translator = (event, sequence) -> {
            event.reset();
            event.nodeId = this.getNodeId();
            event.done = task.getDone();
            event.entry = entry;
            event.expectedTerm = task.getExpectedTerm();
        };
        switch (this.options.getApplyTaskMode()) {
            case Blocking: {
                this.applyQueue.publishEvent(translator);
                break;
            }
            default: {
                if (this.applyQueue.tryPublishEvent(translator)) break;
                // Queue rejected the event: report overload through the closure, the log and the metrics.
                String errorMsg = "Node is busy, has too many tasks, queue is full and bufferSize=" + this.applyQueue.getBufferSize();
                Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.getDone(), new Status(RaftError.EBUSY, errorMsg, new Object[0]));
                LOG.warn("Node {} applyQueue is overload.", new Object[]{this.getNodeId()});
                this.metrics.recordTimes("apply-task-overload-times", 1L);
                // Without a closure there is no other way to tell the caller, so throw.
                if (task.getDone() != null) break;
                throw new OverloadException(errorMsg);
            }
        }
    }

    /**
     * Handles a pre-vote request from a candidate.
     *
     * <p>Returns an {@code EINVAL} error response when this node is not active or the candidate id cannot
     * be parsed. Otherwise returns a vote response carrying the current term and a {@code granted} flag.
     * The pre-vote is not granted when the candidate is not in the configuration, when a leader is known
     * and {@link #isCurrentLeaderValid()} holds, or when the request term is lower than the current term.
     * In the remaining case it is granted iff the candidate's last log id is not behind the local one.
     *
     * <p>The write lock is released while the last log id is read; {@code doUnlock} records whether the
     * lock is currently held so that the {@code finally} block only unlocks a held lock.
     *
     * @param request the pre-vote request
     * @return an error response or a vote response
     */
    @Override
    public Message handlePreVoteRequest(RpcRequests.RequestVoteRequest request) {
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (!this.state.isActive()) {
                LOG.warn("Node {} is not in active state, currTerm={}.", new Object[]{this.getNodeId(), this.currTerm});
                Message message = RaftRpcFactory.DEFAULT.newResponse(this.raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, "Node %s is not in active state, state %s.", this.getNodeId(), this.state.name());
                return message;
            }
            PeerId candidateId = new PeerId();
            if (!candidateId.parse(request.serverId())) {
                LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", new Object[]{this.getNodeId(), request.serverId()});
                Message message = RaftRpcFactory.DEFAULT.newResponse(this.raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, "Parse candidateId failed: %s.", request.serverId());
                return message;
            }
            boolean granted = false;
            if (!this.conf.contains(candidateId)) {
                LOG.warn("Node {} ignore PreVoteRequest from {} as it is not in conf <{}>.", new Object[]{this.getNodeId(), request.serverId(), this.conf});
            } else if (this.leaderId != null && !this.leaderId.isEmpty() && this.isCurrentLeaderValid()) {
                LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.", new Object[]{this.getNodeId(), request.serverId(), request.term(), this.currTerm, this.leaderId});
            } else if (request.term() < this.currTerm) {
                LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", new Object[]{this.getNodeId(), request.serverId(), request.term(), this.currTerm});
                // checkReplicator only acts when this node is the leader.
                this.checkReplicator(candidateId);
            } else {
                this.checkReplicator(candidateId);
                // Drop the lock for the log read; the flag keeps the finally block from unlocking
                // a lock that is not held if getLastLogId throws.
                doUnlock = false;
                this.writeLock.unlock();
                LogId lastLogId = this.logManager.getLastLogId(true);
                doUnlock = true;
                this.writeLock.lock();
                LogId requestLastLogId = new LogId(request.lastLogIndex(), request.lastLogTerm());
                // Grant only if the candidate's log is not behind the local log.
                granted = requestLastLogId.compareTo(lastLogId) >= 0;
                LOG.info("Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.", new Object[]{this.getNodeId(), request.serverId(), request.term(), this.currTerm, granted, requestLastLogId, lastLogId});
            }
            RpcRequests.RequestVoteResponse requestVoteResponse = this.raftOptions.getRaftMessagesFactory().requestVoteResponse().term(this.currTerm).granted(granted).build();
            return requestVoteResponse;
        }
        finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }

    /**
     * Checks the leader lease, refreshing the cached lease start when the first check fails.
     *
     * <p>If the cached timestamp no longer passes {@link #checkLeaderLease(long)}, the peers of the current
     * configuration are re-examined via {@code checkDeadNodes0} (which updates the cached timestamp when a quorum
     * is alive) and the lease is evaluated again against the same "now".
     *
     * @return {@code true} if the lease check passes before or after the refresh.
     */
    private boolean isLeaderLeaseValid() {
        final long nowMs = Utils.monotonicMs();
        if (!this.checkLeaderLease(nowMs)) {
            // Replicators are not re-checked here (checkReplicator=false) and dead peers are not collected (null).
            this.checkDeadNodes0(this.conf.getConf().getPeers(), nowMs, false, null);
            return this.checkLeaderLease(nowMs);
        }
        return true;
    }

    /**
     * Compares the time elapsed since the recorded leader timestamp with the configured lease timeout.
     *
     * @param monotonicNowMs Current monotonic time in milliseconds.
     * @return {@code true} if strictly less than the lease timeout has elapsed.
     */
    private boolean checkLeaderLease(long monotonicNowMs) {
        final long elapsedMs = monotonicNowMs - this.lastLeaderTimestamp;
        final long leaseTimeoutMs = this.options.getLeaderLeaseTimeoutMs();
        return elapsedMs < leaseTimeoutMs;
    }

    /**
     * Evaluates the leader lease against the current monotonic clock, without refreshing the cached timestamp.
     *
     * @return {@code true} if {@link #checkLeaderLease(long)} passes for "now".
     */
    private boolean isCurrentLeaderValid() {
        final long nowMs = Utils.monotonicMs();
        return this.checkLeaderLease(nowMs);
    }

    /**
     * Records the timestamp that {@link #checkLeaderLease(long)} measures the lease from.
     *
     * @param lastLeaderTimestamp New timestamp value; callers in this file pass monotonic milliseconds.
     */
    private void updateLastLeaderTimestamp(long lastLeaderTimestamp) {
        this.lastLeaderTimestamp = lastLeaderTimestamp;
    }

    /**
     * Asks the replicator group to check the replicator of the given peer, but only while this node is leader.
     *
     * @param candidateId Peer whose replicator should be checked.
     */
    private void checkReplicator(PeerId candidateId) {
        if (this.state != State.STATE_LEADER) {
            return;
        }
        this.replicatorGroup.checkReplicator(candidateId, false);
    }

    /**
     * Handles a vote request from a candidate.
     *
     * <p>The request is rejected with {@code EINVAL} when this node is not active or the candidate id cannot be
     * parsed. Otherwise a vote response is returned carrying the current term and whether the vote recorded for
     * this term belongs to the requesting candidate.
     *
     * <p>The write lock is released while the last log id is fetched from the log manager and re-acquired
     * afterwards; {@code doUnlock} tracks whether the lock is held so that {@code finally} releases it only then.
     *
     * @param request Vote request.
     * @return Error response or a {@code RequestVoteResponse}.
     */
    @Override
    public Message handleRequestVoteRequest(RpcRequests.RequestVoteRequest request) {
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (!this.state.isActive()) {
                LOG.warn("Node {} is not in active state, currTerm={}.", this.getNodeId(), this.currTerm);
                return RaftRpcFactory.DEFAULT.newResponse(this.raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, "Node %s is not in active state, state %s.", this.getNodeId(), this.state.name());
            }
            final PeerId candidateId = new PeerId();
            if (!candidateId.parse(request.serverId())) {
                LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", this.getNodeId(), request.serverId());
                return RaftRpcFactory.DEFAULT.newResponse(this.raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, "Parse candidateId failed: %s.", request.serverId());
            }

            // Decide whether the request deserves a log comparison at all.
            final boolean considerVote;
            if (request.term() < this.currTerm) {
                // Older term: ignore.
                LOG.info("Node {} ignores RequestVoteRequest from {}, term={}, currTerm={}.", this.getNodeId(), request.serverId(), request.term(), this.currTerm);
                considerVote = false;
            } else {
                LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", this.getNodeId(), request.serverId(), request.term(), this.currTerm);
                if (request.term() > this.currTerm) {
                    // Higher term: step down to it first, then evaluate the vote.
                    this.stepDown(request.term(), false, new Status(RaftError.EHIGHERTERMRESPONSE, "Raft node receives higher term RequestVoteRequest."));
                    considerVote = true;
                } else if (candidateId.equals(this.leaderId)) {
                    // Same term and the candidate is the leader this node already knows: ignore.
                    LOG.info("Node {} ignores RequestVoteRequest from {}, term={}, currTerm={}.", this.getNodeId(), request.serverId(), request.term(), this.currTerm);
                    considerVote = false;
                } else {
                    considerVote = true;
                }
            }

            if (considerVote) {
                // Fetch the last log id without holding the write lock.
                doUnlock = false;
                this.writeLock.unlock();
                final LogId lastLogId = this.logManager.getLastLogId(true);
                doUnlock = true;
                this.writeLock.lock();
                if (request.term() != this.currTerm) {
                    // The term moved while the lock was released; do not vote.
                    LOG.warn("Node {} raise term {} when get lastLogId.", this.getNodeId(), this.currTerm);
                } else {
                    final LogId candidateLastLogId = new LogId(request.lastLogIndex(), request.lastLogTerm());
                    final boolean logIsOk = candidateLastLogId.compareTo(lastLogId) >= 0;
                    if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
                        this.stepDown(request.term(), false, new Status(RaftError.EVOTEFORCANDIDATE, "Raft node votes for some candidate, step down to restart election_timer."));
                        this.votedId = candidateId.copy();
                        this.metaStorage.setVotedFor(candidateId);
                    }
                }
            }

            final boolean granted = request.term() == this.currTerm && candidateId.equals(this.votedId);
            return this.raftOptions.getRaftMessagesFactory().requestVoteResponse().term(this.currTerm).granted(granted).build();
        } finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }

    /**
     * Handles an append-entries request (heartbeat when it carries no entries).
     *
     * <p>Returns an immediate response for every rejection path and for heartbeats. When entries are handed to the
     * log manager, {@code null} is returned and the response is produced through the {@code FollowerStableClosure}
     * that wraps {@code done}.
     *
     * <p>Processing latency is always recorded in {@code finally}; the entry count metric only on success.
     *
     * @param request Append-entries request.
     * @param done Closure passed to the follower stable closure for the asynchronous response.
     * @return Immediate response, or {@code null} when the entries were submitted to the log manager.
     */
    @Override
    public Message handleAppendEntriesRequest(RpcRequests.AppendEntriesRequest request, RpcRequestClosure done) {
        boolean doUnlock = true;
        final long startMs = Utils.monotonicMs();
        // Depends on the request only. Computed before locking so that a failure here cannot leave the
        // write lock held (the lock is released solely by the finally block below).
        final int entriesCount = Utils.size(request.entriesList());
        boolean success = false;
        this.writeLock.lock();
        try {
            if (!this.state.isActive()) {
                LOG.warn("Node {} is not in active state, currTerm={}.", this.getNodeId(), this.currTerm);
                return RaftRpcFactory.DEFAULT.newResponse(this.raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, "Node %s is not in active state, state %s.", this.getNodeId(), this.state.name());
            }
            final PeerId serverId = new PeerId();
            if (!serverId.parse(request.serverId())) {
                LOG.warn("Node {} received AppendEntriesRequest from {} serverId bad format.", this.getNodeId(), request.serverId());
                return RaftRpcFactory.DEFAULT.newResponse(this.raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, "Parse serverId failed: %s.", request.serverId());
            }
            if (request.term() < this.currTerm) {
                // Stale leader: answer with the current term so it can step down.
                LOG.info("Node {} ignore stale AppendEntriesRequest from {}, term={}, currTerm={}.", this.getNodeId(), request.serverId(), request.term(), this.currTerm);
                return this.raftOptions.getRaftMessagesFactory().appendEntriesResponse().success(false).term(this.currTerm).build();
            }
            this.checkStepDown(request.term(), serverId);
            if (!serverId.equals(this.leaderId)) {
                // Two leaders in the same term: bump the term on both sides.
                LOG.error("Another peer {} declares that it is the leader at term {} which was occupied by leader {}.", serverId, this.currTerm, this.leaderId);
                this.stepDown(request.term() + 1L, false, new Status(RaftError.ELEADERCONFLICT, "More than one leader in the same term."));
                return this.raftOptions.getRaftMessagesFactory().appendEntriesResponse().success(false).term(request.term() + 1L).build();
            }
            this.updateLastLeaderTimestamp(Utils.monotonicMs());
            if (entriesCount > 0 && this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
                LOG.warn("Node {} received AppendEntriesRequest while installing snapshot.", this.getNodeId());
                return RaftRpcFactory.DEFAULT.newResponse(this.raftOptions.getRaftMessagesFactory(), RaftError.EBUSY, "Node %s:%s is installing snapshot.", this.groupId, this.serverId);
            }
            final long prevLogIndex = request.prevLogIndex();
            final long prevLogTerm = request.prevLogTerm();
            final long localPrevLogTerm = this.logManager.getTerm(prevLogIndex);
            if (localPrevLogTerm != prevLogTerm) {
                final long lastLogIndex = this.logManager.getLastLogIndex();
                LOG.info("Node {} reject term_unmatched AppendEntriesRequest from {}, term={}, prevLogIndex={}, prevLogTerm={}, localPrevLogTerm={}, lastLogIndex={}, entriesSize={}.", this.getNodeId(), request.serverId(), request.term(), prevLogIndex, prevLogTerm, localPrevLogTerm, lastLogIndex, entriesCount);
                return this.raftOptions.getRaftMessagesFactory().appendEntriesResponse().success(false).term(this.currTerm).lastLogIndex(lastLogIndex).build();
            }
            if (entriesCount == 0) {
                // Heartbeat: capture term and last log index under the lock, then advance the commit index
                // after releasing it.
                final AppendEntriesResponseBuilder respBuilder = this.raftOptions.getRaftMessagesFactory().appendEntriesResponse().success(true).term(this.currTerm).lastLogIndex(this.logManager.getLastLogIndex());
                doUnlock = false;
                this.writeLock.unlock();
                this.ballotBox.setLastCommittedIndex(Math.min(request.committedIndex(), prevLogIndex));
                return respBuilder.build();
            }
            if (!this.logManager.hasAvailableCapacityToAppendEntries(1)) {
                LOG.warn("Node {} received AppendEntriesRequest but log manager is busy.", this.getNodeId());
                return this.raftOptions.getRaftMessagesFactory().appendEntriesResponse().success(false).errorCode(RaftError.EBUSY.getNumber()).errorMsg(String.format("Node %s:%s log manager is busy.", this.groupId, this.serverId)).term(this.currTerm).build();
            }
            long index = prevLogIndex;
            final ArrayList<LogEntry> entries = new ArrayList<LogEntry>(entriesCount);
            final ByteBuffer allData = request.data() != null ? request.data().asReadOnlyBuffer() : ArrayUtils.EMPTY_BYTE_BUFFER.asReadOnlyBuffer();
            final Collection<RaftOutter.EntryMeta> entriesList = request.entriesList();
            for (RaftOutter.EntryMeta entry : entriesList) {
                // The index advances for every meta, including those that yield no log entry.
                final LogEntry logEntry = this.logEntryFromMeta(++index, allData, entry);
                if (logEntry == null) {
                    continue;
                }
                if (this.raftOptions.isEnableLogEntryChecksum() && logEntry.isCorrupted()) {
                    final long realChecksum = logEntry.checksum();
                    LOG.error("Corrupted log entry received from leader [node={}, index={}, term={}, expectedChecksum={}, realChecksum={}]", this.getNodeId(), logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(), realChecksum);
                    return RaftRpcFactory.DEFAULT.newResponse(this.raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, "The log entry is corrupted, index=%d, term=%d, expectedChecksum=%d, realChecksum=%d", logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(), realChecksum);
                }
                entries.add(logEntry);
            }
            final FollowerStableClosure closure = new FollowerStableClosure(request, this.raftOptions.getRaftMessagesFactory().appendEntriesResponse().term(this.currTerm), this, done, this.currTerm);
            this.logManager.appendEntries(entries, closure);
            this.checkAndSetConfiguration(true);
            success = true;
            // The response is delivered through the closure.
            return null;
        } finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
            final long processLatency = Utils.monotonicMs() - startMs;
            if (entriesCount == 0) {
                this.metrics.recordLatency("handle-heartbeat-requests", processLatency);
            } else {
                this.metrics.recordLatency("handle-append-entries", processLatency);
            }
            if (success) {
                this.metrics.recordSize("handle-append-entries-count", entriesCount);
            }
        }
    }

    /**
     * Builds a log entry from its metadata, consuming the entry's payload from the shared data buffer.
     *
     * @param index Log index to assign to the entry.
     * @param allData Buffer holding the payloads of all entries back to back; its position is advanced by
     *     {@code entry.dataLen()} bytes when that length is positive.
     * @param entry Entry metadata.
     * @return The log entry, or {@code null} if the entry type is {@code ENTRY_TYPE_UNKNOWN}.
     * @throws IllegalStateException If peers are present on a non-configuration entry, or absent on a
     *     configuration entry.
     */
    private LogEntry logEntryFromMeta(long index, ByteBuffer allData, RaftOutter.EntryMeta entry) {
        if (entry.type() == EnumOutter.EntryType.ENTRY_TYPE_UNKNOWN) {
            return null;
        }

        final LogEntry result = new LogEntry();
        result.setId(new LogId(index, entry.term()));
        result.setType(entry.type());
        if (entry.hasChecksum()) {
            result.setChecksum(entry.checksum());
        }

        final long dataLen = entry.dataLen();
        if (dataLen > 0L) {
            final byte[] payload = new byte[(int)dataLen];
            assert (allData != null);
            allData.get(payload);
            result.setData(ByteBuffer.wrap(payload));
        }

        if (entry.peersList() != null) {
            if (entry.type() != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
                throw new IllegalStateException("Invalid log entry that contains peers but is not ENTRY_TYPE_CONFIGURATION type: " + entry.type());
            }
            this.fillLogEntryPeers(entry, result);
        } else if (entry.type() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
            throw new IllegalStateException("Invalid log entry that contains zero peers but is ENTRY_TYPE_CONFIGURATION type");
        }
        return result;
    }

    /**
     * Copies the peer lists present in the entry metadata (peers, old peers, learners, old learners) into the log
     * entry, parsing every string into a {@code PeerId}. Lists that are {@code null} in the metadata are skipped.
     *
     * <p>The result of {@code PeerId.parse} is not checked: an id that fails to parse is still added.
     *
     * @param entry Source metadata.
     * @param logEntry Log entry to populate.
     */
    private void fillLogEntryPeers(RaftOutter.EntryMeta entry, LogEntry logEntry) {
        if (entry.peersList() != null) {
            final List<PeerId> currentPeers = new ArrayList<PeerId>();
            for (String raw : entry.peersList()) {
                final PeerId parsed = new PeerId();
                parsed.parse(raw);
                currentPeers.add(parsed);
            }
            logEntry.setPeers(currentPeers);
        }

        if (entry.oldPeersList() != null) {
            final List<PeerId> previousPeers = new ArrayList<PeerId>();
            for (String raw : entry.oldPeersList()) {
                final PeerId parsed = new PeerId();
                parsed.parse(raw);
                previousPeers.add(parsed);
            }
            logEntry.setOldPeers(previousPeers);
        }

        if (entry.learnersList() != null) {
            final List<PeerId> currentLearners = new ArrayList<PeerId>();
            for (String raw : entry.learnersList()) {
                final PeerId parsed = new PeerId();
                parsed.parse(raw);
                currentLearners.add(parsed);
            }
            logEntry.setLearners(currentLearners);
        }

        if (entry.oldLearnersList() != null) {
            final List<PeerId> previousLearners = new ArrayList<PeerId>();
            for (String raw : entry.oldLearnersList()) {
                final PeerId parsed = new PeerId();
                parsed.parse(raw);
                previousLearners.add(parsed);
            }
            logEntry.setOldLearners(previousLearners);
        }
    }

    /**
     * Steps down to {@code newTerm} under the write lock, unless {@code newTerm} is lower than the current term.
     *
     * @param newTerm Term to move to.
     * @param status Status passed to {@code stepDown}.
     */
    void increaseTermTo(long newTerm, Status status) {
        this.writeLock.lock();
        try {
            if (newTerm >= this.currTerm) {
                this.stepDown(newTerm, false, status);
            }
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Callback for the result of waiting for a peer to catch up.
     *
     * <p>On success the configuration context is notified. On a timeout whose last RPC to the peer was sent
     * within the election timeout, another wait is scheduled; if that cannot be scheduled, or for any other
     * failure, the configuration context is notified of the failure.
     *
     * @param peer Peer that was catching up.
     * @param term Term in which the wait was started.
     * @param version Version forwarded to the configuration context.
     * @param st Outcome of the wait.
     */
    private void onCaughtUp(PeerId peer, long term, long version, Status st) {
        this.writeLock.lock();
        try {
            final boolean sameTerm = term == this.currTerm;
            final boolean stillLeader = this.state == State.STATE_LEADER;
            // NOTE(review): the callback is dropped only when BOTH the term changed and the node is no longer
            // leader - confirm that "and" (rather than "or") is the intended condition.
            if (!sameTerm && !stillLeader) {
                return;
            }
            if (st.isOk()) {
                this.confCtx.onCaughtUp(version, peer, true);
                return;
            }

            final boolean timedOut = st.getCode() == RaftError.ETIMEDOUT.getNumber();
            if (timedOut && Utils.monotonicMs() - this.replicatorGroup.getLastRpcSendTimestamp(peer) <= (long)this.options.getElectionTimeoutMs()) {
                LOG.debug("Node {} waits peer {} to catch up.", this.getNodeId(), peer);
                final OnCaughtUp retry = new OnCaughtUp(this, term, peer, version);
                final long dueTime = Utils.nowMs() + (long)this.options.getElectionTimeoutMs();
                if (this.replicatorGroup.waitCaughtUp(peer, this.options.getCatchupMargin(), dueTime, retry)) {
                    return;
                }
                LOG.warn("Node {} waitCaughtUp failed, peer={}.", this.getNodeId(), peer);
            }

            LOG.warn("Node {} caughtUp failed, status={}, peer={}.", this.getNodeId(), st, peer);
            this.confCtx.onCaughtUp(version, peer, false);
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Checks the replicators of all learners, then verifies that a quorum of the configuration's peers is alive.
     *
     * @param conf Configuration to check.
     * @param monotonicNowMs Current monotonic time in milliseconds.
     * @param stepDownOnCheckFail Whether to step down (at the current term) when the quorum check fails.
     * @return {@code true} if a quorum is alive, {@code false} otherwise.
     */
    private boolean checkDeadNodes(Configuration conf, long monotonicNowMs, boolean stepDownOnCheckFail) {
        for (PeerId learner : conf.getLearners()) {
            this.checkReplicator(learner);
        }

        final List<PeerId> voters = conf.listPeers();
        final Configuration dead = new Configuration();
        if (this.checkDeadNodes0(voters, monotonicNowMs, true, dead)) {
            return true;
        }
        if (!stepDownOnCheckFail) {
            return false;
        }

        LOG.warn("Node {} steps down when alive nodes don't satisfy quorum, term={}, deadNodes={}, conf={}.", this.getNodeId(), this.currTerm, dead, conf);
        final Status failure = new Status();
        failure.setError(RaftError.ERAFTTIMEDOUT, "Majority of the group dies: %d/%d", dead.size(), voters.size());
        this.stepDown(this.currTerm, false, failure);
        return false;
    }

    /**
     * Counts the peers considered alive and, when they form a majority, moves the leader timestamp to the oldest
     * last-RPC-send timestamp among the alive remote peers.
     *
     * <p>This node always counts as alive. A remote peer counts as alive when the time since the last RPC sent to
     * it does not exceed the leader lease timeout. If no remote peer is alive but the majority is still reached,
     * the timestamp is set to {@code Long.MAX_VALUE}.
     *
     * @param peers Peers to examine.
     * @param monotonicNowMs Current monotonic time in milliseconds.
     * @param checkReplicator Whether to check each remote peer's replicator first.
     * @param deadNodes Receives the peers found dead; may be {@code null} to skip collecting them.
     * @return {@code true} if at least {@code peers.size() / 2 + 1} peers are alive.
     */
    private boolean checkDeadNodes0(List<PeerId> peers, long monotonicNowMs, boolean checkReplicator, Configuration deadNodes) {
        final long leaseTimeoutMs = this.options.getLeaderLeaseTimeoutMs();
        int alive = 0;
        long earliestAliveSendMs = Long.MAX_VALUE;

        for (PeerId peer : peers) {
            if (peer.equals(this.serverId)) {
                alive++;
                continue;
            }
            if (checkReplicator) {
                this.checkReplicator(peer);
            }
            final long lastSendMs = this.replicatorGroup.getLastRpcSendTimestamp(peer);
            if (monotonicNowMs - lastSendMs <= leaseTimeoutMs) {
                alive++;
                earliestAliveSendMs = Math.min(earliestAliveSendMs, lastSendMs);
            } else if (deadNodes != null) {
                deadNodes.addPeer(peer);
            }
        }

        final int quorum = peers.size() / 2 + 1;
        if (alive < quorum) {
            return false;
        }
        this.updateLastLeaderTimestamp(earliestAliveSendMs);
        return true;
    }

    /**
     * Returns copies of the peers considered alive: this node itself, plus every peer whose last RPC send
     * timestamp is within the leader lease timeout of {@code monotonicNowMs}.
     *
     * @param peers Peers to examine.
     * @param monotonicNowMs Current monotonic time in milliseconds.
     * @return New list of copied peer ids, in iteration order of {@code peers}.
     */
    private List<PeerId> getAliveNodes(Collection<PeerId> peers, long monotonicNowMs) {
        final long leaseTimeoutMs = this.options.getLeaderLeaseTimeoutMs();
        final List<PeerId> alive = new ArrayList<PeerId>();
        for (PeerId peer : peers) {
            // Short-circuit keeps the replicator lookup from being made for this node itself.
            final boolean isAlive = peer.equals(this.serverId)
                || monotonicNowMs - this.replicatorGroup.getLastRpcSendTimestamp(peer) <= leaseTimeoutMs;
            if (isAlive) {
                alive.add(peer.copy());
            }
        }
        return alive;
    }

    /**
     * Step-down timer handler: verifies that a quorum is alive in the current configuration and, if it is not
     * empty, in the old one.
     *
     * <p>A first pass runs under the read lock without stepping down. Only if that pass fails is the check
     * repeated under the write lock with stepping down enabled. Nothing is done when the state compares greater
     * than {@code STATE_TRANSFERRING}.
     */
    private void handleStepDownTimeout() {
        this.readLock.lock();
        try {
            if (this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
                LOG.debug("Node {} stop step-down timer, term={}, state={}.", this.getNodeId(), this.currTerm, this.state);
                return;
            }
            final long nowMs = Utils.monotonicMs();
            final boolean confAlive = this.checkDeadNodes(this.conf.getConf(), nowMs, false);
            if (confAlive && (this.conf.getOldConf().isEmpty() || this.checkDeadNodes(this.conf.getOldConf(), nowMs, false))) {
                return;
            }
        } finally {
            this.readLock.unlock();
        }

        this.writeLock.lock();
        try {
            // State may have changed between the two locks, so it is checked again.
            if (this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
                LOG.debug("Node {} stop step-down timer, term={}, state={}.", this.getNodeId(), this.currTerm, this.state);
                return;
            }
            final long nowMs = Utils.monotonicMs();
            this.checkDeadNodes(this.conf.getConf(), nowMs, true);
            if (!this.conf.getOldConf().isEmpty()) {
                this.checkDeadNodes(this.conf.getOldConf(), nowMs, true);
            }
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Appends a configuration log entry for the given configuration change and registers the pending task in the
     * ballot box. Requires the configuration context to be busy.
     *
     * <p>"Unsafe" presumably means the caller must already hold the write lock - no lock is taken here.
     *
     * @param newConf New configuration.
     * @param oldConf Old configuration, or {@code null} if there is none to record.
     * @param leaderStart Flag forwarded to the {@code ConfigurationChangeDone} closure.
     */
    private void unsafeApplyConfiguration(Configuration newConf, Configuration oldConf, boolean leaderStart) {
        Requires.requireTrue(this.confCtx.isBusy(), "ConfigurationContext is not busy");

        final LogEntry confEntry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION);
        confEntry.setId(new LogId(0L, this.currTerm));
        confEntry.setPeers(newConf.listPeers());
        confEntry.setLearners(newConf.listLearners());
        if (oldConf != null) {
            confEntry.setOldPeers(oldConf.listPeers());
            confEntry.setOldLearners(oldConf.listLearners());
        }

        final ConfigurationChangeDone changeDone = new ConfigurationChangeDone(this.currTerm, leaderStart);
        final boolean registered = this.ballotBox.appendPendingTask(newConf, oldConf, changeDone);
        if (!registered) {
            // Report the failure through the closure on the common executor.
            Utils.runClosureInThread(this.getOptions().getCommonExecutor(), changeDone, new Status(RaftError.EINTERNAL, "Fail to append task."));
            return;
        }

        final ArrayList<LogEntry> batch = new ArrayList<LogEntry>();
        batch.add(confEntry);
        this.logManager.appendEntries(batch, new LeaderStableClosure(batch));
        this.checkAndSetConfiguration(false);
    }

    /**
     * Registers a configuration change with {@code async} set to {@code false}.
     *
     * @param oldConf Old configuration.
     * @param newConf New configuration.
     * @param done Closure for the result.
     */
    private void unsafeRegisterConfChange(Configuration oldConf, Configuration newConf, Closure done) {
        this.unsafeRegisterConfChange(oldConf, newConf, done, false);
    }

    /**
     * Validates and starts a configuration change.
     *
     * <p>The change is refused - and {@code done}, if not {@code null}, is completed with an error on the common
     * executor - when this node is not the leader ({@code EBUSY} while transferring leadership, {@code EPERM}
     * otherwise) or when another configuration change is in progress ({@code EBUSY}).
     *
     * <p>When the new configuration equals the current one, nothing is started: the group events listener (if any)
     * is notified of the applied configuration and then {@code done} (if not {@code null}) is run, both on the
     * common executor.
     *
     * @param oldConf Old configuration.
     * @param newConf New configuration; must be valid.
     * @param done Closure for the result, may be {@code null}.
     * @param async Flag forwarded to the configuration context.
     */
    private void unsafeRegisterConfChange(Configuration oldConf, Configuration newConf, Closure done, boolean async) {
        Requires.requireTrue(newConf.isValid(), "Invalid new conf: %s", newConf);
        Requires.requireTrue(new ConfigurationEntry(null, newConf, oldConf).isValid(), "Invalid conf entry: %s", newConf);

        if (this.state != State.STATE_LEADER) {
            LOG.warn("Node {} refused configuration changing as the state={}.", this.getNodeId(), this.state);
            if (done != null) {
                final Status refusal = new Status();
                if (this.state == State.STATE_TRANSFERRING) {
                    refusal.setError(RaftError.EBUSY, "Is transferring leadership.");
                } else {
                    refusal.setError(RaftError.EPERM, "Not leader");
                }
                Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done, refusal);
            }
            return;
        }

        if (this.confCtx.isBusy()) {
            LOG.warn("Node {} refused configuration concurrent changing.", this.getNodeId());
            if (done != null) {
                Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done, new Status(RaftError.EBUSY, "Doing another configuration change."));
            }
            return;
        }

        if (this.conf.getConf().equals(newConf)) {
            final Closure newDone = status -> {
                final JraftGroupEventsListener listener = this.getOptions().getRaftGrpEvtsLsnr();
                if (listener != null) {
                    final LogId id = this.conf.getId();
                    listener.onNewPeersConfigurationApplied(newConf.getPeers(), newConf.getLearners(), id.getTerm(), id.getIndex());
                }
                // The other branches treat done as optional; guard here too so a null closure
                // does not throw on the executor thread after the listener was notified.
                if (done != null) {
                    done.run(status);
                }
            };
            Utils.runClosureInThread(this.getOptions().getCommonExecutor(), newDone);
            return;
        }

        this.confCtx.start(oldConf, newConf, done, async);
    }

    /**
     * Final shutdown step: under the write lock, shuts down the log storage (if there is one) and moves the node
     * to {@code STATE_SHUTDOWN}.
     */
    private void afterShutdown() {
        this.writeLock.lock();
        try {
            if (this.logStorage != null) {
                this.logStorage.shutdown();
            }
            // Not reached if logStorage.shutdown() throws; the lock is still released by finally.
            this.state = State.STATE_SHUTDOWN;
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Returns the node options held by this node.
     *
     * @return Node options.
     */
    @Override
    public NodeOptions getOptions() {
        return this.options;
    }

    /**
     * Returns the raft options held by this node.
     *
     * @return Raft options.
     */
    @Override
    public RaftOptions getRaftOptions() {
        return this.raftOptions;
    }

    /**
     * Reads the current term under the read lock.
     *
     * @return Current term.
     */
    @Override
    public long getCurrentTerm() {
        this.readLock.lock();
        try {
            return this.currTerm;
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Tells, under the read lock, whether the snapshot executor is currently installing a snapshot.
     *
     * @return {@code true} if a snapshot executor exists and reports an installation in progress;
     *     {@code false} otherwise, including when there is no snapshot executor.
     */
    @Override
    public boolean isInstallingSnapshot() {
        this.readLock.lock();
        try {
            // Other users of snapshotExecutor in this class null-check it before use; do the same here
            // instead of failing with a NullPointerException.
            return this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot();
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Returns the index part of {@link #lastLogIndexAndTerm()}.
     *
     * @return Index of the last log id.
     */
    @Override
    public long lastLogIndex() {
        return this.lastLogIndexAndTerm().getIndex();
    }

    /**
     * Returns, under the read lock, a copy of the log manager's last log id (requested with the flag set to
     * {@code false}).
     *
     * @return Copy of the last log id.
     */
    @Override
    public LogId lastLogIndexAndTerm() {
        this.readLock.lock();
        try {
            final LogId last = this.logManager.getLastLogId(false);
            return last.copy();
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Reads the current configuration entry under the read lock. The entry itself is returned, not a copy.
     *
     * @return Current configuration entry.
     */
    @OnlyForTest
    ConfigurationEntry getConf() {
        this.readLock.lock();
        try {
            return this.conf;
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Advances the configuration context to its next stage, provided the term is still current and the state does
     * not compare greater than {@code STATE_TRANSFERRING}; otherwise only a warning is logged.
     *
     * @param term Term in which the configuration change was issued.
     */
    public void onConfigurationChangeDone(long term) {
        this.writeLock.lock();
        try {
            final boolean stillValid = term == this.currTerm && this.state.compareTo(State.STATE_TRANSFERRING) <= 0;
            if (stillValid) {
                this.confCtx.nextStage();
            } else {
                LOG.warn("Node {} process onConfigurationChangeDone at term {} while state={}, currTerm={}.", this.getNodeId(), term, this.state, this.currTerm);
            }
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Reads the leader id under the read lock.
     *
     * @return The leader id, or {@code null} if the stored id is empty.
     */
    @Override
    public PeerId getLeaderId() {
        this.readLock.lock();
        try {
            if (this.leaderId.isEmpty()) {
                return null;
            }
            return this.leaderId;
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Returns the consistent id of the current leader.
     *
     * @return Consistent id, or {@code null} if {@link #getLeaderId()} returns {@code null}.
     */
    @Nullable
    private String getLeaderConsistentId() {
        final PeerId leader = this.getLeaderId();
        if (leader == null) {
            return null;
        }
        return leader.getConsistentId();
    }

    /**
     * Returns the group id of this node.
     *
     * @return Group id.
     */
    @Override
    public String getGroupId() {
        return this.groupId;
    }

    /**
     * Returns the peer id of this node. The stored instance is returned, not a copy.
     *
     * @return Server id.
     */
    public PeerId getServerId() {
        return this.serverId;
    }

    /**
     * Returns the node id, creating and caching it from the group id and server id on first use.
     *
     * <p>No lock is taken; concurrent first calls may each create an instance.
     *
     * @return Node id.
     */
    @Override
    public NodeId getNodeId() {
        NodeId cached = this.nodeId;
        if (cached == null) {
            cached = new NodeId(this.groupId, this.serverId);
            this.nodeId = cached;
        }
        return cached;
    }

    /**
     * Returns the RPC client service held by this node.
     *
     * @return RPC client service.
     */
    public RaftClientService getRpcClientService() {
        return this.rpcClientService;
    }

    /**
     * Puts the node into the error state.
     *
     * <p>The error is forwarded to the FSM caller and the read-only service when they exist. Then, under the write
     * lock, a node whose state compares less than or equal to {@code STATE_FOLLOWER} steps down at the current
     * term, and any state that compares less than {@code STATE_ERROR} becomes {@code STATE_ERROR}.
     *
     * @param error Error to report.
     */
    public void onError(RaftException error) {
        LOG.warn("Node {} got error: {}.", this.getNodeId(), error);
        if (this.fsmCaller != null) {
            this.fsmCaller.onError(error);
        }
        if (this.readOnlyService != null) {
            this.readOnlyService.setError(error);
        }

        this.writeLock.lock();
        try {
            if (this.state.compareTo(State.STATE_FOLLOWER) <= 0) {
                final boolean wasLeader = this.state == State.STATE_LEADER;
                this.stepDown(this.currTerm, wasLeader, new Status(RaftError.EBADNODE, "Raft node(leader or candidate) is in error."));
            }
            // Evaluated after the step-down above, which may itself have changed the state.
            if (this.state.compareTo(State.STATE_ERROR) < 0) {
                this.state = State.STATE_ERROR;
            }
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Processes a vote response while this node is a candidate.
     *
     * <p>Responses are ignored when the node is no longer a candidate or the request term differs from the current
     * term. A response with a higher term makes the node step down. A granted vote is recorded and, once the vote
     * context reports the election as granted, the node becomes leader.
     *
     * @param peerId Peer that answered.
     * @param term Term in which the vote request was sent.
     * @param response Vote response.
     */
    public void handleRequestVoteResponse(PeerId peerId, long term, RpcRequests.RequestVoteResponse response) {
        this.writeLock.lock();
        try {
            if (this.state != State.STATE_CANDIDATE) {
                LOG.info("Node {} received invalid RequestVoteResponse from {}, state not in STATE_CANDIDATE but {}.", this.getNodeId(), peerId, this.state);
                return;
            }
            if (term != this.currTerm) {
                LOG.warn("Node {} received stale RequestVoteResponse from {}, term={}, currTerm={}.", this.getNodeId(), peerId, term, this.currTerm);
                return;
            }
            if (response.term() > this.currTerm) {
                LOG.warn("Node {} received invalid RequestVoteResponse from {}, term={}, expect={}.", this.getNodeId(), peerId, response.term(), this.currTerm);
                this.stepDown(response.term(), false, new Status(RaftError.EHIGHERTERMRESPONSE, "Raft node receives higher term request_vote_response."));
                return;
            }
            if (!response.granted()) {
                return;
            }
            this.voteCtx.grant(peerId);
            if (this.voteCtx.isGranted()) {
                this.becomeLeader();
            }
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Processes a pre-vote response while this node is a follower.
     *
     * <p>Responses are ignored when the node is not a follower or the request term differs from the current term.
     * A response with a higher term makes the node step down. A granted pre-vote is recorded and, once the
     * pre-vote context reports it as granted, a real election is started via {@code electSelf()}.
     *
     * @param peerId Peer that answered.
     * @param term Term in which the pre-vote request was sent.
     * @param response Pre-vote response.
     */
    public void handlePreVoteResponse(PeerId peerId, long term, RpcRequests.RequestVoteResponse response) {
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (this.state != State.STATE_FOLLOWER) {
                LOG.info("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.", this.getNodeId(), peerId, this.state);
                return;
            }
            if (term != this.currTerm) {
                LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", this.getNodeId(), peerId, term, this.currTerm);
                return;
            }
            if (response.term() > this.currTerm) {
                LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", this.getNodeId(), peerId, response.term(), this.currTerm);
                this.stepDown(response.term(), false, new Status(RaftError.EHIGHERTERMRESPONSE, "Raft node receives higher term pre_vote_response."));
                return;
            }
            LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", this.getNodeId(), peerId, response.term(), response.granted());
            if (!response.granted()) {
                return;
            }
            this.prevVoteCtx.grant(peerId);
            if (this.prevVoteCtx.isGranted()) {
                // electSelf() is entered with the write lock held and this method does not release it afterwards;
                // presumably electSelf() releases it itself - confirm against its implementation.
                doUnlock = false;
                this.electSelf();
            }
        } finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a pre-vote round: asks every other peer of the current configuration whether it would
     * grant a vote for {@code currTerm + 1}, without actually changing the term.
     *
     * <p>Locking contract: the first {@code finally} block unlocks the write lock without a matching
     * {@code lock()} in this method, so the caller must hold the write lock on entry. The lock is
     * released on every path out of this method (directly, or by handing it over to
     * {@code electSelf()}).
     */
    private void preVote() {
        long preVoteTerm;
        try {
            LOG.info("Node {} term {} start preVote.", new Object[]{this.getNodeId(), this.currTerm});
            if (this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
                LOG.warn("Node {} term {} doesn't do preVote when installing snapshot as the configuration may be out of date.", new Object[]{this.getNodeId(), this.currTerm});
                return;
            }
            if (!this.conf.contains(this.serverId)) {
                LOG.warn("Node {} can't do preVote as it is not in conf <{}>.", new Object[]{this.getNodeId(), this.conf});
                return;
            }
            // Remember the term so a term change during the unlocked section below can be detected.
            preVoteTerm = this.currTerm;
        }
        finally {
            // Releases the lock acquired by the caller.
            this.writeLock.unlock();
        }
        // Performed without holding the write lock.
        LogId lastLogId = this.logManager.getLastLogId(true);
        // Cleared only when the write lock is handed over to electSelf().
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            // The term may have moved while the lock was released; abandon this round if so.
            if (preVoteTerm != this.currTerm) {
                LOG.warn("Node {} raise term {} when get lastLogId.", new Object[]{this.getNodeId(), this.currTerm});
                return;
            }
            // The old configuration is included in the ballot only while the configuration is not stable.
            this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
            for (PeerId peer : this.conf.listPeers()) {
                if (peer.equals(this.serverId)) continue;
                // The request is sent asynchronously once the connection attempt completes.
                this.rpcClientService.connectAsync(peer).thenAccept(ok -> {
                    if (!ok.booleanValue()) {
                        LOG.warn("Node {} failed to init channel, address={}.", new Object[]{this.getNodeId(), peer});
                        return;
                    }
                    OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, preVoteTerm);
                    done.request = this.raftOptions.getRaftMessagesFactory().requestVoteRequest().preVote(true).groupId(this.groupId).serverId(this.serverId.toString()).peerId(peer.toString()).term(preVoteTerm + 1L).lastLogIndex(lastLogId.getIndex()).lastLogTerm(lastLogId.getTerm()).build();
                    this.rpcClientService.preVote(peer, done.request, done);
                });
            }
            // Count this node's own pre-vote; this alone may already satisfy the ballot.
            this.prevVoteCtx.grant(this.serverId);
            if (this.prevVoteCtx.isGranted()) {
                // NOTE(review): electSelf() is expected to release the write lock itself - confirm.
                doUnlock = false;
                this.electSelf();
            }
        }
        finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }

    /**
     * Reacts to the vote timer firing while this node is a candidate.
     *
     * <p>Nothing happens unless the node is still in {@code STATE_CANDIDATE}. If the previous pre-vote
     * ballot was granted, the election timeout is adjusted first. Then, depending on
     * {@code isStepDownWhenVoteTimedout()}, the node either steps down and starts a new pre-vote
     * round, or retries the election directly.
     *
     * <p>The write lock is released on every path, including when one of the calls made before the
     * hand-off throws. On the normal paths it is released by {@code preVote()} / {@code electSelf()},
     * which take over ownership of the lock (the same {@code doUnlock} convention used by
     * {@code handlePreVoteResponse} and {@code preVote}).
     */
    private void handleVoteTimeout() {
        // Stays true until lock ownership is handed over to preVote()/electSelf().
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (this.state != State.STATE_CANDIDATE) {
                return;
            }
            if (this.prevVoteCtx.isGranted()) {
                this.adjustElectionTimeout();
            }
            if (this.raftOptions.isStepDownWhenVoteTimedout()) {
                LOG.warn("Candidate node {} term {} steps down when election reaching vote timeout: fail to get quorum vote-granted.", new Object[]{this.getNodeId(), this.currTerm});
                this.stepDown(this.currTerm, false, new Status(RaftError.ETIMEDOUT, "Vote timeout: fail to get quorum vote-granted.", new Object[0]));
                // preVote() unlocks the write lock itself.
                doUnlock = false;
                this.preVote();
            } else {
                LOG.debug("Node {} term {} retry to vote self.", new Object[]{this.getNodeId(), this.currTerm});
                // electSelf() is relied on to unlock the write lock itself.
                doUnlock = false;
                this.electSelf();
            }
        }
        finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }

    /**
     * Tells whether this node is currently the leader, reading the state in blocking mode.
     *
     * @return {@code true} if the node state is {@code STATE_LEADER}
     */
    @Override
    public boolean isLeader() {
        final boolean blocking = true;
        return this.isLeader(blocking);
    }

    /**
     * Tells whether this node is a learner, delegating to the blocking variant.
     *
     * @return the result of {@code isLearner(true)}
     */
    @Override
    public boolean isLearner() {
        final boolean blocking = true;
        return this.isLearner(blocking);
    }

    /**
     * Tells whether this node is currently the leader.
     *
     * @param blocking when {@code true} the state is read under the read lock; when {@code false}
     *                 it is read directly without locking
     * @return {@code true} if the node state is {@code STATE_LEADER}
     */
    @Override
    public boolean isLeader(boolean blocking) {
        if (blocking) {
            this.readLock.lock();
            try {
                return this.state == State.STATE_LEADER;
            }
            finally {
                this.readLock.unlock();
            }
        }
        return this.state == State.STATE_LEADER;
    }

    /**
     * Initiates shutdown of this node under the write lock.
     *
     * <p>If the node has not yet reached {@code STATE_SHUTTING}, it steps down when its state is at or
     * below {@code STATE_FOLLOWER}, moves to {@code STATE_SHUTTING}, stops all timers and shuts down
     * the owned services in a fixed order. Finally, if an apply queue exists, a shutdown event carrying
     * a fresh latch is published to it from the common executor; the latch is also stored in
     * {@code shutdownLatch}.
     */
    @Override
    public void shutdown() {
        this.writeLock.lock();
        try {
            LOG.info("Node {} shutdown, currTerm={} state={}.", this.getNodeId(), this.currTerm, this.state);
            // Already shutting down (or further): nothing to do.
            if (this.state.compareTo(State.STATE_SHUTTING) >= 0) {
                return;
            }
            if (this.state.compareTo(State.STATE_FOLLOWER) <= 0) {
                final boolean wasLeader = this.state == State.STATE_LEADER;
                this.stepDown(this.currTerm, wasLeader, new Status(RaftError.ESHUTDOWN, "Raft node is going to quit."));
            }
            this.state = State.STATE_SHUTTING;
            this.stopAllTimers();

            // Component shutdown order is significant and kept as is.
            if (this.readOnlyService != null) {
                this.readOnlyService.shutdown();
            }
            if (this.logManager != null) {
                this.logManager.shutdown();
            }
            if (this.metaStorage != null) {
                this.metaStorage.shutdown();
            }
            if (this.snapshotExecutor != null) {
                this.snapshotExecutor.shutdown();
            }
            if (this.wakingCandidate != null) {
                Replicator.stop(this.wakingCandidate);
            }
            if (this.fsmCaller != null) {
                this.fsmCaller.shutdown();
            }
            if (this.rpcClientService != null) {
                this.rpcClientService.shutdown();
            }

            if (this.applyQueue == null) {
                return;
            }
            final CountDownLatch latch = new CountDownLatch(1);
            this.shutdownLatch = latch;
            Utils.runInThread(this.getOptions().getCommonExecutor(), () -> this.applyQueue.publishEvent((event, sequence) -> {
                event.nodeId = this.getNodeId();
                event.handler = null;
                event.evtType = DisruptorEventType.REGULAR;
                event.shutdownLatch = latch;
            }));
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Stops the election, vote, step-down and snapshot timers (in that order), skipping those that
     * are {@code null}.
     *
     * @return the timers that were stopped, in the order they were stopped
     */
    private List<RepeatedTimer> stopAllTimers() {
        final RepeatedTimer[] candidates = {this.electionTimer, this.voteTimer, this.stepDownTimer, this.snapshotTimer};
        final List<RepeatedTimer> stopped = new ArrayList<RepeatedTimer>();
        for (final RepeatedTimer timer : candidates) {
            if (timer == null) {
                continue;
            }
            timer.stop();
            stopped.add(timer);
        }
        return stopped;
    }

    /**
     * Calls {@code destroy()} on every timer of the given list, in iteration order.
     *
     * @param timers timers to destroy
     */
    private void destroyAllTimers(List<RepeatedTimer> timers) {
        timers.forEach(RepeatedTimer::destroy);
    }

    /**
     * Waits for a previously initiated shutdown to finish and then stops the pools this node owns.
     *
     * <p>When a shutdown latch is present, the read-only service, log manager, snapshot executor and
     * waking candidate replicator are joined, the latch is awaited, the node is unsubscribed from the
     * apply disruptor and the latch is cleared. The FSM caller is joined afterwards. Finally every
     * executor, timer and disruptor configured in the node options is stopped, but only when pools
     * are not shared (the FSM caller disruptor is also stopped for a system group).
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public synchronized void join() throws InterruptedException {
        if (this.shutdownLatch != null) {
            if (this.readOnlyService != null) {
                this.readOnlyService.join();
            }
            if (this.logManager != null) {
                this.logManager.join();
            }
            if (this.snapshotExecutor != null) {
                this.snapshotExecutor.join();
            }
            if (this.wakingCandidate != null) {
                Replicator.join(this.wakingCandidate);
            }
            this.shutdownLatch.await();
            this.applyDisruptor.unsubscribe(this.getNodeId());
            this.shutdownLatch = null;
        }
        if (this.fsmCaller != null) {
            this.fsmCaller.join();
        }

        // Stop the resources that belong exclusively to this node.
        final NodeOptions opts = this.getOptions();
        final boolean ownsPools = !opts.isSharedPools();
        if (opts.getScheduler() != null && ownsPools) {
            opts.getScheduler().shutdown();
        }
        if (opts.getElectionTimer() != null && ownsPools) {
            opts.getElectionTimer().stop();
        }
        if (opts.getVoteTimer() != null && ownsPools) {
            opts.getVoteTimer().stop();
        }
        if (opts.getStepDownTimer() != null && ownsPools) {
            opts.getStepDownTimer().stop();
        }
        if (opts.getSnapshotTimer() != null && ownsPools) {
            opts.getSnapshotTimer().stop();
        }
        if (opts.getCommonExecutor() != null && ownsPools) {
            ExecutorServiceHelper.shutdownAndAwaitTermination(opts.getCommonExecutor());
        }
        if (opts.getStripedExecutor() != null && ownsPools) {
            opts.getStripedExecutor().shutdownGracefully();
        }
        if (opts.getClientExecutor() != null && ownsPools) {
            ExecutorServiceHelper.shutdownAndAwaitTermination(opts.getClientExecutor());
        }
        if (opts.getfSMCallerExecutorDisruptor() != null && (ownsPools || opts.isSystemGroup())) {
            opts.getfSMCallerExecutorDisruptor().shutdown();
        }
        if (opts.getNodeApplyDisruptor() != null && ownsPools) {
            opts.getNodeApplyDisruptor().shutdown();
        }
        if (opts.getReadOnlyServiceDisruptor() != null && ownsPools) {
            opts.getReadOnlyServiceDisruptor().shutdown();
        }
        if (opts.getLogManagerDisruptor() != null && ownsPools) {
            opts.getLogManagerDisruptor().shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles expiry of a leadership transfer that did not complete in time.
     *
     * <p>If {@code term} is still the current term, the transfer towards {@code peer} is stopped in
     * the replicator group; if the node is still in {@code STATE_TRANSFERRING} it notifies the FSM
     * caller of a leader start, returns to {@code STATE_LEADER} and clears the pending transfer
     * argument.
     *
     * @param term term in which the transfer was started
     * @param peer target peer of the transfer
     */
    private void handleTransferTimeout(long term, PeerId peer) {
        LOG.info("Node {} failed to transfer leadership to peer {}, reached timeout.", this.getNodeId(), peer);
        this.writeLock.lock();
        try {
            if (term != this.currTerm) {
                return;
            }
            this.replicatorGroup.stopTransferLeadership(peer);
            if (this.state != State.STATE_TRANSFERRING) {
                return;
            }
            this.fsmCaller.onLeaderStart(term);
            this.state = State.STATE_LEADER;
            this.stopTransferArg = null;
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Timer callback for a leadership transfer: forwards the recorded term and peer to the node
     * stored in {@code arg}.
     *
     * @param arg context captured when the transfer was started
     */
    private void onTransferTimeout(StopTransferArg arg) {
        final long transferTerm = arg.term;
        final PeerId target = arg.peer;
        arg.node.handleTransferTimeout(transferTerm, target);
    }

    /**
     * Returns a copy of the current configuration, read under the read lock.
     *
     * @return a copy of the current configuration, or {@code null} when no configuration entry or
     *         configuration is present
     */
    public Configuration getCurrentConf() {
        this.readLock.lock();
        try {
            if (this.conf == null || this.conf.getConf() == null) {
                return null;
            }
            return this.conf.getConf().copy();
        }
        finally {
            this.readLock.unlock();
        }
    }

    /**
     * Lists the peers of the current configuration. Only allowed on the leader.
     *
     * @return peers of the current configuration
     * @throws NotLeaderException if this node is not in {@code STATE_LEADER}
     */
    @Override
    public List<PeerId> listPeers() {
        this.readLock.lock();
        try {
            if (this.state == State.STATE_LEADER) {
                return this.conf.getConf().listPeers();
            }
            throw new NotLeaderException(this.getLeaderConsistentId());
        }
        finally {
            this.readLock.unlock();
        }
    }

    /**
     * Lists the peers of the current configuration that {@code getAliveNodes} reports as alive at the
     * current monotonic time. Only allowed on the leader.
     *
     * @return alive peers of the current configuration
     * @throws NotLeaderException if this node is not in {@code STATE_LEADER}
     */
    @Override
    public List<PeerId> listAlivePeers() {
        this.readLock.lock();
        try {
            if (this.state == State.STATE_LEADER) {
                return this.getAliveNodes(this.conf.getConf().getPeers(), Utils.monotonicMs());
            }
            throw new NotLeaderException(this.getLeaderConsistentId());
        }
        finally {
            this.readLock.unlock();
        }
    }

    /**
     * Lists the learners of the current configuration. Only allowed on the leader.
     *
     * @return learners of the current configuration
     * @throws NotLeaderException if this node is not in {@code STATE_LEADER}
     */
    @Override
    public List<PeerId> listLearners() {
        this.readLock.lock();
        try {
            if (this.state == State.STATE_LEADER) {
                return this.conf.getConf().listLearners();
            }
            throw new NotLeaderException(this.getLeaderConsistentId());
        }
        finally {
            this.readLock.unlock();
        }
    }

    /**
     * Lists the learners of the current configuration that {@code getAliveNodes} reports as alive at
     * the current monotonic time. Only allowed on the leader.
     *
     * @return alive learners of the current configuration
     * @throws NotLeaderException if this node is not in {@code STATE_LEADER}
     */
    @Override
    public List<PeerId> listAliveLearners() {
        this.readLock.lock();
        try {
            if (this.state == State.STATE_LEADER) {
                return this.getAliveNodes(this.conf.getConf().getLearners(), Utils.monotonicMs());
            }
            throw new NotLeaderException(this.getLeaderConsistentId());
        }
        finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a configuration change that adds {@code peer} to the current configuration.
     *
     * @param peer peer to add; must be non-null and not already part of the current configuration
     * @param done closure passed on to the configuration change
     */
    @Override
    public void addPeer(PeerId peer, Closure done) {
        Requires.requireNonNull(peer, "Null peer");
        this.writeLock.lock();
        try {
            final boolean alreadyPresent = this.conf.getConf().contains(peer);
            Requires.requireTrue(!alreadyPresent, "Peer already exists in current configuration");
            final Configuration extended = new Configuration(this.conf.getConf());
            extended.addPeer(peer);
            this.unsafeRegisterConfChange(this.conf.getConf(), extended, done);
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a configuration change that removes {@code peer} from the current configuration.
     *
     * @param peer peer to remove; must be non-null and part of the current configuration
     * @param done closure passed on to the configuration change
     */
    @Override
    public void removePeer(PeerId peer, Closure done) {
        Requires.requireNonNull(peer, "Null peer");
        this.writeLock.lock();
        try {
            final boolean present = this.conf.getConf().contains(peer);
            Requires.requireTrue(present, "Peer not found in current configuration");
            final Configuration reduced = new Configuration(this.conf.getConf());
            reduced.removePeer(peer);
            this.unsafeRegisterConfChange(this.conf.getConf(), reduced, done);
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a change of the whole configuration (peers and learners), provided the caller's term
     * matches the node's current term.
     *
     * <p>On a term mismatch the request is ignored: a warning is logged and {@code done} is scheduled
     * on the common executor with an OK status.
     *
     * @param newPeersAndLearners target configuration; must be non-null and non-empty
     * @param term term the caller expects the node to be in
     * @param done closure passed on to the configuration change
     */
    @Override
    public void changePeersAndLearners(Configuration newPeersAndLearners, long term, Closure done) {
        Requires.requireNonNull(newPeersAndLearners, "Null new configuration");
        Requires.requireTrue(!newPeersAndLearners.isEmpty(), "Empty new configuration");
        this.writeLock.lock();
        try {
            final long currentTerm = this.getCurrentTerm();
            if (currentTerm == term) {
                this.logConfigurationChange(newPeersAndLearners);
                this.unsafeRegisterConfChange(this.conf.getConf(), newPeersAndLearners, done);
                return;
            }
            LOG.warn("Node {} ignored the configuration because of mismatching terms. Current term is {}, but provided is {}.", this.getNodeId(), currentTerm, term);
            Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done, Status.OK());
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Same as {@code changePeersAndLearners}, except that the configuration change is registered with
     * the additional {@code true} flag of the four-argument {@code unsafeRegisterConfChange}.
     *
     * <p>On a term mismatch the request is ignored: a warning is logged and {@code done} is scheduled
     * on the common executor with an OK status.
     *
     * @param newConf target configuration; must be non-null and non-empty
     * @param term term the caller expects the node to be in
     * @param done closure passed on to the configuration change
     */
    @Override
    public void changePeersAndLearnersAsync(Configuration newConf, long term, Closure done) {
        Requires.requireNonNull(newConf, "Null new configuration");
        Requires.requireTrue(!newConf.isEmpty(), "Empty new configuration");
        this.writeLock.lock();
        try {
            final long currentTerm = this.getCurrentTerm();
            if (currentTerm == term) {
                this.logConfigurationChange(newConf);
                this.unsafeRegisterConfChange(this.conf.getConf(), newConf, done, true);
                return;
            }
            LOG.warn("Node {} ignored the configuration because of mismatching terms. Current term is {}, but provided is {}.", this.getNodeId(), currentTerm, term);
            Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done, Status.OK());
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Forcibly replaces the node's configuration with {@code newPeers}, without going through the
     * regular configuration change procedure.
     *
     * <p>After the configuration is replaced the node steps down with term {@code currTerm + 1}.
     *
     * @param newPeers the configuration to install; must be non-null, non-empty and valid
     * @return {@code OK} when the configuration was installed or already equal to {@code newPeers};
     *         {@code EINVAL} for an empty configuration, {@code EPERM} when the node is not active,
     *         {@code EBUSY} when the leader is in the middle of another configuration change
     * @throws IllegalStateException if an externally enforced config index is set in the node options
     */
    @Override
    public Status resetPeers(Configuration newPeers) {
        final Object enforcedIndex = this.options.getExternallyEnforcedConfigIndex();
        if (enforcedIndex != null) {
            throw new IllegalStateException("Using both externallyEnforcedConfigIndex and resetPeers() is not supported [externallyEnforcedConfigIndex=" + this.options.getExternallyEnforcedConfigIndex() + "]");
        }
        Requires.requireNonNull(newPeers, "Null new peers");
        Requires.requireTrue(!newPeers.isEmpty(), "Empty new peers");
        Requires.requireTrue(newPeers.isValid(), "Invalid new peers: %s", newPeers);
        this.writeLock.lock();
        try {
            if (newPeers.isEmpty()) {
                LOG.warn("Node {} set empty peers.", this.getNodeId());
                return new Status(RaftError.EINVAL, "newPeers is empty");
            }
            if (!this.state.isActive()) {
                LOG.warn("Node {} is in state {}, can't set peers.", this.getNodeId(), this.state);
                return new Status(RaftError.EPERM, "Bad state: %s", this.state);
            }
            // Starting from an empty configuration: install the new one as is.
            if (this.conf.getConf().isEmpty()) {
                LOG.info("Node {} set peers to {} from empty.", this.getNodeId(), newPeers);
                this.conf.setConf(newPeers);
                this.stepDown(this.currTerm + 1L, false, new Status(RaftError.ESETPEER, "Set peer from empty configuration"));
                return Status.OK();
            }
            if (this.state == State.STATE_LEADER && this.confCtx.isBusy()) {
                LOG.warn("Node {} set peers need wait current conf changing.", this.getNodeId());
                return new Status(RaftError.EBUSY, "Changing to another configuration");
            }
            // Nothing to change.
            if (this.conf.getConf().equals(newPeers)) {
                return Status.OK();
            }
            final Configuration replacement = new Configuration(newPeers);
            LOG.info("Node {} set peers from {} to {}.", this.getNodeId(), this.conf.getConf(), newPeers);
            this.conf.setConf(replacement);
            this.conf.getOldConf().reset();
            this.stepDown(this.currTerm + 1L, false, new Status(RaftError.ESETPEER, "Raft node set peer normally"));
            this.resetElectionTimeoutToInitial();
            return Status.OK();
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a configuration change that adds the given learners to the current configuration.
     *
     * @param learners learners to add; must be a non-null, non-empty list without {@code null} items
     * @param done closure passed on to the configuration change
     */
    @Override
    public void addLearners(List<PeerId> learners, Closure done) {
        this.checkPeers(learners);
        this.writeLock.lock();
        try {
            final Configuration updated = new Configuration(this.conf.getConf());
            learners.forEach(updated::addLearner);
            this.unsafeRegisterConfChange(this.conf.getConf(), updated, done);
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Validates a peer list: the list must be non-null and non-empty and must not contain
     * {@code null} elements. Violations are reported through {@code Requires}.
     *
     * @param peers list to validate
     */
    private void checkPeers(List<PeerId> peers) {
        Requires.requireNonNull(peers, "Null peers");
        final boolean hasElements = !peers.isEmpty();
        Requires.requireTrue(hasElements, "Empty peers");
        peers.forEach(candidate -> Requires.requireNonNull(candidate, "Null peer"));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a configuration change that removes the given learners from the current
     * configuration.
     *
     * @param learners learners to remove; must be a non-null, non-empty list without {@code null} items
     * @param done closure passed on to the configuration change
     */
    @Override
    public void removeLearners(List<PeerId> learners, Closure done) {
        this.checkPeers(learners);
        this.writeLock.lock();
        try {
            final Configuration updated = new Configuration(this.conf.getConf());
            learners.forEach(updated::removeLearner);
            this.unsafeRegisterConfChange(this.conf.getConf(), updated, done);
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a configuration change that replaces the learner set of the current configuration
     * with the given learners (kept in list order, duplicates collapsed by the set).
     *
     * @param learners new learners; must be a non-null, non-empty list without {@code null} items
     * @param done closure passed on to the configuration change
     */
    @Override
    public void resetLearners(List<PeerId> learners, Closure done) {
        this.checkPeers(learners);
        this.writeLock.lock();
        try {
            final LinkedHashSet<PeerId> learnerSet = new LinkedHashSet<PeerId>(learners);
            final Configuration updated = new Configuration(this.conf.getConf());
            updated.setLearners(learnerSet);
            this.unsafeRegisterConfChange(this.conf.getConf(), updated, done);
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Requests a non-forced snapshot.
     *
     * @param done closure passed on to the snapshot request
     */
    @Override
    public void snapshot(Closure done) {
        final boolean forced = false;
        this.doSnapshot(done, forced);
    }

    /**
     * Requests a snapshot, passing the {@code forced} flag through unchanged.
     *
     * @param done closure passed on to the snapshot request
     * @param forced forwarded as is to the snapshot executor
     */
    @Override
    public void snapshot(Closure done, boolean forced) {
        doSnapshot(done, forced);
    }

    /**
     * Delegates a snapshot request to the snapshot executor. When the node has no snapshot executor,
     * a non-null {@code done} is scheduled on the common executor with an {@code EINVAL} status.
     *
     * @param done closure to notify; may be {@code null}
     * @param forced forwarded as is to the snapshot executor
     */
    private void doSnapshot(Closure done, boolean forced) {
        if (this.snapshotExecutor != null) {
            this.snapshotExecutor.doSnapshot(done, forced);
            return;
        }
        if (done == null) {
            return;
        }
        final Status unsupported = new Status(RaftError.EINVAL, "Snapshot is not supported");
        Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done, unsupported);
    }

    /**
     * Changes the election timeout at runtime: stores it in the node options, updates the heartbeat
     * and election timeout intervals of the replicator group and resets the election timer.
     *
     * @param electionTimeoutMs new election timeout in milliseconds; must be positive
     */
    @Override
    public void resetElectionTimeoutMs(int electionTimeoutMs) {
        Requires.requireTrue(electionTimeoutMs > 0, "Invalid electionTimeoutMs");
        this.writeLock.lock();
        try {
            this.options.setElectionTimeoutMs(electionTimeoutMs);
            // The heartbeat interval is derived from the value just stored in the options.
            final int storedTimeoutMs = this.options.getElectionTimeoutMs();
            this.replicatorGroup.resetHeartbeatInterval(this.heartbeatTimeout(storedTimeoutMs));
            this.replicatorGroup.resetElectionTimeoutInterval(electionTimeoutMs);
            LOG.info("Node {} reset electionTimeout, currTimer {} state {} new electionTimeout {}.", this.getNodeId(), this.currTerm, this.state, electionTimeoutMs);
            this.electionTimer.reset(electionTimeoutMs);
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts transferring leadership from this node to {@code peer}.
     *
     * <p>An empty {@code peer} means "any peer": the target is then chosen by the replicator group.
     * On success the node moves to {@code STATE_TRANSFERRING}, the leader-stop hooks are run and a
     * timeout task is scheduled after one election timeout.
     *
     * @param peer target peer, or an empty peer id to let the replicator group pick one
     * @return {@code OK} when the transfer was started or the target is this node itself;
     *         {@code EBUSY}/{@code EPERM} when this node is not the leader, {@code EBUSY} while a
     *         configuration change is in progress, {@code EINVAL} when the target is not in the
     *         configuration or has no replicator, and a status with code {@code -1} when no candidate
     *         could be found
     */
    @Override
    public Status transferLeadershipTo(PeerId peer) {
        Requires.requireNonNull(peer, "Null peer");
        this.writeLock.lock();
        try {
            if (this.state != State.STATE_LEADER) {
                LOG.warn("Node {} can't transfer leadership to peer {} as it is in state {}.", this.getNodeId(), peer, this.state);
                final RaftError reason = this.state == State.STATE_TRANSFERRING ? RaftError.EBUSY : RaftError.EPERM;
                return new Status(reason, "Not a leader");
            }
            if (this.confCtx.isBusy()) {
                LOG.warn("Node {} refused to transfer leadership to peer {} when the leader is changing the configuration.", this.getNodeId(), peer);
                return new Status(RaftError.EBUSY, "Changing the configuration");
            }

            PeerId target = peer.copy();
            if (target.isEmpty()) {
                LOG.info("Node {} starts to transfer leadership to any peer.", this.getNodeId());
                target = this.replicatorGroup.findTheNextCandidate(this.conf);
                if (target == null) {
                    return new Status(-1, "Candidate not found for any peer");
                }
            }
            if (target.equals(this.serverId)) {
                LOG.info("Node transferred leadership to self [node={}].", this.getNodeId());
                return Status.OK();
            }
            if (!this.conf.contains(target)) {
                LOG.info("Node {} refused to transfer leadership to peer {} as it is not in {}.", this.getNodeId(), peer, this.conf);
                return new Status(RaftError.EINVAL, "Not in current configuration");
            }

            final long lastLogIndex = this.logManager.getLastLogIndex();
            final boolean started = this.replicatorGroup.transferLeadershipTo(target, lastLogIndex);
            if (!started) {
                LOG.warn("No such peer [node={}, peer={}].", this.getNodeId(), peer);
                return new Status(RaftError.EINVAL, "No such peer %s", peer);
            }

            this.state = State.STATE_TRANSFERRING;
            this.onLeaderStop(new Status(RaftError.ETRANSFERLEADERSHIP, "Raft leader is transferring leadership to %s", target));
            LOG.info("Node {} starts to transfer leadership to peer {}.", this.getNodeId(), peer);
            final StopTransferArg stopArg = new StopTransferArg(this, this.currTerm, target);
            this.stopTransferArg = stopArg;
            this.transferTimer = this.getOptions().getScheduler().schedule(() -> this.onTransferTimeout(stopArg), this.options.getElectionTimeoutMs(), TimeUnit.MILLISECONDS);
        }
        finally {
            this.writeLock.unlock();
        }
        return Status.OK();
    }

    /**
     * Runs the leader-stop hooks: clears the failure replicators of the replicator group, then
     * notifies the FSM caller with the given status.
     *
     * @param status status forwarded to the FSM caller
     */
    private void onLeaderStop(Status status) {
        replicatorGroup.clearFailureReplicators();
        fsmCaller.onLeaderStop(status);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles a {@code TimeoutNowRequest}, which asks this node to start an election immediately.
     *
     * <p>The request is rejected (a response with {@code success(false)} is returned) when its term
     * differs from the current term or when this node is not a follower; a higher request term
     * additionally makes the node step down. Otherwise the node clock is updated from the request
     * timestamp, a successful response is sent through {@code done} and the election is started.
     *
     * @param request the incoming request
     * @param done closure used to send the successful response
     * @return the rejection response, or {@code null} when the response was already sent via
     *         {@code done}
     */
    @Override
    public Message handleTimeoutNowRequest(RpcRequests.TimeoutNowRequest request, RpcRequestClosure done) {
        // Cleared only when the write lock is handed over to electSelf().
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (request.term() != this.currTerm) {
                // Captured before a possible stepDown() so the log shows the term at arrival time.
                long savedCurrTerm = this.currTerm;
                if (request.term() > this.currTerm) {
                    this.stepDown(request.term(), false, new Status(RaftError.EHIGHERTERMREQUEST, "Raft node receives higher term request", new Object[0]));
                }
                LOG.info("Node {} received TimeoutNowRequest from {} while currTerm={} didn't match requestTerm={}.", new Object[]{this.getNodeId(), request.peerId(), savedCurrTerm, request.term()});
                RpcRequests.TimeoutNowResponse timeoutNowResponse = this.raftOptions.getRaftMessagesFactory().timeoutNowResponse().term(this.currTerm).success(false).build();
                return timeoutNowResponse;
            }
            if (this.state != State.STATE_FOLLOWER) {
                LOG.info("Node {} received TimeoutNowRequest from {}, while state={}, term={}.", new Object[]{this.getNodeId(), request.serverId(), this.state, this.currTerm});
                // Despite its (decompiler-chosen) name, this local holds the rejection response.
                RpcRequests.TimeoutNowResponse savedCurrTerm = this.raftOptions.getRaftMessagesFactory().timeoutNowResponse().term(this.currTerm).success(false).build();
                return savedCurrTerm;
            }
            this.getOptions().getClock().update(request.timestamp());
            long savedTerm = this.currTerm;
            // The response already advertises the next term, i.e. currTerm + 1.
            RpcRequests.TimeoutNowResponse resp = this.raftOptions.getRaftMessagesFactory().timeoutNowResponse().term(this.currTerm + 1L).success(true).build();
            // The response is sent before the election is started.
            done.sendResponse(resp);
            // NOTE(review): electSelf() is expected to release the write lock itself - confirm.
            doUnlock = false;
            LOG.info("Node {} received TimeoutNowRequest from {}, term={} and starts voting.", new Object[]{this.getNodeId(), request.serverId(), savedTerm});
            this.electSelf();
        }
        finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles an {@code InstallSnapshotRequest} sent by a node claiming to be the leader.
     *
     * <p>The request is validated first (snapshot support, parsable sender and destination ids,
     * destination node registered in the node manager). Then, under the write lock, it is rejected
     * when the node is not active, when its term is stale, or when the sender conflicts with the known
     * leader (in which case the node steps down with {@code request.term() + 1}). A request that
     * passes all checks is handed to the snapshot executor, which answers through {@code done}; the
     * time spent there is recorded under the {@code "install-snapshot"} latency metric.
     *
     * @param request the incoming request
     * @param done closure used by the snapshot executor to send the response
     * @return an error or rejection response, or {@code null} when the request was handed to the
     *         snapshot executor
     */
    @Override
    public Message handleInstallSnapshot(RpcRequests.InstallSnapshotRequest request, RpcRequestClosure done) {
        if (this.snapshotExecutor == null) {
            return RaftRpcFactory.DEFAULT.newResponse(this.raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, "Not supported snapshot");
        }

        final PeerId senderId = new PeerId();
        if (!senderId.parse(request.serverId())) {
            LOG.warn("Node {} ignore InstallSnapshotRequest from {} bad server id.", this.getNodeId(), request.serverId());
            return RaftRpcFactory.DEFAULT.newResponse(this.raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, "Parse serverId failed: %s", request.serverId());
        }

        final PeerId dstPeerId = new PeerId();
        if (!dstPeerId.parse(request.peerId())) {
            return RaftRpcFactory.DEFAULT.newResponse(this.raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, "Fail to parse peerId: %s", request.peerId());
        }
        final String requestGroupId = request.groupId();
        final Node targetNode = done.getRpcCtx().getNodeManager().get(requestGroupId, dstPeerId);
        if (targetNode == null) {
            return RaftRpcFactory.DEFAULT.newResponse(this.raftOptions.getRaftMessagesFactory(), RaftError.ENOENT, "Peer id not found: %s, group: %s", request.peerId(), requestGroupId);
        }

        this.writeLock.lock();
        try {
            if (!this.state.isActive()) {
                LOG.info("Node {} ignore InstallSnapshotRequest as it is not in active state {}.", this.getNodeId(), this.state);
                return RaftRpcFactory.DEFAULT.newResponse(this.raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, "Node %s:%s is not in active state, state %s.", this.groupId, this.serverId, this.state.name());
            }
            if (request.term() < this.currTerm) {
                LOG.warn("Node {} ignore stale InstallSnapshotRequest from {}, term={}, currTerm={}.", this.getNodeId(), request.peerId(), request.term(), this.currTerm);
                return this.raftOptions.getRaftMessagesFactory().installSnapshotResponse().term(this.currTerm).success(false).build();
            }
            this.checkStepDown(request.term(), senderId);
            if (!senderId.equals(this.leaderId)) {
                LOG.error("Another peer {} of groupId={} declares that it is the leader at term {} which was occupied by leader {}.", senderId, this.groupId, this.currTerm, this.leaderId);
                this.stepDown(request.term() + 1L, false, new Status(RaftError.ELEADERCONFLICT, "More than one leader in the same term."));
                return this.raftOptions.getRaftMessagesFactory().installSnapshotResponse().term(request.term() + 1L).success(false).build();
            }
        }
        finally {
            this.writeLock.unlock();
        }

        // The installation itself runs outside the write lock.
        final long startMs = Utils.monotonicMs();
        try {
            if (LOG.isInfoEnabled()) {
                LOG.info("Node {} received InstallSnapshotRequest from {}, lastIncludedLogIndex={}, lastIncludedLogTerm={}, lastLogId={}.", this.getNodeId(), request.serverId(), request.meta().lastIncludedIndex(), request.meta().lastIncludedTerm(), this.logManager.getLastLogId(false));
            }
            this.snapshotExecutor.installSnapshot(request, this.raftOptions.getRaftMessagesFactory().installSnapshotResponse(), done);
            return null;
        }
        finally {
            this.metrics.recordLatency("install-snapshot", Utils.monotonicMs() - startMs);
        }
    }

    /**
     * Re-checks and applies the configuration after a snapshot has been installed, by calling
     * {@code checkAndSetConfiguration(false)}.
     */
    public void updateConfigurationAfterInstallingSnapshot() {
        final boolean flag = false;
        this.checkAndSetConfiguration(flag);
    }

    /**
     * Stops the replicators of all peers in {@code drop} that are neither contained in {@code keep}
     * nor this node itself. Does nothing when {@code drop} is {@code null}.
     *
     * @param keep peers whose replicators must stay running
     * @param drop peers whose replicators are candidates for stopping; may be {@code null}
     */
    private void stopReplicator(Collection<PeerId> keep, Collection<PeerId> drop) {
        if (drop == null) {
            return;
        }
        for (final PeerId candidate : drop) {
            final boolean retained = keep.contains(candidate) || candidate.equals(this.serverId);
            if (!retained) {
                this.replicatorGroup.stopReplicator(candidate);
            }
        }
    }

    /**
     * Returns the first data entry found at or after {@code index}, scanning no further than the
     * last applied index captured on entry.
     *
     * @param index log index to start looking from; must be positive and not greater than the last applied index.
     * @return the user log holding the index and data of the first {@code ENTRY_TYPE_DATA} entry found.
     * @throws LogIndexOutOfBoundsException if {@code index} is not positive or exceeds the last applied index.
     * @throws LogNotFoundException if the log manager has no entry at an index visited during the scan.
     * @throws IllegalStateException if no data entry exists between {@code index} and the last applied index.
     */
    @Override
    public UserLog readCommittedUserLog(long index) {
        if (index <= 0L) {
            throw new LogIndexOutOfBoundsException("Request index is invalid: " + index);
        }
        long lastApplied = this.fsmCaller.getLastAppliedIndex();
        if (index > lastApplied) {
            throw new LogIndexOutOfBoundsException("Request index " + index + " is greater than lastAppliedIndex: " + lastApplied);
        }
        long cursor = index;
        while (true) {
            LogEntry candidate = this.logManager.getEntry(cursor);
            if (candidate == null) {
                throw new LogNotFoundException("User log is deleted at index: " + cursor);
            }
            if (candidate.getType() == EnumOutter.EntryType.ENTRY_TYPE_DATA) {
                return new UserLog(cursor, candidate.getData());
            }
            // Not a data entry: move on, but never look past what has been applied.
            cursor++;
            if (cursor > lastApplied) {
                throw new IllegalStateException("No user log between index:" + index + " and last_applied_index:" + lastApplied);
            }
        }
    }

    /**
     * Registers a replicator state listener on this node.
     *
     * @param replicatorStateListener listener to add; checked with {@code Requires.requireNonNull}.
     */
    @Override
    public void addReplicatorStateListener(Replicator.ReplicatorStateListener replicatorStateListener) {
        Requires.requireNonNull(replicatorStateListener, "replicatorStateListener");
        replicatorStateListeners.add(replicatorStateListener);
    }

    /**
     * Unregisters a previously added replicator state listener.
     *
     * @param replicatorStateListener listener to remove; checked with {@code Requires.requireNonNull}.
     */
    @Override
    public void removeReplicatorStateListener(Replicator.ReplicatorStateListener replicatorStateListener) {
        Requires.requireNonNull(replicatorStateListener, "replicatorStateListener");
        replicatorStateListeners.remove(replicatorStateListener);
    }

    /** Removes every registered replicator state listener. */
    @Override
    public void clearReplicatorStateListeners() {
        replicatorStateListeners.clear();
    }

    /**
     * Returns the registered replicator state listeners.
     *
     * @return the node's own listener list (the backing field itself, not a copy).
     */
    @Override
    public List<Replicator.ReplicatorStateListener> getReplicatorStateListeners() {
        return replicatorStateListeners;
    }

    /**
     * Returns the current value of this node's {@code targetPriority} field.
     *
     * @return the target priority.
     */
    @Override
    public int getNodeTargetPriority() {
        return targetPriority;
    }

    /**
     * Returns the current value of this node's {@code state} field.
     *
     * @return the node state.
     */
    @Override
    public State getNodeState() {
        return state;
    }

    /**
     * Prints a diagnostic description of this node and its components.
     *
     * <p>The node's own scalar state (id, state, leader, term, configuration, target priority) is
     * captured while holding the read lock; all printing, including the delegation to the timers and
     * sub-components, happens after the lock has been released.
     *
     * @param out printer that receives the description.
     */
    @Override
    public void describe(Describer.Printer out) {
        final String nodeIdText;
        final String stateText;
        final String leaderIdText;
        final long termSnapshot;
        final String confText;
        final int prioritySnapshot;

        this.readLock.lock();
        try {
            nodeIdText = String.valueOf(this.getNodeId());
            stateText = String.valueOf((Object)this.state);
            leaderIdText = String.valueOf(this.leaderId);
            termSnapshot = this.currTerm;
            confText = String.valueOf(this.conf);
            prioritySnapshot = this.targetPriority;
        }
        finally {
            this.readLock.unlock();
        }

        out.print("nodeId: ").println(nodeIdText);
        out.print("state: ").println(stateText);
        out.print("leaderId: ").println(leaderIdText);
        out.print("term: ").println(termSnapshot);
        out.print("conf: ").println(confText);
        out.print("targetPriority: ").println(prioritySnapshot);

        out.println("electionTimer: ");
        this.electionTimer.describe(out);

        out.println("voteTimer: ");
        this.voteTimer.describe(out);

        out.println("stepDownTimer: ");
        this.stepDownTimer.describe(out);

        out.println("snapshotTimer: ");
        this.snapshotTimer.describe(out);

        out.println("logManager: ");
        this.logManager.describe(out);

        out.println("fsmCaller: ");
        this.fsmCaller.describe(out);

        out.println("ballotBox: ");
        this.ballotBox.describe(out);

        // The snapshot executor is optional, so it is only described when present.
        if (this.snapshotExecutor != null) {
            out.println("snapshotExecutor: ");
            this.snapshotExecutor.describe(out);
        }

        out.println("replicatorGroup: ");
        this.replicatorGroup.describe(out);

        // The log storage is only described when its implementation supports it.
        if (this.logStorage instanceof Describer) {
            Describer describableStorage = (Describer)((Object)this.logStorage);
            out.println("logStorage: ");
            describableStorage.describe(out);
        }
    }

    /**
     * Returns the current value of this node's {@code state} field.
     *
     * @return the node state.
     */
    public State getState() {
        return state;
    }

    /**
     * Returns a short textual form of this node that contains its node id.
     *
     * @return a string of the form {@code JRaftNode [nodeId=...]}.
     */
    @Override
    public String toString() {
        return "JRaftNode [nodeId=" + this.getNodeId() + "]";
    }

    /**
     * Logs, at debug level only, whether the given configuration differs from the current one.
     *
     * @param newConfiguration configuration the node is about to switch to.
     */
    private void logConfigurationChange(Configuration newConfiguration) {
        if (!LOG.isDebugEnabled()) {
            return;
        }
        Configuration current = this.conf.getConf();
        if (current.equals(newConfiguration)) {
            LOG.debug("Node {} change configuration to the same one {}.", new Object[]{this.getNodeId(), current});
            return;
        }
        LOG.debug("Node {} change configuration from {} to {}.", new Object[]{this.getNodeId(), current, newConfiguration});
    }

    /**
     * Exposes the node's log storage; marked for test use only.
     *
     * @return the value of the {@code logStorage} field.
     */
    @TestOnly
    public LogStorage logStorage() {
        return logStorage;
    }

    /**
     * Exposes the node's FSM caller; marked for test use only.
     *
     * @return the value of the {@code fsmCaller} field.
     */
    @TestOnly
    public FSMCaller fsmCaller() {
        return fsmCaller;
    }

    /**
     * Node lock built on {@link LongHeldDetectingReadWriteLock} whose report callback logs a warning
     * and records the blocked time in the node metrics.
     */
    private static class NodeReadWriteLock extends LongHeldDetectingReadWriteLock {
        /** Threshold passed to the superclass, read from a system property; defaults to {@code -1}. */
        static final long MAX_BLOCKING_MS_TO_REPORT = SystemPropertyUtil.getLong("jraft.node.detecting.lock.max_blocking_ms_to_report", -1L);

        /** Node this lock belongs to; used for the node id and metrics in reports. */
        private final Node node;

        /**
         * @param node node that owns this lock.
         */
        NodeReadWriteLock(Node node) {
            super(MAX_BLOCKING_MS_TO_REPORT, TimeUnit.MILLISECONDS);
            this.node = node;
        }

        /**
         * Logs the blocking report and, if node metrics are available, records the blocked time
         * (converted to milliseconds) under {@code node-lock-blocked}.
         */
        @Override
        public void report(LongHeldDetectingReadWriteLock.AcquireMode acquireMode, Thread heldThread, Collection<Thread> queuedThreads, long blockedNanos) {
            long blockedMillis = TimeUnit.NANOSECONDS.toMillis(blockedNanos);
            LOG.warn(
                "Raft-Node-Lock report [node={}, currentThread={}, acquireMode={}, heldThread={}, queuedThreads={}, blockedMs={}].",
                new Object[]{this.node.getNodeId(), Thread.currentThread(), acquireMode, heldThread, queuedThreads, blockedMillis}
            );
            NodeMetrics nodeMetrics = this.node.getNodeMetrics();
            if (nodeMetrics == null) {
                return;
            }
            nodeMetrics.recordLatency("node-lock-blocked", blockedMillis);
        }
    }

    /**
     * State holder for a single configuration (membership) change driven by the owning node.
     *
     * <p>The change moves through the {@link Stage} values: {@code STAGE_NONE} (idle),
     * {@code STAGE_CATCHING_UP} (waiting for added peers), {@code STAGE_JOINT} and {@code STAGE_STABLE}.
     * {@link #reset(Status)} returns the context to idle and bumps {@link #version}, so catch-up
     * callbacks created for an earlier attempt are ignored.
     *
     * <p>NOTE(review): the class performs no synchronization of its own; callers presumably hold the
     * node's write lock — confirm against the call sites.
     */
    private static class ConfigurationCtx {
        final NodeImpl node;
        Stage stage;
        /** Number of peers added plus peers removed by the change in progress. */
        int nchanges;
        /** Incremented on every reset; used to discard stale catch-up notifications. */
        long version;
        List<PeerId> newPeers = new ArrayList<PeerId>();
        List<PeerId> oldPeers = new ArrayList<PeerId>();
        /** Peers being added that have not yet reported a successful catch-up. */
        List<PeerId> addingPeers = new ArrayList<PeerId>();
        List<PeerId> newLearners = new ArrayList<PeerId>();
        List<PeerId> oldLearners = new ArrayList<PeerId>();
        /** Closure of the change in progress, or {@code null} when there is none. */
        Closure done;
        /** Whether the change in progress was started in async mode (closure already completed with OK at start). */
        boolean async;

        ConfigurationCtx(NodeImpl node) {
            this.node = node;
            this.stage = Stage.STAGE_NONE;
            this.version = 0L;
            this.done = null;
        }

        /**
         * Starts a configuration change from {@code oldConf} to {@code newConf}.
         *
         * @param oldConf configuration currently in effect.
         * @param newConf target configuration.
         * @param done closure to notify about the outcome; may be {@code null}.
         * @param async when {@code true}, {@code done} is scheduled with an OK status right away and is
         *     not run again when the change completes.
         * @throws IllegalStateException if a change is already in progress.
         * @throws IllegalArgumentException if a completion closure is already registered.
         */
        void start(Configuration oldConf, Configuration newConf, Closure done, boolean async) {
            if (this.isBusy()) {
                if (done != null) {
                    Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), done, new Status(RaftError.EBUSY, "Already in busy stage.", new Object[0]));
                }
                throw new IllegalStateException("Busy stage");
            }
            if (this.done != null) {
                if (done != null) {
                    Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), done, new Status(RaftError.EINVAL, "Already have done closure.", new Object[0]));
                }
                throw new IllegalArgumentException("Already have done closure");
            }
            this.done = done;
            this.stage = Stage.STAGE_CATCHING_UP;
            this.async = async;
            if (async) {
                Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), done, Status.OK());
            }
            this.oldPeers = oldConf.listPeers();
            this.newPeers = newConf.listPeers();
            this.oldLearners = oldConf.listLearners();
            this.newLearners = newConf.listLearners();
            Configuration adding = new Configuration();
            Configuration removing = new Configuration();
            newConf.diff(oldConf, adding, removing);
            this.nchanges = adding.size() + removing.size();
            this.addNewLearners();
            if (adding.isEmpty()) {
                // Nothing has to catch up, so move straight to the next stage.
                this.nextStage();
                return;
            }
            this.addNewPeers(adding);
        }

        /**
         * Starts a replicator for every added peer and registers a catch-up callback for it.
         * The first failure is reported through {@link #onCaughtUp(long, PeerId, boolean)} and stops the loop.
         */
        private void addNewPeers(Configuration adding) {
            this.addingPeers = adding.listPeers();
            LOG.info("Adding peers [node={}, peers={}].", new Object[]{this.node.getNodeId(), this.addingPeers});
            for (PeerId newPeer : this.addingPeers) {
                if (!this.node.replicatorGroup.addReplicator(newPeer)) {
                    LOG.error("Node {} start the replicator failed, peer={}.", new Object[]{this.node.getNodeId(), newPeer});
                    this.onCaughtUp(this.version, newPeer, false);
                    return;
                }
                OnCaughtUp caughtUp = new OnCaughtUp(this.node, this.node.currTerm, newPeer, this.version);
                long dueTime = Utils.nowMs() + (long)this.node.options.getElectionTimeoutMs();
                if (this.node.replicatorGroup.waitCaughtUp(newPeer, this.node.options.getCatchupMargin(), dueTime, caughtUp)) continue;
                LOG.error("Node {} waitCaughtUp, peer={}.", new Object[]{this.node.getNodeId(), newPeer});
                this.onCaughtUp(this.version, newPeer, false);
                return;
            }
        }

        /**
         * Starts learner replicators for learners present in the new configuration but not in the old one.
         * A failure to start one is only logged.
         */
        private void addNewLearners() {
            HashSet<PeerId> addingLearners = new HashSet<PeerId>(this.newLearners);
            addingLearners.removeAll(this.oldLearners);
            LOG.info("Adding learners [node={}, learners={}].", new Object[]{this.node.getNodeId(), addingLearners});
            for (PeerId newLearner : addingLearners) {
                if (this.node.replicatorGroup.addReplicator(newLearner, ReplicatorType.Learner)) continue;
                LOG.error("Node {} start the learner replicator failed, peer={}.", new Object[]{this.node.getNodeId(), newLearner});
            }
        }

        /**
         * Handles the catch-up result of one added peer.
         *
         * @param version context version the notification was created for; mismatches are ignored.
         * @param peer peer the result is about.
         * @param success whether the peer caught up; a failure resets the context with {@code ECATCHUP}.
         */
        void onCaughtUp(long version, PeerId peer, boolean success) {
            if (version != this.version) {
                LOG.warn("Ignore onCaughtUp message, mismatch configuration context [node={}, expectedVersion={}, gotVersion={}].", new Object[]{this.node.getNodeId(), this.version, version});
                return;
            }
            Requires.requireTrue(this.stage == Stage.STAGE_CATCHING_UP, "Stage is not in STAGE_CATCHING_UP");
            if (success) {
                LOG.info("Catch up for the peer was finished [node={}, peer={}]", new Object[]{this.node.getNodeId(), peer});
                this.addingPeers.remove(peer);
                if (this.addingPeers.isEmpty()) {
                    this.nextStage();
                    return;
                }
                return;
            }
            LOG.warn("Node {} fail to catch up peer {} when trying to change peers from {} to {}.", new Object[]{this.node.getNodeId(), peer, this.oldPeers, this.newPeers});
            this.reset(new Status(RaftError.ECATCHUP, "Peer %s failed to catch up.", peer));
        }

        /** Resets the context without a status; equivalent to {@code reset(null)}. */
        void reset() {
            this.reset(null);
        }

        /**
         * Returns the context to the idle state and notifies the listener and the completion closure.
         *
         * <p>On an OK status the replicators of peers/learners that are only in the old configuration
         * are stopped; otherwise those that are only in the new configuration are stopped.
         *
         * @param st outcome of the change; {@code null} is reported as {@code LEADER_STEPPED_DOWN}.
         */
        void reset(Status st) {
            if (st != null && st.isOk()) {
                this.node.stopReplicator(this.newPeers, this.oldPeers);
                this.node.stopReplicator(this.newLearners, this.oldLearners);
            } else {
                this.node.stopReplicator(this.oldPeers, this.newPeers);
                this.node.stopReplicator(this.oldLearners, this.newLearners);
            }
            // Snapshot the target lists before they are cleared; the closure below runs later.
            List<PeerId> resultPeerIds = List.copyOf(this.newPeers);
            List<PeerId> resultLearnerIds = List.copyOf(this.newLearners);
            this.clearPeers();
            this.clearLearners();
            ++this.version;
            this.stage = Stage.STAGE_NONE;
            this.nchanges = 0;
            Closure oldDoneClosure = this.done;
            if (this.done != null) {
                // Capture the flag now: the closure runs on the common executor, and by then a new
                // start() may already have overwritten this.async for the next change.
                final boolean asyncAtReset = this.async;
                Closure newDone = status -> {
                    JraftGroupEventsListener listener = this.node.getOptions().getRaftGrpEvtsLsnr();
                    if (listener != null) {
                        if (status.isOk()) {
                            LogId id = this.node.conf.getId();
                            listener.onNewPeersConfigurationApplied(resultPeerIds, resultLearnerIds, id.getTerm(), id.getIndex());
                        } else {
                            listener.onReconfigurationError(status, resultPeerIds, resultLearnerIds, this.node.getCurrentTerm());
                        }
                    }
                    // In async mode the user closure was already completed in start().
                    if (!asyncAtReset) {
                        oldDoneClosure.run(status);
                    }
                };
                Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), newDone, st != null ? st : LEADER_STEPPED_DOWN);
                this.done = null;
            }
        }

        private void clearLearners() {
            this.newLearners.clear();
            this.oldLearners.clear();
        }

        private void clearPeers() {
            this.newPeers.clear();
            this.oldPeers.clear();
            this.addingPeers.clear();
        }

        /**
         * Applies {@code conf} directly while the context is idle. The context enters
         * {@code STAGE_STABLE} when {@code oldConf} is {@code null} or empty, {@code STAGE_JOINT} otherwise.
         *
         * @param conf configuration to apply.
         * @param oldConf previous configuration, or {@code null}/empty if there is none.
         */
        void flush(Configuration conf, Configuration oldConf) {
            Requires.requireTrue(!this.isBusy(), "Flush when busy");
            this.newPeers = conf.listPeers();
            this.newLearners = conf.listLearners();
            if (oldConf == null || oldConf.isEmpty()) {
                this.stage = Stage.STAGE_STABLE;
                this.oldPeers = this.newPeers;
                this.oldLearners = this.newLearners;
            } else {
                this.stage = Stage.STAGE_JOINT;
                this.oldPeers = oldConf.listPeers();
                this.oldLearners = oldConf.listLearners();
            }
            this.node.unsafeApplyConfiguration(conf, oldConf == null || oldConf.isEmpty() ? null : oldConf, true);
        }

        /**
         * Advances the change to its next stage; must only be called while a change is in progress.
         */
        void nextStage() {
            Requires.requireTrue(this.isBusy(), "Not in busy stage");
            switch (this.stage) {
                case STAGE_CATCHING_UP: {
                    LOG.info("Catch up phase to change peers was successfully finished [node={}, from peers={} to peers={}, from learners={}, to learners={}].", new Object[]{this.node.getNodeId(), this.oldPeers, this.newPeers, this.oldLearners, this.newLearners});
                    if (this.nchanges > 0) {
                        this.stage = Stage.STAGE_JOINT;
                        this.node.unsafeApplyConfiguration(new Configuration(this.newPeers, this.newLearners), new Configuration(this.oldPeers), false);
                        return;
                    }
                    // No peers were added or removed: fall through and apply the stable configuration directly.
                }
                case STAGE_JOINT: {
                    this.stage = Stage.STAGE_STABLE;
                    this.node.unsafeApplyConfiguration(new Configuration(this.newPeers, this.newLearners), null, false);
                    break;
                }
                case STAGE_STABLE: {
                    // Evaluate before reset(), which clears newPeers.
                    boolean shouldStepDown = !this.newPeers.contains(this.node.serverId);
                    this.reset(new Status());
                    if (!shouldStepDown) break;
                    this.node.stepDown(this.node.currTerm, true, new Status(RaftError.ELEADERREMOVED, "This node was removed.", new Object[0]));
                    break;
                }
                case STAGE_NONE: {
                    Requires.requireTrue(false, "Can't reach here");
                }
            }
        }

        /** Returns {@code true} while the context is in any stage other than {@code STAGE_NONE}. */
        boolean isBusy() {
            return this.stage != Stage.STAGE_NONE;
        }

        /** Stages of a configuration change. */
        static enum Stage {
            STAGE_NONE,
            STAGE_CATCHING_UP,
            STAGE_JOINT,
            STAGE_STABLE;

        }
    }

    /**
     * Stable closure that forwards its completion status to an internal {@link SynchronizedClosure},
     * so that a caller can wait for the result via {@link #await()}.
     */
    private static class BootstrapStableClosure extends LogManager.StableClosure {
        /** Receives the status passed to {@link #run(Status)}. */
        private final SynchronizedClosure done;

        BootstrapStableClosure() {
            super(null);
            this.done = new SynchronizedClosure();
        }

        /**
         * Delegates to the internal closure's {@code await()}.
         *
         * @return the status returned by the internal closure.
         * @throws InterruptedException propagated from the internal closure.
         */
        public Status await() throws InterruptedException {
            return done.await();
        }

        /** Hands the status over to the internal closure. */
        @Override
        public void run(Status status) {
            done.run(status);
        }
    }

    private class LogEntryAndClosureHandler
    implements EventHandler<LogEntryAndClosure> {
        private final List<LogEntryAndClosure> tasks;
        @Nullable
        private HybridTimestamp safeTs;

        private LogEntryAndClosureHandler() {
            this.tasks = new ArrayList<LogEntryAndClosure>(NodeImpl.this.raftOptions.getApplyBatch());
            this.safeTs = null;
        }

        public void onEvent(LogEntryAndClosure event, long sequence, boolean endOfBatch) {
            SafeTimeAwareCommandClosure clo;
            WriteCommand command;
            HybridTimestamp timestamp;
            if (event.shutdownLatch != null) {
                if (!this.tasks.isEmpty()) {
                    NodeImpl.this.executeApplyingTasks(this.tasks);
                    this.reset();
                }
                event.shutdownLatch.countDown();
                return;
            }
            if (event.done instanceof SafeTimeAwareCommandClosure && (timestamp = (command = (WriteCommand)(clo = (SafeTimeAwareCommandClosure)event.done).command()).initiatorTime()) != null) {
                if (this.safeTs == null) {
                    this.safeTs = NodeImpl.this.clock.update(timestamp);
                } else if (timestamp.compareTo(this.safeTs) > 0) {
                    this.safeTs = NodeImpl.this.clock.update(timestamp);
                }
                clo.safeTimestamp(this.safeTs);
            }
            this.tasks.add(event);
            if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
                NodeImpl.this.executeApplyingTasks(this.tasks);
                this.reset();
            }
        }

        private void reset() {
            for (LogEntryAndClosure task : this.tasks) {
                task.reset();
            }
            this.tasks.clear();
            this.safeTs = null;
        }
    }

    /** Immutable holder of a node, a term and a peer. */
    private static class StopTransferArg {
        /** Node the argument belongs to. */
        final NodeImpl node;
        /** Term value supplied at construction. */
        final long term;
        /** Peer supplied at construction. */
        final PeerId peer;

        StopTransferArg(NodeImpl node, long term, PeerId peer) {
            this.peer = peer;
            this.term = term;
            this.node = node;
        }
    }

    /**
     * Mutable event carrying a log entry, its completion closure and the expected term.
     * A non-null {@link #shutdownLatch} marks the event as a shutdown signal for the handler.
     */
    public static class LogEntryAndClosure extends NodeIdAware {
        LogEntry entry;
        Closure done;
        long expectedTerm;
        CountDownLatch shutdownLatch;

        /** Clears the inherited state and every field of this event so the instance can be reused. */
        @Override
        public void reset() {
            super.reset();
            entry = null;
            done = null;
            expectedTerm = 0L;
            shutdownLatch = null;
        }
    }

    class LeaderStableClosure
    extends LogManager.StableClosure {
        LeaderStableClosure(List<LogEntry> entries) {
            super(entries);
        }

        @Override
        public void run(Status status) {
            if (status.isOk()) {
                NodeImpl.this.ballotBox.commitAt(this.firstLogIndex, this.firstLogIndex + (long)this.nEntries - 1L, NodeImpl.this.serverId);
            } else {
                LOG.error("Node {} append [{}, {}] failed, status={}.", new Object[]{NodeImpl.this.getNodeId(), this.firstLogIndex, this.firstLogIndex + (long)this.nEntries - 1L, status});
            }
        }
    }

    /**
     * Collects heartbeat (append-entries) responses and produces exactly one result once either
     * enough successes or enough failures have been counted.
     *
     * <p>The result is built with {@code responseBuilder} ({@code true} for confirmed, {@code false}
     * otherwise) and handed to {@code responseConsumer}. Responses that arrive after the result has
     * been produced are ignored.
     *
     * @param <T> type of the response produced for the consumer.
     */
    public static class QuorumConfirmedHeartbeatResponseClosure<T extends Message>
    extends RpcResponseClosureAdapter<RpcRequests.AppendEntriesResponse> {
        final Function<Boolean, T> responseBuilder;
        final Consumer<T> responseConsumer;
        final int quorum;
        /** Number of failed acknowledgements after which the negative result is produced. */
        final int failPeersThreshold;
        int ackSuccess;
        int ackFailures;
        boolean isDone;

        /**
         * @param responseConsumer receives the single result.
         * @param responseBuilder builds the result from the confirmation flag.
         * @param quorum quorum size; one success is counted implicitly (see {@link #run(Status)}).
         * @param peersCount number of peers; for an even count the failure threshold is {@code quorum - 1}.
         */
        QuorumConfirmedHeartbeatResponseClosure(Consumer<T> responseConsumer, Function<Boolean, T> responseBuilder, int quorum, int peersCount) {
            this.responseConsumer = responseConsumer;
            this.responseBuilder = responseBuilder;
            this.quorum = quorum;
            this.failPeersThreshold = peersCount % 2 == 0 ? quorum - 1 : quorum;
            this.ackSuccess = 0;
            this.ackFailures = 0;
            this.isDone = false;
        }

        /**
         * Counts one acknowledgement and completes the closure when a threshold is reached.
         *
         * @param status RPC status; the acknowledgement is a success only if the status is OK and the
         *     response itself reports success.
         */
        @Override
        public synchronized void run(Status status) {
            if (this.isDone) {
                return;
            }
            if (status.isOk() && ((RpcRequests.AppendEntriesResponse)this.getResponse()).success()) {
                ++this.ackSuccess;
            } else {
                ++this.ackFailures;
            }
            // "+ 1" adds one implicit acknowledgement on top of the counted responses
            // (presumably the local node's own vote — confirm with the caller).
            if (this.ackSuccess + 1 >= this.quorum) {
                this.complete(true);
            } else if (this.ackFailures >= this.failPeersThreshold) {
                this.complete(false);
            }
        }

        /** Builds the typed response for the given outcome, delivers it and marks the closure done. */
        private void complete(boolean confirmed) {
            T response = this.responseBuilder.apply(confirmed);
            this.responseConsumer.accept(response);
            this.isDone = true;
        }
    }

    /**
     * Stable closure that answers an append-entries request once the entries have been stored.
     *
     * <p>The success response is sent only if the node's term still equals the term captured at
     * construction; otherwise a failure response carrying the node's current term is sent.
     */
    private static class FollowerStableClosure extends LogManager.StableClosure {
        /** Minimum of the request's committed index and the index of the last entry in the request. */
        final long committedIndex;
        final AppendEntriesResponseBuilder responseBuilder;
        final NodeImpl node;
        final RpcRequestClosure done;
        /** Term captured when the closure was created. */
        final long term;

        FollowerStableClosure(RpcRequests.AppendEntriesRequest request, AppendEntriesResponseBuilder responseBuilder, NodeImpl node, RpcRequestClosure done, long term) {
            super(null);
            long lastIndexInRequest = request.prevLogIndex() + (long)Utils.size(request.entriesList());
            this.committedIndex = Math.min(request.committedIndex(), lastIndexInRequest);
            this.responseBuilder = responseBuilder;
            this.node = node;
            this.done = done;
            this.term = term;
        }

        /**
         * @param status result of storing the entries; a non-OK status is passed straight to {@code done}.
         */
        @Override
        public void run(Status status) {
            if (!status.isOk()) {
                this.done.run(status);
                return;
            }

            this.node.readLock.lock();
            try {
                long currentTerm = this.node.currTerm;
                if (currentTerm != this.term) {
                    // The term moved on while the entries were being stored: reject with the new term.
                    this.responseBuilder.success(false).term(currentTerm);
                    this.done.sendResponse(this.responseBuilder.build());
                    return;
                }
            }
            finally {
                this.node.readLock.unlock();
            }

            // The success path runs after the read lock has been released.
            this.responseBuilder.success(true).term(this.term);
            this.node.ballotBox.setLastCommittedIndex(this.committedIndex);
            this.done.sendResponse(this.responseBuilder.build());
        }
    }

    /**
     * Catch-up closure that forwards its result to {@code NodeImpl.onCaughtUp} together with the
     * peer, term and configuration-context version captured at construction.
     */
    private static class OnCaughtUp extends CatchUpClosure {
        private final NodeImpl node;
        private final long term;
        private final PeerId peer;
        private final long version;

        OnCaughtUp(NodeImpl node, long term, PeerId peer, long version) {
            this.version = version;
            this.peer = peer;
            this.term = term;
            this.node = node;
        }

        /** Passes the status on to the node along with the captured peer, term and version. */
        @Override
        public void run(Status status) {
            node.onCaughtUp(peer, term, version, status);
        }
    }

    /**
     * Closure run when a configuration entry has been processed.
     *
     * <p>On success it calls {@code onConfigurationChangeDone(term)}; if created with
     * {@code leaderStart == true} it additionally notifies the group events listener (when one is
     * configured) about the elected leader and calls the state machine's {@code onLeaderStart}.
     * On failure it only logs the error.
     */
    private class ConfigurationChangeDone implements Closure {
        private final long term;
        private final boolean leaderStart;

        ConfigurationChangeDone(long term, boolean leaderStart) {
            this.term = term;
            this.leaderStart = leaderStart;
        }

        /**
         * @param status outcome of the configuration change.
         */
        @Override
        public void run(Status status) {
            if (!status.isOk()) {
                LOG.error("Fail to run ConfigurationChangeDone, [node={}, status={}].", new Object[]{NodeImpl.this.getNodeId(), status});
                return;
            }

            NodeImpl.this.onConfigurationChangeDone(this.term);
            if (!this.leaderStart) {
                return;
            }

            if (NodeImpl.this.getOptions().getRaftGrpEvtsLsnr() != null) {
                ConfigurationEntry targetConfiguration = NodeImpl.this.logManager.checkAndSetConfiguration(NodeImpl.this.conf);
                LogId configurationId = targetConfiguration.getId();
                Configuration configuration = targetConfiguration.getConf();
                NodeImpl.this.options.getRaftGrpEvtsLsnr().onLeaderElected(
                    this.term,
                    configurationId.getTerm(),
                    configurationId.getIndex(),
                    configuration.listPeers(),
                    configuration.listLearners()
                );
            }
            NodeImpl.this.getOptions().getFsm().onLeaderStart(this.term);
        }
    }

    private class OnPreVoteRpcDone
    extends RpcResponseClosureAdapter<RpcRequests.RequestVoteResponse> {
        final long startMs = Utils.monotonicMs();
        final PeerId peer;
        final long term;
        RpcRequests.RequestVoteRequest request;

        OnPreVoteRpcDone(PeerId peer, long term) {
            this.peer = peer;
            this.term = term;
        }

        @Override
        public void run(Status status) {
            long latency = Utils.monotonicMs() - this.startMs;
            NodeImpl.this.metrics.recordLatency("pre-vote", latency);
            if (!status.isOk()) {
                LOG.warn("Node {} PreVote to {} latency={} error: {}.", new Object[]{NodeImpl.this.getNodeId(), this.peer, latency, status});
            } else {
                NodeImpl.this.handlePreVoteResponse(this.peer, this.term, (RpcRequests.RequestVoteResponse)this.getResponse());
            }
        }
    }

    /**
     * Response closure of a request-vote RPC: records the round-trip latency and, on success, passes
     * the response to the node's {@code handleRequestVoteResponse}.
     */
    private class OnRequestVoteRpcDone extends RpcResponseClosureAdapter<RpcRequests.RequestVoteResponse> {
        /** Monotonic time at which this closure was created. */
        final long startMs = Utils.monotonicMs();
        final PeerId peer;
        final long term;
        final NodeImpl node;
        RpcRequests.RequestVoteRequest request;

        OnRequestVoteRpcDone(PeerId peer, long term, NodeImpl node) {
            this.peer = peer;
            this.term = term;
            this.node = node;
        }

        /**
         * @param status RPC status; a non-OK status is only logged.
         */
        @Override
        public void run(Status status) {
            long elapsedMs = Utils.monotonicMs() - this.startMs;
            NodeImpl.this.metrics.recordLatency("request-vote", elapsedMs);
            if (status.isOk()) {
                this.node.handleRequestVoteResponse(this.peer, this.term, (RpcRequests.RequestVoteResponse)this.getResponse());
                return;
            }
            LOG.warn("Node {} RequestVote to {} error: {}.", new Object[]{this.node.getNodeId(), this.peer, status});
        }
    }
}

