/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.compute.messaging;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.ComputeMessageTypes;
import org.apache.ignite.internal.compute.ComputeMessagesFactory;
import org.apache.ignite.internal.compute.ComputeUtils;
import org.apache.ignite.internal.compute.ExecutionContext;
import org.apache.ignite.internal.compute.ExecutionManager;
import org.apache.ignite.internal.compute.JobStarter;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
import org.apache.ignite.internal.compute.message.ExecuteRequest;
import org.apache.ignite.internal.compute.message.ExecuteRequestV2;
import org.apache.ignite.internal.compute.message.ExecuteResponse;
import org.apache.ignite.internal.compute.message.JobCancelRequest;
import org.apache.ignite.internal.compute.message.JobCancelResponse;
import org.apache.ignite.internal.compute.message.JobChangePriorityRequest;
import org.apache.ignite.internal.compute.message.JobChangePriorityResponse;
import org.apache.ignite.internal.compute.message.JobOwnerRequest;
import org.apache.ignite.internal.compute.message.JobOwnerResponse;
import org.apache.ignite.internal.compute.message.JobResultRequest;
import org.apache.ignite.internal.compute.message.JobResultResponse;
import org.apache.ignite.internal.compute.message.JobStateRequest;
import org.apache.ignite.internal.compute.message.JobStateResponse;
import org.apache.ignite.internal.compute.message.JobStatesRequest;
import org.apache.ignite.internal.compute.message.JobStatesResponse;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.ErrorGroups;
import org.gridgain.internal.security.context.Authentication;
import org.gridgain.internal.security.context.GridGainSecurity;
import org.gridgain.internal.security.context.SecurityContext;
import org.gridgain.internal.security.context.message.UserContext;
import org.gridgain.internal.security.context.message.UserContextMessagesFactory;
import org.gridgain.lang.GridgainErrorGroups;
import org.jetbrains.annotations.Nullable;

public class ComputeMessaging {
    private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE;
    private final UserContextMessagesFactory userContextMessagesFactory = new UserContextMessagesFactory();
    private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory();
    private final ExecutionManager executionManager;
    private final MessagingService messagingService;
    private final TopologyService topologyService;
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    public ComputeMessaging(ExecutionManager executionManager, MessagingService messagingService, TopologyService topologyService) {
        this.executionManager = executionManager;
        this.messagingService = messagingService;
        this.topologyService = topologyService;
    }

    public void start(JobStarter starter) {
        this.messagingService.addMessageHandler(ComputeMessageTypes.class, (message, sender, correlationId) -> {
            assert (correlationId != null);
            if (!this.busyLock.enterBusy()) {
                this.sendException(message, sender, Objects.requireNonNull(correlationId, "correlationId is null"), new IgniteInternalException(ErrorGroups.Common.NODE_STOPPING_ERR, (Throwable)new NodeStoppingException()));
                return;
            }
            try {
                this.processRequest(message, sender, Objects.requireNonNull(correlationId), starter);
            }
            finally {
                this.busyLock.leaveBusy();
            }
        });
    }

    private void sendException(NetworkMessage message, InternalClusterNode sender, long correlationId, IgniteInternalException ex) {
        if (message instanceof ExecuteRequest) {
            this.sendExecuteResponse(null, (Throwable)ex, sender, correlationId);
        } else if (message instanceof JobResultRequest) {
            this.sendJobResultResponse(null, (Throwable)ex, sender, correlationId);
        } else if (message instanceof JobStatesRequest) {
            this.sendJobStatesResponse(null, (Throwable)ex, sender, correlationId);
        } else if (message instanceof JobStateRequest) {
            this.sendJobStateResponse(null, (Throwable)ex, sender, correlationId);
        } else if (message instanceof JobCancelRequest) {
            this.sendJobCancelResponse(null, (Throwable)ex, sender, correlationId);
        } else if (message instanceof JobChangePriorityRequest) {
            this.sendJobChangePriorityResponse(null, (Throwable)ex, sender, correlationId);
        } else if (message instanceof JobOwnerRequest) {
            this.sendJobOwnerResponse(null, (Throwable)ex, sender, correlationId);
        }
    }

    private void processRequest(NetworkMessage message, InternalClusterNode sender, long correlationId, JobStarter starter) {
        if (message instanceof ExecuteRequest) {
            this.processExecuteRequest(starter, (ExecuteRequest)message, sender, correlationId);
        } else if (message instanceof JobResultRequest) {
            this.processJobResultRequest((JobResultRequest)message, sender, correlationId);
        } else if (message instanceof JobStatesRequest) {
            this.processJobStatesRequest((JobStatesRequest)message, sender, correlationId);
        } else if (message instanceof JobStateRequest) {
            this.processJobStateRequest((JobStateRequest)message, sender, correlationId);
        } else if (message instanceof JobCancelRequest) {
            this.processJobCancelRequest((JobCancelRequest)message, sender, correlationId);
        } else if (message instanceof JobChangePriorityRequest) {
            this.processJobChangePriorityRequest((JobChangePriorityRequest)message, sender, correlationId);
        } else if (message instanceof JobOwnerRequest) {
            this.processJobOwnerRequest((JobOwnerRequest)message, sender, correlationId);
        }
    }

