/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.partition.replicator;

import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite3.internal.catalog.Catalog;
import org.apache.ignite3.internal.catalog.CatalogService;
import org.apache.ignite3.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
import org.apache.ignite3.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite3.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite3.internal.catalog.events.CatalogEvent;
import org.apache.ignite3.internal.catalog.events.CreateZoneEventParameters;
import org.apache.ignite3.internal.components.NodeProperties;
import org.apache.ignite3.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite3.internal.configuration.utils.SystemDistributedConfigurationPropertyHolder;
import org.apache.ignite3.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite3.internal.distributionzones.rebalance.AssignmentUtil;
import org.apache.ignite3.internal.distributionzones.rebalance.PartitionMover;
import org.apache.ignite3.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite3.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener;
import org.apache.ignite3.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite3.internal.event.AbstractEventProducer;
import org.apache.ignite3.internal.event.EventListener;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.lang.ComponentStoppingException;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.lowwatermark.LowWatermark;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.manager.IgniteComponent;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.MetaStorageManager;
import org.apache.ignite3.internal.metastorage.Revisions;
import org.apache.ignite3.internal.metastorage.WatchEvent;
import org.apache.ignite3.internal.metastorage.WatchListener;
import org.apache.ignite3.internal.metastorage.dsl.Condition;
import org.apache.ignite3.internal.metastorage.dsl.Conditions;
import org.apache.ignite3.internal.metastorage.dsl.Operation;
import org.apache.ignite3.internal.metastorage.dsl.Operations;
import org.apache.ignite3.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.partition.replicator.LocalPartitionReplicaEvent;
import org.apache.ignite3.internal.partition.replicator.LocalPartitionReplicaEventParameters;
import org.apache.ignite3.internal.partition.replicator.NaiveAsyncReadWriteLock;
import org.apache.ignite3.internal.partition.replicator.ReliableCatalogVersions;
import org.apache.ignite3.internal.partition.replicator.ReplicaTableProcessor;
import org.apache.ignite3.internal.partition.replicator.StartedReplicationGroups;
import org.apache.ignite3.internal.partition.replicator.ZonePartitionReplicaListener;
import org.apache.ignite3.internal.partition.replicator.ZoneResourcesManager;
import org.apache.ignite3.internal.partition.replicator.raft.RaftTableProcessor;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite3.internal.partition.replicator.schema.CatalogValidationSchemasSource;
import org.apache.ignite3.internal.partition.replicator.schema.ExecutorInclinedSchemaSyncService;
import org.apache.ignite3.internal.partitiondistribution.Assignment;
import org.apache.ignite3.internal.partitiondistribution.Assignments;
import org.apache.ignite3.internal.partitiondistribution.AssignmentsChain;
import org.apache.ignite3.internal.partitiondistribution.AssignmentsQueue;
import org.apache.ignite3.internal.partitiondistribution.PartitionDistributionUtils;
import org.apache.ignite3.internal.placementdriver.PlacementDriver;
import org.apache.ignite3.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite3.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite3.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
import org.apache.ignite3.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite3.internal.raft.Peer;
import org.apache.ignite3.internal.raft.PeersAndLearners;
import org.apache.ignite3.internal.raft.RaftGroupEventsListener;
import org.apache.ignite3.internal.raft.service.LeaderWithTerm;
import org.apache.ignite3.internal.raft.service.RaftCommandRunner;
import org.apache.ignite3.internal.replicator.Replica;
import org.apache.ignite3.internal.replicator.ReplicaManager;
import org.apache.ignite3.internal.replicator.ReplicationGroupId;
import org.apache.ignite3.internal.replicator.ZonePartitionId;
import org.apache.ignite3.internal.schema.SchemaManager;
import org.apache.ignite3.internal.schema.SchemaSyncService;
import org.apache.ignite3.internal.storage.DataStorageManager;
import org.apache.ignite3.internal.storage.engine.StorageEngine;
import org.apache.ignite3.internal.thread.ThreadUtils;
import org.apache.ignite3.internal.tostring.S;
import org.apache.ignite3.internal.tx.TxManager;
import org.apache.ignite3.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite3.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite3.internal.util.ByteUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.Cursor;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteBusyLock;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.util.PendingComparableValuesTracker;
import org.apache.ignite3.lang.ErrorGroups;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
import org.jetbrains.annotations.VisibleForTesting;

