/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.database;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.gridgain.grid.GridGain;
import org.gridgain.grid.internal.processors.cache.database.AbstractSnapshotTest;
import org.gridgain.grid.persistentstore.SnapshotInfo;
import org.junit.Test;

/**
 * Tests restoring a snapshot on a two-node cluster right after one of the nodes has been
 * restarted while the caches were being updated.
 * <p>
 * Every node uses {@link RebalanceBlockingSPI}, which delays partition supply messages of the test
 * cache groups, presumably so that rebalancing of the restarted node is still in progress when the
 * restore is started.
 */
public class IgniteDbSnapshotDuringRebalanceTest
extends AbstractSnapshotTest {
    /** Name of the first cache of {@link #GROUP_1}. */
    private static final String CACHE_1 = "cache_1";

    /** Name of the second cache of {@link #GROUP_1}. */
    private static final String CACHE_2 = "cache_2";

    /** Name of the first cache of {@link #GROUP_2}. */
    private static final String CACHE_3 = "cache_3";

    /** Name of the second cache of {@link #GROUP_2}. */
    private static final String CACHE_4 = "cache_4";

    /** Name of the first cache group. */
    private static final String GROUP_1 = "group_1";

    /** Name of the second cache group. */
    private static final String GROUP_2 = "group_2";

    /** Names of all caches created by the test; all of them are included into the snapshot. */
    private static final Set<String> caches = Stream.of(CACHE_1, CACHE_2, CACHE_3, CACHE_4).collect(Collectors.toSet());

    /** Rebalance batch size configured for every test cache. */
    private static final int REBALANCE_BATCH_SIZE = 51200;

    /** Number of keys ({@code 0 .. KEYS_CNT - 1}) stored in every cache. */
    private static final int KEYS_CNT = 3000;

    /** Size in bytes of every stored value. */
    private static final int PAYLOAD_SIZE = 1024;

    /** Number of threads used by {@link #runLoad(Ignite)}. */
    private static final int LOAD_THREADS_CNT = 4;

    /** Number of puts performed by every load thread. */
    private static final int PUTS_PER_THREAD = 3000;

    /**
     * Builds the node configuration: no statically configured caches and a communication SPI
     * that slows down partition supply messages.
     *
     * @param gridName Name of the node being started.
     * @return Node configuration.
     * @throws Exception If the parent configuration could not be created.
     */
    @Override
    protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        // Caches are created dynamically in loadData().
        cfg.setCacheConfiguration(new CacheConfiguration[0]);
        cfg.setCommunicationSpi(new RebalanceBlockingSPI());

        return cfg;
    }

    /**
     * Stops all nodes and cleans snapshot directories before every test.
     *
     * @throws Exception If failed.
     */
    @Override
    protected void beforeTest() throws Exception {
        super.beforeTest();

        stopAllGrids();
        cleanSnapshotDirs();
    }

    /**
     * Stops all nodes and cleans snapshot directories after every test.
     *
     * @throws Exception If failed.
     */
    @Override
    protected void afterTest() throws Exception {
        super.afterTest();

        stopAllGrids();
        cleanSnapshotDirs();
    }

    /**
     * Restores all caches of both groups.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRestoringAllGroups() throws Exception {
        performTest(caches);
    }

    /**
     * Restores only the caches of {@link #GROUP_1}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRestoringSpecificGroup() throws Exception {
        performTest(Stream.of(CACHE_1, CACHE_2).collect(Collectors.toSet()));
    }

    /**
     * Runs the scenario: start two nodes, load data, create a full snapshot of all caches,
     * restart node 1 under load and restore the given caches from the snapshot. After that
     * checks that node 1 has no failure context, that both nodes are still in the cluster and
     * that the content of every restored cache matches the checksums taken before the snapshot.
     *
     * @param cachesToRestore Names of the caches to restore and validate.
     * @throws Exception If failed.
     */
    private void performTest(Set<String> cachesToRestore) throws Exception {
        // Typed as Ignite so that the private helpers below are resolved exactly as before.
        Ignite ig0 = startGrids(2);

        ig0.cluster().active(true);

        Map<String, String> checkSums = loadData(ig0);

        GridGain gg = (GridGain)ig0.plugin("GridGain");

        gg.snapshot().createFullSnapshot(caches, null).get();

        // The snapshot created above is expected to be the first (and only) one in the list.
        List<?> snapshots = gg.snapshot().list(null);
        long snapshotId = ((SnapshotInfo)snapshots.get(0)).snapshotId();

        // Change the data while node 1 is down and once more right after it is back.
        stopGrid(1);
        runLoad(ig0);

        startGrid(1);
        runLoad(ig0);

        gg.snapshot().restoreSnapshot(snapshotId, cachesToRestore, null).get();

        awaitPartitionMapExchange(true, true, null, true);

        assertNull(grid(1).context().failure().failureContext());
        assertEquals("A few nodes failed during test", 2, ig0.cluster().nodes().size());

        // Validate every cache that was restored, not only a fixed subset of them.
        validateData(ig0, cachesToRestore, checkSums);
    }

    /**
     * Creates the four test caches and fills each of them with {@link #KEYS_CNT} random values.
     *
     * @param ig Node used to create and populate the caches.
     * @return Map from cache name to the checksum of the loaded values taken in key order.
     */
    private Map<String, String> loadData(Ignite ig) {
        // Ignite#getOrCreateCaches is called with a collection of raw configurations, as before.
        List<CacheConfiguration> configs = Stream.<CacheConfiguration>of(
            cacheConfiguration(CACHE_1, GROUP_1),
            cacheConfiguration(CACHE_2, GROUP_1),
            cacheConfiguration(CACHE_3, GROUP_2),
            cacheConfiguration(CACHE_4, GROUP_2)
        ).collect(Collectors.toList());

        ig.getOrCreateCaches(configs);

        Map<String, String> checkSums = new HashMap<>();

        for (CacheConfiguration cfg : configs) {
            String cacheName = cfg.getName();

            try (IgniteDataStreamer<Integer, byte[]> streamer = ig.dataStreamer(cacheName)) {
                SHA256CheckSum digest = new SHA256CheckSum();
                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                for (int key = 0; key < KEYS_CNT; ++key) {
                    // A fresh array per key: the streamer keeps a reference to the value.
                    byte[] payload = new byte[PAYLOAD_SIZE];

                    rnd.nextBytes(payload);
                    digest.update(payload);

                    streamer.addData(key, payload);
                }

                checkSums.put(cacheName, digest.checkSum());

                streamer.flush();
            }
        }

        return checkSums;
    }

    /**
     * Creates configuration of a replicated test cache.
     *
     * @param name Cache name.
     * @param grpName Name of the cache group the cache belongs to.
     * @return Cache configuration.
     */
    private static CacheConfiguration<Integer, byte[]> cacheConfiguration(String name, String grpName) {
        return new CacheConfiguration<Integer, byte[]>(name)
            .setGroupName(grpName)
            .setRebalanceBatchSize(REBALANCE_BATCH_SIZE)
            .setCacheMode(CacheMode.REPLICATED);
    }

    /**
     * Checks that the content of the given caches matches the checksums produced by
     * {@link #loadData(Ignite)}.
     *
     * @param ig Node used to read the caches.
     * @param cacheNames Names of the caches to validate.
     * @param checkSums Expected checksums by cache name.
     */
    private void validateData(Ignite ig, Set<String> cacheNames, Map<String, String> checkSums) {
        for (String cacheName : cacheNames) {
            IgniteCache<Integer, byte[]> cache = ig.cache(cacheName);
            SHA256CheckSum digest = new SHA256CheckSum();

            // Keys are read in the same order in which they were loaded.
            for (int key = 0; key < KEYS_CNT; ++key) {
                byte[] val = cache.get(key);

                // Fail with a readable message instead of a NullPointerException in the digest.
                assertNotNull("Entry is missing after restore [cache=" + cacheName + ", key=" + key + ']', val);

                digest.update(val);
            }

            assertEquals(
                "Validation of data after restore failed, checksum doesn't match [cache=" + cacheName + ']',
                checkSums.get(cacheName),
                digest.checkSum()
            );
        }
    }

    /**
     * Overwrites random keys with random values from {@link #LOAD_THREADS_CNT} threads. Every
     * thread picks one of the test caches at random and performs {@link #PUTS_PER_THREAD} puts.
     *
     * @param ig Node used to access the caches.
     * @throws Exception If the load failed.
     */
    private void runLoad(final Ignite ig) throws Exception {
        Runnable load = () -> {
            String cacheName = F.rand(CACHE_1, CACHE_2, CACHE_3, CACHE_4);
            IgniteCache<Integer, byte[]> cache = ig.cache(cacheName);

            ThreadLocalRandom rnd = ThreadLocalRandom.current();
            byte[] payload = new byte[PAYLOAD_SIZE];

            for (int i = 0; i < PUTS_PER_THREAD; ++i) {
                int key = rnd.nextInt(KEYS_CNT);

                rnd.nextBytes(payload);

                cache.put(key, payload);
            }
        };

        GridTestUtils.runMultiThreaded(load, LOAD_THREADS_CNT, "load-thread");
    }

    /**
     * Accumulates byte arrays into a SHA-256 digest and renders the result as a hex string.
     */
    private static class SHA256CheckSum {
        /** Underlying message digest. */
        private final MessageDigest digest;

        /**
         * @throws IllegalStateException If SHA-256 is not provided by the JVM.
         */
        SHA256CheckSum() {
            try {
                digest = MessageDigest.getInstance("SHA-256");
            }
            catch (NoSuchAlgorithmException e) {
                // Fail right away instead of leaving the digest null and hitting an NPE later.
                throw new IllegalStateException("SHA-256 message digest is not available", e);
            }
        }

        /**
         * Adds the given bytes to the checksum.
         *
         * @param payload Bytes to add.
         */
        void update(byte[] payload) {
            digest.update(payload);
        }

        /**
         * Completes the digest computation; {@link MessageDigest#digest()} resets the digest
         * after the call.
         *
         * @return Lower-case hex representation of the digest.
         */
        String checkSum() {
            StringBuilder res = new StringBuilder();

            for (byte b : digest.digest()) {
                res.append(String.format("%02x", b));
            }

            return res.toString();
        }
    }

    /**
     * Communication SPI that delays every partition supply message belonging to
     * {@link #GROUP_1} or {@link #GROUP_2} before sending it.
     */
    private static class RebalanceBlockingSPI
    extends TcpCommunicationSpi {
        /** Shared IP finder; not referenced anywhere else in this file. */
        public static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

        /** Delay in milliseconds applied to every matching supply message. */
        private static final long SUPPLY_MSG_DELAY = 50L;

        /** Id of {@link #GROUP_1}, computed once instead of on every sent message. */
        private static final int GROUP_1_ID = CU.cacheId(GROUP_1);

        /** Id of {@link #GROUP_2}, computed once instead of on every sent message. */
        private static final int GROUP_2_ID = CU.cacheId(GROUP_2);

        /** Instantiated only by the enclosing test. */
        private RebalanceBlockingSPI() {
        }

        /** {@inheritDoc} */
        @Override
        public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
            slowDownMessage(msg);

            super.sendMessage(node, msg);
        }

        /** {@inheritDoc} */
        @Override
        public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
            slowDownMessage(msg);

            super.sendMessage(node, msg, ackC);
        }

        /**
         * Sleeps for {@link #SUPPLY_MSG_DELAY} milliseconds if the message wraps a partition
         * supply message of one of the test cache groups; returns immediately otherwise.
         *
         * @param msg Message that is about to be sent.
         */
        private void slowDownMessage(Message msg) {
            if (!(msg instanceof GridIoMessage)) {
                return;
            }

            Message payload = ((GridIoMessage)msg).message();

            if (!(payload instanceof GridDhtPartitionSupplyMessage)) {
                return;
            }

            int grpId = ((GridCacheGroupIdMessage)payload).groupId();

            if (grpId != GROUP_1_ID && grpId != GROUP_2_ID) {
                return;
            }

            try {
                U.sleep(SUPPLY_MSG_DELAY);
            }
            catch (IgniteInterruptedCheckedException e) {
                // Best effort: the delay is cut short and the message is still sent by the caller.
                e.printStackTrace();
            }
        }
    }
}