    public void stop() {
        this.busyLock.block();
    }

    public CompletableFuture<UUID> remoteExecuteRequestAsync(InternalClusterNode remoteNode, ExecutionContext executionContext) {
        List<DeploymentUnitMsg> deploymentUnitMsgs = executionContext.units().stream().map(ComputeUtils::toDeploymentUnitMsg).collect(Collectors.toList());
        ExecuteRequestV2 executeRequest = this.messagesFactory.executeRequestV2().userContext(this.userContextFromSecurityContext(executionContext.securityContext())).executeOptions(executionContext.options()).deploymentUnits(deploymentUnitMsgs).jobClassName(executionContext.jobClassName()).metadataBuilder(executionContext.metadataBuilder()).input(executionContext.arg()).build();
        return this.invoke(remoteNode, (NetworkMessage)executeRequest).thenCompose(networkMessage -> ComputeUtils.jobIdFromExecuteResponse((ExecuteResponse)networkMessage));
    }

    private void processExecuteRequest(JobStarter starter, ExecuteRequest request, InternalClusterNode sender, long correlationId) {
        List<DeploymentUnit> units = ComputeUtils.toDeploymentUnit(request.deploymentUnits());
        ComputeEventMetadataBuilder metadataBuilder = request instanceof ExecuteRequestV2 ? ((ExecuteRequestV2)request).metadataBuilder() : ComputeEventMetadata.builder();
        ExecutionContext executionContext = new ExecutionContext(request.executeOptions(), units, request.jobClassName(), metadataBuilder, ComputeMessaging.securityContextFromUserContext(request.userContext()), request.input());
        starter.start(executionContext).whenComplete((execution, err) -> {
            if (err != null) {
                this.sendExecuteResponse(null, (Throwable)err, sender, correlationId);
            } else {
                execution.idAsync().whenComplete((jobId, idErr) -> this.sendExecuteResponse((UUID)jobId, (Throwable)idErr, sender, correlationId));
            }
        });
    }

    private void sendExecuteResponse(@Nullable UUID jobId, @Nullable Throwable ex, InternalClusterNode sender, Long correlationId) {
        ExecuteResponse executeResponse = this.messagesFactory.executeResponse().jobId(jobId).throwable(ex).build();
        this.respond(sender, executeResponse, correlationId);
    }

    public CompletableFuture<ComputeJobDataHolder> remoteJobResultRequestAsync(InternalClusterNode remoteNode, UUID jobId) {
        JobResultRequest jobResultRequest = this.messagesFactory.jobResultRequest().jobId(jobId).build();
        return this.invoke(remoteNode, jobResultRequest).thenCompose(networkMessage -> ComputeUtils.resultFromJobResultResponse((JobResultResponse)networkMessage));
    }

    private void processJobResultRequest(JobResultRequest request, InternalClusterNode sender, long correlationId) {
        this.executionManager.localResultAsync(request.jobId()).whenComplete((result, err) -> this.sendJobResultResponse((ComputeJobDataHolder)result, (Throwable)err, sender, correlationId));
    }

    private void sendJobResultResponse(@Nullable ComputeJobDataHolder result, @Nullable Throwable ex, InternalClusterNode sender, long correlationId) {
        JobResultResponse jobResultResponse = this.messagesFactory.jobResultResponse().result(result).throwable(ex).build();
        this.respond(sender, jobResultResponse, correlationId);
    }

    private CompletableFuture<Collection<JobState>> remoteStatesAsync(InternalClusterNode remoteNode, @Nullable String owner) {
        JobStatesRequest jobStatesRequest = this.messagesFactory.jobStatesRequest().owner(owner).build();
        return this.invoke(remoteNode, jobStatesRequest).thenCompose(networkMessage -> ComputeUtils.statesFromJobStatesResponse((JobStatesResponse)networkMessage));
    }

    private void processJobStatesRequest(JobStatesRequest message, InternalClusterNode sender, long correlationId) {
        this.executionManager.localStatesAsync(message.owner()).whenComplete((states, throwable) -> this.sendJobStatesResponse((Collection<JobState>)states, (Throwable)throwable, sender, correlationId));
    }

    private void sendJobStatesResponse(@Nullable Collection<JobState> states, @Nullable Throwable throwable, InternalClusterNode sender, Long correlationId) {
        JobStatesResponse jobStatesResponse = this.messagesFactory.jobStatesResponse().states(states).throwable(throwable).build();
        this.respond(sender, jobStatesResponse, correlationId);
    }

