/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.mvcc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class CacheMvccStreamingInsertTest
extends CacheMvccAbstractTest {
    private IgniteCache<Object, Object> sqlNexus;
    private Connection conn;

    protected CacheMode cacheMode() {
        return CacheMode.PARTITIONED;
    }

    protected void beforeTest() throws Exception {
        super.beforeTest();
        IgniteEx ignite = this.startGrid(0);
        this.sqlNexus = ignite.getOrCreateCache(new CacheConfiguration("sqlNexus").setSqlSchema("PUBLIC"));
        this.sqlNexus.query(CacheMvccStreamingInsertTest.q("create table person(  id int not null primary key,  name varchar not null) with \"atomicity=transactional_snapshot\""));
        Properties props = new Properties();
        props.setProperty("ignite.jdbc.streaming", "true");
        this.conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1", props);
    }

    @Test
    public void testStreamingInsertWithoutOverwrite() throws Exception {
        this.conn.createStatement().execute("SET STREAMING 1 BATCH_SIZE 2 ALLOW_OVERWRITE 0  PER_NODE_BUFFER_SIZE 1000 FLUSH_FREQUENCY 100");
        this.sqlNexus.query(CacheMvccStreamingInsertTest.q("insert into person values(1, 'ivan')"));
        PreparedStatement batchStmt = this.conn.prepareStatement("insert into person values(?, ?)");
        batchStmt.setInt(1, 1);
        batchStmt.setString(2, "foo");
        batchStmt.addBatch();
        batchStmt.setInt(1, 2);
        batchStmt.setString(2, "bar");
        batchStmt.addBatch();
        TimeUnit.MILLISECONDS.sleep(500L);
        List rows = this.sqlNexus.query(CacheMvccStreamingInsertTest.q("select * from person")).getAll();
        List<List> exp = Arrays.asList(Arrays.asList(1, "ivan"), Arrays.asList(2, "bar"));
        CacheMvccStreamingInsertTest.assertEquals(exp, (Object)rows);
    }

    @Test
    public void testUpdateWithOverwrite() throws Exception {
        this.conn.createStatement().execute("SET STREAMING 1 BATCH_SIZE 2 ALLOW_OVERWRITE 1  PER_NODE_BUFFER_SIZE 1000 FLUSH_FREQUENCY 100");
        this.sqlNexus.query(CacheMvccStreamingInsertTest.q("insert into person values(1, 'ivan')"));
        PreparedStatement batchStmt = this.conn.prepareStatement("insert into person values(?, ?)");
        batchStmt.setInt(1, 1);
        batchStmt.setString(2, "foo");
        batchStmt.addBatch();
        batchStmt.setInt(1, 2);
        batchStmt.setString(2, "bar");
        batchStmt.addBatch();
        TimeUnit.MILLISECONDS.sleep(500L);
        List rows = this.sqlNexus.query(CacheMvccStreamingInsertTest.q("select * from person")).getAll();
        List<List> exp = Arrays.asList(Arrays.asList(1, "foo"), Arrays.asList(2, "bar"));
        CacheMvccStreamingInsertTest.assertEquals(exp, (Object)rows);
    }

    private static SqlFieldsQuery q(String sql) {
        return new SqlFieldsQuery(sql);
    }
}

