package org.gridgain.internal.processors.dr.handler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheStoppedException;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.grid.dr.DrSenderConnectionConfiguration;
import org.gridgain.grid.dr.store.fs.DrSenderFsStore;
import org.gridgain.grid.dr.store.memory.DrSenderInMemoryStore;
import org.gridgain.internal.processors.dr.DrAbstractTest;
import org.gridgain.internal.processors.dr.DrThreadPoolSelfTest;
import org.gridgain.plugin.security.SecurityServicePermissionsTest;
import org.jetbrains.annotations.NotNull;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/gridgain/internal/processors/dr/handler/DrFullStateTransferQaTest.class */
public class DrFullStateTransferQaTest extends DrAbstractTest {
    private static final int SENDER_BATCH_SIZE = 10;
    private static final int SENDER_BATCH_FREQ = 200;
    private static final String TOP1_CLI = "cli_1";
    private static final String TOP1_NODE_SND_3 = "top1_node_snd_3";
    private static final String TOP2_NODE_RCV_3 = "top2_node_rcv_3";
    private static final String STORE_PATH_1;
    private static final String STORE_PATH_2;
    private static final String STORE_PATH_3;
    private static final int PARTITIONS_COUNT = 32;
    private boolean useClient = true;
    private boolean persistent;

    @Parameterized.Parameter
    public boolean syncMode;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Parameterized.Parameters(name = "Synchronous FST = {0}")
    public static List<Object[]> parameters() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Object[]{Boolean.FALSE});
        arrayList.add(new Object[]{Boolean.TRUE});
        return arrayList;
    }

    private boolean syncFST() {
        return this.syncMode;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public boolean useSenderGroups() {
        return true;
    }

    protected long getTestTimeout() {
        return 90000L;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public void beforeTest() throws Exception {
        this.persistent = false;
        clearStores(STORE_PATH_1, STORE_PATH_2, STORE_PATH_3);
        cleanPersistenceDir();
        super.beforeTest();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public void afterTest() throws Exception {
        super.afterTest();
        clearStores(STORE_PATH_1, STORE_PATH_2, STORE_PATH_3);
        cleanPersistenceDir();
        System.clearProperty("IGNITE_DISABLE_SMART_DR_THROTTLING");
    }

    private TcpDiscoveryIpFinder createReceiverTopology1() throws Exception {
        TcpDiscoveryIpFinder ipFinder = ipFinder();
        GridGainConfiguration gridGainConfiguration = new GridGainConfiguration();
        GridGainConfiguration gridGainConfiguration2 = new GridGainConfiguration();
        GridGainConfiguration gridGainConfiguration3 = new GridGainConfiguration();
        GridGainConfiguration gridGainConfiguration4 = new GridGainConfiguration();
        addTopology(ipFinder, config(gridGainConfiguration, DrAbstractTest.TOP2_NODE, (byte) 2, ipFinder, null, null, cacheConfiguration()), config(gridGainConfiguration2, "top2_node_2", (byte) 2, ipFinder, null, null, cacheConfiguration()), config(gridGainConfiguration3, DrAbstractTest.TOP2_NODE_RCV, (byte) 2, ipFinder, null, receiverHubConfig(DrAbstractTest.RCV_PORT_1), this.useClient, new CacheConfiguration[0]), config(gridGainConfiguration4, "top2_node_rcv_2", (byte) 2, ipFinder, null, receiverHubConfig(DrAbstractTest.RCV_PORT_2), this.useClient, new CacheConfiguration[0]));
        return ipFinder;
    }

    private TcpDiscoveryIpFinder createReceiverTopology2() throws Exception {
        TcpDiscoveryIpFinder ipFinder = ipFinder();
        GridGainConfiguration gridGainConfiguration = new GridGainConfiguration();
        GridGainConfiguration gridGainConfiguration2 = new GridGainConfiguration();
        GridGainConfiguration gridGainConfiguration3 = new GridGainConfiguration();
        GridGainConfiguration gridGainConfiguration4 = new GridGainConfiguration();
        addTopology(ipFinder, config(gridGainConfiguration, "top3_node", (byte) 3, ipFinder, null, null, cacheConfiguration()), config(gridGainConfiguration2, "top3_node_2", (byte) 3, ipFinder, null, null, cacheConfiguration()), config(gridGainConfiguration3, "top3_node_rcv", (byte) 3, ipFinder, null, receiverHubConfig(12313), this.useClient, new CacheConfiguration[0]), config(gridGainConfiguration4, "top3_node_rcv_2", (byte) 3, ipFinder, null, receiverHubConfig(12314), this.useClient, new CacheConfiguration[0]));
        return ipFinder;
    }

    private CacheConfiguration<?, ?> cacheConfiguration() {
        return senderCacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, SENDER_BATCH_SIZE, 200L).setAffinity(new RendezvousAffinityFunction().setPartitions(PARTITIONS_COUNT));
    }

    private TcpDiscoveryIpFinder createSenderTopology() throws Exception {
        return createSenderTopology(true, false);
    }

    private TcpDiscoveryIpFinder createSenderTopology(boolean z, boolean z2) throws Exception {
        TcpDiscoveryIpFinder ipFinder = ipFinder();
        GridGainConfiguration gridGainConfiguration = new GridGainConfiguration();
        GridGainConfiguration gridGainConfiguration2 = new GridGainConfiguration();
        CacheConfiguration<?, ?> cacheConfiguration = cacheConfiguration();
        CacheConfiguration<?, ?> cacheConfiguration2 = cacheConfiguration();
        IgniteConfiguration config = config(gridGainConfiguration, DrAbstractTest.TOP1_NODE, (byte) 1, ipFinder, null, null, cacheConfiguration);
        IgniteConfiguration config2 = config(gridGainConfiguration2, DrAbstractTest.TOP1_NODE_2, (byte) 1, ipFinder, null, null, cacheConfiguration2);
        if (this.persistent) {
            config.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(268435456L).setPersistenceEnabled(true)));
            config2.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(268435456L).setPersistenceEnabled(true)));
        }
        if (z) {
            DrSenderConnectionConfiguration senderHubReplicaConfig = senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312");
            DrSenderConnectionConfiguration senderHubReplicaConfig2 = senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312");
            if (z2) {
                senderHubReplicaConfig.setStore(new DrSenderFsStore().setDirectoryPath(STORE_PATH_1));
                senderHubReplicaConfig2.setStore(new DrSenderFsStore().setDirectoryPath(STORE_PATH_2));
            }
            addTopology(ipFinder, config, config2, createSender(ipFinder, (byte) 1, DrAbstractTest.TOP1_NODE_SND, senderHubReplicaConfig), createSender(ipFinder, (byte) 1, "top1_node_snd_2", senderHubReplicaConfig2));
        } else {
            addTopology(ipFinder, config, config2);
        }
        return ipFinder;
    }

    private TcpDiscoveryIpFinder createSenderTopology2() throws Exception {
        TcpDiscoveryIpFinder ipFinder = ipFinder();
        GridGainConfiguration gridGainConfiguration = new GridGainConfiguration();
        GridGainConfiguration gridGainConfiguration2 = new GridGainConfiguration();
        CacheConfiguration<?, ?> cacheConfiguration = cacheConfiguration();
        CacheConfiguration<?, ?> cacheConfiguration2 = cacheConfiguration();
        IgniteConfiguration config = config(gridGainConfiguration, "top3_node", (byte) 3, ipFinder, null, null, cacheConfiguration);
        IgniteConfiguration config2 = config(gridGainConfiguration2, "top3_node_2", (byte) 3, ipFinder, null, null, cacheConfiguration2);
        addTopology(ipFinder, config, config2);
        addTopology(ipFinder, config, config2, createSender(ipFinder, (byte) 3, "top3_node_snd", senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312")), createSender(ipFinder, (byte) 3, "top3_node_snd_2", senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312")));
        return ipFinder;
    }

    private IgniteConfiguration createSender(TcpDiscoveryIpFinder tcpDiscoveryIpFinder, byte b, String str, DrSenderConnectionConfiguration... drSenderConnectionConfigurationArr) throws Exception {
        if (!$assertionsDisabled && F.isEmpty(drSenderConnectionConfigurationArr)) {
            throw new AssertionError();
        }
        GridGainConfiguration gridGainConfiguration = new GridGainConfiguration();
        DrSenderConfiguration senderHubConfig = senderHubConfig(drSenderConnectionConfigurationArr);
        senderHubConfig.setMaxQueueSize(SENDER_BATCH_SIZE);
        senderHubConfig.setMaxErrors(30);
        senderHubConfig.setReconnectOnFailureTimeout(300L);
        if (Arrays.stream(drSenderConnectionConfigurationArr).noneMatch(drSenderConnectionConfiguration -> {
            return drSenderConnectionConfiguration.getStore() != null;
        })) {
            senderHubConfig.setStore(new DrSenderInMemoryStore());
        }
        return config(gridGainConfiguration, str, b, tcpDiscoveryIpFinder, senderHubConfig, null, this.useClient, new CacheConfiguration[0]);
    }

    private IgniteConfiguration createReceiver(TcpDiscoveryIpFinder tcpDiscoveryIpFinder, byte b, String str, int i) throws Exception {
        return config(new GridGainConfiguration(), str, b, tcpDiscoveryIpFinder, null, receiverHubConfig(i), this.useClient, new CacheConfiguration[0]);
    }

    @Test
    public void testFullStateTransferSimple() throws Exception {
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        Map<Object, Object> populateCache = populateCache(grid(DrAbstractTest.TOP1_NODE), SecurityServicePermissionsTest.CACHE_NAME, 0, 40);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2}).get(getTestTimeout());
        compareCaches(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, populateCache.keySet());
    }

    @Test
    public void testFullStateTransferTwoRemotes() throws Exception {
        int apply = GridTestUtils.SF.apply(1000);
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology(false, false);
        TcpDiscoveryIpFinder createReceiverTopology1 = createReceiverTopology1();
        TcpDiscoveryIpFinder createReceiverTopology2 = createReceiverTopology2();
        startTopology(createReceiverTopology1);
        startTopology(createReceiverTopology2);
        startTopology(createSenderTopology);
        G.start(createSender(createSenderTopology, (byte) 1, DrAbstractTest.TOP1_NODE_SND, senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312"), senderHubReplicaConfig((byte) 3, "127.0.0.1:12313", "127.0.0.1:12314")));
        G.start(createSender(createSenderTopology, (byte) 1, "top1_node_snd_2", senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312"), senderHubReplicaConfig((byte) 3, "127.0.0.1:12313", "127.0.0.1:12314")));
        Map<Object, Object> populateCache = populateCache(grid(DrAbstractTest.TOP1_NODE), SecurityServicePermissionsTest.CACHE_NAME, 0, apply);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        assertEquals(0, grid("top3_node").cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[0]).get(getTestTimeout());
        compareCaches(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, populateCache.keySet());
        compareCaches(grid("top3_node").cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, populateCache.keySet());
    }

    @Test
    public void testFullStateTransferStartTwoMasters() throws Exception {
        int apply = GridTestUtils.SF.apply(1000);
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        TcpDiscoveryIpFinder createSenderTopology2 = createSenderTopology2();
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        startTopology(createSenderTopology2);
        dr(grid(DrAbstractTest.TOP1_NODE)).stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        dr(grid("top3_node")).stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        assertTrue(dr(grid(DrAbstractTest.TOP1_NODE)).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
        assertTrue(dr(grid("top3_node")).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
        Map<Object, Object> populateCache = populateCache(grid(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), 0, apply);
        Map<Object, Object> populateCache2 = populateCache(grid("top3_node").cache(SecurityServicePermissionsTest.CACHE_NAME), apply, apply);
        dr(grid(DrAbstractTest.TOP1_NODE)).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        dr(grid("top3_node")).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted(DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        waitDrStarted("top3_node", "top3_node_2");
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        IgniteFuture stateTransfer2 = dr(grid("top3_node")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        GridCompoundFuture gridCompoundFuture = new GridCompoundFuture();
        gridCompoundFuture.add(GridTestUtils.runAsync(() -> {
            stateTransfer.get(getTestTimeout());
            compareCaches(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, populateCache.keySet());
            return null;
        }));
        gridCompoundFuture.add(GridTestUtils.runAsync(() -> {
            stateTransfer2.get(getTestTimeout());
            compareCaches(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache2, populateCache2.keySet());
            return null;
        }));
        try {
            gridCompoundFuture.markInitialized();
            gridCompoundFuture.get(getTestTimeout());
            if (!stateTransfer.isDone()) {
                stateTransfer.cancel();
            }
            if (stateTransfer2.isDone()) {
                return;
            }
            stateTransfer2.cancel();
        } catch (Throwable th) {
            if (!stateTransfer.isDone()) {
                stateTransfer.cancel();
            }
            if (!stateTransfer2.isDone()) {
                stateTransfer2.cancel();
            }
            throw th;
        }
    }

    @Test
    public void testFullStateTransferAddNodes() throws Exception {
        int apply = GridTestUtils.SF.apply(1000);
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        Map<Object, Object> populateCache = populateCache(grid(DrAbstractTest.TOP1_NODE), SecurityServicePermissionsTest.CACHE_NAME, 0, apply);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        G.start(config(new GridGainConfiguration(), "top1_node_3", (byte) 1, createSenderTopology, null, null, false, new CacheConfiguration[0]));
        G.start(config(new GridGainConfiguration(), TOP1_CLI, (byte) 1, createSenderTopology, null, null, true, new CacheConfiguration[0]));
        stateTransfer.get(getTestTimeout());
        compareCaches(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, populateCache.keySet());
    }

    @Test
    public void testFullStateTransferSenderRollingRestart() throws Exception {
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology(true, true);
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        Map<Object, Object> populateCache = populateCache(grid(DrAbstractTest.TOP1_NODE), SecurityServicePermissionsTest.CACHE_NAME, 0, DrThreadPoolSelfTest.SYS_WORKER_BLOCKED_TIMEOUT);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        G.stop(DrAbstractTest.TOP1_NODE_SND, false);
        G.start(createSender(createSenderTopology, (byte) 1, DrAbstractTest.TOP1_NODE_SND, senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312").setStore(new DrSenderFsStore().setDirectoryPath(STORE_PATH_1))));
        Thread.sleep(500L);
        G.stop("top1_node_snd_2", false);
        assertFalse(isDrStopped(DrAbstractTest.TOP1_NODE, SecurityServicePermissionsTest.CACHE_NAME));
        assertFalse(stateTransfer.isDone());
        G.start(createSender(createSenderTopology, (byte) 1, "top1_node_snd_2", senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312").setStore(new DrSenderFsStore().setDirectoryPath(STORE_PATH_2))));
        G.start(createSender(createSenderTopology, (byte) 1, TOP1_NODE_SND_3, senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312").setStore(new DrSenderFsStore().setDirectoryPath(STORE_PATH_3))));
        stateTransfer.get(getTestTimeout());
        compareCaches(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, populateCache.keySet());
    }

    @Test
    public void testFullStateTransferReceiverRollingRestart() throws Exception {
        int apply = GridTestUtils.SF.apply(1000);
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology(false, false);
        TcpDiscoveryIpFinder createReceiverTopology1 = createReceiverTopology1();
        startTopology(createReceiverTopology1);
        startTopology(createSenderTopology);
        G.start(createSender(createSenderTopology, (byte) 1, DrAbstractTest.TOP1_NODE_SND, senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312", "127.0.0.1:12313")));
        G.start(createSender(createSenderTopology, (byte) 1, "top1_node_snd_2", senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312", "127.0.0.1:12313")));
        Map<Object, Object> populateCache = populateCache(grid(DrAbstractTest.TOP1_NODE), SecurityServicePermissionsTest.CACHE_NAME, 0, apply);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        G.stop(DrAbstractTest.TOP2_NODE_RCV, false);
        G.start(createReceiver(createReceiverTopology1, (byte) 2, DrAbstractTest.TOP2_NODE_RCV, DrAbstractTest.RCV_PORT_1));
        G.stop("top2_node_rcv_2", false);
        G.start(createReceiver(createReceiverTopology1, (byte) 2, "top2_node_rcv_2", DrAbstractTest.RCV_PORT_2));
        G.start(createReceiver(createReceiverTopology1, (byte) 2, TOP2_NODE_RCV_3, 12313));
        stateTransfer.get(getTestTimeout());
        compareCaches(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, populateCache.keySet());
    }

    @Test
    public void testFullStateTransferAllSendersStop() throws Exception {
        int apply = GridTestUtils.SF.apply(1000);
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology(true, true);
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        Map<Object, Object> populateCache = populateCache(grid(DrAbstractTest.TOP1_NODE), SecurityServicePermissionsTest.CACHE_NAME, 0, apply);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        Thread.sleep(200L);
        G.stop(DrAbstractTest.TOP1_NODE_SND, false);
        G.stop("top1_node_snd_2", false);
        Throwable assertThrows = GridTestUtils.assertThrows(log, () -> {
            return stateTransfer.get(getTestTimeout());
        }, IgniteException.class, "State transfer is cancelled:");
        assertTrue(X.getFullStackTrace(assertThrows), X.hasCause(assertThrows, "reason=NO_SND_HUBS,", new Class[]{IgniteCheckedException.class}) || X.hasCause(assertThrows, "reason=BATCH_FAILED,", new Class[]{IgniteCheckedException.class}));
        waitDrStopped(DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        populateCache.putAll(populateCache(grid(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), apply, apply));
        G.start(createSender(createSenderTopology, (byte) 1, DrAbstractTest.TOP1_NODE_SND, senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312")));
        G.start(createSender(createSenderTopology, (byte) 1, "top1_node_snd_2", senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312")));
        dr(grid(DrAbstractTest.TOP1_NODE)).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted(DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2}).get(getTestTimeout());
        compareCaches(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, populateCache.keySet());
    }

    @Test
    public void testFullStateTransferAllReceiversStop() throws Exception {
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        TcpDiscoveryIpFinder createReceiverTopology1 = createReceiverTopology1();
        startTopology(createReceiverTopology1);
        startTopology(createSenderTopology);
        Map<Object, Object> populateCache = populateCache(grid(DrAbstractTest.TOP1_NODE), SecurityServicePermissionsTest.CACHE_NAME, 0, 1500);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        G.stop(DrAbstractTest.TOP2_NODE_RCV, false);
        G.stop("top2_node_rcv_2", false);
        Thread.sleep(2000L);
        assertFalse(stateTransfer.isDone());
        G.start(createReceiver(createReceiverTopology1, (byte) 2, DrAbstractTest.TOP2_NODE_RCV, DrAbstractTest.RCV_PORT_1));
        G.start(createReceiver(createReceiverTopology1, (byte) 2, "top2_node_rcv_2", DrAbstractTest.RCV_PORT_2));
        stateTransfer.get(getTestTimeout());
        compareCaches(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, populateCache.keySet());
    }

    @Test
    public void testFullStateTransferDestroyCacheOnSenderSide() throws Exception {
        int apply = GridTestUtils.SF.apply(1000);
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        Map<Object, Object> populateCache = populateCache(grid(DrAbstractTest.TOP1_NODE), SecurityServicePermissionsTest.CACHE_NAME, 0, apply);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        Thread.sleep(200L);
        grid(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).destroy();
        Throwable assertThrows = GridTestUtils.assertThrows(log, () -> {
            return stateTransfer.get(getTestTimeout());
        }, IgniteException.class, "State transfer is cancelled");
        assertTrue(X.getFullStackTrace(assertThrows), X.hasCause(assertThrows, "Failed to perform cache operation (cache is stopped): cache", new Class[]{CacheStoppedException.class}));
        grid(DrAbstractTest.TOP1_NODE).createCache(cacheConfiguration());
        dr(grid(DrAbstractTest.TOP1_NODE)).stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        assertTrue(dr(grid(DrAbstractTest.TOP1_NODE)).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
        assertFalse(populateCache.size() == grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        Map<Object, Object> populateCache2 = populateCache(grid(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), 0, apply + 100);
        dr(grid(DrAbstractTest.TOP1_NODE)).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted(DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2}).get(getTestTimeout());
        compareCaches(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache2, populateCache2.keySet());
    }

    @Test
    @Ignore("https://ggsystems.atlassian.net/browse/GG-25486")
    public void testFullStateTransferDestroyCacheOnReceiverSide() throws Exception {
        int apply = GridTestUtils.SF.apply(1000);
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        dr(grid(DrAbstractTest.TOP1_NODE)).stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        assertTrue(dr(grid(DrAbstractTest.TOP1_NODE)).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
        populateCache(grid(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), 0, apply);
        dr(grid(DrAbstractTest.TOP1_NODE)).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted(DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        Thread.sleep(300L);
        grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).destroy();
        Throwable assertThrows = GridTestUtils.assertThrows(log, () -> {
            return stateTransfer.get(getTestTimeout());
        }, IgniteException.class, "State transfer is cancelled:");
        assertTrue(X.getFullStackTrace(assertThrows), X.hasCause(assertThrows, "errMsg=All available sender hubs failed to process data center replication batch.", new Class[]{IgniteCheckedException.class}));
        grid(DrAbstractTest.TOP2_NODE).createCache(cacheConfiguration());
        Map<Object, Object> populateCache = populateCache(grid(DrAbstractTest.TOP1_NODE), SecurityServicePermissionsTest.CACHE_NAME, 0, apply);
        assertFalse(populateCache.size() == grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2}).get(getTestTimeout());
        compareCaches(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, populateCache.keySet());
    }

    @Test
    public void testFullStateTransferCancelledOnDrStop() throws Exception {
        int apply = GridTestUtils.SF.apply(1000);
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        Map<Object, Object> populateCache = populateCache(grid(DrAbstractTest.TOP1_NODE), SecurityServicePermissionsTest.CACHE_NAME, 0, apply);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        dr(grid(DrAbstractTest.TOP1_NODE)).stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStopped(DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        Throwable assertThrows = GridTestUtils.assertThrows(log, () -> {
            return stateTransfer.get(getTestTimeout());
        }, IgniteException.class, "State transfer is cancelled:");
        assertTrue(X.getFullStackTrace(assertThrows), X.hasCause(assertThrows, "reason=USER_REQUEST", new Class[]{IgniteCheckedException.class}));
        dr(grid(DrAbstractTest.TOP1_NODE)).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted(DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2}).get(getTestTimeout());
        compareCaches(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, populateCache.keySet());
    }

    @Test
    public void testFullStateTransferFailedIfDrStoppedForCache() throws Exception {
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        dr(grid(DrAbstractTest.TOP1_NODE)).stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        assertTrue(dr(grid(DrAbstractTest.TOP1_NODE)).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
        populateCache(grid(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), 0, 100);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        Throwable assertThrows = GridTestUtils.assertThrows(log, () -> {
            return stateTransfer.get(getTestTimeout());
        }, IgniteException.class, "Failed to initiate state transfer because data center replication is stopped:");
        assertTrue(X.getFullStackTrace(assertThrows), X.hasCause(assertThrows, "reason=USER_REQUEST", new Class[]{IgniteCheckedException.class}));
    }

    @Test
    public void testFullStateTransferFailedIfDrDisabledForCache() throws Exception {
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        grid(DrAbstractTest.TOP1_NODE).createCache(cacheConfig("cache_2", CacheMode.PARTITIONED, false));
        populateCache(grid(DrAbstractTest.TOP1_NODE).cache("cache_2"), 0, 100);
        GridTestUtils.assertThrows(log, () -> {
            return dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer("cache_2", syncFST(), new byte[]{2}).get(getTestTimeout());
        }, IllegalStateException.class, "Data center replication is not configured for cache: cache_2");
    }

    @Test
    public void testFullStateTransferFailedForNonExistedCache() throws Exception {
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        GridTestUtils.assertThrows(log, () -> {
            return dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer("cache_2", syncFST(), new byte[]{2}).get(getTestTimeout());
        }, IllegalArgumentException.class, "Cache is not configured: cache_2");
    }

    @Test
    @Ignore("https://ggsystems.atlassian.net/browse/GG-25371")
    public void testFullStateTransferFailedOnPartitionLostOnSender() throws Exception {
        int apply = GridTestUtils.SF.apply(1000);
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        dr(grid(DrAbstractTest.TOP1_NODE)).stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        assertTrue(dr(grid(DrAbstractTest.TOP1_NODE)).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
        populateCache(grid(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), 0, apply);
        dr(grid(DrAbstractTest.TOP1_NODE)).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted(DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        G.stop(DrAbstractTest.TOP1_NODE_2, true);
        GridTestUtils.assertThrows(log, () -> {
            return stateTransfer.get(getTestTimeout());
        }, IgniteException.class, "Partition lost");
    }

    @Test
    @Ignore("https://ggsystems.atlassian.net/browse/GG-25371")
    public void testFullStateTransferFailedOnPartitionLostOnReceiver() throws Exception {
        int apply = GridTestUtils.SF.apply(1000);
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        dr(grid(DrAbstractTest.TOP1_NODE)).stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        assertTrue(dr(grid(DrAbstractTest.TOP1_NODE)).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
        populateCache(grid(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), 0, apply);
        dr(grid(DrAbstractTest.TOP1_NODE)).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted(DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        G.stop("top2_node_2", true);
        GridTestUtils.assertThrows(log, () -> {
            return stateTransfer.get(getTestTimeout());
        }, IgniteException.class, "Partition lost");
    }

    @Test
    public void testFullStateTransferActiveActive() throws Exception {
        int apply = GridTestUtils.SF.apply(1000);
        TcpDiscoveryIpFinder createReceiverTopology1 = createReceiverTopology1();
        TcpDiscoveryIpFinder createReceiverTopology2 = createReceiverTopology2();
        startTopology(createReceiverTopology1);
        startTopology(createReceiverTopology2);
        startGrid(createSender(createReceiverTopology1, (byte) 2, "top2_node_snd", senderHubReplicaConfig((byte) 3, "127.0.0.1:12313", "127.0.0.1:12314")));
        startGrid(createSender(createReceiverTopology1, (byte) 2, "top2_node_snd_2", senderHubReplicaConfig((byte) 3, "127.0.0.1:12313", "127.0.0.1:12314")));
        startGrid(createSender(createReceiverTopology2, (byte) 3, "top3_node_snd", senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312")));
        startGrid(createSender(createReceiverTopology2, (byte) 3, "top3_node_snd_2", senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1, "127.0.0.1:12312")));
        dr(grid(DrAbstractTest.TOP2_NODE)).stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        dr(grid("top3_node")).stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        assertTrue(dr(grid(DrAbstractTest.TOP2_NODE)).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
        assertTrue(dr(grid("top3_node")).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
        Map<Object, Object> populateCache = populateCache(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), 0, apply);
        Map<Object, Object> populateCache2 = populateCache(grid("top3_node").cache(SecurityServicePermissionsTest.CACHE_NAME), apply, apply);
        dr(grid(DrAbstractTest.TOP2_NODE)).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        dr(grid("top3_node")).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted(DrAbstractTest.TOP2_NODE, "top2_node_2");
        waitDrStarted("top3_node", "top3_node_2");
        assertEquals(apply, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        assertEquals(apply, grid("top3_node").cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP2_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{3});
        IgniteFuture stateTransfer2 = dr(grid("top3_node")).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        stateTransfer.get(getTestTimeout());
        stateTransfer2.get(getTestTimeout());
        compareCaches(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache2, populateCache2.keySet());
        compareCaches(grid("top3_node").cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, populateCache.keySet());
        assertEquals(2 * apply, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        assertEquals(2 * apply, grid("top3_node").cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
    }

    @Test
    public void testFullStateTransferSenderGridDeactivationWithPersistentCache() throws Exception {
        this.persistent = true;
        checkFullStateTransferSenderGridDeactivation();
    }

    @Test
    public void testFullStateTransferSenderGridDeactivationWithNonPersistentCache() throws Exception {
        this.persistent = false;
        checkFullStateTransferSenderGridDeactivation();
    }

    public void checkFullStateTransferSenderGridDeactivation() throws Exception {
        if (syncFST()) {
            TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
            TcpDiscoveryIpFinder createReceiverTopology1 = createReceiverTopology1();
            startTopology(createReceiverTopology1);
            startTopology(createSenderTopology);
            IgniteEx grid = grid(DrAbstractTest.TOP1_NODE);
            IgniteEx grid2 = grid(DrAbstractTest.TOP2_NODE);
            Map<Object, Object> populateCache = populateCache(grid, SecurityServicePermissionsTest.CACHE_NAME, 0, 2000);
            assertEquals(2000, grid.cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
            assertEquals(0, grid2.cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
            IgniteFuture stateTransfer = dr(grid).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
            Thread.sleep(200L);
            assertFalse(stateTransfer.isDone());
            grid.cluster().active(false);
            assertFalse(grid.cluster().active());
            Throwable assertThrows = GridTestUtils.assertThrows(log, () -> {
                return stateTransfer.get(getTestTimeout());
            }, IgniteException.class, "State transfer is cancelled");
            assertTrue(X.getFullStackTrace(assertThrows), X.hasCause(assertThrows, "Failed to perform cache operation (cache is stopped): cache", new Class[]{CacheStoppedException.class}));
            G.stop(DrAbstractTest.TOP2_NODE_RCV, false);
            G.stop("top2_node_rcv_2", false);
            grid2.cache(SecurityServicePermissionsTest.CACHE_NAME).clear();
            assertEquals(0, grid2.cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
            clearStores(STORE_PATH_1, STORE_PATH_2, STORE_PATH_3);
            G.start(config(new GridGainConfiguration(), DrAbstractTest.TOP2_NODE_RCV, (byte) 2, createReceiverTopology1, null, receiverHubConfig(DrAbstractTest.RCV_PORT_1), this.useClient, new CacheConfiguration[0]));
            G.start(config(new GridGainConfiguration(), "top2_node_rcv_2", (byte) 2, createReceiverTopology1, null, receiverHubConfig(DrAbstractTest.RCV_PORT_2), this.useClient, new CacheConfiguration[0]));
            grid.cluster().active(true);
            assertTrue(grid.cluster().active());
            if (this.persistent) {
                assertEquals(2000, grid.cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
                assertEquals(0, grid2.cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
            } else {
                assertEquals(0L, grid.cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
                assertEquals(0, grid2.cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
                populateCache = populateCache(grid, SecurityServicePermissionsTest.CACHE_NAME, 2000, 2000);
            }
            grid.cache(SecurityServicePermissionsTest.CACHE_NAME);
            dr(grid).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2}).get(getTestTimeout());
            compareCaches(grid2.cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, populateCache.keySet());
        }
    }

    @Test
    @Ignore("https://ggsystems.atlassian.net/browse/GG-25486")
    public void testFullStateTransferReceiverGridDeactivation() throws Exception {
        int apply = GridTestUtils.SF.apply(2000);
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        this.persistent = true;
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        dr(grid(DrAbstractTest.TOP1_NODE)).stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        assertTrue(dr(grid(DrAbstractTest.TOP1_NODE)).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
        Map<Object, Object> populateCache = populateCache(grid(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), 0, apply);
        dr(grid(DrAbstractTest.TOP1_NODE)).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted(DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        grid(DrAbstractTest.TOP2_NODE).cluster().active(false);
        Thread.sleep(2000L);
        assertFalse(stateTransfer.isDone());
        grid(DrAbstractTest.TOP2_NODE).cluster().active(true);
        stateTransfer.get(getTestTimeout());
        compareCaches(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), populateCache, populateCache.keySet());
    }

    @Test
    @Ignore("https://ggsystems.atlassian.net/browse/GG-25366")
    public void testFullStateTransferToUnknownDC() throws Exception {
        int apply = GridTestUtils.SF.apply(100);
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        G.start(config(new GridGainConfiguration(), TOP1_CLI, (byte) 1, createSenderTopology, null, null, true, new CacheConfiguration[0]));
        grid(TOP1_CLI).cache(SecurityServicePermissionsTest.CACHE_NAME);
        dr(grid(DrAbstractTest.TOP1_NODE)).stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        assertTrue(dr(grid(DrAbstractTest.TOP1_NODE)).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
        populateCache(grid(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), 0, apply);
        dr(grid(DrAbstractTest.TOP1_NODE)).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted(DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(TOP1_CLI)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2, 3});
        GridTestUtils.assertThrows(log, () -> {
            return stateTransfer.get(getTestTimeout());
        }, IgniteException.class, "State transfer is cancelled:");
    }

    @Test
    public void testFullStateTransferFailsIfStopSenderWithNonPersistentStore() throws Exception {
        System.setProperty("IGNITE_DISABLE_SMART_DR_THROTTLING", "true");
        int apply = GridTestUtils.SF.apply(1000);
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology(true, false);
        startTopology(createReceiverTopology1());
        startTopology(createSenderTopology);
        dr(grid(DrAbstractTest.TOP1_NODE)).stopReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStopped(DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        populateCache(grid(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME), 0, apply);
        dr(grid(DrAbstractTest.TOP1_NODE)).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted(DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        assertEquals(0, grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
        IgniteFuture stateTransfer = dr(grid(DrAbstractTest.TOP1_NODE)).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, syncFST(), new byte[]{2});
        G.stop(DrAbstractTest.TOP1_NODE_SND, false);
        Throwable assertThrows = GridTestUtils.assertThrows(log, () -> {
            return stateTransfer.get(getTestTimeout());
        }, IgniteException.class, "State transfer is cancelled:");
        assertTrue(X.getFullStackTrace(assertThrows), X.hasCause(assertThrows, "errMsg=Sender with non-persistent sender store has gone.", new Class[]{IgniteCheckedException.class}));
    }

    @NotNull
    private Map<Object, Object> populateCache(IgniteEx igniteEx, String str, int i, int i2) throws Exception {
        List list = (List) igniteEx.cluster().forCacheNodes(str).nodes().stream().map(clusterNode -> {
            return Ignition.ignite(clusterNode.id()).configuration().getIgniteInstanceName();
        }).collect(Collectors.toList());
        dr(igniteEx).stopReplication(str);
        waitDrStopped((String[]) list.toArray(new String[0]));
        assertTrue(dr(igniteEx).senderCacheStatus(str).stopped());
        Map<Object, Object> populateCache = populateCache(igniteEx.cache(str), i, i2);
        dr(igniteEx).startReplication(str);
        waitDrStarted((String[]) list.toArray(new String[0]));
        return populateCache;
    }

    private Map<Object, Object> populateCache(Cache<Object, Object> cache, int i, int i2) {
        HashMap hashMap = new HashMap(i2);
        for (int i3 = i; i3 < i + i2; i3++) {
            cache.put(Integer.valueOf(i3), Integer.valueOf(i3));
            hashMap.put(Integer.valueOf(i3), Integer.valueOf(i3));
        }
        return hashMap;
    }

    private <K, V> void compareCaches(IgniteCache<K, V> igniteCache, Map<K, V> map, Set<K> set) throws Exception {
        if (syncFST()) {
            assertEquals(map, igniteCache.getAll(set));
        } else {
            compareCaches(igniteCache, map, set, getTestTimeout());
        }
    }

    static {
        $assertionsDisabled = !DrFullStateTransferQaTest.class.desiredAssertionStatus();
        STORE_PATH_1 = U.getIgniteHome() + "/work/my-dr-store-1";
        STORE_PATH_2 = U.getIgniteHome() + "/work/my-dr-store-2";
        STORE_PATH_3 = U.getIgniteHome() + "/work/my-dr-store-3";
    }
}
