/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.compute;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.ignite3.compute.ComputeException;
import org.apache.ignite3.compute.JobExecution;
import org.apache.ignite3.compute.JobState;
import org.apache.ignite3.internal.compute.CancellableJobExecution;
import org.apache.ignite3.internal.compute.Cleaner;
import org.apache.ignite3.internal.compute.FailSafeJobExecution;
import org.apache.ignite3.internal.compute.configuration.ComputeConfiguration;
import org.apache.ignite3.internal.compute.messaging.RemoteJobExecution;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.lang.ErrorGroups;
import org.gridgain.internal.security.context.SecurityContext;
import org.gridgain.internal.security.context.SecurityContextIsNotSetException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class ExecutionManager {
    private final ComputeConfiguration computeConfiguration;
    private final TopologyService topologyService;
    private final Cleaner<JobExecution<?>> cleaner = new Cleaner();
    private final Map<UUID, RemoteJobExecution> remoteExecutions = new ConcurrentHashMap<UUID, RemoteJobExecution>();
    private final Map<UUID, CancellableJobExecution<?>> localExecutions = new ConcurrentHashMap();
    private final Map<UUID, String> ownership = new ConcurrentHashMap<UUID, String>();

    ExecutionManager(ComputeConfiguration computeConfiguration, TopologyService topologyService) {
        this.computeConfiguration = computeConfiguration;
        this.topologyService = topologyService;
    }

    void addRemoteExecution(UUID jobId, RemoteJobExecution execution, SecurityContext securityContext) {
        if (securityContext == null) {
            throw new SecurityContextIsNotSetException();
        }
        this.ownership.put(jobId, securityContext.authentication().username());
        this.remoteExecutions.put(jobId, execution);
        execution.resultAsync().whenComplete((r, throwable) -> this.cleaner.scheduleRemove(jobId));
    }

    void addLocalExecution(UUID jobId, CancellableJobExecution<?> execution, SecurityContext securityContext) {
        if (securityContext == null) {
            throw new SecurityContextIsNotSetException();
        }
        this.ownership.put(jobId, securityContext.authentication().username());
        this.localExecutions.put(jobId, execution);
        execution.resultAsync().whenComplete((r, throwable) -> this.cleaner.scheduleRemove(jobId));
    }

    void start() {
        long ttlMillis = (Long)this.computeConfiguration.statesLifetimeMillis().value();
        String nodeName = this.topologyService.localMember().name();
        this.cleaner.start(this::cleanExecution, ttlMillis, nodeName);
    }

    private void cleanExecution(UUID key) {
        this.remoteExecutions.remove(key);
        this.localExecutions.remove(key);
        this.ownership.remove(key);
    }

    void stop() {
        this.cleaner.stop();
    }

    public CompletableFuture<?> localResultAsync(UUID jobId) {
        JobExecution execution = this.localExecutions.get(jobId);
        if (execution != null) {
            return execution.resultAsync();
        }
        return CompletableFuture.failedFuture(new ComputeException(ErrorGroups.Compute.RESULT_NOT_FOUND_ERR, "Job result not found for the job with ID: " + jobId));
    }

    public CompletableFuture<List<JobState>> localStatesAsync(@Nullable String owner) {
        CompletableFuture[] statesFutures = (CompletableFuture[])this.localExecutions.values().stream().filter(it -> !(it instanceof FailSafeJobExecution)).map(JobExecution::stateAsync).toArray(CompletableFuture[]::new);
        return CompletableFutures.allOfToList(statesFutures).thenApply(states -> states.stream().filter(Objects::nonNull).filter(s -> owner == null || owner.equals(this.ownership.get(s.id()))).collect(Collectors.toList()));
    }

    public CompletableFuture<@Nullable JobState> stateAsync(UUID jobId) {
        JobExecution execution = this.localExecutions.get(jobId);
        if (execution != null) {
            return execution.stateAsync();
        }
        return CompletableFutures.nullCompletedFuture();
    }

    public CompletableFuture<@Nullable Boolean> cancelAsync(UUID jobId) {
        CancellableJobExecution<?> execution = this.localExecutions.get(jobId);
        if (execution != null) {
            return execution.cancelAsync();
        }
        return CompletableFutures.nullCompletedFuture();
    }

    public CompletableFuture<@Nullable Boolean> changePriorityAsync(UUID jobId, int newPriority) {
        JobExecution execution = this.localExecutions.get(jobId);
        if (execution != null) {
            return execution.changePriorityAsync(newPriority);
        }
        return CompletableFutures.nullCompletedFuture();
    }

    @TestOnly
    public Map<UUID, RemoteJobExecution> remoteExecutions() {
        return this.remoteExecutions;
    }

    @TestOnly
    public Map<UUID, CancellableJobExecution<?>> localExecutions() {
        return this.localExecutions;
    }

    public String getOwner(UUID jobId) {
        return this.ownership.get(jobId);
    }
}

