/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.query;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CyclicBarrier;
import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SpiQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

public class IgniteQueryDedicatedPoolTest
extends GridCommonAbstractTest {
    private static final String CACHE_NAME = "query_pool_test";
    private static ListeningTestLogger testLog;
    private Integer qryPoolSize;

    protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();
        testLog = new ListeningTestLogger(false, log);
    }

    protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName).setGridLogger((IgniteLogger)testLog);
        CacheConfiguration ccfg = new CacheConfiguration("default");
        ccfg.setIndexedTypes(new Class[]{Integer.class, Integer.class});
        ccfg.setIndexedTypes(new Class[]{Byte.class, Byte.class});
        ccfg.setSqlFunctionClasses(new Class[]{IgniteQueryDedicatedPoolTest.class});
        ccfg.setName(CACHE_NAME);
        cfg.setCacheConfiguration(new CacheConfiguration[]{ccfg});
        if ("client".equals(gridName)) {
            cfg.setClientMode(true);
        }
        cfg.setIndexingSpi((IndexingSpi)new TestIndexingSpi());
        if (Objects.nonNull(this.qryPoolSize)) {
            cfg.setQueryThreadPoolSize(this.qryPoolSize.intValue());
        }
        return cfg;
    }

    protected void afterTest() throws Exception {
        super.afterTest();
        this.stopAllGrids();
        testLog.clearListeners();
    }

    @Test
    public void testSqlQueryUsesDedicatedThreadPool() throws Exception {
        this.startGrid("server");
        try (IgniteEx client = this.startGrid("client");){
            IgniteCache cache = client.cache(CACHE_NAME);
            cache.put((Object)1, (Object)1);
            FieldsQueryCursor cursor = cache.query(new SqlFieldsQuery("select currentPolicy() from Integer"));
            List result = cursor.getAll();
            cursor.close();
            IgniteQueryDedicatedPoolTest.assertEquals((int)1, (int)result.size());
            Byte plc = (Byte)((List)result.get(0)).get(0);
            IgniteQueryDedicatedPoolTest.assertNotNull((Object)plc);
            IgniteQueryDedicatedPoolTest.assertEquals((byte)10, (byte)plc);
        }
    }

    @Test
    public void testScanQueryUsesDedicatedThreadPool() throws Exception {
        this.startGrid("server");
        try (IgniteEx client = this.startGrid("client");){
            IgniteCache cache = client.cache(CACHE_NAME);
            cache.put((Object)0, (Object)0);
            QueryCursor cursor = cache.query((Query)new ScanQuery((IgniteBiPredicate)new IgniteBiPredicate<Object, Object>(){

                public boolean apply(Object o, Object o2) {
                    return F.eq((Object)GridIoManager.currentPolicy(), (Object)10);
                }
            }));
            IgniteQueryDedicatedPoolTest.assertEquals((int)1, (int)cursor.getAll().size());
            cursor.close();
        }
    }

    @Test
    public void testSpiQueryUsesDedicatedThreadPool() throws Exception {
        this.startGrid("server");
        try (IgniteEx client = this.startGrid("client");){
            IgniteCache cache = client.cache(CACHE_NAME);
            for (byte b = 0; b < 127; b = (byte)(b + 1)) {
                cache.put((Object)b, (Object)b);
            }
            QueryCursor cursor = cache.query((Query)new SpiQuery());
            List all = cursor.getAll();
            IgniteQueryDedicatedPoolTest.assertEquals((int)1, (int)all.size());
            IgniteQueryDedicatedPoolTest.assertEquals((byte)10, (byte)((Byte)((Cache.Entry)all.get(0)).getValue()));
            cursor.close();
        }
    }

    @Test
    @WithSystemProperty(key="IGNITE_STARVATION_CHECK_INTERVAL", value="10")
    public void testContainsStarvationQryPoolInLog() throws Exception {
        this.checkStarvationQryPoolInLog(10000L, "Possible thread pool starvation detected (no task completed in last 10ms, is query thread pool size large enough?)", true);
    }

    @Test
    @WithSystemProperty(key="IGNITE_STARVATION_CHECK_INTERVAL", value="0")
    public void testNotContainsStarvationQryPoolInLog() throws Exception {
        this.checkStarvationQryPoolInLog(1000L, "Possible thread pool starvation detected (no task completed in", false);
    }

    private void checkStarvationQryPoolInLog(long checkTimeout, String findLogMsg, boolean contains) throws Exception {
        IgniteQueryDedicatedPoolTest.assertNotNull((Object)findLogMsg);
        this.qryPoolSize = 1;
        this.startGrid("server");
        IgniteEx clientNode = this.startGrid("client");
        IgniteCache cache = clientNode.cache(CACHE_NAME);
        cache.put((Object)0, (Object)0);
        int qrySize = 2;
        CyclicBarrier barrier = new CyclicBarrier(qrySize);
        LogListener logLsnr = LogListener.matches((String)findLogMsg).build();
        testLog.registerListener(logLsnr);
        for (int i = 0; i < qrySize; ++i) {
            GridTestUtils.runAsync(() -> {
                barrier.await();
                cache.query((Query)new ScanQuery((IgniteBiPredicate & Serializable)(o, o2) -> {
                    IgniteQueryDedicatedPoolTest.doSleep((long)500L);
                    return true;
                })).getAll();
                return null;
            });
        }
        if (contains) {
            IgniteQueryDedicatedPoolTest.assertTrue((boolean)GridTestUtils.waitForCondition(() -> ((LogListener)logLsnr).check(), (long)checkTimeout));
        } else {
            IgniteQueryDedicatedPoolTest.assertFalse((boolean)GridTestUtils.waitForCondition(() -> ((LogListener)logLsnr).check(), (long)checkTimeout));
        }
    }

    @QuerySqlFunction(alias="currentPolicy")
    public static Byte currentPolicy() {
        return GridIoManager.currentPolicy();
    }

    private static class TestIndexingSpi
    extends IgniteSpiAdapter
    implements IndexingSpi {
        private final SortedMap<Object, Object> idx = new TreeMap<Object, Object>();

        private TestIndexingSpi() {
        }

        public void spiStart(@Nullable String gridName) {
        }

        public void spiStop() {
        }

        public Iterator<Cache.Entry<?, ?>> query(@Nullable String cacheName, Collection<Object> params, @Nullable IndexingQueryFilter filters) {
            return this.idx.containsKey((byte)10) ? Collections.singletonList(new CacheEntryImpl((Object)10, (Object)10)).iterator() : Collections.emptyList().iterator();
        }

        public void store(@Nullable String cacheName, Object key, Object val, long expirationTime) {
            this.idx.put(key, val);
        }

        public void remove(@Nullable String cacheName, Object key) {
        }
    }
}

