/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.control.agent.compute;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.TaskSessionResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.jetbrains.annotations.Nullable;

public class ComputeGenerationService
implements Service {
    private long tasksGenerationPeriod = 100L;
    private int tasksAmountPerPeriod = 100;
    public int tasksPeakAmount = 10000;
    private long tasksMaxTimeout = TimeUnit.SECONDS.toMillis(3L);
    private long serviceMaxTimeout = TimeUnit.MINUTES.toMillis(60L);
    private static final List<Class<? extends TimeoutTask>> TASK_CLASSES = Arrays.asList(SuccessTask.class, FailTask.class, CancelTask.class, LiteTask.class, FailLiteTask.class);
    private static long endTime;
    private ScheduledExecutorService threadExec;
    private ScheduledExecutorService cancelTaskExec;
    @IgniteInstanceResource
    private Ignite ignite;
    @LoggerResource
    private IgniteLogger log;

    public ComputeGenerationService(Map<String, Object> arg) {
        if (F.isEmpty(arg)) {
            return;
        }
        this.tasksGenerationPeriod = (Long)arg.getOrDefault("TASK_GENERATION_PERIOD", this.tasksGenerationPeriod);
        this.tasksGenerationPeriod = (Long)arg.getOrDefault("TASK_GENERATION_PERIOD", this.tasksGenerationPeriod);
        this.tasksAmountPerPeriod = (Integer)arg.getOrDefault("TASKS_AMOUNT_PER_PERIOD", this.tasksAmountPerPeriod);
        this.tasksPeakAmount = (Integer)arg.getOrDefault("TASKS_PEAK_AMOUNT", this.tasksPeakAmount);
        this.tasksMaxTimeout = (Long)arg.getOrDefault("TASKS_MAX_TIMEOUT", this.tasksMaxTimeout);
        this.serviceMaxTimeout = (Long)arg.getOrDefault("TASKS_SERVICE_MAX_TIMEOUT", this.serviceMaxTimeout);
    }

    public void init(ServiceContext ctx) {
        if (this.log.isInfoEnabled()) {
            this.log.info("Service was initialized: " + ctx.name());
        }
        this.threadExec = Executors.newScheduledThreadPool(10);
        this.cancelTaskExec = Executors.newScheduledThreadPool(2);
    }

    public void cancel(ServiceContext ctx) {
        U.shutdownNow(ComputeGenerationService.class, (ExecutorService)this.threadExec, null);
        U.shutdownNow(ComputeGenerationService.class, (ExecutorService)this.cancelTaskExec, null);
        if (this.log.isInfoEnabled()) {
            this.log.info("Service was cancelled: " + ctx.name());
        }
    }

    public void execute(ServiceContext ctx) {
        this.log.info("Executing compute load service: " + ctx.name());
        endTime = System.currentTimeMillis() + this.serviceMaxTimeout;
        this.threadExec.scheduleAtFixedRate(() -> this.startTasksIfNeeded(this.ignite), 0L, this.tasksGenerationPeriod, TimeUnit.MILLISECONDS);
    }

    private void startTasksIfNeeded(Ignite ignite) {
        int waitingJobsCnt;
        if (System.currentTimeMillis() > endTime) {
            this.threadExec.shutdown();
        }
        if ((waitingJobsCnt = ignite.cluster().node().metrics().getCurrentWaitingJobs()) < this.tasksPeakAmount) {
            for (int i = 0; i < Integer.min(this.tasksPeakAmount - waitingJobsCnt, this.tasksAmountPerPeriod); ++i) {
                this.startTask(TASK_CLASSES.get(i % TASK_CLASSES.size()), ignite);
            }
        }
    }

    private <T extends TimeoutTask> void startTask(Class<T> taskCls, Ignite ignite) {
        long timeout = ThreadLocalRandom.current().nextLong(this.tasksMaxTimeout);
        ComputeTaskFuture fut = ignite.compute().executeAsync(taskCls, (Object)new Object[]{timeout, this.tasksPeakAmount});
        if (CancelTask.class.isAssignableFrom(taskCls)) {
            this.cancelTaskExec.schedule(() -> ((ComputeTaskFuture)fut).cancel(), timeout / 2L, TimeUnit.MILLISECONDS);
        }
    }

    static abstract class TimeoutTask
    extends ComputeTaskAdapter<Object[], Void> {
        private static final ThreadLocalRandom RND = ThreadLocalRandom.current();
        @TaskSessionResource
        private transient ComputeTaskSession ses;

        TimeoutTask() {
        }

        public final Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Object[] arg) throws IgniteException {
            this.createAttributes(this.ses, (int)((Integer)arg[1]));
            return subgrid.stream().collect(Collectors.toMap(node -> new ComputeJobAdapter(arg){
                @TaskSessionResource
                private transient ComputeTaskSession ses;

                public Void execute() {
                    @Nullable Long timeout = (Long)this.argument(0);
                    if (timeout != null) {
                        try {
                            U.sleep((long)timeout);
                        }
                        catch (Exception exception) {
                            // empty catch block
                        }
                    }
                    this.doOnNode();
                    return null;
                }
            }, Function.identity()));
        }

        public Void reduce(List<ComputeJobResult> results) throws IgniteException {
            for (ComputeJobResult result : results) {
                if (result.getException() == null) continue;
                throw result.getException();
            }
            return null;
        }

        public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
            IgniteException e = res.getException();
            if (e != null) {
                if (e instanceof ClusterTopologyException) {
                    return ComputeJobResultPolicy.FAILOVER;
                }
                return ComputeJobResultPolicy.REDUCE;
            }
            return ComputeJobResultPolicy.WAIT;
        }

        void doOnNode() {
        }

        void createAttributes(ComputeTaskSession ses, Integer tasksPeakAmount) {
            HashMap<String, Object> attrs = new HashMap<String, Object>();
            attrs.put("with long value", this.random(500));
            attrs.put("with numeric value", RND.nextInt(1, tasksPeakAmount));
            attrs.put("with long key " + this.random(500), "value");
            attrs.put("with empty value", "");
            attrs.put("", "value");
            attrs.put("with html tags in key </br><p>", "value");
            attrs.put("with html tags in value", "value<p></br>");
            ses.setAttributes(attrs);
        }

        private String random(int targetStringLength) {
            int leftLimit = 97;
            int rightLimit = 122;
            return RND.ints(leftLimit, rightLimit + 1).limit(targetStringLength).collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString();
        }
    }

    public static class FailLiteTask
    extends TimeoutTask {
        @Override
        void doOnNode() {
            throw new UnsupportedOperationException("The job failed to finish");
        }

        @Override
        void createAttributes(ComputeTaskSession ses, Integer tasksPeakAmount) {
        }
    }

    public static class LiteTask
    extends TimeoutTask {
        @Override
        void createAttributes(ComputeTaskSession ses, Integer tasksPeakAmount) {
        }
    }

    @ComputeTaskSessionFullSupport
    public static class CancelTask
    extends TimeoutTask {
        @Override
        void createAttributes(ComputeTaskSession ses, Integer tasksPeakAmount) {
            super.createAttributes(ses, tasksPeakAmount);
            ses.setAttribute((Object)"cancel", (Object)true);
        }
    }

    @ComputeTaskSessionFullSupport
    public static class FailTask
    extends TimeoutTask {
        @Override
        void createAttributes(ComputeTaskSession ses, Integer tasksPeakAmount) {
            super.createAttributes(ses, tasksPeakAmount);
            ses.setAttribute((Object)"fail", (Object)true);
        }

        @Override
        void doOnNode() {
            throw new UnsupportedOperationException("The job failed to finish");
        }
    }

    @ComputeTaskSessionFullSupport
    public static class SuccessTask
    extends TimeoutTask {
        @Override
        void createAttributes(ComputeTaskSession ses, Integer tasksPeakAmount) {
            super.createAttributes(ses, tasksPeakAmount);
            ses.setAttribute((Object)"success", (Object)true);
        }
    }
}

