/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.distributionzones.rebalance;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.AlterZoneEventParameters;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.DistributionZonesUtil;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite.internal.distributionzones.utils.CatalogAlterZoneEventListener;
import org.apache.ignite.internal.event.Event;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteBusyLock;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;

/**
 * Triggers rebalance of zone partitions in response to distribution zone changes.
 *
 * <p>Once started, the engine reacts to two kinds of events:
 * <ul>
 *     <li>{@link CatalogEvent#ZONE_ALTER} catalog events, through the replicas-update callback of
 *     {@link CatalogAlterZoneEventListener};</li>
 *     <li>meta storage updates of keys under {@link DistributionZonesUtil#zoneDataNodesHistoryPrefix()}.</li>
 * </ul>
 * In addition, {@link #startAsync()} re-evaluates every zone of the latest catalog against the meta storage
 * recovery revision.
 *
 * <p>In all cases the actual work is delegated to
 * {@code ZoneRebalanceUtil.triggerZonePartitionsRebalance}.
 */
public class DistributionZoneRebalanceEngineV2 {
    private static final IgniteLogger LOG = Loggers.forClass(DistributionZoneRebalanceEngineV2.class);

    /** Makes {@link #stop()} a no-op on every call but the first one. */
    private final AtomicBoolean stopGuard = new AtomicBoolean();

    /**
     * Busy lock guarding the asynchronous handlers of this engine. It is supplied through the constructor and
     * is never blocked by this class.
     */
    private final IgniteSpinBusyLock busyLock;

    /** Meta storage used to register the data nodes watch and passed on to the rebalance trigger. */
    private final MetaStorageManager metaStorageManager;

    /** Source of zone data nodes and of the logical topology. */
    private final DistributionZoneManager distributionZoneManager;

    /** Meta storage watch listener reacting to data nodes history updates. */
    private final WatchListener dataNodesListener;

    /** Catalog used to resolve zone descriptors and to subscribe to zone alter events. */
    private final CatalogService catalogService;

    /**
     * Creates the engine. No listeners are registered until {@link #startAsync()} is called.
     *
     * @param busyLock Busy lock used to guard the asynchronous handlers.
     * @param metaStorageManager Meta storage manager.
     * @param distributionZoneManager Distribution zone manager.
     * @param catalogService Catalog manager; kept as a {@link CatalogService}.
     */
    public DistributionZoneRebalanceEngineV2(
            IgniteSpinBusyLock busyLock,
            MetaStorageManager metaStorageManager,
            DistributionZoneManager distributionZoneManager,
            CatalogManager catalogService
    ) {
        this.busyLock = busyLock;
        this.metaStorageManager = metaStorageManager;
        this.distributionZoneManager = distributionZoneManager;
        this.catalogService = catalogService;
        this.dataNodesListener = createDistributionZonesDataNodesListener();
    }

    /**
     * Registers the catalog and meta storage listeners and triggers the recovery rebalance.
     *
     * <p>The meta storage recovery future is expected to be already completed at this point (checked with an
     * {@code assert}); its revision is used as the recovery revision.
     *
     * @return Future of the recovery rebalance, see {@link #recoveryRebalanceTrigger(long)}.
     */
    public CompletableFuture<Void> startAsync() {
        return IgniteUtils.inBusyLockAsync(busyLock, () -> {
            catalogService.listen(CatalogEvent.ZONE_ALTER, new CatalogAlterZoneEventListener(catalogService) {
                @Override
                protected CompletableFuture<Void> onReplicasUpdate(AlterZoneEventParameters parameters, int oldReplicas) {
                    return onUpdateReplicas(parameters);
                }
            });

            metaStorageManager.registerPrefixWatch(DistributionZonesUtil.zoneDataNodesHistoryPrefix(), dataNodesListener);

            CompletableFuture<Revisions> recoveryFinishFuture = metaStorageManager.recoveryFinishedFuture();

            // join() below must not block, hence the future has to be completed already.
            assert recoveryFinishFuture.isDone();

            long recoveryRevision = recoveryFinishFuture.join().revision();

            return recoveryRebalanceTrigger(recoveryRevision);
        });
    }

