/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.client;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.client.GridClientConfiguration;
import org.apache.ignite.internal.client.GridClientDataAffinity;
import org.apache.ignite.internal.client.GridClientDataConfiguration;
import org.apache.ignite.internal.client.GridClientException;
import org.apache.ignite.internal.client.GridClientFactory;
import org.apache.ignite.internal.client.GridClientPartitionAffinity;
import org.apache.ignite.internal.client.GridClientProtocol;
import org.apache.ignite.internal.client.balancer.GridClientLoadBalancer;
import org.apache.ignite.internal.client.balancer.GridClientRoundRobinBalancer;
import org.apache.ignite.internal.client.ssl.GridSslContextFactory;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;

public abstract class ClientAbstractMultiThreadedSelfTest
extends GridCommonAbstractTest {
    protected static final String PARTITIONED_CACHE_NAME = "partitioned";
    protected static final String PARTITIONED_ASYNC_BACKUP_CACHE_NAME = "partitioned-async-backup";
    private static final String REPLICATED_CACHE_NAME = "replicated";
    private static final String REPLICATED_ASYNC_CACHE_NAME = "replicated-async";
    protected static final int NODES_CNT = 5;
    private static final int THREAD_CNT = 20;
    private static final int TASK_EXECUTION_CNT = 50000;
    private static final int CACHE_PUT_CNT = 10000;
    private static final int TOP_REFRESH_FREQ = 1000;
    private static final int STATISTICS_PRINT_STEP = 5000;
    public static final String HOST = "127.0.0.1";
    public static final int REST_TCP_PORT_BASE = 12345;
    protected GridClient client;

    protected abstract GridClientProtocol protocol();

    protected abstract String serverAddress();

    protected abstract boolean useSsl();

    protected abstract GridSslContextFactory sslContextFactory();

    protected int topologyRefreshFrequency() {
        return 1000;
    }

    protected int maxConnectionIdleTime() {
        return 5000;
    }

    protected int taskExecutionCount() {
        return 50000;
    }

    protected int cachePutCount() {
        return 10000;
    }

    protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration c = super.getConfiguration(igniteInstanceName);
        c.setLocalHost(HOST);
        assert (c.getConnectorConfiguration() == null);
        ConnectorConfiguration clientCfg = new ConnectorConfiguration();
        clientCfg.setPort(12345);
        if (this.useSsl()) {
            clientCfg.setSslEnabled(true);
            clientCfg.setSslContextFactory(this.sslContextFactory());
        }
        c.setConnectorConfiguration(clientCfg);
        c.setCacheConfiguration(new CacheConfiguration[]{this.cacheConfiguration("default"), this.cacheConfiguration(PARTITIONED_CACHE_NAME), this.cacheConfiguration(REPLICATED_CACHE_NAME), this.cacheConfiguration(PARTITIONED_ASYNC_BACKUP_CACHE_NAME), this.cacheConfiguration(REPLICATED_ASYNC_CACHE_NAME)});
        return c;
    }

    private CacheConfiguration cacheConfiguration(@NotNull String cacheName) throws Exception {
        CacheConfiguration cfg = ClientAbstractMultiThreadedSelfTest.defaultCacheConfiguration();
        cfg.setAffinity((AffinityFunction)new RendezvousAffinityFunction());
        cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
        switch (cacheName) {
            case "default": {
                cfg.setCacheMode(CacheMode.LOCAL);
                break;
            }
            case "partitioned": {
                cfg.setCacheMode(CacheMode.PARTITIONED);
                cfg.setBackups(0);
                break;
            }
            case "partitioned-async-backup": {
                cfg.setCacheMode(CacheMode.PARTITIONED);
                cfg.setBackups(1);
                break;
            }
            default: {
                cfg.setCacheMode(CacheMode.REPLICATED);
            }
        }
        cfg.setName(cacheName);
        if (!"default".equals(cacheName) && !cacheName.contains("async")) {
            cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
        }
        return cfg;
    }

    protected void beforeTestsStarted() throws Exception {
        this.startGridsMultiThreaded(5);
    }

    protected void beforeTest() throws Exception {
        this.client = GridClientFactory.start((GridClientConfiguration)this.clientConfiguration());
    }

    protected void afterTest() throws Exception {
        GridClientFactory.stop((UUID)this.client.id(), (boolean)false);
        this.client = null;
    }

    @Test
    public void testMultithreadedTaskRun() throws Exception {
        final AtomicLong cnt = new AtomicLong();
        final AtomicReference err = new AtomicReference();
        final ConcurrentLinkedQueue execQueue = new ConcurrentLinkedQueue();
        IgniteInternalFuture fut = this.multithreadedAsync(new Runnable(){

            @Override
            public void run() {
                long processed;
                while ((processed = cnt.getAndIncrement()) < (long)ClientAbstractMultiThreadedSelfTest.this.taskExecutionCount()) {
                    try {
                        if (processed > 0L && processed % 5000L == 0L) {
                            ClientAbstractMultiThreadedSelfTest.this.info(">>>>>>> " + processed + " tasks finished.");
                        }
                        String res = (String)ClientAbstractMultiThreadedSelfTest.this.client.compute().execute(TestTask.class.getName(), null);
                        execQueue.add(res);
                    }
                    catch (GridClientException e) {
                        err.compareAndSet(null, e);
                    }
                }
            }
        }, 20, "client-task-request");
        fut.get();
        if (err.get() != null) {
            throw new Exception((Throwable)err.get());
        }
        ClientAbstractMultiThreadedSelfTest.assertEquals((int)this.taskExecutionCount(), (int)execQueue.size());
        HashSet executionIds = new HashSet(execQueue);
        ClientAbstractMultiThreadedSelfTest.assertTrue((executionIds.size() == 5 ? 1 : 0) != 0);
        HashMap<String, AtomicInteger> statisticsMap = new HashMap<String, AtomicInteger>();
        for (String string : executionIds) {
            statisticsMap.put(string, new AtomicInteger());
        }
        for (String string : execQueue) {
            ((AtomicInteger)statisticsMap.get(string)).incrementAndGet();
        }
        this.info(">>>>>>> Execution statistics per node:");
        for (Map.Entry entry : statisticsMap.entrySet()) {
            this.info(">>>>>>> " + (String)entry.getKey() + " run " + ((AtomicInteger)entry.getValue()).get() + " tasks");
        }
    }

    protected long getTestTimeout() {
        return 300000L;
    }

    private GridClientConfiguration clientConfiguration() {
        GridClientConfiguration cfg = new GridClientConfiguration();
        cfg.setTopologyRefreshFrequency((long)this.topologyRefreshFrequency());
        cfg.setMaxConnectionIdleTime((long)this.maxConnectionIdleTime());
        cfg.setProtocol(this.protocol());
        cfg.setServers(Arrays.asList(this.serverAddress()));
        cfg.setBalancer((GridClientLoadBalancer)new GridClientRoundRobinBalancer());
        if (this.useSsl()) {
            cfg.setSslContextFactory(this.sslContextFactory());
        }
        GridClientDataConfiguration loc = new GridClientDataConfiguration();
        GridClientDataConfiguration partitioned = new GridClientDataConfiguration();
        partitioned.setName(PARTITIONED_CACHE_NAME);
        partitioned.setAffinity((GridClientDataAffinity)new GridClientPartitionAffinity());
        GridClientDataConfiguration partitionedAsyncBackup = new GridClientDataConfiguration();
        partitionedAsyncBackup.setName(PARTITIONED_ASYNC_BACKUP_CACHE_NAME);
        partitionedAsyncBackup.setAffinity((GridClientDataAffinity)new GridClientPartitionAffinity());
        GridClientDataConfiguration replicated = new GridClientDataConfiguration();
        replicated.setName(REPLICATED_CACHE_NAME);
        GridClientDataConfiguration replicatedAsync = new GridClientDataConfiguration();
        replicatedAsync.setName(REPLICATED_ASYNC_CACHE_NAME);
        cfg.setDataConfigurations(Arrays.asList(loc, partitioned, replicated, replicatedAsync, partitionedAsyncBackup));
        return cfg;
    }

    private static class TestTask
    extends ComputeTaskSplitAdapter<Object, String> {
        @IgniteInstanceResource
        private Ignite ignite;
        private int gridSize;

        private TestTask() {
        }

        protected Collection<? extends ComputeJob> split(int gridSize, Object arg) {
            ArrayList<1> jobs = new ArrayList<1>(gridSize);
            this.gridSize = gridSize;
            final String locNodeId = this.ignite.cluster().localNode().id().toString();
            for (int i = 0; i < gridSize; ++i) {
                jobs.add(new ComputeJobAdapter(){

                    public Object execute() {
                        return new IgniteBiTuple((Object)locNodeId, (Object)1);
                    }
                });
            }
            return jobs;
        }

        public String reduce(List<ComputeJobResult> results) {
            int sum = 0;
            String locNodeId = null;
            for (ComputeJobResult res : results) {
                Integer i;
                IgniteBiTuple part = (IgniteBiTuple)res.getData();
                if (locNodeId == null) {
                    locNodeId = (String)part.get1();
                }
                if ((i = (Integer)part.get2()) == null) continue;
                sum += i.intValue();
            }
            assert (this.gridSize == sum);
            return locNodeId;
        }
    }
}

