/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.replicator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.FailureType;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ComponentStoppingException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.IgniteThrottledLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
import org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessageResponse;
import org.apache.ignite.internal.raft.GroupOverloadedException;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Marshaller;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.RaftServiceFactory;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.LogStorageBudgetView;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
import org.apache.ignite.internal.raft.storage.impl.VolatileRaftMetaStorage;
import org.apache.ignite.internal.replicator.LocalReplicaEvent;
import org.apache.ignite.internal.replicator.LocalReplicaEventParameters;
import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.apache.ignite.internal.replicator.PlacementDriverMessageProcessor;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaImpl;
import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.ReplicaStateManager;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionReplicaImpl;
import org.apache.ignite.internal.replicator.exception.ExpectedReplicationException;
import org.apache.ignite.internal.replicator.exception.ReplicaStoppingException;
import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.AwaitReplicaRequest;
import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
import org.apache.ignite.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.internal.thread.ExecutorChooser;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.ThreadOperation;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteStripedBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.TrackerClosedException;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class ReplicaManager
extends AbstractEventProducer<LocalReplicaEvent, LocalReplicaEventParameters>
implements IgniteComponent {
    private static final long STOP_LEASE_PROLONGATION_RETRIES_TIMEOUT_MS = 60000L;
    private static final IgniteLogger LOG = Loggers.forClass(ReplicaManager.class);
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
    private static final int MAXIMUM_ATTEMPTS_WITHOUT_LOGGING = 10;
    private final Map<ReplicationGroupId, Integer> timeoutAttemptsCounters = new ConcurrentHashMap<ReplicationGroupId, Integer>();
    private final ThreadPoolExecutor throttledLogExecutor;
    private final IgniteThrottledLogger throttledLog;
    private final IgniteStripedBusyLock busyLock = new IgniteStripedBusyLock();
    private final AtomicBoolean stopGuard = new AtomicBoolean();
    private final CompletableFuture<Set<String>> msNodes = new CompletableFuture();
    private final ClusterService clusterNetSvc;
    private final ClusterManagementGroupManager cmgMgr;
    private final NetworkMessageHandler handler;
    private final RaftManager raftManager;
    private final TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory;
    private final LogStorageFactoryCreator volatileLogStorageFactoryCreator;
    private final Executor replicaStartStopExecutor;
    private final Marshaller raftCommandsMarshaller;
    private final NetworkMessageHandler placementDriverMessageHandler;
    private final PlacementDriver placementDriver;
    private final LongSupplier idleSafeTimePropagationPeriodMsSupplier;
    private final ConcurrentHashMap<ReplicationGroupId, CompletableFuture<Replica>> replicas = new ConcurrentHashMap();
    private final ClockService clockService;
    private final ScheduledExecutorService scheduledIdleSafeTimeSyncExecutor;
    private final Executor requestsExecutor;
    private final FailureProcessor failureProcessor;
    private final Set<Class<?>> messageGroupsToHandle;
    private final RaftGroupOptionsConfigurer partitionRaftConfigurer;
    private final ReplicaStateManager replicaStateManager;
    private volatile UUID localNodeId;
    private volatile String localNodeConsistentId;
    @Nullable
    private volatile HybridTimestamp lastIdleSafeTimeProposal;
    private final Function<ReplicationGroupId, CompletableFuture<byte[]>> getPendingAssignmentsSupplier;

    public ReplicaManager(String nodeName, ClusterService clusterNetSvc, ClusterManagementGroupManager cmgMgr, ClockService clockService, Set<Class<?>> messageGroupsToHandle, PlacementDriver placementDriver, Executor requestsExecutor, LongSupplier idleSafeTimePropagationPeriodMsSupplier, FailureProcessor failureProcessor, @Nullable Marshaller raftCommandsMarshaller, TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory, RaftManager raftManager, RaftGroupOptionsConfigurer partitionRaftConfigurer, LogStorageFactoryCreator volatileLogStorageFactoryCreator, Executor replicaStartStopExecutor, Function<ReplicationGroupId, CompletableFuture<byte[]>> getPendingAssignmentsSupplier) {
        this.clusterNetSvc = clusterNetSvc;
        this.cmgMgr = cmgMgr;
        this.clockService = clockService;
        this.messageGroupsToHandle = messageGroupsToHandle;
        this.volatileLogStorageFactoryCreator = volatileLogStorageFactoryCreator;
        this.handler = this::onReplicaMessageReceived;
        this.placementDriverMessageHandler = this::onPlacementDriverMessageReceived;
        this.placementDriver = placementDriver;
        this.requestsExecutor = requestsExecutor;
        this.idleSafeTimePropagationPeriodMsSupplier = idleSafeTimePropagationPeriodMsSupplier;
        this.failureProcessor = failureProcessor;
        this.raftCommandsMarshaller = raftCommandsMarshaller;
        this.raftGroupServiceFactory = raftGroupServiceFactory;
        this.raftManager = raftManager;
        this.partitionRaftConfigurer = partitionRaftConfigurer;
        this.getPendingAssignmentsSupplier = getPendingAssignmentsSupplier;
        this.replicaStartStopExecutor = replicaStartStopExecutor;
        this.replicaStateManager = new ReplicaStateManager(replicaStartStopExecutor, placementDriver, this, failureProcessor);
        this.scheduledIdleSafeTimeSyncExecutor = Executors.newScheduledThreadPool(1, (ThreadFactory)IgniteThreadFactory.create((String)nodeName, (String)"scheduled-idle-safe-time-sync-thread", (IgniteLogger)LOG, (ThreadOperation[])new ThreadOperation[0]));
        int threadCount = Runtime.getRuntime().availableProcessors();
        this.throttledLogExecutor = new ThreadPoolExecutor(threadCount, threadCount, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory)IgniteThreadFactory.create((String)nodeName, (String)"throttled-log-replica-manager", (IgniteLogger)LOG, (ThreadOperation[])new ThreadOperation[0]));
        this.throttledLogExecutor.allowCoreThreadTimeOut(true);
        this.throttledLog = Loggers.toThrottledLogger((IgniteLogger)LOG, (Executor)this.throttledLogExecutor);
    }

    private void onReplicaMessageReceived(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) {
        if (!(message instanceof ReplicaRequest)) {
            return;
        }
        assert (correlationId != null);
        ReplicaRequest request = (ReplicaRequest)message;
        if (IgniteUtils.shouldSwitchToRequestsExecutor((ThreadOperation[])new ThreadOperation[]{ThreadOperation.STORAGE_READ, ThreadOperation.STORAGE_WRITE, ThreadOperation.TX_STATE_STORAGE_ACCESS})) {
            this.requestsExecutor.execute(() -> this.handleReplicaRequest(request, sender, correlationId));
        } else {
            this.handleReplicaRequest(request, sender, correlationId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleReplicaRequest(ReplicaRequest request, InternalClusterNode sender, @Nullable Long correlationId) {
        if (!this.enterBusy()) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Failed to process replica request (the node is stopping) [request={}].", new Object[]{request});
            }
            return;
        }
        ReplicationGroupId groupId = request.groupId().asReplicationGroupId();
        String senderConsistentId = sender.name();
        try {
            if (request instanceof AwaitReplicaRequest) {
                this.replicas.compute(groupId, (replicationGroupId, replicaFut) -> {
                    if (replicaFut == null) {
                        replicaFut = new CompletableFuture();
                    }
                    if (!replicaFut.isDone()) {
                        replicaFut.whenComplete((createdReplica, ex) -> {
                            if (ex != null) {
                                this.clusterNetSvc.messagingService().respond(senderConsistentId, (NetworkMessage)REPLICA_MESSAGES_FACTORY.errorReplicaResponse().throwable((Throwable)ex).build(), correlationId.longValue());
                            } else {
                                this.sendAwaitReplicaResponse(senderConsistentId, correlationId);
                            }
                        });
                    } else {
                        this.sendAwaitReplicaResponse(senderConsistentId, correlationId);
                    }
                    return replicaFut;
                });
                return;
            }
            CompletableFuture<Replica> replicaFut2 = this.replicas.get(groupId);
            HybridTimestamp requestTimestamp = ReplicaManager.extractTimestamp(request);
            if (replicaFut2 == null || !replicaFut2.isDone()) {
                this.sendReplicaUnavailableErrorResponse(senderConsistentId, correlationId, groupId, requestTimestamp);
                return;
            }
            if (requestTimestamp != null) {
                this.clockService.updateClock(requestTimestamp);
            }
            boolean sendTimestamp = request instanceof TimestampAware || request instanceof ReadOnlyDirectReplicaRequest;
            Replica replica = replicaFut2.join();
            CompletableFuture<ReplicaResult> resFut = replica.processRequest(request, sender.id());
            ((CompletableFuture)resFut.handle((res, ex) -> {
                NetworkMessage msg;
                if (ex == null) {
                    msg = this.prepareReplicaResponse(sendTimestamp, (ReplicaResult)res);
                } else {
                    if (ReplicaManager.indicatesUnexpectedProblem(ex)) {
                        this.throttledLog.warn("Failed to process replica request [request={}].", ex, new Object[]{request});
                    } else {
                        this.throttledLog.debug("Failed to process replica request [request={}].", ex, new Object[]{request});
                    }
                    msg = this.prepareReplicaErrorResponse(sendTimestamp, (Throwable)ex);
                }
                this.clusterNetSvc.messagingService().respond(senderConsistentId, msg, correlationId.longValue());
                if (request instanceof PrimaryReplicaRequest && ReplicaManager.isConnectivityRelatedException(ex)) {
                    LOG.info("The replica does not meet the requirements for the leaseholder [groupId={}].", new Object[]{groupId});
                    this.stopLeaseProlongation(groupId, null);
                }
                if (ex == null && res.applyResult().replicationFuture() != null) {
                    res.applyResult().replicationFuture().whenComplete((BiConsumer)(res.delayedAckProcessor != null ? res.delayedAckProcessor : (res0, ex0) -> {
                        NetworkMessage msg0;
                        LOG.debug("Sending delayed response for replica request [request={}]", new Object[]{request});
                        if (ex0 == null) {
                            msg0 = this.prepareReplicaResponse(sendTimestamp, new ReplicaResult(res0, null));
                        } else {
                            if (ReplicaManager.indicatesUnexpectedProblem(ex0)) {
                                LOG.warn("Failed to process delayed response [request={}]", ex0, new Object[]{request});
                            }
                            msg0 = this.prepareReplicaErrorResponse(sendTimestamp, (Throwable)ex0);
                        }
                        this.clusterNetSvc.messagingService().send(senderConsistentId, ChannelType.DEFAULT, msg0);
                    }));
                }
                return null;
            })).whenComplete((res, ex) -> {
                if (ex != null) {
                    this.failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, ex));
                }
            });
        }
        finally {
            this.leaveBusy();
        }
    }

    private static boolean indicatesUnexpectedProblem(Throwable ex) {
        Throwable unwrapped = ExceptionUtils.unwrapCause((Throwable)ex);
        return !(unwrapped instanceof ExpectedReplicationException) && !ExceptionUtils.hasCause((Throwable)ex, (Class[])new Class[]{NodeStoppingException.class, TrackerClosedException.class, ComponentStoppingException.class, GroupOverloadedException.class});
    }

    private static boolean isConnectivityRelatedException(@Nullable Throwable ex) {
        if (ex instanceof ExecutionException || ex instanceof CompletionException) {
            ex = ex.getCause();
        }
        return ex instanceof TimeoutException || ex instanceof IOException;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onPlacementDriverMessageReceived(NetworkMessage msg0, InternalClusterNode sender, @Nullable Long correlationId) {
        if (!(msg0 instanceof PlacementDriverReplicaMessage)) {
            return;
        }
        String senderConsistentId = sender.name();
        assert (correlationId != null);
        PlacementDriverReplicaMessage msg = (PlacementDriverReplicaMessage)msg0;
        if (!this.enterBusy()) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Failed to process placement driver message (the node is stopping) [msg={}].", new Object[]{msg});
            }
            return;
        }
        try {
            CompletableFuture replicaFut = this.replicas.computeIfAbsent(msg.groupId(), k -> new CompletableFuture());
            ((CompletableFuture)replicaFut.thenCompose(replica -> replica.processPlacementDriverMessage(msg))).whenComplete((response, ex) -> {
                if (ex == null) {
                    this.clusterNetSvc.messagingService().respond(senderConsistentId, response, correlationId.longValue());
                } else if (!ExceptionUtils.hasCause((Throwable)ex, (Class[])new Class[]{NodeStoppingException.class, ReplicaStoppingException.class})) {
                    String errorMessage = String.format("Failed to process placement driver message [msg=%s].", msg);
                    this.failureProcessor.process(new FailureContext(ex, errorMessage));
                }
            });
        }
        finally {
            this.leaveBusy();
        }
    }

    CompletableFuture<HybridTimestamp> stopLeaseProlongation(ReplicationGroupId groupId, @Nullable String redirectNodeId) {
        long startTime = System.currentTimeMillis();
        return this.stopLeaseProlongation(groupId, redirectNodeId, startTime + 60000L);
    }

    private CompletableFuture<HybridTimestamp> stopLeaseProlongation(ReplicationGroupId groupId, @Nullable String redirectNodeId, long endTime) {
        long timeout = endTime - System.currentTimeMillis();
        if (timeout <= 0L) {
            return CompletableFuture.failedFuture((Throwable)new IgniteException(ErrorGroups.Common.INTERNAL_ERR, IgniteStringFormatter.format((String)"Failed to stop lease prolongation within timeout [groupId={}]", (Object[])new Object[]{groupId})));
        }
        return this.msNodes.thenCompose(nodeIds -> {
            ArrayList<CompletionStage> futs = new ArrayList<CompletionStage>();
            for (String nodeId : nodeIds) {
                InternalClusterNode node = this.clusterNetSvc.topologyService().getByConsistentId(nodeId);
                if (node == null) continue;
                futs.add(this.clusterNetSvc.messagingService().invoke(node, (NetworkMessage)PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage().groupId(groupId).redirectProposal(redirectNodeId).build(), timeout).exceptionally(th -> null));
            }
            return CompletableFutures.allOf(futs).thenCompose(unused -> {
                NetworkMessage response = futs.stream().map(CompletableFuture::join).filter(resp -> resp instanceof StopLeaseProlongationMessageResponse && ((StopLeaseProlongationMessageResponse)resp).deniedLeaseExpirationTime() != null).findAny().orElse(null);
                if (response == null) {
                    return CompletableFuture.supplyAsync(() -> null, CompletableFuture.delayedExecutor(50L, TimeUnit.MILLISECONDS)).thenComposeAsync(un -> this.stopLeaseProlongation(groupId, redirectNodeId, endTime), this.requestsExecutor);
                }
                return CompletableFuture.completedFuture(((StopLeaseProlongationMessageResponse)response).deniedLeaseExpirationTime());
            });
        });
    }

    @Deprecated
    public void startLearnerNode(ReplicationGroupId replicationGroupId, PeersAndLearners newConfiguration, SnapshotStorageFactory snapshotStorageFactory, RaftGroupListener raftGroupListener) {
        RaftNodeId learnerRaftNodeId = new RaftNodeId(replicationGroupId, new Peer(this.localNodeConsistentId, 1));
        RaftGroupOptions learnerOptions = this.groupOptionsForPartition(false, snapshotStorageFactory);
        try {
            ((Loza)this.raftManager).startRaftGroupNode(learnerRaftNodeId, newConfiguration, raftGroupListener, RaftGroupEventsListener.noopLsnr, learnerOptions);
        }
        catch (NodeStoppingException ex) {
            throw new CompletionException(ex);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Replica> startReplica(RaftGroupEventsListener raftGroupEventsListener, RaftGroupListener raftGroupListener, boolean isVolatileStorage, @Nullable SnapshotStorageFactory snapshotStorageFactory, Function<RaftGroupService, ReplicaListener> createListener, PendingComparableValuesTracker<Long, Void> storageIndexTracker, ReplicationGroupId replicaGrpId, PeersAndLearners newConfiguration) throws NodeStoppingException {
        if (!this.enterBusy()) {
            throw new NodeStoppingException();
        }
        try {
            InternalClusterNode localNode = this.clusterNetSvc.topologyService().localMember();
            CompletableFuture<Replica> completableFuture = this.startReplicaInternal(replicaGrpId, snapshotStorageFactory, newConfiguration, raftGroupListener, raftGroupEventsListener, isVolatileStorage, raftClient -> {
                PlacementDriverMessageProcessor placementDriverMessageProcessor = new PlacementDriverMessageProcessor(replicaGrpId, localNode, this.placementDriver, this.clockService, this.replicaStateManager::reserveReplica, this.requestsExecutor, storageIndexTracker, (TopologyAwareRaftGroupService)raftClient);
                return new ReplicaImpl(replicaGrpId, (ReplicaListener)createListener.apply((RaftGroupService)raftClient), localNode, this.placementDriver, this.getPendingAssignmentsSupplier, this.failureProcessor, placementDriverMessageProcessor);
            });
            return completableFuture;
        }
        finally {
            this.leaveBusy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Replica> startReplica(ReplicationGroupId replicaGrpId, Function<RaftGroupService, ReplicaListener> listenerFactory, SnapshotStorageFactory snapshotStorageFactory, PeersAndLearners newConfiguration, RaftGroupListener raftGroupListener, RaftGroupEventsListener raftGroupEventsListener, boolean isVolatileStorage, IgniteSpinBusyLock busyLock, PendingComparableValuesTracker<Long, Void> storageIndexTracker) throws NodeStoppingException {
        if (!busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new NodeStoppingException());
        }
        try {
            CompletableFuture<Replica> completableFuture = this.startReplicaInternal(replicaGrpId, snapshotStorageFactory, newConfiguration, raftGroupListener, raftGroupEventsListener, isVolatileStorage, raftClient -> {
                PlacementDriverMessageProcessor placementDriverMessageProcessor = new PlacementDriverMessageProcessor(replicaGrpId, this.clusterNetSvc.topologyService().localMember(), this.placementDriver, this.clockService, this.replicaStateManager::reserveReplica, this.requestsExecutor, storageIndexTracker, (TopologyAwareRaftGroupService)raftClient);
                return new ZonePartitionReplicaImpl(replicaGrpId, (ReplicaListener)listenerFactory.apply((RaftGroupService)raftClient), (TopologyAwareRaftGroupService)raftClient, placementDriverMessageProcessor);
            });
            return completableFuture;
        }
        finally {
            busyLock.leaveBusy();
        }
    }

    private CompletableFuture<Replica> startReplicaInternal(ReplicationGroupId replicaGrpId, @Nullable SnapshotStorageFactory snapshotStorageFactory, PeersAndLearners newConfiguration, RaftGroupListener raftGroupListener, RaftGroupEventsListener raftGroupEventsListener, boolean isVolatileStorage, Function<TopologyAwareRaftGroupService, Replica> replicaFactory) throws NodeStoppingException {
        RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new Peer(this.localNodeConsistentId));
        RaftGroupOptions groupOptions = this.groupOptionsForPartition(isVolatileStorage, snapshotStorageFactory);
        TopologyAwareRaftGroupService raftClient = (TopologyAwareRaftGroupService)((Loza)this.raftManager).startRaftGroupNode(raftNodeId, newConfiguration, raftGroupListener, raftGroupEventsListener, groupOptions, (RaftServiceFactory)this.raftGroupServiceFactory);
        this.timeoutAttemptsCounters.put(replicaGrpId, 0);
        LOG.info("Replica is about to start [replicationGroupId={}].", new Object[]{replicaGrpId});
        Replica newReplica = replicaFactory.apply(raftClient);
        CompletableFuture newReplicaFuture = this.replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> {
            if (existingReplicaFuture == null || existingReplicaFuture.isDone()) {
                assert (existingReplicaFuture == null || CompletableFutures.isCompletedSuccessfully((CompletableFuture)existingReplicaFuture));
                LOG.info("Replica is started [replicationGroupId={}].", new Object[]{replicaGrpId});
                return CompletableFuture.completedFuture(newReplica);
            }
            LOG.info("Replica is started, existing replica waiter was completed [replicationGroupId={}].", new Object[]{replicaGrpId});
            existingReplicaFuture.complete(newReplica);
            return existingReplicaFuture;
        });
        LocalReplicaEventParameters eventParams = new LocalReplicaEventParameters(replicaGrpId);
        return ((CompletableFuture)this.fireEvent(LocalReplicaEvent.AFTER_REPLICA_STARTED, eventParams).exceptionally(e -> {
            LOG.error("Error when notifying about AFTER_REPLICA_STARTED event.", e);
            return null;
        })).thenCompose(v -> newReplicaFuture);
    }

    public CompletableFuture<Replica> replica(ReplicationGroupId replicationGroupId) {
        return this.replicas.get(replicationGroupId);
    }

    public void resetPeers(ReplicationGroupId replicaGrpId, PeersAndLearners peersAndLearners) {
        RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new Peer(this.localNodeConsistentId));
        ((Loza)this.raftManager).resetPeers(raftNodeId, peersAndLearners);
    }

    private RaftGroupOptions groupOptionsForPartition(boolean isVolatileStorage, @Nullable SnapshotStorageFactory snapshotFactory) {
        RaftGroupOptions raftGroupOptions;
        if (isVolatileStorage) {
            LogStorageBudgetView view = (LogStorageBudgetView)((Loza)this.raftManager).volatileRaft().logStorageBudget().value();
            raftGroupOptions = RaftGroupOptions.forVolatileStores().setLogStorageFactory(this.volatileLogStorageFactoryCreator.factory(view)).raftMetaStorageFactory((groupId, raftOptions) -> new VolatileRaftMetaStorage());
        } else {
            raftGroupOptions = RaftGroupOptions.forPersistentStores();
        }
        raftGroupOptions.snapshotStorageFactory(snapshotFactory);
        raftGroupOptions.maxClockSkew((int)this.clockService.maxClockSkewMillis());
        raftGroupOptions.commandsMarshaller(this.raftCommandsMarshaller);
        this.partitionRaftConfigurer.configure((Object)raftGroupOptions);
        return raftGroupOptions;
    }

    public CompletableFuture<Boolean> stopReplica(ReplicationGroupId replicaGrpId) throws NodeStoppingException {
        if (!this.enterBusy()) {
            throw new NodeStoppingException();
        }
        try {
            CompletableFuture<Boolean> completableFuture = this.stopReplicaInternal(replicaGrpId);
            return completableFuture;
        }
        finally {
            this.leaveBusy();
        }
    }

    private CompletableFuture<Boolean> stopReplicaInternal(ReplicationGroupId replicaGrpId) {
        CompletableFuture isRemovedFuture = new CompletableFuture();
        LocalReplicaEventParameters eventParams = new LocalReplicaEventParameters(replicaGrpId);
        LOG.info("Replica is stopping [replicationGroupId={}].", new Object[]{replicaGrpId});
        this.fireEvent(LocalReplicaEvent.BEFORE_REPLICA_STOPPED, eventParams).whenComplete((v, e) -> {
            if (e != null) {
                this.failureProcessor.process(new FailureContext(e, "Error when notifying about BEFORE_REPLICA_STOPPED event."));
            }
            if (!this.enterBusy()) {
                isRemovedFuture.completeExceptionally(new NodeStoppingException());
                return;
            }
            try {
                this.replicas.compute(replicaGrpId, (grpId, replicaFuture) -> {
                    if (replicaFuture == null) {
                        isRemovedFuture.complete(false);
                    } else if (!replicaFuture.isDone()) {
                        InternalClusterNode localMember = this.clusterNetSvc.topologyService().localMember();
                        replicaFuture.completeExceptionally((Throwable)((Object)new ReplicaStoppingException((ReplicationGroupId)grpId, localMember)));
                        isRemovedFuture.complete(true);
                    } else if (!CompletableFutures.isCompletedSuccessfully((CompletableFuture)replicaFuture)) {
                        isRemovedFuture.complete(true);
                    } else {
                        ((CompletableFuture)replicaFuture.thenCompose(Replica::shutdown)).whenComplete((notUsed, throwable) -> {
                            if (throwable != null) {
                                String errorMessage = String.format("Failed to stop replica [replicaGrpId=%s].", grpId);
                                this.failureProcessor.process(new FailureContext(throwable, errorMessage));
                            }
                            isRemovedFuture.complete(throwable == null);
                        });
                    }
                    return null;
                });
            }
            finally {
                this.leaveBusy();
            }
        });
        return isRemovedFuture.thenApplyAsync(replicaWasRemoved -> {
            if (!replicaWasRemoved.booleanValue()) {
                return false;
            }
            this.timeoutAttemptsCounters.remove(replicaGrpId);
            try {
                this.raftManager.stopRaftNodes(replicaGrpId);
            }
            catch (NodeStoppingException ignored) {
                return false;
            }
            return true;
        }, this.replicaStartStopExecutor);
    }

    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        ExecutorChooser replicaMessagesExecutorChooser = message -> this.requestsExecutor;
        this.clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, replicaMessagesExecutorChooser, this.handler);
        this.clusterNetSvc.messagingService().addMessageHandler(PlacementDriverMessageGroup.class, this.placementDriverMessageHandler);
        this.messageGroupsToHandle.forEach(mg -> this.clusterNetSvc.messagingService().addMessageHandler(mg, replicaMessagesExecutorChooser, this.handler));
        this.scheduledIdleSafeTimeSyncExecutor.scheduleAtFixedRate(this::idleSafeTimeSync, 0L, this.idleSafeTimePropagationPeriodMsSupplier.getAsLong(), TimeUnit.MILLISECONDS);
        this.cmgMgr.metaStorageNodes().whenComplete((nodes, e) -> {
            if (e != null) {
                this.msNodes.completeExceptionally((Throwable)e);
            } else {
                this.msNodes.complete((Set<String>)nodes);
            }
        });
        this.localNodeId = this.clusterNetSvc.topologyService().localMember().id();
        this.localNodeConsistentId = this.clusterNetSvc.topologyService().localMember().name();
        this.replicaStateManager.start(this.localNodeId);
        return CompletableFutures.nullCompletedFuture();
    }

    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        if (!this.stopGuard.compareAndSet(false, true)) {
            return CompletableFutures.nullCompletedFuture();
        }
        this.replicaStateManager.stop();
        this.blockBusy();
        int shutdownTimeoutSeconds = 10;
        IgniteUtils.shutdownAndAwaitTermination((ExecutorService)this.scheduledIdleSafeTimeSyncExecutor, (long)shutdownTimeoutSeconds, (TimeUnit)TimeUnit.SECONDS);
        IgniteUtils.shutdownAndAwaitTermination((ExecutorService)this.throttledLogExecutor, (long)shutdownTimeoutSeconds, (TimeUnit)TimeUnit.SECONDS);
        try {
            IgniteUtils.closeAllManually((ManuallyCloseable[])new ManuallyCloseable[]{() -> {
                assert (this.replicas.values().stream().noneMatch(CompletableFuture::isDone)) : "There are replicas alive [replicas=" + this.replicas.entrySet().stream().filter(e -> ((CompletableFuture)e.getValue()).isDone()).map(Map.Entry::getKey).collect(Collectors.toSet()) + "]";
                this.replicas.values().forEach(replicaFuture -> replicaFuture.completeExceptionally(new NodeStoppingException()));
            }});
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
        return CompletableFutures.nullCompletedFuture();
    }

    @Nullable
    private static HybridTimestamp extractTimestamp(ReplicaRequest request) {
        if (request instanceof TimestampAware) {
            return ((TimestampAware)((Object)request)).timestamp();
        }
        return null;
    }

    private void sendReplicaUnavailableErrorResponse(String senderConsistentId, long correlationId, ReplicationGroupId groupId, @Nullable HybridTimestamp requestTimestamp) {
        if (requestTimestamp != null) {
            this.clusterNetSvc.messagingService().respond(senderConsistentId, (NetworkMessage)REPLICA_MESSAGES_FACTORY.errorTimestampAwareReplicaResponse().throwable((Throwable)((Object)new ReplicaUnavailableException(groupId, this.clusterNetSvc.topologyService().localMember()))).timestamp(this.clockService.updateClock(requestTimestamp)).build(), correlationId);
        } else {
            this.clusterNetSvc.messagingService().respond(senderConsistentId, (NetworkMessage)REPLICA_MESSAGES_FACTORY.errorReplicaResponse().throwable((Throwable)((Object)new ReplicaUnavailableException(groupId, this.clusterNetSvc.topologyService().localMember()))).build(), correlationId);
        }
    }

    private void sendAwaitReplicaResponse(String senderConsistentId, long correlationId) {
        this.clusterNetSvc.messagingService().respond(senderConsistentId, (NetworkMessage)REPLICA_MESSAGES_FACTORY.awaitReplicaResponse().build(), correlationId);
    }

    private NetworkMessage prepareReplicaResponse(boolean sendTimestamp, ReplicaResult result) {
        if (sendTimestamp) {
            HybridTimestamp commitTs = result.applyResult().commitTimestamp();
            return REPLICA_MESSAGES_FACTORY.timestampAwareReplicaResponse().result(result.result()).timestamp(commitTs == null ? this.clockService.current() : commitTs).build();
        }
        return REPLICA_MESSAGES_FACTORY.replicaResponse().result(result.result()).build();
    }

    private NetworkMessage prepareReplicaErrorResponse(boolean sendTimestamp, Throwable ex) {
        if (sendTimestamp) {
            return REPLICA_MESSAGES_FACTORY.errorTimestampAwareReplicaResponse().throwable(ex).timestamp(this.clockService.now()).build();
        }
        return REPLICA_MESSAGES_FACTORY.errorReplicaResponse().throwable(ex).build();
    }

    private void idleSafeTimeSync() {
        if (!this.shouldAdvanceIdleSafeTime()) {
            return;
        }
        this.lastIdleSafeTimeProposal = this.clockService.now();
        for (Map.Entry<ReplicationGroupId, CompletableFuture<Replica>> entry : this.replicas.entrySet()) {
            try {
                this.sendSafeTimeSyncIfReplicaReady(entry.getValue());
            }
            catch (Throwable e) {
                String errorMessage = String.format("Error while trying to send a safe time sync request [groupId=%s]", entry.getKey());
                this.failureProcessor.process(new FailureContext(e, errorMessage));
            }
        }
    }

    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> replicaFuture) {
        if (!CompletableFutures.isCompletedSuccessfully(replicaFuture)) {
            return;
        }
        Replica replica = replicaFuture.join();
        ReplicationGroupId replicaGroupId = replica.groupId();
        ReplicaSafeTimeSyncRequest req = REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest().groupId(ReplicaManager.toReplicationGroupIdMessage(replicaGroupId)).build();
        replica.processRequest(req, this.localNodeId).whenComplete((res, ex) -> {
            if (ex != null) {
                if (ExceptionUtils.hasCause((Throwable)ex, (Class[])new Class[]{TimeoutException.class, ReplicationTimeoutException.class})) {
                    this.tryToLogTimeoutFailure(replicaGroupId, (Throwable)ex);
                } else {
                    this.timeoutAttemptsCounters.put(replicaGroupId, 0);
                }
                if (!ExceptionUtils.hasCause((Throwable)ex, (Class[])new Class[]{NodeStoppingException.class, ComponentStoppingException.class, TimeoutException.class, GroupOverloadedException.class})) {
                    this.failureProcessor.process(new FailureContext(ex, String.format("Could not advance safe time for %s", replica.groupId())));
                }
            }
        });
    }

    private void tryToLogTimeoutFailure(ReplicationGroupId replicaGroupId, Throwable timeoutException) {
        Integer currentAttempt = this.timeoutAttemptsCounters.computeIfPresent(replicaGroupId, (id, attempts) -> attempts + 1);
        if (currentAttempt == null) {
            return;
        }
        if (currentAttempt < 10) {
            return;
        }
        this.throttledLog.warn("SafeTime-Sync-Timeouts", "Failed to sync safe time for partition, the same kind of issue may affect all other replicas on this node [groupId={}, attempt={}].", timeoutException, new Object[]{replicaGroupId, currentAttempt});
    }

    private boolean shouldAdvanceIdleSafeTime() {
        HybridTimestamp lastProposal = this.lastIdleSafeTimeProposal;
        if (lastProposal == null) {
            return true;
        }
        HybridTimestamp requiredLastAttemptActualityTime = lastProposal.addPhysicalTime(this.clockService.maxClockSkewMillis());
        return this.placementDriver.isActualAt(requiredLastAttemptActualityTime);
    }

    @Deprecated
    @TestOnly
    public boolean isReplicaStarted(ReplicationGroupId replicaGrpId) {
        CompletableFuture<Replica> replicaFuture = this.replicas.get(replicaGrpId);
        return replicaFuture != null && CompletableFutures.isCompletedSuccessfully(replicaFuture);
    }

    public CompletableFuture<Boolean> weakStartReplica(ReplicationGroupId groupId, Supplier<CompletableFuture<Boolean>> startOperation, @Nullable Assignments forcedAssignments) {
        return this.replicaStateManager.weakStartReplica(groupId, startOperation, forcedAssignments);
    }

    public CompletableFuture<Void> weakStopReplica(ReplicationGroupId groupId, WeakReplicaStopReason reason, Supplier<CompletableFuture<Void>> stopOperation) {
        return this.replicaStateManager.weakStopReplica(groupId, reason, stopOperation);
    }

    @TestOnly
    public boolean isReplicaTouched(ReplicationGroupId replicaGrpId) {
        return this.replicas.containsKey(replicaGrpId);
    }

    @TestOnly
    public Set<ReplicationGroupId> startedGroups() {
        return this.replicas.entrySet().stream().filter(entry -> CompletableFutures.isCompletedSuccessfully((CompletableFuture)((CompletableFuture)entry.getValue()))).map(Map.Entry::getKey).collect(Collectors.toSet());
    }

    public void destroyReplicationProtocolStoragesOnStartup(ReplicationGroupId replicaGrpId) throws NodeStoppingException {
        this.destroyReplicationProtocolStorages(replicaGrpId, false);
    }

    public void destroyReplicationProtocolStorages(ReplicationGroupId replicaGrpId, boolean isVolatileStorage) throws NodeStoppingException {
        RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new Peer(this.localNodeConsistentId));
        RaftGroupOptions groupOptions = this.groupOptionsForPartition(isVolatileStorage, null);
        ((Loza)this.raftManager).destroyRaftNodeStorages(raftNodeId, groupOptions);
    }

    public void destroyReplicationProtocolStoragesDurably(ReplicationGroupId replicaGrpId, boolean isVolatileStorage) throws NodeStoppingException {
        RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new Peer(this.localNodeConsistentId));
        RaftGroupOptions groupOptions = this.groupOptionsForPartition(isVolatileStorage, null);
        ((Loza)this.raftManager).destroyRaftNodeStoragesDurably(raftNodeId, groupOptions);
    }

    public Set<TablePartitionId> replicationProtocolTablePartitionIdsOnDisk() throws NodeStoppingException {
        return ((Loza)this.raftManager).raftNodeIdsOnDisk().stream().map(id -> {
            assert (id.peer().idx() == 0) : id;
            return id.groupIdName();
        }).filter(PartitionGroupId::matchesString).map(TablePartitionId::fromString).collect(Collectors.toUnmodifiableSet());
    }

    private static ReplicationGroupIdMessage toReplicationGroupIdMessage(ReplicationGroupId replicationGroupId) {
        if (replicationGroupId instanceof TablePartitionId) {
            return ReplicaMessageUtils.toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, (TablePartitionId)replicationGroupId);
        }
        if (replicationGroupId instanceof ZonePartitionId) {
            return ReplicaMessageUtils.toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, (ZonePartitionId)replicationGroupId);
        }
        throw new AssertionError((Object)("Not supported: " + replicationGroupId));
    }

    private boolean enterBusy() {
        return this.busyLock.enterBusy();
    }

    private void leaveBusy() {
        this.busyLock.leaveBusy();
    }

    private void blockBusy() {
        this.busyLock.block();
    }

    public static enum WeakReplicaStopReason {
        EXCLUDED_FROM_ASSIGNMENTS,
        PRIMARY_EXPIRED,
        RESTART;

    }
}

