/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.compute;

import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite3.compute.JobExecution;
import org.apache.ignite3.compute.JobState;
import org.apache.ignite3.deployment.DeploymentUnit;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite3.internal.compute.CancellableJobExecution;
import org.apache.ignite3.internal.compute.CancellableTaskExecution;
import org.apache.ignite3.internal.compute.ClassLoaderExceptionsMapper;
import org.apache.ignite3.internal.compute.ComputeComponent;
import org.apache.ignite3.internal.compute.ComputeJobDataHolder;
import org.apache.ignite3.internal.compute.ComputeJobFailover;
import org.apache.ignite3.internal.compute.ComputeViewProvider;
import org.apache.ignite3.internal.compute.DelegatingJobExecution;
import org.apache.ignite3.internal.compute.ExecutionContext;
import org.apache.ignite3.internal.compute.ExecutionManager;
import org.apache.ignite3.internal.compute.NextWorkerSelector;
import org.apache.ignite3.internal.compute.TaskToJobExecutionWrapper;
import org.apache.ignite3.internal.compute.configuration.ComputeConfiguration;
import org.apache.ignite3.internal.compute.events.ComputeEventMetadataBuilder;
import org.apache.ignite3.internal.compute.executor.ComputeExecutor;
import org.apache.ignite3.internal.compute.executor.JobExecutionInternal;
import org.apache.ignite3.internal.compute.loader.JobContext;
import org.apache.ignite3.internal.compute.loader.JobContextManager;
import org.apache.ignite3.internal.compute.messaging.ComputeMessaging;
import org.apache.ignite3.internal.compute.messaging.RemoteJobExecution;
import org.apache.ignite3.internal.compute.task.DelegatingTaskExecution;
import org.apache.ignite3.internal.compute.task.JobSubmitter;
import org.apache.ignite3.internal.compute.task.TaskExecutionInternal;
import org.apache.ignite3.internal.eventlog.api.EventLog;
import org.apache.ignite3.internal.future.InFlightFutures;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.MessagingService;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.systemview.api.SystemView;
import org.apache.ignite3.internal.systemview.api.SystemViewProvider;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.lang.CancelHandleHelper;
import org.apache.ignite3.lang.CancellationToken;
import org.apache.ignite3.lang.ErrorGroups;
import org.gridgain.internal.security.context.SecurityContext;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
 * Implementation of {@link ComputeComponent} that starts jobs and tasks on the local node, forwards job requests to
 * remote nodes through {@link ComputeMessaging}, and exposes compute system views via {@link SystemViewProvider}.
 *
 * <p>The {@code executeLocally}, {@code executeTask} and {@code executeRemotely} entry points are guarded by a busy
 * lock: once {@link #stopAsync(ComponentContext)} has blocked that lock they report a node-stopping failure instead of
 * starting new work.
 */
public class ComputeComponentImpl implements ComputeComponent, SystemViewProvider {
    private static final IgniteLogger LOG = Loggers.forClass(ComputeComponentImpl.class);

    /** Entered by the guarded execute methods; blocked by {@link #stopAsync(ComponentContext)}. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    /** Makes {@link #stopAsync(ComponentContext)} run its shutdown sequence at most once. */
    private final AtomicBoolean stopGuard = new AtomicBoolean();

    /** Futures registered here are cancelled in {@link #stopAsync(ComponentContext)}. */
    private final InFlightFutures inFlightFutures = new InFlightFutures();

    private final TopologyService topologyService;

    private final LogicalTopologyService logicalTopologyService;

    private final JobContextManager jobContextManager;

    private final ComputeExecutor executor;

    private final EventLog eventLog;

    private final ComputeMessaging messaging;

    private final ExecutionManager executionManager;

    /** Single-threaded executor handed to {@link ComputeJobFailover}. */
    private final ExecutorService failoverExecutor;

    private final ComputeViewProvider computeViewProvider = new ComputeViewProvider();

    /**
     * Creates the component. Nothing is started here; see {@link #startAsync(ComponentContext)} and {@link #enable()}.
     *
     * @param nodeName Node name, used to name the failover thread.
     * @param messagingService Messaging service passed to the compute messaging layer.
     * @param topologyService Physical topology service.
     * @param logicalTopologyService Logical topology service, passed to the failover logic.
     * @param jobContextManager Provider of job contexts (class loaders) for deployment units.
     * @param executor Executor that actually runs jobs and tasks.
     * @param computeConfiguration Compute configuration passed to the execution manager.
     * @param eventLog Event log passed to the failover logic.
     */
    public ComputeComponentImpl(
            String nodeName,
            MessagingService messagingService,
            TopologyService topologyService,
            LogicalTopologyService logicalTopologyService,
            JobContextManager jobContextManager,
            ComputeExecutor executor,
            ComputeConfiguration computeConfiguration,
            EventLog eventLog
    ) {
        this.topologyService = topologyService;
        this.logicalTopologyService = logicalTopologyService;
        this.jobContextManager = jobContextManager;
        this.executor = executor;
        this.eventLog = eventLog;
        this.executionManager = new ExecutionManager(computeConfiguration, topologyService);
        this.messaging = new ComputeMessaging(this.executionManager, messagingService, topologyService);
        this.failoverExecutor = Executors.newSingleThreadExecutor(
                IgniteThreadFactory.create(nodeName, "compute-job-failover", LOG, new ThreadOperation[0])
        );
    }

    /**
     * Starts a job on this node.
     *
     * <p>A job context is acquired for the deployment units of {@code executionContext}; once it is available the job
     * is submitted to the executor, its result future is tracked as in-flight, and the execution is registered with
     * the execution manager under the job id. The job context is closed when the job result future completes.
     *
     * @param executionContext Description of the job to run.
     * @param cancellationToken Optional token; when present, cancel actions are attached to the class loader future,
     *         the returned future and the job itself.
     * @return Future of the job execution, or a future failed with a node-stopping error if the component is stopping.
     */
    @Override
    public CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> executeLocally(
            ExecutionContext executionContext,
            @Nullable CancellationToken cancellationToken
    ) {
        if (!busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(nodeStoppingException());
        }

        try {
            CompletableFuture<JobContext> classLoaderFut = jobContextManager.acquireClassLoader(executionContext.units());

            CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> future =
                    ClassLoaderExceptionsMapper.mapClassLoaderExceptions(classLoaderFut, executionContext.jobClassName())
                            .thenApply(context -> {
                                JobExecutionInternal<ComputeJobDataHolder> execution = execJob(context, executionContext);

                                // Release the job context as soon as the job finishes, whatever the outcome.
                                execution.resultAsync().whenComplete((result, e) -> context.close());
                                inFlightFutures.registerFuture(execution.resultAsync());

                                if (cancellationToken != null) {
                                    CancelHandleHelper.addCancelAction(
                                            cancellationToken, execution::cancel, execution.resultAsync());
                                }

                                DelegatingJobExecution delegatingExecution = new DelegatingJobExecution(execution);

                                executionManager.addLocalExecution(
                                        execution.state().id(), delegatingExecution, executionContext.securityContext());

                                return delegatingExecution;
                            });

            inFlightFutures.registerFuture(future);
            inFlightFutures.registerFuture(classLoaderFut);

            if (cancellationToken != null) {
                CancelHandleHelper.addCancelAction(cancellationToken, classLoaderFut);
                CancelHandleHelper.addCancelAction(cancellationToken, future);
            }

            return future;
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Starts a task on this node.
     *
     * <p>A job context is acquired for {@code units}; once it is available the task is submitted to the executor and
     * its result future is tracked as in-flight. When the task id becomes known, the task is registered with the
     * execution manager wrapped as a job execution. The job context is closed when the task result future completes.
     *
     * @param jobSubmitter Submitter passed to the executor together with the task.
     * @param units Deployment units to load the task class from.
     * @param taskClassName Task class name.
     * @param metadataBuilder Event metadata builder passed to the executor.
     * @param securityContext Security context the execution is registered with.
     * @param arg Task argument, may be {@code null}.
     * @param <I> Task argument type.
     * @param <M> First type parameter of the job submitter.
     * @param <T> Second type parameter of the job submitter.
     * @param <R> Task result type.
     * @return Task execution; it wraps a failed future if the component is stopping.
     */
    @Override
    public <I, M, T, R> CancellableTaskExecution<R> executeTask(
            JobSubmitter<M, T> jobSubmitter,
            List<DeploymentUnit> units,
            String taskClassName,
            ComputeEventMetadataBuilder metadataBuilder,
            SecurityContext securityContext,
            @Nullable I arg
    ) {
        if (!busyLock.enterBusy()) {
            return new DelegatingTaskExecution<>(CompletableFuture.failedFuture(nodeStoppingException()));
        }

        try {
            CompletableFuture<TaskExecutionInternal<I, M, T, R>> taskFuture =
                    ClassLoaderExceptionsMapper.mapClassLoaderExceptions(jobContextManager.acquireClassLoader(units), taskClassName)
                            .thenApply(context -> {
                                TaskExecutionInternal<I, M, T, R> execution =
                                        execTask(context, jobSubmitter, taskClassName, metadataBuilder, arg);

                                // Release the job context as soon as the task finishes, whatever the outcome.
                                execution.resultAsync().whenComplete((r, e) -> context.close());
                                inFlightFutures.registerFuture(execution.resultAsync());

                                return execution;
                            });

            inFlightFutures.registerFuture(taskFuture);

            DelegatingTaskExecution<I, M, T, R> result = new DelegatingTaskExecution<>(taskFuture);

            result.idAsync().thenAccept(jobId -> executionManager.addLocalExecution(
                    jobId,
                    new TaskToJobExecutionWrapper<>(result, topologyService.localMember()),
                    securityContext
            ));

            return result;
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Asks {@code remoteNode} to execute a job.
     *
     * <p>The execute request is sent through the messaging layer; once the remote job id is received, a
     * {@link RemoteJobExecution} is created for it and registered with the execution manager as a remote execution.
     *
     * @param remoteNode Node that should run the job.
     * @param executionContext Description of the job to run.
     * @param cancellationToken Optional token; when present, a cancel action is attached to the remote execution.
     * @return Future of the remote job execution, or a future failed with a node-stopping error if the component is
     *         stopping.
     */
    @Override
    public CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> executeRemotely(
            InternalClusterNode remoteNode,
            ExecutionContext executionContext,
            @Nullable CancellationToken cancellationToken
    ) {
        if (!busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(nodeStoppingException());
        }

        try {
            CompletableFuture<UUID> jobIdFuture = messaging.remoteExecuteRequestAsync(remoteNode, executionContext);

            inFlightFutures.registerFuture(jobIdFuture);

            return jobIdFuture.thenApply(jobId -> {
                RemoteJobExecution execution = new RemoteJobExecution(remoteNode, jobId, inFlightFutures, messaging);

                if (cancellationToken != null) {
                    CancelHandleHelper.addCancelAction(cancellationToken, execution::cancelAsync, execution.resultAsync());
                }

                executionManager.addRemoteExecution(jobId, execution, executionContext.securityContext());

                return execution;
            });
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Executes a job on {@code remoteNode} through {@link ComputeJobFailover}, which is given {@code nextWorkerSelector}
     * to pick another worker.
     *
     * <p>The resulting execution is registered with the execution manager as a local execution once its id is known.
     * Unlike the other execute methods, this one does not itself enter the busy lock.
     *
     * @param remoteNode Initial node to run the job on.
     * @param nextWorkerSelector Selector passed to the failover logic.
     * @param executionContext Description of the job to run.
     * @param cancellationToken Optional token; when present, a cancel action is attached to the returned execution.
     * @return Future of the job execution.
     */
    @Override
    public CompletableFuture<JobExecution<ComputeJobDataHolder>> executeRemotelyWithFailover(
            InternalClusterNode remoteNode,
            NextWorkerSelector nextWorkerSelector,
            ExecutionContext executionContext,
            @Nullable CancellationToken cancellationToken
    ) {
        return ComputeJobFailover.failSafeExecute(
                this,
                logicalTopologyService,
                topologyService,
                failoverExecutor,
                eventLog,
                remoteNode,
                nextWorkerSelector,
                executionContext
        ).thenApply(execution -> {
            if (cancellationToken != null) {
                CancelHandleHelper.addCancelAction(cancellationToken, execution::cancelAsync, execution.resultAsync());
            }

            execution.idAsync().thenAccept(
                    jobId -> executionManager.addLocalExecution(jobId, execution, executionContext.securityContext()));

            return execution;
        });
    }

    /** Returns the job states obtained by broadcasting a states request through the messaging layer. */
    @Override
    public CompletableFuture<Collection<JobState>> statesAsync() {
        return messaging.broadcastStatesAsync();
    }

    /**
     * Returns the state of the given job: the execution manager is asked first, and only if it yields {@code null} is
     * the request broadcast through the messaging layer.
     *
     * @param jobId Job id.
     * @return Future of the job state, possibly {@code null}.
     */
    @Override
    public CompletableFuture<@Nullable JobState> stateAsync(UUID jobId) {
        return executionManager.stateAsync(jobId).thenCompose(state -> {
            if (state != null) {
                return CompletableFuture.completedFuture(state);
            }

            return messaging.broadcastStateAsync(jobId);
        });
    }

    /**
     * Cancels the given job: the execution manager is asked first, and only if it yields {@code null} is the request
     * broadcast through the messaging layer.
     *
     * @param jobId Job id.
     * @return Future of the cancellation result, possibly {@code null}.
     */
    @Override
    public CompletableFuture<@Nullable Boolean> cancelAsync(UUID jobId) {
        return executionManager.cancelAsync(jobId).thenCompose(result -> {
            if (result != null) {
                return CompletableFuture.completedFuture(result);
            }

            return messaging.broadcastCancelAsync(jobId);
        });
    }

    /**
     * Changes the priority of the given job: the execution manager is asked first, and only if it yields {@code null}
     * is the request broadcast through the messaging layer.
     *
     * @param jobId Job id.
     * @param newPriority New priority.
     * @return Future of the operation result, possibly {@code null}.
     */
    @Override
    public CompletableFuture<@Nullable Boolean> changePriorityAsync(UUID jobId, int newPriority) {
        return executionManager.changePriorityAsync(jobId, newPriority).thenCompose(result -> {
            if (result != null) {
                return CompletableFuture.completedFuture(result);
            }

            return messaging.broadcastChangePriorityAsync(jobId, newPriority);
        });
    }

    /**
     * Returns the owner of the given job: the value known to the execution manager if it is not {@code null},
     * otherwise the result of broadcasting the request through the messaging layer.
     *
     * @param jobId Job id.
     * @return Future of the owner, possibly {@code null}.
     */
    @Override
    public CompletableFuture<@Nullable String> getOwner(UUID jobId) {
        String owner = executionManager.getOwner(jobId);

        if (owner != null) {
            return CompletableFuture.completedFuture(owner);
        }

        return messaging.broadcastGetOwnerAsync(jobId);
    }

    /** Returns the job states for {@code owner}, obtained by broadcasting the request through the messaging layer. */
    @Override
    public CompletableFuture<Collection<JobState>> statesAsyncByOwner(String owner) {
        return messaging.broadcastStatesByOwnerAsync(owner);
    }

    /** Starts the executor and the execution manager, then initializes the system view provider. */
    @Override
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        executor.start();
        executionManager.start();
        computeViewProvider.init(executionManager);

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Stops the component. Only the first call does any work; later calls return immediately.
     *
     * <p>The busy lock is blocked first so the guarded execute methods stop accepting work, then in-flight futures are
     * cancelled and the owned sub-components are stopped. The failover executor is shut down last with a 10-second
     * timeout.
     */
    @Override
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        if (!stopGuard.compareAndSet(false, true)) {
            return CompletableFutures.nullCompletedFuture();
        }

        busyLock.block();

        inFlightFutures.cancelInFlightFutures();
        executionManager.stop();
        messaging.stop();
        executor.stop();
        computeViewProvider.stop();
        IgniteUtils.shutdownAndAwaitTermination(failoverExecutor, 10L, TimeUnit.SECONDS);

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Starts the messaging layer, giving it a callback that runs the received execution context locally without a
     * cancellation token.
     */
    public void enable() {
        messaging.start(executionContext -> executeLocally(executionContext, null));
    }

    /**
     * Submits the job to the executor. If the submission itself throws, the job context is closed before the error is
     * rethrown, because no result future exists yet to close it later.
     */
    private JobExecutionInternal<ComputeJobDataHolder> execJob(JobContext context, ExecutionContext executionContext) {
        try {
            return executor.executeJob(
                    executionContext.options(),
                    executionContext.jobClassName(),
                    context.classLoader(),
                    executionContext.metadataBuilder(),
                    executionContext.securityContext(),
                    executionContext.arg()
            );
        } catch (Throwable e) {
            context.close();

            throw e;
        }
    }

    /**
     * Submits the task to the executor. If the submission itself throws, the job context is closed before the error is
     * rethrown, because no result future exists yet to close it later.
     */
    private <I, M, T, R> TaskExecutionInternal<I, M, T, R> execTask(
            JobContext context,
            JobSubmitter<M, T> jobSubmitter,
            String taskClassName,
            ComputeEventMetadataBuilder metadataBuilder,
            @Nullable I arg
    ) {
        try {
            return executor.executeTask(jobSubmitter, taskClassName, context.classLoader(), metadataBuilder, arg);
        } catch (Throwable e) {
            context.close();

            throw e;
        }
    }

    /** Builds the error reported by the guarded execute methods when the busy lock cannot be entered. */
    private static IgniteInternalException nodeStoppingException() {
        // Typed as Throwable so that the (code, Throwable) constructor is selected.
        Throwable cause = new NodeStoppingException();

        return new IgniteInternalException(ErrorGroups.Common.NODE_STOPPING_ERR, cause);
    }

    /** Returns the execution manager owned by this component. */
    @TestOnly
    public ExecutionManager executionManager() {
        return executionManager;
    }

    /** Returns the single system view supplied by the compute view provider. */
    @Override
    public List<SystemView<?>> systemViews() {
        return List.of(computeViewProvider.get());
    }
}

