/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache;

import java.io.File;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.GridDebug;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;

public class IgniteCacheQueryLargeRecordsOomTest
extends GridCommonAbstractTest {
    private static final int READERS = 2;
    private static final int ITERATIONS = 10;
    private static final String HEAP_DUMP_FILE_NAME = "test.hprof";
    private static final float MAX_LEAK = 30.0f;

    @Test
    public void testMemoryLeakDistributed() throws Exception {
        this.checkMemoryLeak(false, false);
    }

    @Test
    public void testMemoryLeakDistributedLazy() throws Exception {
        this.checkMemoryLeak(false, true);
    }

    @Test
    public void testMemoryLeakLocal() throws Exception {
        this.checkMemoryLeak(true, false);
    }

    @Test
    public void testMemoryLeakLocalLazy() throws Exception {
        this.checkMemoryLeak(true, true);
    }

    private void checkMemoryLeak(boolean loc, boolean lazy) throws Exception {
        this.startGridsMultiThreaded(2);
        IgniteCache cache = this.grid(0).cache("default");
        for (long i = 0L; i < 1000L; ++i) {
            Person val = new Person(new byte[0x100000]);
            cache.put((Object)i, (Object)val);
        }
        if (log.isInfoEnabled()) {
            log.info("Data loaded");
        }
        QueriesRunner runner = new QueriesRunner(cache, 2, this.getTestTimeout());
        runner.runQueries(3, loc, lazy);
        GridDebug.dumpHeap((String)HEAP_DUMP_FILE_NAME, (boolean)true);
        File dumpFile = new File(HEAP_DUMP_FILE_NAME);
        long size0 = dumpFile.length();
        runner.runQueries(10, loc, lazy);
        GridDebug.dumpHeap((String)HEAP_DUMP_FILE_NAME, (boolean)true);
        float leakSize = (float)(dumpFile.length() - size0) / 1024.0f / 1024.0f;
        if (log.isInfoEnabled()) {
            log.info("Current leak size=" + leakSize + "MB, heap size after warm up=" + size0 / 1024L / 1024L + "MB.");
        }
        IgniteCacheQueryLargeRecordsOomTest.assertTrue((String)("The memory leak detected : " + leakSize + "MB. See heap dump '" + dumpFile.getAbsolutePath() + "'"), (leakSize < 30.0f ? 1 : 0) != 0);
        dumpFile.delete();
        runner.shutdown();
    }

    protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
        cfg.setCacheConfiguration(new CacheConfiguration[]{new CacheConfiguration("default").setIndexedTypes(new Class[]{Long.class, Person.class})});
        cfg.setDataStorageConfiguration(new DataStorageConfiguration().setMetricsEnabled(true).setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMetricsEnabled(true).setMaxSize(0x40000000L)));
        TcpCommunicationSpi communicationSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
        communicationSpi.setAckSendThreshold(1);
        return cfg;
    }

    protected long getTestTimeout() {
        return 120000L;
    }

    protected void afterTest() throws Exception {
        this.stopAllGrids(true);
    }

    private static class QueriesRunner {
        private final ExecutorService exec;
        private final int runners;
        private final long timeout;
        private final IgniteCache<Object, Object> cache;

        private QueriesRunner(IgniteCache<Object, Object> cache, int runners, long timeout) {
            this.cache = cache;
            this.timeout = timeout;
            this.exec = Executors.newFixedThreadPool(runners, new ThreadFactory(){

                @Override
                public Thread newThread(@NotNull Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("test-async-query-runner-" + t.hashCode());
                    return t;
                }
            });
            this.runners = runners;
        }

        void runQueries(final int iterations, final boolean loc, final boolean lazy) throws Exception {
            ArrayList<Future<Object>> futs = new ArrayList<Future<Object>>(this.runners);
            for (int i = 0; i < this.runners; ++i) {
                Future<Object> f = this.exec.submit(new Callable<Object>(){

                    @Override
                    public Object call() {
                        for (int j = 0; j < iterations; ++j) {
                            if (Thread.currentThread().isInterrupted()) {
                                return null;
                            }
                            if (log.isInfoEnabled()) {
                                log.info("Iteration " + j);
                            }
                            String sql = "select * from Person limit " + (j + 1) + (j % 2 == 0 ? "" : " offset 1");
                            FieldsQueryCursor qry = cache.query(new SqlFieldsQuery(sql).setLazy(lazy).setLocal(loc));
                            qry.getAll();
                            qry.close();
                        }
                        return null;
                    }
                });
                futs.add(f);
            }
            for (Future<Object> f : futs) {
                f.get();
            }
        }

        public void shutdown() throws InterruptedException {
            this.exec.shutdownNow();
            this.exec.awaitTermination(this.timeout, TimeUnit.MILLISECONDS);
        }
    }

    private static class Person {
        byte[] b;

        Person(byte[] b) {
            this.b = b;
        }
    }
}