    /**
     * Unregisters the data nodes watch listener from the meta storage. Only the first invocation has an effect.
     *
     * <p>NOTE(review): the catalog listener registered in {@link #startAsync()} is not removed here.
     */
    public void stop() {
        if (!stopGuard.compareAndSet(false, true)) {
            return;
        }

        metaStorageManager.unregisterWatch(dataNodesListener);
    }

    /**
     * Creates the watch listener that handles updates of zone data nodes history keys.
     *
     * <p>For every event the listener parses the data nodes from the new entry, applies the zone filter of the
     * latest catalog version and, if any node remains, triggers the zone partitions rebalance. The event is
     * skipped (a completed future is returned) when the data nodes cannot be parsed, when the zone is absent
     * from the latest catalog, or when no node passes the filter.
     *
     * @return Watch listener.
     */
    private WatchListener createDistributionZonesDataNodesListener() {
        return evt -> IgniteUtils.inBusyLockAsync(busyLock, () -> {
            Set<NodeWithAttributes> dataNodes =
                    DistributionZonesUtil.parseDataNodes(evt.entryEvent().newEntry().value(), evt.timestamp());

            if (dataNodes == null) {
                return CompletableFutures.nullCompletedFuture();
            }

            int zoneId = RebalanceUtil.extractZoneId(
                    evt.entryEvent().newEntry().key(),
                    DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX_BYTES
            );

            Catalog catalog = catalogService.catalog(catalogService.latestCatalogVersion());

            long assignmentsTimestamp = catalog.time();

            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);

            if (zoneDescriptor == null) {
                return CompletableFutures.nullCompletedFuture();
            }

            Set<String> filteredDataNodes =
                    DistributionZonesUtil.nodeNames(DistributionZonesUtil.filterDataNodes(dataNodes, zoneDescriptor));

            logFilteredOutNodes(zoneDescriptor, dataNodes, filteredDataNodes);

            if (filteredDataNodes.isEmpty()) {
                LOG.info(
                        "Rebalance is not triggered because data nodes are empty [zoneId={}, filter={}, storageProfiles={}]",
                        zoneDescriptor.id(),
                        zoneDescriptor.filter(),
                        zoneDescriptor.storageProfiles().profiles()
                );

                return CompletableFutures.nullCompletedFuture();
            }

            long revision = evt.entryEvent().newEntry().revision();
            HybridTimestamp timestamp = evt.entryEvent().newEntry().timestamp();

            Set<String> aliveNodes = DistributionZonesUtil.nodeNames(distributionZoneManager.logicalTopology(revision));

            return ZoneRebalanceUtil.triggerZonePartitionsRebalance(
                    zoneDescriptor,
                    filteredDataNodes,
                    revision,
                    timestamp,
                    metaStorageManager,
                    busyLock,
                    assignmentsTimestamp,
                    aliveNodes
            );
        });
    }

    /**
     * Logs, at info level, which data nodes were dropped by the zone filter and which ones remained.
     *
     * <p>Nothing is logged when info logging is disabled, when no node passed the filter (that case is reported
     * by the caller), or when no node was filtered out.
     *
     * @param zoneDescriptor Zone descriptor the filter belongs to.
     * @param dataNodes Data nodes before filtering.
     * @param filteredDataNodes Names of the nodes that passed the filter.
     */
    private static void logFilteredOutNodes(
            CatalogZoneDescriptor zoneDescriptor,
            Set<NodeWithAttributes> dataNodes,
            Set<String> filteredDataNodes
    ) {
        if (!LOG.isInfoEnabled() || filteredDataNodes.isEmpty()) {
            return;
        }

        List<NodeWithAttributes> matchedNodes = new ArrayList<>();
        List<NodeWithAttributes> filteredOutNodes = new ArrayList<>();

        for (NodeWithAttributes dataNode : dataNodes) {
            if (filteredDataNodes.contains(dataNode.nodeName())) {
                matchedNodes.add(dataNode);
            } else {
                filteredOutNodes.add(dataNode);
            }
        }

        if (filteredOutNodes.isEmpty()) {
            return;
        }

        LOG.info(
                "Some data nodes were filtered out because they don't match zone's attributes:"
                        + "\n\tzoneId={}\n\tfilter={}\n\tstorageProfiles={}\n\tfilteredOutNodes={}\n\tremainingNodes={}",
                zoneDescriptor.id(),
                zoneDescriptor.filter(),
                zoneDescriptor.storageProfiles(),
                filteredOutNodes,
                matchedNodes
        );
    }

    /**
     * Handles a replicas update of a zone by recalculating assignments for the altered zone descriptor.
     *
     * @param parameters Zone alter event parameters.
     * @return Future of the triggered rebalance.
     */
    private CompletableFuture<Void> onUpdateReplicas(AlterZoneEventParameters parameters) {
        CatalogZoneDescriptor zoneDescriptor = parameters.zoneDescriptor();

        return recalculateAssignmentsAndTriggerZonePartitionsRebalance(
                zoneDescriptor,
                parameters.causalityToken(),
                zoneDescriptor.updateTimestamp(),
                parameters.catalogVersion()
        );
    }

    /**
     * Requests the data nodes of the zone for the given timestamp and catalog version and, if the result is not
     * empty, triggers the zone partitions rebalance.
     *
     * @param zoneDescriptor Zone descriptor.
     * @param causalityToken Revision used to look up the logical topology and passed to the rebalance trigger.
     * @param timestamp Timestamp used to request data nodes and passed to the rebalance trigger.
     * @param catalogVersion Catalog version used to request data nodes and to obtain the catalog time.
     * @return Future of the triggered rebalance; completed with {@code null} when there are no data nodes.
     */
    private CompletableFuture<Void> recalculateAssignmentsAndTriggerZonePartitionsRebalance(
            CatalogZoneDescriptor zoneDescriptor,
            long causalityToken,
            HybridTimestamp timestamp,
            int catalogVersion
    ) {
        return distributionZoneManager.dataNodes(timestamp, catalogVersion, zoneDescriptor.id())
                .thenCompose(dataNodes -> IgniteUtils.inBusyLockAsync(busyLock, () -> {
                    if (dataNodes.isEmpty()) {
                        return CompletableFutures.nullCompletedFuture();
                    }

                    Catalog catalog = catalogService.catalog(catalogVersion);

                    Set<String> aliveNodes =
                            DistributionZonesUtil.nodeNames(distributionZoneManager.logicalTopology(causalityToken));

                    return ZoneRebalanceUtil.triggerZonePartitionsRebalance(
                            zoneDescriptor,
                            dataNodes,
                            causalityToken,
                            timestamp,
                            metaStorageManager,
                            busyLock,
                            catalog.time(),
                            aliveNodes
                    );
                }));
    }

    /**
     * Triggers the rebalance for every zone of the latest catalog, using the recovery revision and the timestamp
     * that the meta storage locally associates with it.
     *
     * @param recoveryRevision Meta storage recovery revision; nothing is done unless it is positive.
     * @return Future that completes when the rebalance has been triggered for all zones.
     */
    private CompletableFuture<Void> recoveryRebalanceTrigger(long recoveryRevision) {
        if (recoveryRevision <= 0L) {
            return CompletableFutures.nullCompletedFuture();
        }

        Catalog catalog = catalogService.catalog(catalogService.latestCatalogVersion());

        HybridTimestamp recoveryTimestamp = metaStorageManager.timestampByRevisionLocally(recoveryRevision);

        List<CompletableFuture<Void>> zonesRecoveryFutures = catalog.zones().stream()
                .map(zoneDesc -> recalculateAssignmentsAndTriggerZonePartitionsRebalance(
                        zoneDesc,
                        recoveryRevision,
                        recoveryTimestamp,
                        catalog.version()
                ))
                .collect(Collectors.toUnmodifiableList());

        return CompletableFuture.allOf(zonesRecoveryFutures.toArray(new CompletableFuture[0]));
    }
}

