package org.apache.ignite.internal.compute.executor.platform.dotnet;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.executor.platform.PlatformComputeConnection;
import org.apache.ignite.internal.compute.executor.platform.PlatformComputeTransport;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.TraceableException;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/compute/executor/platform/dotnet/DotNetComputeExecutor.class */
public class DotNetComputeExecutor {
    private static final IgniteLogger LOG;
    private static final SecureRandom SECURE_RANDOM;
    private static final String DOTNET_BINARY_PATH;
    private static final int PROCESS_START_TIMEOUT_MS = 5000;
    private static final int PROCESS_START_MAX_ATTEMPTS = 2;
    private final PlatformComputeTransport transport;
    private final AtomicLong jobIdGen = new AtomicLong();
    private DotNetExecutorProcess process;
    static final /* synthetic */ boolean $assertionsDisabled;

    public DotNetComputeExecutor(PlatformComputeTransport platformComputeTransport) {
        if (!$assertionsDisabled && platformComputeTransport == null) {
            throw new AssertionError();
        }
        this.transport = platformComputeTransport;
    }

    public Callable<CompletableFuture<ComputeJobDataHolder>> getJobCallable(List<String> list, String str, ComputeJobDataHolder computeJobDataHolder, JobExecutionContext jobExecutionContext) {
        return () -> {
            return executeJobAsync(list, str, computeJobDataHolder, jobExecutionContext);
        };
    }

    public synchronized void stop() {
        if (this.process != null) {
            this.process.process().destroy();
        }
    }

    private CompletableFuture<ComputeJobDataHolder> executeJobAsync(List<String> list, String str, ComputeJobDataHolder computeJobDataHolder, JobExecutionContext jobExecutionContext) {
        if (jobExecutionContext.isCancelled()) {
            return CompletableFuture.failedFuture(new CancellationException("Job was cancelled"));
        }
        long incrementAndGet = this.jobIdGen.incrementAndGet();
        return getPlatformComputeConnectionWithRetryAsync().thenCompose(dotNetExecutorProcess -> {
            return dotNetExecutorProcess.connectionFut().thenCompose(platformComputeConnection -> {
                return platformComputeConnection.executeJobAsync(incrementAndGet, list, str, computeJobDataHolder);
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                TraceableException unwrapCause = ExceptionUtils.unwrapCause(th);
                if (!(unwrapCause instanceof TraceableException)) {
                    throw new IgniteException(ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR, ".NET job failed: " + unwrapCause.getMessage(), th);
                }
                TraceableException traceableException = unwrapCause;
                if (traceableException.code() != ErrorGroups.Client.SERVER_TO_CLIENT_REQUEST_ERR) {
                    throw new IgniteException(traceableException.traceId(), traceableException.code(), ".NET job failed: " + unwrapCause.getMessage(), th);
                }
                throw new IgniteException(traceableException.traceId(), traceableException.code(), ".NET compute executor connection lost", handleTransportError(dotNetExecutorProcess.process(), unwrapCause));
            });
        });
    }

    private CompletableFuture<DotNetExecutorProcess> getPlatformComputeConnectionWithRetryAsync() {
        CompletableFuture<DotNetExecutorProcess> completableFuture = new CompletableFuture<>();
        getPlatformComputeConnectionWithRetryAsync(completableFuture, null);
        return completableFuture;
    }

    private void getPlatformComputeConnectionWithRetryAsync(CompletableFuture<DotNetExecutorProcess> completableFuture, @Nullable List<Throwable> list) {
        getPlatformComputeConnection().handle((dotNetExecutorProcess, th) -> {
            if (th == null) {
                completableFuture.complete(dotNetExecutorProcess);
                return null;
            }
            List arrayList = list == null ? new ArrayList() : list;
            arrayList.add(th);
            if (arrayList.size() < 2) {
                getPlatformComputeConnectionWithRetryAsync(completableFuture, arrayList);
                return null;
            }
            IgniteException igniteException = new IgniteException(ErrorGroups.Common.INTERNAL_ERR, "Could not start .NET executor process in 2 attempts");
            Iterator<Throwable> it = arrayList.iterator();
            while (it.hasNext()) {
                igniteException.addSuppressed(it.next());
            }
            completableFuture.completeExceptionally(igniteException);
            return null;
        });
    }

    private CompletableFuture<DotNetExecutorProcess> getPlatformComputeConnection() {
        CompletableFuture<DotNetExecutorProcess> completableFuture = new CompletableFuture<>();
        DotNetExecutorProcess ensureProcessStarted = ensureProcessStarted();
        ensureProcessStarted.connectionFut().orTimeout(5000L, TimeUnit.MILLISECONDS).handle((platformComputeConnection, th) -> {
            if (th == null && platformComputeConnection.isActive()) {
                completableFuture.complete(ensureProcessStarted);
                return null;
            }
            completableFuture.completeExceptionally(handleTransportError(ensureProcessStarted.process(), th));
            return null;
        });
        return completableFuture;
    }