public class PartitionReplicaLifecycleManager
extends AbstractEventProducer<LocalPartitionReplicaEvent, LocalPartitionReplicaEventParameters>
implements IgniteComponent {
    // Collaborators supplied through the constructor.
    private final CatalogService catalogService;
    private final ReplicaManager replicaMgr;
    private final DistributionZoneManager distributionZoneMgr;
    private final MetaStorageManager metaStorageMgr;
    private final TopologyService topologyService;
    private final LowWatermark lowWatermark;
    private final FailureProcessor failureProcessor;
    private final NodeProperties nodeProperties;
    // Meta storage watch listeners: registered on their key prefixes in startAsync, unregistered in beforeNodeStop.
    private final WatchListener pendingAssignmentsRebalanceListener;
    private final WatchListener stableAssignmentsRebalanceListener;
    private final WatchListener assignmentsSwitchRebalanceListener;
    private static final IgniteLogger LOG = Loggers.forClass(PartitionReplicaLifecycleManager.class);
    // Tracks replication groups that are starting or started on this node; drained in beforeNodeStop.
    private final StartedReplicationGroups startedReplicationGroups = new StartedReplicationGroups();
    // Per-key async read/write locks; presumably keyed by zone ID — TODO confirm against executeUnderZoneWriteLock.
    private final Map<Integer, NaiveAsyncReadWriteLock> zonePartitionsLocks = new ConcurrentHashMap<Integer, NaiveAsyncReadWriteLock>();
    // Blocked in beforeNodeStop; guards the asynchronous operations of this component.
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final ExecutorService ioExecutor;
    private final ScheduledExecutorService rebalanceScheduler;
    private final Executor partitionOperationsExecutor;
    private final ClockService clockService;
    // Wrappers built in the constructor around the supplied placement driver / schema sync service
    // together with partitionOperationsExecutor.
    private final PlacementDriver executorInclinedPlacementDriver;
    private final SchemaSyncService executorInclinedSchemaSyncService;
    private final TxManager txManager;
    private final SchemaManager schemaManager;
    // Matches assignments whose consistent ID equals the local node's name.
    private final Predicate<Assignment> isLocalNodeAssignment = assignment -> assignment.consistentId().equals(this.localNode().name());
    // Holder for the "rebalanceRetryDelay" system property (default 200); initialized in startAsync.
    private final SystemDistributedConfigurationPropertyHolder<Integer> rebalanceRetryDelayConfiguration;
    private final DataStorageManager dataStorageManager;
    private final ZoneResourcesManager zoneResourcesManager;
    private final ReliableCatalogVersions reliableCatalogVersions;
    // Event listeners kept as fields so that the same instances can be removed in beforeNodeStop.
    private final EventListener<CreateZoneEventParameters> onCreateZoneListener = this::onCreateZone;
    private final EventListener<PrimaryReplicaEventParameters> onPrimaryReplicaExpiredListener = this::onPrimaryReplicaExpired;
    // Completed exceptionally with NodeStoppingException in beforeNodeStop; raced against assignment calculation.
    private final CompletableFuture<Void> stopReplicaLifecycleFuture = new CompletableFuture();

    /**
     * Creates the manager. A {@link ZoneResourcesManager} is built from the supplied collaborators and everything
     * is then forwarded to the package-private constructor.
     */
    public PartitionReplicaLifecycleManager(
            CatalogService catalogService,
            ReplicaManager replicaMgr,
            DistributionZoneManager distributionZoneMgr,
            MetaStorageManager metaStorageMgr,
            TopologyService topologyService,
            LowWatermark lowWatermark,
            FailureProcessor failureProcessor,
            NodeProperties nodeProperties,
            ExecutorService ioExecutor,
            ScheduledExecutorService rebalanceScheduler,
            Executor partitionOperationsExecutor,
            ClockService clockService,
            PlacementDriver placementDriver,
            SchemaSyncService schemaSyncService,
            SystemDistributedConfiguration systemDistributedConfiguration,
            TxStateRocksDbSharedStorage sharedTxStateStorage,
            TxManager txManager,
            SchemaManager schemaManager,
            DataStorageManager dataStorageManager,
            OutgoingSnapshotsManager outgoingSnapshotsManager
    ) {
        this(
                catalogService,
                replicaMgr,
                distributionZoneMgr,
                metaStorageMgr,
                topologyService,
                lowWatermark,
                failureProcessor,
                nodeProperties,
                ioExecutor,
                rebalanceScheduler,
                partitionOperationsExecutor,
                clockService,
                placementDriver,
                schemaSyncService,
                systemDistributedConfiguration,
                txManager,
                schemaManager,
                dataStorageManager,
                new ZoneResourcesManager(
                        sharedTxStateStorage,
                        txManager,
                        outgoingSnapshotsManager,
                        topologyService,
                        catalogService,
                        failureProcessor,
                        partitionOperationsExecutor
                )
        );
    }

    /**
     * Constructor that accepts a ready {@link ZoneResourcesManager}; visible for tests.
     *
     * <p>Stores the collaborators, wraps the schema sync service and the placement driver with
     * {@code partitionOperationsExecutor}, creates the rebalance retry delay holder and the three meta storage
     * watch listeners.
     */
    @VisibleForTesting
    PartitionReplicaLifecycleManager(
            CatalogService catalogService,
            ReplicaManager replicaMgr,
            DistributionZoneManager distributionZoneMgr,
            MetaStorageManager metaStorageMgr,
            TopologyService topologyService,
            LowWatermark lowWatermark,
            FailureProcessor failureProcessor,
            NodeProperties nodeProperties,
            ExecutorService ioExecutor,
            ScheduledExecutorService rebalanceScheduler,
            Executor partitionOperationsExecutor,
            ClockService clockService,
            PlacementDriver placementDriver,
            SchemaSyncService schemaSyncService,
            SystemDistributedConfiguration systemDistributedConfiguration,
            TxManager txManager,
            SchemaManager schemaManager,
            DataStorageManager dataStorageManager,
            ZoneResourcesManager zoneResourcesManager
    ) {
        // Plain collaborators.
        this.catalogService = catalogService;
        this.replicaMgr = replicaMgr;
        this.distributionZoneMgr = distributionZoneMgr;
        this.metaStorageMgr = metaStorageMgr;
        this.topologyService = topologyService;
        this.lowWatermark = lowWatermark;
        this.failureProcessor = failureProcessor;
        this.nodeProperties = nodeProperties;
        this.clockService = clockService;
        this.txManager = txManager;
        this.schemaManager = schemaManager;
        this.dataStorageManager = dataStorageManager;
        this.zoneResourcesManager = zoneResourcesManager;

        // Executors.
        this.ioExecutor = ioExecutor;
        this.rebalanceScheduler = rebalanceScheduler;
        this.partitionOperationsExecutor = partitionOperationsExecutor;

        // Executor-inclined wrappers.
        this.executorInclinedSchemaSyncService =
                new ExecutorInclinedSchemaSyncService(schemaSyncService, partitionOperationsExecutor);
        this.executorInclinedPlacementDriver =
                new ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);

        this.rebalanceRetryDelayConfiguration = new SystemDistributedConfigurationPropertyHolder<Integer>(
                systemDistributedConfiguration,
                (v, r) -> {},
                "rebalanceRetryDelay",
                200,
                Integer::parseInt
        );

        // Listener factories are invoked only after all the fields above are set, in the same order as before.
        this.pendingAssignmentsRebalanceListener = createPendingAssignmentsRebalanceListener();
        this.stableAssignmentsRebalanceListener = createStableAssignmentsRebalanceListener();
        this.assignmentsSwitchRebalanceListener = createAssignmentsSwitchRebalanceListener();

        this.reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService);
    }

    /**
     * Starts the component.
     *
     * <p>Reads the meta storage recovery revision (the recovery future is asserted to be already done), launches
     * zone start followed by stable/pending assignments recovery, registers the prefix watches for pending,
     * stable and switch-reduce assignments, subscribes to zone creation and primary replica expiration events
     * and initializes the rebalance retry delay configuration holder.
     *
     * @param componentContext Component context (not used by this implementation).
     * @return Future that completes when zones are started and assignments recovery has been handled.
     */
    @Override
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        CompletableFuture<Revisions> recoveryFinishFuture = metaStorageMgr.recoveryFinishedFuture();

        assert recoveryFinishFuture.isDone();

        long recoveryRevision = recoveryFinishFuture.join().revision();

        cleanUpResourcesForDroppedZonesOnRecovery();

        // Typed as CompletableFuture<Void>: a raw CompletionStage cannot be returned from this method.
        CompletableFuture<Void> processZonesAndAssignmentsOnStart =
                processZonesOnStart(recoveryRevision, lowWatermark.getLowWatermark())
                        .thenCompose(ignored -> processAssignmentsOnRecovery(recoveryRevision));

        metaStorageMgr.registerPrefixWatch(
                new ByteArray(ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES),
                pendingAssignmentsRebalanceListener
        );
        metaStorageMgr.registerPrefixWatch(
                new ByteArray(ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES),
                stableAssignmentsRebalanceListener
        );
        metaStorageMgr.registerPrefixWatch(
                new ByteArray(ZoneRebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES),
                assignmentsSwitchRebalanceListener
        );

        catalogService.listen(CatalogEvent.ZONE_CREATE, onCreateZoneListener);

        rebalanceRetryDelayConfiguration.init();

        executorInclinedPlacementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, onPrimaryReplicaExpiredListener);

        return processZonesAndAssignmentsOnStart;
    }

    /**
     * Starts replication nodes for every zone found in the catalog versions between the earliest relevant version
     * and the latest one.
     *
     * <p>Versions are walked from the latest to the earliest, so each zone is started once using the newest
     * descriptor in which it appears.
     *
     * @param recoveryRevision Meta storage revision used as the causality token.
     * @param lwm Low watermark; when {@code null} the earliest catalog version is used as the lower bound,
     *         otherwise the catalog version active at the watermark.
     * @return Future that completes when all zone start futures complete.
     */
    private CompletableFuture<Void> processZonesOnStart(long recoveryRevision, @Nullable HybridTimestamp lwm) {
        int earliestCatalogVersion = lwm == null
                ? catalogService.earliestCatalogVersion()
                : catalogService.activeCatalogVersion(lwm.longValue());
        int latestCatalogVersion = catalogService.latestCatalogVersion();

        IntOpenHashSet startedZones = new IntOpenHashSet();
        List<CompletableFuture<Void>> startZoneFutures = new ArrayList<>();

        for (int ver = latestCatalogVersion; ver >= earliestCatalogVersion; --ver) {
            for (CatalogZoneDescriptor zoneDescriptor : catalogService.catalog(ver).zones()) {
                // add() returns false for a zone that was already handled in a newer catalog version.
                if (startedZones.add(zoneDescriptor.id())) {
                    startZoneFutures.add(
                            calculateZoneAssignmentsAndCreateReplicationNodes(recoveryRevision, ver, zoneDescriptor, true)
                    );
                }
            }
        }

        return CompletableFuture.allOf(startZoneFutures.toArray(CompletableFuture[]::new))
                .whenComplete((unused, throwable) -> {
                    if (throwable == null) {
                        LOG.debug(
                                "Zones started successfully [earliestCatalogVersion={}, latestCatalogVersion={}, startedZoneIds={}]",
                                earliestCatalogVersion,
                                latestCatalogVersion,
                                startedZones
                        );
                    } else if (!ExceptionUtils.hasCause(throwable, NodeStoppingException.class)) {
                        // A failure caused by node stop is expected and is neither reported nor logged as success.
                        failureProcessor.process(new FailureContext(throwable, "Error starting zones"));
                    }
                });
    }

    /**
     * Recovers stable assignments first and, once that is done, pending assignments.
     *
     * @param recoveryRevision Meta storage revision to recover at.
     * @return Future of the whole recovery sequence.
     */
    private CompletableFuture<Void> processAssignmentsOnRecovery(long recoveryRevision) {
        CompletableFuture<Void> stableRecovered = recoverStableAssignments(recoveryRevision);

        return stableRecovered.thenCompose(unused -> recoverPendingAssignments(recoveryRevision));
    }

    /**
     * Replays the stable assignments entries found under the stable assignments prefix at the given revision.
     *
     * @param recoveryRevision Meta storage revision to read at.
     * @return Future that completes when every entry has been handled.
     */
    private CompletableFuture<Void> recoverStableAssignments(long recoveryRevision) {
        ByteArray stablePrefix = new ByteArray(ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES);

        BiFunction<Entry, Long, CompletableFuture<Void>> stableHandler =
                (stableEntry, rev) -> handleChangeStableAssignmentEvent(stableEntry, (long) rev, true);

        return handleAssignmentsOnRecovery(stablePrefix, recoveryRevision, stableHandler, "stable");
    }

    /**
     * Replays the pending assignments entries found under the pending assignments queue prefix at the given revision.
     *
     * @param recoveryRevision Meta storage revision to read at.
     * @return Future that completes when every entry has been handled.
     */
    private CompletableFuture<Void> recoverPendingAssignments(long recoveryRevision) {
        ByteArray pendingPrefix = new ByteArray(ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES);

        BiFunction<Entry, Long, CompletableFuture<Void>> pendingHandler =
                (pendingEntry, rev) -> handleChangePendingAssignmentEvent(pendingEntry, (long) rev, true);

        return handleAssignmentsOnRecovery(pendingPrefix, recoveryRevision, pendingHandler, "pending");
    }

    /**
     * Reads all local meta storage entries under {@code prefix} at {@code revision} and passes each of them to
     * {@code assignmentsEventHandler}.
     *
     * <p>A failure of the combined future that is not caused by {@link NodeStoppingException} is reported to the
     * failure processor; the returned future still carries the failure.
     *
     * @param prefix Key prefix to scan.
     * @param revision Revision to read at; also passed to the handler.
     * @param assignmentsEventHandler Handler invoked for every entry.
     * @param assignmentsType Human-readable assignments type used in the log message.
     * @return Future that completes when all handler futures complete.
     */
    private CompletableFuture<Void> handleAssignmentsOnRecovery(
            ByteArray prefix,
            long revision,
            BiFunction<Entry, Long, CompletableFuture<Void>> assignmentsEventHandler,
            String assignmentsType
    ) {
        // The cursor is closed right after the handler futures are collected; toArray() fully consumes the stream.
        try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix, revision)) {
            CompletableFuture<?>[] futures = cursor.stream()
                    .map(entry -> {
                        if (LOG.isInfoEnabled()) {
                            LOG.info(
                                    "Non handled {} assignments for key '{}' discovered, performing recovery",
                                    assignmentsType,
                                    new String(entry.key(), StandardCharsets.UTF_8)
                            );
                        }

                        return assignmentsEventHandler.apply(entry, revision);
                    })
                    .toArray(CompletableFuture[]::new);

            // Returned as CompletableFuture<Void> directly instead of going through a raw CompletionStage local.
            return CompletableFuture.allOf(futures).whenComplete((res, e) -> {
                if (e != null && !ExceptionUtils.hasCause(e, NodeStoppingException.class)) {
                    failureProcessor.process(new FailureContext(e, "Error when performing assignments recovery"));
                }
            });
        }
    }

    /**
     * Hook invoked from {@code startAsync} before zones are started. Currently does nothing.
     */
    private void cleanUpResourcesForDroppedZonesOnRecovery() {
        // No-op: no clean-up is implemented here.
    }

    /**
     * Catalog listener for zone creation: calculates assignments and creates replication nodes for the new zone.
     *
     * @param createZoneEventParameters Zone creation event parameters.
     * @return Future completed with {@code false} once the zone has been processed.
     */
    private CompletableFuture<Boolean> onCreateZone(CreateZoneEventParameters createZoneEventParameters) {
        return IgniteUtils.inBusyLock((IgniteBusyLock) busyLock, () -> {
            CompletableFuture<Void> zoneStarted = calculateZoneAssignmentsAndCreateReplicationNodes(
                    createZoneEventParameters.causalityToken(),
                    createZoneEventParameters.catalogVersion(),
                    createZoneEventParameters.zoneDescriptor(),
                    false
            );

            CompletableFuture<Boolean> listenerResult = zoneStarted.thenApply(unused -> false);

            return listenerResult;
        });
    }

    /**
     * Tells whether zone replicas must not be created for the zone: that is the case when colocation is disabled
     * and none of the zone's storage profiles resolves to a secondary storage engine.
     *
     * @param zoneDescriptor Zone descriptor.
     * @return {@code true} if zone replica creation should be skipped.
     */
    private boolean zoneReplicaIsNotApplicable(CatalogZoneDescriptor zoneDescriptor) {
        boolean anySecondaryEngine = zoneDescriptor.storageProfiles().profiles().stream()
                .map(CatalogStorageProfileDescriptor::storageProfile)
                .anyMatch(profileName -> dataStorageManager.secondaryEngineByStorageProfile(profileName) != null);

        return !(nodeProperties.colocationEnabled() || anySecondaryEngine);
    }

    /**
     * Obtains (or creates) the zone assignments, writes them to the meta storage and creates the zone replication
     * nodes that belong to the local node.
     *
     * @param causalityToken Causality token.
     * @param catalogVersion Catalog version the zone descriptor was taken from.
     * @param zoneDescriptor Zone descriptor.
     * @param onNodeRecovery {@code true} when called during node recovery.
     * @return Future of the operation; already completed with {@code null} when zone replicas are not applicable.
     */
    private CompletableFuture<Void> calculateZoneAssignmentsAndCreateReplicationNodes(
            long causalityToken,
            int catalogVersion,
            CatalogZoneDescriptor zoneDescriptor,
            boolean onNodeRecovery
    ) {
        if (zoneReplicaIsNotApplicable(zoneDescriptor)) {
            return CompletableFutures.nullCompletedFuture();
        }

        return IgniteUtils.inBusyLockAsync(busyLock, () -> {
            int zoneId = zoneDescriptor.id();

            CompletableFuture<List<Assignments>> stableWritten =
                    getOrCreateAssignments(zoneDescriptor, causalityToken, catalogVersion)
                            .thenCompose(calculated ->
                                    writeZoneAssignmentsToMetastore(zoneId, zoneDescriptor.consistencyMode(), calculated));

            // Pending assignments and assignment chains are read only after the stable assignments are settled.
            return stableWritten.thenCompose(stableAssignments -> createZoneReplicationNodes(
                    zoneId,
                    stableAssignments,
                    ZoneRebalanceUtil.zonePendingAssignmentsGetLocally(
                            metaStorageMgr, zoneId, zoneDescriptor.partitions(), causalityToken),
                    ZoneRebalanceUtil.zoneAssignmentsChainGetLocally(
                            metaStorageMgr, zoneId, zoneDescriptor.partitions(), causalityToken),
                    causalityToken,
                    catalogVersion,
                    zoneDescriptor.partitions(),
                    onNodeRecovery
            ));
        });
    }

    /**
     * Starts zone partition replication nodes on the local node for the partitions it is assigned to.
     *
     * <p>Outside of recovery a partition is started when the local node is present in its stable assignments.
     * On recovery it is additionally required that there are no forced pending assignments for the partition;
     * a warning is logged when the last rebalance of the partition was not graceful.
     *
     * <p>On recovery the work runs directly; otherwise it runs under the zone write lock.
     *
     * @param zoneId Zone ID.
     * @param stableAssignmentsForZone Stable assignments, indexed by partition ID.
     * @param pendingAssignmentsForZone Pending assignments, indexed by partition ID; elements may be {@code null}.
     * @param assignmentsChains Assignments chains, indexed by partition ID.
     * @param causalityToken Causality token.
     * @param catalogVersion Catalog version used to resolve zone volatility.
     * @param partitionCount Number of partitions in the zone.
     * @param onNodeRecovery {@code true} when called during node recovery.
     * @return Future that completes when all partition start futures complete.
     */
    private CompletableFuture<Void> createZoneReplicationNodes(
            int zoneId,
            List<Assignments> stableAssignmentsForZone,
            List<@Nullable Assignments> pendingAssignmentsForZone,
            List<AssignmentsChain> assignmentsChains,
            long causalityToken,
            int catalogVersion,
            int partitionCount,
            boolean onNodeRecovery
    ) {
        assert stableAssignmentsForZone != null : IgniteStringFormatter.format("Zone has empty assignments [id={}].", zoneId);

        Supplier<CompletableFuture<Void>> startLocalPartitions = () -> IgniteUtils.inBusyLockAsync(busyLock, () -> {
            CompletableFuture<?>[] startFutures = new CompletableFuture<?>[stableAssignmentsForZone.size()];

            for (int partId = 0; partId < stableAssignmentsForZone.size(); partId++) {
                Assignments stable = stableAssignmentsForZone.get(partId);
                Assignments pending = pendingAssignmentsForZone.get(partId);
                Assignment localStable = localAssignment(stable);
                AssignmentsChain chain = assignmentsChains.get(partId);

                boolean startPartition = localStable != null;

                if (onNodeRecovery) {
                    if (!lastRebalanceWasGraceful(chain)) {
                        LOG.warn(
                                "Recovery after a forced rebalance for zone is not supported yet [zoneId={}, partitionId={}].",
                                zoneId,
                                partId
                        );
                    }

                    // Same rule for both graceful and forced cases: do not start if a forced reset is pending.
                    startPartition = startPartition && (pending == null || !pending.force());
                }

                if (!startPartition) {
                    startFutures[partId] = CompletableFutures.nullCompletedFuture();

                    continue;
                }

                startFutures[partId] = createZonePartitionReplicationNode(
                        new ZonePartitionId(zoneId, partId),
                        localStable,
                        stable,
                        causalityToken,
                        partitionCount,
                        isVolatileZoneForCatalogVersion(zoneId, catalogVersion),
                        onNodeRecovery,
                        !onNodeRecovery
                );
            }

            return CompletableFuture.allOf(startFutures);
        });

        if (!onNodeRecovery) {
            return IgniteUtils.inBusyLockAsync(busyLock, () -> executeUnderZoneWriteLock(zoneId, startLocalPartitions));
        }

        return startLocalPartitions.get();
    }

    /**
     * Resolves the zone descriptor in the given catalog version and tells whether the zone is volatile.
     *
     * @param zoneId Zone ID; the zone is asserted to exist in the given catalog version.
     * @param catalogVersion Catalog version.
     * @return Result of {@link #isVolatileZone(CatalogZoneDescriptor)} for the resolved descriptor.
     */
    private boolean isVolatileZoneForCatalogVersion(int zoneId, int catalogVersion) {
        Catalog catalog = catalogService.catalog(catalogVersion);
        CatalogZoneDescriptor zone = catalog.zone(zoneId);

        assert zone != null;

        return isVolatileZone(zone);
    }

    /**
     * Tells whether every storage profile of the zone resolves to an existing storage engine that is volatile.
     *
     * @param zoneDescriptor Zone descriptor.
     * @return {@code false} as soon as a profile has no engine or a non-volatile engine; {@code true} otherwise
     *         (including when the zone has no profiles).
     */
    private boolean isVolatileZone(CatalogZoneDescriptor zoneDescriptor) {
        for (CatalogStorageProfileDescriptor profile : zoneDescriptor.storageProfiles().profiles()) {
            StorageEngine engine = dataStorageManager.engineByStorageProfile(profile.storageProfile());

            if (engine == null || !engine.isVolatile()) {
                return false;
            }
        }

        return true;
    }

    /**
     * Starts a replica of the given zone partition on the local node through
     * {@code replicaMgr.weakStartReplica}.
     *
     * <p>A failure that is not caused by {@link NodeStoppingException} is reported to the failure processor.
     *
     * @param zonePartitionId Zone partition ID.
     * @param localAssignment Local node assignment (not used inside this method).
     * @param stableAssignments Stable assignments of the partition; its nodes form the raft peers and learners.
     * @param revision Revision passed to the replica event parameters and to {@code weakStartReplica}.
     * @param partitionCount Number of partitions in the zone, used when allocating partition resources.
     * @param isVolatileZone Volatility flag forwarded to {@code startReplica}.
     * @param onRecovery {@code true} when called during node recovery; forwarded to the event parameters.
     * @param holdingZoneWriteLock {@code true} if the caller already holds the zone write lock, in which case the
     *         group is marked as started directly instead of under a newly acquired write lock.
     * @return Future returned by {@code weakStartReplica}.
     */
    private CompletableFuture<?> createZonePartitionReplicationNode(ZonePartitionId zonePartitionId, Assignment localAssignment, Assignments stableAssignments, long revision, int partitionCount, boolean isVolatileZone, boolean onRecovery, boolean holdingZoneWriteLock) {
        // Forced stable assignments are passed on to weakStartReplica; otherwise null is passed.
        Assignments forcedAssignments = stableAssignments.force() ? stableAssignments : null;
        PeersAndLearners stablePeersAndLearners = PeersAndLearners.fromAssignments(stableAssignments.nodes());
        RaftGroupEventsListener raftGroupEventsListener = this.createRaftGroupEventsListener(zonePartitionId);
        Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
            PendingComparableValuesTracker<Long, Void> storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);
            LocalPartitionReplicaEventParameters eventParams = new LocalPartitionReplicaEventParameters(zonePartitionId, revision, onRecovery);
            ZoneResourcesManager.ZonePartitionResources zoneResources = this.zoneResourcesManager.allocateZonePartitionResources(zonePartitionId, partitionCount, storageIndexTracker);
            // Register the group as "starting" before any asynchronous work begins.
            this.startedReplicationGroups.beforeStartingGroup(zonePartitionId);
            return ((CompletableFuture)((CompletableFuture)((CompletableFuture)this.fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams).thenCompose(v -> {
                try {
                    return this.replicaMgr.startReplica(zonePartitionId, raftClient -> {
                        ZonePartitionReplicaListener replicaListener = new ZonePartitionReplicaListener(zoneResources.txStatePartitionStorage(), this.clockService, this.txManager, new CatalogValidationSchemasSource(this.catalogService, this.schemaManager), this.executorInclinedSchemaSyncService, this.catalogService, this.executorInclinedPlacementDriver, this.topologyService, new ExecutorInclinedRaftCommandRunner((RaftCommandRunner)raftClient, this.partitionOperationsExecutor), this.failureProcessor, this.nodeProperties, this.topologyService.localMember(), zonePartitionId);
                        // Publish the listener through the partition resources' future.
                        zoneResources.replicaListenerFuture().complete(replicaListener);
                        return replicaListener;
                    }, new PartitionSnapshotStorageFactory(zoneResources.snapshotStorage()), stablePeersAndLearners, zoneResources.raftListener(), raftGroupEventsListener, isVolatileZone, this.busyLock, storageIndexTracker);
                }
                catch (NodeStoppingException e) {
                    // Convert the checked exception into a failed future so that the chain handles it uniformly.
                    return CompletableFuture.failedFuture(e);
                }
            })).whenComplete((replica, throwable) -> {
                if (throwable != null) {
                    // Start failed: roll back the "starting" mark.
                    this.startedReplicationGroups.startingFailed(zonePartitionId);
                }
            })).thenCompose(v -> {
                Supplier addReplicationGroupIdFuture = () -> {
                    this.startedReplicationGroups.startingCompleted(zonePartitionId);
                    return CompletableFutures.nullCompletedFuture();
                };
                // Mark the group as started: directly if the zone write lock is already held, else under it.
                if (holdingZoneWriteLock) {
                    return addReplicationGroupIdFuture.get();
                }
                return this.executeUnderZoneWriteLock(zonePartitionId.zoneId(), addReplicationGroupIdFuture);
            })).thenApply(unused -> true);
        };
        return this.replicaMgr.weakStartReplica(zonePartitionId, startReplicaSupplier, forcedAssignments, revision).whenComplete((res, ex) -> {
            if (ex != null && !ExceptionUtils.hasCause(ex, NodeStoppingException.class)) {
                String errorMessage = String.format("Unable to update raft groups on the node [zonePartitionId=%s]", zonePartitionId);
                this.failureProcessor.process(new FailureContext((Throwable)ex, errorMessage));
            }
        });
    }

    /**
     * Calculates assignments for the zone partition using the catalog that is reliable for the given timestamp.
     *
     * <p>The calculation is raced against {@code stopReplicaLifecycleFuture}, which is completed exceptionally in
     * {@code beforeNodeStop}, so the returned future does not hang on a stopping node.
     *
     * @param zonePartitionId Zone partition ID.
     * @param assignmentsTimestamp Assignments timestamp as a long hybrid timestamp value; must not be {@code null}.
     * @return Future with the calculated assignments.
     */
    private CompletableFuture<Set<Assignment>> calculateZoneAssignments(ZonePartitionId zonePartitionId, Long assignmentsTimestamp) {
        // Typed as CompletableFuture: a raw CompletionStage cannot be stored in the CompletableFuture varargs array.
        CompletableFuture<Set<Assignment>> assignmentsFuture = reliableCatalogVersions
                .safeReliableCatalogFor(HybridTimestamp.hybridTimestamp(assignmentsTimestamp))
                .thenCompose((Catalog catalog) -> calculateZoneAssignments(zonePartitionId, catalog));

        // anyOf() erases the element type; the stop future never completes normally, so a value can only come
        // from assignmentsFuture and the cast is safe.
        @SuppressWarnings("unchecked")
        CompletableFuture<Set<Assignment>> firstCompleted = CompletableFuture
                .anyOf(stopReplicaLifecycleFuture, assignmentsFuture)
                .thenApply(result -> (Set<Assignment>) result);

        return firstCompleted;
    }

    /**
     * Calculates assignments for the zone partition from the zone's data nodes and the current distribution read
     * from the local meta storage.
     *
     * @param zonePartitionId Zone partition ID.
     * @param catalog Catalog to take the zone descriptor from.
     * @return Future with the calculated assignments.
     */
    private CompletableFuture<Set<Assignment>> calculateZoneAssignments(ZonePartitionId zonePartitionId, Catalog catalog) {
        CatalogZoneDescriptor zone = catalog.zone(zonePartitionId.zoneId());
        int zoneId = zonePartitionId.zoneId();

        List<Set<Assignment>> currentDistribution = AssignmentUtil.currentDistributionFromLocalMetaStorage(
                metaStorageMgr,
                zone.partitions(),
                partId -> ZoneRebalanceUtil.stablePartAssignmentsKey(new ZonePartitionId(zoneId, (int) partId)),
                partId -> ZoneRebalanceUtil.pendingPartAssignmentsQueueKey(new ZonePartitionId(zoneId, (int) partId))
        );

        return distributionZoneMgr
                .dataNodes(zone.updateTimestamp(), catalog.version(), zone.id())
                .thenApply(nodes -> PartitionDistributionUtils.calculateAssignmentForPartition(
                        nodes,
                        currentDistribution,
                        zonePartitionId.partitionId(),
                        zone.partitions(),
                        zone.replicas(),
                        zone.consensusGroupSize()
                ));
    }

    /**
     * Creates a partition mover whose raft client is looked up lazily from the replica manager.
     *
     * @param replicaGrpId Zone partition ID of the replica.
     * @return Partition mover; its raft client supplier yields a failed future when the replica is not found.
     */
    private PartitionMover createPartitionMover(ZonePartitionId replicaGrpId) {
        return new PartitionMover(busyLock, rebalanceScheduler, () -> {
            CompletableFuture<Replica> replicaFuture = replicaMgr.replica(replicaGrpId);

            if (replicaFuture != null) {
                return replicaFuture.thenApply(Replica::raftClient);
            }

            String message = "No such replica for partition " + replicaGrpId.partitionId() + " in zone " + replicaGrpId.zoneId();

            return CompletableFuture.failedFuture(new IgniteInternalException(message));
        });
    }

    /**
     * Creates the raft group events listener that drives zone rebalance for the given partition.
     *
     * @param zonePartitionId Zone partition ID.
     * @return New {@link ZoneRebalanceRaftGroupEventsListener}.
     */
    private RaftGroupEventsListener createRaftGroupEventsListener(ZonePartitionId zonePartitionId) {
        PartitionMover partitionMover = createPartitionMover(zonePartitionId);

        return new ZoneRebalanceRaftGroupEventsListener(
                metaStorageMgr,
                failureProcessor,
                zonePartitionId,
                busyLock,
                partitionMover,
                rebalanceScheduler,
                this::calculateZoneAssignments,
                rebalanceRetryDelayConfiguration
        );
    }

    /**
     * Returns the local member as reported by the topology service.
     */
    private InternalClusterNode localNode() {
        InternalClusterNode local = topologyService.localMember();

        return local;
    }

    /**
     * Prepares the component for node stop: fails the stop future with {@link NodeStoppingException}, blocks the
     * busy lock, removes the event listeners and meta storage watches, waits for the replicas that are still
     * starting and cleans up the resources of the started replication groups.
     */
    @Override
    public void beforeNodeStop() {
        stopReplicaLifecycleFuture.completeExceptionally(new NodeStoppingException());

        busyLock.block();

        executorInclinedPlacementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, onPrimaryReplicaExpiredListener);
        catalogService.removeListener(CatalogEvent.ZONE_CREATE, onCreateZoneListener);

        // Unregistered in the same order as before: pending, stable, switch-reduce.
        WatchListener[] watches = {
                pendingAssignmentsRebalanceListener,
                stableAssignmentsRebalanceListener,
                assignmentsSwitchRebalanceListener
        };
        for (WatchListener watch : watches) {
            metaStorageMgr.unregisterWatch(watch);
        }

        startedReplicationGroups.waitForStartingReplicas();

        cleanUpPartitionsResources(startedReplicationGroups.streamStartedReplicationGroups());
    }

    /**
     * Writes the given per-partition assignments of a zone to the meta storage with a single conditional invoke, or picks
     * up the assignments that are already stored there.
     *
     * <p>For every partition index a put of the stable assignments is prepared; when the consistency mode is
     * {@link ConsistencyMode#HIGH_AVAILABILITY}, a put of an assignments chain built from the same assignments is added
     * as well. The invoke is guarded by a "key does not exist" condition on the key of the first prepared operation
     * (the stable assignments key of partition 0).
     *
     * <p>If the invoke result is {@code true}, {@code newAssignments} is returned. Otherwise the stable assignments of
     * all partitions are read from the meta storage and returned instead. Failures of either meta storage call that are
     * not caused by {@link NodeStoppingException} are reported to the failure processor.
     *
     * @param zoneId Zone identifier.
     * @param consistencyMode Zone consistency mode; decides whether assignments chains are written.
     * @param newAssignments Assignments to write, indexed by partition number; must not be empty.
     * @return Future with the assignments that ended up being effective (the written ones or the ones read back).
     */
    private CompletableFuture<List<Assignments>> writeZoneAssignmentsToMetastore(int zoneId, ConsistencyMode consistencyMode, List<Assignments> newAssignments) {
        assert (!newAssignments.isEmpty());
        boolean haMode = consistencyMode == ConsistencyMode.HIGH_AVAILABILITY;
        ArrayList<Operation> partitionAssignments = new ArrayList<Operation>(newAssignments.size());
        // The list index is used as the partition number.
        for (int i = 0; i < newAssignments.size(); ++i) {
            ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, i);
            ByteArray stableAssignmentsKey = ZoneRebalanceUtil.stablePartAssignmentsKey(zonePartitionId);
            byte[] anAssignment = newAssignments.get(i).toBytes();
            Operation op = Operations.put(stableAssignmentsKey, anAssignment);
            partitionAssignments.add(op);
            // The assignments chain is only written in high availability mode.
            if (!haMode) continue;
            ByteArray assignmentsChainKey = ZoneRebalanceUtil.assignmentsChainKey(zonePartitionId);
            byte[] assignmentChain = AssignmentsChain.of(newAssignments.get(i)).toBytes();
            Operation chainOp = Operations.put(assignmentsChainKey, assignmentChain);
            partitionAssignments.add(chainOp);
        }
        // Condition: the key of the first operation (stable assignments of partition 0) must not exist yet.
        SimpleCondition condition = Conditions.notExists(new ByteArray(ByteUtils.toByteArray(((Operation)partitionAssignments.get(0)).key())));
        return ((CompletableFuture)this.metaStorageMgr.invoke((Condition)condition, partitionAssignments, Collections.emptyList()).whenComplete((invokeResult, e) -> {
            // Node stop is an expected reason for a failure and is not reported.
            if (e != null && !ExceptionUtils.hasCause(e, NodeStoppingException.class)) {
                String errorMessage = String.format("Couldn't write assignments [assignmentsList=%s] to metastore during invoke.", Assignments.assignmentListToString(newAssignments));
                this.failureProcessor.process(new FailureContext((Throwable)e, errorMessage));
            }
        })).thenCompose(invokeResult -> {
            if (invokeResult.booleanValue()) {
                LOG.info("Assignments calculated from data nodes are successfully written to meta storage [zoneId={}, assignments={}].", zoneId, Assignments.assignmentListToString(newAssignments));
                return CompletableFuture.completedFuture(newAssignments);
            }
            // The invoke result was false: read back whatever stable assignments are stored for each partition.
            Set<ByteArray> partKeys = IntStream.range(0, newAssignments.size()).mapToObj(p -> ZoneRebalanceUtil.stablePartAssignmentsKey(new ZonePartitionId(zoneId, p))).collect(Collectors.toSet());
            return ((CompletableFuture)this.metaStorageMgr.getAll(partKeys).thenApply(metaStorageAssignments -> {
                ArrayList<Assignments> realAssignments = new ArrayList<Assignments>();
                // Rebuild the list in partition order; every partition is expected to have a live (non-tombstone) entry.
                for (int p = 0; p < newAssignments.size(); ++p) {
                    ZonePartitionId partId = new ZonePartitionId(zoneId, p);
                    Entry assignmentsEntry = (Entry)metaStorageAssignments.get(ZoneRebalanceUtil.stablePartAssignmentsKey(partId));
                    assert (assignmentsEntry != null && !assignmentsEntry.empty() && !assignmentsEntry.tombstone()) : "Unexpected assignments for partition [" + partId + ", entry=" + assignmentsEntry + "].";
                    Assignments real = Assignments.fromBytes(assignmentsEntry.value());
                    realAssignments.add(real);
                }
                LOG.info("Assignments picked up from meta storage [zoneId={}, assignments={}].", zoneId, Assignments.assignmentListToString(realAssignments));
                return realAssignments;
            })).whenComplete((realAssignments, e) -> {
                // Same reporting policy as for the invoke above.
                if (e != null && !ExceptionUtils.hasCause(e, NodeStoppingException.class)) {
                    String errorMessage = String.format("Couldn't get assignments from metastore for zone [zoneId=%s].", zoneId);
                    this.failureProcessor.process(new FailureContext((Throwable)e, errorMessage));
                }
            });
        });
    }

    /**
     * Returns the assignments of a zone: the locally readable ones when partition 0 already has assignments at the given
     * causality token, otherwise assignments calculated from the zone's data nodes.
     *
     * @param zoneDescriptor Zone descriptor.
     * @param causalityToken Revision used for the local meta storage reads.
     * @param catalogVersion Catalog version; its catalog time becomes the timestamp of calculated assignments.
     * @return Future with one {@link Assignments} per partition.
     */
    private CompletableFuture<List<Assignments>> getOrCreateAssignments(CatalogZoneDescriptor zoneDescriptor, long causalityToken, int catalogVersion) {
        // Partition 0 is used as the probe for "assignments are already available locally".
        boolean availableLocally =
                ZoneRebalanceUtil.zonePartitionAssignmentsGetLocally(this.metaStorageMgr, zoneDescriptor.id(), 0, causalityToken) != null;

        if (availableLocally) {
            return CompletableFuture.completedFuture(
                    ZoneRebalanceUtil.zoneAssignmentsGetLocally(this.metaStorageMgr, zoneDescriptor.id(), zoneDescriptor.partitions(), causalityToken)
            );
        }

        long assignmentsTimestamp = this.catalogService.catalog(catalogVersion).time();

        return ((CompletableFuture)this.distributionZoneMgr
                .dataNodes(zoneDescriptor.updateTimestamp(), catalogVersion, zoneDescriptor.id())
                .thenApply(nodes -> PartitionDistributionUtils
                        .calculateAssignments(nodes, Collections.emptyList(), zoneDescriptor.partitions(), zoneDescriptor.replicas(), zoneDescriptor.consensusGroupSize())
                        .stream()
                        .map(partitionNodes -> Assignments.of(partitionNodes, assignmentsTimestamp))
                        .collect(Collectors.toList())))
                .whenComplete((calculated, failure) -> {
                    if (failure != null || !LOG.isInfoEnabled()) {
                        return;
                    }

                    LOG.info("Assignments calculated from data nodes [zone={}, zoneId={}, assignments={}, revision={}]", zoneDescriptor.name(), zoneDescriptor.id(), Assignments.assignmentListToString(calculated), causalityToken);
                });
    }

    /**
     * Tells whether the replication group of the given zone partition has been started on this node.
     *
     * <p>With assertions enabled, verifies that the zone's lock exists and is currently read-locked.
     *
     * @param zonePartitionId Zone partition identifier.
     * @return {@code true} if the replication group has been started.
     */
    public boolean hasLocalPartition(ZonePartitionId zonePartitionId) {
        assert Optional.ofNullable(this.zonePartitionsLocks.get(zonePartitionId.zoneId()))
                .map(NaiveAsyncReadWriteLock::isReadLocked)
                .orElse(false)
                .booleanValue() : zonePartitionId;

        boolean started = this.startedReplicationGroups.hasReplicationGroupStarted(zonePartitionId);

        return started;
    }

    /**
     * Creates the watch listener for pending assignments updates.
     *
     * <p>Under the busy lock, each event's new entry and revision are forwarded to
     * {@code handleChangePendingAssignmentEvent} with {@code isRecovery = false}.
     *
     * @return Listener; its future fails with {@link NodeStoppingException} when the busy lock cannot be entered.
     */
    private WatchListener createPendingAssignmentsRebalanceListener() {
        return event -> {
            if (this.busyLock.enterBusy()) {
                try {
                    return this.handleChangePendingAssignmentEvent(event.entryEvent().newEntry(), event.revision(), false);
                }
                finally {
                    this.busyLock.leaveBusy();
                }
            }

            return CompletableFuture.failedFuture(new NodeStoppingException());
        };
    }

    /**
     * Creates the watch listener for stable assignments updates.
     *
     * <p>Under the busy lock, each event is forwarded to {@code handleChangeStableAssignmentEvent}.
     *
     * @return Listener; its future fails with {@link NodeStoppingException} when the busy lock cannot be entered.
     */
    private WatchListener createStableAssignmentsRebalanceListener() {
        return event -> {
            if (this.busyLock.enterBusy()) {
                try {
                    return this.handleChangeStableAssignmentEvent(event);
                }
                finally {
                    this.busyLock.leaveBusy();
                }
            }

            return CompletableFuture.failedFuture(new NodeStoppingException());
        };
    }

    /**
     * Creates the watch listener for assignments "switch reduce" updates.
     *
     * <p>The zone partition is extracted from the new entry's key, the assignments timestamp from its value; once the
     * reliable catalog for that timestamp is available, {@code handleReduceChanged} is invoked under the busy lock.
     *
     * @return Listener for the switch-reduce key prefix.
     */
    private WatchListener createAssignmentsSwitchRebalanceListener() {
        return event -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            Entry reduceEntry = event.entryEvent().newEntry();

            ZonePartitionId replicaGrpId =
                    ZoneRebalanceUtil.extractZonePartitionId(reduceEntry.key(), ZoneRebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES);

            long assignmentsTimestamp = Assignments.fromBytes(reduceEntry.value()).timestamp();

            return this.reliableCatalogVersions
                    .safeReliableCatalogFor(HybridTimestamp.hybridTimestamp(assignmentsTimestamp))
                    .thenCompose(catalog -> IgniteUtils.inBusyLockAsync(
                            this.busyLock,
                            () -> this.handleReduceChanged(event, (Catalog)catalog, replicaGrpId, assignmentsTimestamp)
                    ));
        });
    }

    /**
     * Resolves the zone's data nodes and delegates the switch-reduce event to
     * {@link ZoneRebalanceRaftGroupEventsListener#handleReduceChanged}.
     *
     * @param evt Watch event being handled.
     * @param catalog Catalog used to look up the zone descriptor and the catalog version.
     * @param replicaGrpId Zone partition the event belongs to.
     * @param assignmentsTimestamp Timestamp of the assignments carried by the event.
     * @return Future of the delegated handling.
     */
    private CompletableFuture<Void> handleReduceChanged(WatchEvent evt, Catalog catalog, ZonePartitionId replicaGrpId, long assignmentsTimestamp) {
        CatalogZoneDescriptor zone = catalog.zone(replicaGrpId.zoneId());

        return this.distributionZoneMgr
                .dataNodes(zone.updateTimestamp(), catalog.version(), replicaGrpId.zoneId())
                .thenCompose(nodes -> ZoneRebalanceRaftGroupEventsListener.handleReduceChanged(
                        this.metaStorageMgr,
                        nodes,
                        zone.partitions(),
                        zone.replicas(),
                        zone.consensusGroupSize(),
                        replicaGrpId,
                        evt,
                        assignmentsTimestamp
                ));
    }

    /**
     * Filters stable assignments watch events and forwards the relevant ones to the entry-based overload.
     *
     * <p>The event is ignored (a completed future is returned) when no entry event has a previous value, when the event
     * carries more than one entry, when the single entry has no old entry, or when the new entry has no value.
     *
     * @param evt Stable assignments watch event.
     * @return Future of the handling, already completed for ignored events.
     */
    private CompletableFuture<Void> handleChangeStableAssignmentEvent(WatchEvent evt) {
        boolean nothingHadPreviousValue = evt.entryEvents().stream().allMatch(entryEvent -> entryEvent.oldEntry().value() == null);

        if (nothingHadPreviousValue) {
            return CompletableFutures.nullCompletedFuture();
        }

        if (!evt.single()) {
            // A multi-entry event is only expected to consist of tombstones.
            assert (evt.entryEvents().stream().allMatch(entryEvent -> entryEvent.newEntry().tombstone())) : evt;

            return CompletableFutures.nullCompletedFuture();
        }

        assert (evt.single()) : evt;

        if (evt.entryEvent().oldEntry() == null) {
            return CompletableFutures.nullCompletedFuture();
        }

        Entry newStableEntry = evt.entryEvent().newEntry();
        long revision = evt.revision();

        assert (newStableEntry.revision() == revision) : newStableEntry;

        return newStableEntry.value() == null
                ? CompletableFutures.nullCompletedFuture()
                : this.handleChangeStableAssignmentEvent(newStableEntry, evt.revision(), false);
    }

    /**
     * Handles a change of the stable assignments entry of a zone partition.
     *
     * <p>On the IO executor, the pending assignments queue is read locally at {@code revision} and polled (or
     * {@code Assignments.EMPTY} is used when there is no value), then
     * {@code stopAndMaybeDestroyPartitionAndUpdateClients} is invoked with both sets.
     *
     * @param stableAssignmentsWatchEvent Stable assignments entry; a {@code null} value is treated as an empty set.
     * @param revision Meta storage revision of the event.
     * @param onNodeRecovery Whether the call is made during node recovery.
     * @return Future of the handling.
     */
    private CompletableFuture<Void> handleChangeStableAssignmentEvent(Entry stableAssignmentsWatchEvent, long revision, boolean onNodeRecovery) {
        ZonePartitionId zonePartitionId =
                ZoneRebalanceUtil.extractZonePartitionId(stableAssignmentsWatchEvent.key(), ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES);

        byte[] stableBytes = stableAssignmentsWatchEvent.value();

        Set<Assignment> stableNodes;
        if (stableBytes == null) {
            stableNodes = Collections.emptySet();
        } else {
            stableNodes = Assignments.fromBytes(stableBytes).nodes();
        }

        return CompletableFuture.supplyAsync(() -> {
            byte[] pendingBytes = this.metaStorageMgr
                    .getLocally(ZoneRebalanceUtil.pendingPartAssignmentsQueueKey(zonePartitionId), revision)
                    .value();

            Assignments pending = pendingBytes == null ? Assignments.EMPTY : AssignmentsQueue.fromBytes(pendingBytes).poll();

            return this.stopAndMaybeDestroyPartitionAndUpdateClients(zonePartitionId, stableNodes, pending, onNodeRecovery, revision);
        }, this.ioExecutor).thenCompose(Function.identity());
    }

    /**
     * Updates peers and learners of the local replica to the union of stable and pending assignments.
     *
     * <p>Nothing is done when the local node is neither in the stable assignments nor the primary replica of the group.
     *
     * @param zonePartitionId Zone partition identifier.
     * @param stableAssignments Stable assignments.
     * @param pendingAssignments Pending assignments.
     * @return Future that completes when the replica has been updated (or immediately if no update is needed).
     */
    private CompletableFuture<Void> updatePartitionClients(ZonePartitionId zonePartitionId, Set<Assignment> stableAssignments, Set<Assignment> pendingAssignments) {
        return this.isLocalNodeIsPrimary(zonePartitionId).thenCompose(isLeaseholder -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            boolean localNodeInvolved = this.isLocalNodeInAssignments(stableAssignments) || isLeaseholder.booleanValue();

            if (!localNodeInvolved) {
                return CompletableFutures.nullCompletedFuture();
            }

            assert (this.replicaMgr.isReplicaStarted(zonePartitionId)) : "The local node is outside of the replication group [groupId=" + zonePartitionId + ", stable=" + stableAssignments + ", isLeaseholder=" + isLeaseholder + "].";

            return this.replicaMgr.replica(zonePartitionId).thenAccept(
                    replica -> replica.updatePeersAndLearners(
                            PeersAndLearners.fromAssignments(RebalanceUtil.union(stableAssignments, pendingAssignments))
                    )
            );
        }));
    }

    /**
     * Updates partition clients (unless on node recovery) and, if the local node is no longer part of the relevant
     * assignments, weakly stops the local replica and destroys the partition.
     *
     * <p>The relevant assignments are the pending ones when they are forced, otherwise stable plus pending.
     *
     * @param zonePartitionId Zone partition identifier.
     * @param stableAssignments Stable assignment nodes.
     * @param pendingAssignments Pending assignments.
     * @param onNodeRecovery Whether the call is made during node recovery; the client update is skipped then.
     * @param revision Meta storage revision passed to the stop-and-destroy action.
     * @return Future of the client update, followed by the stop when one is required.
     */
    private CompletableFuture<Void> stopAndMaybeDestroyPartitionAndUpdateClients(ZonePartitionId zonePartitionId, Set<Assignment> stableAssignments, Assignments pendingAssignments, boolean onNodeRecovery, long revision) {
        CompletableFuture<Void> clientUpdateFuture;
        if (onNodeRecovery) {
            clientUpdateFuture = CompletableFutures.nullCompletedFuture();
        } else {
            clientUpdateFuture = this.updatePartitionClients(zonePartitionId, stableAssignments, pendingAssignments.nodes());
        }

        Stream<Assignment> relevantAssignments = pendingAssignments.force()
                ? pendingAssignments.nodes().stream()
                : Stream.concat(stableAssignments.stream(), pendingAssignments.nodes().stream());

        boolean localNodeStillAssigned =
                relevantAssignments.anyMatch(assignment -> assignment.consistentId().equals(this.localNode().name()));

        if (localNodeStillAssigned) {
            return clientUpdateFuture;
        }

        return clientUpdateFuture.thenCompose(ignored -> this.replicaMgr.weakStopReplica(
                zonePartitionId,
                ReplicaManager.WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS,
                () -> this.stopAndDestroyPartition(zonePartitionId, revision)
        ));
    }

    /**
     * Weakly stops the replica of the given zone partition for a restart, without destroying the partition.
     *
     * @param zonePartitionId Zone partition identifier.
     * @param revision Revision passed to the fired replica events.
     * @return Future of the weak stop.
     */
    private CompletableFuture<Void> stopPartitionForRestart(ZonePartitionId zonePartitionId, long revision) {
        return this.replicaMgr.weakStopReplica(
                zonePartitionId,
                ReplicaManager.WeakReplicaStopReason.RESTART,
                () -> this.stopPartitionInternal(
                        zonePartitionId,
                        LocalPartitionReplicaEvent.BEFORE_REPLICA_STOPPED,
                        LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED,
                        revision,
                        replicaWasStopped -> {}
                )
        );
    }

    /**
     * Weakly stops the replica of the given zone partition for a restart and destroys the partition.
     *
     * @param zonePartitionId Zone partition identifier.
     * @param revision Revision passed to the stop-and-destroy action.
     * @return Future of the weak stop.
     */
    private CompletableFuture<Void> stopPartitionAndDestroyForRestart(ZonePartitionId zonePartitionId, long revision) {
        return this.replicaMgr.weakStopReplica(
                zonePartitionId,
                ReplicaManager.WeakReplicaStopReason.RESTART,
                () -> this.stopAndDestroyPartition(zonePartitionId, revision)
        );
    }

    /**
     * Handles an update of the pending assignments queue entry of a zone partition.
     *
     * <p>An entry without a value (or an empty entry) is ignored. Otherwise the zone partition is extracted from the key,
     * the stable assignments are read locally at {@code revision}, the pending queue is deserialized and polled, and the
     * work is delegated to the overload taking explicit stable/pending assignments. After that overload completes,
     * {@code changePeersOnRebalance} is called if the local node is in the reduced stable or pending assignments (and,
     * on recovery, only if the replica is started).
     *
     * <p>The busy lock is held only while the asynchronous chain is being built; it is released in {@code finally}
     * when this method returns, not when the returned future completes.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour change" for this
     * method, so the exception handling may differ from the original source - confirm against it.
     *
     * @param pendingAssignmentsEntry Pending assignments queue entry.
     * @param revision Meta storage revision of the event.
     * @param isRecovery Whether the call is made during node recovery.
     * @return Future of the handling; failed with {@link NodeStoppingException} if the busy lock cannot be entered.
     */
    private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry pendingAssignmentsEntry, long revision, boolean isRecovery) {
        if (pendingAssignmentsEntry.value() == null || pendingAssignmentsEntry.empty()) {
            return CompletableFutures.nullCompletedFuture();
        }
        ZonePartitionId zonePartitionId = ZoneRebalanceUtil.extractZonePartitionId(pendingAssignmentsEntry.key(), ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES);
        // Stable assignments are read at the same revision as the pending event; the result may be null.
        Assignments stableAssignments = this.stableAssignments(zonePartitionId, revision);
        AssignmentsQueue pendingAssignmentsQueue = AssignmentsQueue.fromBytes(pendingAssignmentsEntry.value());
        if (!this.busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new NodeStoppingException());
        }
        try {
            if (LOG.isInfoEnabled()) {
                String stringKey = new String(pendingAssignmentsEntry.key(), StandardCharsets.UTF_8);
                LOG.info("Received update on pending assignments. Check if new replication node should be started [key={}, partition={}, zoneId={}, localMemberAddress={}, pendingAssignments={}, revision={}]", stringKey, zonePartitionId.partitionId(), zonePartitionId.zoneId(), this.localNode().address(), pendingAssignmentsQueue, revision);
            }
            Assignments pendingAssignments = pendingAssignmentsQueue == null ? Assignments.EMPTY : pendingAssignmentsQueue.poll();
            CompletionStage completionStage = ((CompletableFuture)this.handleChangePendingAssignmentEvent(zonePartitionId, stableAssignments, pendingAssignments, revision, isRecovery).thenRun(() -> {
                // Peers are only changed when the local node takes part in the (reduced) stable or pending assignments.
                boolean isLocalNodeInStableOrPending = this.isNodeInReducedStableOrPendingAssignments(zonePartitionId, stableAssignments, pendingAssignments, revision);
                if (!isLocalNodeInStableOrPending) {
                    return;
                }
                // On recovery the replica may not have been started; there is nothing to change then.
                if (isRecovery && !this.replicaMgr.isReplicaStarted(zonePartitionId)) {
                    return;
                }
                this.changePeersOnRebalance(this.replicaMgr, zonePartitionId, pendingAssignments.nodes(), revision);
            })).whenComplete((v, ex) -> {
                // Failures are only logged (at debug level); the returned future still completes exceptionally.
                if (ex != null) {
                    LOG.debug("Failed to handle change pending assignment event [zonePartitionId={}, stableAssignments={}, pendingAssignments={}, revision={}, isRecovery={}].", zonePartitionId, stableAssignments, pendingAssignments, revision, isRecovery, ex);
                }
            });
            return completionStage;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Starts or resets the local replication node according to the pending assignments and then updates the peers and
     * learners of the local replica.
     *
     * <p>Decision on starting the local node:
     * <ul>
     *     <li>on recovery - whenever the local node is in the pending assignments (both branches assign the same value;
     *     the non-graceful branch additionally logs a warning);</li>
     *     <li>otherwise - when the local node is in the pending assignments and not in the stable ones.</li>
     * </ul>
     * When the node is not started but the pending assignments are forced and contain the local node, the replica is
     * reset via {@code replicaMgr.resetWithRetry} with the computed stable assignments.
     *
     * <p>Afterwards, on the IO executor, the replica's peers and learners are updated if the local node is in the
     * reduced stable or pending assignments or is the primary replica, and the replica is started.
     *
     * @param replicaGrpId Zone partition identifier.
     * @param stableAssignments Stable assignments, may be {@code null}.
     * @param pendingAssignments Pending assignments.
     * @param revision Meta storage revision of the event.
     * @param isRecovery Whether the call is made during node recovery.
     * @return Future of the handling.
     */
    private CompletableFuture<Void> handleChangePendingAssignmentEvent(ZonePartitionId replicaGrpId, @Nullable Assignments stableAssignments, Assignments pendingAssignments, long revision, boolean isRecovery) {
        CompletableFuture<Object> localServicesStartFuture;
        boolean shouldStartLocalGroupNode;
        boolean pendingAssignmentsAreForced = pendingAssignments.force();
        Set<Assignment> pendingAssignmentsNodes = pendingAssignments.nodes();
        // Either of these is null when the local node is absent from the corresponding assignments.
        Assignment localAssignmentInPending = this.localAssignment(pendingAssignments);
        Assignment localAssignmentInStable = this.localAssignment(stableAssignments);
        AssignmentsChain assignmentsChain = ZoneRebalanceUtil.assignmentsChainGetLocally(this.metaStorageMgr, replicaGrpId, revision);
        if (isRecovery) {
            if (PartitionReplicaLifecycleManager.lastRebalanceWasGraceful(assignmentsChain)) {
                shouldStartLocalGroupNode = localAssignmentInPending != null;
            } else {
                // Same decision as in the graceful case; only the warning differs.
                LOG.warn("Recovery after a forced rebalance for zone is not supported yet [zonePartitionId={}].", replicaGrpId);
                shouldStartLocalGroupNode = localAssignmentInPending != null;
            }
        } else {
            shouldStartLocalGroupNode = localAssignmentInPending != null && localAssignmentInStable == null;
        }
        Assignments computedStableAssignments = PartitionReplicaLifecycleManager.getComputedStableAssignments(stableAssignments, pendingAssignments);
        if (shouldStartLocalGroupNode) {
            // The zone descriptor is resolved at the timestamp of the pending assignments.
            CatalogZoneDescriptor zoneDescriptor = this.zoneDescriptorAt(replicaGrpId.zoneId(), pendingAssignments.timestamp());
            localServicesStartFuture = this.createZonePartitionReplicationNode(replicaGrpId, localAssignmentInPending, computedStableAssignments, revision, zoneDescriptor.partitions(), this.isVolatileZone(zoneDescriptor), isRecovery, false);
        } else {
            localServicesStartFuture = pendingAssignmentsAreForced && localAssignmentInPending != null ? this.replicaMgr.resetWithRetry(replicaGrpId, PeersAndLearners.fromAssignments(computedStableAssignments.nodes()), revision) : CompletableFutures.nullCompletedFuture();
        }
        return ((CompletableFuture)localServicesStartFuture.thenComposeAsync(v -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> this.isLocalNodeIsPrimary(replicaGrpId)), (Executor)this.ioExecutor)).thenAcceptAsync(isLeaseholder -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            boolean isLocalNodeInStableOrPending = this.isNodeInReducedStableOrPendingAssignments(replicaGrpId, stableAssignments, pendingAssignments, revision);
            if (!isLocalNodeInStableOrPending && !isLeaseholder.booleanValue()) {
                return;
            }
            // Always true after the guard above; kept as in the original.
            assert (isLocalNodeInStableOrPending || isLeaseholder.booleanValue()) : "The local node is outside of the replication group [inStableOrPending=" + isLocalNodeInStableOrPending + ", isLeaseholder=" + isLeaseholder + "].";
            if (isRecovery && !this.replicaMgr.isReplicaStarted(replicaGrpId)) {
                return;
            }
            assert (this.replicaMgr.isReplicaStarted(replicaGrpId)) : "The local node is outside of the replication group [, groupId=" + replicaGrpId + ", stable=" + stableAssignments + ", pending=" + pendingAssignments + ", localName=" + this.localNode().name() + "].";
            // Forced pending assignments (or absent stable ones) replace the peers; otherwise the union is used.
            Set<Assignment> newAssignments = pendingAssignmentsAreForced || stableAssignments == null ? pendingAssignmentsNodes : RebalanceUtil.union(pendingAssignmentsNodes, stableAssignments.nodes());
            // NOTE(review): the future returned by thenAccept is not chained into the result of this method.
            this.replicaMgr.replica(replicaGrpId).thenAccept(replica -> replica.updatePeersAndLearners(PeersAndLearners.fromAssignments(newAssignments)));
        }), (Executor)this.ioExecutor);
    }

    /**
     * Picks the assignments to use as the stable configuration for a pending assignments change.
     *
     * @param stableAssignments Current stable assignments, may be {@code null}.
     * @param pendingAssignments Pending assignments.
     * @return Forced assignments built from the pending nodes and timestamp when there are no stable nodes; the pending
     *         assignments themselves when they are forced; the stable assignments otherwise.
     */
    private static Assignments getComputedStableAssignments(@Nullable Assignments stableAssignments, Assignments pendingAssignments) {
        boolean noStableNodes = stableAssignments == null || stableAssignments.nodes().isEmpty();

        if (noStableNodes) {
            return Assignments.forced(pendingAssignments.nodes(), pendingAssignments.timestamp());
        }

        return pendingAssignments.force() ? pendingAssignments : stableAssignments;
    }

    /**
     * Looks up the descriptor of a zone in the catalog that is active at the given timestamp.
     *
     * <p>With assertions enabled, both the catalog and the zone descriptor are required to be present.
     *
     * @param zoneId Zone identifier.
     * @param timestamp Timestamp used to select the active catalog.
     * @return Zone descriptor from that catalog.
     */
    private CatalogZoneDescriptor zoneDescriptorAt(int zoneId, long timestamp) {
        Catalog catalogAtTimestamp = this.catalogService.activeCatalog(timestamp);

        assert (catalogAtTimestamp != null) : "Catalog is not available at " + HybridTimestamp.nullableHybridTimestamp(timestamp);

        CatalogZoneDescriptor descriptor = catalogAtTimestamp.zone(zoneId);

        assert (descriptor != null) : "Zone descriptor is not available at " + HybridTimestamp.nullableHybridTimestamp(timestamp) + " for zone " + zoneId;

        return descriptor;
    }

    /**
     * Asks the Raft group of the given zone partition to switch to the pending assignments, if the local node is the
     * group leader.
     *
     * <p>The method is fire-and-forget: it returns nothing and only logs a warning if the overall chain fails.
     * A failure to obtain the leader is logged and treated as "no leader", in which case nothing is changed. The change
     * is also skipped when the pending assignments entry currently stored in the meta storage has a revision greater
     * than {@code revision}.
     *
     * @param replicaMgr Replica manager used to obtain the replica and its Raft client.
     * @param replicaGrpId Zone partition identifier.
     * @param pendingAssignments Pending assignment nodes that form the new peers and learners configuration.
     * @param revision Meta storage revision of the pending assignments event.
     */
    private void changePeersOnRebalance(ReplicaManager replicaMgr, ZonePartitionId replicaGrpId, Set<Assignment> pendingAssignments, long revision) {
        ((CompletableFuture)((CompletableFuture)replicaMgr.replica(replicaGrpId).thenApply(Replica::raftClient)).thenCompose(raftClient -> ((CompletableFuture)raftClient.refreshAndGetLeaderWithTerm().exceptionally(throwable -> {
            // Any failure to resolve the leader is downgraded to NO_LEADER after logging the reason.
            if (ExceptionUtils.hasCause(throwable, TimeoutException.class)) {
                LOG.info("Node couldn't get the leader within timeout so the changing peers is skipped [grp={}].", replicaGrpId);
            } else if (ExceptionUtils.hasCause(throwable, ComponentStoppingException.class)) {
                LOG.info("Replica is being stopped so the changing peers is skipped [grp={}].", replicaGrpId);
            } else {
                LOG.info("Failed to get a leader for the RAFT replication group [grp={}].", (Throwable)throwable, (Object)replicaGrpId);
            }
            return LeaderWithTerm.NO_LEADER;
        })).thenCompose(leaderWithTerm -> {
            // Only the leader, and only when it is the local node, initiates the change.
            if (leaderWithTerm.isEmpty() || !this.isLocalPeer(leaderWithTerm.leader())) {
                return CompletableFutures.nullCompletedFuture();
            }
            LOG.info("Current node={} is the leader of partition raft group={}. Initiate rebalance process for partition={}, zoneId={}", leaderWithTerm.leader(), replicaGrpId, replicaGrpId.partitionId(), replicaGrpId.zoneId());
            return this.metaStorageMgr.get(ZoneRebalanceUtil.pendingPartAssignmentsQueueKey(replicaGrpId)).thenCompose(latestPendingAssignmentsEntry -> {
                // A newer pending entry exists in the meta storage: this event is outdated, skip it.
                if (revision < latestPendingAssignmentsEntry.revision()) {
                    return CompletableFutures.nullCompletedFuture();
                }
                PeersAndLearners newConfiguration = PeersAndLearners.fromAssignments(pendingAssignments);
                // NOTE(review): failures of the change request are swallowed here (mapped to null) - confirm this is intended.
                return raftClient.changePeersAndLearnersAsync(newConfiguration, leaderWithTerm.term(), revision).exceptionally(e -> null);
            });
        }))).whenComplete((res, ex) -> {
            if (ex != null) {
                LOG.warn("Failed to change peers [grp={}].", (Throwable)ex, (Object)replicaGrpId);
            }
        });
    }

    /**
     * Checks whether the given peer is the local node by comparing its consistent ID with the local node name.
     *
     * @param peer Peer to check.
     * @return {@code true} if the peer's consistent ID equals the local node name.
     */
    private boolean isLocalPeer(Peer peer) {
        boolean sameAsLocalNode = peer.consistentId().equals(this.localNode().name());

        return sameAsLocalNode;
    }

    /**
     * Checks whether any of the given assignments matches the {@code isLocalNodeAssignment} predicate.
     *
     * @param assignments Assignments to inspect.
     * @return {@code true} if at least one assignment satisfies the predicate.
     */
    private boolean isLocalNodeInAssignments(Collection<Assignment> assignments) {
        for (Assignment candidate : assignments) {
            if (this.isLocalNodeAssignment.test(candidate)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Waits for metadata completeness at the given timestamp using the schema sync service.
     *
     * @param ts Timestamp as a {@code long}, converted with {@code HybridTimestamp.hybridTimestamp}.
     * @return Future returned by the schema sync service.
     */
    private CompletableFuture<Void> waitForMetadataCompleteness(long ts) {
        return this.executorInclinedSchemaSyncService
                .waitForMetadataCompleteness(HybridTimestamp.hybridTimestamp(ts));
    }

    /**
     * Checks whether the local node holds the primary replica lease of the given replication group.
     *
     * <p>The primary replica is requested at the current meta storage safe time minus the maximum clock skew. When the
     * safe time is still {@code HybridTimestamp.MIN_VALUE}, {@code false} is returned without asking the placement driver.
     *
     * @param replicationGroupId Replication group identifier.
     * @return Future with {@code true} if the leaseholder ID of the primary replica equals the local node ID.
     * @throws AssertionError If an {@link IllegalArgumentException} is thrown while computing the timestamp or
     *         requesting the primary replica.
     */
    private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId replicationGroupId) {
        HybridTimestamp safeTime = this.metaStorageMgr.clusterTime().currentSafeTime();

        if (HybridTimestamp.MIN_VALUE.equals(safeTime)) {
            return CompletableFutures.falseCompletedFuture();
        }

        long maxSkewMs = this.clockService.maxClockSkewMillis();

        try {
            HybridTimestamp lookupTime = safeTime.subtractPhysicalTime(maxSkewMs);

            return this.executorInclinedPlacementDriver.getPrimaryReplica(replicationGroupId, lookupTime).thenApply(meta -> {
                if (meta == null || meta.getLeaseholderId() == null) {
                    return false;
                }

                return meta.getLeaseholderId().equals(this.localNode().id());
            });
        }
        catch (IllegalArgumentException e) {
            long safeTimeAsLong = safeTime.longValue();

            throw new AssertionError("Got a negative time [currentSafeTime=" + safeTime + ", currentSafeTimeMs=" + safeTimeAsLong + ", skewMs=" + maxSkewMs + ", internal=" + (safeTimeAsLong + (-maxSkewMs << 16)) + "]", e);
        }
    }

    /**
     * Checks whether the local node is present in the pending assignments or in the stable assignments reduced by the
     * "switch reduce" assignments stored for the partition.
     *
     * <p>{@code stableAssignments} is nullable; an absent value is treated as an empty set of stable nodes instead of
     * failing with a {@link NullPointerException}.
     *
     * @param replicaGrpId Zone partition identifier.
     * @param stableAssignments Stable assignments, may be {@code null}.
     * @param pendingAssignments Pending assignments.
     * @param revision Revision at which the switch-reduce entry is read locally.
     * @return {@code true} if the local node is in the union of the reduced stable and the pending assignments.
     */
    private boolean isNodeInReducedStableOrPendingAssignments(ZonePartitionId replicaGrpId, @Nullable Assignments stableAssignments, Assignments pendingAssignments, long revision) {
        // Missing stable assignments contribute no nodes.
        Set<Assignment> stableNodes = stableAssignments == null ? Collections.<Assignment>emptySet() : stableAssignments.nodes();
        Entry reduceEntry = this.metaStorageMgr.getLocally(ZoneRebalanceUtil.switchReduceKey(replicaGrpId), revision);
        Assignments reduceAssignments = reduceEntry != null ? Assignments.fromBytes(reduceEntry.value()) : null;
        // Nodes listed in the switch-reduce assignments are excluded from the stable set.
        Set<Assignment> reducedStableAssignments = reduceAssignments != null ? RebalanceUtil.subtract(stableNodes, reduceAssignments.nodes()) : stableNodes;
        return this.isLocalNodeInAssignments(RebalanceUtil.union(reducedStableAssignments, pendingAssignments.nodes()));
    }

    /**
     * Finds the first assignment that matches the {@code isLocalNodeAssignment} predicate.
     *
     * @param assignments Assignments to search, may be {@code null}.
     * @return Matching assignment, or {@code null} if {@code assignments} is {@code null} or nothing matches.
     */
    @Nullable
    private Assignment localAssignment(@Nullable Assignments assignments) {
        if (assignments == null) {
            return null;
        }

        return assignments.nodes().stream()
                .filter(this.isLocalNodeAssignment)
                .findFirst()
                .orElse(null);
    }

    /**
     * Stops the component by closing the zone resources manager.
     *
     * @param componentContext Component context (not used here).
     * @return Completed future, or a failed one carrying the exception thrown while closing.
     */
    @Override
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        Exception closeFailure = null;

        try {
            IgniteUtils.closeAllManually(this.zoneResourcesManager);
        }
        catch (Exception e) {
            closeFailure = e;
        }

        if (closeFailure != null) {
            return CompletableFuture.failedFuture(closeFailure);
        }

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Reads the stable assignments of a zone partition from the local meta storage at the given revision.
     *
     * @param zonePartitionId Zone partition identifier.
     * @param revision Revision of the local read.
     * @return Result of {@code Assignments.fromBytes} applied to the entry value; declared nullable.
     */
    @Nullable
    private Assignments stableAssignments(ZonePartitionId zonePartitionId, long revision) {
        ByteArray stableKey = ZoneRebalanceUtil.stablePartAssignmentsKey(zonePartitionId);
        Entry stableEntry = this.metaStorageMgr.getLocally(stableKey, revision);

        return Assignments.fromBytes(stableEntry.value());
    }

    /**
     * Stops the replica of a zone partition under the zone's write lock, firing the given events around the stop.
     *
     * <p>The "before" event is fired first. Then the replica is stopped; {@code afterReplicaStopAction} always receives
     * the stop result, while the group bookkeeping and the "after" event happen only if the replica was actually
     * stopped. A {@link NodeStoppingException} thrown by the stop call is turned into a normally completed future.
     *
     * @param zonePartitionId Zone partition identifier.
     * @param beforeReplicaStoppedEvent Event fired before stopping the replica.
     * @param afterReplicaStoppedEvent Event fired after the replica has been stopped.
     * @param eventRevision Revision put into the event parameters.
     * @param afterReplicaStopAction Callback receiving whether the replica was stopped.
     * @return Future of the whole stop procedure.
     */
    @VisibleForTesting
    public CompletableFuture<Void> stopPartitionInternal(ZonePartitionId zonePartitionId, LocalPartitionReplicaEvent beforeReplicaStoppedEvent, LocalPartitionReplicaEvent afterReplicaStoppedEvent, long eventRevision, Consumer<Boolean> afterReplicaStopAction) {
        return this.executeUnderZoneWriteLock(zonePartitionId.zoneId(), () -> {
            LocalPartitionReplicaEventParameters params = new LocalPartitionReplicaEventParameters(zonePartitionId, eventRevision, false);

            return this.fireEvent(beforeReplicaStoppedEvent, params).thenCompose(ignored -> {
                try {
                    return this.replicaMgr.stopReplica(zonePartitionId).thenComposeAsync(stopped -> {
                        afterReplicaStopAction.accept((Boolean)stopped);

                        if (stopped.booleanValue()) {
                            this.startedReplicationGroups.afterStoppingGroup(zonePartitionId);

                            return this.fireEvent(afterReplicaStoppedEvent, params);
                        }

                        return CompletableFutures.nullCompletedFuture();
                    }, (Executor)this.ioExecutor);
                }
                catch (NodeStoppingException e) {
                    return CompletableFutures.nullCompletedFuture();
                }
            });
        });
    }

    /**
     * Stops all given partitions and blocks until every stop has completed.
     *
     * <p>If the stops are still running after 30 seconds, the state of the partitions is dumped via
     * {@code printPartitionState}. Any failure, including one thrown while waiting, is reported to the failure processor.
     *
     * @param partitionIds Identifiers of the partitions to stop; the stream is consumed exactly once by this method.
     */
    private void cleanUpPartitionsResources(Stream<ZonePartitionId> partitionIds) {
        // A stream can be traversed only once, but the ids are needed twice (to stop the partitions and, later,
        // to report the ones that are still being stopped), so they are snapshotted into a list first.
        List<ZonePartitionId> partitionIdsSnapshot = partitionIds.collect(Collectors.toList());
        CompletableFuture<?>[] stopPartitionsFuture = partitionIdsSnapshot.stream()
                .map(zonePartitionId -> this.stopPartitionInternal(zonePartitionId, LocalPartitionReplicaEvent.BEFORE_REPLICA_STOPPED, LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED, -1L, replicaWasStopped -> {}))
                .toArray(CompletableFuture[]::new);
        try {
            CompletableFuture<Void> fut = CompletableFuture.allOf(stopPartitionsFuture);
            // Diagnostic only: dump the partition state if the stop takes longer than 30 seconds.
            CompletableFuture.delayedExecutor(30L, TimeUnit.SECONDS, Runnable::run).execute(() -> {
                if (!fut.isDone()) {
                    this.printPartitionState(partitionIdsSnapshot.stream());
                }
            });
            fut.get();
        }
        catch (Throwable e) {
            this.failureProcessor.process(new FailureContext(e, "Unable to clean up zones resources"));
        }
    }

    /**
     * Dumps threads together with a message listing the partitions for which the replica manager still returns a
     * non-null replica.
     *
     * @param partitionIds Identifiers of the partitions to inspect.
     */
    private void printPartitionState(Stream<ZonePartitionId> partitionIds) {
        List<ZonePartitionId> nonStoppedPartitions = partitionIds
                .filter(partId -> this.replicaMgr.replica((ReplicationGroupId)partId) != null)
                .collect(Collectors.toList());

        int exceedLimit = nonStoppedPartitions.size() - 100;

        String listedPartitions = S.toString(
                nonStoppedPartitions,
                (sb, e, i) -> sb.app(e).app(i < nonStoppedPartitions.size() - 1 ? ", " : "")
        );

        String tail = exceedLimit > 0 ? IgniteStringFormatter.format(" and {} more; ", exceedLimit) : "; ";

        String partitionsStr = "There are still some partitions that are being stopped: " + listedPartitions + tail;

        ThreadUtils.dumpThreads(LOG, partitionsStr, false);
    }

    /**
     * Acquires the read lock of the given zone, creating the zone's lock on first use.
     *
     * @param zoneId Zone identifier.
     * @return Future with the stamp to pass to {@link #unlockZoneForRead(int, long)}.
     */
    public CompletableFuture<Long> lockZoneForRead(int zoneId) {
        return this.zonePartitionsLocks
                .computeIfAbsent(zoneId, ignored -> PartitionReplicaLifecycleManager.newZoneLock())
                .readLock();
    }

    /**
     * Creates a lock instance for a zone.
     *
     * @return New {@link NaiveAsyncReadWriteLock}.
     */
    private static NaiveAsyncReadWriteLock newZoneLock() {
        NaiveAsyncReadWriteLock zoneLock = new NaiveAsyncReadWriteLock();

        return zoneLock;
    }

    /**
     * Releases the read lock of the given zone.
     *
     * <p>The zone's lock must already exist; no lock is created here.
     *
     * @param zoneId Zone identifier.
     * @param stamp Stamp obtained when the read lock was acquired.
     */
    public void unlockZoneForRead(int zoneId, long stamp) {
        NaiveAsyncReadWriteLock zoneLock = this.zonePartitionsLocks.get(zoneId);

        zoneLock.unlockRead(stamp);
    }

    /**
     * Registers table-level processors and storage access in the resources of a zone partition.
     *
     * <p>The replica processor factory is added once the zone replica listener future completes; the Raft table
     * processor is added right away through the recovery or the regular method depending on {@code onNodeRecovery};
     * the MV partition access is added to the snapshot storage.
     *
     * @param zonePartitionId Zone partition identifier.
     * @param tableId Table identifier.
     * @param tablePartitionReplicaProcessorFactory Factory of the table replica processor.
     * @param raftTableProcessor Raft table processor.
     * @param partitionMvStorageAccess MV storage access of the table partition.
     * @param onNodeRecovery Whether the call is made during node recovery.
     */
    public void loadTableListenerToZoneReplica(ZonePartitionId zonePartitionId, int tableId, Function<RaftCommandRunner, ReplicaTableProcessor> tablePartitionReplicaProcessorFactory, RaftTableProcessor raftTableProcessor, PartitionMvStorageAccess partitionMvStorageAccess, boolean onNodeRecovery) {
        ZoneResourcesManager.ZonePartitionResources partitionResources = this.zonePartitionResources(zonePartitionId);

        partitionResources.replicaListenerFuture().thenAccept(
                listener -> listener.addTableReplicaProcessor(tableId, tablePartitionReplicaProcessorFactory)
        );

        if (!onNodeRecovery) {
            partitionResources.raftListener().addTableProcessor(tableId, raftTableProcessor);
        } else {
            partitionResources.raftListener().addTableProcessorOnRecovery(tableId, raftTableProcessor);
        }

        partitionResources.snapshotStorage().addMvPartition(tableId, partitionMvStorageAccess);
    }

    /**
     * Removes the resources of a table from the given zone partition via the zone resources manager.
     *
     * @param zonePartitionId Zone partition identifier.
     * @param tableId Table identifier.
     * @return Future returned by the zone resources manager.
     */
    public CompletableFuture<Void> unloadTableResourcesFromZoneReplica(ZonePartitionId zonePartitionId, int tableId) {
        CompletableFuture<Void> removal = this.zoneResourcesManager.removeTableResources(zonePartitionId, tableId);

        return removal;
    }

    /**
     * Restarts a zone partition, destroying it before the restart.
     *
     * <p>After the stop-and-destroy step, the stable assignments are read locally at {@code revision}, metadata
     * completeness at {@code assignmentsTimestamp} is awaited, and the replication node is created again if the local
     * node is in the stable assignments.
     *
     * @param zonePartitionId Zone partition identifier.
     * @param revision Meta storage revision used for the stop and for reading stable assignments.
     * @param assignmentsTimestamp Timestamp used for the metadata wait and the zone descriptor lookup.
     * @return Future of the restart.
     */
    public CompletableFuture<?> restartPartitionWithCleanUp(ZonePartitionId zonePartitionId, long revision, long assignmentsTimestamp) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.stopPartitionAndDestroyForRestart(zonePartitionId, revision).thenComposeAsync(stopped -> {
            Assignments stable = ZoneRebalanceUtil.zoneStableAssignmentsGetLocally(this.metaStorageMgr, zonePartitionId, revision);

            assert (stable != null) : "zonePartitionId=" + zonePartitionId + ", revision=" + revision;

            return this.waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(metadataReady -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
                Assignment local = this.localAssignment(stable);

                if (local != null) {
                    CatalogZoneDescriptor zone = this.zoneDescriptorAt(zonePartitionId.zoneId(), assignmentsTimestamp);

                    return this.createZonePartitionReplicationNode(zonePartitionId, local, stable, revision, zone.partitions(), this.isVolatileZone(zone), false, false);
                }

                // The local node is not in the stable assignments: nothing to start.
                return CompletableFutures.nullCompletedFuture();
            }));
        }, (Executor)this.ioExecutor));
    }

    /**
     * Restarts a zone partition without destroying it.
     *
     * <p>After the stop step, the stable assignments are read locally at {@code revision}, metadata completeness at
     * {@code assignmentsTimestamp} is awaited, and the replication node is created again if the local node is in the
     * stable assignments.
     *
     * @param zonePartitionId Zone partition identifier.
     * @param revision Meta storage revision used for the stop and for reading stable assignments.
     * @param assignmentsTimestamp Timestamp used for the metadata wait and the zone descriptor lookup.
     * @return Future of the restart.
     */
    public CompletableFuture<?> restartPartition(ZonePartitionId zonePartitionId, long revision, long assignmentsTimestamp) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.stopPartitionForRestart(zonePartitionId, revision).thenComposeAsync(stopped -> {
            Assignments stable = ZoneRebalanceUtil.zoneStableAssignmentsGetLocally(this.metaStorageMgr, zonePartitionId, revision);

            assert (stable != null) : "zonePartitionId=" + zonePartitionId + ", revision=" + revision;

            return this.waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(metadataReady -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
                Assignment local = this.localAssignment(stable);

                if (local != null) {
                    CatalogZoneDescriptor zone = this.zoneDescriptorAt(zonePartitionId.zoneId(), assignmentsTimestamp);

                    return this.createZonePartitionReplicationNode(zonePartitionId, local, stable, revision, zone.partitions(), this.isVolatileZone(zone), false, false);
                }

                // The local node is not in the stable assignments: nothing to start.
                return CompletableFutures.nullCompletedFuture();
            }));
        }, (Executor)this.ioExecutor));
    }

    /**
     * Runs an asynchronous action while holding the write lock of the given zone.
     *
     * <p>The per-zone lock is created lazily in {@code zonePartitionsLocks}. Once the write lock is acquired, the action is
     * invoked and the lock is released when the future returned by the action completes (normally or exceptionally). If the
     * action itself throws instead of returning a future, the lock is released immediately and the throwable is propagated
     * through a failed future.
     *
     * @param zoneId ID of the zone whose lock must be held.
     * @param action Supplier of the asynchronous work to run under the lock.
     * @param <T> Result type of the action.
     * @return Future completing with the action's outcome.
     */
    private <T> CompletableFuture<T> executeUnderZoneWriteLock(int zoneId, Supplier<CompletableFuture<T>> action) {
        NaiveAsyncReadWriteLock lock = this.zonePartitionsLocks.computeIfAbsent(zoneId, id -> PartitionReplicaLifecycleManager.newZoneLock());

        return lock.writeLock().thenCompose(stamp -> {
            try {
                // Keep the chain fully typed (no raw CompletableFuture) so the compiler verifies the result type.
                return action.get().whenComplete((result, error) -> lock.unlockWrite(stamp));
            }
            catch (Throwable e) {
                // The action failed synchronously, so no completion callback was registered: unlock here.
                lock.unlockWrite(stamp);

                return CompletableFuture.<T>failedFuture(e);
            }
        });
    }

    /**
     * Reacts to a primary replica expiration event.
     *
     * <p>When the expired leaseholder is the local node and the replication group is a {@code ZonePartitionId}, a weak stop
     * of the replica is requested from the replica manager with reason {@code PRIMARY_EXPIRED}; the supplied stop callback
     * stops and destroys the partition using the event's causality token. The future returned by
     * {@code weakStopReplica} is not awaited here.
     *
     * @param parameters Event parameters.
     * @return Always a future completed with {@code false}. NOTE(review): presumably this tells the event framework to keep
     *     the listener registered; confirm against the listener contract.
     */
    private CompletableFuture<Boolean> onPrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) {
        boolean leaseholderIsLocal = this.topologyService.localMember().id().equals(parameters.leaseholderId());

        // Nothing to do for foreign leaseholders or for groups that are not zone partitions.
        if (!leaseholderIsLocal || !(parameters.groupId() instanceof ZonePartitionId)) {
            return CompletableFutures.falseCompletedFuture();
        }

        ZonePartitionId expiredGroupId = (ZonePartitionId)parameters.groupId();

        this.replicaMgr.weakStopReplica(
                expiredGroupId,
                ReplicaManager.WeakReplicaStopReason.PRIMARY_EXPIRED,
                () -> this.stopAndDestroyPartition(expiredGroupId, parameters.causalityToken())
        );

        return CompletableFutures.falseCompletedFuture();
    }

    /**
     * Stops a zone partition replica and, if it was actually stopped, destroys its resources and replication protocol storages.
     *
     * <p>The stop is delegated to {@code stopPartitionInternal} with the {@code BEFORE_REPLICA_DESTROYED} and
     * {@code AFTER_REPLICA_DESTROYED} events. The callback passed to it performs the destruction only when it is invoked
     * with {@code true}.
     *
     * @param zonePartitionId Zone partition to stop and destroy.
     * @param revision Revision passed through to {@code stopPartitionInternal}.
     * @return Future returned by {@code stopPartitionInternal}.
     */
    private CompletableFuture<Void> stopAndDestroyPartition(ZonePartitionId zonePartitionId, long revision) {
        return this.stopPartitionInternal(
                zonePartitionId,
                LocalPartitionReplicaEvent.BEFORE_REPLICA_DESTROYED,
                LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED,
                revision,
                replicaWasStopped -> {
                    if (!replicaWasStopped) {
                        return;
                    }

                    this.zoneResourcesManager.destroyZonePartitionResources(zonePartitionId);

                    try {
                        // NOTE(review): the meaning of the 'false' flag is defined by ReplicaManager and is not visible here.
                        this.replicaMgr.destroyReplicationProtocolStoragesDurably(zonePartitionId, false);
                    }
                    catch (NodeStoppingException nodeStopping) {
                        // Surface the checked exception as an internal error carrying the node-stopping code.
                        throw new IgniteInternalException(ErrorGroups.Common.NODE_STOPPING_ERR, nodeStopping);
                    }
                }
        );
    }

    /**
     * Looks up the transaction state storage of a zone partition through the zone resources manager.
     *
     * @param zoneId Zone ID.
     * @param partitionId Partition ID within the zone.
     * @return Whatever {@code zoneResourcesManager.txStatePartitionStorage(...)} returns; may be {@code null}.
     */
    @TestOnly
    @Nullable
    public TxStatePartitionStorage txStatePartitionStorage(int zoneId, int partitionId) {
        TxStatePartitionStorage storage = this.zoneResourcesManager.txStatePartitionStorage(zoneId, partitionId);

        return storage;
    }

    /**
     * Returns the current safe time reported by the Raft listener of the given zone partition.
     *
     * @param zoneId Zone ID.
     * @param partId Partition ID within the zone.
     * @return Current safe time of the partition's Raft listener.
     */
    @TestOnly
    public HybridTimestamp currentSafeTimeForZonePartition(int zoneId, int partId) {
        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, partId);

        ZoneResourcesManager.ZonePartitionResources partitionResources = this.zonePartitionResources(zonePartitionId);

        return partitionResources.raftListener().currentSafeTime();
    }

    /**
     * Returns the resources registered for the given zone partition.
     *
     * <p>The lookup result is only checked with an {@code assert}, so with assertions disabled a missing entry is returned
     * as {@code null}.
     *
     * @param zonePartitionId Zone partition ID.
     * @return Resources obtained from {@code zoneResourcesManager.getZonePartitionResources(...)}.
     */
    public ZoneResourcesManager.ZonePartitionResources zonePartitionResources(ZonePartitionId zonePartitionId) {
        ZoneResourcesManager.ZonePartitionResources partitionResources =
                this.zoneResourcesManager.getZonePartitionResources(zonePartitionId);

        assert (partitionResources != null) : String.format("Missing resources for zone partition [zonePartitionId=%s]", zonePartitionId);

        return partitionResources;
    }

    /**
     * Tells whether the last rebalance is considered graceful, judging by the assignments chain.
     *
     * @param assignmentsChain Assignments chain, may be {@code null}.
     * @return {@code true} if the chain is absent or consists of exactly one element, {@code false} otherwise.
     */
    private static boolean lastRebalanceWasGraceful(@Nullable AssignmentsChain assignmentsChain) {
        if (assignmentsChain == null) {
            return true;
        }

        return assignmentsChain.size() == 1;
    }
}

