/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.query;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.messages.GridQueryKillResponse;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class KillQueryOnClientDisconnectTest
extends GridCommonAbstractTest {
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
    private static int cntr;
    protected Statement stmt;
    public static final int TIMEOUT = 5000;

    protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();
        cntr = 0;
        this.startGrids(2);
        for (int i = 0; i < 1000; ++i) {
            this.grid(0).cache("default").put((Object)i, (Object)i);
        }
    }

    @Before
    public void before() throws Exception {
        TestSQLFunctions.init();
        Connection conn = GridTestUtils.connect((IgniteEx)this.grid(0), null);
        conn.setSchema("\"default\"");
        this.stmt = conn.createStatement();
    }

    protected IgniteEx clientNode() {
        IgniteEx clientNode = this.grid(1);
        KillQueryOnClientDisconnectTest.assertTrue((boolean)clientNode.context().clientNode());
        return clientNode;
    }

    protected IgniteEx serverNode() {
        IgniteEx srvNode = this.grid(0);
        KillQueryOnClientDisconnectTest.assertFalse((boolean)srvNode.context().clientNode());
        return srvNode;
    }

    protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
        CacheConfiguration cache = GridAbstractTest.defaultCacheConfiguration();
        cache.setCacheMode(CacheMode.PARTITIONED);
        cache.setBackups(1);
        cache.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
        cache.setSqlFunctionClasses(new Class[]{TestSQLFunctions.class});
        cache.setIndexedTypes(new Class[]{Integer.class, Integer.class});
        cfg.setCacheConfiguration(new CacheConfiguration[]{cache});
        TcpDiscoverySpi disco = new TcpDiscoverySpi();
        disco.setIpFinder(IP_FINDER);
        cfg.setDiscoverySpi((DiscoverySpi)disco);
        if (++cntr == 2) {
            cfg.setClientMode(true);
        }
        cfg.setCommunicationSpi((CommunicationSpi)new TcpCommunicationSpi(){

            public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
                Message gridMsg;
                if (GridIoMessage.class.isAssignableFrom(msg.getClass()) && GridQueryKillResponse.class.isAssignableFrom((gridMsg = ((GridIoMessage)msg).message()).getClass())) {
                    KillQueryOnClientDisconnectTest.this.grid(0).configuration().getDiscoverySpi().failNode(KillQueryOnClientDisconnectTest.this.clientNode().cluster().localNode().id(), null);
                    return;
                }
                super.sendMessage(node, msg, ackC);
            }
        });
        return cfg;
    }

    @Test
    public void clientDisconnectFromCluster() throws Exception {
        IgniteInternalFuture cancelRes = this.cancelAndCheckClientDisconnect();
        GridTestUtils.assertThrows((IgniteLogger)log, () -> {
            this.stmt.executeQuery("select * from Integer where _key in (select abs(_key) from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()");
            return null;
        }, SQLException.class, (String)"The query was cancelled while executing.");
        cancelRes.get(5000L);
    }

    protected IgniteInternalFuture cancelAndCheckClientDisconnect() {
        return GridTestUtils.runAsync(() -> {
            try {
                TestSQLFunctions.cancelLatch.await();
                List runningQueries = (List)this.serverNode().context().query().runningQueries(-1L);
                KillQueryOnClientDisconnectTest.assertEquals((int)1, (int)runningQueries.size());
                IgniteInternalFuture fut = GridTestUtils.runAsync(() -> this.clientNode().cache("default").query(new SqlFieldsQuery("KILL QUERY '" + ((GridRunningQueryInfo)runningQueries.get(0)).globalQueryId() + "'")));
                KillQueryOnClientDisconnectTest.doSleep((long)500L);
                TestSQLFunctions.reqLatch.countDown();
                GridTestUtils.assertThrows((IgniteLogger)log, () -> fut.get(5000L), IgniteCheckedException.class, (String)"Failed to cancel query because local client node has been disconnected from the cluster");
            }
            catch (Exception e) {
                log.error("Unexpected exception.", (Throwable)e);
                Assert.fail((String)"Unexpected exception");
            }
        });
    }

    public static class TestSQLFunctions {
        static CountDownLatch reqLatch;
        static CountDownLatch cancelLatch;
        static CountDownLatch suspendQryLatch;

        static void init() {
            reqLatch = new CountDownLatch(1);
            cancelLatch = new CountDownLatch(1);
            suspendQryLatch = new CountDownLatch(1);
        }

        @QuerySqlFunction
        public static long awaitLatchCancelled() {
            try {
                cancelLatch.countDown();
                reqLatch.await();
            }
            catch (Exception exception) {
                // empty catch block
            }
            return 0L;
        }

        @QuerySqlFunction
        public static long awaitQuerySuspensionLatch() {
            try {
                suspendQryLatch.await();
            }
            catch (Exception exception) {
                // empty catch block
            }
            return 0L;
        }

        @QuerySqlFunction
        public static long shouldNotBeCalledInCaseOfCancellation() {
            KillQueryOnClientDisconnectTest.fail((String)"Query wasn't actually cancelled.");
            return 0L;
        }

        @QuerySqlFunction
        public static int sleep_func(int v) {
            try {
                Thread.sleep(v);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            return v;
        }
    }
}