    private static Throwable handleTransportError(Process process, @Nullable Throwable th) {
        String processOutputTail = getProcessOutputTail(process, 10000);
        if (process.isAlive()) {
            process.destroyForcibly();
        }
        return (th != null && (ExceptionUtils.unwrapCause(th) instanceof TraceableException) && ((TraceableException) th).code() == ErrorGroups.Client.PROTOCOL_COMPATIBILITY_ERR) ? th : new IgniteException(ErrorGroups.Compute.COMPUTE_PLATFORM_EXECUTOR_ERR, ".NET executor process failed to establish connection with the server: " + processOutputTail, th);
    }

    private static String getProcessOutputTail(Process process, int i) {
        try {
            InputStream inputStream = process.getInputStream();
            while (inputStream.available() > i) {
                int available = inputStream.available() - i;
                long skip = inputStream.skip(available);
                if (!$assertionsDisabled && skip != available) {
                    AssertionError assertionError = new AssertionError(skip + " != " + assertionError);
                    throw assertionError;
                }
            }
            return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            return "Failed to read process output: " + e.getMessage();
        }
    }

    private synchronized DotNetExecutorProcess ensureProcessStarted() {
        if (isDead(this.process)) {
            String generateSecureRandomId = generateSecureRandomId();
            CompletableFuture<PlatformComputeConnection> registerComputeExecutorId = this.transport.registerComputeExecutorId(generateSecureRandomId);
            String str = DOTNET_BINARY_PATH;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Starting .NET executor process [executorId={}, binaryPath={}]", new Object[]{generateSecureRandomId, str});
            }
            Process startDotNetProcess = startDotNetProcess(this.transport.serverAddress(), this.transport.sslEnabled(), generateSecureRandomId, str);
            startDotNetProcess.onExit().thenRun(() -> {
                if (registerComputeExecutorId.completeExceptionally(handleTransportError(startDotNetProcess, new IgniteException(ErrorGroups.Compute.COMPUTE_PLATFORM_EXECUTOR_ERR, ".NET executor process exited")))) {
                    return;
                }
                registerComputeExecutorId.thenAccept((v0) -> {
                    v0.close();
                });
            });
            this.process = new DotNetExecutorProcess(startDotNetProcess, registerComputeExecutorId);
        }
        return this.process;
    }

    private static boolean isDead(DotNetExecutorProcess dotNetExecutorProcess) {
        if (dotNetExecutorProcess == null || !dotNetExecutorProcess.process().isAlive()) {
            return true;
        }
        PlatformComputeConnection now = dotNetExecutorProcess.connectionFut().getNow(null);
        return (now == null || now.isActive()) ? false : true;
    }

    static Process startDotNetProcess(String str, boolean z, String str2, String str3) {
        ProcessBuilder processBuilder = new ProcessBuilder("dotnet", str3);
        processBuilder.redirectErrorStream(true);
        processBuilder.environment().put("IGNITE_COMPUTE_EXECUTOR_SERVER_ADDRESS", str);
        processBuilder.environment().put("IGNITE_COMPUTE_EXECUTOR_SERVER_SSL_ENABLED", Boolean.toString(z));
        processBuilder.environment().put("IGNITE_COMPUTE_EXECUTOR_ID", str2);
        try {
            return processBuilder.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static String resolveDotNetBinaryPath() {
        return resolveDotNetBinaryDir().resolve("Apache.Ignite.Internal.ComputeExecutor.dll").normalize().toString();
    }

    private static Path resolveDotNetBinaryDir() {
        Path currentClassPath = getCurrentClassPath();
        return currentClassPath.endsWith(Paths.get("modules", "compute", "build", "classes", "java", "main")) ? currentClassPath.resolve(Path.of("..", "..", "..", "..", "..", "platforms", "dotnet", "Apache.Ignite.Internal.ComputeExecutor", "bin", "Debug", "net8.0")) : currentClassPath.getParent().endsWith(Paths.get("modules", "compute", "build", "libs")) ? currentClassPath.getParent().resolve(Path.of("..", "..", "..", "platforms", "dotnet", "Apache.Ignite.Internal.ComputeExecutor", "bin", "Debug", "net8.0")) : currentClassPath.getParent().resolve("dotnet");
    }

    private static Path getCurrentClassPath() {
        try {
            return Paths.get(DotNetComputeExecutor.class.getProtectionDomain().getCodeSource().getLocation().toURI());
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

    private static String generateSecureRandomId() {
        byte[] bArr = new byte[64];
        SECURE_RANDOM.nextBytes(bArr);
        return new String(Base64.getEncoder().encode(bArr), StandardCharsets.UTF_8);
    }

    static {
        $assertionsDisabled = !DotNetComputeExecutor.class.desiredAssertionStatus();
        LOG = Loggers.forClass(DotNetComputeExecutor.class);
        SECURE_RANDOM = new SecureRandom();
        DOTNET_BINARY_PATH = resolveDotNetBinaryPath();
    }
}
