/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.distributionzones;

import java.lang.invoke.LambdaMetafactory;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.catalog.CatalogManager;
import org.apache.ignite3.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite3.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite3.internal.distributionzones.DataNodesHistory;
import org.apache.ignite3.internal.distributionzones.DataNodesHistoryEntry;
import org.apache.ignite3.internal.distributionzones.DataNodesHistoryMetaStorageOperation;
import org.apache.ignite3.internal.distributionzones.DataNodesMapSerializer;
import org.apache.ignite3.internal.distributionzones.DistributionZoneTimer;
import org.apache.ignite3.internal.distributionzones.DistributionZonesUtil;
import org.apache.ignite3.internal.distributionzones.NodeWithAttributes;
import org.apache.ignite3.internal.distributionzones.exception.DistributionZoneNotFoundException;
import org.apache.ignite3.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.EntryEvent;
import org.apache.ignite3.internal.metastorage.MetaStorageManager;
import org.apache.ignite3.internal.metastorage.WatchEvent;
import org.apache.ignite3.internal.metastorage.WatchListener;
import org.apache.ignite3.internal.metastorage.dsl.CompoundCondition;
import org.apache.ignite3.internal.metastorage.dsl.Condition;
import org.apache.ignite3.internal.metastorage.dsl.Conditions;
import org.apache.ignite3.internal.metastorage.dsl.Iif;
import org.apache.ignite3.internal.metastorage.dsl.Operation;
import org.apache.ignite3.internal.metastorage.dsl.OperationType;
import org.apache.ignite3.internal.metastorage.dsl.Operations;
import org.apache.ignite3.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite3.internal.metastorage.dsl.StatementResult;
import org.apache.ignite3.internal.metastorage.dsl.Statements;
import org.apache.ignite3.internal.metastorage.dsl.Update;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.StripedScheduledThreadPoolExecutor;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.util.Lazy;
import org.apache.ignite3.lang.ErrorGroups;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
import org.jetbrains.annotations.VisibleForTesting;

public class DataNodesManager {
    private static final IgniteLogger LOG = Loggers.forClass(DataNodesManager.class);
    private static final int MAX_ATTEMPTS_ON_RETRY = 100;
    private final MetaStorageManager metaStorageManager;
    private final CatalogManager catalogManager;
    private final ClockService clockService;
    private final FailureProcessor failureProcessor;
    private final IgniteSpinBusyLock busyLock;
    private final Map<Integer, ZoneTimers> zoneTimers = new ConcurrentHashMap<Integer, ZoneTimers>();
    private final StripedScheduledThreadPoolExecutor executor;
    private final Lazy<UUID> localNodeId;
    private final WatchListener scaleUpTimerPrefixListener;
    private final WatchListener scaleDownTimerPrefixListener;
    private final WatchListener dataNodesListener;
    private final Map<Integer, DataNodesHistory> dataNodesHistoryVolatile = new ConcurrentHashMap<Integer, DataNodesHistory>();
    private final BiConsumer<Long, Integer> partitionResetClosure;
    private final IntSupplier partitionDistributionResetTimeoutSupplier;
    private final Supplier<Set<NodeWithAttributes>> latestLogicalTopologyProvider;

    public DataNodesManager(String nodeName, Supplier<UUID> nodeIdSupplier, IgniteSpinBusyLock busyLock, MetaStorageManager metaStorageManager, CatalogManager catalogManager, ClockService clockService, FailureProcessor failureProcessor, BiConsumer<Long, Integer> partitionResetClosure, IntSupplier partitionDistributionResetTimeoutSupplier, Supplier<Set<NodeWithAttributes>> latestLogicalTopologyProvider) {
        this.metaStorageManager = metaStorageManager;
        this.catalogManager = catalogManager;
        this.clockService = clockService;
        this.failureProcessor = failureProcessor;
        this.localNodeId = new Lazy<UUID>(nodeIdSupplier);
        this.partitionResetClosure = partitionResetClosure;
        this.partitionDistributionResetTimeoutSupplier = partitionDistributionResetTimeoutSupplier;
        this.latestLogicalTopologyProvider = latestLogicalTopologyProvider;
        this.busyLock = busyLock;
        this.executor = DistributionZonesUtil.createZoneManagerExecutor(Math.min(Runtime.getRuntime().availableProcessors() * 3, 20), IgniteThreadFactory.create(nodeName, "dst-zones-scheduler", LOG, new ThreadOperation[0]));
        this.scaleUpTimerPrefixListener = this.createScaleUpTimerPrefixListener();
        this.scaleDownTimerPrefixListener = this.createScaleDownTimerPrefixListener();
        this.dataNodesListener = this.createDataNodesListener();
    }

    CompletableFuture<Void> startAsync(Collection<CatalogZoneDescriptor> knownZones, long recoveryRevision) {
        this.metaStorageManager.registerPrefixWatch(DistributionZonesUtil.zoneScaleUpTimerPrefix(), this.scaleUpTimerPrefixListener);
        this.metaStorageManager.registerPrefixWatch(DistributionZonesUtil.zoneScaleDownTimerPrefix(), this.scaleDownTimerPrefixListener);
        this.metaStorageManager.registerPrefixWatch(DistributionZonesUtil.zoneDataNodesHistoryPrefix(), this.dataNodesListener);
        if (knownZones.isEmpty()) {
            return CompletableFutures.nullCompletedFuture();
        }
        HashSet<ByteArray> allKeys = new HashSet<ByteArray>();
        HashMap<Integer, CatalogZoneDescriptor> descriptors = new HashMap<Integer, CatalogZoneDescriptor>();
        for (CatalogZoneDescriptor zone : knownZones) {
            allKeys.add(DistributionZonesUtil.zoneDataNodesHistoryKey(zone.id()));
            allKeys.add(DistributionZonesUtil.zoneScaleUpTimerKey(zone.id()));
            allKeys.add(DistributionZonesUtil.zoneScaleDownTimerKey(zone.id()));
            allKeys.add(DistributionZonesUtil.zonePartitionResetTimerKey(zone.id()));
            allKeys.add(DistributionZonesUtil.zoneDataNodesKey(zone.id()));
            descriptors.put(zone.id(), zone);
        }
        ArrayList legacyInitFutures = new ArrayList();
        return ((CompletableFuture)((CompletableFuture)this.metaStorageManager.getAll(allKeys).thenAccept(entriesMap -> {
            for (CatalogZoneDescriptor zone : descriptors.values()) {
                Entry historyEntry = (Entry)entriesMap.get(DistributionZonesUtil.zoneDataNodesHistoryKey(zone.id()));
                Entry scaleUpEntry = (Entry)entriesMap.get(DistributionZonesUtil.zoneScaleUpTimerKey(zone.id()));
                Entry scaleDownEntry = (Entry)entriesMap.get(DistributionZonesUtil.zoneScaleDownTimerKey(zone.id()));
                Entry partitionResetEntry = (Entry)entriesMap.get(DistributionZonesUtil.zonePartitionResetTimerKey(zone.id()));
                Entry legacyDataNodesEntry = (Entry)entriesMap.get(DistributionZonesUtil.zoneDataNodesKey(zone.id()));
                if (DataNodesManager.missingEntry(historyEntry)) {
                    if (DataNodesManager.missingEntry(legacyDataNodesEntry)) {
                        LOG.warn("Couldn't recover data nodes history for zone [id={}, historyEntry={}].", zone.id(), historyEntry);
                        continue;
                    }
                    legacyInitFutures.add(this.initZoneWithLegacyDataNodes(zone, legacyDataNodesEntry.value(), recoveryRevision));
                    continue;
                }
                if (DataNodesManager.missingEntry(scaleUpEntry) || DataNodesManager.missingEntry(scaleDownEntry) || DataNodesManager.missingEntry(partitionResetEntry)) {
                    throw new AssertionError((Object)IgniteStringFormatter.format("Couldn't recover timers for zone [id={}, name={}, scaleUpEntry={}, scaleDownEntry={}, partitionResetEntry={}", zone.id(), zone.name(), scaleUpEntry, scaleDownEntry, partitionResetEntry));
                }
                DataNodesHistory history = DataNodesHistory.DataNodesHistorySerializer.deserialize(historyEntry.value());
                this.dataNodesHistoryVolatile.put(zone.id(), history);
                DistributionZoneTimer scaleUpTimer = DistributionZoneTimer.DistributionZoneTimerSerializer.deserialize(scaleUpEntry.value());
                DistributionZoneTimer scaleDownTimer = DistributionZoneTimer.DistributionZoneTimerSerializer.deserialize(scaleDownEntry.value());
                this.onScaleUpTimerChange(zone, scaleUpTimer);
                this.onScaleDownTimerChange(zone, scaleDownTimer);
                this.restorePartitionResetTimer(zone.id(), scaleDownTimer, recoveryRevision);
            }
        })).thenCompose(unused -> CompletableFutures.allOf(legacyInitFutures).handle((v, e) -> {
            if (e != null) {
                LOG.warn("Could not recover legacy data nodes for zone.", (Throwable)e);
            }
            return CompletableFutures.nullCompletedFuture();
        }))).thenAccept(unused -> {});
    }

