package org.gridgain.grid.internal.dr;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.gridgain.grid.GridGain;
import org.gridgain.grid.cache.dr.CacheDrSenderConfiguration;
import org.gridgain.grid.configuration.DrReceiverConfiguration;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.grid.configuration.SnapshotConfiguration;
import org.gridgain.grid.dr.DrSenderConnectionConfiguration;
import org.gridgain.grid.dr.DrSenderLoadBalancingMode;
import org.gridgain.grid.persistentstore.GridSnapshot;
import org.gridgain.grid.persistentstore.SnapshotFuture;
import org.gridgain.internal.processors.dr.DrAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

/**
 * Tests DR incremental state transfer that is driven by snapshot IDs.
 *
 * <p>A persistent sender topology (data center 1) replicates the {@code "cache"} cache to a
 * receiver topology (data center 2). The tests take snapshots on the sender, clear the receiver
 * cache and then request an incremental state transfer starting from a given snapshot ID.
 *
 * <p>The whole class runs with the {@code GG_INCREMENTAL_STATE_TRANSFER} system property set
 * to {@code true}.
 */
@WithSystemProperty(key = "GG_INCREMENTAL_STATE_TRANSFER", value = "true")
public class IncrementalStateTransferViaSnapshotSelfTest extends DrAbstractTest {
    /** Name of the replicated cache that the tests operate on. */
    private static final String CACHE_NAME = "cache";

    /** Name of the second cache configured on both topologies (not touched by the tests). */
    private static final String SECOND_CACHE_NAME = "cache_2";

    /** Name of the first sender data node. */
    private static final String SND_DATA_NODE = "top1_node";

    /** Name of the receiver data node. */
    private static final String RCV_DATA_NODE = "top2_node";

    /** Name of the sender hub node. */
    private static final String SND_HUB_NODE = "top1_node_snd";

    /** Name of the receiver hub node. */
    private static final String RCV_HUB_NODE = "top2_node_rcv";

    /** Inbound port of the receiver hub; the single-replica sender hub connects to it. */
    private static final int RCV_HUB_PORT = 12311;

    /** Data center ID of the sender topology. */
    private static final byte SND_DC_ID = 1;

    /** Data center ID of the receiver topology. */
    private static final byte RCV_DC_ID = 2;

    /** Timeout, in milliseconds, for snapshot creation and incremental state transfer futures. */
    private static final long OPERATION_TIMEOUT = 15_000L;

    /** Timeout, in milliseconds, for the final full state transfer future. */
    private static final long FULL_TRANSFER_TIMEOUT = 10_000L;

    /**
     * @return Test timeout in milliseconds.
     */
    protected long getTestTimeout() {
        return 90_000L;
    }

    /**
     * @return Always {@code true}.
     */
    protected boolean useSenderGroups() {
        return true;
    }

    /**
     * Runs the parent set-up and then wipes the persistence directory so that every test starts
     * from clean storage.
     *
     * @throws Exception If failed.
     */
    protected void beforeTest() throws Exception {
        super.beforeTest();

        cleanPersistenceDir();
    }

    /**
     * Runs the parent tear-down and then wipes the persistence directory.
     *
     * @throws Exception If failed.
     */
    protected void afterTest() throws Exception {
        super.afterTest();

        cleanPersistenceDir();
    }

