/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.database;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.PluginConfiguration;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.gridgain.grid.GridGain;
import org.gridgain.grid.configuration.SnapshotConfiguration;
import org.gridgain.grid.internal.processors.cache.database.snapshot.CompressionOption;
import org.gridgain.grid.internal.processors.cache.database.snapshot.FutureTaskQueue;
import org.gridgain.grid.internal.processors.cache.database.snapshot.GridCacheSnapshotManager;
import org.gridgain.grid.internal.processors.cache.database.snapshot.SnapshotEncryptionOptions;
import org.gridgain.grid.internal.processors.cache.database.snapshot.SnapshotOperationContext;
import org.gridgain.grid.internal.processors.cache.database.snapshot.SnapshotSession;
import org.gridgain.grid.internal.processors.cache.database.txdr.AbstractReplicationTest;
import org.gridgain.grid.internal.txdr.GridGainTxDrConfiguration;
import org.gridgain.grid.internal.txdr.TransactionalDrConfiguration;
import org.gridgain.grid.persistentstore.MessageDigestFactory;
import org.gridgain.grid.persistentstore.SnapshotFuture;
import org.gridgain.grid.persistentstore.SnapshotOperationInfo;
import org.gridgain.grid.persistentstore.SnapshotOperationType;
import org.gridgain.grid.persistentstore.snapshot.file.FileDatabaseSnapshotSpi;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