    private static boolean missingEntry(Entry e) {
        return e.empty() || e.tombstone();
    }

    void stop() {
        this.zoneTimers.forEach((k, zt) -> zt.stopAllTimers());
        IgniteUtils.shutdownAndAwaitTermination(this.executor, 10L, TimeUnit.SECONDS);
    }

    CompletableFuture<Void> onTopologyChange(CatalogZoneDescriptor zoneDescriptor, long revision, HybridTimestamp timestamp, Set<NodeWithAttributes> newLogicalTopology, Set<NodeWithAttributes> oldLogicalTopology) {
        int zoneId = zoneDescriptor.id();
        return this.doOperation(zoneDescriptor, List.of(DistributionZonesUtil.zoneDataNodesHistoryKey(zoneId), DistributionZonesUtil.zoneScaleUpTimerKey(zoneId), DistributionZonesUtil.zoneScaleDownTimerKey(zoneId)), dataNodesHistoryContext -> CompletableFuture.completedFuture(this.onTopologyChangeInternal(zoneDescriptor, revision, timestamp, newLogicalTopology, oldLogicalTopology, (DistributionZonesUtil.DataNodesHistoryContext)dataNodesHistoryContext)), false);
    }

    @Nullable
    private DataNodesHistoryMetaStorageOperation onTopologyChangeInternal(CatalogZoneDescriptor zoneDescriptor, long revision, HybridTimestamp timestamp, Set<NodeWithAttributes> newLogicalTopology, Set<NodeWithAttributes> oldLogicalTopology, @Nullable DistributionZonesUtil.DataNodesHistoryContext dataNodesHistoryContext) {
        if (dataNodesHistoryContext == null) {
            return null;
        }
        DataNodesHistory dataNodesHistory = dataNodesHistoryContext.dataNodesHistory();
        if (dataNodesHistory.entryIsPresentAtExactTimestamp(timestamp)) {
            return null;
        }
        int zoneId = zoneDescriptor.id();
        LOG.debug("Topology change detected [zoneId={}, timestamp={}, newTopology={}, oldTopology={}].", zoneId, timestamp, DistributionZonesUtil.nodeNames(newLogicalTopology), DistributionZonesUtil.nodeNames(oldLogicalTopology));
        DistributionZoneTimer scaleUpTimer = dataNodesHistoryContext.scaleUpTimer();
        DistributionZoneTimer scaleDownTimer = dataNodesHistoryContext.scaleDownTimer();
        DataNodesHistoryEntry latestDataNodes = dataNodesHistory.dataNodesForTimestamp(timestamp);
        Set<NodeWithAttributes> addedNodes = newLogicalTopology.stream().filter(node -> !latestDataNodes.dataNodes().contains(node)).collect(Collectors.toSet());
        Set addedNodesComparingToOldTopology = newLogicalTopology.stream().filter(node -> !oldLogicalTopology.contains(node)).collect(Collectors.toSet());
        Set<NodeWithAttributes> removedNodes = latestDataNodes.dataNodes().stream().filter(node -> !newLogicalTopology.contains(node) && !Objects.equals(node.nodeId(), this.localNodeId.get())).filter(node -> !scaleDownTimer.nodes().contains(node)).collect(Collectors.toSet());
        int partitionResetDelay = this.partitionDistributionResetTimeoutSupplier.getAsInt();
        if (!removedNodes.isEmpty() && zoneDescriptor.consistencyMode() == ConsistencyMode.HIGH_AVAILABILITY && partitionResetDelay != Integer.MAX_VALUE) {
            this.reschedulePartitionReset(partitionResetDelay, () -> this.partitionResetClosure.accept(revision, zoneId), zoneId);
        }
        DistributionZoneTimer mergedScaleUpTimer = DataNodesManager.mergeTimerOnTopologyChange(zoneDescriptor, timestamp, scaleUpTimer, addedNodes, newLogicalTopology, true);
        DistributionZoneTimer mergedScaleDownTimer = DataNodesManager.mergeTimerOnTopologyChange(zoneDescriptor, timestamp, scaleDownTimer, removedNodes, newLogicalTopology, false);
        DataNodesHistoryEntry currentDataNodes = DataNodesManager.currentDataNodes(timestamp, dataNodesHistory, mergedScaleUpTimer, mergedScaleDownTimer, zoneDescriptor);
        DistributionZoneTimer scaleUpTimerToSave = DataNodesManager.timerToSave(timestamp, mergedScaleUpTimer);
        DistributionZoneTimer scaleDownTimerToSave = DataNodesManager.timerToSave(timestamp, mergedScaleDownTimer);
        boolean addMandatoryEntry = !addedNodesComparingToOldTopology.isEmpty();
        CompoundCondition condition = Conditions.and(DataNodesManager.dataNodesHistoryEqualToOrNotExists(zoneId, dataNodesHistory), Conditions.and(DataNodesManager.timerEqualToOrNotExists(DistributionZonesUtil.zoneScaleUpTimerKey(zoneId), scaleUpTimer), DataNodesManager.timerEqualToOrNotExists(DistributionZonesUtil.zoneScaleDownTimerKey(zoneId), scaleDownTimer)));
        List<Operation> operations = DataNodesManager.operations(this.addNewEntryToDataNodesHistory(zoneId, dataNodesHistory, currentDataNodes.timestamp(), currentDataNodes.dataNodes(), addMandatoryEntry), DataNodesManager.renewTimer(DistributionZonesUtil.zoneScaleUpTimerKey(zoneId), scaleUpTimerToSave), DataNodesManager.renewTimer(DistributionZonesUtil.zoneScaleDownTimerKey(zoneId), scaleDownTimerToSave));
        return DataNodesHistoryMetaStorageOperation.builder().zoneId(zoneId).condition(condition).operations(operations).operationName("topology change").currentDataNodesHistory(dataNodesHistory).currentTimestamp(timestamp).historyEntryTimestamp(currentDataNodes.timestamp()).historyEntryNodes(currentDataNodes.dataNodes()).scaleUpTimer(scaleUpTimerToSave).scaleDownTimer(scaleDownTimerToSave).addMandatoryEntry(addMandatoryEntry).build();
    }

