/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.partition.replicator;

import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
import org.apache.ignite.internal.catalog.events.DropZoneEventParameters;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite.internal.configuration.utils.SystemDistributedConfigurationPropertyHolder;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.rebalance.AssignmentUtil;
import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener;
import org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.event.Event;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.event.EventParameters;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.ComponentStoppingException;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.metrics.MetricSource;
import org.apache.ignite.internal.network.ClusterNodeResolver;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.partition.replicator.LocalBeforeReplicaStartEventParameters;
import org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent;
import org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEventParameters;
import org.apache.ignite.internal.partition.replicator.NaiveAsyncReadWriteLock;
import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
import org.apache.ignite.internal.partition.replicator.StartedReplicationGroups;
import org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
import org.apache.ignite.internal.partition.replicator.ZoneResourcesManager;
import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.partition.replicator.schema.CatalogValidationSchemasSource;
import org.apache.ignite.internal.partition.replicator.schema.ExecutorInclinedSchemaSyncService;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.partitiondistribution.AssignmentsChain;
import org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
import org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils;
import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.rebalance.ChangePeersAndLearnersWithRetry;
import org.apache.ignite.internal.raft.rebalance.RaftStaleUpdateException;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.thread.ThreadUtils;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateStorageRebalanceException;
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteBusyLock;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.LongPriorityQueue;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.TrackerClosedException;
import org.apache.ignite.lang.ErrorGroups;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
import org.jetbrains.annotations.VisibleForTesting;

