/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.snapshots.coordinator;

import java.util.Collection;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.Cursor;
import org.gridgain.internal.snapshots.SnapshotException;
import org.gridgain.internal.snapshots.SnapshotManagerContext;
import org.gridgain.internal.snapshots.coordinator.LocalSnapshotStateListener;

public class SnapshotRebalanceWatch
implements WatchListener {
    private static final IgniteLogger LOG = Loggers.forClass(SnapshotRebalanceWatch.class);
    private final SnapshotManagerContext context;
    private final ConcurrentMap<UUID, Set<Integer>> objectIdsByOperationId = new ConcurrentHashMap<UUID, Set<Integer>>();
    private final ConcurrentMap<UUID, LocalSnapshotStateListener> ongoingSnapshots;

    SnapshotRebalanceWatch(SnapshotManagerContext context, ConcurrentMap<UUID, LocalSnapshotStateListener> ongoingSnapshots) {
        this.context = context;
        this.ongoingSnapshots = ongoingSnapshots;
    }

    void register() {
        this.context.metaStorageManager().registerPrefixWatch(new ByteArray(this.metastoragePrefix()), (WatchListener)this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> onUpdate(WatchEvent event) {
        if (!this.context.busyLock().enterBusy()) {
            LOG.debug("Skipping Global Snapshot state update because the node is stopping", new Object[0]);
            return CompletableFutures.nullCompletedFuture();
        }
        try {
            for (EntryEvent entryEvent : event.entryEvents()) {
                Entry entry = entryEvent.newEntry();
                if (entry.value() == null) continue;
                this.processNewStateEntry(entry.key());
            }
            CompletableFuture completableFuture = CompletableFutures.nullCompletedFuture();
            return completableFuture;
        }
        finally {
            this.context.busyLock().leaveBusy();
        }
    }

    private void processNewStateEntry(byte[] key) {
        int objectId = this.extractObjectId(key);
        this.objectIdsByOperationId.entrySet().stream().filter(entry -> ((Set)entry.getValue()).contains(objectId)).map(it -> (LocalSnapshotStateListener)this.ongoingSnapshots.get(it.getKey())).forEach(listener -> listener.onSnapshotFailed(this.context.nodeName(), "Rebalance detected, objectId=" + objectId));
    }

    void removeOperation(UUID operationId) {
        this.objectIdsByOperationId.remove(operationId);
    }

    void addOperation(UUID uuid, Collection<CatalogTableDescriptor> tableDescriptors) {
        Set<Integer> objectIds = this.context.nodeProperties().colocationEnabled() ? tableDescriptors.stream().map(CatalogTableDescriptor::zoneId).collect(Collectors.toSet()) : tableDescriptors.stream().map(CatalogObjectDescriptor::id).collect(Collectors.toSet());
        this.objectIdsByOperationId.put(uuid, objectIds);
        this.validateNoActiveRebalance(objectIds);
    }

    private void validateNoActiveRebalance(Set<Integer> objectIds) {
        ByteArray prefix = new ByteArray(this.metastoragePrefix());
        try (Cursor entries = this.context.metaStorageManager().prefixLocally(prefix, Long.MAX_VALUE);){
            for (Entry entry : entries) {
                int objectId;
                if (entry.tombstone() || !objectIds.contains(objectId = this.extractObjectId(entry.key()))) continue;
                throw new SnapshotException("Rebalance detected, objectId=" + objectId);
            }
        }
    }

    private int extractObjectId(byte[] key) {
        if (this.context.nodeProperties().colocationEnabled()) {
            return ZoneRebalanceUtil.extractZonePartitionId((byte[])key, (byte[])this.metastoragePrefix()).objectId();
        }
        return RebalanceUtil.extractTablePartitionId((byte[])key, (byte[])this.metastoragePrefix()).objectId();
    }

    private byte[] metastoragePrefix() {
        return this.context.nodeProperties().colocationEnabled() ? ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES : RebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES;
    }
}