    /**
     * Loads three batches of 100 keys into the sender cache, taking a full snapshot after the
     * first batch and a second snapshot after the second batch. Then, for each snapshot ID, the
     * receiver cache is cleared, an incremental state transfer is requested from that snapshot
     * and the receiver cache is compared against the entries written after the snapshot.
     * Finally a full state transfer is run and the receiver is compared against all entries.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testIncrementalStateTransferOverSnapshot() throws Exception {
        TcpDiscoveryIpFinder sndIpFinder = createSenderTopology();
        TcpDiscoveryIpFinder rcvIpFinder = createReceiverTopology();

        startTopology(sndIpFinder);
        startTopology(rcvIpFinder);

        startGrid(senderNodeConfig(sndIpFinder, SND_HUB_NODE, 1));
        startGrid(receiverNodeConfig(rcvIpFinder, RCV_HUB_NODE, RCV_HUB_PORT, new CacheConfiguration[0]));

        IgniteCache<Integer, Integer> sndCache = grid(SND_DATA_NODE).cache(CACHE_NAME);
        String[] replicatedNodes = new String[]{SND_DATA_NODE, RCV_DATA_NODE};

        // Keys [0, 100): everything written before the full snapshot.
        Map<Integer, Integer> allEntries = populateCache(sndCache, 100);

        waitForCacheReplicated(CACHE_NAME, replicatedNodes);

        SnapshotFuture fullSnapshotFut =
            snapshot(grid(SND_DATA_NODE)).createFullSnapshot((Set) null, "Snapshot done.");

        fullSnapshotFut.get(OPERATION_TIMEOUT);

        long fullSnapshotId = fullSnapshotFut.snapshotOperation().snapshotId();

        // Keys [100, 200): written between the two snapshots.
        Map<Integer, Integer> sinceFullSnapshot = populateCache(sndCache, 100, 100);

        allEntries.putAll(sinceFullSnapshot);

        waitForCacheReplicated(CACHE_NAME, replicatedNodes);

        SnapshotFuture incSnapshotFut =
            snapshot(grid(SND_DATA_NODE)).createSnapshot((Set) null, "Incremental snapshot done.");

        incSnapshotFut.get(OPERATION_TIMEOUT);

        long incSnapshotId = incSnapshotFut.snapshotOperation().snapshotId();

        // Keys [200, 300): written after the second snapshot.
        Map<Integer, Integer> sinceIncSnapshot = populateCache(sndCache, 200, 100);

        allEntries.putAll(sinceIncSnapshot);
        sinceFullSnapshot.putAll(sinceIncSnapshot);

        waitForCacheReplicated(CACHE_NAME, replicatedNodes);

        // Transfer starting from the second snapshot.
        grid(RCV_DATA_NODE).cache(CACHE_NAME).clear();

        DrAbstractTest.dr(grid(SND_DATA_NODE))
            .incrementalStateTransfer(CACHE_NAME, incSnapshotId, RCV_DC_ID)
            .get(OPERATION_TIMEOUT);

        compareCaches(grid(RCV_DATA_NODE).cache(CACHE_NAME), sinceIncSnapshot, allEntries.keySet(), 0L);

        // Transfer starting from the full snapshot.
        grid(RCV_DATA_NODE).cache(CACHE_NAME).clear();

        DrAbstractTest.dr(grid(SND_DATA_NODE))
            .incrementalStateTransfer(CACHE_NAME, fullSnapshotId, RCV_DC_ID)
            .get(OPERATION_TIMEOUT);

        compareCaches(grid(RCV_DATA_NODE).cache(CACHE_NAME), sinceFullSnapshot, allEntries.keySet(), 0L);

        // Full state transfer; the receiver cache is not cleared before it.
        DrAbstractTest.dr(grid(SND_DATA_NODE)).stateTransfer(CACHE_NAME, new byte[0]).get(FULL_TRANSFER_TIMEOUT);

        compareCaches(grid(RCV_DATA_NODE).cache(CACHE_NAME), allEntries, 0L);
    }

    /**
     * Requests an incremental state transfer with the current wall-clock time as the snapshot ID
     * (no snapshot is created in this test) and checks that the transfer fails with an
     * {@link IgniteException} whose message mentions both the cancelled state transfer and the
     * missing snapshot metadata.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testIncrementalStateTransferWrongSnapshot() throws Exception {
        TcpDiscoveryIpFinder sndIpFinder = createSenderTopology();
        TcpDiscoveryIpFinder rcvIpFinder = createReceiverTopology();

        startTopology(sndIpFinder);
        startTopology(rcvIpFinder);

        startGrid(senderNodeConfig(sndIpFinder, SND_HUB_NODE, 1));
        startGrid(receiverNodeConfig(rcvIpFinder, RCV_HUB_NODE, RCV_HUB_PORT, new CacheConfiguration[0]));

        populateCache(grid(SND_DATA_NODE).cache(CACHE_NAME), 100);

        waitForCacheReplicated(CACHE_NAME, new String[]{SND_DATA_NODE, RCV_DATA_NODE});

        grid(RCV_DATA_NODE).cache(CACHE_NAME).clear();

        // Not the ID of any snapshot taken by this test.
        long bogusSnapshotId = System.currentTimeMillis();

        // The block-bodied lambda with an explicit return is kept on purpose.
        Throwable err = GridTestUtils.assertThrows(log, () -> {
            return DrAbstractTest.dr(grid(SND_DATA_NODE))
                .incrementalStateTransfer(CACHE_NAME, bogusSnapshotId, RCV_DC_ID)
                .get(OPERATION_TIMEOUT);
        }, IgniteException.class, "State transfer is cancelled:");

        assertTrue(err.getMessage().contains("Snapshot metadata not found:"));
    }

    /**
     * Puts keys {@code [0, i)} into the cache, each mapped to itself.
     *
     * @param igniteCache Cache to fill.
     * @param i Number of entries to put.
     * @return Mutable map holding the entries that were put.
     */
    @NotNull
    private Map<Integer, Integer> populateCache(IgniteCache<Integer, Integer> igniteCache, int i) {
        return populateCache(igniteCache, 0, i);
    }

