package org.apache.ignite3.internal.client.compute;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite3.compute.AllNodesBroadcastJobTarget;
import org.apache.ignite3.compute.AnyNodeJobTarget;
import org.apache.ignite3.compute.BroadcastExecution;
import org.apache.ignite3.compute.BroadcastJobTarget;
import org.apache.ignite3.compute.ColocatedJobTarget;
import org.apache.ignite3.compute.IgniteCompute;
import org.apache.ignite3.compute.JobDescriptor;
import org.apache.ignite3.compute.JobExecution;
import org.apache.ignite3.compute.JobTarget;
import org.apache.ignite3.compute.TableJobTarget;
import org.apache.ignite3.compute.TaskDescriptor;
import org.apache.ignite3.compute.task.TaskExecution;
import org.apache.ignite3.internal.client.PayloadInputChannel;
import org.apache.ignite3.internal.client.PayloadOutputChannel;
import org.apache.ignite3.internal.client.ReliableChannel;
import org.apache.ignite3.internal.client.TcpIgniteClient;
import org.apache.ignite3.internal.client.proto.ClientComputeJobPacker;
import org.apache.ignite3.internal.client.proto.ClientMessagePacker;
import org.apache.ignite3.internal.client.proto.ProtocolBitmaskFeature;
import org.apache.ignite3.internal.client.proto.TuplePart;
import org.apache.ignite3.internal.client.table.ClientRecordSerializer;
import org.apache.ignite3.internal.client.table.ClientSchema;
import org.apache.ignite3.internal.client.table.ClientTable;
import org.apache.ignite3.internal.client.table.ClientTables;
import org.apache.ignite3.internal.client.table.ClientTupleSerializer;
import org.apache.ignite3.internal.client.table.PartitionAwarenessProvider;
import org.apache.ignite3.internal.compute.BroadcastJobExecutionImpl;
import org.apache.ignite3.internal.compute.FailedExecution;
import org.apache.ignite3.internal.table.partition.HashPartition;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.ViewUtils;
import org.apache.ignite3.lang.CancelHandleHelper;
import org.apache.ignite3.lang.CancellationToken;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.lang.IgniteException;
import org.apache.ignite3.lang.TableNotFoundException;
import org.apache.ignite3.network.ClusterNode;
import org.apache.ignite3.table.QualifiedName;
import org.apache.ignite3.table.Tuple;
import org.apache.ignite3.table.mapper.Mapper;
import org.apache.ignite3.table.partition.Partition;
import org.apache.ignite3.tx.Transaction;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite3/internal/client/compute/ClientCompute.class */
/**
 * Client-side implementation of {@link IgniteCompute}.
 *
 * <p>Jobs and tasks are serialized and sent through a {@link ReliableChannel}. Table handles needed by
 * colocated and partitioned submissions are resolved through {@link ClientTables} and cached by qualified
 * name; a cached handle is evicted and the submission retried when the server reports that the table id
 * is no longer known.
 */
public class ClientCompute implements IgniteCompute {
    /*
     * Operation codes passed to the channel. The names are derived from how each code is used in this class.
     * NOTE(review): the literal values were inlined in the available source; presumably they mirror the
     * protocol's shared operation-code constants - confirm before changing them.
     */

    /** Operation code used when submitting a job to one of a set of nodes. */
    private static final int OP_COMPUTE_EXECUTE = 47;

    /** Operation code used when submitting a job colocated with a table key. */
    private static final int OP_COMPUTE_EXECUTE_COLOCATED = 49;

    /** Operation code used when submitting a map-reduce task. */
    private static final int OP_COMPUTE_EXECUTE_MAPREDUCE = 64;

    /** Operation code used when submitting a job for a specific table partition. */
    private static final int OP_COMPUTE_EXECUTE_PARTITIONED = 69;

    /** Channel used for all requests. */
    private final ReliableChannel ch;

