/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.distributionzones.rebalance;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite.internal.distributionzones.rebalance.AssignmentUtil;
import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.dsl.Statements;
import org.apache.ignite.internal.metastorage.dsl.Update;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.partitiondistribution.AssignmentsChain;
import org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
import org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils;
import org.apache.ignite.internal.partitiondistribution.PendingAssignmentsCalculator;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteBusyLock;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.StringUtils;
import org.jetbrains.annotations.Nullable;

public class ZoneRebalanceUtil {
    private static final IgniteLogger LOG = Loggers.forClass(ZoneRebalanceUtil.class);
    public static final String PENDING_ASSIGNMENTS_QUEUE_PREFIX = "zone.assignments.pending.";
    public static final byte[] PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES = "zone.assignments.pending.".getBytes(StandardCharsets.UTF_8);
    public static final String STABLE_ASSIGNMENTS_PREFIX = "zone.assignments.stable.";
    public static final byte[] STABLE_ASSIGNMENTS_PREFIX_BYTES = "zone.assignments.stable.".getBytes(StandardCharsets.UTF_8);
    public static final String PLANNED_ASSIGNMENTS_PREFIX = "zone.assignments.planned.";
    public static final String ASSIGNMENTS_SWITCH_REDUCE_PREFIX = "zone.assignments.switch.reduce.";
    public static final byte[] ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES = "zone.assignments.switch.reduce.".getBytes(StandardCharsets.UTF_8);
    public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX = "zone.assignments.switch.append.";
    private static final String ZONE_PENDING_CHANGE_TRIGGER_PREFIX = "zone.pending.change.trigger.";
    static final byte[] ZONE_PENDING_CHANGE_TRIGGER_PREFIX_BYTES = "zone.pending.change.trigger.".getBytes(StandardCharsets.UTF_8);
    public static final String ZONE_ASSIGNMENTS_CHAIN_PREFIX = "zone.assignments.chain.";

