package org.gridgain.internal.processors.dr.handler;

import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.gridgain.grid.cache.dr.CacheDrSenderConfiguration;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.grid.dr.DrSenderConnectionConfiguration;
import org.gridgain.grid.dr.DrSenderLoadBalancingMode;
import org.gridgain.grid.dr.store.DrSenderStoreOverflowException;
import org.gridgain.grid.dr.store.memory.DrSenderInMemoryStore;
import org.gridgain.grid.internal.processors.dr.DrUtils;
import org.gridgain.grid.internal.processors.dr.store.DrCommonStoreManager;
import org.gridgain.grid.internal.processors.dr.store.DrFullStateTransferBuffer;
import org.gridgain.grid.internal.processors.dr.store.DrStoreManager;
import org.gridgain.internal.processors.dr.DrAbstractTest;
import org.gridgain.internal.processors.dr.util.DrTestReceiverHubListener;
import org.gridgain.plugin.security.SecurityServicePermissionsTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Assume;
import org.junit.Test;

/* loaded from: input_file:org/gridgain/internal/processors/dr/handler/DrHandlerBatchRoutingLocalSenderSelfTest.class */
/**
 * Checks which node of the sending topology receives DR batches, depending on whether the
 * data node ({@code TOP1_NODE}) also hosts a sender hub and whether the cache is configured
 * to prefer the local sender.
 * <p>
 * Every test starts a two-node sending topology ({@code TOP1_NODE} holding the cache and
 * {@code TOP1_NODE_SND} acting as a dedicated sender hub) plus a single-node receiving
 * topology, puts a number of keys into the cache on {@code TOP1_NODE} and then inspects how
 * many messages arrived on the replication-send topic of each sending-topology node.
 */
public class DrHandlerBatchRoutingLocalSenderSelfTest extends DrAbstractTest {
    /** Sender group name used when {@code useSenderGroups()} returns {@code true}. */
    private static final String SND_GRP = "group-1";

    /** Value passed to {@code CacheDrSenderConfiguration.setPreferLocalSender} for the test cache. */
    private boolean preferLocalSender;

    /** Whether {@code TOP1_NODE} is started with its own sender hub configuration. */
    private boolean configureLocalSender;

    /** Whether the sender store on {@code TOP1_NODE} rejects every entry with an overflow exception. */
    private boolean withSenderBufferOverflow;

    /** Number of replication-send topic messages observed on {@code TOP1_NODE}. */
    private final AtomicInteger locBatchCnt = new AtomicInteger();

    /** Number of replication-send topic messages observed on {@code TOP1_NODE_SND}. */
    private final AtomicInteger rmtBatchCnt = new AtomicInteger();

    /**
     * In-memory sender store that can be switched into a mode where every {@code store0} call
     * fails with {@link DrSenderStoreOverflowException}.
     */
    public static class TestSenderStore extends DrSenderInMemoryStore {
        /** When {@code true}, {@link #store0} always throws instead of storing. */
        private final boolean overflow;

        /**
         * @param overflow Whether each store attempt must fail with an overflow exception.
         */
        TestSenderStore(boolean overflow) {
            this.overflow = overflow;
        }

        /**
         * Delegates to the parent store unless overflow simulation is enabled.
         *
         * @throws IgniteCheckedException {@link DrSenderStoreOverflowException} when overflow
         *      simulation is on, otherwise whatever the parent implementation throws.
         */
        public void store0(byte[] cacheNameBytes, byte[] data, int cnt, @Nullable IgniteUuid id) throws IgniteCheckedException {
            // NOTE(review): parameter names are guesses (decompiled code); only the types are known.
            if (this.overflow) {
                throw new DrSenderStoreOverflowException("TEST");
            }

            super.store0(cacheNameBytes, data, cnt, id);
        }
    }

    /**
     * Resets all per-test flags and counters after the parent clean-up has run, so that each
     * test starts from the same state.
     */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public void afterTest() throws Exception {
        super.afterTest();

        this.preferLocalSender = false;
        this.configureLocalSender = false;
        this.withSenderBufferOverflow = false;

        this.locBatchCnt.set(0);
        this.rmtBatchCnt.set(0);
    }

    /**
     * Local sender is configured and preferred: expects one message per put on
     * {@code TOP1_NODE} and none on {@code TOP1_NODE_SND}.
     */
    @Test
    public void testPreferAndLocalSender() throws Exception {
        this.preferLocalSender = true;
        this.configureLocalSender = true;
        final int keysCnt = 50;

        startUp();
        putKeys(keysCnt);

        GridTestUtils.waitForCondition(() -> this.locBatchCnt.get() == keysCnt, 10000L);

        assertEquals(keysCnt, this.locBatchCnt.get());
        assertEquals(0, this.rmtBatchCnt.get());
    }

