/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.client.table;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.ignite3.client.IgniteClientConnectionException;
import org.apache.ignite3.internal.client.proto.ProtocolBitmaskFeature;
import org.apache.ignite3.internal.client.table.MapFunction;
import org.apache.ignite3.internal.client.table.PartitionAwarenessProvider;
import org.apache.ignite3.internal.client.tx.ClientLazyTransaction;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.tx.Transaction;
import org.jetbrains.annotations.NotNull;

/**
 * Helpers that apply a client table operation to a collection of batches, combine the per-batch
 * results, and retry the whole mapping when it failed only because of lock acquisition errors.
 */
class ClientTableMapUtils {
    /** Retry window for a failed mapping attempt: 5000 ms expressed in nanoseconds. */
    private static final long DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(5000L);

    ClientTableMapUtils() {
    }

    /**
     * Applies {@code mapFun} to every batch of {@code mapped} and, once all per-batch futures are done,
     * completes {@code resFut}.
     *
     * <p>On success the transactions in {@code txns} (if any) are committed, the per-batch futures are
     * reduced with {@code reduceClo}, and {@code resFut} is completed with the reduced value (after the
     * commits that have to be awaited, see {@link #unlockFragments}).
     *
     * <p>On failure with a non-empty {@code txns} all transactions are rolled back. If every failed batch
     * failed with {@code ACQUIRE_LOCK_ERR} and the retry window has not elapsed, the whole mapping is
     * attempted again; otherwise {@code resFut} is completed exceptionally. With an empty {@code txns}
     * a failure is reported to {@code resFut} right away.
     *
     * <p>This method never adds to {@code txns}; the list is presumably filled by {@code mapFun} while
     * the batches are mapped - confirm against callers.
     *
     * @param mapFun Function that starts the operation for one batch.
     * @param txns Transactions associated with the current attempt; cleared before a retry.
     * @param mapped Batches to process.
     * @param startTs Single-element holder of the attempt start time ({@link System#nanoTime()});
     *     {@code 0} means "not started yet".
     * @param resFut Future that receives the final outcome.
     * @param log Logger used to report rollback and commit failures.
     * @param reduceClo Combines the completed per-batch futures into the final result.
     * @param keyProvider Extracts the keys of a batch.
     * @param partProvider Extracts the value handed to {@link PartitionAwarenessProvider#of} for a batch.
     * @param <K> Key type.
     * @param <R> Result type.
     * @param <M> Batch type.
     */
    static <K, R, M> void mapAndRetry(MapFunction<K, R> mapFun, List<Transaction> txns, Collection<M> mapped, long[] startTs, CompletableFuture<R> resFut, IgniteLogger log, Function<List<CompletableFuture<R>>, R> reduceClo, Function<M, Collection<K>> keyProvider, Function<M, Integer> partProvider) {
        if (startTs[0] == 0L) {
            startTs[0] = System.nanoTime();
        }

        List<CompletableFuture<R>> res = new ArrayList<>(mapped.size());
        // The third argument tells mapFun whether the operation is split into more than one batch.
        boolean multipleBatches = mapped.size() > 1;
        for (M batch : mapped) {
            res.add(mapFun.apply(keyProvider.apply(batch), PartitionAwarenessProvider.of(partProvider.apply(batch)), multipleBatches));
        }

        CompletableFutures.allOf(res).handle((ignored, err) -> {
            try {
                if (err != null) {
                    if (!txns.isEmpty()) {
                        boolean needRetry = unlockOnRetry(txns, res, log);
                        long nowRelative = System.nanoTime();
                        if (needRetry && nowRelative - startTs[0] < DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS) {
                            // NOTE(review): the start time is reset on every retry, so the limit bounds each
                            // attempt rather than the whole operation - confirm this is intended.
                            startTs[0] = nowRelative;
                            txns.clear();
                            mapAndRetry(mapFun, txns, mapped, startTs, resFut, log, reduceClo, keyProvider, partProvider);
                            return null;
                        }
                    }
                    resFut.completeExceptionally(err);
                    return null;
                }

                List<CompletableFuture<Void>> waitCommitFuts;
                if (txns.isEmpty()) {
                    waitCommitFuts = List.of();
                } else {
                    waitCommitFuts = unlockFragments(txns, log);
                }

                R in = reduceClo.apply(res);
                if (waitCommitFuts.isEmpty()) {
                    resFut.complete(in);
                } else {
                    // Commit failures are already handled inside unlockFragments, so only completion matters.
                    CompletableFutures.allOf(waitCommitFuts).whenComplete((r, e) -> resFut.complete(in));
                }
            } catch (Throwable t) {
                // The future returned by handle() is discarded, so anything thrown here would be lost
                // and resFut would never complete. Report the failure to the caller instead.
                resFut.completeExceptionally(t);
            }
            return null;
        });
    }