    private static DistributionZoneTimer mergeTimerOnTopologyChange(CatalogZoneDescriptor zoneDescriptor, HybridTimestamp timestamp, DistributionZoneTimer currentTimer, Set<NodeWithAttributes> nodes, Set<NodeWithAttributes> logicalTopology, boolean scaleUp) {
        Set<NodeWithAttributes> currentTimerFilteredNodes = currentTimer.nodes().stream().filter(n -> {
            if (currentTimer.createTimestamp().longValue() >= timestamp.longValue()) {
                return true;
            }
            return scaleUp == DistributionZonesUtil.nodeNames(logicalTopology).contains(n.nodeName());
        }).collect(Collectors.toSet());
        if (nodes.isEmpty()) {
            return new DistributionZoneTimer(currentTimer.createTimestamp(), currentTimer.timeToWaitInSeconds(), currentTimerFilteredNodes);
        }
        int autoAdjustWaitInSeconds = scaleUp ? zoneDescriptor.dataNodesAutoAdjustScaleUp() : zoneDescriptor.dataNodesAutoAdjustScaleDown();
        return new DistributionZoneTimer(timestamp, autoAdjustWaitInSeconds, CollectionUtils.union(nodes, currentTimerFilteredNodes));
    }

    private static DistributionZoneTimer timerToSave(HybridTimestamp currentTimestamp, DistributionZoneTimer timer) {
        return timer.timeToTrigger().longValue() <= currentTimestamp.longValue() ? DistributionZoneTimer.DEFAULT_TIMER : timer;
    }

    CompletableFuture<Void> onZoneFilterChange(CatalogZoneDescriptor zoneDescriptor, HybridTimestamp timestamp, Set<NodeWithAttributes> logicalTopology) {
        int zoneId = zoneDescriptor.id();
        return this.doOperation(zoneDescriptor, List.of(DistributionZonesUtil.zoneDataNodesHistoryKey(zoneId)), dataNodesHistoryContext -> CompletableFuture.completedFuture(this.onZoneFilterChangeInternal(zoneDescriptor, timestamp, logicalTopology, (DistributionZonesUtil.DataNodesHistoryContext)dataNodesHistoryContext)), true);
    }

    @Nullable
    private DataNodesHistoryMetaStorageOperation onZoneFilterChangeInternal(CatalogZoneDescriptor zoneDescriptor, HybridTimestamp timestamp, Set<NodeWithAttributes> logicalTopology, DistributionZonesUtil.DataNodesHistoryContext dataNodesHistoryContext) {
        LOG.debug("Distribution zone filter changed [zoneId={}, timestamp={}, logicalTopology={}, descriptor={}].", zoneDescriptor.id(), timestamp, DistributionZonesUtil.nodeNames(logicalTopology), zoneDescriptor);
        Set<NodeWithAttributes> filteredDataNodes = DistributionZonesUtil.filterDataNodes(logicalTopology, zoneDescriptor);
        return this.recalculateAndApplyDataNodesToMetastoreImmediately(zoneDescriptor, filteredDataNodes, timestamp, dataNodesHistoryContext);
    }

    CompletableFuture<Void> onAutoAdjustAlteration(CatalogZoneDescriptor zoneDescriptor, HybridTimestamp timestamp) {
        int zoneId = zoneDescriptor.id();
        return this.doOperation(zoneDescriptor, List.of(DistributionZonesUtil.zoneDataNodesHistoryKey(zoneId), DistributionZonesUtil.zoneScaleUpTimerKey(zoneId), DistributionZonesUtil.zoneScaleDownTimerKey(zoneId)), dataNodesHistoryContext -> CompletableFuture.completedFuture(this.onAutoAdjustAlterationInternal(zoneDescriptor, timestamp, (DistributionZonesUtil.DataNodesHistoryContext)dataNodesHistoryContext)), true);
    }

    @Nullable
    private DataNodesHistoryMetaStorageOperation onAutoAdjustAlterationInternal(CatalogZoneDescriptor zoneDescriptor, HybridTimestamp timestamp, DistributionZonesUtil.DataNodesHistoryContext dataNodesHistoryContext) {
        assert (dataNodesHistoryContext != null) : "Data nodes history and timers are missing, zone=" + zoneDescriptor;
        DataNodesHistory dataNodesHistory = dataNodesHistoryContext.dataNodesHistory();
        if (dataNodesHistory.entryIsPresentAtExactTimestamp(timestamp)) {
            return null;
        }
        int zoneId = zoneDescriptor.id();
        LOG.debug("Distribution zone auto adjust changed [zoneId={}, timestamp={}, descriptor={}].", zoneId, timestamp, zoneDescriptor);
        DistributionZoneTimer scaleUpTimer = dataNodesHistoryContext.scaleUpTimer();
        DistributionZoneTimer scaleDownTimer = dataNodesHistoryContext.scaleDownTimer();
        DistributionZoneTimer modifiedScaleUpTimer = scaleUpTimer.modifyTimeToWait(zoneDescriptor.dataNodesAutoAdjustScaleUp());
        DistributionZoneTimer modifiedScaleDownTimer = scaleDownTimer.modifyTimeToWait(zoneDescriptor.dataNodesAutoAdjustScaleDown());
        DataNodesHistoryEntry currentDataNodes = DataNodesManager.currentDataNodes(timestamp, dataNodesHistory, modifiedScaleUpTimer, modifiedScaleDownTimer, zoneDescriptor);
        DistributionZoneTimer scaleUpTimerToSave = DataNodesManager.timerToSave(timestamp, modifiedScaleUpTimer);
        DistributionZoneTimer scaleDownTimerToSave = DataNodesManager.timerToSave(timestamp, modifiedScaleDownTimer);
        CompoundCondition condition = Conditions.and(DataNodesManager.dataNodesHistoryEqualToOrNotExists(zoneId, dataNodesHistory), Conditions.and(DataNodesManager.timerEqualToOrNotExists(DistributionZonesUtil.zoneScaleUpTimerKey(zoneId), scaleUpTimer), DataNodesManager.timerEqualToOrNotExists(DistributionZonesUtil.zoneScaleDownTimerKey(zoneId), scaleDownTimer)));
        List<Operation> operations = DataNodesManager.operations(this.addNewEntryToDataNodesHistory(zoneId, dataNodesHistory, currentDataNodes.timestamp(), currentDataNodes.dataNodes()), DataNodesManager.renewTimer(DistributionZonesUtil.zoneScaleUpTimerKey(zoneId), scaleUpTimerToSave), DataNodesManager.renewTimer(DistributionZonesUtil.zoneScaleDownTimerKey(zoneId), scaleDownTimerToSave));
        return DataNodesHistoryMetaStorageOperation.builder().zoneId(zoneId).condition(condition).operations(operations).operationName("distribution zone auto adjust change").currentDataNodesHistory(dataNodesHistory).currentTimestamp(timestamp).historyEntryTimestamp(currentDataNodes.timestamp()).historyEntryNodes(currentDataNodes.dataNodes()).scaleUpTimer(scaleUpTimerToSave).scaleDownTimer(scaleDownTimerToSave).build();
    }

    void onUpdatePartitionDistributionReset(int zoneId, int partitionDistributionResetTimeoutSeconds, Runnable taskOnReset) {
        if (partitionDistributionResetTimeoutSeconds == Integer.MAX_VALUE) {
            this.zoneTimers.computeIfAbsent((Integer)Integer.valueOf((int)zoneId), (Function<Integer, ZoneTimers>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, createZoneTimers(int ), (Ljava/lang/Integer;)Lorg/apache/ignite3/internal/distributionzones/DataNodesManager$ZoneTimers;)((DataNodesManager)this)).partitionReset.stopScheduledTask();
        } else {
            this.zoneTimers.computeIfAbsent((Integer)Integer.valueOf((int)zoneId), (Function<Integer, ZoneTimers>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, createZoneTimers(int ), (Ljava/lang/Integer;)Lorg/apache/ignite3/internal/distributionzones/DataNodesManager$ZoneTimers;)((DataNodesManager)this)).partitionReset.reschedule(partitionDistributionResetTimeoutSeconds, taskOnReset);
        }
    }

