/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.compute.executor;

import java.nio.file.Path;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.JobExecutorType;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.ComputeJobDataType;
import org.apache.ignite.internal.compute.ComputeUtils;
import org.apache.ignite.internal.compute.ExecutionOptions;
import org.apache.ignite.internal.compute.JobExecutionContextImpl;
import org.apache.ignite.internal.compute.SharedComputeUtils;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
import org.apache.ignite.internal.compute.executor.ComputeExecutor;
import org.apache.ignite.internal.compute.executor.JobExecutionInternal;
import org.apache.ignite.internal.compute.executor.SecuredIgniteProvider;
import org.apache.ignite.internal.compute.executor.platform.PlatformComputeTransport;
import org.apache.ignite.internal.compute.executor.platform.dotnet.DotNetComputeExecutor;
import org.apache.ignite.internal.compute.executor.wasm.ChicoryWasmComputeExecutor;
import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
import org.apache.ignite.internal.compute.queue.QueueExecution;
import org.apache.ignite.internal.compute.state.ComputeStateMachine;
import org.apache.ignite.internal.compute.task.JobSubmitter;
import org.apache.ignite.internal.compute.task.TaskExecutionContextImpl;
import org.apache.ignite.internal.compute.task.TaskExecutionInternal;
import org.apache.ignite.internal.deployunit.loader.UnitsClassLoader;
import org.apache.ignite.internal.eventlog.api.EventLog;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.ThreadOperation;
import org.apache.ignite.marshalling.Marshaller;
import org.gridgain.internal.security.context.GridGainSecurity;
import org.gridgain.internal.security.context.SecurityContext;
import org.jetbrains.annotations.Nullable;

/**
 * {@link ComputeExecutor} implementation that submits compute jobs and tasks to a
 * {@link PriorityQueueExecutor}.
 *
 * <p>The queue executor is created in {@link #start()} and shut down in {@link #stop()}.
 * Jobs are dispatched by {@link JobExecutorType} to the embedded Java path, the .NET
 * executor (available only after {@link #setPlatformComputeTransport} has been called) or
 * the WASM executor.
 */
public class ComputeExecutorImpl implements ComputeExecutor {
    private static final IgniteLogger LOG = Loggers.forClass(ComputeExecutorImpl.class);

    private final Ignite ignite;
    private final SecuredIgniteProvider securedIgniteProvider;
    private final ComputeConfiguration configuration;
    private final ComputeStateMachine stateMachine;
    private final TopologyService topologyService;
    private final ClockService clockService;
    private final EventLog eventLog;

    /** Queue executor; {@code null} until {@link #start()} has been called. */
    private PriorityQueueExecutor executorService;

    /** .NET executor; {@code null} until {@link #setPlatformComputeTransport} has been called. */
    @Nullable
    private DotNetComputeExecutor dotNetComputeExecutor;

    private final ChicoryWasmComputeExecutor wasmComputeExecutor;

    /**
     * Creates the executor. The WASM executor is built eagerly from {@code configuration.wasm()};
     * the queue executor is not created until {@link #start()}.
     */
    public ComputeExecutorImpl(Ignite ignite, SecuredIgniteProvider securedIgniteProvider, ComputeStateMachine stateMachine, ComputeConfiguration configuration, TopologyService topologyService, ClockService clockService, EventLog eventLog) {
        this.ignite = ignite;
        this.securedIgniteProvider = securedIgniteProvider;
        this.configuration = configuration;
        this.stateMachine = stateMachine;
        this.topologyService = topologyService;
        this.clockService = clockService;
        this.eventLog = eventLog;
        this.wasmComputeExecutor = new ChicoryWasmComputeExecutor(configuration.wasm());
    }

    /**
     * Installs the .NET executor backed by the given transport. Until this is called,
     * {@link JobExecutorType#DOTNET_SIDECAR} jobs are rejected with {@link IllegalStateException}.
     *
     * @param transport Transport handed to the new {@link DotNetComputeExecutor}.
     */
    public void setPlatformComputeTransport(PlatformComputeTransport transport) {
        this.dotNetComputeExecutor = new DotNetComputeExecutor(transport);
    }

