/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.database;

import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.gridgain.grid.GridGain;
import org.gridgain.grid.internal.processors.cache.database.AbstractSnapshotTest;
import org.gridgain.grid.internal.processors.cache.database.SnapshotOperationStage;
import org.gridgain.grid.internal.processors.cache.database.messages.ClusterWideSnapshotOperationStageFinishedMessage;
import org.gridgain.grid.internal.processors.cache.database.messages.SnapshotOperationStageFinishedMessage;
import org.gridgain.grid.internal.processors.cache.database.messages.StartSnapshotOperationAckDiscoveryMessage;
import org.gridgain.grid.internal.processors.cache.database.snapshot.GridCacheSnapshotManager;
import org.gridgain.grid.internal.processors.cache.database.snapshot.SnapshotOperationFuture;
import org.gridgain.grid.internal.processors.cache.database.txdr.ConsistentCut;
import org.gridgain.grid.persistentstore.SnapshotFuture;
import org.gridgain.grid.persistentstore.SnapshotOperationType;
import org.gridgain.grid.persistentstore.SnapshotStatus;
import org.junit.Test;

/**
 * Checks the atomic update counter recorded in a consistent cut when a backup update of an atomic cache
 * is delayed (and therefore reordered) relative to the snapshot stage messages exchanged between nodes.
 * <p>
 * The scenario is driven by {@link TestRecordingCommunicationSpi}, which is installed on every node and
 * used to hold back selected messages on both the coordinator and the non-coordinator node.
 */
public class IgniteDbSnapshotAtomicUpdatesReorderingTest extends AbstractSnapshotTest {
    /** Number of caches that must exist on the node after each test. */
    private static final int EXPECTED_CACHES_COUNT = 15;

    /** Name of the atomic partitioned cache the test updates. */
    private static final String TEST_CACHE_NAME = "test-atomic-cache";

    /** Number of server nodes (not referenced by the code of this class). */
    private static final int NODES_COUNT = 2;

    /** Timeout, in seconds, used for every wait in the test scenario. */
    private static final long WAIT_TIMEOUT_SEC = 30L;

    /** First started server node; used as the coordinator in the test scenario. */
    protected static IgniteEx ignite;

    /** Server node started last; used as the non-coordinator in the test scenario. */
    protected static IgniteEx ignite2;

    /** Additional node started between the two above; the cluster is activated through it. */
    protected static IgniteEx dummyNode;

    /** GridGain plugin instance obtained from {@link #ignite}. */
    protected static GridGain gg;

