/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.client.compute;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.compute.AllNodesBroadcastJobTarget;
import org.apache.ignite.compute.AnyNodeJobTarget;
import org.apache.ignite.compute.BroadcastExecution;
import org.apache.ignite.compute.BroadcastJobTarget;
import org.apache.ignite.compute.ColocatedJobTarget;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.compute.TableJobTarget;
import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.PayloadOutputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
import org.apache.ignite.internal.client.TcpIgniteClient;
import org.apache.ignite.internal.client.compute.ClientJobExecution;
import org.apache.ignite.internal.client.compute.ClientTaskExecution;
import org.apache.ignite.internal.client.compute.SubmitResult;
import org.apache.ignite.internal.client.compute.SubmitTaskResult;
import org.apache.ignite.internal.client.proto.ClientComputeJobPacker;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
import org.apache.ignite.internal.client.proto.TuplePart;
import org.apache.ignite.internal.client.table.ClientRecordSerializer;
import org.apache.ignite.internal.client.table.ClientSchema;
import org.apache.ignite.internal.client.table.ClientTable;
import org.apache.ignite.internal.client.table.ClientTables;
import org.apache.ignite.internal.client.table.ClientTupleSerializer;
import org.apache.ignite.internal.client.table.PartitionAwarenessProvider;
import org.apache.ignite.internal.compute.BroadcastJobExecutionImpl;
import org.apache.ignite.internal.compute.FailedExecution;
import org.apache.ignite.internal.table.partition.HashPartition;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.ViewUtils;
import org.apache.ignite.lang.CancelHandleHelper;
import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.table.partition.Partition;
import org.jetbrains.annotations.Nullable;