    public static CompletableFuture<Void> updatePendingAssignmentsKeys(CatalogZoneDescriptor zoneDescriptor, ZonePartitionId zonePartitionId, Collection<String> dataNodes, int partitions, int replicas, int consensusGroupSize, long revision, HybridTimestamp timestamp, MetaStorageManager metaStorageMgr, int partNum, Set<Assignment> zoneCfgPartAssignments, List<Set<Assignment>> currentAllPartitionsAssignments, long assignmentsTimestamp, Set<String> aliveNodes, ConsistencyMode consistencyMode) {
        Set targetAssignmentSet;
        ByteArray partChangeTriggerKey = ZoneRebalanceUtil.pendingChangeTriggerKey(zonePartitionId);
        ByteArray partAssignmentsPendingKey = ZoneRebalanceUtil.pendingPartAssignmentsQueueKey(zonePartitionId);
        ByteArray partAssignmentsPlannedKey = ZoneRebalanceUtil.plannedPartAssignmentsKey(zonePartitionId);
        ByteArray partAssignmentsStableKey = ZoneRebalanceUtil.stablePartAssignmentsKey(zonePartitionId);
        Set calculatedAssignments = PartitionDistributionUtils.calculateAssignmentForPartition(dataNodes, currentAllPartitionsAssignments, (int)partNum, (int)partitions, (int)replicas, (int)consensusGroupSize);
        if (consistencyMode == ConsistencyMode.HIGH_AVAILABILITY) {
            Set resultingAssignments = calculatedAssignments.stream().filter(a -> aliveNodes.contains(a.consistentId())).collect(Collectors.toSet());
            for (Assignment assignment : zoneCfgPartAssignments) {
                if (!calculatedAssignments.contains(assignment)) continue;
                resultingAssignments.add(assignment);
            }
            targetAssignmentSet = resultingAssignments;
        } else {
            targetAssignmentSet = calculatedAssignments;
        }
        boolean isNewAssignments = !zoneCfgPartAssignments.equals(targetAssignmentSet);
        Assignments targetAssignments = Assignments.of((Set)targetAssignmentSet, (long)assignmentsTimestamp);
        AssignmentsQueue partAssignmentsPendingQueue = PendingAssignmentsCalculator.pendingAssignmentsCalculator().stable(Assignments.of(zoneCfgPartAssignments, (long)assignmentsTimestamp)).target(targetAssignments).toQueue();
        byte[] partAssignmentsPlannedBytes = targetAssignments.toBytes();
        byte[] partAssignmentsPendingBytes = partAssignmentsPendingQueue.toBytes();
        Condition newAssignmentsCondition = Conditions.exists((ByteArray)partAssignmentsStableKey).and((Condition)Conditions.value((ByteArray)partAssignmentsStableKey).ne(partAssignmentsPlannedBytes));
        if (isNewAssignments) {
            newAssignmentsCondition = Conditions.notExists((ByteArray)partAssignmentsStableKey).or(newAssignmentsCondition);
        }
        byte[] timestampBytes = ByteUtils.longToBytesKeepingOrder((long)timestamp.longValue());
        Iif iif = Statements.iif((Condition)Conditions.or((Condition)Conditions.notExists((ByteArray)partChangeTriggerKey), (Condition)Conditions.value((ByteArray)partChangeTriggerKey).lt(timestampBytes)), (Iif)Statements.iif((Condition)Conditions.and((Condition)Conditions.notExists((ByteArray)partAssignmentsPendingKey), (Condition)newAssignmentsCondition), (Update)Operations.ops((Operation[])new Operation[]{Operations.put((ByteArray)partAssignmentsPendingKey, (byte[])partAssignmentsPendingBytes), Operations.put((ByteArray)partChangeTriggerKey, (byte[])timestampBytes)}).yield(RebalanceUtil.UpdateStatus.PENDING_KEY_UPDATED.ordinal()), (Iif)Statements.iif((Condition)Conditions.and((Condition)Conditions.value((ByteArray)partAssignmentsPendingKey).ne(partAssignmentsPendingBytes), (Condition)Conditions.exists((ByteArray)partAssignmentsPendingKey)), (Update)Operations.ops((Operation[])new Operation[]{Operations.put((ByteArray)partAssignmentsPlannedKey, (byte[])partAssignmentsPlannedBytes), Operations.put((ByteArray)partChangeTriggerKey, (byte[])timestampBytes)}).yield(RebalanceUtil.UpdateStatus.PLANNED_KEY_UPDATED.ordinal()), (Iif)Statements.iif((Condition)Conditions.value((ByteArray)partAssignmentsPendingKey).eq(partAssignmentsPendingBytes), (Update)Operations.ops((Operation[])new Operation[]{Operations.remove((ByteArray)partAssignmentsPlannedKey), Operations.put((ByteArray)partChangeTriggerKey, (byte[])timestampBytes)}).yield(RebalanceUtil.UpdateStatus.PLANNED_KEY_REMOVED_EQUALS_PENDING.ordinal()), (Iif)Statements.iif((Condition)Conditions.notExists((ByteArray)partAssignmentsPendingKey), (Update)Operations.ops((Operation[])new Operation[]{Operations.remove((ByteArray)partAssignmentsPlannedKey), Operations.put((ByteArray)partChangeTriggerKey, (byte[])timestampBytes)}).yield(RebalanceUtil.UpdateStatus.PLANNED_KEY_REMOVED_EMPTY_PENDING.ordinal()), (Update)Operations.ops((Operation[])new Operation[0]).yield(RebalanceUtil.UpdateStatus.ASSIGNMENT_NOT_UPDATED.ordinal()))))), (Update)Operations.ops((Operation[])new Operation[0]).yield(RebalanceUtil.UpdateStatus.OUTDATED_UPDATE_RECEIVED.ordinal()));
        return metaStorageMgr.invoke(iif).thenAccept(sr -> {
            switch (UpdateStatus.valueOf(sr.getAsInt())) {
                case PENDING_KEY_UPDATED: {
                    LOG.info("Update metastore pending partitions key [key={}, partition={}, zone={}/{}, newVal={}, timestamp={}]", new Object[]{partAssignmentsPendingKey.toString(), partNum, zoneDescriptor.id(), zoneDescriptor.name(), partAssignmentsPendingQueue, timestamp});
                    break;
                }
                case PLANNED_KEY_UPDATED: {
                    LOG.info("Update metastore planned partitions key [key={}, partition={}, zone={}/{}, newVal={}]", new Object[]{partAssignmentsPlannedKey, partNum, zoneDescriptor.id(), zoneDescriptor.name(), targetAssignmentSet});
                    break;
                }
                case PLANNED_KEY_REMOVED_EQUALS_PENDING: {
                    LOG.info("Remove planned key because current pending key has the same value [key={}, partition={}, zone={}/{}, val={}]", new Object[]{partAssignmentsPlannedKey.toString(), partNum, zoneDescriptor.id(), zoneDescriptor.name(), targetAssignmentSet});
                    break;
                }
                case PLANNED_KEY_REMOVED_EMPTY_PENDING: {
                    LOG.info("Remove planned key because pending is empty and calculated assignments are equal to current assignments [key={}, partition={}, zone={}/{}, val={}]", new Object[]{partAssignmentsPlannedKey.toString(), partNum, zoneDescriptor.id(), zoneDescriptor.name(), targetAssignmentSet});
                    break;
                }
                case ASSIGNMENT_NOT_UPDATED: {
                    LOG.debug("Assignments are not updated [key={}, partition={}, zone={}/{}, val={}]", new Object[]{partAssignmentsPlannedKey.toString(), partNum, zoneDescriptor.id(), zoneDescriptor.name(), targetAssignmentSet});
                    break;
                }
                case OUTDATED_UPDATE_RECEIVED: {
                    LOG.debug("Received outdated rebalance trigger event [revision={}, partition={}, zone={}/{}]", new Object[]{revision, partNum, zoneDescriptor.id(), zoneDescriptor.name()});
                    break;
                }
                default: {
                    throw new IllegalStateException("Unknown return code for rebalance metastore multi-invoke");
                }
            }
        });
    }