    /**
     * Rolls back every transaction of a failed attempt and tells whether the attempt may be retried.
     *
     * <p>Rollback failures other than {@link IgniteClientConnectionException} are logged; none of them
     * is propagated.
     *
     * @param txns Transactions to roll back.
     * @param res Per-batch futures of the failed attempt; all are expected to be completed.
     * @param log Logger for rollback failures.
     * @param <R> Result type of a batch.
     * @return {@code true} if every exceptionally completed future failed with {@code ACQUIRE_LOCK_ERR}.
     */
    private static <R> boolean unlockOnRetry(List<Transaction> txns, List<CompletableFuture<R>> res, IgniteLogger log) {
        boolean allRetryableExceptions = true;
        for (CompletableFuture<R> fut : res) {
            if (!fut.isCompletedExceptionally()) {
                continue;
            }
            try {
                fut.join();
            } catch (CancellationException cancelled) {
                // join() reports cancellation with CancellationException, not CompletionException;
                // a cancelled batch is not a lock conflict and must not trigger a retry.
                allRetryableExceptions = false;
            } catch (CompletionException failure) {
                allRetryableExceptions = allRetryableExceptions && ExceptionUtils.matchAny(ExceptionUtils.unwrapCause(failure), ErrorGroups.Transactions.ACQUIRE_LOCK_ERR, new int[0]);
            }
        }

        // Roll back independently of the futures so that a size mismatch between the two lists can
        // neither throw nor leave a transaction behind.
        for (Transaction tx : txns) {
            tx.rollbackAsync().exceptionally(e -> {
                Throwable cause = ExceptionUtils.unwrapCause(e);
                if (!(cause instanceof IgniteClientConnectionException)) {
                    log.error("Failed to rollback a transactional batch: [tx=" + tx + "]", cause);
                }
                return null;
            });
        }
        return allRetryableExceptions;
    }

    /**
     * Commits every transaction of a successful attempt.
     *
     * <p>Commit failures other than {@link IgniteClientConnectionException} are logged; the returned
     * futures never complete exceptionally because failures are mapped to {@code null}.
     *
     * @param txns Transactions to commit; each element must be a {@link ClientLazyTransaction}.
     * @param log Logger for commit failures.
     * @return Commit futures of the transactions whose channel does not support
     *     {@link ProtocolBitmaskFeature#TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS}; commits on channels that
     *     support the feature are started but not included.
     */
    @NotNull
    private static List<CompletableFuture<Void>> unlockFragments(List<Transaction> txns, IgniteLogger log) {
        List<CompletableFuture<Void>> waitCommitFuts = new ArrayList<>();
        for (Transaction txn : txns) {
            ClientLazyTransaction lazyTx = (ClientLazyTransaction) txn;
            CompletableFuture<Void> commitFut = lazyTx.commitAsync().exceptionally(e -> {
                Throwable cause = ExceptionUtils.unwrapCause(e);
                if (!(cause instanceof IgniteClientConnectionException)) {
                    log.error("Failed to commit a transactional batch: [tx=" + lazyTx + "]", cause);
                }
                return null;
            });

            boolean txOptionsSupported = lazyTx.startedTx().channel().protocolContext().isFeatureSupported(ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS);
            if (!txOptionsSupported) {
                waitCommitFuts.add(commitFut);
            }
        }
        return waitCommitFuts;
    }

    /**
     * Writes the elements of {@code cur} into {@code agg} at the positions given by {@code originalIndices}:
     * element {@code i} of {@code cur} replaces the element of {@code agg} at index {@code originalIndices.get(i)}.
     *
     * @param agg Target list; must already contain an element at every referenced index.
     * @param cur Values to place.
     * @param originalIndices Target index in {@code agg} for each element of {@code cur}.
     * @param <E> Element type.
     */
    static <E> void reduceWithKeepOrder(List<E> agg, List<E> cur, List<Integer> originalIndices) {
        for (int i = 0; i < cur.size(); ++i) {
            int target = originalIndices.get(i);
            agg.set(target, cur.get(i));
        }
    }
}

