/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.sql.engine.exec.mapping;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntObjectPair;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongListIterator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite3.internal.components.NodeProperties;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.network.ClusterNodeImpl;
import org.apache.ignite3.internal.partitiondistribution.Assignment;
import org.apache.ignite3.internal.partitiondistribution.TokenizedAssignments;
import org.apache.ignite3.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite3.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite3.internal.sql.engine.exec.mapping.ExecutionDistributionProvider;
import org.apache.ignite3.internal.sql.engine.exec.mapping.ExecutionTarget;
import org.apache.ignite3.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
import org.apache.ignite3.internal.sql.engine.exec.mapping.FragmentMapper;
import org.apache.ignite3.internal.sql.engine.exec.mapping.FragmentMapping;
import org.apache.ignite3.internal.sql.engine.exec.mapping.IdGenerator;
import org.apache.ignite3.internal.sql.engine.exec.mapping.MappedFragment;
import org.apache.ignite3.internal.sql.engine.exec.mapping.MappingContext;
import org.apache.ignite3.internal.sql.engine.exec.mapping.MappingParameters;
import org.apache.ignite3.internal.sql.engine.exec.mapping.MappingService;
import org.apache.ignite3.internal.sql.engine.exec.mapping.QuerySplitter;
import org.apache.ignite3.internal.sql.engine.prepare.Fragment;
import org.apache.ignite3.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite3.internal.sql.engine.prepare.PlanId;
import org.apache.ignite3.internal.sql.engine.prepare.pruning.PartitionPruner;
import org.apache.ignite3.internal.sql.engine.prepare.pruning.PartitionPruningMetadata;
import org.apache.ignite3.internal.sql.engine.rel.IgniteReceiver;
import org.apache.ignite3.internal.sql.engine.rel.IgniteSender;
import org.apache.ignite3.internal.sql.engine.schema.IgniteDataSource;
import org.apache.ignite3.internal.sql.engine.schema.IgniteSystemView;
import org.apache.ignite3.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite3.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite3.internal.sql.engine.util.Commons;
import org.apache.ignite3.internal.sql.engine.util.cache.Cache;
import org.apache.ignite3.internal.sql.engine.util.cache.CacheFactory;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.sql.SqlException;
import org.jetbrains.annotations.Nullable;