    static CompletableFuture<Void> triggerZonePartitionsRebalance(CatalogZoneDescriptor zoneDescriptor, Set<String> dataNodes, long storageRevision, HybridTimestamp storageTimestamp, MetaStorageManager metaStorageManager, IgniteSpinBusyLock busyLock, long assignmentsTimestamp, Set<String> aliveNodes) {
        int[] partitionIds = AssignmentUtil.partitionIds(zoneDescriptor.partitions());
        return ZoneRebalanceUtil.zoneStableAssignments(metaStorageManager, zoneDescriptor.id(), partitionIds).thenCompose(stableAssignments -> {
            if (stableAssignments.isEmpty()) {
                return CompletableFutures.nullCompletedFuture();
            }
            return ZoneRebalanceUtil.zonePendingAssignments(metaStorageManager, zoneDescriptor.id(), partitionIds).thenCompose(pendingAssignments -> {
                HashMap<Integer, Assignments> assignmentsMap = new HashMap<Integer, Assignments>((Map<Integer, Assignments>)stableAssignments);
                assignmentsMap.putAll((Map<Integer, Assignments>)pendingAssignments);
                return ZoneRebalanceUtil.zonePartitionAssignments(zoneDescriptor, dataNodes, storageRevision, storageTimestamp, metaStorageManager, busyLock, assignmentsTimestamp, stableAssignments, AssignmentUtil.assignmentsAsList(assignmentsMap, zoneDescriptor.partitions()), aliveNodes);
            });
        });
    }

    private static CompletableFuture<Void> zonePartitionAssignments(CatalogZoneDescriptor zoneDescriptor, Set<String> dataNodes, long storageRevision, HybridTimestamp storageTimestamp, MetaStorageManager metaStorageManager, IgniteSpinBusyLock busyLock, long assignmentsTimestamp, Map<Integer, Assignments> zoneAssignments, List<Set<Assignment>> currentAssignments, Set<String> aliveNodes) {
        int finalPartId;
        CompletableFuture[] partitionFutures = new CompletableFuture[zoneDescriptor.partitions()];
        for (int partId = 0; partId < zoneDescriptor.partitions(); ++partId) {
            ZonePartitionId replicaGrpId = new ZonePartitionId(zoneDescriptor.id(), partId);
            finalPartId = partId;
            partitionFutures[partId] = IgniteUtils.inBusyLockAsync((IgniteBusyLock)busyLock, () -> zoneAssignments.isEmpty() ? CompletableFutures.nullCompletedFuture() : ZoneRebalanceUtil.updatePendingAssignmentsKeys(zoneDescriptor, replicaGrpId, dataNodes, zoneDescriptor.partitions(), zoneDescriptor.replicas(), zoneDescriptor.consensusGroupSize(), storageRevision, storageTimestamp, metaStorageManager, finalPartId, ((Assignments)zoneAssignments.get(finalPartId)).nodes(), currentAssignments, assignmentsTimestamp, aliveNodes, zoneDescriptor.consistencyMode()));
        }
        ConcurrentHashMap.KeySetView unwrappedCauses = ConcurrentHashMap.newKeySet();
        for (int partId = 0; partId < partitionFutures.length; ++partId) {
            finalPartId = partId;
            partitionFutures[partId].exceptionally(e -> {
                Throwable cause = ExceptionUtils.unwrapCause((Throwable)e);
                if (unwrappedCauses.add(cause)) {
                    LOG.error("Exception on updating assignments for [zoneId={}, name={}, partition={}]", e, new Object[]{zoneDescriptor.id(), zoneDescriptor.name(), finalPartId});
                } else {
                    LOG.error("Exception on updating assignments for [zoneId={}, name={}]", e, new Object[]{zoneDescriptor.id(), zoneDescriptor.name()});
                }
                return null;
            });
        }
        return CompletableFuture.allOf(partitionFutures);
    }