public class PartitionReplicaLifecycleManager
extends AbstractEventProducer<LocalPartitionReplicaEvent, LocalPartitionReplicaEventParameters>
implements IgniteComponent {
    private final CatalogService catalogService;
    private final ReplicaManager replicaMgr;
    private final DistributionZoneManager distributionZoneMgr;
    private final MetaStorageManager metaStorageMgr;
    private final TopologyService topologyService;
    private final LowWatermark lowWatermark;
    private final FailureProcessor failureProcessor;
    private final WatchListener pendingAssignmentsRebalanceListener;
    private final WatchListener stableAssignmentsRebalanceListener;
    private final WatchListener assignmentsSwitchRebalanceListener;
    private static final IgniteLogger LOG = Loggers.forClass(PartitionReplicaLifecycleManager.class);
    private final StartedReplicationGroups startedReplicationGroups = new StartedReplicationGroups();
    private final Map<Integer, NaiveAsyncReadWriteLock> zonePartitionsLocks = new ConcurrentHashMap<Integer, NaiveAsyncReadWriteLock>();
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final ExecutorService ioExecutor;
    private final ScheduledExecutorService rebalanceScheduler;
    private final Executor partitionOperationsExecutor;
    private final ClockService clockService;
    private final PlacementDriver executorInclinedPlacementDriver;
    private final SchemaSyncService executorInclinedSchemaSyncService;
    private final TxManager txManager;
    private final SchemaManager schemaManager;
    private final Predicate<Assignment> isLocalNodeAssignment = assignment -> assignment.consistentId().equals(this.localNode().name());
    private final SystemDistributedConfigurationPropertyHolder<Integer> rebalanceRetryDelayConfiguration;
    private final DataStorageManager dataStorageManager;
    private final ZoneResourcesManager zoneResourcesManager;
    private final MetricManager metricManager;
    private final ReliableCatalogVersions reliableCatalogVersions;
    private final TransactionStateResolver transactionStateResolver;
    private final TxMessageSender txMessageSender;
    private final EventListener<CreateZoneEventParameters> onCreateZoneListener = this::onCreateZone;
    private final EventListener<PrimaryReplicaEventParameters> onPrimaryReplicaExpiredListener = this::onPrimaryReplicaExpired;
    private final EventListener<DropZoneEventParameters> onZoneDropListener = EventListener.fromConsumer(this::onZoneDrop);
    private final EventListener<ChangeLowWatermarkEventParameters> onLowWatermarkChangedListener = this::onLwmChanged;
    private final CompletableFuture<Void> stopReplicaLifecycleFuture = new CompletableFuture();
    private final LongPriorityQueue<DestroyZoneEvent> destructionEventsQueue = new LongPriorityQueue(DestroyZoneEvent::catalogVersion);

    public PartitionReplicaLifecycleManager(CatalogService catalogService, ReplicaManager replicaMgr, DistributionZoneManager distributionZoneMgr, MetaStorageManager metaStorageMgr, TopologyService topologyService, LowWatermark lowWatermark, FailureProcessor failureProcessor, ExecutorService ioExecutor, ScheduledExecutorService rebalanceScheduler, Executor partitionOperationsExecutor, ClockService clockService, PlacementDriver placementDriver, SchemaSyncService schemaSyncService, SystemDistributedConfiguration systemDistributedConfiguration, TxStateRocksDbSharedStorage sharedTxStateStorage, TxManager txManager, SchemaManager schemaManager, DataStorageManager dataStorageManager, OutgoingSnapshotsManager outgoingSnapshotsManager, MetricManager metricManager, MessagingService messagingService, ReplicaService replicaService) {
        this(catalogService, replicaMgr, distributionZoneMgr, metaStorageMgr, topologyService, lowWatermark, failureProcessor, ioExecutor, rebalanceScheduler, partitionOperationsExecutor, clockService, placementDriver, schemaSyncService, systemDistributedConfiguration, txManager, schemaManager, dataStorageManager, new ZoneResourcesManager(sharedTxStateStorage, txManager, outgoingSnapshotsManager, topologyService, catalogService, failureProcessor, partitionOperationsExecutor, replicaMgr), metricManager, messagingService, replicaService);
    }

    @VisibleForTesting
    PartitionReplicaLifecycleManager(CatalogService catalogService, ReplicaManager replicaMgr, DistributionZoneManager distributionZoneMgr, MetaStorageManager metaStorageMgr, TopologyService topologyService, LowWatermark lowWatermark, FailureProcessor failureProcessor, ExecutorService ioExecutor, ScheduledExecutorService rebalanceScheduler, Executor partitionOperationsExecutor, ClockService clockService, PlacementDriver placementDriver, SchemaSyncService schemaSyncService, SystemDistributedConfiguration systemDistributedConfiguration, TxManager txManager, SchemaManager schemaManager, DataStorageManager dataStorageManager, ZoneResourcesManager zoneResourcesManager, MetricManager metricManager, MessagingService messagingService, ReplicaService replicaService) {
        this.catalogService = catalogService;
        this.replicaMgr = replicaMgr;
        this.distributionZoneMgr = distributionZoneMgr;
        this.metaStorageMgr = metaStorageMgr;
        this.topologyService = topologyService;
        this.lowWatermark = lowWatermark;
        this.failureProcessor = failureProcessor;
        this.ioExecutor = ioExecutor;
        this.rebalanceScheduler = rebalanceScheduler;
        this.partitionOperationsExecutor = partitionOperationsExecutor;
        this.clockService = clockService;
        this.executorInclinedSchemaSyncService = new ExecutorInclinedSchemaSyncService(schemaSyncService, partitionOperationsExecutor);
        this.executorInclinedPlacementDriver = new ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
        this.txManager = txManager;
        this.schemaManager = schemaManager;
        this.dataStorageManager = dataStorageManager;
        this.zoneResourcesManager = zoneResourcesManager;
        this.metricManager = metricManager;
        this.rebalanceRetryDelayConfiguration = new SystemDistributedConfigurationPropertyHolder(systemDistributedConfiguration, (v, r) -> {}, "rebalanceRetryDelay", (Object)200, Integer::parseInt);
        this.txMessageSender = new TxMessageSender(messagingService, replicaService, clockService);
        this.transactionStateResolver = new TransactionStateResolver(txManager, clockService, (ClusterNodeResolver)topologyService, messagingService, this.executorInclinedPlacementDriver, this.txMessageSender);
        this.pendingAssignmentsRebalanceListener = this.createPendingAssignmentsRebalanceListener();
        this.stableAssignmentsRebalanceListener = this.createStableAssignmentsRebalanceListener();
        this.assignmentsSwitchRebalanceListener = this.createAssignmentsSwitchRebalanceListener();
        this.reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService);
    }

    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        this.transactionStateResolver.start();
        CompletableFuture recoveryFinishFuture = this.metaStorageMgr.recoveryFinishedFuture();
        assert (recoveryFinishFuture.isDone());
        long recoveryRevision = ((Revisions)recoveryFinishFuture.join()).revision();
        this.handleResourcesForDroppedZonesOnRecovery();
        CompletionStage processZonesAndAssignmentsOnStart = this.processZonesOnStart(recoveryRevision, this.lowWatermark.getLowWatermark()).thenCompose(ignored -> this.processAssignmentsOnRecovery(recoveryRevision));
        this.metaStorageMgr.registerPrefixWatch(new ByteArray(ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES), this.pendingAssignmentsRebalanceListener);
        this.metaStorageMgr.registerPrefixWatch(new ByteArray(ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES), this.stableAssignmentsRebalanceListener);
        this.metaStorageMgr.registerPrefixWatch(new ByteArray(ZoneRebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES), this.assignmentsSwitchRebalanceListener);
        this.catalogService.listen((Event)CatalogEvent.ZONE_CREATE, this.onCreateZoneListener);
        this.catalogService.listen((Event)CatalogEvent.ZONE_DROP, this.onZoneDropListener);
        this.lowWatermark.listen((Event)LowWatermarkEvent.LOW_WATERMARK_CHANGED, this.onLowWatermarkChangedListener);
        this.rebalanceRetryDelayConfiguration.init();
        this.executorInclinedPlacementDriver.listen((Event)PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this.onPrimaryReplicaExpiredListener);
        this.metricManager.registerSource((MetricSource)this.zoneResourcesManager.snapshotsMetricsSource());
        this.metricManager.enable((MetricSource)this.zoneResourcesManager.snapshotsMetricsSource());
        return processZonesAndAssignmentsOnStart;
    }

    private CompletableFuture<Void> processZonesOnStart(long recoveryRevision, @Nullable HybridTimestamp lwm) {
        int earliestCatalogVersion = lwm == null ? this.catalogService.earliestCatalogVersion() : this.catalogService.activeCatalogVersion(lwm.longValue());
        int latestCatalogVersion = this.catalogService.latestCatalogVersion();
        IntOpenHashSet startedZones = new IntOpenHashSet();
        ArrayList startZoneFutures = new ArrayList();
        Catalog nextCatalog = null;
        for (int ver = latestCatalogVersion; ver >= earliestCatalogVersion; --ver) {
            Catalog catalog = this.catalogService.catalog(ver);
            Catalog finalNextCatalog = nextCatalog;
            int ver0 = ver;
            this.catalogService.catalog(ver).zones().stream().filter(zone -> startedZones.add(zone.id())).forEach(zoneDescriptor -> {
                int zoneId = zoneDescriptor.id();
                startZoneFutures.add(this.calculateZoneAssignmentsAndCreateReplicationNodes(recoveryRevision, ver0, (CatalogZoneDescriptor)zoneDescriptor, true));
                if (finalNextCatalog != null && finalNextCatalog.zone(zoneId) == null) {
                    this.destructionEventsQueue.enqueue((Object)new DestroyZoneEvent(finalNextCatalog.version(), zoneId, zoneDescriptor.partitions()));
                }
            });
            nextCatalog = catalog;
        }
        return CompletableFuture.allOf((CompletableFuture[])startZoneFutures.toArray(CompletableFuture[]::new)).whenComplete((unused, throwable) -> {
            if (throwable != null && !ExceptionUtils.hasCause((Throwable)throwable, (Class[])new Class[]{NodeStoppingException.class})) {
                this.failureProcessor.process(new FailureContext(throwable, "Error starting zones"));
            } else {
                LOG.debug("Zones started successfully [earliestCatalogVersion={}, latestCatalogVersion={}, startedZoneIds={}]", new Object[]{earliestCatalogVersion, latestCatalogVersion, startedZones});
            }
        });
    }

    private CompletableFuture<Void> processAssignmentsOnRecovery(long recoveryRevision) {
        return this.recoverStableAssignments(recoveryRevision).thenCompose(v -> this.recoverPendingAssignments(recoveryRevision));
    }

    private CompletableFuture<Void> recoverStableAssignments(long recoveryRevision) {
        return this.handleAssignmentsOnRecovery(new ByteArray(ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES), recoveryRevision, (entry, rev) -> this.handleChangeStableAssignmentEvent((Entry)entry, (long)rev, true), "stable");
    }

    private CompletableFuture<Void> recoverPendingAssignments(long recoveryRevision) {
        return this.handleAssignmentsOnRecovery(new ByteArray(ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES), recoveryRevision, (pendingAssignmentsEntry, revision) -> this.handleChangePendingAssignmentEvent((Entry)pendingAssignmentsEntry, (long)revision, true), "pending");
    }

    private CompletableFuture<Void> handleAssignmentsOnRecovery(ByteArray prefix, long revision, BiFunction<Entry, Long, CompletableFuture<Void>> assignmentsEventHandler, String assignmentsType) {
        try (Cursor cursor = this.metaStorageMgr.prefixLocally(prefix, revision);){
            CompletableFuture[] futures = (CompletableFuture[])cursor.stream().map(entry -> {
                if (LOG.isInfoEnabled()) {
                    LOG.info("Non handled {} assignments for key '{}' discovered, performing recovery", new Object[]{assignmentsType, new String(entry.key(), StandardCharsets.UTF_8)});
                }
                return (CompletableFuture)assignmentsEventHandler.apply((Entry)entry, revision);
            }).toArray(CompletableFuture[]::new);
            CompletionStage completionStage = CompletableFuture.allOf(futures).whenComplete((res, e) -> {
                if (e != null && !ExceptionUtils.hasCause((Throwable)e, (Class[])new Class[]{NodeStoppingException.class})) {
                    this.failureProcessor.process(new FailureContext(e, "Error when performing assignments recovery"));
                }
            });
            return completionStage;
        }
    }

    private void handleResourcesForDroppedZonesOnRecovery() {
    }

    private CompletableFuture<Boolean> onCreateZone(CreateZoneEventParameters createZoneEventParameters) {
        return (CompletableFuture)IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> this.calculateZoneAssignmentsAndCreateReplicationNodes(createZoneEventParameters.causalityToken(), createZoneEventParameters.catalogVersion(), createZoneEventParameters.zoneDescriptor(), false).thenApply(unused -> false));
    }

    private CompletableFuture<Void> calculateZoneAssignmentsAndCreateReplicationNodes(long causalityToken, int catalogVersion, CatalogZoneDescriptor zoneDescriptor, boolean onNodeRecovery) {
        return IgniteUtils.inBusyLockAsync((IgniteBusyLock)this.busyLock, () -> {
            int zoneId = zoneDescriptor.id();
            return this.getOrCreateAssignments(zoneDescriptor, causalityToken, catalogVersion).thenCompose(stableAssignments -> this.createZoneReplicationNodes(zoneId, (List<Assignments>)stableAssignments, ZoneRebalanceUtil.zonePendingAssignmentsGetLocally((MetaStorageManager)this.metaStorageMgr, (int)zoneId, (int)zoneDescriptor.partitions(), (long)causalityToken), ZoneRebalanceUtil.zoneAssignmentsChainGetLocally((MetaStorageManager)this.metaStorageMgr, (int)zoneId, (int)zoneDescriptor.partitions(), (long)causalityToken), causalityToken, catalogVersion, zoneDescriptor.partitions(), onNodeRecovery));
        });
    }

    private CompletableFuture<Void> createZoneReplicationNodes(int zoneId, List<Assignments> stableAssignmentsForZone, List<@Nullable Assignments> pendingAssignmentsForZone, List<AssignmentsChain> assignmentsChains, long causalityToken, int catalogVersion, int partitionCount, boolean onNodeRecovery) {
        assert (stableAssignmentsForZone != null) : IgniteStringFormatter.format((String)"Zone has empty assignments [id={}].", (Object[])new Object[]{zoneId});
        Supplier<CompletableFuture> createZoneReplicationNodes = () -> IgniteUtils.inBusyLockAsync((IgniteBusyLock)this.busyLock, () -> {
            CompletableFuture[] partitionsStartFutures = new CompletableFuture[stableAssignmentsForZone.size()];
            for (int partId = 0; partId < stableAssignmentsForZone.size(); ++partId) {
                boolean shouldStartPartition;
                Assignments stableAssignments = (Assignments)stableAssignmentsForZone.get(partId);
                Assignments pendingAssignments = (Assignments)pendingAssignmentsForZone.get(partId);
                Assignment localAssignmentInStable = this.localAssignment(stableAssignments);
                AssignmentsChain assignmentsChain = (AssignmentsChain)assignmentsChains.get(partId);
                if (onNodeRecovery) {
                    if (PartitionReplicaLifecycleManager.lastRebalanceWasGraceful(assignmentsChain)) {
                        shouldStartPartition = localAssignmentInStable != null && (pendingAssignments == null || !pendingAssignments.force());
                    } else {
                        LOG.warn("Recovery after a forced rebalance for zone is not supported yet [zoneId={}, partitionId={}].", new Object[]{zoneId, partId});
                        shouldStartPartition = localAssignmentInStable != null && (pendingAssignments == null || !pendingAssignments.force());
                    }
                } else {
                    boolean bl = shouldStartPartition = localAssignmentInStable != null;
                }
                if (shouldStartPartition) {
                    ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, partId);
                    partitionsStartFutures[partId] = this.createZonePartitionReplicationNode(zonePartitionId, stableAssignments, causalityToken, partitionCount, this.isVolatileZoneForCatalogVersion(zoneId, catalogVersion), onNodeRecovery, !onNodeRecovery);
                    continue;
                }
                partitionsStartFutures[partId] = CompletableFutures.nullCompletedFuture();
            }
            return CompletableFuture.allOf(partitionsStartFutures);
        });
        if (onNodeRecovery) {
            return createZoneReplicationNodes.get();
        }
        return IgniteUtils.inBusyLockAsync((IgniteBusyLock)this.busyLock, () -> this.executeUnderZoneWriteLock(zoneId, createZoneReplicationNodes));
    }

    private boolean isVolatileZoneForCatalogVersion(int zoneId, int catalogVersion) {
        CatalogZoneDescriptor zoneDescriptor = this.catalogService.catalog(catalogVersion).zone(zoneId);
        assert (zoneDescriptor != null);
        return this.isVolatileZone(zoneDescriptor);
    }

    private boolean isVolatileZone(CatalogZoneDescriptor zoneDescriptor) {
        return zoneDescriptor.storageProfiles().profiles().stream().map(CatalogStorageProfileDescriptor::storageProfile).allMatch(storageProfile -> {
            StorageEngine storageEngine = this.dataStorageManager.engineByStorageProfile(storageProfile);
            return storageEngine != null && storageEngine.isVolatile();
        });
    }

    private CompletableFuture<?> createZonePartitionReplicationNode(ZonePartitionId zonePartitionId, Assignments stableAssignments, long revision, int partitionCount, boolean isVolatileZone, boolean onRecovery, boolean holdingZoneWriteLock) {
        Assignments forcedAssignments = stableAssignments.force() ? stableAssignments : null;
        PeersAndLearners stablePeersAndLearners = PeersAndLearners.fromAssignments((Collection)stableAssignments.nodes());
        RaftGroupEventsListener raftGroupEventsListener = this.createRaftGroupEventsListener(zonePartitionId);
        Supplier<CompletableFuture> startReplicaSupplier = () -> {
            PendingComparableValuesTracker storageIndexTracker = new PendingComparableValuesTracker((Comparable)Long.valueOf(0L));
            ZoneResourcesManager.ZonePartitionResources zoneResources = this.zoneResourcesManager.allocateZonePartitionResources(zonePartitionId, partitionCount, (PendingComparableValuesTracker<Long, Void>)storageIndexTracker);
            LocalBeforeReplicaStartEventParameters eventParams = new LocalBeforeReplicaStartEventParameters(zonePartitionId, revision, onRecovery, zoneResources, zoneResources.txStatePartitionStorageIsInRebalanceState());
            this.startedReplicationGroups.beforeStartingGroup(zonePartitionId);
            return ((CompletableFuture)((CompletableFuture)((CompletableFuture)((CompletableFuture)this.fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, (EventParameters)eventParams).thenCompose(v -> {
                if (eventParams.anyStorageIsInRebalanceState()) {
                    try {
                        this.replicaMgr.destroyReplicationProtocolStorages((ReplicationGroupId)zonePartitionId, isVolatileZone);
                    }
                    catch (NodeStoppingException e) {
                        return CompletableFuture.failedFuture(e);
                    }
                    CompletableFuture clearTxStateStorage = zoneResources.txStatePartitionStorage().clear();
                    CompletableFuture[] registeredCleanupFutures = (CompletableFuture[])eventParams.cleanupActions().stream().map(Supplier::get).toArray(CompletableFuture[]::new);
                    CompletableFuture<Void> clearMvStorages = CompletableFuture.allOf(registeredCleanupFutures);
                    return CompletableFuture.allOf(clearTxStateStorage, clearMvStorages);
                }
                return CompletableFutures.nullCompletedFuture();
            })).thenCompose(v -> {
                try {
                    return this.replicaMgr.startReplica((ReplicationGroupId)zonePartitionId, raftClient -> {
                        ZonePartitionReplicaListener replicaListener = new ZonePartitionReplicaListener(zoneResources.txStatePartitionStorage(), this.clockService, this.txManager, new CatalogValidationSchemasSource(this.catalogService, this.schemaManager), this.executorInclinedSchemaSyncService, this.catalogService, (LeasePlacementDriver)this.executorInclinedPlacementDriver, (ClusterNodeResolver)this.topologyService, (RaftCommandRunner)new ExecutorInclinedRaftCommandRunner((RaftCommandRunner)raftClient, this.partitionOperationsExecutor), this.failureProcessor, this.topologyService.localMember(), zonePartitionId, this.transactionStateResolver, this.txMessageSender);
                        zoneResources.replicaListenerFuture().complete(replicaListener);
                        return replicaListener;
                    }, (SnapshotStorageFactory)new PartitionSnapshotStorageFactory(zoneResources.snapshotStorage()), stablePeersAndLearners, (RaftGroupListener)zoneResources.raftListener(), raftGroupEventsListener, isVolatileZone, this.busyLock, storageIndexTracker);
                }
                catch (NodeStoppingException e) {
                    return CompletableFuture.failedFuture(e);
                }
            })).whenComplete((replica, throwable) -> {
                if (throwable != null) {
                    this.startedReplicationGroups.startingFailed(zonePartitionId);
                }
            })).thenCompose(v -> {
                Supplier addReplicationGroupIdFuture = () -> {
                    this.startedReplicationGroups.startingCompleted(zonePartitionId);
                    return CompletableFutures.nullCompletedFuture();
                };
                if (holdingZoneWriteLock) {
                    return addReplicationGroupIdFuture.get();
                }
                return this.executeUnderZoneWriteLock(zonePartitionId.zoneId(), addReplicationGroupIdFuture);
            })).thenApply(unused -> true);
        };
        return this.replicaMgr.weakStartReplica((ReplicationGroupId)zonePartitionId, startReplicaSupplier, forcedAssignments, revision).whenComplete((res, ex) -> {
            if (ex != null && !ExceptionUtils.hasCause((Throwable)ex, (Class[])new Class[]{NodeStoppingException.class})) {
                String errorMessage = String.format("Unable to update raft groups on the node [zonePartitionId=%s]", zonePartitionId);
                this.failureProcessor.process(new FailureContext(ex, errorMessage));
            }
        });
    }

    private CompletableFuture<Set<Assignment>> calculateZoneAssignments(ZonePartitionId zonePartitionId, Long assignmentsTimestamp) {
        Objects.requireNonNull(assignmentsTimestamp, "assignmentsTimestamp must be not null");
        HybridTimestamp assignmentsHybridTimestamp = HybridTimestamp.hybridTimestamp((long)assignmentsTimestamp);
        CompletionStage assignmentsFuture = this.reliableCatalogVersions.safeReliableCatalogFor(assignmentsHybridTimestamp).thenCompose(catalog -> this.calculateZoneAssignments(zonePartitionId, (Catalog)catalog, assignmentsHybridTimestamp));
        return CompletableFuture.anyOf(new CompletableFuture[]{this.stopReplicaLifecycleFuture, assignmentsFuture}).thenApply(a -> (Set)a);
    }

    private CompletableFuture<Set<Assignment>> calculateZoneAssignments(ZonePartitionId zonePartitionId, Catalog catalog, HybridTimestamp assignmentsTimestamp) {
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zonePartitionId.zoneId());
        int zoneId = zonePartitionId.zoneId();
        List currentAssignments = AssignmentUtil.currentDistributionFromLocalMetaStorage((MetaStorageManager)this.metaStorageMgr, (int)zoneDescriptor.partitions(), p -> ZoneRebalanceUtil.stablePartAssignmentsKey((ZonePartitionId)new ZonePartitionId(zoneId, p.intValue())), p -> ZoneRebalanceUtil.pendingPartAssignmentsQueueKey((ZonePartitionId)new ZonePartitionId(zoneId, p.intValue())));
        return this.distributionZoneMgr.dataNodes(assignmentsTimestamp, catalog.version(), zoneDescriptor.id()).thenApply(dataNodes -> PartitionDistributionUtils.calculateAssignmentForPartition((Collection)dataNodes, (List)currentAssignments, (int)zonePartitionId.partitionId(), (int)zoneDescriptor.partitions(), (int)zoneDescriptor.replicas(), (int)zoneDescriptor.consensusGroupSize()));
    }

    private ChangePeersAndLearnersWithRetry createChangePeersAndLearnersWithRetry(ZonePartitionId replicaGrpId) {
        return new ChangePeersAndLearnersWithRetry((IgniteBusyLock)this.busyLock, this.rebalanceScheduler, () -> this.partitionRaftClient(replicaGrpId));
    }

    private CompletableFuture<RaftGroupService> partitionRaftClient(ZonePartitionId replicaGrpId) {
        CompletableFuture replicaFut = this.replicaMgr.replica((ReplicationGroupId)replicaGrpId);
        if (replicaFut == null) {
            return CompletableFuture.failedFuture((Throwable)new IgniteInternalException("No such replica for partition " + replicaGrpId.partitionId() + " in zone " + replicaGrpId.zoneId()));
        }
        return replicaFut.thenApply(Replica::raftClient);
    }

    private RaftGroupEventsListener createRaftGroupEventsListener(ZonePartitionId zonePartitionId) {
        return new ZoneRebalanceRaftGroupEventsListener(this.metaStorageMgr, this.failureProcessor, zonePartitionId, this.busyLock, this.createChangePeersAndLearnersWithRetry(zonePartitionId), this.rebalanceScheduler, this::calculateZoneAssignments, this.rebalanceRetryDelayConfiguration);
    }

    private InternalClusterNode localNode() {
        return this.topologyService.localMember();
    }

    public void beforeNodeStop() {
        this.stopReplicaLifecycleFuture.completeExceptionally(new NodeStoppingException());
        this.busyLock.block();
        this.executorInclinedPlacementDriver.removeListener((Event)PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this.onPrimaryReplicaExpiredListener);
        this.lowWatermark.removeListener((Event)LowWatermarkEvent.LOW_WATERMARK_CHANGED, this.onLowWatermarkChangedListener);
        this.catalogService.removeListener((Event)CatalogEvent.ZONE_CREATE, this.onCreateZoneListener);
        this.catalogService.removeListener((Event)CatalogEvent.ZONE_DROP, this.onZoneDropListener);
        this.metaStorageMgr.unregisterWatch(this.pendingAssignmentsRebalanceListener);
        this.metaStorageMgr.unregisterWatch(this.stableAssignmentsRebalanceListener);
        this.metaStorageMgr.unregisterWatch(this.assignmentsSwitchRebalanceListener);
        this.startedReplicationGroups.waitForStartingReplicas();
        this.cleanUpPartitionsResources(this.startedReplicationGroups.streamStartedReplicationGroups());
    }

    private CompletableFuture<List<Assignments>> writeZoneAssignmentsToMetastore(int zoneId, ConsistencyMode consistencyMode, List<Assignments> newAssignments) {
        assert (!newAssignments.isEmpty());
        boolean haMode = consistencyMode == ConsistencyMode.HIGH_AVAILABILITY;
        ArrayList<Operation> partitionAssignments = new ArrayList<Operation>(newAssignments.size());
        for (int i = 0; i < newAssignments.size(); ++i) {
            ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, i);
            ByteArray stableAssignmentsKey = ZoneRebalanceUtil.stablePartAssignmentsKey((ZonePartitionId)zonePartitionId);
            byte[] anAssignment = newAssignments.get(i).toBytes();
            Operation op = Operations.put((ByteArray)stableAssignmentsKey, (byte[])anAssignment);
            partitionAssignments.add(op);
            if (!haMode) continue;
            ByteArray assignmentsChainKey = ZoneRebalanceUtil.assignmentsChainKey((ZonePartitionId)zonePartitionId);
            byte[] assignmentChain = AssignmentsChain.of((long)-1L, (long)-1L, (Assignments[])new Assignments[]{newAssignments.get(i)}).toBytes();
            Operation chainOp = Operations.put((ByteArray)assignmentsChainKey, (byte[])assignmentChain);
            partitionAssignments.add(chainOp);
        }
        SimpleCondition condition = Conditions.notExists((ByteArray)new ByteArray(ByteUtils.toByteArray((ByteBuffer)((Operation)partitionAssignments.get(0)).key())));
        return ((CompletableFuture)this.metaStorageMgr.invoke((Condition)condition, partitionAssignments, Collections.emptyList()).whenComplete((invokeResult, e) -> {
            if (e != null && !ExceptionUtils.hasCause((Throwable)e, (Class[])new Class[]{NodeStoppingException.class})) {
                String errorMessage = String.format("Couldn't write assignments [assignmentsList=%s] to metastore during invoke.", Assignments.assignmentListToString((List)newAssignments));
                this.failureProcessor.process(new FailureContext(e, errorMessage));
            }
        })).thenCompose(invokeResult -> {
            if (invokeResult.booleanValue()) {
                LOG.info("Assignments calculated from data nodes are successfully written to meta storage [zoneId={}, assignments={}].", new Object[]{zoneId, Assignments.assignmentListToString((List)newAssignments)});
                return CompletableFuture.completedFuture(newAssignments);
            }
            Set partKeys = IntStream.range(0, newAssignments.size()).mapToObj(p -> ZoneRebalanceUtil.stablePartAssignmentsKey((ZonePartitionId)new ZonePartitionId(zoneId, p))).collect(Collectors.toSet());
            return ((CompletableFuture)this.metaStorageMgr.getAll(partKeys).thenApply(metaStorageAssignments -> {
                ArrayList<Assignments> realAssignments = new ArrayList<Assignments>();
                for (int p = 0; p < newAssignments.size(); ++p) {
                    ZonePartitionId partId = new ZonePartitionId(zoneId, p);
                    Entry assignmentsEntry = (Entry)metaStorageAssignments.get(ZoneRebalanceUtil.stablePartAssignmentsKey((ZonePartitionId)partId));
                    assert (assignmentsEntry != null && !assignmentsEntry.empty() && !assignmentsEntry.tombstone()) : "Unexpected assignments for partition [" + partId + ", entry=" + assignmentsEntry + "].";
                    Assignments real = Assignments.fromBytes((byte[])assignmentsEntry.value());
                    realAssignments.add(real);
                }
                LOG.info("Assignments picked up from meta storage [zoneId={}, assignments={}].", new Object[]{zoneId, Assignments.assignmentListToString(realAssignments)});
                return realAssignments;
            })).whenComplete((realAssignments, e) -> {
                if (e != null && !ExceptionUtils.hasCause((Throwable)e, (Class[])new Class[]{NodeStoppingException.class})) {
                    String errorMessage = String.format("Couldn't get assignments from metastore for zone [zoneId=%s].", zoneId);
                    this.failureProcessor.process(new FailureContext(e, errorMessage));
                }
            });
        });
    }

    private CompletableFuture<List<Assignments>> getOrCreateAssignments(CatalogZoneDescriptor zoneDescriptor, long causalityToken, int catalogVersion) {
        if (ZoneRebalanceUtil.zonePartitionAssignmentsGetLocally((MetaStorageManager)this.metaStorageMgr, (int)zoneDescriptor.id(), (int)0, (long)causalityToken) != null) {
            return CompletableFuture.completedFuture(ZoneRebalanceUtil.zoneAssignmentsGetLocally((MetaStorageManager)this.metaStorageMgr, (int)zoneDescriptor.id(), (int)zoneDescriptor.partitions(), (long)causalityToken));
        }
        Catalog catalog = this.catalogService.catalog(catalogVersion);
        long assignmentsTimestamp = catalog.time();
        return ((CompletableFuture)((CompletableFuture)this.distributionZoneMgr.dataNodes(zoneDescriptor.updateTimestamp(), catalogVersion, zoneDescriptor.id()).thenApply(dataNodes -> PartitionDistributionUtils.calculateAssignments((Collection)dataNodes, Collections.emptyList(), (int)zoneDescriptor.partitions(), (int)zoneDescriptor.replicas(), (int)zoneDescriptor.consensusGroupSize()).stream().map(assignments -> Assignments.of((Set)assignments, (long)assignmentsTimestamp)).collect(Collectors.toList()))).whenComplete((assignments, e) -> {
            if (e == null && LOG.isInfoEnabled()) {
                LOG.info("Assignments calculated from data nodes [zone={}, zoneId={}, assignments={}, revision={}]", new Object[]{zoneDescriptor.name(), zoneDescriptor.id(), Assignments.assignmentListToString((List)assignments), causalityToken});
            }
        })).thenCompose(assignments -> this.writeZoneAssignmentsToMetastore(zoneDescriptor.id(), zoneDescriptor.consistencyMode(), (List<Assignments>)assignments));
    }

    public boolean hasLocalPartition(ZonePartitionId zonePartitionId) {
        assert (Optional.ofNullable(this.zonePartitionsLocks.get(zonePartitionId.zoneId())).map(NaiveAsyncReadWriteLock::isReadLocked).orElse(false).booleanValue()) : zonePartitionId;
        return this.startedReplicationGroups.hasReplicationGroupStarted(zonePartitionId);
    }

    private WatchListener createPendingAssignmentsRebalanceListener() {
        return evt -> {
            if (!this.busyLock.enterBusy()) {
                return CompletableFuture.failedFuture(new NodeStoppingException());
            }
            try {
                Entry newEntry = evt.entryEvent().newEntry();
                CompletableFuture<Void> completableFuture = this.handleChangePendingAssignmentEvent(newEntry, evt.revision(), false);
                return completableFuture;
            }
            finally {
                this.busyLock.leaveBusy();
            }
        };
    }

    private WatchListener createStableAssignmentsRebalanceListener() {
        return evt -> {
            if (!this.busyLock.enterBusy()) {
                return CompletableFuture.failedFuture(new NodeStoppingException());
            }
            try {
                CompletableFuture<Void> completableFuture = this.handleChangeStableAssignmentEvent(evt);
                return completableFuture;
            }
            finally {
                this.busyLock.leaveBusy();
            }
        };
    }

    private WatchListener createAssignmentsSwitchRebalanceListener() {
        return evt -> IgniteUtils.inBusyLockAsync((IgniteBusyLock)this.busyLock, () -> {
            byte[] key = evt.entryEvent().newEntry().key();
            ZonePartitionId replicaGrpId = ZoneRebalanceUtil.extractZonePartitionId((byte[])key, (byte[])ZoneRebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES);
            Assignments assignments = Assignments.fromBytes((byte[])evt.entryEvent().newEntry().value());
            long assignmentsTimestamp = assignments.timestamp();
            return this.reliableCatalogVersions.safeReliableCatalogFor(HybridTimestamp.hybridTimestamp((long)assignmentsTimestamp)).thenCompose(catalog -> IgniteUtils.inBusyLockAsync((IgniteBusyLock)this.busyLock, () -> this.handleReduceChanged(evt, (Catalog)catalog, replicaGrpId, assignmentsTimestamp)));
        });
    }

    private CompletableFuture<Void> handleReduceChanged(WatchEvent evt, Catalog catalog, ZonePartitionId replicaGrpId, long assignmentsTimestamp) {
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(replicaGrpId.zoneId());
        return this.distributionZoneMgr.dataNodes(HybridTimestamp.hybridTimestamp((long)assignmentsTimestamp), catalog.version(), replicaGrpId.zoneId()).thenCompose(dataNodes -> ZoneRebalanceRaftGroupEventsListener.handleReduceChanged((MetaStorageManager)this.metaStorageMgr, (Collection)dataNodes, (int)zoneDescriptor.partitions(), (int)zoneDescriptor.replicas(), (int)zoneDescriptor.consensusGroupSize(), (ZonePartitionId)replicaGrpId, (WatchEvent)evt, (long)assignmentsTimestamp));
    }

    private CompletableFuture<Void> handleChangeStableAssignmentEvent(WatchEvent evt) {
        if (evt.entryEvents().stream().allMatch(e -> e.oldEntry().value() == null)) {
            return CompletableFutures.nullCompletedFuture();
        }
        if (!evt.single()) {
            assert (evt.entryEvents().stream().allMatch(entryEvent -> entryEvent.newEntry().tombstone())) : evt;
            return CompletableFutures.nullCompletedFuture();
        }
        assert (evt.single()) : evt;
        if (evt.entryEvent().oldEntry() == null) {
            return CompletableFutures.nullCompletedFuture();
        }
        Entry stableAssignmentsWatchEvent = evt.entryEvent().newEntry();
        long revision = evt.revision();
        assert (stableAssignmentsWatchEvent.revision() == revision) : stableAssignmentsWatchEvent;
        if (stableAssignmentsWatchEvent.value() == null) {
            return CompletableFutures.nullCompletedFuture();
        }
        return this.handleChangeStableAssignmentEvent(stableAssignmentsWatchEvent, evt.revision(), false);
    }

    private CompletableFuture<Void> handleChangeStableAssignmentEvent(Entry stableAssignmentsWatchEvent, long revision, boolean onNodeRecovery) {
        ZonePartitionId zonePartitionId = ZoneRebalanceUtil.extractZonePartitionId((byte[])stableAssignmentsWatchEvent.key(), (byte[])ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES);
        Set stableAssignments = stableAssignmentsWatchEvent.value() == null ? Collections.emptySet() : Assignments.fromBytes((byte[])stableAssignmentsWatchEvent.value()).nodes();
        return CompletableFuture.supplyAsync(() -> {
            Entry pendingAssignmentsEntry = this.metaStorageMgr.getLocally(ZoneRebalanceUtil.pendingPartAssignmentsQueueKey((ZonePartitionId)zonePartitionId), revision);
            byte[] pendingAssignmentsFromMetaStorage = pendingAssignmentsEntry.value();
            Assignments pendingAssignments = pendingAssignmentsFromMetaStorage == null ? Assignments.EMPTY : AssignmentsQueue.fromBytes((byte[])pendingAssignmentsFromMetaStorage).poll();
            return this.stopAndMaybeDestroyPartitionAndUpdateClients(zonePartitionId, stableAssignments, pendingAssignments, onNodeRecovery, revision);
        }, this.ioExecutor).thenCompose(Function.identity());
    }

    private CompletableFuture<Void> updatePartitionClients(ZonePartitionId zonePartitionId, Set<Assignment> stableAssignments, Set<Assignment> pendingAssignments) {
        return this.isLocalNodeIsPrimary((ReplicationGroupId)zonePartitionId).thenCompose(isLeaseholder -> (CompletionStage)IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            CompletableFuture replicaFuture = this.replicaMgr.replica((ReplicationGroupId)zonePartitionId);
            boolean isLocalInStable = this.isLocalNodeInAssignments(stableAssignments);
            if (!isLocalInStable && !isLeaseholder.booleanValue()) {
                return CompletableFutures.nullCompletedFuture();
            }
            if (replicaFuture == null) {
                return CompletableFuture.failedFuture((Throwable)new ReplicaUnavailableException((ReplicationGroupId)zonePartitionId, this.localNode()));
            }
            return replicaFuture.thenAccept(replica -> replica.updatePeersAndLearners(PeersAndLearners.fromAssignments((Collection)RebalanceUtil.union((Set)stableAssignments, (Set)pendingAssignments))));
        }));
    }

    private CompletableFuture<Void> stopAndMaybeDestroyPartitionAndUpdateClients(ZonePartitionId zonePartitionId, Set<Assignment> stableAssignments, Assignments pendingAssignments, boolean onNodeRecovery, long revision) {
        CompletableFuture<Void> clientUpdateFuture = onNodeRecovery ? CompletableFutures.nullCompletedFuture() : this.updatePartitionClients(zonePartitionId, stableAssignments, pendingAssignments.nodes());
        boolean shouldStopLocalServices = (pendingAssignments.force() ? pendingAssignments.nodes().stream() : Stream.concat(stableAssignments.stream(), pendingAssignments.nodes().stream())).noneMatch(assignment -> assignment.consistentId().equals(this.localNode().name()));
        if (!shouldStopLocalServices) {
            return clientUpdateFuture;
        }
        return clientUpdateFuture.thenCompose(v -> this.replicaMgr.weakStopReplica((ReplicationGroupId)zonePartitionId, ReplicaManager.WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS, () -> this.stopAndDestroyPartition(zonePartitionId, revision)));
    }

    private CompletableFuture<Void> stopPartitionForRestart(ZonePartitionId zonePartitionId, long revision) {
        return this.replicaMgr.weakStopReplica((ReplicationGroupId)zonePartitionId, ReplicaManager.WeakReplicaStopReason.RESTART, () -> this.stopPartitionInternal(zonePartitionId, LocalPartitionReplicaEvent.BEFORE_REPLICA_STOPPED, LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED, revision, replica -> {}));
    }

    private CompletableFuture<Void> stopPartitionAndDestroyForRestart(ZonePartitionId zonePartitionId, long revision) {
        return this.replicaMgr.weakStopReplica((ReplicationGroupId)zonePartitionId, ReplicaManager.WeakReplicaStopReason.RESTART, () -> this.stopAndDestroyPartition(zonePartitionId, revision));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry pendingAssignmentsEntry, long revision, boolean isRecovery) {
        if (pendingAssignmentsEntry.value() == null || pendingAssignmentsEntry.empty()) {
            return CompletableFutures.nullCompletedFuture();
        }
        ZonePartitionId zonePartitionId = ZoneRebalanceUtil.extractZonePartitionId((byte[])pendingAssignmentsEntry.key(), (byte[])ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES);
        Assignments stableAssignments = this.stableAssignments(zonePartitionId, revision);
        AssignmentsQueue pendingAssignmentsQueue = AssignmentsQueue.fromBytes((byte[])pendingAssignmentsEntry.value());
        if (!this.busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new NodeStoppingException());
        }
        try {
            if (LOG.isInfoEnabled()) {
                String stringKey = new String(pendingAssignmentsEntry.key(), StandardCharsets.UTF_8);
                LOG.info("Received update on pending assignments. Check if new replication node should be started [key={}, partition={}, zoneId={}, localMemberAddress={}, pendingAssignments={}, revision={}]", new Object[]{stringKey, zonePartitionId.partitionId(), zonePartitionId.zoneId(), this.localNode().address(), pendingAssignmentsQueue, revision});
            }
            Assignments pendingAssignments = pendingAssignmentsQueue == null ? Assignments.EMPTY : pendingAssignmentsQueue.poll();
            CompletionStage completionStage = ((CompletableFuture)this.handleChangePendingAssignmentEvent(zonePartitionId, stableAssignments, pendingAssignments, revision, isRecovery).thenRun(() -> {
                boolean isLocalNodeInStableOrPending = this.isNodeInReducedStableOrPendingAssignments(zonePartitionId, stableAssignments, pendingAssignments, revision);
                if (!isLocalNodeInStableOrPending) {
                    return;
                }
                if (isRecovery && !this.replicaMgr.isReplicaStarted((ReplicationGroupId)zonePartitionId)) {
                    return;
                }
                this.changePeersOnRebalance(zonePartitionId, pendingAssignments.nodes(), revision);
            })).whenComplete((v, ex) -> this.maybeRunFailHandler((Throwable)ex, zonePartitionId));
            return completionStage;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    private CompletableFuture<Void> handleChangePendingAssignmentEvent(ZonePartitionId replicaGrpId, @Nullable Assignments stableAssignments, Assignments pendingAssignments, long revision, boolean isRecovery) {
        CompletableFuture<Object> localServicesStartFuture;
        boolean shouldStartLocalGroupNode;
        boolean pendingAssignmentsAreForced = pendingAssignments.force();
        Set pendingAssignmentsNodes = pendingAssignments.nodes();
        Assignment localAssignmentInPending = this.localAssignment(pendingAssignments);
        Assignment localAssignmentInStable = this.localAssignment(stableAssignments);
        AssignmentsChain assignmentsChain = ZoneRebalanceUtil.assignmentsChainGetLocally((MetaStorageManager)this.metaStorageMgr, (ZonePartitionId)replicaGrpId, (long)revision);
        if (isRecovery) {
            if (PartitionReplicaLifecycleManager.lastRebalanceWasGraceful(assignmentsChain)) {
                shouldStartLocalGroupNode = localAssignmentInPending != null;
            } else {
                LOG.warn("Recovery after a forced rebalance for zone is not supported yet [zonePartitionId={}].", new Object[]{replicaGrpId});
                shouldStartLocalGroupNode = localAssignmentInPending != null;
            }
        } else {
            shouldStartLocalGroupNode = localAssignmentInPending != null && localAssignmentInStable == null;
        }
        Assignments computedStableAssignments = PartitionReplicaLifecycleManager.getComputedStableAssignments(stableAssignments, pendingAssignments);
        if (shouldStartLocalGroupNode) {
            CatalogZoneDescriptor zoneDescriptor = this.zoneDescriptorAt(replicaGrpId.zoneId(), pendingAssignments.timestamp());
            localServicesStartFuture = this.createZonePartitionReplicationNode(replicaGrpId, computedStableAssignments, revision, zoneDescriptor.partitions(), this.isVolatileZone(zoneDescriptor), isRecovery, false);
        } else {
            localServicesStartFuture = pendingAssignmentsAreForced && localAssignmentInPending != null ? this.replicaMgr.resetWithRetry((ReplicationGroupId)replicaGrpId, PeersAndLearners.fromAssignments((Collection)computedStableAssignments.nodes()), revision) : CompletableFutures.nullCompletedFuture();
        }
        return ((CompletableFuture)localServicesStartFuture.thenComposeAsync(v -> (CompletionStage)IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> this.isLocalNodeIsPrimary((ReplicationGroupId)replicaGrpId)), (Executor)this.ioExecutor)).thenAcceptAsync(isLeaseholder -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            boolean isLocalNodeInStableOrPending = this.isNodeInReducedStableOrPendingAssignments(replicaGrpId, stableAssignments, pendingAssignments, revision);
            if (!isLocalNodeInStableOrPending && !isLeaseholder.booleanValue()) {
                return;
            }
            assert (isLocalNodeInStableOrPending || isLeaseholder.booleanValue()) : "The local node is outside of the replication group [inStableOrPending=" + isLocalNodeInStableOrPending + ", isLeaseholder=" + isLeaseholder + "].";
            if (isRecovery && !this.replicaMgr.isReplicaStarted((ReplicationGroupId)replicaGrpId)) {
                return;
            }
            assert (this.replicaMgr.isReplicaStarted((ReplicationGroupId)replicaGrpId)) : "The local node is outside of the replication group [, groupId=" + replicaGrpId + ", stable=" + stableAssignments + ", pending=" + pendingAssignments + ", localName=" + this.localNode().name() + "].";
            Set newAssignments = pendingAssignmentsAreForced || stableAssignments == null ? pendingAssignmentsNodes : RebalanceUtil.union((Set)pendingAssignmentsNodes, (Set)stableAssignments.nodes());
            this.replicaMgr.replica((ReplicationGroupId)replicaGrpId).thenAccept(replica -> replica.updatePeersAndLearners(PeersAndLearners.fromAssignments((Collection)newAssignments)));
        }), (Executor)this.ioExecutor);
    }

    private static Assignments getComputedStableAssignments(@Nullable Assignments stableAssignments, Assignments pendingAssignments) {
        if (stableAssignments == null || stableAssignments.nodes().isEmpty()) {
            return Assignments.forced((Set)pendingAssignments.nodes(), (long)pendingAssignments.timestamp());
        }
        if (pendingAssignments.force()) {
            return pendingAssignments;
        }
        return stableAssignments;
    }

    private CatalogZoneDescriptor zoneDescriptorAt(int zoneId, long timestamp) {
        Catalog catalog = this.catalogService.activeCatalog(timestamp);
        assert (catalog != null) : "Catalog is not available at " + HybridTimestamp.nullableHybridTimestamp((long)timestamp);
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
        assert (zoneDescriptor != null) : "Zone descriptor is not available at " + HybridTimestamp.nullableHybridTimestamp((long)timestamp) + " for zone " + zoneId;
        return zoneDescriptor;
    }

    private void changePeersOnRebalance(ZonePartitionId replicaGrpId, Set<Assignment> pendingAssignments, long revision) {
        PeersAndLearners newConfiguration = PeersAndLearners.fromAssignments(pendingAssignments);
        ChangePeersAndLearnersWithRetry mover = this.createChangePeersAndLearnersWithRetry(replicaGrpId);
        mover.execute(newConfiguration, revision, raftClient -> this.ensureLeader(replicaGrpId, (RaftGroupService)raftClient).whenComplete((raftWithTerm, ex) -> {
            if (raftWithTerm != null) {
                LOG.info("Current node={} is the leader of partition raft group={}. Initiate rebalance process for partition={}, zoneId={}", new Object[]{this.localNode().name(), replicaGrpId, replicaGrpId.partitionId(), replicaGrpId.zoneId()});
            }
        })).whenComplete((res, ex) -> this.maybeRunFailHandler((Throwable)ex, replicaGrpId));
    }

    private void maybeRunFailHandler(Throwable ex, ZonePartitionId replicaGrpId) {
        if (ex != null && !ExceptionUtils.hasCause((Throwable)ex, (Class[])new Class[]{NodeStoppingException.class, ComponentStoppingException.class, RaftStaleUpdateException.class})) {
            String errorMessage = String.format("Failure while moving partition [partId=%s]", replicaGrpId);
            this.failureProcessor.process(new FailureContext(ex, errorMessage));
        }
    }

    private CompletableFuture<@Nullable IgniteBiTuple<RaftGroupService, Long>> ensureLeader(ZonePartitionId replicaGrpId, RaftGroupService raftClient) {
        return ((CompletableFuture)raftClient.refreshAndGetLeaderWithTerm().exceptionally(throwable -> {
            if (ExceptionUtils.hasCause((Throwable)throwable, (Class[])new Class[]{TimeoutException.class})) {
                LOG.info("Node couldn't get the leader within timeout so the changing peers is skipped [grp={}].", new Object[]{replicaGrpId});
            } else if (ExceptionUtils.hasCause((Throwable)throwable, (Class[])new Class[]{ComponentStoppingException.class})) {
                LOG.info("Replica is being stopped so the changing peers is skipped [grp={}].", new Object[]{replicaGrpId});
            } else {
                LOG.info("Failed to get a leader for the RAFT replication group [grp={}].", throwable, new Object[]{replicaGrpId});
            }
            return LeaderWithTerm.NO_LEADER;
        })).thenApply(leaderWithTerm -> {
            if (leaderWithTerm.isEmpty() || !this.isLocalPeer(leaderWithTerm.leader())) {
                return null;
            }
            return new IgniteBiTuple((Object)raftClient, (Object)leaderWithTerm.term());
        });
    }

    private boolean isLocalPeer(Peer peer) {
        return peer.consistentId().equals(this.localNode().name());
    }

    private boolean isLocalNodeInAssignments(Collection<Assignment> assignments) {
        return assignments.stream().anyMatch(this.isLocalNodeAssignment);
    }

    private CompletableFuture<Void> waitForMetadataCompleteness(long ts) {
        return this.executorInclinedSchemaSyncService.waitForMetadataCompleteness(HybridTimestamp.hybridTimestamp((long)ts));
    }

    private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId replicationGroupId) {
        HybridTimestamp currentSafeTime = this.metaStorageMgr.clusterTime().currentSafeTime();
        if (HybridTimestamp.MIN_VALUE.equals((Object)currentSafeTime)) {
            return CompletableFutures.falseCompletedFuture();
        }
        long skewMs = this.clockService.maxClockSkewMillis();
        try {
            HybridTimestamp previousMetastoreSafeTime = currentSafeTime.subtractPhysicalTime(skewMs);
            return this.executorInclinedPlacementDriver.getPrimaryReplica(replicationGroupId, previousMetastoreSafeTime).thenApply(replicaMeta -> replicaMeta != null && replicaMeta.getLeaseholderId() != null && replicaMeta.getLeaseholderId().equals(this.localNode().id()));
        }
        catch (IllegalArgumentException e) {
            long currentSafeTimeMs = currentSafeTime.longValue();
            throw new AssertionError("Got a negative time [currentSafeTime=" + currentSafeTime + ", currentSafeTimeMs=" + currentSafeTimeMs + ", skewMs=" + skewMs + ", internal=" + (currentSafeTimeMs + (-skewMs << 16)) + "]", e);
        }
    }

    private boolean isNodeInReducedStableOrPendingAssignments(ZonePartitionId replicaGrpId, @Nullable Assignments stableAssignments, Assignments pendingAssignments, long revision) {
        Entry reduceEntry = this.metaStorageMgr.getLocally(ZoneRebalanceUtil.switchReduceKey((ZonePartitionId)replicaGrpId), revision);
        Assignments reduceAssignments = reduceEntry != null ? Assignments.fromBytes((byte[])reduceEntry.value()) : null;
        Set reducedStableAssignments = reduceAssignments != null ? RebalanceUtil.subtract((Set)stableAssignments.nodes(), (Set)reduceAssignments.nodes()) : stableAssignments.nodes();
        return this.isLocalNodeInAssignments(RebalanceUtil.union((Set)reducedStableAssignments, (Set)pendingAssignments.nodes()));
    }

    @Nullable
    private Assignment localAssignment(@Nullable Assignments assignments) {
        if (assignments != null) {
            for (Assignment assignment : assignments.nodes()) {
                if (!this.isLocalNodeAssignment.test(assignment)) continue;
                return assignment;
            }
        }
        return null;
    }

    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        try {
            this.metricManager.unregisterSource((MetricSource)this.zoneResourcesManager.snapshotsMetricsSource());
            IgniteUtils.closeAllManually((ManuallyCloseable[])new ManuallyCloseable[]{this.zoneResourcesManager});
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
        return CompletableFutures.nullCompletedFuture();
    }

    @Nullable
    private Assignments stableAssignments(ZonePartitionId zonePartitionId, long revision) {
        Entry entry = this.metaStorageMgr.getLocally(ZoneRebalanceUtil.stablePartAssignmentsKey((ZonePartitionId)zonePartitionId), revision);
        return Assignments.fromBytes((byte[])entry.value());
    }

    @VisibleForTesting
    public CompletableFuture<Void> stopPartitionInternal(ZonePartitionId zonePartitionId, LocalPartitionReplicaEvent beforeReplicaStoppedEvent, LocalPartitionReplicaEvent afterReplicaStoppedEvent, long eventRevision, Consumer<Boolean> afterReplicaStopAction) {
        return this.executeUnderZoneWriteLock(zonePartitionId.zoneId(), () -> {
            LocalPartitionReplicaEventParameters eventParameters = new LocalPartitionReplicaEventParameters(zonePartitionId, eventRevision, false);
            return this.fireEvent(beforeReplicaStoppedEvent, (EventParameters)eventParameters).thenCompose(v -> {
                try {
                    return this.replicaMgr.stopReplica((ReplicationGroupId)zonePartitionId).thenComposeAsync(replicaWasStopped -> {
                        this.closeTrackers(zonePartitionId);
                        afterReplicaStopAction.accept((Boolean)replicaWasStopped);
                        if (!replicaWasStopped.booleanValue()) {
                            return CompletableFutures.nullCompletedFuture();
                        }
                        this.startedReplicationGroups.afterStoppingGroup(zonePartitionId);
                        return this.fireEvent(afterReplicaStoppedEvent, (EventParameters)eventParameters);
                    }, (Executor)this.ioExecutor);
                }
                catch (NodeStoppingException e) {
                    return CompletableFutures.nullCompletedFuture();
                }
            });
        });
    }

    private void closeTrackers(ZonePartitionId zonePartitionId) {
        ZoneResourcesManager.ZonePartitionResources replicaResources = this.zonePartitionResourcesOrNull(zonePartitionId);
        if (replicaResources != null) {
            replicaResources.closeTrackers();
        }
    }

    private void cleanUpPartitionsResources(Stream<ZonePartitionId> partitionIds) {
        block2: {
            CompletableFuture[] stopPartitionsFuture = (CompletableFuture[])partitionIds.map(zonePartitionId -> this.stopPartitionInternal((ZonePartitionId)zonePartitionId, LocalPartitionReplicaEvent.BEFORE_REPLICA_STOPPED, LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED, -1L, replicaWasStopped -> {})).toArray(CompletableFuture[]::new);
            try {
                CompletableFuture<Void> fut = CompletableFuture.allOf(stopPartitionsFuture);
                CompletableFuture.delayedExecutor(30L, TimeUnit.SECONDS, Runnable::run).execute(() -> {
                    if (!fut.isDone()) {
                        this.printPartitionState(partitionIds);
                    }
                });
                fut.get();
            }
            catch (Throwable e) {
                if (PartitionReplicaLifecycleManager.isExpectedThrowableDuringResourcesStop(e)) break block2;
                this.failureProcessor.process(new FailureContext(e, "Unable to clean up zones resources"));
            }
        }
    }

    private static boolean isExpectedThrowableDuringResourcesStop(Throwable throwable) {
        return ExceptionUtils.hasCause((Throwable)throwable, (Class[])new Class[]{NodeStoppingException.class, ComponentStoppingException.class, TrackerClosedException.class, CancellationException.class, RecipientLeftException.class});
    }

    private void printPartitionState(Stream<ZonePartitionId> partitionIds) {
        List nonStoppedPartitions = partitionIds.filter(partId -> this.replicaMgr.replica((ReplicationGroupId)partId) != null).collect(Collectors.toList());
        int exceedLimit = nonStoppedPartitions.size() - 100;
        String partitionsStr = "There are still some partitions that are being stopped: " + S.toString(nonStoppedPartitions, (sb, e, i) -> sb.app(e).app(i < nonStoppedPartitions.size() - 1 ? ", " : "")) + (exceedLimit > 0 ? IgniteStringFormatter.format((String)" and {} more; ", (Object[])new Object[]{exceedLimit}) : "; ");
        ThreadUtils.dumpThreads((IgniteLogger)LOG, (String)partitionsStr, (boolean)false);
    }

    public CompletableFuture<Long> lockZoneForRead(int zoneId) {
        NaiveAsyncReadWriteLock lock = this.zonePartitionsLocks.computeIfAbsent(zoneId, id -> PartitionReplicaLifecycleManager.newZoneLock());
        return lock.readLock();
    }

    private static NaiveAsyncReadWriteLock newZoneLock() {
        return new NaiveAsyncReadWriteLock();
    }

    public void unlockZoneForRead(int zoneId, long stamp) {
        this.zonePartitionsLocks.get(zoneId).unlockRead(stamp);
    }

    public void loadTableListenerToZoneReplica(ZonePartitionId zonePartitionId, int tableId, TablePartitionReplicaProcessorFactory tablePartitionReplicaProcessorFactory, RaftTableProcessor raftTableProcessor, PartitionMvStorageAccess partitionMvStorageAccess, boolean onNodeRecovery) {
        ZoneResourcesManager.ZonePartitionResources resources = this.zonePartitionResources(zonePartitionId);
        resources.replicaListenerFuture().thenAccept(zoneReplicaListener -> zoneReplicaListener.addTableReplicaProcessor(tableId, tablePartitionReplicaProcessorFactory));
        if (onNodeRecovery) {
            resources.raftListener().addTableProcessorOnRecovery(tableId, raftTableProcessor);
        } else {
            resources.raftListener().addTableProcessor(tableId, raftTableProcessor);
        }
        resources.snapshotStorage().addMvPartition(tableId, partitionMvStorageAccess);
    }

    public CompletableFuture<Void> unloadTableResourcesFromZoneReplica(ZonePartitionId zonePartitionId, int tableId) {
        return this.zoneResourcesManager.removeTableResources(zonePartitionId, tableId);
    }

    public CompletableFuture<?> restartPartitionWithCleanUp(ZonePartitionId zonePartitionId, long revision, long assignmentsTimestamp) {
        return IgniteUtils.inBusyLockAsync((IgniteBusyLock)this.busyLock, () -> this.stopPartitionAndDestroyForRestart(zonePartitionId, revision).thenComposeAsync(unused -> {
            Assignments stableAssignments = ZoneRebalanceUtil.zoneStableAssignmentsGetLocally((MetaStorageManager)this.metaStorageMgr, (ZonePartitionId)zonePartitionId, (long)revision);
            assert (stableAssignments != null) : "zonePartitionId=" + zonePartitionId + ", revision=" + revision;
            return this.waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(unused2 -> IgniteUtils.inBusyLockAsync((IgniteBusyLock)this.busyLock, () -> {
                CatalogZoneDescriptor zoneDescriptor = this.zoneDescriptorAt(zonePartitionId.zoneId(), assignmentsTimestamp);
                return this.createZonePartitionReplicationNode(zonePartitionId, stableAssignments, revision, zoneDescriptor.partitions(), this.isVolatileZone(zoneDescriptor), false, false);
            }));
        }, (Executor)this.ioExecutor));
    }

    public CompletableFuture<?> restartPartition(ZonePartitionId zonePartitionId, long revision, long assignmentsTimestamp) {
        return IgniteUtils.inBusyLockAsync((IgniteBusyLock)this.busyLock, () -> this.stopPartitionForRestart(zonePartitionId, revision).thenComposeAsync(unused -> {
            Assignments stableAssignments = ZoneRebalanceUtil.zoneStableAssignmentsGetLocally((MetaStorageManager)this.metaStorageMgr, (ZonePartitionId)zonePartitionId, (long)revision);
            assert (stableAssignments != null) : "zonePartitionId=" + zonePartitionId + ", revision=" + revision;
            return this.waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(unused2 -> IgniteUtils.inBusyLockAsync((IgniteBusyLock)this.busyLock, () -> {
                Assignment localAssignment = this.localAssignment(stableAssignments);
                if (localAssignment == null) {
                    return CompletableFutures.nullCompletedFuture();
                }
                CatalogZoneDescriptor zoneDescriptor = this.zoneDescriptorAt(zonePartitionId.zoneId(), assignmentsTimestamp);
                return this.createZonePartitionReplicationNode(zonePartitionId, stableAssignments, revision, zoneDescriptor.partitions(), this.isVolatileZone(zoneDescriptor), false, false);
            }));
        }, (Executor)this.ioExecutor));
    }

    private <T> CompletableFuture<T> executeUnderZoneWriteLock(int zoneId, Supplier<CompletableFuture<T>> action) {
        NaiveAsyncReadWriteLock lock = this.zonePartitionsLocks.computeIfAbsent(zoneId, id -> PartitionReplicaLifecycleManager.newZoneLock());
        return lock.writeLock().thenCompose(stamp -> {
            try {
                return ((CompletableFuture)action.get()).whenComplete((v, e) -> lock.unlockWrite((long)stamp));
            }
            catch (Throwable e2) {
                lock.unlockWrite((long)stamp);
                return CompletableFuture.failedFuture(e2);
            }
        });
    }

    private CompletableFuture<Boolean> onPrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) {
        if (this.topologyService.localMember().id().equals(parameters.leaseholderId()) && parameters.groupId() instanceof ZonePartitionId) {
            ZonePartitionId groupId = (ZonePartitionId)parameters.groupId();
            this.replicaMgr.weakStopReplica((ReplicationGroupId)groupId, ReplicaManager.WeakReplicaStopReason.PRIMARY_EXPIRED, () -> this.stopAndDestroyPartition(groupId, parameters.causalityToken()));
        }
        return CompletableFutures.falseCompletedFuture();
    }

    private CompletableFuture<Void> stopAndDestroyPartition(ZonePartitionId zonePartitionId, long revision) {
        return this.stopPartitionInternal(zonePartitionId, LocalPartitionReplicaEvent.BEFORE_REPLICA_DESTROYED, LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED, revision, replicaWasStopped -> {
            if (replicaWasStopped.booleanValue()) {
                this.zoneResourcesManager.destroyZonePartitionResources(zonePartitionId);
                try {
                    this.replicaMgr.destroyReplicationProtocolStoragesDurably((ReplicationGroupId)zonePartitionId, false);
                }
                catch (NodeStoppingException e) {
                    throw new IgniteInternalException(ErrorGroups.Common.NODE_STOPPING_ERR, (Throwable)e);
                }
            }
        });
    }

    @TestOnly
    @Nullable
    public TxStatePartitionStorage txStatePartitionStorage(int zoneId, int partitionId) {
        return this.zoneResourcesManager.txStatePartitionStorage(zoneId, partitionId);
    }

    @TestOnly
    public HybridTimestamp currentSafeTimeForZonePartition(int zoneId, int partId) {
        return this.zonePartitionResources(new ZonePartitionId(zoneId, partId)).raftListener().currentSafeTime();
    }

    public ZoneResourcesManager.ZonePartitionResources zonePartitionResources(ZonePartitionId zonePartitionId) {
        ZoneResourcesManager.ZonePartitionResources resources = this.zonePartitionResourcesOrNull(zonePartitionId);
        assert (resources != null) : String.format("Missing resources for zone partition [zonePartitionId=%s]", zonePartitionId);
        return resources;
    }

    @Nullable
    public ZoneResourcesManager.ZonePartitionResources zonePartitionResourcesOrNull(ZonePartitionId zonePartitionId) {
        return this.zoneResourcesManager.getZonePartitionResources(zonePartitionId);
    }

    private void onZoneDrop(DropZoneEventParameters parameters) {
        IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            int eventCatalogVersion = parameters.catalogVersion();
            int catalogVersionWithZonePresent = eventCatalogVersion - 1;
            int zoneId = parameters.zoneId();
            CatalogZoneDescriptor zoneDescriptor = this.catalogService.catalog(catalogVersionWithZonePresent).zone(zoneId);
            assert (zoneDescriptor != null) : "Unexpected null zone descriptor for zoneId=" + zoneId + ", catalogVersion " + catalogVersionWithZonePresent;
            this.destructionEventsQueue.enqueue((Object)new DestroyZoneEvent(eventCatalogVersion, zoneId, zoneDescriptor.partitions()));
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Boolean> onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFutures.falseCompletedFuture();
        }
        try {
            int newEarliestCatalogVersion = this.catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
            this.destructionEventsQueue.drainUpTo((long)newEarliestCatalogVersion).forEach(this::removeZonePartitionsIfPossible);
            CompletableFuture completableFuture = CompletableFutures.falseCompletedFuture();
            return completableFuture;
        }
        catch (Throwable t) {
            CompletableFuture<Boolean> completableFuture = CompletableFuture.failedFuture(t);
            return completableFuture;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    private void removeZonePartitionsIfPossible(DestroyZoneEvent event) {
        int zoneId = event.zoneId();
        int partitionsCount = event.partitions();
        ArrayList<CompletionStage> partitionsEligibilityForRemovalFutures = new ArrayList<CompletionStage>();
        for (int partitionIndex = 0; partitionIndex < partitionsCount; ++partitionIndex) {
            ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, partitionIndex);
            CompletionStage partitionRemovalFuture = ((CompletableFuture)this.startedReplicationGroups.hasReplicationGroupStartedOrAwaitIfStarting(zonePartitionId).thenComposeAsync(started -> {
                if (!started.booleanValue()) {
                    return CompletableFutures.falseCompletedFuture();
                }
                return IgniteUtils.inBusyLockAsync((IgniteBusyLock)this.busyLock, () -> this.isEligibleForDrop(zonePartitionId).thenCompose(eligible -> {
                    if (!eligible.booleanValue()) {
                        return CompletableFutures.falseCompletedFuture();
                    }
                    return ((CompletableFuture)this.stopAndDestroyPartition(zonePartitionId, -1L).thenCompose(v -> this.dropAssignments(zonePartitionId))).thenApply(v -> true);
                }));
            }, this.partitionOperationsExecutor)).exceptionally(e -> {
                if (!ExceptionUtils.hasCause((Throwable)e, (Class[])new Class[]{NodeStoppingException.class})) {
                    LOG.error("Unable to destroy zone partition [zonePartitionId={}]", e, new Object[]{zonePartitionId});
                }
                return false;
            });
            partitionsEligibilityForRemovalFutures.add(partitionRemovalFuture);
        }
        ((CompletableFuture)CompletableFuture.allOf(partitionsEligibilityForRemovalFutures.toArray(new CompletableFuture[0])).thenApply(fs -> partitionsEligibilityForRemovalFutures.stream().anyMatch(f -> (Boolean)f.join() == false))).thenAccept(anyFalse -> {
            if (anyFalse.booleanValue()) {
                this.destructionEventsQueue.enqueue((Object)event);
            } else {
                this.distributionZoneMgr.onDropZoneDestroy(zoneId, event.catalogVersion).whenComplete((r, e) -> {
                    if (e != null) {
                        LOG.error("Unable to destroy zone resources [zoneId={}]", e, new Object[]{zoneId});
                    }
                });
            }
        });
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private CompletableFuture<Boolean> isEligibleForDrop(ZonePartitionId zonePartitionId) {
        ZoneResourcesManager.ZonePartitionResources zonePartitionResources = this.zoneResourcesManager.getZonePartitionResources(zonePartitionId);
        if (zonePartitionResources == null) {
            return CompletableFutures.trueCompletedFuture();
        }
        try (Cursor cursor = zonePartitionResources.txStatePartitionStorage().scan();){
            if (!cursor.hasNext()) return this.zoneResourcesManager.areTableResourcesEmpty(zonePartitionId);
            CompletableFuture completableFuture = CompletableFutures.falseCompletedFuture();
            return completableFuture;
        }
        catch (TxStateStorageRebalanceException e) {
            return CompletableFutures.falseCompletedFuture();
        }
    }

    private CompletableFuture<Void> dropAssignments(ZonePartitionId zonePartitionId) {
        Set<ByteArray> assignmentKeys = Set.of(ZoneRebalanceUtil.stablePartAssignmentsKey((ZonePartitionId)zonePartitionId), ZoneRebalanceUtil.pendingPartAssignmentsQueueKey((ZonePartitionId)zonePartitionId), ZoneRebalanceUtil.pendingChangeTriggerKey((ZonePartitionId)zonePartitionId), ZoneRebalanceUtil.plannedPartAssignmentsKey((ZonePartitionId)zonePartitionId), ZoneRebalanceUtil.switchAppendKey((ZonePartitionId)zonePartitionId), ZoneRebalanceUtil.switchReduceKey((ZonePartitionId)zonePartitionId), ZoneRebalanceUtil.assignmentsChainKey((ZonePartitionId)zonePartitionId));
        return this.metaStorageMgr.removeAll(assignmentKeys).whenComplete((v, e) -> {
            if (e != null) {
                LOG.error("Failed to remove assignments from metastorage [zonePartitionId={}]", e, new Object[]{zonePartitionId});
            }
        });
    }

    private static boolean lastRebalanceWasGraceful(@Nullable AssignmentsChain assignmentsChain) {
        return assignmentsChain == null || assignmentsChain.size() == 1;
    }

    @FunctionalInterface
    public static interface TablePartitionReplicaProcessorFactory {
        public ReplicaTableProcessor createProcessor(RaftCommandRunner var1, TransactionStateResolver var2);
    }

    private static class DestroyZoneEvent {
        final int catalogVersion;
        final int zoneId;
        final int partitions;

        DestroyZoneEvent(int catalogVersion, int zoneId, int partitions) {
            this.catalogVersion = catalogVersion;
            this.zoneId = zoneId;
            this.partitions = partitions;
        }

        int catalogVersion() {
            return this.catalogVersion;
        }

        int zoneId() {
            return this.zoneId;
        }

        int partitions() {
            return this.partitions;
        }
    }
}

