package org.apache.ignite.internal.compute;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.network.ClusterNode;
import org.gridgain.internal.security.context.GridGainSecurity;
import org.gridgain.internal.security.context.SecurityContext;

/* loaded from: input_file:org/apache/ignite/internal/compute/ComputeJobFailover.class */
/**
 * Launches a compute job on a worker node and, while the initially launched execution is still running,
 * watches the logical topology: when the node currently running the job leaves, the job is restarted on the
 * next candidate supplied by the {@link NextWorkerSelector}. If no candidate is left, or the restart attempt
 * itself fails, the fail-safe execution handed out to the caller is completed exceptionally.
 *
 * @param <R> Job result type.
 */
class ComputeJobFailover<R> {
    private static final IgniteLogger LOG = Loggers.forClass(ComputeJobFailover.class);

    /** Executor on which the search for a replacement worker is scheduled. */
    private final Executor executor;

    /** Component used to actually start the job, locally or on a remote node. */
    private final ComputeComponent computeComponent;

    /** Source of node-left events; the failover listener is registered here. */
    private final LogicalTopologyService logicalTopologyService;

    /** Used to resolve the local member and to check that a candidate worker is still known. */
    private final TopologyService topologyService;

    /** Supplies the next candidate worker; a {@code null} candidate means there are none left. */
    private final NextWorkerSelector nextWorkerSelector;

    /** Node the job was most recently launched on. */
    private final AtomicReference<ClusterNode> runningWorkerNode;

    /** Everything needed to (re)launch the job, plus the fail-safe execution exposed to the caller. */
    private final RemoteExecutionContext<?, R> jobContext;

    /** Topology listener that restarts the job when the node currently running it leaves the cluster. */
    class OnNodeLeft implements LogicalTopologyEventListener {
        OnNodeLeft() {
        }

        /**
         * Schedules the selection of a replacement worker if the departed node is the one running the job.
         * Departures of any other node are ignored.
         *
         * @param logicalNode Node that left the logical topology.
         * @param logicalTopologySnapshot Topology snapshot delivered with the event (not used).
         */
        @Override
        public void onNodeLeft(LogicalNode logicalNode, LogicalTopologySnapshot logicalTopologySnapshot) {
            if (runningWorkerNode.get().id().equals(logicalNode.id())) {
                LOG.info("Worker node {} has left the cluster.", logicalNode.name());
                executor.execute(this::selectNewWorker);
            }
        }

        /**
         * Asks the selector for the next candidate and tries to restart the job on it.
         *
         * <p>Any failure of the selector future or of the restart itself fails the job; without this the
         * exception would be swallowed by the future chain and the caller would wait forever.
         */
        private void selectNewWorker() {
            nextWorkerSelector.next()
                    .thenAccept(this::restartOn)
                    .whenComplete((ignored, th) -> {
                        if (th != null) {
                            LOG.error("Failed to restart the job {}.", th, jobContext.jobClassName());
                            failJob(new IgniteInternalException(ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR, th));
                        }
                    });
        }

        /**
         * Handles one candidate produced by the selector.
         *
         * @param candidate Next candidate worker, or {@code null} when the selector has no more candidates.
         */
        private void restartOn(ClusterNode candidate) {
            if (candidate == null) {
                LOG.warn("No more worker nodes to restart the job. Failing the job {}.", jobContext.jobClassName());
                failJob(new IgniteInternalException(ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR));
            } else if (topologyService.getByConsistentId(candidate.name()) == null) {
                // The candidate is not known to the topology service: skip it and ask for another one.
                LOG.warn("Worker node {} is not found in the cluster", candidate.name());
                executor.execute(this::selectNewWorker);
            } else {
                LOG.info("Restarting the job {} on node {}.", jobContext.jobClassName(), candidate.name());
                runningWorkerNode.set(candidate);
                // Launch on the node that was just selected and logged rather than re-reading the shared
                // reference, which could have been changed concurrently.
                jobContext.updateJobExecution(launchJobOn(candidate));
            }
        }

