package org.apache.ignite3.internal.compute.messaging;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite3.compute.ComputeException;
import org.apache.ignite3.compute.JobState;
import org.apache.ignite3.deployment.DeploymentUnit;
import org.apache.ignite3.internal.compute.ComputeMessageTypes;
import org.apache.ignite3.internal.compute.ComputeMessagesFactory;
import org.apache.ignite3.internal.compute.ComputeUtils;
import org.apache.ignite3.internal.compute.ExecutionManager;
import org.apache.ignite3.internal.compute.ExecutionOptions;
import org.apache.ignite3.internal.compute.JobStarter;
import org.apache.ignite3.internal.compute.message.ExecuteRequest;
import org.apache.ignite3.internal.compute.message.ExecuteResponse;
import org.apache.ignite3.internal.compute.message.JobCancelRequest;
import org.apache.ignite3.internal.compute.message.JobCancelResponse;
import org.apache.ignite3.internal.compute.message.JobChangePriorityRequest;
import org.apache.ignite3.internal.compute.message.JobChangePriorityResponse;
import org.apache.ignite3.internal.compute.message.JobOwnerRequest;
import org.apache.ignite3.internal.compute.message.JobOwnerResponse;
import org.apache.ignite3.internal.compute.message.JobResultRequest;
import org.apache.ignite3.internal.compute.message.JobResultResponse;
import org.apache.ignite3.internal.compute.message.JobStateRequest;
import org.apache.ignite3.internal.compute.message.JobStateResponse;
import org.apache.ignite3.internal.compute.message.JobStatesRequest;
import org.apache.ignite3.internal.compute.message.JobStatesResponse;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.network.MessagingService;
import org.apache.ignite3.internal.network.NetworkMessage;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.network.ClusterNode;
import org.gridgain.internal.security.context.Authentication;
import org.gridgain.internal.security.context.GridGainSecurity;
import org.gridgain.internal.security.context.SecurityContext;
import org.gridgain.internal.security.context.SecurityContextHolder;
import org.gridgain.internal.security.context.message.UserContext;
import org.gridgain.internal.security.context.message.UserContextMessagesFactory;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite3/internal/compute/messaging/ComputeMessaging.class */
public class ComputeMessaging {
    private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE;
    private final ExecutionManager executionManager;
    private final MessagingService messagingService;
    private final TopologyService topologyService;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final UserContextMessagesFactory userContextMessagesFactory = new UserContextMessagesFactory();
    private final ComputeMessagesFactory messagesFactory = new ComputeMessagesFactory();
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    public ComputeMessaging(ExecutionManager executionManager, MessagingService messagingService, TopologyService topologyService) {
        this.executionManager = executionManager;
        this.messagingService = messagingService;
        this.topologyService = topologyService;
    }

    public void start(JobStarter jobStarter) {
        this.messagingService.addMessageHandler(ComputeMessageTypes.class, (networkMessage, clusterNode, l) -> {
            if (!$assertionsDisabled && l == null) {
                throw new AssertionError();
            }
            if (!this.busyLock.enterBusy()) {
                sendException(networkMessage, clusterNode, ((Long) Objects.requireNonNull(l, "correlationId is null")).longValue(), new IgniteInternalException(ErrorGroups.Common.NODE_STOPPING_ERR, new NodeStoppingException()));
                return;
            }
            try {
                processRequest(networkMessage, clusterNode, ((Long) Objects.requireNonNull(l)).longValue(), jobStarter);
                this.busyLock.leaveBusy();
            } catch (Throwable th) {
                this.busyLock.leaveBusy();
                throw th;
            }
        });
    }

    private void sendException(NetworkMessage networkMessage, ClusterNode clusterNode, long j, IgniteInternalException igniteInternalException) {
        if (networkMessage instanceof ExecuteRequest) {
            sendExecuteResponse(null, igniteInternalException, clusterNode, Long.valueOf(j));
            return;
        }
        if (networkMessage instanceof JobResultRequest) {
            sendJobResultResponse(null, igniteInternalException, clusterNode, j);
            return;
        }
        if (networkMessage instanceof JobStatesRequest) {
            sendJobStatesResponse(null, igniteInternalException, clusterNode, Long.valueOf(j));
            return;
        }
        if (networkMessage instanceof JobStateRequest) {
            sendJobStateResponse(null, igniteInternalException, clusterNode, Long.valueOf(j));
            return;
        }
        if (networkMessage instanceof JobCancelRequest) {
            sendJobCancelResponse(null, igniteInternalException, clusterNode, Long.valueOf(j));
        } else if (networkMessage instanceof JobChangePriorityRequest) {
            sendJobChangePriorityResponse(null, igniteInternalException, clusterNode, Long.valueOf(j));
        } else if (networkMessage instanceof JobOwnerRequest) {
            sendJobOwnerResponse(null, igniteInternalException, clusterNode, Long.valueOf(j));
        }
    }

