/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.table.distributed.disaster;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.components.NodeProperties;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
import org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEvent;
import org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEventParams;
import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite.internal.event.Event;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.metrics.MetricSource;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum;
import org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
import org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesRequest;
import org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesResponse;
import org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateMessage;
import org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateRequest;
import org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateResponse;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.systemview.api.SystemView;
import org.apache.ignite.internal.systemview.api.SystemViewManager;
import org.apache.ignite.internal.systemview.api.SystemViewProvider;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequest;
import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequestSerializer;
import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews;
import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum;
import org.apache.ignite.internal.table.distributed.disaster.GlobalTablePartitionState;
import org.apache.ignite.internal.table.distributed.disaster.GroupUpdateRequest;
import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState;
import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateByNode;
import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnumWithLogIndex;
import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateMessageByNode;
import org.apache.ignite.internal.table.distributed.disaster.LocalTablePartitionState;
import org.apache.ignite.internal.table.distributed.disaster.LocalTablePartitionStateByNode;
import org.apache.ignite.internal.table.distributed.disaster.ManualGroupRestartRequest;
import org.apache.ignite.internal.table.distributed.disaster.PartitionStatesMetricSource;
import org.apache.ignite.internal.table.distributed.disaster.SingleZoneResetDistributionRequest;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalPartitionIdException;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.NodesNotFoundException;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.ZonesNotFoundException;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.apache.ignite.internal.versioned.VersionedSerializer;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.table.QualifiedNameHelper;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class DisasterRecoveryManager
implements IgniteComponent,
SystemViewProvider {
    static final IgniteLogger LOG = Loggers.forClass(DisasterRecoveryManager.class);
    static final ByteArray RECOVERY_TRIGGER_KEY = new ByteArray("disaster.recovery.trigger");
    private static final String RECOVERY_TRIGGER_REVISION_KEY_PREFIX = "disaster.recovery.trigger.revision.";
    private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    private static final IgniteProductVersion VERSION_WITH_ROW_COUNT = IgniteProductVersion.fromString((String)"9.1.5");
    private static final int TIMEOUT_SECONDS = 30;
    private static final int CATCH_UP_THRESHOLD = 100;
    private final ExecutorService threadPool;
    private final MessagingService messagingService;
    final MetaStorageManager metaStorageManager;
    final CatalogManager catalogManager;
    final DistributionZoneManager dzManager;
    final Loza raftManager;
    private final TopologyService topologyService;
    private final WatchListener watchListener;
    final TableManager tableManager;
    final PartitionReplicaLifecycleManager partitionReplicaLifecycleManager;
    private final MetricManager metricManager;
    private final FailureManager failureManager;
    private final NodeProperties nodeProperties;
    private final SystemViewManager systemViewManager;
    private final ClusterManagementGroupManager cmgManager;
    private final Map<UUID, CompletableFuture<Void>> ongoingOperationsById = new ConcurrentHashMap<UUID, CompletableFuture<Void>>();
    private final Map<Integer, PartitionStatesMetricSource> metricSourceByTableId = new ConcurrentHashMap<Integer, PartitionStatesMetricSource>();

    public DisasterRecoveryManager(ExecutorService threadPool, MessagingService messagingService, MetaStorageManager metaStorageManager, CatalogManager catalogManager, DistributionZoneManager dzManager, Loza raftManager, TopologyService topologyService, TableManager tableManager, MetricManager metricManager, FailureManager failureManager, PartitionReplicaLifecycleManager partitionReplicaLifecycleManager, NodeProperties nodeProperties, SystemViewManager systemViewManager, ClusterManagementGroupManager cmgManager) {
        this.threadPool = threadPool;
        this.messagingService = messagingService;
        this.metaStorageManager = metaStorageManager;
        this.catalogManager = catalogManager;
        this.dzManager = dzManager;
        this.raftManager = raftManager;
        this.topologyService = topologyService;
        this.tableManager = tableManager;
        this.metricManager = metricManager;
        this.failureManager = failureManager;
        this.partitionReplicaLifecycleManager = partitionReplicaLifecycleManager;
        this.nodeProperties = nodeProperties;
        this.systemViewManager = systemViewManager;
        this.cmgManager = cmgManager;
        this.watchListener = event -> {
            this.handleTriggerKeyUpdate(event);
            return CompletableFutures.nullCompletedFuture();
        };
    }

    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        this.systemViewManager.register((SystemViewProvider)this);
        this.messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, this::handleMessage);
        this.metaStorageManager.registerExactWatch(RECOVERY_TRIGGER_KEY, this.watchListener);
        if (!this.nodeProperties.colocationEnabled()) {
            this.dzManager.listen((Event)HaZoneTopologyUpdateEvent.TOPOLOGY_REDUCED, this::onHaZoneTablePartitionTopologyReduce);
        } else {
            this.dzManager.listen((Event)HaZoneTopologyUpdateEvent.TOPOLOGY_REDUCED, this::onHaZonePartitionTopologyReduce);
        }
        this.catalogManager.listen((Event)CatalogEvent.TABLE_CREATE, EventListener.fromConsumer(this::onTableCreate));
        this.catalogManager.listen((Event)CatalogEvent.TABLE_DROP, EventListener.fromConsumer(this::onTableDrop));
        this.registerMetricSources();
        return CompletableFutures.nullCompletedFuture();
    }

    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        this.metaStorageManager.unregisterWatch(this.watchListener);
        for (CompletableFuture<Void> future : this.ongoingOperationsById.values()) {
            future.completeExceptionally(new NodeStoppingException());
        }
        return CompletableFutures.nullCompletedFuture();
    }

    public List<SystemView<?>> systemViews() {
        if (this.nodeProperties.colocationEnabled()) {
            return List.of(DisasterRecoverySystemViews.createGlobalZonePartitionStatesSystemView(this), DisasterRecoverySystemViews.createLocalZonePartitionStatesSystemView(this), DisasterRecoverySystemViews.createGlobalTablePartitionStatesSystemView(this), DisasterRecoverySystemViews.createLocalTablePartitionStatesSystemView(this));
        }
        return List.of(DisasterRecoverySystemViews.createGlobalTablePartitionStatesSystemView(this), DisasterRecoverySystemViews.createLocalTablePartitionStatesSystemView(this));
    }

    @TestOnly
    public Map<UUID, CompletableFuture<Void>> ongoingOperationsById() {
        return this.ongoingOperationsById;
    }

    private CompletableFuture<Boolean> onHaZoneTablePartitionTopologyReduce(HaZoneTopologyUpdateEventParams params) {
        int zoneId = params.zoneId();
        long revision = params.causalityToken();
        long timestamp = this.metaStorageManager.timestampByRevisionLocally(revision).longValue();
        Catalog catalog = this.catalogManager.activeCatalog(timestamp);
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
        HashMap<Integer, Set<Integer>> tablePartitionsToReset = new HashMap<Integer, Set<Integer>>();
        for (CatalogTableDescriptor table : catalog.tables(zoneId)) {
            HashSet<Integer> partitionsToReset = new HashSet<Integer>();
            for (int partId = 0; partId < zoneDescriptor.partitions(); ++partId) {
                TablePartitionId partitionId = new TablePartitionId(table.id(), partId);
                if (this.stableAssignmentsWithOnlyAliveNodes((ReplicationGroupId)partitionId, revision, false).size() >= DisasterRecoveryManager.calculateQuorum(zoneDescriptor.replicas())) continue;
                partitionsToReset.add(partId);
            }
            if (partitionsToReset.isEmpty()) continue;
            tablePartitionsToReset.put(table.id(), partitionsToReset);
        }
        if (!tablePartitionsToReset.isEmpty()) {
            return this.resetPartitions(zoneDescriptor.name(), tablePartitionsToReset, false, revision, false).thenApply(r -> false);
        }
        return CompletableFutures.falseCompletedFuture();
    }

    private CompletableFuture<Boolean> onHaZonePartitionTopologyReduce(HaZoneTopologyUpdateEventParams params) {
        int zoneId = params.zoneId();
        long revision = params.causalityToken();
        long timestamp = this.metaStorageManager.timestampByRevisionLocally(revision).longValue();
        Catalog catalog = this.catalogManager.activeCatalog(timestamp);
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
        HashSet<Integer> partitionsToReset = new HashSet<Integer>();
        for (int partId = 0; partId < zoneDescriptor.partitions(); ++partId) {
            ZonePartitionId partitionId = new ZonePartitionId(zoneId, partId);
            if (this.stableAssignmentsWithOnlyAliveNodes((ReplicationGroupId)partitionId, revision, true).size() >= DisasterRecoveryManager.calculateQuorum(zoneDescriptor.replicas())) continue;
            partitionsToReset.add(partId);
        }
        if (!partitionsToReset.isEmpty()) {
            return this.resetPartitions(zoneDescriptor.name(), Map.of(zoneId, partitionsToReset), false, revision, true).thenApply(r -> false);
        }
        return CompletableFutures.falseCompletedFuture();
    }

    private Set<Assignment> stableAssignmentsWithOnlyAliveNodes(ReplicationGroupId partitionId, long revision, boolean colocationEnabled) {
        Set stableAssignments = colocationEnabled ? ZoneRebalanceUtil.zoneStableAssignmentsGetLocally((MetaStorageManager)this.metaStorageManager, (ZonePartitionId)((ZonePartitionId)partitionId), (long)revision).nodes() : RebalanceUtil.stableAssignmentsGetLocally((MetaStorageManager)this.metaStorageManager, (TablePartitionId)((TablePartitionId)partitionId), (long)revision).nodes();
        Set logicalTopology = this.dzManager.logicalTopology(revision).stream().map(NodeWithAttributes::nodeName).collect(Collectors.toUnmodifiableSet());
        return stableAssignments.stream().filter(a -> logicalTopology.contains(a.consistentId())).collect(Collectors.toUnmodifiableSet());
    }

    public CompletableFuture<Void> resetTablePartitions(String zoneName, String schemaName, String tableName, Set<Integer> partitionIds) {
        int tableId = DisasterRecoveryManager.tableDescriptor(this.catalogLatestVersion(), schemaName, tableName).id();
        return this.resetPartitions(zoneName, Map.of(tableId, partitionIds), true, -1L, false);
    }

    public CompletableFuture<Void> resetTablePartitions(String zoneName, String schemaName, String tableName, Set<Integer> partitionIds, boolean manualUpdate, long triggerRevision) {
        int tableId = DisasterRecoveryManager.tableDescriptor(this.catalogLatestVersion(), schemaName, tableName).id();
        return this.resetPartitions(zoneName, Map.of(tableId, partitionIds), manualUpdate, triggerRevision, false);
    }

    public CompletableFuture<Void> resetPartitions(String zoneName, Set<Integer> partitionIds) {
        int zoneId = DisasterRecoveryManager.zoneDescriptor(this.catalogLatestVersion(), zoneName).id();
        return this.resetPartitions(zoneName, Map.of(zoneId, partitionIds), true, -1L, true);
    }

    public CompletableFuture<Void> resetPartitions(String zoneName, Set<Integer> partitionIds, boolean manualUpdate, long triggerRevision) {
        int zoneId = DisasterRecoveryManager.zoneDescriptor(this.catalogLatestVersion(), zoneName).id();
        return this.resetPartitions(zoneName, Map.of(zoneId, partitionIds), manualUpdate, triggerRevision, true);
    }

    private CompletableFuture<Void> resetPartitions(String zoneName, Map<Integer, Set<Integer>> partitionIds, boolean manualUpdate, long triggerRevision, boolean colocationEnabled) {
        try {
            Catalog catalog = this.catalogLatestVersion();
            CatalogZoneDescriptor zone = DisasterRecoveryManager.zoneDescriptor(catalog, zoneName);
            partitionIds.values().forEach(ids -> DisasterRecoveryManager.checkPartitionsRange(ids, Set.of(zone)));
            return this.processNewRequest(GroupUpdateRequest.create(UUID.randomUUID(), catalog.version(), zone.id(), partitionIds, manualUpdate, colocationEnabled), triggerRevision);
        }
        catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    public CompletableFuture<Void> resetDistribution(List<String> zoneNames) {
        if (zoneNames.isEmpty()) {
            return CompletableFutures.nullCompletedFuture();
        }
        Catalog catalog = this.catalogLatestVersion();
        List zoneDescriptors = zoneNames.stream().map(zoneName -> DisasterRecoveryManager.zoneDescriptor(catalog, zoneName)).collect(Collectors.toList());
        return CompletableFuture.allOf((CompletableFuture[])zoneDescriptors.stream().map(zone -> this.resetDistribution((CatalogZoneDescriptor)zone, catalog.version())).toArray(CompletableFuture[]::new));
    }

    private CompletableFuture<Void> resetDistribution(CatalogZoneDescriptor zone, int catalogVersion) {
        try {
            return this.processNewRequest(new SingleZoneResetDistributionRequest(UUID.randomUUID(), catalogVersion, zone.id()));
        }
        catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    public CompletableFuture<Void> restartTablePartitions(Set<String> nodeNames, String zoneName, String schemaName, String tableName, Set<Integer> partitionIds) {
        try {
            this.getNodes(nodeNames);
            Catalog catalog = this.catalogLatestVersion();
            CatalogZoneDescriptor zone = DisasterRecoveryManager.zoneDescriptor(catalog, zoneName);
            CatalogTableDescriptor table = DisasterRecoveryManager.tableDescriptor(catalog, schemaName, tableName);
            DisasterRecoveryManager.checkPartitionsRange(partitionIds, Set.of(zone));
            return this.processNewRequest(new ManualGroupRestartRequest(UUID.randomUUID(), zone.id(), table.id(), partitionIds, nodeNames, catalog.time(), false));
        }
        catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    public CompletableFuture<Void> restartTablePartitionsWithCleanup(Set<String> nodeNames, String zoneName, String schemaName, String tableName, Set<Integer> partitionIds) {
        try {
            this.getNodes(nodeNames);
            Catalog catalog = this.catalogLatestVersion();
            CatalogZoneDescriptor zone = DisasterRecoveryManager.zoneDescriptor(catalog, zoneName);
            CatalogTableDescriptor table = DisasterRecoveryManager.tableDescriptor(catalog, schemaName, tableName);
            DisasterRecoveryManager.checkPartitionsRange(partitionIds, Set.of(zone));
            return this.processNewRequest(new ManualGroupRestartRequest(UUID.randomUUID(), zone.id(), table.id(), partitionIds, nodeNames, catalog.time(), true));
        }
        catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    public CompletableFuture<Void> restartPartitions(Set<String> nodeNames, String zoneName, Set<Integer> partitionIds) {
        try {
            this.getNodes(nodeNames);
            Catalog catalog = this.catalogLatestVersion();
            CatalogZoneDescriptor zone = DisasterRecoveryManager.zoneDescriptor(catalog, zoneName);
            DisasterRecoveryManager.checkPartitionsRange(partitionIds, Set.of(zone));
            return this.processNewRequest(new ManualGroupRestartRequest(UUID.randomUUID(), zone.id(), -1, partitionIds, nodeNames, catalog.time(), false));
        }
        catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    public CompletableFuture<Void> restartPartitionsWithCleanup(Set<String> nodeNames, String zoneName, Set<Integer> partitionIds) {
        try {
            this.getNodes(nodeNames);
            Catalog catalog = this.catalogLatestVersion();
            CatalogZoneDescriptor zone = DisasterRecoveryManager.zoneDescriptor(catalog, zoneName);
            DisasterRecoveryManager.checkPartitionsRange(partitionIds, Set.of(zone));
            return this.processNewRequest(new ManualGroupRestartRequest(UUID.randomUUID(), zone.id(), -1, partitionIds, nodeNames, catalog.time(), true));
        }
        catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    public CompletableFuture<Map<ZonePartitionId, LocalPartitionStateByNode>> localPartitionStates(Set<String> zoneNames, Set<String> nodeNames, Set<Integer> partitionIds) {
        try {
            assert (this.nodeProperties.colocationEnabled()) : "Zone based replication is unavailable use localTablePartitionStates";
            Catalog catalog = this.catalogLatestVersion();
            return this.localPartitionStatesInternal(zoneNames, nodeNames, partitionIds, catalog, DisasterRecoveryManager.zoneState()).thenApply(res -> DisasterRecoveryManager.normalizeLocal(res, catalog));
        }
        catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    public CompletableFuture<Map<ZonePartitionId, GlobalPartitionState>> globalPartitionStates(Set<String> zoneNames, Set<Integer> partitionIds) {
        try {
            assert (this.nodeProperties.colocationEnabled()) : "Zone based replication is unavailable use globalTablePartitionStates";
            Catalog catalog = this.catalogLatestVersion();
            return ((CompletableFuture)this.localPartitionStatesInternal(zoneNames, Set.of(), partitionIds, catalog, DisasterRecoveryManager.zoneState()).thenApply(res -> DisasterRecoveryManager.normalizeLocal(res, catalog))).thenApply(res -> DisasterRecoveryManager.assembleGlobal(res, partitionIds, catalog));
        }
        catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    static Function<LocalPartitionStateMessage, ZonePartitionId> zoneState() {
        return state -> state.zonePartitionId().asZonePartitionId();
    }

    <T extends ReplicationGroupId> CompletableFuture<Map<T, LocalPartitionStateMessageByNode>> localPartitionStatesInternal(Set<String> zoneNames, Set<String> nodeNames, Set<Integer> partitionIds, Catalog catalog, Function<LocalPartitionStateMessage, @Nullable T> keyExtractor) {
        Collection<CatalogZoneDescriptor> zones = DisasterRecoveryManager.filterZones(zoneNames, catalog.zones());
        DisasterRecoveryManager.checkPartitionsRange(partitionIds, zones);
        Set<NodeWithAttributes> nodes = this.getNodes(nodeNames);
        Set zoneIds = zones.stream().map(CatalogObjectDescriptor::id).collect(Collectors.toSet());
        LocalPartitionStatesRequest localPartitionStatesRequest = PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStatesRequest().zoneIds(zoneIds).partitionIds(partitionIds).catalogVersion(catalog.version()).build();
        ConcurrentHashMap result = new ConcurrentHashMap();
        CompletableFuture[] futures = new CompletableFuture[nodes.size()];
        int i = 0;
        for (NodeWithAttributes node : nodes) {
            CompletableFuture invokeFuture = this.messagingService.invoke(node.nodeName(), (NetworkMessage)localPartitionStatesRequest, TimeUnit.SECONDS.toMillis(30L));
            futures[i++] = invokeFuture.thenAccept(networkMessage -> {
                assert (networkMessage instanceof LocalPartitionStatesResponse) : networkMessage;
                LocalPartitionStatesResponse response = (LocalPartitionStatesResponse)networkMessage;
                for (LocalPartitionStateMessage state : response.states()) {
                    result.compute((ReplicationGroupId)keyExtractor.apply(state), (partitionId, messageByNode) -> {
                        if (messageByNode == null) {
                            return new LocalPartitionStateMessageByNode(Map.of(node.nodeName(), state));
                        }
                        messageByNode = new LocalPartitionStateMessageByNode((LocalPartitionStateMessageByNode)messageByNode);
                        messageByNode.put(node.nodeName(), state);
                        return messageByNode;
                    });
                }
            });
        }
        return CompletableFuture.allOf(futures).handle((unused, err) -> {
            if (err != null) {
                throw new DisasterRecoveryException(ErrorGroups.DisasterRecovery.PARTITION_STATE_ERR, (Throwable)err);
            }
            return result;
        });
    }

    public CompletableFuture<Map<TablePartitionId, LocalTablePartitionStateByNode>> localTablePartitionStates(Set<String> zoneNames, Set<String> nodeNames, Set<Integer> partitionIds) {
        try {
            Catalog catalog = this.catalogLatestVersion();
            if (this.nodeProperties.colocationEnabled()) {
                return ((CompletableFuture)this.localPartitionStatesInternal(zoneNames, nodeNames, partitionIds, catalog, DisasterRecoveryManager.zoneState()).thenCompose(res -> this.tableStateForZone(DisasterRecoveryManager.toZonesOnNodes(res), catalog.version()).thenApply(tableState -> DisasterRecoveryManager.zoneStateToTableState(res, tableState, catalog)))).thenApply(res -> DisasterRecoveryManager.normalizeTableLocal(res, catalog));
            }
            return this.localPartitionStatesInternal(zoneNames, nodeNames, partitionIds, catalog, DisasterRecoveryManager.mixedState()).thenCompose(res -> this.normalizeMixedLocal((Map<PartitionGroupId, LocalPartitionStateMessageByNode>)res, catalog));
        }
        catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    public CompletableFuture<Map<TablePartitionId, GlobalTablePartitionState>> globalTablePartitionStates(Set<String> zoneNames, Set<Integer> partitionIds) {
        try {
            Catalog catalog = this.catalogLatestVersion();
            if (this.nodeProperties.colocationEnabled()) {
                return ((CompletableFuture)((CompletableFuture)this.localPartitionStatesInternal(zoneNames, Set.of(), partitionIds, catalog, DisasterRecoveryManager.zoneState()).thenCompose(res -> this.tableStateForZone(DisasterRecoveryManager.toZonesOnNodes(res), catalog.version()).thenApply(tableState -> DisasterRecoveryManager.zoneStateToTableState(res, tableState, catalog)))).thenApply(res -> DisasterRecoveryManager.normalizeTableLocal(res, catalog))).thenApply(res -> DisasterRecoveryManager.assembleTableGlobal(res, partitionIds, catalog));
            }
            return ((CompletableFuture)this.localPartitionStatesInternal(zoneNames, Set.of(), partitionIds, catalog, DisasterRecoveryManager.mixedState()).thenCompose(res -> this.normalizeMixedLocal((Map<PartitionGroupId, LocalPartitionStateMessageByNode>)res, catalog))).thenApply(res -> DisasterRecoveryManager.assembleTableGlobal(res, partitionIds, catalog));
        }
        catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    private static Map<String, Set<ZonePartitionId>> toZonesOnNodes(Map<ZonePartitionId, LocalPartitionStateMessageByNode> partitionStateMap) {
        HashMap<String, Set<ZonePartitionId>> res = new HashMap<String, Set<ZonePartitionId>>();
        for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> entry : partitionStateMap.entrySet()) {
            ZonePartitionId zonePartitionId = entry.getKey();
            LocalPartitionStateMessageByNode zoneLocalPartitionStateMessageByNode = entry.getValue();
            for (String nodeName : zoneLocalPartitionStateMessageByNode.nodes()) {
                res.computeIfAbsent(nodeName, k -> new HashSet()).add(zonePartitionId);
            }
        }
        return res;
    }

    private CompletableFuture<Map<String, Map<TablePartitionIdMessage, Long>>> tableStateForZone(Map<String, Set<ZonePartitionId>> zonesOnNodes, int catalogVersion) {
        return this.checkSafeToSendPartitionRowsMessage().thenCompose(canSendRemoteCommand -> DisasterRecoveryManager.tableStateForZoneOnCluster(zonesOnNodes, (nodeName, zonePartitionIds) -> canSendRemoteCommand != false ? this.tableStateForZoneOnNode(catalogVersion, (String)nodeName, (Set<ZonePartitionId>)zonePartitionIds) : this.tableStateForZoneLocal(catalogVersion, (Set<ZonePartitionId>)zonePartitionIds)));
    }

    private static CompletableFuture<Map<String, Map<TablePartitionIdMessage, Long>>> tableStateForZoneOnCluster(Map<String, Set<ZonePartitionId>> zonesOnNodes, BiFunction<String, Set<ZonePartitionId>, CompletableFuture<Map<TablePartitionIdMessage, Long>>> stateProvider) {
        ConcurrentHashMap result = new ConcurrentHashMap();
        CompletableFuture[] futures = (CompletableFuture[])zonesOnNodes.entrySet().stream().map(entry -> ((CompletableFuture)stateProvider.apply((String)entry.getKey(), (Set)entry.getValue())).thenAccept(response -> result.computeIfAbsent((String)entry.getKey(), k -> new ConcurrentHashMap()).putAll(response))).toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures).handle((unused, err) -> {
            if (err != null) {
                throw new DisasterRecoveryException(ErrorGroups.DisasterRecovery.PARTITION_STATE_ERR, (Throwable)err);
            }
            return result;
        });
    }

    private CompletableFuture<Boolean> checkSafeToSendPartitionRowsMessage() {
        return this.cmgManager.clusterState().thenApply(state -> {
            if (state == null) {
                return false;
            }
            IgniteProductVersion clusterVersion = IgniteProductVersion.fromString((String)state.version());
            return clusterVersion.compareTo(VERSION_WITH_ROW_COUNT) >= 0;
        });
    }

    private CompletableFuture<Map<TablePartitionIdMessage, Long>> tableStateForZoneOnNode(int catalogVersion, String node, Set<ZonePartitionId> zones) {
        Set zoneMessage = zones.stream().map(zonePartitionId -> ReplicaMessageUtils.toZonePartitionIdMessage((ReplicaMessagesFactory)REPLICA_MESSAGES_FACTORY, (ZonePartitionId)zonePartitionId)).collect(Collectors.toSet());
        LocalTablePartitionStateRequest request = PARTITION_REPLICATION_MESSAGES_FACTORY.localTablePartitionStateRequest().zonePartitionIds(zoneMessage).catalogVersion(catalogVersion).build();
        return ((CompletableFuture)this.messagingService.invoke(node, (NetworkMessage)request, TimeUnit.SECONDS.toMillis(30L)).thenApply(networkMessage -> {
            assert (networkMessage instanceof LocalTablePartitionStateResponse) : networkMessage;
            return (LocalTablePartitionStateResponse)networkMessage;
        })).thenApply(response -> {
            ConcurrentHashMap result = new ConcurrentHashMap();
            response.states().forEach(state -> result.putAll(state.tablePartitionIdToEstimatedRowsMap()));
            return result;
        });
    }

    private CompletableFuture<Map<TablePartitionIdMessage, Long>> tableStateForZoneLocal(int catalogVersion, Set<ZonePartitionId> zonePartitionIds) {
        Catalog catalog = this.catalogManager.catalog(catalogVersion);
        ConcurrentHashMap<TablePartitionIdMessage, Long> result = new ConcurrentHashMap<TablePartitionIdMessage, Long>();
        for (ZonePartitionId zonePartitionId : zonePartitionIds) {
            int zoneId = zonePartitionId.zoneId();
            int partitionId = zonePartitionId.partitionId();
            for (CatalogTableDescriptor tableDescriptor : catalog.tables(zoneId)) {
                MvPartitionStorage partitionStorage;
                TablePartitionId tablePartitionId = new TablePartitionId(tableDescriptor.id(), partitionId);
                TableViewInternal tableViewInternal = this.tableManager.cachedTable(tablePartitionId.tableId());
                if (tableViewInternal == null || (partitionStorage = tableViewInternal.internalTable().storage().getMvPartition(tablePartitionId.partitionId())) == null) continue;
                result.put(ReplicaMessageUtils.toTablePartitionIdMessage((ReplicaMessagesFactory)REPLICA_MESSAGES_FACTORY, (TablePartitionId)tablePartitionId), partitionStorage.estimatedSize());
            }
        }
        return CompletableFuture.completedFuture(result);
    }

    private static Map<TablePartitionId, LocalPartitionStateMessageByNode> zoneStateToTableState(Map<ZonePartitionId, LocalPartitionStateMessageByNode> partitionStateMap, Map<String, Map<TablePartitionIdMessage, Long>> tableState, Catalog catalog) {
        HashMap<TablePartitionId, LocalPartitionStateMessageByNode> res = new HashMap<TablePartitionId, LocalPartitionStateMessageByNode>();
        for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> entry : partitionStateMap.entrySet()) {
            ZonePartitionId zonePartitionId = entry.getKey();
            int zoneId = zonePartitionId.zoneId();
            int partitionId = zonePartitionId.partitionId();
            LocalPartitionStateMessageByNode zoneLocalPartitionStateMessageByNode = entry.getValue();
            LocalPartitionStateMessageByNode tableLocalPartitionStateMessageByNode = new LocalPartitionStateMessageByNode(new HashMap<String, LocalPartitionStateMessage>());
            for (CatalogTableDescriptor tableDescriptor : catalog.tables(zoneId)) {
                TablePartitionId tablePartitionId = new TablePartitionId(tableDescriptor.id(), partitionId);
                TablePartitionIdMessage tablePartitionIdMessage = ReplicaMessageUtils.toTablePartitionIdMessage((ReplicaMessagesFactory)REPLICA_MESSAGES_FACTORY, (TablePartitionId)tablePartitionId);
                for (Map.Entry<String, LocalPartitionStateMessage> nodeEntry : zoneLocalPartitionStateMessageByNode.entrySet()) {
                    String nodeName = nodeEntry.getKey();
                    Long estimatedRows = (Long)tableState.getOrDefault(nodeName, Collections.emptyMap()).get(tablePartitionIdMessage);
                    if (estimatedRows == null) continue;
                    LocalPartitionStateMessage localPartitionStateMessage = nodeEntry.getValue();
                    LocalPartitionStateMessage tableLocalPartitionStateMessage = PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStateMessage().partitionId(tablePartitionIdMessage).state(localPartitionStateMessage.state()).logIndex(localPartitionStateMessage.logIndex()).estimatedRows(estimatedRows.longValue()).build();
                    tableLocalPartitionStateMessageByNode.put(nodeName, tableLocalPartitionStateMessage);
                }
                if (tableLocalPartitionStateMessageByNode.values().isEmpty()) continue;
                res.put(tablePartitionId, tableLocalPartitionStateMessageByNode);
            }
        }
        return res;
    }

    static Function<LocalPartitionStateMessage, TablePartitionId> tableState() {
        return state -> state.partitionId().asTablePartitionId();
    }

    static Function<LocalPartitionStateMessage, PartitionGroupId> mixedState() {
        return state -> {
            TablePartitionIdMessage tablePartitionIdMessage = state.partitionId();
            if (tablePartitionIdMessage != null) {
                return tablePartitionIdMessage.asTablePartitionId();
            }
            return state.zonePartitionId().asZonePartitionId();
        };
    }

    private static void checkPartitionsRange(Set<Integer> partitionIds, Collection<CatalogZoneDescriptor> zones) {
        if (partitionIds.isEmpty()) {
            return;
        }
        int minPartition = (Integer)partitionIds.stream().min(Integer::compare).get();
        if (minPartition < 0) {
            throw new IllegalPartitionIdException(minPartition);
        }
        int maxPartition = (Integer)partitionIds.stream().max(Integer::compare).get();
        zones.forEach(zone -> {
            if (maxPartition >= zone.partitions()) {
                throw new IllegalPartitionIdException(maxPartition, zone.partitions(), zone.name());
            }
        });
    }

    private Set<NodeWithAttributes> getNodes(Set<String> nodeNames) throws NodesNotFoundException {
        if (nodeNames.isEmpty()) {
            return this.dzManager.logicalTopology();
        }
        Set<NodeWithAttributes> nodes = this.dzManager.logicalTopology().stream().filter(node -> nodeNames.contains(node.nodeName())).collect(Collectors.toSet());
        Set foundNodeNames = nodes.stream().map(NodeWithAttributes::nodeName).collect(Collectors.toSet());
        if (!nodeNames.equals(foundNodeNames)) {
            Set missingNodeNames = CollectionUtils.difference(nodeNames, foundNodeNames);
            throw new NodesNotFoundException(missingNodeNames);
        }
        return nodes;
    }

    private static Collection<CatalogZoneDescriptor> filterZones(Set<String> zoneNames, Collection<CatalogZoneDescriptor> zones) throws ZonesNotFoundException {
        if (zoneNames.isEmpty()) {
            return zones;
        }
        List<CatalogZoneDescriptor> zoneDescriptors = zones.stream().filter(catalogZoneDescriptor -> zoneNames.contains(catalogZoneDescriptor.name())).collect(Collectors.toList());
        Set foundZoneNames = zoneDescriptors.stream().map(CatalogObjectDescriptor::name).collect(Collectors.toSet());
        if (!zoneNames.equals(foundZoneNames)) {
            Set missingZoneNames = CollectionUtils.difference(zoneNames, foundZoneNames);
            throw new ZonesNotFoundException(missingZoneNames);
        }
        return zoneDescriptors;
    }

    private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request) {
        return this.processNewRequest(request, -1L);
    }

    private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request, long revision) {
        UUID operationId = request.operationId();
        CompletableFuture<Void> operationFuture = ((CompletableFuture)new CompletableFuture().whenComplete((v, throwable) -> this.ongoingOperationsById.remove(operationId))).orTimeout(30L, TimeUnit.SECONDS);
        byte[] serializedRequest = VersionedSerialization.toBytes((Object)request, (VersionedSerializer)DisasterRecoveryRequestSerializer.INSTANCE);
        this.ongoingOperationsById.put(operationId, operationFuture);
        if (revision != -1L) {
            this.putRecoveryTriggerIfRevisionIsNotProcessed(request.zoneId(), ByteUtils.longToBytesKeepingOrder((long)revision), serializedRequest, operationId);
        } else {
            this.metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest);
        }
        return operationFuture;
    }

    private void putRecoveryTriggerIfRevisionIsNotProcessed(int zoneId, byte[] revisionBytes, byte[] recoveryTriggerValue, UUID operationId) {
        ByteArray zoneTriggerRevisionKey = DisasterRecoveryManager.zoneRecoveryTriggerRevisionKey(zoneId);
        this.metaStorageManager.invoke(Conditions.notExists((ByteArray)zoneTriggerRevisionKey).or((Condition)Conditions.value((ByteArray)zoneTriggerRevisionKey).lt(revisionBytes)), List.of(Operations.put((ByteArray)RECOVERY_TRIGGER_KEY, (byte[])recoveryTriggerValue), Operations.put((ByteArray)zoneTriggerRevisionKey, (byte[])revisionBytes)), List.of()).thenAccept(wasWrite -> {
            if (!wasWrite.booleanValue()) {
                this.ongoingOperationsById.remove(operationId).complete(null);
            }
        });
    }

    private void handleTriggerKeyUpdate(WatchEvent watchEvent) {
        DisasterRecoveryRequest request;
        Entry newEntry = watchEvent.entryEvent().newEntry();
        byte[] requestBytes = newEntry.value();
        assert (requestBytes != null);
        try {
            request = (DisasterRecoveryRequest)VersionedSerialization.fromBytes((byte[])requestBytes, (VersionedSerializer)DisasterRecoveryRequestSerializer.INSTANCE);
        }
        catch (Exception e) {
            this.failureManager.process(new FailureContext((Throwable)e, "Unable to deserialize disaster recovery request."));
            return;
        }
        CompletableFuture<Void> operationFuture = this.ongoingOperationsById.remove(request.operationId());
        switch (request.type()) {
            case SINGLE_NODE: {
                if (operationFuture == null) {
                    return;
                }
                request.handle(this, watchEvent.revision(), watchEvent.timestamp()).whenComplete(CompletableFutures.copyStateTo(operationFuture));
                break;
            }
            case MULTI_NODE: {
                CompletableFuture<Void> handleFuture = request.handle(this, watchEvent.revision(), watchEvent.timestamp());
                if (operationFuture == null) {
                    return;
                }
                handleFuture.whenComplete(CompletableFutures.copyStateTo(operationFuture));
                break;
            }
            default: {
                AssertionError error = new AssertionError((Object)("Unexpected request type: " + request.getClass()));
                if (operationFuture == null) break;
                operationFuture.completeExceptionally((Throwable)((Object)error));
            }
        }
    }

    private void handleMessage(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) {
        if (message instanceof LocalPartitionStatesRequest) {
            this.handleLocalPartitionStatesRequest((LocalPartitionStatesRequest)message, sender, correlationId);
        } else if (message instanceof LocalTablePartitionStateRequest) {
            this.handleLocalTableStateRequest((LocalTablePartitionStateRequest)message, sender, correlationId);
        }
    }

    private void handleLocalTableStateRequest(LocalTablePartitionStateRequest request, InternalClusterNode sender, @Nullable Long correlationId) {
        assert (correlationId != null) : "request=" + request + ", sender=" + sender;
        int catalogVersion = request.catalogVersion();
        Set requesedPartitions = request.zonePartitionIds().stream().map(ZonePartitionIdMessage::asZonePartitionId).collect(Collectors.toSet());
        this.catalogManager.catalogReadyFuture(catalogVersion).thenRunAsync(() -> {
            HashSet statesList = new HashSet();
            this.raftManager.forEach((raftNodeId, raftGroupService) -> {
                LocalTablePartitionStateMessage message;
                if (raftNodeId.groupId() instanceof ZonePartitionId && (message = this.handleSizeRequestForTablesInZone(requesedPartitions, (ZonePartitionId)raftNodeId.groupId())) != null) {
                    statesList.add(message);
                }
            });
            LocalTablePartitionStateResponse response = PARTITION_REPLICATION_MESSAGES_FACTORY.localTablePartitionStateResponse().states(statesList).build();
            this.messagingService.respond(sender, (NetworkMessage)response, correlationId.longValue());
        }, this.threadPool);
    }

    private void handleLocalPartitionStatesRequest(LocalPartitionStatesRequest request, InternalClusterNode sender, @Nullable Long correlationId) {
        assert (correlationId != null) : "request=" + request + ", sender=" + sender;
        int catalogVersion = request.catalogVersion();
        this.catalogManager.catalogReadyFuture(catalogVersion).thenRunAsync(() -> {
            ArrayList statesList = new ArrayList();
            this.raftManager.forEach((raftNodeId, raftGroupService) -> {
                LocalPartitionStateMessage message;
                if (raftNodeId.groupId() instanceof TablePartitionId) {
                    LocalPartitionStateMessage message2 = this.handleStateRequestForTable(request, (RaftGroupService)raftGroupService, (TablePartitionId)raftNodeId.groupId(), catalogVersion);
                    if (message2 != null) {
                        statesList.add(message2);
                    }
                } else if (raftNodeId.groupId() instanceof ZonePartitionId && (message = this.handleStateRequestForZone(request, (RaftGroupService)raftGroupService, (ZonePartitionId)raftNodeId.groupId(), catalogVersion)) != null) {
                    statesList.add(message);
                }
            });
            LocalPartitionStatesResponse response = PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStatesResponse().states(statesList).build();
            this.messagingService.respond(sender, (NetworkMessage)response, correlationId.longValue());
        }, this.threadPool);
    }

    @Nullable
    private LocalTablePartitionStateMessage handleSizeRequestForTablesInZone(Set<ZonePartitionId> requestedPartitions, ZonePartitionId zonePartitionId) {
        if (!DisasterRecoveryManager.containsOrEmpty(zonePartitionId, requestedPartitions)) {
            return null;
        }
        return PARTITION_REPLICATION_MESSAGES_FACTORY.localTablePartitionStateMessage().tablePartitionIdToEstimatedRowsMap(this.estimatedSizeMap(zonePartitionId)).build();
    }

    @Nullable
    private LocalPartitionStateMessage handleStateRequestForZone(LocalPartitionStatesRequest request, RaftGroupService raftGroupService, ZonePartitionId zonePartitionId, int catalogVersion) {
        if (!DisasterRecoveryManager.containsOrEmpty(zonePartitionId.partitionId(), request.partitionIds())) {
            return null;
        }
        Catalog catalog = this.catalogManager.catalog(catalogVersion);
        assert (catalog != null) : "Catalog is not found for version: " + catalogVersion;
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zonePartitionId.zoneId());
        if (zoneDescriptor == null || !DisasterRecoveryManager.containsOrEmpty(zoneDescriptor.id(), request.zoneIds())) {
            return null;
        }
        LocalPartitionStateEnumWithLogIndex localPartitionStateWithLogIndex = LocalPartitionStateEnumWithLogIndex.of(raftGroupService.getRaftNode());
        return PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStateMessage().zonePartitionId(ReplicaMessageUtils.toZonePartitionIdMessage((ReplicaMessagesFactory)REPLICA_MESSAGES_FACTORY, (ZonePartitionId)zonePartitionId)).state(localPartitionStateWithLogIndex.state).logIndex(localPartitionStateWithLogIndex.logIndex).estimatedRows(this.calculateEstimatedSize(zonePartitionId)).build();
    }

    private long calculateEstimatedSize(ZonePartitionId zonePartitionId) {
        return this.tableManager.zoneTables(zonePartitionId.zoneId()).stream().map(tableImpl -> tableImpl.internalTable().storage().getMvPartition(zonePartitionId.partitionId())).filter(Objects::nonNull).mapToLong(MvPartitionStorage::estimatedSize).sum();
    }

    private Map<TablePartitionIdMessage, Long> estimatedSizeMap(ZonePartitionId zonePartitionId) {
        HashMap<TablePartitionIdMessage, Long> partitionIdToEstimatedRowsMap = new HashMap<TablePartitionIdMessage, Long>();
        for (TableViewInternal tableImpl : this.tableManager.zoneTables(zonePartitionId.zoneId())) {
            MvPartitionStorage mvPartitionStorage = tableImpl.internalTable().storage().getMvPartition(zonePartitionId.partitionId());
            if (mvPartitionStorage == null) continue;
            partitionIdToEstimatedRowsMap.put(ReplicaMessageUtils.toTablePartitionIdMessage((ReplicaMessagesFactory)REPLICA_MESSAGES_FACTORY, (TablePartitionId)new TablePartitionId(tableImpl.tableId(), zonePartitionId.partitionId())), mvPartitionStorage.estimatedSize());
        }
        return partitionIdToEstimatedRowsMap;
    }

    @Nullable
    private LocalPartitionStateMessage handleStateRequestForTable(LocalPartitionStatesRequest request, RaftGroupService raftGroupService, TablePartitionId tablePartitionId, int catalogVersion) {
        if (!DisasterRecoveryManager.containsOrEmpty(tablePartitionId.partitionId(), request.partitionIds())) {
            return null;
        }
        Catalog catalog = this.catalogManager.catalog(catalogVersion);
        assert (catalog != null) : "Catalog is not found for version: " + catalogVersion;
        CatalogTableDescriptor tableDescriptor = catalog.table(tablePartitionId.tableId());
        if (tableDescriptor == null || !DisasterRecoveryManager.containsOrEmpty(tableDescriptor.zoneId(), request.zoneIds())) {
            return null;
        }
        TableViewInternal tableViewInternal = this.tableManager.cachedTable(tablePartitionId.tableId());
        if (tableViewInternal == null) {
            return null;
        }
        MvPartitionStorage partitionStorage = tableViewInternal.internalTable().storage().getMvPartition(tablePartitionId.partitionId());
        if (partitionStorage == null) {
            return null;
        }
        LocalPartitionStateEnumWithLogIndex localPartitionStateWithLogIndex = LocalPartitionStateEnumWithLogIndex.of(raftGroupService.getRaftNode());
        return PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStateMessage().partitionId(ReplicaMessageUtils.toTablePartitionIdMessage((ReplicaMessagesFactory)REPLICA_MESSAGES_FACTORY, (TablePartitionId)tablePartitionId)).state(localPartitionStateWithLogIndex.state).logIndex(localPartitionStateWithLogIndex.logIndex).estimatedRows(partitionStorage.estimatedSize()).build();
    }

    private static <T> boolean containsOrEmpty(T item, Collection<T> collection) {
        return collection.isEmpty() || collection.contains(item);
    }

    private static Map<ZonePartitionId, LocalPartitionStateByNode> normalizeLocal(Map<ZonePartitionId, LocalPartitionStateMessageByNode> result, Catalog catalog) {
        HashMap<ZonePartitionId, LocalPartitionStateByNode> map = new HashMap<ZonePartitionId, LocalPartitionStateByNode>();
        for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> entry : result.entrySet()) {
            ZonePartitionId zonePartitionId = entry.getKey();
            LocalPartitionStateMessageByNode messageByNode = entry.getValue();
            long maxLogIndex = messageByNode.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong();
            Map<String, LocalPartitionState> nodeToStateMap = messageByNode.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, nodeToState -> DisasterRecoveryManager.toLocalPartitionState((LocalPartitionStateMessage)nodeToState.getValue(), maxLogIndex, zonePartitionId, catalog)));
            map.put(zonePartitionId, new LocalPartitionStateByNode(nodeToStateMap));
        }
        return map;
    }

    private CompletableFuture<Map<TablePartitionId, LocalTablePartitionStateByNode>> normalizeMixedLocal(Map<PartitionGroupId, LocalPartitionStateMessageByNode> result, Catalog catalog) {
        Map<ZonePartitionId, LocalPartitionStateMessageByNode> zonePartitionIdMap = result.entrySet().stream().filter(entry -> entry.getKey() instanceof ZonePartitionId).collect(Collectors.toMap(entry -> (ZonePartitionId)entry.getKey(), Map.Entry::getValue));
        CompletionStage zonePartitionsNormalizedToTablePartitionsFuture = this.tableStateForZone(DisasterRecoveryManager.toZonesOnNodes(zonePartitionIdMap), catalog.version()).thenApply(tableState -> DisasterRecoveryManager.zoneStateToTableState(zonePartitionIdMap, tableState, catalog));
        Stream<Map.Entry> tablePartitionIdEntries = result.entrySet().stream().filter(entry -> entry.getKey() instanceof TablePartitionId).map(entry -> entry);
        return ((CompletableFuture)zonePartitionsNormalizedToTablePartitionsFuture).thenApply(zonePartitionsNormalizedToTablePartitions -> {
            Map<TablePartitionId, LocalPartitionStateMessageByNode> joinedResult = Stream.concat(tablePartitionIdEntries, zonePartitionsNormalizedToTablePartitions.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            return DisasterRecoveryManager.normalizeTableLocal(joinedResult, catalog);
        });
    }

    private static Map<TablePartitionId, LocalTablePartitionStateByNode> normalizeTableLocal(Map<TablePartitionId, LocalPartitionStateMessageByNode> result, Catalog catalog) {
        HashMap<TablePartitionId, LocalTablePartitionStateByNode> map = new HashMap<TablePartitionId, LocalTablePartitionStateByNode>();
        for (Map.Entry<TablePartitionId, LocalPartitionStateMessageByNode> entry : result.entrySet()) {
            TablePartitionId tablePartitionId = entry.getKey();
            LocalPartitionStateMessageByNode messageByNode = entry.getValue();
            long maxLogIndex = messageByNode.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong();
            Map<String, LocalTablePartitionState> nodeToStateMap = messageByNode.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, nodeToState -> DisasterRecoveryManager.toLocalTablePartitionState(nodeToState, maxLogIndex, tablePartitionId, catalog)));
            map.put(tablePartitionId, new LocalTablePartitionStateByNode(nodeToStateMap));
        }
        return map;
    }

    private static LocalPartitionState toLocalPartitionState(LocalPartitionStateMessage stateMsg, long maxLogIndex, ZonePartitionId zonePartitionId, Catalog catalog) {
        LocalPartitionStateEnum stateEnum = DisasterRecoveryManager.calculateState(stateMsg, maxLogIndex);
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zonePartitionId.zoneId());
        String zoneName = zoneDescriptor.name();
        return new LocalPartitionState(zonePartitionId.zoneId(), zoneName, zonePartitionId.partitionId(), stateEnum, stateMsg.estimatedRows());
    }

    private static LocalTablePartitionState toLocalTablePartitionState(Map.Entry<String, LocalPartitionStateMessage> nodeToMessage, long maxLogIndex, TablePartitionId tablePartitionId, Catalog catalog) {
        LocalPartitionStateMessage stateMsg = nodeToMessage.getValue();
        LocalPartitionStateEnum stateEnum = DisasterRecoveryManager.calculateState(stateMsg, maxLogIndex);
        CatalogTableDescriptor tableDescriptor = catalog.table(tablePartitionId.tableId());
        String zoneName = catalog.zone(tableDescriptor.zoneId()).name();
        String schemaName = catalog.schema(tableDescriptor.schemaId()).name();
        return new LocalTablePartitionState(tableDescriptor.zoneId(), zoneName, tableDescriptor.schemaId(), schemaName, tableDescriptor.id(), tableDescriptor.name(), tablePartitionId.partitionId(), stateEnum, stateMsg.estimatedRows());
    }

    private static LocalPartitionStateEnum calculateState(LocalPartitionStateMessage stateMsg, long maxLogIndex) {
        LocalPartitionStateEnum stateEnum = stateMsg.state();
        if (stateEnum == LocalPartitionStateEnum.HEALTHY && maxLogIndex - stateMsg.logIndex() >= 100L) {
            return LocalPartitionStateEnum.CATCHING_UP;
        }
        return stateEnum;
    }

    private static Map<ZonePartitionId, GlobalPartitionState> assembleGlobal(Map<ZonePartitionId, LocalPartitionStateByNode> localResult, Set<Integer> partitionIds, Catalog catalog) {
        Map<ZonePartitionId, GlobalPartitionState> result = localResult.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
            ZonePartitionId zonePartitionId = (ZonePartitionId)entry.getKey();
            LocalPartitionStateByNode map = (LocalPartitionStateByNode)entry.getValue();
            return DisasterRecoveryManager.assembleGlobalStateFromLocal(catalog, zonePartitionId, map);
        }));
        DisasterRecoveryManager.makeMissingPartitionsUnavailable(localResult, catalog, result, partitionIds);
        return result;
    }

    private static Map<TablePartitionId, GlobalTablePartitionState> assembleTableGlobal(Map<TablePartitionId, LocalTablePartitionStateByNode> localResult, Set<Integer> partitionIds, Catalog catalog) {
        Map<TablePartitionId, GlobalTablePartitionState> result = localResult.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
            TablePartitionId tablePartitionId = (TablePartitionId)entry.getKey();
            LocalTablePartitionStateByNode map = (LocalTablePartitionStateByNode)entry.getValue();
            return DisasterRecoveryManager.assembleTableGlobalStateFromLocal(catalog, tablePartitionId, map);
        }));
        DisasterRecoveryManager.makeMissingTablePartitionsUnavailable(localResult, catalog, result, partitionIds);
        return result;
    }

    private static void makeMissingPartitionsUnavailable(Map<ZonePartitionId, LocalPartitionStateByNode> localResult, Catalog catalog, Map<ZonePartitionId, GlobalPartitionState> result, Set<Integer> partitionIds) {
        localResult.keySet().stream().map(ZonePartitionId::zoneId).distinct().forEach(zoneId -> {
            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId.intValue());
            if (partitionIds.isEmpty()) {
                int partitions = zoneDescriptor.partitions();
                for (int partitionId2 = 0; partitionId2 < partitions; ++partitionId2) {
                    DisasterRecoveryManager.putUnavailableStateIfAbsent(result, partitionId2, zoneDescriptor);
                }
            } else {
                partitionIds.forEach(partitionId -> DisasterRecoveryManager.putUnavailableStateIfAbsent(result, partitionId, zoneDescriptor));
            }
        });
    }

    private static void makeMissingTablePartitionsUnavailable(Map<TablePartitionId, LocalTablePartitionStateByNode> localResult, Catalog catalog, Map<TablePartitionId, GlobalTablePartitionState> result, Set<Integer> partitionIds) {
        localResult.keySet().stream().map(TablePartitionId::tableId).distinct().forEach(tableId -> {
            CatalogTableDescriptor table = catalog.table(tableId.intValue());
            CatalogZoneDescriptor zoneDescriptor = catalog.zone(table.zoneId());
            CatalogSchemaDescriptor schemaDescriptor = catalog.schema(table.schemaId());
            if (partitionIds.isEmpty()) {
                int partitions = zoneDescriptor.partitions();
                for (int partitionId2 = 0; partitionId2 < partitions; ++partitionId2) {
                    DisasterRecoveryManager.putUnavailableTableStateIfAbsent(catalog, result, tableId, partitionId2, schemaDescriptor, zoneDescriptor);
                }
            } else {
                partitionIds.forEach(partitionId -> DisasterRecoveryManager.putUnavailableTableStateIfAbsent(catalog, result, tableId, partitionId, schemaDescriptor, zoneDescriptor));
            }
        });
    }

    private static void putUnavailableStateIfAbsent(Map<ZonePartitionId, GlobalPartitionState> states, int partitionId, CatalogZoneDescriptor zoneDescriptor) {
        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneDescriptor.id(), partitionId);
        states.computeIfAbsent(zonePartitionId, key -> new GlobalPartitionState(zoneDescriptor.id(), zoneDescriptor.name(), key.partitionId(), GlobalPartitionStateEnum.UNAVAILABLE));
    }

    private static void putUnavailableTableStateIfAbsent(Catalog catalog, Map<TablePartitionId, GlobalTablePartitionState> states, Integer tableId, int partitionId, CatalogSchemaDescriptor schemaDescriptor, CatalogZoneDescriptor zoneDescriptor) {
        TablePartitionId tablePartitionId = new TablePartitionId(tableId.intValue(), partitionId);
        states.computeIfAbsent(tablePartitionId, key -> new GlobalTablePartitionState(zoneDescriptor.id(), zoneDescriptor.name(), schemaDescriptor.id(), schemaDescriptor.name(), key.tableId(), catalog.table(key.tableId()).name(), key.partitionId(), GlobalPartitionStateEnum.UNAVAILABLE));
    }

    private static GlobalPartitionState assembleGlobalStateFromLocal(Catalog catalog, ZonePartitionId zonePartitionId, LocalPartitionStateByNode map) {
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zonePartitionId.zoneId());
        int replicas = zoneDescriptor.replicas();
        int quorum = DisasterRecoveryManager.calculateQuorum(replicas);
        Map<LocalPartitionStateEnum, List<LocalPartitionState>> groupedStates = map.values().stream().collect(Collectors.groupingBy(localPartitionState -> localPartitionState.state));
        int healthyReplicas = groupedStates.getOrDefault(LocalPartitionStateEnum.HEALTHY, Collections.emptyList()).size();
        GlobalPartitionStateEnum globalStateEnum = DisasterRecoveryManager.calculateGlobalState(replicas, healthyReplicas, quorum);
        return new GlobalPartitionState(zoneDescriptor.id(), zoneDescriptor.name(), zonePartitionId.partitionId(), globalStateEnum);
    }

    private static GlobalTablePartitionState assembleTableGlobalStateFromLocal(Catalog catalog, TablePartitionId tablePartitionId, LocalTablePartitionStateByNode map) {
        CatalogTableDescriptor table = catalog.table(tablePartitionId.tableId());
        CatalogSchemaDescriptor schemaDescriptor = catalog.schema(table.schemaId());
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(table.zoneId());
        int replicas = zoneDescriptor.replicas();
        int quorum = DisasterRecoveryManager.calculateQuorum(replicas);
        Map<LocalPartitionStateEnum, List<LocalTablePartitionState>> groupedStates = map.values().stream().collect(Collectors.groupingBy(localPartitionState -> localPartitionState.state));
        int healthyReplicas = groupedStates.getOrDefault(LocalPartitionStateEnum.HEALTHY, Collections.emptyList()).size();
        GlobalPartitionStateEnum globalStateEnum = DisasterRecoveryManager.calculateGlobalState(replicas, healthyReplicas, quorum);
        LocalTablePartitionState anyLocalState = map.values().iterator().next();
        return new GlobalTablePartitionState(zoneDescriptor.id(), zoneDescriptor.name(), schemaDescriptor.id(), schemaDescriptor.name(), anyLocalState.tableId, anyLocalState.tableName, tablePartitionId.partitionId(), globalStateEnum);
    }

    private static GlobalPartitionStateEnum calculateGlobalState(int replicas, int healthyReplicas, int quorum) {
        if (healthyReplicas == replicas) {
            return GlobalPartitionStateEnum.AVAILABLE;
        }
        if (healthyReplicas >= quorum) {
            return GlobalPartitionStateEnum.DEGRADED;
        }
        if (healthyReplicas > 0) {
            return GlobalPartitionStateEnum.READ_ONLY;
        }
        return GlobalPartitionStateEnum.UNAVAILABLE;
    }

    private static int calculateQuorum(int replicas) {
        return replicas / 2 + 1;
    }

    private Catalog catalogLatestVersion() {
        int catalogVersion = this.catalogManager.latestCatalogVersion();
        Catalog catalog = this.catalogManager.catalog(catalogVersion);
        assert (catalog != null) : catalogVersion;
        return catalog;
    }

    private static CatalogTableDescriptor tableDescriptor(Catalog catalog, String schemaName, String tableName) {
        CatalogTableDescriptor tableDescriptor = catalog.table(schemaName, tableName);
        if (tableDescriptor == null) {
            throw new TableNotFoundException(QualifiedNameHelper.fromNormalized((String)schemaName, (String)tableName));
        }
        return tableDescriptor;
    }

    private static CatalogZoneDescriptor zoneDescriptor(Catalog catalog, String zoneName) {
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneName);
        if (zoneDescriptor == null) {
            throw new DistributionZoneNotFoundException(zoneName);
        }
        return zoneDescriptor;
    }

    private static ByteArray zoneRecoveryTriggerRevisionKey(int zoneId) {
        return new ByteArray(RECOVERY_TRIGGER_REVISION_KEY_PREFIX + zoneId);
    }

    InternalClusterNode localNode() {
        return this.topologyService.localMember();
    }

    private void onTableCreate(CreateTableEventParameters parameters) {
        this.registerPartitionStatesMetricSource(parameters.tableDescriptor());
    }

    private void onTableDrop(DropTableEventParameters parameters) {
        this.unregisterPartitionStatesMetricSource(parameters.tableId());
    }

    private void registerMetricSources() {
        int catalogVersion = this.catalogManager.latestCatalogVersion();
        Catalog catalog = this.catalogManager.catalog(catalogVersion);
        assert (catalog != null) : "Catalog is not found for version: " + catalogVersion;
        catalog.tables().forEach(this::registerPartitionStatesMetricSource);
    }

    private void registerPartitionStatesMetricSource(CatalogTableDescriptor tableDescriptor) {
        PartitionStatesMetricSource metricSource = new PartitionStatesMetricSource(tableDescriptor, this);
        PartitionStatesMetricSource previous = this.metricSourceByTableId.putIfAbsent(tableDescriptor.id(), metricSource);
        assert (previous == null) : "tableId=" + tableDescriptor.id();
        this.metricManager.registerSource((MetricSource)metricSource);
        this.metricManager.enable((MetricSource)metricSource);
    }

    private void unregisterPartitionStatesMetricSource(int tableId) {
        PartitionStatesMetricSource metricSource = this.metricSourceByTableId.get(tableId);
        assert (metricSource != null) : "tableId=" + tableId;
        this.metricManager.unregisterSource((MetricSource)metricSource);
    }

    public boolean colocationEnabled() {
        return this.nodeProperties.colocationEnabled();
    }
}

