/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.table.distributed.disaster;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.catalog.Catalog;
import org.apache.ignite3.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite3.internal.distributionzones.NodeWithAttributes;
import org.apache.ignite3.internal.distributionzones.rebalance.AssignmentUtil;
import org.apache.ignite3.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite3.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.MetaStorageManager;
import org.apache.ignite3.internal.metastorage.dsl.Conditions;
import org.apache.ignite3.internal.metastorage.dsl.Iif;
import org.apache.ignite3.internal.metastorage.dsl.Operation;
import org.apache.ignite3.internal.metastorage.dsl.Operations;
import org.apache.ignite3.internal.metastorage.dsl.Statements;
import org.apache.ignite3.internal.partition.replicator.network.disaster.LocalPartitionStateEnum;
import org.apache.ignite3.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
import org.apache.ignite3.internal.partitiondistribution.Assignment;
import org.apache.ignite3.internal.partitiondistribution.Assignments;
import org.apache.ignite3.internal.partitiondistribution.AssignmentsQueue;
import org.apache.ignite3.internal.partitiondistribution.PartitionDistributionUtils;
import org.apache.ignite3.internal.partitiondistribution.PendingAssignmentsCalculator;
import org.apache.ignite3.internal.replicator.PartitionGroupId;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.replicator.ZonePartitionId;
import org.apache.ignite3.internal.table.distributed.disaster.DisasterRecoveryManager;
import org.apache.ignite3.internal.table.distributed.disaster.GroupUpdateRequest;
import org.apache.ignite3.internal.table.distributed.disaster.LocalPartitionStateMessageByNode;
import org.apache.ignite3.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
import org.apache.ignite3.internal.util.ByteUtils;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.IgniteBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.lang.ErrorGroups;
import org.jetbrains.annotations.Nullable;

abstract class GroupUpdateRequestHandler<T extends PartitionGroupId> {
    /**
     * Logger used by this handler and its nested subclasses.
     * NOTE(review): the logger is registered under {@code GroupUpdateRequest.class}, not under this class —
     * presumably kept on purpose so the log category stays the same; confirm before changing.
     */
    private static final IgniteLogger LOG = Loggers.forClass(GroupUpdateRequest.class);
    /** Request being handled; read for its catalog version, zone id, partition ids and manual-update flag. */
    private final GroupUpdateRequest request;

    /**
     * Creates the handler implementation that matches the given request.
     *
     * @param request Group update request to handle.
     * @return Zone-based handler when {@code request.colocationEnabled()} is {@code true}, table-based handler otherwise.
     */
    public static GroupUpdateRequestHandler<?> handler(GroupUpdateRequest request) {
        if (request.colocationEnabled()) {
            return new ZoneGroupUpdateRequestHandler(request);
        }

        return new TableGroupUpdateRequestHandler(request);
    }

    /**
     * Constructor.
     *
     * @param request Group update request that this handler will process.
     */
    GroupUpdateRequestHandler(GroupUpdateRequest request) {
        this.request = request;
    }