    /**
     * Builds the node configuration and replaces the communication SPI with a
     * {@link TestRecordingCommunicationSpi} so that the test can block messages.
     *
     * @param gridName Name of the node being configured.
     * @return Node configuration.
     * @throws Exception If the parent configuration could not be built.
     */
    @Override
    protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());

        return cfg;
    }

    /**
     * @return Backup count reported to the parent test class.
     */
    @Override
    protected int getBackupCount() {
        return 2;
    }

    /**
     * Verifies that the cluster is active, makes sure the test cache exists and loads initial data into it.
     *
     * @throws Exception If failed.
     */
    @Override
    protected void beforeTest() throws Exception {
        assertTrue(ignite.cluster().active());

        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>(TEST_CACHE_NAME)
            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
            .setCacheMode(CacheMode.PARTITIONED)
            .setBackups(1);

        ignite.getOrCreateCache(ccfg);

        loadWithIntsAsync(ignite, TEST_CACHE_NAME, 0, 1).get();
    }

    /**
     * Cancels a snapshot operation that is still running, clears snapshot directories and checks
     * that the number of caches is the expected one.
     *
     * @throws Exception If failed.
     */
    @Override
    protected void afterTest() throws Exception {
        SnapshotStatus ongoing = gg.snapshot().ongoingSnapshotOperation();

        if (ongoing != null) {
            gg.snapshot().cancelSnapshotOperation(ongoing.operationId(), "afterTest").get();
        }

        clearSnapshotNodeDir(snapshotFolders());

        assertEquals(ignite.cacheNames().toString(), EXPECTED_CACHES_COUNT, ignite.cacheNames().size());
    }

    /**
     * Starts the cluster used by the test: two numbered nodes, a node named "dummy" and a node named
     * "client". The cluster is activated and populated before the last node is started.
     *
     * @throws Exception If failed.
     */
    @Override
    protected void beforeTestsStarted() throws Exception {
        stopAllGrids();
        cleanSnapshotDirs();

        ignite = startGrid(0);
        dummyNode = startGrid("dummy");
        ignite2 = startGrid(1);

        dummyNode.cluster().active(true);

        gg = (GridGain)ignite.plugin("GridGain");

        load(ignite);

        startGrid("client");
    }

    /**
     * Delays the backup message of one atomic update while a full snapshot is in progress and checks that
     * the update counter stored in the resulting consistent cut equals the counter observed on the primary
     * right after the first update made during the snapshot.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testPartitionCountersForReorderedAtomicUpdates() throws Exception {
        final IgniteEx crd = ignite;
        final IgniteEx nonCrd = ignite2;

        final ConsistentCutTestListener cutLsnr = new ConsistentCutTestListener();

        snapshotManager(nonCrd).registerConsistentCutStoreListener(cutLsnr);

        // Remembers the id of the first consistent cut operation acknowledged through discovery.
        final AtomicReference<IgniteUuid> cutOpId = new AtomicReference<>();

        nonCrd.context().discovery().setCustomEventListener(
            StartSnapshotOperationAckDiscoveryMessage.class,
            new CustomEventListener<StartSnapshotOperationAckDiscoveryMessage>() {
                @Override
                public void onCustomEvent(
                    AffinityTopologyVersion topVer,
                    ClusterNode snd,
                    StartSnapshotOperationAckDiscoveryMessage msg
                ) {
                    if (msg.snapshotOperation().type() == SnapshotOperationType.CONSISTENT_CUT) {
                        cutOpId.compareAndSet(null, msg.operationId());
                    }
                }
            });

        TestRecordingCommunicationSpi nonCrdSpi = TestRecordingCommunicationSpi.spi(nonCrd);
        TestRecordingCommunicationSpi crdSpi = TestRecordingCommunicationSpi.spi(crd);

        try {
            // Hold back the FIRST stage completion of the CREATE operation on the non-coordinator,
            // but only once a consistent cut with a different operation id has been seen.
            nonCrdSpi.blockMessages((node, msg) -> {
                if (!(msg instanceof SnapshotOperationStageFinishedMessage)) {
                    return false;
                }

                if (((SnapshotOperationStageFinishedMessage)msg).stage() != SnapshotOperationStage.FIRST) {
                    return false;
                }

                SnapshotOperationFuture snapFut = (SnapshotOperationFuture)
                    ((IgniteFutureImpl<?>)snapshotManager(nonCrd).getOngoingOperationFuture()).internalFuture();

                IgniteUuid cutId = cutOpId.get();

                return snapFut.type() == SnapshotOperationType.CREATE
                    && cutId != null
                    && !snapFut.id().equals(cutId);
            });

            int[] primaryParts = crd.affinity(TEST_CACHE_NAME).primaryPartitions(crd.localNode());

            assertTrue("Coordinator does not own any primary partition.", primaryParts.length > 0);

            final int testPart = primaryParts[0];

            // The partition number doubles as the key; the assertion below verifies the coordinator is its primary.
            final int testKey = testPart;

            assertTrue(crd.affinity(TEST_CACHE_NAME).isPrimary(crd.localNode(), testKey));

            SnapshotFuture fut = gg.snapshot().createFullSnapshot(null, "test-exchangeless-snapshot");

            nonCrdSpi.waitForBlocked();

            crd.cache(TEST_CACHE_NAME).put(testKey, 12);

            // Counter of the test partition on the primary after the first update; this is the expected value.
            long partCntr = crd.context().cache().context()
                .cacheContext(CU.cacheId(TEST_CACHE_NAME))
                .dht().topology().localPartition(testPart).updateCounter();

            final AtomicBoolean firstUpdate = new AtomicBoolean(true);

            // On the coordinator hold back exactly one backup update request and the cluster-wide
            // FIRST stage completion of the consistent cut addressed to the non-coordinator.
            crdSpi.blockMessages((node, msg) -> {
                if (msg instanceof GridDhtAtomicSingleUpdateRequest && firstUpdate.compareAndSet(true, false)) {
                    return true;
                }

                if (msg instanceof ClusterWideSnapshotOperationStageFinishedMessage) {
                    ClusterWideSnapshotOperationStageFinishedMessage m =
                        (ClusterWideSnapshotOperationStageFinishedMessage)msg;

                    return node.id().equals(nonCrd.context().localNodeId())
                        && m.operationId().equals(cutOpId.get())
                        && m.stage() == SnapshotOperationStage.FIRST;
                }

                return false;
            });

            crd.cache(TEST_CACHE_NAME).put(testKey, 14);

            crdSpi.waitForBlocked();

            final CountDownLatch secondStageFinishedLatch = new CountDownLatch(1);

            crd.context().cache().context().gridIO().addMessageListener(
                GridTopic.TOPIC_SNAPSHOT,
                new GridMessageListener() {
                    @Override
                    public void onMessage(UUID nodeId, Object msg, byte plc) {
                        // Only messages whose sender id equals the coordinator's own node id are considered.
                        boolean fromCrd = crd.context().discovery().localNode().id().equals(nodeId);

                        if (!fromCrd || !(msg instanceof SnapshotOperationStageFinishedMessage)) {
                            return;
                        }

                        SnapshotOperationStageFinishedMessage finishMsg = (SnapshotOperationStageFinishedMessage)msg;

                        if (finishMsg.operationId().equals(cutOpId.get())
                            && finishMsg.stage() == SnapshotOperationStage.SECOND) {
                            secondStageFinishedLatch.countDown();
                        }
                    }
                });

            nonCrdSpi.stopBlock();

            // The result must be checked: on timeout the rest of the scenario would run in an unverified state.
            assertTrue(
                "Failed to wait for the second stage of the consistent cut operation to finish.",
                secondStageFinishedLatch.await(WAIT_TIMEOUT_SEC, TimeUnit.SECONDS));

            crd.cache(TEST_CACHE_NAME).put(testKey, 16);

            assertTrue(
                "Could not wait for an update.",
                GridTestUtils.waitForCondition(
                    () -> Integer.valueOf(16).equals(
                        nonCrd.cache(TEST_CACHE_NAME).localPeek(testKey, CachePeekMode.BACKUP)),
                    TimeUnit.SECONDS.toMillis(WAIT_TIMEOUT_SEC)));

            // Release only the blocked cluster-wide stage message; the delayed update request stays blocked.
            crdSpi.stopBlock(true, blockedMsgDescriptor -> {
                GridIoMessage blockedMsg = blockedMsgDescriptor.ioMessage();

                return blockedMsg.message() instanceof ClusterWideSnapshotOperationStageFinishedMessage;
            }, false, true);

            assertTrue(
                "Cannot find a new consistent cut corresponding to exchangeless snapshot.",
                GridTestUtils.waitForCondition(
                    () -> cutLsnr.consistentCut() != null,
                    TimeUnit.SECONDS.toMillis(WAIT_TIMEOUT_SEC)));

            crdSpi.stopBlock();

            fut.get(WAIT_TIMEOUT_SEC, TimeUnit.SECONDS);

            Map<?, ?> cacheCntrs =
                (Map<?, ?>)cutLsnr.consistentCut().atomicUpdateCounters().get(CU.cacheId(TEST_CACHE_NAME));

            long cutCntr = (Long)cacheCntrs.get(testPart);

            assertEquals("Both updates should not be included in the snapshot.", partCntr, cutCntr);
        }
        finally {
            // Make sure no message stays blocked if the scenario failed half-way, otherwise the snapshot
            // cancellation in afterTest() may wait for messages that are never delivered.
            // Both calls were already made on the success path, so there they have nothing left to release.
            nonCrdSpi.stopBlock();
            crdSpi.stopBlock();
        }
    }

    /**
     * @param node Node.
     * @return Snapshot manager of the given node.
     */
    private static GridCacheSnapshotManager snapshotManager(IgniteEx node) {
        return (GridCacheSnapshotManager)node.context().cache().context().snapshot();
    }

    /**
     * Listener that remembers the most recent consistent cut passed to it.
     */
    private static class ConsistentCutTestListener implements IgniteInClosure<ConsistentCut> {
        /** Last consistent cut received, or {@code null} if none has been received yet. */
        private volatile ConsistentCut cut;

        /** Instantiated only by the enclosing test. */
        private ConsistentCutTestListener() {
        }

        /**
         * Stores the given cut, replacing any previously stored one.
         *
         * @param cut Consistent cut.
         */
        @Override
        public void apply(ConsistentCut cut) {
            this.cut = cut;
        }

        /**
         * @return Last consistent cut received, or {@code null} if none has been received yet.
         */
        public ConsistentCut consistentCut() {
            return this.cut;
        }
    }
}