    /**
     * Builds the job callable for the requested executor type, wraps it so the result carries an
     * observable timestamp, and submits it to the queue under the given security context.
     *
     * @param options Execution options (executor type, priority, max retries, partition).
     * @param jobClassName Job class name.
     * @param classLoader Class loader used to resolve the job class.
     * @param metadataBuilder Event metadata builder; receives the job class name and the local node name.
     * @param securityContext Security context used both for the Ignite facade given to the job and for the call itself.
     * @param arg Job argument, may be {@code null}.
     * @return Handle of the submitted execution.
     * @throws IllegalStateException If a .NET job is requested but no .NET executor has been set.
     * @throws IllegalArgumentException If the executor type is not supported.
     */
    @Override
    public JobExecutionInternal<ComputeJobDataHolder> executeJob(ExecutionOptions options, String jobClassName, UnitsClassLoader classLoader, ComputeEventMetadataBuilder metadataBuilder, SecurityContext securityContext, @Nullable ComputeJobDataHolder arg) {
        assert this.executorService != null;

        // The same flag instance is shared by the job context and the returned execution handle.
        AtomicBoolean isInterrupted = new AtomicBoolean();
        Ignite securedIgnite = this.securedIgniteProvider.createSecuredIgnite(securityContext);
        JobExecutionContext context = new JobExecutionContextImpl(securedIgnite, isInterrupted, classLoader, options.partition());

        metadataBuilder.jobClassName(jobClassName).targetNode(this.ignite.name());

        Callable<CompletableFuture<ComputeJobDataHolder>> jobCallable = addObservableTimestamp(
                this.getJobCallable(options.executorType(), jobClassName, classLoader, arg, context),
                this.clockService
        );

        QueueExecution<ComputeJobDataHolder> execution = this.executorService.submit(
                GridGainSecurity.callWith(securityContext, jobCallable),
                options.priority(),
                options.maxRetries(),
                metadataBuilder
        );

        // NOTE(review): the meaning of the (null, false) arguments is defined by JobExecutionInternal - confirm there.
        return new JobExecutionInternal<>(execution, isInterrupted, null, false, this.topologyService.localMember());
    }

    /**
     * Wraps a job callable so that its result holder is re-created with the clock value read
     * when the result becomes available. A {@code null} future and a {@code null} holder are
     * both turned into a {@link ComputeJobDataType#NATIVE} holder with {@code null} data.
     */
    private static Callable<CompletableFuture<ComputeJobDataHolder>> addObservableTimestamp(Callable<CompletableFuture<ComputeJobDataHolder>> jobCallable, ClockService clockService) {
        return () -> {
            CompletableFuture<ComputeJobDataHolder> jobFut = jobCallable.call();

            if (jobFut == null) {
                // A null future is handled the same way as a future completed with a null holder.
                return CompletableFuture.completedFuture(withObservableTimestamp(null, clockService));
            }

            return jobFut.thenApply(holder -> withObservableTimestamp(holder, clockService));
        };
    }

    /**
     * Returns a new holder with the type and data of {@code holder} (or {@code NATIVE} / {@code null}
     * when {@code holder} is {@code null}) and the current value of {@code clockService}.
     */
    private static ComputeJobDataHolder withObservableTimestamp(@Nullable ComputeJobDataHolder holder, ClockService clockService) {
        if (holder == null) {
            return new ComputeJobDataHolder(ComputeJobDataType.NATIVE, null, Long.valueOf(clockService.currentLong()));
        }

        return new ComputeJobDataHolder(holder.type(), holder.data(), Long.valueOf(clockService.currentLong()));
    }

    /**
     * Selects the job callable for the given executor type; {@code null} is treated as
     * {@link JobExecutorType#JAVA_EMBEDDED}.
     *
     * @throws IllegalStateException If a .NET job is requested but no .NET executor has been set.
     * @throws IllegalArgumentException If the executor type is not one of the handled values.
     */
    private Callable<CompletableFuture<ComputeJobDataHolder>> getJobCallable(JobExecutorType executorType, String jobClassName, UnitsClassLoader classLoader, @Nullable ComputeJobDataHolder arg, JobExecutionContext context) {
        JobExecutorType effectiveType = executorType == null ? JobExecutorType.JAVA_EMBEDDED : executorType;

        switch (effectiveType) {
            case JAVA_EMBEDDED:
                return getJavaJobCallable(jobClassName, classLoader, arg, context);

            case DOTNET_SIDECAR: {
                // Read the field once so the null check and the call see the same instance.
                DotNetComputeExecutor dotNetExec = this.dotNetComputeExecutor;
                if (dotNetExec == null) {
                    throw new IllegalStateException("DotNetComputeExecutor is not set");
                }
                return dotNetExec.getJobCallable(jobClassName, arg, context);
            }

            case WASM_EMBEDDED:
                return this.wasmComputeExecutor.getJobCallable(jobClassName, arg, context);

            default:
                throw new IllegalArgumentException("Unsupported executor type: " + effectiveType);
        }
    }