    /** Tables facade used to resolve table handles by name. */
    private final ClientTables tables;

    /** Cache of resolved table handles, keyed by qualified table name. */
    private final ConcurrentHashMap<QualifiedName, ClientTable> tableCache = new ConcurrentHashMap<>();

    /**
     * Creates a compute facade.
     *
     * @param reliableChannel Channel used to send requests.
     * @param clientTables Tables facade used to look up tables for colocated and partitioned jobs.
     */
    public ClientCompute(ReliableChannel reliableChannel, ClientTables clientTables) {
        this.ch = reliableChannel;
        this.tables = clientTables;
    }

    /**
     * Submits a job to the given target.
     *
     * @param jobTarget Job target; must not be {@code null}.
     * @param jobDescriptor Job descriptor; must not be {@code null}.
     * @param t Job argument.
     * @param cancellationToken Optional token; when present, cancelling it cancels the job execution.
     * @return Future completed with the job execution handle once the submission has been accepted.
     * @throws NullPointerException If the target or the descriptor is {@code null}.
     * @throws IllegalArgumentException If the target type is not supported.
     */
    @Override
    public <T, R> CompletableFuture<JobExecution<R>> submitAsync(
            JobTarget jobTarget,
            JobDescriptor<T, R> jobDescriptor,
            @Nullable T t,
            @Nullable CancellationToken cancellationToken
    ) {
        Objects.requireNonNull(jobTarget, "jobTarget");
        Objects.requireNonNull(jobDescriptor, "jobDescriptor");

        return submit0(jobTarget, jobDescriptor, t)
                .thenApply(submitResult -> newExecution(submitResult, jobDescriptor, cancellationToken));
    }

    /**
     * Submits a job to every node (or every partition) described by the broadcast target.
     *
     * <p>The returned future does not fail when individual submissions fail: each failed submission is
     * represented by a failed execution inside the resulting {@link BroadcastExecution}.
     *
     * @param broadcastJobTarget Broadcast target; must not be {@code null}.
     * @param jobDescriptor Job descriptor; must not be {@code null}.
     * @param t Job argument.
     * @param cancellationToken Optional token; when present, cancelling it cancels every started execution.
     * @return Future completed with the broadcast execution handle.
     * @throws NullPointerException If the target or the descriptor is {@code null}.
     * @throws IllegalArgumentException If the target type is not supported.
     */
    @Override
    public <T, R> CompletableFuture<BroadcastExecution<R>> submitAsync(
            BroadcastJobTarget broadcastJobTarget,
            JobDescriptor<T, R> jobDescriptor,
            @Nullable T t,
            @Nullable CancellationToken cancellationToken
    ) {
        Objects.requireNonNull(broadcastJobTarget, "broadcastJobTarget");
        Objects.requireNonNull(jobDescriptor, "jobDescriptor");

        if (broadcastJobTarget instanceof AllNodesBroadcastJobTarget) {
            Set<ClusterNode> nodes = ((AllNodesBroadcastJobTarget) broadcastJobTarget).nodes();

            // One submission per node; each request is restricted to a single-node set.
            @SuppressWarnings("unchecked") // Generic array creation is not expressible without a raw array.
            CompletableFuture<SubmitResult>[] submissions = nodes.stream()
                    .map(node -> executeOnAnyNodeAsync(Set.of(node), jobDescriptor, t))
                    .toArray(CompletableFuture[]::new);

            return mapSubmitFutures(submissions, jobDescriptor, cancellationToken);
        }

        if (broadcastJobTarget instanceof TableJobTarget) {
            QualifiedName tableName = ((TableJobTarget) broadcastJobTarget).tableName();

            return getTable(tableName)
                    .thenCompose(table -> table.partitionManager().primaryReplicasAsync())
                    .thenCompose(replicas -> {
                        // One submission per partition reported by the partition manager.
                        @SuppressWarnings("unchecked") // Generic array creation, see above.
                        CompletableFuture<SubmitResult>[] submissions = replicas.keySet().stream()
                                .map(partition -> doExecutePartitionedAsync(tableName, partition, jobDescriptor, t))
                                .toArray(CompletableFuture[]::new);

                        return mapSubmitFutures(submissions, jobDescriptor, cancellationToken);
                    });
        }

        throw new IllegalArgumentException("Unsupported job target: " + broadcastJobTarget);
    }

