/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.index;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.index.DynamicEnableIndexingAbstractTest;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.transactions.TransactionSerializationException;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class DynamicEnableIndexingConcurrentSelfTest
extends DynamicEnableIndexingAbstractTest {
    private static final ConcurrentHashMap<UUID, T2<CountDownLatch, CountDownLatch>> BLOCKS = new ConcurrentHashMap();
    private static final String NAME_FIELD_IDX_NAME = "name_idx";
    private static final int LARGE_NUM_ENTRIES = 100000;
    @Parameterized.Parameter(value=0)
    public CacheMode cacheMode;
    @Parameterized.Parameter(value=1)
    public CacheAtomicityMode atomicityMode;

    @Parameterized.Parameters(name="cacheMode={0},atomicityMode={1}")
    public static Iterable<Object[]> params() {
        CacheMode[] cacheModes = new CacheMode[]{CacheMode.PARTITIONED, CacheMode.REPLICATED};
        CacheAtomicityMode[] atomicityModes = new CacheAtomicityMode[]{CacheAtomicityMode.ATOMIC, CacheAtomicityMode.TRANSACTIONAL, CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT};
        ArrayList<Object[]> res = new ArrayList<Object[]>();
        for (CacheMode cacheMode : cacheModes) {
            for (CacheAtomicityMode atomicityMode : atomicityModes) {
                res.add(new Object[]{cacheMode, atomicityMode});
            }
        }
        return res;
    }

    protected void beforeTest() throws Exception {
        super.beforeTest();
    }

    protected void afterTest() throws Exception {
        GridQueryProcessor.idxCls = null;
        for (T2<CountDownLatch, CountDownLatch> block : BLOCKS.values()) {
            ((CountDownLatch)block.get1()).countDown();
        }
        BLOCKS.clear();
        this.stopAllGrids();
        super.afterTest();
    }

    @Test
    @Ignore(value="https://ggsystems.atlassian.net/browse/GG-46342")
    public void testCoordinatorChange() throws Exception {
        IgniteEx srv1 = this.ignitionStart(this.serverConfiguration(1));
        this.ignitionStart(this.serverConfiguration(2));
        this.ignitionStart(this.serverConfiguration(3));
        this.ignitionStart(this.serverConfiguration(4));
        IgniteEx cli = this.ignitionStart(this.clientConfiguration(5));
        cli.cluster().state(ClusterState.ACTIVE);
        this.createCache(cli);
        this.loadData(cli, 0, 1000);
        UUID id1 = srv1.cluster().localNode().id();
        CountDownLatch idxLatch = DynamicEnableIndexingConcurrentSelfTest.blockIndexing(id1);
        IgniteInternalFuture<?> tblFut = this.enableIndexing(cli);
        idxLatch.await();
        Ignition.stop((String)srv1.name(), (boolean)true);
        DynamicEnableIndexingConcurrentSelfTest.unblockIndexing(id1);
        tblFut.get();
        for (Ignite g : G.allGrids()) {
            DynamicEnableIndexingConcurrentSelfTest.assertTrue((this.query(g, SELECT_ALL_QUERY).size() >= 750 ? 1 : 0) != 0);
            this.performQueryingIntegrityCheck(g);
            this.checkQueryParallelism((IgniteEx)g, this.cacheMode);
        }
    }

    @Test
    public void testClientReconnect() throws Exception {
        IgniteEx srv1 = this.ignitionStart(this.serverConfiguration(1));
        this.ignitionStart(this.serverConfiguration(2));
        this.ignitionStart(this.serverConfiguration(3));
        this.ignitionStart(this.serverConfiguration(4));
        IgniteEx cli = this.ignitionStart(this.clientConfiguration(5));
        cli.cluster().state(ClusterState.ACTIVE);
        this.createCache(cli);
        this.loadData(cli, 0, 1000);
        IgniteClientReconnectAbstractTest.reconnectClientNode((IgniteLogger)log, (Ignite)cli, (Ignite)srv1, () -> {
            try {
                this.enableIndexing(srv1).get();
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException("Failed to enable indexing", (Throwable)e);
            }
        });
        DynamicEnableIndexingConcurrentSelfTest.assertEquals((int)1000, (int)this.query((Ignite)cli, SELECT_ALL_QUERY).size());
        for (Ignite g : G.allGrids()) {
            DynamicEnableIndexingConcurrentSelfTest.assertEquals((int)1000, (int)this.query(g, SELECT_ALL_QUERY).size());
            this.performQueryingIntegrityCheck(g);
            this.checkQueryParallelism((IgniteEx)g, this.cacheMode);
        }
    }

    @Test
    public void testNodeJoinOnPendingOperation() throws Exception {
        CountDownLatch finishLatch = new CountDownLatch(3);
        IgniteEx srv1 = this.ignitionStart(this.serverConfiguration(1), finishLatch);
        srv1.cluster().state(ClusterState.ACTIVE);
        this.createCache(srv1);
        this.loadData(srv1, 0, 1000);
        CountDownLatch idxLatch = DynamicEnableIndexingConcurrentSelfTest.blockIndexing((Ignite)srv1);
        IgniteInternalFuture<?> tblFut = this.enableIndexing(srv1);
        U.await((CountDownLatch)idxLatch);
        this.ignitionStart(this.serverConfiguration(2), finishLatch);
        this.ignitionStart(this.serverConfiguration(3), finishLatch);
        this.awaitPartitionMapExchange();
        DynamicEnableIndexingConcurrentSelfTest.assertFalse((boolean)tblFut.isDone());
        DynamicEnableIndexingConcurrentSelfTest.unblockIndexing((Ignite)srv1);
        tblFut.get();
        U.await((CountDownLatch)finishLatch);
        for (Ignite g : G.allGrids()) {
            DynamicEnableIndexingConcurrentSelfTest.assertEquals((int)1000, (int)this.query(g, SELECT_ALL_QUERY).size());
            this.performQueryingIntegrityCheck(g);
            this.checkQueryParallelism((IgniteEx)g, this.cacheMode);
        }
    }

    @Test
    public void testOperationChaining() throws Exception {
        IgniteEx srv1 = this.ignitionStart(this.serverConfiguration(1));
        this.ignitionStart(this.serverConfiguration(2));
        this.ignitionStart(this.serverConfiguration(3, true));
        this.ignitionStart(this.clientConfiguration(4));
        srv1.cluster().state(ClusterState.ACTIVE);
        this.createCache(srv1);
        this.loadData(srv1, 0, 1000);
        CountDownLatch idxLatch = DynamicEnableIndexingConcurrentSelfTest.blockIndexing((Ignite)srv1);
        IgniteInternalFuture<?> tblFut = this.enableIndexing(srv1);
        QueryIndex idx = new QueryIndex();
        idx.setName(NAME_FIELD_IDX_NAME.toUpperCase());
        idx.setFieldNames(Collections.singletonList("name".toUpperCase()), true);
        IgniteInternalFuture idxFut1 = srv1.context().query().dynamicIndexCreate("poi", "DOMAIN", "POI", idx, false, 0);
        idxLatch.await();
        this.ignitionStart(this.serverConfiguration(5));
        this.ignitionStart(this.serverConfiguration(6, true));
        this.ignitionStart(this.clientConfiguration(7));
        DynamicEnableIndexingConcurrentSelfTest.assertFalse((boolean)tblFut.isDone());
        DynamicEnableIndexingConcurrentSelfTest.assertFalse((boolean)idxFut1.isDone());
        DynamicEnableIndexingConcurrentSelfTest.unblockIndexing((Ignite)srv1);
        idxFut1.get();
        for (Ignite g : G.allGrids()) {
            DynamicEnableIndexingConcurrentSelfTest.assertEquals((int)1000, (int)this.query(g, SELECT_ALL_QUERY).size());
            this.performQueryingIntegrityCheck(g);
            this.checkQueryParallelism((IgniteEx)g, this.cacheMode);
            IgniteCache cache = g.cache("poi");
            this.assertIndexUsed(cache, "SELECT * FROM POI WHERE name = 'POI_100'", NAME_FIELD_IDX_NAME);
            List res = cache.query(new SqlFieldsQuery("SELECT id FROM POI WHERE name = 'POI_100'").setSchema("DOMAIN")).getAll();
            DynamicEnableIndexingConcurrentSelfTest.assertEquals((int)1, (int)res.size());
            DynamicEnableIndexingConcurrentSelfTest.assertEquals((Object)100, ((List)res.get(0)).get(0));
        }
    }

    @Test
    @Ignore(value="https://ggsystems.atlassian.net/browse/GG-46217")
    public void testConcurrentRebalance() throws Exception {
        IgniteEx srv1 = this.ignitionStart(this.serverConfiguration(1));
        IgniteEx srv2 = this.ignitionStart(this.serverConfiguration(2));
        srv1.cluster().state(ClusterState.ACTIVE);
        this.createCache(srv1);
        this.loadData(srv1, 0, 100000);
        CountDownLatch idxLatch1 = DynamicEnableIndexingConcurrentSelfTest.blockIndexing((Ignite)srv1);
        CountDownLatch idxLatch2 = DynamicEnableIndexingConcurrentSelfTest.blockIndexing((Ignite)srv2);
        IgniteInternalFuture<?> tblFut = this.enableIndexing(srv1);
        U.await((CountDownLatch)idxLatch1);
        U.await((CountDownLatch)idxLatch2);
        this.ignitionStart(this.serverConfiguration(3));
        DynamicEnableIndexingConcurrentSelfTest.unblockIndexing((Ignite)srv1);
        DynamicEnableIndexingConcurrentSelfTest.unblockIndexing((Ignite)srv2);
        this.ignitionStart(this.serverConfiguration(4));
        this.awaitPartitionMapExchange();
        tblFut.get();
        for (Ignite g : G.allGrids()) {
            DynamicEnableIndexingConcurrentSelfTest.assertEquals((int)100000, (int)this.query(g, SELECT_ALL_QUERY).size());
            this.performQueryingIntegrityCheck(g);
            this.checkQueryParallelism((IgniteEx)g, this.cacheMode);
        }
    }

    @Test
    public void testConcurrentPutRemove() throws Exception {
        CountDownLatch finishLatch = new CountDownLatch(4);
        IgniteEx srv1 = this.ignitionStart(this.serverConfiguration(1), finishLatch);
        this.ignitionStart(this.serverConfiguration(2), finishLatch);
        this.ignitionStart(this.serverConfiguration(3), finishLatch);
        this.ignitionStart(this.serverConfiguration(4), finishLatch);
        srv1.cluster().state(ClusterState.ACTIVE);
        this.createCache(srv1);
        this.loadData(srv1, 0, 100000);
        AtomicBoolean stopped = new AtomicBoolean();
        CountDownLatch iterations = new CountDownLatch(1000);
        IgniteInternalFuture task = this.multithreadedAsync(() -> {
            while (!stopped.get()) {
                IgniteEx node = this.grid(ThreadLocalRandom.current().nextInt(1, 5));
                ThreadLocalRandom rnd = ThreadLocalRandom.current();
                int i = rnd.nextInt(0, 100000);
                BinaryObject val = node.binary().builder("PointOfInterest").setField("name", (Object)("POI_" + i), String.class).setField("latitude", (Object)rnd.nextDouble(), Double.class).setField("longitude", (Object)rnd.nextDouble(), Double.class).build();
                IgniteCache cache = node.cache("poi").withKeepBinary();
                try {
                    if (ThreadLocalRandom.current().nextBoolean()) {
                        cache.put((Object)i, (Object)val);
                        continue;
                    }
                    cache.remove((Object)i);
                }
                catch (CacheException e) {
                    if (X.hasCause((Throwable)e, (Class[])new Class[]{TransactionSerializationException.class})) continue;
                    throw e;
                }
                finally {
                    iterations.countDown();
                }
            }
            return null;
        }, 4);
        iterations.await(2L, TimeUnit.SECONDS);
        this.enableIndexing(srv1).get();
        stopped.set(true);
        task.get();
        finishLatch.await();
        IgniteCache cache = srv1.cache("poi").withKeepBinary();
        this.query((Ignite)srv1, SELECT_ALL_QUERY).forEach(res -> {
            BinaryObject val = (BinaryObject)cache.get(res.get(0));
            DynamicEnableIndexingConcurrentSelfTest.assertNotNull((Object)val);
            DynamicEnableIndexingConcurrentSelfTest.assertEquals((Object)val.field("name"), res.get(1));
            DynamicEnableIndexingConcurrentSelfTest.assertEquals((Object)val.field("latitude"), res.get(2));
            DynamicEnableIndexingConcurrentSelfTest.assertEquals((Object)val.field("longitude"), res.get(3));
        });
    }

    @Test
    @Ignore(value="https://ggsystems.atlassian.net/browse/GG-46252")
    public void testConcurrentEnableIndexing() throws Exception {
        IgniteEx srv1 = this.ignitionStart(this.serverConfiguration(1));
        this.ignitionStart(this.serverConfiguration(2));
        this.ignitionStart(this.clientConfiguration(3));
        this.ignitionStart(this.clientConfiguration(4));
        srv1.cluster().state(ClusterState.ACTIVE);
        this.createCache(srv1);
        this.loadData(srv1, 0, 100000);
        AtomicBoolean stopped = new AtomicBoolean();
        AtomicInteger success = new AtomicInteger();
        CountDownLatch iterations = new CountDownLatch(1000);
        IgniteInternalFuture task = this.multithreadedAsync(() -> {
            while (!stopped.get()) {
                IgniteEx node = this.grid(ThreadLocalRandom.current().nextInt(1, 4));
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                this.enableIndexing(node).chain((IgniteClosure & Serializable)fut -> {
                    try {
                        fut.get();
                        success.incrementAndGet();
                    }
                    catch (IgniteCheckedException e) {
                        DynamicEnableIndexingConcurrentSelfTest.assertTrue((boolean)e.hasCause(new Class[]{SchemaOperationException.class}));
                        SchemaOperationException opEx = (SchemaOperationException)e.getCause(SchemaOperationException.class);
                        DynamicEnableIndexingConcurrentSelfTest.assertEquals((int)8, (int)opEx.code());
                        DynamicEnableIndexingConcurrentSelfTest.assertEquals((String)"Cache is already indexed: poi", (String)opEx.getMessage());
                    }
                    return null;
                });
                iterations.countDown();
            }
            return null;
        }, 4);
        iterations.await(2L, TimeUnit.SECONDS);
        this.ignitionStart(this.serverConfiguration(5));
        this.ignitionStart(this.serverConfiguration(6));
        stopped.set(true);
        task.get();
        DynamicEnableIndexingConcurrentSelfTest.assertEquals((int)1, (int)success.get());
        this.awaitPartitionMapExchange();
        for (Ignite g : G.allGrids()) {
            DynamicEnableIndexingConcurrentSelfTest.assertEquals((int)100000, (int)this.query(g, SELECT_ALL_QUERY).size());
            this.performQueryingIntegrityCheck(g);
            this.checkQueryParallelism((IgniteEx)g, this.cacheMode);
        }
    }

    private IgniteInternalFuture<?> enableIndexing(IgniteEx node) {
        Integer parallelism = this.cacheMode == CacheMode.PARTITIONED ? Integer.valueOf(4) : null;
        return node.context().query().dynamicAddQueryEntity("poi", "DOMAIN", this.queryEntity(), parallelism, false);
    }

    private QueryEntity queryEntity() {
        LinkedHashMap<String, String> fields = new LinkedHashMap<String, String>();
        fields.put("id", Integer.class.getName());
        fields.put("name", String.class.getName());
        fields.put("latitude", Double.class.getName());
        fields.put("longitude", Double.class.getName());
        return new QueryEntity().setKeyType(Integer.class.getName()).setKeyFieldName("id").setValueType("PointOfInterest").setTableName("POI").setFields(fields);
    }

    private void createCache(IgniteEx node) throws Exception {
        CacheConfiguration<?, ?> ccfg = this.testCacheConfiguration("poi", this.cacheMode, this.atomicityMode);
        node.context().cache().dynamicStartCache(ccfg, "poi", null, true, true, true).get();
    }

    private static void awaitIndexing(UUID nodeId) {
        T2<CountDownLatch, CountDownLatch> blocker = BLOCKS.get(nodeId);
        if (blocker != null) {
            ((CountDownLatch)blocker.get2()).countDown();
            while (true) {
                try {
                    ((CountDownLatch)blocker.get1()).await();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    continue;
                }
                break;
            }
        }
    }

    private static CountDownLatch blockIndexing(Ignite node) {
        UUID nodeId = ((IgniteEx)node).localNode().id();
        return DynamicEnableIndexingConcurrentSelfTest.blockIndexing(nodeId);
    }

    private static CountDownLatch blockIndexing(UUID nodeId) {
        DynamicEnableIndexingConcurrentSelfTest.assertFalse((boolean)BLOCKS.containsKey(nodeId));
        CountDownLatch idxLatch = new CountDownLatch(1);
        BLOCKS.put(nodeId, (T2<CountDownLatch, CountDownLatch>)new T2((Object)new CountDownLatch(1), (Object)idxLatch));
        return idxLatch;
    }

    private static void unblockIndexing(Ignite node) {
        UUID nodeId = ((IgniteEx)node).localNode().id();
        DynamicEnableIndexingConcurrentSelfTest.unblockIndexing(nodeId);
    }

    private static void unblockIndexing(UUID nodeId) {
        T2<CountDownLatch, CountDownLatch> blocker = BLOCKS.remove(nodeId);
        DynamicEnableIndexingConcurrentSelfTest.assertNotNull(blocker);
        ((CountDownLatch)blocker.get1()).countDown();
    }

    private IgniteEx ignitionStart(IgniteConfiguration cfg) throws Exception {
        return this.ignitionStart(cfg, null);
    }

    private IgniteEx ignitionStart(IgniteConfiguration cfg, final CountDownLatch latch) throws Exception {
        GridQueryProcessor.idxCls = BlockingIndexing.class;
        IgniteEx node = this.startGrid(cfg);
        if (latch != null) {
            node.context().discovery().setCustomEventListener(SchemaFinishDiscoveryMessage.class, (CustomEventListener)new CustomEventListener<SchemaFinishDiscoveryMessage>(){

                public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, SchemaFinishDiscoveryMessage msg) {
                    latch.countDown();
                }
            });
        }
        return node;
    }

    private static class BlockingIndexing
    extends IgniteH2Indexing {
        private BlockingIndexing() {
        }

        public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx, boolean force) {
            DynamicEnableIndexingConcurrentSelfTest.awaitIndexing(this.ctx.localNodeId());
            return super.rebuildIndexesFromHash(cctx, force);
        }
    }
}

