package org.apache.ignite3.internal.compute;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite3.compute.AnyNodeJobTarget;
import org.apache.ignite3.compute.ColocatedJobTarget;
import org.apache.ignite3.compute.ComputeException;
import org.apache.ignite3.compute.JobDescriptor;
import org.apache.ignite3.compute.JobExecution;
import org.apache.ignite3.compute.JobExecutionOptions;
import org.apache.ignite3.compute.JobState;
import org.apache.ignite3.compute.JobTarget;
import org.apache.ignite3.compute.NodeNotFoundException;
import org.apache.ignite3.compute.TaskDescriptor;
import org.apache.ignite3.compute.task.MapReduceJob;
import org.apache.ignite3.compute.task.TaskExecution;
import org.apache.ignite3.deployment.DeploymentUnit;
import org.apache.ignite3.internal.client.proto.StreamerReceiverSerializer;
import org.apache.ignite3.internal.compute.streamer.StreamerReceiverJob;
import org.apache.ignite3.internal.hlc.HybridClock;
import org.apache.ignite3.internal.lang.IgniteExceptionMapperUtil;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.placementdriver.PlacementDriver;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.table.IgniteTablesInternal;
import org.apache.ignite3.internal.table.StreamerReceiverRunner;
import org.apache.ignite3.internal.table.TableViewInternal;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.lang.IgniteException;
import org.apache.ignite3.lang.TableNotFoundException;
import org.apache.ignite3.lang.util.IgniteNameUtils;
import org.apache.ignite3.marshalling.Marshaller;
import org.apache.ignite3.network.ClusterNode;
import org.apache.ignite3.table.ReceiverDescriptor;
import org.apache.ignite3.table.Tuple;
import org.apache.ignite3.table.mapper.Mapper;
import org.gridgain.internal.rbac.authorization.Authorizer;
import org.gridgain.internal.rbac.privileges.Action;
import org.gridgain.internal.rbac.privileges.Privilege;
import org.gridgain.internal.rbac.privileges.Selector;
import org.gridgain.internal.security.context.GridGainSecurity;
import org.gridgain.internal.security.context.SecurityContext;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/* loaded from: input_file:org/apache/ignite3/internal/compute/IgniteComputeImpl.class */
public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceiverRunner {
    /** Used to check whether a node is currently known to the topology and to identify the local member. */
    private final TopologyService topologyService;
    /** Used to resolve table views by name; this instance is registered on it as the streamer receiver runner. */
    private final IgniteTablesInternal tables;
    /** Component that actually runs jobs and tasks and answers job state, cancel and priority requests. */
    private final ComputeComponent computeComponent;
    /** Used to authorize job execution and job management actions. */
    private final Authorizer authorizer;
    /** Used to await the primary replica of a table partition for colocated execution. */
    private final PlacementDriver placementDriver;
    /** Supplies the timestamp passed to the placement driver when awaiting a primary replica. */
    private final HybridClock clock;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite3/internal/compute/IgniteComputeImpl$DeqNextWorkerSelector.class */
    /**
     * {@link NextWorkerSelector} backed by a queue of failover candidates. Every call to {@link #next()} removes
     * the head of the queue; once the queue is drained the selector yields a {@code null} node.
     */
    public static class DeqNextWorkerSelector implements NextWorkerSelector {
        /** Remaining failover candidates, consumed from the head. */
        private final ConcurrentLinkedDeque<ClusterNode> deque;

        /**
         * Creates a selector over the given candidates.
         *
         * @param concurrentLinkedDeque candidates to hand out; the deque is used as is (not copied)
         */
        private DeqNextWorkerSelector(ConcurrentLinkedDeque<ClusterNode> concurrentLinkedDeque) {
            this.deque = concurrentLinkedDeque;
        }

        /**
         * Removes and returns the next failover candidate.
         *
         * @return a completed future holding the next node, or a future completed with {@code null} when no
         *         candidates remain
         */
        @Override // org.apache.ignite3.internal.compute.NextWorkerSelector
        public CompletableFuture<ClusterNode> next() {
            // pollFirst() signals an empty deque by returning null (the deque cannot hold null elements), so the
            // "no more candidates" case no longer needs an exception to be thrown and caught as control flow.
            ClusterNode candidate = this.deque.pollFirst();
            if (candidate == null) {
                return CompletableFutures.nullCompletedFuture();
            }
            return CompletableFuture.completedFuture(candidate);
        }
    }

    /**
     * Creates the compute facade and registers it as the streamer receiver runner of the given tables component.
     *
     * @param placementDriver placement driver used to await primary replicas
     * @param topologyService topology service used for node lookups
     * @param igniteTablesInternal tables component used to resolve table views
     * @param computeComponent component that executes jobs and tasks
     * @param hybridClock clock supplying timestamps for primary replica lookups
     * @param authorizer authorizer consulted before jobs are executed or managed
     */
    public IgniteComputeImpl(PlacementDriver placementDriver, TopologyService topologyService, IgniteTablesInternal igniteTablesInternal, ComputeComponent computeComponent, HybridClock hybridClock, Authorizer authorizer) {
        this.topologyService = topologyService;
        this.placementDriver = placementDriver;
        this.clock = hybridClock;
        this.authorizer = authorizer;
        this.computeComponent = computeComponent;
        this.tables = igniteTablesInternal;
        // Done last, after every field is assigned, because it hands out a reference to this instance.
        this.tables.setStreamerReceiverRunner(this);
    }

    /**
     * Submits a job for execution on the given target.
     *
     * <p>For an {@link AnyNodeJobTarget} the job is started through
     * {@link #executeAsyncWithFailover} over the target's nodes. For a {@link ColocatedJobTarget} the table is
     * resolved by name and the job is started on the node that holds the primary replica of the key's partition;
     * the partition is computed with the target's key mapper when one is present, otherwise the key is treated
     * as a {@link Tuple}. In both cases the job runs under {@code GridGainSecurity.systemContext()}, the argument
     * is passed through {@code Marshaller.tryMarshalOrCast} with the descriptor's argument marshaller, and the
     * returned execution is wrapped so that results go through the descriptor's result marshaller.
     *
     * @param jobTarget where to run the job
     * @param jobDescriptor job class, deployment units, options and marshallers
     * @param t job argument
     * @return handle of the submitted job
     * @throws NullPointerException if {@code jobTarget} or {@code jobDescriptor} is {@code null}
     * @throws IllegalArgumentException if the target is neither an any-node nor a colocated target
     */
    @Override // org.apache.ignite3.compute.IgniteCompute
    public <T, R> JobExecution<R> submit(JobTarget jobTarget, JobDescriptor<T, R> jobDescriptor, T t) {
        Objects.requireNonNull(jobTarget);
        Objects.requireNonNull(jobDescriptor);
        Marshaller<T, byte[]> argumentMarshaller = jobDescriptor.argumentMarshaller();
        Marshaller<R, byte[]> resultMarshaller = jobDescriptor.resultMarshaller();
        if (jobTarget instanceof AnyNodeJobTarget) {
            return new ResultUnmarshallingJobExecution(executeAsyncWithFailover(((AnyNodeJobTarget) jobTarget).nodes(), jobDescriptor.units(), jobDescriptor.jobClassName(), jobDescriptor.options(), GridGainSecurity.systemContext(), Marshaller.tryMarshalOrCast(argumentMarshaller, t)), resultMarshaller);
        }
        if (!(jobTarget instanceof ColocatedJobTarget)) {
            throw new IllegalArgumentException("Unsupported job target: " + jobTarget);
        }
        ColocatedJobTarget colocatedJobTarget = (ColocatedJobTarget) jobTarget;
        // The target carries its key as a plain Object, so the mapper is viewed as one that accepts Object;
        // a wildcard mapper could not be bound to the key type of primaryReplicaForPartitionByMappedKey.
        @SuppressWarnings("unchecked")
        Mapper<? super Object> keyMapper = (Mapper<? super Object>) colocatedJobTarget.keyMapper();
        String tableName = colocatedJobTarget.tableName();
        Object key = colocatedJobTarget.key();

        // A typed future replaces the former cast to an undeclared type variable and keeps R flowing
        // through both branches.
        CompletableFuture<JobExecution<R>> jobFuture;
        if (keyMapper != null) {
            jobFuture = requiredTable(tableName).thenCompose(table ->
                    primaryReplicaForPartitionByMappedKey(table, key, keyMapper).thenApply(primaryNode ->
                            this.<Object, R>executeOnOneNodeWithFailover(
                                    primaryNode,
                                    new NextColocatedWorkerSelector(this.placementDriver, this.topologyService, this.clock, table, key, keyMapper),
                                    jobDescriptor.units(),
                                    jobDescriptor.jobClassName(),
                                    jobDescriptor.options(),
                                    GridGainSecurity.systemContext(),
                                    Marshaller.tryMarshalOrCast(argumentMarshaller, t))));
        } else {
            // Without a mapper the key is expected to be a Tuple.
            jobFuture = requiredTable(tableName).thenCompose(table ->
                    this.<R>submitColocatedInternal(
                            table,
                            (Tuple) key,
                            jobDescriptor.units(),
                            jobDescriptor.jobClassName(),
                            jobDescriptor.options(),
                            GridGainSecurity.systemContext(),
                            Marshaller.tryMarshalOrCast(argumentMarshaller, t)));
        }
        return new ResultUnmarshallingJobExecution(new JobExecutionFutureWrapper(jobFuture), resultMarshaller);
    }

    /**
     * Runs a job on the given target and blocks the calling thread until its result is available.
     *
     * @param jobTarget where to run the job
     * @param jobDescriptor description of the job to run
     * @param t job argument
     * @return job result
     */
    @Override // org.apache.ignite3.compute.IgniteCompute
    public <T, R> R execute(JobTarget jobTarget, JobDescriptor<T, R> jobDescriptor, T t) {
        CompletableFuture<R> pendingResult = executeAsync(jobTarget, jobDescriptor, t);
        return sync(pendingResult);
    }

    /**
     * Starts a job on one of the given nodes, keeping the remaining ones as failover candidates.
     *
     * <p>Only nodes that the topology service can currently resolve by consistent ID are considered. One of them
     * is chosen at random as the initial worker; the rest are handed to a {@link DeqNextWorkerSelector}.
     *
     * @param set candidate nodes
     * @param list deployment units of the job
     * @param str job class name
     * @param jobExecutionOptions execution options
     * @param securityContext security context the job is authorized and executed under
     * @param obj job argument, may be {@code null}
     * @return handle of the started job, or a failed execution carrying a {@link NodeNotFoundException} with the
     *         names of all requested nodes when none of them is present in the topology
     */
    @Override // org.apache.ignite3.internal.compute.IgniteComputeInternal
    public <R> JobExecution<R> executeAsyncWithFailover(Set<ClusterNode> set, List<DeploymentUnit> list, String str, JobExecutionOptions jobExecutionOptions, SecurityContext securityContext, @Nullable Object obj) {
        Set<ClusterNode> candidates = new HashSet<>();
        for (ClusterNode node : set) {
            if (this.topologyService.getByConsistentId(node.name()) != null) {
                candidates.add(node);
            }
        }
        if (candidates.isEmpty()) {
            Set<String> requestedNames = set.stream().map(ClusterNode::name).collect(Collectors.toSet());
            return new FailedExecution(new NodeNotFoundException(requestedNames));
        }
        ClusterNode initialWorker = randomNode(candidates);
        // The initial worker must not be offered again as its own failover candidate.
        candidates.remove(initialWorker);
        NextWorkerSelector failoverSelector = new DeqNextWorkerSelector(new ConcurrentLinkedDeque<>(candidates));
        return new JobExecutionWrapper(executeOnOneNodeWithFailover(initialWorker, failoverSelector, list, str, jobExecutionOptions, securityContext, obj));
    }

    /**
     * Picks an element of the given set at a random iteration position.
     *
     * @param set nodes to choose from; must not be empty, because the random bound is the set size
     * @return the chosen node
     */
    private static ClusterNode randomNode(Set<ClusterNode> set) {
        int toSkip = ThreadLocalRandom.current().nextInt(set.size());
        Iterator<ClusterNode> cursor = set.iterator();
        // Advance past the elements that precede the chosen position.
        while (toSkip > 0) {
            cursor.next();
            toSkip--;
        }
        return cursor.next();
    }

    /**
     * Authorizes the job and starts it on the given node, locally when the node is the local member and remotely
     * (with the supplied failover selector) otherwise.
     *
     * @param clusterNode node to start the job on
     * @param nextWorkerSelector source of failover nodes for remote execution
     * @param list deployment units of the job
     * @param str job class name
     * @param jobExecutionOptions execution options
     * @param securityContext security context used for authorization and for running the job
     * @param t job argument, may be {@code null}
     * @return handle of the started job
     */
    private <T, R> JobExecution<R> executeOnOneNodeWithFailover(ClusterNode clusterNode, NextWorkerSelector nextWorkerSelector, List<DeploymentUnit> list, String str, JobExecutionOptions jobExecutionOptions, SecurityContext securityContext, @Nullable T t) {
        ExecutionOptions executionOptions = ExecutionOptions.from(jobExecutionOptions);
        Set<Privilege> requiredPrivileges = getPrivileges(list);
        // NOTE(review): join() waits on the calling thread for the authorization to finish, and this method is
        // also invoked from thenApply callbacks in this class - confirm that blocking there is acceptable.
        this.authorizer.authorizeAsync(securityContext, requiredPrivileges).join();
        return (JobExecution) GridGainSecurity.with(securityContext, () -> {
            if (isLocal(clusterNode)) {
                return this.computeComponent.executeLocally(executionOptions, list, str, t);
            }
            return this.computeComponent.executeRemotelyWithFailover(clusterNode, nextWorkerSelector, list, str, executionOptions, t);
        }).get();
    }

    /**
     * Builds the set of privileges required to execute a job with the given deployment units.
     *
     * @param list deployment units of the job
     * @return a single {@code EXEC_JOB} privilege without a selector when there are no units, otherwise one
     *         {@code EXEC_JOB} privilege per unit name, each restricted by a deployment-unit selector
     */
    private static Set<Privilege> getPrivileges(List<DeploymentUnit> list) {
        if (list.isEmpty()) {
            return Set.of(Privilege.fromAction(Action.EXEC_JOB));
        }
        return list.stream()
                .map(DeploymentUnit::name)
                .map(Selector::deploymentUnit)
                .map(unitSelector -> Privilege.builder().selector(unitSelector).action(Action.EXEC_JOB).build())
                .collect(Collectors.toSet());
    }

    /**
     * Tells whether the given node is the local member, comparing by node name.
     *
     * @param clusterNode node to check
     * @return {@code true} if the node's name equals the local member's name
     */
    private boolean isLocal(ClusterNode clusterNode) {
        String candidateName = clusterNode.name();
        String localName = this.topologyService.localMember().name();
        return candidateName.equals(localName);
    }

    /**
     * Starts a job on the node that holds the primary replica of the partition the given tuple key maps to.
     *
     * @param tableViewInternal table the key belongs to
     * @param tuple colocation key
     * @param list deployment units of the job
     * @param str job class name
     * @param jobExecutionOptions execution options
     * @param securityContext security context the job is authorized and executed under
     * @param obj job argument, may be {@code null}
     * @return future completed with the handle of the started job
     */
    @Override // org.apache.ignite3.internal.compute.IgniteComputeInternal
    public <R> CompletableFuture<JobExecution<R>> submitColocatedInternal(TableViewInternal tableViewInternal, Tuple tuple, List<DeploymentUnit> list, String str, JobExecutionOptions jobExecutionOptions, SecurityContext securityContext, @Nullable Object obj) {
        CompletableFuture<ClusterNode> primaryNodeFuture = primaryReplicaForPartitionByTupleKey(tableViewInternal, tuple);
        return primaryNodeFuture.thenApply(primaryNode -> executeOnOneNodeWithFailover(
                primaryNode,
                new NextColocatedWorkerSelector(this.placementDriver, this.topologyService, this.clock, tableViewInternal, tuple),
                list,
                str,
                jobExecutionOptions,
                securityContext,
                obj));
    }

    /**
     * Resolves a table view by name, failing when the table does not exist.
     *
     * @param str table name; parsed with {@code IgniteNameUtils.parseSimpleName} before the lookup
     * @return future completed with the table view, or completed exceptionally with a
     *         {@link TableNotFoundException} (reported for schema {@code PUBLIC}) when the lookup yields
     *         {@code null}
     */
    private CompletableFuture<TableViewInternal> requiredTable(String str) {
        final String simpleName = IgniteNameUtils.parseSimpleName(str);
        return this.tables.tableViewAsync(simpleName).thenApply(resolved -> {
            if (resolved != null) {
                return resolved;
            }
            throw new TableNotFoundException("PUBLIC", simpleName);
        });
    }

    /**
     * Finds the node holding the primary replica of the partition that the given tuple key maps to.
     *
     * @param tableViewInternal table the key belongs to
     * @param tuple colocation key
     * @return future completed with the primary replica node
     */
    private CompletableFuture<ClusterNode> primaryReplicaForPartitionByTupleKey(TableViewInternal tableViewInternal, Tuple tuple) {
        int partitionId = tableViewInternal.partition(tuple);
        return primaryReplicaForPartition(tableViewInternal, partitionId);
    }

    /**
     * Finds the node holding the primary replica of the partition that the given mapped key maps to.
     *
     * @param tableViewInternal table the key belongs to
     * @param k colocation key
     * @param mapper mapper used by the table to compute the key's partition
     * @return future completed with the primary replica node
     */
    private <K> CompletableFuture<ClusterNode> primaryReplicaForPartitionByMappedKey(TableViewInternal tableViewInternal, K k, Mapper<K> mapper) {
        int partitionId = tableViewInternal.partition(k, mapper);
        return primaryReplicaForPartition(tableViewInternal, partitionId);
    }

    /**
     * Awaits the primary replica of a table partition (waiting up to 30 seconds, as of the current clock value)
     * and resolves its leaseholder to a cluster node.
     *
     * @param tableViewInternal table owning the partition
     * @param i partition number
     * @return future completed with the node looked up by the leaseholder ID; completed exceptionally with a
     *         {@link ComputeException} ({@code PRIMARY_REPLICA_RESOLVE_ERR}) when no replica metadata or no
     *         leaseholder ID is available
     */
    private CompletableFuture<ClusterNode> primaryReplicaForPartition(TableViewInternal tableViewInternal, int i) {
        TablePartitionId partitionId = new TablePartitionId(tableViewInternal.tableId(), i);
        return this.placementDriver.awaitPrimaryReplica(partitionId, this.clock.now(), 30L, TimeUnit.SECONDS).thenApply(replicaMeta -> {
            if (replicaMeta != null && replicaMeta.getLeaseholderId() != null) {
                // NOTE(review): getById presumably returns null when the leaseholder has left the topology;
                // that case is not handled here - confirm against TopologyService.
                return this.topologyService.getById(replicaMeta.getLeaseholderId());
            }
            throw new ComputeException(ErrorGroups.Compute.PRIMARY_REPLICA_RESOLVE_ERR, "Can not find primary replica for [table=" + tableViewInternal.name() + ", partition=" + i + "].");
        });
    }

    /**
     * Submits the same job to every node of the given set.
     *
     * <p>Each job runs under {@code GridGainSecurity.systemContext()} with a worker selector that always yields
     * a {@code null} next node. A node that the topology service cannot resolve by consistent ID gets a failed
     * execution carrying a {@link NodeNotFoundException} instead.
     *
     * @param set nodes to run the job on
     * @param jobDescriptor job class, deployment units, options and marshallers
     * @param t job argument
     * @return unmodifiable map from each requested node to the handle of its job
     * @throws NullPointerException if {@code set} or {@code jobDescriptor} is {@code null}
     */
    @Override // org.apache.ignite3.compute.IgniteCompute
    public <T, R> Map<ClusterNode, JobExecution<R>> submitBroadcast(Set<ClusterNode> set, JobDescriptor<T, R> jobDescriptor, T t) {
        Objects.requireNonNull(set);
        Objects.requireNonNull(jobDescriptor);
        Marshaller<T, byte[]> argumentMarshaller = jobDescriptor.argumentMarshaller();
        Marshaller<R, byte[]> resultMarshaller = jobDescriptor.resultMarshaller();
        return (Map) set.stream().collect(Collectors.toUnmodifiableMap(Function.identity(), targetNode -> {
            if (this.topologyService.getByConsistentId(targetNode.name()) == null) {
                return new FailedExecution(new NodeNotFoundException(Set.of(targetNode.name())));
            }
            return new ResultUnmarshallingJobExecution(
                    new JobExecutionWrapper(executeOnOneNodeWithFailover(
                            targetNode,
                            CompletableFutures::nullCompletedFuture,
                            jobDescriptor.units(),
                            jobDescriptor.jobClassName(),
                            jobDescriptor.options(),
                            GridGainSecurity.systemContext(),
                            Marshaller.tryMarshalOrCast(argumentMarshaller, t))),
                    resultMarshaller);
        }));
    }

    /**
     * Submits a map-reduce task described by the given descriptor, running it under
     * {@code GridGainSecurity.systemContext()}.
     *
     * @param taskDescriptor task class name and deployment units
     * @param t task argument, may be {@code null}
     * @return handle of the submitted task
     * @throws NullPointerException if {@code taskDescriptor} is {@code null}
     */
    @Override // org.apache.ignite3.compute.IgniteCompute
    public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> taskDescriptor, @Nullable T t) {
        TaskDescriptor<T, R> descriptor = Objects.requireNonNull(taskDescriptor);
        return submitMapReduceInternal(
                descriptor.units(),
                descriptor.taskClassName(),
                GridGainSecurity.systemContext(),
                t);
    }

    /**
     * Authorizes and starts a map-reduce task.
     *
     * <p>Once authorization for the privileges derived from the deployment units completes, the task is handed
     * to the compute component, with {@link #submitJob} as the function that submits the jobs the task produces.
     *
     * @param list deployment units of the task
     * @param str task class name
     * @param securityContext security context used for authorization and for running the task
     * @param obj task argument
     * @return handle of the task
     */
    @Override // org.apache.ignite3.internal.compute.IgniteComputeInternal
    public <R> TaskExecution<R> submitMapReduceInternal(List<DeploymentUnit> list, String str, SecurityContext securityContext, Object obj) {
        Set<Privilege> requiredPrivileges = getPrivileges(list);
        return new TaskExecutionFutureWrapper(
                this.authorizer.authorizeAsync(securityContext, requiredPrivileges)
                        .thenApply(GridGainSecurity.with(securityContext, ignored -> {
                            return this.computeComponent.executeTask(this::submitJob, list, str, obj);
                        })));
    }

    /**
     * Runs a map-reduce task and blocks the calling thread until its result is available.
     *
     * @param taskDescriptor description of the task to run
     * @param t task argument, may be {@code null}
     * @return task result
     */
    @Override // org.apache.ignite3.compute.IgniteCompute
    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, @Nullable T t) {
        CompletableFuture<R> pendingResult = executeMapReduceAsync(taskDescriptor, t);
        return sync(pendingResult);
    }

    /**
     * Submits one job produced by a map-reduce task, targeting any of the nodes the job lists.
     *
     * @param mapReduceJob job descriptor, argument and candidate nodes
     * @return handle of the submitted job
     */
    private <M, T> JobExecution<T> submitJob(MapReduceJob<M, T> mapReduceJob) {
        JobTarget anyOfJobNodes = JobTarget.anyNode(mapReduceJob.nodes());
        return submit(anyOfJobNodes, mapReduceJob.jobDescriptor(), mapReduceJob.arg());
    }

    /**
     * Returns job states from the compute component after authorizing the {@code GET_JOB_STATES} action.
     *
     * @return future completed with the job states
     */
    @Override // org.apache.ignite3.internal.compute.IgniteComputeInternal
    public CompletableFuture<Collection<JobState>> statesAsync() {
        return this.authorizer
                .authorizeAsync(Action.GET_JOB_STATES)
                .thenCompose(ignored -> this.computeComponent.statesAsync());
    }

    /**
     * Returns the state of a job, subject to the {@code GET_JOB_STATE} authorization performed by
     * {@link #authorizeJobAction}.
     *
     * @param uuid job ID
     * @return future completed with the job state, or with {@code null} when the job has no known owner
     */
    @Override // org.apache.ignite3.internal.compute.IgniteComputeInternal
    public CompletableFuture<JobState> stateAsync(UUID uuid) {
        Supplier<CompletableFuture<JobState>> lookup = () -> this.computeComponent.stateAsync(uuid);
        return authorizeJobAction(uuid, Action.GET_JOB_STATE, lookup);
    }

    /**
     * Cancels a job, subject to the {@code KILL_JOB} authorization performed by {@link #authorizeJobAction}.
     *
     * @param uuid job ID
     * @return future completed with the compute component's answer, or with {@code null} when the job has no
     *         known owner
     */
    @Override // org.apache.ignite3.internal.compute.IgniteComputeInternal
    public CompletableFuture<Boolean> cancelAsync(UUID uuid) {
        Supplier<CompletableFuture<Boolean>> cancellation = () -> this.computeComponent.cancelAsync(uuid);
        return authorizeJobAction(uuid, Action.KILL_JOB, cancellation);
    }

    /**
     * Changes the priority of a job, subject to the {@code CHANGE_JOB_PRIORITY} authorization performed by
     * {@link #authorizeJobAction}.
     *
     * @param uuid job ID
     * @param i new priority
     * @return future completed with the compute component's answer, or with {@code null} when the job has no
     *         known owner
     */
    @Override // org.apache.ignite3.internal.compute.IgniteComputeInternal
    public CompletableFuture<Boolean> changePriorityAsync(UUID uuid, int i) {
        Supplier<CompletableFuture<Boolean>> priorityChange = () -> this.computeComponent.changePriorityAsync(uuid, i);
        return authorizeJobAction(uuid, Action.CHANGE_JOB_PRIORITY, priorityChange);
    }

    /**
     * Runs a job management operation after checking that the caller may perform the given action on jobs of
     * the job's owner.
     *
     * <p>The owner is looked up through the compute component. When no owner is reported, the result is a
     * future completed with {@code null} and the operation is not invoked. Otherwise a privilege made of the
     * action and a user selector for the owner is authorized, and the operation is composed after it.
     *
     * @param uuid job ID
     * @param action action to authorize
     * @param supplier operation to run once authorized
     * @return future with the operation's result, or completed with {@code null} when the job has no owner
     */
    private <T> CompletableFuture<T> authorizeJobAction(UUID uuid, Action action, Supplier<CompletableFuture<T>> supplier) {
        return (CompletableFuture<T>) this.computeComponent.getOwner(uuid).thenCompose(GridGainSecurity.withContext(owner -> {
            if (owner != null) {
                return this.authorizer.authorizeThenCompose(Privilege.builder().action(action).selector(Selector.user(owner)).build(), supplier);
            }
            return CompletableFutures.nullCompletedFuture();
        }));
    }

    /**
     * Runs a streamer receiver on the given node for a batch of items.
     *
     * <p>The receiver descriptor, argument and items are serialized with {@link StreamerReceiverSerializer},
     * executed through the byte-array overload under {@code GridGainSecurity.systemContext()}, and the raw
     * result is deserialized back into a collection.
     *
     * @param receiverDescriptor receiver to run
     * @param a receiver argument, may be {@code null}
     * @param collection items to pass to the receiver
     * @param clusterNode node to run the receiver on
     * @param list deployment units of the receiver
     * @return future completed with the receiver results
     */
    @Override // org.apache.ignite3.internal.table.StreamerReceiverRunner
    public <A, I, R> CompletableFuture<Collection<R>> runReceiverAsync(ReceiverDescriptor<A> receiverDescriptor, @Nullable A a, Collection<I> collection, ClusterNode clusterNode, List<DeploymentUnit> list) {
        byte[] payload = StreamerReceiverSerializer.serializeReceiverInfoWithElementCount(receiverDescriptor, a, collection);
        CompletableFuture<byte[]> rawResults = runReceiverAsync(payload, clusterNode, list, GridGainSecurity.systemContext());
        return rawResults.thenApply(StreamerReceiverSerializer::deserializeReceiverJobResults);
    }

    /**
     * Runs a streamer receiver job with an already serialized payload on the given node.
     *
     * <p>The payload is executed as a {@link StreamerReceiverJob} with default execution options. A failure whose
     * cause is a {@link ComputeException} is rethrown as an {@link IgniteException} with code
     * {@code COMPUTE_JOB_FAILED_ERR}; any other failure is rethrown unchanged.
     *
     * @param bArr serialized receiver info and items
     * @param clusterNode node to run the receiver on
     * @param list deployment units of the receiver
     * @param securityContext security context the job is authorized and executed under
     * @return future completed with the serialized receiver results
     */
    @Override // org.apache.ignite3.internal.table.StreamerReceiverRunner
    public CompletableFuture<byte[]> runReceiverAsync(byte[] bArr, ClusterNode clusterNode, List<DeploymentUnit> list, SecurityContext securityContext) {
        JobExecution<byte[]> execution = executeAsyncWithFailover(Set.of(clusterNode), list, StreamerReceiverJob.class.getName(), JobExecutionOptions.DEFAULT, securityContext, bArr);
        return execution.resultAsync().handle((serializedResults, failure) -> {
            if (failure == null) {
                return serializedResults;
            }
            Throwable cause = failure.getCause();
            if (cause instanceof ComputeException) {
                ComputeException computeFailure = (ComputeException) cause;
                throw new IgniteException(ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR, "Streamer receiver failed: " + computeFailure.getMessage(), computeFailure);
            }
            // Anything else is propagated as is.
            ExceptionUtils.sneakyThrow(failure);
            return serializedResults;
        });
    }

    /**
     * Exposes the underlying compute component to tests.
     *
     * @return the compute component this instance delegates to
     */
    @TestOnly
    ComputeComponent computeComponent() {
        return computeComponent;
    }

    /**
     * Waits for the future and returns its value, translating a failure into a public exception.
     *
     * <p>When {@code join()} throws a {@link CompletionException}, its cause is unwrapped, mapped with
     * {@code IgniteExceptionMapperUtil.mapToPublicException} and rethrown.
     *
     * @param completableFuture future to wait for
     * @return value the future completed with
     */
    private static <R> R sync(CompletableFuture<R> completableFuture) {
        final R value;
        try {
            value = completableFuture.join();
        } catch (CompletionException wrapped) {
            throw ((RuntimeException) ExceptionUtils.sneakyThrow(
                    IgniteExceptionMapperUtil.mapToPublicException(ExceptionUtils.unwrapCause(wrapped))));
        }
        return value;
    }
}