    /**
     * Waits for all submissions to finish (successfully or not) and wraps them into a broadcast execution.
     *
     * @param completableFutureArr Individual submission futures.
     * @param jobDescriptor Job descriptor.
     * @param cancellationToken Optional cancellation token.
     * @return Future that completes normally once every submission future is done.
     */
    private <T, R> CompletableFuture<BroadcastExecution<R>> mapSubmitFutures(
            CompletableFuture<SubmitResult>[] completableFutureArr,
            JobDescriptor<T, R> jobDescriptor,
            @Nullable CancellationToken cancellationToken
    ) {
        // handle() deliberately ignores the aggregate error: failures are kept per execution instead.
        return CompletableFuture.allOf(completableFutureArr).handle((unused, ignoredError) -> {
            Collection<JobExecution<R>> executions = Arrays.stream(completableFutureArr)
                    .map(submission -> mapSubmitResult(jobDescriptor, cancellationToken, submission))
                    .collect(Collectors.toList());

            return new BroadcastJobExecutionImpl<>(executions);
        });
    }

    /**
     * Converts a finished submission future into an execution, turning any failure into a failed execution.
     *
     * @param jobDescriptor Job descriptor.
     * @param cancellationToken Optional cancellation token.
     * @param completableFuture Submission future; expected to be already completed by the caller.
     * @return Execution handle, or a {@link FailedExecution} carrying the unwrapped cause.
     */
    private <T, R> JobExecution<R> mapSubmitResult(
            JobDescriptor<T, R> jobDescriptor,
            @Nullable CancellationToken cancellationToken,
            CompletableFuture<SubmitResult> completableFuture
    ) {
        try {
            return newExecution(completableFuture.join(), jobDescriptor, cancellationToken);
        } catch (Exception e) {
            return new FailedExecution<>(ExceptionUtils.unwrapCause(e));
        }
    }

    /**
     * Creates an execution handle for an accepted submission and wires it to the cancellation token, if any.
     *
     * @param submitResult Result of the submission request.
     * @param jobDescriptor Job descriptor providing the result marshaller and result class.
     * @param cancellationToken Optional cancellation token.
     * @return New execution handle.
     */
    private <T, R> JobExecution<R> newExecution(
            SubmitResult submitResult,
            JobDescriptor<T, R> jobDescriptor,
            @Nullable CancellationToken cancellationToken
    ) {
        ClientJobExecution<R> execution = new ClientJobExecution<>(
                ch,
                submitResult,
                jobDescriptor.resultMarshaller(),
                jobDescriptor.resultClass()
        );

        if (cancellationToken != null) {
            CancelHandleHelper.addCancelAction(cancellationToken, execution::cancelAsync, execution.resultAsync());
        }

        return execution;
    }

