/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.placementdriver;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.components.NodeProperties;
import org.apache.ignite3.internal.distributionzones.exception.EmptyDataNodesException;
import org.apache.ignite3.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite3.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.EntryEvent;
import org.apache.ignite3.internal.metastorage.MetaStorageManager;
import org.apache.ignite3.internal.metastorage.Revisions;
import org.apache.ignite3.internal.metastorage.WatchEvent;
import org.apache.ignite3.internal.metastorage.WatchListener;
import org.apache.ignite3.internal.partitiondistribution.Assignment;
import org.apache.ignite3.internal.partitiondistribution.Assignments;
import org.apache.ignite3.internal.partitiondistribution.AssignmentsQueue;
import org.apache.ignite3.internal.partitiondistribution.TokenizedAssignments;
import org.apache.ignite3.internal.partitiondistribution.TokenizedAssignmentsImpl;
import org.apache.ignite3.internal.placementdriver.AssignmentsPlacementDriver;
import org.apache.ignite3.internal.placementdriver.EmptyAssignmentsException;
import org.apache.ignite3.internal.placementdriver.Utils;
import org.apache.ignite3.internal.replicator.ReplicationGroupId;
import org.apache.ignite3.internal.replicator.ZonePartitionId;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.Cursor;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.FastTimestamps;
import org.apache.ignite3.internal.util.IgniteBusyLock;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;