    public static CompletableFuture<Set<Assignment>> zonePartitionAssignments(MetaStorageManager metaStorageManager, int zoneId, int partitionId) {
        return metaStorageManager.get(ZoneRebalanceUtil.stablePartAssignmentsKey(new ZonePartitionId(zoneId, partitionId))).thenApply(e -> e.value() == null ? null : Assignments.fromBytes((byte[])e.value()).nodes());
    }

    public static ByteArray pendingChangeTriggerKey(ZonePartitionId zonePartitionId) {
        return new ByteArray(ZONE_PENDING_CHANGE_TRIGGER_PREFIX + zonePartitionId);
    }

    public static ByteArray assignmentsChainKey(ZonePartitionId partId) {
        return new ByteArray(ZONE_ASSIGNMENTS_CHAIN_PREFIX + partId);
    }

    public static ByteArray pendingPartAssignmentsQueueKey(ZonePartitionId zonePartitionId) {
        return new ByteArray(PENDING_ASSIGNMENTS_QUEUE_PREFIX + zonePartitionId);
    }

    public static ByteArray plannedPartAssignmentsKey(ZonePartitionId zonePartitionId) {
        return new ByteArray(PLANNED_ASSIGNMENTS_PREFIX + zonePartitionId);
    }

    public static ByteArray stablePartAssignmentsKey(ZonePartitionId zonePartitionId) {
        return new ByteArray(STABLE_ASSIGNMENTS_PREFIX + zonePartitionId);
    }

    public static ByteArray switchReduceKey(ZonePartitionId zonePartitionId) {
        return new ByteArray(ASSIGNMENTS_SWITCH_REDUCE_PREFIX + zonePartitionId);
    }

    public static ByteArray switchAppendKey(ZonePartitionId zonePartitionId) {
        return new ByteArray(ASSIGNMENTS_SWITCH_APPEND_PREFIX + zonePartitionId);
    }

    public static ZonePartitionId extractZonePartitionId(byte[] key, byte[] prefix) {
        String zonePartitionIdString = StringUtils.toStringWithoutPrefix((byte[])key, (int)prefix.length);
        return ZonePartitionId.fromString((String)zonePartitionIdString);
    }

    public static <T> Set<T> subtract(Set<T> minuend, Set<T> subtrahend) {
        return minuend.stream().filter(v -> !subtrahend.contains(v)).collect(Collectors.toSet());
    }

    public static <T> Set<T> union(Set<T> op1, Set<T> op2) {
        HashSet<T> res = new HashSet<T>(op1);
        res.addAll(op2);
        return res;
    }

    public static <T> Set<T> intersect(Set<T> op1, Set<T> op2) {
        return op1.stream().filter(op2::contains).collect(Collectors.toSet());
    }

    @Nullable
    public static Set<Assignment> zonePartitionAssignmentsGetLocally(MetaStorageManager metaStorageManager, int zoneId, int partitionNumber, long revision) {
        Assignments assignments = ZoneRebalanceUtil.zoneStableAssignmentsGetLocally(metaStorageManager, new ZonePartitionId(zoneId, partitionNumber), revision);
        return assignments == null ? null : assignments.nodes();
    }

    public static List<Assignments> zoneAssignmentsGetLocally(MetaStorageManager metaStorageManager, int zoneId, int numberOfPartitions, long revision) {
        return IntStream.range(0, numberOfPartitions).mapToObj(p -> {
            Assignments assignments = ZoneRebalanceUtil.zoneStableAssignmentsGetLocally(metaStorageManager, new ZonePartitionId(zoneId, p), revision);
            assert (assignments != null) : "No assignments found for " + new ZonePartitionId(zoneId, p);
            return assignments;
        }).collect(Collectors.toList());
    }

    public static List<@Nullable Assignments> zonePendingAssignmentsGetLocally(MetaStorageManager metaStorageManager, int zoneId, int numberOfPartitions, long revision) {
        return IntStream.range(0, numberOfPartitions).mapToObj(p -> {
            Entry e = metaStorageManager.getLocally(ZoneRebalanceUtil.pendingPartAssignmentsQueueKey(new ZonePartitionId(zoneId, p)), revision);
            return e != null && !e.empty() && !e.tombstone() ? AssignmentsQueue.fromBytes((byte[])e.value()).poll() : null;
        }).collect(Collectors.toList());
    }

