package org.gridgain.internal.processors.dr;

import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.metric.MetricExporterSpi;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.gridgain.grid.cache.dr.CacheDrSenderConfiguration;
import org.gridgain.grid.cache.dr.CacheDrSenderMetrics;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.grid.dr.DrSenderConnectionConfiguration;
import org.gridgain.grid.dr.DrSenderConnectionState;
import org.gridgain.grid.dr.DrSenderLoadBalancingMode;
import org.gridgain.grid.dr.store.DrSenderStoreOverflowMode;
import org.gridgain.grid.dr.store.memory.DrSenderInMemoryStore;
import org.gridgain.internal.processors.dr.qa.DrJmxMetricsAbstractTest;
import org.gridgain.plugin.security.SecurityServicePermissionsTest;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
import org.junit.Test;

@WithSystemProperty(key = "IGNITE_DISABLE_SMART_DR_THROTTLING", value = "true")
/* loaded from: input_file:org/gridgain/internal/processors/dr/DrSenderPauseSelfTest.class */
public class DrSenderPauseSelfTest extends DrAbstractTest {
    private static final int STORE_SIZE = 10;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public boolean useSenderGroups() {
        return true;
    }

    private TcpDiscoveryIpFinder createTopologySender() throws Exception {
        TcpDiscoveryIpFinder ipFinder = ipFinder();
        DrSenderConfiguration senderGroups = senderHubConfig(senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1), senderHubReplicaConfig((byte) 3, "127.0.0.1:12312")).setSenderGroups(new String[]{"group-1"});
        DrSenderConfiguration reconnectOnFailureTimeout = senderHubConfig(senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1), senderHubReplicaConfig((byte) 3, "127.0.0.1:12312")).setSenderGroups(new String[]{"group-1"}).setReconnectOnFailureTimeout(1000L);
        addTopology(ipFinder, config(new GridGainConfiguration(), DrAbstractTest.TOP1_NODE_SND, (byte) 1, ipFinder, senderGroups, null, new CacheConfiguration[0]).setMetricExporterSpi(new MetricExporterSpi[]{new JmxMetricExporterSpi()}), config(new GridGainConfiguration(), "top1_node_snd_2", (byte) 1, ipFinder, reconnectOnFailureTimeout, null, new CacheConfiguration[0]).setMetricExporterSpi(new MetricExporterSpi[]{new JmxMetricExporterSpi()}), config(new GridGainConfiguration(), DrAbstractTest.TOP1_NODE, (byte) 1, ipFinder, null, null, cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, true)), config(new GridGainConfiguration(), DrAbstractTest.TOP1_NODE_2, (byte) 1, ipFinder, null, null, cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, true)));
        return ipFinder;
    }

    private TcpDiscoveryIpFinder createTopologyReceiver1() throws Exception {
        TcpDiscoveryIpFinder ipFinder = ipFinder();
        IgniteConfiguration config = config(new GridGainConfiguration(), DrAbstractTest.TOP2_NODE_RCV, (byte) 2, ipFinder, null, receiverHubConfig(DrAbstractTest.RCV_PORT_1), null);
        GridGainConfiguration gridGainConfiguration = new GridGainConfiguration();
        CacheConfiguration cacheConfig = cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED);
        ggCacheConfig(cacheConfig);
        addTopology(ipFinder, config, config(gridGainConfiguration, DrAbstractTest.TOP2_NODE, (byte) 2, ipFinder, null, null, cacheConfig));
        return ipFinder;
    }

    private TcpDiscoveryIpFinder createTopologyReceiver2() throws Exception {
        TcpDiscoveryIpFinder ipFinder = ipFinder();
        IgniteConfiguration config = config(new GridGainConfiguration(), "top3_node_rcv", (byte) 3, ipFinder, null, receiverHubConfig(DrAbstractTest.RCV_PORT_2), null);
        GridGainConfiguration gridGainConfiguration = new GridGainConfiguration();
        CacheConfiguration cacheConfig = cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED);
        ggCacheConfig(cacheConfig);
        addTopology(ipFinder, config, config(gridGainConfiguration, "top3_node", (byte) 3, ipFinder, null, null, cacheConfig));
        return ipFinder;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public CacheConfiguration cacheConfig(@Nullable String str, CacheMode cacheMode, boolean z) {
        CacheConfiguration cacheConfig = super.cacheConfig(str, cacheMode, z);
        if (z) {
            CacheDrSenderConfiguration drSenderConfiguration = ggCacheConfig(cacheConfig).getDrSenderConfiguration();
            if (!$assertionsDisabled && drSenderConfiguration == null) {
                throw new AssertionError();
            }
            drSenderConfiguration.setSenderGroup("group-1");
            drSenderConfiguration.setBatchSendSize(1);
            drSenderConfiguration.setLoadBalancingMode(DrSenderLoadBalancingMode.DR_ROUND_ROBIN);
        }
        return cacheConfig;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public DrSenderConnectionConfiguration senderHubReplicaConfig(byte b, String... strArr) {
        DrSenderConnectionConfiguration senderHubReplicaConfig = super.senderHubReplicaConfig(b, strArr);
        senderHubReplicaConfig.setStore(new DrSenderInMemoryStore().setMaxSize(STORE_SIZE).setOverflowMode(DrSenderStoreOverflowMode.STOP));
        senderHubReplicaConfig.setAwaitAcknowledge(true);
        return senderHubReplicaConfig;
    }

    @Test
    public void testDrPauseResume() throws Exception {
        startTopology(createTopologyReceiver1());
        startTopology(createTopologyReceiver2());
        startTopology(createTopologySender());
        IgniteCache<Object, Object> cache = G.ignite(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME);
        dr(G.ignite(DrAbstractTest.TOP1_NODE)).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        checkCacheNotPaused();
        IgniteCache cache2 = G.ignite(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME);
        IgniteCache cache3 = G.ignite("top3_node").cache(SecurityServicePermissionsTest.CACHE_NAME);
        Map<Object, Object> updateEntries = updateEntries(cache, 1);
        compareCaches(cache2, updateEntries, 10000L);
        compareCaches(cache3, updateEntries, 10000L);
        dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().pause((byte) 3);
        Assert.assertTrue(dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().paused((byte) 3));
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().connection((byte) 2).connectionState());
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite("top1_node_snd_2")).localSender().connection((byte) 2).connectionState());
        assertEquals(DrSenderConnectionState.DISCONNECTED, dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().connection((byte) 3).connectionState());
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite("top1_node_snd_2")).localSender().connection((byte) 3).connectionState());
        U.sleep(500L);
        checkCacheNotPaused();
        Map<Object, Object> updateEntries2 = updateEntries(cache, 2);
        Thread.sleep(2000L);
        checkCacheNotPaused();
        compareCaches(cache2, updateEntries2, 10000L);
        assertFalse(updateEntries2.equals(cache3.getAll(updateEntries2.keySet())));
        dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().resume((byte) 3);
        Thread.sleep(2000L);
        Assert.assertFalse(dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().paused((byte) 3));
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().connection((byte) 2).connectionState());
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite("top1_node_snd_2")).localSender().connection((byte) 2).connectionState());
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().connection((byte) 3).connectionState());
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite("top1_node_snd_2")).localSender().connection((byte) 3).connectionState());
        assertEquals(updateEntries2, cache2.getAll(updateEntries2.keySet()));
        compareCaches(cache3, updateEntries2, 10000L);
        dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().pause((byte) 3);
        Assert.assertTrue(dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().paused((byte) 3));
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().connection((byte) 2).connectionState());
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite("top1_node_snd_2")).localSender().connection((byte) 2).connectionState());
        assertEquals(DrSenderConnectionState.DISCONNECTED, dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().connection((byte) 3).connectionState());
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite("top1_node_snd_2")).localSender().connection((byte) 3).connectionState());
        updateEntries(cache, 3);
        updateEntries(cache, 4);
        Thread.sleep(2000L);
        checkCacheNotPaused();
        Map<Object, Object> updateEntries3 = updateEntries(cache, 5);
        compareCaches(cache2, updateEntries3, 10000L);
        compareCaches(cache3, updateEntries3, 10000L);
        dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().pause((byte) 3);
        dr(G.ignite("top1_node_snd_2")).localSender().pause((byte) 3);
        Assert.assertTrue(dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().paused((byte) 3));
        Assert.assertTrue(dr(G.ignite("top1_node_snd_2")).localSender().paused((byte) 3));
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().connection((byte) 2).connectionState());
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite("top1_node_snd_2")).localSender().connection((byte) 2).connectionState());
        assertEquals(DrSenderConnectionState.DISCONNECTED, dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().connection((byte) 3).connectionState());
        assertEquals(DrSenderConnectionState.DISCONNECTED, dr(G.ignite("top1_node_snd_2")).localSender().connection((byte) 3).connectionState());
        checkCacheNotPaused();
        Map<Object, Object> updateEntries4 = updateEntries(cache, 6);
        compareCaches(cache2, updateEntries4, 10000L);
        assertEquals(updateEntries3, cache3.getAll(updateEntries3.keySet()));
        checkCacheNotPaused();
        dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().resume((byte) 3);
        dr(G.ignite("top1_node_snd_2")).localSender().resume((byte) 3);
        Thread.sleep(2000L);
        Assert.assertFalse(dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().paused((byte) 3));
        Assert.assertFalse(dr(G.ignite("top1_node_snd_2")).localSender().paused((byte) 3));
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().connection((byte) 2).connectionState());
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite("top1_node_snd_2")).localSender().connection((byte) 2).connectionState());
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().connection((byte) 3).connectionState());
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite("top1_node_snd_2")).localSender().connection((byte) 3).connectionState());
        U.sleep(2000L);
        compareCaches(cache3, updateEntries4, 10000L);
        Map<Object, Object> updateEntries5 = updateEntries(cache, 7);
        compareCaches(cache2, updateEntries5, 10000L);
        compareCaches(cache3, updateEntries5, 10000L);
        dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().pause((byte) 3);
        dr(G.ignite("top1_node_snd_2")).localSender().pause((byte) 3);
        Assert.assertTrue(dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().paused((byte) 3));
        Assert.assertTrue(dr(G.ignite("top1_node_snd_2")).localSender().paused((byte) 3));
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().connection((byte) 2).connectionState());
        assertEquals(DrSenderConnectionState.CONNECTED, dr(G.ignite("top1_node_snd_2")).localSender().connection((byte) 2).connectionState());
        assertEquals(DrSenderConnectionState.DISCONNECTED, dr(G.ignite(DrAbstractTest.TOP1_NODE_SND)).localSender().connection((byte) 3).connectionState());
        assertEquals(DrSenderConnectionState.DISCONNECTED, dr(G.ignite("top1_node_snd_2")).localSender().connection((byte) 3).connectionState());
        checkCacheNotPaused();
        updateEntries(cache, 8);
        updateEntries(cache, 9);
        updateEntries(cache, STORE_SIZE);
        updateEntries(cache, 11);
        checkCachePaused();
    }

    @Test
    public void testPauseOnRemoteCacheDoesntExist() throws Exception {
        startTopology(createTopologyReceiver1());
        startTopology(createTopologyReceiver2());
        startTopology(createTopologySender());
        G.ignite(DrAbstractTest.TOP1_NODE).createCache(cacheConfig("dyn_cache", CacheMode.PARTITIONED, true)).put(0, 0);
        assertTrue(GridTestUtils.waitForCondition(() -> {
            return dr(G.ignite(DrAbstractTest.TOP1_NODE)).senderCacheStatus("dyn_cache").stopped();
        }, 5000L));
        DrJmxMetricsAbstractTest.MetricsStateHolder add = DrJmxMetricsAbstractTest.metrics(grid(DrAbstractTest.TOP1_NODE_SND), IgniteUtils.makeMBeanName(DrAbstractTest.TOP1_NODE_SND, "dr", "sender.dc2.dyn_cache")).add(DrJmxMetricsAbstractTest.metrics(grid("top1_node_snd_2"), IgniteUtils.makeMBeanName("top1_node_snd_2", "dr", "sender.dc2.dyn_cache")));
        DrJmxMetricsAbstractTest.MetricsStateHolder add2 = DrJmxMetricsAbstractTest.metrics(grid(DrAbstractTest.TOP1_NODE_SND), IgniteUtils.makeMBeanName(DrAbstractTest.TOP1_NODE_SND, "dr", "sender.dc3.dyn_cache")).add(DrJmxMetricsAbstractTest.metrics(grid("top1_node_snd_2"), IgniteUtils.makeMBeanName("top1_node_snd_2", "dr", "sender.dc3.dyn_cache")));
        assertEquals(1L, add.metric("BatchesFailed"));
        assertEquals(1L, add.metric("EntriesFailed"));
        assertTrue(add.metric("BytesFailed") > 0);
        assertEquals(1L, add2.metric("BatchesFailed"));
        assertEquals(1L, add2.metric("EntriesFailed"));
        assertTrue(add2.metric("BytesFailed") > 0);
        assertFalse(dr(G.ignite(DrAbstractTest.TOP1_NODE)).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
    }

    private void checkCacheNotPaused() {
        CacheDrSenderMetrics senderCacheMetrics = dr(G.ignite(DrAbstractTest.TOP1_NODE)).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME);
        CacheDrSenderMetrics senderCacheMetrics2 = dr(G.ignite(DrAbstractTest.TOP1_NODE_2)).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME);
        assertNull(senderCacheMetrics.status().reason());
        assertNull(senderCacheMetrics2.status().reason());
    }

    private void checkCachePaused() throws IgniteInterruptedCheckedException {
        CacheDrSenderMetrics senderCacheMetrics = dr(G.ignite(DrAbstractTest.TOP1_NODE)).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME);
        CacheDrSenderMetrics senderCacheMetrics2 = dr(G.ignite(DrAbstractTest.TOP1_NODE_2)).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME);
        assertTrue("Failed to wait for replication to stop. [reason1=" + senderCacheMetrics.status().reason() + ", reason2=" + senderCacheMetrics2.status().reason() + "]", GridTestUtils.waitForCondition(() -> {
            return (senderCacheMetrics.status().reason() == null || senderCacheMetrics2.status().reason() == null) ? false : true;
        }, 5000L));
    }

    @NotNull
    private Map<Object, Object> updateEntries(IgniteCache<Object, Object> igniteCache, int i) {
        HashMap hashMap = new HashMap();
        for (int i2 = 0; i2 < STORE_SIZE; i2++) {
            String valueOf = String.valueOf(i2);
            hashMap.put(valueOf, Integer.valueOf(i));
            igniteCache.put(valueOf, Integer.valueOf(i));
        }
        return hashMap;
    }

    static {
        $assertionsDisabled = !DrSenderPauseSelfTest.class.desiredAssertionStatus();
    }
}
