package org.gridgain.internal.processors.dr.handler;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.gridgain.grid.cache.dr.CacheDrSenderConfiguration;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.grid.dr.store.DrSenderStore;
import org.gridgain.grid.dr.store.fs.DrSenderFsStore;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalResponse;
import org.gridgain.grid.internal.processors.dr.store.DrStoreManager;
import org.gridgain.internal.processors.dr.DrAbstractTest;
import org.gridgain.internal.processors.dr.util.DrTestCommunicationSpi;
import org.gridgain.internal.processors.dr.util.DrTestQueuedCommunicationSpiListener;
import org.gridgain.plugin.security.SecurityServicePermissionsTest;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

/* loaded from: input_file:org/gridgain/internal/processors/dr/handler/DrSyncFullStateTransferBackpressureSelfTest.class */
public class DrSyncFullStateTransferBackpressureSelfTest extends DrAbstractTest {
    /** Directory (under the Ignite home) handed to the file-system sender store of the first sender hub. */
    static final String STORE_PATH_1 = U.getIgniteHome() + "/work/dr/my-dr-store-1";
    /** Directory (under the Ignite home) handed to the file-system sender store of the second sender hub. */
    static final String STORE_PATH_2 = U.getIgniteHome() + "/work/dr/my-dr-store-2";
    /**
     * Communication SPI listener of the sender hub most recently configured through {@code senderNodeCfg};
     * reset to {@code null} in {@code afterTest}.
     */
    private DrTestQueuedCommunicationSpiListener sndHubLsnr;
    /**
     * Value passed to {@code DrSenderConfiguration#setFullStateTransferBufferSize} when a sender hub is configured.
     * {@code beforeTest} resets it to 32768 and individual tests override it (presumably a size in bytes - confirm
     * against the DR sender configuration documentation).
     */
    private long fstBufferSize;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/gridgain/internal/processors/dr/handler/DrSyncFullStateTransferBackpressureSelfTest$LargeEntry.class */
    /**
     * Cache value carrying a payload of a caller-chosen size, used to produce large replication batches.
     * Each instance gets a random {@code id}; the payload is filled with the low byte of that id, so two
     * independently created instances are equal only if both the id and the payload match.
     */
    public static class LargeEntry {
        /** Random identifier; the sole input of {@link #hashCode()}. */
        private int id = ThreadLocalRandom.current().nextInt();

        /** Filler bytes; annotated so that the generated string representation leaves them out. */
        @GridToStringExclude
        private byte[] payload;

        /**
         * @param i Payload length in bytes.
         */
        LargeEntry(int i) {
            payload = new byte[i];

            // The narrowing cast keeps the low 8 bits of the id, exactly like masking with 255 first.
            Arrays.fill(payload, (byte) id);
        }