    public static CompletableFuture<Set<Assignment>> stablePartitionAssignments(MetaStorageManager metaStorageManager, int zoneId, int partitionId) {
        return metaStorageManager.get(ZoneRebalanceUtil.stablePartAssignmentsKey(new ZonePartitionId(zoneId, partitionId))).thenApply(e -> e.value() == null ? null : Assignments.fromBytes((byte[])e.value()).nodes());
    }

    public static CompletableFuture<Set<Assignment>> pendingPartitionAssignments(MetaStorageManager metaStorageManager, int zoneId, int partitionId) {
        return metaStorageManager.get(ZoneRebalanceUtil.pendingPartAssignmentsQueueKey(new ZonePartitionId(zoneId, partitionId))).thenApply(e -> e.value() == null ? null : AssignmentsQueue.fromBytes((byte[])e.value()).poll().nodes());
    }

    public static CompletableFuture<Set<Assignment>> plannedPartitionAssignments(MetaStorageManager metaStorageManager, int zoneId, int partitionId) {
        return metaStorageManager.get(ZoneRebalanceUtil.plannedPartAssignmentsKey(new ZonePartitionId(zoneId, partitionId))).thenApply(e -> e.value() == null ? null : Assignments.fromBytes((byte[])e.value()).nodes());
    }

    public static CompletableFuture<Map<Integer, Assignments>> zoneStableAssignments(MetaStorageManager metaStorageManager, int zoneId, int[] partitionIds) {
        return AssignmentUtil.metastoreAssignments(metaStorageManager, partitionIds, partitionId -> ZoneRebalanceUtil.stablePartAssignmentsKey(new ZonePartitionId(zoneId, partitionId.intValue())), Assignments::fromBytes).whenComplete((assignmentsMap, throwable) -> {
            if (throwable == null) {
                int numberOfMsPartitions = assignmentsMap.size();
                assert (numberOfMsPartitions == 0 || numberOfMsPartitions == partitionIds.length) : "Invalid number of partition entries received from meta storage [received=" + numberOfMsPartitions + ", numberOfPartitions=" + partitionIds.length + ", zoneId=" + zoneId + "].";
            }
        });
    }

    public static CompletableFuture<Map<Integer, Assignments>> zonePendingAssignments(MetaStorageManager metaStorageManager, int zoneId, int[] partitionIds) {
        return AssignmentUtil.metastoreAssignments(metaStorageManager, partitionIds, partitionId -> ZoneRebalanceUtil.pendingPartAssignmentsQueueKey(new ZonePartitionId(zoneId, partitionId.intValue())), bytes -> AssignmentsQueue.fromBytes((byte[])bytes).poll());
    }

    @Nullable
    public static Assignments zoneStableAssignmentsGetLocally(MetaStorageManager metaStorageManager, ZonePartitionId zonePartitionId, long revision) {
        Entry entry = metaStorageManager.getLocally(ZoneRebalanceUtil.stablePartAssignmentsKey(zonePartitionId), revision);
        return entry == null || entry.empty() || entry.tombstone() ? null : Assignments.fromBytes((byte[])entry.value());
    }

    public static List<AssignmentsChain> zoneAssignmentsChainGetLocally(MetaStorageManager metaStorageManager, int zoneId, int numberOfPartitions, long revision) {
        return IntStream.range(0, numberOfPartitions).mapToObj(p -> ZoneRebalanceUtil.assignmentsChainGetLocally(metaStorageManager, new ZonePartitionId(zoneId, p), revision)).collect(Collectors.toList());
    }

    @Nullable
    public static AssignmentsChain assignmentsChainGetLocally(MetaStorageManager metaStorageManager, ZonePartitionId zonePartitionId, long revision) {
        Entry e = metaStorageManager.getLocally(ZoneRebalanceUtil.assignmentsChainKey(zonePartitionId), revision);
        return e != null ? AssignmentsChain.fromBytes((byte[])e.value()) : null;
    }

    public static enum UpdateStatus {
        PENDING_KEY_UPDATED,
        PLANNED_KEY_UPDATED,
        PLANNED_KEY_REMOVED_EQUALS_PENDING,
        PLANNED_KEY_REMOVED_EMPTY_PENDING,
        ASSIGNMENT_NOT_UPDATED,
        OUTDATED_UPDATE_RECEIVED;

        private static final UpdateStatus[] VALUES;

        public static UpdateStatus valueOf(int ordinal) {
            return VALUES[ordinal];
        }

        static {
            VALUES = UpdateStatus.values();
        }
    }
}