    /**
     * Puts keys {@code [i, i + i2)} into the cache, each mapped to itself.
     *
     * @param igniteCache Cache to fill.
     * @param i First key (inclusive).
     * @param i2 Number of entries to put.
     * @return Mutable map holding the entries that were put.
     */
    @NotNull
    private Map<Integer, Integer> populateCache(IgniteCache<Integer, Integer> igniteCache, int i, int i2) {
        Map<Integer, Integer> written = new HashMap<>();

        for (int key = i; key < i + i2; key++) {
            igniteCache.put(key, key);
            written.put(key, key);
        }

        return written;
    }

    /**
     * Registers the receiver topology: one data node in data center 2 with two partitioned
     * caches, both with the DR receiver enabled.
     *
     * @return IP finder of the registered topology.
     * @throws Exception If failed.
     */
    private TcpDiscoveryIpFinder createReceiverTopology() throws Exception {
        TcpDiscoveryIpFinder ipFinder = ipFinder();

        CacheConfiguration firstCacheCfg = cacheConfig(CACHE_NAME, CacheMode.PARTITIONED, false);
        CacheConfiguration secondCacheCfg = cacheConfig(SECOND_CACHE_NAME, CacheMode.PARTITIONED, false);

        IgniteConfiguration nodeCfg = config(new GridGainConfiguration(), RCV_DATA_NODE, RCV_DC_ID, ipFinder,
            null, null, firstCacheCfg, secondCacheCfg);

        ggCacheConfig(firstCacheCfg).setDrReceiverEnabled(true);
        ggCacheConfig(secondCacheCfg).setDrReceiverEnabled(true);

        addTopology(ipFinder, new IgniteConfiguration[]{nodeCfg});

        return ipFinder;
    }

    /**
     * Registers an additional receiver topology: one data node in data center 3 with a single
     * partitioned cache that has the DR receiver enabled. Not called from within this class.
     *
     * @return IP finder of the registered topology.
     * @throws Exception If failed.
     */
    private TcpDiscoveryIpFinder createReceiverTopology2() throws Exception {
        TcpDiscoveryIpFinder ipFinder = ipFinder();

        CacheConfiguration cacheCfg = cacheConfig(CACHE_NAME, CacheMode.PARTITIONED, false);

        IgniteConfiguration nodeCfg = config(new GridGainConfiguration(), "top3_node", (byte) 3, ipFinder,
            null, null, cacheCfg);

        ggCacheConfig(cacheCfg).setDrReceiverEnabled(true);

        addTopology(ipFinder, new IgniteConfiguration[]{nodeCfg});

        return ipFinder;
    }