    private void processRequest(NetworkMessage networkMessage, ClusterNode clusterNode, long j, JobStarter jobStarter) {
        if (networkMessage instanceof ExecuteRequest) {
            processExecuteRequest(jobStarter, (ExecuteRequest) networkMessage, clusterNode, j);
            return;
        }
        if (networkMessage instanceof JobResultRequest) {
            processJobResultRequest((JobResultRequest) networkMessage, clusterNode, j);
            return;
        }
        if (networkMessage instanceof JobStatesRequest) {
            processJobStatesRequest((JobStatesRequest) networkMessage, clusterNode, j);
            return;
        }
        if (networkMessage instanceof JobStateRequest) {
            processJobStateRequest((JobStateRequest) networkMessage, clusterNode, j);
            return;
        }
        if (networkMessage instanceof JobCancelRequest) {
            processJobCancelRequest((JobCancelRequest) networkMessage, clusterNode, j);
        } else if (networkMessage instanceof JobChangePriorityRequest) {
            processJobChangePriorityRequest((JobChangePriorityRequest) networkMessage, clusterNode, j);
        } else if (networkMessage instanceof JobOwnerRequest) {
            processJobOwnerRequest((JobOwnerRequest) networkMessage, clusterNode, j);
        }
    }

    public void stop() {
        this.busyLock.block();
    }

    public <T> CompletableFuture<UUID> remoteExecuteRequestAsync(ExecutionOptions executionOptions, ClusterNode clusterNode, List<DeploymentUnit> list, String str, T t) {
        return this.messagingService.invoke(clusterNode, this.messagesFactory.executeRequest().userContext(userContextFromSecurityContext()).executeOptions(executionOptions).deploymentUnits((List) list.stream().map(ComputeUtils::toDeploymentUnitMsg).collect(Collectors.toList())).jobClassName(str).input(t).build(), Long.MAX_VALUE).thenCompose(networkMessage -> {
            return ComputeUtils.jobIdFromExecuteResponse((ExecuteResponse) networkMessage);
        });
    }

    private void processExecuteRequest(JobStarter jobStarter, ExecuteRequest executeRequest, ClusterNode clusterNode, long j) {
        jobStarter.start(executeRequest.executeOptions(), ComputeUtils.toDeploymentUnit(executeRequest.deploymentUnits()), executeRequest.jobClassName(), securityContextFromUserContext(executeRequest.userContext()), executeRequest.input()).idAsync().whenComplete((uuid, th) -> {
            sendExecuteResponse(uuid, th, clusterNode, Long.valueOf(j));
        });
    }

    private void sendExecuteResponse(@Nullable UUID uuid, @Nullable Throwable th, ClusterNode clusterNode, Long l) {
        this.messagingService.respond(clusterNode, this.messagesFactory.executeResponse().jobId(uuid).throwable(th).build(), l.longValue());
    }

    public <R> CompletableFuture<R> remoteJobResultRequestAsync(ClusterNode clusterNode, UUID uuid) {
        return (CompletableFuture<R>) this.messagingService.invoke(clusterNode, this.messagesFactory.jobResultRequest().jobId(uuid).build(), Long.MAX_VALUE).thenCompose(networkMessage -> {
            return ComputeUtils.resultFromJobResultResponse((JobResultResponse) networkMessage);
        });
    }

    private void processJobResultRequest(JobResultRequest jobResultRequest, ClusterNode clusterNode, long j) {
        this.executionManager.resultAsync(jobResultRequest.jobId()).whenComplete((obj, th) -> {
            sendJobResultResponse(obj, th, clusterNode, j);
        });
    }

    private void sendJobResultResponse(@Nullable Object obj, @Nullable Throwable th, ClusterNode clusterNode, long j) {
        this.messagingService.respond(clusterNode, this.messagesFactory.jobResultResponse().result(obj).throwable(th).build(), j);
    }