    /**
     * Dispatches a single-target submission according to the concrete target type.
     *
     * @param jobTarget Job target.
     * @param jobDescriptor Job descriptor.
     * @param t Job argument.
     * @return Future with the submission result.
     * @throws IllegalArgumentException If the target type is not supported.
     */
    private <T, R> CompletableFuture<SubmitResult> submit0(JobTarget jobTarget, JobDescriptor<T, R> jobDescriptor, T t) {
        if (jobTarget instanceof AnyNodeJobTarget) {
            return executeOnAnyNodeAsync(((AnyNodeJobTarget) jobTarget).nodes(), jobDescriptor, t);
        }

        if (!(jobTarget instanceof ColocatedJobTarget)) {
            throw new IllegalArgumentException("Unsupported job target: " + jobTarget);
        }

        ColocatedJobTarget colocatedTarget = (ColocatedJobTarget) jobTarget;

        // The target does not expose the key type, so the mapper is treated as a mapper of Object.
        @SuppressWarnings("unchecked")
        Mapper<Object> keyMapper = (Mapper<Object>) colocatedTarget.keyMapper();
        QualifiedName tableName = colocatedTarget.tableName();

        if (keyMapper != null) {
            return doExecuteColocatedAsync(tableName, colocatedTarget.key(), keyMapper, jobDescriptor, t);
        }

        // Without a mapper the key is expected to be a Tuple.
        return doExecuteColocatedAsync(tableName, (Tuple) colocatedTarget.key(), jobDescriptor, t);
    }

    /**
     * Blocking variant of {@code executeAsync} for a single target.
     *
     * @param jobTarget Job target.
     * @param jobDescriptor Job descriptor.
     * @param t Job argument.
     * @param cancellationToken Optional cancellation token.
     * @return Job result.
     */
    @Override
    public <T, R> R execute(
            JobTarget jobTarget,
            JobDescriptor<T, R> jobDescriptor,
            @Nullable T t,
            @Nullable CancellationToken cancellationToken
    ) {
        // The argument and descriptor are passed through unchanged; no casts are needed (or safe) here.
        return sync(executeAsync(jobTarget, jobDescriptor, t, cancellationToken));
    }

    /**
     * Blocking variant of {@code executeAsync} for a broadcast target.
     *
     * @param broadcastJobTarget Broadcast target.
     * @param jobDescriptor Job descriptor.
     * @param t Job argument.
     * @param cancellationToken Optional cancellation token.
     * @return Job results.
     */
    @Override
    public <T, R> Collection<R> execute(
            BroadcastJobTarget broadcastJobTarget,
            JobDescriptor<T, R> jobDescriptor,
            @Nullable T t,
            @Nullable CancellationToken cancellationToken
    ) {
        return sync(executeAsync(broadcastJobTarget, jobDescriptor, t, cancellationToken));
    }

    /**
     * Submits a job colocated with a tuple key, retrying when the cached table handle turns out to be stale.
     *
     * @param qualifiedName Table name.
     * @param tuple Colocation key.
     * @param jobDescriptor Job descriptor.
     * @param t Job argument.
     * @return Future with the submission result.
     */
    private <T, R> CompletableFuture<SubmitResult> doExecuteColocatedAsync(
            QualifiedName qualifiedName,
            Tuple tuple,
            JobDescriptor<T, R> jobDescriptor,
            T t
    ) {
        return getTable(qualifiedName)
                .thenCompose(table -> executeColocatedTupleKey(table, tuple, jobDescriptor, t))
                .handle((submitResult, error) -> handleMissingTable(
                        qualifiedName,
                        submitResult,
                        error,
                        () -> doExecuteColocatedAsync(qualifiedName, tuple, jobDescriptor, t)
                ))
                .thenCompose(Function.identity());
    }

    /**
     * Submits a job colocated with a mapped object key, retrying when the cached table handle turns out to
     * be stale.
     *
     * @param qualifiedName Table name.
     * @param k Colocation key.
     * @param mapper Key mapper.
     * @param jobDescriptor Job descriptor.
     * @param t Job argument.
     * @return Future with the submission result.
     */
    private <K, T, R> CompletableFuture<SubmitResult> doExecuteColocatedAsync(
            QualifiedName qualifiedName,
            K k,
            Mapper<K> mapper,
            JobDescriptor<T, R> jobDescriptor,
            T t
    ) {
        return getTable(qualifiedName)
                .thenCompose(table -> executeColocatedObjectKey(table, k, mapper, jobDescriptor, t))
                .handle((submitResult, error) -> handleMissingTable(
                        qualifiedName,
                        submitResult,
                        error,
                        () -> doExecuteColocatedAsync(qualifiedName, k, mapper, jobDescriptor, t)
                ))
                .thenCompose(Function.identity());
    }