public class MappingServiceImpl
implements MappingService,
LogicalTopologyEventListener {
    private final LogicalTopologyHolder topologyHolder = new LogicalTopologyHolder();
    private final CompletableFuture<Void> initialTopologyFuture = new CompletableFuture();
    private final String localNodeName;
    private final ClockService clock;
    private final Cache<PlanId, FragmentsTemplate> templatesCache;
    private final PartitionPruner partitionPruner;
    private final ExecutionDistributionProvider distributionProvider;
    private final Executor taskExecutor;
    private final NodeProperties nodeProperties;

    public MappingServiceImpl(String localNodeName, ClockService clock, CacheFactory cacheFactory, int cacheSize, PartitionPruner partitionPruner, ExecutionDistributionProvider distributionProvider, NodeProperties nodeProperties, Executor taskExecutor) {
        this.localNodeName = localNodeName;
        this.clock = clock;
        this.templatesCache = cacheFactory.create(cacheSize);
        this.partitionPruner = partitionPruner;
        this.distributionProvider = distributionProvider;
        this.nodeProperties = nodeProperties;
        this.taskExecutor = taskExecutor;
    }

    public CompletableFuture<Boolean> onPrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) {
        assert (parameters != null);
        return CompletableFutures.falseCompletedFuture();
    }

    @Override
    public CompletableFuture<List<MappedFragment>> map(MultiStepPlan multiStepPlan, MappingParameters parameters) {
        if (this.initialTopologyFuture.isDone()) {
            return this.map0(multiStepPlan, parameters);
        }
        return this.initialTopologyFuture.thenComposeAsync(ignore -> this.map0(multiStepPlan, parameters), this.taskExecutor);
    }

    private CompletableFuture<List<MappedFragment>> map0(MultiStepPlan multiStepPlan, MappingParameters parameters) {
        FragmentsTemplate template = this.getOrCreateTemplate(multiStepPlan);
        boolean mapOnBackups = parameters.mapOnBackups();
        PartitionPruningMetadata partitionPruningMetadata = multiStepPlan.partitionPruningMetadata();
        LogicalTopologyHolder.TopologySnapshot topologySnapshot = this.topologyHolder.topology();
        CompletableFuture<MappedFragments> mappedFragments = this.mapFragments(template, mapOnBackups, MappingServiceImpl.composeNodeExclusionFilter(topologySnapshot, parameters));
        return mappedFragments.thenApply(frags -> this.applyPartitionPruning(frags.fragments, parameters, partitionPruningMetadata));
    }

    private MappingsCacheValue computeMappingCacheKey(MappingsCacheValue val, LogicalTopologyHolder.TopologySnapshot topologySnapshot, MappingParameters parameters, FragmentsTemplate template, boolean mapOnBackups) {
        if (val == null) {
            IntOpenHashSet tableOrZoneIds = new IntOpenHashSet();
            boolean topologyAware = false;
            for (Fragment fragment : template.fragments) {
                topologyAware = topologyAware || !fragment.systemViews().isEmpty();
                for (IgniteTable source : fragment.tables().values()) {
                    if (this.nodeProperties.colocationEnabled()) {
                        int zoneId = source.useSecondaryStorage() ? source.secondaryZoneId().intValue() : source.zoneId();
                        tableOrZoneIds.add(zoneId);
                        continue;
                    }
                    tableOrZoneIds.add(source.id());
                }
            }
            long topVer = topologyAware ? topologySnapshot.version() : Long.MAX_VALUE;
            return new MappingsCacheValue(topVer, (IntSet)tableOrZoneIds, this.mapFragments(template, mapOnBackups, MappingServiceImpl.composeNodeExclusionFilter(topologySnapshot, parameters)));
        }
        long topologyVer = topologySnapshot.version();
        if (val.topologyVersion < topologyVer) {
            return new MappingsCacheValue(topologyVer, val.tableOrZoneIds, this.mapFragments(template, mapOnBackups, MappingServiceImpl.composeNodeExclusionFilter(topologySnapshot, parameters)));
        }
        return val;
    }

    CompletableFuture<DistributionHolder> composeDistributions(Set<IgniteSystemView> views, Set<IgniteTable> tables, boolean mapOnBackups) {
        if (tables.isEmpty() && views.isEmpty()) {
            DistributionHolder holder = new DistributionHolder(Set.of(this.localNodeName), (Int2ObjectMap<List<TokenizedAssignments>>)Int2ObjectMaps.emptyMap(), (Int2ObjectMap<List<String>>)Int2ObjectMaps.emptyMap());
            return CompletableFuture.completedFuture(holder);
        }
        Int2ObjectOpenHashMap tablesAssignments = new Int2ObjectOpenHashMap(tables.size());
        HashSet<String> allNodes = new HashSet<String>();
        allNodes.add(this.localNodeName);
        for (IgniteTable tbl : tables) {
            CompletableFuture<List<TokenizedAssignments>> assignments = this.distributionProvider.forTable(this.clock.now(), tbl, mapOnBackups);
            tablesAssignments.put(tbl.id(), assignments);
        }
        return ((CompletableFuture)CompletableFuture.allOf((CompletableFuture[])tablesAssignments.values().toArray((Object[])new CompletableFuture[0])).thenApply(arg_0 -> MappingServiceImpl.lambda$composeDistributions$4(tables, (Int2ObjectMap)tablesAssignments, allNodes, arg_0))).thenApply(assignmentsPerTable -> {
            Int2ObjectMap<List> nodesPerView = views.stream().collect(CollectionUtils.toIntMapCollector(IgniteDataSource::id, this.distributionProvider::forSystemView));
            nodesPerView.values().stream().flatMap(Collection::stream).forEach(allNodes::add);
            return new DistributionHolder(allNodes, (Int2ObjectMap<List<TokenizedAssignments>>)assignmentsPerTable, nodesPerView);
        });
    }

    private CompletableFuture<MappedFragments> mapFragments(FragmentsTemplate template, boolean mapOnBackups, Predicate<String> nodeExclusionFilter) {
        Set<IgniteSystemView> views = template.fragments.stream().flatMap(fragment -> fragment.systemViews().stream()).collect(Collectors.toSet());
        Set<IgniteTable> tables = template.fragments.stream().flatMap(fragment -> fragment.tables().values().stream()).collect(Collectors.toSet());
        CompletableFuture<DistributionHolder> res = this.composeDistributions(views, tables, mapOnBackups);
        return res.thenApply(assignments -> {
            Int2ObjectOpenHashMap targetsById = new Int2ObjectOpenHashMap();
            MappingContext context = new MappingContext(this.localNodeName, assignments.nodes(nodeExclusionFilter), template.cluster);
            ExecutionTargetFactory targetFactory = context.targetFactory();
            List<IntObjectPair<ExecutionTarget>> allTargets = MappingServiceImpl.prepareTargets(template, assignments, targetFactory);
            for (IntObjectPair<ExecutionTarget> pair : allTargets) {
                targetsById.put(pair.firstInt(), (Object)((ExecutionTarget)pair.second()));
            }
            FragmentMapper mapper = new FragmentMapper(context.cluster().getMetadataQuery(), context, (Int2ObjectMap<ExecutionTarget>)targetsById);
            IdGenerator idGenerator = new IdGenerator(template.nextId);
            ArrayList<Fragment> fragments = new ArrayList<Fragment>(template.fragments);
            List<FragmentMapping> mappings = mapper.map(fragments, idGenerator);
            Long2ObjectOpenHashMap groupsBySourceId = new Long2ObjectOpenHashMap();
            Long2ObjectOpenHashMap allSourcesByExchangeId = new Long2ObjectOpenHashMap();
            for (FragmentMapping mapping : mappings) {
                Fragment fragment = mapping.fragment();
                for (ColocationGroup group : mapping.groups()) {
                    LongListIterator longListIterator = group.sourceIds().iterator();
                    while (longListIterator.hasNext()) {
                        long sourceId = (Long)longListIterator.next();
                        groupsBySourceId.put(sourceId, (Object)group);
                    }
                }
                if (fragment.rootFragment()) continue;
                IgniteSender sender = (IgniteSender)fragment.root();
                List nodeNames = mapping.groups().stream().flatMap(g -> g.nodeNames().stream()).distinct().collect(Collectors.toList());
                allSourcesByExchangeId.put(sender.exchangeId(), nodeNames);
            }
            ArrayList<MappedFragment> mappedFragmentsList = new ArrayList<MappedFragment>(mappings.size());
            HashSet<String> targetNodes = new HashSet<String>();
            for (FragmentMapping mapping : mappings) {
                Fragment fragment = mapping.fragment();
                ColocationGroup targetGroup = null;
                if (!fragment.rootFragment()) {
                    IgniteSender sender = (IgniteSender)fragment.root();
                    targetGroup = (ColocationGroup)groupsBySourceId.get(sender.exchangeId());
                }
                Long2ObjectOpenHashMap sourcesByExchangeId = null;
                for (IgniteReceiver receiver : fragment.remotes()) {
                    if (sourcesByExchangeId == null) {
                        sourcesByExchangeId = new Long2ObjectOpenHashMap();
                    }
                    long exchangeId = receiver.exchangeId();
                    sourcesByExchangeId.put(exchangeId, (Object)((List)allSourcesByExchangeId.get(exchangeId)));
                }
                MappedFragment mappedFragment = new MappedFragment(fragment, mapping.groups(), (Long2ObjectMap<List<String>>)sourcesByExchangeId, targetGroup, null);
                mappedFragmentsList.add(mappedFragment);
                targetNodes.addAll(mappedFragment.nodes());
            }
            return new MappedFragments(mappedFragmentsList, targetNodes);
        });
    }

    private static List<IntObjectPair<ExecutionTarget>> prepareTargets(FragmentsTemplate template, DistributionHolder distr, ExecutionTargetFactory targetFactory) {
        Stream tableTargets = template.fragments.stream().flatMap(fragment -> fragment.tables().values().stream().map(table -> IntObjectPair.of((int)table.id(), (Object)targetFactory.partitioned(distr.tableAssignments(table.id())))));
        Stream viewTargets = template.fragments.stream().flatMap(fragment -> fragment.systemViews().stream().map(view -> IntObjectPair.of((int)view.id(), (Object)MappingServiceImpl.buildTargetForSystemView(targetFactory, view, distr.viewNodes(view.id())))));
        return Stream.concat(tableTargets, viewTargets).collect(Collectors.toList());
    }

    private static ExecutionTarget buildTargetForSystemView(ExecutionTargetFactory factory, IgniteSystemView view, List<String> nodes) {
        if (CollectionUtils.nullOrEmpty(nodes)) {
            throw new SqlException(ErrorGroups.Sql.MAPPING_ERR, IgniteStringFormatter.format("The view with name '{}' could not be found on any active nodes in the cluster", view.name()));
        }
        return view.distribution() == IgniteDistributions.single() ? factory.oneOf(nodes) : factory.allOf(nodes);
    }

    @Override
    public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot newTopology) {
        this.topologyHolder.update(newTopology);
    }

    @Override
    public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) {
        this.topologyHolder.update(newTopology);
    }

    @Override
    public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
        this.topologyHolder.update(newTopology);
    }

    private List<MappedFragment> applyPartitionPruning(List<MappedFragment> mappedFragments, MappingParameters parameters, @Nullable PartitionPruningMetadata partitionPruningMetadata) {
        if (partitionPruningMetadata == null) {
            return mappedFragments;
        }
        return this.partitionPruner.apply(mappedFragments, parameters.dynamicParameters(), partitionPruningMetadata);
    }

    private FragmentsTemplate getOrCreateTemplate(MultiStepPlan plan) {
        return this.templatesCache.get(plan.id(), key -> {
            IdGenerator idGenerator = new IdGenerator(plan.numSources());
            RelOptCluster cluster = Commons.cluster();
            List<Fragment> fragments = new QuerySplitter(idGenerator, cluster).split(plan.getRel());
            return new FragmentsTemplate(idGenerator.nextId(), cluster, fragments);
        });
    }

    private static Predicate<String> composeNodeExclusionFilter(LogicalTopologyHolder.TopologySnapshot topologySnapshot, MappingParameters parameters) {
        Predicate<String> filterFromParameters = parameters.nodeExclusionFilter();
        Predicate<String> deadNodesFilter = node -> !topologySnapshot.nodes.contains(node);
        if (filterFromParameters == null) {
            return deadNodesFilter;
        }
        return filterFromParameters.or(deadNodesFilter);
    }

    private static /* synthetic */ Int2ObjectMap lambda$composeDistributions$4(Set tables, Int2ObjectMap tablesAssignments, Set allNodes, Void ignore) {
        Int2ObjectOpenHashMap assignmentsPerTable = new Int2ObjectOpenHashMap(tables.size());
        tablesAssignments.keySet().forEach(arg_0 -> MappingServiceImpl.lambda$composeDistributions$3(tablesAssignments, allNodes, (Int2ObjectMap)assignmentsPerTable, arg_0));
        return assignmentsPerTable;
    }

    private static /* synthetic */ void lambda$composeDistributions$3(Int2ObjectMap tablesAssignments, Set allNodes, Int2ObjectMap assignmentsPerTable, int k) {
        List assignments = (List)((CompletableFuture)tablesAssignments.get(k)).join();
        assignments.stream().flatMap(i -> i.nodes().stream()).map(Assignment::consistentId).forEach(allNodes::add);
        assignmentsPerTable.put(k, (Object)assignments);
    }

    class LogicalTopologyHolder {
        private volatile TopologySnapshot topology = new TopologySnapshot(Long.MIN_VALUE, Set.of());

        LogicalTopologyHolder() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void update(LogicalTopologySnapshot topologySnapshot) {
            LogicalTopologyHolder logicalTopologyHolder = this;
            synchronized (logicalTopologyHolder) {
                if (this.topology.version() < topologySnapshot.version()) {
                    this.topology = new TopologySnapshot(topologySnapshot.version(), this.deriveNodeNames(topologySnapshot));
                }
                if (MappingServiceImpl.this.initialTopologyFuture.isDone() || !this.topology.nodes().contains(MappingServiceImpl.this.localNodeName)) {
                    return;
                }
            }
            MappingServiceImpl.this.initialTopologyFuture.complete(null);
        }

        TopologySnapshot topology() {
            return this.topology;
        }

        private Set<String> deriveNodeNames(LogicalTopologySnapshot topology) {
            return topology.nodes().stream().map(ClusterNodeImpl::name).collect(Collectors.toUnmodifiableSet());
        }

        class TopologySnapshot {
            private final Set<String> nodes;
            private final long version;

            TopologySnapshot(long version, Set<String> nodes) {
                this.version = version;
                this.nodes = nodes;
            }

            public Set<String> nodes() {
                return this.nodes;
            }

            public long version() {
                return this.version;
            }
        }
    }

    private static class FragmentsTemplate {
        private final long nextId;
        private final RelOptCluster cluster;
        private final List<Fragment> fragments;

        FragmentsTemplate(long nextId, RelOptCluster cluster, List<Fragment> fragments) {
            this.nextId = nextId;
            this.cluster = cluster;
            this.fragments = fragments;
        }
    }

    private static class MappingsCacheValue {
        private final long topologyVersion;
        private final IntSet tableOrZoneIds;

        MappingsCacheValue(long topologyVersion, IntSet tableOrZoneIds, CompletableFuture<MappedFragments> mappedFragments) {
            this.topologyVersion = topologyVersion;
            this.tableOrZoneIds = tableOrZoneIds;
        }
    }

    private static class DistributionHolder {
        private final Set<String> nodes;
        private final Int2ObjectMap<List<TokenizedAssignments>> assignmentsPerTable;
        private final Int2ObjectMap<List<String>> nodesPerView;

        DistributionHolder(Set<String> nodes, Int2ObjectMap<List<TokenizedAssignments>> assignmentsPerTable, Int2ObjectMap<List<String>> nodesPerView) {
            this.nodes = nodes;
            this.assignmentsPerTable = assignmentsPerTable;
            this.nodesPerView = nodesPerView;
        }

        List<String> nodes(@Nullable Predicate<String> nodeExclusionFilter) {
            if (nodeExclusionFilter == null) {
                return List.copyOf(this.nodes);
            }
            return this.nodes.stream().filter(nodeExclusionFilter.negate()).collect(Collectors.toList());
        }

        List<TokenizedAssignments> tableAssignments(int tableId) {
            return (List)this.assignmentsPerTable.get(tableId);
        }

        List<String> viewNodes(int viewId) {
            return (List)this.nodesPerView.get(viewId);
        }
    }

    private static class MappedFragments {
        final List<MappedFragment> fragments;
        final Set<String> nodes;

        MappedFragments(List<MappedFragment> fragments, Set<String> nodes) {
            this.fragments = fragments;
            this.nodes = nodes;
        }
    }

    private static class MappingsCacheKey {
        private final PlanId planId;
        private final boolean mapOnBackups;

        MappingsCacheKey(PlanId planId, boolean mapOnBackups) {
            this.planId = planId;
            this.mapOnBackups = mapOnBackups;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            MappingsCacheKey that = (MappingsCacheKey)o;
            return this.mapOnBackups == that.mapOnBackups && Objects.equals(this.planId, that.planId);
        }

        public int hashCode() {
            return Objects.hash(this.planId, this.mapOnBackups);
        }
    }
}