    /**
     * Local sender is preferred but not configured: expects one message per put on
     * {@code TOP1_NODE_SND} and none on {@code TOP1_NODE}.
     */
    @Test
    public void testPreferAndNoLocalSender() throws Exception {
        this.preferLocalSender = true;
        this.configureLocalSender = false;
        final int keysCnt = 50;

        startUp();
        putKeys(keysCnt);

        GridTestUtils.waitForCondition(() -> this.rmtBatchCnt.get() == keysCnt, 10000L);

        assertEquals(0, this.locBatchCnt.get());
        assertEquals(keysCnt, this.rmtBatchCnt.get());
    }

    /**
     * Local sender with an overflowing store, incremental DR enabled: expects at least one
     * message on {@code TOP1_NODE} and none on {@code TOP1_NODE_SND}.
     */
    @Test
    public void testPreferAndLocalSenderWithOverflowedBuffer() throws Exception {
        // Only meaningful when incremental DR is enabled; the disabled case is covered by the "2" variant.
        Assume.assumeTrue(DrUtils.isIncrementalDrEnabled());

        this.preferLocalSender = true;
        this.configureLocalSender = true;
        this.withSenderBufferOverflow = true;
        final int keysCnt = 20;

        startUp();
        putKeys(keysCnt);

        // The wait result is intentionally not asserted: the checks below only require that
        // at least one local message was seen, not that the full count was reached in time.
        GridTestUtils.waitForCondition(() -> this.locBatchCnt.get() >= keysCnt, 5000L);

        assertEquals(0, this.rmtBatchCnt.get());
        assertTrue(this.locBatchCnt.get() > 0);
    }

    /**
     * Local sender with an overflowing store, incremental DR disabled: expects one message per
     * put on both {@code TOP1_NODE} and {@code TOP1_NODE_SND}.
     */
    @Test
    public void testPreferAndLocalSenderWithOverflowedBuffer2() throws Exception {
        // Only meaningful when incremental DR is disabled; the enabled case is covered by the sibling test.
        Assume.assumeFalse(DrUtils.isIncrementalDrEnabled());

        this.preferLocalSender = true;
        this.configureLocalSender = true;
        this.withSenderBufferOverflow = true;
        final int keysCnt = 20;

        startUp();
        putKeys(keysCnt);

        GridTestUtils.waitForCondition(() -> this.rmtBatchCnt.get() == keysCnt, 10000L);

        assertEquals(keysCnt, this.rmtBatchCnt.get());
        assertEquals(keysCnt, this.locBatchCnt.get());
    }

    /**
     * Local sender is configured but not preferred: expects both nodes to have received at
     * least one message once the combined count reaches the number of puts.
     */
    @Test
    public void testDoNotPreferAndLocalSender() throws Exception {
        this.preferLocalSender = false;
        this.configureLocalSender = true;
        final int keysCnt = 20;

        startUp();
        putKeys(keysCnt);

        GridTestUtils.waitForCondition(() -> this.locBatchCnt.get() + this.rmtBatchCnt.get() == keysCnt, 10000L);

        assertTrue(this.locBatchCnt.get() > 0);
        assertTrue(this.rmtBatchCnt.get() > 0);
    }

    /**
     * Puts keys {@code 0 .. keysCnt - 1} (each mapped to itself) into the test cache through
     * {@code TOP1_NODE}. Must be called after {@link #startUp()}.
     *
     * @param keysCnt Number of entries to put.
     */
    private void putKeys(int keysCnt) {
        IgniteCache<Integer, Integer> cache = grid(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME);

        for (int key = 0; key < keysCnt; key++) {
            cache.put(key, key);
        }
    }