        /**
         * Completes the fail-safe execution exceptionally and stops listening for topology events.
         *
         * @param cause Exception reported to the caller of the fail-safe execution.
         */
        private void failJob(IgniteInternalException cause) {
            jobContext.failSafeJobExecution().completeExceptionally(cause);
            // The job is terminally failed, so no further failover can happen. Unregister here as well,
            // because the regular removal is tied to the initially launched execution's result future.
            // NOTE(review): assumes removeEventListener tolerates an already-removed listener; confirm.
            logicalTopologyService.removeEventListener(this);
        }
    }

    /**
     * Creates a failover wrapper for a single job.
     *
     * @param computeComponent Component used to start the job locally or remotely.
     * @param logicalTopologyService Service the node-left listener is registered with.
     * @param topologyService Service used to resolve the local member and look up candidate workers.
     * @param clusterNode Node the job is launched on first.
     * @param nextWorkerSelector Supplier of replacement workers.
     * @param executor Executor on which replacement selection is scheduled.
     * @param securityContext Security context under which the job is launched.
     * @param list Deployment units passed to the job execution context.
     * @param str Passed to the job execution context; presumably the job class name (matches the
     *     context's only {@code String} accessor used in this class). TODO confirm.
     * @param executionOptions Execution options passed to the job execution context.
     * @param obj Passed to the job execution context; presumably the job argument. TODO confirm.
     */
    public ComputeJobFailover(ComputeComponent computeComponent, LogicalTopologyService logicalTopologyService, TopologyService topologyService, ClusterNode clusterNode, NextWorkerSelector nextWorkerSelector, Executor executor, SecurityContext securityContext, List<DeploymentUnit> list, String str, ExecutionOptions executionOptions, Object obj) {
        this.computeComponent = computeComponent;
        this.runningWorkerNode = new AtomicReference<>(clusterNode);
        this.logicalTopologyService = logicalTopologyService;
        this.topologyService = topologyService;
        this.nextWorkerSelector = nextWorkerSelector;
        this.jobContext = new RemoteExecutionContext<>(securityContext, list, str, executionOptions, obj);
        this.executor = executor;
    }

    /**
     * Launches the job on the initial worker node and arms the failover listener.
     *
     * <p>The listener is removed once the result future of the initially launched execution completes.
     *
     * @return Fail-safe execution held by the job context, wrapping the initially launched execution.
     */
    public JobExecution<R> failSafeExecute() {
        JobExecution<R> initialExecution = launchJobOn(this.runningWorkerNode.get());
        this.jobContext.initJobExecution(new FailSafeJobExecution<>(initialExecution));

        OnNodeLeft nodeLeftListener = new OnNodeLeft();
        this.logicalTopologyService.addEventListener(nodeLeftListener);
        initialExecution.resultAsync().whenComplete(
                (result, th) -> this.logicalTopologyService.removeEventListener(nodeLeftListener));

        return this.jobContext.failSafeJobExecution();
    }

    /**
     * Starts the job on the given node under the job's security context, executing locally when the node's
     * name equals the local member's name and remotely otherwise.
     *
     * @param clusterNode Node to run the job on.
     * @return Execution handle of the launched job.
     */
    private JobExecution<R> launchJobOn(ClusterNode clusterNode) {
        // Raw cast kept from the original: the declared return type of GridGainSecurity.getWith is not
        // visible from this file.
        return (JobExecution) GridGainSecurity.getWith(this.jobContext.securityContext(), () -> {
            return clusterNode.name().equals(this.topologyService.localMember().name())
                    ? this.computeComponent.executeLocally(
                            this.jobContext.executionOptions(),
                            this.jobContext.units(),
                            this.jobContext.jobClassName(),
                            this.jobContext.arg())
                    : this.computeComponent.executeRemotely(
                            this.jobContext.executionOptions(),
                            clusterNode,
                            this.jobContext.units(),
                            this.jobContext.jobClassName(),
                            this.jobContext.arg());
        });
    }
}
