/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.database.txdr;

import java.io.File;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteState;
import org.apache.ignite.Ignition;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.testframework.GridTestUtils;
import org.gridgain.grid.internal.processors.cache.database.txdr.AbstractReplicationTest;
import org.gridgain.grid.internal.processors.cache.database.txdr.ConsistentCutStore;
import org.gridgain.grid.internal.txdr.ClusterRole;
import org.gridgain.grid.internal.txdr.ReplicationState;
import org.gridgain.grid.internal.txdr.TransactionalDrConfiguration;
import org.junit.Assert;
import org.junit.Test;

public class TxDrStopAndRecoveryTest
extends AbstractReplicationTest {
    private AtomicLong lastAppliedCut = new AtomicLong();

    @Override
    protected TransactionalDrConfiguration getTxDrConfiguration() throws Exception {
        return super.getTxDrConfiguration().setConsistentCutInterval(Long.MAX_VALUE);
    }

    @Test
    public void testNoDeadNodes() throws Exception {
        this.checkStopWithRecovery(Collections.emptyList(), Collections.emptyList());
    }

    @Test
    public void testDeadDeadOnMaster1AndOnReplica2() throws Exception {
        this.checkStopWithRecovery(Arrays.asList(1), Arrays.asList(2));
    }

    @Test
    public void testDeadOnMasterNoneAndOnReplica1_3() throws Exception {
        this.checkStopWithRecovery(Collections.emptyList(), Arrays.asList(1, 3));
    }

    @Test
    public void testDeadOnMaster1_3AndOnReplicaNone() throws Exception {
        this.checkStopWithRecovery(Arrays.asList(1, 3), Collections.emptyList());
    }

    @Test
    public void testDeadOnMaster1_3AndOnReplica3() throws Exception {
        this.checkStopWithRecovery(Arrays.asList(1, 3), Arrays.asList(3));
    }

    public void checkStopWithRecovery(Collection<Integer> deadMasterNodesIdx, Collection<Integer> deadReplicaNodesIdx) throws Exception {
        this.nodesCnt = 4;
        this.backupsCnt = 2;
        List<IgniteEx> masterCluster = this.startCluster(ClusterRole.MASTER);
        List<IgniteEx> replicaCluster = this.startCluster(ClusterRole.REPLICA);
        IgniteEx master0 = masterCluster.get(0);
        IgniteEx replica0 = replicaCluster.get(0);
        master0.cluster().active(true);
        replica0.cluster().active(true);
        long txTotal = this.populateData((Ignite)this.node(ClusterRole.MASTER), "txCache");
        this.populateData((Ignite)this.node(ClusterRole.MASTER), "atomicCache");
        long bootstrapSesId = this.bootstrapMaster();
        this.assertClusterState(masterCluster, ClusterRole.MASTER, ReplicationState.RUNNING, bootstrapSesId);
        log.info(">>> Master cluster bootstrapped successfully, sessionId=" + bootstrapSesId);
        this.txdr((Ignite)replica0).bootstrap(this.snapshotFolder(), bootstrapSesId).get();
        this.txdr((Ignite)replicaCluster.get(0)).consistentCutWatcher().addAppliedCutsListener((IgniteInClosure & Serializable)cutId -> this.lastAppliedCut.set((long)cutId));
        this.assertClusterState(replicaCluster, ClusterRole.REPLICA, ReplicationState.RUNNING, bootstrapSesId);
        log.info(">>> Replica cluster bootstrapped successfully, sessionId=" + bootstrapSesId);
        IgniteCache replicaTxCache = replica0.cache("txCache");
        IgniteCache replicaAtomicCache = replica0.cache("atomicCache");
        TxDrStopAndRecoveryTest.assertEquals((long)txTotal, (long)this.sumOf((IgniteCache<Integer, Long>)replicaTxCache));
        IgniteInternalFuture txLoadFut = this.startTxLoad(3, ClusterRole.MASTER);
        IgniteInternalFuture txAtomicFut = this.startAtomicLoad(3, ClusterRole.MASTER);
        if (!deadMasterNodesIdx.isEmpty()) {
            this.stopClusterNodes(ClusterRole.MASTER, deadMasterNodesIdx);
        }
        if (!deadMasterNodesIdx.isEmpty()) {
            this.stopClusterNodes(ClusterRole.REPLICA, deadReplicaNodesIdx);
        }
        long cutId2 = this.forceConsistentCut((Ignite)master0);
        long cutTime = System.currentTimeMillis();
        this.waitForApplyingCut(cutId2, 10000L);
        HashSet<Integer> expDeadOnMaster = new HashSet<Integer>(deadMasterNodesIdx);
        expDeadOnMaster.removeAll(deadReplicaNodesIdx);
        this.assertLaggingBehindNodes(expDeadOnMaster);
        Map<Integer, Long> dumpReplicaTx = this.dumpCache((IgniteCache<Integer, Long>)replicaTxCache);
        Map<Integer, Long> dumpReplicaAtomic = this.dumpCache((IgniteCache<Integer, Long>)replicaAtomicCache);
        ConsistentCutStore masterCutStore = this.txdr((Ignite)master0).consistentCutStore();
        long lastWalSegmentIdx = ((FileWALPointer)masterCutStore.restore(cutId2).cutPtr()).index();
        TxDrStopAndRecoveryTest.assertTrue((boolean)GridTestUtils.waitForCondition(() -> this.isWalSegsPresentInTransferFolder(bootstrapSesId, lastWalSegmentIdx), (long)120000L));
        this.stopTxLoad(txLoadFut);
        this.stopAtomicLoad(txAtomicFut);
        Map<Integer, Long> dumpReplicaTx2 = this.dumpCache((IgniteCache<Integer, Long>)replicaTxCache);
        Map<Integer, Long> dumpReplicaAtomic2 = this.dumpCache((IgniteCache<Integer, Long>)replicaAtomicCache);
        TxDrStopAndRecoveryTest.assertEquals(dumpReplicaTx, dumpReplicaTx2);
        TxDrStopAndRecoveryTest.assertEquals(dumpReplicaAtomic, dumpReplicaAtomic2);
        Long time = (Long)this.txdr(ClusterRole.REPLICA).stopAndRecover().get();
        this.assertClusterState(replicaCluster, ClusterRole.DISABLED, ReplicationState.STOPPED, 0L);
        this.awaitPmeOnReplica();
        TxDrStopAndRecoveryTest.assertTrue((boolean)this.idleVerifyReplica(replica0));
        dumpReplicaTx2 = this.dumpCache((IgniteCache<Integer, Long>)replicaTxCache);
        dumpReplicaAtomic2 = this.dumpCache((IgniteCache<Integer, Long>)replicaAtomicCache);
        Assert.assertNotEquals(dumpReplicaAtomic, dumpReplicaAtomic2);
        Assert.assertNotEquals(dumpReplicaTx, dumpReplicaTx2);
        TxDrStopAndRecoveryTest.assertEquals((long)txTotal, (long)this.sumOf(dumpReplicaTx2));
        TxDrStopAndRecoveryTest.assertTrue((time > cutTime ? 1 : 0) != 0);
        log.info(">>> Replica cluster recovered successfully, recovered time = " + time);
    }

    protected void stopClusterNodes(ClusterRole role, Collection<Integer> idx) {
        for (Integer nodeIdx : idx) {
            this.stopClusterNode(role, nodeIdx);
        }
    }

    protected void stopClusterNode(ClusterRole role, Integer nodeIdx) {
        String instanceName = this.igniteInstanceNameWithRole(role, nodeIdx, AbstractReplicationTest.NodeType.BLT_NODE);
        List cluster = (List)this.clusterMap.get(role);
        boolean rmv = cluster.removeIf(ig -> ig.name().equals(instanceName));
        if (rmv) {
            this.stopGrid(instanceName, true, false);
        }
    }

    @Override
    protected IgniteInternalFuture startAtomicLoad(int threads, ClusterRole role, Ignite ignite) {
        atomicStop.set(false);
        return GridTestUtils.runMultiThreadedAsync(() -> {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();
            while (!atomicStop.get()) {
                IgniteCache cache;
                int nodeIdx = rnd.nextInt(this.nodesCnt);
                try {
                    Ignite ig = ignite == null ? this.grid(this.igniteInstanceNameWithRole(role, nodeIdx, AbstractReplicationTest.NodeType.BLT_NODE)) : ignite;
                    cache = ig.cache("atomicCache");
                }
                catch (IllegalStateException e) {
                    continue;
                }
                int cnt = rnd.nextInt(5);
                for (int i = 0; i < cnt; ++i) {
                    try {
                        cache.put((Object)rnd.nextInt(50), (Object)rnd.nextLong(100L, 1001L));
                        continue;
                    }
                    catch (Throwable e) {
                        log.error("Unexpected error", e);
                    }
                }
            }
        }, (int)threads, (String)"atomic-load-thread");
    }

    private boolean isWalSegsPresentInTransferFolder(long snapshotId, long lastWalSegmentIdx) {
        List nodes = (List)this.clusterMap.get(ClusterRole.MASTER);
        try {
            String snapshotPath = this.transferFolder().getAbsolutePath() + "/" + snapshotId + '/';
            for (IgniteEx ignite : nodes) {
                Path walFolderPath = new File(snapshotPath, ignite.localNode().consistentId() + "/wal/" + this.txdr((Ignite)ignite).spawnId()).toPath();
                boolean present = Files.list(walFolderPath).map(f -> {
                    String filename = f.getFileName().toString();
                    return Long.parseLong(filename.split("\\.")[0]);
                }).anyMatch(idx -> idx > lastWalSegmentIdx);
                if (present) continue;
                return false;
            }
            return true;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void waitForApplyingCut(long cutId, long timeout) throws IgniteInterruptedCheckedException {
        TxDrStopAndRecoveryTest.assertTrue((String)("Failed to wait for applying cut globally [cutId=" + cutId + ']'), (boolean)GridTestUtils.waitForCondition(() -> {
            if (this.lastAppliedCut.get() < cutId) {
                try {
                    this.awakeCutsWatcher((List)this.clusterMap.get(ClusterRole.REPLICA));
                }
                catch (Exception e) {
                    log.error("Failed to awake watcher", (Throwable)e);
                }
                return false;
            }
            return true;
        }, (long)timeout));
    }

    private void assertLaggingBehindNodes(Set<Integer> expLaggingNodes) {
        HashSet<Integer> actualLaggingNodes = new HashSet<Integer>();
        for (int nodeIdx = 0; nodeIdx < this.nodesCnt; ++nodeIdx) {
            String instanceName = this.igniteInstanceNameWithRole(ClusterRole.REPLICA, nodeIdx, AbstractReplicationTest.NodeType.BLT_NODE);
            if (Ignition.state((String)instanceName) != IgniteState.STARTED || !this.txdr(Ignition.ignite((String)instanceName)).localState().laggingBehind()) continue;
            actualLaggingNodes.add(nodeIdx);
        }
        TxDrStopAndRecoveryTest.assertEquals(expLaggingNodes, actualLaggingNodes);
    }

    private void awaitPmeOnReplica() throws InterruptedException {
        this.awaitPartitionMapExchange(false, false, ((IgniteEx)((List)this.clusterMap.get(ClusterRole.REPLICA)).get(0)).context().discovery().aliveServerNodes());
    }
}