    private Runnable applyTimerClosure(CatalogZoneDescriptor zoneDescriptor, ScheduledTimer scheduledTimer) {
        int zoneId = zoneDescriptor.id();
        return () -> this.doOperation(zoneDescriptor, List.of(DistributionZonesUtil.zoneDataNodesHistoryKey(zoneId), scheduledTimer.metaStorageKey()), dataNodesHistoryContext -> this.applyTimerClosure0(zoneDescriptor, scheduledTimer, (DistributionZonesUtil.DataNodesHistoryContext)dataNodesHistoryContext), true);
    }

    @Nullable
    private CompletableFuture<DataNodesHistoryMetaStorageOperation> applyTimerClosure0(CatalogZoneDescriptor zoneDescriptor, ScheduledTimer scheduledTimer, DistributionZonesUtil.DataNodesHistoryContext dataNodesHistoryContext) {
        assert (dataNodesHistoryContext != null) : "Data nodes history and timers are missing, zone=" + zoneDescriptor;
        DataNodesHistory dataNodesHistory = dataNodesHistoryContext.dataNodesHistory();
        DistributionZoneTimer timer = scheduledTimer.timerFromContext(dataNodesHistoryContext);
        if (timer.equals(DistributionZoneTimer.DEFAULT_TIMER)) {
            return CompletableFutures.nullCompletedFuture();
        }
        int zoneId = zoneDescriptor.id();
        LOG.debug("Triggered " + scheduledTimer.name() + " [zoneId={}, timer={}].", zoneId, timer);
        HybridTimestamp timeToTrigger = timer.timeToTrigger();
        if (dataNodesHistory.entryIsPresentAtExactTimestamp(timeToTrigger)) {
            return CompletableFutures.nullCompletedFuture();
        }
        long currentDelay = DataNodesManager.delayInSeconds(timeToTrigger);
        if (currentDelay < 0L) {
            return CompletableFutures.nullCompletedFuture();
        }
        if (currentDelay > 1L) {
            scheduledTimer.reschedule(currentDelay, this.applyTimerClosure(zoneDescriptor, scheduledTimer));
            return CompletableFutures.nullCompletedFuture();
        }
        DataNodesHistoryEntry currentDataNodes = scheduledTimer.recalculateDataNodes(dataNodesHistory, timer);
        return this.clockService.waitFor(timeToTrigger.addPhysicalTime(this.clockService.maxClockSkewMillis())).thenApply(ignored -> DataNodesHistoryMetaStorageOperation.builder().zoneId(zoneId).condition(Conditions.and(DataNodesManager.dataNodesHistoryEqualToOrNotExists(zoneId, dataNodesHistory), DataNodesManager.timerEqualToOrNotExists(scheduledTimer.metaStorageKey(), timer))).operations(DataNodesManager.operations(this.addNewEntryToDataNodesHistory(zoneId, dataNodesHistory, timeToTrigger, currentDataNodes.dataNodes()), DataNodesManager.clearTimer(scheduledTimer.metaStorageKey()))).operationName(scheduledTimer.name() + " trigger").currentDataNodesHistory(dataNodesHistory).currentTimestamp(timeToTrigger).historyEntryTimestamp(timeToTrigger).historyEntryNodes(currentDataNodes.dataNodes()).scaleUpTimer(scheduledTimer.scaleUpTimerAfterApply()).scaleDownTimer(scheduledTimer.scaleDownTimerAfterApply()).build());
    }

    private void onScaleUpTimerChange(CatalogZoneDescriptor zoneDescriptor, DistributionZoneTimer scaleUpTimer) {
        ScaleUpScheduledTimer timer = new ScaleUpScheduledTimer(zoneDescriptor);
        timer.init(scaleUpTimer);
    }

    private void onScaleDownTimerChange(CatalogZoneDescriptor zoneDescriptor, DistributionZoneTimer scaleDownTimer) {
        ScaleDownScheduledTimer timer = new ScaleDownScheduledTimer(zoneDescriptor);
        timer.init(scaleDownTimer);
    }

    private void restorePartitionResetTimer(int zoneId, DistributionZoneTimer scaleDownTimer, long revision) {
        if (!scaleDownTimer.equals(DistributionZoneTimer.DEFAULT_TIMER)) {
            this.reschedulePartitionReset(this.partitionDistributionResetTimeoutSupplier.getAsInt(), () -> this.partitionResetClosure.accept(revision, zoneId), zoneId);
        }
    }

    private static long delayInSeconds(HybridTimestamp timerTimestamp) {
        if (timerTimestamp.equals(HybridTimestamp.MIN_VALUE) || timerTimestamp.equals(HybridTimestamp.MAX_VALUE)) {
            return -1L;
        }
        long currentTime = System.currentTimeMillis();
        long delayMs = timerTimestamp.getPhysical() - currentTime;
        return Math.max(0L, delayMs / 1000L);
    }

    private void reschedulePartitionReset(long delayInSeconds, Runnable runnable, int zoneId) {
        this.zoneTimers.computeIfAbsent((Integer)Integer.valueOf((int)zoneId), (Function<Integer, ZoneTimers>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, createZoneTimers(int ), (Ljava/lang/Integer;)Lorg/apache/ignite3/internal/distributionzones/DataNodesManager$ZoneTimers;)((DataNodesManager)this)).partitionReset.reschedule(delayInSeconds, runnable);
    }

    private ZoneTimers createZoneTimers(int zoneId) {
        return new ZoneTimers(zoneId, this.executor);
    }

    private static DataNodesHistoryEntry currentDataNodes(HybridTimestamp timestamp, DataNodesHistory dataNodesHistory, DistributionZoneTimer scaleUpTimer, DistributionZoneTimer scaleDownTimer, CatalogZoneDescriptor zoneDescriptor) {
        DataNodesHistoryEntry currentDataNodesEntry = dataNodesHistory.dataNodesForTimestamp(timestamp);
        assert (currentDataNodesEntry.timestamp().longValue() != HybridTimestamp.MIN_VALUE.longValue()) : "Data nodes history is missing for timestamp [zoneId=" + zoneDescriptor.id() + ", timestamp=" + timestamp + "].";
        HashSet<NodeWithAttributes> dataNodes = new HashSet<NodeWithAttributes>(currentDataNodesEntry.dataNodes());
        long scaleUpTriggerTime = scaleUpTimer.timeToTrigger().longValue();
        long scaleDownTriggerTime = scaleDownTimer.timeToTrigger().longValue();
        long timestampLong = timestamp.longValue();
        HybridTimestamp newTimestamp = timestamp;
        if (scaleUpTriggerTime <= timestampLong) {
            dataNodes.addAll(DistributionZonesUtil.filterDataNodes(scaleUpTimer.nodes(), zoneDescriptor));
        }
        if (scaleDownTriggerTime <= timestampLong) {
            dataNodes.removeAll(scaleDownTimer.nodes());
        }
        return new DataNodesHistoryEntry(newTimestamp, dataNodes);
    }

    public CompletableFuture<Set<String>> dataNodes(int zoneId, HybridTimestamp timestamp) {
        return this.dataNodes(zoneId, timestamp, null);
    }

    public CompletableFuture<Set<String>> dataNodes(int zoneId, HybridTimestamp timestamp, @Nullable Integer catalogVersion) {
        CatalogZoneDescriptor zone;
        DataNodesHistory volatileHistory = this.dataNodesHistoryVolatile.get(zoneId);
        if (volatileHistory != null && volatileHistory.entryIsPresentAtExactTimestamp(timestamp)) {
            return CompletableFuture.completedFuture(DistributionZonesUtil.nodeNames(volatileHistory.dataNodesForTimestamp(timestamp).dataNodes()));
        }
        if (catalogVersion == null) {
            catalogVersion = this.catalogManager.activeCatalogVersion(timestamp.longValue());
        }
        if ((zone = this.catalogManager.catalog(catalogVersion).zone(zoneId)) == null) {
            return CompletableFuture.failedFuture(new DistributionZoneNotFoundException(zoneId));
        }
        return ((CompletableFuture)this.getValueFromMetaStorage(DistributionZonesUtil.zoneDataNodesHistoryKey(zoneId), DataNodesHistory.DataNodesHistorySerializer::deserialize).thenApply(history -> IgniteUtils.inBusyLock(this.busyLock, () -> {
            if (history == null) {
                return DistributionZonesUtil.filterDataNodes(this.topologyNodes(), zone);
            }
            DataNodesHistoryEntry entry = history.dataNodesForTimestamp(timestamp);
            return entry.dataNodes();
        }))).thenApply(DistributionZonesUtil::nodeNames);
    }