    /**
     * Starts the sending and receiving topologies according to the current test flags and
     * installs the message listeners that feed {@link #locBatchCnt} and {@link #rmtBatchCnt}.
     *
     * @throws Exception If topology start-up or the reflective buffer replacement fails.
     */
    private void startUp() throws Exception {
        // Sending topology (data center 1): data node plus a dedicated sender hub node.
        startTopology(createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() {
            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder ipFinder) throws IgniteCheckedException {
                // Data node: hosts the cache and, optionally, a local sender hub whose store may overflow.
                IgniteConfiguration dataNodeCfg = config(
                    new GridGainConfiguration().setBatchSendSizeBytes(10),
                    DrAbstractTest.TOP1_NODE,
                    (byte) 1,
                    ipFinder,
                    configureLocalSender
                        ? senderHubConfig(senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1))
                            .setStore(new TestSenderStore(withSenderBufferOverflow))
                        : null,
                    null,
                    cacheConfiguration(SecurityServicePermissionsTest.CACHE_NAME));

                // Dedicated sender hub node: no caches, store never overflows.
                IgniteConfiguration sndNodeCfg = config(
                    new GridGainConfiguration(),
                    DrAbstractTest.TOP1_NODE_SND,
                    (byte) 1,
                    ipFinder,
                    senderHubConfig(senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1))
                        .setStore(new TestSenderStore(false)),
                    null,
                    new CacheConfiguration[0]);

                return wrap(new IgniteConfiguration[] {dataNodeCfg, sndNodeCfg});
            }
        }));

        // Receiving topology (data center 2): a single node with the same cache; a receiver hub
        // is attached to its first started node.
        receiverHub(startTopology(createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() {
            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder ipFinder) throws IgniteCheckedException {
                return wrap(config(
                    new GridGainConfiguration(),
                    DrAbstractTest.TOP2_NODE,
                    (byte) 2,
                    ipFinder,
                    null,
                    null,
                    cacheConfiguration(SecurityServicePermissionsTest.CACHE_NAME)));
            }
        })).get(0), DrAbstractTest.RCV_PORT_1, new DrTestReceiverHubListener());

        IgniteKernal dataNode = grid(DrAbstractTest.TOP1_NODE);
        IgniteKernal sndNode = grid(DrAbstractTest.TOP1_NODE_SND);

        // Count every message arriving on the replication-send topic of each sending-topology node.
        dataNode.context().io().addMessageListener(GridCacheUtils.replicationTopicSend(), (nodeId, msg, plc) -> {
            this.locBatchCnt.incrementAndGet();
        });
        sndNode.context().io().addMessageListener(GridCacheUtils.replicationTopicSend(), (nodeId, msg, plc) -> {
            this.rmtBatchCnt.incrementAndGet();
        });

        // Result is deliberately discarded.
        // NOTE(review): presumably forces the cache proxy to be initialised on the sender node - confirm.
        sndNode.cache(SecurityServicePermissionsTest.CACHE_NAME);

        if (DrUtils.isIncrementalDrEnabled() && this.withSenderBufferOverflow) {
            // Swap the private "fstBuffer" field of the local sender's store manager for a
            // buffer whose store(...) always returns false.
            DrCommonStoreManager storeMgr = dr(dataNode).localSender().storeManager();

            Field fstBufFld = DrStoreManager.class.getDeclaredField("fstBuffer");
            fstBufFld.setAccessible(true);
            fstBufFld.set(storeMgr, new DrFullStateTransferBuffer(0L) {
                public boolean store(byte[] bArr, byte[] bArr2, int i, IgniteUuid igniteUuid, GridFutureAdapter<Void> gridFutureAdapter) {
                    return false;
                }
            });
        }
    }

    /**
     * Builds the parent sender hub configuration and binds it either to the {@code "group-1"}
     * sender group (when {@code useSenderGroups()} is {@code true}) or directly to the test
     * cache name.
     *
     * @param conns Sender connection configurations passed through to the parent implementation.
     * @return Sender hub configuration bound to the sender group or to the cache name.
     */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public DrSenderConfiguration senderHubConfig(DrSenderConnectionConfiguration... conns) {
        if (useSenderGroups()) {
            return super.senderHubConfig(conns).setSenderGroups(new String[]{SND_GRP});
        }

        return super.senderHubConfig(conns).setCacheNames(new String[]{SecurityServicePermissionsTest.CACHE_NAME});
    }

    /**
     * Creates the configuration of the replicated test cache: partitioned, atomic, FULL_SYNC,
     * no near cache, two-partition rendezvous affinity, and a DR sender configuration with
     * batch size 1, batch frequency 0, round-robin load balancing and the current
     * {@link #preferLocalSender} value.
     *
     * @param name Cache name.
     * @return Cache configuration with the DR sender configuration attached.
     */
    protected CacheConfiguration cacheConfiguration(String name) {
        CacheConfiguration ccfg = new CacheConfiguration();

        ccfg.setName(name);
        ccfg.setCacheMode(CacheMode.PARTITIONED);
        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
        ccfg.setNearConfiguration((NearCacheConfiguration) null);
        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
        ccfg.setAffinity(new RendezvousAffinityFunction(false, 2));

        CacheDrSenderConfiguration drSndCfg = new CacheDrSenderConfiguration();

        drSndCfg.setBatchSendSize(1);
        drSndCfg.setBatchSendFrequency(0L);
        drSndCfg.setLoadBalancingMode(DrSenderLoadBalancingMode.DR_ROUND_ROBIN);
        drSndCfg.setPreferLocalSender(this.preferLocalSender);

        if (useSenderGroups()) {
            drSndCfg.setSenderGroup(SND_GRP);
        }

        ggCacheConfig(ccfg).setDrSenderConfiguration(drSndCfg);

        return ccfg;
    }
}
