/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.table.distributed.disaster;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite3.internal.catalog.Catalog;
import org.apache.ignite3.internal.catalog.CatalogManager;
import org.apache.ignite3.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite3.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite3.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite3.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite3.internal.catalog.events.CatalogEvent;
import org.apache.ignite3.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite3.internal.catalog.events.DropTableEventParameters;
import org.apache.ignite3.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite3.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite3.internal.distributionzones.DistributionZonesUtil;
import org.apache.ignite3.internal.distributionzones.NodeWithAttributes;
import org.apache.ignite3.internal.distributionzones.events.HaZoneTopologyUpdateEvent;
import org.apache.ignite3.internal.distributionzones.events.HaZoneTopologyUpdateEventParams;
import org.apache.ignite3.internal.distributionzones.exception.DistributionZoneNotFoundException;
import org.apache.ignite3.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite3.internal.event.EventListener;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureManager;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.manager.IgniteComponent;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.MetaStorageManager;
import org.apache.ignite3.internal.metastorage.WatchEvent;
import org.apache.ignite3.internal.metastorage.WatchListener;
import org.apache.ignite3.internal.metastorage.dsl.Conditions;
import org.apache.ignite3.internal.metastorage.dsl.Operations;
import org.apache.ignite3.internal.metrics.MetricManager;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.MessagingService;
import org.apache.ignite3.internal.network.NetworkMessage;
import org.apache.ignite3.internal.network.RecipientLeftException;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.network.UnresolvableConsistentIdException;
import org.apache.ignite3.internal.partition.replicator.PartitionReplicaLifecycleManager;
import org.apache.ignite3.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import org.apache.ignite3.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite3.internal.partition.replicator.network.disaster.DisasterRecoveryRequestMessage;
import org.apache.ignite3.internal.partition.replicator.network.disaster.DisasterRecoveryResponseMessage;
import org.apache.ignite3.internal.partition.replicator.network.disaster.LocalPartitionStateEnum;
import org.apache.ignite3.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
import org.apache.ignite3.internal.partition.replicator.network.disaster.LocalPartitionStatesRequest;
import org.apache.ignite3.internal.partition.replicator.network.disaster.LocalPartitionStatesResponse;
import org.apache.ignite3.internal.partition.replicator.network.disaster.LocalTablePartitionStateMessage;
import org.apache.ignite3.internal.partition.replicator.network.disaster.LocalTablePartitionStateRequest;
import org.apache.ignite3.internal.partition.replicator.network.disaster.LocalTablePartitionStateResponse;
import org.apache.ignite3.internal.partitiondistribution.Assignment;
import org.apache.ignite3.internal.properties.IgniteProductVersion;
import org.apache.ignite3.internal.raft.Loza;
import org.apache.ignite3.internal.replicator.PartitionGroupId;
import org.apache.ignite3.internal.replicator.ReplicationGroupId;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.replicator.ZonePartitionId;
import org.apache.ignite3.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite3.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite3.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite3.internal.replicator.message.ZonePartitionIdMessage;
import org.apache.ignite3.internal.storage.MvPartitionStorage;
import org.apache.ignite3.internal.systemview.api.SystemView;
import org.apache.ignite3.internal.systemview.api.SystemViewManager;
import org.apache.ignite3.internal.systemview.api.SystemViewProvider;
import org.apache.ignite3.internal.table.TableViewInternal;
import org.apache.ignite3.internal.table.distributed.TableManager;
import org.apache.ignite3.internal.table.distributed.disaster.DisasterRecoveryRequest;
import org.apache.ignite3.internal.table.distributed.disaster.DisasterRecoveryRequestSerializer;
import org.apache.ignite3.internal.table.distributed.disaster.DisasterRecoverySystemViews;
import org.apache.ignite3.internal.table.distributed.disaster.GlobalPartitionState;
import org.apache.ignite3.internal.table.distributed.disaster.GlobalPartitionStateEnum;
import org.apache.ignite3.internal.table.distributed.disaster.GlobalTablePartitionState;
import org.apache.ignite3.internal.table.distributed.disaster.GroupUpdateRequest;
import org.apache.ignite3.internal.table.distributed.disaster.LocalPartitionState;
import org.apache.ignite3.internal.table.distributed.disaster.LocalPartitionStateByNode;
import org.apache.ignite3.internal.table.distributed.disaster.LocalPartitionStateEnumWithLogIndex;
import org.apache.ignite3.internal.table.distributed.disaster.LocalPartitionStateMessageByNode;
import org.apache.ignite3.internal.table.distributed.disaster.LocalTablePartitionState;
import org.apache.ignite3.internal.table.distributed.disaster.LocalTablePartitionStateByNode;
import org.apache.ignite3.internal.table.distributed.disaster.ManualGroupRestartRequest;
import org.apache.ignite3.internal.table.distributed.disaster.PartitionStatesMetricSource;
import org.apache.ignite3.internal.table.distributed.disaster.SingleZoneResetDistributionRequest;
import org.apache.ignite3.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
import org.apache.ignite3.internal.table.distributed.disaster.exceptions.DisasterRecoveryRequestForwardException;
import org.apache.ignite3.internal.table.distributed.disaster.exceptions.IllegalNodesException;
import org.apache.ignite3.internal.table.distributed.disaster.exceptions.IllegalPartitionIdException;
import org.apache.ignite3.internal.table.distributed.disaster.exceptions.NodesNotFoundException;
import org.apache.ignite3.internal.table.distributed.disaster.exceptions.NotEnoughAliveNodesException;
import org.apache.ignite3.internal.table.distributed.storage.NullMvTableStorage;
import org.apache.ignite3.internal.util.ByteUtils;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteBusyLock;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.versioned.VersionedSerialization;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.lang.TableNotFoundException;
import org.apache.ignite3.raft.jraft.Node;
import org.apache.ignite3.raft.jraft.RaftGroupService;
import org.apache.ignite3.table.QualifiedNameHelper;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class DisasterRecoveryManager
implements IgniteComponent,
SystemViewProvider {
    static final IgniteLogger LOG = Loggers.forClass(DisasterRecoveryManager.class);
    static final ByteArray RECOVERY_TRIGGER_KEY = new ByteArray("disaster.recovery.trigger");
    private static final String RECOVERY_TRIGGER_REVISION_KEY_PREFIX = "disaster.recovery.trigger.revision.";
    private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    private static final IgniteProductVersion VERSION_WITH_ROW_COUNT = IgniteProductVersion.fromString("9.1.5");
    private static final int TIMEOUT_SECONDS = 30;
    private static final int CATCH_UP_THRESHOLD = 100;
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final ExecutorService threadPool;
    private final MessagingService messagingService;
    final MetaStorageManager metaStorageManager;
    final CatalogManager catalogManager;
    final DistributionZoneManager dzManager;
    final Loza raftManager;
    private final TopologyService topologyService;
    private final WatchListener watchListener;
    final TableManager tableManager;
    final PartitionReplicaLifecycleManager partitionReplicaLifecycleManager;
    private final MetricManager metricManager;
    private final FailureManager failureManager;
    private final SystemViewManager systemViewManager;
    private final ClusterManagementGroupManager cmgManager;
    private final Map<UUID, CompletableFuture<Void>> ongoingOperationsById = new ConcurrentHashMap<UUID, CompletableFuture<Void>>();
    private final Map<Integer, PartitionStatesMetricSource> metricSourceByTableId = new ConcurrentHashMap<Integer, PartitionStatesMetricSource>();

    public DisasterRecoveryManager(ExecutorService threadPool, MessagingService messagingService, MetaStorageManager metaStorageManager, CatalogManager catalogManager, DistributionZoneManager dzManager, Loza raftManager, TopologyService topologyService, TableManager tableManager, MetricManager metricManager, FailureManager failureManager, PartitionReplicaLifecycleManager partitionReplicaLifecycleManager, SystemViewManager systemViewManager, ClusterManagementGroupManager cmgManager) {
        this.threadPool = threadPool;
        this.messagingService = messagingService;
        this.metaStorageManager = metaStorageManager;
        this.catalogManager = catalogManager;
        this.dzManager = dzManager;
        this.raftManager = raftManager;
        this.topologyService = topologyService;
        this.tableManager = tableManager;
        this.metricManager = metricManager;
        this.failureManager = failureManager;
        this.partitionReplicaLifecycleManager = partitionReplicaLifecycleManager;
        this.systemViewManager = systemViewManager;
        this.cmgManager = cmgManager;
        this.watchListener = event -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            this.handleTriggerKeyUpdate(event);
            return CompletableFutures.nullCompletedFuture();
        });
    }

    @Override
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            this.systemViewManager.register(this);
            this.messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, this::handleMessage);
            this.metaStorageManager.registerExactWatch(RECOVERY_TRIGGER_KEY, this.watchListener);
            this.dzManager.listen(HaZoneTopologyUpdateEvent.TOPOLOGY_REDUCED, this::onHaZonePartitionTopologyReduce);
            this.catalogManager.listen(CatalogEvent.TABLE_CREATE, EventListener.fromConsumer(this::onTableCreate));
            this.catalogManager.listen(CatalogEvent.TABLE_DROP, EventListener.fromConsumer(this::onTableDrop));
            this.registerMetricSources();
            return CompletableFutures.nullCompletedFuture();
        });
    }

    @Override
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        this.busyLock.block();
        this.metaStorageManager.unregisterWatch(this.watchListener);
        for (CompletableFuture<Void> future : this.ongoingOperationsById.values()) {
            future.completeExceptionally(new NodeStoppingException());
        }
        return CompletableFutures.nullCompletedFuture();
    }

    @Override
    public List<SystemView<?>> systemViews() {
        return List.of(DisasterRecoverySystemViews.createGlobalZonePartitionStatesSystemView(this), DisasterRecoverySystemViews.createLocalZonePartitionStatesSystemView(this), DisasterRecoverySystemViews.createGlobalTablePartitionStatesSystemView(this), DisasterRecoverySystemViews.createLocalTablePartitionStatesSystemView(this));
    }

    @TestOnly
    public Map<UUID, CompletableFuture<Void>> ongoingOperationsById() {
        return this.ongoingOperationsById;
    }

    public IgniteSpinBusyLock busyLock() {
        return this.busyLock;
    }

    private Set<Assignment> stableAssignmentsWithOnlyAliveNodes(ReplicationGroupId partitionId, long revision) {
        Set<Assignment> stableAssignments = ZoneRebalanceUtil.zoneStableAssignmentsGetLocally(this.metaStorageManager, (ZonePartitionId)partitionId, revision).nodes();
        Set logicalTopology = this.dzManager.logicalTopology(revision).stream().map(NodeWithAttributes::nodeName).collect(Collectors.toUnmodifiableSet());
        return stableAssignments.stream().filter(a -> logicalTopology.contains(a.consistentId())).collect(Collectors.toUnmodifiableSet());
    }

    private CompletableFuture<Boolean> onHaZonePartitionTopologyReduce(HaZoneTopologyUpdateEventParams params) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            int zoneId = params.zoneId();
            long revision = params.causalityToken();
            long timestamp = this.metaStorageManager.timestampByRevisionLocally(revision).longValue();
            Catalog catalog = this.catalogManager.activeCatalog(timestamp);
            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
            HashSet<Integer> partitionsToReset = new HashSet<Integer>();
            for (int partId = 0; partId < zoneDescriptor.partitions(); ++partId) {
                ZonePartitionId partitionId = new ZonePartitionId(zoneId, partId);
                if (this.stableAssignmentsWithOnlyAliveNodes(partitionId, revision).size() >= DisasterRecoveryManager.calculateQuorum(zoneDescriptor.replicas())) continue;
                partitionsToReset.add(partId);
            }
            if (!partitionsToReset.isEmpty()) {
                return this.resetPartitions(zoneDescriptor.name(), Map.of(zoneId, partitionsToReset), false, revision, true).thenApply(r -> false);
            }
            return CompletableFutures.falseCompletedFuture();
        });
    }

    public CompletableFuture<Void> resetTablePartitions(String zoneName, String schemaName, String tableName, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            int tableId = DisasterRecoveryManager.tableDescriptor(this.catalogLatestVersion(), schemaName, tableName).id();
            return this.resetPartitions(zoneName, Map.of(tableId, partitionIds), true, -1L, false);
        });
    }

    public CompletableFuture<Void> resetTablePartitions(String zoneName, String schemaName, String tableName, Set<Integer> partitionIds, boolean manualUpdate, long triggerRevision) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            int tableId = DisasterRecoveryManager.tableDescriptor(this.catalogLatestVersion(), schemaName, tableName).id();
            return this.resetPartitions(zoneName, Map.of(tableId, partitionIds), manualUpdate, triggerRevision, false);
        });
    }

    public CompletableFuture<Void> resetPartitions(String zoneName, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            int zoneId = DisasterRecoveryManager.zoneDescriptor(this.catalogLatestVersion(), zoneName).id();
            return this.resetPartitions(zoneName, Map.of(zoneId, partitionIds), true, -1L, true);
        });
    }

    public CompletableFuture<Void> resetPartitions(String zoneName, Set<Integer> partitionIds, boolean manualUpdate, long triggerRevision) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            int zoneId = DisasterRecoveryManager.zoneDescriptor(this.catalogLatestVersion(), zoneName).id();
            return this.resetPartitions(zoneName, Map.of(zoneId, partitionIds), manualUpdate, triggerRevision, true);
        });
    }

    private CompletableFuture<Void> resetPartitions(String zoneName, Map<Integer, Set<Integer>> partitionIds, boolean manualUpdate, long triggerRevision, boolean colocationEnabled) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            try {
                Catalog catalog = this.catalogLatestVersion();
                CatalogZoneDescriptor zone = DisasterRecoveryManager.zoneDescriptor(catalog, zoneName);
                partitionIds.values().forEach(ids -> DisasterRecoveryManager.checkPartitionsRange(ids, Set.of(zone)));
                return this.processNewRequest(GroupUpdateRequest.create(UUID.randomUUID(), catalog.version(), zone.id(), partitionIds, manualUpdate, colocationEnabled), triggerRevision);
            }
            catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }
        });
    }

    public CompletableFuture<Void> resetDistribution(List<String> zoneNames) {
        if (zoneNames.isEmpty()) {
            return CompletableFutures.nullCompletedFuture();
        }
        Catalog catalog = this.catalogLatestVersion();
        List zoneDescriptors = zoneNames.stream().map(zoneName -> DisasterRecoveryManager.zoneDescriptor(catalog, zoneName)).collect(Collectors.toList());
        return CompletableFuture.allOf((CompletableFuture[])zoneDescriptors.stream().map(zone -> this.resetDistribution((CatalogZoneDescriptor)zone, catalog.version())).toArray(CompletableFuture[]::new));
    }

    private CompletableFuture<Void> resetDistribution(CatalogZoneDescriptor zone, int catalogVersion) {
        try {
            return this.processNewRequest(new SingleZoneResetDistributionRequest(UUID.randomUUID(), catalogVersion, zone.id()));
        }
        catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    public CompletableFuture<Void> restartTablePartitions(Set<String> nodeNames, String zoneName, String schemaName, String tableName, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            try {
                this.getNodes(nodeNames);
                Catalog catalog = this.catalogLatestVersion();
                CatalogZoneDescriptor zone = DisasterRecoveryManager.zoneDescriptor(catalog, zoneName);
                CatalogTableDescriptor table = DisasterRecoveryManager.tableDescriptor(catalog, schemaName, tableName);
                DisasterRecoveryManager.checkPartitionsRange(partitionIds, Set.of(zone));
                return this.processNewRequest(new ManualGroupRestartRequest(UUID.randomUUID(), zone.id(), table.id(), partitionIds, nodeNames, catalog.time(), false));
            }
            catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }
        });
    }

    public CompletableFuture<Void> restartTablePartitionsWithCleanup(Set<String> nodeNames, String zoneName, String schemaName, String tableName, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            try {
                this.getNodes(nodeNames);
                Catalog catalog = this.catalogLatestVersion();
                CatalogZoneDescriptor zone = DisasterRecoveryManager.zoneDescriptor(catalog, zoneName);
                CatalogTableDescriptor table = DisasterRecoveryManager.tableDescriptor(catalog, schemaName, tableName);
                DisasterRecoveryManager.checkPartitionsRange(partitionIds, Set.of(zone));
                DisasterRecoveryManager.checkOnlyOneNodeSpecified(nodeNames);
                return this.processNewRequest(new ManualGroupRestartRequest(UUID.randomUUID(), zone.id(), table.id(), partitionIds, nodeNames, catalog.time(), true));
            }
            catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }
        });
    }

    public CompletableFuture<Void> restartPartitions(Set<String> nodeNames, String zoneName, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            try {
                this.getNodes(nodeNames);
                Catalog catalog = this.catalogLatestVersion();
                CatalogZoneDescriptor zone = DisasterRecoveryManager.zoneDescriptor(catalog, zoneName);
                DisasterRecoveryManager.checkPartitionsRange(partitionIds, Set.of(zone));
                return this.processNewRequest(new ManualGroupRestartRequest(UUID.randomUUID(), zone.id(), -1, partitionIds, nodeNames, catalog.time(), false));
            }
            catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }
        });
    }

    public CompletableFuture<Void> restartPartitionsWithCleanup(Set<String> nodeNames, String zoneName, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            try {
                this.getNodes(nodeNames);
                Catalog catalog = this.catalogLatestVersion();
                CatalogZoneDescriptor zone = DisasterRecoveryManager.zoneDescriptor(catalog, zoneName);
                DisasterRecoveryManager.checkPartitionsRange(partitionIds, Set.of(zone));
                DisasterRecoveryManager.checkOnlyOneNodeSpecified(nodeNames);
                return this.processNewRequest(new ManualGroupRestartRequest(UUID.randomUUID(), zone.id(), -1, partitionIds, nodeNames, catalog.time(), true));
            }
            catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }
        });
    }

    public CompletableFuture<Map<ZonePartitionId, LocalPartitionStateByNode>> localPartitionStates(Set<String> zoneNames, Set<String> nodeNames, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            try {
                Catalog catalog = this.catalogLatestVersion();
                return this.localPartitionStatesInternal(zoneNames, nodeNames, partitionIds, catalog, DisasterRecoveryManager.zoneState()).thenApply(res -> DisasterRecoveryManager.normalizeLocal(res, catalog));
            }
            catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }
        });
    }

    public CompletableFuture<Map<ZonePartitionId, GlobalPartitionState>> globalPartitionStates(Set<String> zoneNames, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            try {
                Catalog catalog = this.catalogLatestVersion();
                return ((CompletableFuture)this.localPartitionStatesInternal(zoneNames, Set.of(), partitionIds, catalog, DisasterRecoveryManager.zoneState()).thenApply(res -> DisasterRecoveryManager.normalizeLocal(res, catalog))).thenApply(res -> DisasterRecoveryManager.assembleGlobal(res, partitionIds, catalog));
            }
            catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }
        });
    }

    static Function<LocalPartitionStateMessage, ZonePartitionId> zoneState() {
        return state -> state.zonePartitionId().asZonePartitionId();
    }

    <T extends ReplicationGroupId> CompletableFuture<Map<T, LocalPartitionStateMessageByNode>> localPartitionStatesInternal(Set<String> zoneNames, Set<String> nodeNames, Set<Integer> partitionIds, Catalog catalog, Function<LocalPartitionStateMessage, @Nullable T> keyExtractor) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            Collection<CatalogZoneDescriptor> zones = DistributionZonesUtil.filterZonesForOperations(zoneNames, catalog.zones());
            DisasterRecoveryManager.checkPartitionsRange(partitionIds, zones);
            Set<NodeWithAttributes> nodes = this.getNodes(nodeNames);
            Set<Integer> zoneIds = zones.stream().map(CatalogObjectDescriptor::id).collect(Collectors.toSet());
            LocalPartitionStatesRequest localPartitionStatesRequest = PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStatesRequest().zoneIds(zoneIds).partitionIds(partitionIds).catalogVersion(catalog.version()).build();
            ConcurrentHashMap result = new ConcurrentHashMap();
            CompletableFuture[] futures = new CompletableFuture[nodes.size()];
            int i = 0;
            for (NodeWithAttributes node : nodes) {
                CompletableFuture<NetworkMessage> invokeFuture = this.messagingService.invoke(node.nodeName(), (NetworkMessage)localPartitionStatesRequest, TimeUnit.SECONDS.toMillis(30L));
                futures[i++] = invokeFuture.thenAccept(networkMessage -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Received local partition states response [networkMessage={}]", networkMessage);
                    }
                    assert (networkMessage instanceof LocalPartitionStatesResponse) : networkMessage;
                    LocalPartitionStatesResponse response = (LocalPartitionStatesResponse)networkMessage;
                    for (LocalPartitionStateMessage state : response.states()) {
                        result.compute((ReplicationGroupId)keyExtractor.apply(state), (partitionId, messageByNode) -> {
                            if (messageByNode == null) {
                                return new LocalPartitionStateMessageByNode(Map.of(node.nodeName(), state));
                            }
                            messageByNode = new LocalPartitionStateMessageByNode((LocalPartitionStateMessageByNode)messageByNode);
                            messageByNode.put(node.nodeName(), state);
                            return messageByNode;
                        });
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Combined partition state result for node [node={}, result={}]", node, result);
                    }
                }));
            }
            return CompletableFuture.allOf(futures).handle((unused, err) -> {
                if (err != null) {
                    throw new DisasterRecoveryException(ErrorGroups.DisasterRecovery.PARTITION_STATE_ERR, (Throwable)err);
                }
                if (LOG.isInfoEnabled()) {
                    LOG.debug("Returning total result for local partition states [result={}]", result);
                }
                return result;
            });
        });
    }

    public CompletableFuture<Map<TablePartitionId, LocalTablePartitionStateByNode>> localTablePartitionStates(Set<String> zoneNames, Set<String> nodeNames, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            try {
                Catalog catalog = this.catalogLatestVersion();
                return ((CompletableFuture)this.localPartitionStatesInternal(zoneNames, nodeNames, partitionIds, catalog, DisasterRecoveryManager.zoneState()).thenCompose(res -> this.tableStateForZone(DisasterRecoveryManager.toZonesOnNodes(res), catalog.version()).thenApply(tableState -> DisasterRecoveryManager.zoneStateToTableState(res, tableState, catalog)))).thenApply(res -> DisasterRecoveryManager.normalizeTableLocal(res, catalog));
            }
            catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }
        });
    }

    public CompletableFuture<Map<TablePartitionId, GlobalTablePartitionState>> globalTablePartitionStates(Set<String> zoneNames, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
            try {
                Catalog catalog = this.catalogLatestVersion();
                return ((CompletableFuture)((CompletableFuture)this.localPartitionStatesInternal(zoneNames, Set.of(), partitionIds, catalog, DisasterRecoveryManager.zoneState()).thenCompose(res -> this.tableStateForZone(DisasterRecoveryManager.toZonesOnNodes(res), catalog.version()).thenApply(tableState -> DisasterRecoveryManager.zoneStateToTableState(res, tableState, catalog)))).thenApply(res -> DisasterRecoveryManager.normalizeTableLocal(res, catalog))).thenApply(res -> DisasterRecoveryManager.assembleTableGlobal(res, partitionIds, catalog));
            }
            catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }
        });
    }

    private static Map<String, Set<ZonePartitionId>> toZonesOnNodes(Map<ZonePartitionId, LocalPartitionStateMessageByNode> partitionStateMap) {
        HashMap<String, Set<ZonePartitionId>> res = new HashMap<String, Set<ZonePartitionId>>();
        for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> entry : partitionStateMap.entrySet()) {
            ZonePartitionId zonePartitionId = entry.getKey();
            LocalPartitionStateMessageByNode zoneLocalPartitionStateMessageByNode = entry.getValue();
            for (String nodeName : zoneLocalPartitionStateMessageByNode.nodes()) {
                res.computeIfAbsent(nodeName, k -> new HashSet()).add(zonePartitionId);
            }
        }
        return res;
    }

    private CompletableFuture<Map<String, Map<TablePartitionIdMessage, Long>>> tableStateForZone(Map<String, Set<ZonePartitionId>> zonesOnNodes, int catalogVersion) {
        return this.checkSafeToSendPartitionRowsMessage().thenCompose(canSendRemoteCommand -> this.tableStateForZoneOnCluster(zonesOnNodes, catalogVersion, (boolean)canSendRemoteCommand));
    }

    private CompletableFuture<Map<String, Map<TablePartitionIdMessage, Long>>> tableStateForZoneOnCluster(Map<String, Set<ZonePartitionId>> zonesOnNodes, int catalogVersion, boolean canSendRemoteCommand) {
        ConcurrentHashMap result = new ConcurrentHashMap();
        CompletableFuture[] futures = (CompletableFuture[])zonesOnNodes.entrySet().stream().map(entry -> this.tableStateForZoneFuture((String)entry.getKey(), (Set)entry.getValue(), catalogVersion, canSendRemoteCommand).thenAccept(response -> result.computeIfAbsent((String)entry.getKey(), k -> new ConcurrentHashMap()).putAll(response))).toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures).handle((unused, err) -> {
            if (err != null) {
                throw new DisasterRecoveryException(ErrorGroups.DisasterRecovery.PARTITION_STATE_ERR, (Throwable)err);
            }
            return result;
        });
    }

    private CompletableFuture<Map<TablePartitionIdMessage, Long>> tableStateForZoneFuture(String nodeName, Set<ZonePartitionId> zones, int catalogVersion, boolean canSendRemoteCommand) {
        return canSendRemoteCommand ? this.tableStateForZoneOnNode(catalogVersion, nodeName, zones) : this.tableStateForZoneLocal(catalogVersion, zones);
    }

    private CompletableFuture<Boolean> checkSafeToSendPartitionRowsMessage() {
        return this.cmgManager.clusterState().thenApply(state -> {
            if (state == null) {
                return false;
            }
            IgniteProductVersion clusterVersion = IgniteProductVersion.fromString(state.version());
            return clusterVersion.compareTo(VERSION_WITH_ROW_COUNT) >= 0;
        });
    }

    private CompletableFuture<Map<TablePartitionIdMessage, Long>> tableStateForZoneOnNode(int catalogVersion, String node, Set<ZonePartitionId> zones) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending LocalTablePartitionStateRequest to node [nodeName={}, zones={}]", node, zones);
        }
        Set<ZonePartitionIdMessage> zoneMessage = zones.stream().map(zonePartitionId -> ReplicaMessageUtils.toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, zonePartitionId)).collect(Collectors.toSet());
        LocalTablePartitionStateRequest request = PARTITION_REPLICATION_MESSAGES_FACTORY.localTablePartitionStateRequest().zonePartitionIds(zoneMessage).catalogVersion(catalogVersion).build();
        return this.messagingService.invoke(node, (NetworkMessage)request, TimeUnit.SECONDS.toMillis(30L)).thenApply(networkMessage -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got response from node [nodeName={}, networkMessage={}]", node, networkMessage);
            }
            assert (networkMessage instanceof LocalTablePartitionStateResponse) : networkMessage;
            LocalTablePartitionStateResponse response = (LocalTablePartitionStateResponse)networkMessage;
            ConcurrentHashMap result = new ConcurrentHashMap();
            response.states().forEach(state -> result.putAll(state.tablePartitionIdToEstimatedRowsMap()));
            return result;
        });
    }

    private CompletableFuture<Map<TablePartitionIdMessage, Long>> tableStateForZoneLocal(int catalogVersion, Set<ZonePartitionId> zonePartitionIds) {
        Catalog catalog = this.catalogManager.catalog(catalogVersion);
        ConcurrentHashMap<TablePartitionIdMessage, Long> result = new ConcurrentHashMap<TablePartitionIdMessage, Long>();
        for (ZonePartitionId zonePartitionId : zonePartitionIds) {
            int zoneId = zonePartitionId.zoneId();
            int partitionId = zonePartitionId.partitionId();
            for (CatalogTableDescriptor tableDescriptor : catalog.tables(zoneId)) {
                MvPartitionStorage partitionStorage;
                TablePartitionId tablePartitionId = new TablePartitionId(tableDescriptor.id(), partitionId);
                TableViewInternal tableViewInternal = this.tableManager.cachedTable(tablePartitionId.tableId());
                if (tableViewInternal == null || (partitionStorage = tableViewInternal.internalTable().storage().getMvPartition(tablePartitionId.partitionId())) == null) continue;
                result.put(ReplicaMessageUtils.toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId), partitionStorage.estimatedSize());
            }
        }
        return CompletableFuture.completedFuture(result);
    }

    private static Map<TablePartitionId, LocalPartitionStateMessageByNode> zoneStateToTableState(Map<ZonePartitionId, LocalPartitionStateMessageByNode> partitionStateMap, Map<String, Map<TablePartitionIdMessage, Long>> tableState, Catalog catalog) {
        HashMap<TablePartitionId, LocalPartitionStateMessageByNode> res = new HashMap<TablePartitionId, LocalPartitionStateMessageByNode>();
        for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> entry : partitionStateMap.entrySet()) {
            ZonePartitionId zonePartitionId = entry.getKey();
            int zoneId = zonePartitionId.zoneId();
            int partitionId = zonePartitionId.partitionId();
            LocalPartitionStateMessageByNode zoneLocalPartitionStateMessageByNode = entry.getValue();
            LocalPartitionStateMessageByNode tableLocalPartitionStateMessageByNode = new LocalPartitionStateMessageByNode(new HashMap<String, LocalPartitionStateMessage>());
            for (CatalogTableDescriptor tableDescriptor : catalog.tables(zoneId)) {
                TablePartitionId tablePartitionId = new TablePartitionId(tableDescriptor.id(), partitionId);
                TablePartitionIdMessage tablePartitionIdMessage = ReplicaMessageUtils.toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId);
                for (Map.Entry<String, LocalPartitionStateMessage> nodeEntry : zoneLocalPartitionStateMessageByNode.entrySet()) {
                    String nodeName = nodeEntry.getKey();
                    Long estimatedRows = (Long)tableState.getOrDefault(nodeName, Collections.emptyMap()).get(tablePartitionIdMessage);
                    if (estimatedRows == null) continue;
                    LocalPartitionStateMessage localPartitionStateMessage = nodeEntry.getValue();
                    LocalPartitionStateMessage tableLocalPartitionStateMessage = PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStateMessage().partitionId(tablePartitionIdMessage).state(localPartitionStateMessage.state()).logIndex(localPartitionStateMessage.logIndex()).estimatedRows(estimatedRows).isLearner(localPartitionStateMessage.isLearner()).build();
                    tableLocalPartitionStateMessageByNode.put(nodeName, tableLocalPartitionStateMessage);
                }
                if (tableLocalPartitionStateMessageByNode.values().isEmpty()) continue;
                res.put(tablePartitionId, tableLocalPartitionStateMessageByNode);
            }
        }
        return res;
    }

    static Function<LocalPartitionStateMessage, TablePartitionId> tableState() {
        return state -> state.partitionId().asTablePartitionId();
    }

    static Function<LocalPartitionStateMessage, PartitionGroupId> mixedState() {
        return state -> {
            TablePartitionIdMessage tablePartitionIdMessage = state.partitionId();
            if (tablePartitionIdMessage != null) {
                return tablePartitionIdMessage.asTablePartitionId();
            }
            return state.zonePartitionId().asZonePartitionId();
        };
    }

    private static void checkPartitionsRange(Set<Integer> partitionIds, Collection<CatalogZoneDescriptor> zones) {
        if (partitionIds.isEmpty()) {
            return;
        }
        int minPartition = (Integer)partitionIds.stream().min(Integer::compare).get();
        if (minPartition < 0) {
            throw new IllegalPartitionIdException(minPartition);
        }
        int maxPartition = (Integer)partitionIds.stream().max(Integer::compare).get();
        zones.forEach(zone -> {
            if (maxPartition >= zone.partitions()) {
                throw new IllegalPartitionIdException(maxPartition, zone.partitions(), zone.name());
            }
        });
    }

    private static void checkOnlyOneNodeSpecified(Set<String> nodeNames) {
        if (nodeNames.size() != 1) {
            throw new IllegalNodesException();
        }
    }

    private Set<NodeWithAttributes> getNodes(Set<String> nodeNames) throws NodesNotFoundException {
        if (nodeNames.isEmpty()) {
            return this.dzManager.logicalTopology();
        }
        Set<NodeWithAttributes> nodes = this.dzManager.logicalTopology().stream().filter(node -> nodeNames.contains(node.nodeName())).collect(Collectors.toSet());
        Set foundNodeNames = nodes.stream().map(NodeWithAttributes::nodeName).collect(Collectors.toSet());
        if (!nodeNames.equals(foundNodeNames)) {
            Set<String> missingNodeNames = CollectionUtils.difference(nodeNames, foundNodeNames);
            throw new NodesNotFoundException(missingNodeNames);
        }
        return nodes;
    }

    private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request) {
        return this.processNewRequest(request, -1L);
    }

    private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request, long revision) {
        String localNodeName;
        String firstNode;
        ManualGroupRestartRequest restartRequest;
        Set<String> nodeNames;
        if (request instanceof ManualGroupRestartRequest && !(nodeNames = (restartRequest = (ManualGroupRestartRequest)request).nodeNames()).isEmpty() && !(firstNode = (String)nodeNames.stream().sorted().findFirst().orElseThrow()).equals(localNodeName = this.topologyService.localMember().name())) {
            return this.forwardDisasterRecoveryRequest(request, revision, firstNode);
        }
        UUID operationId = request.operationId();
        CompletableFuture<Void> operationFuture = new CompletableFuture().orTimeout(30L, TimeUnit.SECONDS);
        operationFuture.whenComplete((v, throwable) -> this.ongoingOperationsById.remove(operationId));
        byte[] serializedRequest = VersionedSerialization.toBytes(request, DisasterRecoveryRequestSerializer.INSTANCE);
        this.ongoingOperationsById.put(operationId, operationFuture);
        if (revision != -1L) {
            this.putRecoveryTriggerIfRevisionIsNotProcessed(request.zoneId(), ByteUtils.longToBytesKeepingOrder(revision), serializedRequest, operationId);
        } else {
            this.metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest);
        }
        return operationFuture;
    }

    private void putRecoveryTriggerIfRevisionIsNotProcessed(int zoneId, byte[] revisionBytes, byte[] recoveryTriggerValue, UUID operationId) {
        ByteArray zoneTriggerRevisionKey = DisasterRecoveryManager.zoneRecoveryTriggerRevisionKey(zoneId);
        this.metaStorageManager.invoke(Conditions.notExists(zoneTriggerRevisionKey).or(Conditions.value(zoneTriggerRevisionKey).lt(revisionBytes)), List.of(Operations.put(RECOVERY_TRIGGER_KEY, recoveryTriggerValue), Operations.put(zoneTriggerRevisionKey, revisionBytes)), List.of()).thenAccept(wasWrite -> {
            if (!wasWrite.booleanValue()) {
                this.ongoingOperationsById.remove(operationId).complete(null);
            }
        });
    }

    private CompletableFuture<Void> forwardDisasterRecoveryRequest(DisasterRecoveryRequest request, long revision, String targetNodeName) {
        byte[] serializedRequest = VersionedSerialization.toBytes(request, DisasterRecoveryRequestSerializer.INSTANCE);
        DisasterRecoveryRequestMessage message = PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryRequestMessage().requestBytes(serializedRequest).revision(revision).build();
        return this.messagingService.invoke(targetNodeName, (NetworkMessage)message, TimeUnit.SECONDS.toMillis(30L)).thenApply(responseMsg -> {
            assert (responseMsg instanceof DisasterRecoveryResponseMessage) : responseMsg;
            DisasterRecoveryResponseMessage response = (DisasterRecoveryResponseMessage)responseMsg;
            if (response.errorMessage() != null) {
                String errorMessage = response.errorMessage();
                throw new DisasterRecoveryRequestForwardException(targetNodeName, errorMessage == null ? "" : errorMessage);
            }
            return null;
        });
    }

    private void handleTriggerKeyUpdate(WatchEvent watchEvent) {
        DisasterRecoveryRequest request;
        Entry newEntry = watchEvent.entryEvent().newEntry();
        byte[] requestBytes = newEntry.value();
        assert (requestBytes != null);
        try {
            request = VersionedSerialization.fromBytes(requestBytes, DisasterRecoveryRequestSerializer.INSTANCE);
        }
        catch (Exception e) {
            this.failureManager.process(new FailureContext(e, "Unable to deserialize disaster recovery request."));
            return;
        }
        CompletableFuture<Void> operationFuture = this.ongoingOperationsById.remove(request.operationId());
        switch (request.type()) {
            case SINGLE_NODE: {
                if (operationFuture == null) {
                    return;
                }
                request.handle(this, watchEvent.revision(), watchEvent.timestamp()).handle((res, ex) -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
                    CompletableFutures.copyStateTo(operationFuture).accept((Void)res, (Throwable)ex);
                    if (ex != null && !ExceptionUtils.hasCause(ex, NodeStoppingException.class, UnresolvableConsistentIdException.class, RecipientLeftException.class)) {
                        this.failureManager.process(new FailureContext((Throwable)ex, "Unable to handle disaster recovery request."));
                    }
                    return null;
                }));
                break;
            }
            case MULTI_NODE: {
                request.handle(this, watchEvent.revision(), watchEvent.timestamp()).handle((res, ex) -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
                    if (operationFuture != null) {
                        CompletableFutures.copyStateTo(operationFuture).accept((Void)res, (Throwable)ex);
                    }
                    if (ex != null && !ExceptionUtils.hasCause(ex, NodeStoppingException.class, UnresolvableConsistentIdException.class, RecipientLeftException.class, NotEnoughAliveNodesException.class)) {
                        this.failureManager.process(new FailureContext((Throwable)ex, "Unable to handle disaster recovery request."));
                    }
                    return null;
                }));
                break;
            }
            default: {
                AssertionError error = new AssertionError((Object)("Unexpected request type: " + request.getClass()));
                if (operationFuture == null) break;
                operationFuture.completeExceptionally((Throwable)((Object)error));
            }
        }
    }

    private void handleMessage(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) {
        if (message instanceof LocalPartitionStatesRequest) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received local partition states request [request={}]", message);
            }
            this.handleLocalPartitionStatesRequest((LocalPartitionStatesRequest)message, sender, correlationId);
        } else if (message instanceof LocalTablePartitionStateRequest) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received local table partition states request [request={}]", message);
            }
            this.handleLocalTableStateRequest((LocalTablePartitionStateRequest)message, sender, correlationId);
        } else if (message instanceof DisasterRecoveryRequestMessage) {
            this.handleDisasterRecoveryRequest((DisasterRecoveryRequestMessage)message, sender, correlationId);
        }
    }

    private void handleDisasterRecoveryRequest(DisasterRecoveryRequestMessage message, InternalClusterNode sender, @Nullable Long correlationId) {
        DisasterRecoveryRequest request;
        assert (correlationId != null) : "request=" + message + ", sender=" + sender;
        try {
            request = VersionedSerialization.fromBytes(message.requestBytes(), DisasterRecoveryRequestSerializer.INSTANCE);
        }
        catch (Exception e) {
            LOG.error("Failed to deserialize disaster recovery request.", (Throwable)e);
            DisasterRecoveryResponseMessage response = PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryResponseMessage().errorMessage("Failed to deserialize request: " + e.getMessage()).build();
            this.messagingService.respond(sender, (NetworkMessage)response, (long)correlationId);
            return;
        }
        this.processNewRequest(request, message.revision()).whenComplete((result, throwable) -> {
            DisasterRecoveryResponseMessage response = throwable != null ? PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryResponseMessage().errorMessage(throwable.getMessage()).build() : PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryResponseMessage().errorMessage(null).build();
            this.messagingService.respond(sender, (NetworkMessage)response, (long)correlationId);
        });
    }

    private void handleLocalTableStateRequest(LocalTablePartitionStateRequest request, InternalClusterNode sender, @Nullable Long correlationId) {
        assert (correlationId != null) : "request=" + request + ", sender=" + sender;
        int catalogVersion = request.catalogVersion();
        Set requestedPartitions = request.zonePartitionIds().stream().map(ZonePartitionIdMessage::asZonePartitionId).collect(Collectors.toSet());
        this.catalogManager.catalogReadyFuture(catalogVersion).thenRunAsync(() -> {
            HashSet<LocalTablePartitionStateMessage> statesList = new HashSet<LocalTablePartitionStateMessage>();
            this.raftManager.forEach((raftNodeId, raftGroupService) -> {
                LocalTablePartitionStateMessage message;
                if (raftNodeId.groupId() instanceof ZonePartitionId && (message = this.handleSizeRequestForTablesInZone(requestedPartitions, (ZonePartitionId)raftNodeId.groupId())) != null) {
                    statesList.add(message);
                }
            });
            LocalTablePartitionStateResponse response = PARTITION_REPLICATION_MESSAGES_FACTORY.localTablePartitionStateResponse().states(statesList).build();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Responding with state for local table partitions [response={}]", response);
            }
            this.messagingService.respond(sender, (NetworkMessage)response, (long)correlationId);
        }, this.threadPool);
    }

    private void handleLocalPartitionStatesRequest(LocalPartitionStatesRequest request, InternalClusterNode sender, @Nullable Long correlationId) {
        assert (correlationId != null) : "request=" + request + ", sender=" + sender;
        int catalogVersion = request.catalogVersion();
        this.catalogManager.catalogReadyFuture(catalogVersion).thenRunAsync(() -> {
            ArrayList<LocalPartitionStateMessage> statesList = new ArrayList<LocalPartitionStateMessage>();
            this.raftManager.forEach((raftNodeId, raftGroupService) -> {
                LocalPartitionStateMessage message;
                if (raftNodeId.groupId() instanceof TablePartitionId) {
                    LocalPartitionStateMessage message2 = this.handleStateRequestForTable(request, (RaftGroupService)raftGroupService, (TablePartitionId)raftNodeId.groupId(), catalogVersion);
                    if (message2 != null) {
                        statesList.add(message2);
                    }
                } else if (raftNodeId.groupId() instanceof ZonePartitionId && (message = this.handleStateRequestForZone(request, (RaftGroupService)raftGroupService, (ZonePartitionId)raftNodeId.groupId(), catalogVersion)) != null) {
                    statesList.add(message);
                }
            });
            LocalPartitionStatesResponse response = PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStatesResponse().states(statesList).build();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Responding with state for local partitions [response={}]", response);
            }
            this.messagingService.respond(sender, (NetworkMessage)response, (long)correlationId);
        }, this.threadPool);
    }

    @Nullable
    private LocalTablePartitionStateMessage handleSizeRequestForTablesInZone(Set<ZonePartitionId> requestedPartitions, ZonePartitionId zonePartitionId) {
        if (!DisasterRecoveryManager.containsOrEmpty(zonePartitionId, requestedPartitions)) {
            return null;
        }
        return PARTITION_REPLICATION_MESSAGES_FACTORY.localTablePartitionStateMessage().tablePartitionIdToEstimatedRowsMap(this.estimatedSizeMap(zonePartitionId)).build();
    }

    @Nullable
    private LocalPartitionStateMessage handleStateRequestForZone(LocalPartitionStatesRequest request, RaftGroupService raftGroupService, ZonePartitionId zonePartitionId, int catalogVersion) {
        if (!DisasterRecoveryManager.containsOrEmpty(zonePartitionId.partitionId(), request.partitionIds())) {
            return null;
        }
        Catalog catalog = this.catalogManager.catalog(catalogVersion);
        assert (catalog != null) : "Catalog is not found for version: " + catalogVersion;
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zonePartitionId.zoneId());
        if (zoneDescriptor == null || !DisasterRecoveryManager.containsOrEmpty(zoneDescriptor.id(), request.zoneIds())) {
            return null;
        }
        Node raftNode = raftGroupService.getRaftNode();
        LocalPartitionStateEnumWithLogIndex localPartitionStateWithLogIndex = LocalPartitionStateEnumWithLogIndex.of(raftNode);
        return PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStateMessage().zonePartitionId(ReplicaMessageUtils.toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, zonePartitionId)).state(localPartitionStateWithLogIndex.state).logIndex(localPartitionStateWithLogIndex.logIndex).estimatedRows(this.calculateEstimatedSize(zonePartitionId)).isLearner(raftNode.isLearner()).build();
    }

    private long calculateEstimatedSize(ZonePartitionId zonePartitionId) {
        return this.tableManager.zoneTables(zonePartitionId.zoneId()).stream().map(tableImpl -> tableImpl.internalTable().storage()).filter(storage -> !(storage instanceof NullMvTableStorage)).map(storage -> storage.getMvPartition(zonePartitionId.partitionId())).filter(Objects::nonNull).mapToLong(MvPartitionStorage::estimatedSize).sum();
    }

    private Map<TablePartitionIdMessage, Long> estimatedSizeMap(ZonePartitionId zonePartitionId) {
        HashMap<TablePartitionIdMessage, Long> partitionIdToEstimatedRowsMap = new HashMap<TablePartitionIdMessage, Long>();
        for (TableViewInternal tableImpl : this.tableManager.zoneTables(zonePartitionId.zoneId())) {
            MvPartitionStorage mvPartitionStorage;
            if (tableImpl.internalTable().storage() instanceof NullMvTableStorage || (mvPartitionStorage = tableImpl.internalTable().storage().getMvPartition(zonePartitionId.partitionId())) == null) continue;
            partitionIdToEstimatedRowsMap.put(ReplicaMessageUtils.toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, new TablePartitionId(tableImpl.tableId(), zonePartitionId.partitionId())), mvPartitionStorage.estimatedSize());
        }
        return partitionIdToEstimatedRowsMap;
    }

    @Nullable
    private LocalPartitionStateMessage handleStateRequestForTable(LocalPartitionStatesRequest request, RaftGroupService raftGroupService, TablePartitionId tablePartitionId, int catalogVersion) {
        if (!DisasterRecoveryManager.containsOrEmpty(tablePartitionId.partitionId(), request.partitionIds())) {
            return null;
        }
        Catalog catalog = this.catalogManager.catalog(catalogVersion);
        assert (catalog != null) : "Catalog is not found for version: " + catalogVersion;
        CatalogTableDescriptor tableDescriptor = catalog.table(tablePartitionId.tableId());
        if (tableDescriptor == null || !DisasterRecoveryManager.containsOrEmpty(tableDescriptor.zoneId(), request.zoneIds())) {
            return null;
        }
        TableViewInternal tableViewInternal = this.tableManager.cachedTable(tablePartitionId.tableId());
        if (tableViewInternal == null) {
            return null;
        }
        MvPartitionStorage partitionStorage = tableViewInternal.internalTable().storage().getMvPartition(tablePartitionId.partitionId());
        if (partitionStorage == null) {
            return null;
        }
        Node raftNode = raftGroupService.getRaftNode();
        LocalPartitionStateEnumWithLogIndex localPartitionStateWithLogIndex = LocalPartitionStateEnumWithLogIndex.of(raftNode);
        return PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStateMessage().partitionId(ReplicaMessageUtils.toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId)).state(localPartitionStateWithLogIndex.state).logIndex(localPartitionStateWithLogIndex.logIndex).estimatedRows(partitionStorage.estimatedSize()).isLearner(raftNode.isLearner()).build();
    }

    private static <T> boolean containsOrEmpty(T item, Collection<T> collection) {
        return collection.isEmpty() || collection.contains(item);
    }

    private static Map<ZonePartitionId, LocalPartitionStateByNode> normalizeLocal(Map<ZonePartitionId, LocalPartitionStateMessageByNode> result, Catalog catalog) {
        HashMap<ZonePartitionId, LocalPartitionStateByNode> map = new HashMap<ZonePartitionId, LocalPartitionStateByNode>();
        for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> entry : result.entrySet()) {
            ZonePartitionId zonePartitionId = entry.getKey();
            LocalPartitionStateMessageByNode messageByNode = entry.getValue();
            long maxLogIndex = messageByNode.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong();
            Map<String, LocalPartitionState> nodeToStateMap = messageByNode.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, nodeToState -> DisasterRecoveryManager.toLocalPartitionState((LocalPartitionStateMessage)nodeToState.getValue(), maxLogIndex, zonePartitionId, catalog)));
            map.put(zonePartitionId, new LocalPartitionStateByNode(nodeToStateMap));
        }
        return map;
    }

    private CompletableFuture<Map<TablePartitionId, LocalTablePartitionStateByNode>> normalizeMixedLocal(Map<PartitionGroupId, LocalPartitionStateMessageByNode> result, Catalog catalog) {
        Map<ZonePartitionId, LocalPartitionStateMessageByNode> zonePartitionIdMap = result.entrySet().stream().filter(entry -> entry.getKey() instanceof ZonePartitionId).collect(Collectors.toMap(entry -> (ZonePartitionId)entry.getKey(), Map.Entry::getValue));
        CompletionStage zonePartitionsNormalizedToTablePartitionsFuture = this.tableStateForZone(DisasterRecoveryManager.toZonesOnNodes(zonePartitionIdMap), catalog.version()).thenApply(tableState -> DisasterRecoveryManager.zoneStateToTableState(zonePartitionIdMap, tableState, catalog));
        Stream<Map.Entry> tablePartitionIdEntries = result.entrySet().stream().filter(entry -> entry.getKey() instanceof TablePartitionId).map(entry -> entry);
        return ((CompletableFuture)zonePartitionsNormalizedToTablePartitionsFuture).thenApply(zonePartitionsNormalizedToTablePartitions -> {
            Map<TablePartitionId, LocalPartitionStateMessageByNode> joinedResult = Stream.concat(tablePartitionIdEntries, zonePartitionsNormalizedToTablePartitions.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            return DisasterRecoveryManager.normalizeTableLocal(joinedResult, catalog);
        });
    }

    private static Map<TablePartitionId, LocalTablePartitionStateByNode> normalizeTableLocal(Map<TablePartitionId, LocalPartitionStateMessageByNode> result, Catalog catalog) {
        HashMap<TablePartitionId, LocalTablePartitionStateByNode> map = new HashMap<TablePartitionId, LocalTablePartitionStateByNode>();
        for (Map.Entry<TablePartitionId, LocalPartitionStateMessageByNode> entry : result.entrySet()) {
            TablePartitionId tablePartitionId = entry.getKey();
            LocalPartitionStateMessageByNode messageByNode = entry.getValue();
            long maxLogIndex = messageByNode.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong();
            Map<String, LocalTablePartitionState> nodeToStateMap = messageByNode.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, nodeToState -> DisasterRecoveryManager.toLocalTablePartitionState(nodeToState, maxLogIndex, tablePartitionId, catalog)));
            map.put(tablePartitionId, new LocalTablePartitionStateByNode(nodeToStateMap));
        }
        return map;
    }

    private static LocalPartitionState toLocalPartitionState(LocalPartitionStateMessage stateMsg, long maxLogIndex, ZonePartitionId zonePartitionId, Catalog catalog) {
        LocalPartitionStateEnum stateEnum = DisasterRecoveryManager.calculateState(stateMsg, maxLogIndex);
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zonePartitionId.zoneId());
        String zoneName = zoneDescriptor.name();
        return new LocalPartitionState(zonePartitionId.zoneId(), zoneName, zonePartitionId.partitionId(), stateEnum, stateMsg.estimatedRows());
    }

    private static LocalTablePartitionState toLocalTablePartitionState(Map.Entry<String, LocalPartitionStateMessage> nodeToMessage, long maxLogIndex, TablePartitionId tablePartitionId, Catalog catalog) {
        LocalPartitionStateMessage stateMsg = nodeToMessage.getValue();
        LocalPartitionStateEnum stateEnum = DisasterRecoveryManager.calculateState(stateMsg, maxLogIndex);
        CatalogTableDescriptor tableDescriptor = catalog.table(tablePartitionId.tableId());
        String zoneName = catalog.zone(tableDescriptor.zoneId()).name();
        String schemaName = catalog.schema(tableDescriptor.schemaId()).name();
        return new LocalTablePartitionState(tableDescriptor.zoneId(), zoneName, tableDescriptor.schemaId(), schemaName, tableDescriptor.id(), tableDescriptor.name(), tablePartitionId.partitionId(), stateEnum, stateMsg.estimatedRows());
    }

    private static LocalPartitionStateEnum calculateState(LocalPartitionStateMessage stateMsg, long maxLogIndex) {
        LocalPartitionStateEnum stateEnum = stateMsg.state();
        if (stateEnum == LocalPartitionStateEnum.HEALTHY && maxLogIndex - stateMsg.logIndex() >= 100L) {
            return LocalPartitionStateEnum.CATCHING_UP;
        }
        return stateEnum;
    }

    private static Map<ZonePartitionId, GlobalPartitionState> assembleGlobal(Map<ZonePartitionId, LocalPartitionStateByNode> localResult, Set<Integer> partitionIds, Catalog catalog) {
        Map<ZonePartitionId, GlobalPartitionState> result = localResult.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
            ZonePartitionId zonePartitionId = (ZonePartitionId)entry.getKey();
            LocalPartitionStateByNode map = (LocalPartitionStateByNode)entry.getValue();
            return DisasterRecoveryManager.assembleGlobalStateFromLocal(catalog, zonePartitionId, map);
        }));
        DisasterRecoveryManager.makeMissingPartitionsUnavailable(localResult, catalog, result, partitionIds);
        return result;
    }

    private static Map<TablePartitionId, GlobalTablePartitionState> assembleTableGlobal(Map<TablePartitionId, LocalTablePartitionStateByNode> localResult, Set<Integer> partitionIds, Catalog catalog) {
        Map<TablePartitionId, GlobalTablePartitionState> result = localResult.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
            TablePartitionId tablePartitionId = (TablePartitionId)entry.getKey();
            LocalTablePartitionStateByNode map = (LocalTablePartitionStateByNode)entry.getValue();
            return DisasterRecoveryManager.assembleTableGlobalStateFromLocal(catalog, tablePartitionId, map);
        }));
        DisasterRecoveryManager.makeMissingTablePartitionsUnavailable(localResult, catalog, result, partitionIds);
        return result;
    }

    private static void makeMissingPartitionsUnavailable(Map<ZonePartitionId, LocalPartitionStateByNode> localResult, Catalog catalog, Map<ZonePartitionId, GlobalPartitionState> result, Set<Integer> partitionIds) {
        localResult.keySet().stream().map(ZonePartitionId::zoneId).distinct().forEach(zoneId -> {
            CatalogZoneDescriptor zoneDescriptor = catalog.zone((int)zoneId);
            if (partitionIds.isEmpty()) {
                int partitions = zoneDescriptor.partitions();
                for (int partitionId2 = 0; partitionId2 < partitions; ++partitionId2) {
                    DisasterRecoveryManager.putUnavailableStateIfAbsent(result, partitionId2, zoneDescriptor);
                }
            } else {
                partitionIds.forEach(partitionId -> DisasterRecoveryManager.putUnavailableStateIfAbsent(result, partitionId, zoneDescriptor));
            }
        });
    }

    private static void makeMissingTablePartitionsUnavailable(Map<TablePartitionId, LocalTablePartitionStateByNode> localResult, Catalog catalog, Map<TablePartitionId, GlobalTablePartitionState> result, Set<Integer> partitionIds) {
        localResult.keySet().stream().map(TablePartitionId::tableId).distinct().forEach(tableId -> {
            CatalogTableDescriptor table = catalog.table((int)tableId);
            CatalogZoneDescriptor zoneDescriptor = catalog.zone(table.zoneId());
            CatalogSchemaDescriptor schemaDescriptor = catalog.schema(table.schemaId());
            if (partitionIds.isEmpty()) {
                int partitions = zoneDescriptor.partitions();
                for (int partitionId2 = 0; partitionId2 < partitions; ++partitionId2) {
                    DisasterRecoveryManager.putUnavailableTableStateIfAbsent(catalog, result, tableId, partitionId2, schemaDescriptor, zoneDescriptor);
                }
            } else {
                partitionIds.forEach(partitionId -> DisasterRecoveryManager.putUnavailableTableStateIfAbsent(catalog, result, tableId, partitionId, schemaDescriptor, zoneDescriptor));
            }
        });
    }

    private static void putUnavailableStateIfAbsent(Map<ZonePartitionId, GlobalPartitionState> states, int partitionId, CatalogZoneDescriptor zoneDescriptor) {
        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneDescriptor.id(), partitionId);
        states.computeIfAbsent(zonePartitionId, key -> new GlobalPartitionState(zoneDescriptor.id(), zoneDescriptor.name(), key.partitionId(), GlobalPartitionStateEnum.UNAVAILABLE));
    }

    private static void putUnavailableTableStateIfAbsent(Catalog catalog, Map<TablePartitionId, GlobalTablePartitionState> states, Integer tableId, int partitionId, CatalogSchemaDescriptor schemaDescriptor, CatalogZoneDescriptor zoneDescriptor) {
        TablePartitionId tablePartitionId = new TablePartitionId(tableId, partitionId);
        states.computeIfAbsent(tablePartitionId, key -> new GlobalTablePartitionState(zoneDescriptor.id(), zoneDescriptor.name(), schemaDescriptor.id(), schemaDescriptor.name(), key.tableId(), catalog.table(key.tableId()).name(), key.partitionId(), GlobalPartitionStateEnum.UNAVAILABLE));
    }

    private static GlobalPartitionState assembleGlobalStateFromLocal(Catalog catalog, ZonePartitionId zonePartitionId, LocalPartitionStateByNode map) {
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zonePartitionId.zoneId());
        int replicas = zoneDescriptor.replicas();
        int quorum = DisasterRecoveryManager.calculateQuorum(replicas);
        Map<LocalPartitionStateEnum, List<LocalPartitionState>> groupedStates = map.values().stream().collect(Collectors.groupingBy(localPartitionState -> localPartitionState.state));
        int healthyReplicas = groupedStates.getOrDefault((Object)LocalPartitionStateEnum.HEALTHY, Collections.emptyList()).size();
        GlobalPartitionStateEnum globalStateEnum = DisasterRecoveryManager.calculateGlobalState(replicas, healthyReplicas, quorum);
        return new GlobalPartitionState(zoneDescriptor.id(), zoneDescriptor.name(), zonePartitionId.partitionId(), globalStateEnum);
    }

    private static GlobalTablePartitionState assembleTableGlobalStateFromLocal(Catalog catalog, TablePartitionId tablePartitionId, LocalTablePartitionStateByNode map) {
        CatalogTableDescriptor table = catalog.table(tablePartitionId.tableId());
        CatalogSchemaDescriptor schemaDescriptor = catalog.schema(table.schemaId());
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(table.zoneId());
        int replicas = zoneDescriptor.replicas();
        int quorum = DisasterRecoveryManager.calculateQuorum(replicas);
        Map<LocalPartitionStateEnum, List<LocalTablePartitionState>> groupedStates = map.values().stream().collect(Collectors.groupingBy(localPartitionState -> localPartitionState.state));
        int healthyReplicas = groupedStates.getOrDefault((Object)LocalPartitionStateEnum.HEALTHY, Collections.emptyList()).size();
        GlobalPartitionStateEnum globalStateEnum = DisasterRecoveryManager.calculateGlobalState(replicas, healthyReplicas, quorum);
        LocalTablePartitionState anyLocalState = map.values().iterator().next();
        return new GlobalTablePartitionState(zoneDescriptor.id(), zoneDescriptor.name(), schemaDescriptor.id(), schemaDescriptor.name(), anyLocalState.tableId, anyLocalState.tableName, tablePartitionId.partitionId(), globalStateEnum);
    }

    private static GlobalPartitionStateEnum calculateGlobalState(int replicas, int healthyReplicas, int quorum) {
        if (healthyReplicas == replicas) {
            return GlobalPartitionStateEnum.AVAILABLE;
        }
        if (healthyReplicas >= quorum) {
            return GlobalPartitionStateEnum.DEGRADED;
        }
        if (healthyReplicas > 0) {
            return GlobalPartitionStateEnum.READ_ONLY;
        }
        return GlobalPartitionStateEnum.UNAVAILABLE;
    }

    private static int calculateQuorum(int replicas) {
        return replicas / 2 + 1;
    }

    private Catalog catalogLatestVersion() {
        int catalogVersion = this.catalogManager.latestCatalogVersion();
        Catalog catalog = this.catalogManager.catalog(catalogVersion);
        assert (catalog != null) : catalogVersion;
        return catalog;
    }

    private static CatalogTableDescriptor tableDescriptor(Catalog catalog, String schemaName, String tableName) {
        CatalogTableDescriptor tableDescriptor = catalog.table(schemaName, tableName);
        if (tableDescriptor == null) {
            throw new TableNotFoundException(QualifiedNameHelper.fromNormalized(schemaName, tableName));
        }
        return tableDescriptor;
    }

    private static CatalogZoneDescriptor zoneDescriptor(Catalog catalog, String zoneName) {
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneName);
        if (zoneDescriptor == null) {
            throw new DistributionZoneNotFoundException(zoneName);
        }
        return zoneDescriptor;
    }

    private static ByteArray zoneRecoveryTriggerRevisionKey(int zoneId) {
        return new ByteArray(RECOVERY_TRIGGER_REVISION_KEY_PREFIX + zoneId);
    }

    InternalClusterNode localNode() {
        return this.topologyService.localMember();
    }

    private void onTableCreate(CreateTableEventParameters parameters) {
        this.registerPartitionStatesMetricSource(parameters.tableDescriptor());
    }

    private void onTableDrop(DropTableEventParameters parameters) {
        this.unregisterPartitionStatesMetricSource(parameters.tableId());
    }

    private void registerMetricSources() {
        int catalogVersion = this.catalogManager.latestCatalogVersion();
        Catalog catalog = this.catalogManager.catalog(catalogVersion);
        assert (catalog != null) : "Catalog is not found for version: " + catalogVersion;
        catalog.tables().forEach(this::registerPartitionStatesMetricSource);
    }

    private void registerPartitionStatesMetricSource(CatalogTableDescriptor tableDescriptor) {
        PartitionStatesMetricSource metricSource = new PartitionStatesMetricSource(tableDescriptor, this);
        PartitionStatesMetricSource previous = this.metricSourceByTableId.putIfAbsent(tableDescriptor.id(), metricSource);
        assert (previous == null) : "tableId=" + tableDescriptor.id();
        this.metricManager.registerSource(metricSource);
        this.metricManager.enable(metricSource);
    }

    private void unregisterPartitionStatesMetricSource(int tableId) {
        PartitionStatesMetricSource metricSource = this.metricSourceByTableId.get(tableId);
        assert (metricSource != null) : "tableId=" + tableId;
        this.metricManager.unregisterSource(metricSource);
    }
}