    public CompletableFuture<Set<String>> recalculateDataNodes(String zoneName) {
        int catalogVersion = this.catalogManager.latestCatalogVersion();
        CatalogZoneDescriptor zoneDescriptor = this.catalogManager.catalog(catalogVersion).zone(zoneName);
        if (zoneDescriptor == null) {
            return CompletableFuture.failedFuture(new DistributionZoneNotFoundException(zoneName));
        }
        return this.recalculateDataNodes(zoneDescriptor);
    }

    public CompletableFuture<Set<String>> recalculateDataNodes(int zoneId) {
        int catalogVersion = this.catalogManager.latestCatalogVersion();
        CatalogZoneDescriptor zoneDescriptor = this.catalogManager.catalog(catalogVersion).zone(zoneId);
        if (zoneDescriptor == null) {
            return CompletableFuture.failedFuture(new DistributionZoneNotFoundException(zoneId));
        }
        return this.recalculateDataNodes(zoneDescriptor);
    }

    private CompletableFuture<Set<String>> recalculateDataNodes(CatalogZoneDescriptor zoneDescriptor) {
        int zoneId = zoneDescriptor.id();
        Set<NodeWithAttributes> currentLogicalTopology = this.topologyNodes();
        Set<NodeWithAttributes> filteredDataNodes = DistributionZonesUtil.filterDataNodes(currentLogicalTopology, zoneDescriptor);
        return this.doOperation(zoneDescriptor, List.of(DistributionZonesUtil.zoneDataNodesHistoryKey(zoneId)), dataNodesHistoryContext -> CompletableFuture.completedFuture(this.recalculateAndApplyDataNodesToMetastoreImmediately(zoneDescriptor, filteredDataNodes, this.clockService.now(), (DistributionZonesUtil.DataNodesHistoryContext)dataNodesHistoryContext)), true).thenApply(v -> DistributionZonesUtil.nodeNames(filteredDataNodes));
    }

    @Nullable
    private DataNodesHistoryMetaStorageOperation recalculateAndApplyDataNodesToMetastoreImmediately(CatalogZoneDescriptor zoneDescriptor, Set<NodeWithAttributes> filteredDataNodes, HybridTimestamp timestamp, DistributionZonesUtil.DataNodesHistoryContext dataNodesHistoryContext) {
        assert (dataNodesHistoryContext != null) : "Data nodes history and timers are missing, zone=" + zoneDescriptor;
        DataNodesHistory dataNodesHistory = dataNodesHistoryContext.dataNodesHistory();
        if (dataNodesHistory.entryIsPresentAtExactTimestamp(timestamp)) {
            return null;
        }
        int zoneId = zoneDescriptor.id();
        this.stopAllTimers(zoneId);
        return DataNodesHistoryMetaStorageOperation.builder().zoneId(zoneId).condition(DataNodesManager.dataNodesHistoryEqualToOrNotExists(zoneId, dataNodesHistory)).operations(DataNodesManager.operations(this.addNewEntryToDataNodesHistory(zoneId, dataNodesHistory, timestamp, filteredDataNodes), DataNodesManager.clearTimer(DistributionZonesUtil.zoneScaleUpTimerKey(zoneId)), DataNodesManager.clearTimer(DistributionZonesUtil.zoneScaleDownTimerKey(zoneId)), DataNodesManager.clearTimer(DistributionZonesUtil.zonePartitionResetTimerKey(zoneId)))).operationName("distribution zone filter change").currentDataNodesHistory(dataNodesHistory).currentTimestamp(timestamp).historyEntryTimestamp(timestamp).historyEntryNodes(filteredDataNodes).scaleUpTimer(DistributionZoneTimer.DEFAULT_TIMER).scaleDownTimer(DistributionZoneTimer.DEFAULT_TIMER).build();
    }

    Set<NodeWithAttributes> topologyNodes() {
        Entry topologyEntry = this.metaStorageManager.getLocally(DistributionZonesUtil.zonesLogicalTopologyKey());
        if (topologyEntry.empty()) {
            return Collections.emptySet();
        }
        return DistributionZonesUtil.deserializeLogicalTopologySet(topologyEntry.value());
    }

    private static Condition dataNodesHistoryEqualToOrNotExists(int zoneId, DataNodesHistory history) {
        return Conditions.or(Conditions.notExists(DistributionZonesUtil.zoneDataNodesHistoryKey(zoneId)), Conditions.value(DistributionZonesUtil.zoneDataNodesHistoryKey(zoneId)).eq(DataNodesHistory.DataNodesHistorySerializer.serialize(history)));
    }

    private static Condition timerEqualToOrNotExists(ByteArray timerKey, DistributionZoneTimer timer) {
        return Conditions.or(Conditions.notExists(timerKey), Conditions.or(Conditions.value(timerKey).eq(DistributionZoneTimer.DistributionZoneTimerSerializer.serialize(timer)), Conditions.value(timerKey).eq(DistributionZoneTimer.DistributionZoneTimerSerializer.serialize(DistributionZoneTimer.DEFAULT_TIMER))));
    }

    private Operation addNewEntryToDataNodesHistory(int zoneId, DataNodesHistory history, HybridTimestamp timestamp, Set<NodeWithAttributes> nodes) {
        return this.addNewEntryToDataNodesHistory(zoneId, history, timestamp, nodes, false);
    }

    private Operation addNewEntryToDataNodesHistory(int zoneId, DataNodesHistory history, HybridTimestamp timestamp, Set<NodeWithAttributes> nodes, boolean addMandatoryEntry) {
        if (!addMandatoryEntry && !history.isEmpty() && nodes.equals(history.dataNodesForTimestamp(HybridTimestamp.MAX_VALUE).dataNodes())) {
            return Operations.noop();
        }
        DataNodesHistory newHistory = history.addHistoryEntry(timestamp, nodes);
        this.dataNodesHistoryVolatile.put(zoneId, newHistory);
        return Operations.put(DistributionZonesUtil.zoneDataNodesHistoryKey(zoneId), DataNodesHistory.DataNodesHistorySerializer.serialize(newHistory));
    }

    private static Operation renewTimer(ByteArray timerKey, DistributionZoneTimer timer) {
        return Operations.put(timerKey, DistributionZoneTimer.DistributionZoneTimerSerializer.serialize(timer));
    }

    private static Operation clearTimer(ByteArray timerKey) {
        return Operations.put(timerKey, DistributionZoneTimer.DistributionZoneTimerSerializer.serialize(DistributionZoneTimer.DEFAULT_TIMER));
    }

    private <T> CompletableFuture<T> getValueFromMetaStorage(ByteArray key, Function<byte[], T> deserializer) {
        return this.metaStorageManager.get(key).thenApply(e -> DataNodesManager.deserializeEntry(e, deserializer));
    }

    private CompletableFuture<DistributionZonesUtil.DataNodesHistoryContext> getDataNodeHistoryContextMs(List<ByteArray> keys) {
        return this.metaStorageManager.getAll(new HashSet<ByteArray>(keys)).thenApply(entries -> DistributionZonesUtil.dataNodeHistoryContextFromValues(entries.values()));
    }

    private CompletableFuture<DistributionZonesUtil.DataNodesHistoryContext> getDataNodeHistoryContextMsLocally(List<ByteArray> keys) {
        List<Entry> entries = this.metaStorageManager.getAllLocally(keys);
        return CompletableFuture.completedFuture(DistributionZonesUtil.dataNodeHistoryContextFromValues(entries));
    }