        /** {@inheritDoc} */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }

            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }

            LargeEntry other = (LargeEntry) obj;

            return id == other.id && Arrays.equals(payload, other.payload);
        }

        /** {@inheritDoc} */
        @Override
        public int hashCode() {
            return id;
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            return S.toString(LargeEntry.class, this);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the base-class preparation, removes both sender store directories and the persistence
     * directory, and restores the default full-state-transfer buffer size (32768).
     *
     * @throws Exception If the base preparation or the cleanup fails.
     */
    @Override
    public void beforeTest() throws Exception {
        super.beforeTest();

        // Start from a clean disk state regardless of how the previous test ended.
        clearStores(STORE_PATH_1, STORE_PATH_2);
        cleanPersistenceDir();

        // Default used by tests that do not set their own buffer size.
        fstBufferSize = 32 * 1024L;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the base-class teardown, forgets the sender hub listener and removes both sender store
     * directories and the persistence directory.
     *
     * @throws Exception If the base teardown or the cleanup fails.
     */
    @Override
    public void afterTest() throws Exception {
        super.afterTest();

        // The listener belongs to a node configured by the finished test; do not carry it over.
        sndHubLsnr = null;

        clearStores(STORE_PATH_1, STORE_PATH_2);
        cleanPersistenceDir();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Overrides the base-test switch so that it always reports {@code true} for this test class.
     *
     * @return {@code true}.
     */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public boolean useSenderGroups() {
        return true;
    }

    /**
     * Registers the receiver-side topology: a single node named {@code TOP2_NODE} with data center id 2
     * hosting two partitioned caches ({@code SecurityServicePermissionsTest.CACHE_NAME} and {@code "cache_2"}).
     * The topology is only registered here; it is started separately via {@code startTopology}.
     *
     * @return IP finder the topology was registered with.
     * @throws Exception If the node configuration cannot be built.
     */
    private TcpDiscoveryIpFinder createTopologyReceiver() throws Exception {
        TcpDiscoveryIpFinder rcvIpFinder = ipFinder();

        IgniteConfiguration rcvDataNodeCfg = config(
            new GridGainConfiguration(),
            DrAbstractTest.TOP2_NODE,
            (byte) 2,
            rcvIpFinder,
            null,
            null,
            cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, false, null, null),
            cacheConfig("cache_2", CacheMode.PARTITIONED, false, null, null));

        addTopology(rcvIpFinder, rcvDataNodeCfg);

        return rcvIpFinder;
    }

    /**
     * Registers the sender-side topology: two data nodes ({@code TOP1_NODE} and {@code TOP1_NODE_2}),
     * both with data center id 1. Sender hubs are not part of this topology; tests start them separately.
     *
     * @return IP finder the topology was registered with.
     * @throws Exception If a node configuration cannot be built.
     */
    private TcpDiscoveryIpFinder createTopologySender() throws Exception {
        TcpDiscoveryIpFinder sndIpFinder = ipFinder();

        IgniteConfiguration firstDataNodeCfg = sndDataNodeConfig(sndIpFinder, DrAbstractTest.TOP1_NODE, (byte) 1);
        IgniteConfiguration secondDataNodeCfg = sndDataNodeConfig(sndIpFinder, DrAbstractTest.TOP1_NODE_2, (byte) 1);

        addTopology(sndIpFinder, firstDataNodeCfg, secondDataNodeCfg);

        return sndIpFinder;
    }

    /**
     * Builds a partitioned sender cache configuration with one backup, a rendezvous affinity function
     * created with {@code (false, 20)} and the given DR sender settings.
     *
     * @param str Cache name.
     * @param cacheDrSenderConfiguration DR sender settings, or {@code null} to use {@link #cacheDrSenderConfig()}.
     * @return Cache configuration.
     */
    private <K, V> CacheConfiguration<K, V> senderCacheConfig(String str, @Nullable CacheDrSenderConfiguration cacheDrSenderConfiguration) {
        CacheConfiguration<K, V> ccfg = cacheConfig(str, CacheMode.PARTITIONED, true);

        // Fall back to the class-wide defaults when the caller has no special requirements.
        CacheDrSenderConfiguration drSndCfg = cacheDrSenderConfiguration != null
            ? cacheDrSenderConfiguration
            : cacheDrSenderConfig();

        ggCacheConfig(ccfg).setDrSenderConfiguration(drSndCfg);

        ccfg.setBackups(1);
        ccfg.setAffinity(new RendezvousAffinityFunction(false, 20));

        return ccfg;
    }

    /**
     * Creates the default per-cache DR sender settings used by this test: batch send size 100,
     * batch send frequency 200, state transfer throttle 200 and state transfer throttle bytes 0.
     *
     * @return New configuration instance; callers may adjust it further.
     */
    @NotNull
    private CacheDrSenderConfiguration cacheDrSenderConfig() {
        CacheDrSenderConfiguration drSndCfg = new CacheDrSenderConfiguration()
            .setBatchSendSize(100)
            .setBatchSendFrequency(200L);

        drSndCfg.setStateTransferThrottle(200L);
        drSndCfg.setStateTransferThrottleBytes(0);

        return drSndCfg;
    }

    /**
     * Builds the configuration of a sender-side data node hosting the default sender cache
     * ({@code SecurityServicePermissionsTest.CACHE_NAME}) and using a {@link DrTestCommunicationSpi}.
     *
     * @param tcpDiscoveryIpFinder IP finder of the sender topology.
     * @param str Node (instance) name.
     * @param b Data center id.
     * @return Node configuration.
     * @throws Exception If the configuration cannot be built.
     */
    private IgniteConfiguration sndDataNodeConfig(TcpDiscoveryIpFinder tcpDiscoveryIpFinder, String str, byte b) throws Exception {
        GridGainConfiguration ggCfg = new GridGainConfiguration();

        IgniteConfiguration nodeCfg = config(
            ggCfg,
            str,
            b,
            tcpDiscoveryIpFinder,
            null,
            null,
            senderCacheConfig(SecurityServicePermissionsTest.CACHE_NAME, null));

        nodeCfg.setCommunicationSpi(new DrTestCommunicationSpi());

        return nodeCfg;
    }

    /**
     * Builds the configuration of a sender hub node that replicates to data center 2 through the given store.
     * <p>
     * Side effects: a {@link DrSenderFsStore} argument is switched to synchronous writes, and
     * {@code sndHubLsnr} is replaced with a fresh listener attached to the new node's communication SPI,
     * so a test that starts several hubs must save the previous listener before calling this method again.
     *
     * @param tcpDiscoveryIpFinder IP finder of the sender topology.
     * @param str Node (instance) name.
     * @param b Data center id of the hub.
     * @param drSenderStore Sender store for the hub.
     * @return Node configuration.
     * @throws Exception If the configuration cannot be built.
     */
    private IgniteConfiguration senderNodeCfg(TcpDiscoveryIpFinder tcpDiscoveryIpFinder, String str, byte b, DrSenderStore drSenderStore) throws Exception {
        GridGainConfiguration ggCfg = new GridGainConfiguration();

        if (drSenderStore instanceof DrSenderFsStore) {
            DrSenderFsStore fsStore = (DrSenderFsStore) drSenderStore;

            fsStore.setSynchronousWrites(true);
        }

        DrSenderConfiguration hubCfg = senderHubConfig(
            senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1).setStore(drSenderStore));

        hubCfg.setFullStateTransferBufferSize(fstBufferSize);
        hubCfg.setReconnectOnFailureTimeout(500L);
        hubCfg.setMaxQueueSize(10000);

        IgniteConfiguration nodeCfg = config(ggCfg, str, b, tcpDiscoveryIpFinder, hubCfg, null, new CacheConfiguration[0]);

        // Publish the listener through the field so that tests can poll outgoing hub responses.
        DrTestQueuedCommunicationSpiListener lsnr = new DrTestQueuedCommunicationSpiListener();

        sndHubLsnr = lsnr;
        nodeCfg.setCommunicationSpi(new DrTestCommunicationSpi(lsnr));

        return nodeCfg;
    }

    /**
     * Scenario with the full-state-transfer buffer size set to 0:
     * <ol>
     *     <li>regular updates made while no receiver hub is running land in the sender store,
     *     while the FST buffer stays empty;</li>
     *     <li>once the receiver hub starts, the store is expected to drain and the caches to match;</li>
     *     <li>updates made while replication is stopped reach neither the store nor the buffer;</li>
     *     <li>a full state transfer started while the hub is paused for data center 2 is expected to
     *     use the FST buffer and leave the store empty; after resuming, the transfer completes,
     *     the buffer is empty and the caches match.</li>
     * </ol>
     * NOTE(review): the results of the {@code waitForCondition} calls are not asserted, so a timeout
     * there does not fail the test by itself.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testFullStateTransferBuffer() throws Exception {
        fstBufferSize = 0L;

        final String cacheName = SecurityServicePermissionsTest.CACHE_NAME;

        TcpDiscoveryIpFinder sndIpFinder = createTopologySender();
        TcpDiscoveryIpFinder rcvIpFinder = createTopologyReceiver();

        startTopology(sndIpFinder);
        startTopology(rcvIpFinder);

        DrSenderFsStore sndStore = new DrSenderFsStore().setDirectoryPath(STORE_PATH_1);

        Ignite sndHub = G.start(senderNodeCfg(sndIpFinder, DrAbstractTest.TOP1_NODE_SND, (byte) 1, sndStore));
        Ignite sndData = G.ignite(DrAbstractTest.TOP1_NODE);

        DrStoreManager storeMgr = (DrStoreManager) GridTestUtils.getFieldValue(dr(sndHub).localSender(), new String[]{"storeMgr"});

        // Nothing has been replicated yet.
        assertEquals(0L, sndStore.sizeBytes());
        assertEquals(0L, storeMgr.fstBufferSizeBytes());

        // Step 1: regular replication without a receiver hub.
        startReplication(cacheName, sndData, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        Map<Object, Object> expected = populateCache(sndData.cache(cacheName), 0, 1000);

        assertTrue(sndStore.sizeBytes() > 0);
        assertEquals(0L, storeMgr.fstBufferSizeBytes());

        // Step 2: bring up the receiver hub and let the store drain.
        startReceiver(rcvIpFinder);

        Ignite rcvData = G.ignite(DrAbstractTest.TOP2_NODE);

        GridTestUtils.waitForCondition(
            () -> sndStore.sizeBytes() == 0 && storeMgr.fstBufferSizeBytes() == 0,
            5000L);

        compareCaches(rcvData.cache(cacheName), expected, 15000L);

        // Step 3: updates made while replication is stopped.
        stopReplication(cacheName, sndData, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        expected.putAll(populateCache(sndData.cache(cacheName), 1000, 100));

        assertEquals(0L, sndStore.sizeBytes());
        assertEquals(0L, storeMgr.fstBufferSizeBytes());

        // Step 4: full state transfer against a paused hub.
        startReplication(cacheName, sndData, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        dr(sndHub).localSender().pause((byte) 2);

        IgniteFuture<?> fstFut = dr(sndData).stateTransfer(cacheName, true, new byte[0]);

        GridTestUtils.waitForCondition(() -> storeMgr.fstBufferSizeBytes() > 0, 5000L);

        assertEquals(0L, sndStore.sizeBytes());

        dr(sndHub).localSender().resume((byte) 2);

        fstFut.get(getTestTimeout());

        assertEquals(0L, storeMgr.fstBufferSizeBytes());

        compareCaches(rcvData.cache(cacheName), expected, 0L);
    }

    /**
     * Scenario with the default buffer size and no receiver hub at first:
     * <ol>
     *     <li>1000 entries are loaded while replication is stopped, so store and FST buffer stay empty;</li>
     *     <li>a full state transfer is started and the test waits for an outgoing hub response with
     *     code 1 (presumably the signal that the hub pushes back on the sender - confirm against
     *     {@code DrInternalResponse}); DR must not be stopped, the store must be empty and the FST
     *     buffer non-empty;</li>
     *     <li>100 further regular updates must end up in the store while the FST buffer stays non-empty;</li>
     *     <li>after the receiver hub starts, the transfer completes, both store and buffer are expected
     *     to drain and the caches must match.</li>
     * </ol>
     *
     * @throws Exception If failed.
     */
    @Test
    public void testFullStateTransferBackPressure() throws Exception {
        final String cacheName = SecurityServicePermissionsTest.CACHE_NAME;

        TcpDiscoveryIpFinder sndIpFinder = createTopologySender();
        TcpDiscoveryIpFinder rcvIpFinder = createTopologyReceiver();

        startTopology(sndIpFinder);
        startTopology(rcvIpFinder);

        DrSenderFsStore sndStore = new DrSenderFsStore().setDirectoryPath(STORE_PATH_1);

        Ignite sndHub = G.start(senderNodeCfg(sndIpFinder, DrAbstractTest.TOP1_NODE_SND, (byte) 1, sndStore));
        Ignite sndData = G.ignite(DrAbstractTest.TOP1_NODE);

        DrStoreManager storeMgr = (DrStoreManager) GridTestUtils.getFieldValue(dr(sndHub).localSender(), new String[]{"storeMgr"});

        // Step 1: load data that only a full state transfer can deliver.
        stopReplication(cacheName, sndData, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        Map<Object, Object> expected = populateCache(sndData.cache(cacheName), 0, 1000);

        assertEquals(0L, sndStore.sizeBytes());
        assertEquals(0L, storeMgr.fstBufferSizeBytes());

        startReplication(cacheName, sndData, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        Ignite rcvData = G.ignite(DrAbstractTest.TOP2_NODE);

        assertEquals(0L, rcvData.cache(cacheName).size(new CachePeekMode[0]));

        // Step 2: start the transfer and wait for a hub response with code 1.
        IgniteFuture<?> fstFut = dr(sndData).stateTransfer(cacheName, true, new byte[0]);

        GridTestUtils.waitForCondition(() -> {
            try {
                IgniteBiTuple<UUID, DrInternalResponse> rsp = sndHubLsnr.nextOutResponse(2000L);

                return rsp != null && rsp.getValue().code() == 1;
            } catch (Exception e) {
                throw new IgniteException(e);
            }
        }, 5000L);

        assertFalse(isDrStopped(DrAbstractTest.TOP1_NODE, cacheName));
        assertEquals(0L, sndStore.sizeBytes());
        assertTrue(storeMgr.fstBufferSizeBytes() > 0);

        // Step 3: regular updates made during the stalled transfer go to the store.
        expected.putAll(populateCache(sndData.cache(cacheName), 1000, 100));

        GridTestUtils.waitForCondition(() -> sndStore.sizeBytes() > 0, 5000L);

        assertTrue(sndStore.sizeBytes() > 0);
        assertTrue(storeMgr.fstBufferSizeBytes() > 0);

        // Step 4: the receiver hub appears and everything is delivered.
        startReceiver(rcvIpFinder);

        fstFut.get(15000L);

        GridTestUtils.waitForCondition(
            () -> sndStore.sizeBytes() == 0 && storeMgr.fstBufferSizeBytes() == 0,
            5000L);

        compareCaches(rcvData.cache(cacheName), expected, 0L);
    }

    /**
     * Same flow as the plain back-pressure scenario, but with a 4096 buffer size, a dedicated cache
     * {@code "cache_2"} configured with batch send size 5 and batch send frequency 100, and values
     * that each carry a 1048576-byte payload, so every entry is far larger than the configured
     * buffer size.
     * <p>
     * The test waits for an outgoing hub response with code 1 (presumably the push-back signal -
     * confirm against {@code DrInternalResponse}), checks that DR is not stopped, that the store is
     * empty and the FST buffer is non-empty, adds five more large entries that must reach the store,
     * then starts the receiver hub and expects the transfer to finish, store and buffer to drain and
     * the caches to match.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testFSTBackPressureLargeBatch() throws Exception {
        fstBufferSize = 4096L;

        final String cacheName = "cache_2";
        final int payloadSize = 1048576;

        TcpDiscoveryIpFinder sndIpFinder = createTopologySender();
        TcpDiscoveryIpFinder rcvIpFinder = createTopologyReceiver();

        startTopology(sndIpFinder);
        startTopology(rcvIpFinder);

        DrSenderFsStore sndStore = new DrSenderFsStore().setDirectoryPath(STORE_PATH_1);

        Ignite sndHub = G.start(senderNodeCfg(sndIpFinder, DrAbstractTest.TOP1_NODE_SND, (byte) 1, sndStore));
        Ignite sndData = G.ignite(DrAbstractTest.TOP1_NODE);

        DrStoreManager storeMgr = (DrStoreManager) GridTestUtils.getFieldValue(dr(sndHub).localSender(), new String[]{"storeMgr"});

        // Dedicated sender cache with small batches.
        CacheDrSenderConfiguration smallBatchCfg = cacheDrSenderConfig().setBatchSendSize(5).setBatchSendFrequency(100L);

        sndData.createCache(senderCacheConfig(cacheName, smallBatchCfg));

        // Load the initial large entries while replication is stopped.
        stopReplication(cacheName, sndData, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        Map<Object, Object> expected = populateCache(sndData.cache(cacheName), 0, 5, payloadSize);

        assertEquals(0L, sndStore.sizeBytes());
        assertEquals(0L, storeMgr.fstBufferSizeBytes());

        startReplication(cacheName, sndData, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        Ignite rcvData = G.ignite(DrAbstractTest.TOP2_NODE);

        assertEquals(0L, rcvData.cache(cacheName).size(new CachePeekMode[0]));

        // Start the transfer and wait for a hub response with code 1.
        IgniteFuture<?> fstFut = dr(sndData).stateTransfer(cacheName, true, new byte[0]);

        GridTestUtils.waitForCondition(() -> {
            try {
                IgniteBiTuple<UUID, DrInternalResponse> rsp = sndHubLsnr.nextOutResponse(15000L);

                return rsp != null && rsp.getValue().code() == 1;
            } catch (Exception e) {
                throw new IgniteException(e);
            }
        }, 15000L);

        assertFalse(isDrStopped(DrAbstractTest.TOP1_NODE, cacheName));
        assertEquals(0L, sndStore.sizeBytes());
        assertTrue(storeMgr.fstBufferSizeBytes() > 0);

        // Regular updates made during the stalled transfer go to the store.
        expected.putAll(populateCache(sndData.cache(cacheName), 5, 5, payloadSize));

        GridTestUtils.waitForCondition(() -> sndStore.sizeBytes() > 0, 5000L);

        assertTrue(sndStore.sizeBytes() > 0);
        assertTrue(storeMgr.fstBufferSizeBytes() > 0);

        // The receiver hub appears and everything is delivered.
        startReceiver(rcvIpFinder);

        fstFut.get(getTestTimeout());

        GridTestUtils.waitForCondition(
            () -> sndStore.sizeBytes() == 0 && storeMgr.fstBufferSizeBytes() == 0,
            15000L);

        compareCaches(rcvData.cache(cacheName), expected, 5000L);
    }

    /**
     * Scenario with two sender hubs and a 16384 buffer size: a full state transfer is started while
     * the second hub is paused for data center 2 and no receiver hub is running. After an outgoing
     * response with code 1 is seen on the first hub (presumably the push-back signal - confirm
     * against {@code DrInternalResponse}), more data is loaded, the first hub is stopped, the
     * receiver hub is started and the second hub is resumed. The transfer must then complete and DR
     * must not be stopped for the cache.
     * <p>
     * NOTE(review): the map of loaded entries is accumulated but never compared with the receiver
     * cache in this test.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testFullStateTransferBackPressureSenderLeft() throws Exception {
        fstBufferSize = 16384L;

        final String cacheName = SecurityServicePermissionsTest.CACHE_NAME;

        TcpDiscoveryIpFinder sndIpFinder = createTopologySender();
        TcpDiscoveryIpFinder rcvIpFinder = createTopologyReceiver();

        startTopology(sndIpFinder);
        startTopology(rcvIpFinder);

        G.start(senderNodeCfg(sndIpFinder, DrAbstractTest.TOP1_NODE_SND, (byte) 1, new DrSenderFsStore().setDirectoryPath(STORE_PATH_1)));

        // Keep the first hub's listener: configuring the second hub overwrites the field.
        DrTestQueuedCommunicationSpiListener firstHubLsnr = sndHubLsnr;

        Ignite secondHub = G.start(senderNodeCfg(sndIpFinder, "top1_node_snd_2", (byte) 1, new DrSenderFsStore().setDirectoryPath(STORE_PATH_2)));
        Ignite sndData = G.ignite(DrAbstractTest.TOP1_NODE);

        // Load data that only a full state transfer can deliver.
        stopReplication(cacheName, sndData, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        Map<Object, Object> loaded = populateCache(sndData.cache(cacheName), 0, 1000);

        startReplication(cacheName, sndData, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        dr(secondHub).localSender().pause((byte) 2);

        assertEquals(0L, G.ignite(DrAbstractTest.TOP2_NODE).cache(cacheName).size(new CachePeekMode[0]));

        // Start the transfer and wait for a code 1 response from the first hub.
        IgniteFuture<?> fstFut = dr(sndData).stateTransfer(cacheName, true, new byte[0]);

        GridTestUtils.waitForCondition(() -> {
            try {
                IgniteBiTuple<UUID, DrInternalResponse> rsp = firstHubLsnr.nextOutResponse(2000L);

                return rsp != null && rsp.getValue().code() == 1;
            } catch (Exception e) {
                throw new IgniteException(e);
            }
        }, 5000L);

        loaded.putAll(populateCache(sndData.cache(cacheName), 1000, 100));

        assertFalse(isDrStopped(DrAbstractTest.TOP1_NODE, cacheName));
        assertFalse(fstFut.isDone());

        // The first hub leaves; the transfer has to finish through the second hub.
        G.stop(DrAbstractTest.TOP1_NODE_SND, true);

        startReceiver(rcvIpFinder);

        dr(secondHub).localSender().resume((byte) 2);

        fstFut.get(getTestTimeout());

        assertFalse(isDrStopped(DrAbstractTest.TOP1_NODE, cacheName));
    }

    /**
     * Starts the receiver hub node {@code TOP2_NODE_RCV} (data center id 2, no caches) listening on
     * {@code RCV_PORT_1}.
     *
     * @param tcpDiscoveryIpFinder IP finder of the receiver topology.
     * @return Started node.
     * @throws IgniteCheckedException If the node cannot be configured or started.
     */
    private Ignite startReceiver(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
        IgniteConfiguration rcvHubCfg = config(
            DrAbstractTest.TOP2_NODE_RCV,
            (byte) 2,
            tcpDiscoveryIpFinder,
            null,
            receiverHubConfig(DrAbstractTest.RCV_PORT_1),
            new CacheConfiguration[0]);

        return G.start(rcvHubCfg);
    }

    /**
     * Loads {@code i2} entries with {@link Integer} values into the cache, starting at key index {@code i}.
     * Delegates to the four-argument overload with a payload size of {@code -1}, which selects integer values.
     *
     * @param cache Cache to fill.
     * @param i First key index (inclusive).
     * @param i2 Number of entries.
     * @return Map of the entries that were put.
     */
    @NotNull
    private Map<Object, Object> populateCache(Cache<Object, Object> cache, int i, int i2) {
        return populateCache(cache, i, i2, -1);
    }

    /**
     * Loads {@code i2} entries into the cache with a single {@code putAll}. Keys are the decimal
     * strings of the indexes {@code i .. i + i2 - 1}. A value is a new {@link LargeEntry} with an
     * {@code i3}-byte payload when {@code i3} is positive, otherwise the index itself as an {@link Integer}.
     *
     * @param cache Cache to fill.
     * @param i First key index (inclusive).
     * @param i2 Number of entries.
     * @param i3 Payload size for large values; non-positive selects integer values.
     * @return Map of the entries that were put (mutable, owned by the caller).
     */
    @NotNull
    private Map<Object, Object> populateCache(Cache<Object, Object> cache, int i, int i2, int i3) {
        // Parameterized instead of a raw HashMap, so the declared return type is checked by the compiler.
        Map<Object, Object> entries = new HashMap<>();

        for (int idx = i; idx < i + i2; idx++) {
            Object val;

            if (i3 > 0) {
                val = new LargeEntry(i3);
            } else {
                val = Integer.valueOf(idx);
            }

            entries.put(String.valueOf(idx), val);
        }

        cache.putAll(entries);

        return entries;
    }
}
