/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.table.distributed;

import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite3.cache.Cache;
import org.apache.ignite3.internal.catalog.Catalog;
import org.apache.ignite3.internal.catalog.CatalogService;
import org.apache.ignite3.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite3.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite3.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite3.internal.catalog.events.AddSecondaryZoneEventParameters;
import org.apache.ignite3.internal.catalog.events.AlterTablePropertiesEventParameters;
import org.apache.ignite3.internal.catalog.events.CatalogEvent;
import org.apache.ignite3.internal.catalog.events.CatalogEventParameters;
import org.apache.ignite3.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite3.internal.catalog.events.DropSecondaryZoneEventParameters;
import org.apache.ignite3.internal.catalog.events.DropTableEventParameters;
import org.apache.ignite3.internal.catalog.events.RenameTableEventParameters;
import org.apache.ignite3.internal.catalog.events.SecondaryStorageAvailableEventParameters;
import org.apache.ignite3.internal.causality.CompletionListener;
import org.apache.ignite3.internal.causality.IncrementalVersionedValue;
import org.apache.ignite3.internal.causality.OutdatedTokenException;
import org.apache.ignite3.internal.causality.RevisionListenerRegistry;
import org.apache.ignite3.internal.close.ManuallyCloseable;
import org.apache.ignite3.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite3.internal.components.LogSyncer;
import org.apache.ignite3.internal.components.NodeProperties;
import org.apache.ignite3.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite3.internal.configuration.utils.SystemDistributedConfigurationPropertyHolder;
import org.apache.ignite3.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite3.internal.distributionzones.rebalance.AssignmentUtil;
import org.apache.ignite3.internal.distributionzones.rebalance.PartitionMover;
import org.apache.ignite3.internal.distributionzones.rebalance.RebalanceRaftGroupEventsListener;
import org.apache.ignite3.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite3.internal.event.EventListener;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.hlc.HybridTimestampTracker;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.lowwatermark.LowWatermark;
import org.apache.ignite3.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
import org.apache.ignite3.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.manager.IgniteComponent;
import org.apache.ignite3.internal.marshaller.ReflectionMarshallersProvider;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.MetaStorageManager;
import org.apache.ignite3.internal.metastorage.Revisions;
import org.apache.ignite3.internal.metastorage.WatchEvent;
import org.apache.ignite3.internal.metastorage.WatchListener;
import org.apache.ignite3.internal.metrics.LongGauge;
import org.apache.ignite3.internal.metrics.MetricManager;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.MessagingService;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite3.internal.partition.replicator.LocalPartitionReplicaEvent;
import org.apache.ignite3.internal.partition.replicator.LocalPartitionReplicaEventParameters;
import org.apache.ignite3.internal.partition.replicator.NaiveAsyncReadWriteLock;
import org.apache.ignite3.internal.partition.replicator.PartitionReplicaLifecycleManager;
import org.apache.ignite3.internal.partition.replicator.ReliableCatalogVersions;
import org.apache.ignite3.internal.partition.replicator.ReplicaTableProcessor;
import org.apache.ignite3.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite3.internal.partition.replicator.network.replication.ChangePeersAndLearnersAsyncReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionKey;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite3.internal.partition.replicator.schema.CatalogValidationSchemasSource;
import org.apache.ignite3.internal.partition.replicator.schema.ExecutorInclinedSchemaSyncService;
import org.apache.ignite3.internal.partitiondistribution.Assignment;
import org.apache.ignite3.internal.partitiondistribution.Assignments;
import org.apache.ignite3.internal.partitiondistribution.AssignmentsChain;
import org.apache.ignite3.internal.partitiondistribution.AssignmentsQueue;
import org.apache.ignite3.internal.partitiondistribution.PartitionDistributionUtils;
import org.apache.ignite3.internal.placementdriver.PlacementDriver;
import org.apache.ignite3.internal.placementdriver.ReplicaMeta;
import org.apache.ignite3.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite3.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite3.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
import org.apache.ignite3.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite3.internal.raft.PeersAndLearners;
import org.apache.ignite3.internal.raft.RaftGroupEventsListener;
import org.apache.ignite3.internal.raft.service.RaftCommandRunner;
import org.apache.ignite3.internal.raft.service.RaftGroupService;
import org.apache.ignite3.internal.raft.storage.SnapshotStorageFactory;
import org.apache.ignite3.internal.replicator.PartitionGroupId;
import org.apache.ignite3.internal.replicator.Replica;
import org.apache.ignite3.internal.replicator.ReplicaManager;
import org.apache.ignite3.internal.replicator.ReplicaService;
import org.apache.ignite3.internal.replicator.ReplicationGroupId;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.replicator.TransientReplicaStartException;
import org.apache.ignite3.internal.replicator.ZonePartitionId;
import org.apache.ignite3.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite3.internal.replicator.listener.ReplicaListener;
import org.apache.ignite3.internal.replicator.message.ReplicaMessageGroup;
import org.apache.ignite3.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite3.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite3.internal.replicator.message.ReplicaRequest;
import org.apache.ignite3.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite3.internal.schema.SchemaManager;
import org.apache.ignite3.internal.schema.SchemaRegistry;
import org.apache.ignite3.internal.schema.SchemaSyncService;
import org.apache.ignite3.internal.schema.catalog.CatalogToSchemaDescriptorConverter;
import org.apache.ignite3.internal.schema.configuration.GcConfiguration;
import org.apache.ignite3.internal.secondarystoragebridge.SecondaryStorageBridge;
import org.apache.ignite3.internal.secondarystoragebridge.ThreadAssertingSecondaryStorageBridge;
import org.apache.ignite3.internal.secondarystoragebridge.rocksdb.RocksDbSecondaryStorageBridge;
import org.apache.ignite3.internal.storage.DataStorageManager;
import org.apache.ignite3.internal.storage.MvPartitionStorage;
import org.apache.ignite3.internal.storage.StorageClosedException;
import org.apache.ignite3.internal.storage.StorageDestroyedException;
import org.apache.ignite3.internal.storage.engine.MvTableStorage;
import org.apache.ignite3.internal.storage.engine.StorageEngine;
import org.apache.ignite3.internal.storage.engine.StorageTableDescriptor;
import org.apache.ignite3.internal.storage.metrics.StorageEngineTablesMetricSource;
import org.apache.ignite3.internal.storage.secondary.SecondaryStorageEngine;
import org.apache.ignite3.internal.storage.secondary.SecondaryStorageTableDescriptor;
import org.apache.ignite3.internal.storage.secondary.SecondaryTableStorage;
import org.apache.ignite3.internal.table.IgniteTablesInternal;
import org.apache.ignite3.internal.table.InternalTable;
import org.apache.ignite3.internal.table.LongPriorityQueue;
import org.apache.ignite3.internal.table.StreamerReceiverRunner;
import org.apache.ignite3.internal.table.TableImpl;
import org.apache.ignite3.internal.table.TableViewInternal;
import org.apache.ignite3.internal.table.distributed.BitSetPartitionSet;
import org.apache.ignite3.internal.table.distributed.CatalogStorageIndexDescriptorSupplier;
import org.apache.ignite3.internal.table.distributed.PartitionModificationCounter;
import org.apache.ignite3.internal.table.distributed.PartitionModificationCounterFactory;
import org.apache.ignite3.internal.table.distributed.PartitionModificationCounterMetricSource;
import org.apache.ignite3.internal.table.distributed.PartitionReplicatorNodeRecovery;
import org.apache.ignite3.internal.table.distributed.PartitionSet;
import org.apache.ignite3.internal.table.distributed.PartitionUpdateHandlers;
import org.apache.ignite3.internal.table.distributed.SecondaryStorageRebalanceTrigger;
import org.apache.ignite3.internal.table.distributed.SecondaryZoneManager;
import org.apache.ignite3.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite3.internal.table.distributed.TableAssignmentsService;
import org.apache.ignite3.internal.table.distributed.TableIndexStoragesSupplier;
import org.apache.ignite3.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite3.internal.table.distributed.TableStatsStalenessConfiguration;
import org.apache.ignite3.internal.table.distributed.TableUtils;
import org.apache.ignite3.internal.table.distributed.expiration.ExpiredRowsCleaner;
import org.apache.ignite3.internal.table.distributed.expiration.ExpiredRowsCleanerImpl;
import org.apache.ignite3.internal.table.distributed.expiration.configuration.ExpirationConfiguration;
import org.apache.ignite3.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite3.internal.table.distributed.gc.MvGc;
import org.apache.ignite3.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite3.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite3.internal.table.distributed.index.IndexUtils;
import org.apache.ignite3.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
import org.apache.ignite3.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite3.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
import org.apache.ignite3.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl;
import org.apache.ignite3.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
import org.apache.ignite3.internal.table.distributed.raft.snapshot.TablePartitionKey;
import org.apache.ignite3.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite3.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite3.internal.table.distributed.replicator.secondary.SecondaryReplicationManager;
import org.apache.ignite3.internal.table.distributed.replicator.secondary.ZoneSecondaryReplicationManager;
import org.apache.ignite3.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite3.internal.table.distributed.schema.SchemaVersionsImpl;
import org.apache.ignite3.internal.table.distributed.storage.BrokenTxStateStorage;
import org.apache.ignite3.internal.table.distributed.storage.ContinuousQueryResponseHandler;
import org.apache.ignite3.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite3.internal.table.distributed.storage.NullStorageEngine;
import org.apache.ignite3.internal.table.distributed.storage.PartitionStorages;
import org.apache.ignite3.internal.table.metrics.TableMetricSource;
import org.apache.ignite3.internal.table.policy.RlsPolicyManager;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.internal.tx.LockManager;
import org.apache.ignite3.internal.tx.TxManager;
import org.apache.ignite3.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite3.internal.tx.configuration.TransactionView;
import org.apache.ignite3.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite3.internal.tx.impl.TransactionInflights;
import org.apache.ignite3.internal.tx.impl.TxMessageSender;
import org.apache.ignite3.internal.tx.storage.state.ThreadAssertingTxStateStorage;
import org.apache.ignite3.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite3.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite3.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite3.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.Cursor;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.util.Lazy;
import org.apache.ignite3.internal.util.PendingComparableValuesTracker;
import org.apache.ignite3.internal.util.SafeTimeValuesTracker;
import org.apache.ignite3.internal.utils.InternalTableProvider;
import org.apache.ignite3.internal.utils.RebalanceUtilEx;
import org.apache.ignite3.internal.worker.ThreadAssertions;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.lang.IgniteException;
import org.apache.ignite3.sql.IgniteSql;
import org.apache.ignite3.table.QualifiedName;
import org.apache.ignite3.table.QualifiedNameHelper;
import org.apache.ignite3.table.Table;
import org.gridgain.internal.encryption.EncryptionManager;
import org.gridgain.internal.license.LicenseFeatureChecker;
import org.gridgain.internal.license.MissingRequiredFeaturesException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class TableManager
implements IgniteTablesInternal,
IgniteComponent {
    /** Logger for this class. */
    private static final IgniteLogger LOG = Loggers.forClass(TableManager.class);
    /** Shared factory for replica messages. */
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    /** Shared factory for partition replication messages. */
    private static final PartitionReplicationMessagesFactory TABLE_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
    /** Directory name that the constructor resolves against the storage path for the secondary storage bridge. */
    private static final String SECONDARY_STORE_BRIDGE_DIR = "secondary-store-bridge";

    // --- Collaborators injected through, or built by, the constructor. ---
    private final TopologyService topologyService;
    private final ReplicaManager replicaMgr;
    private final LockManager lockMgr;
    private final ReplicaService replicaSvc;
    private final TxManager txManager;
    private final MetaStorageManager metaStorageMgr;
    private final DataStorageManager dataStorageMgr;
    private final TransactionStateResolver transactionStateResolver;
    private final ExpiredRowsCleaner expiredRowsCleaner;
    private final EncryptionManager encryptionManager;

    // Versioned values chained in the constructor: localPartitionsVv depends on tablesVv,
    // and assignmentsUpdatedVv depends on localPartitionsVv.
    private final IncrementalVersionedValue<Void> tablesVv;
    private final IncrementalVersionedValue<Void> localPartitionsVv;
    private final IncrementalVersionedValue<Void> assignmentsUpdatedVv;

    // Table registries keyed by an Integer (presumably the table id - confirm against the code that populates them).
    private final Map<Integer, TableViewInternal> tables = new ConcurrentHashMap<>();
    private final Map<Integer, TableViewInternal> startedTables = new ConcurrentHashMap<>();
    /** Pending table destruction events, prioritised by {@code DestroyTableEvent::catalogVersion}. */
    private final LongPriorityQueue<DestroyTableEvent> destructionEventsQueue = new LongPriorityQueue<>(DestroyTableEvent::catalogVersion);
    private final Map<Integer, PartitionSet> localPartsByTableId = new ConcurrentHashMap<>();

    /** Busy lock; {@code startAsync} runs its body under it and it is also handed to the secondary zone manager. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final AtomicBoolean beforeStopGuard = new AtomicBoolean();
    private final AtomicBoolean stopGuard = new AtomicBoolean();
    private final SchemaManager schemaManager;
    private final TxStateRocksDbSharedStorage sharedTxStateStorage;
    /** Single-threaded executor created in the constructor with the "scan-query-executor" thread name. */
    private final ExecutorService scanRequestExecutor;
    private final ExecutorService ioExecutor;
    private final ClockService clockService;
    private final OutgoingSnapshotsManager outgoingSnapshotsManager;
    private final DistributionZoneManager distributionZoneManager;
    /** Schema sync service wrapped in the constructor together with the partition operations executor. */
    private final SchemaSyncService executorInclinedSchemaSyncService;
    private final CatalogService catalogService;
    private final FailureProcessor failureProcessor;
    /** Pool created in the constructor with the "incoming-raft-snapshot" thread name; core threads may time out. */
    private final ThreadPoolExecutor incomingSnapshotsExecutor;

    // Meta storage watch listeners; startAsync registers them only when colocation is disabled.
    private final WatchListener pendingAssignmentsRebalanceListener;
    private final WatchListener stableAssignmentsRebalanceListener;
    private final WatchListener assignmentsSwitchRebalanceListener;
    private final MvGc mvGc;
    private final LowWatermark lowWatermark;
    private final HybridTimestampTracker observableTimestampTracker;
    /** Placement driver wrapped in the constructor together with the partition operations executor. */
    private final PlacementDriver executorInclinedPlacementDriver;
    private final Supplier<IgniteSql> sql;
    private final SchemaVersions schemaVersions;
    private final PartitionReplicatorNodeRecovery partitionReplicatorNodeRecovery;
    // Diamond instead of the raw CompletableFuture type: avoids an unchecked conversion.
    private final CompletableFuture<Void> stopManagerFuture = new CompletableFuture<>();
    private final SecondaryStorageRebalanceTrigger secondaryStorageRebalanceTrigger;
    private final ReplicationConfiguration replicationConfiguration;
    private final Executor partitionOperationsExecutor;
    private final ScheduledExecutorService rebalanceScheduler;
    private final ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider();
    private final FullStateTransferIndexChooser fullStateTransferIndexChooser;
    private final RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry;
    private final SecondaryStorageBridge secondaryStorageBridge;
    private final TransactionInflights transactionInflights;
    private final TransactionConfiguration txCfg;
    private final String nodeName;
    private final PartitionReplicaLifecycleManager partitionReplicaLifecycleManager;
    private final ContinuousQueryResponseHandler continuousQueryResponseHandler = new ContinuousQueryResponseHandler();
    private final MessagingService messagingService;
    private final NodeProperties nodeProperties;
    // Not final and nullable: the constructor does not assign it.
    @Nullable
    private ScheduledExecutorService streamerFlushExecutor;
    private final IndexMetaStorage indexMetaStorage;
    /** Matches assignments whose consistent id equals the name of the local node. */
    private final Predicate<Assignment> isLocalNodeAssignment = assignment -> assignment.consistentId().equals(this.localNode().name());
    private final MinimumRequiredTimeCollectorService minTimeCollectorService;
    // Not final and nullable: the constructor does not assign it.
    @Nullable
    private StreamerReceiverRunner streamerReceiverRunner;
    // Diamond instead of the raw CompletableFuture type: avoids an unchecked conversion.
    private final CompletableFuture<Void> readyToProcessReplicaStarts = new CompletableFuture<>();
    // NOTE(review): plain HashMap, unlike the neighbouring concurrent maps; presumably guarded by
    // tablesPerZoneLocks - confirm against the methods that access it.
    private final Map<Integer, Set<TableViewInternal>> tablesPerZone = new HashMap<>();
    private final Map<Integer, NaiveAsyncReadWriteLock> tablesPerZoneLocks = new ConcurrentHashMap<>();
    private final LicenseFeatureChecker licenseFeatureChecker;
    private final SecondaryReplicationManager secondaryReplicationManager;
    /** Holder for the "rebalanceRetryDelay" system property; created in the constructor and initialised in {@code startAsync}. */
    private final SystemDistributedConfigurationPropertyHolder<Integer> rebalanceRetryDelayConfiguration;

    // Event listeners kept as fields, each bound to a handler method of this instance.
    // The zone replica listeners are registered with the lifecycle manager in the constructor.
    private final EventListener<LocalPartitionReplicaEventParameters> onBeforeZoneReplicaStartedListener = this::beforeZoneReplicaStarted;
    private final EventListener<LocalPartitionReplicaEventParameters> onZoneReplicaStoppedListener = this::onZoneReplicaStopped;
    private final EventListener<LocalPartitionReplicaEventParameters> onZoneReplicaDestroyedListener = this::onZoneReplicaDestroyed;
    private final EventListener<LocalPartitionReplicaEventParameters> onBeforeZoneReplicaStopOrDestroyListener = this::onBeforeZoneReplicaStopOrDestroy;
    // startAsync picks one of the two TABLE_CREATE listeners depending on whether colocation is enabled.
    private final EventListener<CreateTableEventParameters> onTableCreateWithColocationListener = this::loadTableToZoneOnTableCreate;
    private final EventListener<CreateTableEventParameters> onTableCreateWithoutColocationListener = this::onTableCreate;
    private final EventListener<DropTableEventParameters> onTableDropListener = EventListener.fromConsumer(this::onTableDrop);
    private final EventListener<CatalogEventParameters> onTableAlterListener = this::onTableAlter;
    private final EventListener<ChangeLowWatermarkEventParameters> onLowWatermarkChangedListener = this::onLwmChanged;
    private final EventListener<PrimaryReplicaEventParameters> onPrimaryReplicaExpiredListener = this::onPrimaryReplicaExpired;
    private final EventListener<PrimaryReplicaEventParameters> onPrimaryReplicaElectedListener = this::onPrimaryReplicaElected;
    private final EventListener<SecondaryStorageAvailableEventParameters> onSecondaryStorageAvailableListener = this::onSecondaryStorageAvailable;
    private final TableAssignmentsService assignmentsService;
    private final ReliableCatalogVersions reliableCatalogVersions;
    private final MetricManager metricManager;
    private final PartitionModificationCounterFactory partitionModificationCounterFactory;
    private final Map<TablePartitionId, PartitionModificationCounterMetricSource> partModCounterMetricSources = new ConcurrentHashMap<>();
    private final SecondaryZoneManager secondaryZoneManager;
    private final RlsPolicyManager policyManager;
    private final ClusterManagementGroupManager cmgManager;

    /**
     * Creates the table manager: stores the injected collaborators, builds the helper components
     * derived from them and registers the zone replica lifecycle listeners.
     *
     * <p>The constructor does not start anything; {@code startAsync} starts the GC, the transaction
     * state resolver and the index chooser, and registers watches and catalog listeners.
     *
     * <p>NOTE(review): listener fields bound to {@code this} are handed to
     * {@code partitionReplicaLifecycleManager} before construction completes, so events delivered
     * that early would observe a partially initialised instance - confirm none can be.
     *
     * @param nodeName Local node name; stored and passed to the thread factories, GC, expired rows cleaner and
     *     secondary storage bridge.
     * @param registry Revision listener registry backing the {@code TableManager#tables} versioned value.
     * @param gcConfig Garbage collector configuration, passed to {@link MvGc}.
     * @param txCfg Transaction configuration; stored.
     * @param replicationConfiguration Replication configuration; stored.
     * @param messagingService Messaging service; stored and passed to the tx message sender, transaction state
     *     resolver and node recovery.
     * @param topologyService Topology service; stored and passed to several helper components.
     * @param messageSerializationRegistry Not referenced by this constructor.
     * @param replicaMgr Replica manager; stored and passed to the expired rows cleaner.
     * @param lockMgr Lock manager; stored.
     * @param replicaSvc Replica service; stored and passed to the tx message sender and secondary replication manager.
     * @param txManager Transaction manager; stored and passed to several helper components.
     * @param dataStorageMgr Data storage manager; stored.
     * @param storagePath Base path; {@code "secondary-store-bridge"} is resolved against it for the secondary
     *     storage bridge.
     * @param txStateRocksDbSharedStorage Shared transaction state storage; stored.
     * @param metaStorageMgr Meta storage manager; stored and passed to several helper components.
     * @param schemaManager Schema manager; stored and passed to several helper components.
     * @param ioExecutor IO executor; stored and passed to the secondary storage bridge and secondary replication
     *     manager.
     * @param partitionOperationsExecutor Executor used by the executor-inclined wrappers, node recovery and the
     *     secondary zone manager; stored.
     * @param rebalanceScheduler Rebalance scheduler; stored.
     * @param commonScheduler Scheduler passed to the secondary storage bridge and the RLS policy manager.
     * @param clockService Clock service; stored and passed to several helper components.
     * @param outgoingSnapshotsManager Outgoing snapshots manager; stored.
     * @param distributionZoneManager Distribution zone manager; stored and passed to the rebalance trigger and
     *     assignments service.
     * @param schemaSyncService Schema sync service; wrapped with {@code partitionOperationsExecutor} and also
     *     passed unwrapped to {@link ReliableCatalogVersions}.
     * @param catalogService Catalog service; stored and passed to several helper components.
     * @param failureProcessor Failure processor; stored and passed to several helper components.
     * @param observableTimestampTracker Observable timestamp tracker; stored and passed to the expired rows cleaner.
     * @param placementDriver Placement driver; wrapped with {@code partitionOperationsExecutor}, while the expired
     *     rows cleaner and secondary replication manager receive it unwrapped.
     * @param sql Supplier of the SQL facade; stored.
     * @param remotelyTriggeredResourceRegistry Resource registry; stored and passed to the secondary zone manager.
     * @param expirationConfiguration Expiration configuration, passed to the expired rows cleaner.
     * @param lowWatermark Low watermark; stored and passed to several helper components.
     * @param transactionInflights Transaction inflights; stored.
     * @param indexMetaStorage Index meta storage; stored and passed to the full state transfer index chooser.
     * @param encryptionManager Encryption manager; stored.
     * @param logSyncer Log syncer, passed to the secondary storage bridge.
     * @param partitionReplicaLifecycleManager Lifecycle manager; stored, subscribed to, and passed to the secondary
     *     zone manager.
     * @param nodeProperties Node properties; stored and passed to the expired rows cleaner.
     * @param minTimeCollectorService Minimum required time collector; stored.
     * @param systemDistributedConfiguration System configuration backing the {@code rebalanceRetryDelay} holder.
     * @param metricManager Metric manager; stored and passed to the expired rows cleaner.
     * @param licenseFeatureChecker License feature checker; stored and passed to the expired rows cleaner.
     * @param partitionModificationCounterFactory Partition modification counter factory; stored.
     * @param cmgManager Cluster management group manager; stored.
     */
    public TableManager(String nodeName, RevisionListenerRegistry registry, GcConfiguration gcConfig, TransactionConfiguration txCfg, ReplicationConfiguration replicationConfiguration, MessagingService messagingService, TopologyService topologyService, MessageSerializationRegistry messageSerializationRegistry, ReplicaManager replicaMgr, LockManager lockMgr, ReplicaService replicaSvc, TxManager txManager, DataStorageManager dataStorageMgr, Path storagePath, TxStateRocksDbSharedStorage txStateRocksDbSharedStorage, MetaStorageManager metaStorageMgr, SchemaManager schemaManager, ExecutorService ioExecutor, Executor partitionOperationsExecutor, ScheduledExecutorService rebalanceScheduler, ScheduledExecutorService commonScheduler, ClockService clockService, OutgoingSnapshotsManager outgoingSnapshotsManager, DistributionZoneManager distributionZoneManager, SchemaSyncService schemaSyncService, CatalogService catalogService, FailureProcessor failureProcessor, HybridTimestampTracker observableTimestampTracker, PlacementDriver placementDriver, Supplier<IgniteSql> sql, RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry, ExpirationConfiguration expirationConfiguration, LowWatermark lowWatermark, TransactionInflights transactionInflights, IndexMetaStorage indexMetaStorage, EncryptionManager encryptionManager, LogSyncer logSyncer, PartitionReplicaLifecycleManager partitionReplicaLifecycleManager, NodeProperties nodeProperties, MinimumRequiredTimeCollectorService minTimeCollectorService, SystemDistributedConfiguration systemDistributedConfiguration, MetricManager metricManager, LicenseFeatureChecker licenseFeatureChecker, PartitionModificationCounterFactory partitionModificationCounterFactory, ClusterManagementGroupManager cmgManager) {
        // Plain assignments of injected collaborators.
        this.topologyService = topologyService;
        this.replicaMgr = replicaMgr;
        this.lockMgr = lockMgr;
        this.replicaSvc = replicaSvc;
        this.txManager = txManager;
        this.dataStorageMgr = dataStorageMgr;
        this.metaStorageMgr = metaStorageMgr;
        this.schemaManager = schemaManager;
        this.ioExecutor = ioExecutor;
        this.partitionOperationsExecutor = partitionOperationsExecutor;
        this.rebalanceScheduler = rebalanceScheduler;
        this.clockService = clockService;
        this.outgoingSnapshotsManager = outgoingSnapshotsManager;
        this.distributionZoneManager = distributionZoneManager;
        this.catalogService = catalogService;
        this.failureProcessor = failureProcessor;
        this.observableTimestampTracker = observableTimestampTracker;
        this.sql = sql;
        this.replicationConfiguration = replicationConfiguration;
        this.remotelyTriggeredResourceRegistry = remotelyTriggeredResourceRegistry;
        this.lowWatermark = lowWatermark;
        this.transactionInflights = transactionInflights;
        this.txCfg = txCfg;
        this.nodeName = nodeName;
        this.indexMetaStorage = indexMetaStorage;
        this.encryptionManager = encryptionManager;
        this.messagingService = messagingService;
        this.partitionReplicaLifecycleManager = partitionReplicaLifecycleManager;
        this.nodeProperties = nodeProperties;
        this.minTimeCollectorService = minTimeCollectorService;
        this.metricManager = metricManager;
        this.licenseFeatureChecker = licenseFeatureChecker;
        this.partitionModificationCounterFactory = partitionModificationCounterFactory;

        // Wrappers that pair a service with the partition operations executor.
        this.executorInclinedSchemaSyncService = new ExecutorInclinedSchemaSyncService(schemaSyncService, partitionOperationsExecutor);
        this.executorInclinedPlacementDriver = new ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
        this.reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService);
        this.cmgManager = cmgManager;

        // Transaction state resolution uses the wrapped placement driver assigned just above.
        TxMessageSender txMessageSender = new TxMessageSender(messagingService, replicaSvc, clockService);
        this.transactionStateResolver = new TransactionStateResolver(txManager, clockService, topologyService, messagingService, this.executorInclinedPlacementDriver, txMessageSender);

        // Looks tables up lazily; yields null when the table is not cached.
        InternalTableProvider internalTableProvider = tableId -> {
            TableViewInternal table = this.cachedTable(tableId);
            if (table == null) {
                return null;
            }
            return table.internalTable();
        };
        this.expiredRowsCleaner = new ExpiredRowsCleanerImpl(nodeName, topologyService, clockService, observableTimestampTracker, internalTableProvider, catalogService, schemaManager, txManager, expirationConfiguration, placementDriver, metricManager, replicaMgr, licenseFeatureChecker, nodeProperties);
        this.schemaVersions = new SchemaVersionsImpl(this.executorInclinedSchemaSyncService, catalogService, clockService);

        // Chain of versioned values: tables -> local partitions -> assignments updated.
        // Diamond instead of the raw type, so the assignments to IncrementalVersionedValue<Void> are checked.
        this.tablesVv = new IncrementalVersionedValue<>("TableManager#tables", registry, 100, null);
        this.localPartitionsVv = new IncrementalVersionedValue<>("TableManager#localPartitions", IncrementalVersionedValue.dependingOn(this.tablesVv));
        this.assignmentsUpdatedVv = new IncrementalVersionedValue<>("TableManager#assignmentsUpdated", IncrementalVersionedValue.dependingOn(this.localPartitionsVv));

        // Executors owned by this manager.
        this.scanRequestExecutor = Executors.newSingleThreadExecutor(IgniteThreadFactory.create(nodeName, "scan-query-executor", LOG, ThreadOperation.STORAGE_READ));
        int cpus = Runtime.getRuntime().availableProcessors();
        this.incomingSnapshotsExecutor = new ThreadPoolExecutor(cpus, cpus, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), IgniteThreadFactory.create(nodeName, "incoming-raft-snapshot", LOG, ThreadOperation.STORAGE_READ, ThreadOperation.STORAGE_WRITE));
        // Core threads are allowed to time out after the 30 second keep-alive configured above.
        this.incomingSnapshotsExecutor.allowCoreThreadTimeOut(true);

        // Rebalance watch listeners; they are registered with the meta storage in startAsync.
        this.pendingAssignmentsRebalanceListener = this.createPendingAssignmentsRebalanceListener();
        this.stableAssignmentsRebalanceListener = this.createStableAssignmentsRebalanceListener();
        this.assignmentsSwitchRebalanceListener = this.createAssignmentsSwitchRebalanceListener();

        this.mvGc = new MvGc(nodeName, gcConfig, lowWatermark, failureProcessor);
        this.partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery(metaStorageMgr, messagingService, topologyService, partitionOperationsExecutor, tableId -> this.tablesById().get(tableId));
        this.sharedTxStateStorage = txStateRocksDbSharedStorage;
        this.secondaryStorageRebalanceTrigger = new SecondaryStorageRebalanceTrigger(metaStorageMgr, distributionZoneManager, catalogService);
        this.fullStateTransferIndexChooser = new FullStateTransferIndexChooser(catalogService, lowWatermark, indexMetaStorage);
        // Property "rebalanceRetryDelay" with a no-op change callback; 200 is presumably the default value - confirm.
        this.rebalanceRetryDelayConfiguration = new SystemDistributedConfigurationPropertyHolder<Integer>(systemDistributedConfiguration, (v, r) -> {}, "rebalanceRetryDelay", 200, Integer::parseInt);
        this.assignmentsService = new TableAssignmentsService(metaStorageMgr, catalogService, distributionZoneManager, failureProcessor);

        // Subscribe to zone replica lifecycle events; stop and destroy share one "before" listener.
        partitionReplicaLifecycleManager.listen(LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED, this.onZoneReplicaStoppedListener);
        partitionReplicaLifecycleManager.listen(LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED, this.onZoneReplicaDestroyedListener);
        partitionReplicaLifecycleManager.listen(LocalPartitionReplicaEvent.BEFORE_REPLICA_STOPPED, this.onBeforeZoneReplicaStopOrDestroyListener);
        partitionReplicaLifecycleManager.listen(LocalPartitionReplicaEvent.BEFORE_REPLICA_DESTROYED, this.onBeforeZoneReplicaStopOrDestroyListener);
        partitionReplicaLifecycleManager.listen(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, this.onBeforeZoneReplicaStartedListener);

        // Secondary storage components; the bridge must exist before the secondary zone manager that receives it.
        this.secondaryStorageBridge = TableManager.createSecondaryStoreBridge(nodeName, storagePath.resolve(SECONDARY_STORE_BRIDGE_DIR), logSyncer, failureProcessor, ioExecutor, commonScheduler);
        this.secondaryReplicationManager = new ZoneSecondaryReplicationManager(schemaManager, txManager, replicaSvc, lowWatermark, clockService, topologyService, catalogService, placementDriver, failureProcessor, this::getTablesIfZoneIsSecondary, metaStorageMgr, ioExecutor);
        this.secondaryZoneManager = new SecondaryZoneManager(partitionReplicaLifecycleManager, catalogService, schemaManager, this.secondaryStorageBridge, this.executorInclinedSchemaSyncService, remotelyTriggeredResourceRegistry, lowWatermark, this.busyLock, partitionOperationsExecutor);
        this.policyManager = new RlsPolicyManager(catalogService, schemaManager, lowWatermark, commonScheduler);
    }

    /**
     * Starts the table manager: starts helper components, subscribes to catalog, low watermark,
     * messaging and placement driver events, registers meta storage rebalance watches when
     * colocation is disabled, and finally recovers tables and their assignments.
     *
     * <p>The whole body runs under {@code busyLock} via {@code IgniteUtils.inBusyLockAsync}.
     *
     * @param componentContext Component context; it is not read by this method.
     * @return Future of table recovery followed by assignments recovery at the meta storage recovery revision.
     */
    @Override
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            this.mvGc.start();
            this.transactionStateResolver.start();
            this.fullStateTransferIndexChooser.start();
            this.rebalanceRetryDelayConfiguration.init();
            this.cleanUpResourcesForDroppedTablesOnRecoveryBusy();
            // The three rebalance watches are registered only when colocation is disabled.
            if (!this.nodeProperties.colocationEnabled()) {
                this.metaStorageMgr.registerPrefixWatch(new ByteArray(RebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES), this.pendingAssignmentsRebalanceListener);
                this.metaStorageMgr.registerPrefixWatch(new ByteArray(RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES), this.stableAssignmentsRebalanceListener);
                this.metaStorageMgr.registerPrefixWatch(new ByteArray(RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES), this.assignmentsSwitchRebalanceListener);
            }
            // The TABLE_CREATE listener is chosen according to the colocation mode.
            this.catalogService.listen(CatalogEvent.TABLE_CREATE, this.nodeProperties.colocationEnabled() ? this.onTableCreateWithColocationListener : this.onTableCreateWithoutColocationListener);
            this.catalogService.listen(CatalogEvent.TABLE_DROP, this.onTableDropListener);
            this.catalogService.listen(CatalogEvent.TABLE_ALTER, this.onTableAlterListener);
            this.catalogService.listen(CatalogEvent.SECONDARY_STORAGE_AVAILABLE, this.onSecondaryStorageAvailableListener);
            this.lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED, this.onLowWatermarkChangedListener);
            this.partitionReplicatorNodeRecovery.start();
            this.secondaryStorageRebalanceTrigger.start();
            this.expiredRowsCleaner.start();
            this.secondaryStorageBridge.start();
            this.policyManager.start();
            this.messagingService.addMessageHandler(ReplicaMessageGroup.class, this.continuousQueryResponseHandler);
            this.executorInclinedPlacementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this.onPrimaryReplicaExpiredListener);
            this.executorInclinedPlacementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this.onPrimaryReplicaElectedListener);
            CompletableFuture<Revisions> recoveryFinishFuture = this.metaStorageMgr.recoveryFinishedFuture();
            // Meta storage recovery is expected to be finished already, so the join() below is not expected to block.
            assert (recoveryFinishFuture.isDone());
            long recoveryRevision = recoveryFinishFuture.join().revision();
            this.secondaryReplicationManager.start();
            // Tables are recovered first; assignments are processed only after that future completes normally.
            return this.recoverTables(recoveryRevision, this.lowWatermark.getLowWatermark()).thenCompose(v -> this.processAssignmentsOnRecovery(recoveryRevision));
        });
    }

    /**
     * Delegates to {@code executorInclinedSchemaSyncService.waitForMetadataCompleteness} for the given timestamp.
     *
     * @param ts Timestamp as a raw {@code long}; converted with {@code HybridTimestamp.hybridTimestamp(long)}.
     * @return Future returned by the schema sync service.
     */
    private CompletableFuture<Void> waitForMetadataCompleteness(long ts) {
        HybridTimestamp timestamp = HybridTimestamp.hybridTimestamp(ts);
        return this.executorInclinedSchemaSyncService.waitForMetadataCompleteness(timestamp);
    }

    /**
     * Listener body for the "before zone replica started" event.
     *
     * <p>The event is handled when colocation is enabled or when the replica is started on recovery;
     * otherwise a completed {@code false} future is returned right away.
     *
     * @param parameters Event parameters.
     * @return Future that completes with {@code false} once the handling (if any) has finished.
     */
    private CompletableFuture<Boolean> beforeZoneReplicaStarted(LocalPartitionReplicaEventParameters parameters) {
        boolean shouldHandle = this.nodeProperties.colocationEnabled() || parameters.onRecovery();
        if (!shouldHandle) {
            return CompletableFutures.falseCompletedFuture();
        }
        // The actual work is deferred until readyToProcessReplicaStarts completes.
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> ((CompletableFuture)this.readyToProcessReplicaStarts.thenCompose(v -> this.beforeZoneReplicaStartedImpl(parameters))).thenApply(unused -> false));
    }

    /**
     * Loads resources of the zone's tables into the zone replica that is about to start.
     *
     * <p>Takes the per-zone read lock from {@code tablesPerZoneLocks}, and releases it when the
     * loading future completes (normally or exceptionally) or when building the chain throws.
     *
     * @param parameters Event parameters; the zone partition id and the recovery flag are read from them.
     * @return Future that completes when the resources are loaded, or immediately if the zone has no tables.
     */
    private CompletableFuture<Void> beforeZoneReplicaStartedImpl(LocalPartitionReplicaEventParameters parameters) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            ZonePartitionId zonePartitionId = parameters.zonePartitionId();
            boolean onRecovery = parameters.onRecovery();
            NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new NaiveAsyncReadWriteLock());
            CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock();
            try {
                return ((CompletableFuture)readLockAcquisitionFuture.thenCompose(stamp -> {
                    Set<TableViewInternal> zoneTables = this.zoneTablesRawSet(zonePartitionId.zoneId());
                    if (zoneTables.isEmpty()) {
                        return CompletableFutures.nullCompletedFuture();
                    }
                    // With colocation both primary and secondary zone resources are loaded.
                    if (this.nodeProperties.colocationEnabled()) {
                        return CompletableFuture.allOf(this.createPartitionsAndLoadResourcesToZoneReplica(zonePartitionId, zoneTables, onRecovery), this.createPartitionsAndLoadResourcesToSecondaryZoneReplica(zonePartitionId, zoneTables, onRecovery));
                    }
                    // Without colocation this path is expected to be reached only on recovery.
                    assert (onRecovery);
                    return this.createPartitionsAndLoadResourcesToSecondaryZoneReplica(zonePartitionId, zoneTables, onRecovery);
                    // The unlock is chained on the acquisition future so that it receives the lock stamp.
                })).whenComplete((unused, t) -> readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead));
            }
            catch (Throwable t2) {
                // Building the chain failed synchronously: still release the read lock once it is acquired.
                readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
                return CompletableFuture.failedFuture(t2);
            }
        });
    }

    /**
     * For every table in {@code zoneTables} whose primary zone is the given zone, gets or creates
     * the partition storages and then loads the table's partition resources into the zone replica.
     *
     * <p>Failures caused by {@code TableClosedException} are ignored per table.
     *
     * @param zonePartitionId Zone partition whose replica is being started.
     * @param zoneTables Tables registered for the zone; tables with a different {@code zoneId()} are skipped.
     * @param onRecovery Flag passed through to {@code preparePartitionResourcesAndLoadToZoneReplicaBusy}.
     * @return Future that completes when all per-table futures complete.
     */
    private CompletableFuture<Void> createPartitionsAndLoadResourcesToZoneReplica(ZonePartitionId zonePartitionId, Set<TableViewInternal> zoneTables, boolean onRecovery) {
        int partitionIndex = zonePartitionId.partitionId();
        PartitionSet singlePartitionIdSet = PartitionSet.of(partitionIndex);
        // Per table: create storages, then (on ioExecutor, under the busy lock) record the local partition,
        // register indexes using the current low watermark, and load the resources into the zone replica.
        CompletableFuture[] futures = (CompletableFuture[])zoneTables.stream().filter(tbl -> tbl.zoneId() == zonePartitionId.zoneId()).map(tbl -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> ((CompletableFuture)this.getOrCreatePartitionStorages((TableViewInternal)tbl, singlePartitionIdSet).thenRunAsync(() -> IgniteUtils.inBusyLock(this.busyLock, () -> {
            this.localPartsByTableId.compute(tbl.tableId(), (tableId, oldPartitionSet) -> TableManager.extendPartitionSet(oldPartitionSet, partitionIndex));
            this.lowWatermark.getLowWatermarkSafe(lwm -> IndexUtils.registerIndexesToTable(tbl, this.catalogService, singlePartitionIdSet, tbl.schemaView(), lwm));
            this.preparePartitionResourcesAndLoadToZoneReplicaBusy((TableViewInternal)tbl, zonePartitionId, onRecovery);
        }), this.ioExecutor)).exceptionally(TableManager.ignoreTableClosedException()))).toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }

    /**
     * Loads secondary partition resources into the zone replica for every table in {@code zoneTables}
     * whose secondary zone is the given zone. The work runs on {@code ioExecutor}.
     *
     * <p>A failure caused by {@code TableClosedException} is ignored.
     *
     * @param zonePartitionId Zone partition whose replica is being started.
     * @param zoneTables Tables registered for the zone.
     * @param onRecovery Flag passed through to the secondary zone manager.
     * @return Future that completes when all matching tables have been processed.
     */
    private CompletableFuture<Void> createPartitionsAndLoadResourcesToSecondaryZoneReplica(ZonePartitionId zonePartitionId, Set<TableViewInternal> zoneTables, boolean onRecovery) {
        Runnable loadSecondaryResources = () -> {
            for (TableViewInternal table : zoneTables) {
                Integer secondaryZoneId = table.internalTable().secondaryZoneId();
                // Only tables that use this zone as their secondary zone are of interest.
                if (secondaryZoneId != null && secondaryZoneId.intValue() == zonePartitionId.zoneId()) {
                    this.secondaryZoneManager.prepareSecondaryPartitionResourcesAndLoadToZoneReplica(table, zonePartitionId, onRecovery);
                }
            }
        };
        return CompletableFuture.runAsync(loadSecondaryResources, this.ioExecutor).exceptionally(TableManager.ignoreTableClosedException());
    }

    /**
     * Creates an exception handler for {@code CompletableFuture.exceptionally} that swallows failures
     * having {@code TableClosedException} among their causes and rethrows everything else.
     *
     * @return Function returning {@code null} for table-closed failures and rethrowing other throwables.
     */
    private static Function<Throwable, Void> ignoreTableClosedException() {
        return ex -> {
            boolean causedByClosedTable = ExceptionUtils.hasCause(ex, TableClosedException.class);
            if (!causedByClosedTable) {
                throw (RuntimeException)ExceptionUtils.sneakyThrow(ex);
            }
            return null;
        };
    }

    /**
     * Listener body for the "zone replica stopped" event: stops the partitions of every table
     * registered for the zone while holding the per-zone read lock.
     *
     * <p>Does nothing when colocation is disabled.
     *
     * @param parameters Event parameters; the zone id is read from them.
     * @return Future that completes with {@code false} after all table partitions are stopped and the unlock is scheduled.
     */
    private CompletableFuture<Boolean> onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) {
        if (!this.nodeProperties.colocationEnabled()) {
            return CompletableFutures.falseCompletedFuture();
        }
        int zoneId = parameters.zonePartitionId().zoneId();
        NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
        CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock();
        try {
            CompletableFuture<Void> tablesStopped = readLockAcquisitionFuture.thenCompose(stamp -> {
                CompletableFuture<?>[] stopFutures = this.zoneTablesRawSet(zoneId).stream().map(this::stopTablePartitions).toArray(CompletableFuture[]::new);
                return CompletableFuture.allOf(stopFutures);
            });
            // The unlock is chained on the acquisition future so that it receives the lock stamp.
            return tablesStopped
                .whenComplete((v, t) -> readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead))
                .thenApply(v -> false);
        }
        catch (Throwable t2) {
            readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
            return CompletableFuture.failedFuture(t2);
        }
    }

    /**
     * Listener body for the "zone replica destroyed" event: for every table registered for the zone,
     * schedules stop-and-destroy of the corresponding table partition on {@code ioExecutor} while
     * holding the per-zone read lock.
     *
     * <p>Does nothing when colocation is disabled.
     *
     * @param parameters Event parameters; the zone partition id and the causality token are read from them.
     * @return Future that completes with {@code false} once the scheduled tasks have run and the unlock is scheduled.
     */
    private CompletableFuture<Boolean> onZoneReplicaDestroyed(LocalPartitionReplicaEventParameters parameters) {
        if (!this.nodeProperties.colocationEnabled()) {
            return CompletableFutures.falseCompletedFuture();
        }
        ZonePartitionId zonePartitionId = parameters.zonePartitionId();
        NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new NaiveAsyncReadWriteLock());
        CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock();
        try {
            return ((CompletableFuture)((CompletableFuture)readLockAcquisitionFuture.thenCompose(stamp -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
                // NOTE(review): supplyAsync wraps a supplier that itself returns a future, so each element is a
                // future of a future. allOf below waits only for the outer futures, i.e. until the inner
                // stop-and-destroy futures are created, not until they complete. Confirm this is intended.
                CompletableFuture[] futures = (CompletableFuture[])this.zoneTablesRawSet(zonePartitionId.zoneId()).stream().map(table -> CompletableFuture.supplyAsync(() -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.stopAndDestroyTablePartition(new TablePartitionId(table.tableId(), zonePartitionId.partitionId()), parameters.causalityToken())), this.ioExecutor)).toArray(CompletableFuture[]::new);
                return CompletableFuture.allOf(futures);
                // The unlock is chained on the acquisition future so that it receives the lock stamp.
            }))).whenComplete((v, t) -> readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead))).thenApply(unused -> false);
        }
        catch (Throwable t2) {
            // Building the chain failed synchronously: still release the read lock once it is acquired.
            readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
            return CompletableFuture.failedFuture(t2);
        }
    }

    /**
     * Creates the table object on recovery and registers it in the manager's maps.
     *
     * <p>The table is put into {@code tables} immediately; once its schema registry is available the
     * schema view is set, the table is added to its zone (and to the secondary zone, if any) and
     * finally put into {@code startedTables}.
     *
     * @param causalityToken Causality token used to create the table and to obtain the schema registry.
     * @param zoneDescriptor Descriptor of the table's zone.
     * @param secondaryZoneDescriptor Descriptor of the table's secondary zone, or {@code null}.
     * @param tableDescriptor Table descriptor.
     * @param schemaDescriptor Schema descriptor.
     * @return Future that completes when the table is registered as started.
     */
    private CompletableFuture<Void> prepareTableResourcesOnRecovery(long causalityToken, CatalogZoneDescriptor zoneDescriptor, @Nullable CatalogZoneDescriptor secondaryZoneDescriptor, CatalogTableDescriptor tableDescriptor, CatalogSchemaDescriptor schemaDescriptor) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            TableImpl recoveredTable = this.createTableImpl(causalityToken, tableDescriptor, zoneDescriptor, secondaryZoneDescriptor, schemaDescriptor);
            int tableId = tableDescriptor.id();
            this.tables.put(tableId, recoveredTable);
            return this.schemaManager.schemaRegistry(causalityToken, tableId).thenAccept(registry -> {
                Runnable registerTable = () -> {
                    recoveredTable.schemaView((SchemaRegistry)registry);
                    this.addTableToZone(zoneDescriptor.id(), recoveredTable);
                    if (secondaryZoneDescriptor != null) {
                        this.addTableToZone(secondaryZoneDescriptor.id(), recoveredTable);
                    }
                    this.startedTables.put(tableId, recoveredTable);
                };
                IgniteUtils.inBusyLock(this.busyLock, registerTable);
            });
        });
    }

    /**
     * Table-create listener body: resolves the zone, secondary zone and schema descriptors at the
     * event's catalog version and delegates to the descriptor-based overload.
     *
     * @param parameters Table creation event parameters.
     * @return Future that completes with {@code false} when the table is loaded to its zone(s).
     */
    private CompletableFuture<Boolean> loadTableToZoneOnTableCreate(CreateTableEventParameters parameters) {
        long causalityToken = parameters.causalityToken();
        int catalogVersion = parameters.catalogVersion();
        CatalogTableDescriptor tableDescriptor = parameters.tableDescriptor();
        CatalogZoneDescriptor zoneDescriptor = this.getZoneDescriptor(tableDescriptor, catalogVersion);
        CatalogZoneDescriptor secondaryZoneDescriptor = this.getSecondaryZoneDescriptor(tableDescriptor, catalogVersion);
        CatalogSchemaDescriptor schemaDescriptor = this.getSchemaDescriptor(tableDescriptor, catalogVersion);
        CompletableFuture<Void> tableLoaded = this.loadTableToZoneOnTableCreate(causalityToken, catalogVersion, zoneDescriptor, secondaryZoneDescriptor, tableDescriptor, schemaDescriptor);
        return tableLoaded.thenApply(v -> false);
    }

    /**
     * Creates the table object and loads it to its zone and, if present, to its secondary zone.
     *
     * <p>The zone is locked for read through {@code partitionReplicaLifecycleManager} while the table
     * is loaded; the lock is released when loading completes or when starting it throws.
     *
     * @param causalityToken Causality token of the table creation.
     * @param catalogVersion Catalog version; used to start replication when a secondary zone is present.
     * @param zoneDescriptor Descriptor of the table's zone.
     * @param secondaryZoneDescriptor Descriptor of the table's secondary zone, or {@code null}.
     * @param tableDescriptor Table descriptor.
     * @param schemaDescriptor Schema descriptor.
     * @return Future that completes when the table is loaded to the zone and, if applicable, to the secondary zone.
     */
    private CompletableFuture<Void> loadTableToZoneOnTableCreate(long causalityToken, int catalogVersion, CatalogZoneDescriptor zoneDescriptor, @Nullable CatalogZoneDescriptor secondaryZoneDescriptor, CatalogTableDescriptor tableDescriptor, CatalogSchemaDescriptor schemaDescriptor) {
        // Declared with its real type: the value is returned as CompletableFuture<Void> and passed to allOf.
        CompletableFuture<Void> loadToZoneFuture;
        TableImpl table = this.createTableImpl(causalityToken, tableDescriptor, zoneDescriptor, secondaryZoneDescriptor, schemaDescriptor);
        int tableId = tableDescriptor.id();
        // The schema view is attached as part of the tablesVv update for this causality token.
        this.tablesVv.update(causalityToken, (ignore, e) -> IgniteUtils.inBusyLock(this.busyLock, () -> {
            if (e != null) {
                return CompletableFuture.failedFuture(e);
            }
            return this.schemaManager.schemaRegistry(causalityToken, tableId).thenAccept(table::schemaView);
        }));
        CompletableFuture<Long> acquisitionFuture = this.partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id());
        try {
            loadToZoneFuture = this.loadTableToZoneOnTableCreateHavingZoneReadLock(acquisitionFuture, causalityToken, zoneDescriptor, table).whenComplete((res, ex) -> this.unlockZoneForRead(zoneDescriptor, acquisitionFuture));
        }
        catch (Throwable e2) {
            // Starting the load failed synchronously: still release the zone read lock.
            this.unlockZoneForRead(zoneDescriptor, acquisitionFuture);
            return CompletableFuture.failedFuture(e2);
        }
        if (secondaryZoneDescriptor == null) {
            return loadToZoneFuture;
        }
        CompletableFuture<Void> loadToSecondaryZoneFuture = this.loadTableToSecondaryZoneOnTableCreateIfNeeded(table, tableDescriptor, secondaryZoneDescriptor, causalityToken, false);
        CompletableFuture<Void> allResourcesLoadedFuture = CompletableFuture.allOf(loadToZoneFuture, loadToSecondaryZoneFuture);
        // Invoked right away; it is not chained after the resource-loading futures.
        this.secondaryReplicationManager.startReplicationForTable(table, catalogVersion);
        return allResourcesLoadedFuture;
    }

    /**
     * Loads a newly created table into its zone; the caller is responsible for the zone read lock
     * represented by {@code readLockAcquisitionFuture}.
     *
     * <p>Steps: create storages for the zone partitions that are local to this node, then load the
     * table's partition resources into each local zone replica, then register the table as started
     * and add it to the zone.
     *
     * @param readLockAcquisitionFuture Future of the zone read lock acquisition; storage creation waits for it.
     * @param causalityToken Causality token used for the versioned value updates.
     * @param zoneDescriptor Descriptor of the table's zone.
     * @param table Table being created.
     * @return Future that completes when the table is registered as started.
     */
    private CompletableFuture<Void> loadTableToZoneOnTableCreateHavingZoneReadLock(CompletableFuture<Long> readLockAcquisitionFuture, long causalityToken, CatalogZoneDescriptor zoneDescriptor, TableImpl table) {
        int tableId = table.tableId();
        // NOTE(review): the 'throwable' argument of this update closure is not inspected, unlike 'e' in the
        // assignmentsUpdatedVv closure below — confirm this is intended.
        CompletableFuture<Void> localPartsUpdateFuture = this.localPartitionsVv.update(causalityToken, (ignore, throwable) -> IgniteUtils.inBusyLock(this.busyLock, () -> readLockAcquisitionFuture.thenComposeAsync(unused -> {
            // Collect the zone partitions that have a local replica on this node.
            BitSetPartitionSet parts = new BitSetPartitionSet();
            for (int i = 0; i < zoneDescriptor.partitions(); ++i) {
                if (!this.partitionReplicaLifecycleManager.hasLocalPartition(new ZonePartitionId(zoneDescriptor.id(), i))) continue;
                parts.set(i);
            }
            return this.getOrCreatePartitionStorages(table, parts).thenRun(() -> this.localPartsByTableId.put(tableId, parts));
        }, (Executor)this.ioExecutor)).exceptionally(TableManager.ignoreTableClosedException()));
        CompletableFuture<Void> tablesByIdFuture = this.tablesVv.get(causalityToken);
        CompletableFuture<Void> createPartsFut = this.assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
            if (e != null) {
                return CompletableFuture.failedFuture(e);
            }
            // Resources are loaded only after both the storages are created and tablesVv is complete for the token.
            return CompletableFuture.allOf(localPartsUpdateFuture, tablesByIdFuture).thenRunAsync(() -> IgniteUtils.inBusyLock(this.busyLock, () -> {
                for (int i = 0; i < zoneDescriptor.partitions(); ++i) {
                    ZonePartitionId zonePartitionId = new ZonePartitionId(zoneDescriptor.id(), i);
                    if (!this.partitionReplicaLifecycleManager.hasLocalPartition(zonePartitionId)) continue;
                    this.preparePartitionResourcesAndLoadToZoneReplicaBusy(table, zonePartitionId, false);
                }
            }), this.ioExecutor);
        });
        // The table becomes visible in 'tables' right away; 'startedTables' is updated only after createPartsFut.
        this.tables.put(tableId, table);
        return createPartsFut.thenAccept(ignore -> {
            this.startedTables.put(tableId, table);
            this.addTableToZone(zoneDescriptor.id(), table);
        });
    }

    /**
     * Releases the zone read lock once its acquisition future completes normally, using the stamp it produced.
     *
     * @param zoneDescriptor Descriptor of the zone whose read lock is released.
     * @param readLockAcquiryFuture Future of the read lock acquisition that yields the lock stamp.
     */
    private void unlockZoneForRead(CatalogZoneDescriptor zoneDescriptor, CompletableFuture<Long> readLockAcquiryFuture) {
        Consumer<Long> releaseReadLock = stamp -> this.partitionReplicaLifecycleManager.unlockZoneForRead(zoneDescriptor.id(), (long)stamp);
        readLockAcquiryFuture.thenAccept(releaseReadLock);
    }

    /**
     * Releases the zone read lock once its acquisition future completes normally, using the stamp it produced.
     *
     * @param zoneId Identifier of the zone whose read lock is released.
     * @param readLockAcquiryFuture Future of the read lock acquisition that yields the lock stamp.
     */
    private void unlockZoneForRead(int zoneId, CompletableFuture<Long> readLockAcquiryFuture) {
        Consumer<Long> releaseReadLock = stamp -> this.partitionReplicaLifecycleManager.unlockZoneForRead(zoneId, (long)stamp);
        readLockAcquiryFuture.thenAccept(releaseReadLock);
    }

    /**
     * Builds the per-partition resources of a table (trackers, data storage wrapper, update handlers,
     * raft listener, storage access) and loads them into the zone replica through
     * {@code partitionReplicaLifecycleManager.loadTableListenerToZoneReplica}.
     *
     * <p>Returns without doing anything if obtaining the partition storages throws {@code TableClosedException}.
     * The "Busy" suffix suggests the caller is expected to hold the busy lock — this method does not take it itself.
     *
     * @param table Table whose partition resources are prepared.
     * @param zonePartitionId Zone partition the resources are loaded into.
     * @param onNodeRecovery Flag passed through to {@code loadTableListenerToZoneReplica}.
     */
    private void preparePartitionResourcesAndLoadToZoneReplicaBusy(TableViewInternal table, ZonePartitionId zonePartitionId, boolean onNodeRecovery) {
        PartitionStorages partitionStorages;
        int partId = zonePartitionId.partitionId();
        int tableId = table.tableId();
        InternalTableImpl internalTbl = (InternalTableImpl)table.internalTable();
        TablePartitionId tablePartitionId = new TablePartitionId(tableId, partId);
        SafeTimeValuesTracker safeTimeTracker = new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
        // Stub tracker: both update and waitFor throw, because the storage index tracker is not expected
        // to be used for a table in this mode.
        PendingComparableValuesTracker<Long, Void> storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(Long.valueOf(0L)){

            @Override
            public void update(Long newValue, @Nullable Void futureResult) {
                throw new UnsupportedOperationException("It's not expected that in case of enabled colocation table storageIndexTracker will be updated.");
            }

            @Override
            public CompletableFuture<Void> waitFor(Long valueToWait) {
                throw new UnsupportedOperationException("It's not expected that in case of enabled colocation table storageIndexTracker will be updated.");
            }
        };
        try {
            partitionStorages = TableManager.getPartitionStorages(table, partId);
        }
        catch (TableClosedException e) {
            // The table has been closed concurrently; there is nothing to load.
            return;
        }
        PartitionDataStorage partitionDataStorage = this.partitionDataStorage(new ZonePartitionKey(zonePartitionId.zoneId(), partId), tableId, partitionStorages.getMvPartitionStorage());
        PartitionUpdateHandlers partitionUpdateHandlers = this.createPartitionUpdateHandlers(partId, partitionDataStorage, table, safeTimeTracker, this.replicationConfiguration);
        internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker);
        this.mvGc.addStorage(tablePartitionId, partitionUpdateHandlers.gcUpdateHandler);
        // A second TablePartitionId with the same table and partition ids as 'tablePartitionId' is created here.
        this.minTimeCollectorService.addPartition(new TablePartitionId(tableId, partId));
        // The replica listener is created lazily, once a raft client is supplied.
        Function<RaftCommandRunner, ReplicaTableProcessor> createListener = raftClient -> this.createReplicaListener(zonePartitionId, table, safeTimeTracker, partitionStorages.getMvPartitionStorage(), partitionStorages.getTxStateStorage(), partitionUpdateHandlers, (RaftCommandRunner)raftClient);
        PartitionListener tablePartitionRaftListener = new PartitionListener(this.txManager, partitionDataStorage, partitionUpdateHandlers.storageUpdateHandler, partitionStorages.getTxStateStorage(), safeTimeTracker, storageIndexTracker, this.catalogService, table.schemaView(), this.indexMetaStorage, this.topologyService.localMember().id(), this.minTimeCollectorService, this.partitionOperationsExecutor, this.executorInclinedPlacementDriver, this.clockService, this.nodeProperties, zonePartitionId);
        PartitionMvStorageAccessImpl partitionStorageAccess = new PartitionMvStorageAccessImpl(partId, table.internalTable().storage(), this.mvGc, partitionUpdateHandlers.indexUpdateHandler, partitionUpdateHandlers.gcUpdateHandler, this.fullStateTransferIndexChooser, this.schemaManager.schemaRegistry(tableId), this.lowWatermark);
        this.partitionReplicaLifecycleManager.loadTableListenerToZoneReplica(zonePartitionId, tableId, createListener, tablePartitionRaftListener, partitionStorageAccess, onNodeRecovery);
    }

    /**
     * Listener body for the "primary replica expired" event. Reacts only when this node was the leaseholder.
     *
     * <p>For a {@code TablePartitionId} group a weak replica stop is requested; for any other group the
     * id is cast to {@code ZonePartitionId} and replication for that partition is stopped.
     *
     * @param parameters Event parameters.
     * @return Completed future with {@code false}.
     */
    private CompletableFuture<Boolean> onPrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) {
        if (this.thisNodeHoldsLease(parameters.leaseholderId())) {
            ReplicationGroupId groupId = parameters.groupId();
            if (!(groupId instanceof TablePartitionId)) {
                this.secondaryReplicationManager.stopReplicationForPartition((ZonePartitionId)groupId);
            } else {
                TablePartitionId tablePartitionId = (TablePartitionId)groupId;
                this.replicaMgr.weakStopReplica(tablePartitionId, ReplicaManager.WeakReplicaStopReason.PRIMARY_EXPIRED, () -> this.stopAndDestroyTablePartition(tablePartitionId, this.tablesVv.latestCausalityToken()));
            }
        }
        return CompletableFutures.falseCompletedFuture();
    }

    /**
     * Listener body for the "before zone replica stopped/destroyed" events: stops secondary replication
     * for the partition when colocation is enabled.
     *
     * @param parameters Event parameters; the zone partition id is read from them.
     * @return Future that completes with {@code false}.
     */
    private CompletableFuture<Boolean> onBeforeZoneReplicaStopOrDestroy(LocalPartitionReplicaEventParameters parameters) {
        if (this.nodeProperties.colocationEnabled()) {
            return this.secondaryReplicationManager.stopReplicationForPartition(parameters.zonePartitionId()).thenApply(v -> false);
        }
        return CompletableFutures.falseCompletedFuture();
    }

    /**
     * Tells whether the given leaseholder id is the id of the local node.
     *
     * @param leaseholderId Leaseholder id, may be {@code null}.
     * @return {@code true} if the local node's id equals {@code leaseholderId}.
     */
    private boolean thisNodeHoldsLease(@Nullable UUID leaseholderId) {
        boolean heldByLocalNode = this.localNode().id().equals(leaseholderId);
        return heldByLocalNode;
    }

    /**
     * Listener body for the "primary replica elected" event: when this node is the leaseholder of a
     * {@code ZonePartitionId} group, starts secondary replication for that partition.
     *
     * @param parameters Event parameters.
     * @return Completed future with {@code false}.
     */
    private CompletableFuture<Boolean> onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) {
        if (this.thisNodeHoldsLease(parameters.leaseholderId())) {
            ReplicationGroupId groupId = parameters.groupId();
            if (groupId instanceof ZonePartitionId) {
                ZonePartitionId zonePartitionId = (ZonePartitionId)groupId;
                this.secondaryReplicationManager.startReplicationForPartition(zonePartitionId, parameters.startTime().longValue());
            }
        }
        return CompletableFutures.falseCompletedFuture();
    }

    /**
     * Returns the tables registered for the given zone that use it as their secondary zone.
     *
     * <p>The set is collected under the per-zone read lock from {@code tablesPerZoneLocks}; the lock is
     * released when the result future completes. A synchronous failure while building the chain is
     * reported through a failed future, in line with the other zone-lock based methods of this class,
     * instead of being thrown from a future-returning method.
     *
     * @param zoneId Zone identifier.
     * @return Future with the tables whose {@code secondaryZoneId()} equals {@code zoneId}.
     */
    private CompletableFuture<Set<? extends TableViewInternal>> getTablesIfZoneIsSecondary(int zoneId) {
        NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
        CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock();
        try {
            return ((CompletableFuture)readLockAcquisitionFuture.thenApply(v -> this.zoneTablesRawSet(zoneId).stream().filter(table -> {
                Integer secondaryZoneId = table.internalTable().secondaryZoneId();
                return secondaryZoneId != null && zoneId == secondaryZoneId;
                // The unlock is chained on the acquisition future so that it receives the lock stamp.
            }).collect(Collectors.toSet()))).whenComplete((v, e) -> readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead));
        }
        catch (Throwable e2) {
            // Still release the read lock once it is acquired, then surface the failure asynchronously.
            readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
            return CompletableFuture.failedFuture(e2);
        }
    }

    /**
     * Recovers stable assignments and, after that completes normally, pending assignments.
     *
     * @param recoveryRevision Meta storage revision to recover at.
     * @return Future that completes when both recovery steps are done.
     */
    private CompletableFuture<Void> processAssignmentsOnRecovery(long recoveryRevision) {
        CompletableFuture<Void> stableRecovered = this.recoverStableAssignments(recoveryRevision);
        return stableRecovered.thenCompose(v -> this.recoverPendingAssignments(recoveryRevision));
    }

    /**
     * Replays stable assignment entries found under the stable assignments prefix at the given revision.
     *
     * @param recoveryRevision Meta storage revision to read the entries at.
     * @return Future that completes when all entries have been handled.
     */
    private CompletableFuture<Void> recoverStableAssignments(long recoveryRevision) {
        ByteArray stablePrefix = new ByteArray(RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES);
        BiFunction<Entry, Long, CompletableFuture<Void>> stableHandler = (entry, rev) -> this.handleChangeStableAssignmentEvent((Entry)entry, (long)rev, true);
        return this.handleAssignmentsOnRecovery(stablePrefix, recoveryRevision, stableHandler, "stable");
    }

    /**
     * Replays pending assignment entries found under the pending assignments queue prefix at the given revision.
     *
     * @param recoveryRevision Meta storage revision to read the entries at.
     * @return Future that completes when all entries have been handled.
     */
    private CompletableFuture<Void> recoverPendingAssignments(long recoveryRevision) {
        ByteArray pendingPrefix = new ByteArray(RebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES);
        BiFunction<Entry, Long, CompletableFuture<Void>> pendingHandler = (entry, rev) -> this.handleChangePendingAssignmentEvent((Entry)entry, (long)rev, true);
        return this.handleAssignmentsOnRecovery(pendingPrefix, recoveryRevision, pendingHandler, "pending");
    }

    /**
     * Reads all meta storage entries with the given prefix at the given revision and passes each of
     * them to the supplied handler.
     *
     * <p>A failure of any handler future is logged and swallowed, so the returned future completes
     * normally in that case.
     *
     * @param prefix Key prefix of the assignments entries.
     * @param revision Revision to read the entries at; also passed to the handler.
     * @param assignmentsEventHandler Handler invoked for every entry.
     * @param assignmentsType Human-readable assignments type used in the log message.
     * @return Future that completes when all handler futures have completed.
     */
    private CompletableFuture<Void> handleAssignmentsOnRecovery(ByteArray prefix, long revision, BiFunction<Entry, Long, CompletableFuture<Void>> assignmentsEventHandler, String assignmentsType) {
        // toArray() consumes the stream eagerly, so the cursor may be closed before the futures complete.
        try (Cursor<Entry> cursor = this.metaStorageMgr.prefixLocally(prefix, revision)) {
            CompletableFuture<?>[] futures = cursor.stream().map(entry -> {
                if (LOG.isInfoEnabled()) {
                    LOG.info("Non handled {} assignments for key '{}' discovered, performing recovery", assignmentsType, new String(entry.key(), StandardCharsets.UTF_8));
                }
                return assignmentsEventHandler.apply(entry, revision);
            }).toArray(CompletableFuture[]::new);
            // Returned directly as CompletableFuture<Void> rather than through a raw CompletionStage local.
            return CompletableFuture.allOf(futures).exceptionally(e -> {
                LOG.error("Error when performing assignments recovery", e);
                return null;
            });
        }
    }

    /**
     * Table-create listener body: creates the table locally from the event parameters.
     *
     * @param parameters Table creation event parameters.
     * @return Future that completes with {@code false} when the local creation completes.
     */
    private CompletableFuture<Boolean> onTableCreate(CreateTableEventParameters parameters) {
        long causalityToken = parameters.causalityToken();
        int catalogVersion = parameters.catalogVersion();
        CatalogTableDescriptor tableDescriptor = parameters.tableDescriptor();
        return this.createTableLocally(causalityToken, catalogVersion, tableDescriptor, false).thenApply(unused -> false);
    }

    /**
     * Table-drop listener body: unregisters the metrics source of the started table and enqueues a
     * destruction event for it. Runs under the busy lock.
     *
     * @param parameters Table drop event parameters.
     */
    private void onTableDrop(DropTableEventParameters parameters) {
        IgniteUtils.inBusyLock(this.busyLock, () -> {
            int droppedTableId = parameters.tableId();
            this.unregisterMetricsSource(this.startedTables.get(droppedTableId));
            DestroyTableEvent destroyEvent = new DestroyTableEvent(parameters.catalogVersion(), droppedTableId);
            this.destructionEventsQueue.enqueue(destroyEvent);
        });
    }

    /**
     * Table-alter listener body: dispatches to the handler that matches the concrete parameters type
     * (rename, secondary zone add/drop, properties change). Other parameter types are ignored.
     *
     * @param parameters Catalog event parameters.
     * @return Future that completes with {@code false} when the matching handler (if any) completes.
     */
    private CompletableFuture<Boolean> onTableAlter(CatalogEventParameters parameters) {
        CompletableFuture<?> handled;
        if (parameters instanceof RenameTableEventParameters) {
            handled = this.onTableRename((RenameTableEventParameters)parameters);
        } else if (parameters instanceof AddSecondaryZoneEventParameters) {
            handled = this.onTableSecondaryZoneAdd((AddSecondaryZoneEventParameters)parameters);
        } else if (parameters instanceof DropSecondaryZoneEventParameters) {
            handled = this.onTableSecondaryZoneDrop((DropSecondaryZoneEventParameters)parameters);
        } else if (parameters instanceof AlterTablePropertiesEventParameters) {
            handled = this.onTablePropertiesChanged((AlterTablePropertiesEventParameters)parameters);
        } else {
            return CompletableFutures.falseCompletedFuture();
        }
        return handled.thenApply(unused -> false);
    }

    /**
     * Listener body for the "secondary storage available" event: starts secondary replication for the table.
     *
     * @param parameters Event parameters; the table id and catalog version are read from them.
     * @return Completed future with {@code false}.
     */
    private CompletableFuture<Boolean> onSecondaryStorageAvailable(SecondaryStorageAvailableEventParameters parameters) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            int tableId = parameters.tableId();
            LOG.info("Secondary storage is available for table [tableId={}]", tableId);
            TableViewInternal availableTable = this.tables.get(tableId);
            this.secondaryReplicationManager.startReplicationForTable(availableTable, parameters.catalogVersion());
            return CompletableFutures.falseCompletedFuture();
        });
    }

    /**
     * Low watermark change listener body: destroys locally every table whose destruction event was
     * queued for a catalog version up to the one active at the new low watermark.
     *
     * <p>Does nothing if the busy lock cannot be entered. Any throwable raised while processing is
     * reported through a failed future.
     *
     * @param parameters Event parameters carrying the new low watermark.
     * @return Completed future with {@code false}, or a failed future.
     */
    private CompletableFuture<Boolean> onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFutures.falseCompletedFuture();
        }
        try {
            long newLowWatermark = parameters.newLowWatermark().longValue();
            int newEarliestCatalogVersion = this.catalogService.activeCatalogVersion(newLowWatermark);
            this.destructionEventsQueue.drainUpTo(newEarliestCatalogVersion).forEach(event -> this.destroyTableLocally(event.tableId()));
            return CompletableFutures.falseCompletedFuture();
        }
        catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Applies changed staleness properties to the table as part of the {@code tablesVv} update for the
     * event's causality token. A failure of the previous versioned value is propagated.
     *
     * @param parameters Event parameters with the table id and the new staleness settings.
     * @return Future of the versioned value update.
     */
    private CompletableFuture<?> onTablePropertiesChanged(AlterTablePropertiesEventParameters parameters) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.tablesVv.update(parameters.causalityToken(), (ignore, e) -> {
            if (e == null) {
                TableViewInternal alteredTable = this.tables.get(parameters.tableId());
                alteredTable.updateStalenessConfiguration(parameters.staleRowsFraction(), parameters.minStaleRowsCount());
                return CompletableFutures.nullCompletedFuture();
            }
            return CompletableFuture.failedFuture(e);
        }));
    }

    /**
     * Applies the new table name as part of the {@code tablesVv} update for the event's causality
     * token. A failure of the previous versioned value is propagated.
     *
     * @param parameters Event parameters with the table id and the new name.
     * @return Future of the versioned value update.
     */
    private CompletableFuture<?> onTableRename(RenameTableEventParameters parameters) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.tablesVv.update(parameters.causalityToken(), (ignore, e) -> {
            if (e == null) {
                TableViewInternal renamedTable = this.tables.get(parameters.tableId());
                ((TableImpl)renamedTable).name(parameters.newTableName());
                return CompletableFutures.nullCompletedFuture();
            }
            return CompletableFuture.failedFuture(e);
        }));
    }

    /**
     * Handles adding a secondary zone to a table as part of the {@code tablesVv} update for the
     * event's causality token.
     *
     * <p>The table gets either a secondary storage or, when none is created, just the secondary zone id.
     * Loading the table to the secondary zone and the following full state transfer are started in the
     * background: the versioned value update does not wait for them, and their failure (other than
     * node stopping) is handed to the failure processor.
     *
     * @param parameters Event parameters with the table id, catalog version and causality token.
     * @return Future of the versioned value update.
     */
    private CompletableFuture<?> onTableSecondaryZoneAdd(AddSecondaryZoneEventParameters parameters) {
        long causalityToken = parameters.causalityToken();
        // NOTE(review): 'ex' is not inspected here, whereas the rename and properties handlers propagate
        // a failure of the previous versioned value — confirm this is intended.
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.tablesVv.update(causalityToken, (ignore, ex) -> {
            int catalogVersion = parameters.catalogVersion();
            int tableId = parameters.tableId();
            TableViewInternal table = this.tables.get(tableId);
            Catalog catalog = this.catalogService.catalog(catalogVersion);
            CatalogTableDescriptor tableDescriptor = TableManager.getTableDescriptor(tableId, catalog);
            CatalogZoneDescriptor zoneDescriptor = this.getSecondaryZoneDescriptor(tableDescriptor, catalogVersion);
            assert (zoneDescriptor != null);
            LOG.info("Adding the secondary zone to the table [tableId={}, tableName={}, zoneId={}, zoneName={}]", tableId, table.name(), zoneDescriptor.id(), zoneDescriptor.name());
            SecondaryTableStorage secondaryTableStorage = this.createSecondaryTableStorage(tableDescriptor, zoneDescriptor);
            // When no secondary storage is created, only the secondary zone id is recorded on the table.
            if (secondaryTableStorage == null) {
                table.internalTable().setSecondaryZoneId(zoneDescriptor.id());
            } else {
                table.internalTable().setSecondaryStorage(secondaryTableStorage);
            }
            // Fire-and-forget: the result is observed only through whenComplete below.
            ((CompletableFuture)this.loadTableToSecondaryZoneIfNeeded(table, tableDescriptor, zoneDescriptor, causalityToken).thenComposeAsync(v -> this.secondaryReplicationManager.startFullStateTransferForTable(table, catalog), (Executor)this.ioExecutor)).whenComplete((result, error) -> {
                if (error != null && !ExceptionUtils.hasCause(error, NodeStoppingException.class)) {
                    String errorMessage = String.format("Failed to start full state transfer for table [tableId=%s, tableName=%s, zoneId=%s, zoneName=%s]", tableId, table.name(), zoneDescriptor.id(), zoneDescriptor.name());
                    this.failureProcessor.process(new FailureContext((Throwable)error, errorMessage));
                }
            });
            return CompletableFutures.nullCompletedFuture();
        }));
    }

    /**
     * Handles the catalog event fired when the secondary zone is dropped from a table.
     *
     * <p>The actual work is delegated to {@code dropSecondaryZoneFromTable} from within the {@code tablesVv} update
     * for the event's causality token; the update callback itself completes immediately.
     *
     * @param parameters Event parameters.
     * @return Future returned by the busy-lock guarded {@code tablesVv} update.
     */
    private CompletableFuture<?> onTableSecondaryZoneDrop(DropSecondaryZoneEventParameters parameters) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            long causalityToken = parameters.causalityToken();
            return this.tablesVv.update(causalityToken, (unusedValue, unusedError) -> {
                IgniteUtils.inBusyLock(this.busyLock, () -> this.dropSecondaryZoneFromTable(parameters));
                return CompletableFutures.nullCompletedFuture();
            });
        });
    }

    /**
     * Detaches the secondary zone from a table: aborts a running full state transfer, stops replication to the
     * secondary storage and then stops and destroys the secondary storage resources.
     *
     * <p>The stop/destroy chain runs asynchronously and is not awaited by the caller. Its failures, unless caused by
     * {@link NodeStoppingException}, are handed to the failure processor instead of being silently dropped.
     *
     * @param parameters Event parameters identifying the table.
     */
    private void dropSecondaryZoneFromTable(DropSecondaryZoneEventParameters parameters) {
        int tableId = parameters.tableId();
        TableViewInternal table = this.tables.get(tableId);
        assert (table != null) : "Table is missing [tableId=" + tableId + "]";
        LOG.info("Dropping secondary storage profile for table [tableId={}, tableName={}].", tableId, table.name());
        this.secondaryReplicationManager.abortFullStateTransferForTable(table);
        // thenCompose (rather than thenRun) keeps the destroy future in the chain so its outcome is observable.
        this.secondaryReplicationManager.stopReplicationForTable(table)
                .thenCompose(unused -> this.stopAndDestroySecondaryStorageResources(table))
                .whenComplete((unused, error) -> {
                    if (error != null && !ExceptionUtils.hasCause((Throwable)error, NodeStoppingException.class)) {
                        String errorMessage = String.format("Failed to stop and destroy secondary storage resources for table [tableId=%s, tableName=%s]", tableId, table.name());
                        this.failureProcessor.process(new FailureContext((Throwable)error, errorMessage));
                    }
                });
    }

    /**
     * Unloads the table from its secondary zone replica, removes it from the per-zone table set and destroys its
     * secondary storage, all while holding the write lock of the secondary zone taken from {@code tablesPerZoneLocks}.
     *
     * <p>The write lock is released when the chain completes (normally or exceptionally), and also when building
     * the chain itself throws.
     *
     * @param table Table whose secondary storage resources are destroyed; must have a secondary zone ID.
     * @return Future of the whole unload-and-destroy chain.
     */
    private CompletableFuture<Void> stopAndDestroySecondaryStorageResources(TableViewInternal table) {
        int tableId = table.tableId();
        Integer secondaryZoneId = table.internalTable().secondaryZoneId();
        assert (secondaryZoneId != null) : "Secondary zone is null [tableId=" + tableId + "]";
        NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(secondaryZoneId, id -> new NaiveAsyncReadWriteLock());
        CompletableFuture<Long> writeLockFuture = zoneLock.writeLock();
        // Releases the write lock using the stamp produced by the acquisition future.
        Runnable releaseWriteLock = () -> writeLockFuture.thenAccept(zoneLock::unlockWrite);
        try {
            CompletableFuture<Void> unloaded = writeLockFuture.thenCompose(stamp -> this.unloadTableResourcesFromSecondaryZoneReplica(table, secondaryZoneId));
            CompletableFuture<Void> removedFromZone = unloaded.thenRun(() -> this.tablesPerZone.getOrDefault(secondaryZoneId, Collections.emptySet()).remove(table));
            CompletableFuture<Void> destroyed = removedFromZone.thenRunAsync(() -> TableManager.destroyTableSecondaryStorage(table.internalTable()), this.ioExecutor);
            return destroyed.whenComplete((unused, error) -> releaseWriteLock.run());
        } catch (Throwable t) {
            releaseWriteLock.run();
            throw t;
        }
    }

    /**
     * Unloads the table resources from the secondary zone replica while holding the zone read lock obtained from
     * the partition replica lifecycle manager.
     *
     * <p>The read lock is released once the unload future completes, or right away if building the chain throws.
     *
     * @param table Table to unload.
     * @param secondaryZoneId ID of the table's secondary zone.
     * @return Future of the unload operation.
     */
    private CompletableFuture<Void> unloadTableResourcesFromSecondaryZoneReplica(TableViewInternal table, int secondaryZoneId) {
        CompletableFuture<Long> readLockFuture = this.partitionReplicaLifecycleManager.lockZoneForRead(secondaryZoneId);
        try {
            CompletableFuture unloadFuture = (CompletableFuture)readLockFuture.thenCompose(stamp -> this.secondaryZoneManager.unloadTableResourcesFromSecondaryZoneReplicaHavingZoneReadLock(table));
            return unloadFuture.whenComplete((unused, error) -> this.unlockZoneForRead(secondaryZoneId, readLockFuture));
        } catch (Throwable t) {
            this.unlockZoneForRead(secondaryZoneId, readLockFuture);
            throw t;
        }
    }

    /**
     * Starts the partitions of a table that are assigned to the local node in the stable assignments.
     *
     * <p>Outside of recovery a partition is started whenever the local node is present in its stable assignments.
     * During recovery it is additionally required that there are no forced pending assignments for the partition;
     * if the last rebalance was not graceful a warning is logged, but the same condition is applied.
     *
     * @param stableAssignmentsFuture Future with stable assignments, one element per partition.
     * @param pendingAssignmentsForPartitions Pending assignments, one (nullable) element per partition.
     * @param assignmentsChains Assignments chains, one (nullable) element per partition; read only during recovery.
     * @param table Table whose partitions are started.
     * @param isRecovery Whether the method is invoked as part of node recovery.
     * @param assignmentsTimestamp Assignments timestamp passed through to the partition start.
     * @return Future that completes when all partition start futures have completed.
     */
    private CompletableFuture<Void> startLocalPartitionsAndClients(CompletableFuture<List<Assignments>> stableAssignmentsFuture, List<@Nullable Assignments> pendingAssignmentsForPartitions, List<@Nullable AssignmentsChain> assignmentsChains, TableImpl table, boolean isRecovery, long assignmentsTimestamp) {
        int tableId = table.tableId();
        return stableAssignmentsFuture.thenCompose(stableAssignmentsForPartitions -> {
            assert (stableAssignmentsForPartitions != null) : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId);
            int partitionCount = stableAssignmentsForPartitions.size();
            CompletableFuture[] startFutures = new CompletableFuture[partitionCount];
            for (int idx = 0; idx < partitionCount; ++idx) {
                int partId = idx;
                Assignments stable = stableAssignmentsForPartitions.get(idx);
                Assignments pending = pendingAssignmentsForPartitions.get(idx);
                Assignment localStableAssignment = this.localAssignment(stable);
                boolean assignedLocally = localStableAssignment != null;
                boolean shouldStart;
                if (isRecovery) {
                    AssignmentsChain chain = assignmentsChains.get(idx);
                    if (!TableManager.lastRebalanceWasGraceful(chain)) {
                        LOG.warn("Recovery after a forced rebalance for table is not supported yet [tableId={}, partitionId={}].", tableId, partId);
                    }
                    // Both the graceful and the forced case use the same start condition.
                    shouldStart = assignedLocally && (pending == null || !pending.force());
                } else {
                    shouldStart = assignedLocally;
                }
                if (!shouldStart) {
                    startFutures[idx] = CompletableFutures.nullCompletedFuture();
                    continue;
                }
                startFutures[idx] = this.startPartitionAndStartClient(table, partId, localStableAssignment, stable, isRecovery, assignmentsTimestamp).whenComplete((unused, error) -> {
                    if (error != null) {
                        String errorMessage = String.format("Unable to update raft groups on the node [tableId=%s, partitionId=%s]", tableId, partId);
                        this.failureProcessor.process(new FailureContext(error, errorMessage));
                    }
                });
            }
            return CompletableFuture.allOf(startFutures);
        });
    }

    /**
     * Starts the replica of a single table partition on the local node.
     *
     * <p>Does nothing (returns a completed future) when colocation is enabled. Otherwise a replica start supplier is
     * built and handed to {@code replicaMgr.weakStartReplica}. The supplier runs on the IO executor under the busy
     * lock; it resolves to {@code false} when the group should not be started or when the table is already closed,
     * and to {@code true} once {@code replicaMgr.startReplica} completes.
     *
     * <p>The returned future never completes exceptionally: errors are consumed by the final {@code handle} stage,
     * and those not caused by {@link NodeStoppingException} or {@link TransientReplicaStartException} are passed to
     * the failure processor.
     *
     * @param table Table the partition belongs to.
     * @param partId Partition ID.
     * @param localAssignment Assignment of the local node, passed to the recovery check.
     * @param stableAssignments Stable assignments of the partition.
     * @param isRecovery Whether the start is a part of node recovery.
     * @param assignmentsTimestamp Assignments timestamp, passed to the recovery check.
     * @return Future that completes with {@code null} when the start attempt has finished.
     */
    private CompletableFuture<Void> startPartitionAndStartClient(TableViewInternal table, int partId, Assignment localAssignment, Assignments stableAssignments, boolean isRecovery, long assignmentsTimestamp) {
        if (this.nodeProperties.colocationEnabled()) {
            return CompletableFutures.nullCompletedFuture();
        }
        int tableId = table.tableId();
        InternalTableImpl internalTbl = (InternalTableImpl)table.internalTable();
        PeersAndLearners stablePeersAndLearners = PeersAndLearners.fromAssignments(stableAssignments.nodes());
        TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId);
        // Only on recovery the node recovery component decides whether the group should be started at all.
        CompletableFuture<Boolean> shouldStartGroupFut = isRecovery ? this.partitionReplicatorNodeRecovery.initiateGroupReentryIfNeeded(replicaGrpId, internalTbl, stablePeersAndLearners, localAssignment, assignmentsTimestamp) : CompletableFutures.trueCompletedFuture();
        // Stable assignments are passed on as "forced" only when they carry the force flag.
        Assignments forcedAssignments = stableAssignments.force() ? stableAssignments : null;
        Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> shouldStartGroupFut.thenComposeAsync(startGroup -> IgniteUtils.inBusyLock(this.busyLock, () -> {
            PartitionStorages partitionStorages;
            if (!startGroup.booleanValue()) {
                return CompletableFutures.falseCompletedFuture();
            }
            SafeTimeValuesTracker safeTimeTracker = new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
            PendingComparableValuesTracker<Long, Void> storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);
            try {
                partitionStorages = TableManager.getPartitionStorages(table, partId);
            }
            catch (TableClosedException e) {
                // The table has been closed in the meantime: report that the replica was not started.
                return CompletableFutures.falseCompletedFuture();
            }
            TablePartitionKey partitionKey = new TablePartitionKey(tableId, partId);
            PartitionDataStorage partitionDataStorage = this.partitionDataStorage(partitionKey, internalTbl.tableId(), partitionStorages.getMvPartitionStorage());
            // Seed the index tracker with the index already applied to the storage.
            storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null);
            PartitionUpdateHandlers partitionUpdateHandlers = this.createPartitionUpdateHandlers(partId, partitionDataStorage, table, safeTimeTracker, this.replicationConfiguration);
            internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker);
            this.mvGc.addStorage(replicaGrpId, partitionUpdateHandlers.gcUpdateHandler);
            PartitionListener raftGroupListener = new PartitionListener(this.txManager, partitionDataStorage, partitionUpdateHandlers.storageUpdateHandler, partitionStorages.getTxStateStorage(), safeTimeTracker, storageIndexTracker, this.catalogService, table.schemaView(), this.indexMetaStorage, this.topologyService.localMember().id(), this.minTimeCollectorService, this.partitionOperationsExecutor, this.executorInclinedPlacementDriver, this.clockService, this.nodeProperties, replicaGrpId);
            this.minTimeCollectorService.addPartition(new TablePartitionId(tableId, partId));
            SnapshotStorageFactory snapshotStorageFactory = this.createSnapshotStorageFactory(replicaGrpId, partitionUpdateHandlers, internalTbl);
            // The replica listener is created lazily, once the raft client is available.
            Function<RaftGroupService, ReplicaListener> createListener = raftClient -> this.createReplicaListener(replicaGrpId, table, safeTimeTracker, partitionStorages.getMvPartitionStorage(), partitionStorages.getTxStateStorage(), partitionUpdateHandlers, (RaftCommandRunner)raftClient);
            RaftGroupEventsListener raftGroupEventsListener = this.createRaftGroupEventsListener(replicaGrpId);
            MvTableStorage mvTableStorage = internalTbl.storage();
            try {
                return this.replicaMgr.startReplica(raftGroupEventsListener, raftGroupListener, mvTableStorage.isVolatile(), snapshotStorageFactory, createListener, storageIndexTracker, replicaGrpId, stablePeersAndLearners).thenApply(ignored -> true);
            }
            catch (NodeStoppingException e) {
                throw new AssertionError("Loza was stopped before Table manager", e);
            }
        }), (Executor)this.ioExecutor);
        // handle() swallows the error after (optionally) reporting it, so callers always see a normal completion.
        return this.replicaMgr.weakStartReplica(replicaGrpId, startReplicaSupplier, forcedAssignments).handle((res, ex) -> {
            if (ex != null && !ExceptionUtils.hasCause(ex, NodeStoppingException.class, TransientReplicaStartException.class)) {
                String errorMessage = String.format("Unable to update raft groups on the node [tableId=%s, partitionId=%s]", tableId, partId);
                this.failureProcessor.process(new FailureContext((Throwable)ex, errorMessage));
            }
            return null;
        });
    }

    /**
     * Finds the assignment that matches the local node.
     *
     * @param assignments Assignments to search in, may be {@code null}.
     * @return First assignment accepted by {@code isLocalNodeAssignment}, or {@code null} if there is none or
     *     {@code assignments} is {@code null}.
     */
    @Nullable
    private Assignment localAssignment(@Nullable Assignments assignments) {
        if (assignments == null) {
            return null;
        }
        for (Assignment candidate : assignments.nodes()) {
            if (this.isLocalNodeAssignment.test(candidate)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Creates a partition mover for the given replication group.
     *
     * <p>The mover is given a supplier that resolves the raft client of the replica registered in the replica
     * manager, or yields a failed future when no such replica exists.
     *
     * @param replicaGrpId Replication group ID of the partition.
     * @return New partition mover.
     */
    private PartitionMover createPartitionMover(TablePartitionId replicaGrpId) {
        return new PartitionMover(this.busyLock, this.rebalanceScheduler, () -> {
            CompletableFuture<Replica> replicaFuture = this.replicaMgr.replica(replicaGrpId);
            if (replicaFuture != null) {
                return replicaFuture.thenApply(Replica::raftClient);
            }
            String message = "No such replica for partition " + replicaGrpId.partitionId() + " in table " + replicaGrpId.tableId();
            return CompletableFuture.failedFuture(new IgniteInternalException(message));
        });
    }

    /**
     * Creates the raft group events listener that drives rebalance for the given replication group.
     *
     * @param replicaGrpId Replication group ID of the partition.
     * @return New {@link RebalanceRaftGroupEventsListener} wired with a fresh partition mover.
     */
    private RaftGroupEventsListener createRaftGroupEventsListener(TablePartitionId replicaGrpId) {
        return new RebalanceRaftGroupEventsListener(
                this.metaStorageMgr,
                this.failureProcessor,
                replicaGrpId,
                this.busyLock,
                this.createPartitionMover(replicaGrpId),
                this::calculateAssignments,
                this.rebalanceScheduler,
                this.rebalanceRetryDelayConfiguration
        );
    }

    /**
     * Creates the replica listener of a table partition.
     *
     * <p>The raft client is wrapped so that it is used through the partition operations executor, and the primary
     * key index storage is resolved lazily from the table's index storage adapters.
     *
     * @param replicationGroupId Replication group ID of the partition.
     * @param table Table the partition belongs to.
     * @param safeTimeTracker Safe time tracker of the partition.
     * @param mvPartitionStorage Multi-version storage of the partition.
     * @param txStatePartitionStorage Transaction state storage of the partition.
     * @param partitionUpdateHandlers Update handlers of the partition.
     * @param raftClient Raft command runner of the group.
     * @return New partition replica listener.
     */
    private PartitionReplicaListener createReplicaListener(PartitionGroupId replicationGroupId, TableViewInternal table, PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker, MvPartitionStorage mvPartitionStorage, TxStatePartitionStorage txStatePartitionStorage, PartitionUpdateHandlers partitionUpdateHandlers, RaftCommandRunner raftClient) {
        int partitionId = replicationGroupId.partitionId();
        ExecutorInclinedRaftCommandRunner commandRunner = new ExecutorInclinedRaftCommandRunner(raftClient, this.partitionOperationsExecutor);
        int tableId = table.tableId();
        Lazy<TableSchemaAwareIndexStorage> pkIndexStorage = new Lazy<TableSchemaAwareIndexStorage>(() -> table.indexStorageAdapters(partitionId).get().get(table.pkId()));
        return new PartitionReplicaListener(
                mvPartitionStorage,
                commandRunner,
                this.txManager,
                this.lockMgr,
                this.scanRequestExecutor,
                this.partitionOperationsExecutor,
                replicationGroupId,
                tableId,
                table.indexesLockers(partitionId),
                pkIndexStorage,
                () -> table.indexStorageAdapters(partitionId).get(),
                this.clockService,
                safeTimeTracker,
                txStatePartitionStorage,
                this.transactionStateResolver,
                partitionUpdateHandlers.storageUpdateHandler,
                new CatalogValidationSchemasSource(this.catalogService, this.schemaManager),
                this.localNode(),
                this.executorInclinedSchemaSyncService,
                this.catalogService,
                this.executorInclinedPlacementDriver,
                this.topologyService,
                this.remotelyTriggeredResourceRegistry,
                this.schemaManager.schemaRegistry(tableId),
                this.indexMetaStorage,
                this.lowWatermark,
                this.licenseFeatureChecker,
                this.failureProcessor,
                this.nodeProperties,
                table.metrics(),
                this.policyManager
        );
    }

    /**
     * Calculates assignments of a partition using the catalog that is reliable for the given timestamp.
     *
     * @param tablePartitionId Partition to calculate assignments for.
     * @param assignmentsTimestamp Timestamp (as a long value) used to pick the catalog.
     * @return Future with the calculated assignments, passed through {@code orStopManagerFuture}.
     */
    private CompletableFuture<Set<Assignment>> calculateAssignments(TablePartitionId tablePartitionId, Long assignmentsTimestamp) {
        HybridTimestamp timestamp = HybridTimestamp.hybridTimestamp(assignmentsTimestamp);
        CompletionStage calculated = this.reliableCatalogVersions.safeReliableCatalogFor(timestamp).thenCompose(catalog -> this.calculateAssignments(tablePartitionId, (Catalog)catalog));
        return this.orStopManagerFuture((CompletableFuture)calculated);
    }

    /**
     * Calculates assignments of a partition against the given catalog.
     *
     * <p>The current distribution is read from the local meta storage using the stable and pending assignments
     * keys of every partition of the table's zone; the data nodes of the zone are then used to compute the
     * assignments of the requested partition.
     *
     * @param tablePartitionId Partition to calculate assignments for.
     * @param catalog Catalog to resolve the table and zone descriptors from.
     * @return Future with the calculated assignments.
     */
    private CompletableFuture<Set<Assignment>> calculateAssignments(TablePartitionId tablePartitionId, Catalog catalog) {
        int tableId = tablePartitionId.tableId();
        CatalogTableDescriptor tableDescriptor = TableManager.getTableDescriptor(tableId, catalog);
        CatalogZoneDescriptor zoneDescriptor = TableManager.getZoneDescriptor(tableDescriptor, catalog);
        List<Set<Assignment>> currentAssignments = AssignmentUtil.currentDistributionFromLocalMetaStorage(
                this.metaStorageMgr,
                zoneDescriptor.partitions(),
                p -> RebalanceUtil.stablePartAssignmentsKey(new TablePartitionId(tableId, (int)p)),
                p -> RebalanceUtil.pendingPartAssignmentsQueueKey(new TablePartitionId(tableId, (int)p))
        );
        return this.distributionZoneManager
                .dataNodes(zoneDescriptor.updateTimestamp(), catalog.version(), tableDescriptor.zoneId())
                .thenApply(dataNodes -> PartitionDistributionUtils.calculateAssignmentForPartition(
                        dataNodes,
                        currentAssignments,
                        tablePartitionId.partitionId(),
                        zoneDescriptor.partitions(),
                        zoneDescriptor.replicas(),
                        zoneDescriptor.consensusGroupSize()
                ));
    }

    /**
     * Checks whether any of the given assignments matches the local node.
     *
     * @param assignments Assignments to check.
     * @return {@code true} if at least one assignment is accepted by {@code isLocalNodeAssignment}.
     */
    private boolean isLocalNodeInAssignments(Collection<Assignment> assignments) {
        for (Assignment candidate : assignments) {
            if (this.isLocalNodeAssignment.test(candidate)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Checks whether the local node currently holds the primary replica of the given replication group.
     *
     * @param replicationGroupId Replication group ID.
     * @return Future with {@code true} if the local node is the leaseholder.
     */
    private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId replicationGroupId) {
        CompletableFuture<ReplicaMeta> primaryReplicaFuture = this.getPrimaryReplica(replicationGroupId);
        return this.isLocalNodeIsPrimary(primaryReplicaFuture);
    }

    /**
     * Checks whether the leaseholder of the given primary replica meta is the local node.
     *
     * @param primaryReplicaFuture Future with the primary replica meta; may complete with {@code null}.
     * @return Future with {@code true} if the meta and its leaseholder are non-null and the leaseholder equals the
     *     local node name.
     */
    private CompletableFuture<Boolean> isLocalNodeIsPrimary(CompletableFuture<ReplicaMeta> primaryReplicaFuture) {
        return primaryReplicaFuture.thenApply(meta -> {
            if (meta == null || meta.getLeaseholder() == null) {
                return false;
            }
            return meta.getLeaseholder().equals(this.localNode().name());
        });
    }

    /**
     * Requests the primary replica of a replication group as of the current meta storage safe time moved back by
     * the maximum clock skew.
     *
     * @param replicationGroupId Replication group ID.
     * @return Future with the primary replica meta, or a future completed with {@code null} when the current safe
     *     time is still {@link HybridTimestamp#MIN_VALUE}.
     * @throws AssertionError If an {@link IllegalArgumentException} is thrown while computing the timestamp or
     *     requesting the primary replica.
     */
    private CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId replicationGroupId) {
        HybridTimestamp safeTime = this.metaStorageMgr.clusterTime().currentSafeTime();
        if (HybridTimestamp.MIN_VALUE.equals(safeTime)) {
            return CompletableFutures.nullCompletedFuture();
        }
        long skewMs = this.clockService.maxClockSkewMillis();
        try {
            return this.executorInclinedPlacementDriver.getPrimaryReplica(replicationGroupId, safeTime.subtractPhysicalTime(skewMs));
        } catch (IllegalArgumentException e) {
            long safeTimeMs = safeTime.longValue();
            long internal = safeTimeMs + (-skewMs << 16);
            String message = "Got a negative time [currentSafeTime=" + safeTime + ", currentSafeTimeMs=" + safeTimeMs + ", skewMs=" + skewMs + ", internal=" + internal + "]";
            throw new AssertionError(message, e);
        }
    }

    /**
     * Wraps a partition storage into a snapshot-aware partition data storage.
     *
     * @param partitionKey Key of the partition.
     * @param tableId Table ID.
     * @param partitionStorage Multi-version storage of the partition.
     * @return New {@link SnapshotAwarePartitionDataStorage} bound to the outgoing snapshots manager.
     */
    private PartitionDataStorage partitionDataStorage(PartitionKey partitionKey, int tableId, MvPartitionStorage partitionStorage) {
        PartitionDataStorage snapshotAwareStorage = new SnapshotAwarePartitionDataStorage(
                tableId,
                partitionStorage,
                this.outgoingSnapshotsManager,
                partitionKey
        );
        return snapshotAwareStorage;
    }

    /**
     * First phase of the component shutdown; executed at most once (guarded by {@code beforeStopGuard}).
     *
     * <p>Fails {@code stopManagerFuture} with {@link NodeStoppingException}, blocks the busy lock, stops the
     * background helpers, removes the placement driver, low watermark and catalog listeners and stops the
     * secondary replication manager. When colocation is disabled, it additionally unregisters the rebalance
     * watches and stops the replicas and closes the tables.
     */
    @Override
    public void beforeNodeStop() {
        // Make the method idempotent: only the first caller proceeds.
        if (!this.beforeStopGuard.compareAndSet(false, true)) {
            return;
        }
        this.stopManagerFuture.completeExceptionally(new NodeStoppingException());
        this.busyLock.block();
        this.expiredRowsCleaner.stop();
        this.secondaryStorageRebalanceTrigger.stop();
        this.executorInclinedPlacementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this.onPrimaryReplicaExpiredListener);
        this.executorInclinedPlacementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this.onPrimaryReplicaElectedListener);
        this.lowWatermark.removeListener(LowWatermarkEvent.LOW_WATERMARK_CHANGED, this.onLowWatermarkChangedListener);
        this.catalogService.removeListener(CatalogEvent.TABLE_CREATE, this.onTableCreateWithColocationListener);
        this.catalogService.removeListener(CatalogEvent.TABLE_CREATE, this.onTableCreateWithoutColocationListener);
        this.catalogService.removeListener(CatalogEvent.TABLE_DROP, this.onTableDropListener);
        this.catalogService.removeListener(CatalogEvent.TABLE_ALTER, this.onTableAlterListener);
        this.catalogService.removeListener(CatalogEvent.SECONDARY_STORAGE_AVAILABLE, this.onSecondaryStorageAvailableListener);
        this.secondaryReplicationManager.stop();
        // Rebalance watches and table replicas are handled here only when colocation is disabled.
        if (!this.nodeProperties.colocationEnabled()) {
            this.metaStorageMgr.unregisterWatch(this.pendingAssignmentsRebalanceListener);
            this.metaStorageMgr.unregisterWatch(this.stableAssignmentsRebalanceListener);
            this.metaStorageMgr.unregisterWatch(this.assignmentsSwitchRebalanceListener);
            this.stopReplicasAndCloseTables(this.tables);
        }
    }

    /**
     * Second phase of the component shutdown; must be called after {@link #beforeNodeStop()} and is executed at
     * most once (guarded by {@code stopGuard}).
     *
     * <p>Removes the partition replica lifecycle listeners and then closes the remaining resources in a fixed
     * order: tables (only when colocation is enabled), MV garbage collector, full state transfer index chooser,
     * the scan request, incoming snapshots and streamer flush executors, the secondary storage bridge and the
     * policy manager.
     *
     * @param componentContext Component context.
     * @return Completed future, or a failed future if closing the resources threw an exception.
     */
    @Override
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        assert (this.beforeStopGuard.get()) : "'stop' called before 'beforeNodeStop'";
        if (!this.stopGuard.compareAndSet(false, true)) {
            return CompletableFutures.nullCompletedFuture();
        }
        this.partitionReplicaLifecycleManager.removeListener(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, this.onBeforeZoneReplicaStartedListener);
        this.partitionReplicaLifecycleManager.removeListener(LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED, this.onZoneReplicaDestroyedListener);
        this.partitionReplicaLifecycleManager.removeListener(LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED, this.onZoneReplicaStoppedListener);
        this.partitionReplicaLifecycleManager.removeListener(LocalPartitionReplicaEvent.BEFORE_REPLICA_STOPPED, this.onBeforeZoneReplicaStopOrDestroyListener);
        this.partitionReplicaLifecycleManager.removeListener(LocalPartitionReplicaEvent.BEFORE_REPLICA_DESTROYED, this.onBeforeZoneReplicaStopOrDestroyListener);
        int shutdownTimeoutSeconds = 10;
        try {
            // The order of the elements is the order in which the resources are closed.
            ManuallyCloseable[] resources = {
                () -> {
                    if (this.nodeProperties.colocationEnabled()) {
                        IgniteUtils.closeAllManually(this.tables.values().stream().map(table -> () -> TableManager.closeTable(table)));
                    }
                },
                this.mvGc,
                this.fullStateTransferIndexChooser,
                () -> IgniteUtils.shutdownAndAwaitTermination(this.scanRequestExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS),
                () -> IgniteUtils.shutdownAndAwaitTermination(this.incomingSnapshotsExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS),
                () -> {
                    ScheduledExecutorService flushExecutor;
                    // The field is read under the manager's monitor.
                    synchronized (this) {
                        flushExecutor = this.streamerFlushExecutor;
                    }
                    IgniteUtils.shutdownAndAwaitTermination(flushExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS);
                },
                this.secondaryStorageBridge,
                this.policyManager::stop
            };
            IgniteUtils.closeAllManually(resources);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Stops the partition replicas of every given table, closes the tables and waits up to 30 seconds for all of
     * that to finish.
     *
     * <p>A failure to stop or close an individual table is passed to the failure processor. A failure of the wait
     * itself (timeout, execution error or interruption) is only logged; on interruption the interrupt flag of the
     * current thread is restored so that callers further up the stack can still observe it.
     *
     * @param tables Tables to stop, keyed by table ID.
     */
    private void stopReplicasAndCloseTables(Map<Integer, TableViewInternal> tables) {
        List<CompletableFuture<Void>> futures = new ArrayList<>(tables.size());
        for (TableViewInternal table : tables.values()) {
            CompletableFuture<Void> stopFuture = this.stopTablePartitions(table).thenRun(() -> {
                try {
                    TableManager.closeTable(table);
                }
                catch (Exception e) {
                    // Runnable cannot throw checked exceptions; propagate through the future instead.
                    throw new CompletionException(e);
                }
            }).whenComplete((res, ex) -> {
                if (ex != null) {
                    String errorMessage = String.format("Unable to stop table [name=%s, tableId=%s]", table.name(), table.tableId());
                    this.failureProcessor.process(new FailureContext(ex, errorMessage));
                }
            });
            futures.add(stopFuture);
        }
        try {
            CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).get(30L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag before returning.
            Thread.currentThread().interrupt();
            LOG.error("Unable to clean table resources", (Throwable)e);
        }
        catch (ExecutionException | TimeoutException e) {
            LOG.error("Unable to clean table resources", (Throwable)e);
        }
    }

    /**
     * Stops all partitions of a table. The stop requests are issued from the IO executor.
     *
     * @param table Table whose partitions are stopped.
     * @return Future that completes when every partition stop future has completed.
     */
    private CompletableFuture<Void> stopTablePartitions(TableViewInternal table) {
        return CompletableFuture.supplyAsync(() -> {
            int partitionCount = table.internalTable().partitions();
            int tableId = table.tableId();
            CompletableFuture[] stopFutures = new CompletableFuture[partitionCount];
            for (int partId = 0; partId < partitionCount; ++partId) {
                stopFutures[partId] = this.stopTablePartition(new TablePartitionId(tableId, partId), table);
            }
            return CompletableFuture.allOf(stopFutures);
        }, this.ioExecutor).thenCompose(allStopped -> allStopped);
    }

    /**
     * Closes the storages of a table and the internal table itself.
     *
     * @param table Table to close.
     * @throws Exception If closing any of the resources failed.
     */
    private static void closeTable(TableViewInternal table) throws Exception {
        InternalTable internal = table.internalTable();
        // Closed in this order: MV storage, TX state storage, secondary storage, the internal table.
        IgniteUtils.closeAllManually(
                internal.storage(),
                internal.txStateStorage(),
                internal.secondaryStorage(),
                internal
        );
    }

    /**
     * Creates the local table object together with its MV, secondary and TX state storages.
     *
     * @param causalityToken Causality token, used only for trace logging here.
     * @param tableDescriptor Catalog table descriptor.
     * @param zoneDescriptor Catalog descriptor of the table's zone.
     * @param secondaryZoneDescriptor Catalog descriptor of the secondary zone, or {@code null}.
     * @param schemaDescriptor Catalog descriptor of the table's schema.
     * @return New table instance.
     */
    private TableImpl createTableImpl(long causalityToken, CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor, @Nullable CatalogZoneDescriptor secondaryZoneDescriptor, CatalogSchemaDescriptor schemaDescriptor) {
        int tableId = tableDescriptor.id();
        QualifiedName tableName = QualifiedNameHelper.fromNormalized(schemaDescriptor.name(), tableDescriptor.name());
        LOG.trace("Creating local table: name={}, id={}, token={}", tableName.toCanonicalForm(), tableId, causalityToken);
        MvTableStorage mvStorage = this.createTableStorage(tableDescriptor, zoneDescriptor);
        SecondaryTableStorage secondaryStorage = this.createSecondaryTableStorage(tableDescriptor, secondaryZoneDescriptor);
        TxStateStorage txStateStorage = this.createTxStateTableStorage(tableDescriptor, zoneDescriptor);
        int partitionCount = zoneDescriptor.partitions();
        InternalTableImpl internalTable = new InternalTableImpl(
                tableName,
                zoneDescriptor.id(),
                tableId,
                tableDescriptor.secondaryZoneId(),
                partitionCount,
                this.topologyService,
                this.txManager,
                mvStorage,
                txStateStorage,
                this.replicaSvc,
                this.clockService,
                this.observableTimestampTracker,
                this.executorInclinedPlacementDriver,
                this.transactionInflights,
                this::streamerFlushExecutor,
                Objects.requireNonNull(this.streamerReceiverRunner),
                secondaryStorage,
                this.schemaManager.schemaRegistry(tableId),
                this.continuousQueryResponseHandler,
                tableDescriptor.cache(),
                this.licenseFeatureChecker,
                () -> ((TransactionView)this.txCfg.value()).readWriteTimeoutMillis(),
                () -> ((TransactionView)this.txCfg.value()).readOnlyTimeoutMillis(),
                this.nodeProperties.colocationEnabled(),
                this.createAndRegisterMetricsSource(mvStorage.getTableDescriptor(), tableName),
                this.cmgManager
        );
        return new TableImpl(
                internalTable,
                this.lockMgr,
                this.schemaVersions,
                this.marshallers,
                this.sql.get(),
                this.failureProcessor,
                tableDescriptor.primaryKeyIndexId(),
                new TableStatsStalenessConfiguration(tableDescriptor.properties().staleRowsFraction(), tableDescriptor.properties().minStaleRowsCount())
        );
    }

    /**
     * Creates and starts the secondary storage of a table.
     *
     * @param tableDescriptor Catalog table descriptor.
     * @param secondaryZoneDescriptor Catalog descriptor of the secondary zone, or {@code null}.
     * @return Secondary table storage, or {@code null} when there is no secondary zone or no secondary storage
     *     engine is available for the table's secondary storage profile. A
     *     {@link MissingRequiredFeaturesException} is logged and not rethrown; whatever storage reference was
     *     obtained before it was thrown (possibly {@code null}) is returned.
     */
    @Nullable
    private SecondaryTableStorage createSecondaryTableStorage(CatalogTableDescriptor tableDescriptor, @Nullable CatalogZoneDescriptor secondaryZoneDescriptor) {
        if (secondaryZoneDescriptor == null) {
            return null;
        }
        SecondaryTableStorage storage = null;
        try {
            SecondaryStorageEngine engine = this.dataStorageMgr.secondaryEngineByStorageProfile(tableDescriptor.secondaryStorageProfile());
            if (engine == null) {
                LOG.info("Secondary storage engine is not available on the node [tableId={}, secondaryStorageProfile={}]", tableDescriptor.id(), tableDescriptor.secondaryStorageProfile());
            } else {
                SecondaryStorageTableDescriptor storageDescriptor = TableManager.toSecondaryStorageTableDescriptor(secondaryZoneDescriptor, tableDescriptor);
                storage = engine.createTable(storageDescriptor);
                storage.start();
            }
        } catch (MissingRequiredFeaturesException e) {
            // Full stack trace only at debug level; the message alone is always logged as a warning.
            if (LOG.isDebugEnabled()) {
                LOG.debug(e.getMessage(), (Throwable)e);
            }
            LOG.warn(e.getMessage(), new Object[0]);
        }
        return storage;
    }

    /**
     * Creates a table locally for the given catalog version.
     *
     * <p>Under the busy lock, resolves the zone, secondary zone and schema descriptors, creates and writes the
     * table assignments to the meta storage, reads the pending assignments and assignments chains locally, and
     * delegates to the overload that performs the actual creation.
     *
     * @param causalityToken Causality token.
     * @param catalogVersion Catalog version.
     * @param tableDescriptor Catalog table descriptor.
     * @param onNodeRecovery Whether the table is created as part of node recovery.
     * @return Future of the local table creation.
     */
    private CompletableFuture<?> createTableLocally(long causalityToken, int catalogVersion, CatalogTableDescriptor tableDescriptor, boolean onNodeRecovery) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            int tableId = tableDescriptor.id();
            CatalogZoneDescriptor zoneDescriptor = this.getZoneDescriptor(tableDescriptor, catalogVersion);
            CatalogZoneDescriptor secondaryZoneDescriptor = this.getSecondaryZoneDescriptor(tableDescriptor, catalogVersion);
            CatalogSchemaDescriptor schemaDescriptor = this.getSchemaDescriptor(tableDescriptor, catalogVersion);
            CompletableFuture<List<Assignments>> stableAssignmentsFuture = this.assignmentsService.createAndWriteTableAssignmentsToMetastorage(tableId, zoneDescriptor, tableDescriptor, causalityToken, catalogVersion);
            List<Assignments> pendingAssignments = RebalanceUtil.tablePendingAssignmentsGetLocally(this.metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken);
            List<AssignmentsChain> assignmentsChains = RebalanceUtil.tableAssignmentsChainGetLocally(this.metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken);
            long assignmentsTimestamp = this.catalogService.catalog(catalogVersion).time();
            return this.createTableLocally(
                    causalityToken,
                    tableDescriptor,
                    zoneDescriptor,
                    schemaDescriptor,
                    stableAssignmentsFuture,
                    pendingAssignments,
                    assignmentsChains,
                    onNodeRecovery,
                    assignmentsTimestamp,
                    secondaryZoneDescriptor,
                    catalogVersion
            );
        });
    }

    /**
     * Creates the local table object and schedules everything needed to make it operational.
     *
     * <p>The method registers three versioned-value updates for the given causality token: {@code tablesVv}
     * (binds the schema registry to the table), {@code localPartitionsVv} (creates storages for the partitions
     * assigned to the local node) and {@code assignmentsUpdatedVv} (starts local partitions once the two previous
     * steps are done). The table is put into {@code tables} right away and into {@code startedTables} only when
     * the returned future completes.
     *
     * @param causalityToken Causality token.
     * @param tableDescriptor Catalog table descriptor.
     * @param zoneDescriptor Catalog descriptor of the table's zone.
     * @param schemaDescriptor Catalog descriptor of the table's schema.
     * @param stableAssignmentsFuture Future with stable assignments, one element per partition.
     * @param pendingAssignments Pending assignments per partition.
     * @param assignmentsChains Assignments chains per partition.
     * @param onNodeRecovery Whether the table is created as part of node recovery.
     * @param assignmentsTimestamp Assignments timestamp passed to the partition start.
     * @param secondaryZoneDescriptor Catalog descriptor of the secondary zone, or {@code null}.
     * @param catalogVersion Catalog version, passed to the secondary replication start.
     * @return Future that completes after the partitions start chain and the follow-up stage have finished.
     */
    private CompletableFuture<Void> createTableLocally(long causalityToken, CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor, CatalogSchemaDescriptor schemaDescriptor, CompletableFuture<List<Assignments>> stableAssignmentsFuture, List<Assignments> pendingAssignments, List<AssignmentsChain> assignmentsChains, boolean onNodeRecovery, long assignmentsTimestamp, @Nullable CatalogZoneDescriptor secondaryZoneDescriptor, int catalogVersion) {
        TableImpl table = this.createTableImpl(causalityToken, tableDescriptor, zoneDescriptor, secondaryZoneDescriptor, schemaDescriptor);
        int tableId = tableDescriptor.id();
        // The result of this update is not kept here; it is observed below through tablesVv.get(causalityToken).
        this.tablesVv.update(causalityToken, (ignore, e) -> IgniteUtils.inBusyLock(this.busyLock, () -> {
            if (e != null) {
                return CompletableFuture.failedFuture(e);
            }
            return this.schemaManager.schemaRegistry(causalityToken, tableId).thenAccept(table::schemaView);
        }));
        CompletableFuture<Void> localPartsUpdateFuture = this.localPartitionsVv.update(causalityToken, (ignore, throwable) -> IgniteUtils.inBusyLock(this.busyLock, () -> stableAssignmentsFuture.thenComposeAsync(newAssignments -> {
            // Collect the partitions whose stable assignments contain the local node.
            BitSetPartitionSet parts = new BitSetPartitionSet();
            for (int i = 0; i < newAssignments.size(); ++i) {
                Assignments partitionAssignments = (Assignments)newAssignments.get(i);
                if (this.localAssignment(partitionAssignments) == null) continue;
                parts.set(i);
            }
            return ((CompletableFuture)this.getOrCreatePartitionStorages(table, parts).thenRun(() -> this.localPartsByTableId.put(tableId, parts))).exceptionally(TableManager.ignoreTableClosedException());
        }, (Executor)this.ioExecutor)));
        CompletableFuture<Void> tablesByIdFuture = this.tablesVv.get(causalityToken);
        CompletableFuture<Void> createPartsFut = this.assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
            if (e != null) {
                return CompletableFuture.failedFuture(e);
            }
            // Partitions are started only after both the storages and the schema view are ready.
            return CompletableFuture.allOf(localPartsUpdateFuture, tablesByIdFuture).thenComposeAsync(ignore -> IgniteUtils.inBusyLock(this.busyLock, () -> {
                if (onNodeRecovery) {
                    SchemaRegistry schemaRegistry = table.schemaView();
                    PartitionSet partitionSet = this.localPartsByTableId.get(tableId);
                    HybridTimestamp lwm = this.lowWatermark.getLowWatermark();
                    IndexUtils.registerIndexesToTable(table, this.catalogService, partitionSet, schemaRegistry, lwm);
                }
                return this.startLocalPartitionsAndClients(stableAssignmentsFuture, pendingAssignments, assignmentsChains, table, onNodeRecovery, assignmentsTimestamp);
            }), (Executor)this.ioExecutor);
        });
        this.tables.put(tableId, table);
        CompletionStage loadToSecondaryZoneFuture = this.loadTableToSecondaryZoneOnTableCreateIfNeeded(table, tableDescriptor, secondaryZoneDescriptor, causalityToken, onNodeRecovery).thenRun(() -> {
            if (table.internalTable().hasSecondaryStorage()) {
                this.secondaryReplicationManager.startReplicationForTable(table, catalogVersion);
            }
        });
        // NOTE(review): lambda$createTableLocally$133 is a decompiler-emitted synthetic lambda whose body is not
        // visible here; presumably it returns loadToSecondaryZoneFuture so that the result waits for it - confirm.
        return ((CompletableFuture)createPartsFut.thenCompose(arg_0 -> TableManager.lambda$createTableLocally$133((CompletableFuture)loadToSecondaryZoneFuture, arg_0))).thenRun(() -> this.startedTables.put(tableId, table));
    }

    /**
     * Loads the table into its secondary zone (if one is given) outside of node recovery.
     *
     * <p>Shorthand for {@code loadTableToSecondaryZoneOnTableCreateIfNeeded} with the recovery flag fixed to {@code false}.
     *
     * @param table Table to load.
     * @param tableDescriptor Catalog descriptor of the table.
     * @param secondaryZoneDescriptor Secondary zone descriptor, or {@code null} if there is none.
     * @param causalityToken Causality token of the triggering update.
     * @return Future returned by the delegate.
     */
    private CompletableFuture<Void> loadTableToSecondaryZoneIfNeeded(TableViewInternal table, CatalogTableDescriptor tableDescriptor, @Nullable CatalogZoneDescriptor secondaryZoneDescriptor, long causalityToken) {
        boolean onNodeRecovery = false;
        return this.loadTableToSecondaryZoneOnTableCreateIfNeeded(table, tableDescriptor, secondaryZoneDescriptor, causalityToken, onNodeRecovery);
    }

    /**
     * Registers the table in its secondary zone and, unless the node is recovering, loads the table into that zone.
     *
     * <p>The work runs on the IO executor after a zone read lock has been requested; {@code unlockZoneForRead} is invoked
     * once the chain completes, whether normally or exceptionally.
     *
     * @param table Table to register and load.
     * @param tableDescriptor Catalog descriptor of the table.
     * @param secondaryZoneDescriptor Secondary zone descriptor; when {@code null} a completed future is returned immediately.
     * @param causalityToken Causality token of the triggering update.
     * @param onNodeRecovery When {@code true} only the registration step is performed.
     * @return Future completing when the registration (and the load, if any) has finished.
     */
    private CompletableFuture<Void> loadTableToSecondaryZoneOnTableCreateIfNeeded(TableViewInternal table, CatalogTableDescriptor tableDescriptor, @Nullable CatalogZoneDescriptor secondaryZoneDescriptor, long causalityToken, boolean onNodeRecovery) {
        // No secondary zone: nothing to do.
        if (secondaryZoneDescriptor == null) {
            return CompletableFutures.nullCompletedFuture();
        }
        CompletableFuture<Long> zoneReadLockFuture = this.partitionReplicaLifecycleManager.lockZoneForRead(secondaryZoneDescriptor.id());
        CompletableFuture<Void> loadFuture = zoneReadLockFuture.thenRunAsync(() -> {
            this.addTableToZone(secondaryZoneDescriptor.id(), table);
            // On recovery the table is only registered in the zone, not loaded again.
            if (!onNodeRecovery) {
                this.secondaryZoneManager.loadTableToSecondaryZoneOnTableCreateHavingZoneReadLock(causalityToken, table, secondaryZoneDescriptor, tableDescriptor);
            }
        }, this.ioExecutor);
        return loadFuture.whenComplete((ignored, error) -> this.unlockZoneForRead(secondaryZoneDescriptor, zoneReadLockFuture));
    }

    /**
     * Creates the MV table storage for a table using the storage engine resolved from the table's storage profile.
     *
     * <p>If no engine is registered for the profile, a {@code NullStorageEngine} is used instead.
     *
     * @param tableDescriptor Catalog descriptor of the table.
     * @param zoneDescriptor Catalog descriptor of the table's zone (supplies the partition count).
     * @return Newly created table storage.
     */
    protected MvTableStorage createTableStorage(CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
        StorageEngine resolvedEngine = this.dataStorageMgr.engineByStorageProfile(tableDescriptor.storageProfile());
        StorageEngine engine = resolvedEngine != null ? resolvedEngine : new NullStorageEngine();
        StorageTableDescriptor storageDescriptor = new StorageTableDescriptor(tableDescriptor.id(), zoneDescriptor.partitions(), tableDescriptor.storageProfile(), this.encryptionManager.encryptionEnabled());
        CatalogStorageIndexDescriptorSupplier indexDescriptorSupplier = new CatalogStorageIndexDescriptorSupplier(this.catalogService, this.lowWatermark);
        return engine.createMvTable(storageDescriptor, indexDescriptorSupplier);
    }

    /**
     * Builds the RocksDB-backed secondary storage bridge, wrapping it in a thread-asserting decorator when
     * thread assertions are enabled.
     *
     * @return The bridge to use (possibly decorated).
     */
    private static SecondaryStorageBridge createSecondaryStoreBridge(String nodeName, Path path, LogSyncer logSyncer, FailureProcessor failureProcessor, ExecutorService ioExecutor, ScheduledExecutorService scheduledExecutor) {
        SecondaryStorageBridge rocksDbBridge = new RocksDbSecondaryStorageBridge(nodeName, path, logSyncer, failureProcessor, ioExecutor, scheduledExecutor);
        if (!ThreadAssertions.enabled()) {
            return rocksDbBridge;
        }
        return new ThreadAssertingSecondaryStorageBridge(rocksDbBridge);
    }

    /**
     * Creates and starts the transaction state storage for a table.
     *
     * <p>When colocation is enabled a {@code BrokenTxStateStorage} is returned and nothing is started. Otherwise a
     * RocksDB-based storage is created (decorated with thread assertions when those are enabled) and started.
     *
     * @param tableDescriptor Catalog descriptor of the table.
     * @param zoneDescriptor Catalog descriptor of the table's zone (supplies the partition count).
     * @return Transaction state storage for the table.
     */
    protected TxStateStorage createTxStateTableStorage(CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
        if (this.nodeProperties.colocationEnabled()) {
            return new BrokenTxStateStorage();
        }
        TxStateStorage rocksDbStorage = new TxStateRocksDbStorage(tableDescriptor.id(), zoneDescriptor.partitions(), this.sharedTxStateStorage);
        TxStateStorage storage = ThreadAssertions.enabled() ? new ThreadAssertingTxStateStorage(rocksDbStorage) : rocksDbStorage;
        storage.start();
        return storage;
    }

    /**
     * Destroys all local resources of a table.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>removes the table from {@code startedTables} and {@code localPartsByTableId};</li>
     *   <li>when colocation is disabled, asynchronously removes the stable assignment keys of every partition from the
     *       meta storage (a failure there is only logged);</li>
     *   <li>stops and destroys the table partitions (for tables with a secondary storage: aborts the full state transfer,
     *       stops replication and additionally destroys the secondary storage resources);</li>
     *   <li>on the IO executor, destroys the MV table storage and the tx state storage;</li>
     *   <li>removes the table from {@code tables} and drops its schema registry.</li>
     * </ol>
     *
     * @param tableId ID of the table to destroy; the table is expected to be present in {@code startedTables}.
     * @return Future completing when the destruction chain finishes. Failures not caused by {@code NodeStoppingException}
     *     are logged; the future still completes exceptionally in that case.
     */
    private CompletableFuture<Void> destroyTableLocally(int tableId) {
        CompletionStage<Void> stopPartitionsFuture;
        TableViewInternal table = this.startedTables.remove(tableId);
        this.localPartsByTableId.remove(tableId);
        assert (table != null) : tableId;
        InternalTable internalTable = table.internalTable();
        if (!this.nodeProperties.colocationEnabled()) {
            // Fire-and-forget cleanup of stable assignments: the result is not part of the returned future.
            Set<ByteArray> assignmentKeys = IntStream.range(0, internalTable.partitions()).mapToObj(p -> RebalanceUtil.stablePartAssignmentsKey(new TablePartitionId(tableId, p))).collect(Collectors.toSet());
            this.metaStorageMgr.removeAll(assignmentKeys).whenComplete((v, e) -> {
                if (e != null) {
                    LOG.error("Failed to remove assignments from metastorage [tableId={}]", (Throwable)e, (Object)tableId);
                }
            });
        }
        if (internalTable.hasSecondaryStorage()) {
            // Replication is stopped first; only then are secondary resources and partitions torn down.
            this.secondaryReplicationManager.abortFullStateTransferForTable(table);
            stopPartitionsFuture = this.secondaryReplicationManager.stopReplicationForTable(table).thenCompose(v -> CompletableFuture.allOf(this.stopAndDestroySecondaryStorageResources(table), this.stopAndDestroyTablePartitions(table)));
        } else {
            stopPartitionsFuture = this.stopAndDestroyTablePartitions(table);
        }
        return ((CompletableFuture)((CompletableFuture)stopPartitionsFuture.thenComposeAsync(unused -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            // Only the MV storage destroy future is awaited; the result of the tx state storage destroy call is not.
            CompletableFuture<Void> tableStorageDestroyFuture = internalTable.storage().destroy();
            internalTable.txStateStorage().destroy();
            return tableStorageDestroyFuture;
        }), (Executor)this.ioExecutor)).thenAccept(unused -> IgniteUtils.inBusyLock(this.busyLock, () -> {
            this.tables.remove(tableId);
            this.schemaManager.dropRegistry(tableId);
        }))).whenComplete((v, e) -> {
            // Errors caused by the node stopping are not logged.
            if (e != null && !ExceptionUtils.hasCause(e, NodeStoppingException.class)) {
                LOG.error("Unable to destroy table [name={}, tableId={}]", (Throwable)e, (Object)table.name(), (Object)tableId);
            }
        });
    }

    /**
     * Detaches the secondary storage from the given table and destroys it, if there is one.
     *
     * <p>The whole operation is performed while holding the monitor of {@code internalTable}.
     *
     * @param internalTable Table whose secondary storage should be removed.
     */
    private static void destroyTableSecondaryStorage(InternalTable internalTable) {
        synchronized (internalTable) {
            // Capture the storage before it is detached from the table.
            SecondaryTableStorage detachedStorage = internalTable.secondaryStorage();
            internalTable.removeSecondaryStorage();
            if (detachedStorage == null) {
                return;
            }
            detachedStorage.destroy();
        }
    }

    /**
     * Synchronous variant of {@code tablesAsync()}: waits for the asynchronous listing to complete.
     *
     * @return List of tables.
     */
    @Override
    public List<Table> tables() {
        CompletableFuture<List<Table>> tablesFuture = this.tablesAsync();
        return TableManager.sync(tablesFuture);
    }

    /**
     * Asynchronously lists tables: under the busy lock, delegates to {@code tablesAsyncInternalBusy(false)}, i.e. only
     * descriptors whose {@code cache()} flag is {@code false} are included.
     *
     * @return Future with the list of tables.
     */
    @Override
    public CompletableFuture<List<Table>> tablesAsync() {
        // NOTE(review): the identity mapping presumably stands in for a generic element-type conversion lost in decompilation - confirm.
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.tablesAsyncInternalBusy(false).thenApply(list0 -> list0));
    }

    /**
     * Synchronous variant of {@code cachesAsync()}: waits for the asynchronous listing to complete.
     *
     * @return List of caches.
     */
    @Override
    public List<Cache> caches() {
        CompletableFuture<List<Cache>> cachesFuture = this.cachesAsync();
        return TableManager.sync(cachesFuture);
    }

    /**
     * Asynchronously lists caches: under the busy lock, delegates to {@code tablesAsyncInternalBusy(true)}, i.e. only
     * descriptors whose {@code cache()} flag is {@code true} are included.
     *
     * @return Future with the list of caches.
     */
    @Override
    public CompletableFuture<List<Cache>> cachesAsync() {
        // NOTE(review): the identity mapping presumably stands in for a generic element-type conversion lost in decompilation - confirm.
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.tablesAsyncInternalBusy(true).thenApply(list0 -> list0));
    }

    /**
     * Synchronous variant of {@code cacheAsync(QualifiedName)}.
     *
     * @param name Qualified name of the cache.
     * @return The cache, or {@code null} if the asynchronous lookup yields {@code null}.
     */
    @Override
    @Nullable
    public Cache cache(QualifiedName name) {
        CompletableFuture<Cache> cacheFuture = this.cacheAsync(name);
        return TableManager.sync(cacheFuture);
    }

    /**
     * Asynchronously looks up a cache by name.
     *
     * @param name Qualified name of the cache.
     * @return Future with the found object if it exists and its {@code cache()} flag is set; future with {@code null} otherwise.
     */
    @Override
    public CompletableFuture<Cache> cacheAsync(QualifiedName name) {
        return this.tableAsyncInternal(name).thenApply(view -> view != null && view.cache() ? view : null);
    }

    /**
     * Collects the started views of all catalog tables whose {@code cache()} flag equals the given value.
     *
     * <p>Waits for metadata completeness at the current clock time, then reads the catalog active at that time and
     * resolves each matching descriptor through {@code tableAsyncInternalBusy(int)}.
     *
     * @param cache Value the descriptor's {@code cache()} flag must have to be included.
     * @return Future with the list of resolved table views (empty if the catalog has no tables).
     */
    private CompletableFuture<List<TableViewInternal>> tablesAsyncInternalBusy(boolean cache) {
        HybridTimestamp now = this.clockService.now();
        return this.orStopManagerFuture(this.executorInclinedSchemaSyncService.waitForMetadataCompleteness(now)).thenCompose(unused -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            Collection<CatalogTableDescriptor> descriptors = this.catalogService.activeCatalog(now.longValue()).tables();
            if (descriptors.isEmpty()) {
                return CompletableFutures.emptyListCompletedFuture();
            }
            // Resolve only the descriptors of the requested kind, preserving the catalog iteration order.
            List<CompletableFuture<TableViewInternal>> matching = new ArrayList<>();
            for (CatalogTableDescriptor descriptor : descriptors) {
                if (descriptor.cache() == cache) {
                    matching.add(this.tableAsyncInternalBusy(descriptor.id()));
                }
            }
            CompletableFuture[] tableImplFutures = matching.toArray(new CompletableFuture[0]);
            return CompletableFutures.allOfToList(tableImplFutures);
        }));
    }

    /**
     * Returns a read-only view of the started tables once assignments for the given causality token have been updated.
     *
     * <p>Note that this overload exposes {@code startedTables}, whereas the no-argument overload exposes {@code tables}.
     *
     * @param causalityToken Causality token to wait for.
     * @return Future with an unmodifiable view of {@code startedTables}.
     */
    private CompletableFuture<Map<Integer, TableViewInternal>> tablesById(long causalityToken) {
        return this.assignmentsUpdatedVv.get(causalityToken).thenApply(ignored -> {
            Map<Integer, TableViewInternal> readOnlyView = Collections.unmodifiableMap(this.startedTables);
            return readOnlyView;
        });
    }

    /**
     * Returns a read-only view of the {@code tables} map.
     *
     * @return Unmodifiable view keyed by table ID.
     */
    private Map<Integer, TableViewInternal> tablesById() {
        Map<Integer, TableViewInternal> readOnlyView = Collections.unmodifiableMap(this.tables);
        return readOnlyView;
    }

    /**
     * Returns a read-only view of the {@code startedTables} map. Intended for tests only.
     *
     * @return Unmodifiable view keyed by table ID.
     */
    @TestOnly
    public Map<Integer, TableViewInternal> startedTables() {
        Map<Integer, TableViewInternal> readOnlyView = Collections.unmodifiableMap(this.startedTables);
        return readOnlyView;
    }

    /**
     * Synchronous variant of {@code tableAsync(QualifiedName)}.
     *
     * @param name Qualified table name.
     * @return The table, or {@code null} if the asynchronous lookup yields {@code null}.
     */
    @Override
    public Table table(QualifiedName name) {
        CompletableFuture<Table> tableFuture = this.tableAsync(name);
        return TableManager.sync(tableFuture);
    }

    /**
     * Synchronous variant of {@code tableAsync(int)}.
     *
     * @param id Table ID.
     * @return The table view, or {@code null} if the asynchronous lookup yields {@code null}.
     * @throws NodeStoppingException Declared by the overridden method.
     */
    @Override
    public TableViewInternal table(int id) throws NodeStoppingException {
        CompletableFuture<TableViewInternal> tableFuture = this.tableAsync(id);
        return TableManager.sync(tableFuture);
    }

    /**
     * Asynchronously looks up a table by name.
     *
     * @param name Qualified table name.
     * @return Future with the found object if it exists and its {@code cache()} flag is not set; future with {@code null} otherwise.
     */
    @Override
    public CompletableFuture<Table> tableAsync(QualifiedName name) {
        return this.tableAsyncInternal(name).thenApply(view -> view != null && !view.cache() ? view : null);
    }

    /**
     * Looks up a started table by ID as of the given causality token.
     *
     * @param causalityToken Causality token to wait for.
     * @param id Table ID.
     * @return Future with the table view, or with {@code null} if no such table is started.
     */
    public CompletableFuture<TableViewInternal> tableAsync(long causalityToken, int id) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            CompletableFuture<Map<Integer, TableViewInternal>> startedByIdFuture = this.tablesById(causalityToken);
            return startedByIdFuture.thenApply(startedById -> startedById.get(id));
        });
    }

    /**
     * Asynchronously looks up a table by ID.
     *
     * <p>Waits for metadata completeness at the current clock time and consults the catalog active at that time;
     * the table view is resolved only if the catalog contains a table with the given ID.
     *
     * @param tableId Table ID.
     * @return Future with the table view, or with {@code null} if the catalog has no such table.
     */
    @Override
    public CompletableFuture<TableViewInternal> tableAsync(int tableId) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            HybridTimestamp requestTime = this.clockService.now();
            return this.orStopManagerFuture(this.executorInclinedSchemaSyncService.waitForMetadataCompleteness(requestTime)).thenCompose(ignored -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
                boolean existsInCatalog = this.catalogService.activeCatalog(requestTime.longValue()).table(tableId) != null;
                if (existsInCatalog) {
                    return this.tableAsyncInternalBusy(tableId);
                }
                return CompletableFutures.nullCompletedFuture();
            }));
        });
    }

    /**
     * Returns the set of local partitions of a table as of the given causality token.
     *
     * @param causalityToken Causality token to wait for on {@code localPartitionsVv}.
     * @param tableId Table ID.
     * @return Future with the partition set recorded for the table, or {@code PartitionSet.EMPTY_SET} if none is recorded.
     * @throws IgniteException With code {@code NODE_STOPPING_ERR} if the busy lock cannot be entered.
     */
    public CompletableFuture<PartitionSet> localPartitionSetAsync(long causalityToken, int tableId) {
        if (!this.busyLock.enterBusy()) {
            throw new IgniteException(ErrorGroups.Common.NODE_STOPPING_ERR, (Throwable)new NodeStoppingException());
        }
        try {
            // Return the typed future directly instead of going through a raw CompletionStage local.
            return this.localPartitionsVv.get(causalityToken).thenApply(unused -> this.localPartsByTableId.getOrDefault(tableId, PartitionSet.EMPTY_SET));
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Synchronous variant of {@code tableViewAsync(QualifiedName)}.
     *
     * @param name Qualified table name.
     * @return The table view, or {@code null} if the asynchronous lookup yields {@code null}.
     */
    @Override
    public TableViewInternal tableView(QualifiedName name) {
        CompletableFuture<TableViewInternal> viewFuture = this.tableViewAsync(name);
        return TableManager.sync(viewFuture);
    }

    /**
     * Asynchronously looks up a table view by name, without filtering on the {@code cache()} flag.
     *
     * @param name Qualified table name.
     * @return Future produced by {@code tableAsyncInternal(QualifiedName)}.
     */
    @Override
    public CompletableFuture<TableViewInternal> tableViewAsync(QualifiedName name) {
        CompletableFuture<TableViewInternal> viewFuture = this.tableAsyncInternal(name);
        return viewFuture;
    }

    /**
     * Resolves a table view by qualified name.
     *
     * <p>Waits for metadata completeness at the current clock time, finds the descriptor in the catalog active at that
     * time by schema and object name, and resolves the view by the descriptor's ID.
     *
     * @param name Qualified table name.
     * @return Future with the table view, or with {@code null} if the catalog has no such table.
     */
    private CompletableFuture<TableViewInternal> tableAsyncInternal(QualifiedName name) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            HybridTimestamp requestTime = this.clockService.now();
            return this.orStopManagerFuture(this.executorInclinedSchemaSyncService.waitForMetadataCompleteness(requestTime)).thenCompose(ignored -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
                CatalogTableDescriptor descriptor = this.catalogService.activeCatalog(requestTime.longValue()).table(name.schemaName(), name.objectName());
                if (descriptor != null) {
                    return this.tableAsyncInternalBusy(descriptor.id());
                }
                return CompletableFutures.nullCompletedFuture();
            }));
        });
    }

    /**
     * Resolves the started view of a table, waiting for it to start if it is not started yet.
     *
     * <p>If the table is already in {@code startedTables}, a completed future is returned. Otherwise a completion
     * listener is registered on {@code assignmentsUpdatedVv}; {@code startedTables} is then checked a second time
     * (after the registration) and, if the table is still absent, the returned future is completed from the listener.
     *
     * @param tableId Table ID.
     * @return Future with the table view; it may complete with {@code null} if the table is still absent from
     *     {@code startedTables} when the listener fires.
     */
    private CompletableFuture<TableViewInternal> tableAsyncInternalBusy(int tableId) {
        // Fast path: the table is already started.
        TableViewInternal tableImpl = this.startedTables.get(tableId);
        if (tableImpl != null) {
            return CompletableFuture.completedFuture(tableImpl);
        }
        CompletableFuture getLatestTableFuture = new CompletableFuture();
        CompletionListener<Void> tablesListener = (token, v, th) -> {
            if (th == null) {
                // Wait for tablesVv at the same token before reading startedTables.
                CompletableFuture<Void> tablesFuture = this.tablesVv.get(token);
                tablesFuture.whenComplete((tables, e) -> {
                    if (e != null) {
                        getLatestTableFuture.completeExceptionally((Throwable)e);
                    } else {
                        getLatestTableFuture.complete(this.startedTables.get(tableId));
                    }
                });
            } else {
                getLatestTableFuture.completeExceptionally(th);
            }
        };
        this.assignmentsUpdatedVv.whenComplete(tablesListener);
        // Second check, performed after the listener is registered: the table may have been started in between.
        tableImpl = this.startedTables.get(tableId);
        if (tableImpl != null) {
            this.assignmentsUpdatedVv.removeWhenComplete(tablesListener);
            return CompletableFuture.completedFuture(tableImpl);
        }
        // The listener is removed once the resulting future completes, whatever the outcome.
        return this.orStopManagerFuture(getLatestTableFuture).whenComplete((unused, throwable) -> this.assignmentsUpdatedVv.removeWhenComplete(tablesListener));
    }

    /**
     * Waits for the given future via {@code IgniteUtils.getInterruptibly} and returns its result.
     *
     * @param future Future to wait for.
     * @param <T> Result type.
     * @return Result of the future.
     */
    private static <T> T sync(CompletableFuture<T> future) {
        T result = IgniteUtils.getInterruptibly(future);
        return result;
    }

    /**
     * Converts a throwable to a runtime exception.
     *
     * @param th Throwable to convert.
     * @return {@code th} itself if it is a {@link RuntimeException}; otherwise an {@code IgniteException} with code
     *     {@code INTERNAL_ERR} wrapping it.
     */
    private static RuntimeException convertThrowable(Throwable th) {
        return th instanceof RuntimeException
                ? (RuntimeException)th
                : new IgniteException(ErrorGroups.Common.INTERNAL_ERR, th);
    }

    /**
     * Creates the watch listener that reacts to pending assignments updates.
     *
     * <p>The listener processes the new entry of the event under the busy lock (non-recovery mode); if the busy lock
     * cannot be entered it returns a future failed with {@code NodeStoppingException}.
     *
     * @return Watch listener.
     */
    private WatchListener createPendingAssignmentsRebalanceListener() {
        return event -> {
            boolean enteredBusy = this.busyLock.enterBusy();
            if (!enteredBusy) {
                return CompletableFuture.failedFuture(new NodeStoppingException());
            }
            try {
                return this.handleChangePendingAssignmentEvent(event.entryEvent().newEntry(), event.revision(), false);
            }
            finally {
                this.busyLock.leaveBusy();
            }
        };
    }

    /**
     * Handles an update of a partition's pending assignments entry.
     *
     * <p>Entries without a value are ignored. Otherwise the stable assignments and the assignments chain are read locally
     * at the given revision, and, once {@code tablesVv} reaches that revision, the head of the pending assignments queue
     * is processed for the corresponding table. After that, if the local node is the primary for the group, a change
     * peers and learners request is sent.
     *
     * @param pendingAssignmentsEntry Meta storage entry holding the pending assignments queue.
     * @param revision Meta storage revision of the update.
     * @param isRecovery Whether the method is invoked during node recovery.
     * @return Future completing when the update has been handled; failed with {@code NodeStoppingException} if the busy
     *     lock cannot be entered.
     */
    private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry pendingAssignmentsEntry, long revision, boolean isRecovery) {
        if (pendingAssignmentsEntry.value() == null) {
            return CompletableFutures.nullCompletedFuture();
        }
        TablePartitionId replicaGrpId = RebalanceUtil.extractTablePartitionId(pendingAssignmentsEntry.key(), RebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES);
        // Both values are read at the revision of the event, before waiting for the tables to catch up.
        Assignments stableAssignments = RebalanceUtil.stableAssignmentsGetLocally(this.metaStorageMgr, replicaGrpId, revision);
        AssignmentsChain assignmentsChain = RebalanceUtil.assignmentsChainGetLocally(this.metaStorageMgr, replicaGrpId, revision);
        // The mapping function returns a future, so the result is flattened with thenCompose(identity) at the end.
        return ((CompletableFuture)this.tablesVv.get(revision).thenApply(ignore -> {
            if (!this.busyLock.enterBusy()) {
                return CompletableFuture.failedFuture(new NodeStoppingException());
            }
            try {
                TableViewInternal table = this.tables.get(replicaGrpId.tableId());
                if (table == null) {
                    if (LOG.isInfoEnabled()) {
                        LOG.info("Skipping Pending Assignments update, because table {} does not exist", replicaGrpId.tableId());
                    }
                    CompletableFuture completableFuture = CompletableFutures.nullCompletedFuture();
                    return completableFuture;
                }
                AssignmentsQueue pendingAssignmentsQueue = AssignmentsQueue.fromBytes(pendingAssignmentsEntry.value());
                if (LOG.isInfoEnabled()) {
                    String stringKey = new String(pendingAssignmentsEntry.key(), StandardCharsets.UTF_8);
                    LOG.info("Received update on pending assignments. Check if new raft group should be started [key={}, partition={}, table={}, localMemberAddress={}, pendingAssignmentsQueue={}, revision={}]", stringKey, replicaGrpId.partitionId(), table.name(), this.localNode().address(), pendingAssignmentsQueue, revision);
                }
                // A missing queue is treated as empty assignments; otherwise only the head of the queue is handled.
                Assignments pendingAssignments = pendingAssignmentsQueue == null ? Assignments.EMPTY : pendingAssignmentsQueue.poll();
                CompletionStage completionStage = this.handleChangePendingAssignmentEvent(replicaGrpId, table, stableAssignments, pendingAssignments, assignmentsChain, revision, isRecovery).thenAccept(v -> this.executeIfLocalNodeIsPrimaryForGroup(replicaGrpId, replicaMeta -> this.sendChangePeersAndLearnersRequest((ReplicaMeta)replicaMeta, replicaGrpId, pendingAssignments, revision)));
                return completionStage;
            }
            finally {
                this.busyLock.leaveBusy();
            }
        })).thenCompose(Function.identity());
    }

    /**
     * Applies new pending assignments of a single partition to the local node.
     *
     * <p>First decides whether a local replica must be started: on recovery, when the local node is in the pending
     * assignments; otherwise, when it is in pending but not in stable. If so, the partition is created and started;
     * if not, peers are reset for forced pending assignments that include the local node. Afterwards, if the local node
     * belongs to the (reduced) stable or pending assignments or is the leaseholder, the peers and learners of the local
     * replica are updated.
     *
     * @param replicaGrpId Partition replication group ID.
     * @param tbl Table the partition belongs to.
     * @param stableAssignments Stable assignments at the given revision, may be {@code null}.
     * @param pendingAssignments Pending assignments being applied.
     * @param assignmentsChain Assignments chain at the given revision, may be {@code null}.
     * @param revision Meta storage revision of the update.
     * @param isRecovery Whether the method is invoked during node recovery.
     * @return Future completing when the local handling has finished.
     */
    private CompletableFuture<Void> handleChangePendingAssignmentEvent(TablePartitionId replicaGrpId, TableViewInternal tbl, @Nullable Assignments stableAssignments, Assignments pendingAssignments, @Nullable AssignmentsChain assignmentsChain, long revision, boolean isRecovery) {
        boolean shouldStartLocalGroupNode;
        boolean pendingAssignmentsAreForced = pendingAssignments.force();
        Set<Assignment> pendingAssignmentsNodes = pendingAssignments.nodes();
        long assignmentsTimestamp = pendingAssignments.timestamp();
        Assignment localAssignmentInPending = this.localAssignment(pendingAssignments);
        Assignment localAssignmentInStable = this.localAssignment(stableAssignments);
        if (isRecovery) {
            // Both branches compute the same value; they differ only in the warning logged for a forced rebalance.
            if (TableManager.lastRebalanceWasGraceful(assignmentsChain)) {
                shouldStartLocalGroupNode = localAssignmentInPending != null;
            } else {
                LOG.warn("Recovery after a forced rebalance for table is not supported yet [tablePartitionId={}].", replicaGrpId);
                shouldStartLocalGroupNode = localAssignmentInPending != null;
            }
        } else {
            // NOTE(review): the 'bl' local is unused - looks like a decompiler artifact.
            boolean bl = shouldStartLocalGroupNode = localAssignmentInPending != null && localAssignmentInStable == null;
        }
        // Missing or empty stable assignments fall back to forced assignments built from the pending nodes.
        Assignments computedStableAssignments = stableAssignments == null || stableAssignments.nodes().isEmpty() ? Assignments.forced(pendingAssignmentsNodes, assignmentsTimestamp) : (pendingAssignmentsAreForced ? pendingAssignments : stableAssignments);
        CompletionStage localServicesStartFuture = shouldStartLocalGroupNode ? this.localPartitionsVv.get(revision).thenComposeAsync(unused -> this.createPartitionAndStartClient(replicaGrpId, tbl, isRecovery, assignmentsTimestamp, localAssignmentInPending, computedStableAssignments), (Executor)this.ioExecutor) : this.waitForMetadataCompleteness(assignmentsTimestamp).thenRunAsync(() -> {
            PeersAndLearners stablePeersAndLearners = PeersAndLearners.fromAssignments(computedStableAssignments.nodes());
            if (pendingAssignmentsAreForced && localAssignmentInPending != null) {
                assert (this.replicaMgr.isReplicaStarted(replicaGrpId)) : "The local node is outside of the replication group: " + replicaGrpId;
                this.replicaMgr.resetPeers(replicaGrpId, stablePeersAndLearners);
            }
        }, this.ioExecutor);
        return ((CompletableFuture)((CompletableFuture)localServicesStartFuture).thenComposeAsync(v -> IgniteUtils.inBusyLock(this.busyLock, () -> this.isLocalNodeIsPrimary(replicaGrpId)), (Executor)this.ioExecutor)).thenAcceptAsync(isLeaseholder -> IgniteUtils.inBusyLock(this.busyLock, () -> {
            boolean isLocalNodeInStableOrPending = this.isNodeInReducedStableOrPendingAssignments(replicaGrpId, stableAssignments, pendingAssignments, revision);
            if (!isLocalNodeInStableOrPending && !isLeaseholder.booleanValue()) {
                return;
            }
            // Always true here because of the guard above.
            assert (isLocalNodeInStableOrPending || isLeaseholder.booleanValue()) : "The local node is outside of the replication group [inStableOrPending=" + isLocalNodeInStableOrPending + ", isLeaseholder=" + isLeaseholder + "].";
            if (isRecovery && !this.replicaMgr.isReplicaStarted(replicaGrpId)) {
                return;
            }
            assert (this.replicaMgr.isReplicaStarted(replicaGrpId)) : "The local node is outside of the replication group [stable=" + stableAssignments + ", pending=" + pendingAssignments + ", localName=" + this.localNode().name() + "].";
            // Forced (or stable-less) updates use the pending nodes only; otherwise the union of pending and stable.
            Set<Assignment> newAssignments = pendingAssignmentsAreForced || stableAssignments == null ? pendingAssignmentsNodes : RebalanceUtil.union(pendingAssignmentsNodes, stableAssignments.nodes());
            // The result of this call is not awaited by the returned future.
            this.replicaMgr.replica(replicaGrpId).thenAccept(replica -> replica.updatePeersAndLearners(PeersAndLearners.fromAssignments(newAssignments)));
        }), (Executor)this.ioExecutor);
    }

    /**
     * Creates (or reuses) the storages of a single partition, records the partition as local, registers indexes for it
     * and starts the partition.
     *
     * <p>A table-closed failure while obtaining the storages is passed through {@code ignoreTableClosedException()}.
     *
     * @param replicaGrpId Partition replication group ID.
     * @param tbl Table the partition belongs to.
     * @param isRecovery Whether the method is invoked during node recovery.
     * @param assignmentsTimestamp Timestamp of the assignments; metadata completeness is awaited for it before the start.
     * @param localAssignmentInPending Local node's assignment in the pending assignments; asserted to be non-null.
     * @param computedStableAssignments Stable assignments to start the partition with.
     * @return Future completing when the partition has been started.
     */
    private CompletableFuture<Void> createPartitionAndStartClient(TablePartitionId replicaGrpId, TableViewInternal tbl, boolean isRecovery, long assignmentsTimestamp, Assignment localAssignmentInPending, Assignments computedStableAssignments) {
        int partitionId = replicaGrpId.partitionId();
        PartitionSet singlePartitionIdSet = PartitionSet.of(partitionId);
        // Step 1: storages plus bookkeeping in localPartsByTableId; step 2 (IO executor): index registration and start.
        return IgniteUtils.inBusyLock(this.busyLock, () -> ((CompletableFuture)this.getOrCreatePartitionStorages(tbl, singlePartitionIdSet).thenRun(() -> this.localPartsByTableId.compute(replicaGrpId.tableId(), (tableId, oldPartitionSet) -> TableManager.extendPartitionSet(oldPartitionSet, partitionId)))).exceptionally(TableManager.ignoreTableClosedException())).thenComposeAsync(unused -> IgniteUtils.inBusyLock(this.busyLock, () -> {
            this.lowWatermark.getLowWatermarkSafe(lwm -> IndexUtils.registerIndexesToTable(tbl, this.catalogService, singlePartitionIdSet, tbl.schemaView(), lwm));
            return this.waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(ignored -> IgniteUtils.inBusyLock(this.busyLock, () -> {
                assert (localAssignmentInPending != null) : "Local member assignment";
                return this.startPartitionAndStartClient(tbl, replicaGrpId.partitionId(), localAssignmentInPending, computedStableAssignments, isRecovery, assignmentsTimestamp);
            }));
        }), (Executor)this.ioExecutor);
    }

    /**
     * Tells whether the last rebalance is considered graceful.
     *
     * @param assignmentsChain Assignments chain, may be {@code null}.
     * @return {@code true} if the chain is absent or its size is exactly one.
     */
    private static boolean lastRebalanceWasGraceful(@Nullable AssignmentsChain assignmentsChain) {
        if (assignmentsChain == null) {
            return true;
        }
        return assignmentsChain.size() == 1;
    }

    /**
     * Marks a partition in the given set, creating a new {@code BitSetPartitionSet} when no set is supplied.
     *
     * <p>An existing set is modified in place and returned.
     *
     * @param oldPartitionSet Existing partition set, or {@code null}.
     * @param partitionId Partition to add.
     * @return The set containing the partition.
     */
    private static PartitionSet extendPartitionSet(@Nullable PartitionSet oldPartitionSet, int partitionId) {
        PartitionSet target = oldPartitionSet != null ? oldPartitionSet : new BitSetPartitionSet();
        target.set(partitionId);
        return target;
    }

    /**
     * Runs the given action with the primary replica meta of a group, but only if the local node is that primary.
     *
     * <p>The call is asynchronous and its outcome is not reported to the caller.
     *
     * @param groupId Replication group ID.
     * @param toExecute Action receiving the primary replica meta.
     */
    private void executeIfLocalNodeIsPrimaryForGroup(ReplicationGroupId groupId, Consumer<ReplicaMeta> toExecute) {
        CompletableFuture<ReplicaMeta> primaryFuture = this.getPrimaryReplica(groupId);
        this.isLocalNodeIsPrimary(primaryFuture).thenAccept(localIsPrimary -> {
            if (!localIsPrimary.booleanValue()) {
                return;
            }
            primaryFuture.thenAccept(toExecute);
        });
    }

    /**
     * Sends a change peers and learners request for a partition to the local node, unless the pending assignments
     * entry in the meta storage already has a revision newer than {@code currentRevision}.
     *
     * <p>The call is asynchronous and its outcome is not reported to the caller.
     *
     * @param replicaMeta Primary replica meta; its start time is used as the enlistment consistency token.
     * @param replicationGroupId Partition replication group ID.
     * @param pendingAssignments Pending assignments to put into the request.
     * @param currentRevision Revision of the update being processed.
     */
    private void sendChangePeersAndLearnersRequest(ReplicaMeta replicaMeta, TablePartitionId replicationGroupId, Assignments pendingAssignments, long currentRevision) {
        this.metaStorageMgr.get(RebalanceUtil.pendingPartAssignmentsQueueKey(replicationGroupId)).thenAccept(latestEntry -> {
            // A newer pending assignments entry supersedes this update.
            boolean superseded = currentRevision < latestEntry.revision();
            if (superseded) {
                return;
            }
            TablePartitionIdMessage groupIdMessage = ReplicaMessageUtils.toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, replicationGroupId);
            ChangePeersAndLearnersAsyncReplicaRequest changeRequest = TABLE_MESSAGES_FACTORY.changePeersAndLearnersAsyncReplicaRequest()
                    .groupId(groupIdMessage)
                    .pendingAssignments(pendingAssignments.toBytes())
                    .enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
                    .build();
            this.replicaSvc.invoke(this.localNode(), (ReplicaRequest)changeRequest);
        });
    }

    /**
     * Checks whether the local node belongs to the union of the pending assignments and the stable assignments
     * reduced by the switch-reduce assignments stored in the meta storage at the given revision.
     *
     * <p>Absent ({@code null}) stable assignments are treated as an empty node set, so only the pending assignments
     * are considered in that case.
     *
     * @param replicaGrpId Partition replication group ID.
     * @param stableAssignments Stable assignments, may be {@code null}.
     * @param pendingAssignments Pending assignments.
     * @param revision Meta storage revision to read the switch-reduce entry at.
     * @return {@code true} if the local node is in the resulting set.
     */
    private boolean isNodeInReducedStableOrPendingAssignments(TablePartitionId replicaGrpId, @Nullable Assignments stableAssignments, Assignments pendingAssignments, long revision) {
        Entry reduceEntry = this.metaStorageMgr.getLocally(RebalanceUtil.switchReduceKey(replicaGrpId), revision);
        Assignments reduceAssignments = reduceEntry != null ? Assignments.fromBytes(reduceEntry.value()) : null;
        // The parameter is nullable: guard against it instead of dereferencing unconditionally.
        Set<Assignment> stableNodes = stableAssignments != null ? stableAssignments.nodes() : Collections.<Assignment>emptySet();
        Set<Assignment> reducedStableAssignments = reduceAssignments != null && stableAssignments != null ? RebalanceUtil.subtract(stableNodes, reduceAssignments.nodes()) : stableNodes;
        return this.isLocalNodeInAssignments(RebalanceUtil.union(reducedStableAssignments, pendingAssignments.nodes()));
    }

    /**
     * Builds the snapshot storage factory for a table partition.
     *
     * <p>Creates MV and tx state access objects for the partition, wraps them in a {@code PartitionSnapshotStorage}
     * and registers the MV partition in it under the table's ID.
     *
     * @param replicaGrpId Partition replication group ID.
     * @param partitionUpdateHandlers Update handlers of the partition (index and GC handlers are used).
     * @param internalTable Table the partition belongs to.
     * @return Snapshot storage factory for the partition.
     */
    private SnapshotStorageFactory createSnapshotStorageFactory(TablePartitionId replicaGrpId, PartitionUpdateHandlers partitionUpdateHandlers, InternalTable internalTable) {
        int partId = replicaGrpId.partitionId();
        PartitionMvStorageAccessImpl mvAccess = new PartitionMvStorageAccessImpl(
                partId,
                internalTable.storage(),
                this.mvGc,
                partitionUpdateHandlers.indexUpdateHandler,
                partitionUpdateHandlers.gcUpdateHandler,
                this.fullStateTransferIndexChooser,
                this.schemaManager.schemaRegistry(replicaGrpId.tableId()),
                this.lowWatermark);
        PartitionTxStateAccessImpl txAccess = new PartitionTxStateAccessImpl(internalTable.txStateStorage().getPartitionStorage(partId));
        TablePartitionKey partitionKey = new TablePartitionKey(replicaGrpId.tableId(), replicaGrpId.partitionId());
        PartitionSnapshotStorage partitionSnapshotStorage = new PartitionSnapshotStorage(partitionKey, this.topologyService, this.outgoingSnapshotsManager, txAccess, this.catalogService, this.failureProcessor, this.incomingSnapshotsExecutor);
        partitionSnapshotStorage.addMvPartition(internalTable.tableId(), mvAccess);
        return new PartitionSnapshotStorageFactory(partitionSnapshotStorage);
    }

    /**
     * Creates the watch listener that reacts to stable assignments updates.
     *
     * <p>The listener handles the event under the busy lock; if the busy lock cannot be entered it returns a future
     * failed with {@code NodeStoppingException}.
     *
     * @return Watch listener.
     */
    private WatchListener createStableAssignmentsRebalanceListener() {
        return event -> {
            boolean enteredBusy = this.busyLock.enterBusy();
            if (!enteredBusy) {
                return CompletableFuture.failedFuture(new NodeStoppingException());
            }
            try {
                return this.handleChangeStableAssignmentEvent(event);
            }
            finally {
                this.busyLock.leaveBusy();
            }
        };
    }

    /**
     * Creates the watch listener that reacts to switch-reduce assignments updates.
     *
     * <p>For each event the listener waits for the tables at the event's revision, obtains the catalog for the
     * timestamp of the new assignments, resolves the table and zone descriptors from it and delegates to
     * {@code handleReduceChanged}. Every stage runs under the busy lock.
     *
     * @return Watch listener.
     */
    private WatchListener createAssignmentsSwitchRebalanceListener() {
        return event -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            byte[] switchReduceKey = event.entryEvent().newEntry().key();
            TablePartitionId partitionGroupId = RebalanceUtil.extractTablePartitionId(switchReduceKey, RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES);
            return this.tablesById(event.revision()).thenCompose(ignoredTables -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
                Assignments switchReduceAssignments = Assignments.fromBytes(event.entryEvent().newEntry().value());
                long timestamp = switchReduceAssignments.timestamp();
                return this.reliableCatalogVersions.safeReliableCatalogFor(HybridTimestamp.hybridTimestamp(timestamp)).thenCompose(catalog -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
                    CatalogTableDescriptor table = TableManager.getTableDescriptor(partitionGroupId.tableId(), catalog);
                    CatalogZoneDescriptor zone = TableManager.getZoneDescriptor(table, catalog);
                    return this.handleReduceChanged(event, catalog.version(), zone, partitionGroupId, timestamp);
                }));
            }));
        });
    }

    /**
     * Handles a switch-reduce update: obtains the zone's data nodes and passes them, together with the zone's
     * partition, replica and consensus group size settings, to {@code RebalanceUtilEx.handleReduceChanged}.
     *
     * @param evt Watch event that triggered the handling.
     * @param catalogVersion Catalog version used to obtain the data nodes.
     * @param zoneDescriptor Descriptor of the partition's zone.
     * @param replicaGrpId Partition replication group ID.
     * @param assignmentsTimestamp Timestamp of the assignments.
     * @return Future returned by the rebalance utility.
     */
    private CompletableFuture<Void> handleReduceChanged(WatchEvent evt, int catalogVersion, CatalogZoneDescriptor zoneDescriptor, TablePartitionId replicaGrpId, long assignmentsTimestamp) {
        return this.distributionZoneManager
                .dataNodes(zoneDescriptor.updateTimestamp(), catalogVersion, zoneDescriptor.id())
                .thenCompose(zoneDataNodes -> RebalanceUtilEx.handleReduceChanged(
                        this.metaStorageMgr,
                        zoneDataNodes,
                        zoneDescriptor.partitions(),
                        zoneDescriptor.replicas(),
                        zoneDescriptor.consensusGroupSize(),
                        replicaGrpId,
                        evt,
                        assignmentsTimestamp));
    }

    /**
     * Returns the already existing MV and tx state storages of a partition.
     *
     * @param table Table the partition belongs to.
     * @param partitionId Partition ID.
     * @return Pair of the partition's storages; both are asserted to exist.
     * @throws TableClosedException If the table storage reports that it is closed.
     */
    private static PartitionStorages getPartitionStorages(TableViewInternal table, int partitionId) {
        InternalTable internalTable = table.internalTable();
        MvPartitionStorage mvStorage;
        try {
            mvStorage = internalTable.storage().getMvPartition(partitionId);
        }
        catch (StorageClosedException closed) {
            // Translate the storage-level failure into a table-level one, keeping the cause.
            throw new TableClosedException(table.tableId(), (Throwable)closed);
        }
        assert (mvStorage != null) : "tableId=" + table.tableId() + ", partitionId=" + partitionId;
        TxStatePartitionStorage txStorage = internalTable.txStateStorage().getPartitionStorage(partitionId);
        assert (txStorage != null) : "tableId=" + table.tableId() + ", partitionId=" + partitionId;
        return new PartitionStorages(mvStorage, txStorage);
    }

    /**
     * Makes sure the MV and tx state storages exist for every partition of the given set.
     *
     * <p>For each partition the existing MV storage is reused or a new one is created, and the tx state partition
     * storage is obtained or created. If the MV storage's last applied index is {@code -1} (or, with colocation
     * disabled, the tx state storage's is), the partition data is cleared.
     *
     * @param table Table the partitions belong to.
     * @param partitions Partitions to prepare.
     * @return Future completing when all partitions are prepared; it fails with {@code TableClosedException} if the
     *     table storage reports that it is closed.
     */
    private CompletableFuture<Void> getOrCreatePartitionStorages(TableViewInternal table, PartitionSet partitions) {
        InternalTable internalTable = table.internalTable();
        CompletableFuture[] storageFuts = (CompletableFuture[])partitions.stream().mapToObj(partitionId -> {
            MvPartitionStorage mvPartition;
            try {
                mvPartition = internalTable.storage().getMvPartition(partitionId);
            }
            catch (StorageClosedException e) {
                return CompletableFuture.failedFuture(new TableClosedException(table.tableId(), (Throwable)e));
            }
            // Reuse the existing MV partition if there is one, otherwise create it; the rest runs on the IO executor.
            return (mvPartition != null ? CompletableFuture.completedFuture(mvPartition) : internalTable.storage().createMvPartition(partitionId)).thenComposeAsync(mvPartitionStorage -> {
                TxStatePartitionStorage txStatePartitionStorage = internalTable.txStateStorage().getOrCreatePartitionStorage(partitionId);
                // The tx state storage is only taken into account (and cleared) when colocation is disabled.
                if (mvPartitionStorage.lastAppliedIndex() == -1L || !this.nodeProperties.colocationEnabled() && txStatePartitionStorage.lastAppliedIndex() == -1L) {
                    if (this.nodeProperties.colocationEnabled()) {
                        return internalTable.storage().clearPartition(partitionId);
                    }
                    return CompletableFuture.allOf(internalTable.storage().clearPartition(partitionId), txStatePartitionStorage.clear());
                }
                return CompletableFutures.nullCompletedFuture();
            }, (Executor)this.ioExecutor);
        }).toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(storageFuts);
    }

    /**
     * Handles a watch event on stable assignments.
     *
     * <p>The event is ignored (a completed future is returned) when:
     * <ul>
     *   <li>no entry of the event has a previous value (the old entry is absent or has no value);</li>
     *   <li>the event carries more than one entry (all new entries are then asserted to be tombstones);</li>
     *   <li>the new entry has no value.</li>
     * </ul>
     * Otherwise the single new entry is processed in non-recovery mode.
     *
     * @param evt Watch event.
     * @return Future completing when the event has been handled.
     */
    protected CompletableFuture<Void> handleChangeStableAssignmentEvent(WatchEvent evt) {
        // Null-safe: an absent old entry is treated the same as an old entry without a value. This also covers the
        // single-entry "no old entry" case, so no separate check is needed further down.
        if (evt.entryEvents().stream().allMatch(e -> e.oldEntry() == null || e.oldEntry().value() == null)) {
            return CompletableFutures.nullCompletedFuture();
        }
        if (!evt.single()) {
            assert (evt.entryEvents().stream().allMatch(entryEvent -> entryEvent.newEntry().tombstone())) : evt;
            return CompletableFutures.nullCompletedFuture();
        }
        Entry stableAssignmentsWatchEvent = evt.entryEvent().newEntry();
        long revision = evt.revision();
        assert (stableAssignmentsWatchEvent.revision() == revision) : stableAssignmentsWatchEvent;
        if (stableAssignmentsWatchEvent.value() == null) {
            return CompletableFutures.nullCompletedFuture();
        }
        return this.handleChangeStableAssignmentEvent(stableAssignmentsWatchEvent, revision, false);
    }

    /**
     * Handles an update of a partition's stable assignments entry.
     *
     * <p>On the IO executor, reads the pending assignments queue of the partition locally at the given revision and
     * delegates to {@code stopAndDestroyTablePartitionAndUpdateClients} with the stable nodes and the head of that
     * queue (or empty assignments if there is no queue).
     *
     * @param stableAssignmentsWatchEvent Meta storage entry holding the stable assignments; a missing value is treated
     *     as an empty set of nodes.
     * @param revision Meta storage revision of the update.
     * @param isRecovery Whether the method is invoked during node recovery.
     * @return Future completing when the update has been handled.
     */
    protected CompletableFuture<Void> handleChangeStableAssignmentEvent(Entry stableAssignmentsWatchEvent, long revision, boolean isRecovery) {
        TablePartitionId tablePartitionId = RebalanceUtil.extractTablePartitionId(stableAssignmentsWatchEvent.key(), RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES);
        // Properly parameterized instead of a raw Set, so the element type is checked by the compiler.
        Set<Assignment> stableAssignments = stableAssignmentsWatchEvent.value() == null ? Collections.<Assignment>emptySet() : Assignments.fromBytes(stableAssignmentsWatchEvent.value()).nodes();
        return CompletableFuture.supplyAsync(() -> {
            Entry pendingAssignmentsEntry = this.metaStorageMgr.getLocally(RebalanceUtil.pendingPartAssignmentsQueueKey(tablePartitionId), revision);
            AssignmentsQueue pendingAssignmentsQueue = AssignmentsQueue.fromBytes(pendingAssignmentsEntry.value());
            if (LOG.isInfoEnabled()) {
                String stringKey = new String(stableAssignmentsWatchEvent.key(), StandardCharsets.UTF_8);
                LOG.info("Received update on stable assignments [key={}, partition={}, localMemberAddress={}, stableAssignments={}, pendingAssignmentsQueue={}, revision={}]", stringKey, tablePartitionId, this.localNode().address(), stableAssignments, pendingAssignmentsQueue, revision);
            }
            // A missing queue is treated as empty assignments; otherwise only the head of the queue is used.
            Assignments pendingAssignments = pendingAssignmentsQueue == null ? Assignments.EMPTY : pendingAssignmentsQueue.poll();
            return this.stopAndDestroyTablePartitionAndUpdateClients(tablePartitionId, stableAssignments, pendingAssignments, isRecovery, revision);
        }, this.ioExecutor).thenCompose(Function.identity());
    }

    /**
     * Updates peers and learners of the local replica of the partition to the union of stable and pending assignments.
     *
     * <p>Nothing is done when the local node is neither in the stable assignments nor reported as primary by
     * {@code isLocalNodeIsPrimary}.
     *
     * @param tablePartitionId Partition identifier.
     * @param stableAssignments Stable assignments.
     * @param pendingAssignments Pending assignments.
     * @return Future of the update.
     */
    private CompletableFuture<Void> updatePartitionClients(TablePartitionId tablePartitionId, Set<Assignment> stableAssignments, Set<Assignment> pendingAssignments) {
        return this.isLocalNodeIsPrimary(tablePartitionId).thenCompose(isLeaseholder -> IgniteUtils.inBusyLock(this.busyLock, () -> {
            boolean localNodeInvolved = this.isLocalNodeInAssignments(stableAssignments) || isLeaseholder;

            if (!localNodeInvolved) {
                return CompletableFutures.nullCompletedFuture();
            }

            assert this.replicaMgr.isReplicaStarted(tablePartitionId)
                    : "The local node is outside of the replication group [stable=" + stableAssignments + ", isLeaseholder=" + isLeaseholder + "].";

            return this.replicaMgr.replica(tablePartitionId).thenAccept(replica -> {
                Set<Assignment> allAssignments = RebalanceUtil.union(stableAssignments, pendingAssignments);

                replica.updatePeersAndLearners(PeersAndLearners.fromAssignments(allAssignments));
            });
        }));
    }

    /**
     * Updates partition clients (unless in recovery) and, when the local node is no longer assigned, stops and destroys the
     * local partition.
     *
     * <p>The assignments considered are the pending ones alone if they are forced, otherwise stable plus pending. The partition
     * is stopped only when none of them satisfies the {@code isLocalNodeAssignment} predicate.
     *
     * @param tablePartitionId Partition identifier.
     * @param stableAssignments Stable assignments.
     * @param pendingAssignments Pending assignments.
     * @param isRecovery When {@code true}, the client update is skipped.
     * @param revision Revision passed to the weak stop as a causality token.
     * @return Future of the whole operation.
     */
    private CompletableFuture<Void> stopAndDestroyTablePartitionAndUpdateClients(TablePartitionId tablePartitionId, Set<Assignment> stableAssignments, Assignments pendingAssignments, boolean isRecovery, long revision) {
        CompletableFuture<Void> clientUpdateFuture;
        if (isRecovery) {
            clientUpdateFuture = CompletableFutures.nullCompletedFuture();
        } else {
            clientUpdateFuture = this.updatePartitionClients(tablePartitionId, stableAssignments, pendingAssignments.nodes());
        }

        Stream<Assignment> consideredAssignments = pendingAssignments.force()
                ? pendingAssignments.nodes().stream()
                : Stream.concat(stableAssignments.stream(), pendingAssignments.nodes().stream());

        boolean localNodeExcluded = consideredAssignments.noneMatch(this.isLocalNodeAssignment);

        if (!localNodeExcluded) {
            return clientUpdateFuture;
        }

        return CompletableFuture.allOf(clientUpdateFuture, this.weakStopAndDestroyTablePartition(tablePartitionId, revision));
    }

    /**
     * Asks the replica manager to weakly stop the replica because it was excluded from assignments, supplying the stop-and-destroy
     * routine of the partition as the stop operation.
     *
     * @param tablePartitionId Partition identifier.
     * @param causalityToken Causality token forwarded to the stop-and-destroy routine.
     * @return Future returned by the replica manager.
     */
    private CompletableFuture<Void> weakStopAndDestroyTablePartition(TablePartitionId tablePartitionId, long causalityToken) {
        return this.replicaMgr.weakStopReplica(
                tablePartitionId,
                ReplicaManager.WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS,
                () -> this.stopAndDestroyTablePartition(tablePartitionId, causalityToken)
        );
    }

    /**
     * Stops and destroys every partition of the given table while holding the write lock of the table's zone.
     *
     * <p>For each partition, table resources are first unloaded from the zone replica when colocation is enabled; then the
     * partition is stopped and destroyed with the "destroying whole table" flag set. After all partitions are processed the
     * table is removed from the per-zone table set, and finally the zone write lock is released.
     *
     * @param table Table whose partitions are destroyed.
     * @return Future that completes when all partitions have been processed.
     */
    private CompletableFuture<Void> stopAndDestroyTablePartitions(TableViewInternal table) {
        InternalTable internalTable = table.internalTable();
        int partitions = internalTable.partitions();
        NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(internalTable.zoneId(), id -> new NaiveAsyncReadWriteLock());
        CompletableFuture<Long> writeLockAcquisitionFuture = zoneLock.writeLock();
        try {
            return ((CompletableFuture)writeLockAcquisitionFuture.thenCompose(stamp -> {
                CompletableFuture[] stopReplicaAndDestroyFutures = new CompletableFuture[partitions];
                for (int partitionId = 0; partitionId < partitions; ++partitionId) {
                    // With colocation disabled there is nothing to unload, so an already completed future is used.
                    CompletableFuture<Object> resourcesUnloadFuture = this.nodeProperties.colocationEnabled() ? this.partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(new ZonePartitionId(internalTable.zoneId(), partitionId), internalTable.tableId()) : CompletableFutures.nullCompletedFuture();
                    TablePartitionId tablePartitionId = new TablePartitionId(internalTable.tableId(), partitionId);
                    stopReplicaAndDestroyFutures[partitionId] = resourcesUnloadFuture.thenCompose(v -> this.stopAndDestroyTablePartition(tablePartitionId, table, true));
                }
                // The table is removed from the zone's set regardless of whether the destruction succeeded.
                return CompletableFuture.allOf(stopReplicaAndDestroyFutures).whenComplete((res, th) -> this.tablesPerZone.getOrDefault(internalTable.zoneId(), Collections.emptySet()).remove(table));
                // Release the write lock once the whole chain is done, using the stamp produced by the acquisition future.
            })).whenComplete((unused, t) -> writeLockAcquisitionFuture.thenAccept(zoneLock::unlockWrite));
        }
        catch (Throwable t2) {
            // Building the chain failed synchronously: still release the lock once it is acquired, then rethrow.
            writeLockAcquisitionFuture.thenAccept(zoneLock::unlockWrite);
            throw t2;
        }
    }

    /**
     * Stops and destroys a table partition after the tables versioned value for the given causality token is available.
     *
     * <p>If the token is reported as outdated, the wait is skipped and the operation proceeds immediately.
     *
     * @param tablePartitionId Partition identifier.
     * @param causalityToken Causality token to wait for.
     * @return Future of the stop-and-destroy operation.
     */
    private CompletableFuture<Void> stopAndDestroyTablePartition(TablePartitionId tablePartitionId, long causalityToken) {
        CompletableFuture<Object> tokenFuture;

        try {
            tokenFuture = this.tablesVv.get(causalityToken);
        } catch (OutdatedTokenException e) {
            // The token is outdated: do not wait for it.
            tokenFuture = CompletableFutures.nullCompletedFuture();
        }

        return tokenFuture.thenCompose(unused -> {
            TableViewInternal tableView = this.tables.get(tablePartitionId.tableId());

            assert tableView != null : tablePartitionId;

            return this.stopAndDestroyTablePartition(tablePartitionId, tableView, false);
        });
    }

    /**
     * Stops the table partition and then, on the I/O executor, destroys its storages.
     *
     * @param tablePartitionId Partition identifier.
     * @param table Table the partition belongs to.
     * @param destroyingWholeTable Flag forwarded to the storage destruction.
     * @return Future of the whole operation.
     */
    private CompletableFuture<Void> stopAndDestroyTablePartition(TablePartitionId tablePartitionId, TableViewInternal table, boolean destroyingWholeTable) {
        CompletableFuture<Void> stopFuture = this.stopTablePartition(tablePartitionId, table);

        return stopFuture.thenComposeAsync(
                unused -> this.destroyPartitionStorages(tablePartitionId, table, destroyingWholeTable),
                (Executor)this.ioExecutor
        );
    }

    /**
     * Asks the replica manager to weakly stop the replica with the restart reason, supplying the partition stop routine as the
     * stop operation.
     *
     * @param tablePartitionId Partition identifier.
     * @param table Table the partition belongs to.
     * @return Future returned by the replica manager.
     */
    private CompletableFuture<Void> stopPartitionForRestart(TablePartitionId tablePartitionId, TableViewInternal table) {
        return this.replicaMgr.weakStopReplica(
                tablePartitionId,
                ReplicaManager.WeakReplicaStopReason.RESTART,
                () -> this.stopTablePartition(tablePartitionId, table)
        );
    }

    /**
     * Asks the replica manager to weakly stop the replica with the restart reason, supplying the stop-and-destroy routine
     * (not destroying the whole table) as the stop operation.
     *
     * @param tablePartitionId Partition identifier.
     * @param table Table the partition belongs to.
     * @return Future returned by the replica manager.
     */
    private CompletableFuture<Void> stopPartitionAndDestroyForRestart(TablePartitionId tablePartitionId, TableViewInternal table) {
        return this.replicaMgr.weakStopReplica(
                tablePartitionId,
                ReplicaManager.WeakReplicaStopReason.RESTART,
                () -> this.stopAndDestroyTablePartition(tablePartitionId, table, false)
        );
    }

    /**
     * Stops a table partition: stops its replica (unless colocation is enabled), closes partition trackers, removes the
     * partition from the min-time collector, unregisters the partition modification counter metric source and removes the
     * partition storage from the MV garbage collector.
     *
     * <p>If the replica manager reports that the node is stopping, the replica stop is skipped and the remaining clean-up is
     * still performed. A failure to unregister the metric source is logged and does not fail the operation.
     *
     * @param tablePartitionId Partition identifier.
     * @param table Table the partition belongs to.
     * @return Future that completes when the storage has been removed from the garbage collector.
     */
    private CompletableFuture<Void> stopTablePartition(TablePartitionId tablePartitionId, TableViewInternal table) {
        CompletableFuture<Boolean> stopReplicaFuture;
        try {
            stopReplicaFuture = this.nodeProperties.colocationEnabled() ? CompletableFutures.trueCompletedFuture() : this.replicaMgr.stopReplica(tablePartitionId);
        }
        catch (NodeStoppingException e) {
            // The node is stopping: there is no replica to wait for.
            stopReplicaFuture = CompletableFutures.falseCompletedFuture();
        }
        return stopReplicaFuture.thenCompose(v -> {
            TableManager.closePartitionTrackers(table.internalTable(), tablePartitionId.partitionId());
            this.minTimeCollectorService.removePartition(tablePartitionId);
            PartitionModificationCounterMetricSource metricSource = this.partModCounterMetricSources.remove(tablePartitionId);
            if (metricSource != null) {
                try {
                    this.metricManager.unregisterSource(metricSource);
                }
                catch (Exception e) {
                    // The message previously said "register", which was misleading for a failed unregistration.
                    String message = "Failed to unregister metrics source for table [name={}, partitionId={}].";
                    LOG.warn(message, e, table.name(), tablePartitionId.partitionId());
                }
            }
            return this.mvGc.removeStorage(tablePartitionId);
        });
    }

    /**
     * Destroys the storages of a table partition.
     *
     * <p>The MV partition is destroyed if it exists. When colocation is disabled, the TX state partition storage (if present)
     * and the replication protocol storages are additionally destroyed on the I/O executor; the latter are destroyed durably
     * unless the whole table is being destroyed.
     *
     * @param tablePartitionId Partition identifier.
     * @param table Table the partition belongs to.
     * @param destroyingWholeTable Whether the whole table is being destroyed.
     * @return Future that completes when all started destructions complete.
     */
    private CompletableFuture<Void> destroyPartitionStorages(TablePartitionId tablePartitionId, TableViewInternal table, boolean destroyingWholeTable) {
        InternalTable internalTable = table.internalTable();
        int partitionId = tablePartitionId.partitionId();
        List<CompletableFuture<Void>> pendingDestructions = new ArrayList<>();

        try {
            boolean mvPartitionExists = internalTable.storage().getMvPartition(partitionId) != null;

            if (mvPartitionExists) {
                pendingDestructions.add(internalTable.storage().destroyPartition(partitionId));
            }
        } catch (StorageDestroyedException ignored) {
            // Intentionally ignored: MV partition destruction is skipped.
        } catch (StorageClosedException ignored) {
            // Intentionally ignored: MV partition destruction is skipped.
        }

        if (!this.nodeProperties.colocationEnabled()) {
            boolean txStateStorageExists = internalTable.txStateStorage().getPartitionStorage(partitionId) != null;

            if (txStateStorageExists) {
                pendingDestructions.add(CompletableFuture.runAsync(() -> internalTable.txStateStorage().destroyPartitionStorage(partitionId), this.ioExecutor));
            }

            boolean destroyDurably = !destroyingWholeTable;

            pendingDestructions.add(CompletableFuture.runAsync(() -> this.destroyReplicationProtocolStorages(tablePartitionId, table, destroyDurably), this.ioExecutor));
        }

        return CompletableFuture.allOf(pendingDestructions.toArray(new CompletableFuture[0]));
    }

    /**
     * Returns the replica manager used by this table manager. Intended for tests only.
     *
     * @return Replica manager.
     */
    @TestOnly
    public ReplicaManager getReplicaManager() {
        return replicaMgr;
    }

    /**
     * Destroys replication protocol storages of a partition, taking the volatility flag from the table's storage.
     *
     * @param tablePartitionId Partition identifier.
     * @param table Table the partition belongs to; its internal table must be an {@code InternalTableImpl}.
     * @param destroyDurably Whether the durable destruction variant must be used.
     */
    private void destroyReplicationProtocolStorages(TablePartitionId tablePartitionId, TableViewInternal table, boolean destroyDurably) {
        boolean isVolatileStorage = ((InternalTableImpl)table.internalTable()).storage().isVolatile();

        this.destroyReplicationProtocolStorages(tablePartitionId, isVolatileStorage, destroyDurably);
    }

    /**
     * Destroys replication protocol storages of a partition through the replica manager.
     *
     * @param tablePartitionId Partition identifier.
     * @param isVolatileStorage Volatility flag forwarded to the replica manager.
     * @param destroyDurably Selects the durable destruction variant when {@code true}.
     * @throws IgniteInternalException With the node-stopping error code if the replica manager reports that the node is stopping.
     */
    private void destroyReplicationProtocolStorages(TablePartitionId tablePartitionId, boolean isVolatileStorage, boolean destroyDurably) {
        try {
            if (!destroyDurably) {
                this.replicaMgr.destroyReplicationProtocolStorages(tablePartitionId, isVolatileStorage);

                return;
            }

            this.replicaMgr.destroyReplicationProtocolStoragesDurably(tablePartitionId, isVolatileStorage);
        } catch (NodeStoppingException e) {
            throw new IgniteInternalException(ErrorGroups.Common.NODE_STOPPING_ERR, (Throwable)e);
        }
    }

    /**
     * Closes the safe time tracker and the storage index tracker of the given partition, if they are present.
     *
     * @param internalTable Internal table owning the trackers.
     * @param partitionId Partition number.
     */
    private static void closePartitionTrackers(InternalTable internalTable, int partitionId) {
        closeTracker(internalTable.getPartitionSafeTimeTracker(partitionId));

        closeTracker(internalTable.getPartitionStorageIndexTracker(partitionId));
    }

    /**
     * Closes the tracker; a {@code null} tracker is silently skipped.
     *
     * @param tracker Tracker to close, may be {@code null}.
     */
    private static void closeTracker(@Nullable PendingComparableValuesTracker<?, Void> tracker) {
        if (tracker == null) {
            return;
        }

        tracker.close();
    }

    /**
     * Returns the local member as reported by the topology service.
     *
     * @return Local cluster node.
     */
    private InternalClusterNode localNode() {
        return topologyService.localMember();
    }

    /**
     * Creates the index, GC and storage update handlers for a partition and registers metrics of its modification counter.
     *
     * @param partitionId Partition number.
     * @param partitionDataStorage Partition data storage; also used to supply the estimated partition size to the counter.
     * @param table Table the partition belongs to.
     * @param safeTimeTracker Safe time tracker passed to the GC update handler.
     * @param replicationConfiguration Replication configuration passed to the storage update handler.
     * @return Bundle of the created handlers.
     */
    private PartitionUpdateHandlers createPartitionUpdateHandlers(int partitionId, PartitionDataStorage partitionDataStorage, TableViewInternal table, PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker, ReplicationConfiguration replicationConfiguration) {
        IndexUpdateHandler indexHandler = new IndexUpdateHandler(table.indexStorageAdapters(partitionId));
        GcUpdateHandler gcHandler = new GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexHandler);

        PartitionModificationCounterFactory.SizeSupplier sizeSupplier = () -> partitionDataStorage.getStorage().estimatedSize();
        PartitionModificationCounter counter = this.partitionModificationCounterFactory.create(sizeSupplier, table::stalenessConfiguration);

        this.registerPartitionModificationCounterMetrics(table, partitionId, counter);

        StorageUpdateHandler storageHandler = new StorageUpdateHandler(
                partitionId,
                partitionDataStorage,
                indexHandler,
                replicationConfiguration,
                counter
        );

        return new PartitionUpdateHandlers(storageHandler, indexHandler, gcHandler);
    }

    /**
     * Builds a metric source exposing three gauges of the partition modification counter, registers and enables it, and
     * remembers it by partition identifier. A failure is logged as a warning and not propagated.
     *
     * @param table Table the partition belongs to.
     * @param partitionId Partition number.
     * @param counter Counter whose values back the gauges.
     */
    private void registerPartitionModificationCounterMetrics(TableViewInternal table, int partitionId, PartitionModificationCounter counter) {
        LongGauge modificationCount = new LongGauge("modificationCount", "The value of the volatile counter of partition modifications. This value is used to determine staleness of the related SQL statistics.", counter::value);
        LongGauge nextMilestone = new LongGauge("nextMilestone", "The value of the next milestone for the number of partition modifications. This value is used to determine staleness of the related SQL statistics.", counter::nextMilestone);
        LongGauge lastMilestoneTimestamp = new LongGauge("lastMilestoneTimestamp", "The timestamp value representing the commit time of the last modification operation that reached the milestone. This value is used to determine staleness of the related SQL statistics.", () -> counter.lastMilestoneTimestamp().longValue());

        PartitionModificationCounterMetricSource source = new PartitionModificationCounterMetricSource(table.tableId(), partitionId);
        source.addMetric(modificationCount);
        source.addMetric(nextMilestone);
        source.addMetric(lastMilestoneTimestamp);

        try {
            this.metricManager.registerSource(source);
            this.metricManager.enable(source);

            this.partModCounterMetricSources.put(new TablePartitionId(table.tableId(), partitionId), source);
        } catch (Exception e) {
            LOG.warn("Failed to register metrics source for table [name={}, partitionId={}].", e, table.name(), partitionId);
        }
    }

    /**
     * Looks the table up by identifier among the tables tracked by this manager.
     *
     * @param tableId Table identifier.
     * @return Table, or {@code null} if no table with this identifier is tracked.
     */
    @Override
    @Nullable
    public TableViewInternal cachedTable(int tableId) {
        return tables.get(tableId);
    }

    /**
     * Looks the table up by simple name among the tables tracked by this manager. Intended for tests only.
     *
     * @param name Simple table name.
     * @return Table, or {@code null} if there is no match.
     */
    @TestOnly
    @Nullable
    public TableViewInternal cachedTable(String name) {
        return findTableImplByName(tables.values(), name);
    }

    /**
     * Returns an unmodifiable snapshot of the tables tracked by this manager.
     *
     * @return Unmodifiable list of tables.
     */
    public List<TableViewInternal> cachedTables() {
        return List.copyOf(this.tables.values());
    }

    /**
     * Fetches the table descriptor from the catalog, asserting that it exists.
     *
     * @param tableId Table identifier.
     * @param catalog Catalog to look in.
     * @return Table descriptor.
     */
    private static CatalogTableDescriptor getTableDescriptor(int tableId, Catalog catalog) {
        CatalogTableDescriptor descriptor = catalog.table(tableId);

        assert descriptor != null : "tableId=" + tableId + ", catalogVersion=" + catalog.version();

        return descriptor;
    }

    /**
     * Fetches the zone descriptor of the table from the catalog of the given version.
     *
     * @param tableDescriptor Table descriptor.
     * @param catalogVersion Catalog version.
     * @return Zone descriptor.
     */
    private CatalogZoneDescriptor getZoneDescriptor(CatalogTableDescriptor tableDescriptor, int catalogVersion) {
        Catalog catalog = this.catalogService.catalog(catalogVersion);

        return getZoneDescriptor(tableDescriptor, catalog);
    }

    /**
     * Fetches the zone descriptor of the table from the catalog, asserting that it exists.
     *
     * @param tableDescriptor Table descriptor.
     * @param catalog Catalog to look in.
     * @return Zone descriptor.
     */
    private static CatalogZoneDescriptor getZoneDescriptor(CatalogTableDescriptor tableDescriptor, Catalog catalog) {
        CatalogZoneDescriptor zone = catalog.zone(tableDescriptor.zoneId());

        assert zone != null
                : "tableId=" + tableDescriptor.id() + ", zoneId=" + tableDescriptor.zoneId() + ", catalogVersion=" + catalog.version();

        return zone;
    }

    /**
     * Fetches the descriptor of the table's secondary zone, if the table has one.
     *
     * @param tableDescriptor Table descriptor.
     * @param catalogVersion Catalog version.
     * @return Secondary zone descriptor, or {@code null} if the table has no secondary zone identifier.
     */
    @Nullable
    private CatalogZoneDescriptor getSecondaryZoneDescriptor(CatalogTableDescriptor tableDescriptor, int catalogVersion) {
        Integer secondaryZoneId = tableDescriptor.secondaryZoneId();

        return secondaryZoneId == null
                ? null
                : this.getSecondaryZoneDescriptor(secondaryZoneId, tableDescriptor.id(), catalogVersion);
    }

    /**
     * Fetches the secondary zone descriptor from the catalog of the given version, asserting that it exists.
     *
     * @param secondaryZoneId Secondary zone identifier.
     * @param tableId Table identifier, used only in the assertion message.
     * @param catalogVersion Catalog version.
     * @return Zone descriptor.
     */
    private CatalogZoneDescriptor getSecondaryZoneDescriptor(int secondaryZoneId, int tableId, int catalogVersion) {
        CatalogZoneDescriptor zone = this.catalogService.catalog(catalogVersion).zone(secondaryZoneId);

        assert zone != null : "tableId=" + tableId + ", zoneId=" + secondaryZoneId + ", catalogVersion=" + catalogVersion;

        return zone;
    }

    /**
     * Fetches the schema descriptor of the table from the catalog of the given version, asserting that it exists.
     *
     * @param tableDescriptor Table descriptor.
     * @param catalogVersion Catalog version.
     * @return Schema descriptor.
     */
    private CatalogSchemaDescriptor getSchemaDescriptor(CatalogTableDescriptor tableDescriptor, int catalogVersion) {
        CatalogSchemaDescriptor schema = this.catalogService.catalog(catalogVersion).schema(tableDescriptor.schemaId());

        assert schema != null
                : "tableId=" + tableDescriptor.id() + ", schemaId=" + tableDescriptor.schemaId() + ", catalogVersion=" + catalogVersion;

        return schema;
    }

    /**
     * Searches the collection for a table whose qualified name equals the qualified name built from the given simple name.
     *
     * @param tables Tables to search.
     * @param name Simple table name.
     * @return Matching table, or {@code null} if none matches.
     */
    @Nullable
    private static TableViewInternal findTableImplByName(Collection<TableViewInternal> tables, String name) {
        for (TableViewInternal candidate : tables) {
            if (candidate.qualifiedName().equals(QualifiedName.fromSimple(name))) {
                return candidate;
            }
        }

        return null;
    }

    /**
     * Starts tables on node recovery by walking catalog versions from the latest down to the earliest relevant one.
     *
     * <p>The earliest version is the earliest catalog version when no low watermark is given, otherwise the catalog version
     * active at the low watermark. Each table is started at most once, using the newest catalog version in which it appears.
     * A table that is absent from the next (newer) catalog version gets a destroy event enqueued.
     *
     * @param recoveryRevision Revision passed to the table start routines.
     * @param lwm Low watermark, or {@code null} if there is none.
     * @return Future that completes when all table start futures complete; its state is also copied to
     *     {@code readyToProcessReplicaStarts}.
     */
    private CompletableFuture<Void> recoverTables(long recoveryRevision, @Nullable HybridTimestamp lwm) {
        int earliestCatalogVersion = lwm == null ? this.catalogService.earliestCatalogVersion() : this.catalogService.activeCatalogVersion(lwm.longValue());
        int latestCatalogVersion = this.catalogService.latestCatalogVersion();
        // Identifiers of tables for which a start has already been initiated.
        IntOpenHashSet startedTables = new IntOpenHashSet();
        ArrayList startTableFutures = new ArrayList();
        // Catalog of the version processed in the previous iteration, i.e. the next newer version.
        Catalog nextCatalog = null;
        for (int ver = latestCatalogVersion; ver >= earliestCatalogVersion; --ver) {
            Catalog catalog = this.catalogService.catalog(ver);
            for (CatalogTableDescriptor tableDescriptor : catalog.tables()) {
                CompletableFuture<Object> startTableFuture;
                boolean destroyTableEventFired;
                int tableId = tableDescriptor.id();
                // The table exists in this version but not in the next one: it was dropped in the next version.
                boolean bl = destroyTableEventFired = nextCatalog != null && nextCatalog.table(tableId) == null;
                if (destroyTableEventFired) {
                    this.destructionEventsQueue.enqueue(new DestroyTableEvent(nextCatalog.version(), tableId));
                }
                // Skip tables already started from a newer catalog version.
                if (!startedTables.add(tableId)) continue;
                if (this.nodeProperties.colocationEnabled()) {
                    CatalogZoneDescriptor zoneDescriptor = this.getZoneDescriptor(tableDescriptor, ver);
                    CatalogZoneDescriptor secondaryZoneDescriptor = this.getSecondaryZoneDescriptor(tableDescriptor, ver);
                    CatalogSchemaDescriptor schemaDescriptor = this.getSchemaDescriptor(tableDescriptor, ver);
                    startTableFuture = this.prepareTableResourcesOnRecovery(recoveryRevision, zoneDescriptor, secondaryZoneDescriptor, tableDescriptor, schemaDescriptor);
                    if (destroyTableEventFired) {
                        // NOTE(review): the lookup may yield null here; unregisterMetricsSource tolerates a null table.
                        this.unregisterMetricsSource(this.tables.get(tableId));
                    }
                } else {
                    startTableFuture = this.createTableLocally(recoveryRevision, ver, tableDescriptor, true);
                }
                startTableFutures.add(startTableFuture);
            }
            nextCatalog = catalog;
        }
        return ((CompletableFuture)CompletableFuture.allOf((CompletableFuture[])startTableFutures.toArray(CompletableFuture[]::new)).whenComplete((BiConsumer)CompletableFutures.copyStateTo(this.readyToProcessReplicaStarts))).whenComplete((unused, throwable) -> {
            if (throwable != null) {
                // A failure to start tables is reported to the failure processor.
                this.failureProcessor.process(new FailureContext((Throwable)throwable, "Error starting tables"));
            } else {
                LOG.info("Tables started successfully [count={}]", startTableFutures.size());
            }
        });
    }

    /**
     * Returns a future that completes as soon as either the given future or {@code stopManagerFuture} completes.
     *
     * <p>An already completed future is returned as is.
     *
     * @param future Future to race against the manager stop future.
     * @param <T> Result type of the given future.
     * @return The given future if it is done, otherwise a future completed by whichever of the two completes first.
     */
    @SuppressWarnings("unchecked")
    private <T> CompletableFuture<T> orStopManagerFuture(CompletableFuture<T> future) {
        if (future.isDone()) {
            return future;
        }

        // anyOf() erases the result type to Object, so the result is cast back to T to keep the declared return type
        // sound. The cast is unchecked; NOTE(review): a normal completion of stopManagerFuture would surface its value
        // as T - confirm that future only ever completes exceptionally or with null.
        return CompletableFuture.anyOf(future, this.stopManagerFuture).thenApply(result -> (T) result);
    }

    /**
     * Builds a secondary storage table descriptor from the table descriptor and its secondary zone descriptor.
     *
     * @param secondaryZoneDescriptor Secondary zone descriptor supplying the zone identifier and the partition count.
     * @param tableDescriptor Table descriptor supplying the identifier, the name and the latest schema version.
     * @return Secondary storage table descriptor.
     */
    private static SecondaryStorageTableDescriptor toSecondaryStorageTableDescriptor(CatalogZoneDescriptor secondaryZoneDescriptor, CatalogTableDescriptor tableDescriptor) {
        return new SecondaryStorageTableDescriptor(
                tableDescriptor.id(),
                tableDescriptor.name(),
                secondaryZoneDescriptor.id(),
                secondaryZoneDescriptor.partitions(),
                CatalogToSchemaDescriptorConverter.convert(tableDescriptor, tableDescriptor.latestSchemaVersion())
        );
    }

    /**
     * Destroys on-disk resources of tables that are not alive with respect to the current low watermark.
     *
     * <p>MV storages are always cleaned up; TX state storages and replication protocol storages are cleaned up only when
     * colocation is disabled.
     */
    private void cleanUpResourcesForDroppedTablesOnRecoveryBusy() {
        Set<Integer> aliveTableIds = TableUtils.aliveTables(this.catalogService, this.lowWatermark.getLowWatermark());

        this.destroyMvStoragesForTablesNotIn(aliveTableIds);

        if (this.nodeProperties.colocationEnabled()) {
            return;
        }

        this.destroyTxStateStoragesForTablesNotIn(aliveTableIds);
        this.destroyReplicationProtocolStoragesForTablesNotIn(aliveTableIds);
    }

    /**
     * For every storage engine, destroys MV table storages that are present on disk but whose table is not alive.
     *
     * @param aliveTableIds Identifiers of tables that must be kept.
     */
    private void destroyMvStoragesForTablesNotIn(Set<Integer> aliveTableIds) {
        for (StorageEngine engine : this.dataStorageMgr.allStorageEngines()) {
            Set<Integer> staleTableIds = CollectionUtils.difference(engine.tableIdsOnDisk(), aliveTableIds);

            for (int staleTableId : staleTableIds) {
                engine.destroyMvTable(staleTableId);

                LOG.info("Destroyed table MV storage for table {} in storage engine '{}'", staleTableId, engine.name());
            }
        }
    }

    /**
     * Destroys TX state storages that are present on disk but whose table is not alive.
     *
     * @param aliveTableIds Identifiers of tables that must be kept.
     */
    private void destroyTxStateStoragesForTablesNotIn(Set<Integer> aliveTableIds) {
        Set<Integer> staleTableIds = CollectionUtils.difference(this.sharedTxStateStorage.tableOrZoneIdsOnDisk(), aliveTableIds);

        for (int staleTableId : staleTableIds) {
            this.sharedTxStateStorage.destroyStorage(staleTableId);

            LOG.info("Destroyed table TX state storage for table {}", staleTableId);
        }
    }

    /**
     * Destroys replication protocol storages of partitions that are present on disk but whose table is not alive.
     *
     * <p>If the replica manager reports that the node is stopping while listing partitions, nothing is destroyed.
     *
     * @param aliveTableIds Identifiers of tables that must be kept.
     */
    private void destroyReplicationProtocolStoragesForTablesNotIn(Set<Integer> aliveTableIds) {
        Set<TablePartitionId> partitionIdsOnDisk;

        try {
            partitionIdsOnDisk = this.replicaMgr.replicationProtocolTablePartitionIdsOnDisk();
        } catch (NodeStoppingException e) {
            return;
        }

        Map<Integer, List<TablePartitionId>> partitionsByTable = partitionIdsOnDisk.stream()
                .collect(Collectors.groupingBy(TablePartitionId::tableId));

        partitionsByTable.forEach((tableId, partitionIds) -> {
            if (!aliveTableIds.contains(tableId)) {
                this.destroyReplicationProtocolStoragesOnRecovery(tableId, partitionIds);
            }
        });
    }

    /**
     * Destroys replication protocol storages of the given partitions of a table during recovery.
     *
     * <p>Processing stops at the first partition for which the replica manager reports that the node is stopping. Only the
     * partitions whose storages were actually destroyed are logged; nothing is logged if none was destroyed.
     *
     * @param tableId Table identifier, used for logging.
     * @param partitionIds Partitions whose replication protocol storages must be destroyed.
     */
    private void destroyReplicationProtocolStoragesOnRecovery(int tableId, List<TablePartitionId> partitionIds) {
        List<Integer> destroyedPartitionIndexes = new ArrayList<>(partitionIds.size());

        for (TablePartitionId partitionId : partitionIds) {
            try {
                this.replicaMgr.destroyReplicationProtocolStoragesOnStartup(partitionId);
            } catch (NodeStoppingException e) {
                // The node is stopping: give up on the remaining partitions.
                break;
            }

            destroyedPartitionIndexes.add(partitionId.partitionId());
        }

        // Report only what was really destroyed; previously all partitions were logged even after an early break.
        if (!destroyedPartitionIndexes.isEmpty()) {
            LOG.info("Destroyed replication protocol storages for table {} and partitions {}", tableId, destroyedPartitionIndexes);
        }
    }

    /**
     * Returns the streamer flush executor, creating a single-threaded scheduled executor on first use.
     *
     * @return Streamer flush executor.
     * @throws IgniteException With the node-stopping error code if the busy lock cannot be entered.
     */
    private synchronized ScheduledExecutorService streamerFlushExecutor() {
        if (!this.busyLock.enterBusy()) {
            throw new IgniteException(ErrorGroups.Common.NODE_STOPPING_ERR, (Throwable)new NodeStoppingException());
        }

        try {
            ScheduledExecutorService executor = this.streamerFlushExecutor;

            if (executor == null) {
                executor = Executors.newSingleThreadScheduledExecutor(IgniteThreadFactory.create(this.nodeName, "streamer-flush-executor", LOG, ThreadOperation.STORAGE_WRITE));

                this.streamerFlushExecutor = executor;
            }

            return executor;
        } finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Restarts a table partition: stops it for restart and, if the local node is present in the stable assignments read at
     * the given revision, starts it again together with its client.
     *
     * <p>The steps run under the busy lock; the stop and the restart are scheduled on the I/O executor. The restart waits for
     * metadata completeness at the given assignments timestamp.
     *
     * @param tablePartitionId Partition identifier.
     * @param revision Revision used to wait for the tables versioned value and to read stable assignments.
     * @param assignmentsTimestamp Assignments timestamp.
     * @return Future of the restart.
     */
    public CompletableFuture<Void> restartPartition(TablePartitionId tablePartitionId, long revision, long assignmentsTimestamp) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () ->
                this.tablesVv.get(revision).thenComposeAsync(ignoredTables -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
                    TableViewInternal table = this.tables.get(tablePartitionId.tableId());

                    assert table != null : tablePartitionId;

                    return this.stopPartitionForRestart(tablePartitionId, table).thenComposeAsync(ignoredStop -> {
                        Assignments stableAssignments = RebalanceUtil.stableAssignmentsGetLocally(this.metaStorageMgr, tablePartitionId, revision);

                        assert stableAssignments != null : "tablePartitionId=" + tablePartitionId + ", revision=" + revision;

                        return this.waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(ignoredMeta -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
                            Assignment localAssignment = this.localAssignment(stableAssignments);

                            // The local node does not host this partition: nothing to start.
                            if (localAssignment == null) {
                                return CompletableFutures.nullCompletedFuture();
                            }

                            return this.startPartitionAndStartClient(table, tablePartitionId.partitionId(), localAssignment, stableAssignments, false, assignmentsTimestamp);
                        }));
                    }, (Executor)this.ioExecutor);
                }), (Executor)this.ioExecutor));
    }

    /**
     * Restarts a table partition with clean-up: the partition is stopped and destroyed, then created again and its client is
     * started, using the stable assignments read locally at the given revision.
     *
     * @param tablePartitionId Partition identifier.
     * @param revision Revision used to read stable assignments.
     * @param assignmentsTimestamp Assignments timestamp.
     * @return Future of the restart.
     */
    public CompletableFuture<Void> restartPartitionWithCleanUp(TablePartitionId tablePartitionId, long revision, long assignmentsTimestamp) {
        return this.tableAsync(tablePartitionId.tableId()).thenComposeAsync(
                table -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
                    assert table != null : tablePartitionId;

                    Assignments stableAssignments = RebalanceUtil.stableAssignmentsGetLocally(this.metaStorageMgr, tablePartitionId, revision);
                    Assignment localAssignment = this.localAssignment(stableAssignments);

                    return this.stopPartitionAndDestroyForRestart(tablePartitionId, (TableViewInternal)table).thenComposeAsync(
                            ignoredStop -> this.createPartitionAndStartClient(tablePartitionId, (TableViewInternal)table, false, assignmentsTimestamp, localAssignment, stableAssignments),
                            (Executor)this.ioExecutor
                    );
                }),
                (Executor)this.ioExecutor
        );
    }

    /**
     * Stores the streamer receiver runner in this manager.
     *
     * @param runner Streamer receiver runner.
     */
    @Override
    public void setStreamerReceiverRunner(StreamerReceiverRunner runner) {
        streamerReceiverRunner = runner;
    }

    /**
     * Returns an immutable copy of the set of tables registered for the zone, taken under the zone read lock.
     *
     * <p>The calling thread blocks until the read lock is acquired and the copy is made.
     *
     * @param zoneId Zone identifier.
     * @return Immutable copy of the zone's tables; empty if no tables are registered for the zone.
     * @throws IgniteInternalException With the internal error code if waiting for the result fails for any reason, including
     *     interruption (the interrupt flag is restored in that case).
     */
    public Set<TableViewInternal> zoneTables(int zoneId) throws IgniteInternalException {
        NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
        CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock();
        try {
            return (Set)((CompletableFuture)readLockAcquisitionFuture.thenApply(stamp -> {
                Set<TableViewInternal> res = Set.copyOf(this.zoneTablesRawSet(zoneId));
                // The lock is released right after the snapshot is taken.
                zoneLock.unlockRead((long)stamp);
                return res;
            })).get();
        }
        catch (Throwable t) {
            // NOTE(review): if the failure happens after the callback above has already unlocked (or while it is still
            // going to run, e.g. on interruption), this schedules a second unlock with the same stamp - confirm that
            // NaiveAsyncReadWriteLock tolerates that.
            readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
            if (t instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IgniteInternalException(ErrorGroups.Common.INTERNAL_ERR, "Failed to acquire a read lock for zone [zoneId=" + zoneId + "]", t);
        }
    }

    /**
     * Returns the set of tables registered for the zone without copying it and without taking the zone lock.
     *
     * @param zoneId Zone identifier.
     * @return Registered set, or an empty immutable set if nothing is registered for the zone.
     */
    private Set<TableViewInternal> zoneTablesRawSet(int zoneId) {
        return tablesPerZone.getOrDefault(zoneId, Set.of());
    }

    /**
     * Registers the table in the set of tables of the zone, under the zone write lock.
     *
     * <p>The calling thread blocks until the write lock is acquired and the table is added.
     *
     * @param zoneId Zone identifier.
     * @param table Table to register.
     * @throws IgniteInternalException With the internal error code if waiting for the operation fails for any reason,
     *     including interruption (the interrupt flag is restored in that case).
     */
    private void addTableToZone(int zoneId, TableViewInternal table) throws IgniteInternalException {
        NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
        CompletableFuture<Long> writeLockAcquisitionFuture = zoneLock.writeLock();
        try {
            ((CompletableFuture)writeLockAcquisitionFuture.thenAccept(stamp -> {
                this.tablesPerZone.compute(zoneId, (id, tables) -> {
                    // The first table of the zone creates the set.
                    if (tables == null) {
                        tables = new HashSet<TableViewInternal>();
                    }
                    tables.add(table);
                    return tables;
                });
                // The lock is released right after the table is added.
                zoneLock.unlockWrite((long)stamp);
            })).get();
        }
        catch (Throwable t) {
            // NOTE(review): if the callback above has already unlocked or is still going to run, this schedules a second
            // unlock with the same stamp - confirm that NaiveAsyncReadWriteLock tolerates that.
            writeLockAcquisitionFuture.thenAccept(zoneLock::unlockWrite);
            if (t instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IgniteInternalException(ErrorGroups.Common.INTERNAL_ERR, "Failed to acquire a write lock for zone [zoneId=" + zoneId + "]", t);
        }
    }

    /**
     * Returns the secondary replication manager held by this table manager. Intended for tests only.
     *
     * @return Secondary replication manager.
     */
    @TestOnly
    public SecondaryReplicationManager secondaryReplicationManager() {
        return secondaryReplicationManager;
    }

    /**
     * Creates, registers and enables the metric source of a table; if a storage engine is found for the table's storage
     * profile, a storage engine metric source for the table is registered and enabled as well.
     *
     * <p>Registration failures are logged as warnings and do not prevent the table metric source from being returned.
     *
     * @param tableDescriptor Storage table descriptor.
     * @param tableName Qualified table name.
     * @return Created table metric source.
     */
    private TableMetricSource createAndRegisterMetricsSource(StorageTableDescriptor tableDescriptor, QualifiedName tableName) {
        StorageEngine storageEngine = this.dataStorageMgr.engineByStorageProfile(tableDescriptor.getStorageProfile());

        if (storageEngine != null) {
            StorageEngineTablesMetricSource engineSource = new StorageEngineTablesMetricSource(storageEngine.name(), tableName);

            storageEngine.addTableMetrics(tableDescriptor, engineSource);

            try {
                this.metricManager.registerSource(engineSource);
                this.metricManager.enable(engineSource);
            } catch (Exception e) {
                LOG.warn("Failed to register storage engine metrics source for table [id={}, name={}].", e, tableDescriptor.getId(), tableName);
            }
        }

        TableMetricSource tableSource = new TableMetricSource(tableName);

        try {
            this.metricManager.registerSource(tableSource);
            this.metricManager.enable(tableSource);
        } catch (Exception e) {
            LOG.warn("Failed to register metrics source for table [id={}, name={}].", e, tableDescriptor.getId(), tableName);
        }

        return tableSource;
    }

    /**
     * Unregisters the table metric source and, if a storage engine is found for the table's storage profile, the storage
     * engine metric source of the table. Failures are logged as warnings.
     *
     * @param table Table whose metric sources are unregistered; {@code null} is a no-op.
     */
    private void unregisterMetricsSource(TableViewInternal table) {
        if (table == null) {
            return;
        }

        QualifiedName qualifiedName = table.qualifiedName();

        try {
            this.metricManager.unregisterSource(TableMetricSource.sourceName(qualifiedName));
        } catch (Exception e) {
            LOG.warn("Failed to unregister metrics source for table [id={}, name={}].", e, table.tableId(), qualifiedName);
        }

        StorageEngine storageEngine = this.dataStorageMgr.engineByStorageProfile(
                table.internalTable().storage().getTableDescriptor().getStorageProfile()
        );

        if (storageEngine == null) {
            return;
        }

        try {
            this.metricManager.unregisterSource(StorageEngineTablesMetricSource.sourceName(storageEngine.name(), qualifiedName));
        } catch (Exception e) {
            LOG.warn("Failed to unregister storage engine metrics source for table [id={}, name={}].", e, table.tableId(), qualifiedName);
        }
    }

    // Synthetic lambda body surfaced by the decompiler (presumably belonging to createTableLocally, judging by the name).
    // It ignores its second argument and returns the supplied future unchanged.
    private static /* synthetic */ CompletionStage lambda$createTableLocally$133(CompletableFuture loadToSecondaryZoneFuture, Object ignore) {
        return loadToSecondaryZoneFuture;
    }

    /** Internal exception carrying the "Table is closed" message for a table identifier. */
    private static class TableClosedException extends IgniteInternalException {
        private static final long serialVersionUID = 1L;

        /**
         * Creates the exception with the internal error code.
         *
         * @param tableId Identifier of the closed table, included in the message.
         * @param cause Optional cause.
         */
        private TableClosedException(int tableId, @Nullable Throwable cause) {
            super(ErrorGroups.Common.INTERNAL_ERR, "Table is closed [tableId=" + tableId + "]", cause);
        }
    }

    /** Immutable pair of a catalog version and a table identifier describing a table destruction. */
    private static class DestroyTableEvent {
        /** Catalog version associated with the destruction. */
        final int catalogVersion;

        /** Identifier of the table to destroy. */
        final int tableId;

        /**
         * Creates the event.
         *
         * @param catalogVersion Catalog version associated with the destruction.
         * @param tableId Identifier of the table to destroy.
         */
        DestroyTableEvent(int catalogVersion, int tableId) {
            this.catalogVersion = catalogVersion;
            this.tableId = tableId;
        }

        /** Returns the catalog version associated with the destruction. */
        public int catalogVersion() {
            return catalogVersion;
        }

        /** Returns the identifier of the table to destroy. */
        public int tableId() {
            return tableId;
        }
    }
}

