/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.util.future;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.JobContextResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@GridInternal
public class IgniteRemoteMapTask<T, R>
extends ComputeTaskAdapter<T, R> {
    private static final long serialVersionUID = 0L;
    private final ClusterNode node;
    private final ComputeTask<T, R> remoteTask;

    public IgniteRemoteMapTask(ClusterNode node, ComputeTask<T, R> remoteTask) {
        this.node = node;
        this.remoteTask = remoteTask;
    }

    @Override
    @NotNull
    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable T arg) throws IgniteException {
        for (ClusterNode node : subgrid) {
            if (!node.equals(this.node)) continue;
            return Collections.singletonMap(new Job<T, R>(this.remoteTask, arg), node);
        }
        throw new IgniteException("Node " + this.node + " is not present in subgrid.");
    }

    @Override
    @Nullable
    public R reduce(List<ComputeJobResult> results) throws IgniteException {
        assert (results.size() == 1);
        return (R)results.get(0).getData();
    }

    private static class Job<T, R>
    extends ComputeJobAdapter {
        private static final long serialVersionUID = 0L;
        @JobContextResource
        private ComputeJobContext jobCtx;
        @IgniteInstanceResource
        private Ignite ignite;
        private final ComputeTask<T, R> remoteTask;
        @Nullable
        private final T arg;
        @Nullable
        private ComputeTaskFuture<R> future;

        public Job(ComputeTask<T, R> remoteTask, @Nullable T arg) {
            this.remoteTask = remoteTask;
            this.arg = arg;
        }

        @Override
        public Object execute() throws IgniteException {
            if (this.future == null) {
                IgniteCompute compute = this.ignite.compute().withAsync();
                compute.execute(this.remoteTask, this.arg);
                IgniteFuture future = compute.future();
                this.future = future;
                this.jobCtx.holdcc();
                future.listen(new IgniteInClosure<IgniteFuture<R>>(){

                    @Override
                    public void apply(IgniteFuture<R> future) {
                        jobCtx.callcc();
                    }
                });
                return null;
            }
            return this.future.get();
        }
    }
}

