/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.common;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.jdbc.thin.JdbcThinAbstractSelfTest;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;

public class RunningQueryInfoCheckInitiatorTest
extends JdbcThinAbstractSelfTest {
    protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        return super.getConfiguration(igniteInstanceName).setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))).setAuthenticationEnabled(true).setCacheConfiguration(new CacheConfiguration[]{new CacheConfiguration().setName("test").setSqlSchema("TEST").setSqlFunctionClasses(new Class[]{TestSQLFunctions.class}).setIndexedTypes(new Class[]{Integer.class, Integer.class})});
    }

    protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();
        this.cleanPersistenceDir();
        this.startGrid(0);
        this.startClientGrid(1);
        this.grid(0).cluster().active(true);
    }

    protected void afterTest() throws Exception {
        for (String cache : this.grid(0).cacheNames()) {
            if (cache.equals("test")) continue;
            this.grid(0).cache(cache).destroy();
        }
        super.afterTest();
    }

    @Test
    public void testUserDefinedInitiatorId() throws Exception {
        String initiatorId = "TestUserSpecifiedOriginator";
        Consumer<String> sqlExec = sql -> GridTestUtils.runAsync(() -> {
            try {
                this.grid(0).context().query().querySqlFields(new SqlFieldsQuery(sql).setQueryInitiatorId("TestUserSpecifiedOriginator"), false).getAll();
            }
            catch (Exception e) {
                log.error("Unexpected exception", (Throwable)e);
                RunningQueryInfoCheckInitiatorTest.fail((String)"Unexpected exception");
            }
        });
        Consumer<String> initiatorChecker = initId0 -> RunningQueryInfoCheckInitiatorTest.assertEquals((String)"TestUserSpecifiedOriginator", (String)initId0);
        this.check(sqlExec, initiatorChecker);
    }

    @Test
    public void testMultipleStatementsUserDefinedInitiatorId() throws Exception {
        String initiatorId = "TestUserSpecifiedOriginator";
        GridTestUtils.runAsync(() -> {
            List curs = this.grid(0).context().query().querySqlFields(new SqlFieldsQuery("SELECT 'qry0', test.await(); SELECT 'qry1', test.await()").setQueryInitiatorId("TestUserSpecifiedOriginator"), false, false);
            for (FieldsQueryCursor cur : curs) {
                cur.getAll();
            }
        });
        RunningQueryInfoCheckInitiatorTest.assertEquals((String)"TestUserSpecifiedOriginator", (String)this.initiatorId(this.grid(0), "qry0", 1000));
        TestSQLFunctions.unlockQuery();
        RunningQueryInfoCheckInitiatorTest.assertEquals((String)"TestUserSpecifiedOriginator", (String)this.initiatorId(this.grid(0), "qry1", 1000));
        TestSQLFunctions.unlockQuery();
        this.checkRunningQueriesCount(this.grid(0), 0, 1000);
    }

    @Test
    public void testJdbcThinInitiatorId() throws Exception {
        Consumer<String> sqlExec = sql -> GridTestUtils.runAsync(() -> {
            try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1:" + RunningQueryInfoCheckInitiatorTest.clientPort(this.grid(0)) + "/?user=ignite&password=ignite");
                 Statement stmt = conn.createStatement();){
                stmt.execute((String)sql);
            }
            catch (SQLException e) {
                log.error("Unexpected exception", (Throwable)e);
            }
        });
        Consumer<String> initiatorChecker = initiatorId -> RunningQueryInfoCheckInitiatorTest.assertTrue((String)("Invalid initiator ID: " + initiatorId), (boolean)Pattern.compile("jdbc-thin:127\\.0\\.0\\.1:[0-9]+@ignite").matcher((CharSequence)initiatorId).matches());
        this.check(sqlExec, initiatorChecker);
    }

    @Test
    public void testThinClientInitiatorId() throws Exception {
        Consumer<String> sqlExec = sql -> GridTestUtils.runAsync(() -> {
            try (IgniteClient cli = Ignition.startClient((ClientConfiguration)new ClientConfiguration().setAddresses(new String[]{"127.0.0.1:" + RunningQueryInfoCheckInitiatorTest.clientPort(this.grid(0))}).setUserName("ignite").setUserPassword("ignite"));){
                cli.query(new SqlFieldsQuery(sql)).getAll();
            }
            catch (Exception e) {
                log.error("Unexpected exception", (Throwable)e);
            }
        });
        Consumer<String> initiatorChecker = initiatorId -> RunningQueryInfoCheckInitiatorTest.assertTrue((String)("Invalid initiator ID: " + initiatorId), (boolean)Pattern.compile("cli:127\\.0\\.0\\.1:[0-9]+@ignite").matcher((CharSequence)initiatorId).matches());
        this.check(sqlExec, initiatorChecker);
    }

    @Test
    public void testJobDefaultInitiatorId() throws Exception {
        Consumer<String> sqlExec = sql -> this.grid(1).cluster().forServers().ignite().compute().runAsync((IgniteRunnable)new TestSqlJob((String)sql));
        Consumer<String> initiatorChecker = initiatorId -> RunningQueryInfoCheckInitiatorTest.assertTrue((String)("Invalid initiator ID: " + initiatorId), (initiatorId.startsWith("task:" + TestSqlJob.class.getName()) && initiatorId.endsWith(this.grid(1).context().localNodeId().toString()) ? 1 : 0) != 0);
        this.check(sqlExec, initiatorChecker);
    }

    @Test
    public void testJdbcV2InitiatorId() throws Exception {
        Consumer<String> sqlExec = sql -> {
            UUID grid0NodeId = this.grid(0).cluster().localNode().id();
            GridTestUtils.runAsync(() -> {
                try (Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://lazy=false:nodeId=" + grid0NodeId + "@modules/clients/src/test/config/jdbc-config.xml");
                     Statement stmt = conn.createStatement();){
                    stmt.execute((String)sql);
                }
                catch (SQLException e) {
                    log.error("Unexpected exception", (Throwable)e);
                }
            });
        };
        Consumer<String> initiatorChecker = initiatorId -> RunningQueryInfoCheckInitiatorTest.assertTrue((String)("Invalid initiator ID: " + initiatorId), (boolean)Pattern.compile("jdbc-v2:127\\.0\\.0\\.1:sqlGrid-ignite-jdbc-driver-[0-9a-fA-F-]+").matcher((CharSequence)initiatorId).matches());
        this.check(sqlExec, initiatorChecker);
    }

    @Test
    public void testJdbcThinStreamerInitiatorId() throws Exception {
        AtomicBoolean end = new AtomicBoolean();
        IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
            try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1:" + RunningQueryInfoCheckInitiatorTest.clientPort(this.grid(0)) + "/?user=ignite&password=ignite");
                 Statement stmt = conn.createStatement();){
                stmt.execute("CREATE TABLE T (ID INT PRIMARY KEY, VAL INT)");
                stmt.execute("SET STREAMING ON");
                int i = 0;
                while (!end.get()) {
                    stmt.execute("INSERT INTO T VALUES(" + i + " , 0)");
                    ++i;
                }
            }
            catch (SQLException e) {
                log.error("Unexpected exception", (Throwable)e);
            }
        });
        Consumer<String> initiatorChecker = initiatorId -> RunningQueryInfoCheckInitiatorTest.assertTrue((String)("Invalid initiator ID: " + initiatorId), (boolean)Pattern.compile("jdbc-thin:127\\.0\\.0\\.1:[0-9]+@ignite").matcher((CharSequence)initiatorId).matches());
        initiatorChecker.accept(this.initiatorId(this.grid(0), "INSERT", 2000));
        end.set(true);
        f.get();
    }

    private void check(Consumer<String> sqlExec, Consumer<String> initiatorChecker) throws Exception {
        this.checkInitiatorId(sqlExec, initiatorChecker, "SELECT test.await()", "await");
        this.grid(0).context().query().querySqlFields(new SqlFieldsQuery("CREATE TABLE T (ID INT PRIMARY KEY, VAL INT)"), false).getAll();
        U.sleep((long)500L);
        this.checkInitiatorId(sqlExec, initiatorChecker, "INSERT INTO T VALUES (0, test.await())", "await");
        this.checkInitiatorId(sqlExec, initiatorChecker, "UPDATE T SET VAL=test.await() WHERE ID = 0", "await");
    }

    private void checkInitiatorId(Consumer<String> sqlExecutor, Consumer<String> initiatorChecker, String sql, String sqlMatch) throws Exception {
        sqlExecutor.accept(sql);
        initiatorChecker.accept(this.initiatorId(this.grid(0), sqlMatch, 2000));
        TestSQLFunctions.unlockQuery();
        this.checkRunningQueriesCount(this.grid(0), 0, 2000);
    }

    private String initiatorId(IgniteEx node, String sqlMatch, int timeout) throws Exception {
        long t0 = U.currentTimeMillis();
        while (true) {
            if (U.currentTimeMillis() - t0 > (long)timeout) {
                RunningQueryInfoCheckInitiatorTest.fail((String)("Timeout. Cannot find query with: " + sqlMatch));
            }
            List res = node.context().query().querySqlFields(new SqlFieldsQuery("SELECT sql, initiator_id FROM " + QueryUtils.sysSchemaName() + ".SQL_QUERIES"), false).getAll();
            for (List row : res) {
                if (!((String)row.get(0)).toUpperCase().contains(sqlMatch.toUpperCase())) continue;
                return (String)row.get(1);
            }
            U.sleep((long)200L);
        }
    }

    private void checkRunningQueriesCount(IgniteEx node, int expectedQryCount, int timeout) {
        long t0 = U.currentTimeMillis();
        List res;
        while ((res = node.context().query().querySqlFields(new SqlFieldsQuery("SELECT * FROM " + QueryUtils.sysSchemaName() + ".SQL_QUERIES"), false).getAll()).size() != expectedQryCount + 1) {
            if (U.currentTimeMillis() - t0 <= (long)timeout) continue;
            RunningQueryInfoCheckInitiatorTest.fail((String)("Timeout. There are unexpected running queries: " + res));
        }
        return;
    }

    private static int clientPort(IgniteEx ign) {
        return ign.context().sqlListener().port();
    }

    public static class TestSqlJob
    implements IgniteRunnable {
        String sql;
        @IgniteInstanceResource
        Ignite ign;

        public TestSqlJob(String sql) {
            this.sql = sql;
        }

        public void run() {
            ((IgniteEx)this.ign).context().query().querySqlFields(new SqlFieldsQuery(this.sql), false).getAll();
        }
    }

    public static class TestSQLFunctions {
        static final Phaser ph = new Phaser(2);

        static void unlockQuery() {
            ph.arriveAndAwaitAdvance();
        }

        @QuerySqlFunction
        public static long await() {
            try {
                ph.arriveAndAwaitAdvance();
            }
            catch (Exception exception) {
                // empty catch block
            }
            return 0L;
        }
    }
}