    /**
     * Submits a map-reduce task.
     *
     * @param taskDescriptor Task descriptor; must not be {@code null}.
     * @param t Task argument.
     * @param cancellationToken Optional token; when present, cancelling it cancels the task execution.
     * @return Task execution handle.
     * @throws NullPointerException If the descriptor is {@code null}.
     */
    @Override
    public <T, R> TaskExecution<R> submitMapReduce(
            TaskDescriptor<T, R> taskDescriptor,
            @Nullable T t,
            @Nullable CancellationToken cancellationToken
    ) {
        Objects.requireNonNull(taskDescriptor, "taskDescriptor");

        ClientTaskExecution<R> execution = new ClientTaskExecution<>(
                ch,
                doExecuteMapReduceAsync(taskDescriptor, t),
                taskDescriptor.reduceJobResultMarshaller(),
                taskDescriptor.reduceJobResultClass()
        );

        if (cancellationToken != null) {
            CancelHandleHelper.addCancelAction(cancellationToken, execution::cancelAsync, execution.resultAsync());
        }

        return execution;
    }

    /**
     * Blocking variant of {@code executeMapReduceAsync}.
     *
     * @param taskDescriptor Task descriptor.
     * @param t Task argument.
     * @param cancellationToken Optional cancellation token.
     * @return Task result.
     */
    @Override
    public <T, R> R executeMapReduce(
            TaskDescriptor<T, R> taskDescriptor,
            @Nullable T t,
            @Nullable CancellationToken cancellationToken
    ) {
        return sync(executeMapReduceAsync(taskDescriptor, t, cancellationToken));
    }

    /**
     * Sends the map-reduce submission request.
     *
     * @param taskDescriptor Task descriptor.
     * @param t Task argument.
     * @return Future with the task submission result.
     */
    private <T, R> CompletableFuture<SubmitTaskResult> doExecuteMapReduceAsync(TaskDescriptor<T, R> taskDescriptor, @Nullable T t) {
        return ch.serviceAsync(
                OP_COMPUTE_EXECUTE_MAPREDUCE,
                outputChannel -> {
                    packTask(outputChannel.out(), taskDescriptor, t);
                },
                ClientCompute::unpackSubmitTaskResult,
                null,
                null,
                null,
                true
        );
    }

    /**
     * Sends a request to run the job on one of the given nodes.
     *
     * @param set Candidate nodes; must not be empty.
     * @param jobDescriptor Job descriptor.
     * @param t Job argument.
     * @return Future with the submission result.
     * @throws IllegalArgumentException If the node set is empty.
     */
    private <T, R> CompletableFuture<SubmitResult> executeOnAnyNodeAsync(Set<ClusterNode> set, JobDescriptor<T, R> jobDescriptor, T t) {
        // NOTE(review): the randomly chosen node name is presumably the preferred connection for the request - confirm.
        String nodeName = randomNode(set).name();

        return ch.serviceAsync(
                OP_COMPUTE_EXECUTE,
                outputChannel -> {
                    packNodeNames(outputChannel.out(), set);
                    packJob(outputChannel, jobDescriptor, t);
                },
                ClientCompute::unpackSubmitResult,
                nodeName,
                null,
                null,
                true
        );
    }

    /**
     * Picks a uniformly random element of the set.
     *
     * @param set Candidate nodes; must not be empty.
     * @return Randomly selected node.
     * @throws IllegalArgumentException If the set is empty.
     */
    private static ClusterNode randomNode(Set<ClusterNode> set) {
        if (set.isEmpty()) {
            // Same exception type ThreadLocalRandom.nextInt(0) would raise, but with an actionable message.
            throw new IllegalArgumentException("Node set must not be empty.");
        }

        if (set.size() == 1) {
            return set.iterator().next();
        }

        int toSkip = ThreadLocalRandom.current().nextInt(set.size());
        Iterator<ClusterNode> it = set.iterator();

        for (int i = 0; i < toSkip; i++) {
            it.next();
        }

        return it.next();
    }

