/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.raft.server.impl;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.failure.FailureType;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metrics.sources.RaftMetricSource;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.raft.IndexWithTerm;
import org.apache.ignite.internal.raft.JraftGroupEventsListener;
import org.apache.ignite.internal.raft.Marshaller;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.StoredRaftNodeId;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.GroupStoragesContextResolver;
import org.apache.ignite.internal.raft.server.impl.RaftGroupEventsListenerAdapter;
import org.apache.ignite.internal.raft.server.impl.RaftServiceEventInterceptor;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.storage.GroupStoragesDestructionIntents;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
import org.apache.ignite.internal.raft.storage.impl.StorageDestructionIntent;
import org.apache.ignite.internal.raft.storage.impl.StoragesDestructionContext;
import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.ThreadOperation;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.Iterator;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.ActionRequestInterceptor;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.NullActionRequestInterceptor;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestInterceptor;
import org.apache.ignite.raft.jraft.rpc.impl.core.NullAppendEntriesRequestInterceptor;
import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy;
import org.apache.ignite.raft.jraft.util.Utils;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class JraftServerImpl
implements RaftServer {
    /** Logger of this class; also handed to the request-processor thread factory. */
    private static final IgniteLogger LOG = Loggers.forClass(JraftServerImpl.class);
    /** Cluster service given to the constructor; used for the RPC server, the node manager and per-node RPC clients. */
    private final ClusterService service;
    /** Failure manager handed to every {@link DelegatingStateMachine} created by this server. */
    private final FailureManager failureManager;
    /** Store of storage destruction intents (saved, read and removed by the destroy methods). */
    private final GroupStoragesDestructionIntents groupStoragesDestructionIntents;
    /** Resolver of destruction intents/contexts, log storage factories and server data paths. */
    private final GroupStoragesContextResolver groupStoragesContextResolver;
    /** RPC server; created in {@code startAsync} and shut down in {@code stopAsync}. */
    private IgniteRpcServer rpcServer;
    /** Raft group services started on this server, keyed by raft node ID. */
    private final ConcurrentMap<RaftNodeId, RaftGroupService> nodes = new ConcurrentHashMap<RaftNodeId, RaftGroupService>();
    /** Fixed list of monitor objects; {@code startRaftNode} synchronizes on the one selected by the node ID hash. */
    private final List<Object> startGroupInProgressMonitors;
    /** Server-wide node options; copied for every started raft node. */
    private final NodeOptions opts;
    /** Client listener for raft group events; passed to the RPC server. */
    private final RaftGroupEventsClientListener raftGroupEventsClientListener;
    /** Executor passed to the RPC server; created in {@code startAsync}. */
    private ExecutorService requestExecutor;
    /** Interceptor shared by the RPC server and the per-node events listener adapters. */
    private final RaftServiceEventInterceptor serviceEventInterceptor;
    /** Append entries interceptor passed to the RPC server; defaults to the null implementation. */
    private AppendEntriesRequestInterceptor appendEntriesRequestInterceptor = new NullAppendEntriesRequestInterceptor();
    /** Action request interceptor passed to the RPC server; defaults to the null implementation. */
    private ActionRequestInterceptor actionRequestInterceptor = new NullActionRequestInterceptor();
    /** Number of start monitors: three per CPU (as reported by {@code Utils.cpus()}), capped at 25. */
    private static final int SIMULTANEOUS_GROUP_START_PARALLELISM = Math.min(Utils.cpus() * 3, 25);

    /**
     * Creates the server and adjusts the supplied node options in place.
     *
     * <p>The RPC connect timeout is set to a third and the default RPC timeout to a half of the election timeout,
     * shared pools are enabled, the server name falls back to the cluster service node name when absent, and an
     * exponential backoff election timeout strategy is installed. Finally the fixed set of start monitors is created.
     *
     * @param service Cluster service.
     * @param opts Node options shared by all raft nodes of this server; mutated by this constructor.
     * @param raftGroupEventsClientListener Client listener for raft group events.
     * @param failureManager Failure manager.
     * @param groupStoragesDestructionIntents Store of storage destruction intents.
     * @param groupStoragesContextResolver Resolver of storage destruction contexts.
     */
    public JraftServerImpl(ClusterService service, NodeOptions opts, RaftGroupEventsClientListener raftGroupEventsClientListener, FailureManager failureManager, GroupStoragesDestructionIntents groupStoragesDestructionIntents, GroupStoragesContextResolver groupStoragesContextResolver) {
        // Plain dependency wiring.
        this.service = service;
        this.opts = opts;
        this.failureManager = failureManager;
        this.raftGroupEventsClientListener = raftGroupEventsClientListener;
        this.groupStoragesDestructionIntents = groupStoragesDestructionIntents;
        this.groupStoragesContextResolver = groupStoragesContextResolver;

        // RPC timeouts are derived from the election timeout.
        opts.setRpcConnectTimeoutMs(opts.getElectionTimeoutMs() / 3);
        opts.setRpcDefaultTimeout(opts.getElectionTimeoutMs() / 2);
        opts.setSharedPools(true);

        if (opts.getServerName() == null) {
            opts.setServerName(service.nodeName());
        }

        opts.setElectionTimeoutStrategy(new ExponentialBackoffTimeoutStrategy(11000, 3L));

        // One monitor object per allowed simultaneous group start.
        List<Object> startMonitors = Stream.generate(Object::new)
                .limit(SIMULTANEOUS_GROUP_START_PARALLELISM)
                .collect(Collectors.toList());
        this.startGroupInProgressMonitors = Collections.unmodifiableList(startMonitors);

        this.serviceEventInterceptor = new RaftServiceEventInterceptor();
    }

    /**
     * Replaces the append entries request interceptor.
     *
     * <p>The stored value is read when {@code startAsync} constructs the RPC server.
     *
     * @param appendEntriesRequestInterceptor Interceptor to use.
     */
    public void appendEntriesRequestInterceptor(AppendEntriesRequestInterceptor appendEntriesRequestInterceptor) {
        this.appendEntriesRequestInterceptor = appendEntriesRequestInterceptor;
    }

    /**
     * Replaces the action request interceptor.
     *
     * <p>The stored value is read when {@code startAsync} constructs the RPC server.
     *
     * @param actionRequestInterceptor Interceptor to use.
     */
    public void actionRequestInterceptor(ActionRequestInterceptor actionRequestInterceptor) {
        this.actionRequestInterceptor = actionRequestInterceptor;
    }

    /**
     * Starts the server.
     *
     * <p>Every shared resource that is not already present in the node options (executors, scheduler, timers,
     * node manager, metrics, striped disruptors) is created and stored there. The request executor and the RPC
     * server are always created. After the RPC server and the node manager are initialised, pending storage
     * destruction intents are processed on the executor of the component context.
     *
     * @param componentContext Component context; its executor runs the pending storage destruction.
     * @return Future of the pending storage destruction task.
     */
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        assert (this.opts.isSharedPools()) : "RAFT server is supposed to run in shared pools mode";
        // Shared executors, scheduler and timers: only created when the caller did not supply them.
        if (this.opts.getCommonExecutor() == null) {
            this.opts.setCommonExecutor(JRaftUtils.createCommonExecutor(this.opts));
        }
        if (this.opts.getStripedExecutor() == null) {
            this.opts.setStripedExecutor(JRaftUtils.createAppendEntriesExecutor(this.opts));
        }
        if (this.opts.getScheduler() == null) {
            this.opts.setScheduler(JRaftUtils.createScheduler(this.opts));
        }
        if (this.opts.getClientExecutor() == null) {
            this.opts.setClientExecutor(JRaftUtils.createClientExecutor(this.opts, this.opts.getServerName()));
        }
        if (this.opts.getVoteTimer() == null) {
            this.opts.setVoteTimer(JRaftUtils.createTimer(this.opts, "JRaft-VoteTimer"));
        }
        if (this.opts.getElectionTimer() == null) {
            this.opts.setElectionTimer(JRaftUtils.createTimer(this.opts, "JRaft-ElectionTimer"));
        }
        if (this.opts.getStepDownTimer() == null) {
            this.opts.setStepDownTimer(JRaftUtils.createTimer(this.opts, "JRaft-StepDownTimer"));
        }
        if (this.opts.getSnapshotTimer() == null) {
            this.opts.setSnapshotTimer(JRaftUtils.createTimer(this.opts, "JRaft-SnapshotTimer"));
        }
        if (this.opts.getNodeManager() == null) {
            this.opts.setNodeManager(new NodeManager(this.service));
        }
        // The request executor and the RPC server are created unconditionally; the interceptors are captured here.
        this.requestExecutor = Executors.newFixedThreadPool(this.opts.getRaftRpcThreadPoolSize(), (ThreadFactory)IgniteThreadFactory.create((String)this.opts.getServerName(), (String)"JRaft-Request-Processor", (IgniteLogger)LOG, (ThreadOperation[])new ThreadOperation[]{ThreadOperation.PROCESS_RAFT_REQ}));
        this.rpcServer = new IgniteRpcServer(this.service, this.opts.getNodeManager(), this.opts.getRaftMessagesFactory(), this.requestExecutor, this.serviceEventInterceptor, this.raftGroupEventsClientListener, this.appendEntriesRequestInterceptor, this.actionRequestInterceptor);
        // Metrics must exist before the disruptors below, which take their metrics from it.
        if (this.opts.getRaftMetrics() == null) {
            this.opts.setRaftMetrics(new RaftMetricSource(this.opts.getStripes(), this.opts.getLogStripesCount()));
        }
        // The FSM caller disruptor threads are created with storage read and write operations.
        if (this.opts.getfSMCallerExecutorDisruptor() == null) {
            this.opts.setfSMCallerExecutorDisruptor(new StripedDisruptor<FSMCallerImpl.ApplyTask>(this.opts.getServerName(), "JRaft-FSMCaller-Disruptor", (stripeName, logger) -> IgniteThreadFactory.create((String)this.opts.getServerName(), (String)stripeName, (boolean)true, (IgniteLogger)logger, (ThreadOperation[])new ThreadOperation[]{ThreadOperation.STORAGE_READ, ThreadOperation.STORAGE_WRITE}), this.opts.getRaftOptions().getDisruptorBufferSize(), FSMCallerImpl.ApplyTask::new, this.opts.getStripes(), false, false, this.opts.getRaftMetrics().disruptorMetrics("raft.fsmcaller.disruptor")));
        }
        if (this.opts.getNodeApplyDisruptor() == null) {
            this.opts.setNodeApplyDisruptor(new StripedDisruptor<NodeImpl.LogEntryAndClosure>(this.opts.getServerName(), "JRaft-NodeImpl-Disruptor", (stripeName, logger) -> IgniteThreadFactory.create((String)this.opts.getServerName(), (String)stripeName, (boolean)true, (IgniteLogger)logger, (ThreadOperation[])new ThreadOperation[0]), this.opts.getRaftOptions().getDisruptorBufferSize(), NodeImpl.LogEntryAndClosure::new, this.opts.getStripes(), false, false, this.opts.getRaftMetrics().disruptorMetrics("raft.nodeimpl.disruptor")));
        }
        if (this.opts.getReadOnlyServiceDisruptor() == null) {
            this.opts.setReadOnlyServiceDisruptor(new StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent>(this.opts.getServerName(), "JRaft-ReadOnlyService-Disruptor", (stripeName, logger) -> IgniteThreadFactory.create((String)this.opts.getServerName(), (String)stripeName, (boolean)true, (IgniteLogger)logger, (ThreadOperation[])new ThreadOperation[0]), this.opts.getRaftOptions().getDisruptorBufferSize(), ReadOnlyServiceImpl.ReadIndexEvent::new, this.opts.getStripes(), false, false, this.opts.getRaftMetrics().disruptorMetrics("raft.readonlyservice.disruptor")));
        }
        // The log manager disruptor is sized by the log stripes count; the stripe objects are created together with it.
        if (this.opts.getLogManagerDisruptor() == null) {
            this.opts.setLogManagerDisruptor(new StripedDisruptor<LogManagerImpl.StableClosureEvent>(this.opts.getServerName(), "JRaft-LogManager-Disruptor", (stripeName, logger) -> IgniteThreadFactory.create((String)this.opts.getServerName(), (String)stripeName, (boolean)true, (IgniteLogger)logger, (ThreadOperation[])new ThreadOperation[0]), this.opts.getRaftOptions().getDisruptorBufferSize(), LogManagerImpl.StableClosureEvent::new, this.opts.getLogStripesCount(), true, this.opts.isLogYieldStrategy(), this.opts.getRaftMetrics().disruptorMetrics("raft.logmanager.disruptor")));
            this.opts.setLogStripes(IntStream.range(0, this.opts.getLogStripesCount()).mapToObj(i -> new StripeAwareLogManager.Stripe()).collect(Collectors.toList()));
        }
        this.rpcServer.init(null);
        this.opts.getNodeManager().init(this.opts);
        // Finish storage destructions whose intents were saved earlier.
        return this.completeRaftGroupStoragesDestruction(componentContext.executor());
    }

    /**
     * Stops the server.
     *
     * <p>Asserts that no raft nodes are registered any more, then shuts down the node manager, the RPC server and
     * every shared resource present in the node options, and finally the request executor.
     *
     * @param componentContext Component context (not used by this method).
     * @return Already completed future.
     */
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        assert (this.nodes.isEmpty()) : IgniteStringFormatter.format((String)"Raft nodes {} are still running on the Ignite node {}", (Object[])new Object[]{this.nodes.keySet(), this.service.topologyService().localMember().name()});
        this.opts.getNodeManager().shutdown();
        this.rpcServer.shutdown();
        // Disruptors.
        if (this.opts.getfSMCallerExecutorDisruptor() != null) {
            this.opts.getfSMCallerExecutorDisruptor().shutdown();
        }
        if (this.opts.getNodeApplyDisruptor() != null) {
            this.opts.getNodeApplyDisruptor().shutdown();
        }
        if (this.opts.getReadOnlyServiceDisruptor() != null) {
            this.opts.getReadOnlyServiceDisruptor().shutdown();
        }
        if (this.opts.getLogManagerDisruptor() != null) {
            this.opts.getLogManagerDisruptor().shutdown();
        }
        // Executors and scheduler.
        if (this.opts.getCommonExecutor() != null) {
            ExecutorServiceHelper.shutdownAndAwaitTermination(this.opts.getCommonExecutor());
        }
        if (this.opts.getStripedExecutor() != null) {
            this.opts.getStripedExecutor().shutdownGracefully();
        }
        if (this.opts.getScheduler() != null) {
            this.opts.getScheduler().shutdown();
        }
        // Timers.
        if (this.opts.getElectionTimer() != null) {
            this.opts.getElectionTimer().stop();
        }
        if (this.opts.getVoteTimer() != null) {
            this.opts.getVoteTimer().stop();
        }
        if (this.opts.getStepDownTimer() != null) {
            this.opts.getStepDownTimer().stop();
        }
        if (this.opts.getSnapshotTimer() != null) {
            this.opts.getSnapshotTimer().stop();
        }
        if (this.opts.getClientExecutor() != null) {
            ExecutorServiceHelper.shutdownAndAwaitTermination(this.opts.getClientExecutor());
        }
        // The request executor is shut down last, with a 10 second timeout argument.
        IgniteUtils.shutdownAndAwaitTermination((ExecutorService)this.requestExecutor, (long)10L, (TimeUnit)TimeUnit.SECONDS);
        return CompletableFutures.nullCompletedFuture();
    }

    /** Returns the cluster service this server was created with. */
    @Override
    public ClusterService clusterService() {
        return this.service;
    }

    /**
     * Returns the data path of a raft node: the base path resolved against the node's storage ID string.
     *
     * @param basePath Base data path.
     * @param nodeId Raft node ID.
     * @return Path of the node's data directory.
     */
    public static Path getServerDataPath(Path basePath, RaftNodeId nodeId) {
        return JraftServerImpl.getServerDataPath(basePath, nodeId.nodeIdStringForStorage());
    }

    /**
     * Resolves the given storage ID string against the base path.
     *
     * @param basePath Base data path.
     * @param nodeIdStringForStorage Storage ID string of a raft node.
     * @return Resolved path.
     */
    private static Path getServerDataPath(Path basePath, String nodeIdStringForStorage) {
        return basePath.resolve(nodeIdStringForStorage);
    }

    /**
     * Starts a raft node using {@code RaftGroupEventsListener.noopLsnr} as the group events listener.
     *
     * @return Result of the five-argument overload: {@code true} if the node was started, {@code false} if it was
     *     already registered.
     */
    @Override
    public boolean startRaftNode(RaftNodeId nodeId, PeersAndLearners configuration, RaftGroupListener lsnr, RaftGroupOptions groupOptions) {
        return this.startRaftNode(nodeId, configuration, RaftGroupEventsListener.noopLsnr, lsnr, groupOptions);
    }

    /**
     * Starts a raft node for the given ID unless one is already registered.
     *
     * <p>The shared node options are copied and customised for the node (storage URIs, state machine, events
     * listener, service factory, initial configuration, RPC client), then a {@link RaftGroupService} is created,
     * started and registered.
     *
     * <p>NOTE: the decompiler reported "Removed try catching itself - possible behaviour change" for this method,
     * so the original exception handling around the synchronized section may differ from what is shown.
     *
     * @param nodeId Raft node ID; its peer must be the local cluster member (asserted).
     * @param configuration Initial peers and learners.
     * @param evLsnr Raft group events listener.
     * @param lsnr Raft group listener wrapped into the state machine.
     * @param groupOptions Per-group options.
     * @return {@code true} if the node was started, {@code false} if it was already registered.
     * @throws IgniteInternalException If the node's data directory cannot be created.
     */
    @Override
    public boolean startRaftNode(RaftNodeId nodeId, PeersAndLearners configuration, RaftGroupEventsListener evLsnr, RaftGroupListener lsnr, RaftGroupOptions groupOptions) {
        assert (nodeId.peer().consistentId().equals(this.service.topologyService().localMember().name()));
        // Cheap check before taking the monitor.
        if (this.nodes.containsKey(nodeId)) {
            return false;
        }
        Object object = this.startNodeMonitor(nodeId);
        synchronized (object) {
            // Re-check under the monitor: another thread may have started the node in the meantime.
            if (this.nodes.containsKey(nodeId)) {
                return false;
            }
            NodeOptions nodeOptions = this.opts.copy();
            nodeOptions.setSystemGroup(groupOptions.isSystemGroup());
            // The election timeout is never lower than the maximum clock skew.
            nodeOptions.setElectionTimeoutMs(Math.max(nodeOptions.getElectionTimeoutMs(), groupOptions.maxClockSkew()));
            nodeOptions.setLogUri(nodeId.nodeIdStringForStorage());
            Path serverDataPath = JraftServerImpl.serverDataPathForNodeId(nodeId, groupOptions);
            // The data directory is created only when the group does not use volatile stores.
            if (!groupOptions.volatileStores()) {
                try {
                    Files.createDirectories(serverDataPath, new FileAttribute[0]);
                }
                catch (IOException e) {
                    throw new IgniteInternalException((Throwable)e);
                }
            }
            nodeOptions.setRaftMetaUri(serverDataPath.resolve("meta").toString());
            nodeOptions.setSnapshotUri(serverDataPath.resolve("snapshot").toString());
            // A group-specific commands marshaller overrides the one copied from the shared options.
            if (groupOptions.commandsMarshaller() != null) {
                nodeOptions.setCommandsMarshaller(groupOptions.commandsMarshaller());
            }
            nodeOptions.setFsm(new DelegatingStateMachine(lsnr, nodeOptions.getCommandsMarshaller(), this.failureManager));
            nodeOptions.setRaftGrpEvtsLsnr(new RaftGroupEventsListenerAdapter(nodeId.groupId(), this.serviceEventInterceptor, evLsnr));
            LogStorageFactory logStorageFactory = groupOptions.getLogStorageFactory();
            assert (logStorageFactory != null) : "LogStorageFactory was not set.";
            IgniteJraftServiceFactory serviceFactory = new IgniteJraftServiceFactory(logStorageFactory);
            // Optional snapshot and meta storage factories are applied only when provided.
            if (groupOptions.snapshotStorageFactory() != null) {
                serviceFactory.setSnapshotStorageFactory(groupOptions.snapshotStorageFactory());
            }
            if (groupOptions.raftMetaStorageFactory() != null) {
                serviceFactory.setRaftMetaStorageFactory(groupOptions.raftMetaStorageFactory());
            }
            nodeOptions.setServiceFactory(serviceFactory);
            List<PeerId> peerIds = configuration.peers().stream().map(PeerId::fromPeer).collect(Collectors.toList());
            List<PeerId> learnerIds = configuration.learners().stream().map(PeerId::fromPeer).collect(Collectors.toList());
            nodeOptions.setInitialConf(new Configuration(peerIds, learnerIds, 0L));
            nodeOptions.setRpcClient(new IgniteRpcClient(this.service));
            nodeOptions.setExternallyEnforcedConfigIndex(groupOptions.externallyEnforcedConfigIndex());
            RaftGroupService server = new RaftGroupService(nodeId.groupId().toString(), PeerId.fromPeer(nodeId.peer()), nodeOptions, this.rpcServer);
            server.start();
            // Registered only after a successful start.
            this.nodes.put(nodeId, server);
            return true;
        }
    }

    /**
     * Computes the data path of a raft node from the server data path configured in the group options.
     *
     * @param nodeId Raft node ID.
     * @param groupOptions Group options; their server data path must be set (asserted).
     * @return Data path of the node.
     */
    private static Path serverDataPathForNodeId(RaftNodeId nodeId, RaftGroupOptions groupOptions) {
        Path basePath = groupOptions.serverDataPath();

        assert basePath != null : "Raft metadata path was not set, nodeId is " + nodeId;

        return getServerDataPath(basePath, nodeId);
    }

    /** Returns {@code true} if a raft group service is registered for the given node ID. */
    @Override
    public boolean isStarted(RaftNodeId nodeId) {
        return this.nodes.containsKey(nodeId);
    }

    /**
     * Unregisters the raft node with the given ID and shuts its service down.
     *
     * @param nodeId Raft node ID.
     * @return {@code true} if a node was registered and has been shut down, {@code false} otherwise.
     */
    @Override
    public boolean stopRaftNode(RaftNodeId nodeId) {
        RaftGroupService removedService = this.nodes.remove(nodeId);

        if (removedService == null) {
            return false;
        }

        removedService.shutdown();

        return true;
    }

    /**
     * Unregisters every local raft node that belongs to the given replication group and shuts the removed
     * services down.
     *
     * @param groupId Replication group ID.
     * @return {@code true} if at least one node was removed from the registry.
     */
    @Override
    public boolean stopRaftNodes(ReplicationGroupId groupId) {
        // Services are collected first and shut down only after the registry has been updated.
        Set<RaftGroupService> servicesToStop = new HashSet<>();

        boolean removed = this.nodes.entrySet().removeIf(entry -> {
            if (entry.getKey().groupId().equals(groupId)) {
                servicesToStop.add(entry.getValue());

                return true;
            }

            return false;
        });

        for (RaftGroupService service : servicesToStop) {
            service.shutdown();
        }

        return removed;
    }

    /**
     * Destroys the storages of a raft node without saving a destruction intent first.
     *
     * @param nodeId Raft node ID.
     * @param groupOptions Group options used to locate the storages.
     */
    @Override
    public void destroyRaftNodeStorages(RaftNodeId nodeId, RaftGroupOptions groupOptions) {
        this.destroyRaftNodeStoragesInternal(nodeId, groupOptions, false);
    }

    /**
     * Destroys the storages of a raft node, saving a destruction intent before the destruction and removing it
     * afterwards.
     *
     * @param nodeId Raft node ID.
     * @param groupOptions Group options used to locate the storages.
     */
    @Override
    public void destroyRaftNodeStoragesDurably(RaftNodeId nodeId, RaftGroupOptions groupOptions) {
        this.destroyRaftNodeStoragesInternal(nodeId, groupOptions, true);
    }

    /**
     * Calls {@code createAfterDestroy()} on the meta storage of the given raft node. Does nothing if the node is
     * not registered on this server.
     *
     * @param nodeId Raft node ID.
     */
    public void createMetaStorage(RaftNodeId nodeId) {
        RaftGroupService groupService = this.nodes.get(nodeId);

        if (groupService != null) {
            NodeImpl raftNode = (NodeImpl) groupService.getRaftNode();

            raftNode.metaStorage().createAfterDestroy();
        }
    }

    /**
     * Resolves the destruction intent of a raft node, optionally saves it, and destroys the node's storages.
     *
     * @param nodeId Raft node ID.
     * @param groupOptions Group options supplying the volatility flag, the log storage factory and the data path.
     * @param durable Whether the intent is saved before the destruction (and removed after it).
     */
    private void destroyRaftNodeStoragesInternal(RaftNodeId nodeId, RaftGroupOptions groupOptions, boolean durable) {
        boolean volatileStores = groupOptions.volatileStores();
        StorageDestructionIntent destructionIntent = this.groupStoragesContextResolver.getIntent(nodeId, volatileStores);

        if (durable) {
            this.groupStoragesDestructionIntents.saveStorageDestructionIntent(nodeId.groupId(), destructionIntent);
        }

        StoragesDestructionContext destructionContext = new StoragesDestructionContext(
                destructionIntent,
                groupOptions.getLogStorageFactory(),
                groupOptions.serverDataPath()
        );

        this.destroyStorages(destructionContext, durable);
    }

    /**
     * Destroys the storages described by the context and removes the corresponding saved intent afterwards.
     *
     * @param context Storages destruction context.
     */
    private void destroyStorages(StoragesDestructionContext context) {
        this.destroyStorages(context, true);
    }

    /**
     * Destroys the log storage (when a log storage factory is present in the context) and deletes the data
     * directory of the node named by the context's intent.
     *
     * @param context Storages destruction context.
     * @param wasDurable Whether a saved destruction intent has to be removed after a successful destruction.
     * @throws IgniteInternalException If the destruction fails.
     */
    private void destroyStorages(StoragesDestructionContext context, boolean wasDurable) {
        String storageNodeId = context.intent().nodeId();

        try {
            if (context.logStorageFactory() != null) {
                context.logStorageFactory().destroyLogStorage(storageNodeId);
            }

            IgniteUtils.deleteIfExistsThrowable((Path) getServerDataPath(context.serverDataPath(), storageNodeId));
        } catch (Exception e) {
            throw new IgniteInternalException(ErrorGroups.Common.INTERNAL_ERR, "Failed to delete storage for node: " + storageNodeId, (Throwable) e);
        }

        if (!wasDurable) {
            return;
        }

        // The intent is removed only after the storages are gone.
        this.groupStoragesDestructionIntents.removeStorageDestructionIntent(storageNodeId);
    }

    /**
     * Collects the IDs of raft nodes that have storages on disk.
     *
     * <p>The storage ID strings reported by every log storage factory are merged with the names of the
     * directories found under the server data paths, and each string is converted to a stored raft node ID using
     * the local node name.
     *
     * @return Unmodifiable set of stored raft node IDs.
     */
    public Set<StoredRaftNodeId> raftNodeIdsOnDisk() {
        Set<String> storageIdStrings = new HashSet<>();

        for (LogStorageFactory factory : this.groupStoragesContextResolver.logStorageFactories()) {
            storageIdStrings.addAll(factory.raftNodeStorageIdsOnDisk());
        }

        storageIdStrings.addAll(this.raftNodeMetaStorageIdsOnDisk());

        return storageIdStrings.stream()
                .map(idString -> RaftNodeId.fromNodeIdStringForStorage((String) idString, (String) this.service.nodeName()))
                .collect(Collectors.toUnmodifiableSet());
    }

    /**
     * Lists the names of the directories located directly under the existing server data paths.
     *
     * @return Unmodifiable set of directory names.
     */
    private Set<String> raftNodeMetaStorageIdsOnDisk() {
        return this.groupStoragesContextResolver.serverDataPaths().stream()
                .filter(Files::exists)
                .flatMap(JraftServerImpl::listFiles)
                .filter(Files::isDirectory)
                .map(nodeDir -> nodeDir.getFileName().toString())
                .collect(Collectors.toUnmodifiableSet());
    }

    /**
     * Lists the entries of a directory, converting the checked {@link IOException} into an unchecked one so the
     * method can be used inside a stream pipeline.
     *
     * @param dir Directory to list.
     * @return Stream of the directory entries, as returned by {@link Files#list(Path)}.
     * @throws IgniteInternalException If listing fails.
     */
    private static Stream<Path> listFiles(Path dir) {
        try {
            return Files.list(dir);
        }
        catch (IOException e) {
            throw new IgniteInternalException(ErrorGroups.Common.INTERNAL_ERR, (Throwable)e);
        }
    }

    /**
     * Returns the last log index and term of the given raft node.
     *
     * @param nodeId Raft node ID.
     * @return Index with term, or {@code null} if the node is not registered on this server.
     */
    @Override
    @Nullable
    public IndexWithTerm raftNodeIndex(RaftNodeId nodeId) {
        RaftGroupService groupService = this.nodes.get(nodeId);

        if (groupService != null) {
            LogId lastLogId = groupService.getRaftNode().lastLogIndexAndTerm();

            return new IndexWithTerm(lastLogId.getIndex(), lastLogId.getTerm());
        }

        return null;
    }

    /**
     * Resets the peers of a local raft node to the given configuration.
     *
     * <p>NOTE(review): no check is made that the node is registered; an unknown {@code raftNodeId} results in a
     * {@link NullPointerException}.
     *
     * @param raftNodeId Raft node ID.
     * @param peersAndLearners New peers and learners.
     * @param sequenceToken Sequence token of the new configuration.
     * @return Status returned by the raft node.
     */
    public Status resetPeers(RaftNodeId raftNodeId, PeersAndLearners peersAndLearners, long sequenceToken) {
        RaftGroupService groupService = this.nodes.get(raftNodeId);

        List<PeerId> newPeers = peersAndLearners.peers().stream()
                .map(PeerId::fromPeer)
                .collect(Collectors.toList());

        List<PeerId> newLearners = peersAndLearners.learners().stream()
                .map(PeerId::fromPeer)
                .collect(Collectors.toList());

        return groupService.getRaftNode().resetPeers(new Configuration(newPeers, newLearners, sequenceToken));
    }

    /**
     * Applies the consumer to every registered raft node ID and its group service.
     *
     * @param consumer Consumer of node ID / group service pairs.
     */
    public void forEach(BiConsumer<RaftNodeId, RaftGroupService> consumer) {
        this.nodes.forEach(consumer);
    }

    /**
     * Returns the peers of all registered raft nodes that belong to the given replication group.
     *
     * @param groupId Replication group ID.
     * @return List of local peers of the group; empty if there are none.
     */
    @Override
    public List<Peer> localPeers(ReplicationGroupId groupId) {
        List<Peer> groupPeers = new ArrayList<>();

        for (RaftNodeId registeredId : this.nodes.keySet()) {
            if (registeredId.groupId().equals(groupId)) {
                groupPeers.add(registeredId.peer());
            }
        }

        return groupPeers;
    }

    /**
     * Returns the raft group service registered for the given node ID.
     *
     * @param nodeId Raft node ID.
     * @return Group service, or {@code null} if none is registered.
     */
    public RaftGroupService raftGroupService(RaftNodeId nodeId) {
        return (RaftGroupService)this.nodes.get(nodeId);
    }

    /**
     * Returns the IDs of the registered raft nodes.
     *
     * <p>The result is the key set of the internal map, not a copy.
     */
    @Override
    public Set<RaftNodeId> localNodes() {
        return this.nodes.keySet();
    }

    /** Returns the shared node options instance of this server (the same object that was given to the constructor). */
    @Override
    public NodeOptions options() {
        return this.opts;
    }

    /**
     * Installs a blocking predicate on the RPC client of the given raft node.
     *
     * @param nodeId Raft node ID; the node must be registered.
     * @param predicate Predicate passed to the RPC client.
     */
    @TestOnly
    public void blockMessages(RaftNodeId nodeId, BiPredicate<Object, String> predicate) {
        RaftGroupService groupService = this.nodes.get(nodeId);
        IgniteRpcClient rpcClient = (IgniteRpcClient) groupService.getNodeOptions().getRpcClient();

        rpcClient.blockMessages(predicate);
    }

    /**
     * Returns the blocked messages queue of the RPC client of the given raft node.
     *
     * @param nodeId Raft node ID; the node must be registered.
     * @return Queue returned by the RPC client.
     */
    @TestOnly
    public Queue<Object[]> blockedMessages(RaftNodeId nodeId) {
        RaftGroupService groupService = this.nodes.get(nodeId);
        IgniteRpcClient rpcClient = (IgniteRpcClient) groupService.getNodeOptions().getRpcClient();

        return rpcClient.blockedMessages();
    }

    /**
     * Calls {@code stopBlock()} on the RPC client of the given raft node.
     *
     * @param nodeId Raft node ID; the node must be registered.
     */
    @TestOnly
    public void stopBlockMessages(RaftNodeId nodeId) {
        RaftGroupService groupService = this.nodes.get(nodeId);
        IgniteRpcClient rpcClient = (IgniteRpcClient) groupService.getNodeOptions().getRpcClient();

        rpcClient.stopBlock();
    }

    /**
     * Picks the start monitor for a raft node from the fixed monitor list, based on the node ID hash code.
     *
     * @param nodeId Raft node ID.
     * @return Monitor object to synchronize on while starting the node.
     */
    private Object startNodeMonitor(RaftNodeId nodeId) {
        // The remainder lies strictly between -N and N, so its absolute value is a valid index.
        int monitorIndex = Math.abs(nodeId.hashCode() % SIMULTANEOUS_GROUP_START_PARALLELISM);

        return this.startGroupInProgressMonitors.get(monitorIndex);
    }

    /**
     * Asynchronously reads the saved storage destruction intents, resolves each one to a destruction context and
     * destroys the corresponding storages.
     *
     * @param executor Executor that runs the task.
     * @return Future of the task.
     */
    private CompletableFuture<Void> completeRaftGroupStoragesDestruction(ExecutorService executor) {
        Runnable destroyPending = () -> this.groupStoragesDestructionIntents.readStorageDestructionIntents()
                .stream()
                .map(intent -> this.groupStoragesContextResolver.getContext(intent))
                .forEach(context -> this.destroyStorages(context));

        return CompletableFuture.runAsync(destroyPending, executor);
    }

    /**
     * Wraps the events listener of every local node of the given group so that the supplied listener is notified
     * before the previously installed one.
     *
     * <p>Does nothing if the node manager returns {@code null} for the group.
     *
     * @param groupId Replication group ID.
     * @param listener Listener to notify in addition to the existing one.
     */
    @TestOnly
    public void addRaftEventsListener(ReplicationGroupId groupId, final RaftGroupEventsListener listener) {
        List<Node> groupNodes = this.opts.getNodeManager().getNodesByGroupId(groupId.toString());

        if (groupNodes == null) {
            return;
        }

        for (Node node : groupNodes) {
            final JraftGroupEventsListener delegate = node.getOptions().getRaftGrpEvtsLsnr();

            node.getOptions().setRaftGrpEvtsLsnr(new JraftGroupEventsListener() {
                @Override
                public void onLeaderElected(long term, long configurationTerm, long configurationIndex, Collection<PeerId> peers, Collection<PeerId> learners, long sequenceToken) {
                    listener.onLeaderElected(term, configurationTerm, configurationIndex, RaftGroupEventsListenerAdapter.configuration(peers, learners), sequenceToken);
                    delegate.onLeaderElected(term, configurationTerm, configurationIndex, peers, learners, sequenceToken);
                }

                @Override
                public void onNewPeersConfigurationApplied(Collection<PeerId> peers, Collection<PeerId> learners, long term, long index, long sequenceToken) {
                    PeersAndLearners peersAndLearners = PeersAndLearners.fromPeers((Collection)peers.stream().map(peerId -> new Peer(peerId.getConsistentId(), peerId.getIdx())).collect(Collectors.toList()), (Collection)learners.stream().map(peerId -> new Peer(peerId.getConsistentId(), peerId.getIdx())).collect(Collectors.toList()));
                    listener.onNewPeersConfigurationApplied(peersAndLearners, term, index, sequenceToken);
                    delegate.onNewPeersConfigurationApplied(peers, learners, term, index, sequenceToken);
                }

                @Override
                public void onReconfigurationError(Status status, Collection<PeerId> peers, Collection<PeerId> learners, long term, long sequenceToken) {
                    // The previously installed listener must see each error exactly once (it used to be invoked twice).
                    // NOTE(review): the added test listener is not notified about reconfiguration errors here;
                    // confirm whether it should be, using the appropriate status/configuration conversion.
                    delegate.onReconfigurationError(status, peers, learners, term, sequenceToken);
                }
            });
        }
    }

    public static class DelegatingStateMachine
    extends StateMachineAdapter {
        /** Raft group listener that receives the state machine callbacks. */
        private final RaftGroupListener listener;
        /** Marshaller handed to the write command iterator in {@code onApply}. */
        private final Marshaller marshaller;
        /** Failure manager notified when a listener callback throws. */
        private final FailureManager failureManager;

        /**
         * Creates a state machine that delegates to the given listener.
         *
         * @param listener Raft group listener.
         * @param marshaller Marshaller used for write commands.
         * @param failureManager Failure manager.
         */
        public DelegatingStateMachine(RaftGroupListener listener, Marshaller marshaller, FailureManager failureManager) {
            this.listener = listener;
            this.marshaller = marshaller;
            this.failureManager = failureManager;
        }

        /** Returns the raft group listener this state machine delegates to. */
        public RaftGroupListener getListener() {
            return this.listener;
        }

        /**
         * Passes the committed entries to the listener as an iterator of write commands.
         *
         * <p>If the listener throws, the closure currently tracked by the command iterator (if any) is completed
         * with an {@code ESTATEMACHINE} status, the raft iterator is rolled back by one entry with the same
         * status, and the failure manager is notified with a critical error.
         *
         * @param iter Raft iterator over the entries to apply.
         */
        @Override
        public void onApply(Iterator iter) {
            WriteCommandIterator commands = new WriteCommandIterator(iter, this.marshaller);
            try {
                this.listener.onWrite((java.util.Iterator)commands);
            }
            catch (Throwable err) {
                String message = err.getMessage() != null ? err.getMessage() : "Unknown state machine error.";

                // The message is passed as a format argument, never as the format string itself: an exception
                // message containing '%' must not be interpreted as a format specifier while the error is handled.
                Status st = new Status(RaftError.ESTATEMACHINE, "%s", message);

                Closure done = commands.doneForExceptionHandling();
                if (done != null) {
                    done.run(st);
                }

                iter.setErrorAndRollback(1L, st);
                this.failureManager.process(new FailureContext(FailureType.CRITICAL_ERROR, err));
            }
        }

        /**
         * Converts a committed configuration entry into a {@link RaftGroupConfiguration} and reports it to the
         * listener.
         *
         * <p>The old configuration is considered present when both it and its peers are non-null; otherwise the
         * old sequence token is {@code 0} and the old peers and learners are {@code null}.
         *
         * @param entry Committed configuration entry.
         * @param lastAppliedIndex Last applied index, passed through to the listener.
         * @param lastAppliedTerm Last applied term, passed through to the listener.
         */
        @Override
        public void onRawConfigurationCommitted(ConfigurationEntry entry, long lastAppliedIndex, long lastAppliedTerm) {
            boolean hasOldConf = entry.getOldConf() != null && entry.getOldConf().getPeers() != null;

            long index = entry.getId().getIndex();
            long term = entry.getId().getTerm();
            long sequenceToken = entry.getConf().getSequenceToken();
            long oldSequenceToken = hasOldConf ? entry.getOldConf().getSequenceToken() : 0L;

            List<String> peers = peersIdsToStrings(entry.getConf().getPeers());
            List<String> learners = peersIdsToStrings(entry.getConf().getLearners());
            List<String> oldPeers = hasOldConf ? peersIdsToStrings(entry.getOldConf().getPeers()) : null;
            List<String> oldLearners = hasOldConf ? peersIdsToStrings(entry.getOldConf().getLearners()) : null;

            RaftGroupConfiguration committedConf = new RaftGroupConfiguration(
                    index,
                    term,
                    sequenceToken,
                    oldSequenceToken,
                    peers,
                    learners,
                    oldPeers,
                    oldLearners
            );

            this.listener.onConfigurationCommitted(committedConf, lastAppliedIndex, lastAppliedTerm);
        }

        /**
         * Maps every peer id to its {@code toString()} representation.
         *
         * @param peerIds Peer ids to convert; must not be {@code null} and must not contain {@code null}.
         * @return Unmodifiable list of the string forms, in the iteration order of {@code peerIds}.
         */
        private static List<String> peersIdsToStrings(Collection<PeerId> peerIds) {
            List<String> names = new ArrayList<>();
            for (PeerId peerId : peerIds) {
                names.add(peerId.toString());
            }
            // List.copyOf yields the same kind of unmodifiable list as Collectors.toUnmodifiableList().
            return List.copyOf(names);
        }

        /**
         * Asks the listener to save a snapshot into the writer's directory and reports the outcome
         * through {@code done}.
         *
         * <p>The listener receives a callback. When the callback is invoked with {@code null}, every
         * regular file found directly in the writer's directory is registered via
         * {@code writer.addFile(name, null)} and {@code done} is run with an OK status. When it is
         * invoked with a non-null throwable, {@code done} is run with an {@code EIO} status.
         *
         * <p>If the listener call itself throws, {@code done} is run with an {@code EIO} status and the
         * throwable is reported to the failure manager as a {@code CRITICAL_ERROR}.
         *
         * @param writer Snapshot writer providing the target path and collecting the snapshot files.
         * @param done Closure to run with the resulting status.
         */
        @Override
        public void onSnapshotSave(SnapshotWriter writer, Closure done) {
            try {
                Path snapshotDir = Path.of(writer.getPath());

                listener.onSnapshotSave(snapshotDir, failure -> {
                    if (failure != null) {
                        done.run(new Status(RaftError.EIO, "Fail to save snapshot to %s, reason %s", writer.getPath(), failure.getMessage()));
                        return;
                    }

                    // listFiles() returns null when the path cannot be listed; nothing is registered then.
                    File[] children = new File(writer.getPath()).listFiles();
                    if (children != null) {
                        for (File child : children) {
                            if (child.isFile()) {
                                writer.addFile(child.getName(), null);
                            }
                        }
                    }

                    done.run(Status.OK());
                });
            } catch (Throwable e) {
                done.run(new Status(RaftError.EIO, "Fail to save snapshot %s", e.getMessage()));
                failureManager.process(new FailureContext(FailureType.CRITICAL_ERROR, e));
            }
        }

        /**
         * Delegates snapshot loading to the listener, passing the reader's path.
         *
         * @param reader Snapshot reader whose path points at the snapshot to load.
         * @return The listener's result, or {@code false} if the listener threw; in the latter case the
         *         throwable is also reported to the failure manager as a {@code CRITICAL_ERROR}.
         */
        @Override
        public boolean onSnapshotLoad(SnapshotReader reader) {
            boolean loaded;
            try {
                Path snapshotDir = Path.of(reader.getPath());
                loaded = listener.onSnapshotLoad(snapshotDir);
            } catch (Throwable err) {
                failureManager.process(new FailureContext(FailureType.CRITICAL_ERROR, err));
                loaded = false;
            }
            return loaded;
        }

        /**
         * Forwards the shutdown notification to the listener.
         */
        @Override
        public void onShutdown() {
            listener.onShutdown();
        }

        /**
         * Runs the superclass handling first, then notifies the listener that leadership has started.
         *
         * @param term Term value; passed to the superclass only, the listener callback takes no arguments.
         */
        @Override
        public void onLeaderStart(long term) {
            super.onLeaderStart(term);

            listener.onLeaderStart();
        }

        /**
         * Runs the superclass handling first, then notifies the listener that leadership has stopped.
         *
         * @param status Status value; passed to the superclass only, the listener callback takes no arguments.
         */
        @Override
        public void onLeaderStop(Status status) {
            super.onLeaderStop(status);

            listener.onLeaderStop();
        }
    }

    /**
     * Adapter that exposes a Raft {@link Iterator} as a {@code java.util.Iterator} of write command closures.
     *
     * <p>{@link #next()} does not advance the wrapped iterator by itself: the wrapped iterator is moved
     * forward when {@code result(...)} is invoked on the closure that {@link #next()} returned.
     */
    private static class WriteCommandIterator
    implements java.util.Iterator<CommandClosure<WriteCommand>> {
        /** Wrapped Raft iterator. */
        private final Iterator iter;

        /** Used to unmarshal the command when the current entry carries no closure. */
        private final Marshaller marshaller;

        /** Closure observed by the most recent {@link #next()} call; {@code null} if there was none. */
        @Nullable
        private Closure latestDone;

        private WriteCommandIterator(Iterator iter, Marshaller marshaller) {
            this.iter = iter;
            this.marshaller = marshaller;
        }

        /** {@inheritDoc} */
        @Override
        public boolean hasNext() {
            return iter.hasNext();
        }

        /**
         * Builds a closure describing the wrapped iterator's current entry.
         *
         * <p>When the entry has an attached closure, the command and the safe timestamp are taken from
         * it; otherwise the command is unmarshalled from the entry data and the safe timestamp is read
         * from the command.
         *
         * @return Closure exposing the entry's index, term, safe timestamp and command.
         */
        @Override
        public CommandClosure<WriteCommand> next() {
            Closure attached = iter.done();
            latestDone = attached;

            // Same cast as before, only spelled with the type argument; erasure is identical.
            @SuppressWarnings("unchecked")
            final @Nullable CommandClosure<WriteCommand> origin = (CommandClosure<WriteCommand>) attached;
            ByteBuffer payload = iter.getData();

            final WriteCommand writeCommand;
            final HybridTimestamp timestamp;
            if (origin == null) {
                writeCommand = (WriteCommand) marshaller.unmarshall(payload);
                timestamp = writeCommand.safeTime();
            } else {
                writeCommand = (WriteCommand) origin.command();
                timestamp = origin.safeTimestamp();
            }

            final long entryIndex = iter.getIndex();
            final long entryTerm = iter.getTerm();

            return new CommandClosure<WriteCommand>() {
                public long index() {
                    return entryIndex;
                }

                public long term() {
                    return entryTerm;
                }

                @Nullable
                public HybridTimestamp safeTimestamp() {
                    return timestamp;
                }

                public WriteCommand command() {
                    return writeCommand;
                }

                public void result(Serializable res) {
                    // Forward the result to the attached closure, if any, then advance the wrapped iterator.
                    if (origin != null) {
                        origin.result(res);
                    }
                    iter.next();
                }
            };
        }

        /**
         * Returns the closure to complete when handling an exception: the one remembered by the last
         * {@link #next()} call, or, if none is remembered, the wrapped iterator's current closure
         * (which is then remembered as well).
         *
         * @return The closure, or {@code null} if neither source provides one.
         */
        @Nullable
        private Closure doneForExceptionHandling() {
            Closure remembered = latestDone;
            if (remembered != null) {
                return remembered;
            }

            latestDone = iter.done();
            return latestDone;
        }
    }
}

