/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.query;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation;
import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
import org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapResult;
import org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapper;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
@WithSystemProperty(key="IGNITE_SQL_MAX_EXTRACTED_PARTS_FROM_BETWEEN", value="21")
public class KillQueryTest
extends GridCommonAbstractTest {
    /** Test parameter; it is passed to {@code createKillQuery}, where {@code true} adds the {@code ASYNC} keyword. */
    @Parameterized.Parameter
    public boolean asyncCancel;
    /** IP finder set on the discovery SPI built in {@code getConfiguration}. */
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
    /** Absolute path of the {@code bulkload20_000.csv} test resource; a {@code null} resolution result raises an NPE. */
    private static final String BULKLOAD_20_000_LINE_CSV_FILE = Objects.requireNonNull(IgniteUtils.resolveIgnitePath((String)"/modules/clients/src/test/resources/bulkload20_000.csv")).getAbsolutePath();
    /** NOTE(review): presumably the row count loaded into each cache (the loading loops use the literal 10000) - confirm. */
    private static final int MAX_ROWS = 10000;
    /** NOTE(review): presumably a wait timeout in milliseconds (the literal 5000 is used for future waits) - confirm. */
    public static final int TIMEOUT = 5000;
    /** NOTE(review): presumably the number of nodes started by the test ({@code startGrids(3)}) - confirm. */
    protected static final byte NODES_COUNT = 3;
    /** NOTE(review): presumably the timeout in milliseconds for checking a cancel result (literal 1000 elsewhere) - confirm. */
    public static final int CHECK_RESULT_TIMEOUT = 1000;
    /** NOTE(review): presumably the partition count of the default cache (affinity is created with 20) - confirm. */
    public static final int PARTS_CNT = 20;
    /** JDBC connection to node 0; opened in {@code before()} and closed in {@code after()}. */
    private Connection conn;
    /** Statement created from {@link #conn} in {@code before()}. */
    private Statement stmt;
    /** Node 0; assigned in {@code before()}. */
    protected IgniteEx ignite;
    /** Node returned by {@code getKillRequestNode()}; the kill commands are issued through it. */
    private IgniteEx igniteForKillRequest;
    /** Counts {@code getConfiguration} calls; the third call produces a client-mode configuration. */
    private static int cntr;
    /** Incremented in {@code before()} and used by {@code currentTestTableName()} to build the table name. */
    private static AtomicInteger tblCnt;
    /** NOTE(review): presumably the barrier replaced by {@code newBarrier(..)} and awaited by {@code awaitTimeout()} - confirm. */
    private static volatile CyclicBarrier barrier;
    /** Communication SPI of the client-mode node; used to block and release its outgoing messages. */
    private static TestRecordingCommunicationSpi clientBlocker;

    @Parameterized.Parameters(name="asyncCancel = {0}")
    public static Iterable<Object[]> valuesForAsync() {
        return Arrays.asList({true}, {false});
    }

    /**
     * Builds the configuration for a test node.
     * <p>
     * The default cache is partitioned with one backup, full-sync writes, the test SQL functions and the
     * Integer/Long/Person indexed types. Every node gets a {@code TestRecordingCommunicationSpi}; the third
     * configuration requested since the counter was reset is switched to client mode and its SPI is stored in
     * {@code clientBlocker}. The discovery SPI calls {@code awaitTimeout()} before sending a cache-change batch or
     * a schema-propose message; failures of that wait are only logged.
     *
     * @param igniteInstanceName Name of the instance being configured.
     * @return Node configuration.
     * @throws Exception If the superclass fails to build the base configuration.
     */
    protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        CacheConfiguration dfltCache = GridAbstractTest.defaultCacheConfiguration();
        dfltCache.setAffinity(new RendezvousAffinityFunction(false, 20));
        dfltCache.setCacheMode(CacheMode.PARTITIONED);
        dfltCache.setBackups(1);
        dfltCache.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
        dfltCache.setSqlFunctionClasses(new Class[]{TestSQLFunctions.class});
        dfltCache.setIndexedTypes(new Class[]{Integer.class, Integer.class, Long.class, Long.class, String.class, Person.class});
        cfg.setCacheConfiguration(new CacheConfiguration[]{dfltCache});

        TestRecordingCommunicationSpi recordingSpi = new TestRecordingCommunicationSpi();
        cfg.setCommunicationSpi(recordingSpi);

        cntr++;
        if (cntr == 3) {
            cfg.setClientMode(true);
            clientBlocker = recordingSpi;
        }

        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(){

            public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
                if (msg instanceof CustomMessageWrapper) {
                    DiscoveryCustomMessage delegate = ((CustomMessageWrapper)msg).delegate();
                    boolean mustAwait = delegate instanceof DynamicCacheChangeBatch
                        || delegate instanceof SchemaProposeDiscoveryMessage;
                    if (mustAwait) {
                        try {
                            KillQueryTest.awaitTimeout();
                        }
                        catch (Exception e) {
                            // The message is still sent below; the failed wait is only reported.
                            this.log.error(e.getMessage(), (Throwable)e);
                        }
                    }
                }
                super.sendCustomEvent(msg);
            }
        };
        cfg.setDiscoverySpi(discoSpi.setIpFinder(IP_FINDER));

        return cfg;
    }

    /**
     * Creates a partitioned cache exposing a {@code PERSON} table and fills it with 10000 rows through a data
     * streamer on node 0.
     *
     * @param cacheName Name of the cache to create; also used as the suffix of the index name.
     * @param shift Offset added to the record id (modulo 10000) to produce the person id.
     */
    private void createJoinCache(String cacheName, int shift) {
        QueryIndex idIdx = new QueryIndex("id", true, "idx_" + cacheName);
        QueryEntity personEntity = new QueryEntity(Integer.class.getName(), Person.class.getName())
            .setTableName("PERSON")
            .setKeyFieldName("rec_id")
            .addQueryField("rec_id", Integer.class.getName(), null)
            .addQueryField("id", Integer.class.getName(), null)
            .addQueryField("lastName", String.class.getName(), null)
            .setIndexes(Collections.singleton(idIdx));

        CacheConfiguration joinCacheCfg = GridAbstractTest.defaultCacheConfiguration();
        joinCacheCfg.setName(cacheName);
        joinCacheCfg.setCacheMode(CacheMode.PARTITIONED);
        joinCacheCfg.setBackups(1);
        joinCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
        joinCacheCfg.setSqlFunctionClasses(new Class[]{TestSQLFunctions.class});
        joinCacheCfg.setQueryEntities(Collections.singleton(personEntity));
        grid(0).createCache(joinCacheCfg);

        try (IgniteDataStreamer streamer = grid(0).dataStreamer(cacheName)) {
            for (int recId = 0; recId < 10000; recId++) {
                int personId = (recId + shift) % 10000;
                Person person = new Person(personId, "Name_" + recId, "LastName_" + recId, 42);
                streamer.addData(recId, person);
            }
        }
    }

    /**
     * Starts the three-node cluster used by all tests, fixes the baseline topology and loads the test data.
     * <p>
     * The default cache receives 10000 {@code Integer -> Integer} entries and 10000 {@code Long -> Long} entries,
     * so both the {@code Integer} and the {@code Long} SQL tables declared in the cache's indexed types contain
     * rows. Two join caches, {@code PERS1} and {@code PERS2}, are created afterwards.
     *
     * @throws Exception If the cluster start or the data load fails.
     */
    protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();
        cntr = 0;
        GridQueryProcessor.idxCls = MockedIndexing.class;
        this.startGrids(3);
        this.awaitPartitionMapExchange(true, true, null);
        long curTop = this.grid(0).cluster().topologyVersion();
        this.grid(0).cluster().baselineAutoAdjustEnabled(false);
        this.grid(0).cluster().setBaselineTopology(curTop);
        this.awaitPartitionMapExchange(true, true, null);
        try (IgniteDataStreamer ds = this.grid(0).dataStreamer("default");){
            for (int i = 0; i < 10000; ++i) {
                ds.addData(i, i);
                // The second entry must be keyed by Long: repeating the Integer entry left the Long table empty.
                long longVal = i;
                ds.addData(longVal, longVal);
            }
        }
        this.createJoinCache("PERS1", 1);
        this.createJoinCache("PERS2", 2);
    }

    /**
     * Resets {@code GridQueryProcessor.idxCls} to {@code null} and then runs the superclass teardown.
     *
     * @throws Exception If the superclass teardown fails.
     */
    protected void afterTestsStopped() throws Exception {
        GridQueryProcessor.idxCls = null;
        super.afterTestsStopped();
    }

    /**
     * Returns the node stored in {@code igniteForKillRequest} by {@code before()}; this implementation uses node 0.
     * The method is protected, so a subclass can supply a different node.
     *
     * @return Node through which kill commands are issued.
     */
    protected IgniteEx getKillRequestNode() {
        return this.grid(0);
    }

    /**
     * Per-test setup: resets the SQL function state, installs a single-party barrier, advances the table counter,
     * opens a JDBC connection and statement to node 0 on the {@code "default"} schema, remembers the query and
     * kill-request nodes and resets the mocked indexing flags.
     *
     * @throws Exception If the connection cannot be established.
     */
    @Before
    public void before() throws Exception {
        TestSQLFunctions.reset();
        KillQueryTest.newBarrier(1);
        tblCnt.incrementAndGet();

        conn = GridTestUtils.connect(grid(0), null);
        conn.setSchema("\"default\"");
        stmt = conn.createStatement();

        ignite = grid(0);
        igniteForKillRequest = getKillRequestNode();

        MockedIndexing.resetToDefault();
    }

    /**
     * Per-test cleanup: resets the mocked indexing flags, stops message blocking on the client SPI, closes the
     * statement (if still open) and the connection, and asserts that no query is left running on the query node.
     *
     * @throws Exception If closing the JDBC resources fails.
     */
    @After
    public void after() throws Exception {
        MockedIndexing.resetToDefault();
        clientBlocker.stopBlock(false);

        boolean stmtStillOpen = stmt != null && !stmt.isClosed();
        if (stmtStillOpen) {
            stmt.close();
            assert stmt.isClosed();
        }

        conn.close();

        boolean noRunningQueries = ignite.context().query().runningQueries(-1L).isEmpty();
        KillQueryTest.assertTrue(noRunningQueries);
    }

    /**
     * Checks that a running {@code COPY} command is rejected by {@code KILL QUERY}.
     * <p>
     * The command is started asynchronously from node 2 while that node's data streamer requests are blocked.
     * Once a request is held, the kill attempt must fail with "Query doesn't support cancellation"; the messages
     * are then released and the table must contain exactly one row.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testBulkLoadCancellationUnsupported() throws Exception {
        String csvPath = Objects.requireNonNull(IgniteUtils.resolveIgnitePath("/modules/clients/src/test/resources/bulkload1.csv")).getAbsolutePath();
        String tblName = currentTestTableName();
        String createTblSql = "CREATE TABLE " + tblName + "(id integer primary key, age integer, firstName varchar, lastname varchar)";
        String copySql = "COPY FROM '" + csvPath + "' INTO " + tblName + " (id, age, firstName, lastName) format csv charset 'ascii'";

        IgniteEx clientNode = grid(2);

        try (Connection clientConn = GridTestUtils.connect(clientNode, null);
             Statement clientStmt = clientConn.createStatement()) {
            clientStmt.execute(createTblSql);

            // Hold the streamer requests so that the COPY stays in the running state.
            clientBlocker.blockMessages((IgniteBiPredicate & Serializable)(dstNode, msg) -> msg instanceof DataStreamerRequest);
            IgniteInternalFuture copyFut = GridTestUtils.runAsync(() -> clientStmt.execute(copySql));
            clientBlocker.waitForBlocked(1, 5000L);

            String globQryId = findOneRunningQuery(copySql, clientNode);
            GridTestUtils.assertThrowsAnyCause(
                log,
                () -> igniteForKillRequest.cache("default").query(createKillQuery(globQryId, asyncCancel)),
                CacheException.class,
                "Query doesn't support cancellation");

            // Release the held messages and let the COPY finish.
            clientBlocker.stopBlock(true);
            copyFut.get(5000L);

            SqlFieldsQuery selectAll = new SqlFieldsQuery("SELECT * FROM " + tblName + " ").setSchema("PUBLIC");
            int rowCnt = clientNode.cache("default").query(selectAll).getAll().size();
            KillQueryTest.assertEquals("COPY command inserted incorrect number of rows.", 1, rowCnt);
        }
    }

    /**
     * Looks up the single running query with the given text on the given node and fails the test if the number of
     * matches is not exactly one.
     *
     * @param query SQL text to look for.
     * @param node Node whose running queries are inspected.
     * @return Global id of the matching query.
     */
    private String findOneRunningQuery(String query, IgniteEx node) {
        List<GridRunningQueryInfo> matches = findQueriesOnNode(query, node);
        String failMsg = "Expected only one running query: " + query + "\nBut found: " + matches;
        KillQueryTest.assertEquals(failMsg, 1, matches.size());
        GridRunningQueryInfo onlyMatch = matches.get(0);
        return onlyMatch.globalQueryId();
    }

    /**
     * Collects the running queries of a node whose SQL text equals the given string.
     *
     * @param query SQL text to match exactly.
     * @param node Node whose running queries are inspected.
     * @return Matching running queries; empty if none match.
     */
    private List<GridRunningQueryInfo> findQueriesOnNode(String query, IgniteEx node) {
        // The element type must be declared: on a raw List the stream is raw and q.query() cannot be resolved.
        @SuppressWarnings("unchecked")
        List<GridRunningQueryInfo> allQrs = (List<GridRunningQueryInfo>)node.context().query().runningQueries(-1L);
        return allQrs.stream().filter(q -> q.query().equals(query)).collect(Collectors.toList());
    }

    /**
     * Checks that {@code CREATE TABLE} cannot be cancelled.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCreateTableCancellationUnsupported() throws Exception {
        List<String> noPrepareSteps = Collections.emptyList();
        String createTblSql = "CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)";
        checkCancellationUnsupported(noPrepareSteps, createTblSql, asyncCancel);
    }

    /**
     * Checks that {@code ALTER TABLE ... ADD COLUMN} cannot be cancelled; the table is created as a prepare step.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testAlterTableCancellationUnsupported() throws Exception {
        String tblName = currentTestTableName();
        List<String> prepareSteps = Arrays.asList("CREATE TABLE " + tblName + " (id INTEGER PRIMARY KEY, name VARCHAR)");
        String alterSql = "ALTER TABLE " + tblName + " ADD COLUMN COL VARCHAR";
        checkCancellationUnsupported(prepareSteps, alterSql, asyncCancel);
    }

    /**
     * Checks that {@code CREATE INDEX} cannot be cancelled; the table is created as a prepare step.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCreateIndexCancellationUnsupported() throws Exception {
        String tblName = currentTestTableName();
        List<String> prepareSteps = Arrays.asList("CREATE TABLE " + tblName + " (id INTEGER PRIMARY KEY, name VARCHAR)");
        String createIdxSql = "CREATE INDEX " + tblName + "_IDX ON " + tblName + "(name, id)";
        checkCancellationUnsupported(prepareSteps, createIdxSql, asyncCancel);
    }

    /**
     * Checks that {@code DROP INDEX} cannot be cancelled; the table and the index are created as prepare steps.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDropIndexCancellationUnsupported() throws Exception {
        String tblName = currentTestTableName();
        String createTblSql = "CREATE TABLE " + tblName + " (id INTEGER PRIMARY KEY, name VARCHAR)";
        String createIdxSql = "CREATE INDEX " + tblName + "_IDX ON " + tblName + "(name, id)";
        String dropIdxSql = "DROP INDEX " + tblName + "_IDX";
        checkCancellationUnsupported(Arrays.asList(createTblSql, createIdxSql), dropIdxSql, asyncCancel);
    }

    /**
     * Builds the table name for the current test from the current value of {@code tblCnt}, which {@code before()}
     * increments before every test.
     *
     * @return Table name of the form {@code TST_TABLE_<counter>}.
     */
    private String currentTestTableName() {
        return "TST_TABLE_" + tblCnt.get();
    }

    /**
     * Runs the prepare statements, installs a two-party barrier, starts an asynchronous cancel attempt that is
     * expected to fail with "Query doesn't support cancellation", executes the command under test and waits up to
     * 5000 ms for the cancel attempt to complete.
     *
     * @param prepareSteps SQL statements executed before the command under test.
     * @param sqlCmd Command whose cancellation must be rejected.
     * @param async Whether the kill is issued in async mode.
     * @throws Exception If failed.
     */
    private void checkCancellationUnsupported(List<String> prepareSteps, String sqlCmd, boolean async) throws Exception {
        for (String prepareSql : prepareSteps) {
            try {
                stmt.execute(prepareSql);
            }
            catch (SQLException sqlErr) {
                throw new IgniteException(sqlErr);
            }
        }

        KillQueryTest.newBarrier(2);

        IgniteInternalFuture cancelFut = cancelQueryWithBarrier(sqlCmd, "Query doesn't support cancellation", async);
        stmt.execute(sqlCmd);
        cancelFut.get(5000L);
    }

    /**
     * Checks that killing a query id that does not exist on a live node fails with a "doesn't exist" error that
     * mentions the node id.
     */
    @Test
    public void testKillUnknownQry() {
        UUID nodeId = ignite.localNode().id();
        String expectedMsg = "Query with provided ID doesn't exist [nodeId=" + nodeId;

        GridTestUtils.assertThrows(log, () -> {
            igniteForKillRequest.cache("default").query(createKillQuery(nodeId, Long.MAX_VALUE, asyncCancel));
            return null;
        }, CacheException.class, expectedMsg);
    }

    /**
     * Checks that a kill request addressed to a random node id fails with a "node is not alive" error.
     */
    @Test
    public void testKillQryUnknownNode() {
        String expectedMsg = "Failed to cancel query, node is not alive";

        GridTestUtils.assertThrows(log, () -> {
            igniteForKillRequest.cache("default").query(createKillQuery(UUID.randomUUID(), Long.MAX_VALUE, asyncCancel));
            return null;
        }, CacheException.class, expectedMsg);
    }

    /**
     * Checks that repeating a kill for a query that has already been cancelled fails with a "doesn't exist" error.
     * <p>
     * The kill command is prepared from the first running query before the cancel is started, and is issued only
     * after the cursor has reported the cancellation and the cancel future has completed.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testKillAlreadyKilledQuery() throws Exception {
        IgniteCache cache = ignite.cache("default");
        FieldsQueryCursor cur = cache.query(new SqlFieldsQuery("select * from Integer where awaitLatchCancelled() = 0"));

        List running = (List)ignite.context().query().runningQueries(-1L);
        GridRunningQueryInfo firstRunning = (GridRunningQueryInfo)running.get(0);
        String globalQryId = firstRunning.globalQueryId();
        SqlFieldsQuery killQry = createKillQuery(globalQryId, asyncCancel);

        IgniteCache reqCache = igniteForKillRequest.cache("default");
        IgniteInternalFuture killFut = cancel(1, asyncCancel, new String[0]);

        GridTestUtils.assertThrows(
            log,
            () -> (List)cur.iterator().next(),
            QueryCancelledException.class,
            "The query was cancelled while executing");
        killFut.get(1000L);

        GridTestUtils.assertThrows(
            log,
            () -> reqCache.query(killQry),
            CacheException.class,
            "Query with provided ID doesn't exist");

        cur.close();
    }

    /**
     * Builds a kill command for the query identified by a node id and a node-local query id.
     *
     * @param nodeId Id of the node that owns the query.
     * @param qryId Node-local query id.
     * @param async Whether to add the {@code ASYNC} keyword.
     * @return Kill command for the global id {@code <nodeId>_<qryId>}.
     */
    private SqlFieldsQuery createKillQuery(UUID nodeId, long qryId, boolean async) {
        String globalQryId = nodeId + "_" + qryId;
        return createKillQuery(globalQryId, async);
    }

    /**
     * Builds the {@code KILL QUERY [ASYNC] '<globalQryId>'} command.
     *
     * @param globalQryId Global query id placed between single quotes.
     * @param async Whether to add the {@code ASYNC} keyword.
     * @return Kill command.
     */
    private SqlFieldsQuery createKillQuery(String globalQryId, boolean async) {
        StringBuilder sql = new StringBuilder("KILL QUERY");
        if (async) {
            sql.append(" ASYNC");
        }
        sql.append(" '").append(globalQryId).append("'");
        return new SqlFieldsQuery(sql.toString());
    }

    /**
     * Starts an asynchronous cancel and checks that the JDBC query fails with "The query was cancelled while
     * executing.", then waits for the cancel future.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelQuery() throws Exception {
        IgniteInternalFuture cancelFut = cancel(1, asyncCancel, new String[0]);
        String sql = "select * from Integer where _key in (select abs(_key) from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()";

        GridTestUtils.assertThrows(log, () -> {
            stmt.executeQuery(sql);
            return null;
        }, SQLException.class, "The query was cancelled while executing.");

        cancelFut.get(1000L);
    }

    /**
     * Kills a non-lazy query before its iterator is requested and checks that the running-queries list becomes
     * empty: within 1000 ms in async mode, immediately otherwise. The cursor is closed at the end.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelBeforeIteratorObtained() throws Exception {
        SqlFieldsQuery select = new SqlFieldsQuery("select * from \"default\".Integer").setLazy(false);
        FieldsQueryCursor cur = ignite.context().query().querySqlFields(select, false);

        Long qryId = ((GridRunningQueryInfo)ignite.context().query().runningQueries(-1L).iterator().next()).id();
        UUID locNodeId = ignite.context().localNodeId();
        SqlFieldsQuery killQry = createKillQuery(locNodeId, qryId, asyncCancel);
        igniteForKillRequest.context().query().querySqlFields(killQry, false).getAll();

        boolean noRunningQueries = asyncCancel
            ? GridTestUtils.waitForCondition(() -> ignite.context().query().runningQueries(-1L).isEmpty(), 1000L)
            : ignite.context().query().runningQueries(-1L).isEmpty();
        KillQueryTest.assertTrue(noRunningQueries);

        cur.close();
    }

    /**
     * Multi-statement cancellation with lazy execution, killing the first running query.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelMultiStatementLazyKillFirst() throws Exception {
        boolean lazy = true;
        boolean killFirst = true;
        cancelMultiStatement(lazy, killFirst);
    }

    /**
     * Multi-statement cancellation with lazy execution, killing the last running query.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelMultiStatementLazyKillLast() throws Exception {
        boolean lazy = true;
        boolean killFirst = false;
        cancelMultiStatement(lazy, killFirst);
    }

    /**
     * Multi-statement cancellation with non-lazy execution, killing the first running query.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelMultiStatementKillFirst() throws Exception {
        boolean lazy = false;
        boolean killFirst = true;
        cancelMultiStatement(lazy, killFirst);
    }

    /**
     * Multi-statement cancellation with non-lazy execution, killing the last running query.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelMultiStatementKillLast() throws Exception {
        boolean lazy = false;
        boolean killFirst = false;
        cancelMultiStatement(lazy, killFirst);
    }

    /**
     * Runs a multi-statement script twice (timestamp value {@code null}, then {@code CURRENT_TIMESTAMP()}), kills
     * either the first or the last entry of the running-queries list and checks that:
     * the list becomes empty, a repeated kill reports "doesn't exist", and reading the cursor at index 4 of the
     * result list fails with "The query was cancelled".
     *
     * @param lazy Lazy flag set on the script query.
     * @param killFirst {@code true} to kill the first running query, {@code false} to kill the last one.
     * @throws Exception If failed.
     */
    private void cancelMultiStatement(boolean lazy, boolean killFirst) throws Exception {
        GridQueryProcessor qryProc = ignite.context().query();
        String scriptTemplate = "DROP TABLE IF EXISTS CITY;CREATE TABLE CITY (  ID INT,  Name VARCHAR,  CountryCode CHAR(3),  District VARCHAR,  Population INT,  TimeS TIMESTAMP,  PRIMARY KEY (ID));INSERT INTO City(ID, Name, CountryCode, District, Population, TimeS) VALUES (1,'Kabul','AFG','Kabol',1780000, %s);INSERT INTO City(ID, Name, CountryCode, District, Population, TimeS) VALUES (2,'Qandahar','AFG','Qandahar',237500, %s);SELECT * FROM City;SELECT * FROM City;INSERT INTO City(ID, Name, CountryCode, District, Population, TimeS) VALUES (3,'Herat','AFG','Herat',186800, %s);SELECT * FROM City;";
        String[] timestampValues = new String[]{"null", "CURRENT_TIMESTAMP()"};

        for (String tsVal : timestampValues) {
            SqlFieldsQuery script = new SqlFieldsQuery(String.format(scriptTemplate, tsVal, tsVal, tsVal));
            script.setLazy(lazy);
            List cursors = qryProc.querySqlFields(script, true, false);

            ArrayList running = new ArrayList(ignite.context().query().runningQueries(-1L));
            KillQueryTest.assertFalse(running.isEmpty());

            int victimIdx = killFirst ? 0 : running.size() - 1;
            GridRunningQueryInfo victim = (GridRunningQueryInfo)running.get(victimIdx);
            SqlFieldsQuery killQuery = createKillQuery(ignite.context().localNodeId(), victim.id(), asyncCancel);
            igniteForKillRequest.context().query().querySqlFields(killQuery, false).getAll();

            boolean noRunningQueries = asyncCancel
                ? GridTestUtils.waitForCondition(() -> ignite.context().query().runningQueries(-1L).isEmpty(), 1000L)
                : ignite.context().query().runningQueries(-1L).isEmpty();
            KillQueryTest.assertTrue(noRunningQueries);

            IgniteCache reqCache = igniteForKillRequest.cache("default");
            GridTestUtils.assertThrows(
                log,
                () -> reqCache.query(killQuery).getAll(),
                CacheException.class,
                "Query with provided ID doesn't exist");
            GridTestUtils.assertThrows(
                log,
                () -> ((FieldsQueryCursor)cursors.get(4)).getAll(),
                CacheException.class,
                "The query was cancelled");
        }
    }

    /**
     * Kills a non-lazy query after its iterator has been obtained; in async mode additionally waits up to 1000 ms
     * for the running-queries list to become empty. The cursor is closed at the end.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelAfterIteratorObtained() throws Exception {
        SqlFieldsQuery select = new SqlFieldsQuery("select * from \"default\".Integer").setLazy(false);
        FieldsQueryCursor cur = ignite.context().query().querySqlFields(select, false);
        cur.iterator();

        Long qryId = ((GridRunningQueryInfo)ignite.context().query().runningQueries(-1L).iterator().next()).id();
        UUID locNodeId = ignite.context().localNodeId();
        SqlFieldsQuery killQry = createKillQuery(locNodeId, qryId, asyncCancel);
        igniteForKillRequest.context().query().querySqlFields(killQry, false).getAll();

        if (asyncCancel) {
            boolean noRunningQueries = GridTestUtils.waitForCondition(() -> ignite.context().query().runningQueries(-1L).isEmpty(), 1000L);
            KillQueryTest.assertTrue(noRunningQueries);
        }

        cur.close();
    }

    /**
     * Kills a non-lazy query after one row has been read from its cursor; in async mode additionally waits up to
     * 1000 ms for the running-queries list to become empty.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelAfterResultSetPartiallyRead() throws Exception {
        SqlFieldsQuery select = new SqlFieldsQuery("select * from \"default\".Integer").setLazy(false);
        FieldsQueryCursor cur = ignite.context().query().querySqlFields(select, false);
        cur.iterator().next();

        Long qryId = ((GridRunningQueryInfo)ignite.context().query().runningQueries(-1L).iterator().next()).id();
        UUID locNodeId = ignite.context().localNodeId();
        SqlFieldsQuery killQry = createKillQuery(locNodeId, qryId, asyncCancel);
        igniteForKillRequest.context().query().querySqlFields(killQry, false).getAll();

        if (!asyncCancel) {
            return;
        }
        boolean noRunningQueries = GridTestUtils.waitForCondition(() -> ignite.context().query().runningQueries(-1L).isEmpty(), 1000L);
        KillQueryTest.assertTrue(noRunningQueries);
    }

    /**
     * Kills a lazy query before its iterator is requested; in async mode additionally waits up to 1000 ms for the
     * running-queries list to become empty.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelBeforeIteratorObtainedLazy() throws Exception {
        SqlFieldsQuery select = new SqlFieldsQuery("select * from \"default\".Integer").setLazy(true);
        FieldsQueryCursor cur = ignite.context().query().querySqlFields(select, false);

        Long qryId = ((GridRunningQueryInfo)ignite.context().query().runningQueries(-1L).iterator().next()).id();
        UUID locNodeId = ignite.context().localNodeId();
        SqlFieldsQuery killQry = createKillQuery(locNodeId, qryId, asyncCancel);
        igniteForKillRequest.context().query().querySqlFields(killQry, false).getAll();

        if (!asyncCancel) {
            return;
        }
        boolean noRunningQueries = GridTestUtils.waitForCondition(() -> ignite.context().query().runningQueries(-1L).isEmpty(), 1000L);
        KillQueryTest.assertTrue(noRunningQueries);
    }

    /**
     * Kills a lazy query after its iterator has been obtained; in async mode additionally waits up to 1000 ms for
     * the running-queries list to become empty.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelAfterIteratorObtainedLazy() throws Exception {
        SqlFieldsQuery select = new SqlFieldsQuery("select * from \"default\".Integer").setLazy(true);
        FieldsQueryCursor cur = ignite.context().query().querySqlFields(select, false);
        cur.iterator();

        Long qryId = ((GridRunningQueryInfo)ignite.context().query().runningQueries(-1L).iterator().next()).id();
        UUID locNodeId = ignite.context().localNodeId();
        SqlFieldsQuery killQry = createKillQuery(locNodeId, qryId, asyncCancel);
        igniteForKillRequest.context().query().querySqlFields(killQry, false).getAll();

        if (!asyncCancel) {
            return;
        }
        boolean noRunningQueries = GridTestUtils.waitForCondition(() -> ignite.context().query().runningQueries(-1L).isEmpty(), 1000L);
        KillQueryTest.assertTrue(noRunningQueries);
    }

    /**
     * Kills a lazy query after one row has been read from its cursor; in async mode additionally waits up to
     * 1000 ms for the running-queries list to become empty.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelAfterResultSetPartiallyReadLazy() throws Exception {
        SqlFieldsQuery select = new SqlFieldsQuery("select * from \"default\".Integer").setLazy(true);
        FieldsQueryCursor cur = ignite.context().query().querySqlFields(select, false);
        cur.iterator().next();

        Long qryId = ((GridRunningQueryInfo)ignite.context().query().runningQueries(-1L).iterator().next()).id();
        UUID locNodeId = ignite.context().localNodeId();
        SqlFieldsQuery killQry = createKillQuery(locNodeId, qryId, asyncCancel);
        igniteForKillRequest.context().query().querySqlFields(killQry, false).getAll();

        if (!asyncCancel) {
            return;
        }
        boolean noRunningQueries = GridTestUtils.waitForCondition(() -> ignite.context().query().runningQueries(-1L).isEmpty(), 1000L);
        KillQueryTest.assertTrue(noRunningQueries);
    }

    /**
     * Cancels a JDBC query restricted by a {@code BETWEEN} condition on the key and checks that it fails with
     * "The query was cancelled while executing.". The row limit is passed to {@code shouldNotBeCalledMoreThan}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelQueryPartitionPruning() throws Exception {
        IgniteInternalFuture cancelFut = cancel(1, asyncCancel, new String[0]);

        final int rowsAllowedAfterCancel = 400;
        String sql = "select * from Integer where _key between 1000 and 2000 and awaitLatchCancelled() = 0 and shouldNotBeCalledMoreThan(" + rowsAllowedAfterCancel + ")";

        GridTestUtils.assertThrows(log, () -> {
            stmt.executeQuery(sql);
            return null;
        }, SQLException.class, "The query was cancelled while executing.");

        cancelFut.get(1000L);
    }

    /**
     * Cancels a query executed through the cache API with the local flag set and checks that some cause in the
     * failure chain is a {@code QueryCancelledException}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelLocalQueryNative() throws Exception {
        IgniteInternalFuture cancelFut = cancel(1, asyncCancel, new String[0]);
        String sql = "select * from Integer where _key in (select _key from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledMoreThan(128)";

        GridTestUtils.assertThrowsAnyCause(log, () -> {
            ignite.cache("default").query(new SqlFieldsQuery(sql).setLocal(true)).getAll();
            return null;
        }, QueryCancelledException.class, "The query was cancelled while executing.");

        cancelFut.get(1000L);
    }

    /**
     * Cancels a join between the {@code PERS1} and {@code PERS2} person tables executed with distributed joins
     * enabled and checks that it fails with "The query was cancelled while executing.".
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelDistributeJoin() throws Exception {
        IgniteInternalFuture cancelFut = cancel(1, asyncCancel, new String[0]);

        final int rowsAllowedAfterCancel = 9999;
        String sql = "SELECT p1.rec_id, p1.id, p2.rec_id FROM PERS1.Person p1 JOIN PERS2.Person p2 ON p1.id = p2.id AND shouldNotBeCalledMoreThan(" + rowsAllowedAfterCancel + ")AND awaitLatchCancelled() = 0";

        GridTestUtils.assertThrows(log, () -> {
            ignite.cache("default").query(new SqlFieldsQuery(sql).setDistributedJoins(true)).getAll();
            return null;
        }, CacheException.class, "The query was cancelled while executing.");

        cancelFut.get(1000L);
    }

    /**
     * Checks that cancelling a multi-statement JDBC query leaves another open cursor on the same connection usable.
     * <p>
     * A second statement opens a cursor with fetch size 1 and reads one row; then a multi-statement query is
     * cancelled and the first cursor must still be able to advance.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testKillMultipleStatementsQuery() throws Exception {
        try (Statement anotherStatement = this.conn.createStatement();){
            anotherStatement.setFetchSize(1);
            String sql = "select * from Integer";
            ResultSet rs = anotherStatement.executeQuery(sql);
            // Explicit check instead of the assert keyword: rs.next() must run even when the JVM has no -ea.
            if (!rs.next()) {
                throw new AssertionError("Expected at least one row from: " + sql);
            }
            IgniteInternalFuture cancelRes = this.cancel(3, this.asyncCancel, sql, "select 100 from Integer");
            GridTestUtils.assertThrows((IgniteLogger)log, () -> {
                this.stmt.execute("select 100 from Integer;select _key from Integer where awaitLatchCancelled() = 0;");
                return null;
            }, SQLException.class, (String)"The query was cancelled while executing");
            if (!rs.next()) {
                throw new AssertionError("The other cursor mustn't be closed");
            }
            cancelRes.get(1000L);
        }
    }

    /**
     * Checks that cancelling a JDBC batch of updates fails the batch with "The query was cancelled while
     * executing" and leaves another open cursor on the same connection usable.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelBatchQuery() throws Exception {
        String[] batchUpdates = new String[]{
            "update Long set _val = _val + 1 where _key < sleep_func (30)",
            "update Long set _val = _val + 1 where awaitLatchCancelled() = 0",
            "update Long set _val = _val + 1 where _key < sleep_func (30)",
            "update Long set _val = _val + 1 where shouldNotBeCalledInCaseOfCancellation()"
        };

        try (Statement otherStmt = conn.createStatement()) {
            otherStmt.setFetchSize(1);
            String sql = "SELECT * from Integer";
            ResultSet otherRs = otherStmt.executeQuery(sql);
            Assert.assertTrue(otherRs.next());

            IgniteInternalFuture cancelFut = cancel(2, asyncCancel, sql);

            GridTestUtils.assertThrows(log, () -> {
                for (String update : batchUpdates) {
                    stmt.addBatch(update);
                }
                stmt.executeBatch();
                return null;
            }, SQLException.class, "The query was cancelled while executing");

            Assert.assertTrue("The other cursor mustn't be closed", otherRs.next());
            cancelFut.get(1000L);
        }
    }

    /**
     * Checks cancellation while {@code MockedIndexing.failReservations} is set.
     * <p>
     * A listener on the query topic of nodes 0..2 counts down {@code TestSQLFunctions.cancelLatch} whenever a
     * {@code GridH2QueryRequest} arrives. The query must fail with "The query was cancelled while executing.".
     * The listeners are removed in any case.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelQueryIfPartitionsCantBeReservedOnMapNodes() throws Exception {
        GridMessageListener onQryRequest = (node, msg, plc) -> {
            if (!(msg instanceof GridH2QueryRequest)) {
                return;
            }
            TestSQLFunctions.cancelLatch.countDown();
        };

        for (int nodeIdx = 0; nodeIdx < 3; nodeIdx++) {
            grid(nodeIdx).context().io().addMessageListener(GridTopic.TOPIC_QUERY, onQryRequest);
        }

        MockedIndexing.failReservations = true;

        try {
            IgniteInternalFuture cancelFut = cancel(1, asyncCancel, new String[0]);
            String sql = "select * from Integer where _val <> 42";

            GridTestUtils.assertThrows(log, () -> {
                ignite.cache("default").query(new SqlFieldsQuery(sql)).getAll();
                return null;
            }, CacheException.class, "The query was cancelled while executing.");

            cancelFut.get(1000L);
        }
        finally {
            for (int nodeIdx = 0; nodeIdx < 3; nodeIdx++) {
                grid(nodeIdx).context().io().removeMessageListener(GridTopic.TOPIC_QUERY, onQryRequest);
            }
        }
    }

    /**
     * Checks cancellation while {@code MockedIndexing.retryNodePartMapping} is set.
     * <p>
     * The query is started asynchronously; the test waits up to 5000 ms for it to appear exactly once among the
     * running queries, kills it and expects the query future to fail with a {@code CacheException} cause.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCancelQueryIfUnableToGetNodesForPartitions() throws Exception {
        MockedIndexing.retryNodePartMapping = true;

        String select = "select * from Integer where _val <> 42";
        IgniteInternalFuture runQueryFut = GridTestUtils.runAsync(() -> ignite.cache("default").query(new SqlFieldsQuery(select)).getAll());

        boolean qryRegistered = GridTestUtils.waitForCondition(() -> findQueriesOnNode(select, ignite).size() == 1, 5000L);

        if (!qryRegistered) {
            // If the query has already finished, log why before failing the test.
            if (runQueryFut.isDone()) {
                printFuturesException("Got exception getting running the query.", runQueryFut);
            }
            String failMsg = "Failed to wait for query to be in running queries list exactly one time [select=" + select + ", node=" + ignite.localNode().id() + ", timeout=" + 5000 + "ms].";
            Assert.fail(failMsg);
        }

        String globalQryId = findOneRunningQuery(select, ignite);
        SqlFieldsQuery killQry = createKillQuery(globalQryId, asyncCancel);
        ignite.cache("default").query(killQry);

        GridTestUtils.assertThrowsAnyCause(
            log,
            () -> runQueryFut.get(1000L),
            CacheException.class,
            "The query was cancelled while executing.");
    }

    /**
     * Waits up to five seconds for the future and, if waiting ends with an exception,
     * logs that exception as an error under the given message. Nothing is logged on success.
     *
     * @param msg Message to log together with the exception.
     * @param fut Future to wait for.
     */
    private void printFuturesException(String msg, IgniteInternalFuture fut) {
        final long waitMs = 5000L;

        try {
            fut.get(waitMs);
        }
        catch (Exception failure) {
            log.error(msg, failure);
        }
    }

    /**
     * Runs the cancellation check with explicit partitions three times: with up to five primary
     * partitions of grid 0, with up to five primary partitions of grid 1, and with both sets combined.
     *
     * @throws Exception If the test fails.
     */
    @Test
    public void testCancelQueryWithPartitions() throws Exception {
        final int partsPerNode = 5;
        Affinity affinity = this.ignite.affinity("default");

        int[] node0Parts = Arrays.stream(affinity.primaryPartitions(this.grid(0).localNode()))
            .limit(partsPerNode)
            .toArray();
        int[] node1Parts = Arrays.stream(affinity.primaryPartitions(this.grid(1).localNode()))
            .limit(partsPerNode)
            .toArray();

        // Both arrays are already capped at partsPerNode, so they are concatenated as they are.
        int[] bothNodesParts = IntStream.concat(Arrays.stream(node0Parts), Arrays.stream(node1Parts)).toArray();

        for (int[] parts : new int[][] {node0Parts, node1Parts, bothNodesParts}) {
            this.checkPartitions(parts);
        }
    }

    /**
     * Resets the test SQL function state, starts the asynchronous killer expecting exactly one running
     * query, and asserts that a query restricted to the given partitions fails with a
     * {@link CacheException} reporting the cancellation.
     *
     * @param partitions Partitions to set on the query.
     * @throws Exception If the check fails.
     */
    public void checkPartitions(int[] partitions) throws Exception {
        final String sql = "select * from Integer where _key in (select abs(_key) from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()";

        TestSQLFunctions.reset();

        IgniteInternalFuture killerFut = this.cancel(1, this.asyncCancel);

        GridTestUtils.assertThrows(log, () -> {
            SqlFieldsQuery partitioned = new SqlFieldsQuery(sql).setPartitions(partitions);
            this.ignite.cache("default").query(partitioned).getAll();
            return null;
        }, CacheException.class, "The query was cancelled while executing.");

        // The killer thread must finish cleanly as well.
        killerFut.get(1000L);
    }

    /**
     * Waits up to five seconds for the node's query pool to become idle and fails the test otherwise.
     *
     * @param node Node whose query pool is checked.
     * @throws Exception If waiting fails.
     */
    private void ensureMapQueriesHasFinished(IgniteEx node) throws Exception {
        final long waitMs = 5000L;

        boolean poolDrained = GridTestUtils.waitForCondition(() -> this.queryPoolIsEmpty(node), waitMs);

        String failMsg = "Node " + node.localNode().id() + " has not finished its tasks in the query pool";

        Assert.assertTrue(failMsg, poolDrained);
    }

    /**
     * Tells whether the node's query executor has neither queued nor active tasks.
     *
     * @param node Node whose query executor service is inspected.
     * @return {@code true} if the executor queue is empty and its active count is zero.
     */
    private boolean queryPoolIsEmpty(IgniteEx node) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor)node.context().pools().getQueryExecutorService();

        // Queued work means the pool is still busy; the active count is consulted only when the queue is empty.
        if (!pool.getQueue().isEmpty()) {
            return false;
        }

        return pool.getActiveCount() == 0;
    }

    /**
     * Asynchronously waits (up to five seconds) until at least one running query whose SQL text equals
     * {@code qry} is found on {@code ignite}, then issues a kill query for each of them through
     * {@code igniteForKillRequest} and expects every kill to fail with a {@link CacheException}.
     * Whatever happens, the task finally waits on the shared barrier via {@code awaitTimeout()}.
     *
     * @param qry SQL text the running queries are matched against (exact {@code equals}).
     * @param expErrMsg Message passed to {@code assertThrowsAnyCause} as the expected error message.
     * @param async Flag forwarded to {@code createKillQuery}.
     * @return Future of the asynchronous task.
     */
    private IgniteInternalFuture cancelQueryWithBarrier(String qry, String expErrMsg, boolean async) {
        return GridTestUtils.runAsync(() -> {
            try {
                // Typed element list: the stream lambda and the for-each below need GridRunningQueryInfo, not Object.
                List<GridRunningQueryInfo> runningQueries = new ArrayList<>();

                GridTestUtils.waitForCondition(() -> {
                    List<GridRunningQueryInfo> r = (List<GridRunningQueryInfo>)this.ignite.context().query().runningQueries(-1L);

                    runningQueries.addAll(r.stream().filter(q -> q.query().equals(qry)).collect(Collectors.toList()));

                    // Polling stops at the first non-empty match, so entries are not accumulated twice.
                    return !runningQueries.isEmpty();
                }, 5000L);

                KillQueryTest.assertFalse(runningQueries.isEmpty());

                for (GridRunningQueryInfo runningQuery : runningQueries) {
                    GridTestUtils.assertThrowsAnyCause(log, () -> this.igniteForKillRequest.cache("default").query(this.createKillQuery(runningQuery.globalQueryId(), async)), CacheException.class, expErrMsg);
                }
            }
            catch (Exception e) {
                log.error("Unexpected exception.", e);
                Assert.fail("Unexpected exception");
            }
            finally {
                // Always meet the other parties on the barrier, even if the checks above failed.
                try {
                    KillQueryTest.awaitTimeout();
                }
                catch (Exception e) {
                    log.error("Unexpected exception.", e);
                    Assert.fail("Unexpected exception");
                }
            }
        });
    }

    /**
     * Asynchronously kills the queries currently running on {@code ignite}.
     * <p>
     * The task blocks on {@code TestSQLFunctions.cancelLatch}, snapshots the running queries, and
     * starts one asynchronous kill query (through {@code igniteForKillRequest}) for every running query
     * whose SQL text is not listed in {@code skipSqls}. It then sleeps for 500 ms, asserts that the
     * snapshot holds exactly {@code expQryNum} queries (skipped ones included), counts down
     * {@code TestSQLFunctions.reqLatch}, waits up to five seconds for each kill to complete and finally
     * checks that the query pools of grid 0 and grid 1 are idle.
     *
     * @param expQryNum Expected size of the running queries snapshot.
     * @param async Flag forwarded to {@code createKillQuery}.
     * @param skipSqls SQL texts of running queries that must not be killed.
     * @return Future of the asynchronous task.
     */
    private IgniteInternalFuture cancel(int expQryNum, boolean async, String ... skipSqls) {
        return GridTestUtils.runAsync(() -> {
            try {
                TestSQLFunctions.cancelLatch.await();

                // Typed element list: the for-each below needs GridRunningQueryInfo, not Object.
                List<GridRunningQueryInfo> runningQueries = (List<GridRunningQueryInfo>)this.ignite.context().query().runningQueries(-1L);

                List<IgniteInternalFuture<?>> res = new ArrayList<>();

                for (GridRunningQueryInfo runningQuery : runningQueries) {
                    // Leave alone every query the caller asked to skip.
                    if (Stream.of(skipSqls).anyMatch(skipSql -> runningQuery.query().equals(skipSql))) {
                        continue;
                    }

                    res.add(GridTestUtils.runAsync(() -> this.igniteForKillRequest.cache("default").query(this.createKillQuery(runningQuery.globalQueryId(), async))));
                }

                // NOTE(review): presumably gives the asynchronous kill requests time to start - confirm.
                KillQueryTest.doSleep(500L);

                if (expQryNum != runningQueries.size()) {
                    log.error("Found running queries are incorrect, expected only " + expQryNum + " queries. Found : " + runningQueries);
                }

                KillQueryTest.assertEquals(expQryNum, runningQueries.size());

                // Unblocks awaitLatchCancelled(), which waits on reqLatch.
                TestSQLFunctions.reqLatch.countDown();

                for (IgniteInternalFuture<?> fut : res) {
                    fut.get(5000L);
                }

                this.ensureMapQueriesHasFinished(this.grid(0));
                this.ensureMapQueriesHasFinished(this.grid(1));
            }
            catch (Exception e) {
                log.error("Unexpected exception.", e);
                Assert.fail("Unexpected exception");
            }
        });
    }

    /**
     * Starts {@code qryCnt} threads, each of which takes the next statement from the list and executes
     * a select that calls the {@code awaitQuerySuspensionLatch()} SQL function.
     *
     * @param statements Statements to run the queries on; one is taken per thread, in index order.
     * @param qryCnt Number of threads to start.
     * @return Future returned by {@code GridTestUtils.runMultiThreadedAsync}.
     */
    private IgniteInternalFuture<Long> fillServerThreadPool(List<Statement> statements, int qryCnt) {
        final String suspendedSql = "select * from Integer where awaitQuerySuspensionLatch();";
        final AtomicInteger nextStmtIdx = new AtomicInteger();

        return GridTestUtils.runMultiThreadedAsync(() -> {
            // Each thread claims its own statement through the shared counter.
            Statement stmt = statements.get(nextStmtIdx.getAndIncrement());

            try {
                stmt.executeQuery(suspendedSql);
            }
            catch (SQLException sqlErr) {
                log.error("Unexpected exception.", sqlErr);
                Assert.fail("Unexpected exception");
            }
        }, qryCnt, "ThreadName");
    }

    /**
     * Replaces the shared barrier with a fresh one for the given number of parties.
     * The previous barrier, if any, is reset first.
     *
     * @param parties Number of parties of the new barrier.
     */
    private static void newBarrier(int parties) {
        CyclicBarrier previous = barrier;

        if (previous != null) {
            previous.reset();
        }

        barrier = new CyclicBarrier(parties);
    }

    /**
     * Waits on the shared barrier for at most five seconds.
     *
     * @throws InterruptedException If the waiting thread is interrupted.
     * @throws TimeoutException If the timeout elapses before the barrier trips.
     * @throws BrokenBarrierException If the barrier is broken.
     */
    private static void awaitTimeout() throws InterruptedException, TimeoutException, BrokenBarrierException {
        final long waitMs = 5000L;

        barrier.await(waitMs, TimeUnit.MILLISECONDS);
    }

    // Initial class-level state: a single-party barrier and a counter starting at zero.
    static {
        barrier = new CyclicBarrier(1);
        tblCnt = new AtomicInteger(0);
    }

    /**
     * {@link IgniteH2Indexing} variant driven by two static switches: one makes partition reservation
     * return an error reservation, the other makes the reduce-side partition mapper return a fixed
     * retry result.
     */
    static class MockedIndexing
    extends IgniteH2Indexing {
        /** When set, {@code reservePartitions} returns a reservation carrying the test error message. */
        static volatile boolean failReservations;

        /** When set, {@code nodesForPartitions} returns {@link #RETRY_RESULT}. */
        static volatile boolean retryNodePartMapping;

        /** Result handed out by the mocked mapper while {@link #retryNodePartMapping} is set. */
        private static final ReducePartitionMapResult RETRY_RESULT = new ReducePartitionMapResult(null, null, null);

        /** Default constructor. */
        MockedIndexing() {
        }

        /** Switches both flags off. */
        static void resetToDefault() {
            retryNodePartMapping = false;
            failReservations = false;
        }

        /**
         * Starts the indexing as usual, then installs the flag-aware partition reservation manager
         * and the flag-aware reduce partition mapper.
         *
         * @param ctx Kernal context.
         * @param busyLock Busy lock.
         * @throws IgniteCheckedException If the parent start fails.
         */
        public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws IgniteCheckedException {
            super.start(ctx, busyLock);

            this.partReservationMgr = new PartitionReservationManager(this.ctx){

                public PartitionReservation reservePartitions(@Nullable List<Integer> cacheIds, AffinityTopologyVersion reqTopVer, int[] explicitParts, UUID nodeId, long reqId) throws IgniteCheckedException {
                    // Regular path unless the test asked for failing reservations.
                    if (!failReservations) {
                        return super.reservePartitions(cacheIds, reqTopVer, explicitParts, nodeId, reqId);
                    }

                    return new PartitionReservation(null, "[TESTS]: Failed to reserve partitions for the testing purpose!");
                }
            };

            ReducePartitionMapper flagAwareMapper = new ReducePartitionMapper(ctx, ctx.log(GridReduceQueryExecutor.class)){

                public ReducePartitionMapResult nodesForPartitions(List<Integer> cacheIds, AffinityTopologyVersion topVer, int[] parts, boolean isReplicatedOnly) {
                    return retryNodePartMapping
                        ? RETRY_RESULT
                        : super.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly);
                }
            };

            this.setMapper(flagAwareMapper);
        }

        /**
         * Writes the given mapper into the {@code mapper} field of the reduce query executor
         * through reflection.
         *
         * @param mock Mapper to install.
         */
        private void setMapper(ReducePartitionMapper mock) {
            try {
                GridReduceQueryExecutor executor = this.reduceQueryExecutor();

                Field target = GridReduceQueryExecutor.class.getDeclaredField("mapper");
                target.setAccessible(true);
                target.set(executor, mock);
            }
            catch (Exception cause) {
                // Any reflection problem is rethrown unchecked with its cause attached.
                throw new RuntimeException(cause);
            }
        }
    }

    /**
     * Immutable value class with four fields exposed to SQL, compared by all of its fields.
     */
    static class Person
    implements Serializable {
        /** Identifier. */
        @QuerySqlField
        private final int id;

        /** First name. */
        @QuerySqlField
        private final String firstName;

        /** Last name. */
        @QuerySqlField
        private final String lastName;

        /** Age. */
        @QuerySqlField
        private final int age;

        /**
         * @param id Identifier.
         * @param firstName First name; asserted to be non-empty.
         * @param lastName Last name; asserted to be non-empty.
         * @param age Age; asserted to be positive.
         */
        Person(int id, String firstName, String lastName, int age) {
            assert !F.isEmpty(firstName);
            assert !F.isEmpty(lastName);
            assert age > 0;

            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
            this.age = age;
        }

        /**
         * @param o Object to compare with.
         * @return {@code true} if {@code o} is of exactly the same class and all four fields are equal.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }

            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }

            Person that = (Person)o;

            return this.id == that.id
                && this.age == that.age
                && Objects.equals(this.firstName, that.firstName)
                && Objects.equals(this.lastName, that.lastName);
        }

        /**
         * @return Hash combining id, first name, last name and age, in that order, with multiplier 31.
         */
        @Override
        public int hashCode() {
            int hash = this.id;

            // Objects.hashCode yields 0 for null, same as the explicit null checks it replaces.
            hash = 31 * hash + Objects.hashCode(this.firstName);
            hash = 31 * hash + Objects.hashCode(this.lastName);

            return 31 * hash + this.age;
        }
    }

    /**
     * Custom SQL functions used by the tests, together with the latches and the call counter
     * they operate on. All state is static and is (re)created by {@link #reset()}.
     */
    public static class TestSQLFunctions {
        /** Latch {@link #awaitLatchCancelled()} blocks on. */
        static volatile CountDownLatch reqLatch;

        /** Latch counted down by {@link #awaitLatchCancelled()} before it starts waiting. */
        static volatile CountDownLatch cancelLatch;

        /** Latch {@link #awaitQuerySuspensionLatch()} blocks on. */
        static volatile CountDownLatch suspendQryLatch;

        /** Number of {@link #shouldNotBeCalledMoreThan(int)} invocations since the last reset. */
        static volatile AtomicInteger funCallCnt;

        /**
         * Counts down the current latches (if any) and replaces all state with fresh single-count
         * latches and a zeroed counter.
         */
        static void reset() {
            TestSQLFunctions.releaseLatches(reqLatch, cancelLatch, suspendQryLatch);

            reqLatch = new CountDownLatch(1);
            cancelLatch = new CountDownLatch(1);
            suspendQryLatch = new CountDownLatch(1);
            funCallCnt = new AtomicInteger(0);
        }

        /**
         * Counts down every non-null latch once.
         *
         * @param latches Latches to count down; {@code null} elements are skipped.
         */
        private static void releaseLatches(CountDownLatch ... latches) {
            Arrays.stream(latches)
                .filter(Objects::nonNull)
                .forEach(CountDownLatch::countDown);
        }

        /**
         * Counts down {@code cancelLatch} and then waits on {@code reqLatch}.
         * Any exception raised while doing so is ignored.
         *
         * @return Always {@code 0}.
         */
        @QuerySqlFunction
        public static long awaitLatchCancelled() {
            try {
                cancelLatch.countDown();
                reqLatch.await();
            }
            catch (Exception ignored) {
                // Deliberately ignored: the function returns 0 regardless.
            }

            return 0L;
        }

        /**
         * Waits on {@code suspendQryLatch}. Any exception raised while waiting is ignored.
         *
         * @return Always {@code 0}.
         */
        @QuerySqlFunction
        public static long awaitQuerySuspensionLatch() {
            try {
                suspendQryLatch.await();
            }
            catch (Exception ignored) {
                // Deliberately ignored: the function returns 0 regardless.
            }

            return 0L;
        }

        /**
         * Increments the call counter and fails the test once the counter reaches {@code times}.
         *
         * @param times Call count at which the test is failed.
         * @return Always {@code true}.
         */
        @QuerySqlFunction
        public static boolean shouldNotBeCalledMoreThan(int times) {
            int callsSoFar = funCallCnt.incrementAndGet();

            if (callsSoFar >= times) {
                KillQueryTest.fail("Query is running too long since it was canceled.");
            }

            return true;
        }

        /**
         * Fails the test unconditionally: it is only evaluated if the query was not cancelled.
         *
         * @return {@code 0} (after the {@code fail} call).
         */
        @QuerySqlFunction
        public static long shouldNotBeCalledInCaseOfCancellation() {
            KillQueryTest.fail("Query wasn't actually cancelled.");

            return 0L;
        }

        /**
         * Sleeps for the given number of milliseconds; an interruption ends the sleep silently.
         *
         * @param v Sleep time in milliseconds.
         * @return The argument, unchanged.
         */
        @QuerySqlFunction
        public static int sleep_func(int v) {
            try {
                Thread.sleep(v);
            }
            catch (InterruptedException ignored) {
                // Deliberately ignored: the argument is returned either way.
            }

            return v;
        }
    }
}

