/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.query;

import java.sql.Connection;
import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.SqlClientContext;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;

public class RunningQueriesTest
extends AbstractIndexingCommonTest {
    /** Timeout, in seconds. */
    private static final long TIMEOUT_IN_SEC = 5L;
    /** Timeout, in milliseconds. */
    private static final long TIMEOUT_IN_MS = 5000L;
    /**
     * Barrier shared by the test thread and the hooks installed by this class (discovery SPI, communication SPI
     * and {@code BlockingIndexing}). Replaced with a fresh instance by {@code newBarrier(int)}.
     */
    private static volatile CyclicBarrier barrier;
    /** Node returned by {@code startGrids} in {@code beforeTestsStarted()}; set to {@code null} in {@code afterTestsStopped()}. */
    private static IgniteEx ignite;
    /** Number of nodes in the test cluster. */
    private static final int NODE_CNT = 2;
    /** Rule that restarts the grid whenever a test of this class fails. */
    @Rule
    public final TestWatcher restarter = new TestWatcher() {
        /**
         * Logs the failure, resets the shared barrier if one exists, stops all grids and starts them again.
         *
         * @param e Failure reported for the last test.
         * @param lastTest Description of the failed test.
         * @throws RuntimeException If the restart itself fails.
         */
        @Override
        protected void failed(Throwable e, Description lastTest) {
            try {
                String failureInfo = "Last test failed [name=" + lastTest.getMethodName()
                    + ", reason=" + e.getMessage() + "]. Restarting the grid.";

                log().error(failureInfo);

                // Release any thread that is still parked on the barrier of the failed test.
                if (barrier != null) {
                    barrier.reset();
                }

                stopAllGrids();
                beforeTestsStarted();

                log().error("Grid restarted.");
            }
            catch (Exception ex) {
                String recoveryInfo = "Failed to recover after test failure [test=" + lastTest.getMethodName()
                    + ", reason=" + e.getMessage() + "]. Subsequent test results of this test class are incorrect.";

                throw new RuntimeException(recoveryInfo, ex);
            }
        }
    };

    /**
     * Registers {@code BlockingIndexing} as the indexing class and starts the test cluster.
     *
     * @throws Exception If the parent setup or the grid start fails.
     */
    @Override
    protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        // Must be assigned before the nodes are started.
        GridQueryProcessor.idxCls = BlockingIndexing.class;

        ignite = startGrids(NODE_CNT);
    }

    /**
     * Installs a single-party barrier and recreates the {@code "default"} cache with one entry in it.
     *
     * @throws Exception If the parent setup fails.
     */
    @Override
    protected void beforeTest() throws Exception {
        super.beforeTest();

        newBarrier(1);

        ignite.destroyCache("default");

        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>()
            .setName("default")
            .setQueryEntities(Collections.singletonList(new QueryEntity(Integer.class, Integer.class)))
            .setIndexedTypes(Integer.class, Integer.class);

        IgniteCache<Integer, Integer> dfltCache = ignite.getOrCreateCache(ccfg);

        dfltCache.put(100000, 0);
    }

    /**
     * Stops every grid, drops the reference to the main node and then delegates to the parent teardown.
     *
     * @throws Exception If the parent teardown fails.
     */
    @Override
    protected void afterTestsStopped() throws Exception {
        stopAllGrids();

        ignite = null;

        super.afterTestsStopped();
    }

    /**
     * Verifies that no thread is still parked on the barrier and that no node reports a running query.
     *
     * @throws Exception If the parent teardown fails.
     */
    @Override
    protected void afterTest() throws Exception {
        super.afterTest();

        int stillWaiting = barrier.getNumberWaiting();

        Assert.assertEquals(0, stillWaiting);

        assertNoRunningQueries();
    }

    /**
     * Builds a node configuration whose discovery and communication SPIs wait on the shared barrier before
     * forwarding selected messages:
     * <ul>
     * <li>discovery: a {@link DynamicCacheChangeBatch} containing a request for a cache other than
     * {@code "default"}, or a {@link SchemaProposeDiscoveryMessage};</li>
     * <li>communication: a {@link GridNearAtomicSingleUpdateFilterRequest} or a
     * {@link GridNearAtomicFullUpdateRequest} wrapped into a {@link GridIoMessage}.</li>
     * </ul>
     * The intercepted message is always forwarded afterwards, even if the wait fails.
     *
     * @param gridName Name of the node being configured.
     * @return Configuration with the blocking SPIs installed.
     * @throws Exception If the parent configuration cannot be built.
     */
    @Override
    protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        cfg.setDiscoverySpi(new TcpDiscoverySpi() {
            @Override
            public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
                if (msg instanceof CustomMessageWrapper) {
                    DiscoveryCustomMessage delegate = ((CustomMessageWrapper)msg).delegate();

                    if (delegate instanceof DynamicCacheChangeBatch) {
                        boolean touchesOtherCache = ((DynamicCacheChangeBatch)delegate).requests().stream()
                            .anyMatch(req -> !req.cacheName().equalsIgnoreCase("default"));

                        if (touchesOtherCache) {
                            RunningQueriesTest.this.awaitBarrierOrLog("cache change discovery message");
                        }
                    }
                    else if (delegate instanceof SchemaProposeDiscoveryMessage) {
                        RunningQueriesTest.this.awaitBarrierOrLog("schema propose discovery message");
                    }
                }

                super.sendCustomEvent(msg);
            }
        });

        cfg.setCommunicationSpi(new TcpCommunicationSpi() {
            @Override
            public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
                if (msg instanceof GridIoMessage) {
                    Message payload = ((GridIoMessage)msg).message();

                    if (payload instanceof GridNearAtomicSingleUpdateFilterRequest
                        || payload instanceof GridNearAtomicFullUpdateRequest) {
                        RunningQueriesTest.this.awaitBarrierOrLog("atomic update request");
                    }
                }

                super.sendMessage(node, msg, ackC);
            }
        });

        return cfg;
    }

    /**
     * Waits on the shared barrier on behalf of an SPI hook. A failed wait is logged instead of being propagated
     * (or silently dropped) so that the intercepted message is still sent and the cause remains visible in the
     * test log. The interrupt status is deliberately not restored, so the send that follows is not affected.
     *
     * @param hook Short description of the intercepted message, used in the log record.
     */
    private void awaitBarrierOrLog(String hook) {
        try {
            awaitTimeout();
        }
        catch (Exception e) {
            log().error("Failed to wait on the test barrier [hook=" + hook + ']', e);
        }
    }

    /**
     * Checks that a query left open on a node is no longer reported as running once the node is closed.
     *
     * @throws Exception If the extra node cannot be started.
     */
    @Test
    public void testCloseRunningQueriesOnNodeStop() throws Exception {
        // The extra node uses the parent configuration, i.e. without the blocking SPIs of this class.
        IgniteEx node = startGrid(super.getConfiguration("TST"));

        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>()
            .setName("TST")
            .setQueryEntities(Collections.singletonList(new QueryEntity(Integer.class, Integer.class)));

        IgniteCache<Integer, Integer> cache = node.getOrCreateCache(ccfg);

        int key = 0;

        while (key < 10000) {
            cache.put(key, key);

            key++;
        }

        // The cursor is intentionally neither read nor closed.
        cache.query(new SqlFieldsQuery("SELECT * FROM Integer order by _key"));

        int runningBeforeStop = node.context().query().runningQueries(-1).size();

        Assert.assertEquals("Should be one running query", 1, runningBeforeStop);

        node.close();

        int runningAfterStop = node.context().query().runningQueries(-1).size();

        Assert.assertEquals(0, runningAfterStop);
    }

    /**
     * Checks that a query is reported as running while its cursor is being iterated and is no longer reported
     * once the iterator is exhausted.
     */
    @Test
    public void testAutoCloseQueryAfterIteratorIsExhausted() {
        IgniteCache<Integer, Integer> cache = ignite.cache("default");

        for (int key = 0; key < 100; key++) {
            cache.put(key, key);
        }

        FieldsQueryCursor<List<?>> cursor = cache.query(new SqlFieldsQuery("SELECT * FROM Integer order by _key"));

        // The rows themselves are irrelevant; only the registry state after each fetched row is checked.
        for (List<?> ignored : cursor) {
            int running = ignite.context().query().runningQueries(-1).size();

            Assert.assertEquals("Should be one running query", 1, running);
        }

        assertNoRunningQueries();
    }

    /**
     * Checks that the global id of a running query equals {@code <local node id>_<local query id>}.
     * Runs the same query 100 times; each cursor is closed before the next iteration.
     */
    @Test
    public void testClusterWideQueryIdGeneration() {
        newBarrier(1);

        IgniteCache<Integer, Integer> cache = ignite.cache("default");

        for (int i = 0; i < 100; i++) {
            // try-with-resources closes the cursor even when an assertion fails, so a failed iteration
            // does not leave a registered query behind.
            try (FieldsQueryCursor<List<?>> cursor = cache.query(new SqlFieldsQuery("SELECT * FROM Integer WHERE 1 = 1"))) {
                Collection<GridRunningQueryInfo> runningQueries = ignite.context().query().runningQueries(-1);

                assertEquals(1, runningQueries.size());

                GridRunningQueryInfo r = runningQueries.iterator().next();

                assertEquals(ignite.context().localNodeId() + "_" + r.id(), r.globalQueryId());
            }
        }
    }

    /**
     * Starts a {@link SqlFieldsQuery} and a {@link SqlQuery} asynchronously, waits until both are parked on the
     * barrier and checks that both are reported as running on the main node with the comment kept in the query
     * text, while no other node reports a running query.
     *
     * @throws Exception If waiting on the barrier or on the futures fails.
     */
    @Test
    public void testQueries() throws Exception {
        newBarrier(3);

        IgniteCache<Integer, Integer> cache = ignite.cache("default");

        IgniteInternalFuture<?> fieldsQryFut = GridTestUtils.runAsync(() -> cache.query(new SqlFieldsQuery("SELECT * FROM /* comment */ Integer WHERE 1 = 1")).getAll());
        IgniteInternalFuture<?> sqlQryFut = GridTestUtils.runAsync(() -> cache.query(new SqlQuery<Integer, Integer>(Integer.class, "FROM /* comment */ Integer WHERE 1 = 1")).getAll());

        boolean bothParked = GridTestUtils.waitForCondition(() -> barrier.getNumberWaiting() == 2, TIMEOUT_IN_MS);

        Assert.assertTrue(bothParked);

        Collection<GridRunningQueryInfo> runningQueries = ignite.context().query().runningQueries(-1);

        assertEquals(2, runningQueries.size());

        runningQueries.forEach(info ->
            assertTrue("Failed to find comment in query: " + info.query(), info.query().contains("/* comment */")));

        assertNoRunningQueries(ignite);

        // Third party: releases both query threads.
        awaitTimeout();

        fieldsQryFut.get(TIMEOUT_IN_MS);
        sqlQryFut.get(TIMEOUT_IN_MS);
    }

    /**
     * Runs the DML scenario with a DELETE statement.
     *
     * @throws Exception If the scenario fails.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-11510")
    @Test
    public void testQueryDmlDelete() throws Exception {
        String deleteSql = "DELETE FROM /* comment */ Integer";

        testQueryDML(deleteSql);
    }

    /**
     * Runs the DML scenario with an INSERT statement.
     *
     * @throws Exception If the scenario fails.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-11510")
    @Test
    public void testQueryDmlInsert() throws Exception {
        String insertSql = "INSERT INTO Integer(_key, _val) VALUES(1,1)";

        testQueryDML(insertSql);
    }

    /**
     * Runs the DML scenario with an UPDATE statement.
     *
     * @throws Exception If the scenario fails.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-11510")
    @Test
    public void testQueryDmlUpdate() throws Exception {
        String updateSql = "UPDATE Integer set _val = 1 where 1=1";

        testQueryDML(updateSql);
    }

    /**
     * Runs the given DML statement asynchronously and, while it is parked on the barrier, checks that it is the
     * only query reported as running on the main node and that no other node reports a running query.
     *
     * @param dmlQry DML statement to run.
     * @throws Exception If waiting on the barrier or on the futures fails.
     */
    public void testQueryDML(String dmlQry) throws Exception {
        newBarrier(2);

        IgniteCache<Integer, Integer> cache = ignite.cache("default");

        SqlFieldsQuery qry = new SqlFieldsQuery(dmlQry);

        IgniteInternalFuture<?> qryFut = GridTestUtils.runAsync(() -> cache.query(qry).getAll());

        assertWaitingOnBarrier();

        Collection<GridRunningQueryInfo> runningQueries = ignite.context().query().runningQueries(-1);

        assertEquals(1, runningQueries.size());

        assertNoRunningQueries(ignite);

        for (GridRunningQueryInfo info : runningQueries) {
            Assert.assertEquals(qry.getSql(), info.query());
        }

        // An extra party is parked on the barrier from a separate thread before the test thread joins it.
        IgniteInternalFuture<?> extraPartyFut = GridTestUtils.runAsync(() -> barrier.await());

        awaitTimeout();

        extraPartyFut.get(TIMEOUT_IN_MS);
        qryFut.get(TIMEOUT_IN_MS);
    }

    /**
     * Creates a table with an index and then runs the DDL scenario for {@code DROP INDEX}.
     *
     * @throws Exception If the scenario fails.
     */
    @Test
    public void testQueryDdlDropIndex() throws Exception {
        // Single-party barrier: the preparatory statements must not block.
        newBarrier(1);

        String[] setupStmts = {
            "CREATE TABLE tst_idx_drop(id long PRIMARY KEY, cnt integer)",
            "CREATE INDEX tst_idx_drop_idx ON tst_idx_drop(cnt)"
        };

        for (String setupStmt : setupStmts) {
            ignite.cache("default").query(new SqlFieldsQuery(setupStmt));
        }

        testQueryDDL("DROP INDEX tst_idx_drop_idx");
    }

    /**
     * Creates a table and then runs the DDL scenario for {@code CREATE INDEX}.
     *
     * @throws Exception If the scenario fails.
     */
    @Test
    public void testQueryDdlCreateIndex() throws Exception {
        // Single-party barrier: the preparatory statement must not block.
        newBarrier(1);

        SqlFieldsQuery createTbl = new SqlFieldsQuery("CREATE TABLE tst_idx_create(id long PRIMARY KEY, cnt integer)");

        ignite.cache("default").query(createTbl);

        testQueryDDL("CREATE INDEX tst_idx_create_idx ON tst_idx_create(cnt)");
    }

    /**
     * Creates a table and then runs the DDL scenario for {@code DROP TABLE}.
     *
     * @throws Exception If the scenario fails.
     */
    @Test
    public void testQueryDdlDropTable() throws Exception {
        // Single-party barrier: the preparatory statement must not block.
        newBarrier(1);

        SqlFieldsQuery createTbl = new SqlFieldsQuery("CREATE TABLE tst_drop(id long PRIMARY KEY, cnt integer)");

        ignite.cache("default").query(createTbl);

        testQueryDDL("DROP TABLE tst_drop");
    }

    /**
     * Runs the DDL scenario for {@code CREATE TABLE}.
     *
     * @throws Exception If the scenario fails.
     */
    @Test
    public void testQueryDdlCreateTable() throws Exception {
        String createTblSql = "CREATE TABLE tst_create(id long PRIMARY KEY, cnt integer)";

        testQueryDDL(createTblSql);
    }

    /**
     * Runs the given DDL statement asynchronously and, while one party is parked on the barrier, checks that it
     * is the only query reported as running on the main node and that no other node reports a running query.
     *
     * @param sql DDL statement to run.
     * @throws Exception If waiting on the barrier or on the future fails.
     */
    public void testQueryDDL(String sql) throws Exception {
        newBarrier(2);

        IgniteCache<Integer, Integer> cache = ignite.cache("default");

        SqlFieldsQuery qry = new SqlFieldsQuery(sql);

        IgniteInternalFuture<?> qryFut = GridTestUtils.runAsync(() -> cache.query(qry).getAll());

        assertWaitingOnBarrier();

        Collection<GridRunningQueryInfo> runningQueries = ignite.context().query().runningQueries(-1);

        assertEquals(1, runningQueries.size());

        assertNoRunningQueries(ignite);

        for (GridRunningQueryInfo info : runningQueries) {
            Assert.assertEquals(qry.getSql(), info.query());
        }

        // The test thread joins the barrier twice.
        for (int pass = 0; pass < 2; pass++) {
            awaitTimeout();
        }

        qryFut.get(TIMEOUT_IN_MS);
    }

    /**
     * Executes a JDBC batch of inserts asynchronously and checks, for every statement of the batch, that exactly
     * one query is reported as running on the main node while a party is parked on the barrier.
     *
     * @throws Exception If the JDBC calls, the barrier waits or the future fail.
     */
    @Test
    public void testJdbcBatchDML() throws Exception {
        newBarrier(2);

        try (Connection conn = GridTestUtils.connect(ignite, null); Statement stmt = conn.createStatement()) {
            conn.setSchema("\"default\"");

            final int batchSize = 10;

            int key = 0;

            for (int added = 0; added < batchSize; added++) {
                // Advance to the next key for which the local node is not primary.
                while (ignite.affinity("default").isPrimary(ignite.localNode(), key)) {
                    key++;
                }

                stmt.addBatch("insert into Integer (_key, _val) values (" + key + "," + key + ")");

                key++;
            }

            IgniteInternalFuture<?> batchFut = GridTestUtils.runAsync(stmt::executeBatch);

            for (int checked = 0; checked < batchSize; checked++) {
                assertWaitingOnBarrier();

                Collection<GridRunningQueryInfo> runningQueries = ignite.context().query().runningQueries(-1);

                assertEquals(1, runningQueries.size());

                awaitTimeout();

                // A second party parks on the barrier for the same statement before the next one starts.
                assertWaitingOnBarrier();

                awaitTimeout();
            }

            batchFut.get(TIMEOUT_IN_MS);
        }
    }

    /**
     * Executes several {@code ;}-separated statements through one JDBC call and checks that, one after another,
     * each statement is reported as the single running query with exactly its own text.
     *
     * @throws Exception If the JDBC calls, the barrier waits or the future fail.
     */
    @Test
    public void testMultiStatement() throws Exception {
        newBarrier(2);

        // Pick two keys for which the local node is not primary.
        int[] notAffinityKey = new int[2];

        int candidate = 0;

        for (int idx = 0; idx < notAffinityKey.length; idx++) {
            while (ignite.affinity("default").isPrimary(ignite.localNode(), candidate)) {
                candidate++;
            }

            notAffinityKey[idx] = candidate;

            candidate++;
        }

        String[] queries = {
            "create table test(ID int primary key, NAME varchar(20))",
            "insert into test (ID, NAME) values (" + notAffinityKey[0] + ", 'name')",
            "insert into test (ID, NAME) values (" + notAffinityKey[1] + ", 'name')",
            "SELECT * FROM test"
        };

        String sql = String.join(";", queries);

        try (Connection conn = GridTestUtils.connect(ignite, null); Statement stmt = conn.createStatement()) {
            IgniteInternalFuture<?> execFut = GridTestUtils.runAsync(() -> stmt.execute(sql));

            for (String expQry : queries) {
                assertWaitingOnBarrier();

                List<GridRunningQueryInfo> runningQueries = (List<GridRunningQueryInfo>)ignite.context().query().runningQueries(-1);

                assertEquals(1, runningQueries.size());

                GridRunningQueryInfo running = runningQueries.get(0);

                assertEquals(expQry, running.query());

                awaitTimeout();
            }

            execFut.get(TIMEOUT_IN_MS);
        }
    }

    /**
     * Runs a {@code COPY} command asynchronously through JDBC and checks that it is reported as the single
     * running query with exactly its own text while it is parked on the barrier.
     *
     * @throws Exception If the JDBC calls, the barrier waits or the future fail.
     */
    @Test
    public void testCopyCommand() throws Exception {
        try (Connection conn = GridTestUtils.connect(ignite, null); Statement stmt = conn.createStatement()) {
            conn.setSchema("\"default\"");

            // Single-party barrier: the table creation must not block.
            newBarrier(1);

            stmt.execute("CREATE TABLE Person(id integer primary key, age integer, firstName varchar, lastname varchar)");

            String path = Objects.requireNonNull(
                IgniteUtils.resolveIgnitePath("/modules/clients/src/test/resources/bulkload1.csv")
            ).getAbsolutePath();

            newBarrier(2);

            String sql = "copy from '" + path + "' into Person (_key, age, firstName, lastName) format csv charset 'ascii'";

            IgniteInternalFuture<?> copyFut = GridTestUtils.runAsync(() -> stmt.executeUpdate(sql));

            assertWaitingOnBarrier();

            List<GridRunningQueryInfo> runningQueries = (List<GridRunningQueryInfo>)ignite.context().query().runningQueries(-1);

            assertEquals(1, runningQueries.size());

            GridRunningQueryInfo running = runningQueries.get(0);

            assertEquals(sql, running.query());

            awaitTimeout();

            copyFut.get(TIMEOUT_IN_MS);
        }
    }

    /**
     * Asserts that exactly one party becomes parked on the shared barrier within {@code TIMEOUT_IN_MS}.
     *
     * @throws IgniteInterruptedCheckedException If the waiting thread is interrupted.
     */
    private void assertWaitingOnBarrier() throws IgniteInterruptedCheckedException {
        boolean parked = GridTestUtils.waitForCondition(() -> barrier.getNumberWaiting() == 1, TIMEOUT_IN_MS);

        // The message is built after the wait so that it reports the count observed once the wait is over,
        // not the count at the moment the method was entered.
        Assert.assertTrue("Still waiting " + barrier.getNumberWaiting() + " parties", parked);
    }

    /**
     * Asserts that no started node, except the given ones, reports a running query.
     *
     * @param excludeNodes Nodes that are skipped by the check.
     */
    private void assertNoRunningQueries(IgniteEx ... excludeNodes) {
        Set<?> skippedIds = Stream.of(excludeNodes)
            .map(excluded -> excluded.localNode().id())
            .collect(Collectors.toSet());

        for (Ignite started : G.allGrids()) {
            IgniteEx node = (IgniteEx)started;

            if (!skippedIds.contains(node.localNode().id())) {
                int running = node.context().query().runningQueries(-1).size();

                Assert.assertEquals(0, running);
            }
        }
    }

    /**
     * Replaces the shared barrier with a fresh one.
     *
     * @param parties Number of parties that must reach the new barrier before it trips.
     */
    private static void newBarrier(int parties) {
        CyclicBarrier fresh = new CyclicBarrier(parties);

        barrier = fresh;
    }

    /**
     * Waits on the shared barrier for at most {@code TIMEOUT_IN_MS} milliseconds.
     *
     * @throws InterruptedException If the current thread is interrupted while waiting.
     * @throws TimeoutException If the barrier does not trip within the timeout.
     * @throws BrokenBarrierException If the barrier is broken or reset while waiting.
     */
    private static void awaitTimeout() throws InterruptedException, TimeoutException, BrokenBarrierException {
        // The value is in milliseconds; pairing it with TimeUnit.SECONDS made a stuck wait last about 83 minutes.
        barrier.await(TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
    }

    private static class BlockingIndexing
    extends IgniteH2Indexing {
        private BlockingIndexing() {
        }

        public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) {
            List res = super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, cancel);
            try {
                RunningQueriesTest.awaitTimeout();
            }
            catch (Exception e) {
                throw new IgniteException((Throwable)e);
            }
            return res;
        }
    }
}

