package org.apache.ignite.internal.processors.cache.query.continuous;

import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.AbstractContinuousQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

/* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryBufferCleanupAbstractTest.class */
/**
 * Base class for tests that check how many continuous query entries remain buffered on server
 * nodes after a burst of cache updates.
 *
 * <p>Each test starts a small cluster, registers the query supplied by
 * {@link #getContinuousQuery()}, puts {@link #RECORDS_CNT} entries into a cache, waits for a fixed
 * period and then inspects the internal event buffers of every server node through reflection.
 * The test fails if the number of non-null buffered entries on a node is not below
 * {@code ACK_THRESHOLD + (1 + backups) * PARTS_CNT}.
 */
public abstract class ContinuousQueryBufferCleanupAbstractTest extends GridCommonAbstractTest {
    /** Number of entries put into the cache by each test. */
    private static final int RECORDS_CNT = 10000;

    /** Value assigned to {@link #ACK_THRESHOLD_PROP}; also the constant part of the leftover bound. */
    private static final int ACK_THRESHOLD = 100;

    /** Partition count of the test cache; also used as the per-copy slack in the leftover bound. */
    private static final int PARTS_CNT = 32;

    /** Pause between the last put and the buffer inspection, in milliseconds. */
    private static final long CLEANUP_WAIT_MS = 2000L;

    /**
     * System property overridden for the duration of each test.
     * NOTE(review): the code that reads this property is outside this file - confirm when it is read.
     */
    private static final String ACK_THRESHOLD_PROP = "IGNITE_CONTINUOUS_QUERY_ACK_THRESHOLD";

    /** Name of the cache created by each test. */
    private static final String CACHE_NAME = "testCache";

    /** Binary name of the nested class whose {@code hnd} field holds a remote routine's handler. */
    private static final String REMOTE_ROUTINE_INFO_CLASS_NAME = "org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$RemoteRoutineInfo";

    /** Binary name of the nested class whose {@code hnd} field holds a local routine's handler. */
    private static final String LOCAL_ROUTINE_INFO_CLASS_NAME = "org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$LocalRoutineInfo";

    /** Binary name of the nested batch class whose {@code entries} array is counted. */
    private static final String BATCH_CLASS_NAME = "org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer$Batch";

    /**
     * Stops every grid started by the test.
     *
     * @throws Exception If stopping the grids fails.
     */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public void afterTest() throws Exception {
        stopAllGrids();
    }

    /**
     * One server, no backups, query registered from a client node.
     *
     * @throws Exception If failed.
     */
    @Test
    public void checkCacheWithSingleNode() throws Exception {
        checkBuffer(1, 0, true);
    }

    /**
     * Two servers, no backups, query registered from a client node.
     *
     * @throws Exception If failed.
     */
    @Test
    public void checkCacheWithMultipleNodes() throws Exception {
        checkBuffer(2, 0, true);
    }

    /**
     * Two servers, one backup, query registered from a client node.
     *
     * @throws Exception If failed.
     */
    @Test
    public void checkCacheWithMultipleNodesWithBackups() throws Exception {
        checkBuffer(2, 1, true);
    }

    /**
     * Two servers, one backup, query registered from the first server node.
     *
     * @throws Exception If failed.
     */
    @Test
    public void checkCacheWithMultipleNodesWithBackupsWithoutClient() throws Exception {
        checkBuffer(2, 1, false);
    }

    /**
     * @return Continuous query to register on the test cache.
     */
    protected abstract AbstractContinuousQuery<Integer, String> getContinuousQuery();

    /**
     * Runs one scenario: starts the cluster, registers the query, loads the cache, waits and
     * validates the buffers of every server node.
     *
     * @param srvCnt Number of server nodes to start.
     * @param backups Number of backups configured for the test cache.
     * @param useClient If {@code true} the cache and the query are created from an extra client
     *      node, otherwise from the first server node.
     * @throws Exception If failed.
     */
    private void checkBuffer(int srvCnt, int backups, boolean useClient) throws Exception {
        // System.setProperty returns the previous value, which is restored in the finally block
        // so that the override does not leak into other tests running in the same JVM.
        String prevAckThreshold = System.setProperty(ACK_THRESHOLD_PROP, Integer.toString(ACK_THRESHOLD));

        try {
            IgniteEx[] srvs = new IgniteEx[srvCnt];

            for (int idx = 0; idx < srvCnt; idx++) {
                srvs[idx] = startGrid("srv" + idx);
            }

            CacheConfiguration<Integer, String> cacheCfg = new CacheConfiguration<Integer, String>(CACHE_NAME)
                .setBackups(backups)
                .setAffinity(new RendezvousAffinityFunction(PARTS_CNT, (IgniteBiPredicate) null));

            IgniteCache<Integer, String> cache =
                (useClient ? startGrid(optimize(getConfiguration("client").setClientMode(true))) : srvs[0])
                    .getOrCreateCache(cacheCfg);

            // The returned cursor is intentionally left open: the query has to stay registered
            // while the entries are put. Grids are stopped in afterTest().
            cache.query(getContinuousQuery());

            for (int key = 0; key < RECORDS_CNT; key++) {
                cache.put(Integer.valueOf(key), Integer.toString(key));
            }

            Thread.sleep(CLEANUP_WAIT_MS);

            for (int idx = 0; idx < srvCnt; idx++) {
                validateBuffer(srvs[idx], backups);
            }
        }
        finally {
            if (prevAckThreshold == null) {
                System.clearProperty(ACK_THRESHOLD_PROP);
            }
            else {
                System.setProperty(ACK_THRESHOLD_PROP, prevAckThreshold);
            }
        }
    }

