/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.table.distributed;

import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite3.cache.Cache;
import org.apache.ignite3.internal.catalog.Catalog;
import org.apache.ignite3.internal.catalog.CatalogService;
import org.apache.ignite3.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite3.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite3.internal.catalog.descriptors.CatalogTableProperties;
import org.apache.ignite3.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite3.internal.catalog.events.AddSecondaryZoneEventParameters;
import org.apache.ignite3.internal.catalog.events.AlterTablePropertiesEventParameters;
import org.apache.ignite3.internal.catalog.events.CatalogEvent;
import org.apache.ignite3.internal.catalog.events.CatalogEventParameters;
import org.apache.ignite3.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite3.internal.catalog.events.DropSecondaryZoneEventParameters;
import org.apache.ignite3.internal.catalog.events.DropTableEventParameters;
import org.apache.ignite3.internal.catalog.events.RenameTableEventParameters;
import org.apache.ignite3.internal.catalog.events.SecondaryStorageAvailableEventParameters;
import org.apache.ignite3.internal.causality.CompletionListener;
import org.apache.ignite3.internal.causality.IncrementalVersionedValue;
import org.apache.ignite3.internal.causality.OutdatedTokenException;
import org.apache.ignite3.internal.causality.RevisionListenerRegistry;
import org.apache.ignite3.internal.close.ManuallyCloseable;
import org.apache.ignite3.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite3.internal.components.LogSyncer;
import org.apache.ignite3.internal.components.NodeProperties;
import org.apache.ignite3.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite3.internal.configuration.utils.SystemDistributedConfigurationPropertyHolder;
import org.apache.ignite3.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite3.internal.event.EventListener;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.hlc.HybridTimestampTracker;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.lowwatermark.LowWatermark;
import org.apache.ignite3.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
import org.apache.ignite3.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.manager.IgniteComponent;
import org.apache.ignite3.internal.marshaller.ReflectionMarshallersProvider;
import org.apache.ignite3.internal.metastorage.MetaStorageManager;
import org.apache.ignite3.internal.metastorage.Revisions;
import org.apache.ignite3.internal.metrics.LongGauge;
import org.apache.ignite3.internal.metrics.MetricManager;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.MessagingService;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite3.internal.partition.replicator.LocalBeforeReplicaStartEventParameters;
import org.apache.ignite3.internal.partition.replicator.LocalPartitionReplicaEvent;
import org.apache.ignite3.internal.partition.replicator.LocalPartitionReplicaEventParameters;
import org.apache.ignite3.internal.partition.replicator.NaiveAsyncReadWriteLock;
import org.apache.ignite3.internal.partition.replicator.PartitionReplicaLifecycleManager;
import org.apache.ignite3.internal.partition.replicator.ReplicaTableProcessor;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionKey;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite3.internal.partition.replicator.schema.CatalogValidationSchemasSource;
import org.apache.ignite3.internal.partition.replicator.schema.ExecutorInclinedSchemaSyncService;
import org.apache.ignite3.internal.placementdriver.PlacementDriver;
import org.apache.ignite3.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite3.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite3.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
import org.apache.ignite3.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite3.internal.raft.service.RaftCommandRunner;
import org.apache.ignite3.internal.replicator.ReplicaManager;
import org.apache.ignite3.internal.replicator.ReplicaService;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.replicator.ZonePartitionId;
import org.apache.ignite3.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite3.internal.replicator.message.ReplicaMessageGroup;
import org.apache.ignite3.internal.schema.SchemaManager;
import org.apache.ignite3.internal.schema.SchemaRegistry;
import org.apache.ignite3.internal.schema.SchemaSyncService;
import org.apache.ignite3.internal.schema.catalog.CatalogToSchemaDescriptorConverter;
import org.apache.ignite3.internal.schema.configuration.GcConfiguration;
import org.apache.ignite3.internal.secondarystoragebridge.SecondaryStorageBridge;
import org.apache.ignite3.internal.secondarystoragebridge.ThreadAssertingSecondaryStorageBridge;
import org.apache.ignite3.internal.secondarystoragebridge.rocksdb.RocksDbSecondaryStorageBridge;
import org.apache.ignite3.internal.storage.DataStorageManager;
import org.apache.ignite3.internal.storage.MvPartitionStorage;
import org.apache.ignite3.internal.storage.StorageClosedException;
import org.apache.ignite3.internal.storage.StorageDestroyedException;
import org.apache.ignite3.internal.storage.engine.MvTableStorage;
import org.apache.ignite3.internal.storage.engine.StorageEngine;
import org.apache.ignite3.internal.storage.engine.StorageTableDescriptor;
import org.apache.ignite3.internal.storage.metrics.StorageEngineTablesMetricSource;
import org.apache.ignite3.internal.storage.secondary.SecondaryStorageEngine;
import org.apache.ignite3.internal.storage.secondary.SecondaryStorageTableDescriptor;
import org.apache.ignite3.internal.storage.secondary.SecondaryTableStorage;
import org.apache.ignite3.internal.table.IgniteTablesInternal;
import org.apache.ignite3.internal.table.InternalTable;
import org.apache.ignite3.internal.table.LongPriorityQueue;
import org.apache.ignite3.internal.table.StreamerReceiverRunner;
import org.apache.ignite3.internal.table.TableImpl;
import org.apache.ignite3.internal.table.TableViewInternal;
import org.apache.ignite3.internal.table.distributed.BitSetPartitionSet;
import org.apache.ignite3.internal.table.distributed.CatalogStorageIndexDescriptorSupplier;
import org.apache.ignite3.internal.table.distributed.PartitionModificationCounter;
import org.apache.ignite3.internal.table.distributed.PartitionModificationCounterFactory;
import org.apache.ignite3.internal.table.distributed.PartitionModificationCounterMetricSource;
import org.apache.ignite3.internal.table.distributed.PartitionReplicatorNodeRecovery;
import org.apache.ignite3.internal.table.distributed.PartitionSet;
import org.apache.ignite3.internal.table.distributed.PartitionUpdateHandlers;
import org.apache.ignite3.internal.table.distributed.SecondaryStorageRebalanceTrigger;
import org.apache.ignite3.internal.table.distributed.SecondaryZoneManager;
import org.apache.ignite3.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite3.internal.table.distributed.TableIndexStoragesSupplier;
import org.apache.ignite3.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite3.internal.table.distributed.TableStatsStalenessConfiguration;
import org.apache.ignite3.internal.table.distributed.TableUtils;
import org.apache.ignite3.internal.table.distributed.expiration.ExpiredRowsCleaner;
import org.apache.ignite3.internal.table.distributed.expiration.ExpiredRowsCleanerImpl;
import org.apache.ignite3.internal.table.distributed.expiration.configuration.ExpirationConfiguration;
import org.apache.ignite3.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite3.internal.table.distributed.gc.MvGc;
import org.apache.ignite3.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite3.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite3.internal.table.distributed.index.IndexUtils;
import org.apache.ignite3.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
import org.apache.ignite3.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite3.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
import org.apache.ignite3.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl;
import org.apache.ignite3.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage;
import org.apache.ignite3.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite3.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite3.internal.table.distributed.replicator.secondary.SecondaryReplicationManager;
import org.apache.ignite3.internal.table.distributed.replicator.secondary.ZoneSecondaryReplicationManager;
import org.apache.ignite3.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite3.internal.table.distributed.schema.SchemaVersionsImpl;
import org.apache.ignite3.internal.table.distributed.storage.ContinuousQueryResponseHandler;
import org.apache.ignite3.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite3.internal.table.distributed.storage.NullStorageEngine;
import org.apache.ignite3.internal.table.metrics.TableMetricSource;
import org.apache.ignite3.internal.table.policy.RlsPolicyManager;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.internal.tx.LockManager;
import org.apache.ignite3.internal.tx.TxManager;
import org.apache.ignite3.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite3.internal.tx.configuration.TransactionView;
import org.apache.ignite3.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite3.internal.tx.impl.TransactionInflights;
import org.apache.ignite3.internal.tx.impl.TxMessageSender;
import org.apache.ignite3.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteBusyLock;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.util.Lazy;
import org.apache.ignite3.internal.util.PendingComparableValuesTracker;
import org.apache.ignite3.internal.util.SafeTimeValuesTracker;
import org.apache.ignite3.internal.utils.InternalTableProvider;
import org.apache.ignite3.internal.worker.ThreadAssertions;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.lang.IgniteException;
import org.apache.ignite3.sql.IgniteSql;
import org.apache.ignite3.table.QualifiedName;
import org.apache.ignite3.table.QualifiedNameHelper;
import org.apache.ignite3.table.Table;
import org.gridgain.internal.encryption.EncryptionManager;
import org.gridgain.internal.license.LicenseFeatureChecker;
import org.gridgain.internal.license.MissingRequiredFeaturesException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class TableManager
implements IgniteTablesInternal,
IgniteComponent {
    /** Logger of this class; also handed to the thread factories created in the constructor. */
    private static final IgniteLogger LOG = Loggers.forClass(TableManager.class);
    /** Directory name that the constructor resolves against the supplied storage path for the secondary storage bridge. */
    private static final String SECONDARY_STORE_BRIDGE_DIR = "secondary-store-bridge";

    // ---- Collaborators injected through (or created in) the constructor. ----
    private final TopologyService topologyService;
    private final ReplicaManager replicaMgr;
    private final LockManager lockMgr;
    private final ReplicaService replicaSvc;
    private final TxManager txManager;
    private final MetaStorageManager metaStorageMgr;
    private final DataStorageManager dataStorageMgr;
    private final TransactionStateResolver transactionStateResolver;
    private final ExpiredRowsCleaner expiredRowsCleaner;
    private final EncryptionManager encryptionManager;

    // Versioned values created in the constructor as a dependency chain:
    // tablesVv -> localPartitionsVv (depends on tablesVv) -> assignmentsUpdatedVv (depends on localPartitionsVv).
    private final IncrementalVersionedValue<Void> tablesVv;
    private final IncrementalVersionedValue<Void> localPartitionsVv;
    private final IncrementalVersionedValue<Void> assignmentsUpdatedVv;

    // Table views keyed by an Integer — presumably the table id; TODO confirm against the code that populates these maps.
    private final Map<Integer, TableViewInternal> tables = new ConcurrentHashMap<Integer, TableViewInternal>();
    private final Map<Integer, TableViewInternal> startedTables = new ConcurrentHashMap<Integer, TableViewInternal>();
    /** Queue of table destruction events prioritized by {@code DestroyTableEvent::catalogVersion}. */
    private final LongPriorityQueue<DestroyTableEvent> destructionEventsQueue = new LongPriorityQueue<DestroyTableEvent>(DestroyTableEvent::catalogVersion);
    /** Partition sets per table id; extended with a partition index when resources are loaded into a zone replica. */
    private final Map<Integer, PartitionSet> localPartsByTableId = new ConcurrentHashMap<Integer, PartitionSet>();
    /** Busy lock that the handlers in this class enter through {@code IgniteUtils.inBusyLockAsync}/{@code inBusyLock}. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    // NOTE(review): both guards look like one-shot flags for beforeNodeStop/stop — their use is outside this chunk, confirm.
    private final AtomicBoolean beforeStopGuard = new AtomicBoolean();
    private final AtomicBoolean stopGuard = new AtomicBoolean();
    private final SchemaManager schemaManager;
    private final TxStateRocksDbSharedStorage sharedTxStateStorage;
    /** Single-thread executor created in the constructor with the "scan-query-executor" thread name. */
    private final ExecutorService scanRequestExecutor;
    private final ExecutorService ioExecutor;
    private final ClockService clockService;
    private final OutgoingSnapshotsManager outgoingSnapshotsManager;
    /** Schema sync service wrapped with the partition operations executor (see constructor). */
    private final SchemaSyncService executorInclinedSchemaSyncService;
    private final CatalogService catalogService;
    private final FailureProcessor failureProcessor;
    /** Pool created in the constructor with the "incoming-raft-snapshot" thread name; core threads are allowed to time out. */
    private final ThreadPoolExecutor incomingSnapshotsExecutor;
    private final MvGc mvGc;
    private final LowWatermark lowWatermark;
    private final HybridTimestampTracker observableTimestampTracker;
    /** Placement driver wrapped with the partition operations executor (see constructor). */
    private final PlacementDriver executorInclinedPlacementDriver;
    private final Supplier<IgniteSql> sql;
    private final SchemaVersions schemaVersions;
    private final PartitionReplicatorNodeRecovery partitionReplicatorNodeRecovery;
    private final CompletableFuture<Void> stopManagerFuture = new CompletableFuture();
    private final SecondaryStorageRebalanceTrigger secondaryStorageRebalanceTrigger;
    private final ReplicationConfiguration replicationConfiguration;
    private final Executor partitionOperationsExecutor;
    private final ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider();
    private final FullStateTransferIndexChooser fullStateTransferIndexChooser;
    private final RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry;
    private final SecondaryStorageBridge secondaryStorageBridge;
    private final TransactionInflights transactionInflights;
    private final TransactionConfiguration txCfg;
    private final String nodeName;
    private final PartitionReplicaLifecycleManager partitionReplicaLifecycleManager;
    /** Handler registered for {@code ReplicaMessageGroup} messages in {@code startAsync}. */
    private final ContinuousQueryResponseHandler continuousQueryResponseHandler = new ContinuousQueryResponseHandler();
    private final MessagingService messagingService;
    private final NodeProperties nodeProperties;
    // Non-final and nullable: not assigned by the constructor, so it is set somewhere outside this chunk.
    @Nullable
    private ScheduledExecutorService streamerFlushExecutor;
    private final IndexMetaStorage indexMetaStorage;
    private final MinimumRequiredTimeCollectorService minTimeCollectorService;
    // Non-final and nullable: not assigned by the constructor, so it is set somewhere outside this chunk.
    @Nullable
    private StreamerReceiverRunner streamerReceiverRunner;
    /** Gate for replica-start handling: {@code beforeZoneReplicaStarted} chains its work after this future. */
    private final CompletableFuture<Void> readyToProcessReplicaStarts = new CompletableFuture();
    // Plain HashMap (not concurrent). NOTE(review): access is presumably coordinated through tablesPerZoneLocks — confirm.
    private final Map<Integer, Set<TableViewInternal>> tablesPerZone = new HashMap<Integer, Set<TableViewInternal>>();
    /** Asynchronous read-write locks keyed by zone id; entries are created lazily via {@code computeIfAbsent}. */
    private final Map<Integer, NaiveAsyncReadWriteLock> tablesPerZoneLocks = new ConcurrentHashMap<Integer, NaiveAsyncReadWriteLock>();
    private final LicenseFeatureChecker licenseFeatureChecker;
    private final SecondaryReplicationManager secondaryReplicationManager;
    /** Holder for the "rebalanceRetryDelay" system property; initialized in {@code startAsync}. */
    private final SystemDistributedConfigurationPropertyHolder<Integer> rebalanceRetryDelayConfiguration;

    // ---- Listeners for partition replica lifecycle events; registered in the constructor. ----
    private final EventListener<LocalBeforeReplicaStartEventParameters> onBeforeZoneReplicaStartedListener = this::beforeZoneReplicaStarted;
    private final EventListener<LocalPartitionReplicaEventParameters> onZoneReplicaStoppedListener = this::onZoneReplicaStopped;
    private final EventListener<LocalPartitionReplicaEventParameters> onZoneReplicaDestroyedListener = this::onZoneReplicaDestroyed;
    // The same listener instance is registered for both BEFORE_REPLICA_STOPPED and BEFORE_REPLICA_DESTROYED.
    private final EventListener<LocalPartitionReplicaEventParameters> onBeforeZoneReplicaStopOrDestroyListener = this::onBeforeZoneReplicaStopOrDestroy;

    // ---- Listeners for catalog, low watermark and placement driver events; registered in startAsync. ----
    private final EventListener<CreateTableEventParameters> onTableCreateListener = this::loadTableToZoneOnTableCreate;
    private final EventListener<DropTableEventParameters> onTableDropListener = EventListener.fromConsumer(this::onTableDrop);
    private final EventListener<CatalogEventParameters> onTableAlterListener = this::onTableAlter;
    private final EventListener<ChangeLowWatermarkEventParameters> onLowWatermarkChangedListener = this::onLwmChanged;
    private final EventListener<PrimaryReplicaEventParameters> onPrimaryReplicaExpiredListener = this::onPrimaryReplicaExpired;
    private final EventListener<PrimaryReplicaEventParameters> onPrimaryReplicaElectedListener = this::onPrimaryReplicaElected;
    private final EventListener<SecondaryStorageAvailableEventParameters> onSecondaryStorageAvailableListener = this::onSecondaryStorageAvailable;
    private final MetricManager metricManager;
    private final PartitionModificationCounterFactory partitionModificationCounterFactory;
    /** Partition modification counter metric sources keyed by table partition id. */
    private final Map<TablePartitionId, PartitionModificationCounterMetricSource> partModCounterMetricSources = new ConcurrentHashMap<TablePartitionId, PartitionModificationCounterMetricSource>();
    private final SecondaryZoneManager secondaryZoneManager;
    private final RlsPolicyManager policyManager;
    private final ClusterManagementGroupManager cmgManager;

    /**
     * Creates the table manager.
     *
     * <p>The constructor stores the supplied collaborators, builds the internal helpers that depend on them
     * (executor-inclined wrappers, transaction state resolver, expired rows cleaner, versioned values, executors,
     * garbage collector, secondary storage components, policy manager) and subscribes to partition replica
     * lifecycle events. It does not call {@code start()} on any of the helpers itself; that happens in
     * {@link #startAsync(ComponentContext)}.
     *
     * <p>Parameters that are not stored in a field ({@code registry}, {@code gcConfig},
     * {@code messageSerializationRegistry}, {@code storagePath}, {@code commonScheduler}, {@code schemaSyncService},
     * {@code placementDriver}, {@code expirationConfiguration}, {@code logSyncer},
     * {@code systemDistributedConfiguration}) are only forwarded to the helpers created here;
     * {@code messageSerializationRegistry} is not referenced by the body at all.
     */
    public TableManager(String nodeName, RevisionListenerRegistry registry, GcConfiguration gcConfig, TransactionConfiguration txCfg, ReplicationConfiguration replicationConfiguration, MessagingService messagingService, TopologyService topologyService, MessageSerializationRegistry messageSerializationRegistry, ReplicaManager replicaMgr, LockManager lockMgr, ReplicaService replicaSvc, TxManager txManager, DataStorageManager dataStorageMgr, Path storagePath, TxStateRocksDbSharedStorage txStateRocksDbSharedStorage, MetaStorageManager metaStorageMgr, SchemaManager schemaManager, ExecutorService ioExecutor, Executor partitionOperationsExecutor, ScheduledExecutorService commonScheduler, ClockService clockService, OutgoingSnapshotsManager outgoingSnapshotsManager, SchemaSyncService schemaSyncService, CatalogService catalogService, FailureProcessor failureProcessor, HybridTimestampTracker observableTimestampTracker, PlacementDriver placementDriver, Supplier<IgniteSql> sql, RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry, ExpirationConfiguration expirationConfiguration, LowWatermark lowWatermark, TransactionInflights transactionInflights, IndexMetaStorage indexMetaStorage, EncryptionManager encryptionManager, LogSyncer logSyncer, PartitionReplicaLifecycleManager partitionReplicaLifecycleManager, NodeProperties nodeProperties, MinimumRequiredTimeCollectorService minTimeCollectorService, SystemDistributedConfiguration systemDistributedConfiguration, MetricManager metricManager, LicenseFeatureChecker licenseFeatureChecker, PartitionModificationCounterFactory partitionModificationCounterFactory, ClusterManagementGroupManager cmgManager) {
        // Plain collaborator assignments.
        this.topologyService = topologyService;
        this.replicaMgr = replicaMgr;
        this.lockMgr = lockMgr;
        this.replicaSvc = replicaSvc;
        this.txManager = txManager;
        this.dataStorageMgr = dataStorageMgr;
        this.metaStorageMgr = metaStorageMgr;
        this.schemaManager = schemaManager;
        this.ioExecutor = ioExecutor;
        this.partitionOperationsExecutor = partitionOperationsExecutor;
        this.clockService = clockService;
        this.outgoingSnapshotsManager = outgoingSnapshotsManager;
        this.catalogService = catalogService;
        this.failureProcessor = failureProcessor;
        this.observableTimestampTracker = observableTimestampTracker;
        this.sql = sql;
        this.replicationConfiguration = replicationConfiguration;
        this.remotelyTriggeredResourceRegistry = remotelyTriggeredResourceRegistry;
        this.lowWatermark = lowWatermark;
        this.transactionInflights = transactionInflights;
        this.txCfg = txCfg;
        this.nodeName = nodeName;
        this.indexMetaStorage = indexMetaStorage;
        this.encryptionManager = encryptionManager;
        this.messagingService = messagingService;
        this.partitionReplicaLifecycleManager = partitionReplicaLifecycleManager;
        this.nodeProperties = nodeProperties;
        this.minTimeCollectorService = minTimeCollectorService;
        this.metricManager = metricManager;
        this.licenseFeatureChecker = licenseFeatureChecker;
        this.partitionModificationCounterFactory = partitionModificationCounterFactory;
        // Wrap the schema sync service and the placement driver with the partition operations executor.
        // The wrapped placement driver must exist before the transaction state resolver below is built.
        this.executorInclinedSchemaSyncService = new ExecutorInclinedSchemaSyncService(schemaSyncService, partitionOperationsExecutor);
        this.executorInclinedPlacementDriver = new ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
        this.cmgManager = cmgManager;
        TxMessageSender txMessageSender = new TxMessageSender(messagingService, replicaSvc, clockService);
        this.transactionStateResolver = new TransactionStateResolver(txManager, clockService, topologyService, messagingService, this.executorInclinedPlacementDriver, txMessageSender);
        // Resolves an internal table by id through cachedTable(); yields null when no table view is returned.
        InternalTableProvider internalTableProvider = tableId -> {
            TableViewInternal table = this.cachedTable(tableId);
            if (table == null) {
                return null;
            }
            return table.internalTable();
        };
        // Note: the cleaner receives the raw placementDriver, not the executor-inclined wrapper.
        this.expiredRowsCleaner = new ExpiredRowsCleanerImpl(nodeName, topologyService, clockService, observableTimestampTracker, internalTableProvider, catalogService, schemaManager, txManager, expirationConfiguration, placementDriver, metricManager, replicaMgr, licenseFeatureChecker, nodeProperties);
        this.schemaVersions = new SchemaVersionsImpl(this.executorInclinedSchemaSyncService, catalogService, clockService);
        // Versioned value chain: tables -> localPartitions -> assignmentsUpdated.
        // NOTE(review): 100 is presumably the history size of the versioned value — confirm against IncrementalVersionedValue.
        this.tablesVv = new IncrementalVersionedValue("TableManager#tables", registry, 100, null);
        this.localPartitionsVv = new IncrementalVersionedValue("TableManager#localPartitions", IncrementalVersionedValue.dependingOn(this.tablesVv));
        this.assignmentsUpdatedVv = new IncrementalVersionedValue("TableManager#assignmentsUpdated", IncrementalVersionedValue.dependingOn(this.localPartitionsVv));
        this.scanRequestExecutor = Executors.newSingleThreadExecutor(IgniteThreadFactory.create(nodeName, "scan-query-executor", LOG, ThreadOperation.STORAGE_READ));
        // Incoming snapshot pool: core size == max size == number of available processors, unbounded queue,
        // and idle core threads are allowed to terminate after the 30 second keep-alive.
        int cpus = Runtime.getRuntime().availableProcessors();
        this.incomingSnapshotsExecutor = new ThreadPoolExecutor(cpus, cpus, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), IgniteThreadFactory.create(nodeName, "incoming-raft-snapshot", LOG, ThreadOperation.STORAGE_READ, ThreadOperation.STORAGE_WRITE));
        this.incomingSnapshotsExecutor.allowCoreThreadTimeOut(true);
        this.mvGc = new MvGc(nodeName, gcConfig, lowWatermark, failureProcessor);
        this.partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery(messagingService, partitionOperationsExecutor, tableId -> this.tablesById().get(tableId));
        this.sharedTxStateStorage = txStateRocksDbSharedStorage;
        this.secondaryStorageRebalanceTrigger = new SecondaryStorageRebalanceTrigger(metaStorageMgr, catalogService);
        this.fullStateTransferIndexChooser = new FullStateTransferIndexChooser(catalogService, lowWatermark, indexMetaStorage);
        // The change callback is a no-op; 200 looks like the default value (units not visible here — TODO confirm).
        this.rebalanceRetryDelayConfiguration = new SystemDistributedConfigurationPropertyHolder<Integer>(systemDistributedConfiguration, (v, r) -> {}, "rebalanceRetryDelay", 200, Integer::parseInt);
        // Subscribe to zone replica lifecycle events. The listeners are method references bound to this instance,
        // so they are handed out before the remaining fields below are assigned.
        partitionReplicaLifecycleManager.listen(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, this.onBeforeZoneReplicaStartedListener);
        partitionReplicaLifecycleManager.listen(LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED, this.onZoneReplicaStoppedListener);
        partitionReplicaLifecycleManager.listen(LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED, this.onZoneReplicaDestroyedListener);
        partitionReplicaLifecycleManager.listen(LocalPartitionReplicaEvent.BEFORE_REPLICA_STOPPED, this.onBeforeZoneReplicaStopOrDestroyListener);
        partitionReplicaLifecycleManager.listen(LocalPartitionReplicaEvent.BEFORE_REPLICA_DESTROYED, this.onBeforeZoneReplicaStopOrDestroyListener);
        // Secondary storage components; the bridge lives under <storagePath>/secondary-store-bridge.
        this.secondaryStorageBridge = TableManager.createSecondaryStoreBridge(nodeName, storagePath.resolve(SECONDARY_STORE_BRIDGE_DIR), logSyncer, failureProcessor, ioExecutor, commonScheduler);
        this.secondaryReplicationManager = new ZoneSecondaryReplicationManager(schemaManager, txManager, replicaSvc, lowWatermark, clockService, topologyService, catalogService, placementDriver, failureProcessor, this::getTablesIfZoneIsSecondary, metaStorageMgr, ioExecutor);
        this.secondaryZoneManager = new SecondaryZoneManager(partitionReplicaLifecycleManager, catalogService, schemaManager, this.secondaryStorageBridge, this.executorInclinedSchemaSyncService, remotelyTriggeredResourceRegistry, lowWatermark, this.busyLock, partitionOperationsExecutor);
        this.policyManager = new RlsPolicyManager(catalogService, schemaManager, lowWatermark, commonScheduler);
    }

    /**
     * Starts the manager under the busy lock: starts the internal helpers, cleans up resources of dropped tables,
     * subscribes to catalog, low watermark and placement driver events, and finally triggers table recovery.
     *
     * @param componentContext Component context; not referenced by this method's body.
     * @return The result of {@code IgniteUtils.inBusyLockAsync} for the start sequence, whose last step returns the
     *     future produced by {@code recoverTables} for the meta storage recovery revision and the current low watermark.
     */
    @Override
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            this.mvGc.start();
            this.transactionStateResolver.start();
            this.fullStateTransferIndexChooser.start();
            this.rebalanceRetryDelayConfiguration.init();
            // Runs before the catalog listeners below are registered.
            this.cleanUpResourcesForDroppedTablesOnRecoveryBusy();
            this.catalogService.listen(CatalogEvent.TABLE_CREATE, this.onTableCreateListener);
            this.catalogService.listen(CatalogEvent.TABLE_DROP, this.onTableDropListener);
            this.catalogService.listen(CatalogEvent.TABLE_ALTER, this.onTableAlterListener);
            this.catalogService.listen(CatalogEvent.SECONDARY_STORAGE_AVAILABLE, this.onSecondaryStorageAvailableListener);
            this.lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED, this.onLowWatermarkChangedListener);
            this.partitionReplicatorNodeRecovery.start();
            this.secondaryStorageRebalanceTrigger.start();
            this.expiredRowsCleaner.start();
            this.secondaryStorageBridge.start();
            this.policyManager.start();
            this.messagingService.addMessageHandler(ReplicaMessageGroup.class, this.continuousQueryResponseHandler);
            this.executorInclinedPlacementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this.onPrimaryReplicaExpiredListener);
            this.executorInclinedPlacementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this.onPrimaryReplicaElectedListener);
            // Meta storage recovery is expected to be finished by now; the assertion documents that join() below
            // is not supposed to block. With assertions disabled, join() would wait for the future instead.
            CompletableFuture<Revisions> recoveryFinishFuture = this.metaStorageMgr.recoveryFinishedFuture();
            assert (recoveryFinishFuture.isDone());
            long recoveryRevision = recoveryFinishFuture.join().revision();
            this.secondaryReplicationManager.start();
            return this.recoverTables(recoveryRevision, this.lowWatermark.getLowWatermark());
        });
    }

    /**
     * Listener for the "before zone replica started" event.
     *
     * <p>Under the busy lock, waits for {@code readyToProcessReplicaStarts} and then delegates to
     * {@link #beforeZoneReplicaStartedImpl(LocalBeforeReplicaStartEventParameters)}.
     *
     * @param parameters Event parameters of the replica that is about to start.
     * @return Future that completes with {@code false} once the delegated work has finished successfully
     *     (NOTE(review): {@code false} presumably tells the event producer to keep this listener — confirm).
     */
    private CompletableFuture<Boolean> beforeZoneReplicaStarted(LocalBeforeReplicaStartEventParameters parameters) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            // Replica starts are processed only after the manager has signalled readiness.
            CompletableFuture<Void> replicaPrepared = this.readyToProcessReplicaStarts
                    .thenCompose(ready -> this.beforeZoneReplicaStartedImpl(parameters));

            return replicaPrepared.thenApply(ignored -> false);
        });
    }

    /**
     * Prepares the tables of a zone for a replica that is about to start.
     *
     * <p>Under the busy lock, takes the zone's read lock, and once it is acquired loads partition resources of all
     * tables currently registered for the zone into both the zone replica and the secondary zone replica. The read
     * lock is released when that work completes, whether normally or exceptionally.
     *
     * @param parameters Event parameters of the replica that is about to start.
     * @return Future that completes when all resources are loaded, or immediately after the lock is acquired when
     *     the zone has no tables.
     */
    private CompletableFuture<Void> beforeZoneReplicaStartedImpl(LocalBeforeReplicaStartEventParameters parameters) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            ZonePartitionId replicaId = parameters.zonePartitionId();
            // One lock per zone, created on first use.
            NaiveAsyncReadWriteLock perZoneLock = this.tablesPerZoneLocks.computeIfAbsent(replicaId.zoneId(), zoneId -> new NaiveAsyncReadWriteLock());
            CompletableFuture<Long> readStampFuture = perZoneLock.readLock();

            try {
                CompletableFuture<Void> resourcesLoaded = readStampFuture.thenCompose(stamp -> {
                    Set<TableViewInternal> tablesInZone = this.zoneTablesRawSet(replicaId.zoneId());

                    if (!tablesInZone.isEmpty()) {
                        return CompletableFuture.allOf(
                                this.createPartitionsAndLoadResourcesToZoneReplica(replicaId, tablesInZone, parameters),
                                this.createPartitionsAndLoadResourcesToSecondaryZoneReplica(replicaId, tablesInZone, parameters.onRecovery())
                        );
                    }

                    // Nothing to load for a zone without tables.
                    return CompletableFutures.nullCompletedFuture();
                });

                // Release the read lock with the stamp it was acquired with, regardless of the outcome.
                return resourcesLoaded.whenComplete((ignoredResult, ignoredError) -> readStampFuture.thenAccept(perZoneLock::unlockRead));
            }
            catch (Throwable failure) {
                // Building the chain itself failed: still release the lock and report the failure as a future.
                readStampFuture.thenAccept(perZoneLock::unlockRead);

                return CompletableFuture.failedFuture(failure);
            }
        });
    }

    /**
     * Creates partition storages and loads partition resources into a zone replica for every table of
     * {@code zoneTables} whose {@code zoneId()} equals the zone of {@code zonePartitionId}.
     *
     * <p>The work is a three-stage asynchronous pipeline:
     * <ol>
     *     <li>create the partition storage of each matching table if it is absent;</li>
     *     <li>on the IO executor, schedule MV partition cleanup if needed;</li>
     *     <li>on the IO executor and under the busy lock, for each matching table: record the partition in
     *     {@code localPartsByTableId}, register the table's indexes and prepare the partition resources.</li>
     * </ol>
     * Failures at each stage are passed through {@code ignoreTableClosedException()} (presumably so that an
     * already closed table does not fail the pipeline — the helper is defined outside this view, confirm).
     *
     * @param zonePartitionId Id of the zone partition whose replica is starting.
     * @param zoneTables Tables to consider; the set is streamed twice, the second time lazily in stage three.
     * @param event Replica start event parameters; {@code onRecovery()} is forwarded to resource preparation.
     * @return Future that completes when stage three has finished for all matching tables.
     */
    private CompletableFuture<Void> createPartitionsAndLoadResourcesToZoneReplica(ZonePartitionId zonePartitionId, Set<TableViewInternal> zoneTables, LocalBeforeReplicaStartEventParameters event) {
        int partitionIndex = zonePartitionId.partitionId();
        PartitionSet singlePartitionIdSet = PartitionSet.of(partitionIndex);
        // Stage 1: one storage-creation future per matching table, each started under the busy lock.
        List storageCreationFutures = zoneTables.stream().filter(tbl -> tbl.zoneId() == zonePartitionId.zoneId()).map(tbl -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.createPartitionStoragesIfAbsent((TableViewInternal)tbl, singlePartitionIdSet).exceptionally((Function)TableManager.ignoreTableClosedException()))).collect(Collectors.toList());
        // Stage 2 runs on ioExecutor once all storages exist; stage 3 is composed after it.
        return ((CompletableFuture)((CompletableFuture)CompletableFutures.allOf(storageCreationFutures).thenRunAsync(() -> TableManager.scheduleMvPartitionsCleanupIfNeeded(zonePartitionId, zoneTables, partitionIndex, event), this.ioExecutor)).exceptionally(TableManager.ignoreTableClosedException())).thenCompose(unused -> {
            // Stage 3: zoneTables is re-streamed here, i.e. at the time stage 2 has completed.
            CompletableFuture[] futures = (CompletableFuture[])zoneTables.stream().filter(tbl -> tbl.zoneId() == zonePartitionId.zoneId()).map(tbl -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> CompletableFuture.runAsync(() -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
                // Add this partition to the set of local partitions tracked for the table.
                this.localPartsByTableId.compute(tbl.tableId(), (tableId, oldPartitionSet) -> TableManager.extendPartitionSet(oldPartitionSet, partitionIndex));
                // Index registration receives the low watermark through getLowWatermarkSafe's callback.
                this.lowWatermark.getLowWatermarkSafe(lwm -> IndexUtils.registerIndexesToTable(tbl, this.catalogService, singlePartitionIdSet, tbl.schemaView(), lwm));
                this.preparePartitionResourcesAndLoadToZoneReplicaBusy((TableViewInternal)tbl, zonePartitionId, event.onRecovery());
            }), this.ioExecutor).exceptionally((Function)TableManager.ignoreTableClosedException()))).toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(futures);
        });
    }

    /**
     * Registers the rebalance state and the MV storage cleanup action on the replica start event.
     *
     * <p>Only tables whose own zone is the zone of {@code zonePartitionId} are considered, both when checking the rebalance
     * state and when clearing storages. {@code zoneTables} can also contain tables that are registered for this zone only
     * because it is their secondary zone; the primary MV storage of such tables belongs to a different zone and must not be
     * cleared by the cleanup action of this zone's replica.
     *
     * @param zonePartitionId Zone partition whose replica is being started.
     * @param zoneTables Tables registered for the zone.
     * @param partitionIndex Index of the partition inside the zone.
     * @param event Event on which the rebalance state and the cleanup action are registered.
     */
    private static void scheduleMvPartitionsCleanupIfNeeded(ZonePartitionId zonePartitionId, Set<TableViewInternal> zoneTables, int partitionIndex, LocalBeforeReplicaStartEventParameters event) {
        int zoneId = zonePartitionId.zoneId();
        // NOTE(review): -1 is presumably the "rebalance in progress" marker of lastAppliedIndex - confirm against MvPartitionStorage.
        boolean anyMvPartitionStorageIsInRebalanceState = zoneTables.stream()
                .filter(table -> table.zoneId() == zoneId)
                .map(table -> table.internalTable().storage().getMvPartition(partitionIndex))
                .filter(Objects::nonNull)
                .anyMatch(partitionStorage -> partitionStorage.lastAppliedIndex() == -1L);
        if (anyMvPartitionStorageIsInRebalanceState) {
            event.registerStorageInRebalanceState();
        }
        // The action is registered unconditionally; the set of tables is read when the action runs, not when it is registered.
        event.addCleanupAction(() -> {
            CompletableFuture[] clearFutures = zoneTables.stream()
                    .filter(table -> table.zoneId() == zoneId)
                    .map(table -> table.internalTable().storage().clearPartition(partitionIndex))
                    .toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(clearFutures);
        });
    }

    /**
     * Loads secondary partition resources into the zone replica for every table whose secondary zone is the zone of the
     * given partition.
     *
     * <p>The work is executed on {@code ioExecutor}; a failure caused by {@code TableClosedException} is ignored.
     *
     * @param zonePartitionId Zone partition whose replica is being started.
     * @param zoneTables Tables registered for the zone.
     * @param onRecovery Recovery flag forwarded to the secondary zone manager.
     * @return Future that completes when all matching tables have been processed.
     */
    private CompletableFuture<Void> createPartitionsAndLoadResourcesToSecondaryZoneReplica(ZonePartitionId zonePartitionId, Set<TableViewInternal> zoneTables, boolean onRecovery) {
        Runnable loadSecondaryResources = () -> {
            for (TableViewInternal candidate : zoneTables) {
                Integer candidateSecondaryZoneId = candidate.internalTable().secondaryZoneId();
                boolean zoneIsSecondaryForTable = candidateSecondaryZoneId != null && candidateSecondaryZoneId.intValue() == zonePartitionId.zoneId();
                if (zoneIsSecondaryForTable) {
                    this.secondaryZoneManager.prepareSecondaryPartitionResourcesAndLoadToZoneReplica(candidate, zonePartitionId, onRecovery);
                }
            }
        };
        return CompletableFuture.runAsync(loadSecondaryResources, this.ioExecutor).exceptionally(TableManager.ignoreTableClosedException());
    }

    /**
     * Creates an exception handler that swallows failures caused by {@code TableClosedException} and rethrows everything else.
     *
     * @return Function that returns {@code null} for a throwable having {@code TableClosedException} in its cause chain and
     *         rethrows any other throwable unchanged.
     */
    private static Function<Throwable, Void> ignoreTableClosedException() {
        return throwable -> {
            if (!ExceptionUtils.hasCause(throwable, TableClosedException.class)) {
                throw (RuntimeException)ExceptionUtils.sneakyThrow(throwable);
            }
            return null;
        };
    }

    /**
     * Stops the partitions of all tables registered for the zone after a zone replica has been stopped.
     *
     * <p>The per-zone read lock is held while the partitions are being stopped and is released when the chain completes.
     *
     * @param parameters Parameters of the replica event.
     * @return Future that completes with {@code false} once all table partitions have been stopped.
     */
    private CompletableFuture<Boolean> onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) {
        ZonePartitionId zonePartitionId = parameters.zonePartitionId();
        NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new NaiveAsyncReadWriteLock());
        CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock();
        try {
            CompletableFuture<Void> partitionsStopped = readLockAcquisitionFuture.thenCompose(stamp -> {
                CompletableFuture<?>[] stopFutures = this.zoneTablesRawSet(zonePartitionId.zoneId()).stream().map(this::stopTablePartitions).toArray(CompletableFuture[]::new);
                return CompletableFuture.allOf(stopFutures);
            });
            CompletableFuture<Void> lockReleased = partitionsStopped.whenComplete((ignored, error) -> readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead));
            return lockReleased.thenApply(ignored -> false);
        }
        catch (Throwable chainError) {
            readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
            return CompletableFuture.failedFuture(chainError);
        }
    }

    /**
     * Stops and destroys the corresponding partition of every table registered for the zone after a zone replica has been
     * destroyed.
     *
     * <p>The per-zone read lock is held while the partitions are being destroyed. Each table is processed on
     * {@code ioExecutor}, and the busy lock is entered both when the chain is built and inside each submitted task.
     *
     * @param parameters Parameters of the replica event; supply the zone partition id and the causality token.
     * @return Future that completes with {@code false} once all table partitions have been stopped and destroyed.
     */
    private CompletableFuture<Boolean> onZoneReplicaDestroyed(LocalPartitionReplicaEventParameters parameters) {
        ZonePartitionId zonePartitionId = parameters.zonePartitionId();
        NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new NaiveAsyncReadWriteLock());
        CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock();
        try {
            return ((CompletableFuture)((CompletableFuture)readLockAcquisitionFuture.thenCompose(stamp -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
                // supplyAsync yields a future of a future; thenCompose(Function.identity()) flattens it.
                CompletableFuture[] futures = (CompletableFuture[])this.zoneTablesRawSet(zonePartitionId.zoneId()).stream().map(table -> CompletableFuture.supplyAsync(() -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.stopAndDestroyTablePartition(new TablePartitionId(table.tableId(), zonePartitionId.partitionId()), parameters.causalityToken())), this.ioExecutor).thenCompose(Function.identity())).toArray(CompletableFuture[]::new);
                return CompletableFuture.allOf(futures);
            // Release the read lock regardless of the outcome.
            }))).whenComplete((v, t) -> readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead))).thenApply(unused -> false);
        }
        catch (Throwable t2) {
            readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
            return CompletableFuture.failedFuture(t2);
        }
    }

    /**
     * Creates a table on node recovery and registers it in the local structures.
     *
     * <p>The table is put into {@code tables} right away. After its schema registry becomes available, the table receives
     * its schema view, is added to its zone (and to the secondary zone, if any) and is put into {@code startedTables}.
     *
     * @param causalityToken Causality token.
     * @param zoneDescriptor Descriptor of the table zone.
     * @param secondaryZoneDescriptor Descriptor of the secondary zone, {@code null} if there is none.
     * @param tableDescriptor Descriptor of the table.
     * @param schemaDescriptor Descriptor of the table schema.
     * @return Future that completes when the table has been registered.
     */
    private CompletableFuture<Void> prepareTableResourcesOnRecovery(long causalityToken, CatalogZoneDescriptor zoneDescriptor, @Nullable CatalogZoneDescriptor secondaryZoneDescriptor, CatalogTableDescriptor tableDescriptor, CatalogSchemaDescriptor schemaDescriptor) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            TableImpl recoveredTable = this.createTableImpl(causalityToken, tableDescriptor, zoneDescriptor, secondaryZoneDescriptor, schemaDescriptor);
            int recoveredTableId = tableDescriptor.id();
            this.tables.put(recoveredTableId, recoveredTable);
            return this.schemaManager.schemaRegistry(causalityToken, recoveredTableId).thenAccept(registry -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
                recoveredTable.schemaView((SchemaRegistry)registry);
                this.addTableToZone(zoneDescriptor.id(), recoveredTable);
                if (secondaryZoneDescriptor != null) {
                    this.addTableToZone(secondaryZoneDescriptor.id(), recoveredTable);
                }
                this.startedTables.put(recoveredTableId, recoveredTable);
            }));
        });
    }

    /**
     * Handles a table creation event: resolves the zone, secondary zone and schema descriptors for the event's catalog
     * version and loads the new table into its zone.
     *
     * @param parameters Parameters of the table creation event.
     * @return Future that completes with {@code false} once the table has been loaded.
     */
    private CompletableFuture<Boolean> loadTableToZoneOnTableCreate(CreateTableEventParameters parameters) {
        long causalityToken = parameters.causalityToken();
        int catalogVersion = parameters.catalogVersion();
        CatalogTableDescriptor createdTable = parameters.tableDescriptor();
        CatalogZoneDescriptor primaryZone = this.getZoneDescriptor(createdTable, catalogVersion);
        CatalogZoneDescriptor secondaryZone = this.getSecondaryZoneDescriptor(createdTable, catalogVersion);
        CatalogSchemaDescriptor schema = this.getSchemaDescriptor(createdTable, catalogVersion);
        CompletableFuture<Void> tableLoaded = this.loadTableToZoneOnTableCreate(causalityToken, catalogVersion, primaryZone, secondaryZone, createdTable, schema);
        return tableLoaded.thenApply(ignored -> false);
    }

    /**
     * Creates a table and loads it into its zone and, when a secondary zone is configured, into the secondary zone as well.
     *
     * @param causalityToken Causality token.
     * @param catalogVersion Catalog version of the creation event; passed to the secondary replication manager.
     * @param zoneDescriptor Descriptor of the table zone.
     * @param secondaryZoneDescriptor Descriptor of the secondary zone, {@code null} if there is none.
     * @param tableDescriptor Descriptor of the table.
     * @param schemaDescriptor Descriptor of the table schema.
     * @return Future of loading into the zone alone when there is no secondary zone; otherwise a future that completes when
     *         loading into both zones has completed.
     */
    private CompletableFuture<Void> loadTableToZoneOnTableCreate(long causalityToken, int catalogVersion, CatalogZoneDescriptor zoneDescriptor, @Nullable CatalogZoneDescriptor secondaryZoneDescriptor, CatalogTableDescriptor tableDescriptor, CatalogSchemaDescriptor schemaDescriptor) {
        CompletionStage loadToZoneFuture;
        TableImpl table = this.createTableImpl(causalityToken, tableDescriptor, zoneDescriptor, secondaryZoneDescriptor, schemaDescriptor);
        int tableId = tableDescriptor.id();
        // The future returned by update() is not used here; it is obtained later through tablesVv.get(causalityToken).
        this.tablesVv.update(causalityToken, (ignore, e) -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            if (e != null) {
                return CompletableFuture.failedFuture(e);
            }
            return this.schemaManager.schemaRegistry(causalityToken, tableId).thenAccept(table::schemaView);
        }));
        CompletableFuture<Long> acquisitionFuture = this.partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id());
        try {
            // The zone read lock is released when loading completes, whether normally or exceptionally.
            loadToZoneFuture = this.loadTableToZoneOnTableCreateHavingZoneReadLock(acquisitionFuture, causalityToken, zoneDescriptor, table).whenComplete((res, ex) -> this.unlockZoneForRead(zoneDescriptor, acquisitionFuture));
        }
        catch (Throwable e2) {
            this.unlockZoneForRead(zoneDescriptor, acquisitionFuture);
            return CompletableFuture.failedFuture(e2);
        }
        if (secondaryZoneDescriptor == null) {
            return loadToZoneFuture;
        }
        CompletableFuture<Void> loadToSecondaryZoneFuture = this.loadTableToSecondaryZoneOnTableCreateIfNeeded(table, tableDescriptor, secondaryZoneDescriptor, causalityToken, false);
        CompletableFuture<Void> allResourcesLoadedFuture = CompletableFuture.allOf(new CompletableFuture[]{loadToZoneFuture, loadToSecondaryZoneFuture});
        // Replication start is requested right away; this call does not wait for allResourcesLoadedFuture.
        this.secondaryReplicationManager.startReplicationForTable(table, catalogVersion);
        return allResourcesLoadedFuture;
    }

    /**
     * Loads a newly created table into its zone; the caller is responsible for requesting and releasing the zone read lock.
     *
     * <p>The table is put into {@code tables} immediately. It is put into {@code startedTables} and added to the zone only
     * after the partition resources have been prepared.
     *
     * @param readLockAcquisitionFuture Future of the zone read lock acquisition; partition work starts after it completes.
     * @param causalityToken Causality token used to update the versioned values.
     * @param zoneDescriptor Descriptor of the table zone.
     * @param table Table to load.
     * @return Future that completes when the table has been registered as started.
     */
    private CompletableFuture<Void> loadTableToZoneOnTableCreateHavingZoneReadLock(CompletableFuture<Long> readLockAcquisitionFuture, long causalityToken, CatalogZoneDescriptor zoneDescriptor, TableImpl table) {
        int tableId = table.tableId();
        // Once the lock is held, on ioExecutor: collect the zone partitions hosted locally, create their storages and
        // remember the set for the table. The incoming throwable is not inspected; TableClosedException is ignored.
        CompletableFuture<Void> localPartsUpdateFuture = this.localPartitionsVv.update(causalityToken, (ignore, throwable) -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> readLockAcquisitionFuture.thenComposeAsync(unused -> {
            BitSetPartitionSet parts = new BitSetPartitionSet();
            for (int i = 0; i < zoneDescriptor.partitions(); ++i) {
                if (!this.partitionReplicaLifecycleManager.hasLocalPartition(new ZonePartitionId(zoneDescriptor.id(), i))) continue;
                parts.set(i);
            }
            return this.createPartitionStoragesIfAbsent(table, parts).thenRun(() -> this.localPartsByTableId.put(tableId, parts));
        }, (Executor)this.ioExecutor)).exceptionally(TableManager.ignoreTableClosedException()));
        CompletableFuture<Void> tablesByIdFuture = this.tablesVv.get(causalityToken);
        CompletableFuture<Void> createPartsFut = this.assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
            if (e != null) {
                return CompletableFuture.failedFuture(e);
            }
            // Needs both the local partition storages and the tablesVv value for this token.
            return CompletableFuture.allOf(localPartsUpdateFuture, tablesByIdFuture).thenRunAsync(() -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
                for (int i = 0; i < zoneDescriptor.partitions(); ++i) {
                    ZonePartitionId zonePartitionId = new ZonePartitionId(zoneDescriptor.id(), i);
                    if (!this.partitionReplicaLifecycleManager.hasLocalPartition(zonePartitionId)) continue;
                    this.preparePartitionResourcesAndLoadToZoneReplicaBusy(table, zonePartitionId, false);
                }
            }), this.ioExecutor);
        });
        this.tables.put(tableId, table);
        return createPartsFut.thenAccept(ignore -> {
            this.startedTables.put(tableId, table);
            this.addTableToZone(zoneDescriptor.id(), table);
        });
    }

    /**
     * Releases the read lock of the given zone once the lock acquisition future has produced a stamp.
     *
     * @param zoneDescriptor Descriptor of the zone to unlock.
     * @param readLockAcquiryFuture Future of the read lock acquisition that yields the lock stamp.
     */
    private void unlockZoneForRead(CatalogZoneDescriptor zoneDescriptor, CompletableFuture<Long> readLockAcquiryFuture) {
        readLockAcquiryFuture.thenAccept(lockStamp -> {
            this.partitionReplicaLifecycleManager.unlockZoneForRead(zoneDescriptor.id(), (long)lockStamp);
        });
    }

    /**
     * Releases the read lock of the zone with the given id once the lock acquisition future has produced a stamp.
     *
     * @param zoneId Id of the zone to unlock.
     * @param readLockAcquiryFuture Future of the read lock acquisition that yields the lock stamp.
     */
    private void unlockZoneForRead(int zoneId, CompletableFuture<Long> readLockAcquiryFuture) {
        readLockAcquiryFuture.thenAccept(lockStamp -> {
            this.partitionReplicaLifecycleManager.unlockZoneForRead(zoneId, (long)lockStamp);
        });
    }

    /**
     * Builds the per-partition resources of a table and hands them over to the zone replica.
     *
     * <p>Does nothing if obtaining the MV partition storage fails with {@code TableClosedException}.
     * NOTE(review): the {@code Busy} suffix suggests the caller must already hold the busy lock - confirm.
     *
     * @param table Table whose partition is being prepared.
     * @param zonePartitionId Zone partition the table partition belongs to.
     * @param onNodeRecovery Recovery flag forwarded to the replica lifecycle manager.
     */
    private void preparePartitionResourcesAndLoadToZoneReplicaBusy(TableViewInternal table, ZonePartitionId zonePartitionId, boolean onNodeRecovery) {
        MvPartitionStorage mvPartitionStorage;
        int partId = zonePartitionId.partitionId();
        int tableId = table.tableId();
        InternalTableImpl internalTbl = (InternalTableImpl)table.internalTable();
        TablePartitionId tablePartitionId = new TablePartitionId(tableId, partId);
        SafeTimeValuesTracker safeTimeTracker = new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
        try {
            mvPartitionStorage = TableManager.getMvPartitionStorage(table, partId);
        }
        catch (TableClosedException e) {
            // The table has been closed: there is nothing to load.
            return;
        }
        PartitionDataStorage partitionDataStorage = this.partitionDataStorage(new ZonePartitionKey(zonePartitionId.zoneId(), partId), tableId, mvPartitionStorage);
        PartitionUpdateHandlers partitionUpdateHandlers = this.createPartitionUpdateHandlers(partId, partitionDataStorage, table, safeTimeTracker, this.replicationConfiguration);
        internalTbl.updatePartitionTrackers(partId, safeTimeTracker);
        // Register the partition with the MV garbage collector and the min time collector.
        this.mvGc.addStorage(tablePartitionId, partitionUpdateHandlers.gcUpdateHandler);
        this.minTimeCollectorService.addPartition(new TablePartitionId(tableId, partId));
        // The replica listener is created later, once a Raft client is supplied.
        Function<RaftCommandRunner, ReplicaTableProcessor> createListener = raftClient -> this.createReplicaListener(zonePartitionId, table, safeTimeTracker, mvPartitionStorage, partitionUpdateHandlers, (RaftCommandRunner)raftClient);
        PartitionListener tablePartitionRaftListener = new PartitionListener(this.txManager, partitionDataStorage, partitionUpdateHandlers.storageUpdateHandler, safeTimeTracker, this.catalogService, table.schemaView(), this.indexMetaStorage, this.topologyService.localMember().id(), this.minTimeCollectorService, this.partitionOperationsExecutor, this.executorInclinedPlacementDriver, this.clockService, zonePartitionId);
        PartitionMvStorageAccessImpl partitionStorageAccess = new PartitionMvStorageAccessImpl(partId, table.internalTable().storage(), this.mvGc, partitionUpdateHandlers.indexUpdateHandler, partitionUpdateHandlers.gcUpdateHandler, this.fullStateTransferIndexChooser, this.schemaManager.schemaRegistry(tableId), this.lowWatermark);
        this.partitionReplicaLifecycleManager.loadTableListenerToZoneReplica(zonePartitionId, tableId, createListener, tablePartitionRaftListener, partitionStorageAccess, onNodeRecovery);
    }

    /**
     * Stops secondary replication for the partition when the expired lease was held by the local node.
     *
     * @param parameters Parameters of the primary replica event.
     * @return Future completed with {@code false}.
     */
    private CompletableFuture<Boolean> onPrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) {
        if (this.thisNodeHoldsLease(parameters.leaseholderId())) {
            this.secondaryReplicationManager.stopReplicationForPartition((ZonePartitionId)parameters.groupId());
        }
        return CompletableFutures.falseCompletedFuture();
    }

    /**
     * Stops secondary replication for the zone partition before its replica is stopped or destroyed.
     *
     * <p>Nothing is done when colocation is disabled.
     *
     * @param parameters Parameters of the replica event.
     * @return Future that completes with {@code false}; with colocation enabled it completes after replication has stopped.
     */
    private CompletableFuture<Boolean> onBeforeZoneReplicaStopOrDestroy(LocalPartitionReplicaEventParameters parameters) {
        if (this.nodeProperties.colocationEnabled()) {
            return this.secondaryReplicationManager.stopReplicationForPartition(parameters.zonePartitionId()).thenApply(ignored -> false);
        }
        return CompletableFutures.falseCompletedFuture();
    }

    /**
     * Tells whether the given leaseholder is the local node.
     *
     * @param leaseholderId Id of the leaseholder, may be {@code null}.
     * @return {@code true} if the id of the local node equals {@code leaseholderId}.
     */
    private boolean thisNodeHoldsLease(@Nullable UUID leaseholderId) {
        UUID localNodeId = this.localNode().id();
        return localNodeId.equals(leaseholderId);
    }

    /**
     * Starts secondary replication for the partition when the local node has been elected as the leaseholder.
     *
     * @param parameters Parameters of the primary replica event; supply the group id and the lease start time.
     * @return Future completed with {@code false}.
     */
    private CompletableFuture<Boolean> onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) {
        if (this.thisNodeHoldsLease(parameters.leaseholderId())) {
            this.secondaryReplicationManager.startReplicationForPartition((ZonePartitionId)parameters.groupId(), parameters.startTime().longValue());
        }
        return CompletableFutures.falseCompletedFuture();
    }

    /**
     * Collects the tables registered for the zone whose secondary zone id equals the given zone id.
     *
     * <p>The set is built while the per-zone read lock is held; the lock is released when the resulting future completes.
     *
     * @param zoneId Zone id.
     * @return Future with the set of tables that use the zone as their secondary zone.
     * @throws CompletionException If building the future chain fails synchronously (unlike sibling methods, which return a
     *         failed future in that case).
     */
    private CompletableFuture<Set<? extends TableViewInternal>> getTablesIfZoneIsSecondary(int zoneId) {
        NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
        CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock();
        try {
            return ((CompletableFuture)readLockAcquisitionFuture.thenApply(v -> this.zoneTablesRawSet(zoneId).stream().filter(table -> {
                Integer secondaryZoneId = table.internalTable().secondaryZoneId();
                // A table without a secondary zone never matches.
                return secondaryZoneId != null && zoneId == secondaryZoneId;
            }).collect(Collectors.toSet()))).whenComplete((v, e) -> readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead));
        }
        catch (Throwable e2) {
            readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
            throw new CompletionException(e2);
        }
    }

    /**
     * Handles a table drop event: unregisters the metrics source of the started table and enqueues a destruction event.
     *
     * @param parameters Parameters of the table drop event.
     */
    private void onTableDrop(DropTableEventParameters parameters) {
        IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            int droppedTableId = parameters.tableId();
            this.unregisterMetricsSource(this.startedTables.get(droppedTableId));
            DestroyTableEvent destroyEvent = new DestroyTableEvent(parameters.catalogVersion(), droppedTableId);
            this.destructionEventsQueue.enqueue(destroyEvent);
        });
    }

    /**
     * Dispatches a table alteration event to the handler that matches the concrete parameters type.
     *
     * <p>Rename, secondary zone add, secondary zone drop and table properties change are handled; any other event is
     * ignored.
     *
     * @param parameters Parameters of the catalog event.
     * @return Future that completes with {@code false} once the matching handler, if any, has finished.
     */
    private CompletableFuture<Boolean> onTableAlter(CatalogEventParameters parameters) {
        CompletableFuture<?> handlerFuture;
        if (parameters instanceof RenameTableEventParameters) {
            handlerFuture = this.onTableRename((RenameTableEventParameters)parameters);
        } else if (parameters instanceof AddSecondaryZoneEventParameters) {
            handlerFuture = this.onTableSecondaryZoneAdd((AddSecondaryZoneEventParameters)parameters);
        } else if (parameters instanceof DropSecondaryZoneEventParameters) {
            handlerFuture = this.onTableSecondaryZoneDrop((DropSecondaryZoneEventParameters)parameters);
        } else if (parameters instanceof AlterTablePropertiesEventParameters) {
            handlerFuture = this.onTablePropertiesChanged((AlterTablePropertiesEventParameters)parameters);
        } else {
            return CompletableFutures.falseCompletedFuture();
        }
        return handlerFuture.thenApply(ignored -> false);
    }

    /**
     * Starts secondary replication for a table whose secondary storage has become available.
     *
     * @param parameters Parameters of the event; supply the table id and the catalog version.
     * @return Future completed with {@code false}.
     */
    private CompletableFuture<Boolean> onSecondaryStorageAvailable(SecondaryStorageAvailableEventParameters parameters) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            int tableId = parameters.tableId();
            LOG.info("Secondary storage is available for table [tableId={}]", tableId);
            TableViewInternal availableTable = this.tables.get(tableId);
            this.secondaryReplicationManager.startReplicationForTable(availableTable, parameters.catalogVersion());
            return CompletableFutures.falseCompletedFuture();
        });
    }

    /**
     * Handles a low watermark change: destroys locally the tables whose destruction events are drained up to the catalog
     * version that is active at the new low watermark.
     *
     * <p>When the busy lock cannot be entered, the event is ignored and a future completed with {@code false} is returned.
     *
     * @param parameters Parameters of the low watermark change event.
     * @return Future completed with {@code false}, or a failed future if the processing has thrown.
     */
    private CompletableFuture<Boolean> onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFutures.falseCompletedFuture();
        }
        try {
            long newLowWatermark = parameters.newLowWatermark().longValue();
            int earliestCatalogVersion = this.catalogService.activeCatalogVersion(newLowWatermark);
            this.destructionEventsQueue.drainUpTo(earliestCatalogVersion).forEach(event -> this.destroyTableLocally(event.tableId()));
            return CompletableFutures.falseCompletedFuture();
        }
        catch (Throwable error) {
            return CompletableFuture.failedFuture(error);
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Applies the changed staleness configuration to the table as part of the {@code tablesVv} update for the event's
     * causality token.
     *
     * @param parameters Parameters of the table properties alteration event.
     * @return Future of the versioned value update; failed if the incoming update state carries an error.
     */
    private CompletableFuture<?> onTablePropertiesChanged(AlterTablePropertiesEventParameters parameters) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.tablesVv.update(parameters.causalityToken(), (ignored, error) -> {
            if (error == null) {
                TableViewInternal alteredTable = this.tables.get(parameters.tableId());
                alteredTable.updateStalenessConfiguration(parameters.staleRowsFraction(), parameters.minStaleRowsCount());
                return CompletableFutures.nullCompletedFuture();
            }
            return CompletableFuture.failedFuture(error);
        }));
    }

    /**
     * Applies the new name to the table as part of the {@code tablesVv} update for the event's causality token.
     *
     * @param parameters Parameters of the table rename event.
     * @return Future of the versioned value update; failed if the incoming update state carries an error.
     */
    private CompletableFuture<?> onTableRename(RenameTableEventParameters parameters) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.tablesVv.update(parameters.causalityToken(), (ignored, error) -> {
            if (error == null) {
                TableViewInternal renamedTable = this.tables.get(parameters.tableId());
                ((TableImpl)renamedTable).name(parameters.newTableName());
                return CompletableFutures.nullCompletedFuture();
            }
            return CompletableFuture.failedFuture(error);
        }));
    }

    /**
     * Attaches a secondary zone to an existing table and kicks off the full state transfer for it.
     *
     * <p>The versioned value update completes immediately; loading the table into the secondary zone and the full state
     * transfer continue in the background. A background failure not caused by {@code NodeStoppingException} is reported
     * to the failure processor.
     *
     * <p>NOTE(review): unlike {@code onTableRename} and {@code onTablePropertiesChanged}, the {@code ex} argument of the
     * update callback is not checked here - confirm this is intended.
     *
     * @param parameters Parameters of the secondary zone addition event.
     * @return Future of the {@code tablesVv} update.
     */
    private CompletableFuture<?> onTableSecondaryZoneAdd(AddSecondaryZoneEventParameters parameters) {
        long causalityToken = parameters.causalityToken();
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.tablesVv.update(causalityToken, (ignore, ex) -> {
            int catalogVersion = parameters.catalogVersion();
            int tableId = parameters.tableId();
            TableViewInternal table = this.tables.get(tableId);
            Catalog catalog = this.catalogService.catalog(catalogVersion);
            CatalogTableDescriptor tableDescriptor = TableManager.getTableDescriptor(tableId, catalog);
            CatalogZoneDescriptor zoneDescriptor = this.getSecondaryZoneDescriptor(tableDescriptor, catalogVersion);
            assert (zoneDescriptor != null);
            LOG.info("Adding the secondary zone to the table [tableId={}, tableName={}, zoneId={}, zoneName={}]", tableId, table.name(), zoneDescriptor.id(), zoneDescriptor.name());
            SecondaryTableStorage secondaryTableStorage = this.createSecondaryTableStorage(tableDescriptor, zoneDescriptor);
            // Without a secondary storage instance only the secondary zone id is recorded on the table.
            if (secondaryTableStorage == null) {
                table.internalTable().setSecondaryZoneId(zoneDescriptor.id());
            } else {
                table.internalTable().setSecondaryStorage(secondaryTableStorage);
            }
            // Fire-and-forget: the resulting future is not returned to the caller.
            ((CompletableFuture)this.loadTableToSecondaryZoneIfNeeded(table, tableDescriptor, zoneDescriptor, causalityToken).thenComposeAsync(v -> this.secondaryReplicationManager.startFullStateTransferForTable(table, catalog), (Executor)this.ioExecutor)).whenComplete((result, error) -> {
                if (error != null && !ExceptionUtils.hasCause(error, NodeStoppingException.class)) {
                    String errorMessage = String.format("Failed to start full state transfer for table [tableId=%s, tableName=%s, zoneId=%s, zoneName=%s]", tableId, table.name(), zoneDescriptor.id(), zoneDescriptor.name());
                    this.failureProcessor.process(new FailureContext((Throwable)error, errorMessage));
                }
            });
            return CompletableFutures.nullCompletedFuture();
        }));
    }

    /**
     * Detaches the secondary zone from a table as part of the {@code tablesVv} update for the event's causality token.
     *
     * <p>The update completes as soon as the drop has been initiated.
     *
     * @param parameters Parameters of the secondary zone drop event.
     * @return Future of the {@code tablesVv} update.
     */
    private CompletableFuture<?> onTableSecondaryZoneDrop(DropSecondaryZoneEventParameters parameters) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            return this.tablesVv.update(parameters.causalityToken(), (ignored, error) -> {
                IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> this.dropSecondaryZoneFromTable(parameters));
                return CompletableFutures.nullCompletedFuture();
            });
        });
    }

    /**
     * Initiates removal of the secondary zone from a table: aborts the full state transfer, stops replication and then
     * stops and destroys the secondary storage resources.
     *
     * <p>NOTE(review): the future returned by {@code stopAndDestroySecondaryStorageResources} is dropped by
     * {@code thenRun}, so this method neither waits for the destruction nor observes its failure - confirm this is intended.
     *
     * @param parameters Parameters of the secondary zone drop event.
     */
    private void dropSecondaryZoneFromTable(DropSecondaryZoneEventParameters parameters) {
        int tableId = parameters.tableId();
        TableViewInternal table = this.tables.get(tableId);
        assert (table != null) : "Table is missing [tableId=" + tableId + "]";
        LOG.info("Dropping secondary storage profile for table [tableId={}, tableName={}].", tableId, table.name());
        // The transfer is aborted before replication is stopped; resources are destroyed only after replication has stopped.
        this.secondaryReplicationManager.abortFullStateTransferForTable(table);
        this.secondaryReplicationManager.stopReplicationForTable(table).thenRun(() -> this.stopAndDestroySecondaryStorageResources(table));
    }

    /**
     * Unloads the table from its secondary zone replica, removes it from the zone's table set and destroys its secondary
     * storage.
     *
     * <p>The per-zone write lock of the secondary zone is held for the whole chain and released when the chain completes.
     * The storage destruction step runs on {@code ioExecutor}.
     *
     * @param table Table whose secondary zone id must be non-null (asserted).
     * @return Future that completes when the secondary resources have been destroyed.
     */
    private CompletableFuture<Void> stopAndDestroySecondaryStorageResources(TableViewInternal table) {
        int tableId = table.tableId();
        Integer secondaryZoneId = table.internalTable().secondaryZoneId();
        assert (secondaryZoneId != null) : "Secondary zone is null [tableId=" + tableId + "]";
        NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(secondaryZoneId, id -> new NaiveAsyncReadWriteLock());
        CompletableFuture<Long> zoneLockAcquiryFuture = zoneLock.writeLock();
        try {
            // Order: unload from the replica -> remove from tablesPerZone -> destroy the storage -> release the write lock.
            return ((CompletableFuture)((CompletableFuture)((CompletableFuture)zoneLockAcquiryFuture.thenCompose(stamp -> this.unloadTableResourcesFromSecondaryZoneReplica(table, secondaryZoneId))).thenRun(() -> this.tablesPerZone.getOrDefault(secondaryZoneId, Collections.emptySet()).remove(table))).thenRunAsync(() -> TableManager.destroyTableSecondaryStorage(table.internalTable()), this.ioExecutor)).whenComplete((v, ex) -> zoneLockAcquiryFuture.thenAccept(zoneLock::unlockWrite));
        }
        catch (Throwable t) {
            // A synchronous failure is rethrown to the caller after the lock release has been scheduled.
            zoneLockAcquiryFuture.thenAccept(zoneLock::unlockWrite);
            throw t;
        }
    }

    /**
     * Unloads the table resources from the secondary zone replica while holding the zone read lock of the replica
     * lifecycle manager.
     *
     * @param table Table to unload.
     * @param secondaryZoneId Id of the secondary zone.
     * @return Future that completes when the resources have been unloaded; the lock is released on completion.
     */
    private CompletableFuture<Void> unloadTableResourcesFromSecondaryZoneReplica(TableViewInternal table, int secondaryZoneId) {
        CompletableFuture<Long> readZoneLockAcquiryFuture = this.partitionReplicaLifecycleManager.lockZoneForRead(secondaryZoneId);
        try {
            return ((CompletableFuture)readZoneLockAcquiryFuture.thenCompose(stamp -> this.secondaryZoneManager.unloadTableResourcesFromSecondaryZoneReplicaHavingZoneReadLock(table))).whenComplete((v, ex) -> this.unlockZoneForRead(secondaryZoneId, readZoneLockAcquiryFuture));
        }
        catch (Throwable t) {
            // A synchronous failure is rethrown to the caller after the lock release has been scheduled.
            this.unlockZoneForRead(secondaryZoneId, readZoneLockAcquiryFuture);
            throw t;
        }
    }

    /**
     * Creates the replica listener of a table partition that belongs to the given zone replication group.
     *
     * <p>The Raft client is wrapped into an {@code ExecutorInclinedRaftCommandRunner} bound to
     * {@code partitionOperationsExecutor}. The primary key index storage is resolved lazily from the table's index storage
     * adapters.
     *
     * @param replicationGroupId Zone partition id; its partition id selects the table's index lockers and adapters.
     * @param table Table the listener serves.
     * @param safeTimeTracker Safe time tracker of the partition.
     * @param mvPartitionStorage MV storage of the partition.
     * @param partitionUpdateHandlers Update handlers of the partition; the storage update handler is passed to the listener.
     * @param raftClient Raft command runner of the replication group.
     * @return New replica listener.
     */
    private PartitionReplicaListener createReplicaListener(ZonePartitionId replicationGroupId, TableViewInternal table, PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker, MvPartitionStorage mvPartitionStorage, PartitionUpdateHandlers partitionUpdateHandlers, RaftCommandRunner raftClient) {
        int partitionIndex = replicationGroupId.partitionId();
        return new PartitionReplicaListener(mvPartitionStorage, new ExecutorInclinedRaftCommandRunner(raftClient, this.partitionOperationsExecutor), this.txManager, this.lockMgr, this.scanRequestExecutor, this.partitionOperationsExecutor, replicationGroupId, table.tableId(), table.indexesLockers(partitionIndex), new Lazy<TableSchemaAwareIndexStorage>(() -> table.indexStorageAdapters(partitionIndex).get().get(table.pkId())), () -> table.indexStorageAdapters(partitionIndex).get(), this.clockService, safeTimeTracker, this.transactionStateResolver, partitionUpdateHandlers.storageUpdateHandler, new CatalogValidationSchemasSource(this.catalogService, this.schemaManager), this.localNode(), this.executorInclinedSchemaSyncService, this.catalogService, this.executorInclinedPlacementDriver, this.topologyService, this.remotelyTriggeredResourceRegistry, this.schemaManager.schemaRegistry(table.tableId()), this.indexMetaStorage, this.lowWatermark, this.licenseFeatureChecker, this.failureProcessor, table.metrics(), this.policyManager);
    }

    /**
     * Wraps an MV partition storage into a snapshot-aware partition data storage.
     *
     * @param partitionKey Key of the partition.
     * @param tableId Id of the table the partition belongs to.
     * @param partitionStorage MV storage of the partition.
     * @return New {@code SnapshotAwarePartitionDataStorage} bound to {@code outgoingSnapshotsManager}.
     */
    private PartitionDataStorage partitionDataStorage(PartitionKey partitionKey, int tableId, MvPartitionStorage partitionStorage) {
        SnapshotAwarePartitionDataStorage snapshotAwareStorage = new SnapshotAwarePartitionDataStorage(tableId, partitionStorage, this.outgoingSnapshotsManager, partitionKey);
        return snapshotAwareStorage;
    }

    /**
     * Prepares the manager for node stop. Only the first invocation has an effect.
     *
     * <p>Fails {@code stopManagerFuture} with {@code NodeStoppingException}, blocks the busy lock, stops the background
     * services and removes the placement driver, low watermark and catalog listeners.
     */
    @Override
    public void beforeNodeStop() {
        // Subsequent calls are no-ops.
        if (!this.beforeStopGuard.compareAndSet(false, true)) {
            return;
        }
        this.stopManagerFuture.completeExceptionally(new NodeStoppingException());
        // The busy lock is blocked before the listeners are removed.
        this.busyLock.block();
        this.expiredRowsCleaner.stop();
        this.secondaryStorageRebalanceTrigger.stop();
        this.executorInclinedPlacementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this.onPrimaryReplicaExpiredListener);
        this.executorInclinedPlacementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this.onPrimaryReplicaElectedListener);
        this.lowWatermark.removeListener(LowWatermarkEvent.LOW_WATERMARK_CHANGED, this.onLowWatermarkChangedListener);
        this.catalogService.removeListener(CatalogEvent.TABLE_CREATE, this.onTableCreateListener);
        this.catalogService.removeListener(CatalogEvent.TABLE_DROP, this.onTableDropListener);
        this.catalogService.removeListener(CatalogEvent.TABLE_ALTER, this.onTableAlterListener);
        this.catalogService.removeListener(CatalogEvent.SECONDARY_STORAGE_AVAILABLE, this.onSecondaryStorageAvailableListener);
        this.secondaryReplicationManager.stop();
    }

    /**
     * Stops the manager: removes the replica lifecycle listeners and closes the tables, services and executors it owns.
     *
     * <p>Must be called after {@code beforeNodeStop()} (asserted). Only the first invocation does the work; later calls
     * return a completed future.
     *
     * @param componentContext Component context; not used by this implementation.
     * @return Completed future on success, or a failed future if closing any resource has thrown an {@code Exception}.
     */
    @Override
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        assert (this.beforeStopGuard.get()) : "'stop' called before 'beforeNodeStop'";
        if (!this.stopGuard.compareAndSet(false, true)) {
            return CompletableFutures.nullCompletedFuture();
        }
        this.partitionReplicaLifecycleManager.removeListener(LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED, this.onZoneReplicaDestroyedListener);
        this.partitionReplicaLifecycleManager.removeListener(LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED, this.onZoneReplicaStoppedListener);
        this.partitionReplicaLifecycleManager.removeListener(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, this.onBeforeZoneReplicaStartedListener);
        this.partitionReplicaLifecycleManager.removeListener(LocalPartitionReplicaEvent.BEFORE_REPLICA_STOPPED, this.onBeforeZoneReplicaStopOrDestroyListener);
        this.partitionReplicaLifecycleManager.removeListener(LocalPartitionReplicaEvent.BEFORE_REPLICA_DESTROYED, this.onBeforeZoneReplicaStopOrDestroyListener);
        // Timeout, in seconds, given to each executor to terminate.
        int shutdownTimeoutSeconds = 10;
        try {
            ManuallyCloseable[] manuallyCloseableArray = new ManuallyCloseable[8];
            // Slot 0 closes every table known to the manager.
            manuallyCloseableArray[0] = () -> IgniteUtils.closeAllManually(this.tables.values().stream().map(table -> () -> TableManager.closeTable(table)));
            manuallyCloseableArray[1] = this.mvGc;
            manuallyCloseableArray[2] = this.fullStateTransferIndexChooser;
            manuallyCloseableArray[3] = () -> IgniteUtils.shutdownAndAwaitTermination(this.scanRequestExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS);
            manuallyCloseableArray[4] = () -> IgniteUtils.shutdownAndAwaitTermination(this.incomingSnapshotsExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS);
            manuallyCloseableArray[5] = () -> {
                ScheduledExecutorService streamerFlushExecutor;
                TableManager tableManager = this;
                // The field is read under the monitor of this manager.
                synchronized (tableManager) {
                    streamerFlushExecutor = this.streamerFlushExecutor;
                }
                IgniteUtils.shutdownAndAwaitTermination(streamerFlushExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS);
            };
            manuallyCloseableArray[6] = this.secondaryStorageBridge;
            manuallyCloseableArray[7] = this.policyManager::stop;
            IgniteUtils.closeAllManually(manuallyCloseableArray);
        }
        catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Stops all partitions of the given table; the per-partition stop calls are issued on the I/O executor.
     *
     * @param table Table whose partitions must be stopped.
     * @return Future completed once every per-partition stop future has completed.
     */
    private CompletableFuture<Void> stopTablePartitions(TableViewInternal table) {
        Supplier<CompletableFuture<Void>> stopAllPartitions = () -> {
            int partitionCount = table.internalTable().partitions();
            // Partitions are visited in ascending order, one stop future per partition.
            CompletableFuture<?>[] stopFutures = IntStream.range(0, partitionCount)
                    .mapToObj(partitionId -> this.stopTablePartition(new TablePartitionId(table.tableId(), partitionId), table))
                    .toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(stopFutures);
        };
        // Flatten the future-of-future produced by supplyAsync.
        return CompletableFuture.supplyAsync(stopAllPartitions, this.ioExecutor).thenCompose(allStopped -> allStopped);
    }

    /**
     * Closes the resources held by a table: its storage, its secondary storage and the internal table itself,
     * in that order.
     *
     * @param table Table to close.
     * @throws Exception If closing any of the resources fails.
     */
    private static void closeTable(TableViewInternal table) throws Exception {
        InternalTable tableInternals = table.internalTable();
        MvTableStorage primaryStorage = tableInternals.storage();
        SecondaryTableStorage secondaryStorage = tableInternals.secondaryStorage();
        IgniteUtils.closeAllManually(primaryStorage, secondaryStorage, tableInternals);
    }

    /**
     * Builds the local {@link TableImpl} for a catalog table: creates its MV storage and (optionally) its secondary
     * storage, wires them into a new {@link InternalTableImpl} and wraps that into the table facade.
     *
     * @param causalityToken Causality token; only used for trace logging here.
     * @param tableDescriptor Catalog descriptor of the table.
     * @param zoneDescriptor Descriptor of the table's zone; supplies the zone ID and the partition count.
     * @param secondaryZoneDescriptor Descriptor of the secondary zone, or {@code null} if the table has none.
     * @param schemaDescriptor Descriptor of the schema the table belongs to; supplies the schema part of the name.
     * @return Newly created table object.
     */
    private TableImpl createTableImpl(long causalityToken, CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor, @Nullable CatalogZoneDescriptor secondaryZoneDescriptor, CatalogSchemaDescriptor schemaDescriptor) {
        QualifiedName tableName = QualifiedNameHelper.fromNormalized(schemaDescriptor.name(), tableDescriptor.name());
        LOG.trace("Creating local table: name={}, id={}, token={}", tableName.toCanonicalForm(), tableDescriptor.id(), causalityToken);
        // Storages are created before the internal table because they are passed into its constructor.
        MvTableStorage tableStorage = this.createTableStorage(tableDescriptor, zoneDescriptor);
        // May be null: no secondary zone, or the secondary engine is not available on this node.
        SecondaryTableStorage secondaryStorage = this.createSecondaryTableStorage(tableDescriptor, secondaryZoneDescriptor);
        int partitions = zoneDescriptor.partitions();
        // Transaction timeouts are passed as suppliers, so the configuration value is read on each call rather than once here.
        InternalTableImpl internalTable = new InternalTableImpl(tableName, zoneDescriptor.id(), tableDescriptor.id(), tableDescriptor.secondaryZoneId(), partitions, this.topologyService, this.txManager, tableStorage, this.replicaSvc, this.clockService, this.observableTimestampTracker, this.executorInclinedPlacementDriver, this.transactionInflights, this::streamerFlushExecutor, Objects.requireNonNull(this.streamerReceiverRunner), secondaryStorage, this.schemaManager.schemaRegistry(tableDescriptor.id()), this.continuousQueryResponseHandler, tableDescriptor.cache(), this.licenseFeatureChecker, () -> ((TransactionView)this.txCfg.value()).readWriteTimeoutMillis(), () -> ((TransactionView)this.txCfg.value()).readOnlyTimeoutMillis(), this.createAndRegisterMetricsSource(tableStorage.getTableDescriptor(), tableName), this.cmgManager);
        CatalogTableProperties descProps = tableDescriptor.properties();
        // The statistics staleness configuration is taken from the table properties in the catalog.
        return new TableImpl(internalTable, this.lockMgr, this.schemaVersions, this.marshallers, this.sql.get(), this.failureProcessor, tableDescriptor.primaryKeyIndexId(), new TableStatsStalenessConfiguration(descProps.staleRowsFraction(), descProps.minStaleRowsCount()));
    }

    /**
     * Creates and starts the secondary storage of a table, if the table has a secondary zone and the matching
     * secondary storage engine is available on this node.
     *
     * @param tableDescriptor Catalog descriptor of the table.
     * @param secondaryZoneDescriptor Descriptor of the secondary zone, or {@code null} if there is none.
     * @return Secondary storage, or {@code null} if there is no secondary zone or no engine for the profile. When
     *     {@link MissingRequiredFeaturesException} is thrown, it is logged and whatever was assigned so far is returned.
     */
    @Nullable
    private SecondaryTableStorage createSecondaryTableStorage(CatalogTableDescriptor tableDescriptor, @Nullable CatalogZoneDescriptor secondaryZoneDescriptor) {
        if (secondaryZoneDescriptor == null) {
            return null;
        }
        SecondaryTableStorage created = null;
        try {
            SecondaryStorageEngine engine = this.dataStorageMgr.secondaryEngineByStorageProfile(tableDescriptor.secondaryStorageProfile());
            if (engine == null) {
                LOG.info("Secondary storage engine is not available on the node [tableId={}, secondaryStorageProfile={}]", tableDescriptor.id(), tableDescriptor.secondaryStorageProfile());
            } else {
                SecondaryStorageTableDescriptor storageDescriptor = TableManager.toSecondaryStorageTableDescriptor(secondaryZoneDescriptor, tableDescriptor);
                created = engine.createTable(storageDescriptor);
                created.start();
            }
        }
        catch (MissingRequiredFeaturesException missingFeatures) {
            // The stack trace goes to the debug log only; the warning carries just the message.
            if (LOG.isDebugEnabled()) {
                LOG.debug(missingFeatures.getMessage(), (Throwable)missingFeatures);
            }
            LOG.warn(missingFeatures.getMessage(), new Object[0]);
        }
        return created;
    }

    /**
     * Delegates to {@code loadTableToSecondaryZoneOnTableCreateIfNeeded} with the node-recovery flag set to
     * {@code false}.
     *
     * @param table Table to load.
     * @param tableDescriptor Catalog descriptor of the table.
     * @param secondaryZoneDescriptor Descriptor of the secondary zone, or {@code null} if there is none.
     * @param causalityToken Causality token.
     * @return Future of the delegated operation.
     */
    private CompletableFuture<Void> loadTableToSecondaryZoneIfNeeded(TableViewInternal table, CatalogTableDescriptor tableDescriptor, @Nullable CatalogZoneDescriptor secondaryZoneDescriptor, long causalityToken) {
        boolean onNodeRecovery = false;
        return this.loadTableToSecondaryZoneOnTableCreateIfNeeded(table, tableDescriptor, secondaryZoneDescriptor, causalityToken, onNodeRecovery);
    }

    /**
     * Registers the table in its secondary zone under the zone read lock and, unless the node is recovering,
     * asks the secondary zone manager to load the table into that zone.
     *
     * @param table Table to load.
     * @param tableDescriptor Catalog descriptor of the table.
     * @param secondaryZoneDescriptor Descriptor of the secondary zone, or {@code null} if there is none.
     * @param causalityToken Causality token.
     * @param onNodeRecovery When {@code true}, the table is only added to the zone and the load step is skipped.
     * @return Future completed after the work is done; a completed future if there is no secondary zone.
     */
    private CompletableFuture<Void> loadTableToSecondaryZoneOnTableCreateIfNeeded(TableViewInternal table, CatalogTableDescriptor tableDescriptor, @Nullable CatalogZoneDescriptor secondaryZoneDescriptor, long causalityToken, boolean onNodeRecovery) {
        if (secondaryZoneDescriptor == null) {
            return CompletableFutures.nullCompletedFuture();
        }
        CompletableFuture<Long> zoneReadLock = this.partitionReplicaLifecycleManager.lockZoneForRead(secondaryZoneDescriptor.id());
        Runnable attachAndLoad = () -> {
            this.addTableToZone(secondaryZoneDescriptor.id(), table);
            if (!onNodeRecovery) {
                this.secondaryZoneManager.loadTableToSecondaryZoneOnTableCreateHavingZoneReadLock(causalityToken, table, secondaryZoneDescriptor, tableDescriptor);
            }
        };
        CompletableFuture<Void> loadFuture = zoneReadLock.thenRunAsync(attachAndLoad, this.ioExecutor);
        // unlockZoneForRead is invoked on both success and failure.
        return loadFuture.whenComplete((ignored, error) -> this.unlockZoneForRead(secondaryZoneDescriptor, zoneReadLock));
    }

    /**
     * Creates the MV table storage for a table using the engine that serves the table's storage profile.
     * If no engine is registered for the profile, a {@link NullStorageEngine} is used instead.
     *
     * @param tableDescriptor Catalog descriptor of the table.
     * @param zoneDescriptor Descriptor of the table's zone; supplies the partition count.
     * @return Created table storage.
     */
    protected MvTableStorage createTableStorage(CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
        StorageEngine resolvedEngine = this.dataStorageMgr.engineByStorageProfile(tableDescriptor.storageProfile());
        StorageEngine engine = resolvedEngine != null ? resolvedEngine : new NullStorageEngine();
        StorageTableDescriptor storageDescriptor = new StorageTableDescriptor(
                tableDescriptor.id(),
                zoneDescriptor.partitions(),
                tableDescriptor.storageProfile(),
                this.encryptionManager.encryptionEnabled());
        CatalogStorageIndexDescriptorSupplier indexDescriptors = new CatalogStorageIndexDescriptorSupplier(this.catalogService, this.lowWatermark);
        return engine.createMvTable(storageDescriptor, indexDescriptors);
    }

    /**
     * Creates the RocksDB-based secondary storage bridge, wrapping it into a thread-asserting decorator when
     * thread assertions are enabled.
     *
     * @return Bridge instance, possibly decorated.
     */
    private static SecondaryStorageBridge createSecondaryStoreBridge(String nodeName, Path path, LogSyncer logSyncer, FailureProcessor failureProcessor, ExecutorService ioExecutor, ScheduledExecutorService scheduledExecutor) {
        SecondaryStorageBridge rocksDbBridge = new RocksDbSecondaryStorageBridge(nodeName, path, logSyncer, failureProcessor, ioExecutor, scheduledExecutor);
        if (!ThreadAssertions.enabled()) {
            return rocksDbBridge;
        }
        return new ThreadAssertingSecondaryStorageBridge(rocksDbBridge);
    }

    /**
     * Destroys all local resources of a table: removes it from the started-tables map, stops and destroys its
     * partitions (and secondary storage resources, if any), destroys the table storage and finally drops the table
     * from the {@code tables} map and its schema registry.
     *
     * @param tableId ID of the table to destroy; the table is expected to be present in {@code startedTables}.
     * @return Future completed when the destruction chain finishes. Failures are logged unless they are caused by
     *     {@link NodeStoppingException}; the returned future still carries the failure.
     */
    private CompletableFuture<Void> destroyTableLocally(int tableId) {
        CompletionStage<Void> stopPartitionsFuture;
        TableViewInternal table = this.startedTables.remove(tableId);
        this.localPartsByTableId.remove(tableId);
        assert (table != null) : tableId;
        InternalTable internalTable = table.internalTable();
        if (!this.nodeProperties.colocationEnabled()) {
            // Without colocation, the per-partition stable assignment keys of the table are removed from the metastorage.
            Set<ByteArray> assignmentKeys = IntStream.range(0, internalTable.partitions()).mapToObj(p -> RebalanceUtil.stablePartAssignmentsKey(new TablePartitionId(tableId, p))).collect(Collectors.toSet());
            // The removal is not awaited by the returned future; a failure is only logged.
            this.metaStorageMgr.removeAll(assignmentKeys).whenComplete((v, e) -> {
                if (e != null) {
                    LOG.error("Failed to remove assignments from metastorage [tableId={}]", (Throwable)e, (Object)tableId);
                }
            });
        }
        if (internalTable.hasSecondaryStorage()) {
            // Secondary replication is aborted/stopped first; only then are secondary and primary resources destroyed together.
            this.secondaryReplicationManager.abortFullStateTransferForTable(table);
            stopPartitionsFuture = this.secondaryReplicationManager.stopReplicationForTable(table).thenCompose(v -> CompletableFuture.allOf(this.stopAndDestroySecondaryStorageResources(table), this.stopAndDestroyTablePartitions(table)));
        } else {
            stopPartitionsFuture = this.stopAndDestroyTablePartitions(table);
        }
        // Both follow-up steps run inside the busy lock; the storage destruction is dispatched to the I/O executor.
        return ((CompletableFuture)((CompletableFuture)stopPartitionsFuture.thenComposeAsync(unused -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> internalTable.storage().destroy()), (Executor)this.ioExecutor)).thenAccept(unused -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            this.tables.remove(tableId);
            this.schemaManager.dropRegistry(tableId);
        }))).whenComplete((v, e) -> {
            if (e != null && !ExceptionUtils.hasCause(e, NodeStoppingException.class)) {
                LOG.error("Unable to destroy table [name={}, tableId={}]", (Throwable)e, (Object)table.name(), (Object)tableId);
            }
        });
    }

    /**
     * Detaches the secondary storage from the internal table and destroys it, if one is attached.
     * The whole operation runs while holding the monitor of {@code internalTable}.
     *
     * @param internalTable Internal table whose secondary storage must be destroyed.
     */
    private static void destroyTableSecondaryStorage(InternalTable internalTable) {
        synchronized (internalTable) {
            // Read the reference before it is removed from the table.
            SecondaryTableStorage detachedStorage = internalTable.secondaryStorage();
            internalTable.removeSecondaryStorage();
            if (detachedStorage == null) {
                return;
            }
            detachedStorage.destroy();
        }
    }

    /**
     * Blocking variant of {@link #tablesAsync()}.
     *
     * @return List of tables.
     */
    @Override
    public List<Table> tables() {
        CompletableFuture<List<Table>> pendingTables = this.tablesAsync();
        return TableManager.sync(pendingTables);
    }

    /**
     * Asynchronously lists the catalog objects that are not caches (the internal lookup is called with
     * {@code cache == false}). Runs under the busy lock.
     *
     * @return Future with the list of tables.
     */
    @Override
    public CompletableFuture<List<Table>> tablesAsync() {
        // NOTE(review): the identity mapping presumably stood for a List<TableViewInternal> -> List<Table> cast that
        // the decompiler erased - confirm against the original sources.
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.tablesAsyncInternalBusy(false).thenApply(list0 -> list0));
    }

    /**
     * Blocking variant of {@link #cachesAsync()}.
     *
     * @return List of caches.
     */
    @Override
    public List<Cache> caches() {
        CompletableFuture<List<Cache>> pendingCaches = this.cachesAsync();
        return TableManager.sync(pendingCaches);
    }

    /**
     * Asynchronously lists the catalog objects that are caches (the internal lookup is called with
     * {@code cache == true}). Runs under the busy lock.
     *
     * @return Future with the list of caches.
     */
    @Override
    public CompletableFuture<List<Cache>> cachesAsync() {
        // NOTE(review): the identity mapping presumably stood for a List<TableViewInternal> -> List<Cache> cast that
        // the decompiler erased - confirm against the original sources.
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.tablesAsyncInternalBusy(true).thenApply(list0 -> list0));
    }

    /**
     * Blocking variant of {@link #cacheAsync(QualifiedName)}.
     *
     * @param name Qualified name of the cache.
     * @return Cache, or {@code null} if the asynchronous lookup yields none.
     */
    @Override
    @Nullable
    public Cache cache(QualifiedName name) {
        CompletableFuture<Cache> pendingCache = this.cacheAsync(name);
        return TableManager.sync(pendingCache);
    }

    /**
     * Asynchronously looks up a cache by name.
     *
     * @param name Qualified name of the cache.
     * @return Future with the found object if it exists and is flagged as a cache; otherwise a future with {@code null}.
     */
    @Override
    public CompletableFuture<Cache> cacheAsync(QualifiedName name) {
        return this.tableAsyncInternal(name).thenApply(found -> {
            if (found == null || !found.cache()) {
                return null;
            }
            return found;
        });
    }

    /**
     * Collects the tables of the catalog active at the current clock time, keeping only those whose cache flag
     * equals {@code cache}.
     *
     * @param cache Whether to return cache-flagged ({@code true}) or regular ({@code false}) tables.
     * @return Future with the matching tables; an empty list if the catalog has no tables at all.
     */
    private CompletableFuture<List<TableViewInternal>> tablesAsyncInternalBusy(boolean cache) {
        HybridTimestamp now = this.clockService.now();
        // Wait until metadata is complete for 'now' (or the manager is stopped), then re-enter the busy lock.
        return this.orStopManagerFuture(this.executorInclinedSchemaSyncService.waitForMetadataCompleteness(now)).thenCompose(unused -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            Catalog catalog = this.catalogService.activeCatalog(now.longValue());
            Collection<CatalogTableDescriptor> tableDescriptors = catalog.tables();
            // The emptiness check is done before the cache filter is applied.
            if (tableDescriptors.isEmpty()) {
                return CompletableFutures.emptyListCompletedFuture();
            }
            CompletableFuture[] tableImplFutures = (CompletableFuture[])tableDescriptors.stream().filter(tableDescriptor -> tableDescriptor.cache() == cache).map(tableDescriptor -> this.tableAsyncInternalBusy(tableDescriptor.id())).toArray(CompletableFuture[]::new);
            return CompletableFutures.allOfToList(tableImplFutures);
        }));
    }

    /**
     * Returns a read-only view of the started tables once the assignments-updated versioned value is available
     * for the given token.
     *
     * @param causalityToken Causality token to wait for.
     * @return Future with an unmodifiable view of the started-tables map.
     */
    private CompletableFuture<Map<Integer, TableViewInternal>> tablesById(long causalityToken) {
        return this.assignmentsUpdatedVv
                .get(causalityToken)
                .thenApply(ignored -> Collections.unmodifiableMap(this.startedTables));
    }

    /**
     * Returns a read-only view of the {@code tables} map.
     *
     * @return Unmodifiable view keyed by table ID.
     */
    private Map<Integer, TableViewInternal> tablesById() {
        Map<Integer, TableViewInternal> readOnlyView = Collections.unmodifiableMap(this.tables);
        return readOnlyView;
    }

    /**
     * Returns a read-only view of the started-tables map. Intended for tests.
     *
     * @return Unmodifiable view keyed by table ID.
     */
    @TestOnly
    public Map<Integer, TableViewInternal> startedTables() {
        Map<Integer, TableViewInternal> readOnlyView = Collections.unmodifiableMap(this.startedTables);
        return readOnlyView;
    }

    /**
     * Blocking variant of {@link #tableAsync(QualifiedName)}.
     *
     * @param name Qualified table name.
     * @return Table, or {@code null} if the asynchronous lookup yields none.
     */
    @Override
    public Table table(QualifiedName name) {
        CompletableFuture<Table> pendingTable = this.tableAsync(name);
        return TableManager.sync(pendingTable);
    }

    /**
     * Blocking variant of {@link #tableAsync(int)}.
     *
     * @param id Table ID.
     * @return Table view, or {@code null} if the asynchronous lookup yields none.
     * @throws NodeStoppingException Declared by the overridden method.
     */
    @Override
    public TableViewInternal table(int id) throws NodeStoppingException {
        CompletableFuture<TableViewInternal> pendingTable = this.tableAsync(id);
        return TableManager.sync(pendingTable);
    }

    /**
     * Asynchronously looks up a regular (non-cache) table by name.
     *
     * @param name Qualified table name.
     * @return Future with the table if it exists and is not flagged as a cache; otherwise a future with {@code null}.
     */
    @Override
    public CompletableFuture<Table> tableAsync(QualifiedName name) {
        return this.tableAsyncInternal(name).thenApply(found -> {
            if (found != null && !found.cache()) {
                return found;
            }
            return null;
        });
    }

    /**
     * Looks up a started table by ID as of the given causality token. Runs under the busy lock.
     *
     * @param causalityToken Causality token to wait for.
     * @param id Table ID.
     * @return Future with the table, or with {@code null} if it is not among the started tables.
     */
    public CompletableFuture<TableViewInternal> tableAsync(long causalityToken, int id) {
        return IgniteUtils.inBusyLockAsync(
                this.busyLock,
                () -> this.tablesById(causalityToken).thenApply(startedById -> (TableViewInternal)startedById.get(id)));
    }

    /**
     * Asynchronously looks up a table by ID in the catalog that is active at the current clock time.
     *
     * @param tableId Table ID.
     * @return Future with the table, or with {@code null} if the catalog has no such table.
     */
    @Override
    public CompletableFuture<TableViewInternal> tableAsync(int tableId) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            HybridTimestamp requestTime = this.clockService.now();
            // Metadata must be complete for the request time (unless the manager stops first).
            return this.orStopManagerFuture(this.executorInclinedSchemaSyncService.waitForMetadataCompleteness(requestTime)).thenCompose(ignored -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
                Catalog activeCatalog = this.catalogService.activeCatalog(requestTime.longValue());
                if (activeCatalog.table(tableId) != null) {
                    return this.tableAsyncInternalBusy(tableId);
                }
                return CompletableFutures.nullCompletedFuture();
            }));
        });
    }

    /**
     * Returns the set of partitions of the given table that are stored locally, as of the given causality token.
     *
     * <p>The busy lock is held only while the future chain is being built, not until the returned future completes.
     *
     * @param causalityToken Causality token to wait for.
     * @param tableId Table ID.
     * @return Future with the local partition set, or with {@code PartitionSet.EMPTY_SET} if none is recorded.
     * @throws IgniteException With {@code NODE_STOPPING_ERR} if the busy lock cannot be entered.
     */
    public CompletableFuture<PartitionSet> localPartitionSetAsync(long causalityToken, int tableId) {
        if (!this.busyLock.enterBusy()) {
            throw new IgniteException(ErrorGroups.Common.NODE_STOPPING_ERR, (Throwable)new NodeStoppingException());
        }
        try {
            // Return the typed future directly instead of going through a raw CompletionStage local,
            // which is not assignable to the declared return type.
            return this.localPartitionsVv.get(causalityToken).thenApply(unused -> this.localPartsByTableId.getOrDefault(tableId, PartitionSet.EMPTY_SET));
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Blocking variant of {@link #tableViewAsync(QualifiedName)}.
     *
     * @param name Qualified table name.
     * @return Table view, or {@code null} if the asynchronous lookup yields none.
     */
    @Override
    public TableViewInternal tableView(QualifiedName name) {
        CompletableFuture<TableViewInternal> pendingView = this.tableViewAsync(name);
        return TableManager.sync(pendingView);
    }

    /**
     * Asynchronously looks up a table view by name; unlike {@code tableAsync(QualifiedName)} and
     * {@code cacheAsync(QualifiedName)}, no filtering on the cache flag is applied.
     *
     * @param name Qualified table name.
     * @return Future with the table view, or with {@code null} if not found.
     */
    @Override
    public CompletableFuture<TableViewInternal> tableViewAsync(QualifiedName name) {
        CompletableFuture<TableViewInternal> lookup = this.tableAsyncInternal(name);
        return lookup;
    }

    /**
     * Resolves a table by qualified name in the catalog that is active at the current clock time.
     *
     * @param name Qualified table name.
     * @return Future with the table, or with {@code null} if the catalog has no such table.
     */
    private CompletableFuture<TableViewInternal> tableAsyncInternal(QualifiedName name) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            HybridTimestamp requestTime = this.clockService.now();
            // Metadata must be complete for the request time (unless the manager stops first).
            return this.orStopManagerFuture(this.executorInclinedSchemaSyncService.waitForMetadataCompleteness(requestTime)).thenCompose(ignored -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
                Catalog activeCatalog = this.catalogService.activeCatalog(requestTime.longValue());
                CatalogTableDescriptor descriptor = activeCatalog.table(name.schemaName(), name.objectName());
                if (descriptor != null) {
                    return this.tableAsyncInternalBusy(descriptor.id());
                }
                return CompletableFutures.nullCompletedFuture();
            }));
        });
    }

    /**
     * Returns the started table with the given ID, waiting for it to appear if it has not been started yet.
     *
     * <p>If the table is not in {@code startedTables}, a completion listener is registered on
     * {@code assignmentsUpdatedVv}; the map is then checked again, so a table that was started between the first
     * check and the registration is returned immediately.
     *
     * @param tableId Table ID.
     * @return Future with the table. It may complete with {@code null} if the table is still absent from
     *     {@code startedTables} when the listener fires, or be completed through {@code orStopManagerFuture}.
     */
    private CompletableFuture<TableViewInternal> tableAsyncInternalBusy(int tableId) {
        // Fast path: the table is already started.
        TableViewInternal tableImpl = this.startedTables.get(tableId);
        if (tableImpl != null) {
            return CompletableFuture.completedFuture(tableImpl);
        }
        CompletableFuture getLatestTableFuture = new CompletableFuture();
        CompletionListener<Void> tablesListener = (token, v, th) -> {
            if (th == null) {
                // Additionally wait for tablesVv at the same token before reading the started tables.
                CompletableFuture<Void> tablesFuture = this.tablesVv.get(token);
                tablesFuture.whenComplete((tables, e) -> {
                    if (e != null) {
                        getLatestTableFuture.completeExceptionally((Throwable)e);
                    } else {
                        getLatestTableFuture.complete(this.startedTables.get(tableId));
                    }
                });
            } else {
                getLatestTableFuture.completeExceptionally(th);
            }
            return CompletableFutures.nullCompletedFuture();
        };
        this.assignmentsUpdatedVv.whenComplete(tablesListener);
        // Re-check after registering the listener to close the window between the first lookup and the registration.
        tableImpl = this.startedTables.get(tableId);
        if (tableImpl != null) {
            this.assignmentsUpdatedVv.removeWhenComplete(tablesListener);
            return CompletableFuture.completedFuture(tableImpl);
        }
        // The listener is removed once the result (or the stop signal) arrives.
        return this.orStopManagerFuture(getLatestTableFuture).whenComplete((unused, throwable) -> this.assignmentsUpdatedVv.removeWhenComplete(tablesListener));
    }

    /**
     * Waits for the future via {@code IgniteUtils.getInterruptibly} and returns its result.
     *
     * @param future Future to wait for.
     * @param <T> Result type.
     * @return Result of the future.
     */
    private static <T> T sync(CompletableFuture<T> future) {
        T result = IgniteUtils.getInterruptibly(future);
        return result;
    }

    /**
     * Marks a partition in the given partition set, creating a new {@link BitSetPartitionSet} when none is passed.
     * Note that a non-null argument is modified in place and returned.
     *
     * @param oldPartitionSet Existing partition set, or {@code null}.
     * @param partitionId Partition to add.
     * @return The set that now contains {@code partitionId}.
     */
    private static PartitionSet extendPartitionSet(@Nullable PartitionSet oldPartitionSet, int partitionId) {
        PartitionSet target = oldPartitionSet != null ? oldPartitionSet : new BitSetPartitionSet();
        target.set(partitionId);
        return target;
    }

    /**
     * Returns the MV partition storage of a table partition, which is expected to exist.
     *
     * @param table Table.
     * @param partitionId Partition ID.
     * @return Partition storage.
     * @throws TableClosedException If the table storage reports {@link StorageClosedException}.
     */
    private static MvPartitionStorage getMvPartitionStorage(TableViewInternal table, int partitionId) {
        InternalTable tableInternals = table.internalTable();
        MvPartitionStorage partitionStorage;
        try {
            partitionStorage = tableInternals.storage().getMvPartition(partitionId);
        }
        catch (StorageClosedException storageClosed) {
            throw new TableClosedException(table.tableId(), (Throwable)storageClosed);
        }
        assert (partitionStorage != null) : "tableId=" + table.tableId() + ", partitionId=" + partitionId;
        return partitionStorage;
    }

    /**
     * Makes sure an MV partition storage exists for every partition in the given set, creating the missing ones.
     *
     * @param table Table.
     * @param partitions Partitions that must have storages.
     * @return Future completed when all storages are available; a closed table storage is reported through a future
     *     failed with {@link TableClosedException}.
     */
    private CompletableFuture<Void> createPartitionStoragesIfAbsent(TableViewInternal table, PartitionSet partitions) {
        InternalTable internalTable = table.internalTable();
        List storageFuts = partitions.stream().mapToObj(partitionId -> {
            MvPartitionStorage mvPartition;
            try {
                mvPartition = internalTable.storage().getMvPartition(partitionId);
            }
            catch (StorageClosedException e) {
                // Reported as a failed future rather than thrown, so the remaining partitions are still processed.
                return CompletableFuture.failedFuture(new TableClosedException(table.tableId(), (Throwable)e));
            }
            // Reuse the existing storage, otherwise ask the table storage to create one.
            return mvPartition != null ? CompletableFuture.completedFuture(mvPartition) : internalTable.storage().createMvPartition(partitionId);
        }).collect(Collectors.toList());
        return CompletableFutures.allOf(storageFuts);
    }

    /**
     * Stops and destroys all partitions of a table while holding the write lock of the table's zone.
     *
     * @param table Table whose partitions must be stopped and destroyed.
     * @return Future completed when all partitions are processed; the zone write lock is released afterwards.
     */
    private CompletableFuture<Void> stopAndDestroyTablePartitions(TableViewInternal table) {
        InternalTable internalTable = table.internalTable();
        int partitions = internalTable.partitions();
        // One lock per zone, created on first use.
        NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(internalTable.zoneId(), id -> new NaiveAsyncReadWriteLock());
        CompletableFuture<Long> writeLockAcquisitionFuture = zoneLock.writeLock();
        try {
            return ((CompletableFuture)writeLockAcquisitionFuture.thenCompose(stamp -> {
                CompletableFuture[] stopReplicaAndDestroyFutures = new CompletableFuture[partitions];
                for (int partitionId = 0; partitionId < partitions; ++partitionId) {
                    // With colocation enabled, the table's resources are first unloaded from the zone replica.
                    CompletableFuture<Object> resourcesUnloadFuture = this.nodeProperties.colocationEnabled() ? this.partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(new ZonePartitionId(internalTable.zoneId(), partitionId), internalTable.tableId()) : CompletableFutures.nullCompletedFuture();
                    TablePartitionId tablePartitionId = new TablePartitionId(internalTable.tableId(), partitionId);
                    stopReplicaAndDestroyFutures[partitionId] = resourcesUnloadFuture.thenCompose(v -> this.stopAndDestroyTablePartition(tablePartitionId, table));
                }
                // The table is removed from the per-zone table set on both success and failure.
                return CompletableFuture.allOf(stopReplicaAndDestroyFutures).whenComplete((res, th) -> this.tablesPerZone.getOrDefault(internalTable.zoneId(), Collections.emptySet()).remove(table));
            })).whenComplete((unused, t) -> writeLockAcquisitionFuture.thenAccept(zoneLock::unlockWrite));
        }
        catch (Throwable t2) {
            // Building the chain failed synchronously: still release the write lock once it has been acquired.
            writeLockAcquisitionFuture.thenAccept(zoneLock::unlockWrite);
            throw t2;
        }
    }

    /**
     * Stops and destroys a single table partition after the {@code tablesVv} versioned value becomes available
     * for the given causality token.
     *
     * @param tablePartitionId Partition to stop and destroy.
     * @param causalityToken Causality token to wait for.
     * @return Future completed when the partition is stopped and its storages are destroyed.
     */
    private CompletableFuture<Void> stopAndDestroyTablePartition(TablePartitionId tablePartitionId, long causalityToken) {
        CompletableFuture<Object> tokenFuture;
        try {
            tokenFuture = this.tablesVv.get(causalityToken);
        }
        catch (OutdatedTokenException e) {
            // An outdated token is not waited for; proceed immediately.
            tokenFuture = CompletableFutures.nullCompletedFuture();
        }
        return tokenFuture.thenCompose(ignore -> {
            TableViewInternal table = this.tables.get(tablePartitionId.tableId());
            assert (table != null) : tablePartitionId;
            return this.stopAndDestroyTablePartition(tablePartitionId, table);
        });
    }

    /**
     * Stops a table partition and then destroys its storages on the I/O executor.
     *
     * @param tablePartitionId Partition to stop and destroy.
     * @param table Table the partition belongs to.
     * @return Future completed when the partition storages are destroyed.
     */
    private CompletableFuture<Void> stopAndDestroyTablePartition(TablePartitionId tablePartitionId, TableViewInternal table) {
        CompletableFuture<Void> partitionStopped = this.stopTablePartition(tablePartitionId, table);
        return partitionStopped.thenComposeAsync(
                ignored -> this.destroyPartitionStorages(tablePartitionId, table),
                (Executor)this.ioExecutor);
    }

    /**
     * Stops a table partition: stops its replica (only when colocation is disabled), closes the partition trackers,
     * removes the partition from the min-time collector, unregisters its modification-counter metric source and
     * removes its storage from the MV garbage collector.
     *
     * @param tablePartitionId Partition to stop.
     * @param table Table the partition belongs to.
     * @return Future completed when the partition storage is removed from the garbage collector.
     */
    private CompletableFuture<Void> stopTablePartition(TablePartitionId tablePartitionId, TableViewInternal table) {
        CompletableFuture<Boolean> stopReplicaFuture;
        try {
            stopReplicaFuture = this.nodeProperties.colocationEnabled() ? CompletableFutures.trueCompletedFuture() : this.replicaMgr.stopReplica(tablePartitionId);
        }
        catch (NodeStoppingException e) {
            // The node is stopping: nothing to stop here, continue with the local clean-up.
            stopReplicaFuture = CompletableFutures.falseCompletedFuture();
        }
        return stopReplicaFuture.thenCompose(v -> {
            TableManager.closePartitionTrackers(table.internalTable(), tablePartitionId.partitionId());
            this.minTimeCollectorService.removePartition(tablePartitionId);
            PartitionModificationCounterMetricSource metricSource = this.partModCounterMetricSources.remove(tablePartitionId);
            if (metricSource != null) {
                try {
                    this.metricManager.unregisterSource(metricSource);
                }
                catch (Exception e) {
                    // Best effort: a metrics failure must not prevent the partition from being stopped.
                    String message = "Failed to unregister metrics source for table [name={}, partitionId={}].";
                    LOG.warn(message, e, table.name(), tablePartitionId.partitionId());
                }
            }
            return this.mvGc.removeStorage(tablePartitionId);
        });
    }

    /**
     * Destroys the MV storage of a table partition if such a storage exists.
     *
     * @param tablePartitionId Partition whose storage must be destroyed.
     * @param table Table the partition belongs to.
     * @return Future completed when the destruction finishes; already completed if there was nothing to destroy
     *     or the table storage was already destroyed or closed.
     */
    private CompletableFuture<Void> destroyPartitionStorages(TablePartitionId tablePartitionId, TableViewInternal table) {
        InternalTable tableInternals = table.internalTable();
        int partitionId = tablePartitionId.partitionId();
        List<CompletableFuture<Void>> pendingDestroys = new ArrayList<>();
        try {
            MvTableStorage tableStorage = tableInternals.storage();
            if (tableStorage.getMvPartition(partitionId) != null) {
                pendingDestroys.add(tableStorage.destroyPartition(partitionId));
            }
        }
        catch (StorageDestroyedException ignored) {
            // Deliberately ignored: the table storage is already destroyed, so there is nothing left to destroy.
        }
        catch (StorageClosedException ignored) {
            // Deliberately ignored: the table storage is closed. NOTE(review): presumably the node is stopping - confirm.
        }
        return CompletableFuture.allOf(pendingDestroys.toArray(CompletableFuture[]::new));
    }

    /**
     * Exposes the replica manager used by this table manager. Intended for tests.
     *
     * @return Replica manager.
     */
    @TestOnly
    public ReplicaManager getReplicaManager() {
        ReplicaManager manager = this.replicaMgr;
        return manager;
    }

    /**
     * Closes the safe-time tracker and then the storage-index tracker of a partition, skipping those that are absent.
     *
     * @param internalTable Internal table that owns the trackers.
     * @param partitionId Partition ID.
     */
    private static void closePartitionTrackers(InternalTable internalTable, int partitionId) {
        PendingComparableValuesTracker<?, Void> safeTimeTracker = internalTable.getPartitionSafeTimeTracker(partitionId);
        TableManager.closeTracker(safeTimeTracker);
        PendingComparableValuesTracker<?, Void> storageIndexTracker = internalTable.getPartitionStorageIndexTracker(partitionId);
        TableManager.closeTracker(storageIndexTracker);
    }

    /**
     * Closes the tracker if it is present.
     *
     * @param tracker Tracker to close, or {@code null}.
     */
    private static void closeTracker(@Nullable PendingComparableValuesTracker<?, Void> tracker) {
        if (tracker == null) {
            return;
        }
        tracker.close();
    }

    /**
     * Returns the local member reported by the topology service.
     *
     * @return Local cluster node.
     */
    private InternalClusterNode localNode() {
        InternalClusterNode self = this.topologyService.localMember();
        return self;
    }

    /**
     * Creates the index, GC and storage update handlers of a partition, registers the metrics of its modification
     * counter and starts the storage update handler.
     *
     * @param partitionId Partition ID.
     * @param partitionDataStorage Data storage of the partition.
     * @param table Table the partition belongs to.
     * @param safeTimeTracker Safe-time tracker handed to the GC update handler.
     * @param replicationConfiguration Replication configuration handed to the storage update handler.
     * @return Holder of the three created handlers.
     */
    private PartitionUpdateHandlers createPartitionUpdateHandlers(int partitionId, PartitionDataStorage partitionDataStorage, TableViewInternal table, PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker, ReplicationConfiguration replicationConfiguration) {
        TableIndexStoragesSupplier indexStorages = table.indexStorageAdapters(partitionId);
        IndexUpdateHandler indexHandler = new IndexUpdateHandler(indexStorages);
        GcUpdateHandler gcHandler = new GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexHandler);
        // The size is supplied lazily, so it is evaluated by the counter on demand rather than captured here.
        PartitionModificationCounterFactory.SizeSupplier sizeSupplier = () -> partitionDataStorage.getStorage().estimatedSize();
        PartitionModificationCounter counter = this.partitionModificationCounterFactory.create(sizeSupplier, table::stalenessConfiguration, table.tableId(), partitionId);
        this.registerPartitionModificationCounterMetrics(table, partitionId, counter);
        StorageUpdateHandler storageHandler = new StorageUpdateHandler(partitionId, partitionDataStorage, indexHandler, replicationConfiguration, counter);
        storageHandler.start();
        return new PartitionUpdateHandlers(storageHandler, indexHandler, gcHandler);
    }

    /**
     * Builds a metric source exposing the partition modification counter (current value, next milestone and
     * last milestone timestamp), registers and enables it, and remembers it for later unregistration.
     * A registration failure is logged as a warning and otherwise ignored.
     *
     * @param table Table the partition belongs to.
     * @param partitionId Partition ID.
     * @param counter Modification counter backing the gauges.
     */
    private void registerPartitionModificationCounterMetrics(TableViewInternal table, int partitionId, PartitionModificationCounter counter) {
        PartitionModificationCounterMetricSource source = new PartitionModificationCounterMetricSource(table.tableId(), partitionId);
        source.addMetric(new LongGauge("modificationCount", "The value of the volatile counter of partition modifications. This value is used to determine staleness of the related SQL statistics.", counter::value));
        source.addMetric(new LongGauge("nextMilestone", "The value of the next milestone for the number of partition modifications. This value is used to determine staleness of the related SQL statistics.", counter::nextMilestone));
        source.addMetric(new LongGauge("lastMilestoneTimestamp", "The timestamp value representing the commit time of the last modification operation that reached the milestone. This value is used to determine staleness of the related SQL statistics.", () -> counter.lastMilestoneTimestamp().longValue()));
        try {
            this.metricManager.registerSource(source);
            this.metricManager.enable(source);
            // Remembered only after a successful registration.
            this.partModCounterMetricSources.put(new TablePartitionId(table.tableId(), partitionId), source);
        }
        catch (Exception registrationFailure) {
            LOG.warn("Failed to register metrics source for table [name={}, partitionId={}].", registrationFailure, table.name(), partitionId);
        }
    }

    /**
     * Returns the table with the given ID from the {@code tables} map without any waiting.
     *
     * @param tableId Table ID.
     * @return Table, or {@code null} if the map has no entry for the ID.
     */
    @Override
    @Nullable
    public TableViewInternal cachedTable(int tableId) {
        TableViewInternal cached = this.tables.get(tableId);
        return cached;
    }

    /**
     * Looks up a table by its simple name among the values of the {@code tables} map. Intended for tests.
     *
     * @param name Simple table name.
     * @return Matching table, or {@code null} if none matches.
     */
    @TestOnly
    @Nullable
    public TableViewInternal cachedTable(String name) {
        Collection<TableViewInternal> knownTables = this.tables.values();
        return TableManager.findTableImplByName(knownTables, name);
    }

    /**
     * Returns an unmodifiable snapshot of the values of the {@code tables} map.
     *
     * @return Unmodifiable list of tables.
     */
    public List<TableViewInternal> cachedTables() {
        return List.copyOf(this.tables.values());
    }

    /**
     * Returns the descriptor of a table from the given catalog; the table is expected to exist there.
     *
     * @param tableId Table ID.
     * @param catalog Catalog to look in.
     * @return Table descriptor.
     */
    private static CatalogTableDescriptor getTableDescriptor(int tableId, Catalog catalog) {
        CatalogTableDescriptor descriptor = catalog.table(tableId);
        assert (descriptor != null) : "tableId=" + tableId + ", catalogVersion=" + catalog.version();
        return descriptor;
    }

    /**
     * Returns the zone descriptor of a table from the catalog of the given version.
     *
     * @param tableDescriptor Table descriptor.
     * @param catalogVersion Catalog version to look in.
     * @return Zone descriptor.
     */
    private CatalogZoneDescriptor getZoneDescriptor(CatalogTableDescriptor tableDescriptor, int catalogVersion) {
        Catalog catalog = this.catalogService.catalog(catalogVersion);
        return TableManager.getZoneDescriptor(tableDescriptor, catalog);
    }

    /**
     * Returns the zone descriptor of a table from the given catalog; the zone is expected to exist there.
     *
     * @param tableDescriptor Table descriptor.
     * @param catalog Catalog to look in.
     * @return Zone descriptor.
     */
    private static CatalogZoneDescriptor getZoneDescriptor(CatalogTableDescriptor tableDescriptor, Catalog catalog) {
        CatalogZoneDescriptor zone = catalog.zone(tableDescriptor.zoneId());
        assert (zone != null) : "tableId=" + tableDescriptor.id() + ", zoneId=" + tableDescriptor.zoneId() + ", catalogVersion=" + catalog.version();
        return zone;
    }

    /**
     * Returns the secondary zone descriptor of a table from the catalog of the given version.
     *
     * @param tableDescriptor Table descriptor.
     * @param catalogVersion Catalog version to look in.
     * @return Secondary zone descriptor, or {@code null} if the table has no secondary zone ID.
     */
    @Nullable
    private CatalogZoneDescriptor getSecondaryZoneDescriptor(CatalogTableDescriptor tableDescriptor, int catalogVersion) {
        Integer secondaryZoneId = tableDescriptor.secondaryZoneId();
        return secondaryZoneId == null
                ? null
                : this.getSecondaryZoneDescriptor(secondaryZoneId, tableDescriptor.id(), catalogVersion);
    }

    /**
     * Returns the descriptor of a secondary zone from the catalog of the given version; the zone is expected to exist.
     *
     * @param secondaryZoneId Secondary zone ID.
     * @param tableId Table ID; used only in the assertion message.
     * @param catalogVersion Catalog version to look in.
     * @return Zone descriptor.
     */
    private CatalogZoneDescriptor getSecondaryZoneDescriptor(int secondaryZoneId, int tableId, int catalogVersion) {
        Catalog catalog = this.catalogService.catalog(catalogVersion);
        CatalogZoneDescriptor secondaryZone = catalog.zone(secondaryZoneId);
        assert (secondaryZone != null) : "tableId=" + tableId + ", zoneId=" + secondaryZoneId + ", catalogVersion=" + catalogVersion;
        return secondaryZone;
    }

    /**
     * Returns the schema descriptor of a table from the catalog of the given version; the schema is expected to exist.
     *
     * @param tableDescriptor Table descriptor.
     * @param catalogVersion Catalog version to look in.
     * @return Schema descriptor.
     */
    private CatalogSchemaDescriptor getSchemaDescriptor(CatalogTableDescriptor tableDescriptor, int catalogVersion) {
        Catalog catalog = this.catalogService.catalog(catalogVersion);
        CatalogSchemaDescriptor schema = catalog.schema(tableDescriptor.schemaId());
        assert (schema != null) : "tableId=" + tableDescriptor.id() + ", schemaId=" + tableDescriptor.schemaId() + ", catalogVersion=" + catalogVersion;
        return schema;
    }

    /**
     * Finds a table by its simple name in the given collection.
     *
     * <p>The qualified name is built once up front instead of once per inspected table. As a consequence the name
     * is now parsed even when {@code tables} is empty.
     *
     * @param tables Tables to search.
     * @param name Simple table name, converted with {@code QualifiedName.fromSimple}.
     * @return A table whose qualified name equals the converted name, or {@code null} if there is none.
     */
    @Nullable
    private static TableViewInternal findTableImplByName(Collection<TableViewInternal> tables, String name) {
        QualifiedName wantedName = QualifiedName.fromSimple(name);
        return tables.stream().filter(table -> table.qualifiedName().equals(wantedName)).findAny().orElse(null);
    }

    /**
     * Recovers table resources on node start by walking the catalog versions from the latest down to the earliest
     * relevant one.
     *
     * <p>Each table is prepared once, using the newest catalog version in which it appears. When a table is present
     * in a version but absent from the next newer one, a destroy event is enqueued for it.
     *
     * @param recoveryRevision Revision passed to the per-table recovery routine.
     * @param lwm Low watermark, or {@code null}. When present, the scan stops at the catalog version active at the
     *     watermark instead of the earliest available version.
     * @return Future completed when all tables are prepared. Its state is also copied to
     *     {@code readyToProcessReplicaStarts}; a failure is reported to the failure processor.
     */
    private CompletableFuture<Void> recoverTables(long recoveryRevision, @Nullable HybridTimestamp lwm) {
        int earliestCatalogVersion = lwm == null ? this.catalogService.earliestCatalogVersion() : this.catalogService.activeCatalogVersion(lwm.longValue());
        int latestCatalogVersion = this.catalogService.latestCatalogVersion();
        IntOpenHashSet startedTables = new IntOpenHashSet();
        ArrayList<CompletableFuture<Void>> startTableFutures = new ArrayList<CompletableFuture<Void>>();
        // Catalog of the previously visited (that is, next newer) version; null on the first iteration.
        Catalog nextCatalog = null;
        for (int ver = latestCatalogVersion; ver >= earliestCatalogVersion; --ver) {
            Catalog catalog = this.catalogService.catalog(ver);
            for (CatalogTableDescriptor tableDescriptor : catalog.tables()) {
                int tableId = tableDescriptor.id();
                // The table exists in this version but not in the next one, so it was dropped in between.
                boolean destroyTableEventFired = nextCatalog != null && nextCatalog.table(tableId) == null;
                if (destroyTableEventFired) {
                    this.destructionEventsQueue.enqueue(new DestroyTableEvent(nextCatalog.version(), tableId));
                }
                // Prepare every table only once, for the newest version it appears in.
                if (!startedTables.add(tableId)) {
                    continue;
                }
                CatalogZoneDescriptor zoneDescriptor = this.getZoneDescriptor(tableDescriptor, ver);
                CatalogZoneDescriptor secondaryZoneDescriptor = this.getSecondaryZoneDescriptor(tableDescriptor, ver);
                CatalogSchemaDescriptor schemaDescriptor = this.getSchemaDescriptor(tableDescriptor, ver);
                CompletableFuture<Void> startTableFuture = this.prepareTableResourcesOnRecovery(recoveryRevision, zoneDescriptor, secondaryZoneDescriptor, tableDescriptor, schemaDescriptor);
                if (destroyTableEventFired) {
                    // The table is already dropped, so its metrics source is unregistered right after preparation.
                    this.unregisterMetricsSource(this.tables.get(tableId));
                }
                startTableFutures.add(startTableFuture);
            }
            nextCatalog = catalog;
        }
        return ((CompletableFuture)CompletableFuture.allOf((CompletableFuture[])startTableFutures.toArray(CompletableFuture[]::new)).whenComplete((BiConsumer)CompletableFutures.copyStateTo(this.readyToProcessReplicaStarts))).whenComplete((unused, throwable) -> {
            if (throwable != null) {
                this.failureProcessor.process(new FailureContext((Throwable)throwable, "Error starting tables"));
            } else {
                LOG.info("Tables started successfully [count={}]", startTableFutures.size());
            }
        });
    }

    /**
     * Returns a future that completes as soon as either the given future or {@code stopManagerFuture} completes.
     *
     * <p>If the given future is already done, it is returned as is.
     *
     * @param future Future to race against the manager stop future.
     * @param <T> Result type of the given future.
     * @return The given future if it is already done, otherwise a future mirroring whichever of the two completes first.
     */
    @SuppressWarnings("unchecked")
    private <T> CompletableFuture<T> orStopManagerFuture(CompletableFuture<T> future) {
        if (future.isDone()) {
            return future;
        }
        // anyOf() yields CompletableFuture<Object>, so the result must be cast back to T explicitly; without the cast the
        // identity lambda does not type-check against CompletableFuture<T>.
        // NOTE(review): the cast is only sound if stopManagerFuture never completes normally with a non-T value - confirm.
        return CompletableFuture.anyOf(future, this.stopManagerFuture).thenApply(result -> (T) result);
    }

    /**
     * Builds a {@link SecondaryStorageTableDescriptor} from catalog descriptors of a table and its secondary zone.
     *
     * @param secondaryZoneDescriptor Descriptor of the secondary zone; supplies the zone id and the partition count.
     * @param tableDescriptor Descriptor of the table; supplies the table id, the name and the latest schema version.
     * @return Descriptor combining the values above with the schema converted for the table's latest schema version.
     */
    private static SecondaryStorageTableDescriptor toSecondaryStorageTableDescriptor(CatalogZoneDescriptor secondaryZoneDescriptor, CatalogTableDescriptor tableDescriptor) {
        // Values are read in the same order in which they are passed to the constructor.
        var tableId = tableDescriptor.id();
        var tableName = tableDescriptor.name();
        var zoneId = secondaryZoneDescriptor.id();
        var partitions = secondaryZoneDescriptor.partitions();
        var schema = CatalogToSchemaDescriptorConverter.convert(tableDescriptor, tableDescriptor.latestSchemaVersion());

        return new SecondaryStorageTableDescriptor(tableId, tableName, zoneId, partitions, schema);
    }

    /**
     * Destroys on-disk resources of tables that are not reported as alive by {@code TableUtils.aliveTables} for the current
     * low watermark.
     *
     * <p>MV storages are always cleaned up. TX state storages and replication protocol storages are cleaned up here only when
     * colocation is disabled.
     */
    private void cleanUpResourcesForDroppedTablesOnRecoveryBusy() {
        Set<Integer> aliveTableIds = TableUtils.aliveTables(this.catalogService, this.lowWatermark.getLowWatermark());

        this.destroyMvStoragesForTablesNotIn(aliveTableIds);

        if (this.nodeProperties.colocationEnabled()) {
            return;
        }

        this.destroyTxStateStoragesForTablesNotIn(aliveTableIds);
        this.destroyReplicationProtocolStoragesForTablesNotIn(aliveTableIds);
    }

    /**
     * For every storage engine, destroys MV table storages whose table id is present on disk but is not in the given set.
     *
     * @param aliveTableIds Ids of tables whose storages must be kept.
     */
    private void destroyMvStoragesForTablesNotIn(Set<Integer> aliveTableIds) {
        for (StorageEngine engine : this.dataStorageMgr.allStorageEngines()) {
            // Only ids that exist on disk for this engine and are not alive are visited.
            for (int staleTableId : CollectionUtils.difference(engine.tableIdsOnDisk(), aliveTableIds)) {
                engine.destroyMvTable(staleTableId);
                LOG.info("Destroyed table MV storage for table {} in storage engine '{}'", staleTableId, engine.name());
            }
        }
    }

    /**
     * Destroys TX state storages whose id is reported by the shared TX state storage as present on disk but is not in the
     * given set.
     *
     * @param aliveTableIds Ids of tables whose storages must be kept.
     */
    private void destroyTxStateStoragesForTablesNotIn(Set<Integer> aliveTableIds) {
        // NOTE(review): the ids come from zoneIdsOnDisk() but are compared with table ids; the visible caller invokes this
        // method only when colocation is disabled - confirm that the ids are table ids in that mode.
        for (int staleTableId : CollectionUtils.difference(this.sharedTxStateStorage.zoneIdsOnDisk(), aliveTableIds)) {
            this.sharedTxStateStorage.destroyStorage(staleTableId);
            LOG.info("Destroyed table TX state storage for table {}", staleTableId);
        }
    }

    /**
     * Destroys replication protocol storages of all on-disk table partitions whose table id is not in the given set.
     *
     * <p>If the replica manager reports that the node is stopping while the on-disk partition ids are being collected,
     * the method returns without destroying anything.
     *
     * @param aliveTableIds Ids of tables whose storages must be kept.
     */
    private void destroyReplicationProtocolStoragesForTablesNotIn(Set<Integer> aliveTableIds) {
        Set<TablePartitionId> partitionIdsOnDisk;
        try {
            partitionIdsOnDisk = this.replicaMgr.replicationProtocolTablePartitionIdsOnDisk();
        } catch (NodeStoppingException ignored) {
            // The node is stopping: nothing is destroyed by this call.
            return;
        }

        Map<Integer, List<TablePartitionId>> partitionIdsByTableId = partitionIdsOnDisk.stream()
                .collect(Collectors.groupingBy(TablePartitionId::tableId));

        partitionIdsByTableId.forEach((tableId, partitionIds) -> {
            if (!aliveTableIds.contains(tableId)) {
                this.destroyReplicationProtocolStoragesOnRecovery(tableId, partitionIds);
            }
        });
    }

    /**
     * Destroys replication protocol storages of the given partitions of a table and logs the partitions that were destroyed.
     *
     * <p>Destruction stops at the first {@link NodeStoppingException}; the remaining partitions are left untouched. Only the
     * partitions whose storages were actually destroyed are reported in the log, and nothing is logged if there are none.
     *
     * @param tableId Id of the table the partitions belong to (used for logging).
     * @param partitionIds Partitions whose replication protocol storages must be destroyed.
     */
    private void destroyReplicationProtocolStoragesOnRecovery(int tableId, List<TablePartitionId> partitionIds) {
        List<Integer> destroyedPartitionIndexes = new ArrayList<>(partitionIds.size());
        for (TablePartitionId partitionId : partitionIds) {
            try {
                this.replicaMgr.destroyReplicationProtocolStoragesOnStartup(partitionId);
            } catch (NodeStoppingException ignored) {
                // The node is stopping: skip the remaining partitions and do not report them as destroyed.
                break;
            }
            destroyedPartitionIndexes.add(partitionId.partitionId());
        }

        if (!destroyedPartitionIndexes.isEmpty()) {
            LOG.info("Destroyed replication protocol storages for table {} and partitions {}", tableId, destroyedPartitionIndexes);
        }
    }

    /**
     * Returns the streamer flush executor, creating it on first use.
     *
     * <p>The method is synchronized and runs under the busy lock; the executor is a single-thread scheduled executor whose
     * threads are created by an {@code IgniteThreadFactory} with the {@code "streamer-flush-executor"} prefix.
     *
     * @return The streamer flush executor.
     * @throws IgniteException With the {@code NODE_STOPPING_ERR} code if the busy lock cannot be entered.
     */
    private synchronized ScheduledExecutorService streamerFlushExecutor() {
        if (!this.busyLock.enterBusy()) {
            throw new IgniteException(ErrorGroups.Common.NODE_STOPPING_ERR, (Throwable)new NodeStoppingException());
        }
        try {
            ScheduledExecutorService executor = this.streamerFlushExecutor;
            if (executor == null) {
                executor = Executors.newSingleThreadScheduledExecutor(
                        IgniteThreadFactory.create(this.nodeName, "streamer-flush-executor", LOG, ThreadOperation.STORAGE_WRITE));
                this.streamerFlushExecutor = executor;
            }
            return executor;
        } finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Stores the given streamer receiver runner in this manager, replacing any previously stored one.
     *
     * @param runner Runner to store.
     */
    @Override
    public void setStreamerReceiverRunner(StreamerReceiverRunner runner) {
        this.streamerReceiverRunner = runner;
    }

    /**
     * Returns an unmodifiable snapshot of the tables registered for the given zone, taken under the zone's read lock.
     *
     * <p>The calling thread blocks until the read lock is acquired and the snapshot is taken. The read lock is released
     * exactly once, by the same callback that takes the snapshot, regardless of whether the copy succeeds or the caller
     * stops waiting (for example because of an interruption).
     *
     * @param zoneId Zone id.
     * @return Unmodifiable copy of the zone's table set; empty if the zone has no tables registered.
     * @throws IgniteInternalException If the lock could not be acquired, the snapshot could not be taken, or the wait was
     *     interrupted (the interrupt flag is restored in the latter case).
     */
    public Set<TableViewInternal> zoneTables(int zoneId) throws IgniteInternalException {
        NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
        CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock();
        try {
            return readLockAcquisitionFuture.thenApply(stamp -> {
                try {
                    Set<TableViewInternal> snapshot = Set.copyOf(this.zoneTablesRawSet(zoneId));
                    return snapshot;
                } finally {
                    // Single release point: runs whenever the lock was acquired, even if the waiting thread gave up.
                    zoneLock.unlockRead(stamp);
                }
            }).get();
        } catch (Throwable t) {
            // No unlock here: if the lock gets acquired, the callback above releases it; a second release of the same
            // stamp would be a double unlock.
            if (t instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IgniteInternalException(ErrorGroups.Common.INTERNAL_ERR, "Failed to acquire a read lock for zone [zoneId=" + zoneId + "]", t);
        }
    }

    /**
     * Returns the set stored in {@code tablesPerZone} for the given zone as is: no copy is made and no zone lock is taken here.
     *
     * @param zoneId Zone id.
     * @return The stored set of tables, or an empty immutable set if there is no entry for the zone.
     */
    private Set<TableViewInternal> zoneTablesRawSet(int zoneId) {
        return this.tablesPerZone.getOrDefault(zoneId, Set.of());
    }

    /**
     * Registers the table in the set of tables of the given zone, under the zone's write lock.
     *
     * <p>The calling thread blocks until the write lock is acquired and the table is added. The write lock is released
     * exactly once, by the same callback that performs the addition, regardless of whether the addition succeeds or the
     * caller stops waiting (for example because of an interruption).
     *
     * @param zoneId Zone id.
     * @param table Table to add to the zone's table set.
     * @throws IgniteInternalException If the lock could not be acquired, the addition failed, or the wait was interrupted
     *     (the interrupt flag is restored in the latter case).
     */
    private void addTableToZone(int zoneId, TableViewInternal table) throws IgniteInternalException {
        NaiveAsyncReadWriteLock zoneLock = this.tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
        CompletableFuture<Long> writeLockAcquisitionFuture = zoneLock.writeLock();
        try {
            writeLockAcquisitionFuture.thenAccept(stamp -> {
                try {
                    this.tablesPerZone.compute(zoneId, (id, tables) -> {
                        if (tables == null) {
                            tables = new HashSet<TableViewInternal>();
                        }
                        tables.add(table);
                        return tables;
                    });
                } finally {
                    // Single release point: runs whenever the lock was acquired, even if the waiting thread gave up.
                    zoneLock.unlockWrite(stamp);
                }
            }).get();
        } catch (Throwable t) {
            // No unlock here: if the lock gets acquired, the callback above releases it; a second release of the same
            // stamp would be a double unlock.
            if (t instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IgniteInternalException(ErrorGroups.Common.INTERNAL_ERR, "Failed to acquire a write lock for zone [zoneId=" + zoneId + "]", t);
        }
    }

    /**
     * Returns the secondary replication manager held by this manager. Marked as test-only.
     *
     * @return The secondary replication manager.
     */
    @TestOnly
    public SecondaryReplicationManager secondaryReplicationManager() {
        return this.secondaryReplicationManager;
    }

    /**
     * Creates, registers and enables metric sources for a table.
     *
     * <p>If a storage engine is found for the table's storage profile, a storage engine metric source is created, handed to
     * the engine via {@code addTableMetrics} and then registered and enabled. After that the table metric source itself is
     * registered and enabled. Registration failures are logged as warnings and do not fail the call.
     *
     * @param tableDescriptor Storage descriptor of the table; supplies the storage profile and the table id.
     * @param tableName Qualified name of the table.
     * @return The created table metric source, returned even if its registration failed.
     */
    private TableMetricSource createAndRegisterMetricsSource(StorageTableDescriptor tableDescriptor, QualifiedName tableName) {
        StorageEngine storageEngine = this.dataStorageMgr.engineByStorageProfile(tableDescriptor.getStorageProfile());
        if (storageEngine != null) {
            StorageEngineTablesMetricSource engineSource = new StorageEngineTablesMetricSource(storageEngine.name(), tableName);
            storageEngine.addTableMetrics(tableDescriptor, engineSource);
            try {
                this.metricManager.registerSource(engineSource);
                this.metricManager.enable(engineSource);
            } catch (Exception e) {
                // Best effort: a failed registration of engine metrics must not prevent the table from starting.
                LOG.warn(
                        "Failed to register storage engine metrics source for table [id={}, name={}].",
                        e,
                        tableDescriptor.getId(),
                        tableName);
            }
        }

        TableMetricSource tableSource = new TableMetricSource(tableName);
        try {
            this.metricManager.registerSource(tableSource);
            this.metricManager.enable(tableSource);
        } catch (Exception e) {
            // Best effort as well: the source is still returned to the caller.
            LOG.warn("Failed to register metrics source for table [id={}, name={}].", e, tableDescriptor.getId(), tableName);
        }
        return tableSource;
    }

    /**
     * Unregisters the metric sources of the given table: the table metric source and, if a storage engine is found for the
     * table's storage profile, the storage engine metric source. Failures are logged as warnings and do not fail the call.
     *
     * @param table Table whose metric sources must be unregistered; {@code null} makes the call a no-op.
     */
    private void unregisterMetricsSource(TableViewInternal table) {
        if (table == null) {
            return;
        }

        QualifiedName tableName = table.qualifiedName();
        try {
            this.metricManager.unregisterSource(TableMetricSource.sourceName(tableName));
        } catch (Exception e) {
            LOG.warn("Failed to unregister metrics source for table [id={}, name={}].", e, table.tableId(), tableName);
        }

        StorageEngine storageEngine = this.dataStorageMgr.engineByStorageProfile(
                table.internalTable().storage().getTableDescriptor().getStorageProfile());
        if (storageEngine == null) {
            // No engine for the storage profile: there is no engine metric source to unregister.
            return;
        }

        try {
            this.metricManager.unregisterSource(StorageEngineTablesMetricSource.sourceName(storageEngine.name(), tableName));
        } catch (Exception e) {
            LOG.warn("Failed to unregister storage engine metrics source for table [id={}, name={}].", e, table.tableId(), tableName);
        }
    }

    /**
     * Internal exception carrying the {@code INTERNAL_ERR} code and a "Table is closed" message that includes the table id.
     */
    private static class TableClosedException
    extends IgniteInternalException {
        private static final long serialVersionUID = 1L;

        /**
         * Creates the exception.
         *
         * @param tableId Id of the table, included in the message.
         * @param cause Optional cause; may be {@code null}.
         */
        private TableClosedException(int tableId, @Nullable Throwable cause) {
            super(ErrorGroups.Common.INTERNAL_ERR, "Table is closed [tableId=" + tableId + "]", cause);
        }
    }

    /**
     * Immutable pair of a catalog version and a table id describing a table destruction.
     */
    private static class DestroyTableEvent {
        /** Catalog version associated with the destruction. */
        final int catalogVersion;
        /** Id of the table to destroy. */
        final int tableId;

        /**
         * Creates the event.
         *
         * @param catalogVersion Catalog version associated with the destruction.
         * @param tableId Id of the table to destroy.
         */
        DestroyTableEvent(int catalogVersion, int tableId) {
            this.catalogVersion = catalogVersion;
            this.tableId = tableId;
        }

        /** Returns the catalog version associated with the destruction. */
        public int catalogVersion() {
            return this.catalogVersion;
        }

        /** Returns the id of the table to destroy. */
        public int tableId() {
            return this.tableId;
        }
    }
}

