package org.gridgain.internal.processors.dr.handler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.gridgain.grid.GridDr;
import org.gridgain.grid.cache.dr.CacheDrSenderConfiguration;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.grid.dr.store.memory.DrSenderInMemoryStore;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrStateTransferKey;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalRequest;
import org.gridgain.internal.processors.dr.DrAbstractTest;
import org.gridgain.internal.processors.dr.util.DrTestCommunicationSpi;
import org.gridgain.internal.processors.dr.util.DrTestQueuedCommunicationSpiListener;
import org.gridgain.plugin.security.SecurityServicePermissionsTest;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/gridgain/internal/processors/dr/handler/DrHandlerFullStateTransferFailoverSelfTest.class */
public class DrHandlerFullStateTransferFailoverSelfTest extends DrAbstractTest {
    private static int CACHE_SIZE;
    private DrTestQueuedCommunicationSpiListener sndHubLsnr;
    private volatile boolean drBlocked;
    private int storeCapacity;

    @Parameterized.Parameter(0)
    public boolean sync;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Parameterized.Parameters(name = "syncMode={0}")
    public static Collection<Object[]> getParameters() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Object[]{Boolean.TRUE});
        arrayList.add(new Object[]{Boolean.FALSE});
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public void afterTest() throws Exception {
        this.sndHubLsnr = null;
        this.storeCapacity = CACHE_SIZE;
        super.afterTest();
    }

    private TcpDiscoveryIpFinder createTopologyReceiver() throws Exception {
        TcpDiscoveryIpFinder ipFinder = ipFinder();
        addTopology(ipFinder, config(new GridGainConfiguration(), "top2_node_rcv", (byte) 2, ipFinder, null, receiverHubConfig(12312), cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, false, null, null)));
        return ipFinder;
    }

    private TcpDiscoveryIpFinder createTopologySender() throws Exception {
        TcpDiscoveryIpFinder ipFinder = ipFinder();
        addTopology(ipFinder, sendNodeCfg(ipFinder, "top1_node", (byte) 1), sendNodeCfg(ipFinder, "top1_node_2", (byte) 1), sendNodeCfg(ipFinder, "top1_node_3", (byte) 1));
        return ipFinder;
    }

    private <K, V> CacheConfiguration<K, V> senderCacheConfig() {
        CacheConfiguration<K, V> cacheConfig = cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, true);
        CacheDrSenderConfiguration cacheDrSenderConfiguration = new CacheDrSenderConfiguration();
        cacheDrSenderConfiguration.setBatchSendSize(10);
        cacheDrSenderConfiguration.setStateTransferThrottle(500L);
        ggCacheConfig(cacheConfig).setDrSenderConfiguration(cacheDrSenderConfiguration);
        cacheConfig.setAffinity(new RendezvousAffinityFunction(false, 20));
        cacheConfig.setBackups(1);
        return cacheConfig;
    }

    private IgniteConfiguration sendNodeCfg(TcpDiscoveryIpFinder tcpDiscoveryIpFinder, String str, byte b) throws Exception {
        IgniteConfiguration config = config(new GridGainConfiguration(), str, b, tcpDiscoveryIpFinder, null, null, senderCacheConfig());
        config.setCommunicationSpi(new DrTestCommunicationSpi());
        return config;
    }

    private IgniteConfiguration senderCfg(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws Exception {
        GridGainConfiguration gridGainConfiguration = new GridGainConfiguration();
        DrSenderConfiguration senderHubConfig = senderHubConfig(senderHubReplicaConfig((byte) 2, "127.0.0.1:12312"));
        senderHubConfig.setCacheNames(new String[]{SecurityServicePermissionsTest.CACHE_NAME});
        senderHubConfig.setMaxQueueSize(10000);
        senderHubConfig.setMaxErrors(3);
        senderHubConfig.setReconnectOnFailureTimeout(500L);
        senderHubConfig.setStore(new DrSenderInMemoryStore().setMaxSize(this.storeCapacity));
        IgniteConfiguration config = config(gridGainConfiguration, "top1_node_snd", (byte) 1, tcpDiscoveryIpFinder, senderHubConfig, null, new CacheConfiguration[0]);
        this.sndHubLsnr = new DrTestQueuedCommunicationSpiListener() { // from class: org.gridgain.internal.processors.dr.handler.DrHandlerFullStateTransferFailoverSelfTest.1
            @Override // org.gridgain.internal.processors.dr.util.DrTestQueuedCommunicationSpiListener, org.gridgain.internal.processors.dr.util.DrTestCommunicationSpiListener
            public boolean onInDrInternalRequest(IgniteSpiContext igniteSpiContext, UUID uuid, DrInternalRequest drInternalRequest) throws Exception {
                return super.onInDrInternalRequest(igniteSpiContext, uuid, drInternalRequest) && !DrHandlerFullStateTransferFailoverSelfTest.this.drBlocked;
            }
        };
        config.setCommunicationSpi(new DrTestCommunicationSpi(this.sndHubLsnr));
        return config;
    }

    @Test
    public void testFullStateTransferFailureOnSenderHubLeft() throws Exception {
        TcpDiscoveryIpFinder createTopologySender = createTopologySender();
        TcpDiscoveryIpFinder createTopologyReceiver = createTopologyReceiver();
        startTopology(createTopologySender);
        Map<Object, Object> populateCache = populateCache(G.ignite("top1_node").cache(SecurityServicePermissionsTest.CACHE_NAME));
        startTopology(createTopologyReceiver);
        this.drBlocked = true;
        G.start(optimize(senderCfg(createTopologySender)));
        awaitDiscovery(createTopologySender, topologySize(createTopologySender) + 1);
        if (!$assertionsDisabled && this.sndHubLsnr.nextInRequest(500L) != null) {
            throw new AssertionError("DR isn't blocked. DR batch has been received.");
        }
        GridDr dr = dr(G.ignite("top1_node"));
        dr.startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted("top1_node", "top1_node_2", "top1_node_3");
        dr.stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
        IgniteFuture<?> stateTransfer = dr(G.ignite("top1_node")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, new byte[]{2});
        IgniteFuture<?> stateTransfer2 = dr(G.ignite("top1_node_2")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, new byte[]{2});
        IgniteFuture<?> stateTransfer3 = dr(G.ignite("top1_node_3")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, new byte[]{2});
        waitStateTransferEntries(systemCache("top1_node"), 1, 10000L);
        stopGrid("top1_node_snd");
        waitDrStopped("top1_node", "top1_node_2", "top1_node_3");
        assertFutureFailed(stateTransfer);
        assertFutureFailed(stateTransfer2);
        assertFutureFailed(stateTransfer3);
        waitStateTransferEntries(systemCache("top1_node"), 0, 10000L);
        assertFutureFailed(dr.stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2}));
        this.drBlocked = false;
        G.start(optimize(senderCfg(createTopologySender)));
        awaitDiscovery(createTopologySender, topologySize(createTopologySender) + 1);
        dr.startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted("top1_node", "top1_node_2", "top1_node_3");
        IgniteFuture stateTransfer4 = dr(G.ignite("top1_node")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
        IgniteFuture stateTransfer5 = dr(G.ignite("top1_node_2")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
        IgniteFuture stateTransfer6 = dr(G.ignite("top1_node_3")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
        stateTransfer4.get();
        stateTransfer5.get();
        stateTransfer6.get();
        compareCaches(G.ignite("top2_node_rcv").cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, 10000L);
    }

    @Test
    public void testFullStateTransferFailureOnReplicationStop() throws Exception {
        TcpDiscoveryIpFinder createTopologySender = createTopologySender();
        TcpDiscoveryIpFinder createTopologyReceiver = createTopologyReceiver();
        startTopology(createTopologySender);
        Map<Object, Object> populateCache = populateCache(G.ignite("top1_node").cache(SecurityServicePermissionsTest.CACHE_NAME));
        startTopology(createTopologyReceiver);
        IgniteCache cache = G.ignite("top2_node_rcv").cache(SecurityServicePermissionsTest.CACHE_NAME);
        G.start(optimize(senderCfg(createTopologySender)));
        awaitDiscovery(createTopologySender, topologySize(createTopologySender) + 1);
        if (!$assertionsDisabled && (this.sndHubLsnr.nextInRequest(1000L) != null || !cache.getAll(populateCache.keySet()).isEmpty())) {
            throw new AssertionError("Replication is not expected at this step.");
        }
        GridDr dr = dr(G.ignite("top1_node"));
        dr.startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted("top1_node", "top1_node_2", "top1_node_3");
        IgniteFuture<?> stateTransfer = dr(G.ignite("top1_node")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
        IgniteFuture<?> stateTransfer2 = dr(G.ignite("top1_node_2")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
        IgniteFuture<?> stateTransfer3 = dr(G.ignite("top1_node_3")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
        waitStateTransferEntries(systemCache("top1_node_2"), 1, 1000L);
        assertFalse(stateTransfer.isDone());
        assertFalse(stateTransfer2.isDone());
        assertFalse(stateTransfer3.isDone());
        dr.stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStopped("top1_node", "top1_node_2", "top1_node_3");
        assertFutureFailed(stateTransfer);
        assertFutureFailed(stateTransfer2);
        assertFutureFailed(stateTransfer3);
        waitStateTransferEntries(systemCache("top1_node"), 0, 10000L);
        assertFutureFailed(dr.stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2}));
        dr.startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted("top1_node", "top1_node_2", "top1_node_3");
        IgniteFuture stateTransfer4 = dr(G.ignite("top1_node")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
        IgniteFuture stateTransfer5 = dr(G.ignite("top1_node_2")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
        IgniteFuture stateTransfer6 = dr(G.ignite("top1_node_3")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
        stateTransfer4.get();
        stateTransfer5.get();
        stateTransfer6.get();
        compareCaches(cache, populateCache, 10000L);
    }

    @Test
    public void testFullStateTransferFailover() throws Exception {
        TcpDiscoveryIpFinder createTopologyReceiver = createTopologyReceiver();
        TcpDiscoveryIpFinder createTopologySender = createTopologySender();
        startTopology(createTopologyReceiver);
        startTopology(createTopologySender);
        G.start(optimize(senderCfg(createTopologySender)));
        GridDr dr = dr(G.ignite("top1_node"));
        dr.stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStopped("top1_node", "top1_node_2", "top1_node_3");
        IgniteCache cache = G.ignite("top2_node_rcv").cache(SecurityServicePermissionsTest.CACHE_NAME);
        IgniteCache<Object, Object> cache2 = G.ignite("top1_node").cache(SecurityServicePermissionsTest.CACHE_NAME);
        Map<Object, Object> populateCache = populateCache(cache2);
        if (!$assertionsDisabled && (this.sndHubLsnr.nextInRequest(1000L) != null || !cache.getAll(populateCache.keySet()).isEmpty())) {
            throw new AssertionError("Replication is not expected at this step.");
        }
        dr.startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted("top1_node", "top1_node_2", "top1_node_3");
        for (int i = 0; i < 2; i++) {
            IgniteFuture stateTransfer = dr(G.ignite("top1_node")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
            IgniteFuture stateTransfer2 = dr(G.ignite("top1_node_3")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
            waitStateTransferEntries(systemCache("top1_node_2"), 1, 1000L);
            assertFalse(stateTransfer.isDone());
            assertFalse(stateTransfer2.isDone());
            stopGrid("top1_node_2");
            stateTransfer.get();
            stateTransfer2.get();
            compareCaches(cache, populateCache, 10000L);
            if (i < 1) {
                G.start(sendNodeCfg(createTopologySender, "top1_node_2", (byte) 1));
                dr.stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
                waitDrStopped("top1_node", "top1_node_2", "top1_node_3");
                populateCache = populateCache(cache2);
                dr.startReplication(SecurityServicePermissionsTest.CACHE_NAME);
                waitDrStarted("top1_node", "top1_node_2", "top1_node_3");
            }
        }
    }

    @Test
    public void testFullStateTransferCancellationIfDrFailed() throws Exception {
        this.storeCapacity = 999;
        TcpDiscoveryIpFinder createTopologyReceiver = createTopologyReceiver();
        TcpDiscoveryIpFinder createTopologySender = createTopologySender();
        startTopology(createTopologyReceiver);
        startTopology(createTopologySender);
        G.ignite("top2_node_rcv").destroyCache(SecurityServicePermissionsTest.CACHE_NAME);
        populateCache(G.ignite("top1_node").cache(SecurityServicePermissionsTest.CACHE_NAME));
        G.start(optimize(senderCfg(createTopologySender)));
        awaitDiscovery(createTopologySender, topologySize(createTopologySender) + 1);
        dr(G.ignite("top1_node")).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted("top1_node", "top1_node_2", "top1_node_3");
        for (int i = 0; i < GridTestUtils.SF.apply(10, 3, 10); i++) {
            IgniteFuture<?> stateTransfer = dr(G.ignite("top1_node")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
            IgniteFuture<?> stateTransfer2 = dr(G.ignite("top1_node_2")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
            IgniteFuture<?> stateTransfer3 = dr(G.ignite("top1_node_3")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, this.sync, new byte[]{2});
            assertFutureFailed(stateTransfer);
            assertFutureFailed(stateTransfer2);
            assertFutureFailed(stateTransfer3);
            waitStateTransferEntries(systemCache("top1_node"), 0, 10000L);
        }
    }

    private <K, V> IgniteInternalCache<K, V> systemCache(String str) {
        IgniteInternalCache<K, V> utilityCache = G.ignite(str).context().cache().utilityCache();
        if ($assertionsDisabled || utilityCache != null) {
            return utilityCache;
        }
        throw new AssertionError();
    }

    @NotNull
    private Map<Object, Object> populateCache(IgniteCache<Object, Object> igniteCache) {
        HashMap hashMap = new HashMap(CACHE_SIZE);
        for (int i = 0; i < CACHE_SIZE; i++) {
            hashMap.put(String.valueOf(i), Integer.valueOf(i));
        }
        igniteCache.putAll(hashMap);
        return hashMap;
    }

    @NotNull
    private Map<Object, Object> updateCacheData(IgniteCache<Object, Object> igniteCache, Object obj) {
        HashMap hashMap = new HashMap(CACHE_SIZE);
        for (int i = 0; i < CACHE_SIZE; i++) {
            hashMap.put(String.valueOf(i), obj);
        }
        igniteCache.putAll(hashMap);
        return hashMap;
    }

    private void assertFutureFailed(IgniteFuture<?> igniteFuture) {
        GridTestUtils.assertThrows(log, () -> {
            return igniteFuture.get(10000L);
        }, IgniteException.class, (String) null);
    }

    private void waitStateTransferEntries(final IgniteInternalCache<Object, Object> igniteInternalCache, final int i, long j) throws Exception {
        if (!$assertionsDisabled && !GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.gridgain.internal.processors.dr.handler.DrHandlerFullStateTransferFailoverSelfTest.2
            public boolean apply() {
                return DrHandlerFullStateTransferFailoverSelfTest.this.stateTransferEntries(igniteInternalCache) == i;
            }
        }, j)) {
            throw new AssertionError();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int stateTransferEntries(IgniteInternalCache<Object, Object> igniteInternalCache) {
        int i = 0;
        Iterator it = igniteInternalCache.keySet().iterator();
        while (it.hasNext()) {
            if (it.next() instanceof CacheDrStateTransferKey) {
                i++;
            }
        }
        return i;
    }

    static {
        $assertionsDisabled = !DrHandlerFullStateTransferFailoverSelfTest.class.desiredAssertionStatus();
        CACHE_SIZE = 1000;
    }
}
