/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.mvcc;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.CacheException;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class CacheMvccBasicContinuousQueryTest
extends CacheMvccAbstractTest {
    private static final long LATCH_TIMEOUT = 5000L;

    protected CacheMode cacheMode() {
        return CacheMode.PARTITIONED;
    }

    protected void afterTest() throws Exception {
        GridTestUtils.waitForCondition((GridAbsPredicate)new PA(){

            public boolean apply() {
                for (Ignite node : G.allGrids()) {
                    GridContinuousProcessor proc = ((IgniteEx)node).context().continuous();
                    if (((Map)U.field((Object)proc, (String)"rmtInfos")).size() <= 0) continue;
                    return false;
                }
                return true;
            }
        }, (long)3000L);
        for (Ignite node : G.allGrids()) {
            GridContinuousProcessor proc = ((IgniteEx)node).context().continuous();
            CacheMvccBasicContinuousQueryTest.assertEquals((int)1, (int)((Map)U.field((Object)proc, (String)"locInfos")).size());
            CacheMvccBasicContinuousQueryTest.assertEquals((int)0, (int)((Map)U.field((Object)proc, (String)"rmtInfos")).size());
            CacheMvccBasicContinuousQueryTest.assertEquals((int)0, (int)((Map)U.field((Object)proc, (String)"startFuts")).size());
            CacheMvccBasicContinuousQueryTest.assertEquals((int)0, (int)((Map)U.field((Object)proc, (String)"stopFuts")).size());
            CacheMvccBasicContinuousQueryTest.assertEquals((int)0, (int)((Map)U.field((Object)proc, (String)"bufCheckThreads")).size());
            CacheContinuousQueryManager mgr = ((IgniteEx)node).context().cache().internalCache("default").context().continuousQueries();
            CacheMvccBasicContinuousQueryTest.assertEquals((int)0, (int)((Map)U.field((Object)mgr, (String)"lsnrs")).size());
            MvccCachingManager cachingMgr = ((IgniteEx)node).context().cache().context().mvccCaching();
            CacheMvccBasicContinuousQueryTest.assertEquals((int)0, (int)((Map)U.field((Object)cachingMgr, (String)"enlistCache")).size());
            CacheMvccBasicContinuousQueryTest.assertEquals((int)0, (int)((Map)U.field((Object)cachingMgr, (String)"cntrs")).size());
        }
        super.afterTest();
    }

    @Test
    public void testAllEntries() throws Exception {
        Ignite node = this.startGrids(3);
        IgniteCache cache = node.createCache(this.cacheConfiguration(this.cacheMode(), CacheWriteSynchronizationMode.FULL_SYNC, 1, 2).setCacheMode(CacheMode.REPLICATED).setIndexedTypes(new Class[]{Integer.class, Integer.class}));
        ContinuousQuery qry = new ContinuousQuery();
        final HashMap map = new HashMap();
        final CountDownLatch latch = new CountDownLatch(5);
        qry.setLocalListener((CacheEntryUpdatedListener)new CacheEntryUpdatedListener<Integer, Integer>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
                    Map map2 = map;
                    synchronized (map2) {
                        ArrayList<Object> vals = (ArrayList<Object>)map.get(e.getKey());
                        if (vals == null) {
                            vals = new ArrayList<Object>();
                            map.put(e.getKey(), vals);
                        }
                        vals.add(e.getValue());
                    }
                    latch.countDown();
                }
            }
        });
        try (QueryCursor ignored = cache.query((Query)qry);){
            String dml;
            try (Transaction tx = node.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);){
                dml = "INSERT INTO Integer (_key, _val) values (1,1),(2,2)";
                cache.query(new SqlFieldsQuery(dml)).getAll();
                tx.commit();
            }
            tx = node.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
            var9_11 = null;
            try {
                String dml1 = "MERGE INTO Integer (_key, _val) values (3,3)";
                cache.query(new SqlFieldsQuery(dml1)).getAll();
                String dml2 = "DELETE FROM Integer WHERE _key = 2";
                cache.query(new SqlFieldsQuery(dml2)).getAll();
                String dml3 = "UPDATE Integer SET _val = 10 WHERE _key = 1";
                cache.query(new SqlFieldsQuery(dml3)).getAll();
                tx.commit();
            }
            catch (Throwable dml1) {
                var9_11 = dml1;
                throw dml1;
            }
            finally {
                if (tx != null) {
                    if (var9_11 != null) {
                        try {
                            tx.close();
                        }
                        catch (Throwable dml1) {
                            var9_11.addSuppressed(dml1);
                        }
                    } else {
                        tx.close();
                    }
                }
            }
            tx = node.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
            var9_11 = null;
            try {
                dml = "INSERT INTO Integer (_key, _val) values (4,4),(5,5)";
                cache.query(new SqlFieldsQuery(dml)).getAll();
                tx.rollback();
            }
            catch (Throwable throwable) {
                var9_11 = throwable;
                throw throwable;
            }
            finally {
                if (tx != null) {
                    if (var9_11 != null) {
                        try {
                            tx.close();
                        }
                        catch (Throwable throwable) {
                            var9_11.addSuppressed(throwable);
                        }
                    } else {
                        tx.close();
                    }
                }
            }
            assert (latch.await(5000L, TimeUnit.MILLISECONDS));
            CacheMvccBasicContinuousQueryTest.assertEquals((int)3, (int)map.size());
            List vals = (List)map.get(1);
            CacheMvccBasicContinuousQueryTest.assertNotNull((Object)vals);
            CacheMvccBasicContinuousQueryTest.assertEquals((int)2, (int)vals.size());
            CacheMvccBasicContinuousQueryTest.assertEquals((int)1, (int)((Integer)vals.get(0)));
            CacheMvccBasicContinuousQueryTest.assertEquals((int)10, (int)((Integer)vals.get(1)));
            vals = (List)map.get(2);
            CacheMvccBasicContinuousQueryTest.assertNotNull((Object)vals);
            CacheMvccBasicContinuousQueryTest.assertEquals((int)2, (int)vals.size());
            CacheMvccBasicContinuousQueryTest.assertEquals((int)2, (int)((Integer)vals.get(0)));
            CacheMvccBasicContinuousQueryTest.assertEquals((int)2, (int)((Integer)vals.get(1)));
            vals = (List)map.get(3);
            CacheMvccBasicContinuousQueryTest.assertNotNull((Object)vals);
            CacheMvccBasicContinuousQueryTest.assertEquals((int)1, (int)vals.size());
            CacheMvccBasicContinuousQueryTest.assertEquals((int)3, (int)((Integer)vals.get(0)));
        }
    }

    @Test
    public void testCachingMaxSize() throws Exception {
        final Ignite node = this.startGrids(1);
        final IgniteCache cache = node.createCache(this.cacheConfiguration(this.cacheMode(), CacheWriteSynchronizationMode.FULL_SYNC, 1, 2).setCacheMode(CacheMode.PARTITIONED).setIndexedTypes(new Class[]{Integer.class, Integer.class}));
        final ContinuousQuery qry = new ContinuousQuery();
        qry.setLocalListener((CacheEntryUpdatedListener)new CacheEntryUpdatedListener<Integer, Integer>(){

            public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
            }
        });
        GridTestUtils.assertThrows((IgniteLogger)this.log, (Callable)new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                try (QueryCursor ignored = cache.query((Query)qry);
                     Transaction tx = node.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);){
                    for (int i = 0; i < MvccCachingManager.TX_SIZE_THRESHOLD + 1; ++i) {
                        cache.query(new SqlFieldsQuery("INSERT INTO Integer (_key, _val) values (" + i + ", 1)")).getAll();
                    }
                    tx.commit();
                }
                return null;
            }
        }, CacheException.class, (String)"Failed to run update. Transaction is too large. Consider reducing transaction size");
    }

    @Ignore(value="https://issues.apache.org/jira/browse/IGNITE-10768")
    @Test
    public void testUpdateCountersGapClosedSimplePartitioned() throws Exception {
        this.checkUpdateCountersGapIsProcessedSimple(CacheMode.PARTITIONED);
    }

    @Test
    public void testUpdateCountersGapClosedSimpleReplicated() throws Exception {
        this.checkUpdateCountersGapIsProcessedSimple(CacheMode.REPLICATED);
    }

    private void checkUpdateCountersGapIsProcessedSimple(CacheMode cacheMode) throws Exception {
        this.testSpi = true;
        final int srvCnt = 4;
        this.startGridsMultiThreaded(srvCnt);
        this.client = true;
        final IgniteEx nearNode = this.startGrid(srvCnt);
        IgniteCache cache = nearNode.createCache(this.cacheConfiguration(cacheMode, CacheWriteSynchronizationMode.FULL_SYNC, srvCnt - 1, srvCnt).setIndexedTypes(new Class[]{Integer.class, Integer.class}));
        IgniteEx primary = this.grid(0);
        List keys = this.primaryKeys(primary.cache("default"), 3);
        ContinuousQuery qry = new ContinuousQuery();
        final ArrayList arrivedEvts = new ArrayList();
        final CountDownLatch latch = new CountDownLatch(2);
        qry.setLocalListener((CacheEntryUpdatedListener)new CacheEntryUpdatedListener<Integer, Integer>(){

            public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
                    arrivedEvts.add(e);
                    latch.countDown();
                }
            }
        });
        QueryCursor cur = nearNode.cache("default").query((Query)qry);
        cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(new Object[]{keys.get(0)})).getAll();
        Transaction txA = nearNode.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi((Ignite)primary);
        spi.blockMessages((IgniteBiPredicate)new IgniteBiPredicate<ClusterNode, Message>(){
            private final AtomicInteger limiter = new AtomicInteger();

            public boolean apply(ClusterNode node, Message msg) {
                if (msg instanceof GridDhtTxPrepareRequest) {
                    return this.limiter.getAndIncrement() < srvCnt - 1;
                }
                return msg instanceof GridContinuousMessage;
            }
        });
        cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(new Object[]{keys.get(1)})).getAll();
        txA.commitAsync();
        GridTestUtils.waitForCondition((GridAbsPredicate)new GridAbsPredicate(){

            public boolean apply() {
                return nearNode.context().cache().context().tm().activeTransactions().stream().allMatch(tx -> tx.state() == TransactionState.PREPARING);
            }
        }, (long)3000L);
        GridTestUtils.runAsync(() -> {
            try (Transaction txB = nearNode.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);){
                cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(new Object[]{keys.get(2)}));
                txB.commit();
            }
        }).get();
        long primaryUpdCntr = this.getUpdateCounter(primary, (Integer)keys.get(0));
        CacheMvccBasicContinuousQueryTest.assertEquals((long)3L, (long)primaryUpdCntr);
        this.stopGrid(primary.name());
        GridTestUtils.waitForCondition((GridAbsPredicate)new GridAbsPredicate(){

            public boolean apply() {
                boolean allRolledBack = true;
                for (int i = 1; i < srvCnt; ++i) {
                    boolean rolledBack = CacheMvccBasicContinuousQueryTest.this.grid(i).context().cache().context().tm().activeTransactions().stream().allMatch(tx -> tx.state() == TransactionState.ROLLED_BACK);
                    allRolledBack &= rolledBack;
                }
                return allRolledBack;
            }
        }, (long)3000L);
        for (int i = 1; i < srvCnt; ++i) {
            IgniteCache backupCache = this.grid(i).cache("default");
            int size = backupCache.query(new SqlFieldsQuery("select * from Integer")).getAll().size();
            long backupCntr = this.getUpdateCounter(this.grid(i), (Integer)keys.get(0));
            CacheMvccBasicContinuousQueryTest.assertEquals((int)2, (int)size);
            CacheMvccBasicContinuousQueryTest.assertEquals((long)primaryUpdCntr, (long)backupCntr);
        }
        CacheMvccBasicContinuousQueryTest.assertTrue((boolean)latch.await(3L, TimeUnit.SECONDS));
        CacheMvccBasicContinuousQueryTest.assertEquals((int)2, (int)arrivedEvts.size());
        CacheMvccBasicContinuousQueryTest.assertEquals(keys.get(0), (Object)((CacheEntryEvent)arrivedEvts.get(0)).getKey());
        CacheMvccBasicContinuousQueryTest.assertEquals(keys.get(2), (Object)((CacheEntryEvent)arrivedEvts.get(1)).getKey());
        cur.close();
        nearNode.close();
    }

    @Ignore(value="https://issues.apache.org/jira/browse/IGNITE-10756")
    @Test
    public void testUpdateCountersGapClosedPartitioned() throws Exception {
        this.checkUpdateCountersGapsClosed(CacheMode.PARTITIONED);
    }

    @Test
    public void testUpdateCountersGapClosedReplicated() throws Exception {
        this.checkUpdateCountersGapsClosed(CacheMode.REPLICATED);
    }

    private void checkUpdateCountersGapsClosed(CacheMode cacheMode) throws Exception {
        this.testSpi = true;
        final int srvCnt = 4;
        this.startGridsMultiThreaded(srvCnt);
        IgniteEx nearNode = this.grid(srvCnt - 1);
        IgniteCache cache = nearNode.createCache(this.cacheConfiguration(cacheMode, CacheWriteSynchronizationMode.FULL_SYNC, srvCnt - 1, srvCnt).setIndexedTypes(new Class[]{Integer.class, Integer.class}));
        final IgniteEx primary = this.grid(0);
        Affinity aff = nearNode.affinity(cache.getName());
        int[] nearBackupParts = aff.backupPartitions(nearNode.localNode());
        int[] primaryParts = aff.primaryPartitions(primary.localNode());
        HashSet<Integer> nearSet = new HashSet<Integer>();
        for (int part : nearBackupParts) {
            nearSet.add(part);
        }
        HashSet<Integer> primarySet = new HashSet<Integer>();
        for (int part : primaryParts) {
            primarySet.add(part);
        }
        nearSet.retainAll(primarySet);
        List<Integer> keys = this.singlePartKeys((IgniteCache<Object, Object>)primary.cache("default"), 20, (Integer)nearSet.iterator().next());
        int range = 3;
        ContinuousQuery qry = new ContinuousQuery();
        final ArrayList arrivedEvts = new ArrayList();
        final CountDownLatch latch = new CountDownLatch(range * 2);
        qry.setLocalListener((CacheEntryUpdatedListener)new CacheEntryUpdatedListener<Integer, Integer>(){

            public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
                    arrivedEvts.add(e);
                    latch.countDown();
                }
            }
        });
        QueryCursor cur = nearNode.cache("default").query((Query)qry);
        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi((Ignite)primary);
        spi.blockMessages((IgniteBiPredicate)new IgniteBiPredicate<ClusterNode, Message>(){
            private final AtomicInteger limiter = new AtomicInteger();

            public boolean apply(ClusterNode node, Message msg) {
                if (msg instanceof GridDhtTxPrepareRequest) {
                    return this.limiter.getAndIncrement() < srvCnt - 1;
                }
                return false;
            }
        });
        Transaction txA = primary.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
        for (int i = 0; i < range; ++i) {
            primary.cache("default").put((Object)keys.get(i), (Object)2);
        }
        txA.commitAsync();
        GridTestUtils.runAsync(() -> {
            try (Transaction tx = primary.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);){
                for (int i = range; i < range * 2; ++i) {
                    primary.cache("default").put(keys.get(i), (Object)1);
                }
                tx.commit();
            }
        }).get();
        GridTestUtils.waitForCondition((GridAbsPredicate)new GridAbsPredicate(){

            public boolean apply() {
                return primary.context().cache().context().tm().activeTransactions().stream().allMatch(tx -> tx.state() == TransactionState.PREPARING);
            }
        }, (long)3000L);
        GridTestUtils.runAsync(() -> {
            try (Transaction txB = primary.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);){
                for (int i = range * 2; i < range * 3; ++i) {
                    primary.cache("default").put(keys.get(i), (Object)3);
                }
                txB.commit();
            }
        }).get();
        long primaryUpdCntr = this.getUpdateCounter(primary, keys.get(0));
        CacheMvccBasicContinuousQueryTest.assertEquals((long)(range * 3), (long)primaryUpdCntr);
        this.stopGrid(primary.name());
        GridTestUtils.waitForCondition((GridAbsPredicate)new GridAbsPredicate(){

            public boolean apply() {
                boolean allRolledBack = true;
                for (int i = 1; i < srvCnt; ++i) {
                    boolean rolledBack = CacheMvccBasicContinuousQueryTest.this.grid(i).context().cache().context().tm().activeTransactions().stream().allMatch(tx -> tx.state() == TransactionState.ROLLED_BACK);
                    allRolledBack &= rolledBack;
                }
                return allRolledBack;
            }
        }, (long)3000L);
        for (int i = 1; i < srvCnt; ++i) {
            IgniteCache backupCache = this.grid(i).cache("default");
            int size = backupCache.query(new SqlFieldsQuery("select * from Integer")).getAll().size();
            long backupCntr = this.getUpdateCounter(this.grid(i), keys.get(0));
            CacheMvccBasicContinuousQueryTest.assertEquals((int)(range * 2), (int)size);
            CacheMvccBasicContinuousQueryTest.assertEquals((long)primaryUpdCntr, (long)backupCntr);
        }
        CacheMvccBasicContinuousQueryTest.assertTrue((boolean)latch.await(5L, TimeUnit.SECONDS));
        CacheMvccBasicContinuousQueryTest.assertEquals((int)(range * 2), (int)arrivedEvts.size());
        cur.close();
    }

    private List<Integer> singlePartKeys(IgniteCache<Object, Object> primaryCache, int size, int part) throws Exception {
        Ignite ignite = (Ignite)primaryCache.unwrap(Ignite.class);
        ArrayList<Integer> res = new ArrayList<Integer>();
        final Affinity aff = ignite.affinity(primaryCache.getName());
        final ClusterNode node = ignite.cluster().localNode();
        CacheMvccBasicContinuousQueryTest.assertTrue((boolean)GridTestUtils.waitForCondition((GridAbsPredicate)new GridAbsPredicate(){

            public boolean apply() {
                return aff.primaryPartitions(node).length > 0;
            }
        }, (long)5000L));
        int cnt = 0;
        for (int key = 0; key < aff.partitions() * size * 10; ++key) {
            if (aff.partition((Object)key) != part) continue;
            res.add(key);
            if (++cnt == size) break;
        }
        CacheMvccBasicContinuousQueryTest.assertEquals((int)size, (int)res.size());
        return res;
    }

    private long getUpdateCounter(IgniteEx node, Integer key) {
        int partId = node.cachex("default").context().affinity().partition((Object)key);
        GridDhtLocalPartition part = node.cachex("default").context().dht().topology().localPartition(partId);
        assert (part != null);
        return part.updateCounter();
    }
}

