/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.compute;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.compute.AllNodesBroadcastJobTarget;
import org.apache.ignite.compute.AnyNodeJobTarget;
import org.apache.ignite.compute.BroadcastExecution;
import org.apache.ignite.compute.BroadcastJobTarget;
import org.apache.ignite.compute.ColocatedJobTarget;
import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobExecutorType;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.compute.NodeNotFoundException;
import org.apache.ignite.compute.TableJobTarget;
import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.client.proto.StreamerReceiverSerializer;
import org.apache.ignite.internal.components.NodeProperties;
import org.apache.ignite.internal.compute.BroadcastJobExecutionImpl;
import org.apache.ignite.internal.compute.CancellableTaskExecution;
import org.apache.ignite.internal.compute.ComputeComponent;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.ComputeUtils;
import org.apache.ignite.internal.compute.ExecutionContext;
import org.apache.ignite.internal.compute.ExecutionOptions;
import org.apache.ignite.internal.compute.FailedExecution;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.compute.JobExecutionWrapper;
import org.apache.ignite.internal.compute.NextColocatedWorkerSelector;
import org.apache.ignite.internal.compute.NextWorkerSelector;
import org.apache.ignite.internal.compute.PartitionNextWorkerSelector;
import org.apache.ignite.internal.compute.ResultUnmarshallingJobExecution;
import org.apache.ignite.internal.compute.SharedComputeUtils;
import org.apache.ignite.internal.compute.TaskExecutionFutureWrapper;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
import org.apache.ignite.internal.compute.streamer.StreamerReceiverJob;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.partition.HashPartition;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.CancelHandleHelper;
import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.DataStreamerReceiverDescriptor;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.ReceiverExecutionOptions;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.table.partition.Partition;
import org.gridgain.internal.rbac.authorization.Authorizer;
import org.gridgain.internal.rbac.privileges.Action;
import org.gridgain.internal.rbac.privileges.Privilege;
import org.gridgain.internal.rbac.privileges.Selector;
import org.gridgain.internal.security.context.GridGainSecurity;
import org.gridgain.internal.security.context.SecurityContext;
import org.gridgain.internal.security.context.SecurityContextHolder;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class IgniteComputeImpl
implements IgniteComputeInternal,
StreamerReceiverRunner {
    /** Node name reported as the initiator node in compute event metadata and shown in this class only for that purpose. */
    private final String nodeName;
    /** Used to await the primary replica of a partition. */
    private final PlacementDriver placementDriver;
    /** Used to look up nodes by consistent ID or by ID and to identify the local member. */
    private final TopologyService topologyService;
    /** Table lookup for colocated and table-targeted jobs; this instance registers itself on it as the streamer receiver runner. */
    private final IgniteTablesInternal tables;
    /** Component the job and task executions are delegated to. */
    private final ComputeComponent computeComponent;
    /** Supplies the timestamp passed when awaiting a primary replica. */
    private final HybridClock clock;
    /** Node properties; {@code colocationEnabled()} selects which replication group ID type is used. */
    private final NodeProperties nodeProperties;
    /** Tracker updated with the observable timestamp returned by receiver jobs and passed to result-unmarshalling executions. */
    private final HybridTimestampTracker observableTimestampTracker;
    /** Authorizes job execution and job management actions. */
    private final Authorizer authorizer;

    /**
     * Creates the compute facade and registers it as the streamer receiver runner of {@code tables}.
     *
     * @param nodeName Node name recorded as the initiator of submitted jobs.
     * @param placementDriver Placement driver used to resolve primary replicas.
     * @param topologyService Topology service used to resolve cluster nodes.
     * @param tables Tables facade.
     * @param computeComponent Component that executes jobs and tasks.
     * @param clock Hybrid clock.
     * @param nodeProperties Node properties.
     * @param observableTimestampTracker Observable timestamp tracker.
     * @param authorizer Authorizer for compute actions.
     */
    public IgniteComputeImpl(
            String nodeName,
            PlacementDriver placementDriver,
            TopologyService topologyService,
            IgniteTablesInternal tables,
            ComputeComponent computeComponent,
            HybridClock clock,
            NodeProperties nodeProperties,
            HybridTimestampTracker observableTimestampTracker,
            Authorizer authorizer
    ) {
        this.authorizer = authorizer;
        this.observableTimestampTracker = observableTimestampTracker;
        this.nodeProperties = nodeProperties;
        this.clock = clock;
        this.computeComponent = computeComponent;
        this.tables = tables;
        this.topologyService = topologyService;
        this.placementDriver = placementDriver;
        this.nodeName = nodeName;

        // Registration stays last: every field is already assigned when "this" escapes.
        tables.setStreamerReceiverRunner(this);
    }

    /**
     * Submits a single job to the given target, tagging its event metadata with the {@code SINGLE} type.
     *
     * @param target Job target.
     * @param descriptor Job descriptor.
     * @param arg Job argument, may be {@code null}.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Future completed with the job execution.
     */
    public <T, R> CompletableFuture<JobExecution<R>> submitAsync(
            JobTarget target,
            JobDescriptor<T, R> descriptor,
            @Nullable T arg,
            @Nullable CancellationToken cancellationToken
    ) {
        ComputeEventMetadataBuilder singleJobMetadata = ComputeEventMetadata.builder(ComputeEventMetadata.Type.SINGLE);

        return submitAsync(target, descriptor, singleJobMetadata, arg, cancellationToken);
    }

    /**
     * Submits a job to an any-node or colocated target using the supplied event metadata builder.
     *
     * <p>The argument is marshalled and the security context is captured before the target is inspected.
     *
     * @param target Job target; only {@link AnyNodeJobTarget} and {@link ColocatedJobTarget} are supported.
     * @param descriptor Job descriptor.
     * @param metadataBuilder Event metadata builder for the job.
     * @param arg Job argument, may be {@code null}.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Future completed with an execution that unmarshals the job result.
     * @throws IllegalArgumentException If the target type is not supported.
     */
    private <T, R> CompletableFuture<JobExecution<R>> submitAsync(
            JobTarget target,
            JobDescriptor<T, R> descriptor,
            ComputeEventMetadataBuilder metadataBuilder,
            @Nullable T arg,
            @Nullable CancellationToken cancellationToken
    ) {
        Objects.requireNonNull(target);
        Objects.requireNonNull(descriptor);

        ComputeJobDataHolder argHolder = SharedComputeUtils.marshalArgOrResult(arg, descriptor.argumentMarshaller());
        SecurityContext securityContext = SecurityContextHolder.getOrThrow();
        ExecutionContext executionContext = new ExecutionContext(descriptor, metadataBuilder, securityContext, argHolder);

        if (target instanceof AnyNodeJobTarget) {
            AnyNodeJobTarget anyNodeTarget = (AnyNodeJobTarget) target;
            Set<InternalClusterNode> nodes = internalNodesFromPublicNodes(anyNodeTarget.nodes());

            return unmarshalResult(
                    executeAsyncWithFailover(nodes, executionContext, cancellationToken),
                    descriptor,
                    observableTimestampTracker
            );
        }

        if (!(target instanceof ColocatedJobTarget)) {
            throw new IllegalArgumentException("Unsupported job target: " + target);
        }

        ColocatedJobTarget colocatedTarget = (ColocatedJobTarget) target;
        @SuppressWarnings("unchecked")
        Mapper<Object> keyMapper = (Mapper<Object>) colocatedTarget.keyMapper();
        QualifiedName tableName = colocatedTarget.tableName();
        Object key = colocatedTarget.key();

        executionContext.metadataBuilder().tableName(tableName.toCanonicalForm());

        CompletableFuture<TableViewInternal> tableFut = requiredTable(tableName);
        CompletableFuture<JobExecution<ComputeJobDataHolder>> jobFut;

        if (keyMapper == null) {
            // Without a mapper the key is expected to be a Tuple.
            jobFut = tableFut.thenCompose(
                    table -> submitColocatedInternal(table, (Tuple) key, executionContext, cancellationToken));
        } else {
            jobFut = tableFut.thenCompose(table -> primaryReplicaForPartitionByMappedKey(table, key, keyMapper)
                    .thenCompose(primaryNode -> executeOnOneNodeWithFailover(
                            primaryNode,
                            new NextColocatedWorkerSelector<>(
                                    placementDriver, topologyService, clock, nodeProperties, table, key, keyMapper),
                            executionContext,
                            cancellationToken
                    )));
        }

        return unmarshalResult(jobFut, descriptor, observableTimestampTracker);
    }

    /**
     * Submits a job to every node of a broadcast target.
     *
     * <p>For an {@link AllNodesBroadcastJobTarget} one job is submitted per target node. For a
     * {@link TableJobTarget} one job is submitted per partition, on the node holding its primary replica;
     * partitions whose primary node cannot be found by consistent ID in the topology are skipped.
     * All jobs of one call share a freshly generated task ID.
     *
     * @param target Broadcast target.
     * @param descriptor Job descriptor.
     * @param arg Job argument, may be {@code null}.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Future completed with the broadcast execution.
     * @throws IllegalArgumentException If the target type is not supported.
     */
    public <T, R> CompletableFuture<BroadcastExecution<R>> submitAsync(
            BroadcastJobTarget target,
            JobDescriptor<T, R> descriptor,
            @Nullable T arg,
            @Nullable CancellationToken cancellationToken
    ) {
        Objects.requireNonNull(target);
        Objects.requireNonNull(descriptor);

        ComputeJobDataHolder argHolder = SharedComputeUtils.marshalArgOrResult(arg, descriptor.argumentMarshaller());
        SecurityContext securityContext = SecurityContextHolder.getOrThrow();
        UUID taskId = UUID.randomUUID();

        if (target instanceof AllNodesBroadcastJobTarget) {
            Set<InternalClusterNode> nodes = internalNodesFromPublicNodes(((AllNodesBroadcastJobTarget) target).nodes());

            Stream<CompletableFuture<JobExecution<R>>> submissions = nodes.stream()
                    .map(node -> submitForBroadcast(node, descriptor, taskId, securityContext, argHolder, cancellationToken));

            return toBroadcastExecution(submissions);
        }

        if (!(target instanceof TableJobTarget)) {
            throw new IllegalArgumentException("Unsupported job target: " + target);
        }

        TableJobTarget tableTarget = (TableJobTarget) target;

        return requiredTable(tableTarget.tableName())
                .thenCompose(table -> table.partitionManager().primaryReplicasAsync().thenCompose(replicas -> {
                    Stream<CompletableFuture<JobExecution<R>>> submissions = replicas.entrySet().stream()
                            .<CompletableFuture<JobExecution<R>>>map(entry -> {
                                InternalClusterNode primaryNode = findNodeByConsistentId(entry.getValue());

                                if (primaryNode == null) {
                                    // Primary node is not in the topology: nothing is submitted for this partition.
                                    return null;
                                }

                                return submitForBroadcast(
                                        primaryNode,
                                        entry.getKey(),
                                        table.zoneId(),
                                        table.tableId(),
                                        descriptor,
                                        tableTarget.tableName().toCanonicalForm(),
                                        taskId,
                                        securityContext,
                                        argHolder,
                                        cancellationToken
                                );
                            })
                            .filter(Objects::nonNull);

                    return toBroadcastExecution(submissions);
                }));
    }

    /**
     * Converts public cluster nodes to their internal representation.
     *
     * @param nodes Public nodes.
     * @return Set of converted nodes.
     */
    private static Set<InternalClusterNode> internalNodesFromPublicNodes(Set<ClusterNode> nodes) {
        Set<InternalClusterNode> internalNodes = new HashSet<>();

        for (ClusterNode publicNode : nodes) {
            internalNodes.add(ClusterNodeImpl.fromPublicClusterNode(publicNode));
        }

        return internalNodes;
    }

    /**
     * Looks up the internal node whose consistent ID equals the name of the given public node.
     *
     * @param clusterNode Public node.
     * @return Whatever the topology service returns for that consistent ID, possibly {@code null}.
     */
    @Nullable
    private InternalClusterNode findNodeByConsistentId(ClusterNode clusterNode) {
        String consistentId = clusterNode.name();

        return topologyService.getByConsistentId(consistentId);
    }

    /**
     * Combines individual job submissions into a single broadcast execution.
     *
     * <p>The returned future completes once every submission has settled. Failures of individual
     * submissions do not fail the returned future; each one is turned into a failed execution by
     * {@link #mapExecution(CompletableFuture)}.
     *
     * @param executionsStream Stream of submission futures; it is consumed by this call.
     * @return Future completed with the broadcast execution.
     */
    private static <R> CompletableFuture<BroadcastExecution<R>> toBroadcastExecution(
            Stream<CompletableFuture<JobExecution<R>>> executionsStream
    ) {
        List<CompletableFuture<JobExecution<R>>> submissions = executionsStream.collect(Collectors.toList());

        return CompletableFuture.allOf(submissions.toArray(new CompletableFuture[0]))
                .handle((ignored, failure) -> {
                    List<JobExecution<R>> executions = submissions.stream()
                            .map(IgniteComputeImpl::mapExecution)
                            .collect(Collectors.toList());

                    return new BroadcastJobExecutionImpl<>(executions);
                });
    }

    /**
     * Extracts the execution from a submission future, substituting a {@link FailedExecution} built from
     * the unwrapped cause when joining the future throws.
     *
     * @param future Submission future.
     * @return The submitted execution, or a failed execution carrying the failure.
     */
    private static <T, R> JobExecution<R> mapExecution(CompletableFuture<JobExecution<R>> future) {
        JobExecution<R> execution;

        try {
            execution = future.join();
        } catch (Exception joinFailure) {
            execution = new FailedExecution<>(ExceptionUtils.unwrapCause(joinFailure));
        }

        return execution;
    }

    /**
     * Submits a broadcast job to a single node with no table attached.
     *
     * <p>The worker selector always yields a completed {@code null} future, i.e. it never offers another node.
     *
     * @param node Target node.
     * @param descriptor Job descriptor.
     * @param taskId ID shared by all jobs of the broadcast.
     * @param securityContext Security context of the caller.
     * @param argHolder Marshalled argument, may be {@code null}.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Future completed with the job execution.
     */
    private <T, R> CompletableFuture<JobExecution<R>> submitForBroadcast(
            InternalClusterNode node,
            JobDescriptor<T, R> descriptor,
            UUID taskId,
            SecurityContext securityContext,
            @Nullable ComputeJobDataHolder argHolder,
            @Nullable CancellationToken cancellationToken
    ) {
        ExecutionOptions executionOptions = ExecutionOptions.from(descriptor.options());
        NextWorkerSelector noFailoverSelector = () -> CompletableFutures.nullCompletedFuture();

        return submitForBroadcast(
                node, descriptor, executionOptions, noFailoverSelector, taskId, null, securityContext, argHolder, cancellationToken);
    }

    /**
     * Submits a broadcast job for one table partition to the given node.
     *
     * <p>Priority, retry count and executor type are copied from the descriptor options and the partition
     * is attached to the execution options. Failover candidates come from a {@link PartitionNextWorkerSelector}.
     *
     * @param node Target node.
     * @param partition Partition the job is bound to.
     * @param zoneId Zone ID of the table.
     * @param tableId Table ID.
     * @param descriptor Job descriptor.
     * @param tableName Table name recorded in the event metadata.
     * @param taskId ID shared by all jobs of the broadcast.
     * @param securityContext Security context of the caller.
     * @param argHolder Marshalled argument, may be {@code null}.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Future completed with the job execution.
     */
    private <T, R> CompletableFuture<JobExecution<R>> submitForBroadcast(
            InternalClusterNode node,
            Partition partition,
            int zoneId,
            int tableId,
            JobDescriptor<T, R> descriptor,
            String tableName,
            UUID taskId,
            SecurityContext securityContext,
            @Nullable ComputeJobDataHolder argHolder,
            @Nullable CancellationToken cancellationToken
    ) {
        JobExecutionOptions jobOptions = descriptor.options();

        ExecutionOptions executionOptions = ExecutionOptions.builder()
                .priority(jobOptions.priority())
                .maxRetries(jobOptions.maxRetries())
                .executorType(jobOptions.executorType())
                .partition(partition)
                .build();

        NextWorkerSelector partitionSelector = new PartitionNextWorkerSelector(
                placementDriver, topologyService, clock, nodeProperties, zoneId, tableId, partition);

        return submitForBroadcast(
                node, descriptor, executionOptions, partitionSelector, taskId, tableName, securityContext, argHolder, cancellationToken);
    }

    /**
     * Common broadcast submission path: verifies the node is known to the topology, builds the
     * {@code BROADCAST} event metadata and executes the job with failover.
     *
     * @param node Target node.
     * @param descriptor Job descriptor.
     * @param options Execution options.
     * @param nextWorkerSelector Selector of the next failover candidate.
     * @param taskId ID shared by all jobs of the broadcast.
     * @param tableName Table name for the event metadata, may be {@code null}.
     * @param securityContext Security context of the caller.
     * @param argHolder Marshalled argument, may be {@code null}.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Future completed with the job execution, or failed with {@link NodeNotFoundException}
     *         when the node is not found by consistent ID.
     */
    private <T, R> CompletableFuture<JobExecution<R>> submitForBroadcast(
            InternalClusterNode node,
            JobDescriptor<T, R> descriptor,
            ExecutionOptions options,
            NextWorkerSelector nextWorkerSelector,
            UUID taskId,
            @Nullable String tableName,
            SecurityContext securityContext,
            @Nullable ComputeJobDataHolder argHolder,
            @Nullable CancellationToken cancellationToken
    ) {
        String targetName = node.name();

        if (topologyService.getByConsistentId(targetName) == null) {
            return CompletableFuture.failedFuture(new NodeNotFoundException(Set.of(targetName)));
        }

        ComputeEventMetadataBuilder metadataBuilder = ComputeEventMetadata.builder(ComputeEventMetadata.Type.BROADCAST)
                .taskId(taskId)
                .tableName(tableName);

        ExecutionContext executionContext = new ExecutionContext(
                options, descriptor.units(), descriptor.jobClassName(), metadataBuilder, securityContext, argHolder);

        CompletableFuture<JobExecution<ComputeJobDataHolder>> rawExecution =
                executeOnOneNodeWithFailover(node, nextWorkerSelector, executionContext, cancellationToken);

        return unmarshalResult(rawExecution, descriptor, observableTimestampTracker);
    }

    /**
     * Wraps the execution produced by the given future into a {@link ResultUnmarshallingJobExecution}
     * configured with the descriptor's result marshaller and result class.
     *
     * @param executionFuture Future of the raw execution.
     * @param descriptor Job descriptor.
     * @param observableTimestampTracker Tracker handed to the wrapping execution.
     * @return Future of the wrapping execution.
     */
    private static <T, R> CompletableFuture<JobExecution<R>> unmarshalResult(
            CompletableFuture<JobExecution<ComputeJobDataHolder>> executionFuture,
            JobDescriptor<T, R> descriptor,
            HybridTimestampTracker observableTimestampTracker
    ) {
        return executionFuture.thenApply(rawExecution -> {
            return new ResultUnmarshallingJobExecution<>(
                    rawExecution,
                    descriptor.resultMarshaller(),
                    descriptor.resultClass(),
                    observableTimestampTracker
            );
        });
    }

    /**
     * Executes a job on the given target and blocks until its result is available.
     *
     * @param target Job target.
     * @param descriptor Job descriptor.
     * @param arg Job argument, may be {@code null}.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Job result.
     */
    public <T, R> R execute(
            JobTarget target,
            JobDescriptor<T, R> descriptor,
            @Nullable T arg,
            @Nullable CancellationToken cancellationToken
    ) {
        CompletableFuture<R> resultFut = executeAsync(target, descriptor, arg, cancellationToken);

        return sync(resultFut);
    }

    /**
     * Executes a job on a broadcast target and blocks until all results are available.
     *
     * @param target Broadcast target.
     * @param descriptor Job descriptor.
     * @param arg Job argument, may be {@code null}.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Collection of job results.
     */
    public <T, R> Collection<R> execute(
            BroadcastJobTarget target,
            JobDescriptor<T, R> descriptor,
            @Nullable T arg,
            @Nullable CancellationToken cancellationToken
    ) {
        CompletableFuture<Collection<R>> resultsFut = executeAsync(target, descriptor, arg, cancellationToken);

        return sync(resultsFut);
    }

    /**
     * Executes a job on a randomly chosen node out of those candidates that are present in the topology;
     * the remaining present candidates are offered, in turn, as failover workers.
     *
     * @param nodes Candidate nodes.
     * @param executionContext Execution context.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Future completed with the job execution, or failed with {@link NodeNotFoundException}
     *         (carrying all candidate names) when none of the candidates is in the topology.
     */
    @Override
    public CompletableFuture<JobExecution<ComputeJobDataHolder>> executeAsyncWithFailover(
            Set<InternalClusterNode> nodes,
            ExecutionContext executionContext,
            @Nullable CancellationToken cancellationToken
    ) {
        Set<InternalClusterNode> liveCandidates = nodes.stream()
                .filter(node -> topologyService.getByConsistentId(node.name()) != null)
                .collect(Collectors.toCollection(HashSet::new));

        if (liveCandidates.isEmpty()) {
            Set<String> requestedNames = nodes.stream()
                    .map(InternalClusterNode::name)
                    .collect(Collectors.toSet());

            return CompletableFuture.failedFuture(new NodeNotFoundException(requestedNames));
        }

        InternalClusterNode firstWorker = randomNode(liveCandidates);
        liveCandidates.remove(firstWorker);

        NextWorkerSelector failoverSelector = new DeqNextWorkerSelector(new ConcurrentLinkedDeque<>(liveCandidates));

        return executeOnOneNodeWithFailover(firstWorker, failoverSelector, executionContext, cancellationToken);
    }

    /**
     * Picks a uniformly random element of the set by skipping a random number of iterator positions.
     *
     * @param nodes Non-empty set of nodes.
     * @return Randomly selected node.
     */
    private static InternalClusterNode randomNode(Set<InternalClusterNode> nodes) {
        int remainingToSkip = ThreadLocalRandom.current().nextInt(nodes.size());
        Iterator<InternalClusterNode> cursor = nodes.iterator();

        while (remainingToSkip > 0) {
            cursor.next();
            remainingToSkip--;
        }

        return cursor.next();
    }

    /**
     * Authorizes the job against the privileges derived from its deployment units and, once authorized,
     * executes it on the target node, wrapping the resulting execution in a {@link JobExecutionWrapper}.
     *
     * @param targetNode Node to run the job on first.
     * @param nextWorkerSelector Selector of the next failover candidate.
     * @param executionContext Execution context.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Future completed with the job execution.
     */
    private CompletableFuture<JobExecution<ComputeJobDataHolder>> executeOnOneNodeWithFailover(
            InternalClusterNode targetNode,
            NextWorkerSelector nextWorkerSelector,
            ExecutionContext executionContext,
            @Nullable CancellationToken cancellationToken
    ) {
        return authorizer.authorizeThenCompose(
                executionContext.securityContext(),
                getPrivileges(executionContext.units()),
                () -> {
                    CompletableFuture<? extends JobExecution<ComputeJobDataHolder>> rawExecution =
                            executeOnOneNodeWithFailoverInternal(targetNode, nextWorkerSelector, executionContext, cancellationToken);

                    return ComputeUtils.convertToComputeFuture(rawExecution).thenApply(JobExecutionWrapper::new);
                }
        );
    }

    /**
     * Records this node as the initiator and dispatches the job either locally or remotely with failover,
     * depending on whether the target is the local node.
     *
     * @param targetNode Node to run the job on.
     * @param nextWorkerSelector Selector of the next failover candidate; used for remote execution only.
     * @param executionContext Execution context.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Future completed with the job execution.
     */
    private CompletableFuture<? extends JobExecution<ComputeJobDataHolder>> executeOnOneNodeWithFailoverInternal(
            InternalClusterNode targetNode,
            NextWorkerSelector nextWorkerSelector,
            ExecutionContext executionContext,
            @Nullable CancellationToken cancellationToken
    ) {
        executionContext.metadataBuilder().initiatorNode(nodeName);

        if (!isLocal(targetNode)) {
            return computeComponent.executeRemotelyWithFailover(targetNode, nextWorkerSelector, executionContext, cancellationToken);
        }

        return computeComponent.executeLocally(executionContext, cancellationToken);
    }

    /**
     * Builds the set of privileges required to execute a job with the given deployment units.
     *
     * @param units Deployment units of the job.
     * @return A single unscoped {@code EXEC_JOB} privilege when there are no units; otherwise one
     *         {@code EXEC_JOB} privilege per unit name, scoped with a deployment-unit selector.
     */
    private static Set<Privilege> getPrivileges(List<DeploymentUnit> units) {
        if (units.isEmpty()) {
            return Set.of(Privilege.fromAction(Action.EXEC_JOB));
        }

        Set<Privilege> privileges = new HashSet<>();

        for (DeploymentUnit unit : units) {
            Selector unitSelector = Selector.deploymentUnit(unit.name());

            privileges.add(Privilege.builder().selector(unitSelector).action(Action.EXEC_JOB).build());
        }

        return privileges;
    }

    /**
     * Tells whether the given node is the local one by comparing node names.
     *
     * @param targetNode Node to check.
     * @return {@code true} if the node's name equals the local member's name.
     */
    private boolean isLocal(InternalClusterNode targetNode) {
        String targetName = targetNode.name();
        String localName = topologyService.localMember().name();

        return targetName.equals(localName);
    }

    /**
     * Submits a job to the node holding the primary replica of the partition the tuple key belongs to.
     *
     * @param table Table the key belongs to.
     * @param key Colocation key.
     * @param executionContext Execution context.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Future completed with the job execution.
     */
    @Override
    public CompletableFuture<JobExecution<ComputeJobDataHolder>> submitColocatedInternal(
            TableViewInternal table,
            Tuple key,
            ExecutionContext executionContext,
            @Nullable CancellationToken cancellationToken
    ) {
        return primaryReplicaForPartitionByTupleKey(table, key).thenCompose(primaryNode -> {
            NextWorkerSelector colocatedSelector = new NextColocatedWorkerSelector<>(
                    placementDriver, topologyService, clock, nodeProperties, table, key);

            return executeOnOneNodeWithFailover(primaryNode, colocatedSelector, executionContext, cancellationToken);
        });
    }

    /**
     * Submits a job bound to a table partition to the node holding that partition's primary replica.
     *
     * <p>Priority, retry count and executor type are copied from {@code jobExecutionOptions}; the
     * partition is attached to the execution options.
     *
     * @param table Table.
     * @param partitionId Partition index.
     * @param units Deployment units of the job.
     * @param jobClassName Job class name.
     * @param jobExecutionOptions Public job execution options.
     * @param metadataBuilder Event metadata builder.
     * @param securityContext Security context of the caller.
     * @param arg Marshalled argument, may be {@code null}.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Future completed with the job execution.
     */
    @Override
    public CompletableFuture<JobExecution<ComputeJobDataHolder>> submitPartitionedInternal(
            TableViewInternal table,
            int partitionId,
            List<DeploymentUnit> units,
            String jobClassName,
            JobExecutionOptions jobExecutionOptions,
            ComputeEventMetadataBuilder metadataBuilder,
            SecurityContext securityContext,
            @Nullable ComputeJobDataHolder arg,
            @Nullable CancellationToken cancellationToken
    ) {
        Partition partition = new HashPartition(partitionId);

        ExecutionOptions executionOptions = ExecutionOptions.builder()
                .priority(jobExecutionOptions.priority())
                .maxRetries(jobExecutionOptions.maxRetries())
                .executorType(jobExecutionOptions.executorType())
                .partition(partition)
                .build();

        return primaryReplicaForPartition(table, partitionId).thenCompose(primaryNode -> {
            NextWorkerSelector partitionSelector = new PartitionNextWorkerSelector(
                    placementDriver, topologyService, clock, nodeProperties, table.zoneId(), table.tableId(), partition);

            ExecutionContext executionContext = new ExecutionContext(
                    executionOptions, units, jobClassName, metadataBuilder, securityContext, arg);

            return executeOnOneNodeWithFailover(primaryNode, partitionSelector, executionContext, cancellationToken);
        });
    }

    /**
     * Resolves a table view by name, failing when the lookup yields {@code null}.
     *
     * @param tableName Qualified table name.
     * @return Future completed with the table view, or failed with {@link TableNotFoundException}.
     */
    private CompletableFuture<TableViewInternal> requiredTable(QualifiedName tableName) {
        return tables.tableViewAsync(tableName).thenApply(tableView -> {
            if (tableView != null) {
                return tableView;
            }

            throw new TableNotFoundException(tableName);
        });
    }

    /**
     * Resolves the primary replica node of the partition that the given tuple key maps to.
     *
     * @param table Table.
     * @param key Tuple key.
     * @return Future completed with the primary replica node.
     */
    private CompletableFuture<InternalClusterNode> primaryReplicaForPartitionByTupleKey(TableViewInternal table, Tuple key) {
        int partitionIndex = table.partitionId(key);

        return primaryReplicaForPartition(table, partitionIndex);
    }

    /**
     * Resolves the primary replica node of the partition that the given mapped key maps to.
     *
     * @param table Table.
     * @param key Key object.
     * @param keyMapper Mapper for the key.
     * @return Future completed with the primary replica node.
     */
    private <K> CompletableFuture<InternalClusterNode> primaryReplicaForPartitionByMappedKey(
            TableViewInternal table,
            K key,
            Mapper<K> keyMapper
    ) {
        int partitionIndex = table.partitionId(key, keyMapper);

        return primaryReplicaForPartition(table, partitionIndex);
    }

    /**
     * Resolves the node that currently holds the primary replica of a table partition.
     *
     * <p>The replication group is identified by a {@link ZonePartitionId} when colocation is enabled
     * and by a {@link TablePartitionId} otherwise. The primary replica is awaited for up to 30 seconds.
     *
     * @param table Table.
     * @param partitionIndex Partition index.
     * @return Future completed with the primary replica node. It fails with a {@link ComputeException}
     *         ({@code PRIMARY_REPLICA_RESOLVE_ERR}) when no leaseholder is reported or when the
     *         leaseholder is not found in the topology.
     */
    private CompletableFuture<InternalClusterNode> primaryReplicaForPartition(TableViewInternal table, int partitionIndex) {
        // Declared with the common supertype: the two branches produce unrelated ID classes.
        ReplicationGroupId replicationGroupId = nodeProperties.colocationEnabled()
                ? new ZonePartitionId(table.zoneId(), partitionIndex)
                : new TablePartitionId(table.tableId(), partitionIndex);

        return placementDriver.awaitPrimaryReplica(replicationGroupId, clock.now(), 30L, TimeUnit.SECONDS)
                .thenApply(replicaMeta -> {
                    InternalClusterNode primaryNode = null;

                    if (replicaMeta != null && replicaMeta.getLeaseholderId() != null) {
                        primaryNode = topologyService.getById(replicaMeta.getLeaseholderId());
                    }

                    // A leaseholder that is not in the topology is reported the same way as a missing
                    // leaseholder, instead of letting a null node reach the callers.
                    if (primaryNode == null) {
                        throw new ComputeException(
                                ErrorGroups.Compute.PRIMARY_REPLICA_RESOLVE_ERR,
                                "Can not find primary replica for [table=" + table.name() + ", partition=" + partitionIndex + "]."
                        );
                    }

                    return primaryNode;
                });
    }

    /**
     * Authorizes and starts a map-reduce task.
     *
     * <p>The task is started only after authorization against the privileges derived from the task's
     * deployment units completes. When a cancellation token is given, cancelling it cancels the task.
     *
     * @param taskDescriptor Task descriptor.
     * @param metadataBuilder Event metadata builder.
     * @param securityContext Security context of the caller; also used for the jobs the task submits.
     * @param arg Task argument, may be {@code null}.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Task execution backed by the future of the started task.
     */
    @Override
    public <T, R> TaskExecution<R> submitMapReduceInternal(
            TaskDescriptor<T, R> taskDescriptor,
            ComputeEventMetadataBuilder metadataBuilder,
            SecurityContext securityContext,
            @Nullable T arg,
            @Nullable CancellationToken cancellationToken
    ) {
        Objects.requireNonNull(taskDescriptor);

        Set<Privilege> requiredPrivileges = getPrivileges(taskDescriptor.units());

        return new TaskExecutionFutureWrapper<>(authorizer.authorizeAsync(securityContext, requiredPrivileges).thenApply(ignored -> {
            CancellableTaskExecution<R> startedTask = computeComponent.executeTask(
                    (runners, builder, token) -> submitJobs(runners, builder, token, securityContext),
                    taskDescriptor.units(),
                    taskDescriptor.taskClassName(),
                    metadataBuilder,
                    securityContext,
                    arg
            );

            if (cancellationToken == null) {
                return startedTask;
            }

            CancelHandleHelper.addCancelAction(cancellationToken, startedTask::cancelAsync, startedTask.resultAsync());

            return startedTask;
        }));
    }

    /**
     * Submits a map-reduce task on behalf of the current security context, tagging its event metadata
     * with the {@code MAP_REDUCE} type.
     *
     * @param taskDescriptor Task descriptor.
     * @param arg Task argument, may be {@code null}.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Task execution.
     */
    public <T, R> TaskExecution<R> submitMapReduce(
            TaskDescriptor<T, R> taskDescriptor,
            @Nullable T arg,
            @Nullable CancellationToken cancellationToken
    ) {
        ComputeEventMetadataBuilder mapReduceMetadata = ComputeEventMetadata.builder(ComputeEventMetadata.Type.MAP_REDUCE);
        SecurityContext callerContext = SecurityContextHolder.getOrThrow();

        return submitMapReduceInternal(taskDescriptor, mapReduceMetadata, callerContext, arg, cancellationToken);
    }

    /**
     * Executes a map-reduce task and blocks until its result is available.
     *
     * @param taskDescriptor Task descriptor.
     * @param arg Task argument, may be {@code null}.
     * @param cancellationToken Cancellation token, may be {@code null}.
     * @return Task result.
     */
    public <T, R> R executeMapReduce(
            TaskDescriptor<T, R> taskDescriptor,
            @Nullable T arg,
            @Nullable CancellationToken cancellationToken
    ) {
        CompletableFuture<R> resultFut = executeMapReduceAsync(taskDescriptor, arg, cancellationToken);

        return sync(resultFut);
    }

    /**
     * Submits the jobs produced by the split phase of a map-reduce task.
     *
     * <p>Each job is submitted to any of its runner's nodes under the given security context and with
     * its own copy of the metadata builder.
     *
     * @param runners Jobs to submit.
     * @param metadataBuilder Metadata builder copied for every job.
     * @param cancellationToken Cancellation token passed to every submission.
     * @param securityContext Security context the submissions run with.
     * @return Future completed with the list of job executions.
     */
    private <M, T> CompletableFuture<List<JobExecution<T>>> submitJobs(
            List<MapReduceJob<M, T>> runners,
            ComputeEventMetadataBuilder metadataBuilder,
            CancellationToken cancellationToken,
            SecurityContext securityContext
    ) {
        @SuppressWarnings("unchecked")
        CompletableFuture<JobExecution<T>>[] submissions = new CompletableFuture[runners.size()];
        int next = 0;

        for (MapReduceJob<M, T> runner : runners) {
            submissions[next++] = (CompletableFuture<JobExecution<T>>) GridGainSecurity.getWith(
                    securityContext,
                    () -> submitAsync(
                            JobTarget.anyNode(runner.nodes()),
                            runner.jobDescriptor(),
                            metadataBuilder.copyOf(),
                            runner.arg(),
                            cancellationToken
                    )
            );
        }

        return CompletableFutures.allOfToList(submissions);
    }

    /**
     * Returns the states of compute jobs visible to the caller.
     *
     * <p>The caller is first authorized for {@code GET_JOB_STATE}; when that path succeeds, the states
     * of all jobs are returned. When it fails, the result falls back to the states of the jobs owned
     * by the caller. As before, any failure of the authorized path triggers the fallback, not only an
     * authorization denial.
     *
     * <p>The fallback is chained asynchronously: the thread completing the authorization future is
     * never blocked waiting for the per-owner lookup.
     *
     * @return Future completed with the job states.
     */
    @Override
    public CompletableFuture<Collection<JobState>> statesAsync() {
        SecurityContext sc = SecurityContextHolder.getOrThrow();
        String callerUsername = sc.authentication().username();
        Privilege privilege = Privilege.fromAction(Action.GET_JOB_STATE);

        CompletableFuture<Collection<JobState>> allStates =
                authorizer.authorizeThenCompose(privilege, computeComponent::statesAsync);

        return allStates
                .handle((states, failure) -> failure == null
                        ? allStates
                        : computeComponent.statesAsyncByOwner(callerUsername))
                .thenCompose(selected -> selected);
    }

    /**
     * Returns the state of a job after authorizing the caller for {@code GET_JOB_STATE} on the job owner.
     *
     * @param jobId Job ID.
     * @return Future completed with the job state, or with {@code null} when the job has no known owner.
     */
    @Override
    public CompletableFuture<@Nullable JobState> stateAsync(UUID jobId) {
        Supplier<CompletableFuture<@Nullable JobState>> stateLookup = () -> computeComponent.stateAsync(jobId);

        return authorizeJobAction(jobId, Action.GET_JOB_STATE, stateLookup);
    }

    /**
     * Cancels a job after authorizing the caller for {@code KILL_JOB} on the job owner.
     *
     * @param jobId Job ID.
     * @return Future completed with the cancellation result, or with {@code null} when the job has no known owner.
     */
    @Override
    public CompletableFuture<@Nullable Boolean> cancelAsync(UUID jobId) {
        Supplier<CompletableFuture<@Nullable Boolean>> cancellation = () -> computeComponent.cancelAsync(jobId);

        return authorizeJobAction(jobId, Action.KILL_JOB, cancellation);
    }

    /**
     * Changes the priority of a job after authorizing the caller for {@code CHANGE_JOB_PRIORITY} on the job owner.
     *
     * @param jobId Job ID.
     * @param newPriority New priority.
     * @return Future completed with the operation result, or with {@code null} when the job has no known owner.
     */
    @Override
    public CompletableFuture<@Nullable Boolean> changePriorityAsync(UUID jobId, int newPriority) {
        Supplier<CompletableFuture<@Nullable Boolean>> priorityChange =
                () -> computeComponent.changePriorityAsync(jobId, newPriority);

        return authorizeJobAction(jobId, Action.CHANGE_JOB_PRIORITY, priorityChange);
    }

    /**
     * Runs a job management action after authorizing the caller for it against the job's owner.
     *
     * @param jobId Job ID whose owner is looked up.
     * @param action Action to authorize; the privilege is scoped with a user selector for the owner.
     * @param actionFuture Supplier of the action to run once authorized.
     * @return Future completed with the action result, or with {@code null} when no owner is found for the job.
     */
    private <T> CompletableFuture<@Nullable T> authorizeJobAction(
            UUID jobId,
            Action action,
            Supplier<CompletableFuture<@Nullable T>> actionFuture
    ) {
        return computeComponent.getOwner(jobId).thenCompose(GridGainSecurity.withContext(owner -> {
            if (owner != null) {
                Privilege ownerScopedPrivilege = Privilege.builder()
                        .action(action)
                        .selector(Selector.user((String) owner))
                        .build();

                return authorizer.authorizeThenCompose(ownerScopedPrivilege, actionFuture);
            }

            return CompletableFutures.nullCompletedFuture();
        }));
    }

    /**
     * Runs a data streamer receiver on the given node under the system security context.
     *
     * <p>The receiver info, argument and items are serialized into a single payload; on completion the
     * observable timestamp tracker is updated with the timestamp returned by the job and the result
     * bytes are deserialized with the receiver's result marshaller.
     *
     * @param receiver Receiver descriptor.
     * @param receiverArg Receiver argument, may be {@code null}.
     * @param items Items to pass to the receiver.
     * @param node Node to run the receiver on.
     * @param deploymentUnits Deployment units of the receiver.
     * @return Future completed with the receiver results.
     */
    public <A, I, R> CompletableFuture<Collection<R>> runReceiverAsync(
            DataStreamerReceiverDescriptor<I, A, R> receiver,
            @Nullable A receiverArg,
            Collection<I> items,
            InternalClusterNode node,
            List<DeploymentUnit> deploymentUnits
    ) {
        byte[] serializedRequest = StreamerReceiverSerializer.serializeReceiverInfoWithElementCount(
                receiver, receiverArg, receiver.payloadMarshaller(), receiver.argumentMarshaller(), items);

        CompletableFuture<IgniteBiTuple<byte[], Long>> rawResultFut = runReceiverAsync(
                serializedRequest, node, deploymentUnits, receiver.options(), GridGainSecurity.systemContext());

        return rawResultFut.thenApply(rawResult -> {
            byte[] resultBytes = rawResult.get1();

            assert rawResult.get2() != null : "Observable timestamp should not be null";

            long jobTimestamp = rawResult.get2();
            observableTimestampTracker.update(jobTimestamp);

            return StreamerReceiverSerializer.deserializeReceiverJobResults(resultBytes, receiver.resultMarshaller());
        });
    }

    /**
     * Runs an already serialized data streamer receiver payload as a compute job on the given node.
     *
     * <p>The job class is selected by the executor type from {@code options}. A failure whose cause is a
     * {@link ComputeException} is rethrown as an {@link IgniteException} with
     * {@code COMPUTE_JOB_FAILED_ERR}; any other failure is rethrown unchanged.
     *
     * @param payload Serialized receiver payload.
     * @param node Node to run the receiver on.
     * @param deploymentUnits Deployment units of the receiver.
     * @param options Receiver execution options.
     * @param securityContext Security context to run the job with.
     * @return Future completed with the result bytes paired with the job's observable timestamp.
     */
    public CompletableFuture<IgniteBiTuple<byte[], Long>> runReceiverAsync(
            byte[] payload,
            InternalClusterNode node,
            List<DeploymentUnit> deploymentUnits,
            ReceiverExecutionOptions options,
            SecurityContext securityContext
    ) {
        ExecutionOptions jobOptions = ExecutionOptions.builder()
                .priority(options.priority())
                .maxRetries(options.maxRetries())
                .executorType(options.executorType())
                .build();

        ExecutionContext executionContext = new ExecutionContext(
                jobOptions,
                deploymentUnits,
                getReceiverJobClassName(options.executorType()),
                ComputeEventMetadata.builder(ComputeEventMetadata.Type.DATA_RECEIVER),
                securityContext,
                SharedComputeUtils.marshalArgOrResult(payload, null)
        );

        return executeAsyncWithFailover(Set.of(node), executionContext, null)
                .thenCompose(JobExecution::resultAsync)
                .handle((resultHolder, failure) -> {
                    if (failure == null) {
                        byte[] resultBytes = (byte[]) SharedComputeUtils.unmarshalArgOrResult(resultHolder, null, null);

                        return new IgniteBiTuple<byte[], Long>(resultBytes, resultHolder.observableTimestamp());
                    }

                    Throwable cause = failure.getCause();

                    if (cause instanceof ComputeException) {
                        ComputeException computeFailure = (ComputeException) cause;

                        throw new IgniteException(
                                ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR,
                                "Streamer receiver failed: " + computeFailure.getMessage(),
                                computeFailure
                        );
                    }

                    throw (RuntimeException) ExceptionUtils.sneakyThrow(failure);
                });
    }

    /**
     * Exposes the underlying compute component for tests.
     *
     * @return Compute component this instance delegates to.
     */
    @TestOnly
    public ComputeComponent computeComponent() {
        return computeComponent;
    }

    /**
     * Blocks until the future completes and returns its result.
     *
     * <p>A {@link CompletionException} thrown by the join is unwrapped, mapped to a public exception
     * and rethrown.
     *
     * @param future Future to wait for.
     * @return Result of the future.
     */
    private static <R> R sync(CompletableFuture<R> future) {
        try {
            return future.join();
        } catch (CompletionException wrapped) {
            Throwable cause = ExceptionUtils.unwrapCause(wrapped);
            Throwable publicFailure = IgniteExceptionMapperUtil.mapToPublicException(cause);

            throw (RuntimeException) ExceptionUtils.sneakyThrow(publicFailure);
        }
    }

    /**
     * Returns the name of the streamer receiver job class for the given executor type.
     *
     * @param executorType Executor type.
     * @return Job class name understood by that executor.
     * @throws IllegalArgumentException If the executor type has no receiver job.
     */
    private static String getReceiverJobClassName(JobExecutorType executorType) {
        String receiverJobClassName;

        switch (executorType) {
            case JAVA_EMBEDDED:
                receiverJobClassName = StreamerReceiverJob.class.getName();
                break;

            case DOTNET_SIDECAR:
                receiverJobClassName = "Apache.Ignite.Internal.Table.StreamerReceiverJob, Apache.Ignite";
                break;

            default:
                throw new IllegalArgumentException("Unsupported job executor type: " + executorType);
        }

        return receiverJobClassName;
    }

    /**
     * Worker selector that hands out failover candidates from a deque, one per call, until it is empty.
     */
    private static class DeqNextWorkerSelector implements NextWorkerSelector {
        /** Remaining failover candidates. */
        private final ConcurrentLinkedDeque<InternalClusterNode> deque;

        /**
         * Creates a selector over the given candidates.
         *
         * @param deque Candidates; elements are removed from it as they are handed out.
         */
        private DeqNextWorkerSelector(ConcurrentLinkedDeque<InternalClusterNode> deque) {
            this.deque = deque;
        }

        /**
         * Removes and returns the next candidate.
         *
         * @return Future completed with the next candidate, or a {@code null}-completed future when no
         *         candidates are left.
         */
        @Override
        public CompletableFuture<InternalClusterNode> next() {
            // pollFirst() signals an empty deque with null (the deque cannot hold null elements),
            // so emptiness is checked without relying on an exception.
            InternalClusterNode candidate = deque.pollFirst();

            if (candidate == null) {
                return CompletableFutures.nullCompletedFuture();
            }

            return CompletableFuture.completedFuture(candidate);
        }
    }
}