    /**
     * Sends a colocated submission whose key is an object written with the given mapper.
     *
     * <p>NOTE(review): the available source marks this method as originally {@code private}; it is kept
     * {@code public} here so that the visible interface does not change.
     *
     * @param clientTable Table handle.
     * @param k Colocation key.
     * @param mapper Key mapper.
     * @param jobDescriptor Job descriptor.
     * @param t Job argument.
     * @return Future with the submission result.
     */
    public static <K, T, R> CompletableFuture<SubmitResult> executeColocatedObjectKey(
            ClientTable clientTable,
            K k,
            Mapper<K> mapper,
            JobDescriptor<T, R> jobDescriptor,
            T t
    ) {
        BiConsumer<PayloadOutputChannel, ClientSchema> keyWriter = (outputChannel, schema) ->
                ClientRecordSerializer.writeRecRaw(k, mapper, schema, outputChannel.out(), TuplePart.KEY, true);

        return executeColocatedInternal(
                clientTable,
                keyWriter,
                ClientTupleSerializer.getPartitionAwarenessProvider(mapper, k),
                jobDescriptor,
                t
        );
    }

    /**
     * Sends a colocated submission whose key is a tuple.
     *
     * <p>NOTE(review): the available source marks this method as originally {@code private}; it is kept
     * {@code public} here so that the visible interface does not change.
     *
     * @param clientTable Table handle.
     * @param tuple Colocation key.
     * @param jobDescriptor Job descriptor.
     * @param t Job argument.
     * @return Future with the submission result.
     */
    public static <T, R> CompletableFuture<SubmitResult> executeColocatedTupleKey(
            ClientTable clientTable,
            Tuple tuple,
            JobDescriptor<T, R> jobDescriptor,
            T t
    ) {
        BiConsumer<PayloadOutputChannel, ClientSchema> keyWriter = (outputChannel, schema) ->
                ClientTupleSerializer.writeTupleRaw(tuple, schema, outputChannel, true);

        return executeColocatedInternal(
                clientTable,
                keyWriter,
                ClientTupleSerializer.getPartitionAwarenessProvider(tuple),
                jobDescriptor,
                t
        );
    }

    /**
     * Writes and sends a colocated submission: table id, schema version, key, then the job itself.
     *
     * @param clientTable Table handle.
     * @param biConsumer Writer of the colocation key for a given schema.
     * @param partitionAwarenessProvider Partition awareness provider for the key.
     * @param jobDescriptor Job descriptor.
     * @param t Job argument.
     * @return Future with the submission result.
     */
    private static <T, R> CompletableFuture<SubmitResult> executeColocatedInternal(
            ClientTable clientTable,
            BiConsumer<PayloadOutputChannel, ClientSchema> biConsumer,
            PartitionAwarenessProvider partitionAwarenessProvider,
            JobDescriptor<T, R> jobDescriptor,
            T t
    ) {
        Function<PayloadInputChannel, SubmitResult> reader = ClientCompute::unpackSubmitResult;

        return clientTable.doSchemaOutOpAsync(
                OP_COMPUTE_EXECUTE_COLOCATED,
                (schema, outputChannel, unused) -> {
                    ClientMessagePacker packer = outputChannel.out();

                    packer.packInt(clientTable.tableId());
                    packer.packInt(schema.version());
                    biConsumer.accept(outputChannel, schema);
                    packJob(outputChannel, jobDescriptor, t);
                },
                reader,
                partitionAwarenessProvider,
                true,
                (Transaction) null
        );
    }

