/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.sql.engine.statistic;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.MessagingService;
import org.apache.ignite3.internal.network.NetworkMessage;
import org.apache.ignite3.internal.sql.engine.statistic.PartitionModificationInfo;
import org.apache.ignite3.internal.sql.engine.statistic.StatisticAggregator;
import org.apache.ignite3.internal.table.InternalTable;
import org.apache.ignite3.internal.table.message.GetEstimatedSizeWithLastModifiedTsRequest;
import org.apache.ignite3.internal.table.message.GetEstimatedSizeWithLastModifiedTsResponse;
import org.apache.ignite3.internal.table.message.PartitionModificationInfoMessage;
import org.apache.ignite3.internal.table.message.TableMessageGroup;
import org.apache.ignite3.internal.table.message.TableMessagesFactory;
import org.apache.ignite3.internal.tostring.S;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.jetbrains.annotations.Nullable;

/**
 * Aggregates per-partition size estimations and modification counters into per-table statistics.
 *
 * <p>A request is sent to every node returned by the {@code clusterNodes} supplier; responses arrive
 * asynchronously through the message handler registered in the constructor. Only one aggregation round
 * is in flight at a time: a call made while a round is running returns an empty map immediately.
 */
public class StatisticAggregatorImpl
implements StatisticAggregator<Collection<InternalTable>, CompletableFuture<Int2ObjectMap<PartitionModificationInfo>>> {
    private static final IgniteLogger LOG = Loggers.forClass(StatisticAggregatorImpl.class);
    private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory();
    private static final long REQUEST_ESTIMATION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(5L);

    private final Supplier<Set<LogicalNode>> clusterNodes;
    private final MessagingService messagingService;

    /**
     * Per-partition response futures of the round currently in flight, or {@code null} when idle.
     * The map is fully populated before it is published here and is never structurally modified afterwards.
     */
    private final AtomicReference<@Nullable Map<TablePartitionIdentifier, CompletableFuture<PartitionModificationInfo>>> requestsCompletion =
            new AtomicReference<>();

    /**
     * Creates the aggregator and registers its handler for {@link TableMessageGroup} messages.
     *
     * @param clusterNodes Supplier of the nodes the estimation request is sent to.
     * @param messagingService Service used both to send requests and to receive responses.
     */
    public StatisticAggregatorImpl(Supplier<Set<LogicalNode>> clusterNodes, MessagingService messagingService) {
        this.clusterNodes = clusterNodes;
        this.messagingService = messagingService;
        messagingService.addMessageHandler(TableMessageGroup.class, this::handleMessage);
    }

    /**
     * Records the partition data carried by a {@link GetEstimatedSizeWithLastModifiedTsResponse}.
     * Any other message, and any message received while no round is in flight, is ignored.
     *
     * <p>When several responses carry data for the same partition, the one with the highest
     * modification counter wins.
     */
    private void handleMessage(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) {
        Map<TablePartitionIdentifier, CompletableFuture<PartitionModificationInfo>> pendingRequests = this.requestsCompletion.get();
        if (pendingRequests == null || !(message instanceof GetEstimatedSizeWithLastModifiedTsResponse)) {
            return;
        }

        GetEstimatedSizeWithLastModifiedTsResponse response = (GetEstimatedSizeWithLastModifiedTsResponse) message;
        for (PartitionModificationInfoMessage ent : response.modifications()) {
            CompletableFuture<PartitionModificationInfo> responseFut =
                    pendingRequests.get(new TablePartitionIdentifier(ent.tableId(), ent.partId()));
            if (responseFut == null) {
                // The partition was not requested in the current round.
                continue;
            }

            PartitionModificationInfo received = new PartitionModificationInfo(ent.estimatedSize(), ent.lastModificationCounter());
            synchronized (this) {
                if (!CompletableFutures.isCompletedSuccessfully(responseFut)) {
                    // First response for this partition; a no-op if the future has already timed out.
                    responseFut.complete(received);
                } else if (received.lastModificationCounter() > responseFut.join().lastModificationCounter()) {
                    // complete() cannot replace an existing result, so the fresher value is forced in;
                    // the aggregation step reads the value with join() and therefore observes it.
                    responseFut.obtrudeValue(received);
                }
            }
        }
    }

    /**
     * Requests size estimations for all partitions of the given tables and sums them up per table.
     *
     * @param tables Tables to collect statistics for.
     * @return Future with a map from table id to the aggregated info. A table is absent from the map when
     *         not all of its partitions answered successfully. The map is empty when another round is
     *         already in flight or when the aggregation chain itself failed.
     */
    @Override
    public CompletableFuture<Int2ObjectMap<PartitionModificationInfo>> estimatedSizeWithLastUpdate(Collection<InternalTable> tables) {
        if (this.requestsCompletion.get() != null) {
            return CompletableFuture.completedFuture(Int2ObjectMaps.emptyMap());
        }

        Collection<Integer> tablesId = tables.stream().map(InternalTable::tableId).collect(Collectors.toList());
        GetEstimatedSizeWithLastModifiedTsRequest request =
                TABLE_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest().tables(tablesId).build();

        // Populate the map completely BEFORE publishing it: the message handler reads it from other threads.
        Map<TablePartitionIdentifier, CompletableFuture<PartitionModificationInfo>> partIdRequests = new HashMap<>();
        for (InternalTable table : tables) {
            for (int p = 0; p < table.partitions(); ++p) {
                partIdRequests.put(new TablePartitionIdentifier(table.tableId(), p), new CompletableFuture<>());
            }
        }

        // Atomically claim the single in-flight slot; a concurrent caller may have won the race.
        if (!this.requestsCompletion.compareAndSet(null, partIdRequests)) {
            return CompletableFuture.completedFuture(Int2ObjectMaps.emptyMap());
        }

        try {
            return requestAndAggregate(tables, request, partIdRequests);
        } catch (RuntimeException | Error e) {
            // Do not leave the slot occupied forever if the round could not even be started.
            this.requestsCompletion.compareAndSet(partIdRequests, null);
            throw e;
        }
    }

    /** Arms the response timeouts, sends the request to every node and builds the aggregation chain. */
    private CompletableFuture<Int2ObjectMap<PartitionModificationInfo>> requestAndAggregate(
            Collection<InternalTable> tables,
            GetEstimatedSizeWithLastModifiedTsRequest request,
            Map<TablePartitionIdentifier, CompletableFuture<PartitionModificationInfo>> partIdRequests
    ) {
        for (CompletableFuture<PartitionModificationInfo> responseFut : partIdRequests.values()) {
            responseFut.orTimeout(REQUEST_ESTIMATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        }

        Collection<CompletableFuture<Void>> reqFutures = new ArrayList<>();
        for (LogicalNode node : this.clusterNodes.get()) {
            CompletableFuture<Void> reqFut = this.messagingService.send(node, request);
            reqFutures.add(reqFut.orTimeout(REQUEST_ESTIMATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
        }

        CompletableFuture<Void> allRequests = CompletableFuture.allOf(reqFutures.toArray(CompletableFuture[]::new));
        Int2ObjectMap<PartitionModificationInfo> summary = new Int2ObjectOpenHashMap<>();

        // Tables are processed one after another on the same chain, so 'summary' is never updated concurrently.
        for (InternalTable table : tables) {
            Map<TablePartitionIdentifier, CompletableFuture<PartitionModificationInfo>> tableResponses = new HashMap<>();
            for (Map.Entry<TablePartitionIdentifier, CompletableFuture<PartitionModificationInfo>> ent : partIdRequests.entrySet()) {
                if (ent.getKey().tableId() == table.tableId()) {
                    tableResponses.put(ent.getKey(), ent.getValue());
                }
            }

            CompletableFuture<?>[] responses = tableResponses.values().toArray(CompletableFuture[]::new);
            allRequests = allRequests
                    .thenCompose(r -> CompletableFuture.allOf(responses))
                    .handle((ret, ex) -> {
                        collectTableStatistics(table, tableResponses, summary, ex);
                        return null;
                    });
        }

        return allRequests.handle((ret, ex) -> {
            // Release the slot on success AND on failure, otherwise one failed round disables the aggregator.
            this.requestsCompletion.compareAndSet(partIdRequests, null);
            if (ex != null) {
                LOG.debug("Exception during tables size estimation.", ex);
                return Int2ObjectMaps.emptyMap();
            }
            return summary;
        });
    }

    /**
     * Adds the statistics of one table to {@code summary}, provided every partition of the table answered
     * successfully; otherwise the table is skipped and the failed partitions are logged at debug level.
     *
     * @param table Table being processed.
     * @param tableResponses Response futures of the table's partitions.
     * @param summary Accumulator keyed by table id.
     * @param ex Failure of the preceding stage, or {@code null}.
     */
    private static void collectTableStatistics(
            InternalTable table,
            Map<TablePartitionIdentifier, CompletableFuture<PartitionModificationInfo>> tableResponses,
            Int2ObjectMap<PartitionModificationInfo> summary,
            @Nullable Throwable ex
    ) {
        if (ex != null) {
            LOG.debug("Can`t update statistics for table [id={}].", ex, table.tableId());
        }

        boolean allReceived = true;
        for (CompletableFuture<PartitionModificationInfo> responseFut : tableResponses.values()) {
            if (!CompletableFutures.isCompletedSuccessfully(responseFut)) {
                allReceived = false;
                break;
            }
        }

        if (!allReceived) {
            if (LOG.isDebugEnabled()) {
                for (Map.Entry<TablePartitionIdentifier, CompletableFuture<PartitionModificationInfo>> ent : tableResponses.entrySet()) {
                    // Report the partitions that did NOT answer successfully.
                    if (!CompletableFutures.isCompletedSuccessfully(ent.getValue())) {
                        LOG.debug("Can`t update statistics for table partition [id={}].", ent.getKey());
                    }
                }
            }
            return;
        }

        for (Map.Entry<TablePartitionIdentifier, CompletableFuture<PartitionModificationInfo>> ent : tableResponses.entrySet()) {
            PartitionModificationInfo info = ent.getValue().join();
            long estSize = info.getEstimatedSize();
            long modificationCounter = info.lastModificationCounter();
            // Sizes are summed over partitions; the table-level counter is the maximum over partitions.
            summary.compute(ent.getKey().tableId(), (k, v) -> v == null
                    ? new PartitionModificationInfo(estSize, modificationCounter)
                    : new PartitionModificationInfo(v.getEstimatedSize() + estSize, Math.max(v.lastModificationCounter(), modificationCounter)));
        }
    }

    /** Map key identifying a single partition of a single table. */
    private static class TablePartitionIdentifier {
        final int tableId;
        final int partitionId;

        TablePartitionIdentifier(int tableId, int partitionId) {
            this.tableId = tableId;
            this.partitionId = partitionId;
        }

        int tableId() {
            return this.tableId;
        }

        int partitionId() {
            return this.partitionId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TablePartitionIdentifier that = (TablePartitionIdentifier)o;
            return this.tableId == that.tableId && this.partitionId == that.partitionId;
        }

        @Override
        public int hashCode() {
            int hash = 31 + this.partitionId;
            hash += hash * 31 + this.tableId;
            return hash;
        }

        @Override
        public String toString() {
            return S.toString(TablePartitionIdentifier.class, this);
        }
    }
}

