/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.distributionzones;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.catalog.CatalogManager;
import org.apache.ignite3.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite3.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite3.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite3.internal.catalog.events.AlterZoneEventParameters;
import org.apache.ignite3.internal.catalog.events.CatalogEvent;
import org.apache.ignite3.internal.catalog.events.DropZoneEventParameters;
import org.apache.ignite3.internal.causality.RevisionListenerRegistry;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite3.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite3.internal.configuration.utils.SystemDistributedConfigurationPropertyHolder;
import org.apache.ignite3.internal.distributionzones.DataNodesManager;
import org.apache.ignite3.internal.distributionzones.DistributionZonesUtil;
import org.apache.ignite3.internal.distributionzones.LogicalTopologySetSerializer;
import org.apache.ignite3.internal.distributionzones.NodeWithAttributes;
import org.apache.ignite3.internal.distributionzones.ZoneMetricSource;
import org.apache.ignite3.internal.distributionzones.events.HaZoneTopologyUpdateEvent;
import org.apache.ignite3.internal.distributionzones.events.HaZoneTopologyUpdateEventParams;
import org.apache.ignite3.internal.distributionzones.exception.DistributionZoneNotFoundException;
import org.apache.ignite3.internal.distributionzones.rebalance.DistributionZoneRebalanceEngine;
import org.apache.ignite3.internal.distributionzones.utils.CatalogAlterZoneEventListener;
import org.apache.ignite3.internal.event.AbstractEventProducer;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureManager;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.failure.handlers.NoOpFailureHandler;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.lowwatermark.LowWatermark;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.manager.IgniteComponent;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.EntryEvent;
import org.apache.ignite3.internal.metastorage.MetaStorageManager;
import org.apache.ignite3.internal.metastorage.Revisions;
import org.apache.ignite3.internal.metastorage.WatchListener;
import org.apache.ignite3.internal.metastorage.dsl.Condition;
import org.apache.ignite3.internal.metastorage.dsl.Conditions;
import org.apache.ignite3.internal.metastorage.dsl.Iif;
import org.apache.ignite3.internal.metastorage.dsl.Operation;
import org.apache.ignite3.internal.metastorage.dsl.Operations;
import org.apache.ignite3.internal.metastorage.dsl.StatementResult;
import org.apache.ignite3.internal.metastorage.dsl.Statements;
import org.apache.ignite3.internal.metastorage.dsl.Update;
import org.apache.ignite3.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite3.internal.metrics.MetricManager;
import org.apache.ignite3.internal.util.ByteUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteBusyLock;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.lang.ErrorGroups;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class DistributionZoneManager
extends AbstractEventProducer<HaZoneTopologyUpdateEvent, HaZoneTopologyUpdateEventParams>
implements IgniteComponent {
    /** Logger of this class. */
    private static final IgniteLogger LOG = Loggers.forClass(DistributionZoneManager.class);
    /** Meta storage manager used for local reads, conditional invokes and the logical topology watch. */
    private final MetaStorageManager metaStorageManager;
    /** Busy lock entered by the operations of this manager and blocked in {@code stopAsync}. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    /** Flipped once by {@code stopAsync} so that the stop sequence is executed a single time. */
    private final AtomicBoolean stopGuard = new AtomicBoolean();
    /** Logical topology service; the topology listener is added in {@code startAsync} and removed in {@code stopAsync}. */
    private final LogicalTopologyService logicalTopologyService;
    /** Receives failures of meta storage updates that are not related to node stopping. */
    private final FailureProcessor failureProcessor;
    /** Delegate that handles data nodes queries and zone/topology change notifications. */
    private final DataNodesManager dataNodesManager;
    /** Listener that forwards every logical topology change to {@code updateLogicalTopologyInMetaStorage}. */
    private final LogicalTopologyEventListener topologyEventListener = new DistributionZoneManagerLogicalTopologyEventListener();
    /** Logical topology sets keyed by meta storage revision; {@code logicalTopology(long)} reads the floor entry. */
    private final ConcurrentSkipListMap<Long, Set<NodeWithAttributes>> logicalTopologyByRevision = new ConcurrentSkipListMap();
    /** Watch listener registered on the logical topology key prefix in {@code startAsync}. */
    private final WatchListener topologyWatchListener;
    /** Rebalance engine started at the end of {@code startAsync} and stopped in {@code stopAsync}. */
    private final DistributionZoneRebalanceEngine rebalanceEngine;
    /** Catalog manager used to resolve zones and catalog versions. */
    private final CatalogManager catalogManager;
    /** Holder of the {@code partitionDistributionResetTimeout} property, created with {@code onUpdatePartitionDistributionResetBusy} as its callback. */
    private final SystemDistributedConfigurationPropertyHolder<Integer> partitionDistributionResetTimeoutConfiguration;
    /** Metric manager in which zone metric sources are registered and unregistered. */
    private final MetricManager metricManager;
    /** Clock service that supplies the current timestamp for {@code currentDataNodes}. */
    private final ClockService clockService;
    /** Zone metric sources created by this manager, keyed by zone id. */
    private final Map<Integer, ZoneMetricSource> zoneMetricSources = new ConcurrentHashMap<Integer, ZoneMetricSource>();
    /** Node name received in the constructor; passed to every created {@link ZoneMetricSource}. */
    private final String localNodeName;
    /**
     * Closure that fires a local {@link HaZoneTopologyUpdateEvent#TOPOLOGY_REDUCED} event for a zone.
     * Zones whose consistency mode is not {@link ConsistencyMode#HIGH_AVAILABILITY} are ignored.
     * A failure of the event processing is only logged.
     */
    private final PartitionResetClosure partitionResetClosure = (resetRevision, zone) -> {
        boolean highAvailability = zone.consistencyMode() == ConsistencyMode.HIGH_AVAILABILITY;
        if (highAvailability) {
            HaZoneTopologyUpdateEventParams eventParams = new HaZoneTopologyUpdateEventParams(zone.id(), resetRevision);
            this.fireEvent(HaZoneTopologyUpdateEvent.TOPOLOGY_REDUCED, eventParams).exceptionally(failure -> {
                LOG.error("Error during the local " + HaZoneTopologyUpdateEvent.TOPOLOGY_REDUCED.name() + " event processing", (Throwable)failure);
                return null;
            });
        }
    };
    /**
     * Optional extra node filter installed through {@code setAdditionalNodeFilter}; when non-null it is
     * applied to the filtered data nodes of a zone being created (see {@code onCreateZone}).
     */
    @TestOnly
    @Nullable
    private Predicate<NodeWithAttributes> additionalNodeFilter = null;

    /**
     * Test-only constructor. Delegates to the main constructor, supplying a {@link FailureManager}
     * built on a {@link NoOpFailureHandler} as the failure processor.
     */
    @TestOnly
    public DistributionZoneManager(String nodeName, Supplier<UUID> nodeIdSupplier, RevisionListenerRegistry registry, MetaStorageManager metaStorageManager, LogicalTopologyService logicalTopologyService, CatalogManager catalogManager, SystemDistributedConfiguration systemDistributedConfiguration, ClockService clockService, MetricManager metricManager, LowWatermark lowWatermark) {
        this(nodeName, nodeIdSupplier, registry, metaStorageManager, logicalTopologyService, new FailureManager(new NoOpFailureHandler()), catalogManager, systemDistributedConfiguration, clockService, metricManager, lowWatermark);
    }

    /**
     * Creates the manager and wires its collaborators: the meta storage topology watch listener, the
     * rebalance engine, the holder of the {@code partitionDistributionResetTimeout} property and the
     * data nodes manager.
     *
     * <p>The {@code registry} argument is not used by this constructor body.
     *
     * @param nodeName Local node name; also passed to the data nodes manager.
     * @param nodeIdSupplier Supplier of the node id, passed to the data nodes manager.
     * @param registry Revision listener registry (unused here).
     * @param metaStorageManager Meta storage manager.
     * @param logicalTopologyService Logical topology service.
     * @param failureProcessor Failure processor.
     * @param catalogManager Catalog manager.
     * @param systemDistributedConfiguration Configuration the reset timeout property is read from.
     * @param clockService Clock service.
     * @param metricManager Metric manager.
     * @param lowWatermark Low watermark, passed to the data nodes manager.
     */
    public DistributionZoneManager(String nodeName, Supplier<UUID> nodeIdSupplier, RevisionListenerRegistry registry, MetaStorageManager metaStorageManager, LogicalTopologyService logicalTopologyService, FailureProcessor failureProcessor, CatalogManager catalogManager, SystemDistributedConfiguration systemDistributedConfiguration, ClockService clockService, MetricManager metricManager, LowWatermark lowWatermark) {
        this.metaStorageManager = metaStorageManager;
        this.logicalTopologyService = logicalTopologyService;
        this.failureProcessor = failureProcessor;
        this.catalogManager = catalogManager;
        this.localNodeName = nodeName;
        this.clockService = clockService;
        this.topologyWatchListener = this.createMetastorageTopologyListener();
        this.rebalanceEngine = new DistributionZoneRebalanceEngine(this.busyLock, metaStorageManager, this, catalogManager);
        // Property name "partitionDistributionResetTimeout", default value 0, parsed with Integer::parseInt.
        this.partitionDistributionResetTimeoutConfiguration = new SystemDistributedConfigurationPropertyHolder<Integer>(systemDistributedConfiguration, this::onUpdatePartitionDistributionResetBusy, "partitionDistributionResetTimeout", 0, Integer::parseInt);
        // Must follow the holder assignment above: the holder's currentValue method reference is captured here.
        this.dataNodesManager = new DataNodesManager(nodeName, nodeIdSupplier, this.busyLock, metaStorageManager, catalogManager, clockService, failureProcessor, this.partitionResetClosure, this.partitionDistributionResetTimeoutConfiguration::currentValue, this::logicalTopology, lowWatermark);
        this.metricManager = metricManager;
    }

    /**
     * Starts the manager under the busy lock: initializes the reset timeout property holder, registers
     * catalog listeners, the logical topology listener and the meta storage prefix watch, restores
     * state for the meta storage recovery revision and registers zone metric sources.
     *
     * @param componentContext Context whose executor runs the rebalance engine start.
     * @return Future completed after the logical topology restore, the data nodes manager start and
     *      then the rebalance engine start have finished.
     */
    @Override
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        return IgniteUtils.inBusyLockAsync(busyLock, () -> {
            partitionDistributionResetTimeoutConfiguration.init();
            registerCatalogEventListenersOnStartManagerBusy();
            logicalTopologyService.addEventListener(topologyEventListener);
            metaStorageManager.registerPrefixWatch(DistributionZonesUtil.zonesLogicalTopologyPrefix(), topologyWatchListener);

            CompletableFuture<Revisions> recoveryFuture = metaStorageManager.recoveryFinishedFuture();
            assert (recoveryFuture.isDone());
            long revision = recoveryFuture.join().revision();

            restoreGlobalStateFromLocalMetaStorage(revision);
            registerMetricSourcesOnStart();

            CompletableFuture<Void> topologyRestored = restoreLogicalTopologyChangeEvent(revision);
            CompletableFuture<?> dataNodesStarted = dataNodesManager.startAsync(currentZones(), revision);
            return CompletableFuture.allOf(topologyRestored, dataNodesStarted)
                    .thenComposeAsync(notUsed -> rebalanceEngine.startAsync(), (Executor)componentContext.executor());
        });
    }

    /**
     * Stops the manager. Only the first call does the work: it blocks the busy lock, stops the data
     * nodes manager and the rebalance engine, removes the logical topology listener and unregisters
     * the meta storage watch. Later calls are no-ops.
     *
     * @param componentContext Component context (not used).
     * @return Already completed future.
     */
    @Override
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        boolean firstStop = stopGuard.compareAndSet(false, true);
        if (firstStop) {
            busyLock.block();
            dataNodesManager.stop();
            rebalanceEngine.stop();
            logicalTopologyService.removeEventListener(topologyEventListener);
            metaStorageManager.unregisterWatch(topologyWatchListener);
        }
        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Counts the topology nodes known to the data nodes manager that pass the given filter and
     * storage profiles.
     *
     * @param dataNodeFilter Node filter expression.
     * @param storageProfiles Storage profiles.
     * @return Number of nodes left after filtering.
     */
    public int estimatedDataNodesCount(String dataNodeFilter, List<String> storageProfiles) {
        Collection<?> matchingNodes = DistributionZonesUtil.filterDataNodes(dataNodesManager.topologyNodes(), dataNodeFilter, storageProfiles);
        return matchingNodes.size();
    }

    /**
     * Returns the data nodes of a zone for the current clock timestamp and the catalog version active
     * at that timestamp.
     *
     * @param zoneId Zone id.
     * @return Future with the data node names.
     */
    public CompletableFuture<Set<String>> currentDataNodes(int zoneId) {
        HybridTimestamp now = clockService.current();
        return dataNodes(now, catalogManager.activeCatalogVersion(now.longValue()), zoneId);
    }

    /**
     * Returns the data nodes of a zone, looked up by name in the catalog version active at the current
     * clock timestamp.
     *
     * @param zoneName Zone name, must not be {@code null}.
     * @return Future with the data node names.
     * @throws NullPointerException If {@code zoneName} is {@code null}.
     * @throws DistributionZoneNotFoundException If the active catalog has no zone with this name.
     */
    public CompletableFuture<Set<String>> currentDataNodes(String zoneName) {
        Objects.requireNonNull(zoneName, "Zone name is required.");
        HybridTimestamp now = clockService.current();
        int activeVersion = catalogManager.activeCatalogVersion(now.longValue());
        CatalogZoneDescriptor zone = catalogManager.catalog(activeVersion).zone(zoneName);
        if (zone != null) {
            return dataNodes(now, activeVersion, zone.id());
        }
        throw new DistributionZoneNotFoundException(zoneName);
    }

    /**
     * Returns the data nodes of a zone for the given timestamp and catalog version.
     *
     * <p>When {@code timestamp} equals {@link CatalogManager#INITIAL_TIMESTAMP}, it is replaced by the
     * {@code time()} of the catalog with version {@code catalogVersion} before the request is passed
     * to the data nodes manager.
     *
     * @param timestamp Timestamp to get the data nodes for, must not be {@code null}.
     * @param catalogVersion Catalog version, must not be negative.
     * @param zoneId Zone id, must not be negative.
     * @return Future with the data node names.
     * @throws IllegalArgumentException If {@code catalogVersion} or {@code zoneId} is negative.
     * @throws NullPointerException If {@code timestamp} is {@code null}.
     */
    public CompletableFuture<Set<String>> dataNodes(HybridTimestamp timestamp, int catalogVersion, int zoneId) {
        if (catalogVersion < 0) {
            // The message used to end with a stray double quote instead of the closing bracket.
            throw new IllegalArgumentException("catalogVersion must be greater or equal to zero [catalogVersion=" + catalogVersion + "]");
        }
        if (zoneId < 0) {
            throw new IllegalArgumentException("zoneId cannot be a negative number [zoneId=" + zoneId + "]");
        }
        // Checked after the numeric arguments so that the precedence of exceptions stays the same.
        Objects.requireNonNull(timestamp, "timestamp");
        HybridTimestamp effectiveTimestamp = timestamp;
        if (timestamp.equals(CatalogManager.INITIAL_TIMESTAMP)) {
            effectiveTimestamp = HybridTimestamp.hybridTimestamp(this.catalogManager.catalog(catalogVersion).time());
        }
        return this.dataNodesManager.dataNodes(zoneId, effectiveTimestamp, catalogVersion);
    }

    /**
     * Triggers data nodes recalculation for the zones of the latest catalog selected by
     * {@code DistributionZonesUtil.filterZonesForOperations} for the given names.
     *
     * @param zoneNames Names of the zones to recalculate.
     * @return Future completed when the recalculation of every selected zone has completed.
     * @throws DistributionZoneNotFoundException Declared by the signature; not thrown directly by this body.
     */
    public CompletableFuture<Void> recalculateDataNodes(Set<String> zoneNames) throws DistributionZoneNotFoundException {
        Collection<CatalogZoneDescriptor> zones = catalogManager.latestCatalog().zones();
        List<CompletableFuture<Void>> recalculations = new ArrayList<>();
        for (CatalogObjectDescriptor zone : DistributionZonesUtil.filterZonesForOperations(zoneNames, zones)) {
            recalculations.add(recalculateDataNodes(zone.name()));
        }
        return CompletableFuture.allOf(recalculations.toArray(CompletableFuture[]::new));
    }

    /**
     * Triggers data nodes recalculation for a single zone by delegating to the data nodes manager.
     * The result of the delegate is discarded; only completion is propagated.
     *
     * @param zoneName Zone name.
     * @return Future completed when the delegate's future completes.
     * @throws DistributionZoneNotFoundException Declared by the signature; not thrown directly by this body.
     */
    public CompletableFuture<Void> recalculateDataNodes(String zoneName) throws DistributionZoneNotFoundException {
        return this.dataNodesManager.recalculateDataNodes(zoneName).thenAccept(v -> {});
    }

    /**
     * Handles a change of the scale up auto adjust value: resolves the timestamp of the event's
     * causality token locally and notifies the data nodes manager about the auto adjust alteration.
     *
     * @param parameters Alter zone event parameters.
     * @return Future returned by the data nodes manager.
     */
    private CompletableFuture<Void> onUpdateScaleUpBusy(AlterZoneEventParameters parameters) {
        long revision = parameters.causalityToken();
        HybridTimestamp alterationTime = metaStorageManager.timestampByRevisionLocally(revision);
        CatalogZoneDescriptor alteredZone = parameters.zoneDescriptor();
        return dataNodesManager.onAutoAdjustAlteration(alteredZone, alterationTime);
    }

    /**
     * Callback for a change of the partition distribution reset timeout. Does nothing when the meta
     * storage recovery revision is greater than or equal to {@code causalityToken}. Otherwise notifies
     * the data nodes manager for every current zone with {@link ConsistencyMode#HIGH_AVAILABILITY},
     * passing a task that runs the partition reset closure for that zone.
     *
     * @param partitionDistributionResetTimeoutSeconds New timeout value.
     * @param causalityToken Revision of the configuration change.
     */
    private void onUpdatePartitionDistributionResetBusy(int partitionDistributionResetTimeoutSeconds, long causalityToken) {
        CompletableFuture<Revisions> recovery = metaStorageManager.recoveryFinishedFuture();
        assert (recovery.isDone());
        boolean notNewerThanRecovery = recovery.join().revision() >= causalityToken;
        if (notNewerThanRecovery) {
            return;
        }
        for (CatalogZoneDescriptor zone : currentZones()) {
            int zoneId = zone.id();
            if (zone.consistencyMode() == ConsistencyMode.HIGH_AVAILABILITY) {
                dataNodesManager.onUpdatePartitionDistributionReset(zoneId, partitionDistributionResetTimeoutSeconds, () -> partitionResetClosure.run(causalityToken, zone));
            }
        }
    }

    /**
     * Handles a change of the scale down auto adjust value: resolves the timestamp of the event's
     * causality token locally and notifies the data nodes manager about the auto adjust alteration.
     *
     * @param parameters Alter zone event parameters.
     * @return Future returned by the data nodes manager.
     */
    private CompletableFuture<Void> onUpdateScaleDownBusy(AlterZoneEventParameters parameters) {
        long revision = parameters.causalityToken();
        HybridTimestamp alterationTime = metaStorageManager.timestampByRevisionLocally(revision);
        CatalogZoneDescriptor alteredZone = parameters.zoneDescriptor();
        return dataNodesManager.onAutoAdjustAlteration(alteredZone, alterationTime);
    }

    /**
     * Handles a zone filter change. Reads the logical topology entry locally at the event's causality
     * token; when the entry or its value is absent nothing is done, otherwise the data nodes manager
     * is notified with the deserialized topology and the timestamp of that revision.
     *
     * @param parameters Alter zone event parameters.
     * @return Future returned by the data nodes manager, or a completed future when there is no topology.
     */
    private CompletableFuture<Void> onUpdateFilterBusy(AlterZoneEventParameters parameters) {
        long revision = parameters.causalityToken();
        HybridTimestamp changeTime = metaStorageManager.timestampByRevisionLocally(revision);
        Entry storedTopology = metaStorageManager.getLocally(DistributionZonesUtil.zonesLogicalTopologyKey(), revision);
        if (storedTopology == null || storedTopology.value() == null) {
            return CompletableFutures.nullCompletedFuture();
        }
        Set<NodeWithAttributes> topologyNodes = DistributionZonesUtil.deserializeLogicalTopologySet(storedTopology.value());
        return dataNodesManager.onZoneFilterChange(parameters.zoneDescriptor(), changeTime, topologyNodes);
    }

    /**
     * Handles zone creation: filters the logical topology known for {@code causalityToken} by the
     * zone (and by the additional test filter, if one is set), reports the result to the data nodes
     * manager and afterwards registers the zone metric source.
     *
     * @param zone Descriptor of the created zone.
     * @param causalityToken Revision of the zone creation.
     * @return Future completed after the metric source registration has run.
     */
    private CompletableFuture<Void> onCreateZone(CatalogZoneDescriptor zone, long causalityToken) {
        HybridTimestamp creationTime = metaStorageManager.timestampByRevisionLocally(causalityToken);
        Set<NodeWithAttributes> eligibleNodes = DistributionZonesUtil.filterDataNodes(logicalTopology(causalityToken), zone)
                .stream()
                .filter(node -> additionalNodeFilter == null || additionalNodeFilter.test((NodeWithAttributes)node))
                .collect(Collectors.toSet());
        CompletableFuture<?> zoneCreated = dataNodesManager.onZoneCreate(zone.id(), creationTime, eligibleNodes);
        return zoneCreated.thenRun(() -> registerMetricSource(zone));
    }

    /**
     * Writes a new logical topology snapshot to the meta storage with a conditional update.
     *
     * <p>For topology version 1 the update is applied when the topology version key does not exist or
     * the stored cluster id differs from the snapshot's cluster id; for other versions it is applied
     * when the stored topology version is less than the new one. The outcome is logged; a failure not
     * related to node stopping is passed to the failure processor.
     *
     * @param newTopology New logical topology snapshot.
     * @throws IgniteInternalException If the busy lock cannot be entered because the node is stopping.
     */
    private void updateLogicalTopologyInMetaStorage(LogicalTopologySnapshot newTopology) {
        if (!busyLock.enterBusy()) {
            throw new IgniteInternalException(ErrorGroups.Common.NODE_STOPPING_ERR, (Throwable)new NodeStoppingException());
        }
        try {
            Set<LogicalNode> topologyNodes = newTopology.nodes();
            Condition updateCondition;
            Update topologyUpdate;
            if (newTopology.version() == 1L) {
                updateCondition = Conditions.notExists(DistributionZonesUtil.zonesLogicalTopologyVersionKey()).or(Conditions.value(DistributionZonesUtil.zonesLogicalTopologyClusterIdKey()).ne(ByteUtils.uuidToBytes(newTopology.clusterId())));
                topologyUpdate = DistributionZonesUtil.updateLogicalTopologyAndVersionAndClusterId(newTopology);
            } else {
                updateCondition = Conditions.value(DistributionZonesUtil.zonesLogicalTopologyVersionKey()).lt(ByteUtils.longToBytesKeepingOrder(newTopology.version()));
                topologyUpdate = DistributionZonesUtil.updateLogicalTopologyAndVersion(newTopology);
            }
            Iif conditionalUpdate = Statements.iif(updateCondition, topologyUpdate, Operations.ops(new Operation[0]).yield(false));
            metaStorageManager.invoke(conditionalUpdate).whenComplete((result, error) -> {
                if (error != null) {
                    if (DistributionZoneManager.relatesToNodeStopping(error)) {
                        return;
                    }
                    String errorMessage = String.format("Failed to update distribution zones' logical topology and version keys [topology = %s, version = %s]", Arrays.toString(topologyNodes.toArray()), newTopology.version());
                    failureProcessor.process(new FailureContext((Throwable)error, errorMessage));
                    return;
                }
                if (result.getAsBoolean()) {
                    LOG.info("Distribution zones' logical topology and version keys were updated [topology = {}, version = {}]", Arrays.toString(topologyNodes.toArray()), newTopology.version());
                } else {
                    LOG.debug("Failed to update distribution zones' logical topology and version keys due to concurrent update [topology = {}, version = {}]", Arrays.toString(topologyNodes.toArray()), newTopology.version());
                }
            });
        }
        finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Tells whether the given throwable has a {@link NodeStoppingException} among its causes, as
     * reported by {@code ExceptionUtils.hasCause}.
     *
     * @param e Throwable to inspect.
     * @return {@code true} if a {@link NodeStoppingException} cause is found.
     */
    private static boolean relatesToNodeStopping(Throwable e) {
        return ExceptionUtils.hasCause(e, NodeStoppingException.class);
    }

    /**
     * Restores the last handled logical topology from the local meta storage. When a value is stored
     * at {@code recoveryRevision}, it is deserialized and remembered under that revision.
     *
     * @param recoveryRevision Meta storage recovery revision.
     */
    private void restoreGlobalStateFromLocalMetaStorage(long recoveryRevision) {
        Entry lastHandled = metaStorageManager.getLocally(DistributionZonesUtil.zonesLastHandledTopology(), recoveryRevision);
        byte[] serializedTopology = lastHandled.value();
        if (serializedTopology == null) {
            return;
        }
        logicalTopologyByRevision.put(recoveryRevision, DistributionZonesUtil.deserializeLogicalTopologySet(serializedTopology));
        assert (logicalTopology(recoveryRevision).equals(DistributionZonesUtil.deserializeLogicalTopologySet(serializedTopology))) : "Initial value of logical topology was changed after initialization from the Meta Storage manager.";
    }

    /**
     * Creates the meta storage watch listener for logical topology updates.
     *
     * <p>The listener runs under the busy lock (and returns a future failed with
     * {@link NodeStoppingException} when the lock cannot be entered). It extracts the new and, if
     * present, the old logical topology from the entry with the logical topology key and passes them
     * to {@code onLogicalTopologyUpdate}. When there is no old value, the new topology is used as the
     * old one as well.
     *
     * @return Watch listener.
     */
    private WatchListener createMetastorageTopologyListener() {
        return watchEvent -> {
            if (!busyLock.enterBusy()) {
                return CompletableFuture.failedFuture(new NodeStoppingException());
            }
            try {
                assert (watchEvent.entryEvents().size() == 2 || watchEvent.entryEvents().size() == 3) : "Expected an event with logical topology, its version and maybe clusterId entries but was events with keys: " + watchEvent.entryEvents().stream().map(DistributionZoneManager::entryKeyAsString).collect(Collectors.toList());
                Set<NodeWithAttributes> topologyAfter = null;
                Set<NodeWithAttributes> topologyBefore = null;
                HybridTimestamp eventTime = watchEvent.timestamp();
                for (EntryEvent entryEvent : watchEvent.entryEvents()) {
                    Entry updated = entryEvent.newEntry();
                    Entry previous = entryEvent.oldEntry();
                    // Only the entry stored under the logical topology key carries the node set.
                    if (Arrays.equals(updated.key(), DistributionZonesUtil.zonesLogicalTopologyKey().bytes())) {
                        byte[] updatedBytes = updated.value();
                        assert (updatedBytes != null) : "New topology is null.";
                        topologyAfter = DistributionZonesUtil.deserializeLogicalTopologySet(updatedBytes);
                        byte[] previousBytes = previous.value();
                        if (previousBytes != null) {
                            topologyBefore = DistributionZonesUtil.deserializeLogicalTopologySet(previousBytes);
                        }
                    }
                }
                assert (topologyAfter != null) : "The event doesn't contain logical topology";
                if (topologyBefore == null) {
                    topologyBefore = topologyAfter;
                }
                return onLogicalTopologyUpdate(topologyAfter, topologyBefore, watchEvent.revision(), eventTime);
            }
            finally {
                busyLock.leaveBusy();
            }
        };
    }

    /**
     * Renders the key of the event's new entry as a UTF-8 string for diagnostics.
     *
     * @param entry Entry event.
     * @return Key as a string, or the literal {@code "null"} when the event has no new entry.
     */
    private static String entryKeyAsString(EntryEvent entry) {
        Entry updated = entry.newEntry();
        if (updated == null) {
            return "null";
        }
        return new String(updated.key(), StandardCharsets.UTF_8);
    }

    /**
     * Processes a logical topology update: remembers the new topology under {@code revision},
     * notifies the data nodes manager for every current zone and saves the recoverable state to the
     * meta storage.
     *
     * @param newLogicalTopology New logical topology.
     * @param oldLogicalTopology Previous logical topology.
     * @param revision Meta storage revision of the update.
     * @param timestamp Timestamp of the update.
     * @return Future completed when all zone notifications and the state save have completed.
     */
    private CompletableFuture<Void> onLogicalTopologyUpdate(Set<NodeWithAttributes> newLogicalTopology, Set<NodeWithAttributes> oldLogicalTopology, long revision, HybridTimestamp timestamp) {
        logicalTopologyByRevision.put(revision, newLogicalTopology);
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (CatalogZoneDescriptor zone : currentZones()) {
            pending.add(dataNodesManager.onTopologyChange(zone, revision, timestamp, newLogicalTopology, oldLogicalTopology));
        }
        pending.add(saveRecoverableStateToMetastorage(revision, newLogicalTopology));
        return CompletableFuture.allOf(pending.toArray(CompletableFuture[]::new));
    }

    /**
     * Returns the zones of the catalog with the latest catalog version.
     *
     * @return Zone descriptors of the latest catalog version.
     */
    private Collection<CatalogZoneDescriptor> currentZones() {
        return catalogManager.catalog(catalogManager.latestCatalogVersion()).zones();
    }

    /**
     * Saves the recoverable state (the handled revision and the last handled logical topology) to the
     * meta storage, guarded by {@code DistributionZonesUtil.conditionForRecoverableStateChanges}.
     * The outcome is logged; a failure not related to node stopping is passed to the failure processor.
     *
     * @param revision Revision of the handled topology update.
     * @param newLogicalTopology Logical topology to store as the last handled one.
     * @return Future completed with {@code null} after the invoke succeeds; it completes exceptionally
     *      if the invoke fails.
     */
    private CompletableFuture<Void> saveRecoverableStateToMetastorage(long revision, Set<NodeWithAttributes> newLogicalTopology) {
        Operation revisionPut = Operations.put(DistributionZonesUtil.zonesRecoverableStateRevision(), ByteUtils.longToBytesKeepingOrder(revision));
        Operation topologyPut = Operations.put(DistributionZonesUtil.zonesLastHandledTopology(), LogicalTopologySetSerializer.serialize(newLogicalTopology));
        Operation[] puts = new Operation[]{revisionPut, topologyPut};
        Condition saveCondition = DistributionZonesUtil.conditionForRecoverableStateChanges(revision);
        Iif conditionalSave = Statements.iif(saveCondition, Operations.ops(puts).yield(true), Operations.ops(new Operation[0]).yield(false));

        CompletableFuture<Boolean> saved = metaStorageManager.invoke(conditionalSave).thenApply(StatementResult::getAsBoolean);
        CompletableFuture<Boolean> logged = saved.whenComplete((updated, error) -> {
            if (error == null) {
                if (updated) {
                    LOG.info("Update recoverable state for distribution zone manager [revision = {}]", revision);
                } else {
                    LOG.debug("Failed to update recoverable states for distribution zone manager [revision = {}]", revision);
                }
                return;
            }
            if (!DistributionZoneManager.relatesToNodeStopping(error)) {
                String errorMessage = String.format("Failed to update recoverable state for distribution zone manager [revision = %s]", revision);
                failureProcessor.process(new FailureContext((Throwable)error, errorMessage));
            }
        });
        return logged.thenCompose(ignored -> CompletableFutures.nullCompletedFuture());
    }

    /**
     * Resolves the timestamp of a meta storage revision locally.
     *
     * @param revision Meta storage revision.
     * @return Timestamp as a long, or {@code -1} when the revision cannot be resolved because of
     *      meta storage compaction (a warning is logged for revisions greater than 1).
     */
    private long timestampByRevision(long revision) {
        try {
            HybridTimestamp resolved = metaStorageManager.timestampByRevisionLocally(revision);
            return resolved.longValue();
        }
        catch (CompactedException e) {
            boolean worthWarning = revision > 1L;
            if (worthWarning) {
                LOG.warn("Unable to retrieve timestamp by revision because of meta storage compaction, [revision={}].", revision);
            }
            return -1L;
        }
    }

    /**
     * Returns the data nodes manager used by this manager. Intended for tests.
     *
     * @return Data nodes manager.
     */
    @TestOnly
    public DataNodesManager dataNodesManager() {
        return this.dataNodesManager;
    }

    /**
     * Installs an additional node filter that is applied to the data nodes of zones being created.
     * Intended for tests.
     *
     * @param filter Filter to apply; the code treats {@code null} as "no additional filtering".
     */
    @TestOnly
    public void setAdditionalNodeFilter(Predicate<NodeWithAttributes> filter) {
        this.additionalNodeFilter = filter;
    }

    /**
     * Returns the logical topology stored under the greatest known revision
     * (queries {@code logicalTopology(long)} with {@link Long#MAX_VALUE}).
     *
     * @return Latest known logical topology, or an empty set when none is stored.
     */
    public Set<NodeWithAttributes> logicalTopology() {
        return this.logicalTopology(Long.MAX_VALUE);
    }

    /**
     * Returns the logical topology stored under the greatest revision that is less than or equal to
     * the given one.
     *
     * @param revision Revision, must not be negative.
     * @return Logical topology for the revision, or an empty set when no such entry exists.
     */
    public Set<NodeWithAttributes> logicalTopology(long revision) {
        assert (revision >= 0L) : revision;
        Map.Entry<Long, Set<NodeWithAttributes>> floor = logicalTopologyByRevision.floorEntry(revision);
        if (floor == null) {
            return Collections.emptySet();
        }
        return floor.getValue();
    }

    /**
     * Registers the catalog listeners of this manager: zone creation is handled by
     * {@code onCreateZone}, zone drop by {@code onDropZoneBusy}, and zone alteration by a
     * {@link ManagerCatalogAlterZoneEventListener}. The create and drop handlers run under the busy
     * lock and complete with {@code false}.
     */
    private void registerCatalogEventListenersOnStartManagerBusy() {
        this.catalogManager.listen(CatalogEvent.ZONE_CREATE, parameters -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> this.onCreateZone(parameters.zoneDescriptor(), parameters.causalityToken()).thenApply(ignored -> false)));
        this.catalogManager.listen(CatalogEvent.ZONE_DROP, parameters -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            this.onDropZoneBusy((DropZoneEventParameters)parameters);
            return CompletableFutures.falseCompletedFuture();
        }));
        this.catalogManager.listen(CatalogEvent.ZONE_ALTER, new ManagerCatalogAlterZoneEventListener());
    }

    /**
     * Registers a fresh metric source for the zone (no existing source is copied).
     *
     * @param zone Zone descriptor.
     */
    private void registerMetricSource(CatalogZoneDescriptor zone) {
        this.registerMetricSource(zone, null);
    }

    /**
     * Creates a metric source for the zone, remembers it by zone id, then registers and enables it in
     * the metric manager. Any exception is logged and not rethrown.
     *
     * @param zone Zone descriptor.
     * @param copyFrom Existing source passed to the {@link ZoneMetricSource} constructor, or
     *      {@code null} to create the source without one.
     */
    private void registerMetricSource(CatalogZoneDescriptor zone, @Nullable ZoneMetricSource copyFrom) {
        try {
            ZoneMetricSource created;
            if (copyFrom != null) {
                created = new ZoneMetricSource(metaStorageManager, localNodeName, zone, copyFrom);
            } else {
                created = new ZoneMetricSource(metaStorageManager, localNodeName, zone);
            }
            zoneMetricSources.put(zone.id(), created);
            metricManager.registerSource(created);
            metricManager.enable(created);
        }
        catch (Exception e) {
            LOG.error("Failed to register zone metric source [zoneName={}, zoneId={}]", e, zone.name(), zone.id());
        }
    }

    /**
     * Removes the metric source remembered for the zone, if any, and unregisters it from the metric
     * manager. An exception thrown while unregistering is logged and not rethrown.
     *
     * @param zoneId Zone id.
     */
    private void unregisterMetricSource(int zoneId) {
        ZoneMetricSource removed = zoneMetricSources.remove(zoneId);
        if (removed != null) {
            try {
                metricManager.unregisterSource(removed);
            }
            catch (Exception e) {
                LOG.error("Failed to unregister zone metric source [zoneName={}, zoneId={}]", e, removed.zoneName(), zoneId);
            }
        }
    }

    /** Registers a metric source for every zone of the latest catalog version. */
    private void registerMetricSourcesOnStart() {
        for (CatalogZoneDescriptor zone : currentZones()) {
            registerMetricSource(zone);
        }
    }

    /**
     * Replays the logical topology update on start if it has not been handled yet. The topology
     * stored at {@code recoveryRevision} is processed again when no recoverable state revision is
     * stored, or when the revision of the topology entry is greater than the stored one.
     *
     * @param recoveryRevision Meta storage recovery revision.
     * @return Future of the replayed update, or a completed future when there is nothing to replay.
     */
    private CompletableFuture<Void> restoreLogicalTopologyChangeEvent(long recoveryRevision) {
        Entry storedTopology = metaStorageManager.getLocally(DistributionZonesUtil.zonesLogicalTopologyKey(), recoveryRevision);
        if (storedTopology.value() == null) {
            return CompletableFutures.nullCompletedFuture();
        }
        Set<NodeWithAttributes> topologyNodes = DistributionZonesUtil.deserializeLogicalTopologySet(storedTopology.value());
        long topologyRevision = storedTopology.revision();
        Entry handledRevisionEntry = metaStorageManager.getLocally(DistributionZonesUtil.zonesRecoverableStateRevision(), recoveryRevision);
        boolean alreadyHandled = handledRevisionEntry.value() != null
                && topologyRevision <= ByteUtils.bytesToLongKeepingOrder(handledRevisionEntry.value());
        if (alreadyHandled) {
            return CompletableFutures.nullCompletedFuture();
        }
        HybridTimestamp recoveryTime = metaStorageManager.timestampByRevisionLocally(recoveryRevision);
        // The same set is passed as both the new and the old topology.
        return onLogicalTopologyUpdate(topologyNodes, topologyNodes, recoveryRevision, recoveryTime);
    }

    /**
     * Handles a zone drop event by unregistering the metric source of the dropped zone.
     *
     * @param parameters Drop zone event parameters.
     */
    private void onDropZoneBusy(DropZoneEventParameters parameters) {
        this.unregisterMetricSource(parameters.zoneId());
    }

    /**
     * Forwards the destruction of a dropped zone to the data nodes manager.
     *
     * @param zoneId Zone id.
     * @param dropZoneCatalogVersion Catalog version passed on to the data nodes manager.
     * @return Future returned by the data nodes manager.
     */
    public CompletableFuture<?> onDropZoneDestroy(int zoneId, int dropZoneCatalogVersion) {
        return this.dataNodesManager.onZoneDestroy(zoneId, dropZoneCatalogVersion);
    }

    /**
     * Logical topology listener of the manager. Every kind of topology event (node joined, node left,
     * topology leap) is handled the same way: the new topology snapshot is written to the meta
     * storage through {@code updateLogicalTopologyInMetaStorage}.
     */
    private class DistributionZoneManagerLogicalTopologyEventListener
    implements LogicalTopologyEventListener {
        private DistributionZoneManagerLogicalTopologyEventListener() {
        }

        /** Writes the snapshot to the meta storage; the joined node itself is not inspected. */
        @Override
        public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot newTopology) {
            DistributionZoneManager.this.updateLogicalTopologyInMetaStorage(newTopology);
        }

        /** Writes the snapshot to the meta storage; the left node itself is not inspected. */
        @Override
        public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) {
            DistributionZoneManager.this.updateLogicalTopologyInMetaStorage(newTopology);
        }

        /** Writes the snapshot to the meta storage. */
        @Override
        public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
            DistributionZoneManager.this.updateLogicalTopologyInMetaStorage(newTopology);
        }
    }

    /**
     * Callback run for a zone together with a revision. In this file it is implemented by a closure
     * that fires a {@code TOPOLOGY_REDUCED} event for high availability zones.
     */
    @FunctionalInterface
    public static interface PartitionResetClosure {
        /**
         * Runs the closure.
         *
         * @param revision Revision passed by the caller.
         * @param zoneDescriptor Descriptor of the zone the closure is run for.
         */
        void run(long revision, CatalogZoneDescriptor zoneDescriptor);
    }

    /**
     * Alter zone listener of the manager. Each callback runs under the manager's busy lock and
     * delegates to the matching handler of the enclosing {@link DistributionZoneManager}.
     */
    private class ManagerCatalogAlterZoneEventListener
    extends CatalogAlterZoneEventListener {
        private ManagerCatalogAlterZoneEventListener() {
            super(DistributionZoneManager.this.catalogManager);
        }

        /** Delegates to {@code onUpdateScaleUpBusy}; the old value is not used. */
        @Override
        protected CompletableFuture<Void> onAutoAdjustScaleUpUpdate(AlterZoneEventParameters parameters, int oldAutoAdjustScaleUp) {
            return IgniteUtils.inBusyLock((IgniteBusyLock)DistributionZoneManager.this.busyLock, () -> DistributionZoneManager.this.onUpdateScaleUpBusy(parameters));
        }

        /** Delegates to {@code onUpdateScaleDownBusy}; the old value is not used. */
        @Override
        protected CompletableFuture<Void> onAutoAdjustScaleDownUpdate(AlterZoneEventParameters parameters, int oldAutoAdjustScaleDown) {
            return IgniteUtils.inBusyLock((IgniteBusyLock)DistributionZoneManager.this.busyLock, () -> DistributionZoneManager.this.onUpdateScaleDownBusy(parameters));
        }

        /** Delegates to {@code onUpdateFilterBusy}; the old filter is not used. */
        @Override
        protected CompletableFuture<Void> onFilterUpdate(AlterZoneEventParameters parameters, String oldFilter) {
            return IgniteUtils.inBusyLock((IgniteBusyLock)DistributionZoneManager.this.busyLock, () -> DistributionZoneManager.this.onUpdateFilterBusy(parameters));
        }

        /**
         * Re-registers the zone metric source after a rename: the source currently stored for the
         * zone id is unregistered and a new one is registered for the updated descriptor, created
         * from the old source.
         */
        @Override
        protected CompletableFuture<Void> onNameUpdate(AlterZoneEventParameters parameters, String oldName) {
            return IgniteUtils.inBusyLock((IgniteBusyLock)DistributionZoneManager.this.busyLock, () -> {
                // Read the old source first: unregisterMetricSource removes it from the map.
                ZoneMetricSource oldSource = DistributionZoneManager.this.zoneMetricSources.get(parameters.zoneDescriptor().id());
                DistributionZoneManager.this.unregisterMetricSource(parameters.zoneDescriptor().id());
                DistributionZoneManager.this.registerMetricSource(parameters.zoneDescriptor(), oldSource);
                return CompletableFutures.nullCompletedFuture();
            });
        }
    }
}