    CompletableFuture<Collection<JobState>> remoteStatesAsync(ClusterNode clusterNode) {
        return this.messagingService.invoke(clusterNode, this.messagesFactory.jobStatesRequest().build(), Long.MAX_VALUE).thenCompose(networkMessage -> {
            return ComputeUtils.statesFromJobStatesResponse((JobStatesResponse) networkMessage);
        });
    }

    private void processJobStatesRequest(JobStatesRequest jobStatesRequest, ClusterNode clusterNode, long j) {
        this.executionManager.localStatesAsync().whenComplete((list, th) -> {
            sendJobStatesResponse(list, th, clusterNode, Long.valueOf(j));
        });
    }

    private void sendJobStatesResponse(@Nullable Collection<JobState> collection, @Nullable Throwable th, ClusterNode clusterNode, Long l) {
        this.messagingService.respond(clusterNode, this.messagesFactory.jobStatesResponse().states(collection).throwable(th).build(), l.longValue());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<JobState> remoteStateAsync(ClusterNode clusterNode, UUID uuid) {
        return this.messagingService.invoke(clusterNode, this.messagesFactory.jobStateRequest().jobId(uuid).build(), Long.MAX_VALUE).thenCompose(networkMessage -> {
            return ComputeUtils.stateFromJobStateResponse((JobStateResponse) networkMessage);
        });
    }

    private void processJobStateRequest(JobStateRequest jobStateRequest, ClusterNode clusterNode, long j) {
        this.executionManager.stateAsync(jobStateRequest.jobId()).whenComplete((jobState, th) -> {
            sendJobStateResponse(jobState, th, clusterNode, Long.valueOf(j));
        });
    }

    private void sendJobStateResponse(@Nullable JobState jobState, @Nullable Throwable th, ClusterNode clusterNode, Long l) {
        this.messagingService.respond(clusterNode, this.messagesFactory.jobStateResponse().state(jobState).throwable(th).build(), l.longValue());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Boolean> remoteCancelAsync(ClusterNode clusterNode, UUID uuid) {
        return this.messagingService.invoke(clusterNode, this.messagesFactory.jobCancelRequest().jobId(uuid).build(), Long.MAX_VALUE).thenCompose(networkMessage -> {
            return ComputeUtils.cancelFromJobCancelResponse((JobCancelResponse) networkMessage);
        });
    }

    private void processJobCancelRequest(JobCancelRequest jobCancelRequest, ClusterNode clusterNode, long j) {
        this.executionManager.cancelAsync(jobCancelRequest.jobId()).whenComplete((bool, th) -> {
            sendJobCancelResponse(bool, th, clusterNode, Long.valueOf(j));
        });
    }

    private void sendJobCancelResponse(@Nullable Boolean bool, @Nullable Throwable th, ClusterNode clusterNode, Long l) {
        this.messagingService.respond(clusterNode, this.messagesFactory.jobCancelResponse().result(bool).throwable(th).build(), l.longValue());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Boolean> remoteChangePriorityAsync(ClusterNode clusterNode, UUID uuid, int i) {
        return this.messagingService.invoke(clusterNode, this.messagesFactory.jobChangePriorityRequest().jobId(uuid).priority(i).build(), Long.MAX_VALUE).thenCompose(networkMessage -> {
            return ComputeUtils.changePriorityFromJobChangePriorityResponse((JobChangePriorityResponse) networkMessage);
        });
    }

    private void processJobChangePriorityRequest(JobChangePriorityRequest jobChangePriorityRequest, ClusterNode clusterNode, long j) {
        this.executionManager.changePriorityAsync(jobChangePriorityRequest.jobId(), jobChangePriorityRequest.priority()).whenComplete((bool, th) -> {
            sendJobChangePriorityResponse(bool, th, clusterNode, Long.valueOf(j));
        });
    }

    private void sendJobChangePriorityResponse(@Nullable Boolean bool, @Nullable Throwable th, ClusterNode clusterNode, Long l) {
        this.messagingService.respond(clusterNode, this.messagesFactory.jobChangePriorityResponse().throwable(th).result(bool).build(), l.longValue());
    }

    public CompletableFuture<Collection<JobState>> broadcastStatesAsync() {
        return broadcastAsyncAndCollect(this::remoteStatesAsync, th -> {
            return new ComputeException(ErrorGroups.Compute.FAIL_TO_GET_JOB_STATE_ERR, "Failed to retrieve states", th);
        }).thenApply(list -> {
            return (Collection) list.stream().flatMap((v0) -> {
                return v0.stream();
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList());
        });
    }

    private CompletableFuture<String> remoteOwnerAsync(ClusterNode clusterNode, UUID uuid) {
        return this.messagingService.invoke(clusterNode, this.messagesFactory.jobOwnerRequest().jobId(uuid).build(), Long.MAX_VALUE).thenCompose(networkMessage -> {
            return ComputeUtils.ownerFromJobOwnerResponse((JobOwnerResponse) networkMessage);
        });
    }

    private void processJobOwnerRequest(JobOwnerRequest jobOwnerRequest, ClusterNode clusterNode, long j) {
        sendJobOwnerResponse(this.executionManager.getOwner(jobOwnerRequest.jobId()), null, clusterNode, Long.valueOf(j));
    }

    private void sendJobOwnerResponse(@Nullable String str, @Nullable Throwable th, ClusterNode clusterNode, Long l) {
        this.messagingService.respond(clusterNode, this.messagesFactory.jobOwnerResponse().owner(str).throwable(th).build(), l.longValue());
    }

    public CompletableFuture<JobState> broadcastStateAsync(UUID uuid) {
        return broadcastAsync(clusterNode -> {
            return remoteStateAsync(clusterNode, uuid);
        }, th -> {
            return new ComputeException(ErrorGroups.Compute.FAIL_TO_GET_JOB_STATE_ERR, "Failed to retrieve state of the job with ID: " + uuid, th);
        });
    }

    public CompletableFuture<Boolean> broadcastCancelAsync(UUID uuid) {
        return broadcastAsync(clusterNode -> {
            return remoteCancelAsync(clusterNode, uuid);
        }, th -> {
            return new ComputeException(ErrorGroups.Compute.CANCELLING_ERR, "Failed to cancel job with ID: " + uuid, th);
        });
    }

    public CompletableFuture<Boolean> broadcastChangePriorityAsync(UUID uuid, int i) {
        return broadcastAsync(clusterNode -> {
            return remoteChangePriorityAsync(clusterNode, uuid, i);
        }, th -> {
            return new ComputeException(ErrorGroups.Compute.CHANGE_JOB_PRIORITY_ERR, "Failed to change priority for job with ID: " + uuid, th);
        });
    }

    public CompletableFuture<String> broadcastGetOwnerAsync(UUID uuid) {
        return broadcastAsync(clusterNode -> {
            return remoteOwnerAsync(clusterNode, uuid);
        }, th -> {
            return new ComputeException(ErrorGroups.Compute.FAIL_TO_GET_JOB_OWNER_ERR, "Failed to retrieve owner of the job with ID: " + uuid, th);
        });
    }

    private <R> CompletableFuture<R> broadcastAsync(Function<ClusterNode, CompletableFuture<R>> function, Function<Throwable, Throwable> function2) {
        CompletableFuture<R> completableFuture = new CompletableFuture<>();
        ClusterNode localMember = this.topologyService.localMember();
        CompletableFuture.allOf((CompletableFuture[]) this.topologyService.allMembers().stream().filter(clusterNode -> {
            return !clusterNode.equals(localMember);
        }).map(clusterNode2 -> {
            return ((CompletableFuture) function.apply(clusterNode2)).thenAccept(obj -> {
                if (obj != null) {
                    completableFuture.complete(obj);
                }
            });
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).whenComplete((r6, th) -> {
            if (completableFuture.isDone()) {
                return;
            }
            if (th == null) {
                completableFuture.complete(null);
            } else {
                completableFuture.completeExceptionally((Throwable) function2.apply(th));
            }
        });
        return completableFuture;
    }

    private <R> CompletableFuture<List<R>> broadcastAsyncAndCollect(Function<ClusterNode, CompletableFuture<R>> function, Function<Throwable, RuntimeException> function2) {
        Stream<ClusterNode> stream = this.topologyService.allMembers().stream();
        Objects.requireNonNull(function);
        return CompletableFutures.allOfToList((CompletableFuture[]) stream.map((v1) -> {
            return r1.apply(v1);
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).exceptionally(th -> {
            throw ((RuntimeException) function2.apply(th));
        });
    }

    private static SecurityContext securityContextFromUserContext(UserContext userContext) {
        return GridGainSecurity.context(userContext.username(), userContext.roles());
    }

    private UserContext userContextFromSecurityContext() {
        Authentication authentication = SecurityContextHolder.getOrThrow().authentication();
        return this.userContextMessagesFactory.userContext().username(authentication.username()).roles(authentication.roles()).build();
    }

    static {
        $assertionsDisabled = !ComputeMessaging.class.desiredAssertionStatus();
    }
}