    /**
     * Resolves and instantiates the Java job class, then builds a callable that uses the job's own
     * input and result marshallers.
     */
    // Raw types are kept on purpose: the generic signatures of the ComputeUtils helpers are not
    // visible from this file, so the job's type arguments cannot be spelled out here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static Callable<CompletableFuture<ComputeJobDataHolder>> getJavaJobCallable(String jobClassName, UnitsClassLoader classLoader, @Nullable ComputeJobDataHolder arg, JobExecutionContext context) {
        Class jobClass = ComputeUtils.jobClass(classLoader, jobClassName);
        ComputeJob jobInstance = ComputeUtils.instantiateJob(jobClass);
        Marshaller inputMarshaller = jobInstance.inputMarshaller();
        Marshaller resultMarshaller = jobInstance.resultMarshaller();

        return unmarshalExecMarshal(arg, jobClass, jobInstance, context, inputMarshaller, resultMarshaller);
    }

    /**
     * Builds a callable that unmarshals the argument, runs the job and marshals its result.
     * If the job returns a {@code null} future, the callable returns {@code null} as well.
     */
    private static <T, R> Callable<CompletableFuture<ComputeJobDataHolder>> unmarshalExecMarshal(@Nullable ComputeJobDataHolder arg, Class<? extends ComputeJob<T, R>> jobClass, ComputeJob<T, R> jobInstance, JobExecutionContext context, @Nullable Marshaller<T, byte[]> inputMarshaller, @Nullable Marshaller<R, byte[]> resultMarshaller) {
        return () -> {
            CompletableFuture<R> userJobFut = jobInstance.executeAsync(
                    context,
                    ComputeUtils.unmarshalOrNotIfNull(inputMarshaller, arg, ComputeUtils.getJobExecuteArgumentType(jobClass), jobClass.getClassLoader())
            );

            if (userJobFut == null) {
                return null;
            }

            return userJobFut.thenApply(r -> SharedComputeUtils.marshalArgOrResult(r, resultMarshaller));
        };
    }

    /**
     * Resolves the task class and creates a task execution bound to the queue executor.
     *
     * @param jobSubmitter Submitter passed through to the task execution.
     * @param taskClassName Task class name.
     * @param classLoader Class loader used to resolve the task class.
     * @param metadataBuilder Event metadata builder; receives the task class name and the local node name.
     * @param arg Task argument, may be {@code null}.
     * @return The created task execution.
     */
    // Raw types are kept on purpose: the task type is not imported by this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public <I, M, T, R> TaskExecutionInternal<I, M, T, R> executeTask(JobSubmitter<M, T> jobSubmitter, String taskClassName, UnitsClassLoader classLoader, ComputeEventMetadataBuilder metadataBuilder, @Nullable I arg) {
        assert this.executorService != null;

        // The same flag instance is shared by the task context and the returned execution.
        AtomicBoolean isCancelled = new AtomicBoolean();
        TaskExecutionContextImpl context = new TaskExecutionContextImpl(this.ignite, isCancelled);

        metadataBuilder.jobClassName(taskClassName).targetNode(this.ignite.name());

        Class taskClass = ComputeUtils.taskClass(classLoader, taskClassName);

        return new TaskExecutionInternal(this.executorService, this.eventLog, jobSubmitter, taskClass, context, isCancelled, metadataBuilder, arg);
    }

    /** Starts the state machine and creates the queue executor with a "compute" thread factory. */
    @Override
    public void start() {
        this.stateMachine.start();

        ThreadFactory threadFactory = IgniteThreadFactory.create(
                this.ignite.name(),
                "compute",
                LOG,
                new ThreadOperation[]{ThreadOperation.STORAGE_READ, ThreadOperation.STORAGE_WRITE}
        );

        this.executorService = new PriorityQueueExecutor(this.configuration, threadFactory, this.stateMachine, this.eventLog);
    }

    /**
     * Stops the state machine, shuts down the queue executor if it was created, and stops the
     * .NET executor if one was set.
     */
    @Override
    public void stop() {
        this.stateMachine.stop();

        // start() may never have run, in which case there is no queue executor to shut down.
        PriorityQueueExecutor executor = this.executorService;
        if (executor != null) {
            executor.shutdown();
        }

        DotNetComputeExecutor dotNetExec = this.dotNetComputeExecutor;
        if (dotNetExec != null) {
            dotNetExec.stop();
        }
    }

    /**
     * Forwards the unit path to the .NET executor's {@code beginUndeployUnit}; does nothing when
     * no .NET executor has been set.
     *
     * @param unitPath Path of the unit being removed.
     */
    public void onUnitRemoving(Path unitPath) {
        DotNetComputeExecutor dotNetExec = this.dotNetComputeExecutor;
        if (dotNetExec != null) {
            dotNetExec.beginUndeployUnit(unitPath);
        }
    }
}

