/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.table.distributed.disaster;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.DistributionZonesUtil;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
import org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEvent;
import org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEventParams;
import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
import org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite.internal.event.Event;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.metrics.MetricSource;
import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.network.UnresolvableConsistentIdException;
import org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.disaster.DisasterRecoveryRequestMessage;
import org.apache.ignite.internal.partition.replicator.network.disaster.DisasterRecoveryResponseMessage;
import org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum;
import org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
import org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesRequest;
import org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesResponse;
import org.apache.ignite.internal.partition.replicator.network.disaster.OperationCompletedMessage;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.systemview.api.SystemView;
import org.apache.ignite.internal.systemview.api.SystemViewManager;
import org.apache.ignite.internal.systemview.api.SystemViewProvider;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequest;
import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequestSerializer;
import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequestType;
import org.apache.ignite.internal.table.distributed.disaster.DisasterRecoverySystemViews;
import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum;
import org.apache.ignite.internal.table.distributed.disaster.GroupUpdateRequest;
import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState;
import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateByNode;
import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnumWithLogIndex;
import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateMessageByNode;
import org.apache.ignite.internal.table.distributed.disaster.ManualGroupRestartRequest;
import org.apache.ignite.internal.table.distributed.disaster.MultiNodeDisasterRecoveryRequest;
import org.apache.ignite.internal.table.distributed.disaster.MultiNodeOperations;
import org.apache.ignite.internal.table.distributed.disaster.PartitionStatesMetricSource;
import org.apache.ignite.internal.table.distributed.disaster.SingleZoneResetDistributionRequest;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryRequestForwardException;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalNodesException;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalPartitionIdException;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.NodeLeftException;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.NodesNotFoundException;
import org.apache.ignite.internal.table.distributed.disaster.exceptions.NotEnoughAliveNodesException;
import org.apache.ignite.internal.table.distributed.storage.NullMvTableStorage;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteBusyLock;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.apache.ignite.internal.versioned.VersionedSerializer;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class DisasterRecoveryManager
implements IgniteComponent,
SystemViewProvider {
    /** Class logger; package-private, so it is reachable from other classes of this package. */
    static final IgniteLogger LOG = Loggers.forClass(DisasterRecoveryManager.class);
    /** Meta storage key that receives serialized disaster recovery requests; an exact watch is registered on it in {@code startAsync}. */
    static final ByteArray RECOVERY_TRIGGER_KEY = new ByteArray("disaster.recovery.trigger");
    /** Key name prefix for trigger revisions. NOTE(review): presumably the zone ID is appended by zoneRecoveryTriggerRevisionKey - confirm. */
    private static final String RECOVERY_TRIGGER_REVISION_KEY_PREFIX = "disaster.recovery.trigger.revision.";
    /** Factory for partition replication messages (local state requests, forwarded recovery requests). */
    private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
    /** Factory for replica messages; its use is not visible in this part of the file. */
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    /** NOTE(review): not referenced in the visible part of the file; presumably a lag limit used when classifying partition states - confirm. */
    private static final int CATCH_UP_THRESHOLD = 100;
    /** Busy lock entered by public entry points and callbacks; blocked in {@code stopAsync}. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    /** Executor supplied through the constructor; its use is not visible in this part of the file. */
    private final ExecutorService threadPool;
    /** Used to query local partition states from nodes and to forward multi-node requests to their coordinator. */
    private final MessagingService messagingService;
    final MetaStorageManager metaStorageManager;
    final CatalogManager catalogManager;
    final DistributionZoneManager dzManager;
    final Loza raftManager;
    /** Provides the local member, whose name decides whether a multi-node request must be forwarded. */
    private final TopologyService topologyService;
    /** Logical topology service on which {@link #nodeLeftListener} is registered. */
    private final LogicalTopologyService logicalTopology;
    /** Watch listener for {@link #RECOVERY_TRIGGER_KEY}; delegates to {@code handleTriggerKeyUpdate} under the busy lock. */
    private final WatchListener watchListener;
    /** Fails and drops all pending multi-node operations of a node that left the logical topology. */
    private final LogicalTopologyEventListener nodeLeftListener;
    private final TableManager tableManager;
    final PartitionReplicaLifecycleManager partitionReplicaLifecycleManager;
    private final MetricManager metricManager;
    /** Receives failures such as an undeserializable recovery request. */
    private final FailureManager failureManager;
    /** System view manager this component registers itself with on start. */
    private final SystemViewManager systemViewManager;
    /** Futures of locally started operations by operation ID; an entry is removed when its future completes, and all are failed on stop. */
    private final Map<UUID, CompletableFuture<Void>> ongoingOperationsById = new ConcurrentHashMap<UUID, CompletableFuture<Void>>();
    /** Partition state metric sources keyed by table ID (per the field name); populated outside the visible part of the file. */
    private final Map<Integer, PartitionStatesMetricSource> metricSourceByTableId = new ConcurrentHashMap<Integer, PartitionStatesMetricSource>();
    /** Pending multi-node operations keyed by the ID of the node expected to process them. */
    private final Map<UUID, MultiNodeOperations> operationsByNodeId = new ConcurrentHashMap<UUID, MultiNodeOperations>();

    /**
     * Creates the manager and stores its collaborators.
     *
     * <p>Two listeners are prepared here and registered later, on start: a meta storage watch listener that
     * handles updates of the recovery trigger key, and a logical topology listener that fails the pending
     * multi-node operations of a node that has left.
     */
    public DisasterRecoveryManager(
            ExecutorService threadPool,
            MessagingService messagingService,
            MetaStorageManager metaStorageManager,
            CatalogManager catalogManager,
            DistributionZoneManager dzManager,
            Loza raftManager,
            TopologyService topologyService,
            LogicalTopologyService logicalTopology,
            TableManager tableManager,
            MetricManager metricManager,
            FailureManager failureManager,
            PartitionReplicaLifecycleManager partitionReplicaLifecycleManager,
            SystemViewManager systemViewManager
    ) {
        this.threadPool = threadPool;
        this.messagingService = messagingService;
        this.metaStorageManager = metaStorageManager;
        this.catalogManager = catalogManager;
        this.dzManager = dzManager;
        this.raftManager = raftManager;
        this.topologyService = topologyService;
        this.logicalTopology = logicalTopology;
        this.tableManager = tableManager;
        this.metricManager = metricManager;
        this.failureManager = failureManager;
        this.partitionReplicaLifecycleManager = partitionReplicaLifecycleManager;
        this.systemViewManager = systemViewManager;

        // Trigger key updates are processed synchronously under the busy lock; the returned future is already complete.
        watchListener = watchEvent -> IgniteUtils.inBusyLock(busyLock, () -> {
            handleTriggerKeyUpdate(watchEvent);

            return CompletableFutures.nullCompletedFuture();
        });

        nodeLeftListener = new LogicalTopologyEventListener() {
            @Override
            public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) {
                // Returning null from compute() removes the node's entry once its operations have been failed.
                operationsByNodeId.compute(leftNode.id(), (nodeId, pending) -> {
                    if (pending != null) {
                        pending.completeAllExceptionally(leftNode.name(), new NodeLeftException(leftNode.name(), leftNode.id()));
                    }

                    return null;
                });
            }
        };
    }

    /**
     * Starts the component: registers system views, the message handler, the trigger key watch, the zone and
     * catalog event listeners, the metric sources and the logical topology listener.
     *
     * @param componentContext Component context (unused here).
     * @return Already completed future.
     */
    @Override
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        return IgniteUtils.inBusyLock(busyLock, () -> {
            systemViewManager.register(this);

            messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, this::handleMessage);

            metaStorageManager.registerExactWatch(RECOVERY_TRIGGER_KEY, watchListener);

            dzManager.listen(HaZoneTopologyUpdateEvent.TOPOLOGY_REDUCED, this::onHaZonePartitionTopologyReduce);

            catalogManager.listen(CatalogEvent.TABLE_CREATE, EventListener.fromConsumer(this::onTableCreate));
            catalogManager.listen(CatalogEvent.TABLE_DROP, EventListener.fromConsumer(this::onTableDrop));

            registerMetricSources();

            logicalTopology.addEventListener(nodeLeftListener);

            return CompletableFutures.nullCompletedFuture();
        });
    }

    /**
     * Stops the component: blocks the busy lock, unregisters the trigger key watch and the topology listener,
     * and fails every still-pending operation with {@link NodeStoppingException}.
     *
     * @param componentContext Component context (unused here).
     * @return Already completed future.
     */
    @Override
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        busyLock.block();

        metaStorageManager.unregisterWatch(watchListener);
        logicalTopology.removeEventListener(nodeLeftListener);

        ongoingOperationsById.values().forEach(pending -> pending.completeExceptionally(new NodeStoppingException()));

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Returns the system views exposed by this component: the global and the local zone partition states views.
     *
     * @return Immutable list with both views, global first.
     */
    @Override
    public List<SystemView<?>> systemViews() {
        SystemView<?> globalStatesView = DisasterRecoverySystemViews.createGlobalZonePartitionStatesSystemView(this);
        SystemView<?> localStatesView = DisasterRecoverySystemViews.createLocalZonePartitionStatesSystemView(this);

        return List.of(globalStatesView, localStatesView);
    }

    /**
     * Exposes the live map of ongoing operation futures keyed by operation ID. Intended for tests only.
     *
     * @return The internal (mutable) map, not a copy.
     */
    @TestOnly
    public Map<UUID, CompletableFuture<Void>> ongoingOperationsById() {
        return ongoingOperationsById;
    }

    /**
     * Returns the busy lock that guards this component's operations.
     *
     * @return The busy lock instance.
     */
    public IgniteSpinBusyLock busyLock() {
        return busyLock;
    }

    /**
     * Returns the stable assignments of a zone partition, restricted to nodes present in the logical topology
     * at the given revision.
     *
     * @param partitionId Partition ID; must be a {@link ZonePartitionId}.
     * @param revision Meta storage revision at which assignments and topology are read.
     * @return Unmodifiable set of assignments whose consistent ID matches an alive node name.
     */
    private Set<Assignment> stableAssignmentsWithOnlyAliveNodes(ReplicationGroupId partitionId, long revision) {
        Set<Assignment> stable = ZoneRebalanceUtil
                .zoneStableAssignmentsGetLocally(metaStorageManager, (ZonePartitionId) partitionId, revision)
                .nodes();

        Set<String> aliveNodeNames = dzManager.logicalTopology(revision).stream()
                .map(NodeWithAttributes::nodeName)
                .collect(Collectors.toUnmodifiableSet());

        return stable.stream()
                .filter(assignment -> aliveNodeNames.contains(assignment.consistentId()))
                .collect(Collectors.toUnmodifiableSet());
    }

    /**
     * Handles a topology reduction of a zone: every partition whose alive stable assignments are fewer than
     * the quorum for the zone's replica count is scheduled for an automatic (non-manual) reset.
     *
     * @param params Event parameters carrying the zone ID and the causality token (used as the revision).
     * @return Future that completes with {@code false}, after the reset request has been processed if there was one.
     */
    private CompletableFuture<Boolean> onHaZonePartitionTopologyReduce(HaZoneTopologyUpdateEventParams params) {
        return IgniteUtils.inBusyLock(busyLock, () -> {
            int zoneId = params.zoneId();
            long revision = params.causalityToken();

            // The catalog is resolved as of the revision's timestamp.
            long timestamp = metaStorageManager.timestampByRevisionLocally(revision).longValue();
            Catalog catalog = catalogManager.activeCatalog(timestamp);
            CatalogZoneDescriptor zone = catalog.zone(zoneId);

            Set<Integer> partitionsToReset = new HashSet<>();

            for (int partId = 0; partId < zone.partitions(); partId++) {
                Set<Assignment> aliveAssignments = stableAssignmentsWithOnlyAliveNodes(new ZonePartitionId(zoneId, partId), revision);

                if (aliveAssignments.size() < calculateQuorum(zone.replicas())) {
                    partitionsToReset.add(partId);
                }
            }

            if (partitionsToReset.isEmpty()) {
                return CompletableFutures.falseCompletedFuture();
            }

            return resetPartitions(zone.name(), Map.of(zoneId, partitionsToReset), false, revision)
                    .thenApply(ignored -> false);
        });
    }

    /**
     * Resets the given partitions of a zone as a manual update with no trigger revision ({@code -1}).
     *
     * @param zoneName Zone name, resolved against the latest catalog version.
     * @param partitionIds Partition IDs to reset.
     * @return Future of the reset operation.
     */
    public CompletableFuture<Void> resetPartitions(String zoneName, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock(busyLock, () -> {
            CatalogZoneDescriptor zone = zoneDescriptor(catalogLatestVersion(), zoneName);

            return resetPartitions(zoneName, Map.of(zone.id(), partitionIds), true, -1L);
        });
    }

    /**
     * Resets the given partitions of a zone.
     *
     * @param zoneName Zone name, resolved against the latest catalog version.
     * @param partitionIds Partition IDs to reset.
     * @param manualUpdate Whether the request is a manual update.
     * @param triggerRevision Revision that triggered the reset, or {@code -1} when there is none.
     * @return Future of the reset operation.
     */
    public CompletableFuture<Void> resetPartitions(String zoneName, Set<Integer> partitionIds, boolean manualUpdate, long triggerRevision) {
        return IgniteUtils.inBusyLock(busyLock, () -> {
            CatalogZoneDescriptor zone = zoneDescriptor(catalogLatestVersion(), zoneName);

            return resetPartitions(zoneName, Map.of(zone.id(), partitionIds), manualUpdate, triggerRevision);
        });
    }

    /**
     * Validates the partition IDs against the zone and submits a group update request.
     *
     * @param zoneName Zone name, resolved against the latest catalog version.
     * @param partitionIds Partition IDs to reset; each value set is range-checked against the zone.
     * @param manualUpdate Whether the request is a manual update.
     * @param triggerRevision Revision that triggered the reset, or {@code -1} when there is none.
     * @return Future of the operation; any validation or submission error is returned as a failed future.
     */
    private CompletableFuture<Void> resetPartitions(String zoneName, Map<Integer, Set<Integer>> partitionIds, boolean manualUpdate, long triggerRevision) {
        return IgniteUtils.inBusyLock(busyLock, () -> {
            try {
                Catalog catalog = catalogLatestVersion();
                CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);

                for (Set<Integer> zonePartitionIds : partitionIds.values()) {
                    checkPartitionsRange(zonePartitionIds, Set.of(zone));
                }

                DisasterRecoveryRequest request =
                        GroupUpdateRequest.create(UUID.randomUUID(), catalog.version(), zone.id(), partitionIds, manualUpdate);

                return processNewRequest(request, triggerRevision);
            } catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }
        });
    }

    /**
     * Submits a distribution reset request for each of the given zones.
     *
     * <p>All zone names are resolved against the latest catalog version before any request is submitted, so a
     * failed lookup is thrown from this method before anything is sent.
     *
     * @param zoneNames Names of the zones to reset; an empty list is a no-op.
     * @return Future that completes when all per-zone operations complete.
     */
    public CompletableFuture<Void> resetDistribution(List<String> zoneNames) {
        if (zoneNames.isEmpty()) {
            return CompletableFutures.nullCompletedFuture();
        }

        Catalog catalog = catalogLatestVersion();

        List<CatalogZoneDescriptor> zones = new ArrayList<>(zoneNames.size());

        for (String zoneName : zoneNames) {
            zones.add(zoneDescriptor(catalog, zoneName));
        }

        CompletableFuture<?>[] resetFutures = new CompletableFuture<?>[zones.size()];

        for (int i = 0; i < resetFutures.length; i++) {
            resetFutures[i] = resetDistribution(zones.get(i), catalog.version());
        }

        return CompletableFuture.allOf(resetFutures);
    }

    /**
     * Submits a single-zone distribution reset request.
     *
     * @param zone Zone descriptor.
     * @param catalogVersion Catalog version the request is bound to.
     * @return Future of the operation; a submission error is returned as a failed future.
     */
    private CompletableFuture<Void> resetDistribution(CatalogZoneDescriptor zone, int catalogVersion) {
        try {
            DisasterRecoveryRequest request = new SingleZoneResetDistributionRequest(UUID.randomUUID(), catalogVersion, zone.id());

            return processNewRequest(request);
        } catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    /**
     * Restarts the given partitions of a zone on the given nodes, without cleanup.
     *
     * @param nodeNames Names of the nodes to restart partitions on; all must be present in the logical topology.
     * @param zoneName Zone name, resolved against the latest catalog version.
     * @param partitionIds Partition IDs; must be within the zone's partition range.
     * @return Future of the operation; validation errors are returned as a failed future.
     */
    public CompletableFuture<Void> restartPartitions(Set<String> nodeNames, String zoneName, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock(busyLock, () -> {
            try {
                // Called for validation only: throws if any requested node is missing from the topology.
                getNodes(nodeNames);

                Catalog catalog = catalogLatestVersion();
                CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);

                checkPartitionsRange(partitionIds, Set.of(zone));

                DisasterRecoveryRequest request = new ManualGroupRestartRequest(
                        UUID.randomUUID(),
                        zone.id(),
                        partitionIds,
                        nodeNames,
                        catalog.time(),
                        false,
                        localNode().name()
                );

                return processNewRequest(request);
            } catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }
        });
    }

    /**
     * Restarts the given partitions of a zone on exactly one node, with cleanup.
     *
     * @param nodeNames Node names; must contain exactly one name that is present in the logical topology.
     * @param zoneName Zone name, resolved against the latest catalog version.
     * @param partitionIds Partition IDs; must be within the zone's partition range.
     * @return Future of the operation; validation errors are returned as a failed future.
     */
    public CompletableFuture<Void> restartPartitionsWithCleanup(Set<String> nodeNames, String zoneName, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock(busyLock, () -> {
            try {
                // Called for validation only: throws if any requested node is missing from the topology.
                getNodes(nodeNames);

                Catalog catalog = catalogLatestVersion();
                CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);

                checkPartitionsRange(partitionIds, Set.of(zone));
                checkOnlyOneNodeSpecified(nodeNames);

                DisasterRecoveryRequest request = new ManualGroupRestartRequest(
                        UUID.randomUUID(),
                        zone.id(),
                        partitionIds,
                        nodeNames,
                        catalog.time(),
                        true,
                        localNode().name()
                );

                return processNewRequest(request);
            } catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }
        });
    }

    /**
     * Collects local partition states from nodes and normalizes them against the latest catalog.
     *
     * @param zoneNames Zone names to filter by.
     * @param nodeNames Node names to query; an empty set means every node of the logical topology.
     * @param partitionIds Partition IDs to filter by.
     * @return Future with normalized local states by zone partition; errors are returned as a failed future.
     */
    public CompletableFuture<Map<ZonePartitionId, LocalPartitionStateByNode>> localPartitionStates(Set<String> zoneNames, Set<String> nodeNames, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock(busyLock, () -> {
            try {
                Catalog catalog = catalogLatestVersion();

                return localPartitionStatesInternal(zoneNames, nodeNames, partitionIds, catalog, zoneState())
                        .thenApply(states -> normalizeLocal(states, catalog));
            } catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }
        });
    }

    /**
     * Collects local partition states from every node of the logical topology, normalizes them against the
     * latest catalog and assembles them into global partition states.
     *
     * @param zoneNames Zone names to filter by.
     * @param partitionIds Partition IDs to filter by.
     * @return Future with global states by zone partition; errors are returned as a failed future.
     */
    public CompletableFuture<Map<ZonePartitionId, GlobalPartitionState>> globalPartitionStates(Set<String> zoneNames, Set<Integer> partitionIds) {
        return IgniteUtils.inBusyLock(busyLock, () -> {
            try {
                Catalog catalog = catalogLatestVersion();

                // An empty node name set makes the internal call query all nodes.
                return localPartitionStatesInternal(zoneNames, Set.of(), partitionIds, catalog, zoneState())
                        .thenApply(states -> normalizeLocal(states, catalog))
                        .thenApply(normalized -> assembleGlobal(normalized, partitionIds, catalog));
            } catch (Throwable t) {
                return CompletableFuture.failedFuture(t);
            }
        });
    }

    /**
     * Returns a key extractor that maps a local partition state message to its zone partition ID.
     *
     * @return Function converting the message's zone partition ID field to a {@link ZonePartitionId}.
     */
    static Function<LocalPartitionStateMessage, ZonePartitionId> zoneState() {
        return stateMessage -> {
            return stateMessage.zonePartitionId().asZonePartitionId();
        };
    }

    /**
     * Requests local partition states from nodes and groups the returned state messages by replication group.
     *
     * <p>A {@link LocalPartitionStatesRequest} is sent to every selected node with a 30 second timeout. Each
     * state in a response is stored under the key produced by {@code keyExtractor}, together with the name of
     * the node that reported it. States for which the extractor returns {@code null} are skipped, because the
     * result map is a {@link ConcurrentHashMap} and cannot hold a {@code null} key.
     *
     * @param zoneNames Zone names to filter the catalog zones by.
     * @param nodeNames Node names to query; an empty set means every node of the logical topology.
     * @param partitionIds Partition IDs to request; validated against every selected zone.
     * @param catalog Catalog whose zones and version are used for the request.
     * @param keyExtractor Produces the result key for a state message; may return {@code null}.
     * @param <T> Type of the replication group ID used as the key.
     * @return Future with the collected states; it fails with a {@link DisasterRecoveryException}
     *         ({@code PARTITION_STATE_ERR}) if any node request fails.
     */
    <T extends ReplicationGroupId> CompletableFuture<Map<T, LocalPartitionStateMessageByNode>> localPartitionStatesInternal(Set<String> zoneNames, Set<String> nodeNames, Set<Integer> partitionIds, Catalog catalog, Function<LocalPartitionStateMessage, @Nullable T> keyExtractor) {
        return IgniteUtils.inBusyLock(busyLock, () -> {
            Collection<CatalogZoneDescriptor> zones = DistributionZonesUtil.filterZonesForOperations(zoneNames, catalog.zones());

            checkPartitionsRange(partitionIds, zones);

            Set<NodeWithAttributes> nodes = getNodes(nodeNames);

            Set<Integer> zoneIds = zones.stream().map(CatalogObjectDescriptor::id).collect(Collectors.toSet());

            LocalPartitionStatesRequest localPartitionStatesRequest = PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStatesRequest()
                    .zoneIds(zoneIds)
                    .partitionIds(partitionIds)
                    .catalogVersion(catalog.version())
                    .build();

            // Responses arrive on different threads, so the accumulator must be thread-safe.
            Map<T, LocalPartitionStateMessageByNode> result = new ConcurrentHashMap<>();

            CompletableFuture<?>[] futures = new CompletableFuture<?>[nodes.size()];

            int i = 0;

            for (NodeWithAttributes node : nodes) {
                futures[i++] = messagingService.invoke(node.nodeName(), localPartitionStatesRequest, 30000L)
                        .thenAccept(networkMessage -> IgniteUtils.inBusyLock(busyLock, () -> {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Received local partition states response [networkMessage={}]", networkMessage);
                            }

                            assert networkMessage instanceof LocalPartitionStatesResponse : networkMessage;

                            LocalPartitionStatesResponse response = (LocalPartitionStatesResponse) networkMessage;

                            for (LocalPartitionStateMessage state : response.states()) {
                                T key = keyExtractor.apply(state);

                                // The extractor is declared @Nullable; a null key would make compute() throw.
                                if (key == null) {
                                    continue;
                                }

                                result.compute(key, (partitionId, messageByNode) -> {
                                    if (messageByNode == null) {
                                        return new LocalPartitionStateMessageByNode(Map.of(node.nodeName(), state));
                                    }

                                    // Copy before adding, so the previously published value is never mutated.
                                    LocalPartitionStateMessageByNode updated = new LocalPartitionStateMessageByNode(messageByNode);

                                    updated.put(node.nodeName(), state);

                                    return updated;
                                });
                            }

                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Combined partition state result for node [node={}, result={}]", node, result);
                            }
                        }));
            }

            return CompletableFuture.allOf(futures).handle((unused, err) -> {
                if (err != null) {
                    throw new DisasterRecoveryException(ErrorGroups.DisasterRecovery.PARTITION_STATE_ERR, err);
                }

                // Guard matches the level of the call below (it used to check the INFO level).
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Returning total result for local partition states [result={}]", result);
                }

                return result;
            });
        });
    }

    /**
     * Verifies that all partition IDs are non-negative and below the partition count of every given zone.
     *
     * @param partitionIds Partition IDs to check; an empty set always passes.
     * @param zones Zones whose partition counts bound the valid range.
     * @throws IllegalPartitionIdException If the smallest ID is negative, or the largest ID is not below a
     *         zone's partition count.
     */
    private static void checkPartitionsRange(Set<Integer> partitionIds, Collection<CatalogZoneDescriptor> zones) {
        if (partitionIds.isEmpty()) {
            return;
        }

        int lowest = Collections.min(partitionIds);

        if (lowest < 0) {
            throw new IllegalPartitionIdException(lowest);
        }

        int highest = Collections.max(partitionIds);

        for (CatalogZoneDescriptor zone : zones) {
            if (highest >= zone.partitions()) {
                throw new IllegalPartitionIdException(highest, zone.partitions(), zone.name());
            }
        }
    }

    /**
     * Verifies that exactly one node name was given.
     *
     * @param nodeNames Node names to check.
     * @throws IllegalNodesException If the set does not contain exactly one element.
     */
    private static void checkOnlyOneNodeSpecified(Set<String> nodeNames) {
        boolean exactlyOneNode = nodeNames.size() == 1;

        if (exactlyOneNode) {
            return;
        }

        throw new IllegalNodesException();
    }

    /**
     * Resolves node names to nodes of the current logical topology.
     *
     * @param nodeNames Node names to resolve; an empty set selects the whole logical topology.
     * @return The matching nodes.
     * @throws NodesNotFoundException If at least one of the names is not present in the logical topology.
     */
    private Set<NodeWithAttributes> getNodes(Set<String> nodeNames) throws NodesNotFoundException {
        if (nodeNames.isEmpty()) {
            return dzManager.logicalTopology();
        }

        Set<NodeWithAttributes> matchedNodes = new HashSet<>();
        Set<String> matchedNames = new HashSet<>();

        for (NodeWithAttributes candidate : dzManager.logicalTopology()) {
            if (nodeNames.contains(candidate.nodeName())) {
                matchedNodes.add(candidate);
                matchedNames.add(candidate.nodeName());
            }
        }

        if (nodeNames.equals(matchedNames)) {
            return matchedNodes;
        }

        throw new NodesNotFoundException(CollectionUtils.difference(nodeNames, matchedNames));
    }

    /**
     * Processes a new request that has no trigger revision (passes {@code -1} as the revision).
     *
     * @param request Request to process.
     * @return Future of the operation.
     */
    private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request) {
        long noRevision = -1L;

        return processNewRequest(request, noRevision);
    }

    /**
     * Processes a new disaster recovery request.
     *
     * <p>A multi-node request with a non-empty node set is forwarded to the node whose name sorts first, unless
     * that is the local node. Otherwise the request is serialized and written to the recovery trigger key:
     * conditionally on the zone's trigger revision when {@code revision} is not {@code -1}, unconditionally
     * otherwise. The operation future is registered under the operation ID and times out after 30 seconds.
     *
     * @param request Request to process.
     * @param revision Revision that triggered the request, or {@code -1} when there is none.
     * @return Future that completes once the local operation future and the remote processing future complete.
     */
    private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request, long revision) {
        if (request instanceof MultiNodeDisasterRecoveryRequest) {
            MultiNodeDisasterRecoveryRequest multiNodeRequest = (MultiNodeDisasterRecoveryRequest) request;
            Set<String> nodeNames = multiNodeRequest.nodeNames();

            if (!nodeNames.isEmpty()) {
                String coordinatorName = nodeNames.stream().sorted().findFirst().orElseThrow();
                String localNodeName = topologyService.localMember().name();

                if (!coordinatorName.equals(localNodeName)) {
                    return forwardDisasterRecoveryRequest(multiNodeRequest, revision, coordinatorName);
                }
            }
        }

        UUID operationId = request.operationId();

        CompletableFuture<Void> operationFuture = new CompletableFuture<Void>().orTimeout(30000L, TimeUnit.MILLISECONDS);

        CompletableFuture<Void> remoteProcessingFuture = remoteProcessingFuture(request);

        // Whatever the outcome (success, failure or timeout), the operation is dropped from the registry.
        operationFuture.whenComplete((ignored, error) -> ongoingOperationsById.remove(operationId));

        byte[] serializedRequest = VersionedSerialization.toBytes(request, DisasterRecoveryRequestSerializer.INSTANCE);

        ongoingOperationsById.put(operationId, operationFuture);

        if (revision == -1L) {
            metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest);
        } else {
            putRecoveryTriggerIfRevisionIsNotProcessed(
                    request.zoneId(),
                    ByteUtils.longToBytesKeepingOrder(revision),
                    serializedRequest,
                    operationId
            );
        }

        return operationFuture.thenCompose(ignored -> remoteProcessingFuture);
    }

    /**
     * Creates a future that tracks processing of a multi-node request on every involved node.
     *
     * @param request Request being processed.
     * @return Already completed future for requests that are not of the {@code MULTI_NODE} type; otherwise a
     *         future that completes when the per-node operation futures of all involved nodes complete.
     */
    private CompletableFuture<Void> remoteProcessingFuture(DisasterRecoveryRequest request) {
        boolean multiNode = request.type() == DisasterRecoveryRequestType.MULTI_NODE;

        if (!multiNode) {
            return CompletableFutures.nullCompletedFuture();
        }

        UUID operationId = request.operationId();

        Set<String> requestedNodeNames = ((MultiNodeDisasterRecoveryRequest) request).nodeNames();

        CompletableFuture<?>[] perNodeFutures = getRequestNodes(requestedNodeNames).stream()
                .map(node -> addMultiNodeOperation(node, operationId))
                .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(perNodeFutures);
    }

    /**
     * Selects the logical topology nodes a request applies to.
     *
     * @param requestNodeNames Requested node names; an empty set selects every node of the logical topology.
     * @return Nodes of the current logical topology matching the request. Names that are not in the topology
     *         are silently ignored.
     */
    private Set<NodeWithAttributes> getRequestNodes(Set<String> requestNodeNames) {
        Set<NodeWithAttributes> selected = new HashSet<>();

        for (NodeWithAttributes candidate : dzManager.logicalTopology()) {
            if (requestNodeNames.isEmpty() || requestNodeNames.contains(candidate.nodeName())) {
                selected.add(candidate);
            }
        }

        return selected;
    }

    /**
     * Registers a pending multi-node operation for a node and returns the future that tracks it.
     *
     * <p>Registration happens inside {@code compute} on the per-node map. If the node is no longer in the
     * logical topology at that point, the future is failed with {@link NodeLeftException} instead of being
     * registered. The future times out after 30 seconds and unregisters itself once completed.
     *
     * @param node Node expected to process the operation.
     * @param operationId Operation ID.
     * @return Future tracking the operation on the given node.
     */
    private CompletableFuture<Void> addMultiNodeOperation(NodeWithAttributes node, UUID operationId) {
        CompletableFuture<Void> operationFuture = new CompletableFuture<Void>().orTimeout(30000L, TimeUnit.MILLISECONDS);

        operationsByNodeId.compute(node.nodeId(), (nodeId, registered) -> {
            boolean nodeIsAlive = dzManager.logicalTopology().stream()
                    .map(NodeWithAttributes::nodeId)
                    .anyMatch(nodeId::equals);

            if (!nodeIsAlive) {
                operationFuture.completeExceptionally(new NodeLeftException(node.nodeName(), nodeId));

                // Keep whatever is (or is not) registered for the node untouched.
                return registered;
            }

            MultiNodeOperations nodeOperations = registered == null ? new MultiNodeOperations() : registered;

            nodeOperations.add(operationId, operationFuture);

            return nodeOperations;
        });

        return operationFuture.whenComplete((ignored, error) -> operationsByNodeId.compute(node.nodeId(), (nodeId, registered) -> {
            if (registered == null) {
                return null;
            }

            registered.remove(operationId);

            // A null result drops the node's entry once its last operation is gone.
            return registered.isEmpty() ? null : registered;
        }));
    }

    /**
     * Writes the recovery trigger only if the zone's recorded trigger revision is absent or lower than the
     * given one; the trigger value and the new revision are written together by a single conditional invoke.
     *
     * <p>If the condition does not hold (nothing was written), the operation is treated as already handled
     * and its future is completed normally.
     *
     * @param zoneId Zone ID whose trigger revision key is checked and updated.
     * @param revisionBytes Revision to compare against and store, as bytes.
     * @param recoveryTriggerValue Serialized request to put under the recovery trigger key.
     * @param operationId ID of the operation registered in {@code ongoingOperationsById}.
     */
    private void putRecoveryTriggerIfRevisionIsNotProcessed(int zoneId, byte[] revisionBytes, byte[] recoveryTriggerValue, UUID operationId) {
        ByteArray zoneTriggerRevisionKey = zoneRecoveryTriggerRevisionKey(zoneId);

        metaStorageManager.invoke(
                Conditions.notExists(zoneTriggerRevisionKey).or(Conditions.value(zoneTriggerRevisionKey).lt(revisionBytes)),
                List.of(
                        Operations.put(RECOVERY_TRIGGER_KEY, recoveryTriggerValue),
                        Operations.put(zoneTriggerRevisionKey, revisionBytes)
                ),
                List.of()
        ).thenAccept(wasWrite -> {
            if (wasWrite) {
                return;
            }

            // The entry may already be gone: the operation future removes itself on completion, for example
            // after its timeout or when the component stops. Dereferencing the result blindly would throw an
            // NPE that is silently lost in this callback.
            CompletableFuture<Void> operationFuture = ongoingOperationsById.remove(operationId);

            if (operationFuture != null) {
                operationFuture.complete(null);
            }
        });
    }

    /**
     * Forwards a multi-node request to another node, which becomes its coordinator.
     *
     * <p>The request is re-created with the target node as coordinator, serialized and sent with a 30 second
     * timeout.
     *
     * @param request Request to forward.
     * @param revision Revision that triggered the request; sent along with the message.
     * @param targetNodeName Name of the node to forward the request to.
     * @return Future that completes normally when the response carries no error message, and fails with
     *         {@link DisasterRecoveryRequestForwardException} when it does.
     */
    private CompletableFuture<Void> forwardDisasterRecoveryRequest(MultiNodeDisasterRecoveryRequest request, long revision, String targetNodeName) {
        MultiNodeDisasterRecoveryRequest forwardedRequest = request.updateCoordinator(targetNodeName);

        byte[] requestBytes = VersionedSerialization.toBytes(forwardedRequest, DisasterRecoveryRequestSerializer.INSTANCE);

        DisasterRecoveryRequestMessage message = PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryRequestMessage()
                .requestBytes(requestBytes)
                .revision(revision)
                .build();

        return messagingService.invoke(targetNodeName, message, 30000L).thenApply(responseMsg -> {
            assert responseMsg instanceof DisasterRecoveryResponseMessage : responseMsg;

            String errorMessage = ((DisasterRecoveryResponseMessage) responseMsg).errorMessage();

            if (errorMessage != null) {
                throw new DisasterRecoveryRequestForwardException(targetNodeName, errorMessage);
            }

            return null;
        });
    }

    /**
     * Reacts to an update of the recovery trigger key: deserializes the request stored in the new entry and
     * executes it according to its type.
     *
     * <p>If the request cannot be deserialized, the failure is passed to the failure manager and nothing else
     * happens. Otherwise the future registered locally under the request's operation ID (if any) is removed from
     * {@code ongoingOperationsById} and receives the outcome of the request.
     *
     * @param watchEvent Watch event whose new entry value holds the serialized request.
     */
    private void handleTriggerKeyUpdate(WatchEvent watchEvent) {
        DisasterRecoveryRequest request;
        Entry newEntry = watchEvent.entryEvent().newEntry();
        byte[] requestBytes = newEntry.value();
        assert (requestBytes != null);
        try {
            request = (DisasterRecoveryRequest)VersionedSerialization.fromBytes((byte[])requestBytes, (VersionedSerializer)DisasterRecoveryRequestSerializer.INSTANCE);
        }
        catch (Exception e) {
            this.failureManager.process(new FailureContext((Throwable)e, "Unable to deserialize disaster recovery request."));
            return;
        }
        // The future is removed before dispatching; it is null when no future is registered on this node
        // for the operation ID.
        CompletableFuture<Void> operationFuture = this.ongoingOperationsById.remove(request.operationId());
        switch (request.type()) {
            case SINGLE_NODE: {
                // Single-node requests are executed only where a local future is waiting for them.
                if (operationFuture == null) {
                    return;
                }
                request.handle(this, watchEvent.revision(), watchEvent.timestamp()).handle((res, ex) -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
                    CompletableFutures.copyStateTo((CompletableFuture)operationFuture).accept(res, ex);
                    // Failures caused by the listed exception types are not reported to the failure manager.
                    if (ex != null && !ExceptionUtils.hasCause((Throwable)ex, (Class[])new Class[]{NodeStoppingException.class, UnresolvableConsistentIdException.class, RecipientLeftException.class})) {
                        this.failureManager.process(new FailureContext(ex, "Unable to handle disaster recovery request."));
                    }
                    return null;
                }));
                break;
            }
            case MULTI_NODE: {
                // Multi-node requests are executed regardless of whether a local future exists.
                request.handle(this, watchEvent.revision(), watchEvent.timestamp()).handle((res, ex) -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, () -> {
                    MultiNodeDisasterRecoveryRequest multiNodeRequest;
                    if (operationFuture != null) {
                        CompletableFutures.copyStateTo((CompletableFuture)operationFuture).accept(res, ex);
                    }
                    // When the request names a coordinator, it is notified about completion; the message carries
                    // the exception message, or null when there was no exception.
                    if ((multiNodeRequest = (MultiNodeDisasterRecoveryRequest)request).coordinator() != null) {
                        this.messagingService.send(multiNodeRequest.coordinator(), ChannelType.DEFAULT, (NetworkMessage)PARTITION_REPLICATION_MESSAGES_FACTORY.operationCompletedMessage().operationId(request.operationId()).exceptionMessage(ex == null ? null : ex.getMessage()).build());
                    }
                    // Same filtering as for single-node requests, with NotEnoughAliveNodesException added to the list.
                    if (ex != null && !ExceptionUtils.hasCause((Throwable)ex, (Class[])new Class[]{NodeStoppingException.class, UnresolvableConsistentIdException.class, RecipientLeftException.class, NotEnoughAliveNodesException.class})) {
                        this.failureManager.process(new FailureContext(ex, "Unable to handle disaster recovery request."));
                    }
                    return null;
                }));
                break;
            }
            default: {
                // Unknown request type: the error is delivered only through the local future; without one it is dropped.
                AssertionError error = new AssertionError((Object)("Unexpected request type [type=" + request.type() + ", request=" + request + "]"));
                if (operationFuture == null) break;
                operationFuture.completeExceptionally((Throwable)((Object)error));
            }
        }
    }

    /**
     * Dispatches an incoming network message to the handler matching its type.
     *
     * <p>Local partition states requests and disaster recovery requests are logged at debug level before being
     * handled. Messages of any other type than the three checked here are ignored.
     *
     * @param message Received message.
     * @param sender Node that sent the message.
     * @param correlationId Correlation ID of the invocation, passed on to the request handlers.
     */
    private void handleMessage(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) {
        if (message instanceof LocalPartitionStatesRequest) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received local partition states request [request={}]", new Object[]{message});
            }
            this.handleLocalPartitionStatesRequest((LocalPartitionStatesRequest)message, sender, correlationId);
            return;
        }

        if (message instanceof DisasterRecoveryRequestMessage) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received disaster recovery request [request={}]", new Object[]{message});
            }
            this.handleDisasterRecoveryRequest((DisasterRecoveryRequestMessage)message, sender, correlationId);
            return;
        }

        if (message instanceof OperationCompletedMessage) {
            this.handleOperationCompletedMessage((OperationCompletedMessage)message, sender);
        }
    }

    /**
     * Handles a disaster recovery request received from another node and responds to the sender.
     *
     * <p>The response carries a {@code null} error message on success and a non-null one on failure. The sending
     * side ({@code forwardDisasterRecoveryRequest}) treats a {@code null} error message as success, so a failure
     * must never be reported with a {@code null} message.
     *
     * @param message Message holding the serialized request and the revision.
     * @param sender Node to respond to.
     * @param correlationId Correlation ID of the invocation; must not be {@code null}.
     */
    private void handleDisasterRecoveryRequest(DisasterRecoveryRequestMessage message, InternalClusterNode sender, @Nullable Long correlationId) {
        DisasterRecoveryRequest request;
        assert (correlationId != null) : "request=" + message + ", sender=" + sender;
        try {
            request = (DisasterRecoveryRequest)VersionedSerialization.fromBytes((byte[])message.requestBytes(), (VersionedSerializer)DisasterRecoveryRequestSerializer.INSTANCE);
        }
        catch (Exception e) {
            LOG.error("Failed to deserialize disaster recovery request.", (Throwable)e);
            DisasterRecoveryResponseMessage response = PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryResponseMessage().errorMessage("Failed to deserialize request: " + e.getMessage()).build();
            this.messagingService.respond(sender, (NetworkMessage)response, correlationId.longValue());
            return;
        }
        this.processNewRequest(request, message.revision()).whenComplete((result, throwable) -> {
            String errorMessage = null;
            if (throwable != null) {
                errorMessage = throwable.getMessage();
                if (errorMessage == null) {
                    // Exceptions created without a message (a TimeoutException or a bare NullPointerException,
                    // for instance) would otherwise produce a response indistinguishable from success.
                    errorMessage = throwable.toString();
                }
            }
            DisasterRecoveryResponseMessage response = PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryResponseMessage().errorMessage(errorMessage).build();
            this.messagingService.respond(sender, (NetworkMessage)response, correlationId.longValue());
        });
    }

    /**
     * Collects the states of local zone partitions and sends them back to the requester.
     *
     * <p>The work starts once the catalog version named in the request is ready, and runs on {@code threadPool}.
     * Only raft nodes whose group ID is a {@code ZonePartitionId} are considered, and only those for which
     * {@code handleStateRequestForZone} produces a message end up in the response.
     *
     * @param request Request with the catalog version and the filters.
     * @param sender Node to respond to.
     * @param correlationId Correlation ID of the invocation; must not be {@code null}.
     */
    private void handleLocalPartitionStatesRequest(LocalPartitionStatesRequest request, InternalClusterNode sender, @Nullable Long correlationId) {
        assert (correlationId != null) : "request=" + request + ", sender=" + sender;
        int catalogVersion = request.catalogVersion();
        this.catalogManager.catalogReadyFuture(catalogVersion).thenRunAsync(() -> {
            List<LocalPartitionStateMessage> collectedStates = new ArrayList<>();
            this.raftManager.forEach((raftNodeId, raftGroupService) -> {
                if (!(raftNodeId.groupId() instanceof ZonePartitionId)) {
                    return;
                }
                LocalPartitionStateMessage stateMessage = this.handleStateRequestForZone(request, (RaftGroupService)raftGroupService, (ZonePartitionId)raftNodeId.groupId(), catalogVersion);
                if (stateMessage != null) {
                    collectedStates.add(stateMessage);
                }
            });
            LocalPartitionStatesResponse statesResponse = PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStatesResponse()
                    .states(collectedStates)
                    .build();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Responding with state for local partitions [response={}]", new Object[]{statesResponse});
            }
            this.messagingService.respond(sender, (NetworkMessage)statesResponse, correlationId.longValue());
        }, this.threadPool);
    }

    /**
     * Completes the operation tracked for the sender node with the outcome carried by the message.
     *
     * <p>Nothing happens if no operations are tracked for the sender's ID.
     *
     * @param message Completion message with the operation ID and an optional exception message.
     * @param sender Node that sent the message.
     */
    private void handleOperationCompletedMessage(OperationCompletedMessage message, InternalClusterNode sender) {
        MultiNodeOperations senderOperations = this.operationsByNodeId.get(sender.id());
        if (senderOperations == null) {
            return;
        }
        senderOperations.complete(message.operationId(), sender.name(), message.exceptionMessage());
    }

    /**
     * Builds the local state message of a single zone partition, provided it matches the request filters.
     *
     * @param request Request whose partition ID and zone ID filters are applied; an empty filter matches everything.
     * @param raftGroupService Raft group service of the partition; its raft node supplies state, log index and
     *         learner flag.
     * @param zonePartitionId ID of the partition.
     * @param catalogVersion Catalog version used to look up the zone.
     * @return State message, or {@code null} if the partition or its zone is filtered out or the zone is absent
     *         from the catalog of the given version.
     */
    @Nullable
    private LocalPartitionStateMessage handleStateRequestForZone(LocalPartitionStatesRequest request, RaftGroupService raftGroupService, ZonePartitionId zonePartitionId, int catalogVersion) {
        boolean partitionRequested = DisasterRecoveryManager.containsOrEmpty(zonePartitionId.partitionId(), request.partitionIds());
        if (!partitionRequested) {
            return null;
        }

        Catalog catalog = this.catalogManager.catalog(catalogVersion);
        assert (catalog != null) : "Catalog is not found for version: " + catalogVersion;

        CatalogZoneDescriptor zone = catalog.zone(zonePartitionId.zoneId());
        if (zone == null) {
            return null;
        }
        if (!DisasterRecoveryManager.containsOrEmpty(zone.id(), request.zoneIds())) {
            return null;
        }

        Node raftNode = raftGroupService.getRaftNode();
        LocalPartitionStateEnumWithLogIndex stateWithLogIndex = LocalPartitionStateEnumWithLogIndex.of(raftNode);

        return PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStateMessage()
                .zonePartitionId(ReplicaMessageUtils.toZonePartitionIdMessage((ReplicaMessagesFactory)REPLICA_MESSAGES_FACTORY, (ZonePartitionId)zonePartitionId))
                .state(stateWithLogIndex.state)
                .logIndex(stateWithLogIndex.logIndex)
                .estimatedRows(this.calculateEstimatedSize(zonePartitionId))
                .isLearner(raftNode.isLearner())
                .build();
    }

    /**
     * Sums the estimated sizes of the given partition over all tables of its zone.
     *
     * <p>Tables backed by {@code NullMvTableStorage} and tables that have no storage for the partition are skipped.
     *
     * @param zonePartitionId ID of the partition.
     * @return Sum of {@code estimatedSize()} of the matching partition storages; {@code 0} if there are none.
     */
    private long calculateEstimatedSize(ZonePartitionId zonePartitionId) {
        int zoneId = zonePartitionId.zoneId();
        int partitionId = zonePartitionId.partitionId();

        return this.tableManager.zoneTables(zoneId).stream()
                .map(table -> table.internalTable().storage())
                .filter(tableStorage -> !(tableStorage instanceof NullMvTableStorage))
                .map(tableStorage -> tableStorage.getMvPartition(partitionId))
                .filter(partitionStorage -> partitionStorage != null)
                .mapToLong(partitionStorage -> partitionStorage.estimatedSize())
                .sum();
    }

    /**
     * Checks an item against a filter collection in which emptiness means "no filtering".
     *
     * @param item Item to look for.
     * @param collection Filter collection.
     * @param <T> Item type.
     * @return {@code true} if the collection is empty or contains the item.
     */
    private static <T> boolean containsOrEmpty(T item, Collection<T> collection) {
        if (collection.isEmpty()) {
            return true;
        }
        return collection.contains(item);
    }

    /**
     * Converts per-node state messages into per-node local partition states.
     *
     * <p>For every partition the highest log index reported by any node is determined first; each node's state is
     * then derived relative to that maximum by {@code toLocalPartitionState}.
     *
     * @param result State messages grouped by partition and then by node.
     * @param catalog Catalog used to resolve zone names.
     * @return New map with the local states grouped by partition and then by node.
     */
    private static Map<ZonePartitionId, LocalPartitionStateByNode> normalizeLocal(Map<ZonePartitionId, LocalPartitionStateMessageByNode> result, Catalog catalog) {
        Map<ZonePartitionId, LocalPartitionStateByNode> normalized = new HashMap<>();

        result.forEach((zonePartitionId, messageByNode) -> {
            long maxLogIndex = messageByNode.values().stream()
                    .mapToLong(LocalPartitionStateMessage::logIndex)
                    .max()
                    .getAsLong();

            Map<String, LocalPartitionState> stateByNode = messageByNode.entrySet().stream().collect(Collectors.toMap(
                    Map.Entry::getKey,
                    nodeEntry -> DisasterRecoveryManager.toLocalPartitionState((LocalPartitionStateMessage)nodeEntry.getValue(), maxLogIndex, zonePartitionId, catalog)
            ));

            normalized.put(zonePartitionId, new LocalPartitionStateByNode(stateByNode));
        });

        return normalized;
    }

    /**
     * Builds a local partition state from a state message.
     *
     * @param stateMsg State message received from a node.
     * @param maxLogIndex Highest log index reported for the partition, used to derive the effective state.
     * @param zonePartitionId ID of the partition.
     * @param catalog Catalog used to resolve the zone name.
     * @return Local partition state with the zone ID and name, partition ID, effective state and estimated rows.
     */
    private static LocalPartitionState toLocalPartitionState(LocalPartitionStateMessage stateMsg, long maxLogIndex, ZonePartitionId zonePartitionId, Catalog catalog) {
        LocalPartitionStateEnum effectiveState = DisasterRecoveryManager.calculateState(stateMsg, maxLogIndex);
        String zoneName = catalog.zone(zonePartitionId.zoneId()).name();

        return new LocalPartitionState(
                zonePartitionId.zoneId(),
                zoneName,
                zonePartitionId.partitionId(),
                effectiveState,
                stateMsg.estimatedRows()
        );
    }

    /**
     * Derives the effective state of a replica from its reported state and its log lag.
     *
     * @param stateMsg State message with the reported state and log index.
     * @param maxLogIndex Highest log index reported for the partition.
     * @return {@code CATCHING_UP} if the replica reports {@code HEALTHY} but is at least 100 log entries behind
     *         {@code maxLogIndex}; the reported state otherwise.
     */
    private static LocalPartitionStateEnum calculateState(LocalPartitionStateMessage stateMsg, long maxLogIndex) {
        LocalPartitionStateEnum reported = stateMsg.state();
        if (reported != LocalPartitionStateEnum.HEALTHY) {
            return reported;
        }

        long lag = maxLogIndex - stateMsg.logIndex();
        return lag >= 100L ? LocalPartitionStateEnum.CATCHING_UP : reported;
    }

    /**
     * Builds global partition states out of local ones.
     *
     * <p>Every partition present in {@code localResult} gets a state computed from its replicas; afterwards
     * {@code makeMissingPartitionsUnavailable} adds entries for partitions that have no local states.
     *
     * @param localResult Local states grouped by partition.
     * @param partitionIds Requested partition IDs, passed on to {@code makeMissingPartitionsUnavailable}.
     * @param catalog Catalog used to resolve zone descriptors.
     * @return New mutable map of global states by partition.
     */
    private static Map<ZonePartitionId, GlobalPartitionState> assembleGlobal(Map<ZonePartitionId, LocalPartitionStateByNode> localResult, Set<Integer> partitionIds, Catalog catalog) {
        Map<ZonePartitionId, GlobalPartitionState> globalStates = new HashMap<>();

        for (Map.Entry<ZonePartitionId, LocalPartitionStateByNode> localEntry : localResult.entrySet()) {
            ZonePartitionId zonePartitionId = localEntry.getKey();
            GlobalPartitionState globalState = DisasterRecoveryManager.assembleGlobalStateFromLocal(catalog, zonePartitionId, localEntry.getValue());
            globalStates.put(zonePartitionId, globalState);
        }

        DisasterRecoveryManager.makeMissingPartitionsUnavailable(localResult, catalog, globalStates, partitionIds);

        return globalStates;
    }

    /**
     * Adds {@code UNAVAILABLE} entries to {@code result} for partitions that have no entry yet.
     *
     * <p>Only zones that occur in {@code localResult} are processed. For each such zone, either all partitions of
     * the zone (when {@code partitionIds} is empty) or exactly the listed partitions are checked.
     *
     * @param localResult Local states; only the zone IDs of its keys are used.
     * @param catalog Catalog used to resolve zone descriptors.
     * @param result Map of global states to complete; modified in place.
     * @param partitionIds Partition IDs to check; empty means every partition of the zone.
     */
    private static void makeMissingPartitionsUnavailable(Map<ZonePartitionId, LocalPartitionStateByNode> localResult, Catalog catalog, Map<ZonePartitionId, GlobalPartitionState> result, Set<Integer> partitionIds) {
        List<Integer> zoneIds = localResult.keySet().stream()
                .map(ZonePartitionId::zoneId)
                .distinct()
                .collect(Collectors.toList());

        for (Integer zoneId : zoneIds) {
            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId.intValue());

            if (!partitionIds.isEmpty()) {
                for (Integer requestedPartitionId : partitionIds) {
                    DisasterRecoveryManager.putUnavailableStateIfAbsent(result, requestedPartitionId, zoneDescriptor);
                }
                continue;
            }

            int partitionCount = zoneDescriptor.partitions();
            for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
                DisasterRecoveryManager.putUnavailableStateIfAbsent(result, partitionId, zoneDescriptor);
            }
        }
    }

    /**
     * Registers an {@code UNAVAILABLE} global state for a partition unless the map already has a state for it.
     *
     * @param states Map of global states; modified in place.
     * @param partitionId Partition ID.
     * @param zoneDescriptor Descriptor of the zone the partition belongs to.
     */
    private static void putUnavailableStateIfAbsent(Map<ZonePartitionId, GlobalPartitionState> states, int partitionId, CatalogZoneDescriptor zoneDescriptor) {
        int zoneId = zoneDescriptor.id();

        states.computeIfAbsent(
                new ZonePartitionId(zoneId, partitionId),
                missing -> new GlobalPartitionState(zoneId, zoneDescriptor.name(), missing.partitionId(), GlobalPartitionStateEnum.UNAVAILABLE)
        );
    }

    /**
     * Computes the global state of a partition from the local states of its replicas.
     *
     * <p>The state depends on how many replicas are {@code HEALTHY} compared with the zone's replica count and
     * the quorum derived from it.
     *
     * @param catalog Catalog used to resolve the zone descriptor.
     * @param zonePartitionId ID of the partition.
     * @param map Local states of the partition by node.
     * @return Global state of the partition.
     */
    private static GlobalPartitionState assembleGlobalStateFromLocal(Catalog catalog, ZonePartitionId zonePartitionId, LocalPartitionStateByNode map) {
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zonePartitionId.zoneId());
        int replicas = zoneDescriptor.replicas();
        int quorum = DisasterRecoveryManager.calculateQuorum(replicas);

        Map<LocalPartitionStateEnum, Long> replicaCountByState = map.values().stream()
                .collect(Collectors.groupingBy(replicaState -> replicaState.state, Collectors.counting()));
        int healthyReplicas = replicaCountByState.getOrDefault(LocalPartitionStateEnum.HEALTHY, 0L).intValue();

        return new GlobalPartitionState(
                zoneDescriptor.id(),
                zoneDescriptor.name(),
                zonePartitionId.partitionId(),
                DisasterRecoveryManager.calculateGlobalState(replicas, healthyReplicas, quorum)
        );
    }

    /**
     * Maps the number of healthy replicas to a global partition state.
     *
     * @param replicas Configured number of replicas.
     * @param healthyReplicas Number of healthy replicas.
     * @param quorum Number of replicas that form a quorum.
     * @return {@code AVAILABLE} if all replicas are healthy, {@code DEGRADED} if at least a quorum is healthy,
     *         {@code READ_ONLY} if at least one replica is healthy, {@code UNAVAILABLE} otherwise.
     */
    private static GlobalPartitionStateEnum calculateGlobalState(int replicas, int healthyReplicas, int quorum) {
        GlobalPartitionStateEnum state;

        if (healthyReplicas == replicas) {
            state = GlobalPartitionStateEnum.AVAILABLE;
        } else if (healthyReplicas >= quorum) {
            state = GlobalPartitionStateEnum.DEGRADED;
        } else if (healthyReplicas > 0) {
            state = GlobalPartitionStateEnum.READ_ONLY;
        } else {
            state = GlobalPartitionStateEnum.UNAVAILABLE;
        }

        return state;
    }

    /**
     * Calculates the majority quorum for the given number of replicas.
     *
     * @param replicas Number of replicas.
     * @return Half of {@code replicas} (integer division) plus one.
     */
    private static int calculateQuorum(int replicas) {
        int half = replicas / 2;
        return half + 1;
    }

    /**
     * Returns the catalog of the latest version known to the catalog manager.
     *
     * @return Latest catalog; asserted to be non-null.
     */
    private Catalog catalogLatestVersion() {
        int latestVersion = this.catalogManager.latestCatalogVersion();
        Catalog latestCatalog = this.catalogManager.catalog(latestVersion);

        assert (latestCatalog != null) : latestVersion;

        return latestCatalog;
    }

    /**
     * Looks up a zone descriptor by name.
     *
     * @param catalog Catalog to search in.
     * @param zoneName Zone name.
     * @return Descriptor of the zone.
     * @throws DistributionZoneNotFoundException If the catalog has no zone with this name.
     */
    private static CatalogZoneDescriptor zoneDescriptor(Catalog catalog, String zoneName) {
        CatalogZoneDescriptor found = catalog.zone(zoneName);
        if (found != null) {
            return found;
        }
        throw new DistributionZoneNotFoundException(zoneName);
    }

    /**
     * Builds the meta storage key under which the recovery trigger revision of a zone is stored.
     *
     * @param zoneId Zone ID.
     * @return Key made of {@code RECOVERY_TRIGGER_REVISION_KEY_PREFIX} followed by the zone ID.
     */
    private static ByteArray zoneRecoveryTriggerRevisionKey(int zoneId) {
        String key = RECOVERY_TRIGGER_REVISION_KEY_PREFIX + zoneId;
        return new ByteArray(key);
    }

    /**
     * Returns the local node as reported by the topology service.
     *
     * @return Local member of the topology.
     */
    InternalClusterNode localNode() {
        InternalClusterNode localMember = this.topologyService.localMember();
        return localMember;
    }

    /**
     * Registers a partition states metric source for a newly created table.
     *
     * @param parameters Parameters of the table creation event.
     */
    private void onTableCreate(CreateTableEventParameters parameters) {
        CatalogTableDescriptor createdTable = parameters.tableDescriptor();
        this.registerPartitionStatesMetricSource(createdTable);
    }

    /**
     * Unregisters the partition states metric source of a dropped table.
     *
     * @param parameters Parameters of the table drop event.
     */
    private void onTableDrop(DropTableEventParameters parameters) {
        int droppedTableId = parameters.tableId();
        this.unregisterPartitionStatesMetricSource(droppedTableId);
    }

    /**
     * Registers partition states metric sources for all tables of the latest catalog version.
     */
    private void registerMetricSources() {
        int catalogVersion = this.catalogManager.latestCatalogVersion();
        Catalog catalog = this.catalogManager.catalog(catalogVersion);

        assert (catalog != null) : "Catalog is not found for version: " + catalogVersion;

        for (CatalogTableDescriptor table : catalog.tables()) {
            this.registerPartitionStatesMetricSource(table);
        }
    }

    /**
     * Creates a partition states metric source for a table, remembers it by table ID, and registers and enables
     * it in the metric manager.
     *
     * <p>A source is expected not to exist yet for the table; this is checked by an assertion only.
     *
     * @param tableDescriptor Descriptor of the table.
     */
    private void registerPartitionStatesMetricSource(CatalogTableDescriptor tableDescriptor) {
        PartitionStatesMetricSource source = new PartitionStatesMetricSource(tableDescriptor, this);

        PartitionStatesMetricSource existing = this.metricSourceByTableId.putIfAbsent(tableDescriptor.id(), source);
        assert (existing == null) : "tableId=" + tableDescriptor.id();

        MetricSource registered = (MetricSource)source;
        this.metricManager.registerSource(registered);
        this.metricManager.enable(registered);
    }

    /**
     * Forgets the partition states metric source of a table and unregisters it from the metric manager.
     *
     * <p>The source is removed from {@code metricSourceByTableId} rather than merely looked up, so that sources of
     * dropped tables do not accumulate in the map.
     *
     * @param tableId ID of the table; a source is expected to be registered for it (checked by an assertion).
     */
    private void unregisterPartitionStatesMetricSource(int tableId) {
        PartitionStatesMetricSource metricSource = this.metricSourceByTableId.remove(tableId);
        assert (metricSource != null) : "tableId=" + tableId;
        this.metricManager.unregisterSource((MetricSource)metricSource);
    }
}

