/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.jdbc2;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.jdbc.thin.JdbcThinAbstractSelfTest;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;

public class JdbcStreamingSelfTest
extends JdbcThinAbstractSelfTest {
    private static final String BASE_URL = "jdbc:ignite:cfg://cache=default@modules/clients/src/test/config/jdbc-config.xml";
    private static final String STREAMING_URL = "jdbc:ignite:cfg://cache=person@modules/clients/src/test/config/jdbc-config.xml";

    protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        return this.getConfiguration0(gridName);
    }

    private IgniteConfiguration getConfiguration0(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);
        CacheConfiguration cache = JdbcStreamingSelfTest.defaultCacheConfiguration();
        cache.setCacheMode(CacheMode.PARTITIONED);
        cache.setBackups(1);
        cache.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
        cache.setIndexedTypes(new Class[]{Integer.class, Integer.class});
        cfg.setCacheConfiguration(new CacheConfiguration[]{cache});
        cfg.setLocalHost("127.0.0.1");
        TcpDiscoverySpi disco = new TcpDiscoverySpi();
        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
        ipFinder.setAddresses(Collections.singleton("127.0.0.1:47500..47501"));
        disco.setIpFinder((TcpDiscoveryIpFinder)ipFinder);
        cfg.setDiscoverySpi((DiscoverySpi)disco);
        cfg.setConnectorConfiguration(new ConnectorConfiguration());
        return cfg;
    }

    protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();
        this.startGrids(2);
        try (Connection c = this.createOrdinaryConnection();
             Statement s = c.createStatement();){
            s.execute("CREATE TABLE PUBLIC.Person(\"id\" int primary key, \"name\" varchar) WITH \"cache_name=person,value_type=Person\"");
        }
        U.sleep((long)1000L);
    }

    protected Connection createOrdinaryConnection() throws SQLException {
        Connection res = DriverManager.getConnection(BASE_URL, new Properties());
        res.setSchema("PUBLIC");
        return res;
    }

    protected Connection createStreamedConnection(boolean allowOverwrite) throws Exception {
        return this.createStreamedConnection(allowOverwrite, 500L);
    }

    protected Connection createStreamedConnection(boolean allowOverwrite, long flushTimeout) throws Exception {
        Properties props = new Properties();
        props.setProperty("ignite.jdbc.streaming", "true");
        props.setProperty("ignite.jdbc.streamingFlushFrequency", String.valueOf(flushTimeout));
        if (allowOverwrite) {
            props.setProperty("ignite.jdbc.streamingAllowOverwrite", "true");
        }
        Connection res = DriverManager.getConnection(STREAMING_URL, props);
        res.setSchema("PUBLIC");
        return res;
    }

    protected void afterTest() throws Exception {
        this.cache().clear();
        super.afterTest();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStreamedInsertFailsOnReadOnlyMode() throws Exception {
        try (Connection conn = this.createStreamedConnection(true);){
            this.populateData(conn, 0, 1);
            this.grid(0).cluster().state(ClusterState.ACTIVE_READ_ONLY);
            try {
                JdbcStreamingSelfTest.assertEquals((Object)ClusterState.ACTIVE_READ_ONLY, (Object)this.grid(0).cluster().state());
                try (Connection ordinalCon = this.createOrdinaryConnection();){
                    JdbcStreamingSelfTest.assertEquals((long)1L, (long)this.countPersons(ordinalCon));
                    try {
                        this.populateData(conn, 1, 100);
                        JdbcStreamingSelfTest.fail((String)"Insert should be failed!");
                    }
                    catch (Exception e) {
                        log.error("Insert failed", (Throwable)e);
                        JdbcStreamingSelfTest.assertTrue((String)"Wrong exception", (boolean)X.hasCause((Throwable)e, (Class[])new Class[]{IgniteClusterReadOnlyException.class}));
                    }
                    JdbcStreamingSelfTest.assertEquals((String)"Insert should be failed", (long)1L, (long)this.countPersons(ordinalCon));
                }
            }
            finally {
                this.grid(0).cluster().state(ClusterState.ACTIVE);
            }
        }
    }

    @Test
    public void testStreamedInsert() throws Exception {
        for (int i = 10; i <= 100; i += 10) {
            this.put(i, this.nameForId(i * 100));
        }
        try (Connection conn = this.createStreamedConnection(false);){
            this.populateData(conn, 1, 100);
        }
        U.sleep((long)500L);
        for (int i = 1; i <= 100; ++i) {
            if (i % 10 != 0) {
                JdbcStreamingSelfTest.assertEquals((String)this.nameForId(i), (String)this.nameForIdInCache(i));
                continue;
            }
            JdbcStreamingSelfTest.assertEquals((String)this.nameForId(i * 100), (String)this.nameForIdInCache(i));
        }
    }

    @Test
    public void testStreamedInsertWithoutColumnsList() throws Exception {
        for (int i = 10; i <= 100; i += 10) {
            this.put(i, this.nameForId(i * 100));
        }
        try (Connection conn = this.createStreamedConnection(false);){
            this.populateData(conn, 1, 100);
        }
        U.sleep((long)500L);
        for (int i = 1; i <= 100; ++i) {
            if (i % 10 != 0) {
                JdbcStreamingSelfTest.assertEquals((String)this.nameForId(i), (String)this.nameForIdInCache(i));
                continue;
            }
            JdbcStreamingSelfTest.assertEquals((String)this.nameForId(i * 100), (String)this.nameForIdInCache(i));
        }
    }

    @Test
    public void testStreamedInsertWithOverwritesAllowed() throws Exception {
        for (int i = 10; i <= 100; i += 10) {
            this.put(i, this.nameForId(i * 100));
        }
        try (Connection conn = this.createStreamedConnection(true);){
            this.populateData(conn, 1, 100);
        }
        U.sleep((long)500L);
        for (int i = 1; i <= 100; ++i) {
            JdbcStreamingSelfTest.assertEquals((String)this.nameForId(i), (String)this.nameForIdInCache(i));
        }
    }

    @Test
    public void testOnlyInsertsAllowed() {
        this.assertStatementForbidden("CREATE TABLE PUBLIC.X (x int primary key, y int)");
        this.assertStatementForbidden("CREATE INDEX idx_1 ON Person(name)");
        this.assertStatementForbidden("SELECT * from Person");
        this.assertStatementForbidden("insert into PUBLIC.Person(\"id\", \"name\") (select \"id\" + 1, CONCAT(\"name\", '1') from Person)");
        this.assertStatementForbidden("DELETE from Person");
        this.assertStatementForbidden("UPDATE Person SET \"name\" = 'name0'");
        this.assertStatementForbidden("alter table Person add column y int");
    }

    protected void assertStatementForbidden(final String sql) {
        GridTestUtils.assertThrows(null, (Callable)new IgniteCallable<Object>(){

            public Object call() throws Exception {
                try (Connection c = JdbcStreamingSelfTest.this.createStreamedConnection(false);
                     PreparedStatement s = c.prepareStatement(sql);){
                    s.execute();
                }
                return null;
            }
        }, SQLException.class, (String)"Streaming mode supports only INSERT commands without subqueries.");
    }

    protected IgniteCache<Integer, Object> cache() {
        return this.grid(0).cache("person");
    }

    protected void put(int id, String name) {
        BinaryObjectBuilder bldr = this.grid(0).binary().builder("Person");
        bldr.setField("name", (Object)name);
        this.cache().put((Object)id, (Object)bldr.build());
    }

    protected String nameForId(int id) {
        return "Person" + id;
    }

    protected String nameForIdInCache(int id) {
        Object o = this.cache().withKeepBinary().get((Object)id);
        JdbcStreamingSelfTest.assertTrue((String)String.valueOf(o), (boolean)(o instanceof BinaryObject));
        return (String)((BinaryObject)o).field("name");
    }

    private void populateData(Connection conn, int from, int count) throws SQLException {
        try (PreparedStatement stmt = conn.prepareStatement("insert into PUBLIC.Person(\"id\", \"name\") values (?, ?)");){
            for (int i = from; i < from + count; ++i) {
                stmt.setInt(1, i);
                stmt.setString(2, this.nameForId(i));
                stmt.executeUpdate();
            }
        }
    }

    /*
     * Exception decompiling
     */
    private long countPersons(Connection conn) throws SQLException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }
}

