/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.dr;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.gridgain.grid.GridDr;
import org.gridgain.grid.GridGain;
import org.gridgain.grid.cache.dr.CacheDrSenderConfiguration;
import org.gridgain.grid.configuration.DrReceiverConfiguration;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.grid.configuration.SnapshotConfiguration;
import org.gridgain.grid.dr.DrSenderConnectionConfiguration;
import org.gridgain.grid.dr.DrSenderLoadBalancingMode;
import org.gridgain.grid.internal.processors.dr.DrUtils;
import org.gridgain.grid.persistentstore.GridSnapshot;
import org.gridgain.grid.persistentstore.SnapshotFuture;
import org.gridgain.internal.processors.dr.DrAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Assume;
import org.junit.Test;

public class IncrementalStateTransferViaSnapshotSelfTest
extends DrAbstractTest {
    private static final int RCV_FLUSH_DELAY = 10;
    private static final String SNAPSHOT_DIR = "snapshot";

    protected long getTestTimeout() {
        return 90000L;
    }

    protected boolean useSenderGroups() {
        return true;
    }

    protected void beforeTest() throws Exception {
        super.beforeTest();
        U.delete((File)U.resolveWorkDirectory((String)U.defaultWorkDirectory(), (String)SNAPSHOT_DIR, (boolean)true));
        this.cleanPersistenceDir();
    }

    protected void afterTest() throws Exception {
        super.afterTest();
        U.delete((File)U.resolveWorkDirectory((String)U.defaultWorkDirectory(), (String)SNAPSHOT_DIR, (boolean)true));
        this.cleanPersistenceDir();
    }

    @Test
    public void testIncrementalStateTransferOverSnapshot() throws Exception {
        Assume.assumeTrue((boolean)DrUtils.isIncrementalDrEnabled());
        int keysCnt = 100;
        TcpDiscoveryIpFinder sndTop = this.createSenderTopology();
        TcpDiscoveryIpFinder rcvTop = this.createReceiverTopology();
        this.startTopology(sndTop);
        this.startTopology(rcvTop);
        this.startGrid(this.senderNodeConfig(sndTop, "top1_node_snd", 1));
        this.startGrid(this.receiverNodeConfig(rcvTop, "top2_node_rcv", 12311, new CacheConfiguration[0]));
        Map<Integer, Integer> allData = this.populateCache((IgniteCache<Integer, Integer>)this.grid("top1_node").cache("cache"), 100);
        this.waitForCacheReplicated("cache", new String[]{"top1_node", "top1_node_2"});
        SnapshotFuture snapFut = IncrementalStateTransferViaSnapshotSelfTest.snapshot((Ignite)this.grid("top1_node")).createFullSnapshot(null, "Snapshot done.");
        snapFut.get(15000L);
        long snapId = snapFut.snapshotOperation().snapshotId();
        Map<Integer, Integer> additionalData = this.populateCache((IgniteCache<Integer, Integer>)this.grid("top1_node").cache("cache"), 100, 100);
        allData.putAll(additionalData);
        GridDr dr = DrAbstractTest.dr((Ignite)this.grid("top2_node"));
        this.waitForCacheReplicated("cache", new String[]{"top1_node", "top1_node_2"});
        long recEnt = dr.receiverCacheMetrics("cache").entriesReceived();
        IncrementalStateTransferViaSnapshotSelfTest.assertTrue((String)("received=" + recEnt + " sended=" + allData.size()), (recEnt == (long)allData.size() ? 1 : 0) != 0);
        SnapshotFuture snapFut2 = IncrementalStateTransferViaSnapshotSelfTest.snapshot((Ignite)this.grid("top1_node")).createSnapshot(null, "Incremental snapshot done.");
        snapFut2.get(15000L);
        long incSnapId = snapFut2.snapshotOperation().snapshotId();
        Map<Integer, Integer> additionalData2 = this.populateCache((IgniteCache<Integer, Integer>)this.grid("top1_node").cache("cache"), 200, 100);
        allData.putAll(additionalData2);
        additionalData.putAll(additionalData2);
        this.waitForCacheReplicated("cache", new String[]{"top1_node", "top1_node_2"});
        recEnt = dr.receiverCacheMetrics("cache").entriesReceived();
        IncrementalStateTransferViaSnapshotSelfTest.assertTrue((String)("received=" + recEnt + " sended=" + allData.size()), (recEnt == (long)allData.size() ? 1 : 0) != 0);
        this.grid("top2_node").cache("cache").clear();
        DrAbstractTest.dr((Ignite)this.grid("top1_node")).incrementalStateTransfer("cache", incSnapId, (byte)2).get(15000L);
        IncrementalStateTransferViaSnapshotSelfTest.compareCaches((IgniteCache)this.grid("top2_node").cache("cache"), additionalData2, allData.keySet(), (long)0L);
        this.grid("top2_node").cache("cache").clear();
        DrAbstractTest.dr((Ignite)this.grid("top1_node")).incrementalStateTransfer("cache", snapId, (byte)2).get(15000L);
        IncrementalStateTransferViaSnapshotSelfTest.compareCaches((IgniteCache)this.grid("top2_node").cache("cache"), additionalData, allData.keySet(), (long)0L);
        DrAbstractTest.dr((Ignite)this.grid("top1_node")).stateTransfer("cache", new byte[0]).get(10000L);
        IncrementalStateTransferViaSnapshotSelfTest.compareCaches((IgniteCache)this.grid("top2_node").cache("cache"), allData, (long)0L);
    }

    @Test
    public void testIncrementalStateTransferWrongSnapshot() throws Exception {
        Assume.assumeTrue((boolean)DrUtils.isIncrementalDrEnabled());
        int keysCnt = 100;
        TcpDiscoveryIpFinder sndTop = this.createSenderTopology();
        TcpDiscoveryIpFinder rcvTop = this.createReceiverTopology();
        this.startTopology(sndTop);
        this.startTopology(rcvTop);
        this.startGrid(this.senderNodeConfig(sndTop, "top1_node_snd", 1));
        this.startGrid(this.receiverNodeConfig(rcvTop, "top2_node_rcv", 12311, new CacheConfiguration[0]));
        this.populateCache((IgniteCache<Integer, Integer>)this.grid("top1_node").cache("cache"), 100);
        this.waitForCacheReplicated("cache", new String[]{"top1_node", "top1_node_2"});
        this.grid("top2_node").cache("cache").clear();
        long snapId = System.currentTimeMillis();
        IgniteException ex = (IgniteException)GridTestUtils.assertThrows((IgniteLogger)log, () -> DrAbstractTest.dr((Ignite)this.grid("top1_node")).incrementalStateTransfer("cache", snapId, (byte)2).get(15000L), IgniteException.class, (String)"State transfer is cancelled:");
        IncrementalStateTransferViaSnapshotSelfTest.assertTrue((boolean)ex.getMessage().contains("Snapshot metadata not found:"));
    }

    @NotNull
    private Map<Integer, Integer> populateCache(IgniteCache<Integer, Integer> cache, int keysCnt) {
        return this.populateCache(cache, 0, keysCnt);
    }

    @NotNull
    private Map<Integer, Integer> populateCache(IgniteCache<Integer, Integer> cache, int startKey, int keysCnt) {
        HashMap<Integer, Integer> data = new HashMap<Integer, Integer>();
        for (int i = startKey; i < startKey + keysCnt; ++i) {
            cache.put((Object)i, (Object)i);
            data.put(i, i);
        }
        return data;
    }

    private TcpDiscoveryIpFinder createReceiverTopology() throws Exception {
        TcpDiscoveryIpFinder ipFinder = this.ipFinder();
        CacheConfiguration ccfg = this.cacheConfig("cache", CacheMode.PARTITIONED, false);
        CacheConfiguration ccfg2 = this.cacheConfig("cache_2", CacheMode.PARTITIONED, false);
        IgniteConfiguration nodeCfg1 = this.config(new GridGainConfiguration(), "top2_node", (byte)2, ipFinder, null, null, ccfg, ccfg2);
        this.ggCacheConfig(ccfg).setDrReceiverEnabled(true);
        this.ggCacheConfig(ccfg2).setDrReceiverEnabled(true);
        this.addTopology(ipFinder, new IgniteConfiguration[]{nodeCfg1});
        return ipFinder;
    }

    private TcpDiscoveryIpFinder createReceiverTopology2() throws Exception {
        TcpDiscoveryIpFinder ipFinder = this.ipFinder();
        CacheConfiguration ccfg = this.cacheConfig("cache", CacheMode.PARTITIONED, false);
        IgniteConfiguration nodeCfg1 = this.config(new GridGainConfiguration(), "top3_node", (byte)3, ipFinder, null, null, ccfg);
        this.ggCacheConfig(ccfg).setDrReceiverEnabled(true);
        this.addTopology(ipFinder, new IgniteConfiguration[]{nodeCfg1});
        return ipFinder;
    }

    private TcpDiscoveryIpFinder createSenderTopology() throws Exception {
        TcpDiscoveryIpFinder ipFinder = this.ipFinder();
        IgniteConfiguration nodeCfg1 = this.sndDataNodeCfg(ipFinder, "top1_node");
        IgniteConfiguration nodeCfg2 = this.sndDataNodeCfg(ipFinder, "top1_node_2");
        this.addTopology(ipFinder, new IgniteConfiguration[]{nodeCfg1, nodeCfg2});
        return ipFinder;
    }

    private <K, V> CacheConfiguration<K, V> senderCacheConfig(String name) {
        CacheDrSenderConfiguration drSndCfg = new CacheDrSenderConfiguration();
        drSndCfg.setLoadBalancingMode(DrSenderLoadBalancingMode.DR_ROUND_ROBIN);
        drSndCfg.setBackupSyncFrequency(1000L);
        CacheConfiguration ccfg = this.cacheConfig(name, CacheMode.PARTITIONED, true);
        this.ggCacheConfig(ccfg).setDrSenderConfiguration(drSndCfg);
        ccfg.setAffinity((AffinityFunction)new RendezvousAffinityFunction(false, 8));
        return ccfg;
    }

    private IgniteConfiguration sndClientNodeCfg(TcpDiscoveryIpFinder ipFinder, String igniteInstanceName) throws Exception {
        GridGainConfiguration ggCfg = new GridGainConfiguration();
        return this.config(ggCfg, igniteInstanceName, (byte)1, ipFinder, null, null, new CacheConfiguration[0]);
    }

    private IgniteConfiguration sndDataNodeCfg(TcpDiscoveryIpFinder ipFinder, String igniteInstanceName) throws Exception {
        GridGainConfiguration ggCfg = new GridGainConfiguration();
        ggCfg.setStateTransferBatchSendSizeBytes(256);
        ggCfg.setBatchSendSizeBytes(256);
        ggCfg.setSnapshotConfiguration(new SnapshotConfiguration());
        DataRegionConfiguration dataReg = new DataRegionConfiguration();
        dataReg.setMaxSize(0x10000000L);
        dataReg.setPersistenceEnabled(true);
        return this.config(ggCfg, igniteInstanceName, (byte)1, ipFinder, null, null, this.senderCacheConfig("cache"), this.senderCacheConfig("cache_2")).setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(dataReg));
    }

    private IgniteConfiguration senderNodeConfig(TcpDiscoveryIpFinder ipFinder, String igniteInstanceName, int receivers) throws Exception {
        assert (receivers == 1 || receivers == 2);
        GridGainConfiguration ggCfg = new GridGainConfiguration();
        ggCfg.setSnapshotConfiguration(new SnapshotConfiguration());
        DrSenderConfiguration sndHubCfg = receivers == 2 ? this.senderHubConfig(new DrSenderConnectionConfiguration[]{this.senderHubReplicaConfig((byte)2, new String[]{"127.0.0.1:12312"}), this.senderHubReplicaConfig((byte)3, new String[]{"127.0.0.1:12313"})}) : this.senderHubConfig(new DrSenderConnectionConfiguration[]{this.senderHubReplicaConfig((byte)2, new String[]{"127.0.0.1:12311"})});
        sndHubCfg.setMaxQueueSize(10000);
        sndHubCfg.setReconnectOnFailureTimeout(500L);
        sndHubCfg.setMaxErrors(10);
        IgniteConfiguration cfg = this.config(ggCfg, igniteInstanceName, (byte)1, ipFinder, sndHubCfg, null, new CacheConfiguration[0]);
        return cfg;
    }

    private IgniteConfiguration receiverNodeConfig(TcpDiscoveryIpFinder ipFinder, String igniteInstanceName, int port, CacheConfiguration ... caches) throws Exception {
        GridGainConfiguration ggCfg = new GridGainConfiguration();
        DrReceiverConfiguration rcvCfg = new DrReceiverConfiguration();
        rcvCfg.setLocalInboundPort(port);
        rcvCfg.setFlushFrequency(10L);
        IgniteConfiguration cfg = this.config(ggCfg, igniteInstanceName, (byte)2, ipFinder, null, rcvCfg, caches);
        return cfg;
    }

    protected IgniteConfiguration config(GridGainConfiguration ggCfg, String igniteInstanceName, byte dataCenterId, TcpDiscoveryIpFinder ipFinder, @Nullable DrSenderConfiguration sndHubCfg, @Nullable DrReceiverConfiguration rcvHubCfg, CacheConfiguration ... cacheCfgs) throws IgniteCheckedException {
        return super.config(ggCfg, igniteInstanceName, dataCenterId, ipFinder, sndHubCfg, rcvHubCfg, cacheCfgs).setCommunicationSpi((CommunicationSpi)new TestRecordingCommunicationSpi());
    }

    protected static GridSnapshot snapshot(Ignite ignite) {
        GridGain plugin = (GridGain)ignite.plugin("GridGain");
        IncrementalStateTransferViaSnapshotSelfTest.assertNotNull((String)"GridGain plugin is not configured.", (Object)plugin);
        GridSnapshot snapshot = plugin.snapshot();
        IncrementalStateTransferViaSnapshotSelfTest.assertNotNull((String)"Snapshot component is not configured.", (Object)snapshot);
        return snapshot;
    }
}