public class IgniteDbConcurrentSnapshotSelfTest
extends GridCommonAbstractTest {
    private static final int ENTRIES_COUNT = 30;
    private static final int NODES = 4;
    private static final int ITERATION_CNT = 10;
    private static final String CACHE_NAME = "cache1";
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
    private static final String TRANSFER_FOLDER_NAME = "transfer-folder";
    private static boolean client = false;
    private static volatile CountDownLatch latch;
    private volatile long lastSuccessfulSnapshotId;

    protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);
        DataStorageConfiguration memCfg = new DataStorageConfiguration().setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(314572800L).setPersistenceEnabled(true)).setWalMode(WALMode.LOG_ONLY);
        cfg.setDataStorageConfiguration(memCfg);
        CacheConfiguration ccfg = new CacheConfiguration();
        ccfg.setName(CACHE_NAME);
        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
        ccfg.setRebalanceMode(CacheRebalanceMode.NONE);
        ccfg.setAffinity((AffinityFunction)new RendezvousAffinityFunction(false, 32));
        cfg.setCacheConfiguration(new CacheConfiguration[]{ccfg});
        GridGainTxDrConfiguration ggCfg = new GridGainTxDrConfiguration();
        ggCfg.setTxDrConfiguration(new TransactionalDrConfiguration().setTransferFolderPath(U.resolveWorkDirectory((String)U.defaultWorkDirectory(), (String)TRANSFER_FOLDER_NAME, (boolean)false).getAbsolutePath()));
        SnapshotConfiguration gDbCfg = new SnapshotConfiguration();
        GridCacheSnapshotManager.TEST_SNAPSHOT_SPI.set(new TestSnapshotSpi());
        ggCfg.setSnapshotConfiguration(gDbCfg);
        cfg.setPluginConfigurations(new PluginConfiguration[]{ggCfg});
        if (gridName.contains(String.valueOf(3)) && client) {
            cfg.setClientMode(true);
        }
        cfg.setDiscoverySpi((DiscoverySpi)new TcpDiscoverySpi().setIpFinder(IP_FINDER));
        return cfg;
    }

    protected void beforeTestsStarted() throws Exception {
        this.stopAllGrids();
        this.deleteWorkFiles();
    }

    protected void afterTest() throws Exception {
        super.afterTest();
        this.stopAllGrids();
        this.deleteWorkFiles();
    }

    protected void beforeTest() throws Exception {
        super.beforeTest();
        client = false;
    }

    @Test
    public void testConcurrentStartSnapshotFromClient() throws Exception {
        this.doTestConcurrentStartSnapshot(true, true, false);
    }

    @Test
    public void testConcurrentStartSnapshotClientNodes() throws Exception {
        this.doTestConcurrentStartSnapshot(true, false, false);
    }

    @Test
    public void testConcurrentStartSnapshotServerNodes() throws Exception {
        this.doTestConcurrentStartSnapshot(false, false, false);
    }

    @Test
    public void testConcurrentStartSnapshotFromRandomNodeWithClient() throws Exception {
        this.doTestConcurrentStartSnapshot(true, false, true);
    }

    @Test
    public void testConcurrentStartSnapshotFromRandomNode() throws Exception {
        this.doTestConcurrentStartSnapshot(false, false, true);
    }

    @Test
    public void testContinuousStartSnapshot() throws Exception {
        this.doTestContinuousStartSnapshot(false);
    }

    @Test
    public void testContinuousStartSnapshotWithClient() throws Exception {
        this.doTestContinuousStartSnapshot(true);
    }

    private void doTestConcurrentStartSnapshot(boolean client, final boolean onlyFromClient, final boolean randomNode) throws Exception {
        IgniteDbConcurrentSnapshotSelfTest.client = client;
        this.startGridsMultiThreaded(4);
        IgniteEx ignite = this.ignite(0);
        ignite.cluster().active(true);
        AbstractReplicationTest.replaceTransactionalProcessor(G.allGrids().stream().map(e -> (IgniteEx)e).collect(Collectors.toList()));
        IgniteCache cache = ignite.cache(CACHE_NAME);
        try (IgniteDataStreamer ldr = ignite.dataStreamer(CACHE_NAME);){
            HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
            for (int i = 0; i < 30; ++i) {
                map.put(i, i);
            }
            ldr.addData(map);
        }
        final AtomicReference snapshotInfo = new AtomicReference();
        for (int i = 0; i < 10; ++i) {
            log.info("Iteration: " + i);
            TestSnapshotSpi.latch = new CountDownLatch(1);
            latch = new CountDownLatch(1);
            snapshotInfo.set(null);
            final AtomicInteger ids = new AtomicInteger();
            final AtomicInteger failCnt = new AtomicInteger();
            int threadCnt = 16;
            IgniteInternalFuture future = GridTestUtils.runMultiThreadedAsync((Runnable)new Runnable(){

                @Override
                public void run() {
                    int id = ids.getAndIncrement();
                    SnapshotFuture fut = null;
                    try {
                        int idx = randomNode ? ThreadLocalRandom.current().nextInt(3) : (onlyFromClient ? 3 : id % 4);
                        GridGain gg = (GridGain)IgniteDbConcurrentSnapshotSelfTest.this.ignite(idx).plugin("GridGain");
                        fut = gg.snapshot().createFullSnapshot(Collections.singleton(IgniteDbConcurrentSnapshotSelfTest.CACHE_NAME), null);
                        fut.get();
                        log.info("Finished thread: " + Thread.currentThread().getName());
                        SnapshotOperationInfo info = fut.snapshotOperation();
                        IgniteDbConcurrentSnapshotSelfTest.assertNotNull((Object)info);
                        IgniteDbConcurrentSnapshotSelfTest.assertTrue((boolean)snapshotInfo.compareAndSet(null, info));
                    }
                    catch (Exception e) {
                        IgniteDbConcurrentSnapshotSelfTest.assertTrue((String)("Got unexpected exception: " + e), (boolean)X.hasCause((Throwable)e, (Class[])new Class[]{IllegalStateException.class}));
                        failCnt.incrementAndGet();
                    }
                }
            }, (int)16, (String)"start-snapshot-thread");
            U.await((CountDownLatch)latch);
            TestSnapshotSpi.latch.countDown();
            future.get(30000L);
            IgniteDbConcurrentSnapshotSelfTest.assertTrue((String)"Snapshot creation doesn't started.", (snapshotInfo.get() != null ? 1 : 0) != 0);
            IgniteDbConcurrentSnapshotSelfTest.assertEquals((int)15, (int)failCnt.get());
        }
        GridGain gg = (GridGain)ignite.plugin("GridGain");
        gg.snapshot().restoreSnapshot(((SnapshotOperationInfo)snapshotInfo.get()).snapshotId(), Collections.singleton(CACHE_NAME), null).get();
        cache = ignite.cache(CACHE_NAME);
        for (int key = 0; key < 30; ++key) {
            IgniteDbConcurrentSnapshotSelfTest.assertEquals((String)("Failed for key: " + key + ", snapshot id: " + snapshotInfo.get()), (Object)key, (Object)cache.get((Object)key));
        }
    }

    private void doTestContinuousStartSnapshot(boolean client) throws Exception {
        IgniteDbConcurrentSnapshotSelfTest.client = client;
        this.startGridsMultiThreaded(4);
        IgniteEx ignite = this.ignite(0);
        ignite.cluster().active(true);
        AbstractReplicationTest.replaceTransactionalProcessor(G.allGrids().stream().map(e -> (IgniteEx)e).collect(Collectors.toList()));
        IgniteCache cache = ignite.cache(CACHE_NAME);
        for (int i = 0; i < 30; ++i) {
            cache.put((Object)i, (Object)i);
            if (i == 0 || i % 10 != 0) continue;
            log.info("Iteration: " + i);
        }
        SnapshotOperationInfo snapshotInfo = null;
        for (int i = 0; i < 10; ++i) {
            log.info("Iteration: " + i);
            TestSnapshotSpi.latch = new CountDownLatch(1);
            latch = new CountDownLatch(1);
            final IgniteInternalFuture f = GridTestUtils.runAsync((Callable)new Callable<SnapshotOperationInfo>(){

                @Override
                public SnapshotOperationInfo call() throws Exception {
                    GridGain gg = (GridGain)IgniteDbConcurrentSnapshotSelfTest.this.ignite(0).plugin("GridGain");
                    SnapshotFuture fut = gg.snapshot().createFullSnapshot(Collections.singleton(IgniteDbConcurrentSnapshotSelfTest.CACHE_NAME), null);
                    fut.get();
                    return fut.snapshotOperation();
                }
            }, (String)"startup-runner");
            U.await((CountDownLatch)latch);
            int threadCnt = 100;
            final AtomicInteger failCnt = new AtomicInteger();
            GridTestUtils.runMultiThreaded((Runnable)new Runnable(){

                @Override
                public void run() {
                    try {
                        IgniteDbConcurrentSnapshotSelfTest.assertFalse((boolean)f.isDone());
                        GridGain gg = (GridGain)IgniteDbConcurrentSnapshotSelfTest.this.ignite(ThreadLocalRandom.current().nextInt(3)).plugin("GridGain");
                        SnapshotFuture fut = gg.snapshot().createFullSnapshot(Collections.singleton(IgniteDbConcurrentSnapshotSelfTest.CACHE_NAME), null);
                        fut.get();
                        IgniteDbConcurrentSnapshotSelfTest.fail((String)"All threads should be failed");
                    }
                    catch (Exception e) {
                        IgniteDbConcurrentSnapshotSelfTest.assertTrue((String)"Got unexpected exception.", (boolean)X.hasCause((Throwable)e, (Class[])new Class[]{IllegalStateException.class}));
                        failCnt.incrementAndGet();
                    }
                }
            }, (int)threadCnt, (String)"fail-snapshot-starter");
            TestSnapshotSpi.latch.countDown();
            snapshotInfo = (SnapshotOperationInfo)f.get();
            IgniteDbConcurrentSnapshotSelfTest.assertNotNull((String)"Snapshot creation doesn't started.", (Object)snapshotInfo);
            IgniteDbConcurrentSnapshotSelfTest.assertEquals((int)threadCnt, (int)failCnt.get());
        }
        GridGain gg = (GridGain)ignite.plugin("GridGain");
        gg.snapshot().restoreSnapshot(snapshotInfo.snapshotId(), Collections.singleton(CACHE_NAME), null).get();
        cache = ignite.cache(CACHE_NAME);
        for (int key = 0; key < 30; ++key) {
            IgniteDbConcurrentSnapshotSelfTest.assertEquals((String)("Failed for key: " + key + ", snapshot id: " + snapshotInfo.snapshotId()), (Object)key, (Object)cache.get((Object)key));
        }
    }

    private SnapshotFuture<Void> startSnapshotOperation(IgniteEx ignite, SnapshotOperationType type, final CountDownLatch startLatch, final CountDownLatch resumeLatch, boolean expectFail) throws Exception {
        GridGain gg = (GridGain)ignite.plugin("GridGain");
        GridCacheSnapshotManager snapMgr = (GridCacheSnapshotManager)ignite.context().cache().context().snapshot();
        try {
            SnapshotFuture fut;
            switch (type) {
                case CREATE: {
                    fut = gg.snapshot().createFullSnapshot(null, null);
                    break;
                }
                case CONSISTENT_CUT: {
                    fut = snapMgr.startGlobalConsistentCut();
                    break;
                }
                case RESTORE: {
                    fut = gg.snapshot().restoreSnapshot(this.lastSuccessfulSnapshotId, null, null);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unsupported snapshot operation type: " + type);
                }
            }
            fut.initFuture().listen((IgniteInClosure)new IgniteInClosure<IgniteFuture>(){

                public void apply(IgniteFuture fut) {
                    startLatch.countDown();
                    try {
                        resumeLatch.await();
                    }
                    catch (InterruptedException e) {
                        IgniteDbConcurrentSnapshotSelfTest.fail((String)"Thread interrupted");
                    }
                }
            });
            return fut;
        }
        catch (Exception e) {
            if (!expectFail) {
                log.error("Unexpected error", (Throwable)e);
                IgniteDbConcurrentSnapshotSelfTest.fail((String)("Got unexpected error: " + e.getMessage()));
            }
            return null;
        }
    }

    private void assertFutureResult(SnapshotFuture<Void> fut, boolean expectFail) {
        block5: {
            if (fut == null) {
                return;
            }
            try {
                fut.get();
                if (fut.snapshotOperation().operationType() == SnapshotOperationType.CREATE) {
                    this.lastSuccessfulSnapshotId = fut.snapshotOperation().snapshotId();
                }
                if (expectFail) {
                    IgniteDbConcurrentSnapshotSelfTest.fail((String)"Snapshot future completed successfully, but error expected");
                }
            }
            catch (Exception e) {
                if (expectFail) break block5;
                log.error("Got unexpected exception for future " + fut, (Throwable)e);
                IgniteDbConcurrentSnapshotSelfTest.fail((String)("Got unexpected error: " + e.getMessage()));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doConcurrentSnapshotOperations(IgniteEx node1, SnapshotOperationType type1, boolean expectFail1, IgniteEx node2, SnapshotOperationType type2, boolean expectFail2) throws Exception {
        SnapshotFuture<Void> fut2;
        SnapshotFuture<Void> fut1;
        CountDownLatch startLatch = new CountDownLatch(2);
        CountDownLatch resumeLatch = new CountDownLatch(1);
        try {
            fut1 = this.startSnapshotOperation(node1, type1, startLatch, resumeLatch, expectFail1);
            fut2 = this.startSnapshotOperation(node2, type2, startLatch, resumeLatch, expectFail2);
            if (!expectFail1 && !expectFail2) {
                startLatch.await();
            }
        }
        finally {
            resumeLatch.countDown();
        }
        this.assertFutureResult(fut1, expectFail1);
        this.assertFutureResult(fut2, expectFail2);
    }

    public void testConcurrentConsistentCut() throws Exception {
        IgniteEx ignite0 = this.startGrid(0);
        IgniteEx ignite1 = this.startGrid(1);
        this.startGrid(2);
        ignite0.cluster().active(true);
        AbstractReplicationTest.replaceTransactionalProcessor(G.allGrids().stream().map(e -> (IgniteEx)e).collect(Collectors.toList()));
        this.doConcurrentSnapshotOperations(ignite0, SnapshotOperationType.CREATE, false, ignite0, SnapshotOperationType.CONSISTENT_CUT, false);
        this.doConcurrentSnapshotOperations(ignite0, SnapshotOperationType.CONSISTENT_CUT, false, ignite0, SnapshotOperationType.CONSISTENT_CUT, true);
        this.doConcurrentSnapshotOperations(ignite0, SnapshotOperationType.CONSISTENT_CUT, false, ignite1, SnapshotOperationType.CONSISTENT_CUT, true);
        this.doConcurrentSnapshotOperations(ignite0, SnapshotOperationType.CREATE, false, ignite0, SnapshotOperationType.CREATE, true);
        this.doConcurrentSnapshotOperations(ignite0, SnapshotOperationType.CREATE, false, ignite1, SnapshotOperationType.CREATE, true);
        this.doConcurrentSnapshotOperations(ignite0, SnapshotOperationType.CREATE, false, ignite0, SnapshotOperationType.RESTORE, true);
        this.doConcurrentSnapshotOperations(ignite0, SnapshotOperationType.CREATE, false, ignite1, SnapshotOperationType.RESTORE, true);
        this.doConcurrentSnapshotOperations(ignite0, SnapshotOperationType.CONSISTENT_CUT, true, ignite0, SnapshotOperationType.RESTORE, false);
        this.doConcurrentSnapshotOperations(ignite0, SnapshotOperationType.RESTORE, false, ignite0, SnapshotOperationType.CONSISTENT_CUT, true);
        this.doConcurrentSnapshotOperations(ignite0, SnapshotOperationType.RESTORE, false, ignite1, SnapshotOperationType.CONSISTENT_CUT, true);
        this.doConcurrentSnapshotOperations(ignite0, SnapshotOperationType.CONSISTENT_CUT, true, ignite1, SnapshotOperationType.RESTORE, false);
        this.doConcurrentSnapshotOperations(ignite1, SnapshotOperationType.RESTORE, false, ignite0, SnapshotOperationType.CONSISTENT_CUT, true);
        this.doConcurrentSnapshotOperations(ignite1, SnapshotOperationType.CONSISTENT_CUT, true, ignite0, SnapshotOperationType.RESTORE, false);
    }

    private void deleteWorkFiles() throws Exception {
        this.cleanPersistenceDir();
        U.delete((File)U.resolveWorkDirectory((String)U.defaultWorkDirectory(), (String)"snapshot", (boolean)false));
        U.delete((File)U.resolveWorkDirectory((String)U.defaultWorkDirectory(), (String)TRANSFER_FOLDER_NAME, (boolean)false));
    }

    private static class TestSnapshotSpi
    extends FileDatabaseSnapshotSpi {
        private static volatile CountDownLatch latch;

        private TestSnapshotSpi() {
        }

        public SnapshotSession sessionForSnapshotCreation(long id, boolean fullSnapshot, File storePath, CompressionOption compression, int compressionLevel, FutureTaskQueue<GroupPartitionId> futureTaskQueue, SnapshotOperationContext snapshotOperationContext, @Nullable MessageDigestFactory msgDigestFactory, @Nullable SnapshotEncryptionOptions encryptionOptions) throws IgniteCheckedException {
            if (latch != null) {
                try {
                    latch.countDown();
                    latch.await();
                }
                catch (InterruptedException e) {
                    IgniteDbConcurrentSnapshotSelfTest.fail((String)"Failed waiting latch release.");
                }
            }
            return super.sessionForSnapshotCreation(id, fullSnapshot, storePath, compression, compressionLevel, futureTaskQueue, snapshotOperationContext, msgDigestFactory, encryptionOptions);
        }
    }
}