    CompletableFuture<@Nullable JobState> remoteStateAsync(InternalClusterNode remoteNode, UUID jobId) {
        JobStateRequest jobStateRequest = this.messagesFactory.jobStateRequest().jobId(jobId).build();
        return this.invoke(remoteNode, jobStateRequest).thenCompose(networkMessage -> ComputeUtils.stateFromJobStateResponse((JobStateResponse)networkMessage));
    }

    private void processJobStateRequest(JobStateRequest request, InternalClusterNode sender, long correlationId) {
        this.executionManager.stateAsync(request.jobId()).whenComplete((state, throwable) -> this.sendJobStateResponse((JobState)state, (Throwable)throwable, sender, correlationId));
    }

    private void sendJobStateResponse(@Nullable JobState state, @Nullable Throwable throwable, InternalClusterNode sender, Long correlationId) {
        JobStateResponse jobStateResponse = this.messagesFactory.jobStateResponse().state(state).throwable(throwable).build();
        this.respond(sender, jobStateResponse, correlationId);
    }

    CompletableFuture<@Nullable Boolean> remoteCancelAsync(InternalClusterNode remoteNode, UUID jobId) {
        JobCancelRequest jobCancelRequest = this.messagesFactory.jobCancelRequest().jobId(jobId).build();
        return this.invoke(remoteNode, jobCancelRequest).thenCompose(networkMessage -> ComputeUtils.cancelFromJobCancelResponse((JobCancelResponse)networkMessage));
    }

    private void processJobCancelRequest(JobCancelRequest request, InternalClusterNode sender, long correlationId) {
        this.executionManager.cancelAsync(request.jobId()).whenComplete((result, err) -> this.sendJobCancelResponse((Boolean)result, (Throwable)err, sender, correlationId));
    }

    private void sendJobCancelResponse(@Nullable Boolean result, @Nullable Throwable throwable, InternalClusterNode sender, Long correlationId) {
        JobCancelResponse jobCancelResponse = this.messagesFactory.jobCancelResponse().result(result).throwable(throwable).build();
        this.respond(sender, jobCancelResponse, correlationId);
    }

    CompletableFuture<@Nullable Boolean> remoteChangePriorityAsync(InternalClusterNode remoteNode, UUID jobId, int newPriority) {
        JobChangePriorityRequest jobChangePriorityRequest = this.messagesFactory.jobChangePriorityRequest().jobId(jobId).priority(newPriority).build();
        return this.invoke(remoteNode, jobChangePriorityRequest).thenCompose(networkMessage -> ComputeUtils.changePriorityFromJobChangePriorityResponse((JobChangePriorityResponse)networkMessage));
    }

    private void processJobChangePriorityRequest(JobChangePriorityRequest request, InternalClusterNode sender, long correlationId) {
        this.executionManager.changePriorityAsync(request.jobId(), request.priority()).whenComplete((result, err) -> this.sendJobChangePriorityResponse((Boolean)result, (Throwable)err, sender, correlationId));
    }

    private void sendJobChangePriorityResponse(@Nullable Boolean result, @Nullable Throwable throwable, InternalClusterNode sender, Long correlationId) {
        JobChangePriorityResponse jobChangePriorityResponse = this.messagesFactory.jobChangePriorityResponse().throwable(throwable).result(result).build();
        this.respond(sender, jobChangePriorityResponse, correlationId);
    }

    public CompletableFuture<Collection<JobState>> broadcastStatesByOwnerAsync(String owner) {
        return this.broadcastAsyncAndCollect(node -> this.remoteStatesAsync((InternalClusterNode)node, owner), throwable -> new ComputeException(ErrorGroups.Compute.FAIL_TO_GET_JOB_STATE_ERR, "Failed to retrieve states", throwable)).thenApply(states -> states.stream().flatMap(Collection::stream).filter(Objects::nonNull).collect(Collectors.toList()));
    }

    public CompletableFuture<Collection<JobState>> broadcastStatesAsync() {
        return this.broadcastAsyncAndCollect(node -> this.remoteStatesAsync((InternalClusterNode)node, null), throwable -> new ComputeException(ErrorGroups.Compute.FAIL_TO_GET_JOB_STATE_ERR, "Failed to retrieve states", throwable)).thenApply(states -> states.stream().flatMap(Collection::stream).filter(Objects::nonNull).collect(Collectors.toList()));
    }

    private CompletableFuture<@Nullable String> remoteOwnerAsync(InternalClusterNode remoteNode, UUID jobId) {
        JobOwnerRequest jobOwnerRequest = this.messagesFactory.jobOwnerRequest().jobId(jobId).build();
        return this.messagingService.invoke(remoteNode, (NetworkMessage)jobOwnerRequest, Long.MAX_VALUE).thenCompose(networkMessage -> ComputeUtils.ownerFromJobOwnerResponse((JobOwnerResponse)networkMessage));
    }