    /**
     * Counts the non-null entries held by the current batch of every event buffer of the query
     * handler on the given node and asserts that the total is below the allowed bound.
     *
     * @param igniteEx Server node to inspect.
     * @param i Number of backups configured for the test cache.
     * @throws ClassNotFoundException If one of the reflectively accessed nested classes is missing.
     */
    private void validateBuffer(IgniteEx igniteEx, int i) throws ClassNotFoundException {
        CacheContinuousQueryHandler<?, ?> hnd = findQueryHandler(igniteEx.context().continuous());

        ConcurrentMap<?, ?> entryBufs =
            (ConcurrentMap<?, ?>) GridTestUtils.getFieldValue(hnd, CacheContinuousQueryHandler.class, "entryBufs");

        Class<?> batchCls = Class.forName(BATCH_CLASS_NAME);

        int buffered = 0;

        Iterator<?> bufIt = entryBufs.values().iterator();

        while (bufIt.hasNext()) {
            CacheContinuousQueryEventBuffer buf = (CacheContinuousQueryEventBuffer) bufIt.next();

            AtomicReference<?> curBatchRef =
                (AtomicReference<?>) GridTestUtils.getFieldValue(buf, CacheContinuousQueryEventBuffer.class, "curBatch");

            // Read the reference once: the null check and the reflective read below must see the
            // same batch even if the reference is swapped concurrently.
            Object batch = curBatchRef.get();

            if (batch == null) {
                continue;
            }

            CacheContinuousQueryEntry[] entries =
                (CacheContinuousQueryEntry[]) GridTestUtils.getFieldValue(batch, batchCls, "entries");

            for (CacheContinuousQueryEntry entry : entries) {
                if (entry != null) {
                    buffered++;
                }
            }
        }

        int limit = ACK_THRESHOLD + (1 + i) * PARTS_CNT;

        assertTrue("Too many entries left in continuous query buffers [node=" + igniteEx.name() +
            ", buffered=" + buffered + ", limit=" + limit + ']', buffered < limit);
    }

    /**
     * Extracts the handler of the first continuous routine registered on a node. Remote routines
     * ({@code rmtInfos}) are preferred; local routines ({@code locInfos}) are used only when no
     * remote routine is registered.
     *
     * @param proc Continuous processor of the node.
     * @return Handler stored in the {@code hnd} field of the first routine info found.
     * @throws ClassNotFoundException If a routine info class cannot be loaded.
     */
    private CacheContinuousQueryHandler<?, ?> findQueryHandler(GridContinuousProcessor proc)
        throws ClassNotFoundException {
        ConcurrentMap<?, ?> rmtInfos =
            (ConcurrentMap<?, ?>) GridTestUtils.getFieldValue(proc, GridContinuousProcessor.class, "rmtInfos");

        if (!rmtInfos.values().isEmpty()) {
            return (CacheContinuousQueryHandler<?, ?>) GridTestUtils.getFieldValue(
                rmtInfos.values().toArray()[0], Class.forName(REMOTE_ROUTINE_INFO_CLASS_NAME), "hnd");
        }

        ConcurrentMap<?, ?> locInfos =
            (ConcurrentMap<?, ?>) GridTestUtils.getFieldValue(proc, GridContinuousProcessor.class, "locInfos");

        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException below.
        assertTrue("No continuous query routine is registered on the node", !locInfos.values().isEmpty());

        return (CacheContinuousQueryHandler<?, ?>) GridTestUtils.getFieldValue(
            locInfos.values().toArray()[0], Class.forName(LOCAL_ROUTINE_INFO_CLASS_NAME), "hnd");
    }
}