public class ClientCompute
implements IgniteCompute {
    private final ReliableChannel ch;
    private final ClientTables tables;
    private final ConcurrentHashMap<QualifiedName, ClientTable> tableCache = new ConcurrentHashMap();

    public ClientCompute(ReliableChannel ch, ClientTables tables) {
        this.ch = ch;
        this.tables = tables;
    }

    @Override
    public <T, R> CompletableFuture<JobExecution<R>> submitAsync(JobTarget target, JobDescriptor<T, R> descriptor, @Nullable T arg, @Nullable CancellationToken cancellationToken) {
        Objects.requireNonNull(target);
        Objects.requireNonNull(descriptor);
        return this.submit0(target, descriptor, arg).thenApply(submitResult -> {
            ClientJobExecution execution = new ClientJobExecution(this.ch, (SubmitResult)submitResult, descriptor.resultMarshaller(), descriptor.resultClass());
            if (cancellationToken != null) {
                CancelHandleHelper.addCancelAction(cancellationToken, execution::cancelAsync, execution.resultAsync());
            }
            return execution;
        });
    }

    @Override
    public <T, R> CompletableFuture<BroadcastExecution<R>> submitAsync(BroadcastJobTarget target, JobDescriptor<T, R> descriptor, @Nullable T arg, @Nullable CancellationToken cancellationToken) {
        Objects.requireNonNull(target);
        Objects.requireNonNull(descriptor);
        UUID taskId = UUID.randomUUID();
        if (target instanceof AllNodesBroadcastJobTarget) {
            AllNodesBroadcastJobTarget allNodesBroadcastTarget = (AllNodesBroadcastJobTarget)target;
            Set<ClusterNode> nodes = allNodesBroadcastTarget.nodes();
            CompletableFuture[] futures = (CompletableFuture[])nodes.stream().map(node -> this.executeOnAnyNodeAsync(Set.of(node), descriptor, taskId, arg)).toArray(CompletableFuture[]::new);
            return this.mapSubmitFutures(futures, descriptor, cancellationToken);
        }
        if (target instanceof TableJobTarget) {
            TableJobTarget tableJobTarget = (TableJobTarget)target;
            QualifiedName tableName = tableJobTarget.tableName();
            return ((CompletableFuture)this.getTable(tableName).thenCompose(table -> table.partitionManager().primaryReplicasAsync())).thenCompose(replicas -> {
                CompletableFuture[] futures = (CompletableFuture[])replicas.keySet().stream().map(partition -> this.doExecutePartitionedAsync(tableName, (Partition)partition, descriptor, taskId, arg)).toArray(CompletableFuture[]::new);
                return this.mapSubmitFutures(futures, descriptor, cancellationToken);
            });
        }
        throw new IllegalArgumentException("Unsupported job target: " + target);
    }

    private <T, R> CompletableFuture<BroadcastExecution<R>> mapSubmitFutures(CompletableFuture<SubmitResult>[] futures, JobDescriptor<T, R> descriptor, @Nullable CancellationToken cancellationToken) {
        return CompletableFuture.allOf(futures).handle((unused, throwable) -> new BroadcastJobExecutionImpl(Arrays.stream(futures).map(fut -> this.mapSubmitResult(descriptor, cancellationToken, (CompletableFuture<SubmitResult>)fut)).collect(Collectors.toList())));
    }

    private <T, R> JobExecution<R> mapSubmitResult(JobDescriptor<T, R> descriptor, @Nullable CancellationToken cancellationToken, CompletableFuture<SubmitResult> submitResultFut) {
        SubmitResult submitResult;
        try {
            submitResult = submitResultFut.join();
        }
        catch (Exception e) {
            return new FailedExecution(ExceptionUtils.unwrapCause(e));
        }
        ClientJobExecution<R> execution = new ClientJobExecution<R>(this.ch, submitResult, descriptor.resultMarshaller(), descriptor.resultClass());
        if (cancellationToken != null) {
            CancelHandleHelper.addCancelAction(cancellationToken, execution::cancelAsync, execution.resultAsync());
        }
        return execution;
    }

    private <T, R> CompletableFuture<SubmitResult> submit0(JobTarget target, JobDescriptor<T, R> descriptor, T arg) {
        if (target instanceof AnyNodeJobTarget) {
            AnyNodeJobTarget anyNodeJobTarget = (AnyNodeJobTarget)target;
            return this.executeOnAnyNodeAsync(anyNodeJobTarget.nodes(), descriptor, null, arg);
        }
        if (target instanceof ColocatedJobTarget) {
            ColocatedJobTarget colocatedTarget = (ColocatedJobTarget)target;
            Mapper<?> mapper = colocatedTarget.keyMapper();
            QualifiedName qualifiedName = colocatedTarget.tableName();
            if (mapper != null) {
                return this.doExecuteColocatedAsync(qualifiedName, colocatedTarget.key(), mapper, descriptor, arg);
            }
            return this.doExecuteColocatedAsync(qualifiedName, (Tuple)colocatedTarget.key(), descriptor, arg);
        }
        throw new IllegalArgumentException("Unsupported job target: " + target);
    }

    @Override
    public <T, R> R execute(JobTarget target, JobDescriptor<T, R> descriptor, @Nullable T arg, @Nullable CancellationToken cancellationToken) {
        return ClientCompute.sync(this.executeAsync(target, descriptor, arg, cancellationToken));
    }

    @Override
    public <T, R> Collection<R> execute(BroadcastJobTarget target, JobDescriptor<T, R> descriptor, @Nullable T arg, @Nullable CancellationToken cancellationToken) {
        return ClientCompute.sync(this.executeAsync(target, descriptor, arg, cancellationToken));
    }

    private <T, R> CompletableFuture<SubmitResult> doExecuteColocatedAsync(QualifiedName tableName, Tuple key, JobDescriptor<T, R> descriptor, T arg) {
        return ((CompletableFuture)((CompletableFuture)this.getTable(tableName).thenCompose(table -> ClientCompute.executeColocatedTupleKey(table, key, descriptor, arg))).handle((res, err) -> this.handleMissingTable(tableName, (SubmitResult)res, (Throwable)err, () -> this.doExecuteColocatedAsync(tableName, key, descriptor, arg)))).thenCompose(Function.identity());
    }

    private <K, T, R> CompletableFuture<SubmitResult> doExecuteColocatedAsync(QualifiedName tableName, K key, Mapper<K> keyMapper, JobDescriptor<T, R> descriptor, T arg) {
        return ((CompletableFuture)((CompletableFuture)this.getTable(tableName).thenCompose(table -> ClientCompute.executeColocatedObjectKey(table, key, keyMapper, descriptor, arg))).handle((res, err) -> this.handleMissingTable(tableName, (SubmitResult)res, (Throwable)err, () -> this.doExecuteColocatedAsync(tableName, key, keyMapper, descriptor, arg)))).thenCompose(Function.identity());
    }

    @Override
    public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> taskDescriptor, @Nullable T arg, @Nullable CancellationToken cancellationToken) {
        Objects.requireNonNull(taskDescriptor);
        ClientTaskExecution<R> clientExecution = new ClientTaskExecution<R>(this.ch, this.doExecuteMapReduceAsync(taskDescriptor, arg), taskDescriptor.reduceJobResultMarshaller(), taskDescriptor.reduceJobResultClass());
        if (cancellationToken != null) {
            CancelHandleHelper.addCancelAction(cancellationToken, clientExecution::cancelAsync, clientExecution.resultAsync());
        }
        return clientExecution;
    }

    @Override
    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, @Nullable T arg, @Nullable CancellationToken cancellationToken) {
        return ClientCompute.sync(this.executeMapReduceAsync(taskDescriptor, arg, cancellationToken));
    }

    private <T, R> CompletableFuture<SubmitTaskResult> doExecuteMapReduceAsync(TaskDescriptor<T, R> taskDescriptor, @Nullable T arg) {
        return this.ch.serviceAsync(64, w -> ClientCompute.packTask(w.out(), taskDescriptor, arg), ClientCompute::unpackSubmitTaskResult, (String)null, null, true);
    }

    private <T, R> CompletableFuture<SubmitResult> executeOnAnyNodeAsync(Set<ClusterNode> nodes, JobDescriptor<T, R> descriptor, @Nullable UUID taskId, @Nullable T arg) {
        ClusterNode node = ClientCompute.randomNode(nodes);
        return this.ch.serviceAsync(47, w -> {
            ClientCompute.packNodeNames(w.out(), nodes);
            ClientCompute.packJob(w, descriptor, arg);
            ClientCompute.packTaskId(w, taskId);
        }, ClientCompute::unpackSubmitResult, node.name(), null, true);
    }

    private static ClusterNode randomNode(Set<ClusterNode> nodes) {
        if (nodes.size() == 1) {
            return nodes.iterator().next();
        }
        int nodesToSkip = ThreadLocalRandom.current().nextInt(nodes.size());
        Iterator<ClusterNode> iterator = nodes.iterator();
        for (int i = 0; i < nodesToSkip; ++i) {
            iterator.next();
        }
        return iterator.next();
    }

    private static <K, T, R> CompletableFuture<SubmitResult> executeColocatedObjectKey(ClientTable t, K key, Mapper<K> keyMapper, JobDescriptor<T, R> descriptor, T arg) {
        return ClientCompute.executeColocatedInternal(t, (outputChannel, schema) -> ClientRecordSerializer.writeRecRaw(key, keyMapper, schema, outputChannel.out(), TuplePart.KEY, true), ClientTupleSerializer.getPartitionAwarenessProvider(keyMapper, key), descriptor, arg);
    }

    private static <T, R> CompletableFuture<SubmitResult> executeColocatedTupleKey(ClientTable t, Tuple key, JobDescriptor<T, R> descriptor, T arg) {
        return ClientCompute.executeColocatedInternal(t, (outputChannel, schema) -> ClientTupleSerializer.writeTupleRaw(key, schema, outputChannel, true), ClientTupleSerializer.getPartitionAwarenessProvider(key), descriptor, arg);
    }

    private static <T, R> CompletableFuture<SubmitResult> executeColocatedInternal(ClientTable t, BiConsumer<PayloadOutputChannel, ClientSchema> keyWriter, PartitionAwarenessProvider partitionAwarenessProvider, JobDescriptor<T, R> descriptor, T arg) {
        return t.doSchemaOutOpAsync(49, (schema, outputChannel, unused) -> {
            ClientMessagePacker w = outputChannel.out();
            w.packInt(t.tableId());
            w.packInt(schema.version());
            keyWriter.accept((PayloadOutputChannel)outputChannel, (ClientSchema)schema);
            ClientCompute.packJob(outputChannel, descriptor, arg);
            ClientCompute.packTaskId(outputChannel, null);
        }, ClientCompute::unpackSubmitResult, partitionAwarenessProvider, true, null);
    }

    private <T, R> CompletableFuture<SubmitResult> doExecutePartitionedAsync(QualifiedName tableName, Partition partition, JobDescriptor<T, R> descriptor, UUID taskId, @Nullable T arg) {
        return this.getTable(tableName).thenCompose(table -> ((CompletableFuture)ClientCompute.executePartitioned(table, partition, descriptor, taskId, arg).handle((res, err) -> this.handleMissingTable(tableName, (SubmitResult)res, (Throwable)err, () -> this.doExecutePartitionedAsync(tableName, partition, descriptor, taskId, arg)))).thenCompose(Function.identity()));
    }

    private static <T, R> CompletableFuture<SubmitResult> executePartitioned(ClientTable t, Partition partition, JobDescriptor<T, R> descriptor, UUID taskId, @Nullable T arg) {
        int partitionId = ((HashPartition)partition).partitionId();
        return t.doSchemaOutOpAsync(69, (schema, outputChannel, unused) -> {
            ClientMessagePacker w = outputChannel.out();
            w.packInt(t.tableId());
            w.packInt(partitionId);
            ClientCompute.packJob(outputChannel, descriptor, arg);
            ClientCompute.packTaskId(outputChannel, taskId);
        }, ClientCompute::unpackSubmitResult, PartitionAwarenessProvider.of(partitionId), true, null);
    }

    private CompletableFuture<ClientTable> getTable(QualifiedName tableName) {
        ClientTable cached = this.tableCache.get(tableName);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        return this.tables.tableAsync(tableName).thenApply(t -> {
            if (t == null) {
                throw new TableNotFoundException(tableName);
            }
            ClientTable clientTable = (ClientTable)t;
            this.tableCache.put(t.qualifiedName(), clientTable);
            return clientTable;
        });
    }

    private CompletableFuture<SubmitResult> handleMissingTable(QualifiedName tableName, SubmitResult res, Throwable err, Supplier<CompletableFuture<SubmitResult>> retry) {
        IgniteException clientEx;
        if (err instanceof CompletionException) {
            err = err.getCause();
        }
        if (err instanceof IgniteException && (clientEx = (IgniteException)err).code() == ErrorGroups.Client.TABLE_ID_NOT_FOUND_ERR) {
            this.tableCache.remove(tableName);
            return retry.get();
        }
        if (err != null) {
            throw new CompletionException(err);
        }
        return CompletableFuture.completedFuture(res);
    }

    private static void packNodeNames(ClientMessagePacker w, Set<ClusterNode> nodes) {
        w.packInt(nodes.size());
        for (ClusterNode node : nodes) {
            w.packString(node.name());
        }
    }

    private static <T, R> void packJob(PayloadOutputChannel out, JobDescriptor<T, R> descriptor, T arg) {
        boolean platformComputeSupported = out.clientChannel().protocolContext().isFeatureSupported(ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB);
        ClientComputeJobPacker.packJob(descriptor, arg, platformComputeSupported, out.out());
    }

    private static void packTaskId(PayloadOutputChannel out, @Nullable UUID taskId) {
        if (out.clientChannel().protocolContext().isFeatureSupported(ProtocolBitmaskFeature.COMPUTE_TASK_ID)) {
            out.out().packUuidNullable(taskId);
        }
    }

    private static <T, R> void packTask(ClientMessagePacker w, TaskDescriptor<T, R> taskDescriptor, @Nullable T arg) {
        w.packDeploymentUnits(taskDescriptor.units());
        w.packString(taskDescriptor.taskClassName());
        ClientComputeJobPacker.packJobArgument(arg, taskDescriptor.splitJobArgumentMarshaller(), w);
    }

    private static SubmitResult unpackSubmitResult(PayloadInputChannel ch) {
        return new SubmitResult(ch.in().unpackUuid(), TcpIgniteClient.unpackClusterNode(ch), ch.notificationFuture());
    }

    private static SubmitTaskResult unpackSubmitTaskResult(PayloadInputChannel ch) {
        UUID jobId = ch.in().unpackUuid();
        int size = ch.in().unpackInt();
        ArrayList<UUID> jobIds = new ArrayList<UUID>(size);
        for (int i = 0; i < size; ++i) {
            jobIds.add(ch.in().unpackUuid());
        }
        return new SubmitTaskResult(jobId, jobIds, ch.clientChannel().protocolContext().clusterNode(), ch.notificationFuture());
    }

    private static <R> R sync(CompletableFuture<R> future) {
        try {
            return future.join();
        }
        catch (CompletionException e) {
            throw (RuntimeException)ExceptionUtils.sneakyThrow(ViewUtils.ensurePublicException(e));
        }
    }
}