    private void processJobOwnerRequest(JobOwnerRequest request, InternalClusterNode sender, long correlationId) {
        String owner = this.executionManager.getOwner(request.jobId());
        this.sendJobOwnerResponse(owner, null, sender, correlationId);
    }

    private void sendJobOwnerResponse(@Nullable String owner, @Nullable Throwable throwable, InternalClusterNode sender, Long correlationId) {
        JobOwnerResponse jobOwnerResponse = this.messagesFactory.jobOwnerResponse().owner(owner).throwable(throwable).build();
        this.messagingService.respond(sender, (NetworkMessage)jobOwnerResponse, correlationId.longValue());
    }

    public CompletableFuture<@Nullable JobState> broadcastStateAsync(UUID jobId) {
        return this.broadcastAsync(node -> this.remoteStateAsync((InternalClusterNode)node, jobId), throwable -> new ComputeException(ErrorGroups.Compute.FAIL_TO_GET_JOB_STATE_ERR, "Failed to retrieve state of the job with ID: " + String.valueOf(jobId), throwable));
    }

    public CompletableFuture<@Nullable Boolean> broadcastCancelAsync(UUID jobId) {
        return this.broadcastAsync(node -> this.remoteCancelAsync((InternalClusterNode)node, jobId), throwable -> new ComputeException(ErrorGroups.Compute.CANCELLING_ERR, "Failed to cancel job with ID: " + String.valueOf(jobId), throwable));
    }

    public CompletableFuture<@Nullable Boolean> broadcastChangePriorityAsync(UUID jobId, int newPriority) {
        return this.broadcastAsync(node -> this.remoteChangePriorityAsync((InternalClusterNode)node, jobId, newPriority), throwable -> new ComputeException(ErrorGroups.Compute.CHANGE_JOB_PRIORITY_ERR, "Failed to change priority for job with ID: " + String.valueOf(jobId), throwable));
    }

    public CompletableFuture<@Nullable String> broadcastGetOwnerAsync(UUID jobId) {
        return this.broadcastAsync(node -> this.remoteOwnerAsync((InternalClusterNode)node, jobId), throwable -> new ComputeException(GridgainErrorGroups.Security.FAIL_TO_GET_JOB_OWNER_ERR, "Failed to retrieve owner of the job with ID: " + String.valueOf(jobId), throwable));
    }

    private <R> CompletableFuture<@Nullable R> broadcastAsync(Function<InternalClusterNode, CompletableFuture<@Nullable R>> request, Function<Throwable, Throwable> error) {
        CompletableFuture<@Nullable T> result = new CompletableFuture();
        InternalClusterNode localMember = this.topologyService.localMember();
        CompletableFuture[] futures = (CompletableFuture[])this.topologyService.allMembers().stream().filter(node -> !node.name().equals(localMember.name())).map(node -> ((CompletableFuture)request.apply((InternalClusterNode)node)).thenAccept(response -> {
            if (response != null) {
                result.complete(response);
            }
        })).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).whenComplete((unused, throwable) -> {
            if (!result.isDone()) {
                if (throwable == null) {
                    result.complete(null);
                    return;
                }
                result.completeExceptionally((Throwable)error.apply((Throwable)throwable));
            }
        });
        return result;
    }

    private <R> CompletableFuture<List<R>> broadcastAsyncAndCollect(Function<InternalClusterNode, CompletableFuture<@Nullable R>> request, Function<Throwable, RuntimeException> error) {
        CompletableFuture[] futures = (CompletableFuture[])this.topologyService.allMembers().stream().map(request::apply).toArray(CompletableFuture[]::new);
        return CompletableFutures.allOfToList((CompletableFuture[])futures).exceptionally(throwable -> {
            throw (RuntimeException)error.apply((Throwable)throwable);
        });
    }

    private CompletableFuture<NetworkMessage> invoke(InternalClusterNode remoteNode, NetworkMessage msg) {
        return this.messagingService.invoke(remoteNode.name(), msg, Long.MAX_VALUE);
    }

    private void respond(InternalClusterNode sender, NetworkMessage msg, long correlationId) {
        this.messagingService.respond(sender.name(), msg, correlationId);
    }

    private static SecurityContext securityContextFromUserContext(UserContext userContext) {
        return GridGainSecurity.context((String)userContext.username(), (Set)userContext.roles());
    }

    private UserContext userContextFromSecurityContext(SecurityContext securityContext) {
        Authentication authentication = securityContext.authentication();
        return this.userContextMessagesFactory.userContext().username(authentication.username()).roles(authentication.roles()).build();
    }
}