    /**
     * Handles the request: checks that the catalog has not changed since the request was created and then forces an
     * assignments update for every partition listed in the request.
     *
     * <p>The whole method, as well as the continuation that runs after the data nodes and local partition states are
     * available, is executed under the busy lock of the given manager.
     *
     * @param disasterRecoveryManager Manager that provides the busy lock and the catalog, distribution zone and meta storage managers.
     * @param msRevision Meta storage revision; used to read the logical topology and passed on to the assignments update.
     * @param msTimestamp Meta storage timestamp; used to resolve the active catalog version and the zone data nodes.
     * @return Future that completes when all per-entry assignment updates complete. It is completed exceptionally with
     *         {@link DisasterRecoveryException} ({@code CLUSTER_NOT_IDLE_ERR}) if the request's catalog version differs from
     *         the catalog version active at {@code msTimestamp}.
     */
    public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryManager, long msRevision, HybridTimestamp msTimestamp) {
        return IgniteUtils.inBusyLock((IgniteBusyLock) disasterRecoveryManager.busyLock(), () -> {
            int catalogVersion = disasterRecoveryManager.catalogManager.activeCatalogVersion(msTimestamp.longValue());

            if (this.request.catalogVersion() != catalogVersion) {
                return CompletableFuture.<Void>failedFuture(new DisasterRecoveryException(ErrorGroups.DisasterRecovery.CLUSTER_NOT_IDLE_ERR, "Cluster is not idle, concurrent DDL update detected."));
            }

            Catalog catalog = disasterRecoveryManager.catalogManager.catalog(catalogVersion);
            int zoneId = this.request.zoneId();
            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);

            // Local states are requested once for the union of all partition ids mentioned in the request.
            Set<Integer> allZonePartitionsToReset = new HashSet<>();
            this.request.partitionIds().values().forEach(allZonePartitionsToReset::addAll);

            CompletableFuture<Set<String>> dataNodesFuture = disasterRecoveryManager.dzManager.dataNodes(msTimestamp, catalogVersion, zoneId);
            CompletableFuture<Map<T, LocalPartitionStateMessageByNode>> localStatesFuture = this.localStatesFuture(disasterRecoveryManager, Set.of(zoneDescriptor.name()), allZonePartitionsToReset, catalog);

            CompletableFuture<CompletableFuture<Void>> nestedResetFuture = dataNodesFuture.thenCombine(
                    localStatesFuture,
                    (dataNodes, localStatesMap) -> this.resetPartitions(disasterRecoveryManager, catalog, zoneDescriptor, dataNodes, localStatesMap, msRevision, msTimestamp));

            // Flatten the future-of-future produced by the combine step.
            CompletableFuture<Void> resetFuture = nestedResetFuture.thenCompose(Function.identity());

            return resetFuture;
        });
    }

    /**
     * Starts a forced assignments update for every entry of {@code request.partitionIds()} and logs a failure of the
     * combined result. Runs under the busy lock of the given manager.
     *
     * @param disasterRecoveryManager Manager that provides the busy lock, distribution zone manager and meta storage manager.
     * @param catalog Catalog of the validated version; its time is used as the assignments timestamp.
     * @param zoneDescriptor Descriptor of the zone named in the request.
     * @param dataNodes Data nodes of the zone.
     * @param localStatesMap Local partition states by replication group id.
     * @param msRevision Meta storage revision.
     * @param msTimestamp Meta storage timestamp.
     * @return Future that completes when the updates of all entries complete.
     */
    private CompletableFuture<Void> resetPartitions(DisasterRecoveryManager disasterRecoveryManager, Catalog catalog, CatalogZoneDescriptor zoneDescriptor, Set<String> dataNodes, Map<T, LocalPartitionStateMessageByNode> localStatesMap, long msRevision, HybridTimestamp msTimestamp) {
        CompletableFuture<Void> allUpdatesFuture = IgniteUtils.inBusyLock((IgniteBusyLock) disasterRecoveryManager.busyLock(), () -> {
            Set<String> nodeConsistentIds = disasterRecoveryManager.dzManager.logicalTopology(msRevision).stream()
                    .map(NodeWithAttributes::nodeName)
                    .collect(Collectors.toSet());

            List<CompletableFuture<Void>> assignmentsUpdateFuts = new ArrayList<>(this.request.partitionIds().size());

            for (Map.Entry<Integer, Set<Integer>> partitionEntry : this.request.partitionIds().entrySet()) {
                int[] partitionIdsArray = AssignmentUtil.partitionIds(partitionEntry.getValue(), zoneDescriptor.partitions());

                // The entry key is the id from which replication group ids are built (see replicationGroupId).
                assignmentsUpdateFuts.add(this.forceAssignmentsUpdate(partitionEntry.getKey(), zoneDescriptor, dataNodes, nodeConsistentIds, msRevision, msTimestamp, disasterRecoveryManager.metaStorageManager, localStatesMap, catalog.time(), partitionIdsArray, this.request.manualUpdate(), disasterRecoveryManager));
            }

            return CompletableFuture.allOf(assignmentsUpdateFuts.toArray(new CompletableFuture<?>[0]));
        });

        return allUpdatesFuture.whenComplete((unused, throwable) -> {
            if (throwable != null) {
                LOG.error("Failed to reset partition", throwable);
            }
        });
    }

    /**
     * Reads stable and pending assignments of the given partitions and, unless there are no stable assignments at all,
     * delegates to {@code updateAssignments}.
     *
     * <p>The current distribution passed on is built from the stable assignments overridden by the pending ones for
     * partitions that have a pending entry.
     *
     * @param replicationId Id used to build replication group ids and to read assignments (table or zone id, depending on the subclass).
     * @param zoneDescriptor Zone descriptor.
     * @param dataNodes Data nodes of the zone.
     * @param aliveNodesConsistentIds Consistent ids of nodes in the logical topology.
     * @param revision Meta storage revision.
     * @param timestamp Meta storage timestamp.
     * @param metaStorageManager Meta storage manager.
     * @param localStatesMap Local partition states by replication group id.
     * @param assignmentsTimestamp Timestamp to stamp new assignments with.
     * @param partitionIds Partition ids to process.
     * @param manualUpdate Manual-update flag of the request.
     * @param disasterRecoveryManager Manager that provides the busy lock.
     * @return Future of the update; already completed with {@code null} when no stable assignments were found.
     */
    private CompletableFuture<Void> forceAssignmentsUpdate(int replicationId, CatalogZoneDescriptor zoneDescriptor, Set<String> dataNodes, Set<String> aliveNodesConsistentIds, long revision, HybridTimestamp timestamp, MetaStorageManager metaStorageManager, Map<T, LocalPartitionStateMessageByNode> localStatesMap, long assignmentsTimestamp, int[] partitionIds, boolean manualUpdate, DisasterRecoveryManager disasterRecoveryManager) {
        return IgniteUtils.inBusyLock((IgniteBusyLock) disasterRecoveryManager.busyLock(), () -> {
            CompletableFuture<Map<Integer, Assignments>> stableFuture = this.stableAssignments(metaStorageManager, replicationId, partitionIds);

            CompletableFuture<Void> updateFuture = stableFuture.thenCompose(stable -> {
                CompletableFuture<Void> guardedUpdate = IgniteUtils.inBusyLock((IgniteBusyLock) disasterRecoveryManager.busyLock(), () -> {
                    // Nothing to reset when none of the partitions has stable assignments.
                    if (stable.isEmpty()) {
                        return CompletableFutures.<Void>nullCompletedFuture();
                    }

                    CompletableFuture<Map<Integer, Assignments>> pendingFuture = this.pendingAssignments(metaStorageManager, replicationId, partitionIds);

                    return pendingFuture.thenCompose(pending -> {
                        // Pending assignments take precedence over stable ones for the same partition.
                        Map<Integer, Assignments> stableOverriddenByPending = new HashMap<>(stable);
                        stableOverriddenByPending.putAll(pending);

                        List<Set<Assignment>> currentAllAssignments = AssignmentUtil.assignmentsAsList(stableOverriddenByPending, zoneDescriptor.partitions());

                        return this.updateAssignments(replicationId, zoneDescriptor, dataNodes, aliveNodesConsistentIds, revision, timestamp, metaStorageManager, localStatesMap, assignmentsTimestamp, partitionIds, stable, currentAllAssignments, manualUpdate, disasterRecoveryManager);
                    });
                });

                return guardedUpdate;
            });

            return updateFuture;
        });
    }

    /**
     * Starts {@code partitionUpdate} for every given partition and logs the status returned for each of them.
     *
     * @param replicationId Id used to build replication group ids (table or zone id, depending on the subclass).
     * @param zoneDescriptor Zone descriptor; supplies partition count, replica count and consensus group size.
     * @param dataNodes Data nodes of the zone.
     * @param aliveNodesConsistentIds Consistent ids of nodes in the logical topology.
     * @param revision Meta storage revision.
     * @param timestamp Meta storage timestamp.
     * @param metaStorageManager Meta storage manager.
     * @param localStatesMap Local partition states by replication group id; a missing group is treated as having no states.
     * @param assignmentsTimestamp Timestamp to stamp new assignments with.
     * @param partitionIds Partition ids to process.
     * @param stableAssignments Stable assignments by partition id.
     * @param currentAllAssignments Current assignments of the partitions, as produced by the caller.
     * @param manualUpdate Manual-update flag of the request.
     * @param disasterRecoveryManager Manager that provides the busy lock.
     * @return Future that completes when the updates of all partitions complete.
     */
    private CompletableFuture<Void> updateAssignments(int replicationId, CatalogZoneDescriptor zoneDescriptor, Set<String> dataNodes, Set<String> aliveNodesConsistentIds, long revision, HybridTimestamp timestamp, MetaStorageManager metaStorageManager, Map<T, LocalPartitionStateMessageByNode> localStatesMap, long assignmentsTimestamp, int[] partitionIds, Map<Integer, Assignments> stableAssignments, List<Set<Assignment>> currentAllAssignments, boolean manualUpdate, DisasterRecoveryManager disasterRecoveryManager) {
        return IgniteUtils.inBusyLock((IgniteBusyLock) disasterRecoveryManager.busyLock(), () -> {
            // Only data nodes that are present in the logical topology can receive new replicas.
            Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes, aliveNodesConsistentIds);

            CompletableFuture<?>[] futures = new CompletableFuture<?>[partitionIds.length];

            for (int i = 0; i < partitionIds.length; i++) {
                // Must be typed as T: both partitionId() and partitionUpdate(T, ...) are unavailable on Object.
                T replicaGrpId = this.replicationGroupId(replicationId, partitionIds[i]);

                LocalPartitionStateMessageByNode localStatesByNode = localStatesMap.containsKey(replicaGrpId)
                        ? localStatesMap.get(replicaGrpId)
                        : new LocalPartitionStateMessageByNode(Collections.emptyMap());

                // NOTE(review): throws NullPointerException if stableAssignments has no entry for this partition;
                // the caller only guards against a completely empty map — confirm a partial map cannot occur.
                Set<Assignment> currentStableNodes = stableAssignments.get(replicaGrpId.partitionId()).nodes();

                futures[i] = this.partitionUpdate(replicaGrpId, aliveDataNodes, aliveNodesConsistentIds, zoneDescriptor.partitions(), zoneDescriptor.replicas(), zoneDescriptor.consensusGroupSize(), revision, timestamp, metaStorageManager, currentStableNodes, currentAllAssignments, localStatesByNode, assignmentsTimestamp, manualUpdate, disasterRecoveryManager)
                        .thenAccept(res -> DisasterRecoveryManager.LOG.info("Partition {} returned {} status on reset attempt", replicaGrpId, RebalanceUtil.UpdateStatus.valueOf(res)));
            }

            return CompletableFuture.allOf(futures);
        });
    }

    /**
     * Decides whether the given partition needs a forced assignments update and, if so, submits it through
     * {@code invoke}.
     *
     * <p>No update is made (the ordinal of {@code ASSIGNMENT_NOT_UPDATED} is returned) when at least
     * {@code replicas / 2 + 1} nodes of the current assignments are alive and hold data, or when no such node exists
     * and the update is not manual. Otherwise the pending queue leads from the current assignments to a forced
     * single-node assignment chosen by {@code nextAssignment}.
     *
     * @param partId Replication group id of the partition.
     * @param aliveDataNodes Data nodes of the zone that are present in the logical topology.
     * @param aliveNodesConsistentIds Consistent ids of nodes in the logical topology.
     * @param partitions Number of partitions in the zone.
     * @param replicas Number of replicas in the zone.
     * @param consensusGroupSize Consensus group size of the zone.
     * @param revision Meta storage revision.
     * @param timestamp Meta storage timestamp.
     * @param metaStorageMgr Meta storage manager.
     * @param currentAssignments Current stable assignments of the partition.
     * @param currentAllAssignments Current assignments of the partitions, passed to the assignment calculation.
     * @param localPartitionStateMessageByNode Local states of the partition by node.
     * @param assignmentsTimestamp Timestamp to stamp new assignments with.
     * @param manualUpdate Manual-update flag of the request.
     * @param disasterRecoveryManager Manager that provides the busy lock.
     * @return Future with the ordinal of the resulting {@code RebalanceUtil.UpdateStatus}.
     */
    private CompletableFuture<Integer> partitionUpdate(T partId, Collection<String> aliveDataNodes, Set<String> aliveNodesConsistentIds, int partitions, int replicas, int consensusGroupSize, long revision, HybridTimestamp timestamp, MetaStorageManager metaStorageMgr, Set<Assignment> currentAssignments, List<Set<Assignment>> currentAllAssignments, LocalPartitionStateMessageByNode localPartitionStateMessageByNode, long assignmentsTimestamp, boolean manualUpdate, DisasterRecoveryManager disasterRecoveryManager) {
        return IgniteUtils.inBusyLock((IgniteBusyLock) disasterRecoveryManager.busyLock(), () -> {
            Set<Assignment> aliveNodesWithData = getAliveNodesWithData(aliveNodesConsistentIds, localPartitionStateMessageByNode);
            Set<Assignment> aliveStableNodes = CollectionUtils.intersect(currentAssignments, aliveNodesWithData);

            boolean majorityIsAlive = aliveStableNodes.size() >= replicas / 2 + 1;
            boolean nothingToRecoverFrom = aliveStableNodes.isEmpty() && !manualUpdate;

            if (majorityIsAlive || nothingToRecoverFrom) {
                return CompletableFuture.completedFuture(RebalanceUtil.UpdateStatus.ASSIGNMENT_NOT_UPDATED.ordinal());
            }

            if (manualUpdate) {
                // Adds calculated assignments to the set in place.
                enrichAssignments(partId, aliveDataNodes, partitions, replicas, consensusGroupSize, aliveNodesWithData, currentAllAssignments);
            }

            Set<String> candidateNodeNames = aliveNodesWithData.stream()
                    .map(Assignment::consistentId)
                    .collect(Collectors.toSet());

            Set<Assignment> recoveryAssignments = PartitionDistributionUtils.calculateAssignmentForPartition(candidateNodeNames, currentAllAssignments, partId.partitionId(), partitions, replicas, consensusGroupSize);

            Assignment nextAssignment = nextAssignment(localPartitionStateMessageByNode, recoveryAssignments);

            assert recoveryAssignments.contains(nextAssignment) : IgniteStringFormatter.format("Recovery nodes set doesn't contain the reset node assignment [partAssignments={}, nextAssignment={}]", recoveryAssignments, nextAssignment);

            // With a single recovery node the forced pending target already equals the planned set.
            boolean isProposedPendingEqualsProposedPlanned = recoveryAssignments.size() == 1;

            AssignmentsQueue assignmentsQueue = PendingAssignmentsCalculator.pendingAssignmentsCalculator()
                    .stable(Assignments.of(currentAssignments, assignmentsTimestamp))
                    .target(Assignments.forced(Set.of(nextAssignment), assignmentsTimestamp))
                    .toQueue();

            return this.invoke(partId, revision, timestamp, metaStorageMgr, assignmentsTimestamp, assignmentsQueue, isProposedPendingEqualsProposedPlanned, recoveryAssignments);
        });
    }

    /**
     * Picks the assignment that becomes the forced pending target.
     *
     * <p>Among assignments whose node has a known local partition state, the one with the greatest log index is
     * chosen; ties are resolved by the smallest consistent id. If no assignment has a known state, the assignment
     * with the smallest consistent id is chosen.
     *
     * @param localPartitionStateByNode Local states of the partition by node.
     * @param assignments Candidate assignments.
     * @return Chosen assignment.
     * @throws java.util.NoSuchElementException If {@code assignments} is empty.
     */
    private static Assignment nextAssignment(LocalPartitionStateMessageByNode localPartitionStateByNode, Set<Assignment> assignments) {
        Comparator<Assignment> byConsistentId = Comparator.comparing(Assignment::consistentId);

        // The explicit type argument is required: a call that is the receiver of reversed() has no target type,
        // so without it the lambda parameter would be inferred as Object.
        Comparator<Assignment> byLogIndexDescending = Comparator.<Assignment>comparingLong(
                node -> localPartitionStateByNode.partitionState(node.consistentId()).logIndex()).reversed();

        Optional<Assignment> nodeWithMaxLogIndex = assignments.stream()
                .filter(assignment -> localPartitionStateByNode.partitionState(assignment.consistentId()) != null)
                .min(byLogIndexDescending.thenComparing(byConsistentId))
                .or(() -> assignments.stream().min(byConsistentId));

        return nodeWithMaxLogIndex.orElseThrow();
    }

    /**
     * Builds assignments for nodes that are alive and whose local partition state is {@code HEALTHY} or
     * {@code CATCHING_UP}.
     *
     * @param aliveNodesConsistentIds Consistent ids of alive nodes.
     * @param localPartitionStateMessageByNode Local states of the partition by node name.
     * @return New mutable set with a learner assignment for nodes reported as learners and a peer assignment for the rest.
     */
    static Set<Assignment> getAliveNodesWithData(Set<String> aliveNodesConsistentIds, LocalPartitionStateMessageByNode localPartitionStateMessageByNode) {
        Set<Assignment> aliveNodesWithData = new HashSet<>();

        for (Map.Entry<String, LocalPartitionStateMessage> stateByNode : localPartitionStateMessageByNode.entrySet()) {
            String consistentId = stateByNode.getKey();
            LocalPartitionStateMessage stateMessage = stateByNode.getValue();
            LocalPartitionStateEnum state = stateMessage.state();

            boolean alive = aliveNodesConsistentIds.contains(consistentId);
            boolean hasData = state == LocalPartitionStateEnum.HEALTHY || state == LocalPartitionStateEnum.CATCHING_UP;

            if (alive && hasData) {
                aliveNodesWithData.add(stateMessage.isLearner() ? Assignment.forLearner(consistentId) : Assignment.forPeer(consistentId));
            }
        }

        return aliveNodesWithData;
    }

    /**
     * Adds assignments calculated over the alive data nodes to {@code partAssignments} until it holds
     * {@code replicas} entries. The set is modified in place.
     *
     * <p>A calculated assignment is skipped when its node is already present in the set, either as a peer or as a
     * learner.
     *
     * @param partId Replication group id of the partition.
     * @param aliveDataNodes Alive data nodes of the zone.
     * @param partitions Number of partitions in the zone.
     * @param replicas Number of replicas in the zone.
     * @param consensusGroupSize Consensus group size of the zone.
     * @param partAssignments Assignments to enrich.
     * @param currentDistribution Current assignments of the partitions, passed to the assignment calculation.
     */
    private static void enrichAssignments(PartitionGroupId partId, Collection<String> aliveDataNodes, int partitions, int replicas, int consensusGroupSize, Set<Assignment> partAssignments, List<Set<Assignment>> currentDistribution) {
        Set<Assignment> calcAssignments = PartitionDistributionUtils.calculateAssignmentForPartition(aliveDataNodes, currentDistribution, partId.partitionId(), partitions, replicas, consensusGroupSize);

        for (Assignment calcAssignment : calcAssignments) {
            // ">=" rather than "==": a set that already exceeds the replica count must not be enriched at all,
            // otherwise the cap is never hit and every calculated node gets appended.
            if (partAssignments.size() >= replicas) {
                break;
            }

            String consistentId = calcAssignment.consistentId();

            boolean alreadyAssigned = partAssignments.contains(Assignment.forPeer(consistentId))
                    || partAssignments.contains(Assignment.forLearner(consistentId));

            if (!alreadyAssigned) {
                partAssignments.add(calcAssignment);
            }
        }
    }

    /**
     * Requests local partition states for the given zones and partitions.
     *
     * @param disasterRecoveryManager Manager used to collect the states.
     * @param zoneNames Names of the zones to collect states for.
     * @param partitionIds Ids of the partitions to collect states for.
     * @param catalog Catalog to resolve the zones against.
     * @return Future with local partition states by replication group id.
     */
    abstract CompletableFuture<Map<T, LocalPartitionStateMessageByNode>> localStatesFuture(DisasterRecoveryManager disasterRecoveryManager, Set<String> zoneNames, Set<Integer> partitionIds, Catalog catalog);

    /**
     * Reads stable assignments of the given partitions.
     *
     * @param metaStorageManager Meta storage manager.
     * @param replicationId Table or zone id, depending on the subclass.
     * @param partitionIds Partition ids.
     * @return Future with stable assignments by partition id.
     */
    abstract CompletableFuture<Map<Integer, Assignments>> stableAssignments(MetaStorageManager metaStorageManager, int replicationId, int[] partitionIds);

    /**
     * Reads pending assignments of the given partitions.
     *
     * @param metaStorageManager Meta storage manager.
     * @param replicationId Table or zone id, depending on the subclass.
     * @param partitionIds Partition ids.
     * @return Future with pending assignments by partition id.
     */
    abstract CompletableFuture<Map<Integer, Assignments>> pendingAssignments(MetaStorageManager metaStorageManager, int replicationId, int[] partitionIds);

    /**
     * Creates a replication group id.
     *
     * @param id Table or zone id, depending on the subclass.
     * @param partitionId Partition id.
     * @return Replication group id of the partition.
     */
    abstract T replicationGroupId(int id, int partitionId);

    /**
     * Submits the forced assignments update of a single partition to the meta storage.
     *
     * @param partId Replication group id of the partition.
     * @param revision Meta storage revision.
     * @param timestamp Meta storage timestamp.
     * @param metaStorageMgr Meta storage manager.
     * @param assignmentsTimestamp Timestamp to stamp the planned assignments with.
     * @param assignmentsQueue Pending assignments queue to write.
     * @param isProposedPendingEqualsProposedPlanned Whether the planned assignments equal the pending target.
     * @param partAssignments Planned assignments of the partition.
     * @return Future with the ordinal of the resulting {@code RebalanceUtil.UpdateStatus}.
     */
    abstract CompletableFuture<Integer> invoke(T partId, long revision, HybridTimestamp timestamp, MetaStorageManager metaStorageMgr, long assignmentsTimestamp, AssignmentsQueue assignmentsQueue, boolean isProposedPendingEqualsProposedPlanned, Set<Assignment> partAssignments);

    /**
     * Builds the conditional meta storage statement that applies a forced assignments update.
     *
     * <p>If the pending change trigger key does not exist or its value is less than {@code timestampBytes}, the
     * statement writes the trigger key and the pending key, writes or removes the planned key, and yields the ordinal
     * of {@code PENDING_KEY_UPDATED}. Otherwise it changes nothing and yields the ordinal of
     * {@code OUTDATED_UPDATE_RECEIVED}.
     *
     * @param timestampBytes New value of the pending change trigger key.
     * @param pendingAssignmentsBytes New value of the pending assignments key.
     * @param plannedAssignmentsBytes New value of the planned assignments key, or {@code null} to remove that key.
     * @param pendingChangeTriggerKey Pending change trigger key.
     * @param partAssignmentsPendingKey Pending assignments key.
     * @param partAssignmentsPlannedKey Planned assignments key.
     * @return Conditional statement to pass to the meta storage.
     */
    static Iif executeInvoke(byte[] timestampBytes, byte[] pendingAssignmentsBytes, byte @Nullable [] plannedAssignmentsBytes, ByteArray pendingChangeTriggerKey, ByteArray partAssignmentsPendingKey, ByteArray partAssignmentsPlannedKey) {
        Operation plannedKeyUpdate;

        if (plannedAssignmentsBytes != null) {
            plannedKeyUpdate = Operations.put(partAssignmentsPlannedKey, plannedAssignmentsBytes);
        } else {
            plannedKeyUpdate = Operations.remove(partAssignmentsPlannedKey);
        }

        Operation[] applyUpdate = {
                Operations.put(pendingChangeTriggerKey, timestampBytes),
                Operations.put(partAssignmentsPendingKey, pendingAssignmentsBytes),
                plannedKeyUpdate
        };

        return Statements.iif(
                Conditions.notExists(pendingChangeTriggerKey).or(Conditions.value(pendingChangeTriggerKey).lt(timestampBytes)),
                Operations.ops(applyUpdate).yield(RebalanceUtil.UpdateStatus.PENDING_KEY_UPDATED.ordinal()),
                Operations.ops(new Operation[0]).yield(RebalanceUtil.UpdateStatus.OUTDATED_UPDATE_RECEIVED.ordinal()));
    }

    /**
     * Handler whose replication groups are identified by {@link ZonePartitionId}; created by
     * {@code handler(GroupUpdateRequest)} when the request has colocation enabled. Assignments and meta storage keys
     * are resolved through {@link ZoneRebalanceUtil}.
     */
    private static class ZoneGroupUpdateRequestHandler extends GroupUpdateRequestHandler<ZonePartitionId> {
        /**
         * Constructor.
         *
         * @param request Group update request to handle.
         */
        ZoneGroupUpdateRequestHandler(GroupUpdateRequest request) {
            super(request);
        }

        /** Collects zone-level local partition states through the given manager. */
        @Override
        CompletableFuture<Map<ZonePartitionId, LocalPartitionStateMessageByNode>> localStatesFuture(DisasterRecoveryManager disasterRecoveryManager, Set<String> zoneNames, Set<Integer> partitionIds, Catalog catalog) {
            // The second argument is passed as an empty set; presumably it is a node filter — verify against the manager.
            return disasterRecoveryManager.localPartitionStatesInternal(
                    zoneNames,
                    Collections.emptySet(),
                    partitionIds,
                    catalog,
                    DisasterRecoveryManager.zoneState());
        }

        /** Reads stable assignments of the zone partitions. */
        @Override
        CompletableFuture<Map<Integer, Assignments>> stableAssignments(MetaStorageManager metaStorageManager, int zoneId, int[] partitionIds) {
            return ZoneRebalanceUtil.zoneStableAssignments(metaStorageManager, zoneId, partitionIds);
        }

        /** Reads pending assignments of the zone partitions. */
        @Override
        CompletableFuture<Map<Integer, Assignments>> pendingAssignments(MetaStorageManager metaStorageManager, int zoneId, int[] partitionIds) {
            return ZoneRebalanceUtil.zonePendingAssignments(metaStorageManager, zoneId, partitionIds);
        }

        /** Creates a zone partition id from the zone id and the partition id. */
        @Override
        ZonePartitionId replicationGroupId(int id, int partitionId) {
            return new ZonePartitionId(id, partitionId);
        }

        /**
         * Invokes the conditional update built by {@code executeInvoke} on the zone partition keys and logs the outcome.
         *
         * @return Future with the integer status returned by the meta storage invoke.
         * @throws IllegalStateException Inside the returned future, if the status is neither {@code PENDING_KEY_UPDATED}
         *         nor {@code OUTDATED_UPDATE_RECEIVED}.
         */
        @Override
        CompletableFuture<Integer> invoke(ZonePartitionId partId, long revision, HybridTimestamp timestamp, MetaStorageManager metaStorageMgr, long assignmentsTimestamp, AssignmentsQueue assignmentsQueue, boolean isProposedPendingEqualsProposedPlanned, Set<Assignment> partAssignments) {
            byte[] triggerTimestampBytes = ByteUtils.longToBytesKeepingOrder(timestamp.longValue());
            byte[] pendingQueueBytes = assignmentsQueue.toBytes();
            // A null planned value makes executeInvoke remove the planned key.
            byte[] plannedBytes = isProposedPendingEqualsProposedPlanned
                    ? null
                    : Assignments.toBytes(partAssignments, assignmentsTimestamp, true);

            ByteArray triggerKey = ZoneRebalanceUtil.pendingChangeTriggerKey(partId);
            ByteArray pendingKey = ZoneRebalanceUtil.pendingPartAssignmentsQueueKey(partId);
            ByteArray plannedKey = ZoneRebalanceUtil.plannedPartAssignmentsKey(partId);

            Iif invokeClosure = executeInvoke(triggerTimestampBytes, pendingQueueBytes, plannedBytes, triggerKey, pendingKey, plannedKey);

            return metaStorageMgr.invoke(invokeClosure).thenApply(statementResult -> {
                int statusCode = statementResult.getAsInt();

                switch (RebalanceUtil.UpdateStatus.valueOf(statusCode)) {
                    case PENDING_KEY_UPDATED:
                        LOG.info("Force update metastore pending partitions key [key={}, partition={}, zone={}, newVal={}]", pendingKey.toString(), partId.partitionId(), partId.zoneId(), assignmentsQueue);
                        break;

                    case OUTDATED_UPDATE_RECEIVED:
                        LOG.info("Received outdated force rebalance trigger event [revision={}, partition={}, zone={}]", revision, partId.partitionId(), partId.zoneId());
                        break;

                    default:
                        throw new IllegalStateException("Unknown return code for rebalance metastore multi-invoke");
                }

                return statusCode;
            });
        }
    }

    /**
     * Handler whose replication groups are identified by {@link TablePartitionId}; created by
     * {@code handler(GroupUpdateRequest)} when the request does not have colocation enabled. Assignments and meta
     * storage keys are resolved through {@link RebalanceUtil}.
     */
    static class TableGroupUpdateRequestHandler extends GroupUpdateRequestHandler<TablePartitionId> {
        /**
         * Constructor.
         *
         * @param request Group update request to handle.
         */
        TableGroupUpdateRequestHandler(GroupUpdateRequest request) {
            super(request);
        }

        /** Collects table-level local partition states through the given manager. */
        @Override
        CompletableFuture<Map<TablePartitionId, LocalPartitionStateMessageByNode>> localStatesFuture(DisasterRecoveryManager disasterRecoveryManager, Set<String> zoneNames, Set<Integer> partitionIds, Catalog catalog) {
            // The second argument is passed as an empty set; presumably it is a node filter — verify against the manager.
            return disasterRecoveryManager.localPartitionStatesInternal(
                    zoneNames,
                    Collections.emptySet(),
                    partitionIds,
                    catalog,
                    DisasterRecoveryManager.tableState());
        }

        /** Reads stable assignments of the table partitions. */
        @Override
        CompletableFuture<Map<Integer, Assignments>> stableAssignments(MetaStorageManager metaStorageManager, int tableId, int[] partitionIds) {
            return RebalanceUtil.tableStableAssignments(metaStorageManager, tableId, partitionIds);
        }

        /** Reads pending assignments of the table partitions. */
        @Override
        CompletableFuture<Map<Integer, Assignments>> pendingAssignments(MetaStorageManager metaStorageManager, int tableId, int[] partitionIds) {
            return RebalanceUtil.tablePendingAssignments(metaStorageManager, tableId, partitionIds);
        }

        /** Creates a table partition id from the table id and the partition id. */
        @Override
        TablePartitionId replicationGroupId(int id, int partitionId) {
            return new TablePartitionId(id, partitionId);
        }

        /**
         * Invokes the conditional update built by {@code executeInvoke} on the table partition keys and logs the outcome.
         *
         * @return Future with the integer status returned by the meta storage invoke.
         * @throws IllegalStateException Inside the returned future, if the status is neither {@code PENDING_KEY_UPDATED}
         *         nor {@code OUTDATED_UPDATE_RECEIVED}.
         */
        @Override
        CompletableFuture<Integer> invoke(TablePartitionId partId, long revision, HybridTimestamp timestamp, MetaStorageManager metaStorageMgr, long assignmentsTimestamp, AssignmentsQueue assignmentsQueue, boolean isProposedPendingEqualsProposedPlanned, Set<Assignment> partAssignments) {
            byte[] triggerTimestampBytes = ByteUtils.longToBytesKeepingOrder(timestamp.longValue());
            byte[] pendingQueueBytes = assignmentsQueue.toBytes();
            // A null planned value makes executeInvoke remove the planned key.
            byte[] plannedBytes = isProposedPendingEqualsProposedPlanned
                    ? null
                    : Assignments.toBytes(partAssignments, assignmentsTimestamp, true);

            ByteArray triggerKey = RebalanceUtil.pendingChangeTriggerKey(partId);
            ByteArray pendingKey = RebalanceUtil.pendingPartAssignmentsQueueKey(partId);
            ByteArray plannedKey = RebalanceUtil.plannedPartAssignmentsKey(partId);

            Iif invokeClosure = executeInvoke(triggerTimestampBytes, pendingQueueBytes, plannedBytes, triggerKey, pendingKey, plannedKey);

            return metaStorageMgr.invoke(invokeClosure).thenApply(statementResult -> {
                int statusCode = statementResult.getAsInt();

                switch (RebalanceUtil.UpdateStatus.valueOf(statusCode)) {
                    case PENDING_KEY_UPDATED:
                        LOG.info("Force update metastore pending partitions key [key={}, partition={}, table={}, newVal={}]", pendingKey.toString(), partId.partitionId(), partId.tableId(), assignmentsQueue);
                        break;

                    case OUTDATED_UPDATE_RECEIVED:
                        LOG.info("Received outdated force rebalance trigger event [revision={}, partition={}, table={}]", revision, partId.partitionId(), partId.tableId());
                        break;

                    default:
                        throw new IllegalStateException("Unknown return code for rebalance metastore multi-invoke");
                }

                return statusCode;
            });
        }
    }
}