public class AssignmentsTracker
implements AssignmentsPlacementDriver {
    /** Logger for this class. */
    private static final IgniteLogger LOG = Loggers.forClass(AssignmentsTracker.class);
    /** Busy lock entered by {@code getAssignments} and {@code awaitNonEmptyAssignments} while they read the caches. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    /** Meta storage manager used to register watches, read entries during recovery and await cluster time. */
    private final MetaStorageManager msManager;
    /** Receives the failure if recovery of the assignment caches completes exceptionally. */
    private final FailureProcessor failureProcessor;
    /** Source of the {@code colocationEnabled()} flag that selects key prefixes and group id extraction. */
    private final NodeProperties nodeProperties;
    /** Cache of the latest stable assignments per replication group; filled on recovery and by the stable listeners. */
    private final Map<ReplicationGroupId, TokenizedAssignments> groupStableAssignments;
    /** Watch listener for the stable assignments prefix returned by {@code stableAssignmentsPrefixBytes()}. */
    private final WatchListener stableAssignmentsListener;
    /** Watch listener for the zone stable assignments prefix; registered only when colocation is disabled. */
    private final WatchListener zoneStableAssignmentsListener;
    /** Cache of the latest pending assignments per replication group; filled on recovery and by the pending listeners. */
    private final Map<ReplicationGroupId, TokenizedAssignments> groupPendingAssignments;
    /** Watch listener for the pending assignments queue prefix returned by {@code pendingAssignmentsQueuePrefixBytes()}. */
    private final WatchListener pendingAssignmentsListener;
    /** Futures of callers waiting for non-empty stable assignments, keyed by replication group. */
    private final Map<ReplicationGroupId, CompletableFuture<TokenizedAssignments>> nonEmptyAssignmentsFutures = new ConcurrentHashMap<ReplicationGroupId, CompletableFuture<TokenizedAssignments>>();
    /** Given a zone id, asynchronously provides the zone's current data nodes; used to explain empty assignments. */
    private final Function<Integer, CompletableFuture<Set<String>>> currentDataNodesProvider;
    /** Resolver passed to {@code Utils.extractZoneIdFromGroupId}; presumably maps a table id to its zone id (per its name). */
    private final Function<Integer, Integer> zoneIdByTableIdResolver;
    /** Watch listener for the zone pending assignments queue prefix; registered only when colocation is disabled. */
    private final WatchListener zonePendingAssignmentsListener;

    /**
     * Creates the tracker. Only wires collaborators, caches and listeners; no meta storage watch is
     * registered here (that happens in {@code startTrack()}).
     *
     * @param msManager Meta storage manager.
     * @param failureProcessor Failure processor notified about recovery errors.
     * @param nodeProperties Node properties providing the colocation flag.
     * @param currentDataNodesProvider Asynchronous provider of current data nodes by zone id.
     * @param zoneIdByTableIdResolver Resolver handed to the zone id extraction utility.
     */
    public AssignmentsTracker(
            MetaStorageManager msManager,
            FailureProcessor failureProcessor,
            NodeProperties nodeProperties,
            Function<Integer, CompletableFuture<Set<String>>> currentDataNodesProvider,
            Function<Integer, Integer> zoneIdByTableIdResolver
    ) {
        // External collaborators.
        this.msManager = msManager;
        this.failureProcessor = failureProcessor;
        this.nodeProperties = nodeProperties;
        this.currentDataNodesProvider = currentDataNodesProvider;
        this.zoneIdByTableIdResolver = zoneIdByTableIdResolver;

        // Assignment caches shared between the listeners and the public API.
        this.groupStableAssignments = new ConcurrentHashMap<>();
        this.groupPendingAssignments = new ConcurrentHashMap<>();

        // Listeners are lambdas that dereference the fields above only when an event arrives.
        this.stableAssignmentsListener = createStableAssignmentsListener();
        this.pendingAssignmentsListener = createPendingAssignmentsListener();
        this.zoneStableAssignmentsListener = createZoneStableAssignmentsListener();
        this.zonePendingAssignmentsListener = createZonePendingAssignmentsListener();
    }

    /**
     * Registers the assignment watches and, once meta storage recovery finishes, fills both caches from the
     * locally available entries at the recovery revision.
     *
     * <p>When colocation is disabled the zone-based prefixes are watched and recovered in addition to the
     * prefixes chosen by {@code pendingAssignmentsQueuePrefixBytes()} / {@code stableAssignmentsPrefixBytes()}.
     * A recovery failure is reported to the failure processor.
     */
    public void startTrack() {
        msManager.registerPrefixWatch(new ByteArray(pendingAssignmentsQueuePrefixBytes()), pendingAssignmentsListener);
        msManager.registerPrefixWatch(new ByteArray(stableAssignmentsPrefixBytes()), stableAssignmentsListener);

        if (!nodeProperties.colocationEnabled()) {
            msManager.registerPrefixWatch(
                    new ByteArray(ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES),
                    zonePendingAssignmentsListener
            );
            msManager.registerPrefixWatch(
                    new ByteArray(ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES),
                    zoneStableAssignmentsListener
            );
        }

        // Pending values are stored as a queue, of which only the head is cached; stable values are plain assignments.
        Function<byte[], Set<Assignment>> pendingDeserializer = bytes -> AssignmentsQueue.fromBytes(bytes).poll().nodes();
        Function<byte[], Set<Assignment>> stableDeserializer = bytes -> Assignments.fromBytes(bytes).nodes();

        msManager.recoveryFinishedFuture()
                .thenAccept(revisions -> {
                    handleRecoveryAssignments(
                            revisions, pendingAssignmentsQueuePrefixBytes(), groupPendingAssignments, pendingDeserializer, false);
                    handleRecoveryAssignments(
                            revisions, stableAssignmentsPrefixBytes(), groupStableAssignments, stableDeserializer, true);

                    if (!nodeProperties.colocationEnabled()) {
                        handleZoneRecoveryAssignments(
                                revisions,
                                ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES,
                                groupPendingAssignments,
                                pendingDeserializer,
                                false
                        );
                        handleZoneRecoveryAssignments(
                                revisions,
                                ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES,
                                groupStableAssignments,
                                stableDeserializer,
                                true
                        );
                    }
                })
                .whenComplete((unused, error) -> {
                    if (error != null) {
                        failureProcessor.process(
                                new FailureContext((Throwable) error, "Failed to start assignment tracker due to recovery error"));

                        return;
                    }

                    if (LOG.isInfoEnabled()) {
                        LOG.info(
                                "Assignment cache initialized for placement driver [stableAssignments=[{}], pendingAssignments=[{}]]",
                                prepareAssignmentsForLogging(groupStableAssignments),
                                prepareAssignmentsForLogging(groupPendingAssignments)
                        );
                    }
                });
    }

    /**
     * Unregisters every watch that {@code startTrack()} registers: the two main listeners always, and the
     * zone listeners only when colocation is disabled.
     */
    public void stopTrack() {
        msManager.unregisterWatch(pendingAssignmentsListener);
        msManager.unregisterWatch(stableAssignmentsListener);

        if (nodeProperties.colocationEnabled()) {
            // Zone listeners are only registered when colocation is disabled.
            return;
        }

        msManager.unregisterWatch(zonePendingAssignmentsListener);
        msManager.unregisterWatch(zoneStableAssignmentsListener);
    }

    /**
     * Returns the cached stable assignments of the given groups after the cluster time has reached
     * {@code clusterTimeToAwait}.
     *
     * @param replicationGroupIds Groups to look up.
     * @param clusterTimeToAwait Cluster time to wait for before reading the cache.
     * @return Future with one element per requested group, in request order; an element is {@code null}
     *         when the cache has no entry for that group.
     */
    @Override
    public CompletableFuture<List<TokenizedAssignments>> getAssignments(List<? extends ReplicationGroupId> replicationGroupIds, HybridTimestamp clusterTimeToAwait) {
        return msManager.clusterTime()
                .waitFor(clusterTimeToAwait)
                .thenApply(ignored -> IgniteUtils.inBusyLock((IgniteBusyLock) busyLock, () -> {
                    Map<ReplicationGroupId, TokenizedAssignments> cached = stableAssignments();
                    List<TokenizedAssignments> found = new ArrayList<>();

                    for (ReplicationGroupId groupId : replicationGroupIds) {
                        found.add(cached.get(groupId));
                    }

                    return found;
                }));
    }

    /**
     * Waits until the cluster time reaches {@code clusterTimeToAwait} and then until every requested group
     * has non-empty stable assignments, or fails when the timeout elapses.
     *
     * @param replicationGroupIds Groups to resolve.
     * @param clusterTimeToAwait Cluster time to wait for before the cache is consulted.
     * @param timeoutMillis Time budget, in milliseconds, counted from the moment the cluster time is reached.
     * @return Future with the assignments of all requested groups.
     */
    @Override
    public CompletableFuture<List<TokenizedAssignments>> awaitNonEmptyAssignments(List<? extends ReplicationGroupId> replicationGroupIds, HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
        CompletableFuture<List<TokenizedAssignments>> awaited = msManager.clusterTime()
                .waitFor(clusterTimeToAwait)
                .thenCompose(ignored -> IgniteUtils.inBusyLock((IgniteBusyLock) busyLock, () -> {
                    // The timeout budget starts only after the cluster time has been reached.
                    long startedAt = FastTimestamps.coarseCurrentTimeMillis();

                    return awaitNonEmptyAssignmentsWithCheckMostRecent(replicationGroupIds, startedAt, timeoutMillis);
                }));

        return awaited.thenApply(Function.identity());
    }

    /**
     * Resolves stable assignments for every requested group, waiting for the groups whose stable assignments
     * are currently missing or empty.
     *
     * <p>If all groups already have non-empty assignments the result is returned immediately. Otherwise the
     * method waits on per-group futures and, once all of them complete normally, re-reads the cache with the
     * remaining time budget (a waiter may be completed with {@code null}, e.g. on a tombstone, so the cache is
     * always re-checked rather than trusting the future's value).
     *
     * @param replicationGroupIds Groups to resolve.
     * @param startTime Coarse timestamp (millis) at which the {@code timeoutMillis} budget started.
     * @param timeoutMillis Remaining time budget in milliseconds; when it is not positive, groups without
     *         assignments fail with {@link TimeoutException} right away.
     * @return Future with the assignments of all requested groups.
     */
    private CompletableFuture<List<TokenizedAssignments>> awaitNonEmptyAssignmentsWithCheckMostRecent(List<? extends ReplicationGroupId> replicationGroupIds, long startTime, long timeoutMillis) {
        Map<ReplicationGroupId, TokenizedAssignments> assignmentsMap = stableAssignments();
        Map<Integer, CompletableFuture<TokenizedAssignments>> futures = new HashMap<>();
        List<TokenizedAssignments> result = new ArrayList<>(replicationGroupIds.size());

        for (int i = 0; i < replicationGroupIds.size(); i++) {
            ReplicationGroupId groupId = replicationGroupIds.get(i);
            TokenizedAssignments a = assignmentsMap.get(groupId);

            if (a != null && !a.nodes().isEmpty()) {
                result.add(a);

                continue;
            }

            if (timeoutMillis > 0L) {
                futures.put(i, nonEmptyAssignmentFuture(groupId, timeoutMillis));
            } else {
                futures.put(i, CompletableFuture.failedFuture(new TimeoutException()));
            }
        }

        // "result" is complete (and in request order) only when nothing had to be awaited.
        if (futures.isEmpty()) {
            return CompletableFuture.completedFuture(result);
        }

        return CompletableFutures.allOf(futures.values())
                .handle((unused, e) -> {
                    CompletableFuture<List<TokenizedAssignments>> r;
                    Throwable cause = ExceptionUtils.unwrapCompletionThrowable(e);

                    if (cause == null) {
                        // Same clock as the caller that produced startTime, so the elapsed time is consistent.
                        long now = FastTimestamps.coarseCurrentTimeMillis();
                        long newTimeoutMillis = timeoutMillis - (now - startTime);

                        // Re-base the start to "now": the elapsed time has already been subtracted from the
                        // budget, so keeping the old start would subtract it again on the next round.
                        r = awaitNonEmptyAssignmentsWithCheckMostRecent(replicationGroupIds, now, newTimeoutMillis);
                    } else if (cause instanceof EmptyAssignmentsException) {
                        r = checkEmptyAssignmentsReason((EmptyAssignmentsException) cause);
                    } else {
                        r = CompletableFuture.failedFuture(cause);
                    }

                    return r;
                })
                .thenCompose(Function.identity());
    }

    /**
     * Builds the failed result for a group whose assignments stayed empty, enriching the failure when the
     * group's zone currently has no data nodes.
     *
     * @param ex Original failure carrying the group id.
     * @return A future that always completes exceptionally: with {@code ex} itself when the zone id cannot be
     *         resolved or the zone has data nodes, otherwise with a new {@link EmptyAssignmentsException}
     *         caused by {@link EmptyDataNodesException}.
     */
    private CompletableFuture<List<TokenizedAssignments>> checkEmptyAssignmentsReason(EmptyAssignmentsException ex) {
        Integer zoneId = Utils.extractZoneIdFromGroupId(ex.groupId(), nodeProperties.colocationEnabled(), zoneIdByTableIdResolver);

        if (zoneId == null) {
            return CompletableFuture.failedFuture(ex);
        }

        CompletableFuture<Set<String>> dataNodesFuture = currentDataNodesProvider.apply(zoneId);

        return dataNodesFuture.thenApply(dataNodes -> {
            if (!dataNodes.isEmpty()) {
                // Data nodes exist, so nothing more specific can be reported: rethrow the original failure.
                ExceptionUtils.sneakyThrow(ex);

                return null;
            }

            throw new EmptyAssignmentsException(ex.groupId(), (Throwable) new EmptyDataNodesException(zoneId));
        });
    }

    /**
     * Returns the shared future on which callers wait for non-empty stable assignments of a group, creating
     * it if necessary.
     *
     * <p>The future is shared between concurrent callers of the same group, so its timeout is the one
     * supplied by the caller that created it. A timeout surfaces as {@link EmptyAssignmentsException}.
     *
     * @param groupId Replication group to wait for.
     * @param futureTimeoutMillis Timeout, in milliseconds, applied when a new future has to be created.
     * @return Future completed by the assignment listeners, or failed on timeout.
     */
    private CompletableFuture<TokenizedAssignments> nonEmptyAssignmentFuture(ReplicationGroupId groupId, long futureTimeoutMillis) {
        Function<ReplicationGroupId, CompletableFuture<TokenizedAssignments>> newWaiter = k ->
                new CompletableFuture<TokenizedAssignments>()
                        .orTimeout(futureTimeoutMillis, TimeUnit.MILLISECONDS)
                        .handle((v, e) -> {
                            if (e instanceof TimeoutException) {
                                throw new EmptyAssignmentsException(groupId, (Throwable) e);
                            }

                            if (e != null) {
                                ExceptionUtils.sneakyThrow(e);

                                return null;
                            }

                            return v;
                        });

        CompletableFuture<TokenizedAssignments> result = nonEmptyAssignmentsFutures.computeIfAbsent(groupId, newWaiter);

        if (result.isCompletedExceptionally()) {
            // A waiter that already failed (typically timed out) is never evicted by the listeners until the
            // next assignments event. Reusing it would fail this caller immediately, ignoring its own timeout,
            // so drop the stale entry and start a fresh wait.
            nonEmptyAssignmentsFutures.remove(groupId, result);

            result = nonEmptyAssignmentsFutures.computeIfAbsent(groupId, newWaiter);
        }

        // Double check: the assignments may have been updated while the future was being registered, in which
        // case no listener will complete it.
        TokenizedAssignments assignments = groupStableAssignments.get(groupId);

        if (assignments != null && !assignments.nodes().isEmpty()) {
            nonEmptyAssignmentsFutures.remove(groupId, result);
            result.complete(assignments);
        }

        return result;
    }

    /**
     * Returns the stable assignments cache.
     *
     * @return The live internal map (not a copy), keyed by replication group id.
     */
    public Map<ReplicationGroupId, TokenizedAssignments> stableAssignments() {
        return groupStableAssignments;
    }

    /**
     * Returns the pending assignments cache.
     *
     * @return The live internal map (not a copy), keyed by replication group id.
     */
    Map<ReplicationGroupId, TokenizedAssignments> pendingAssignments() {
        return groupPendingAssignments;
    }

    /**
     * Creates the listener that applies stable assignment events to the stable cache.
     *
     * @return Listener that processes the event synchronously and returns an already completed future.
     */
    private WatchListener createStableAssignmentsListener() {
        Function<byte[], Set<Assignment>> stableDeserializer = bytes -> Assignments.fromBytes(bytes).nodes();

        return event -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Stable assignments update [revision={}, keys={}]",
                        event.revision(),
                        collectKeysFromEventAsString(event)
                );
            }

            handleReceivedAssignments(event, stableAssignmentsPrefixBytes(), groupStableAssignments, stableDeserializer, true);

            return CompletableFutures.nullCompletedFuture();
        };
    }

    /**
     * Creates the listener that applies zone stable assignment events to the stable cache.
     *
     * @return Listener that processes the event synchronously and returns an already completed future.
     */
    private WatchListener createZoneStableAssignmentsListener() {
        byte[] zoneStablePrefix = ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;

        return event -> {
            // NOTE(review): this listener logs at INFO while the sibling listeners log at DEBUG - confirm it is intended.
            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Stable assignments update [revision={}, keys={}]",
                        event.revision(),
                        collectKeysFromEventAsString(event)
                );
            }

            handleZoneReceivedAssignments(event, zoneStablePrefix, groupStableAssignments, true);

            return CompletableFutures.nullCompletedFuture();
        };
    }

    /**
     * Creates the listener that applies pending assignment queue events to the pending cache. Only the head
     * of the stored queue is cached.
     *
     * @return Listener that processes the event synchronously and returns an already completed future.
     */
    private WatchListener createPendingAssignmentsListener() {
        Function<byte[], Set<Assignment>> queueHeadDeserializer = bytes -> AssignmentsQueue.fromBytes(bytes).poll().nodes();

        return event -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Pending assignments update [revision={}, keys={}]",
                        event.revision(),
                        collectKeysFromEventAsString(event)
                );
            }

            handleReceivedAssignments(
                    event, pendingAssignmentsQueuePrefixBytes(), groupPendingAssignments, queueHeadDeserializer, false);

            return CompletableFutures.nullCompletedFuture();
        };
    }

    /**
     * Creates the listener that applies zone pending assignment queue events to the pending cache.
     *
     * @return Listener that processes the event synchronously and returns an already completed future.
     */
    private WatchListener createZonePendingAssignmentsListener() {
        byte[] zonePendingPrefix = ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES;

        return event -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Pending assignments update [revision={}, keys={}]",
                        event.revision(),
                        collectKeysFromEventAsString(event)
                );
            }

            handleZoneReceivedAssignments(event, zonePendingPrefix, groupPendingAssignments, false);

            return CompletableFutures.nullCompletedFuture();
        };
    }

    /**
     * Applies every entry of a watch event to the given cache: a tombstone removes the group and wakes a
     * pending waiter (with {@code null}), any other entry replaces the cached assignments.
     *
     * @param event Watch event to apply.
     * @param assignmentsMetastoreKeyPrefix Key prefix used to extract the group id from the entry key.
     * @param groupIdToAssignmentsMap Cache to update.
     * @param deserializer Converts the entry value to the set of assignments.
     * @param isStable Whether the cache holds stable assignments.
     */
    private void handleReceivedAssignments(WatchEvent event, byte[] assignmentsMetastoreKeyPrefix, Map<ReplicationGroupId, TokenizedAssignments> groupIdToAssignmentsMap, Function<byte[], Set<Assignment>> deserializer, boolean isStable) {
        for (EntryEvent entryEvent : event.entryEvents()) {
            Entry updated = entryEvent.newEntry();
            ReplicationGroupId groupId = extractReplicationGroupPartitionId(updated.key(), assignmentsMetastoreKeyPrefix);

            if (!updated.tombstone()) {
                updateGroupAssignments(groupIdToAssignmentsMap, groupId, updated, deserializer, isStable);
            } else {
                groupIdToAssignmentsMap.remove(groupId);
                // Completing with null wakes the waiter up so that it re-reads the cache.
                completeNonEmptyAssignmentsFutureIfExists(groupId, null);
            }
        }
    }

    /**
     * Applies every entry of a zone assignments watch event to the given cache: a tombstone removes the
     * group, any other entry replaces the cached assignments.
     *
     * <p>The value format depends on the kind of key: stable keys hold plain assignments, while pending keys
     * hold an assignments queue of which only the head is cached. This matches how the same prefixes are
     * read during recovery and by the non-zone pending listener.
     *
     * @param event Watch event to apply.
     * @param assignmentsMetastoreKeyPrefix Key prefix used to extract the zone partition id from the entry key.
     * @param groupIdToAssignmentsMap Cache to update.
     * @param isStable {@code true} for stable assignments, {@code false} for the pending assignments queue.
     */
    private void handleZoneReceivedAssignments(WatchEvent event, byte[] assignmentsMetastoreKeyPrefix, Map<ReplicationGroupId, TokenizedAssignments> groupIdToAssignmentsMap, boolean isStable) {
        // Pending entries are serialized as AssignmentsQueue; reading them as Assignments would misparse the value.
        Function<byte[], Set<Assignment>> deserializer = isStable
                ? bytes -> Assignments.fromBytes(bytes).nodes()
                : bytes -> AssignmentsQueue.fromBytes(bytes).poll().nodes();

        for (EntryEvent evt : event.entryEvents()) {
            Entry entry = evt.newEntry();
            ZonePartitionId grpId = ZoneRebalanceUtil.extractZonePartitionId(entry.key(), assignmentsMetastoreKeyPrefix);

            if (entry.tombstone()) {
                groupIdToAssignmentsMap.remove(grpId);

                continue;
            }

            updateGroupAssignments(groupIdToAssignmentsMap, grpId, entry, deserializer, isStable);
        }
    }

    /**
     * Fills the given cache from the local meta storage entries under the prefix, read at the recovery
     * revision. Tombstones are skipped.
     *
     * @param recoveryRevisions Revisions reported by the finished recovery.
     * @param assignmentsMetastoreKeyPrefix Prefix to scan; also used to extract the group id from each key.
     * @param groupIdToAssignmentsMap Cache to fill.
     * @param deserializer Converts the entry value to the set of assignments.
     * @param isStable Whether the cache holds stable assignments.
     */
    private void handleRecoveryAssignments(Revisions recoveryRevisions, byte[] assignmentsMetastoreKeyPrefix, Map<ReplicationGroupId, TokenizedAssignments> groupIdToAssignmentsMap, Function<byte[], Set<Assignment>> deserializer, boolean isStable) {
        ByteArray scanPrefix = new ByteArray(assignmentsMetastoreKeyPrefix);
        long recoveryRevision = recoveryRevisions.revision();

        try (Cursor<Entry> entries = msManager.prefixLocally(scanPrefix, recoveryRevision)) {
            for (Entry recovered : entries) {
                if (!recovered.tombstone()) {
                    ReplicationGroupId groupId = extractReplicationGroupPartitionId(recovered.key(), assignmentsMetastoreKeyPrefix);

                    updateGroupAssignments(groupIdToAssignmentsMap, groupId, recovered, deserializer, isStable);
                }
            }
        }
    }

    /**
     * Fills the given cache from the local meta storage entries under a zone prefix, read at the recovery
     * revision. Tombstones are skipped; group ids are always extracted as zone partition ids.
     *
     * @param recoveryRevisions Revisions reported by the finished recovery.
     * @param assignmentsMetastoreKeyPrefix Prefix to scan; also used to extract the group id from each key.
     * @param groupIdToAssignmentsMap Cache to fill.
     * @param deserializer Converts the entry value to the set of assignments.
     * @param isStable Whether the cache holds stable assignments.
     */
    private void handleZoneRecoveryAssignments(Revisions recoveryRevisions, byte[] assignmentsMetastoreKeyPrefix, Map<ReplicationGroupId, TokenizedAssignments> groupIdToAssignmentsMap, Function<byte[], Set<Assignment>> deserializer, boolean isStable) {
        ByteArray scanPrefix = new ByteArray(assignmentsMetastoreKeyPrefix);
        long recoveryRevision = recoveryRevisions.revision();

        try (Cursor<Entry> entries = msManager.prefixLocally(scanPrefix, recoveryRevision)) {
            for (Entry recovered : entries) {
                if (!recovered.tombstone()) {
                    ReplicationGroupId groupId = extractZoneReplicationGroupPartitionId(recovered.key(), assignmentsMetastoreKeyPrefix);

                    updateGroupAssignments(groupIdToAssignmentsMap, groupId, recovered, deserializer, isStable);
                }
            }
        }
    }

    /**
     * Stores the assignments carried by a meta storage entry in the cache, tokenized with the entry revision,
     * and wakes a pending waiter when non-empty stable assignments arrive.
     *
     * @param groupIdToAssignmentsMap Cache to update.
     * @param grpId Replication group the entry belongs to.
     * @param entry Meta storage entry; its value must not be {@code null}.
     * @param deserializer Converts the entry value to the set of assignments.
     * @param isStable Whether the cache holds stable assignments.
     */
    private void updateGroupAssignments(Map<ReplicationGroupId, TokenizedAssignments> groupIdToAssignmentsMap, ReplicationGroupId grpId, Entry entry, Function<byte[], Set<Assignment>> deserializer, boolean isStable) {
        byte[] serialized = entry.value();

        assert serialized != null;

        TokenizedAssignmentsImpl tokenized = new TokenizedAssignmentsImpl(deserializer.apply(serialized), entry.revision());

        groupIdToAssignmentsMap.put(grpId, tokenized);

        if (!isStable || tokenized.nodes().isEmpty()) {
            // Waiters are only interested in non-empty stable assignments.
            return;
        }

        completeNonEmptyAssignmentsFutureIfExists(grpId, tokenized);
    }

    /**
     * Removes the waiter registered for the group, if any, and completes it with the given value.
     *
     * @param grpId Replication group whose waiter should be completed.
     * @param assignments Value to complete the waiter with; may be {@code null}.
     */
    private void completeNonEmptyAssignmentsFutureIfExists(ReplicationGroupId grpId, @Nullable TokenizedAssignments assignments) {
        CompletableFuture<TokenizedAssignments> waiter = nonEmptyAssignmentsFutures.remove(grpId);

        if (waiter == null) {
            return;
        }

        waiter.complete(assignments);
    }

    /**
     * Renders the keys of all new entries of the event as a comma-separated string, for logging.
     *
     * @param event Watch event.
     * @return Keys (as rendered by {@code ByteArray.toString()}) joined with {@code ","}.
     */
    private static String collectKeysFromEventAsString(WatchEvent event) {
        StringBuilder keys = new StringBuilder();
        boolean needsSeparator = false;

        for (EntryEvent entryEvent : event.entryEvents()) {
            if (needsSeparator) {
                keys.append(",");
            }

            keys.append(new ByteArray(entryEvent.newEntry().key()).toString());
            needsSeparator = true;
        }

        return keys.toString();
    }

    /**
     * Renders assignments grouped by node for logging, in the form
     * {@code node=[peers=[...], learners=[...]], otherNode=[...]}. A role without groups is omitted for
     * that node.
     *
     * @param assignmentsMap Assignments per replication group.
     * @return Human-readable representation; empty string when there are no assignments.
     */
    private static String prepareAssignmentsForLogging(Map<ReplicationGroupId, TokenizedAssignments> assignmentsMap) {
        /** Replication groups hosted by a single node, split by the node's role in each group. */
        class NodeAssignments {
            private final List<ReplicationGroupId> peers = new ArrayList<>();
            private final List<ReplicationGroupId> learners = new ArrayList<>();

            private void addReplicationGroupId(ReplicationGroupId replicationGroupId, boolean isPeer) {
                List<ReplicationGroupId> target = isPeer ? peers : learners;

                target.add(replicationGroupId);
            }
        }

        // Invert the mapping: group -> nodes becomes node -> groups.
        Map<String, NodeAssignments> byNode = new HashMap<>();

        for (Map.Entry<ReplicationGroupId, TokenizedAssignments> groupEntry : assignmentsMap.entrySet()) {
            ReplicationGroupId groupId = groupEntry.getKey();

            for (Assignment assignment : groupEntry.getValue().nodes()) {
                NodeAssignments nodeAssignments = byNode.computeIfAbsent(assignment.consistentId(), k -> new NodeAssignments());

                nodeAssignments.addReplicationGroupId(groupId, assignment.isPeer());
            }
        }

        StringBuilder out = new StringBuilder();
        boolean needsSeparator = false;

        for (Map.Entry<String, NodeAssignments> nodeEntry : byNode.entrySet()) {
            NodeAssignments nodeAssignments = nodeEntry.getValue();
            boolean hasPeers = !nodeAssignments.peers.isEmpty();
            boolean hasLearners = !nodeAssignments.learners.isEmpty();

            if (!hasPeers && !hasLearners) {
                continue;
            }

            if (needsSeparator) {
                out.append(", ");
            }

            needsSeparator = true;

            out.append(nodeEntry.getKey()).append("=[");

            if (hasPeers) {
                out.append("peers=").append(nodeAssignments.peers);
            }

            if (hasPeers && hasLearners) {
                out.append(", ");
            }

            if (hasLearners) {
                out.append("learners=").append(nodeAssignments.learners);
            }

            out.append(']');
        }

        return out.toString();
    }

    /**
     * Selects the pending assignments queue key prefix according to the colocation flag.
     *
     * @return Zone-based prefix when colocation is enabled, table-based prefix otherwise.
     */
    private byte[] pendingAssignmentsQueuePrefixBytes() {
        if (nodeProperties.colocationEnabled()) {
            return ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES;
        }

        return RebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES;
    }

    /**
     * Selects the stable assignments key prefix according to the colocation flag.
     *
     * @return Zone-based prefix when colocation is enabled, table-based prefix otherwise.
     */
    private byte[] stableAssignmentsPrefixBytes() {
        if (nodeProperties.colocationEnabled()) {
            return ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
        }

        return RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
    }

    /**
     * Extracts the replication group id from a meta storage key, as a zone partition id when colocation is
     * enabled and as a table partition id otherwise.
     *
     * @param key Full meta storage key.
     * @param prefix Prefix the key starts with.
     * @return Replication group id encoded in the key.
     */
    private ReplicationGroupId extractReplicationGroupPartitionId(byte[] key, byte[] prefix) {
        if (nodeProperties.colocationEnabled()) {
            return ZoneRebalanceUtil.extractZonePartitionId(key, prefix);
        }

        return RebalanceUtil.extractTablePartitionId(key, prefix);
    }

    /**
     * Extracts a zone partition id from a meta storage key, regardless of the colocation flag.
     *
     * @param key Full meta storage key.
     * @param prefix Prefix the key starts with.
     * @return Zone partition id encoded in the key, typed as a replication group id.
     */
    private static ReplicationGroupId extractZoneReplicationGroupPartitionId(byte[] key, byte[] prefix) {
        ReplicationGroupId zonePartitionId = ZoneRebalanceUtil.extractZonePartitionId(key, prefix);

        return zonePartitionId;
    }
}