    private CompletableFuture<DistributionZonesUtil.DataNodesHistoryContext> ensureContextIsPresentAndInitZoneIfNeeded(@Nullable DistributionZonesUtil.DataNodesHistoryContext context, List<ByteArray> keys, int zoneId) {
        if (context == null) {
            return this.initZone(zoneId).thenCompose(ignored -> this.getDataNodeHistoryContextMs(keys));
        }
        return CompletableFuture.completedFuture(context);
    }

    @Nullable
    private static <T> T deserializeEntry(@Nullable Entry e, Function<byte[], T> deserializer) {
        if (e == null || e.value() == null || e.empty() || e.tombstone()) {
            return null;
        }
        return deserializer.apply(e.value());
    }

    private CompletableFuture<Void> doOperation(CatalogZoneDescriptor zone, List<ByteArray> keysToRead, Function<DistributionZonesUtil.DataNodesHistoryContext, CompletableFuture<DataNodesHistoryMetaStorageOperation>> operation, boolean ensureContextIsPresent) {
        return this.msInvokeWithRetry(msGetter -> msGetter.get(keysToRead).thenCompose(operation), zone, ensureContextIsPresent);
    }

    private CompletableFuture<Void> msInvokeWithRetry(Function<DataNodeHistoryContextMetaStorageGetter, CompletableFuture<DataNodesHistoryMetaStorageOperation>> metaStorageOperationSupplier, CatalogZoneDescriptor zone, boolean ensureContextIsPresent) {
        return this.msInvokeWithRetry(metaStorageOperationSupplier, 100, zone, ensureContextIsPresent);
    }

    private CompletableFuture<Void> msInvokeWithRetry(Function<DataNodeHistoryContextMetaStorageGetter, CompletableFuture<DataNodesHistoryMetaStorageOperation>> metaStorageOperationSupplier, int attemptsLeft, CatalogZoneDescriptor zone, boolean ensureContextIsPresent) {
        if (attemptsLeft <= 0) {
            throw new AssertionError((Object)("Failed to perform meta storage invoke, maximum number of attempts reached [zone=" + zone + "]."));
        }
        DataNodeHistoryContextMetaStorageGetter msGetter0 = attemptsLeft == 100 ? this::getDataNodeHistoryContextMsLocally : this::getDataNodeHistoryContextMs;
        DataNodeHistoryContextMetaStorageGetter msGetter = ensureContextIsPresent ? keys -> msGetter0.get(keys).thenCompose(context -> this.ensureContextIsPresentAndInitZoneIfNeeded((DistributionZonesUtil.DataNodesHistoryContext)context, keys, zone.id())) : msGetter0;
        CompletableFuture<DataNodesHistoryMetaStorageOperation> metaStorageOperationFuture = metaStorageOperationSupplier.apply(msGetter);
        return metaStorageOperationFuture.thenCompose(metaStorageOperation -> {
            if (metaStorageOperation == null) {
                return CompletableFutures.nullCompletedFuture();
            }
            return ((CompletableFuture)this.metaStorageManager.invoke(metaStorageOperation.operation()).thenCompose(result -> {
                if (result.getAsBoolean()) {
                    LOG.info(metaStorageOperation.successLogMessage(), new Object[0]);
                    return CompletableFutures.nullCompletedFuture();
                }
                return this.msInvokeWithRetry(metaStorageOperationSupplier, attemptsLeft - 1, zone, ensureContextIsPresent);
            })).whenComplete((v, e) -> {
                if (e != null && !DataNodesManager.relatesToNodeStopping(e)) {
                    this.failureProcessor.process(new FailureContext((Throwable)e, metaStorageOperation.failureLogMessage()));
                }
            });
        });
    }

    private static boolean relatesToNodeStopping(Throwable e) {
        return ExceptionUtils.hasCause(e, NodeStoppingException.class);
    }

    CompletableFuture<?> onZoneCreate(int zoneId, HybridTimestamp timestamp, Set<NodeWithAttributes> dataNodes) {
        return this.initZone(zoneId, timestamp, dataNodes, false);
    }

    private CompletableFuture<?> initZoneWithLegacyDataNodes(CatalogZoneDescriptor zone, byte[] legacyDataNodesBytes, long recoveryRevision) {
        Entry nodeAttributesEntry = this.metaStorageManager.getLocally(DistributionZonesUtil.zonesNodesAttributes(), recoveryRevision);
        Map<UUID, NodeWithAttributes> nodesAttributes = DistributionZonesUtil.deserializeNodesAttributes(nodeAttributesEntry.value());
        Set<NodeWithAttributes> unfilteredDataNodes = DataNodesMapSerializer.deserialize(legacyDataNodesBytes).keySet().stream().map(node -> {
            NodeWithAttributes nwa = (NodeWithAttributes)nodesAttributes.get(node.nodeId());
            if (nwa == null) {
                return new NodeWithAttributes(node.nodeName(), node.nodeId(), null);
            }
            Map<String, String> userAttributes = nwa.userAttributes();
            List<String> storageProfiles = nwa.storageProfiles();
            return new NodeWithAttributes(node.nodeName(), node.nodeId(), userAttributes, storageProfiles);
        }).collect(Collectors.toSet());
        Set<NodeWithAttributes> dataNodes = DistributionZonesUtil.filterDataNodes(unfilteredDataNodes, zone);
        LOG.info("Recovering data nodes of distribution zone from legacy data nodes [zoneId={}, unfilteredDataNodes={}, filter='{}', dataNodes={}]", zone.id(), DistributionZonesUtil.nodeNames(unfilteredDataNodes), zone.filter(), DistributionZonesUtil.nodeNames(dataNodes));
        return this.initZone(zone.id(), this.clockService.current(), dataNodes, true);
    }

