package org.apache.ignite.internal.processors.cache.query.continuous;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import javax.cache.configuration.Factory;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListener;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

/* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.class */
public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonAbstractTest {
    private static final int SERVER_GRIDS_COUNT = 6;
    public static final int KEYS = 2000;
    private static final ConcurrentMap<String, AtomicInteger> opCounts = new ConcurrentHashMap();
    private static boolean client = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest$EntryEventFilterFactory.class */
    public static final class EntryEventFilterFactory implements Factory<CacheEntryEventFilter> {

        @IgniteInstanceResource
        private Ignite ignite;
        private final int idx;

        private EntryEventFilterFactory(int i) {
            this.idx = i;
        }

        /* renamed from: create, reason: merged with bridge method [inline-methods] */
        public CacheEntryEventFilter m1160create() {
            return new CacheEntryEventFilter() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryMultiNodesFilteringTest.EntryEventFilterFactory.1
                public boolean evaluate(CacheEntryEvent cacheEntryEvent) throws CacheEntryListenerException {
                    GridCacheContinuousQueryMultiNodesFilteringTest.assertTrue(((Integer) EntryEventFilterFactory.this.ignite.cluster().localNode().attributes().get("idx")).intValue() % 2 == EntryEventFilterFactory.this.idx % 2);
                    return true;
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest$ListenerConfiguration.class */
    public static final class ListenerConfiguration extends MutableCacheEntryListenerConfiguration {

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest$ListenerConfiguration$Op.class */
        public enum Op {
            INSERT,
            UPDATE,
            REMOVE
        }

        ListenerConfiguration(final String str, final Op op) {
            super(new Factory<CacheEntryListener>() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryMultiNodesFilteringTest.ListenerConfiguration.1
                /* renamed from: create, reason: merged with bridge method [inline-methods] */
                public CacheEntryListener m1161create() {
                    switch (Op.this) {
                        case INSERT:
                            return new CacheEntryCreatedListener() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryMultiNodesFilteringTest.ListenerConfiguration.1.1
                                public void onCreated(Iterable iterable) {
                                    for (Object obj : iterable) {
                                        ((AtomicInteger) GridCacheContinuousQueryMultiNodesFilteringTest.opCounts.get(str + "_ins")).getAndIncrement();
                                    }
                                }
                            };
                        case UPDATE:
                            return new CacheEntryUpdatedListener() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryMultiNodesFilteringTest.ListenerConfiguration.1.2
                                public void onUpdated(Iterable iterable) {
                                    for (Object obj : iterable) {
                                        ((AtomicInteger) GridCacheContinuousQueryMultiNodesFilteringTest.opCounts.get(str + "_upd")).getAndIncrement();
                                    }
                                }
                            };
                        case REMOVE:
                            return new CacheEntryRemovedListener() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryMultiNodesFilteringTest.ListenerConfiguration.1.3
                                public void onRemoved(Iterable iterable) {
                                    for (Object obj : iterable) {
                                        ((AtomicInteger) GridCacheContinuousQueryMultiNodesFilteringTest.opCounts.get(str + "_rmv")).getAndIncrement();
                                    }
                                }
                            };
                        default:
                            throw new IgniteException(new IllegalArgumentException());
                    }
                }
            }, (Factory) null, true, false);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest$NodeFilter.class */
    public static final class NodeFilter implements IgnitePredicate<ClusterNode> {
        private final int idx;

        private NodeFilter(int i) {
            this.idx = i;
        }

        public boolean apply(ClusterNode clusterNode) {
            return ((Integer) clusterNode.attributes().get("idx")).intValue() % 2 == this.idx % 2;
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest$NodeFilterByRegexp.class */
    private static final class NodeFilterByRegexp implements IgnitePredicate<ClusterNode> {
        private final Pattern pattern;

        private NodeFilterByRegexp(String str) {
            this.pattern = Pattern.compile(str);
        }

        public boolean apply(ClusterNode clusterNode) {
            return this.pattern.matcher(clusterNode.consistentId().toString()).matches();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public void afterTest() throws Exception {
        stopAllGrids();
        client = false;
        super.afterTest();
    }

    @Test
    public void testFiltersAndListeners() throws Exception {
        for (int i = 1; i <= SERVER_GRIDS_COUNT; i++) {
            startGrid(i, false);
        }
        startGrid(7, true);
        for (int i2 = 1; i2 <= 7; i2++) {
            for (int i3 = 0; i3 < i2; i3++) {
                jcache(i2, "part" + i2).put("k" + i3, "v0");
                jcache(i2, "repl" + i2).put("k" + i3, "v0");
                jcache(i2, "part" + i2).put("k" + i3, "v1");
                jcache(i2, "repl" + i2).put("k" + i3, "v1");
                jcache(i2, "part" + i2).remove("k" + i3);
                jcache(i2, "repl" + i2).remove("k" + i3);
            }
        }
        for (int i4 = 1; i4 <= 7; i4++) {
            final int i5 = i4 * 3 * 2;
            final int i6 = i4;
            GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryMultiNodesFilteringTest.1
                public boolean apply() {
                    return ((AtomicInteger) GridCacheContinuousQueryMultiNodesFilteringTest.opCounts.get(new StringBuilder().append("qry").append(i6).append("_total").toString())).get() == i5;
                }
            }, 5000L);
            int i7 = opCounts.get("part" + i4 + "_ins").get();
            int i8 = opCounts.get("repl" + i4 + "_ins").get();
            int i9 = opCounts.get("part" + i4 + "_upd").get();
            int i10 = opCounts.get("repl" + i4 + "_upd").get();
            int i11 = opCounts.get("part" + i4 + "_rmv").get();
            int i12 = opCounts.get("repl" + i4 + "_rmv").get();
            int i13 = opCounts.get("qry" + i4 + "_total").get();
            assertEquals(i4, i7);
            assertEquals(i4, i8);
            assertEquals(i4, i9);
            assertEquals(i4, i10);
            assertEquals(i4, i11);
            assertEquals(i4, i12);
            assertEquals(i5, i13);
            assertEquals(i13, i7 + i8 + i9 + i10 + i11 + i12);
        }
    }

    @Test
    public void testWithNodeFilter() throws Exception {
        ArrayList arrayList = new ArrayList();
        startGridsMultiThreaded(3);
        awaitPartitionMapExchange();
        CacheConfiguration cacheConfiguration = cacheConfiguration(new NodeFilterByRegexp(".*(0|1)$"));
        grid(0).createCache(cacheConfiguration);
        final AtomicInteger atomicInteger = new AtomicInteger();
        final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CacheEntryUpdatedListener<Integer, Integer> cacheEntryUpdatedListener = new CacheEntryUpdatedListener<Integer, Integer>() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryMultiNodesFilteringTest.2
            public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> iterable) throws CacheEntryListenerException {
                for (CacheEntryEvent<? extends Integer, ? extends Integer> cacheEntryEvent : iterable) {
                    atomicInteger.incrementAndGet();
                    ClusterNode localNode = ((Ignite) cacheEntryEvent.getSource().unwrap(Ignite.class)).cluster().localNode();
                    Set set = (Set) concurrentHashMap.get(localNode);
                    if (set == null) {
                        ConcurrentSkipListSet concurrentSkipListSet = new ConcurrentSkipListSet();
                        Set set2 = (Set) concurrentHashMap.putIfAbsent(localNode, concurrentSkipListSet);
                        set = set2 != null ? set2 : concurrentSkipListSet;
                    }
                    if (!set.add(cacheEntryEvent.getValue())) {
                        atomicBoolean.set(false);
                    }
                }
            }
        };
        for (int i = 0; i < 3; i++) {
            ContinuousQuery continuousQuery = new ContinuousQuery();
            continuousQuery.setLocalListener(cacheEntryUpdatedListener);
            IgniteEx grid = grid(i);
            log.info("Try to start CQ on node: " + grid.cluster().localNode().id());
            arrayList.add(grid.cache(cacheConfiguration.getName()).query(continuousQuery));
            log.info("CQ started on node: " + grid.cluster().localNode().id());
        }
        client = true;
        startGrid(3).cluster().active(true);
        awaitPartitionMapExchange();
        ContinuousQuery continuousQuery2 = new ContinuousQuery();
        continuousQuery2.setLocalListener(cacheEntryUpdatedListener);
        arrayList.add(grid(3).cache(cacheConfiguration.getName()).query(continuousQuery2));
        for (int i2 = 0; i2 <= 3; i2++) {
            for (int i3 = 0; i3 < 2000; i3++) {
                int i4 = (i2 * 2000) + i3;
                grid(i2).cache(cacheConfiguration.getName()).put(Integer.valueOf(i4), Integer.valueOf(i4));
            }
        }
        assertTrue(GridTestUtils.waitForCondition(new PA() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryMultiNodesFilteringTest.3
            public boolean apply() {
                return atomicInteger.get() >= 16000;
            }
        }, 5000L));
        assertFalse("Got duplicate", atomicBoolean.get());
        for (int i5 = 0; i5 < 8000; i5++) {
            for (Map.Entry entry : concurrentHashMap.entrySet()) {
                assertTrue("Lost event on node: " + ((ClusterNode) entry.getKey()).id() + ", event: " + i5, ((Set) entry.getValue()).remove(Integer.valueOf(i5)));
            }
        }
        for (Map.Entry entry2 : concurrentHashMap.entrySet()) {
            assertTrue("Unexpected event on node: " + entry2.getKey(), ((Set) entry2.getValue()).isEmpty());
        }
        assertEquals("Not expected count of CQ", 4, arrayList.size());
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((QueryCursor) it.next()).close();
        }
    }

    private Ignite startGrid(final int i, boolean z) throws Exception {
        String testIgniteInstanceName = getTestIgniteInstanceName(i);
        IgniteConfiguration clientMode = optimize(getConfiguration(testIgniteInstanceName)).setClientMode(z);
        clientMode.setUserAttributes(Collections.singletonMap("idx", Integer.valueOf(i)));
        Ignite startGrid = startGrid(testIgniteInstanceName, clientMode);
        NodeFilter nodeFilter = new NodeFilter(i);
        String str = "part" + i;
        IgniteCache createCache = startGrid.createCache(defaultCacheConfiguration().setName("part" + i).setCacheMode(CacheMode.PARTITIONED).setBackups(1).setNodeFilter(nodeFilter));
        opCounts.put(str + "_ins", new AtomicInteger());
        opCounts.put(str + "_upd", new AtomicInteger());
        opCounts.put(str + "_rmv", new AtomicInteger());
        createCache.registerCacheEntryListener(new ListenerConfiguration(str, ListenerConfiguration.Op.INSERT));
        createCache.registerCacheEntryListener(new ListenerConfiguration(str, ListenerConfiguration.Op.UPDATE));
        createCache.registerCacheEntryListener(new ListenerConfiguration(str, ListenerConfiguration.Op.REMOVE));
        String str2 = "repl" + i;
        IgniteCache createCache2 = startGrid.createCache(defaultCacheConfiguration().setName("repl" + i).setCacheMode(CacheMode.REPLICATED).setNodeFilter(nodeFilter));
        opCounts.put(str2 + "_ins", new AtomicInteger());
        opCounts.put(str2 + "_upd", new AtomicInteger());
        opCounts.put(str2 + "_rmv", new AtomicInteger());
        createCache2.registerCacheEntryListener(new ListenerConfiguration(str2, ListenerConfiguration.Op.INSERT));
        createCache2.registerCacheEntryListener(new ListenerConfiguration(str2, ListenerConfiguration.Op.UPDATE));
        createCache2.registerCacheEntryListener(new ListenerConfiguration(str2, ListenerConfiguration.Op.REMOVE));
        opCounts.put("qry" + i + "_total", new AtomicInteger());
        ContinuousQuery continuousQuery = new ContinuousQuery();
        continuousQuery.setRemoteFilterFactory(new EntryEventFilterFactory(i));
        continuousQuery.setLocalListener(new CacheEntryUpdatedListener() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryMultiNodesFilteringTest.4
            public void onUpdated(Iterable iterable) {
                ((AtomicInteger) GridCacheContinuousQueryMultiNodesFilteringTest.opCounts.get("qry" + i + "_total")).incrementAndGet();
            }
        });
        createCache.query(continuousQuery);
        createCache2.query(continuousQuery);
        return startGrid;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public IgniteConfiguration getConfiguration(String str) throws Exception {
        IgniteConfiguration configuration = super.getConfiguration(str);
        configuration.setConsistentId(str);
        configuration.setClientMode(client);
        return configuration;
    }

    private CacheConfiguration cacheConfiguration(NodeFilterByRegexp nodeFilterByRegexp) {
        return new CacheConfiguration("test-cache-cq").setBackups(1).setNodeFilter(nodeFilterByRegexp).setAtomicityMode(atomicityMode()).setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC).setCacheMode(CacheMode.PARTITIONED);
    }

    protected CacheAtomicityMode atomicityMode() {
        return CacheAtomicityMode.ATOMIC;
    }
}
