package org.apache.ignite3.internal.replicator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite3.internal.event.AbstractEventProducer;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.failure.FailureType;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.ComponentStoppingException;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.IgniteThrottledLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.manager.IgniteComponent;
import org.apache.ignite3.internal.network.ChannelType;
import org.apache.ignite3.internal.network.ClusterService;
import org.apache.ignite3.internal.network.NetworkMessage;
import org.apache.ignite3.internal.network.NetworkMessageHandler;
import org.apache.ignite3.internal.partitiondistribution.Assignments;
import org.apache.ignite3.internal.placementdriver.PlacementDriver;
import org.apache.ignite3.internal.placementdriver.message.PlacementDriverMessageGroup;
import org.apache.ignite3.internal.placementdriver.message.PlacementDriverMessagesFactory;
import org.apache.ignite3.internal.placementdriver.message.PlacementDriverReplicaMessage;
import org.apache.ignite3.internal.placementdriver.message.StopLeaseProlongationMessageResponse;
import org.apache.ignite3.internal.raft.Loza;
import org.apache.ignite3.internal.raft.Marshaller;
import org.apache.ignite3.internal.raft.Peer;
import org.apache.ignite3.internal.raft.PeersAndLearners;
import org.apache.ignite3.internal.raft.RaftGroupEventsListener;
import org.apache.ignite3.internal.raft.RaftGroupOptionsConfigurer;
import org.apache.ignite3.internal.raft.RaftManager;
import org.apache.ignite3.internal.raft.RaftNodeId;
import org.apache.ignite3.internal.raft.client.TopologyAwareRaftGroupService;
import org.apache.ignite3.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite3.internal.raft.server.RaftGroupOptions;
import org.apache.ignite3.internal.raft.service.RaftGroupListener;
import org.apache.ignite3.internal.raft.service.RaftGroupService;
import org.apache.ignite3.internal.raft.storage.SnapshotStorageFactory;
import org.apache.ignite3.internal.raft.storage.impl.LogStorageFactoryCreator;
import org.apache.ignite3.internal.raft.storage.impl.VolatileRaftMetaStorage;
import org.apache.ignite3.internal.replicator.exception.ExpectedReplicationException;
import org.apache.ignite3.internal.replicator.exception.ReplicaStoppingException;
import org.apache.ignite3.internal.replicator.exception.ReplicaUnavailableException;
import org.apache.ignite3.internal.replicator.listener.ReplicaListener;
import org.apache.ignite3.internal.replicator.message.AwaitReplicaRequest;
import org.apache.ignite3.internal.replicator.message.PrimaryReplicaRequest;
import org.apache.ignite3.internal.replicator.message.ReadOnlyDirectReplicaRequest;
import org.apache.ignite3.internal.replicator.message.ReplicaMessageGroup;
import org.apache.ignite3.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite3.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite3.internal.replicator.message.ReplicaRequest;
import org.apache.ignite3.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite3.internal.replicator.message.TimestampAware;
import org.apache.ignite3.internal.thread.ExecutorChooser;
import org.apache.ignite3.internal.thread.NamedThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteStripedBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.util.PendingComparableValuesTracker;
import org.apache.ignite3.internal.util.TrackerClosedException;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.lang.IgniteException;
import org.apache.ignite3.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/* loaded from: input_file:org/apache/ignite3/internal/replicator/ReplicaManager.class */
public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, LocalReplicaEventParameters> implements IgniteComponent {
    private static final long STOP_LEASE_PROLONGATION_RETRIES_TIMEOUT_MS = 60000;
    private static final IgniteLogger LOG;
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY;
    private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY;
    private final ExecutorService throttledLogExecutor;
    private final IgniteThrottledLogger throttledLog;
    private final ClusterService clusterNetSvc;
    private final ClusterManagementGroupManager cmgMgr;
    private final RaftManager raftManager;
    private final TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory;
    private final LogStorageFactoryCreator volatileLogStorageFactoryCreator;
    private final Executor replicaStartStopExecutor;
    private final Marshaller raftCommandsMarshaller;
    private final PlacementDriver placementDriver;
    private final LongSupplier idleSafeTimePropagationPeriodMsSupplier;
    private final ClockService clockService;
    private final ScheduledExecutorService scheduledIdleSafeTimeSyncExecutor;
    private final Executor requestsExecutor;
    private final FailureProcessor failureProcessor;
    private final Set<Class<?>> messageGroupsToHandle;
    private final RaftGroupOptionsConfigurer partitionRaftConfigurer;
    private final ReplicaStateManager replicaStateManager;
    private volatile UUID localNodeId;
    private volatile String localNodeConsistentId;

    @Nullable
    private volatile HybridTimestamp lastIdleSafeTimeProposal;
    private final Function<ReplicationGroupId, CompletableFuture<byte[]>> getPendingAssignmentsSupplier;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final IgniteStripedBusyLock busyLock = new IgniteStripedBusyLock();
    private final AtomicBoolean stopGuard = new AtomicBoolean();
    private final CompletableFuture<Set<String>> msNodes = new CompletableFuture<>();
    private final ConcurrentHashMap<ReplicationGroupId, CompletableFuture<Replica>> replicas = new ConcurrentHashMap<>();
    private final NetworkMessageHandler handler = this::onReplicaMessageReceived;
    private final NetworkMessageHandler placementDriverMessageHandler = this::onPlacementDriverMessageReceived;

    /* loaded from: input_file:org/apache/ignite3/internal/replicator/ReplicaManager$WeakReplicaStopReason.class */
    public enum WeakReplicaStopReason {
        EXCLUDED_FROM_ASSIGNMENTS,
        PRIMARY_EXPIRED,
        RESTART
    }

    public ReplicaManager(String str, ClusterService clusterService, ClusterManagementGroupManager clusterManagementGroupManager, ClockService clockService, Set<Class<?>> set, PlacementDriver placementDriver, Executor executor, LongSupplier longSupplier, FailureProcessor failureProcessor, @Nullable Marshaller marshaller, TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory, RaftManager raftManager, RaftGroupOptionsConfigurer raftGroupOptionsConfigurer, LogStorageFactoryCreator logStorageFactoryCreator, Executor executor2, Function<ReplicationGroupId, CompletableFuture<byte[]>> function) {
        this.clusterNetSvc = clusterService;
        this.cmgMgr = clusterManagementGroupManager;
        this.clockService = clockService;
        this.messageGroupsToHandle = set;
        this.volatileLogStorageFactoryCreator = logStorageFactoryCreator;
        this.placementDriver = placementDriver;
        this.requestsExecutor = executor;
        this.idleSafeTimePropagationPeriodMsSupplier = longSupplier;
        this.failureProcessor = failureProcessor;
        this.raftCommandsMarshaller = marshaller;
        this.raftGroupServiceFactory = topologyAwareRaftGroupServiceFactory;
        this.raftManager = raftManager;
        this.partitionRaftConfigurer = raftGroupOptionsConfigurer;
        this.getPendingAssignmentsSupplier = function;
        this.replicaStartStopExecutor = executor2;
        this.replicaStateManager = new ReplicaStateManager(executor2, clockService, placementDriver, this, failureProcessor);
        this.scheduledIdleSafeTimeSyncExecutor = Executors.newScheduledThreadPool(1, NamedThreadFactory.create(str, "scheduled-idle-safe-time-sync-thread", LOG));
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        this.throttledLogExecutor = new ThreadPoolExecutor(availableProcessors, availableProcessors, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue(), NamedThreadFactory.create(str, "throttled-log-replica-manager", LOG));
        this.throttledLog = Loggers.toThrottledLogger(LOG, this.throttledLogExecutor);
    }

    private void onReplicaMessageReceived(NetworkMessage networkMessage, ClusterNode clusterNode, @Nullable Long l) {
        if (networkMessage instanceof ReplicaRequest) {
            if (!$assertionsDisabled && l == null) {
                throw new AssertionError();
            }
            ReplicaRequest replicaRequest = (ReplicaRequest) networkMessage;
            if (IgniteUtils.shouldSwitchToRequestsExecutor(ThreadOperation.STORAGE_READ, ThreadOperation.STORAGE_WRITE, ThreadOperation.TX_STATE_STORAGE_ACCESS)) {
                this.requestsExecutor.execute(() -> {
                    handleReplicaRequest(replicaRequest, clusterNode, l);
                });
            } else {
                handleReplicaRequest(replicaRequest, clusterNode, l);
            }
        }
    }

    private void handleReplicaRequest(ReplicaRequest replicaRequest, ClusterNode clusterNode, @Nullable Long l) {
        if (!enterBusy()) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Failed to process replica request (the node is stopping) [request={}].", replicaRequest);
                return;
            }
            return;
        }
        ReplicationGroupId asReplicationGroupId = replicaRequest.groupId().asReplicationGroupId();
        String name = clusterNode.name();
        try {
            if (replicaRequest instanceof AwaitReplicaRequest) {
                this.replicas.compute(asReplicationGroupId, (replicationGroupId, completableFuture) -> {
                    if (completableFuture == null) {
                        completableFuture = new CompletableFuture();
                    }
                    if (completableFuture.isDone()) {
                        sendAwaitReplicaResponse(name, l.longValue());
                    } else {
                        completableFuture.whenComplete((replica, th) -> {
                            if (th != null) {
                                this.clusterNetSvc.messagingService().respond(name, REPLICA_MESSAGES_FACTORY.errorReplicaResponse().throwable(th).build(), l.longValue());
                            } else {
                                sendAwaitReplicaResponse(name, l.longValue());
                            }
                        });
                    }
                    return completableFuture;
                });
                leaveBusy();
                return;
            }
            CompletableFuture<Replica> completableFuture2 = this.replicas.get(asReplicationGroupId);
            HybridTimestamp extractTimestamp = extractTimestamp(replicaRequest);
            if (completableFuture2 == null || !completableFuture2.isDone()) {
                sendReplicaUnavailableErrorResponse(name, l.longValue(), asReplicationGroupId, extractTimestamp);
                leaveBusy();
                return;
            }
            if (extractTimestamp != null) {
                this.clockService.updateClock(extractTimestamp);
            }
            boolean z = (replicaRequest instanceof TimestampAware) || (replicaRequest instanceof ReadOnlyDirectReplicaRequest);
            completableFuture2.join().processRequest(replicaRequest, clusterNode.id()).handle((replicaResult, th) -> {
                NetworkMessage prepareReplicaErrorResponse;
                if (th == null) {
                    prepareReplicaErrorResponse = prepareReplicaResponse(z, replicaResult);
                } else {
                    if (indicatesUnexpectedProblem(th)) {
                        this.throttledLog.warn("Failed to process replica request [request={}].", th, replicaRequest);
                    } else {
                        this.throttledLog.debug("Failed to process replica request [request={}].", th, replicaRequest);
                    }
                    prepareReplicaErrorResponse = prepareReplicaErrorResponse(z, th);
                }
                this.clusterNetSvc.messagingService().respond(name, prepareReplicaErrorResponse, l.longValue());
                if ((replicaRequest instanceof PrimaryReplicaRequest) && isConnectivityRelatedException(th)) {
                    LOG.info("The replica does not meet the requirements for the leaseholder [groupId={}].", asReplicationGroupId);
                    stopLeaseProlongation(asReplicationGroupId, null);
                }
                if (th != null || replicaResult.applyResult().replicationFuture() == null) {
                    return null;
                }
                replicaResult.applyResult().replicationFuture().whenComplete(replicaResult.delayedAckProcessor != null ? replicaResult.delayedAckProcessor : (obj, th) -> {
                    NetworkMessage prepareReplicaErrorResponse2;
                    LOG.debug("Sending delayed response for replica request [request={}]", replicaRequest);
                    if (th == null) {
                        prepareReplicaErrorResponse2 = prepareReplicaResponse(z, new ReplicaResult(obj, null));
                    } else {
                        LOG.warn("Failed to process delayed response [request={}]", th, replicaRequest);
                        prepareReplicaErrorResponse2 = prepareReplicaErrorResponse(z, th);
                    }
                    this.clusterNetSvc.messagingService().send(name, ChannelType.DEFAULT, prepareReplicaErrorResponse2);
                });
                return null;
            }).whenComplete((BiConsumer<? super U, ? super Throwable>) (obj, th2) -> {
                if (th2 != null) {
                    this.failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, th2));
                }
            });
            leaveBusy();
        } catch (Throwable th3) {
            leaveBusy();
            throw th3;
        }
    }

    private static boolean indicatesUnexpectedProblem(Throwable th) {
        Throwable unwrapCause = ExceptionUtils.unwrapCause(th);
        return ((unwrapCause instanceof ExpectedReplicationException) || (unwrapCause instanceof TrackerClosedException)) ? false : true;
    }

    private static boolean isConnectivityRelatedException(@Nullable Throwable th) {
        if ((th instanceof ExecutionException) || (th instanceof CompletionException)) {
            th = th.getCause();
        }
        return (th instanceof TimeoutException) || (th instanceof IOException);
    }

    private void onPlacementDriverMessageReceived(NetworkMessage networkMessage, ClusterNode clusterNode, @Nullable Long l) {
        if (networkMessage instanceof PlacementDriverReplicaMessage) {
            String name = clusterNode.name();
            if (!$assertionsDisabled && l == null) {
                throw new AssertionError();
            }
            PlacementDriverReplicaMessage placementDriverReplicaMessage = (PlacementDriverReplicaMessage) networkMessage;
            if (!enterBusy()) {
                if (LOG.isInfoEnabled()) {
                    LOG.info("Failed to process placement driver message (the node is stopping) [msg={}].", placementDriverReplicaMessage);
                }
            } else {
                try {
                    this.replicas.computeIfAbsent(placementDriverReplicaMessage.groupId(), replicationGroupId -> {
                        return new CompletableFuture();
                    }).thenCompose(replica -> {
                        return replica.processPlacementDriverMessage(placementDriverReplicaMessage);
                    }).whenComplete((BiConsumer<? super U, ? super Throwable>) (networkMessage2, th) -> {
                        if (th == null) {
                            this.clusterNetSvc.messagingService().respond(name, networkMessage2, l.longValue());
                        } else {
                            if (ExceptionUtils.hasCause(th, NodeStoppingException.class, ReplicaStoppingException.class)) {
                                return;
                            }
                            this.failureProcessor.process(new FailureContext(th, String.format("Failed to process placement driver message [msg=%s].", placementDriverReplicaMessage)));
                        }
                    });
                    leaveBusy();
                } catch (Throwable th2) {
                    leaveBusy();
                    throw th2;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<HybridTimestamp> stopLeaseProlongation(ReplicationGroupId replicationGroupId, @Nullable String str) {
        return stopLeaseProlongation(replicationGroupId, str, System.currentTimeMillis() + STOP_LEASE_PROLONGATION_RETRIES_TIMEOUT_MS);
    }

    private CompletableFuture<HybridTimestamp> stopLeaseProlongation(ReplicationGroupId replicationGroupId, @Nullable String str, long j) {
        long currentTimeMillis = j - System.currentTimeMillis();
        return currentTimeMillis <= 0 ? CompletableFuture.failedFuture(new IgniteException(ErrorGroups.Common.INTERNAL_ERR, IgniteStringFormatter.format("Failed to stop lease prolongation within timeout [groupId={}]", replicationGroupId))) : this.msNodes.thenCompose(set -> {
            ArrayList arrayList = new ArrayList();
            Iterator it = set.iterator();
            while (it.hasNext()) {
                ClusterNode byConsistentId = this.clusterNetSvc.topologyService().getByConsistentId((String) it.next());
                if (byConsistentId != null) {
                    arrayList.add(this.clusterNetSvc.messagingService().invoke(byConsistentId, PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage().groupId(replicationGroupId).redirectProposal(str).build(), currentTimeMillis).exceptionally(th -> {
                        return null;
                    }));
                }
            }
            return CompletableFutures.allOf(arrayList).thenCompose(r13 -> {
                NetworkMessage networkMessage = (NetworkMessage) arrayList.stream().map((v0) -> {
                    return v0.join();
                }).filter(networkMessage2 -> {
                    return (networkMessage2 instanceof StopLeaseProlongationMessageResponse) && ((StopLeaseProlongationMessageResponse) networkMessage2).deniedLeaseExpirationTime() != null;
                }).findAny().orElse(null);
                return networkMessage == null ? CompletableFuture.supplyAsync(() -> {
                    return null;
                }, CompletableFuture.delayedExecutor(50L, TimeUnit.MILLISECONDS)).thenComposeAsync(obj -> {
                    return stopLeaseProlongation(replicationGroupId, str, j);
                }, this.requestsExecutor) : CompletableFuture.completedFuture(((StopLeaseProlongationMessageResponse) networkMessage).deniedLeaseExpirationTime());
            });
        });
    }

    @Deprecated
    public void startLearnerNode(ReplicationGroupId replicationGroupId, PeersAndLearners peersAndLearners, SnapshotStorageFactory snapshotStorageFactory, RaftGroupListener raftGroupListener) {
        try {
            ((Loza) this.raftManager).startRaftGroupNode(new RaftNodeId(replicationGroupId, new Peer(this.localNodeConsistentId, 1)), peersAndLearners, raftGroupListener, RaftGroupEventsListener.noopLsnr, groupOptionsForPartition(false, snapshotStorageFactory));
        } catch (NodeStoppingException e) {
            throw new CompletionException(e);
        }
    }

    public CompletableFuture<Replica> startReplica(RaftGroupEventsListener raftGroupEventsListener, RaftGroupListener raftGroupListener, boolean z, @Nullable SnapshotStorageFactory snapshotStorageFactory, Function<RaftGroupService, ReplicaListener> function, PendingComparableValuesTracker<Long, Void> pendingComparableValuesTracker, ReplicationGroupId replicationGroupId, PeersAndLearners peersAndLearners) throws NodeStoppingException {
        if (!enterBusy()) {
            throw new NodeStoppingException();
        }
        try {
            ClusterNode localMember = this.clusterNetSvc.topologyService().localMember();
            CompletableFuture<Replica> startReplicaInternal = startReplicaInternal(replicationGroupId, snapshotStorageFactory, peersAndLearners, raftGroupListener, raftGroupEventsListener, z, topologyAwareRaftGroupService -> {
                PlacementDriver placementDriver = this.placementDriver;
                ClockService clockService = this.clockService;
                ReplicaStateManager replicaStateManager = this.replicaStateManager;
                Objects.requireNonNull(replicaStateManager);
                return new ReplicaImpl(replicationGroupId, (ReplicaListener) function.apply(topologyAwareRaftGroupService), localMember, this.placementDriver, this.getPendingAssignmentsSupplier, this.failureProcessor, new PlacementDriverMessageProcessor(replicationGroupId, localMember, placementDriver, clockService, replicaStateManager::reserveReplica, this.requestsExecutor, pendingComparableValuesTracker, topologyAwareRaftGroupService, this.failureProcessor));
            });
            leaveBusy();
            return startReplicaInternal;
        } catch (Throwable th) {
            leaveBusy();
            throw th;
        }
    }

    public CompletableFuture<Replica> startReplica(ReplicationGroupId replicationGroupId, Function<RaftGroupService, ReplicaListener> function, SnapshotStorageFactory snapshotStorageFactory, PeersAndLearners peersAndLearners, RaftGroupListener raftGroupListener, RaftGroupEventsListener raftGroupEventsListener, boolean z, IgniteSpinBusyLock igniteSpinBusyLock, PendingComparableValuesTracker<Long, Void> pendingComparableValuesTracker) throws NodeStoppingException {
        if (!igniteSpinBusyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new NodeStoppingException());
        }
        try {
            CompletableFuture<Replica> startReplicaInternal = startReplicaInternal(replicationGroupId, snapshotStorageFactory, peersAndLearners, raftGroupListener, raftGroupEventsListener, z, topologyAwareRaftGroupService -> {
                ClusterNode localMember = this.clusterNetSvc.topologyService().localMember();
                PlacementDriver placementDriver = this.placementDriver;
                ClockService clockService = this.clockService;
                ReplicaStateManager replicaStateManager = this.replicaStateManager;
                Objects.requireNonNull(replicaStateManager);
                return new ZonePartitionReplicaImpl(replicationGroupId, (ReplicaListener) function.apply(topologyAwareRaftGroupService), topologyAwareRaftGroupService, new PlacementDriverMessageProcessor(replicationGroupId, localMember, placementDriver, clockService, replicaStateManager::reserveReplica, this.requestsExecutor, pendingComparableValuesTracker, topologyAwareRaftGroupService, this.failureProcessor));
            });
            igniteSpinBusyLock.leaveBusy();
            return startReplicaInternal;
        } catch (Throwable th) {
            igniteSpinBusyLock.leaveBusy();
            throw th;
        }
    }

    private CompletableFuture<Replica> startReplicaInternal(ReplicationGroupId replicationGroupId, @Nullable SnapshotStorageFactory snapshotStorageFactory, PeersAndLearners peersAndLearners, RaftGroupListener raftGroupListener, RaftGroupEventsListener raftGroupEventsListener, boolean z, Function<TopologyAwareRaftGroupService, Replica> function) throws NodeStoppingException {
        TopologyAwareRaftGroupService topologyAwareRaftGroupService = (TopologyAwareRaftGroupService) ((Loza) this.raftManager).startRaftGroupNode(new RaftNodeId(replicationGroupId, new Peer(this.localNodeConsistentId)), peersAndLearners, raftGroupListener, raftGroupEventsListener, groupOptionsForPartition(z, snapshotStorageFactory), this.raftGroupServiceFactory);
        LOG.info("Replica is about to start [replicationGroupId={}].", replicationGroupId);
        Replica apply = function.apply(topologyAwareRaftGroupService);
        CompletableFuture<Replica> compute = this.replicas.compute(replicationGroupId, (replicationGroupId2, completableFuture) -> {
            if (completableFuture != null && !completableFuture.isDone()) {
                LOG.info("Replica is started, existing replica waiter was completed [replicationGroupId={}].", replicationGroupId);
                completableFuture.complete(apply);
                return completableFuture;
            }
            if (!$assertionsDisabled && completableFuture != null && !CompletableFutures.isCompletedSuccessfully(completableFuture)) {
                throw new AssertionError();
            }
            LOG.info("Replica is started [replicationGroupId={}].", replicationGroupId);
            return CompletableFuture.completedFuture(apply);
        });
        return fireEvent(LocalReplicaEvent.AFTER_REPLICA_STARTED, new LocalReplicaEventParameters(replicationGroupId)).exceptionally(th -> {
            LOG.error("Error when notifying about AFTER_REPLICA_STARTED event.", th);
            return null;
        }).thenCompose(r3 -> {
            return compute;
        });
    }

    public CompletableFuture<Replica> replica(ReplicationGroupId replicationGroupId) {
        return this.replicas.get(replicationGroupId);
    }

    public void resetPeers(ReplicationGroupId replicationGroupId, PeersAndLearners peersAndLearners) {
        ((Loza) this.raftManager).resetPeers(new RaftNodeId(replicationGroupId, new Peer(this.localNodeConsistentId)), peersAndLearners);
    }

    private RaftGroupOptions groupOptionsForPartition(boolean z, @Nullable SnapshotStorageFactory snapshotStorageFactory) {
        RaftGroupOptions forPersistentStores;
        if (z) {
            forPersistentStores = RaftGroupOptions.forVolatileStores().setLogStorageFactory(this.volatileLogStorageFactoryCreator.factory(((Loza) this.raftManager).volatileRaft().logStorageBudget().value())).raftMetaStorageFactory((str, raftOptions) -> {
                return new VolatileRaftMetaStorage();
            });
        } else {
            forPersistentStores = RaftGroupOptions.forPersistentStores();
        }
        forPersistentStores.snapshotStorageFactory(snapshotStorageFactory);
        forPersistentStores.maxClockSkew((int) this.clockService.maxClockSkewMillis());
        forPersistentStores.commandsMarshaller(this.raftCommandsMarshaller);
        this.partitionRaftConfigurer.configure(forPersistentStores);
        return forPersistentStores;
    }

    public CompletableFuture<Boolean> stopReplica(ReplicationGroupId replicationGroupId) throws NodeStoppingException {
        if (!enterBusy()) {
            throw new NodeStoppingException();
        }
        try {
            return stopReplicaInternal(replicationGroupId);
        } finally {
            leaveBusy();
        }
    }

    private CompletableFuture<Boolean> stopReplicaInternal(ReplicationGroupId replicationGroupId) {
        CompletableFuture completableFuture = new CompletableFuture();
        fireEvent(LocalReplicaEvent.BEFORE_REPLICA_STOPPED, new LocalReplicaEventParameters(replicationGroupId)).whenComplete((r9, th) -> {
            if (th != null) {
                this.failureProcessor.process(new FailureContext(th, "Error when notifying about BEFORE_REPLICA_STOPPED event."));
            }
            if (!enterBusy()) {
                completableFuture.completeExceptionally(new NodeStoppingException());
                return;
            }
            try {
                this.replicas.compute(replicationGroupId, (replicationGroupId2, completableFuture2) -> {
                    if (completableFuture2 == null) {
                        completableFuture.complete(false);
                        return null;
                    }
                    if (!completableFuture2.isDone()) {
                        completableFuture2.completeExceptionally(new ReplicaStoppingException(replicationGroupId2, this.clusterNetSvc.topologyService().localMember()));
                        completableFuture.complete(true);
                        return null;
                    }
                    if (CompletableFutures.isCompletedSuccessfully(completableFuture2)) {
                        completableFuture2.thenCompose((v0) -> {
                            return v0.shutdown();
                        }).whenComplete((r9, th) -> {
                            if (th != null) {
                                this.failureProcessor.process(new FailureContext(th, String.format("Failed to stop replica [replicaGrpId=%s].", replicationGroupId2)));
                            }
                            completableFuture.complete(Boolean.valueOf(th == null));
                        });
                        return null;
                    }
                    completableFuture.complete(true);
                    return null;
                });
                leaveBusy();
            } catch (Throwable th) {
                leaveBusy();
                throw th;
            }
        });
        return completableFuture.thenApplyAsync(bool -> {
            if (!bool.booleanValue()) {
                return false;
            }
            try {
                this.raftManager.stopRaftNodes(replicationGroupId);
                return true;
            } catch (NodeStoppingException e) {
                return false;
            }
        }, this.replicaStartStopExecutor);
    }

    @Override // org.apache.ignite3.internal.manager.IgniteComponent
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        ExecutorChooser<NetworkMessage> executorChooser = networkMessage -> {
            return this.requestsExecutor;
        };
        this.clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, executorChooser, this.handler);
        this.clusterNetSvc.messagingService().addMessageHandler(PlacementDriverMessageGroup.class, this.placementDriverMessageHandler);
        this.messageGroupsToHandle.forEach(cls -> {
            this.clusterNetSvc.messagingService().addMessageHandler(cls, executorChooser, this.handler);
        });
        this.scheduledIdleSafeTimeSyncExecutor.scheduleAtFixedRate(this::idleSafeTimeSync, 0L, this.idleSafeTimePropagationPeriodMsSupplier.getAsLong(), TimeUnit.MILLISECONDS);
        this.cmgMgr.metaStorageNodes().whenComplete((set, th) -> {
            if (th != null) {
                this.msNodes.completeExceptionally(th);
            } else {
                this.msNodes.complete(set);
            }
        });
        this.localNodeId = this.clusterNetSvc.topologyService().localMember().id();
        this.localNodeConsistentId = this.clusterNetSvc.topologyService().localMember().name();
        this.replicaStateManager.start(this.localNodeId);
        return CompletableFutures.nullCompletedFuture();
    }

    @Override // org.apache.ignite3.internal.manager.IgniteComponent
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        if (!this.stopGuard.compareAndSet(false, true)) {
            return CompletableFutures.nullCompletedFuture();
        }
        this.replicaStateManager.stop();
        blockBusy();
        IgniteUtils.shutdownAndAwaitTermination(this.scheduledIdleSafeTimeSyncExecutor, 10, TimeUnit.SECONDS);
        IgniteUtils.shutdownAndAwaitTermination(this.throttledLogExecutor, 10, TimeUnit.SECONDS);
        try {
            IgniteUtils.closeAllManually(() -> {
                if (!$assertionsDisabled && !this.replicas.values().stream().noneMatch((v0) -> {
                    return v0.isDone();
                })) {
                    throw new AssertionError("There are replicas alive [replicas=" + this.replicas.entrySet().stream().filter(entry -> {
                        return ((CompletableFuture) entry.getValue()).isDone();
                    }).map((v0) -> {
                        return v0.getKey();
                    }).collect(Collectors.toSet()) + "]");
                }
                this.replicas.values().forEach(completableFuture -> {
                    completableFuture.completeExceptionally(new NodeStoppingException());
                });
            });
            return CompletableFutures.nullCompletedFuture();
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    @Nullable
    private static HybridTimestamp extractTimestamp(ReplicaRequest replicaRequest) {
        if (replicaRequest instanceof TimestampAware) {
            return ((TimestampAware) replicaRequest).timestamp();
        }
        return null;
    }

    private void sendReplicaUnavailableErrorResponse(String str, long j, ReplicationGroupId replicationGroupId, @Nullable HybridTimestamp hybridTimestamp) {
        if (hybridTimestamp != null) {
            this.clusterNetSvc.messagingService().respond(str, REPLICA_MESSAGES_FACTORY.errorTimestampAwareReplicaResponse().throwable(new ReplicaUnavailableException(replicationGroupId, this.clusterNetSvc.topologyService().localMember())).timestamp(this.clockService.updateClock(hybridTimestamp)).build(), j);
        } else {
            this.clusterNetSvc.messagingService().respond(str, REPLICA_MESSAGES_FACTORY.errorReplicaResponse().throwable(new ReplicaUnavailableException(replicationGroupId, this.clusterNetSvc.topologyService().localMember())).build(), j);
        }
    }

    private void sendAwaitReplicaResponse(String str, long j) {
        this.clusterNetSvc.messagingService().respond(str, REPLICA_MESSAGES_FACTORY.awaitReplicaResponse().build(), j);
    }

    private NetworkMessage prepareReplicaResponse(boolean z, ReplicaResult replicaResult) {
        if (!z) {
            return REPLICA_MESSAGES_FACTORY.replicaResponse().result(replicaResult.result()).build();
        }
        HybridTimestamp commitTimestamp = replicaResult.applyResult().commitTimestamp();
        return REPLICA_MESSAGES_FACTORY.timestampAwareReplicaResponse().result(replicaResult.result()).timestamp(commitTimestamp == null ? this.clockService.current() : commitTimestamp).build();
    }

    private NetworkMessage prepareReplicaErrorResponse(boolean z, Throwable th) {
        return z ? REPLICA_MESSAGES_FACTORY.errorTimestampAwareReplicaResponse().throwable(th).timestamp(this.clockService.now()).build() : REPLICA_MESSAGES_FACTORY.errorReplicaResponse().throwable(th).build();
    }

    private void idleSafeTimeSync() {
        if (shouldAdvanceIdleSafeTime()) {
            this.lastIdleSafeTimeProposal = this.clockService.now();
            for (Map.Entry<ReplicationGroupId, CompletableFuture<Replica>> entry : this.replicas.entrySet()) {
                try {
                    sendSafeTimeSyncIfReplicaReady(entry.getValue());
                } catch (Throwable th) {
                    this.failureProcessor.process(new FailureContext(th, String.format("Error while trying to send a safe time sync request [groupId=%s]", entry.getKey())));
                }
            }
        }
    }

    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> completableFuture) {
        if (CompletableFutures.isCompletedSuccessfully(completableFuture)) {
            Replica join = completableFuture.join();
            join.processRequest(REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest().groupId(toReplicationGroupIdMessage(join.groupId())).build(), this.localNodeId).whenComplete((replicaResult, th) -> {
                if (th == null || ExceptionUtils.hasCause(th, NodeStoppingException.class, ComponentStoppingException.class, TimeoutException.class)) {
                    return;
                }
                this.failureProcessor.process(new FailureContext(th, String.format("Could not advance safe time for %s", join.groupId())));
            });
        }
    }

    private boolean shouldAdvanceIdleSafeTime() {
        HybridTimestamp hybridTimestamp = this.lastIdleSafeTimeProposal;
        if (hybridTimestamp == null) {
            return true;
        }
        return this.placementDriver.isActualAt(hybridTimestamp.addPhysicalTime(this.clockService.maxClockSkewMillis()));
    }

    @TestOnly
    @Deprecated
    public boolean isReplicaStarted(ReplicationGroupId replicationGroupId) {
        CompletableFuture<Replica> completableFuture = this.replicas.get(replicationGroupId);
        return completableFuture != null && CompletableFutures.isCompletedSuccessfully(completableFuture);
    }

    public CompletableFuture<Boolean> weakStartReplica(ReplicationGroupId replicationGroupId, Supplier<CompletableFuture<Boolean>> supplier, @Nullable Assignments assignments) {
        return this.replicaStateManager.weakStartReplica(replicationGroupId, supplier, assignments);
    }

    public CompletableFuture<Void> weakStopReplica(ReplicationGroupId replicationGroupId, WeakReplicaStopReason weakReplicaStopReason, Supplier<CompletableFuture<Void>> supplier) {
        return this.replicaStateManager.weakStopReplica(replicationGroupId, weakReplicaStopReason, supplier);
    }

    @TestOnly
    public boolean isReplicaTouched(ReplicationGroupId replicationGroupId) {
        return this.replicas.containsKey(replicationGroupId);
    }

    @TestOnly
    public Set<ReplicationGroupId> startedGroups() {
        return (Set) this.replicas.entrySet().stream().filter(entry -> {
            return CompletableFutures.isCompletedSuccessfully((CompletableFuture) entry.getValue());
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toSet());
    }

    public void destroyReplicationProtocolStorages(ReplicationGroupId replicationGroupId, boolean z) throws NodeStoppingException {
        ((Loza) this.raftManager).destroyRaftNodeStorages(new RaftNodeId(replicationGroupId, new Peer(this.localNodeConsistentId)), groupOptionsForPartition(z, null));
    }

    private static ReplicationGroupIdMessage toReplicationGroupIdMessage(ReplicationGroupId replicationGroupId) {
        if (replicationGroupId instanceof TablePartitionId) {
            return ReplicaMessageUtils.toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, (TablePartitionId) replicationGroupId);
        }
        if (replicationGroupId instanceof ZonePartitionId) {
            return ReplicaMessageUtils.toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, (ZonePartitionId) replicationGroupId);
        }
        throw new AssertionError("Not supported: " + replicationGroupId);
    }

    private boolean enterBusy() {
        return this.busyLock.enterBusy();
    }

    private void leaveBusy() {
        this.busyLock.leaveBusy();
    }

    private void blockBusy() {
        this.busyLock.block();
    }

    static {
        $assertionsDisabled = !ReplicaManager.class.desiredAssertionStatus();
        LOG = Loggers.forClass(ReplicaManager.class);
        REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
        PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
    }
}
