/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.control.agent.processor.export.queries;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomUtils;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.resource.DependencyResolver;
import org.assertj.core.api.AbstractListAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.groups.Tuple;
import org.gridgain.control.agent.AgentCommonAbstractTest;
import org.gridgain.control.agent.StompDestinationsUtils;
import org.gridgain.control.agent.dto.action.query.QueryInfo;
import org.gridgain.control.agent.dto.action.query.QueryInfoStatus;
import org.gridgain.control.agent.processor.export.queries.RunningQueryExporter;
import org.gridgain.control.agent.test.TestSqlTestFunctions;
import org.gridgain.control.agent.test.TestUtils;
import org.gridgain.control.agent.utils.RegistryNoopDependencyResolver;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link RunningQueryExporter}: each test starts one or more grids, attaches the cluster,
 * feeds queries to the exporter (either by really executing SQL or by calling
 * {@link RunningQueryExporter#processQuery} directly) and then inspects the {@link QueryInfo}
 * batches captured by the interceptor on the "save queries" destination.
 */
public class RunningQueryExporterTest extends AgentCommonAbstractTest {
    /** Logger. */
    private static final Logger log = LoggerFactory.getLogger(RunningQueryExporterTest.class);

    /** Name of the SQL-enabled cache used by the tests. */
    private static final String CACHE_NAME = "TestCache";

    /** Fast query against the test cache, shared by several tests. */
    private static final String COUNT_QRY = "SELECT count(*) AS \"1\" FROM \"TestCache\".STRING";

    /** System property set by {@link #prepareGrids} from its {@code initDelay} argument. */
    private static final String INITIAL_DELAY_PROP = "EXPORT_INITIAL_DELAY";

    /** System property set by {@link #prepareGrids} from its {@code period} argument. */
    private static final String PERIOD_PROP = "EXPORT_PERIOD";

    /** System property holding the batch size used by the batch-related tests. */
    private static final String BATCH_SIZE_PROP = "TASK_BATCH_SIZE";

    /** System property holding the buffer size used by {@link #shouldLimitMaximumBuffer()}. */
    private static final String BUFFER_SIZE_PROP = "EXPORT_BUFFER_SIZE";

    /** System property holding the minimal duration used by {@link #shouldFilterQueriesByDuration()}. */
    private static final String MIN_DURATION_PROP = "RUNNING_QUERY_MIN_DURATION";

    /** Dependency registry installed into {@link IgnitionEx} so tests can look up the exporter. */
    private RegistryNoopDependencyResolver di;

    /**
     * Installs a fresh dependency resolver before every test.
     *
     * @throws Exception If the parent cleanup fails.
     */
    @Override
    @Before
    public void setup() throws Exception {
        di = new RegistryNoopDependencyResolver();
        IgnitionEx.dependencyResolver(di);

        // NOTE(review): this calls super.cleanup(), not super.setup() - confirm that skipping the
        // parent setup is intentional.
        super.cleanup();
    }

    /** Clears the export timing properties and the dependency resolver after every test. */
    @Override
    @After
    public void teardown() {
        System.clearProperty(INITIAL_DELAY_PROP);
        System.clearProperty(PERIOD_PROP);
        IgnitionEx.dependencyResolver(null);
        super.teardown();
    }

    /**
     * Adds the SQL-enabled test cache (with {@link TestSqlTestFunctions} registered) to the node
     * configuration produced by the parent class.
     *
     * @param instanceName Ignite instance name.
     * @return Node configuration.
     */
    @Override
    protected IgniteConfiguration getConfiguration(String instanceName) {
        CacheConfiguration<Integer, String> cacheCfg = new CacheConfiguration<Integer, String>()
            .setName(CACHE_NAME)
            .setIndexedTypes(Integer.class, String.class)
            .setSqlFunctionClasses(TestSqlTestFunctions.class);

        return super.getConfiguration(instanceName).setCacheConfiguration(cacheCfg);
    }

    /**
     * Runs a query that calls {@code delay(10000)} in a background thread and expects two batches:
     * first the query as RUNNING and cancellable, then the same query as FINISHED with no failure
     * details.
     */
    @Test
    public void shouldSendTaskUpdatesToTopic() {
        IgniteEx ignite = prepareGrids(1, 100L, 1000L);
        UUID clusterId = ignite.cluster().id();
        String longRunningQry = "SELECT count(*), delay(10000) AS \"1\" FROM \"TestCache\".STRING";

        Thread longRunningQryThread = new Thread(() -> {
            try (FieldsQueryCursor<List<?>> cur =
                     createCacheWithSqlTestFunctions(ignite).query(new SqlFieldsQuery(longRunningQry))) {
                cur.getAll();
            }
        });

        try {
            longRunningQryThread.start();

            log.info("Assert query is running");

            assertQuery(clusterId, 0, batch ->
                Assertions.assertThat(batch)
                    .extracting("query", "cancellable", "status")
                    .containsExactly(Assertions.tuple(longRunningQry, true, QueryInfoStatus.RUNNING)));

            log.info("Assert query is finished");

            assertQuery(clusterId, 1, batch ->
                Assertions.assertThat(batch)
                    .extracting("query", "cancellable", "status", "failedReason", "stacktrace")
                    .containsExactly(
                        Assertions.tuple(longRunningQry, false, QueryInfoStatus.FINISHED, null, null)));
        }
        finally {
            longRunningQryThread.interrupt();
        }
    }

    /**
     * Executes a fast query to completion and expects the first batch to contain it exactly once
     * with FINISHED status.
     */
    @Test
    public void shouldSendUpdatesInSingleBatch() {
        IgniteEx ignite = prepareGrids(1, 100L, 2000L);
        UUID clusterId = ignite.cluster().id();

        try (FieldsQueryCursor<List<?>> cur =
                 createCacheWithSqlTestFunctions(ignite).query(new SqlFieldsQuery(COUNT_QRY))) {
            cur.getAll();
        }

        log.info("Assert query is finished");

        assertQuery(clusterId, 0, batch ->
            Assertions.assertThat(batch)
                .extracting("query", "status")
                .containsExactly(Assertions.tuple(COUNT_QRY, QueryInfoStatus.FINISHED)));
    }

    /**
     * Starts a query that calls {@code delay(2000)}, cancels every running query once at least one
     * is registered, and expects the first batch to report the query as CANCELED.
     */
    @Test
    public void shouldSendCancelledStatusToTopic() {
        IgniteEx ignite = prepareGrids(1, 100L, 5000L);
        UUID clusterId = ignite.cluster().id();
        String qry = "SELECT count(*), delay(2000) AS \"1\" FROM \"TestCache\".STRING";

        Thread longRunningQryThread = new Thread(() -> {
            try (FieldsQueryCursor<List<?>> cur =
                     createCacheWithSqlTestFunctions(ignite).query(new SqlFieldsQuery(qry))) {
                cur.getAll();
            }
            catch (Exception ignored) {
                // The query is cancelled by the test itself, so its failure is expected here.
            }
        });

        try {
            longRunningQryThread.start();

            TestUtils.assertWithPoll(() -> ignite.context().query().runningQueries(0L).size() > 0);

            ignite.context().query().cancelQueries(
                ignite.context().query().runningQueries(0L).stream()
                    .map(GridRunningQueryInfo::id)
                    .collect(Collectors.toList()));

            assertQuery(clusterId, 0, batch ->
                Assertions.assertThat(batch)
                    .extracting("query", "status")
                    .containsExactly(Assertions.tuple(qry, QueryInfoStatus.CANCELED)));
        }
        finally {
            longRunningQryThread.interrupt();
        }
    }

    /**
     * Executes the same fast query on two nodes and expects a FINISHED entry for each node id
     * among all received batches.
     */
    @Test
    public void shouldSendFromOtherNodes() {
        IgniteEx ignite = prepareGrids(2, 100L, 500L);
        UUID clusterId = ignite.cluster().id();

        try (FieldsQueryCursor<List<?>> cur =
                 createCacheWithSqlTestFunctions(ignite).query(new SqlFieldsQuery(COUNT_QRY))) {
            cur.getAll();
        }

        try (FieldsQueryCursor<List<?>> cur =
                 createCacheWithSqlTestFunctions(ignite(1)).query(new SqlFieldsQuery(COUNT_QRY))) {
            cur.getAll();
        }

        TestUtils.assertWithPoll(() -> {
            List<QueryInfo> queries = inInterceptor
                .getAllListPayloads(StompDestinationsUtils.buildSaveQueriesDest(clusterId), QueryInfo.class)
                .stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toList());

            // Keep polling until both nodes have reported.
            if (queries.size() < 2) {
                return false;
            }

            Assertions.assertThat(queries)
                .extracting("query", "status", "nodeId")
                .containsExactlyInAnyOrder(
                    Assertions.tuple(COUNT_QRY, QueryInfoStatus.FINISHED, ignite.localNode().id()),
                    Assertions.tuple(COUNT_QRY, QueryInfoStatus.FINISHED, ignite(1).localNode().id()));

            return true;
        });
    }

    /**
     * With a 25 second initial delay and period, feeds exactly one batch worth of queries to the
     * exporter and expects each full batch to arrive as a separate package within the poll timeout.
     */
    @Test
    public void shouldSendBatchImmediatelly() {
        int testBatchSize = 5;

        try {
            System.setProperty(BATCH_SIZE_PROP, String.valueOf(testBatchSize));

            IgniteEx ignite = prepareGrids(1, 25000L, 25000L);
            UUID clusterId = ignite.cluster().id();
            RunningQueryExporter exporter = di.getDependency(RunningQueryExporter.class);

            for (int i = 0; i < testBatchSize; i++) {
                exporter.processQuery(prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.RUNNING));
            }

            TestUtils.assertWithPoll(
                () -> checkPackageSent(clusterId, 1, testBatchSize, QueryInfoStatus.RUNNING));

            for (int i = 0; i < testBatchSize; i++) {
                exporter.processQuery(prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.FINISHED));
            }

            TestUtils.assertWithPoll(
                () -> checkPackageSent(clusterId, 2, testBatchSize, QueryInfoStatus.FINISHED));
        }
        finally {
            System.clearProperty(BATCH_SIZE_PROP);
            IgnitionEx.dependencyResolver(null);
        }
    }

    /**
     * Submits RUNNING, FINISHED and RUNNING again for the same query id, plus one RUNNING entry for
     * another id, and expects the first batch to keep FINISHED for the first query.
     */
    @Test
    public void shouldKeepTerminatedQryStatusDuringMergeInfo() {
        int testBatchSize = 2;

        try {
            System.setProperty(BATCH_SIZE_PROP, String.valueOf(testBatchSize));

            IgniteEx ignite = prepareGrids(1, 100L, 2000L);
            UUID clusterId = ignite.cluster().id();
            RunningQueryExporter exporter = di.getDependency(RunningQueryExporter.class);

            String qry1 = "SELECT count(*) AS \"1\" FROM \"TestCache1\".STRING";
            String qry2 = "SELECT count(*) AS \"1\" FROM \"TestCache2\".STRING";

            exporter.processQuery(prepareTestQueryInfo(ignite, 11L, qry1, QueryInfoStatus.RUNNING));
            exporter.processQuery(prepareTestQueryInfo(ignite, 11L, qry1, QueryInfoStatus.FINISHED));
            exporter.processQuery(prepareTestQueryInfo(ignite, 11L, qry1, QueryInfoStatus.RUNNING));
            exporter.processQuery(prepareTestQueryInfo(ignite, 12L, qry2, QueryInfoStatus.RUNNING));

            assertQuery(clusterId, 0, batch ->
                Assertions.assertThat(batch)
                    .extracting("query", "status")
                    .containsExactlyInAnyOrder(
                        Assertions.tuple(qry1, QueryInfoStatus.FINISHED),
                        Assertions.tuple(qry2, QueryInfoStatus.RUNNING)));
        }
        finally {
            System.clearProperty(BATCH_SIZE_PROP);
            IgnitionEx.dependencyResolver(null);
        }
    }

    /**
     * With the buffer size set to 5, submits 1 FINISHED, 3 RUNNING and 3 FINISHED queries and
     * expects the first batch to hold all FINISHED entries plus only the first RUNNING one.
     */
    @Test
    public void shouldLimitMaximumBuffer() {
        int bufSz = 5;
        List<QueryInfo> expectedQueries = new ArrayList<>(bufSz);
        List<QueryInfo> testQueries = new ArrayList<>();

        try {
            System.setProperty(BUFFER_SIZE_PROP, String.valueOf(bufSz));

            IgniteEx ignite = prepareGrids(1, 500L, 500L);
            UUID clusterId = ignite.cluster().id();

            QueryInfo info = prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.FINISHED);
            testQueries.add(info);
            expectedQueries.add(info);

            for (int i = 0; i < 3; i++) {
                info = prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.RUNNING);
                testQueries.add(info);

                // Only the first RUNNING query is expected in the batch.
                if (i == 0) {
                    expectedQueries.add(info);
                }
            }

            for (int i = 0; i < 3; i++) {
                info = prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.FINISHED);
                testQueries.add(info);
                expectedQueries.add(info);
            }

            RunningQueryExporter exporter = di.getDependency(RunningQueryExporter.class);
            testQueries.forEach(exporter::processQuery);

            assertQuery(clusterId, 0, batch ->
                Assertions.assertThat(batch)
                    .usingRecursiveFieldByFieldElementComparator()
                    .containsExactlyInAnyOrderElementsOf(expectedQueries));
        }
        finally {
            System.clearProperty(BUFFER_SIZE_PROP);
            IgnitionEx.dependencyResolver(null);
        }
    }

    /**
     * With the minimal duration set to 2000 ms, submits finished and running queries of various
     * durations and checks which of them appear in the first and in the second batch.
     *
     * @throws InterruptedException If the pause between the two batches is interrupted.
     */
    @Test
    public void shouldFilterQueriesByDuration() throws InterruptedException {
        int batchSize = 4;
        long minDuration = 2000L;
        long period = 500L;
        List<QueryInfo> expQueries = new ArrayList<>();

        try {
            System.setProperty(BATCH_SIZE_PROP, String.valueOf(batchSize));
            System.setProperty(MIN_DURATION_PROP, String.valueOf(minDuration));

            IgniteEx ignite = prepareGrids(1, 60000L, period);
            UUID clusterId = ignite.cluster().id();

            QueryInfo finishedQry1000Ms =
                prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.FINISHED).setDuration(1000L);
            QueryInfo finishedQry1001Ms =
                prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.FINISHED).setDuration(1001L);
            QueryInfo finishedQry3000Ms =
                prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.FINISHED).setDuration(3000L);
            QueryInfo finishedQry3001Ms =
                prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.FINISHED).setDuration(3001L);

            long now = System.currentTimeMillis();

            QueryInfo runningQry3000ms =
                prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.RUNNING).setStartedAt(now - 3000L);
            QueryInfo runningQry1000ms =
                prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.RUNNING).setStartedAt(now - 1501L);

            expQueries.add(finishedQry3000Ms);
            expQueries.add(finishedQry3001Ms);
            expQueries.add(runningQry3000ms);

            RunningQueryExporter exporter = di.getDependency(RunningQueryExporter.class);

            exporter.processQuery(finishedQry3000Ms);
            exporter.processQuery(finishedQry1000Ms);
            exporter.processQuery(runningQry3000ms);
            exporter.processQuery(finishedQry3001Ms);
            exporter.processQuery(finishedQry1001Ms);
            exporter.processQuery(runningQry1000ms);

            assertQuery(clusterId, 0, batch ->
                Assertions.assertThat(batch)
                    .usingRecursiveFieldByFieldElementComparator()
                    .containsExactlyInAnyOrderElementsOf(expQueries));

            // After this pause the running query started at (now - 1501) is older than minDuration.
            Thread.sleep(2000L);

            QueryInfo shortLostRunningQry = prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.RUNNING)
                .setGlobalQueryId(finishedQry1000Ms.getGlobalQueryId())
                .setStartedAt(now - 3000L);
            QueryInfo finishedQry2500Ms =
                prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.FINISHED).setDuration(2500L);
            QueryInfo finishedQry2501Ms =
                prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.FINISHED).setDuration(2501L);
            QueryInfo finishedQry2502Ms =
                prepareTestQueryInfo(ignite, COUNT_QRY, QueryInfoStatus.FINISHED).setDuration(2502L);

            expQueries.clear();
            expQueries.add(runningQry1000ms);
            expQueries.add(finishedQry2500Ms);
            expQueries.add(finishedQry2501Ms);
            expQueries.add(finishedQry2502Ms);

            exporter.processQuery(shortLostRunningQry);
            exporter.processQuery(finishedQry2500Ms);
            exporter.processQuery(finishedQry2501Ms);
            exporter.processQuery(finishedQry2502Ms);

            assertQuery(clusterId, 1, batch ->
                Assertions.assertThat(batch)
                    .usingRecursiveFieldByFieldElementComparator()
                    .containsExactlyInAnyOrderElementsOf(expQueries));
        }
        finally {
            // Clear exactly the properties this test set, so they do not leak into later tests.
            System.clearProperty(BATCH_SIZE_PROP);
            System.clearProperty(MIN_DURATION_PROP);
            IgnitionEx.dependencyResolver(null);
        }
    }

    /**
     * Runs {@code SELECT can_fail(TRUE)} in a background thread and expects a single FAILED,
     * non-cancellable entry for the local node among all received batches.
     */
    @Test
    public void shouldFailWithInavalidSyntax() {
        IgniteEx ignite = prepareGrids(1, 100L, 5000L);
        UUID clusterId = ignite.cluster().id();
        String qry = "SELECT can_fail(TRUE)";

        Thread qryThread = new Thread(() -> {
            try (FieldsQueryCursor<List<?>> cur =
                     createCacheWithSqlTestFunctions(ignite).query(new SqlFieldsQuery(qry))) {
                cur.getAll();
            }
            catch (Exception ignored) {
                // The test asserts a FAILED status below, so the query is expected to throw.
            }
        });

        try {
            qryThread.start();

            TestUtils.assertWithPoll(() -> {
                List<QueryInfo> queries = inInterceptor
                    .getAllListPayloads(StompDestinationsUtils.buildSaveQueriesDest(clusterId), QueryInfo.class)
                    .stream()
                    .flatMap(Collection::stream)
                    .collect(Collectors.toList());

                if (queries.isEmpty()) {
                    return false;
                }

                Assertions.assertThat(queries)
                    .extracting("query", "status", "cancellable", "nodeId")
                    .containsExactlyInAnyOrder(
                        Assertions.tuple(qry, QueryInfoStatus.FAILED, false, ignite.localNode().id()));

                return true;
            });
        }
        finally {
            qryThread.interrupt();
        }
    }

    /**
     * Sets the export timing properties, starts and activates the grids, attaches the cluster and
     * waits until the interceptor reports an empty query payload for it.
     *
     * @param gridCnt Number of grids to start.
     * @param initDelay Value for the {@code EXPORT_INITIAL_DELAY} system property.
     * @param period Value for the {@code EXPORT_PERIOD} system property.
     * @return Node returned by {@code startGrids}.
     */
    private IgniteEx prepareGrids(int gridCnt, long initDelay, long period) {
        System.setProperty(INITIAL_DELAY_PROP, String.valueOf(initDelay));
        System.setProperty(PERIOD_PROP, String.valueOf(period));

        IgniteEx ignite = startGrids(gridCnt);

        ignite.cluster().state(ClusterState.ACTIVE);

        changeAgentConfiguration(ignite);

        IgniteClusterEx cluster = ignite.cluster();

        attachCluster(cluster);

        TestUtils.assertWithPoll(() -> inInterceptor
            .getListPayload(StompDestinationsUtils.buildSaveQueriesDest(cluster.id()), QueryInfo.class)
            .isEmpty());

        return ignite;
    }

    /**
     * Builds a synthetic query info with a random query id.
     *
     * @param ignite Node whose id is recorded in the info.
     * @param qry Query text.
     * @param status Query status.
     * @return Query info.
     */
    private QueryInfo prepareTestQueryInfo(IgniteEx ignite, String qry, QueryInfoStatus status) {
        return prepareTestQueryInfo(ignite, RandomUtils.nextLong(), qry, status);
    }

    /**
     * Builds a synthetic SQL_FIELDS query info started "now". RUNNING infos get a {@code null}
     * duration, all other statuses get a random one.
     *
     * @param ignite Node whose id is recorded in the info.
     * @param qryId Local query id.
     * @param qry Query text.
     * @param status Query status.
     * @return Query info.
     */
    private QueryInfo prepareTestQueryInfo(IgniteEx ignite, long qryId, String qry, QueryInfoStatus status) {
        UUID nid = ignite.context().localNodeId();

        // Sleep 1 ms before taking the timestamp (presumably so consecutive infos get distinct
        // start times - confirm).
        try {
            Thread.sleep(1L);
        }
        catch (InterruptedException ignored) {
            // Keep building the info, but restore the flag so callers can still see the interrupt.
            Thread.currentThread().interrupt();
        }

        Long duration = status == QueryInfoStatus.RUNNING ? null : Long.valueOf(RandomUtils.nextLong());

        return new QueryInfo()
            .setId(qryId)
            .setGlobalQueryId(QueryUtils.globalQueryId(nid, qryId))
            .setNodeId(nid)
            .setQuery(qry)
            .setQueryType(GridCacheQueryType.SQL_FIELDS)
            .setStartedAt(System.currentTimeMillis())
            .setDuration(duration)
            .setStatus(status);
    }

    /**
     * Checks that exactly {@code pkgIdx} packages have been received and that the last one holds
     * {@code testBatchSize} entries, all with the given status.
     *
     * @param clusterId Cluster id.
     * @param pkgIdx Expected number of received packages (1-based index of the checked package).
     * @param testBatchSize Expected size of the checked package.
     * @param status Expected status of every entry.
     * @return {@code true} if all conditions hold.
     */
    private boolean checkPackageSent(UUID clusterId, int pkgIdx, int testBatchSize, QueryInfoStatus status) {
        List<List<QueryInfo>> batches = inInterceptor.getAllListPayloads(
            StompDestinationsUtils.buildSaveQueriesDest(clusterId), QueryInfo.class);

        if (batches.size() != pkgIdx) {
            return false;
        }

        List<QueryInfo> batch = batches.get(pkgIdx - 1);

        return batch.size() == testBatchSize
            && batch.stream().allMatch(info -> info.getStatus() == status);
    }

    /**
     * Polls until the batch with the given index has been received, then runs the assertions on it.
     *
     * @param clusterId Cluster id.
     * @param batchIdx Zero-based index of the batch to check.
     * @param batchConsumer Assertions to run against the batch.
     */
    private void assertQuery(UUID clusterId, int batchIdx, Consumer<List<QueryInfo>> batchConsumer) {
        TestUtils.assertWithPoll(() -> {
            List<List<QueryInfo>> batches = inInterceptor.getAllListPayloads(
                StompDestinationsUtils.buildSaveQueriesDest(clusterId), QueryInfo.class);

            if (batchIdx >= batches.size()) {
                return false;
            }

            batchConsumer.accept(batches.get(batchIdx));

            return true;
        });
    }

    /**
     * Gets or creates the test cache with indexed types and {@link TestSqlTestFunctions} registered.
     *
     * @param ignite Node to get the cache from.
     * @param <K> Key type.
     * @param <V> Value type.
     * @return Cache instance.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private <K, V> IgniteCache<K, V> createCacheWithSqlTestFunctions(IgniteEx ignite) {
        // Raw type kept deliberately: the generic signature of the inherited cacheConfiguration()
        // helper is not visible from this class.
        CacheConfiguration cfg = cacheConfiguration(CACHE_NAME)
            .setIndexedTypes(Integer.class, String.class)
            .setSqlFunctionClasses(TestSqlTestFunctions.class);

        return ignite.getOrCreateCache(cfg);
    }
}