    private CompletableFuture<?> initZone(int zoneId) {
        CatalogZoneDescriptor zone = this.zoneDescriptor(zoneId);
        Set<NodeWithAttributes> topologyNodes = this.latestLogicalTopologyProvider.get();
        Set<NodeWithAttributes> filteredNodes = DistributionZonesUtil.filterDataNodes(topologyNodes, zone);
        return this.initZone(zoneId, this.clockService.now(), filteredNodes, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<?> initZone(int zoneId, HybridTimestamp timestamp, Set<NodeWithAttributes> dataNodes, boolean removeLegacyDataNodes) {
        if (!this.busyLock.enterBusy()) {
            throw new IgniteInternalException(ErrorGroups.Common.NODE_STOPPING_ERR, (Throwable)new NodeStoppingException());
        }
        try {
            CompoundCondition condition = Conditions.and(Conditions.notExists(DistributionZonesUtil.zoneDataNodesHistoryKey(zoneId)), Conditions.notTombstone(DistributionZonesUtil.zoneDataNodesHistoryKey(zoneId)));
            Update update = new Operations(DataNodesManager.operations(this.addNewEntryToDataNodesHistory(zoneId, new DataNodesHistory(), timestamp, dataNodes), DataNodesManager.clearTimer(DistributionZonesUtil.zoneScaleUpTimerKey(zoneId)), DataNodesManager.clearTimer(DistributionZonesUtil.zoneScaleDownTimerKey(zoneId)), DataNodesManager.clearTimer(DistributionZonesUtil.zonePartitionResetTimerKey(zoneId)), removeLegacyDataNodes ? Operations.remove(DistributionZonesUtil.zoneDataNodesKey(zoneId)) : Operations.noop(), removeLegacyDataNodes ? Operations.remove(DistributionZonesUtil.zonesNodesAttributes()) : Operations.noop())).yield(true);
            Iif iif = Statements.iif((Condition)condition, update, Operations.ops(new Operation[0]).yield(false));
            CompletionStage completionStage = ((CompletableFuture)this.metaStorageManager.invoke(iif).thenApply(StatementResult::getAsBoolean)).whenComplete((invokeResult, e) -> {
                if (e != null) {
                    if (!DataNodesManager.relatesToNodeStopping(e)) {
                        String errorMessage = String.format("Failed to initialize zone's dataNodes history [zoneId = %s, timestamp = %s, dataNodes = %s]", zoneId, timestamp, DistributionZonesUtil.nodeNames(dataNodes));
                        this.failureProcessor.process(new FailureContext((Throwable)e, errorMessage));
                    }
                } else if (invokeResult.booleanValue()) {
                    LOG.info("Initialized zone's dataNodes history [zoneId = {}, timestamp = {}, dataNodes = {}]", zoneId, timestamp, DistributionZonesUtil.nodeNames(dataNodes));
                } else {
                    LOG.debug("Failed to initialize zone's dataNodes history [zoneId = {}, timestamp = {}, dataNodes = {}]", zoneId, timestamp, DistributionZonesUtil.nodeNames(dataNodes));
                }
            });
            return completionStage;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    CompletableFuture<?> onZoneDrop(int zoneId, HybridTimestamp timestamp) {
        return this.removeDataNodesKeys(zoneId, timestamp).thenRun(() -> {
            ZoneTimers zt = this.zoneTimers.remove(zoneId);
            if (zt != null) {
                zt.stopAllTimers();
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<?> removeDataNodesKeys(int zoneId, HybridTimestamp timestamp) {
        if (!this.busyLock.enterBusy()) {
            throw new IgniteInternalException(ErrorGroups.Common.NODE_STOPPING_ERR, (Throwable)new NodeStoppingException());
        }
        try {
            SimpleCondition condition = Conditions.exists(DistributionZonesUtil.zoneDataNodesHistoryKey(zoneId));
            Update removeKeysUpd = Operations.ops(Operations.remove(DistributionZonesUtil.zoneScaleUpTimerKey(zoneId)), Operations.remove(DistributionZonesUtil.zoneScaleDownTimerKey(zoneId)), Operations.remove(DistributionZonesUtil.zonePartitionResetTimerKey(zoneId))).yield(true);
            Iif iif = Statements.iif((Condition)condition, removeKeysUpd, Operations.ops(new Operation[0]).yield(false));
            CompletionStage completionStage = ((CompletableFuture)this.metaStorageManager.invoke(iif).thenApply(StatementResult::getAsBoolean)).whenComplete((invokeResult, e) -> {
                if (e != null) {
                    if (!DataNodesManager.relatesToNodeStopping(e)) {
                        String errorMessage = String.format("Failed to delete zone's dataNodes keys [zoneId = %s, timestamp = %s]", zoneId, timestamp);
                        this.failureProcessor.process(new FailureContext((Throwable)e, errorMessage));
                    }
                } else if (invokeResult.booleanValue()) {
                    LOG.info("Delete zone's dataNodes keys [zoneId = {}, timestamp = {}]", zoneId, timestamp);
                } else {
                    LOG.debug("Failed to delete zone's dataNodes keys [zoneId = {}, timestamp = {}]", zoneId, timestamp);
                }
            });
            return completionStage;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    private void stopAllTimers(int zoneId) {
        ZoneTimers zt = this.zoneTimers.get(zoneId);
        if (zt != null) {
            zt.stopAllTimers();
        }
    }

    private WatchListener createScaleUpTimerPrefixListener() {
        return event -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> DataNodesManager.processWatchEvent(event, DistributionZonesUtil.DISTRIBUTION_ZONE_SCALE_UP_TIMER_PREFIX_BYTES, (zoneId, e) -> this.processTimerWatchEvent((int)zoneId, (Entry)e, true)));
    }

    private WatchListener createScaleDownTimerPrefixListener() {
        return event -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> DataNodesManager.processWatchEvent(event, DistributionZonesUtil.DISTRIBUTION_ZONE_SCALE_DOWN_TIMER_PREFIX_BYTES, (zoneId, e) -> this.processTimerWatchEvent((int)zoneId, (Entry)e, false)));
    }

    private WatchListener createDataNodesListener() {
        return event -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> DataNodesManager.processWatchEvent(event, DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX_BYTES, this::processDataNodesHistoryWatchEvent));
    }

    private CompletableFuture<Void> processTimerWatchEvent(int zoneId, Entry e, boolean scaleUp) {
        if (e.tombstone()) {
            return CompletableFutures.nullCompletedFuture();
        }
        DistributionZoneTimer timer = DistributionZoneTimer.DistributionZoneTimerSerializer.deserialize(e.value());
        CatalogZoneDescriptor zoneDescriptor = this.catalogManager.activeCatalog(e.timestamp().longValue()).zone(zoneId);
        if (zoneDescriptor == null) {
            return CompletableFutures.nullCompletedFuture();
        }
        if (scaleUp) {
            this.onScaleUpTimerChange(zoneDescriptor, timer);
        } else {
            this.onScaleDownTimerChange(zoneDescriptor, timer);
        }
        return CompletableFutures.nullCompletedFuture();
    }

    private CompletableFuture<Void> processDataNodesHistoryWatchEvent(int zoneId, Entry e) {
        if (!e.tombstone() && e.value() != null) {
            DataNodesHistory history = DataNodesHistory.DataNodesHistorySerializer.deserialize(e.value());
            CatalogZoneDescriptor zoneDescriptor = this.catalogManager.activeCatalog(e.timestamp().longValue()).zone(zoneId);
            if (zoneDescriptor == null) {
                return CompletableFutures.nullCompletedFuture();
            }
            this.dataNodesHistoryVolatile.put(zoneId, history);
        } else if (e.tombstone()) {
            this.dataNodesHistoryVolatile.remove(zoneId);
        }
        return CompletableFutures.nullCompletedFuture();
    }

    private static CompletableFuture<Void> processWatchEvent(WatchEvent event, byte[] keyPrefix, BiFunction<Integer, Entry, CompletableFuture<Void>> processor) {
        EntryEvent entryEvent = event.entryEvent();
        Entry e = entryEvent.newEntry();
        if (e != null && !e.empty()) {
            assert (IgniteUtils.startsWith(e.key(), keyPrefix)) : "Unexpected key: " + new String(e.key(), StandardCharsets.UTF_8);
            int zoneId = RebalanceUtil.extractZoneId(e.key(), keyPrefix);
            return processor.apply(zoneId, e);
        }
        return CompletableFutures.nullCompletedFuture();
    }

    private CatalogZoneDescriptor zoneDescriptor(int zoneId) {
        CatalogZoneDescriptor zone = this.catalogManager.catalog(this.catalogManager.latestCatalogVersion()).zone(zoneId);
        if (zone == null) {
            throw new DistributionZoneNotFoundException(zoneId);
        }
        return zone;
    }

    private static List<Operation> operations(Operation ... operations) {
        ArrayList<Operation> res = new ArrayList<Operation>();
        for (Operation op : operations) {
            if (op.type() == OperationType.NO_OP) continue;
            res.add(op);
        }
        return res;
    }

    @TestOnly
    public ZoneTimers zoneTimers(int zoneId) {
        return this.zoneTimers.computeIfAbsent(zoneId, k -> new ZoneTimers(zoneId, this.executor));
    }

    @VisibleForTesting
    public static class ZoneTimers {
        public final ZoneTimerSchedule scaleUp;
        public final ZoneTimerSchedule scaleDown;
        public final ZoneTimerSchedule partitionReset;

        ZoneTimers(int zoneId, StripedScheduledThreadPoolExecutor executor) {
            this.scaleUp = new ZoneTimerSchedule(zoneId, executor);
            this.scaleDown = new ZoneTimerSchedule(zoneId, executor);
            this.partitionReset = new ZoneTimerSchedule(zoneId, executor);
        }

        void stopAllTimers() {
            this.scaleUp.stopTimer();
            this.scaleDown.stopTimer();
            this.partitionReset.stopTimer();
        }
    }

    @VisibleForTesting
    public static class ZoneTimerSchedule {
        final StripedScheduledThreadPoolExecutor executor;
        final int zoneId;
        @Nullable
        private ScheduledFuture<?> taskFuture;
        private long delay;

        ZoneTimerSchedule(int zoneId, StripedScheduledThreadPoolExecutor executor) {
            this.zoneId = zoneId;
            this.executor = executor;
        }

        synchronized void reschedule(long delayInSeconds, Runnable task) {
            this.stopScheduledTask();
            this.delay = delayInSeconds;
            if (delayInSeconds >= 0L) {
                this.taskFuture = this.executor.schedule(task, delayInSeconds, TimeUnit.SECONDS, this.zoneId);
            }
        }

        synchronized void stopScheduledTask() {
            if (this.taskFuture != null && this.delay > 0L) {
                this.taskFuture.cancel(false);
                this.taskFuture = null;
                this.delay = 0L;
            }
        }

        synchronized void stopTimer() {
            if (this.taskFuture != null) {
                this.taskFuture.cancel(false);
                this.taskFuture = null;
            }
        }

        @TestOnly
        public synchronized boolean taskIsScheduled() {
            return this.taskFuture != null;
        }

        @TestOnly
        public synchronized boolean taskIsCancelled() {
            return this.taskFuture == null;
        }

        @TestOnly
        public synchronized boolean taskIsDone() {
            return this.taskFuture != null && this.taskFuture.isDone();
        }
    }

    private static interface ScheduledTimer {
        public String name();

        public void init(DistributionZoneTimer var1);

        public ByteArray metaStorageKey();

        public DistributionZoneTimer timerFromContext(DistributionZonesUtil.DataNodesHistoryContext var1);

        public void reschedule(long var1, Runnable var3);

        public DataNodesHistoryEntry recalculateDataNodes(DataNodesHistory var1, DistributionZoneTimer var2);

        @Nullable
        public DistributionZoneTimer scaleUpTimerAfterApply();

        @Nullable
        public DistributionZoneTimer scaleDownTimerAfterApply();
    }

    private class ScaleUpScheduledTimer
    implements ScheduledTimer {
        final CatalogZoneDescriptor zone;

        private ScaleUpScheduledTimer(CatalogZoneDescriptor zone) {
            this.zone = zone;
        }

        @Override
        public void init(DistributionZoneTimer timer) {
            int zoneId = this.zone.id();
            if (timer.equals(DistributionZoneTimer.DEFAULT_TIMER)) {
                DataNodesManager.this.zoneTimers.computeIfAbsent((Integer)Integer.valueOf((int)zoneId), (Function<Integer, ZoneTimers>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$init$0(java.lang.Integer ), (Ljava/lang/Integer;)Lorg/apache/ignite3/internal/distributionzones/DataNodesManager$ZoneTimers;)((ScaleUpScheduledTimer)this)).scaleUp.stopScheduledTask();
                return;
            }
            this.reschedule(DataNodesManager.delayInSeconds(timer.timeToTrigger()), DataNodesManager.this.applyTimerClosure(this.zone, this));
        }

        @Override
        public String name() {
            return "scale up timer";
        }

        @Override
        public ByteArray metaStorageKey() {
            return DistributionZonesUtil.zoneScaleUpTimerKey(this.zone.id());
        }

        @Override
        public DistributionZoneTimer timerFromContext(DistributionZonesUtil.DataNodesHistoryContext context) {
            return context.scaleUpTimer();
        }

        @Override
        public void reschedule(long delayInSeconds, Runnable runnable) {
            DataNodesManager.this.zoneTimers.computeIfAbsent((Integer)Integer.valueOf((int)this.zone.id()), (Function<Integer, ZoneTimers>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$reschedule$1(java.lang.Integer ), (Ljava/lang/Integer;)Lorg/apache/ignite3/internal/distributionzones/DataNodesManager$ZoneTimers;)((ScaleUpScheduledTimer)this)).scaleUp.reschedule(delayInSeconds, runnable);
        }

        @Override
        public DataNodesHistoryEntry recalculateDataNodes(DataNodesHistory dataNodesHistory, DistributionZoneTimer timer) {
            return DataNodesManager.currentDataNodes(timer.timeToTrigger(), dataNodesHistory, timer, DistributionZoneTimer.DEFAULT_TIMER, this.zone);
        }

        @Override
        @Nullable
        public DistributionZoneTimer scaleUpTimerAfterApply() {
            return DistributionZoneTimer.DEFAULT_TIMER;
        }

        @Override
        @Nullable
        public DistributionZoneTimer scaleDownTimerAfterApply() {
            return null;
        }

        private /* synthetic */ ZoneTimers lambda$reschedule$1(Integer k) {
            return DataNodesManager.this.createZoneTimers(this.zone.id());
        }

        private /* synthetic */ ZoneTimers lambda$init$0(Integer k) {
            return DataNodesManager.this.createZoneTimers(this.zone.id());
        }
    }

    private class ScaleDownScheduledTimer
    implements ScheduledTimer {
        final CatalogZoneDescriptor zone;

        private ScaleDownScheduledTimer(CatalogZoneDescriptor zone) {
            this.zone = zone;
        }

        @Override
        public void init(DistributionZoneTimer timer) {
            int zoneId = this.zone.id();
            if (timer.equals(DistributionZoneTimer.DEFAULT_TIMER)) {
                DataNodesManager.this.zoneTimers.computeIfAbsent((Integer)Integer.valueOf((int)zoneId), (Function<Integer, ZoneTimers>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$init$0(java.lang.Integer ), (Ljava/lang/Integer;)Lorg/apache/ignite3/internal/distributionzones/DataNodesManager$ZoneTimers;)((ScaleDownScheduledTimer)this)).scaleDown.stopScheduledTask();
                return;
            }
            this.reschedule(DataNodesManager.delayInSeconds(timer.timeToTrigger()), DataNodesManager.this.applyTimerClosure(this.zone, this));
        }

        @Override
        public String name() {
            return "scale down timer";
        }

        @Override
        public ByteArray metaStorageKey() {
            return DistributionZonesUtil.zoneScaleDownTimerKey(this.zone.id());
        }

        @Override
        public DistributionZoneTimer timerFromContext(DistributionZonesUtil.DataNodesHistoryContext context) {
            return context.scaleDownTimer();
        }

        @Override
        public void reschedule(long delayInSeconds, Runnable runnable) {
            DataNodesManager.this.zoneTimers.computeIfAbsent((Integer)Integer.valueOf((int)this.zone.id()), (Function<Integer, ZoneTimers>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$reschedule$1(java.lang.Integer ), (Ljava/lang/Integer;)Lorg/apache/ignite3/internal/distributionzones/DataNodesManager$ZoneTimers;)((ScaleDownScheduledTimer)this)).scaleDown.reschedule(delayInSeconds, runnable);
        }

        @Override
        public DataNodesHistoryEntry recalculateDataNodes(DataNodesHistory dataNodesHistory, DistributionZoneTimer timer) {
            return DataNodesManager.currentDataNodes(timer.timeToTrigger(), dataNodesHistory, DistributionZoneTimer.DEFAULT_TIMER, timer, this.zone);
        }

        @Override
        @Nullable
        public DistributionZoneTimer scaleUpTimerAfterApply() {
            return null;
        }

        @Override
        @Nullable
        public DistributionZoneTimer scaleDownTimerAfterApply() {
            return DistributionZoneTimer.DEFAULT_TIMER;
        }

        private /* synthetic */ ZoneTimers lambda$reschedule$1(Integer k) {
            return DataNodesManager.this.createZoneTimers(this.zone.id());
        }

        private /* synthetic */ ZoneTimers lambda$init$0(Integer k) {
            return DataNodesManager.this.createZoneTimers(this.zone.id());
        }
    }

    @FunctionalInterface
    private static interface DataNodeHistoryContextMetaStorageGetter {
        public CompletableFuture<DistributionZonesUtil.DataNodesHistoryContext> get(List<ByteArray> var1);
    }
}