    /**
     * Submits a job for a table partition, retrying when the cached table handle turns out to be stale.
     *
     * @param qualifiedName Table name.
     * @param partition Target partition.
     * @param jobDescriptor Job descriptor.
     * @param t Job argument.
     * @return Future with the submission result.
     */
    private <T, R> CompletableFuture<SubmitResult> doExecutePartitionedAsync(
            QualifiedName qualifiedName,
            Partition partition,
            JobDescriptor<T, R> jobDescriptor,
            @Nullable T t
    ) {
        return getTable(qualifiedName).thenCompose(table ->
                executePartitioned(table, partition, jobDescriptor, t)
                        .handle((submitResult, error) -> handleMissingTable(
                                qualifiedName,
                                submitResult,
                                error,
                                () -> doExecutePartitionedAsync(qualifiedName, partition, jobDescriptor, t)
                        ))
                        .thenCompose(Function.identity())
        );
    }

    /**
     * Writes and sends a partitioned submission: table id, partition id, then the job itself.
     *
     * @param clientTable Table handle.
     * @param partition Target partition; must be a {@link HashPartition}.
     * @param jobDescriptor Job descriptor.
     * @param t Job argument.
     * @return Future with the submission result.
     * @throws ClassCastException If the partition is not a {@link HashPartition}.
     */
    private static <T, R> CompletableFuture<SubmitResult> executePartitioned(
            ClientTable clientTable,
            Partition partition,
            JobDescriptor<T, R> jobDescriptor,
            @Nullable T t
    ) {
        int partitionId = ((HashPartition) partition).partitionId();
        Function<PayloadInputChannel, SubmitResult> reader = ClientCompute::unpackSubmitResult;

        return clientTable.doSchemaOutOpAsync(
                OP_COMPUTE_EXECUTE_PARTITIONED,
                (schema, outputChannel, unused) -> {
                    ClientMessagePacker packer = outputChannel.out();

                    packer.packInt(clientTable.tableId());
                    packer.packInt(partitionId);
                    packJob(outputChannel, jobDescriptor, t);
                },
                reader,
                PartitionAwarenessProvider.of(Integer.valueOf(partitionId)),
                true,
                (Transaction) null
        );
    }

    /**
     * Resolves a table handle, using the local cache when possible.
     *
     * @param qualifiedName Table name.
     * @return Future with the table handle; fails with {@link TableNotFoundException} if the table does not exist.
     */
    private CompletableFuture<ClientTable> getTable(QualifiedName qualifiedName) {
        ClientTable cached = tableCache.get(qualifiedName);

        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }

        return tables.tableAsync(qualifiedName).thenApply(table -> {
            if (table == null) {
                throw new TableNotFoundException(qualifiedName);
            }

            ClientTable resolved = (ClientTable) table;

            // Cached under the name reported by the table itself.
            tableCache.put(table.qualifiedName(), resolved);

            return resolved;
        });
    }

    /**
     * Post-processes a submission outcome, retrying once the stale table handle has been evicted.
     *
     * @param qualifiedName Table name whose cached handle may be stale.
     * @param submitResult Submission result, when the submission succeeded.
     * @param th Submission error, when the submission failed.
     * @param supplier Retry action.
     * @return Future with the submission result, possibly produced by the retry.
     * @throws CompletionException Wrapping the error when it is not a "table id not found" error.
     */
    private CompletableFuture<SubmitResult> handleMissingTable(
            QualifiedName qualifiedName,
            SubmitResult submitResult,
            Throwable th,
            Supplier<CompletableFuture<SubmitResult>> supplier
    ) {
        Throwable cause = th instanceof CompletionException ? th.getCause() : th;

        if (cause instanceof IgniteException
                && ((IgniteException) cause).code() == ErrorGroups.Client.TABLE_ID_NOT_FOUND_ERR) {
            // The server no longer knows the table id: drop the cached handle and resolve the table again.
            tableCache.remove(qualifiedName);

            return supplier.get();
        }

        if (cause != null) {
            throw new CompletionException(cause);
        }

        return CompletableFuture.completedFuture(submitResult);
    }

    /**
     * Writes the number of nodes followed by each node name.
     *
     * @param clientMessagePacker Packer to write to.
     * @param set Nodes whose names are written.
     */
    private static void packNodeNames(ClientMessagePacker clientMessagePacker, Set<ClusterNode> set) {
        clientMessagePacker.packInt(set.size());

        for (ClusterNode node : set) {
            clientMessagePacker.packString(node.name());
        }
    }

    /**
     * Writes the job descriptor and argument, taking the channel's platform-job feature support into account.
     *
     * @param payloadOutputChannel Output channel.
     * @param jobDescriptor Job descriptor.
     * @param t Job argument.
     */
    private static <T, R> void packJob(PayloadOutputChannel payloadOutputChannel, JobDescriptor<T, R> jobDescriptor, T t) {
        boolean platformJobsSupported = payloadOutputChannel.clientChannel()
                .protocolContext()
                .isFeatureSupported(ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB);

        ClientComputeJobPacker.packJob(jobDescriptor, t, platformJobsSupported, payloadOutputChannel.out());
    }

    /**
     * Writes a map-reduce task: deployment units, task class name, then the argument.
     *
     * <p>NOTE(review): the available source marks this method as originally {@code private}; it is kept
     * {@code public} here so that the visible interface does not change.
     *
     * @param clientMessagePacker Packer to write to.
     * @param taskDescriptor Task descriptor.
     * @param t Task argument.
     */
    public static <T, R> void packTask(ClientMessagePacker clientMessagePacker, TaskDescriptor<T, R> taskDescriptor, @Nullable T t) {
        clientMessagePacker.packDeploymentUnits(taskDescriptor.units());
        clientMessagePacker.packString(taskDescriptor.taskClassName());
        ClientComputeJobPacker.packJobArgument(t, taskDescriptor.splitJobArgumentMarshaller(), clientMessagePacker);
    }

    /**
     * Reads a job submission response: job id, then the cluster node.
     *
     * @param payloadInputChannel Input channel.
     * @return Submission result, including the channel's notification future.
     */
    private static SubmitResult unpackSubmitResult(PayloadInputChannel payloadInputChannel) {
        UUID jobId = payloadInputChannel.in().unpackUuid();
        ClusterNode node = TcpIgniteClient.unpackClusterNode(payloadInputChannel);

        return new SubmitResult(jobId, node, payloadInputChannel.notificationFuture());
    }

    /**
     * Reads a task submission response: one id, a count, then that many further ids.
     *
     * @param payloadInputChannel Input channel.
     * @return Task submission result, including the channel's cluster node and notification future.
     */
    private static SubmitTaskResult unpackSubmitTaskResult(PayloadInputChannel payloadInputChannel) {
        UUID firstId = payloadInputChannel.in().unpackUuid();
        int count = payloadInputChannel.in().unpackInt();
        ArrayList<UUID> ids = new ArrayList<>(count);

        for (int i = 0; i < count; i++) {
            ids.add(payloadInputChannel.in().unpackUuid());
        }

        return new SubmitTaskResult(
                firstId,
                ids,
                payloadInputChannel.clientChannel().protocolContext().clusterNode(),
                payloadInputChannel.notificationFuture()
        );
    }

    /**
     * Blocks until the future completes, rethrowing a failure as a public exception.
     *
     * @param completableFuture Future to wait for.
     * @return Future result.
     */
    private static <R> R sync(CompletableFuture<R> completableFuture) {
        try {
            return completableFuture.join();
        } catch (CompletionException e) {
            throw ((RuntimeException) ExceptionUtils.sneakyThrow(ViewUtils.ensurePublicException(e)));
        }
    }
}
