package org.apache.ignite.internal.compute;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.compute.AllNodesBroadcastJobTarget;
import org.apache.ignite.compute.AnyNodeJobTarget;
import org.apache.ignite.compute.BroadcastExecution;
import org.apache.ignite.compute.BroadcastJobTarget;
import org.apache.ignite.compute.ColocatedJobTarget;
import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.compute.NodeNotFoundException;
import org.apache.ignite.compute.TableJobTarget;
import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.client.proto.StreamerReceiverSerializer;
import org.apache.ignite.internal.compute.streamer.StreamerReceiverJob;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.partition.HashPartition;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.CancelHandleHelper;
import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.ReceiverDescriptor;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.table.partition.Partition;
import org.gridgain.internal.rbac.authorization.Authorizer;
import org.gridgain.internal.rbac.privileges.Action;
import org.gridgain.internal.rbac.privileges.Privilege;
import org.gridgain.internal.rbac.privileges.Selector;
import org.gridgain.internal.security.context.GridGainSecurity;
import org.gridgain.internal.security.context.SecurityContext;
import org.gridgain.internal.security.context.SecurityContextHolder;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/* loaded from: input_file:org/apache/ignite/internal/compute/IgniteComputeImpl.class */
public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceiverRunner {
    private final TopologyService topologyService;
    private final IgniteTablesInternal tables;
    private final ComputeComponent computeComponent;
    private final Authorizer authorizer;
    private final PlacementDriver placementDriver;
    private final HybridClock clock;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/compute/IgniteComputeImpl$DeqNextWorkerSelector.class */
    public static class DeqNextWorkerSelector implements NextWorkerSelector {
        private final ConcurrentLinkedDeque<ClusterNode> deque;

        private DeqNextWorkerSelector(ConcurrentLinkedDeque<ClusterNode> concurrentLinkedDeque) {
            this.deque = concurrentLinkedDeque;
        }

        @Override // org.apache.ignite.internal.compute.NextWorkerSelector
        public CompletableFuture<ClusterNode> next() {
            try {
                return CompletableFuture.completedFuture(this.deque.pop());
            } catch (NoSuchElementException e) {
                return CompletableFutures.nullCompletedFuture();
            }
        }
    }

    public IgniteComputeImpl(PlacementDriver placementDriver, TopologyService topologyService, IgniteTablesInternal igniteTablesInternal, ComputeComponent computeComponent, HybridClock hybridClock, Authorizer authorizer) {
        this.placementDriver = placementDriver;
        this.topologyService = topologyService;
        this.tables = igniteTablesInternal;
        this.computeComponent = computeComponent;
        this.authorizer = authorizer;
        this.clock = hybridClock;
        igniteTablesInternal.setStreamerReceiverRunner(this);
    }

    public <T, R> CompletableFuture<JobExecution<R>> submitAsync(JobTarget jobTarget, JobDescriptor<T, R> jobDescriptor, @Nullable T t, @Nullable CancellationToken cancellationToken) {
        Objects.requireNonNull(jobTarget);
        Objects.requireNonNull(jobDescriptor);
        ComputeJobDataHolder marshalArgOrResult = SharedComputeUtils.marshalArgOrResult(t, jobDescriptor.argumentMarshaller());
        if (jobTarget instanceof AnyNodeJobTarget) {
            return unmarshalResult(executeAsyncWithFailover(((AnyNodeJobTarget) jobTarget).nodes(), jobDescriptor.units(), jobDescriptor.jobClassName(), jobDescriptor.options(), GridGainSecurity.systemContext(), marshalArgOrResult, cancellationToken), jobDescriptor);
        }
        if (!(jobTarget instanceof ColocatedJobTarget)) {
            throw new IllegalArgumentException("Unsupported job target: " + jobTarget);
        }
        ColocatedJobTarget colocatedJobTarget = (ColocatedJobTarget) jobTarget;
        Mapper keyMapper = colocatedJobTarget.keyMapper();
        QualifiedName tableName = colocatedJobTarget.tableName();
        Object key = colocatedJobTarget.key();
        return unmarshalResult(keyMapper != null ? requiredTable(tableName).thenCompose(tableViewInternal -> {
            return primaryReplicaForPartitionByMappedKey(tableViewInternal, key, keyMapper).thenCompose(clusterNode -> {
                return executeOnOneNodeWithFailover(clusterNode, new NextColocatedWorkerSelector(this.placementDriver, this.topologyService, this.clock, tableViewInternal, key, keyMapper), jobDescriptor.units(), jobDescriptor.jobClassName(), jobDescriptor.options(), GridGainSecurity.systemContext(), marshalArgOrResult, cancellationToken);
            });
        }) : requiredTable(tableName).thenCompose(tableViewInternal2 -> {
            return submitColocatedInternal(tableViewInternal2, (Tuple) key, jobDescriptor.units(), jobDescriptor.jobClassName(), jobDescriptor.options(), GridGainSecurity.systemContext(), marshalArgOrResult, cancellationToken);
        }), jobDescriptor);
    }

    public <T, R> CompletableFuture<BroadcastExecution<R>> submitAsync(BroadcastJobTarget broadcastJobTarget, JobDescriptor<T, R> jobDescriptor, @Nullable T t, @Nullable CancellationToken cancellationToken) {
        Objects.requireNonNull(broadcastJobTarget);
        Objects.requireNonNull(jobDescriptor);
        ComputeJobDataHolder marshalArgOrResult = SharedComputeUtils.marshalArgOrResult(t, jobDescriptor.argumentMarshaller());
        if (broadcastJobTarget instanceof AllNodesBroadcastJobTarget) {
            return toBroadcastExecution(((AllNodesBroadcastJobTarget) broadcastJobTarget).nodes().stream().map(clusterNode -> {
                return submitForBroadcast(clusterNode, jobDescriptor, marshalArgOrResult, cancellationToken);
            }));
        }
        if (broadcastJobTarget instanceof TableJobTarget) {
            return (CompletableFuture<BroadcastExecution<R>>) requiredTable(((TableJobTarget) broadcastJobTarget).tableName()).thenCompose(tableViewInternal -> {
                return tableViewInternal.partitionManager().primaryReplicasAsync().thenCompose(map -> {
                    return toBroadcastExecution(map.entrySet().stream().map(entry -> {
                        return submitForBroadcast((ClusterNode) entry.getValue(), (Partition) entry.getKey(), tableViewInternal.zoneId(), tableViewInternal.tableId(), jobDescriptor, marshalArgOrResult, cancellationToken);
                    }));
                });
            });
        }
        throw new IllegalArgumentException("Unsupported job target: " + broadcastJobTarget);
    }

    private static <R> CompletableFuture<BroadcastExecution<R>> toBroadcastExecution(Stream<CompletableFuture<JobExecution<R>>> stream) {
        CompletableFuture[] completableFutureArr = (CompletableFuture[]) stream.toArray(i -> {
            return new CompletableFuture[i];
        });
        return (CompletableFuture<BroadcastExecution<R>>) CompletableFuture.allOf(completableFutureArr).handle((r6, th) -> {
            return new BroadcastJobExecutionImpl((Collection) Arrays.stream(completableFutureArr).map(IgniteComputeImpl::mapExecution).collect(Collectors.toList()));
        });
    }

    private static <T, R> JobExecution<R> mapExecution(CompletableFuture<JobExecution<R>> completableFuture) {
        try {
            return completableFuture.join();
        } catch (Exception e) {
            return new FailedExecution(ExceptionUtils.unwrapCause(e));
        }
    }

    private <T, R> CompletableFuture<JobExecution<R>> submitForBroadcast(ClusterNode clusterNode, JobDescriptor<T, R> jobDescriptor, @Nullable ComputeJobDataHolder computeJobDataHolder, @Nullable CancellationToken cancellationToken) {
        return submitForBroadcast(clusterNode, jobDescriptor, ExecutionOptions.from(jobDescriptor.options()), CompletableFutures::nullCompletedFuture, computeJobDataHolder, cancellationToken);
    }

    private <T, R> CompletableFuture<JobExecution<R>> submitForBroadcast(ClusterNode clusterNode, Partition partition, int i, int i2, JobDescriptor<T, R> jobDescriptor, @Nullable ComputeJobDataHolder computeJobDataHolder, @Nullable CancellationToken cancellationToken) {
        return submitForBroadcast(clusterNode, jobDescriptor, ExecutionOptions.builder().priority(jobDescriptor.options().priority()).maxRetries(jobDescriptor.options().maxRetries()).partition(partition).build(), new PartitionNextWorkerSelector(this.placementDriver, this.topologyService, this.clock, i, i2, partition), computeJobDataHolder, cancellationToken);
    }

    private <T, R> CompletableFuture<JobExecution<R>> submitForBroadcast(ClusterNode clusterNode, JobDescriptor<T, R> jobDescriptor, ExecutionOptions executionOptions, NextWorkerSelector nextWorkerSelector, @Nullable ComputeJobDataHolder computeJobDataHolder, @Nullable CancellationToken cancellationToken) {
        return this.topologyService.getByConsistentId(clusterNode.name()) == null ? CompletableFuture.failedFuture(new NodeNotFoundException(Set.of(clusterNode.name()))) : unmarshalResult(executeOnOneNodeWithFailover(clusterNode, nextWorkerSelector, jobDescriptor.units(), jobDescriptor.jobClassName(), executionOptions, GridGainSecurity.systemContext(), computeJobDataHolder, cancellationToken), jobDescriptor);
    }

    private static <T, R> CompletableFuture<JobExecution<R>> unmarshalResult(CompletableFuture<JobExecution<ComputeJobDataHolder>> completableFuture, JobDescriptor<T, R> jobDescriptor) {
        return (CompletableFuture<JobExecution<R>>) completableFuture.thenApply(jobExecution -> {
            return new ResultUnmarshallingJobExecution(jobExecution, jobDescriptor.resultMarshaller(), jobDescriptor.resultClass());
        });
    }

    public <T, R> R execute(JobTarget jobTarget, JobDescriptor<T, R> jobDescriptor, @Nullable T t, @Nullable CancellationToken cancellationToken) {
        return (R) sync(executeAsync(jobTarget, jobDescriptor, t, cancellationToken));
    }

    public <T, R> Collection<R> execute(BroadcastJobTarget broadcastJobTarget, JobDescriptor<T, R> jobDescriptor, @Nullable T t, @Nullable CancellationToken cancellationToken) {
        return (Collection) sync(executeAsync(broadcastJobTarget, jobDescriptor, t, cancellationToken));
    }

    @Override // org.apache.ignite.internal.compute.IgniteComputeInternal
    public CompletableFuture<JobExecution<ComputeJobDataHolder>> executeAsyncWithFailover(Set<ClusterNode> set, List<DeploymentUnit> list, String str, JobExecutionOptions jobExecutionOptions, SecurityContext securityContext, @Nullable ComputeJobDataHolder computeJobDataHolder, @Nullable CancellationToken cancellationToken) {
        HashSet hashSet = new HashSet();
        for (ClusterNode clusterNode : set) {
            if (this.topologyService.getByConsistentId(clusterNode.name()) != null) {
                hashSet.add(clusterNode);
            }
        }
        if (hashSet.isEmpty()) {
            return CompletableFuture.failedFuture(new NodeNotFoundException((Set) set.stream().map((v0) -> {
                return v0.name();
            }).collect(Collectors.toSet())));
        }
        ClusterNode randomNode = randomNode(hashSet);
        hashSet.remove(randomNode);
        return executeOnOneNodeWithFailover(randomNode, new DeqNextWorkerSelector(new ConcurrentLinkedDeque(hashSet)), list, str, jobExecutionOptions, securityContext, computeJobDataHolder, cancellationToken);
    }

    private static ClusterNode randomNode(Set<ClusterNode> set) {
        int nextInt = ThreadLocalRandom.current().nextInt(set.size());
        Iterator<ClusterNode> it = set.iterator();
        for (int i = 0; i < nextInt; i++) {
            it.next();
        }
        return it.next();
    }

    private CompletableFuture<JobExecution<ComputeJobDataHolder>> executeOnOneNodeWithFailover(ClusterNode clusterNode, NextWorkerSelector nextWorkerSelector, List<DeploymentUnit> list, String str, JobExecutionOptions jobExecutionOptions, SecurityContext securityContext, @Nullable ComputeJobDataHolder computeJobDataHolder, @Nullable CancellationToken cancellationToken) {
        return executeOnOneNodeWithFailover(clusterNode, nextWorkerSelector, list, str, ExecutionOptions.from(jobExecutionOptions), securityContext, computeJobDataHolder, cancellationToken);
    }

    private CompletableFuture<JobExecution<ComputeJobDataHolder>> executeOnOneNodeWithFailover(ClusterNode clusterNode, NextWorkerSelector nextWorkerSelector, List<DeploymentUnit> list, String str, ExecutionOptions executionOptions, SecurityContext securityContext, @Nullable ComputeJobDataHolder computeJobDataHolder, @Nullable CancellationToken cancellationToken) {
        this.authorizer.authorizeAsync(securityContext, getPrivileges(list)).join();
        return (CompletableFuture) GridGainSecurity.with(securityContext, () -> {
            return isLocal(clusterNode) ? ComputeUtils.convertToComputeFuture(this.computeComponent.executeLocally(executionOptions, list, str, computeJobDataHolder, cancellationToken)).thenApply((v1) -> {
                return new JobExecutionWrapper(v1);
            }) : ComputeUtils.convertToComputeFuture(this.computeComponent.executeRemotelyWithFailover(clusterNode, nextWorkerSelector, list, str, executionOptions, computeJobDataHolder, cancellationToken)).thenApply(JobExecutionWrapper::new);
        }).get();
    }

    private static Set<Privilege> getPrivileges(List<DeploymentUnit> list) {
        return list.isEmpty() ? Set.of(Privilege.fromAction(Action.EXEC_JOB)) : (Set) list.stream().map((v0) -> {
            return v0.name();
        }).map(Selector::deploymentUnit).map(selector -> {
            return Privilege.builder().selector(selector).action(Action.EXEC_JOB).build();
        }).collect(Collectors.toSet());
    }

    private boolean isLocal(ClusterNode clusterNode) {
        return clusterNode.name().equals(this.topologyService.localMember().name());
    }

    @Override // org.apache.ignite.internal.compute.IgniteComputeInternal
    public CompletableFuture<JobExecution<ComputeJobDataHolder>> submitColocatedInternal(TableViewInternal tableViewInternal, Tuple tuple, List<DeploymentUnit> list, String str, JobExecutionOptions jobExecutionOptions, SecurityContext securityContext, @Nullable ComputeJobDataHolder computeJobDataHolder, @Nullable CancellationToken cancellationToken) {
        return primaryReplicaForPartitionByTupleKey(tableViewInternal, tuple).thenCompose(clusterNode -> {
            return executeOnOneNodeWithFailover(clusterNode, new NextColocatedWorkerSelector(this.placementDriver, this.topologyService, this.clock, tableViewInternal, tuple), (List<DeploymentUnit>) list, str, jobExecutionOptions, securityContext, computeJobDataHolder, cancellationToken);
        });
    }

    @Override // org.apache.ignite.internal.compute.IgniteComputeInternal
    public CompletableFuture<JobExecution<ComputeJobDataHolder>> submitPartitionedInternal(TableViewInternal tableViewInternal, int i, List<DeploymentUnit> list, String str, JobExecutionOptions jobExecutionOptions, SecurityContext securityContext, @Nullable ComputeJobDataHolder computeJobDataHolder, @Nullable CancellationToken cancellationToken) {
        Partition hashPartition = new HashPartition(i);
        ExecutionOptions build = ExecutionOptions.builder().priority(jobExecutionOptions.priority()).maxRetries(jobExecutionOptions.maxRetries()).partition(hashPartition).build();
        return primaryReplicaForPartition(tableViewInternal, i).thenCompose(clusterNode -> {
            return executeOnOneNodeWithFailover(clusterNode, new PartitionNextWorkerSelector(this.placementDriver, this.topologyService, this.clock, tableViewInternal.zoneId(), tableViewInternal.tableId(), hashPartition), (List<DeploymentUnit>) list, str, build, securityContext, computeJobDataHolder, cancellationToken);
        });
    }

    private CompletableFuture<TableViewInternal> requiredTable(QualifiedName qualifiedName) {
        return this.tables.tableViewAsync(qualifiedName).thenApply(tableViewInternal -> {
            if (tableViewInternal == null) {
                throw new TableNotFoundException(qualifiedName);
            }
            return tableViewInternal;
        });
    }

    private CompletableFuture<ClusterNode> primaryReplicaForPartitionByTupleKey(TableViewInternal tableViewInternal, Tuple tuple) {
        return primaryReplicaForPartition(tableViewInternal, tableViewInternal.partitionId(tuple));
    }

    private <K> CompletableFuture<ClusterNode> primaryReplicaForPartitionByMappedKey(TableViewInternal tableViewInternal, K k, Mapper<K> mapper) {
        return primaryReplicaForPartition(tableViewInternal, tableViewInternal.partitionId(k, mapper));
    }

    private CompletableFuture<ClusterNode> primaryReplicaForPartition(TableViewInternal tableViewInternal, int i) {
        return this.placementDriver.awaitPrimaryReplica(IgniteSystemProperties.enabledColocation() ? new ZonePartitionId(tableViewInternal.zoneId(), i) : new TablePartitionId(tableViewInternal.tableId(), i), this.clock.now(), 30L, TimeUnit.SECONDS).thenApply(replicaMeta -> {
            if (replicaMeta == null || replicaMeta.getLeaseholderId() == null) {
                throw new ComputeException(ErrorGroups.Compute.PRIMARY_REPLICA_RESOLVE_ERR, "Can not find primary replica for [table=" + tableViewInternal.name() + ", partition=" + i + "].");
            }
            return this.topologyService.getById(replicaMeta.getLeaseholderId());
        });
    }

    @Override // org.apache.ignite.internal.compute.IgniteComputeInternal
    public <T, R> TaskExecution<R> submitMapReduceInternal(TaskDescriptor<T, R> taskDescriptor, SecurityContext securityContext, @Nullable T t, @Nullable CancellationToken cancellationToken) {
        return new TaskExecutionFutureWrapper(this.authorizer.authorizeAsync(securityContext, getPrivileges(taskDescriptor.units())).thenApply(GridGainSecurity.with(securityContext, r10 -> {
            CancellableTaskExecution executeTask = this.computeComponent.executeTask(this::submitJobs, taskDescriptor.units(), taskDescriptor.taskClassName(), t);
            if (cancellationToken != null) {
                Objects.requireNonNull(executeTask);
                CancelHandleHelper.addCancelAction(cancellationToken, executeTask::cancelAsync, executeTask.resultAsync());
            }
            return executeTask;
        })));
    }

    public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> taskDescriptor, @Nullable T t, @Nullable CancellationToken cancellationToken) {
        Objects.requireNonNull(taskDescriptor);
        return submitMapReduceInternal(taskDescriptor, GridGainSecurity.systemContext(), t, cancellationToken);
    }

    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, @Nullable T t, @Nullable CancellationToken cancellationToken) {
        return (R) sync(executeMapReduceAsync(taskDescriptor, t, cancellationToken));
    }

    private <M, T> CompletableFuture<List<JobExecution<T>>> submitJobs(List<MapReduceJob<M, T>> list, CancellationToken cancellationToken) {
        return CompletableFutures.allOfToList((CompletableFuture[]) list.stream().map(mapReduceJob -> {
            return submitAsync(JobTarget.anyNode(mapReduceJob.nodes()), (JobDescriptor<JobDescriptor, R>) mapReduceJob.jobDescriptor(), (JobDescriptor) mapReduceJob.arg(), cancellationToken);
        }).toArray(i -> {
            return new CompletableFuture[i];
        }));
    }

    @Override // org.apache.ignite.internal.compute.IgniteComputeInternal
    public CompletableFuture<Collection<JobState>> statesAsync() {
        String username = SecurityContextHolder.getOrThrow().authentication().username();
        Privilege fromAction = Privilege.fromAction(Action.GET_JOB_STATE);
        Authorizer authorizer = this.authorizer;
        ComputeComponent computeComponent = this.computeComponent;
        Objects.requireNonNull(computeComponent);
        return authorizer.authorizeThenCompose(fromAction, computeComponent::statesAsync).exceptionally(th -> {
            return this.computeComponent.statesAsyncByOwner(username).join();
        });
    }

    @Override // org.apache.ignite.internal.compute.IgniteComputeInternal
    public CompletableFuture<JobState> stateAsync(UUID uuid) {
        return authorizeJobAction(uuid, Action.GET_JOB_STATE, () -> {
            return this.computeComponent.stateAsync(uuid);
        });
    }

    @Override // org.apache.ignite.internal.compute.IgniteComputeInternal
    public CompletableFuture<Boolean> cancelAsync(UUID uuid) {
        return authorizeJobAction(uuid, Action.KILL_JOB, () -> {
            return this.computeComponent.cancelAsync(uuid);
        });
    }

    @Override // org.apache.ignite.internal.compute.IgniteComputeInternal
    public CompletableFuture<Boolean> changePriorityAsync(UUID uuid, int i) {
        return authorizeJobAction(uuid, Action.CHANGE_JOB_PRIORITY, () -> {
            return this.computeComponent.changePriorityAsync(uuid, i);
        });
    }

    private <T> CompletableFuture<T> authorizeJobAction(UUID uuid, Action action, Supplier<CompletableFuture<T>> supplier) {
        return (CompletableFuture<T>) this.computeComponent.getOwner(uuid).thenCompose(GridGainSecurity.withContext(str -> {
            if (str == null) {
                return CompletableFutures.nullCompletedFuture();
            }
            return this.authorizer.authorizeThenCompose(Privilege.builder().action(action).selector(Selector.user(str)).build(), supplier);
        }));
    }

    public <A, I, R> CompletableFuture<Collection<R>> runReceiverAsync(ReceiverDescriptor<A> receiverDescriptor, @Nullable A a, Collection<I> collection, ClusterNode clusterNode, List<DeploymentUnit> list) {
        return (CompletableFuture<Collection<R>>) runReceiverAsync(StreamerReceiverSerializer.serializeReceiverInfoWithElementCount(receiverDescriptor, a, collection), clusterNode, list, GridGainSecurity.systemContext()).thenApply(StreamerReceiverSerializer::deserializeReceiverJobResults);
    }

    public CompletableFuture<byte[]> runReceiverAsync(byte[] bArr, ClusterNode clusterNode, List<DeploymentUnit> list, SecurityContext securityContext) {
        return executeAsyncWithFailover(Set.of(clusterNode), list, StreamerReceiverJob.class.getName(), JobExecutionOptions.DEFAULT, securityContext, SharedComputeUtils.marshalArgOrResult(bArr, (Marshaller) null), null).thenCompose((v0) -> {
            return v0.resultAsync();
        }).handle((BiFunction<? super U, Throwable, ? extends U>) (computeJobDataHolder, th) -> {
            if (th != null) {
                if (th.getCause() instanceof ComputeException) {
                    ComputeException cause = th.getCause();
                    throw new IgniteException(ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR, "Streamer receiver failed: " + cause.getMessage(), cause);
                }
                ExceptionUtils.sneakyThrow(th);
            }
            return (byte[]) SharedComputeUtils.unmarshalArgOrResult(computeJobDataHolder, (Marshaller) null, (Class) null);
        });
    }

    @TestOnly
    ComputeComponent computeComponent() {
        return this.computeComponent;
    }

    private static <R> R sync(CompletableFuture<R> completableFuture) {
        try {
            return completableFuture.join();
        } catch (CompletionException e) {
            throw ((RuntimeException) ExceptionUtils.sneakyThrow(IgniteExceptionMapperUtil.mapToPublicException(ExceptionUtils.unwrapCause(e))));
        }
    }
}
