/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

public class CacheScanPartitionQueryFallbackSelfTest
extends GridCommonAbstractTest {
    private static final int GRID_CNT = 3;
    private static final int KEYS_CNT = 51200;
    private int backups;
    private CacheMode cacheMode;
    private volatile boolean clientMode;
    private static UUID expNodeId;
    private CommunicationSpiFactory commSpiFactory;
    private Map<Integer, Map<Integer, Integer>> entries = new HashMap<Integer, Map<Integer, Integer>>();
    private boolean syncRebalance;

    protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
        cfg.setClientMode(this.clientMode);
        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
        cfg.setCommunicationSpi((CommunicationSpi)this.commSpiFactory.create());
        CacheConfiguration ccfg = CacheScanPartitionQueryFallbackSelfTest.defaultCacheConfiguration();
        ccfg.setCacheMode(this.cacheMode);
        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
        ccfg.setBackups(this.backups);
        if (this.syncRebalance) {
            ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
        }
        ccfg.setNearConfiguration(null);
        cfg.setCacheConfiguration(new CacheConfiguration[]{ccfg});
        return cfg;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testScanLocal() throws Exception {
        this.cacheMode = CacheMode.PARTITIONED;
        this.backups = 0;
        this.commSpiFactory = new TestLocalCommunicationSpiFactory();
        try {
            IgniteEx ignite = this.startGrids(3);
            IgniteCacheProxy<Integer, Integer> cache = this.fillCache((Ignite)ignite);
            int part = CacheScanPartitionQueryFallbackSelfTest.anyLocalPartition(cache.context());
            QueryCursor qry = cache.query((Query)new ScanQuery().setPartition(Integer.valueOf(part)));
            this.doTestScanQuery((QueryCursor<Cache.Entry<Integer, Integer>>)qry, part);
        }
        finally {
            this.stopAllGrids();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testScanLocalExplicit() throws Exception {
        this.cacheMode = CacheMode.PARTITIONED;
        this.backups = 0;
        this.commSpiFactory = new TestLocalCommunicationSpiFactory();
        try {
            IgniteEx ignite = this.startGrids(3);
            IgniteCacheProxy<Integer, Integer> cache = this.fillCache((Ignite)ignite);
            int part = CacheScanPartitionQueryFallbackSelfTest.anyLocalPartition(cache.context());
            QueryCursor qry = cache.query((Query)new ScanQuery().setPartition(Integer.valueOf(part)).setLocal(true));
            this.doTestScanQuery((QueryCursor<Cache.Entry<Integer, Integer>>)qry, part);
            GridTestUtils.assertThrows((IgniteLogger)log, () -> {
                int remPart = (Integer)this.remotePartition(cache.context()).getKey();
                cache.query((Query)new ScanQuery().setPartition(Integer.valueOf(remPart)).setLocal(true));
                return null;
            }, IgniteCheckedException.class, null);
        }
        finally {
            this.stopAllGrids();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testScanLocalExplicitNoPart() throws Exception {
        this.cacheMode = CacheMode.PARTITIONED;
        this.backups = 0;
        this.commSpiFactory = new TestLocalCommunicationSpiFactory();
        try {
            IgniteEx ignite = this.startGrids(3);
            IgniteCacheProxy<Integer, Integer> cache = this.fillCache((Ignite)ignite);
            QueryCursor qry = cache.query((Query)new ScanQuery().setLocal(true));
            CacheScanPartitionQueryFallbackSelfTest.assertFalse((boolean)qry.getAll().isEmpty());
        }
        finally {
            this.stopAllGrids();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testScanRemote() throws Exception {
        this.cacheMode = CacheMode.PARTITIONED;
        this.backups = 0;
        this.commSpiFactory = new TestRemoteCommunicationSpiFactory();
        try {
            IgniteEx ignite = this.startGrids(3);
            IgniteCacheProxy<Integer, Integer> cache = this.fillCache((Ignite)ignite);
            IgniteBiTuple<Integer, UUID> tup = this.remotePartition(cache.context());
            int part = (Integer)tup.get1();
            expNodeId = (UUID)tup.get2();
            QueryCursor qry = cache.query((Query)new ScanQuery().setPartition(Integer.valueOf(part)));
            this.doTestScanQuery((QueryCursor<Cache.Entry<Integer, Integer>>)qry, part);
        }
        finally {
            this.stopAllGrids();
        }
    }

    @Test
    public void testScanFallbackOnRebalancing() throws Exception {
        this.scanFallbackOnRebalancing(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void scanFallbackOnRebalancing(final boolean cur) throws Exception {
        this.cacheMode = CacheMode.PARTITIONED;
        this.clientMode = false;
        this.backups = 2;
        this.commSpiFactory = new TestFallbackOnRebalancingCommunicationSpiFactory();
        this.syncRebalance = true;
        try {
            IgniteEx ignite = this.startGrids(3);
            this.fillCache((Ignite)ignite);
            final AtomicBoolean done = new AtomicBoolean(false);
            final AtomicInteger idx = new AtomicInteger(3);
            IgniteInternalFuture fut1 = this.multithreadedAsync(new Callable<Object>(){

                @Override
                public Object call() throws Exception {
                    int id = idx.getAndIncrement();
                    while (!done.get()) {
                        CacheScanPartitionQueryFallbackSelfTest.this.startGrid(id);
                        Thread.sleep(3000L);
                        CacheScanPartitionQueryFallbackSelfTest.this.info("Will stop grid: " + CacheScanPartitionQueryFallbackSelfTest.this.getTestIgniteInstanceName(id));
                        CacheScanPartitionQueryFallbackSelfTest.this.stopGrid(id);
                        if (done.get()) {
                            return null;
                        }
                        Thread.sleep(3000L);
                    }
                    return null;
                }
            }, 2);
            final AtomicInteger nodeIdx = new AtomicInteger();
            IgniteInternalFuture fut2 = this.multithreadedAsync(new Callable<Object>(){

                @Override
                public Object call() throws Exception {
                    int nodeId = nodeIdx.getAndIncrement();
                    IgniteCache cache = CacheScanPartitionQueryFallbackSelfTest.this.grid(nodeId).cache("default");
                    int cntr = 0;
                    while (!done.get()) {
                        int part = ThreadLocalRandom.current().nextInt(CacheScanPartitionQueryFallbackSelfTest.this.ignite(nodeId).affinity("default").partitions());
                        if (cntr++ % 100 == 0) {
                            CacheScanPartitionQueryFallbackSelfTest.this.info("Running query [node=" + nodeId + ", part=" + part + ']');
                        }
                        QueryCursor cur0 = cache.query((Query)new ScanQuery(part));
                        Throwable throwable = null;
                        try {
                            if (cur) {
                                CacheScanPartitionQueryFallbackSelfTest.this.doTestScanQueryCursor((QueryCursor<Cache.Entry<Integer, Integer>>)cur0, part);
                                continue;
                            }
                            CacheScanPartitionQueryFallbackSelfTest.this.doTestScanQuery((QueryCursor<Cache.Entry<Integer, Integer>>)cur0, part);
                        }
                        catch (Throwable throwable2) {
                            throwable = throwable2;
                            throw throwable2;
                        }
                        finally {
                            if (cur0 == null) continue;
                            if (throwable != null) {
                                try {
                                    cur0.close();
                                }
                                catch (Throwable throwable3) {
                                    throwable.addSuppressed(throwable3);
                                }
                                continue;
                            }
                            cur0.close();
                        }
                    }
                    return null;
                }
            }, 3);
            Thread.sleep(60000L);
            done.set(true);
            fut2.get();
            fut1.get();
        }
        finally {
            this.stopAllGrids();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testScanFallbackOnRebalancingCursor1() throws Exception {
        this.cacheMode = CacheMode.PARTITIONED;
        this.clientMode = false;
        this.backups = 1;
        this.commSpiFactory = new TestFallbackOnRebalancingCommunicationSpiFactory();
        try {
            IgniteEx ignite = this.startGrids(3);
            this.fillCache((Ignite)ignite);
            final AtomicBoolean done = new AtomicBoolean(false);
            IgniteInternalFuture fut1 = this.multithreadedAsync(new Callable<Object>(){

                @Override
                public Object call() throws Exception {
                    for (int i = 0; i < 5; ++i) {
                        CacheScanPartitionQueryFallbackSelfTest.this.startGrid(3 + i);
                        U.sleep((long)500L);
                    }
                    done.set(true);
                    return null;
                }
            }, 1);
            final AtomicInteger nodeIdx = new AtomicInteger();
            IgniteInternalFuture fut2 = this.multithreadedAsync(new Callable<Object>(){

                @Override
                public Object call() throws Exception {
                    int nodeId = nodeIdx.getAndIncrement();
                    IgniteCache cache = CacheScanPartitionQueryFallbackSelfTest.this.grid(nodeId).cache("default");
                    int cntr = 0;
                    while (!done.get()) {
                        int part = ThreadLocalRandom.current().nextInt(CacheScanPartitionQueryFallbackSelfTest.this.ignite(nodeId).affinity("default").partitions());
                        if (cntr++ % 100 == 0) {
                            CacheScanPartitionQueryFallbackSelfTest.this.info("Running query [node=" + nodeId + ", part=" + part + ']');
                        }
                        QueryCursor cur = cache.query((Query)new ScanQuery(part).setPageSize(5));
                        Throwable throwable = null;
                        try {
                            CacheScanPartitionQueryFallbackSelfTest.this.doTestScanQueryCursor((QueryCursor<Cache.Entry<Integer, Integer>>)cur, part);
                        }
                        catch (Throwable throwable2) {
                            throwable = throwable2;
                            throw throwable2;
                        }
                        finally {
                            if (cur == null) continue;
                            if (throwable != null) {
                                try {
                                    cur.close();
                                }
                                catch (Throwable throwable3) {
                                    throwable.addSuppressed(throwable3);
                                }
                                continue;
                            }
                            cur.close();
                        }
                    }
                    return null;
                }
            }, 3);
            fut1.get();
            fut2.get();
        }
        finally {
            this.stopAllGrids();
        }
    }

    @Test
    public void testScanFallbackOnRebalancingCursor2() throws Exception {
        this.scanFallbackOnRebalancing(true);
    }

    protected IgniteCacheProxy<Integer, Integer> fillCache(Ignite ignite) {
        IgniteCacheProxy cache = (IgniteCacheProxy)ignite.cache("default");
        for (int i = 0; i < 51200; ++i) {
            cache.put((Object)i, (Object)i);
            int part = cache.context().affinity().partition((Object)i);
            Map<Integer, Integer> partEntries = this.entries.get(part);
            if (partEntries == null) {
                partEntries = new HashMap<Integer, Integer>();
                this.entries.put(part, partEntries);
            }
            partEntries.put(i, i);
        }
        return cache;
    }

    protected void doTestScanQuery(QueryCursor<Cache.Entry<Integer, Integer>> qry, int part) {
        List qryEntries = qry.getAll();
        Map<Integer, Integer> map = this.entries.get(part);
        for (Cache.Entry e : qryEntries) {
            CacheScanPartitionQueryFallbackSelfTest.assertEquals((Object)map.get(e.getKey()), (Object)e.getValue());
        }
        CacheScanPartitionQueryFallbackSelfTest.assertEquals((String)("Invalid number of entries for partition: " + part), (int)map.size(), (int)qryEntries.size());
    }

    protected void doTestScanQueryCursor(QueryCursor<Cache.Entry<Integer, Integer>> cur, int part) {
        Map<Integer, Integer> map = this.entries.get(part);
        assert (map != null);
        int cnt = 0;
        for (Cache.Entry e : cur) {
            CacheScanPartitionQueryFallbackSelfTest.assertEquals((Object)map.get(e.getKey()), (Object)e.getValue());
            ++cnt;
        }
        CacheScanPartitionQueryFallbackSelfTest.assertEquals((String)("Invalid number of entries for partition: " + part), (int)map.size(), (int)cnt);
    }

    private static int anyLocalPartition(GridCacheContext<?, ?> cctx) {
        return ((GridDhtLocalPartition)F.first((List)cctx.topology().localPartitions())).id();
    }

    private IgniteBiTuple<Integer, UUID> remotePartition(GridCacheContext cctx) {
        ClusterNode node = (ClusterNode)F.first((Iterable)cctx.kernalContext().grid().cluster().forRemotes().nodes());
        GridCacheAffinityManager affMgr = cctx.affinity();
        AffinityTopologyVersion topVer = affMgr.affinityTopologyVersion();
        Set parts = affMgr.primaryPartitions(node.id(), topVer);
        return new IgniteBiTuple(F.first((Iterable)parts), (Object)node.id());
    }

    private Set<Integer> localPartitions(Ignite ignite) {
        GridCacheContext cctx = ((IgniteCacheProxy)ignite.cache("default")).context();
        Collection owningParts = F.view((Collection)cctx.topology().localPartitions(), (IgnitePredicate[])new IgnitePredicate[]{new IgnitePredicate<GridDhtLocalPartition>(){

            public boolean apply(GridDhtLocalPartition part) {
                return part.state() == GridDhtPartitionState.OWNING;
            }
        }});
        return new HashSet<Integer>(F.transform((Collection)owningParts, (IgniteClosure)new IgniteClosure<GridDhtLocalPartition, Integer>(){

            public Integer apply(GridDhtLocalPartition part) {
                return part.id();
            }
        }));
    }

    private static class TestFallbackOnRebalancingCommunicationSpiFactory
    implements CommunicationSpiFactory {
        private TestFallbackOnRebalancingCommunicationSpiFactory() {
        }

        @Override
        public TcpCommunicationSpi create() {
            return new TcpCommunicationSpi();
        }
    }

    private static class TestRemoteCommunicationSpiFactory
    implements CommunicationSpiFactory {
        private TestRemoteCommunicationSpiFactory() {
        }

        @Override
        public TcpCommunicationSpi create() {
            return new TcpCommunicationSpi(){

                public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
                    Message origMsg = ((GridIoMessage)msg).message();
                    if (origMsg instanceof GridCacheQueryRequest) {
                        CacheScanPartitionQueryFallbackSelfTest.assertEquals((Object)expNodeId, (Object)node.id());
                    }
                    super.sendMessage(node, msg, ackC);
                }
            };
        }
    }

    private static class TestLocalCommunicationSpiFactory
    implements CommunicationSpiFactory {
        private TestLocalCommunicationSpiFactory() {
        }

        @Override
        public TcpCommunicationSpi create() {
            return new TcpCommunicationSpi(){

                public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
                    Message origMsg = ((GridIoMessage)msg).message();
                    if (origMsg instanceof GridCacheQueryRequest) {
                        CacheScanPartitionQueryFallbackSelfTest.fail();
                    }
                    super.sendMessage(node, msg, ackC);
                }
            };
        }
    }

    private static interface CommunicationSpiFactory {
        public TcpCommunicationSpi create();
    }
}