    /**
     * Registers the sender topology consisting of two persistent data nodes.
     *
     * @return IP finder of the registered topology.
     * @throws Exception If failed.
     */
    private TcpDiscoveryIpFinder createSenderTopology() throws Exception {
        TcpDiscoveryIpFinder ipFinder = ipFinder();

        IgniteConfiguration firstNodeCfg = sndDataNodeCfg(ipFinder, SND_DATA_NODE);
        IgniteConfiguration secondNodeCfg = sndDataNodeCfg(ipFinder, "top1_node_2");

        addTopology(ipFinder, new IgniteConfiguration[]{firstNodeCfg, secondNodeCfg});

        return ipFinder;
    }

    /**
     * Builds a partitioned sender cache configuration with round-robin load balancing, a backup
     * sync frequency of 1000 and a rendezvous affinity function with 8 partitions.
     *
     * @param str Cache name.
     * @param <K> Key type.
     * @param <V> Value type.
     * @return Cache configuration.
     */
    private <K, V> CacheConfiguration<K, V> senderCacheConfig(String str) {
        CacheDrSenderConfiguration drSndCfg = new CacheDrSenderConfiguration();

        drSndCfg.setLoadBalancingMode(DrSenderLoadBalancingMode.DR_ROUND_ROBIN);
        drSndCfg.setBackupSyncFrequency(1000L);

        CacheConfiguration<K, V> cacheCfg = cacheConfig(str, CacheMode.PARTITIONED, true);

        ggCacheConfig(cacheCfg).setDrSenderConfiguration(drSndCfg);
        cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 8));

        return cacheCfg;
    }

    /**
     * Builds a configuration for a node in data center 1 without caches and without sender or
     * receiver hub settings. Not called from within this class.
     *
     * @param tcpDiscoveryIpFinder IP finder.
     * @param str Node name.
     * @return Node configuration.
     * @throws Exception If failed.
     */
    private IgniteConfiguration sndClientNodeCfg(TcpDiscoveryIpFinder tcpDiscoveryIpFinder, String str) throws Exception {
        return config(new GridGainConfiguration(), str, SND_DC_ID, tcpDiscoveryIpFinder, null, null,
            new CacheConfiguration[0]);
    }

    /**
     * Builds a configuration for a sender data node in data center 1: both sender caches,
     * snapshots configured, 256-byte DR batch sizes and a persistent default data region capped
     * at 256 MiB.
     *
     * @param tcpDiscoveryIpFinder IP finder.
     * @param str Node name.
     * @return Node configuration.
     * @throws Exception If failed.
     */
    private IgniteConfiguration sndDataNodeCfg(TcpDiscoveryIpFinder tcpDiscoveryIpFinder, String str) throws Exception {
        GridGainConfiguration ggCfg = new GridGainConfiguration();

        ggCfg.setStateTransferBatchSendSizeBytes(256);
        ggCfg.setBatchSendSizeBytes(256);
        ggCfg.setSnapshotConfiguration(new SnapshotConfiguration());

        DataRegionConfiguration regionCfg = new DataRegionConfiguration();

        regionCfg.setMaxSize(256L * 1024 * 1024);
        regionCfg.setPersistenceEnabled(true);

        DataStorageConfiguration storageCfg =
            new DataStorageConfiguration().setDefaultDataRegionConfiguration(regionCfg);

        IgniteConfiguration nodeCfg = config(ggCfg, str, SND_DC_ID, tcpDiscoveryIpFinder, null, null,
            senderCacheConfig(CACHE_NAME), senderCacheConfig(SECOND_CACHE_NAME));

        return nodeCfg.setDataStorageConfiguration(storageCfg);
    }

    /**
     * Builds a configuration for a sender hub node in data center 1.
     *
     * @param tcpDiscoveryIpFinder IP finder.
     * @param str Node name.
     * @param i Number of remote replicas to connect to; must be 1 or 2. With 1 the hub targets
     *      data center 2 at {@code 127.0.0.1:12311}; with 2 it targets data center 2 at
     *      {@code 127.0.0.1:12312} and data center 3 at {@code 127.0.0.1:12313}.
     * @return Node configuration.
     * @throws Exception If failed.
     */
    private IgniteConfiguration senderNodeConfig(TcpDiscoveryIpFinder tcpDiscoveryIpFinder, String str, int i) throws Exception {
        assert i == 1 || i == 2 : "Unsupported replica count: " + i;

        GridGainConfiguration ggCfg = new GridGainConfiguration();

        ggCfg.setSnapshotConfiguration(new SnapshotConfiguration());

        DrSenderConfiguration hubCfg;

        if (i == 2) {
            hubCfg = senderHubConfig(new DrSenderConnectionConfiguration[]{
                senderHubReplicaConfig((byte) 2, new String[]{"127.0.0.1:12312"}),
                senderHubReplicaConfig((byte) 3, new String[]{"127.0.0.1:12313"})});
        }
        else {
            hubCfg = senderHubConfig(new DrSenderConnectionConfiguration[]{
                senderHubReplicaConfig(RCV_DC_ID, new String[]{"127.0.0.1:" + RCV_HUB_PORT})});
        }

        hubCfg.setMaxQueueSize(10000);
        hubCfg.setReconnectOnFailureTimeout(500L);
        hubCfg.setMaxErrors(10);

        return config(ggCfg, str, SND_DC_ID, tcpDiscoveryIpFinder, hubCfg, null, new CacheConfiguration[0]);
    }

    /**
     * Builds a configuration for a receiver hub node in data center 2.
     *
     * @param tcpDiscoveryIpFinder IP finder.
     * @param str Node name.
     * @param i Local inbound port of the receiver hub.
     * @param cacheConfigurationArr Cache configurations for the node.
     * @return Node configuration.
     * @throws Exception If failed.
     */
    private IgniteConfiguration receiverNodeConfig(TcpDiscoveryIpFinder tcpDiscoveryIpFinder, String str, int i, CacheConfiguration... cacheConfigurationArr) throws Exception {
        DrReceiverConfiguration rcvHubCfg = new DrReceiverConfiguration();

        rcvHubCfg.setLocalInboundPort(i);
        rcvHubCfg.setFlushFrequency(10L);

        return config(new GridGainConfiguration(), str, RCV_DC_ID, tcpDiscoveryIpFinder, null, rcvHubCfg,
            cacheConfigurationArr);
    }

    /**
     * Delegates to the parent {@code config(...)} and replaces the communication SPI of the
     * result with a new {@link TestRecordingCommunicationSpi}.
     *
     * @param gridGainConfiguration GridGain plugin configuration.
     * @param str Node name.
     * @param b Data center ID.
     * @param tcpDiscoveryIpFinder IP finder.
     * @param drSenderConfiguration Sender hub configuration, if any.
     * @param drReceiverConfiguration Receiver hub configuration, if any.
     * @param cacheConfigurationArr Cache configurations, if any.
     * @return Node configuration.
     * @throws IgniteCheckedException If the parent call fails.
     */
    protected IgniteConfiguration config(GridGainConfiguration gridGainConfiguration, String str, byte b, TcpDiscoveryIpFinder tcpDiscoveryIpFinder, @Nullable DrSenderConfiguration drSenderConfiguration, @Nullable DrReceiverConfiguration drReceiverConfiguration, @Nullable CacheConfiguration... cacheConfigurationArr) throws IgniteCheckedException {
        IgniteConfiguration nodeCfg = super.config(gridGainConfiguration, str, b, tcpDiscoveryIpFinder,
            drSenderConfiguration, drReceiverConfiguration, cacheConfigurationArr);

        return nodeCfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
    }

    /**
     * Looks up the snapshot facade of the given node through the {@code "GridGain"} plugin.
     * Fails the test if either the plugin or its snapshot component is {@code null}.
     *
     * @param ignite Node to query.
     * @return Snapshot facade of the node.
     */
    protected static GridSnapshot snapshot(Ignite ignite) {
        GridGain plugin = ignite.plugin("GridGain");

        assertNotNull("GridGain plugin is not configured.", plugin);

        GridSnapshot snapshot = plugin.snapshot();

        assertNotNull("Snapshot component is not configured.", snapshot);

        return snapshot;
    }
}
