/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.database;

import java.io.Serializable;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.worker.GridWorkerListener;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.testframework.GridTestUtils;
import org.gridgain.grid.GridGain;
import org.gridgain.grid.internal.processors.cache.database.AbstractSnapshotTest;
import org.gridgain.grid.internal.processors.cache.database.snapshot.GridCacheSnapshotManager;
import org.gridgain.grid.internal.processors.cache.database.snapshot.SnapshotCreateFuture;
import org.gridgain.grid.persistentstore.CheckSnapshotParams;
import org.gridgain.grid.persistentstore.RestoreSnapshotParams;
import org.gridgain.grid.persistentstore.SnapshotFuture;
import org.gridgain.grid.persistentstore.SnapshotOperationInfo;
import org.gridgain.grid.persistentstore.SnapshotOperationType;
import org.junit.Test;

public class IgniteDbSnapshotConsequentCheckpointsTest
extends AbstractSnapshotTest {
    private static final long TIMEOUT = 10000L;

    /**
     * Builds the node configuration used by this test: the superclass configuration plus a
     * single cache named {@code "default"} that uses a {@link RendezvousAffinityFunction}
     * constructed with the arguments {@code (false, 1)}.
     *
     * @param gridName Name of the grid instance being configured.
     * @return The superclass configuration with the cache configuration applied.
     * @throws Exception If the superclass fails to produce the base configuration.
     */
    @Override
    protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration nodeCfg = super.getConfiguration(gridName);

        // NOTE(review): the second constructor argument is presumably the partition count - confirm.
        CacheConfiguration dfltCache = new CacheConfiguration();
        dfltCache.setAffinity(new RendezvousAffinityFunction(false, 1));
        dfltCache.setName("default");

        nodeCfg.setCacheConfiguration(dfltCache);

        return nodeCfg;
    }

    /**
     * Supplies the failure handler for nodes started by this test.
     *
     * @param igniteInstanceName Name of the Ignite instance the handler is requested for; not
     *      used by this implementation.
     * @return A new {@link StopNodeFailureHandler}.
     */
    @Override
    protected FailureHandler getFailureHandler(String igniteInstanceName) {
        return new StopNodeFailureHandler();
    }

    /**
     * Runs the superclass set-up and then makes sure the test starts from a clean slate:
     * all grids are stopped and the persistence and snapshot directories are cleaned.
     *
     * @throws Exception If the superclass set-up or any of the clean-up steps fails.
     */
    @Override
    protected void beforeTest() throws Exception {
        super.beforeTest();

        // Stop nodes first, then clean the directories.
        stopAllGrids();
        cleanPersistenceDir();
        cleanSnapshotDirs();
    }

    /**
     * Runs the superclass tear-down, then stops every grid and cleans the persistence and
     * snapshot directories.
     *
     * @throws Exception If the superclass tear-down or any of the clean-up steps fails.
     */
    @Override
    protected void afterTest() throws Exception {
        super.afterTest();

        // Same order as in set-up: nodes are stopped before the directories are cleaned.
        stopAllGrids();

        cleanPersistenceDir();

        cleanSnapshotDirs();
    }

    /**
     * Starts a single node, activates the cluster and runs the scenario implemented by
     * {@code test(IgniteEx, PausableStripedExecutor)} with a pausable executor that has the
     * same number of stripes as the snapshot manager's current {@code snapshotExecutor}.
     * Both the pausable executor and the original one are shut down when the scenario ends,
     * whether it passed or failed.
     *
     * @throws Exception If the node cannot be started or the scenario fails.
     */
    @Test
    public void testSecondCheckpointHappensBeforeSnapshotExecutorStartsSnapshot() throws Exception {
        IgniteEx ignite = startGrid(0);

        ignite.cluster().state(ClusterState.ACTIVE);

        GridCacheSnapshotManager snapshotMgr = (GridCacheSnapshotManager)snapshotManager(ignite);

        // Remember the original executor: the scenario overwrites this field via reflection,
        // so this test is responsible for shutting the original instance down.
        StripedExecutor defaultExec = (StripedExecutor)GridTestUtils.getFieldValue(snapshotMgr, "snapshotExecutor");

        PausableStripedExecutor exec = new PausableStripedExecutor(
            defaultExec.stripesCount(),
            "instance",
            "db-snapshot-executor",
            log,
            // Report errors instead of silently dropping them so that failures are diagnosable.
            err -> log.error("Error reported by the pausable snapshot executor", err),
            null,
            ignite.context().config().getFailureDetectionTimeout());

        try {
            test(ignite, exec);
        }
        finally {
            try {
                exec.shutdown();
            }
            finally {
                // Must run even if shutting down the pausable executor throws.
                defaultExec.shutdown();
            }
        }
    }

    /**
     * Scenario body. In order, it:
     * <ol>
     *     <li>installs {@code exec} as the snapshot manager's {@code snapshotExecutor} (via
     *     reflection), loads 10000 entries and pauses the executor;</li>
     *     <li>forces checkpoint {@code "test-cp-1"}; a listener requests a full snapshot of the
     *     {@code "default"} cache from inside {@code onCheckpointBegin};</li>
     *     <li>waits for that checkpoint to reach {@link CheckpointState#FINISHED} and loads
     *     100000 entries with shifted values;</li>
     *     <li>forces checkpoint {@code "test-cp-2"}; a listener starts a thread that waits until
     *     the {@code snapshotInProgress} field of the ongoing {@link SnapshotCreateFuture} reads
     *     {@code true} and only then unpauses the executor;</li>
     *     <li>waits for the snapshot, checks it and expects no issues;</li>
     *     <li>restarts the node on a clean persistence directory, restores the snapshot and
     *     verifies that every key holds the value written by the second load.</li>
     * </ol>
     *
     * @param ignite Started and activated node.
     * @param exec Executor to install into the snapshot manager; must not be paused on entry.
     * @throws Exception If any step fails or does not complete within {@code TIMEOUT} milliseconds.
     */
    private void test(IgniteEx ignite, final PausableStripedExecutor exec) throws Exception {
        final GridCacheSnapshotManager snapshotMgr = (GridCacheSnapshotManager)snapshotManager(ignite);

        // Route the manager's snapshot tasks through the pausable executor.
        GridTestUtils.setFieldValue(snapshotMgr, "snapshotExecutor", exec);

        final GridGain gg = (GridGain)ignite.plugin("GridGain");

        int entriesCnt = 10000;

        loadWithInts(ignite, 0, 1, entriesCnt);

        GridCacheDatabaseSharedManager db = dbMgr(ignite);

        // From here on every task submitted to the executor is held back until unpause().
        exec.pause();

        final GridFutureAdapter<SnapshotFuture> snapshotFutWrapper = new GridFutureAdapter<>();

        CheckpointListener firstCpLsnr = new CheckpointListener() {
            @Override
            public void onMarkCheckpointBegin(CheckpointListener.Context ctx) {
                // No-op.
            }

            @Override
            public void beforeCheckpointBegin(CheckpointListener.Context ctx) {
                // No-op.
            }

            @Override
            public void onCheckpointBegin(CheckpointListener.Context ctx) {
                try {
                    snapshotFutWrapper.onDone(gg.snapshot().createFullSnapshot(Collections.singleton("default"), "snp"));
                }
                catch (RuntimeException | Error e) {
                    // Hand the failure to the waiting test thread instead of letting it time out.
                    snapshotFutWrapper.onDone(e);

                    throw e;
                }
            }
        };

        db.addCheckpointListener(firstCpLsnr);

        CheckpointProgress firstCp;
        SnapshotFuture snapshotFut;

        try {
            firstCp = db.forceCheckpoint("test-cp-1");

            snapshotFut = snapshotFutWrapper.get(TIMEOUT);
        }
        finally {
            // The listener must not fire for later checkpoints, even if the wait above failed.
            db.removeCheckpointListener(firstCpLsnr);
        }

        firstCp.futureFor(CheckpointState.FINISHED).get(TIMEOUT);

        int newEntriesCnt = 10 * entriesCnt;
        int shift = 10;
        int multiply = 1;

        // Overwrites the keys of the first load and adds new ones; these are the values
        // verified after the restore below.
        loadWithInts(ignite, shift, multiply, newEntriesCnt);

        final GridFutureAdapter<Void> snapshotInProgressSetFut = new GridFutureAdapter<>();

        CheckpointListener secondCpLsnr = new CheckpointListener() {
            @Override
            public void onMarkCheckpointBegin(CheckpointListener.Context ctx) {
                // No-op.
            }

            @Override
            public void beforeCheckpointBegin(CheckpointListener.Context ctx) {
                // No-op.
            }

            @Override
            public void onCheckpointBegin(CheckpointListener.Context ctx) {
                // The wait runs on a separate thread so that this callback returns immediately.
                new Thread(() -> {
                    try {
                        IgniteFutureImpl opFut = (IgniteFutureImpl)snapshotMgr.getOngoingOperationFuture();
                        SnapshotCreateFuture createFut = (SnapshotCreateFuture)opFut.internalFuture();

                        assertTrue(GridTestUtils.waitForCondition(
                            () -> (Boolean)GridTestUtils.getFieldValue(createFut, "snapshotInProgress"),
                            TIMEOUT,
                            10L));

                        snapshotInProgressSetFut.onDone((Void)null);
                    }
                    catch (Throwable e) {
                        // Includes assertion failures: surface them on the test thread.
                        snapshotInProgressSetFut.onDone(e);

                        return;
                    }

                    // Release the held snapshot tasks only after the flag has been observed.
                    exec.unpause();
                }).start();
            }
        };

        db.addCheckpointListener(secondCpLsnr);

        try {
            db.forceCheckpoint("test-cp-2");

            snapshotInProgressSetFut.get(TIMEOUT);
            snapshotFut.get(TIMEOUT);
        }
        finally {
            db.removeCheckpointListener(secondCpLsnr);
        }

        SnapshotOperationInfo snapshotOpInfo = snapshotFut.snapshotOperation();

        long snapshotId = snapshotOpInfo.snapshotId();

        List<?> snapshotIssues = (List<?>)gg.snapshot().check(new CheckSnapshotParams()
            .snapshotId(snapshotId)
            .operationType(SnapshotOperationType.CHECK)
            .cacheNames(Collections.singleton("default"))).get(TIMEOUT);

        assertEquals(0, snapshotIssues.size());

        // Drop all persisted data so that the checked content can only come from the snapshot.
        stopAllGrids();
        cleanPersistenceDir();

        IgniteEx restarted = startGrid(0);

        restarted.cluster().state(ClusterState.ACTIVE);

        GridGain restartedGg = (GridGain)restarted.plugin("GridGain");

        restartedGg.snapshot().restore(new RestoreSnapshotParams().snapshotId(snapshotId)).get(TIMEOUT);

        IgniteCache<Integer, Integer> cache = restarted.getOrCreateCache("default");

        for (int i = 0; i < newEntriesCnt; ++i) {
            int expected = i * multiply + shift;
            Integer actual = cache.get(i);

            assertNotNull(actual);
            assertEquals(expected, (int)actual);
        }
    }

    /**
     * Streams integer entries into the {@code "default"} cache with overwrite allowed.
     * Keys are {@code 0 .. entriesCnt - 1}; the value for key {@code k} is
     * {@code k * multiply + shift}. All entries are handed to the streamer as one map, and the
     * streamer is closed before this method returns.
     *
     * @param ig Node used to obtain the data streamer.
     * @param shift Constant added to every value.
     * @param multiply Factor applied to the key when computing the value.
     * @param entriesCnt Number of entries to load.
     */
    private static void loadWithInts(Ignite ig, int shift, int multiply, int entriesCnt) {
        HashMap<Integer, Integer> batch = new HashMap<>();

        for (int key = 0; key < entriesCnt; key++) {
            batch.put(key, key * multiply + shift);
        }

        try (IgniteDataStreamer<Integer, Integer> streamer = ig.dataStreamer("default")) {
            streamer.allowOverwrite(true);
            streamer.addData(batch);
        }
    }

    static class PausableStripedExecutor
    extends StripedExecutor {
        private final AtomicBoolean paused = new AtomicBoolean();
        private final Deque<TaskHolder> tasks = new ConcurrentLinkedDeque<TaskHolder>();

        public PausableStripedExecutor(int cnt, String igniteInstanceName, String poolName, IgniteLogger log, IgniteInClosure<Throwable> errHnd, GridWorkerListener gridWorkerLsnr, long failureDetectionTimeout) {
            super(cnt, igniteInstanceName, poolName, log, errHnd, gridWorkerLsnr, failureDetectionTimeout);
        }

        void pause() {
            if (!this.paused.compareAndSet(false, true)) {
                throw new IllegalStateException("Expected to be unpaused");
            }
        }

        void unpause() {
            if (!this.paused.compareAndSet(true, false)) {
                throw new IllegalStateException("Expected to be paused");
            }
            while (!this.tasks.isEmpty()) {
                TaskHolder remove = this.tasks.remove();
                if (remove == null || !remove.take()) continue;
                super.execute(remove.idx, remove.cmd);
            }
        }

        public void execute(int idx, Runnable cmd) {
            if (this.paused.get()) {
                TaskHolder task = new TaskHolder(idx, cmd);
                this.tasks.add(task);
                if (!this.paused.get() && task.take()) {
                    this.tasks.remove(task);
                    super.execute(idx, cmd);
                }
                return;
            }
            super.execute(idx, cmd);
        }

        private static class TaskHolder {
            private final int idx;
            private final Runnable cmd;
            private final AtomicBoolean taken = new AtomicBoolean();

            private TaskHolder(int idx, Runnable cmd) {
                this.idx = idx;
                this.cmd = cmd;
            }

            boolean take() {
                return this.taken.compareAndSet(false, true);
            }
        }
    }
}

