/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.database.txdr;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.CopyOption;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import junit.framework.AssertionFailedError;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.WalSegmentArchivedEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.testframework.GridTestUtils;
import org.gridgain.grid.GridGain;
import org.gridgain.grid.internal.processors.cache.database.snapshot.GridCacheSnapshotManager;
import org.gridgain.grid.internal.processors.cache.database.txdr.AbstractReplicationTest;
import org.gridgain.grid.internal.processors.cache.database.txdr.ConsistentCut;
import org.gridgain.grid.internal.processors.cache.database.txdr.TransactionalDrProcessorImpl;
import org.gridgain.grid.internal.txdr.ClusterRole;
import org.gridgain.grid.persistentstore.GridSnapshot;
import org.gridgain.grid.persistentstore.SnapshotFuture;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;

public class ConsistentCutApplyingTest
extends AbstractReplicationTest {
    private static final long CONSISTENT_CUT_INTERVAL = 1000L;
    private static final int CONSISTENT_CUT_CNT = 7;
    private static final int CONSISTENT_CUT_ORDER_NUMBER_NOT_FOUND = -1;

    protected long getTestTimeout() {
        return 600000L;
    }

    @Test
    public void testApplyConsistentCutsSequentially() throws Exception {
        this.doTestConsistentCuts(1);
    }

    @Test
    public void testApplyConsistentCutsFromLatest() throws Exception {
        this.doTestConsistentCuts(8);
    }

    @Test
    public void testApplyConsistentCutsSequentiallyWithGap1() throws Exception {
        this.doTestConsistentCuts(2);
    }

    @Test
    public void testApplyConsistentCutsSequentiallyWithGap2() throws Exception {
        this.doTestConsistentCuts(3);
    }

    private void doTestConsistentCuts(int skipCuts) throws Exception {
        try {
            List<IgniteEx> cluster = this.startCluster(ClusterRole.DISABLED);
            IgniteEx ignite = this.ignite(0);
            ConsistentCutApplyingTest.replaceTransactionalProcessor(cluster);
            IgniteCache txCache = ignite.cache("txCache");
            IgniteCache atomicCache = ignite.cache("atomicCache");
            Map<Object, Long> archivedWalSegments = this.listenWALSegmentArchivedEvent();
            long txTotal = this.populateData((Ignite)ignite, "txCache");
            this.populateData((Ignite)ignite, "atomicCache");
            long snapshotId = this.createFullSnapshot((Ignite)ignite);
            IgniteInternalFuture txLoadFut = this.startTxLoad(4, ClusterRole.DISABLED);
            IgniteInternalFuture atomicLoadFut = this.startAtomicLoad(4, ClusterRole.DISABLED);
            List<Long> consistentCutIds = this.createConsistentCuts((Ignite)ignite);
            this.stopTxLoad(txLoadFut);
            this.stopAtomicLoad(atomicLoadFut);
            consistentCutIds.add(this.forceConsistentCut((Ignite)ignite));
            Map<Integer, Long> txCacheDump = this.dumpCache((IgniteCache<Integer, Long>)txCache);
            Map<Integer, Long> atomicCacheDump = this.dumpCache((IgniteCache<Integer, Long>)atomicCache);
            ConsistentCutApplyingTest.assertEquals((long)txTotal, (long)this.sumOf((IgniteCache<Integer, Long>)txCache));
            ConsistentCutApplyingTest.assertEquals((long)txTotal, (long)this.sumOf(txCacheDump));
            List<Long> cutIds = this.listCutIds();
            log.info("Consistent cut IDs: " + cutIds);
            ConsistentCutApplyingTest.assertEqualsCollections(consistentCutIds, cutIds);
            this.waitForAllWALSegmentsArchived(archivedWalSegments, cutIds.get(cutIds.size() - 1));
            this.copyWalArchive();
            this.restoreSnapshot((Ignite)ignite, snapshotId);
            ConsistentCutApplyingTest.assertEquals((String)"Sum of transactional cache values doesn't match after restore snapshot", (long)txTotal, (long)this.sumOf((IgniteCache<Integer, Long>)txCache));
            this.applyConsistentCuts(cutIds, skipCuts, (IgniteCache<Integer, Long>)txCache, txTotal);
            Map<Integer, Long> actualTxCacheDump = this.dumpCache((IgniteCache<Integer, Long>)txCache);
            Map<Integer, Long> actualAtomicCacheDump = this.dumpCache((IgniteCache<Integer, Long>)atomicCache);
            ConsistentCutApplyingTest.assertEquals((String)"Transactional cache data isn't consistent", txCacheDump, actualTxCacheDump);
            ConsistentCutApplyingTest.assertEquals((String)"Atomic cache data isn't consistent", atomicCacheDump, actualAtomicCacheDump);
        }
        catch (AssertionFailedError e) {
            AssertionFailedError err = e;
            try {
                this.validateWalArchive();
            }
            catch (AssertionFailedError e1) {
                e1.initCause((Throwable)e);
                err = e1;
            }
            throw err;
        }
    }

    private void applyConsistentCuts(List<Long> cutIds, int skipCuts, IgniteCache<Integer, Long> txCache, long txTotal) throws IgniteCheckedException {
        long cutId;
        long lastAppliedCutId = 0L;
        int i = Math.min(cutIds.size(), skipCuts) - 1;
        while (i < cutIds.size() && (cutId = cutIds.get(i).longValue()) != lastAppliedCutId) {
            log.info("Applying consistent cut started: [id=" + cutId + ", lastAppliedCutId=" + lastAppliedCutId + ']');
            this.applyConsistentCut(cutId, lastAppliedCutId).get();
            ConsistentCutApplyingTest.assertTrue((boolean)this.idleVerifyReplica(this.ignite(0)));
            ConsistentCutApplyingTest.assertEquals((String)("Sum of transactional cache values doesn't match after consistent cut was applied: [id=" + cutId + ']'), (long)txTotal, (long)this.sumOf(txCache));
            log.info("Applying consistent cut finished: [id=" + cutId + ", lastAppliedCutId=" + lastAppliedCutId + ']');
            lastAppliedCutId = cutId;
            i = Math.min(i + skipCuts, cutIds.size() - 1);
        }
    }

    private long createFullSnapshot(Ignite ignite) {
        GridSnapshot mgr = ((GridGain)ignite.plugin("GridGain")).snapshot();
        SnapshotFuture fut = mgr.createFullSnapshot(Collections.singleton("txCache"), "Initial snapshot.");
        fut.get();
        return fut.snapshotOperation().snapshotId();
    }

    private void restoreSnapshot(Ignite ignite, long snapshotId) {
        GridSnapshot mgr = ((GridGain)ignite.plugin("GridGain")).snapshot();
        mgr.restoreSnapshot(snapshotId, null, "Restoring from initial snapshot").get();
    }

    private void waitForAllWALSegmentsArchived(Map<Object, Long> archivedWalSegments, long cutId) throws IgniteCheckedException {
        HashMap<Serializable, Long> segs = new HashMap<Serializable, Long>(this.nodesCnt);
        for (int i = 0; i < this.nodesCnt; ++i) {
            IgniteEx ignite = this.ignite(i);
            Serializable consistentId = ignite.configuration().getConsistentId();
            ConsistentCut cut = this.loadConsistentCut(cutId, (Ignite)ignite);
            segs.put(consistentId, ((FileWALPointer)cut.cutPtr()).index());
        }
        GridTestUtils.waitForCondition(() -> archivedWalSegments.equals(segs), (long)60000L);
    }

    @NotNull
    private Map<Object, Long> listenWALSegmentArchivedEvent() {
        ConcurrentHashMap<Object, Long> archivedWalSegments = new ConcurrentHashMap<Object, Long>();
        for (int i = 0; i < this.nodesCnt; ++i) {
            IgniteEx ignite = this.ignite(i);
            ignite.events().localListen(arg_0 -> ConsistentCutApplyingTest.lambda$listenWALSegmentArchivedEvent$fc51f1ce$1((Ignite)ignite, archivedWalSegments, arg_0), new int[]{128});
        }
        return archivedWalSegments;
    }

    private List<Long> createConsistentCuts(Ignite ignite) throws IgniteCheckedException {
        IgniteInternalFuture consistentCutFut = GridTestUtils.runAsync(() -> {
            ArrayList<Long> ids = new ArrayList<Long>(7);
            for (int i = 0; i < 7; ++i) {
                U.sleep((long)1000L);
                ids.add(this.forceConsistentCut(ignite));
            }
            return ids;
        });
        U.sleep((long)1000L);
        return (List)consistentCutFut.get();
    }

    private IgniteInternalFuture applyConsistentCut(long consistentCutId, long lastAppliedId) {
        AtomicInteger cnt = new AtomicInteger(0);
        return GridTestUtils.runMultiThreadedAsync(() -> {
            try {
                int idx = cnt.getAndIncrement();
                IgniteEx ignite = this.ignite(idx);
                GridCacheSnapshotManager snapMgr = this.snapMgr((Ignite)ignite);
                snapMgr.applyConsistentCut(consistentCutId, lastAppliedId, false, null);
            }
            catch (IgniteCheckedException e) {
                log.error("Unexpected error", (Throwable)e);
            }
        }, (int)this.nodesCnt, (String)"test-consistent-cut-applier");
    }

    private List<Long> listCutIds() throws IgniteCheckedException {
        IgniteEx ignite = this.ignite(0);
        GridKernalContext ctx = ignite.context();
        return ((TransactionalDrProcessorImpl)ctx.txDr()).consistentCutStore().list();
    }

    private void copyWalArchive() throws Exception {
        final AtomicInteger cnt = new AtomicInteger();
        this.multithreadedAsync(new Runnable(){

            @Override
            public void run() {
                try {
                    IgniteEx ignite = ConsistentCutApplyingTest.this.ignite(cnt.getAndIncrement());
                    String consistentId = ignite.configuration().getConsistentId().toString().replace('.', '_');
                    final Path srcDir = U.resolveWorkDirectory((String)U.defaultWorkDirectory(), (String)("db/wal/archive/" + consistentId), (boolean)false).toPath();
                    final Path dstDir = ConsistentCutApplyingTest.this.walDir((Ignite)ignite).toPath();
                    Files.walkFileTree(srcDir, (FileVisitor<? super Path>)new SimpleFileVisitor<Path>(){

                        @Override
                        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                            String fileName = file.getFileName().toString();
                            if (fileName.endsWith(".wal") || fileName.endsWith(".wal.zip")) {
                                Files.copy(file, dstDir.resolve(srcDir.relativize(file)), new CopyOption[0]);
                            }
                            return FileVisitResult.CONTINUE;
                        }

                        @Override
                        public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
                            if (exc instanceof NoSuchFileException) {
                                return FileVisitResult.CONTINUE;
                            }
                            return super.visitFileFailed(file, exc);
                        }
                    });
                }
                catch (Exception e) {
                    ConsistentCutApplyingTest.fail((String)X.getFullStackTrace((Throwable)e));
                }
            }
        }, this.nodesCnt, "copy-thread").get();
    }

    private WALIterator walIterator(Ignite ignite) throws IgniteCheckedException {
        GridKernalContext ctx = ((IgniteEx)ignite).context();
        String workDir = U.defaultWorkDirectory();
        String subFolder = ctx.pdsFolderResolver().resolveFolders().folderName();
        File walDir = this.walDir(ignite);
        DataStorageConfiguration dsCfg = ignite.configuration().getDataStorageConfiguration();
        IgniteWalIteratorFactory iterFactory = new IgniteWalIteratorFactory(log);
        IgniteWalIteratorFactory.IteratorParametersBuilder params = new IgniteWalIteratorFactory.IteratorParametersBuilder();
        params.binaryMetadataFileStoreDir(new File(U.resolveWorkDirectory((String)workDir, (String)"db/binary_meta", (boolean)false), subFolder)).marshallerMappingFileStoreDir(U.resolveWorkDirectory((String)workDir, (String)"db/marshaller", (boolean)false)).pageSize(dsCfg.getPageSize()).ioFactory(dsCfg.getFileIOFactory()).bufferSize(dsCfg.getWalRecordIteratorBufferSize());
        return iterFactory.iterator(params.copy().filesOrDirs(new File[]{walDir}));
    }

    private void validateWalArchive() throws Exception {
        this.validateConsistentCutBeforeDataRecordInWalSegment();
        Map<Object, Map<GridCacheVersion, NavigableSet<FileWALPointer>>> nodeTxs = this.getTxsByNodes();
        Map<Object, List<ConsistentCut>> nodeCuts = this.getConsistentCutsByNodes();
        Set<GridCacheVersion> allTx = nodeTxs.values().stream().flatMap(x -> x.keySet().stream()).collect(Collectors.toSet());
        this.validateCorrectSkipTxs(nodeTxs, nodeCuts, allTx);
    }

    private void validateConsistentCutBeforeDataRecordInWalSegment() throws IgniteCheckedException {
        for (int i = 0; i < this.nodesCnt; ++i) {
            IgniteEx ignite = this.ignite(i);
            try (WALIterator it = this.walIterator((Ignite)ignite);){
                long prevSegmentIdx = Long.MIN_VALUE;
                boolean consistentCutRecordOccurs = false;
                for (IgniteBiTuple next : it) {
                    FileWALPointer ptr = (FileWALPointer)next.get1();
                    WALRecord rec = (WALRecord)next.get2();
                    if (prevSegmentIdx != ptr.index()) {
                        prevSegmentIdx = ptr.index();
                        consistentCutRecordOccurs = false;
                    }
                    switch (rec.type()) {
                        case CONSISTENT_CUT: {
                            consistentCutRecordOccurs = true;
                            break;
                        }
                        case DATA_RECORD_V2: {
                            ConsistentCutApplyingTest.assertTrue((String)("Data records can't be before consistent cut record in wal segment!  Node: " + ignite.configuration().getConsistentId() + " on wal segment " + ptr), (boolean)consistentCutRecordOccurs);
                            break;
                        }
                    }
                }
                continue;
            }
        }
    }

    private void validateCorrectSkipTxs(Map<Object, Map<GridCacheVersion, NavigableSet<FileWALPointer>>> nodeTxs, Map<Object, List<ConsistentCut>> nodeCuts, Set<GridCacheVersion> allTxs) {
        HashMap<GridCacheVersion, Integer> checkedTx = new HashMap<GridCacheVersion, Integer>();
        for (GridCacheVersion txId : allTxs) {
            Map<Integer, Set<Object>> groupedByConsistentCutOrderMap = this.groupByInteger(this.getTxConsistentCutOrderNumberPerNode(nodeTxs, nodeCuts, txId));
            ConsistentCutApplyingTest.assertTrue((String)("Tx must be in consistent state! txId: " + txId), (groupedByConsistentCutOrderMap.size() == 1 || groupedByConsistentCutOrderMap.size() == 2 ? 1 : 0) != 0);
            if (groupedByConsistentCutOrderMap.size() != 2) continue;
            ConsistentCutApplyingTest.assertTrue((String)("Tx must be in consistent state! txId: " + txId), (boolean)groupedByConsistentCutOrderMap.containsKey(-1));
            Integer consistentCutOrderNum = groupedByConsistentCutOrderMap.keySet().stream().filter(x -> x != -1).findAny().get();
            checkedTx.put(txId, consistentCutOrderNum);
        }
        for (GridCacheVersion tx : checkedTx.keySet()) {
            Set<Object> nodeIds = this.getAllNodesWithTx(nodeTxs, tx);
            for (Object nodeId : nodeIds) {
                Set skipTxs = nodeCuts.get(nodeId).get((Integer)checkedTx.get(tx)).skipTxs();
                ConsistentCutApplyingTest.assertTrue((String)("Transaction " + tx + " must be in skipTxs"), (boolean)skipTxs.contains(tx));
            }
        }
    }

    private Map<Object, Integer> getTxConsistentCutOrderNumberPerNode(Map<Object, Map<GridCacheVersion, NavigableSet<FileWALPointer>>> nodeTxs, Map<Object, List<ConsistentCut>> nodeCuts, GridCacheVersion tx) {
        Set<Object> nodes = this.getAllNodesWithTx(nodeTxs, tx);
        return nodes.stream().collect(Collectors.toMap(x -> x, x -> this.getConsistentCutOrderNumber((List)nodeCuts.get(x), (Iterable)((Map)nodeTxs.get(x)).get(tx))));
    }

    private Integer getConsistentCutOrderNumber(List<ConsistentCut> cuts, Iterable<FileWALPointer> ptrs) {
        for (int i = 0; i < cuts.size(); ++i) {
            ConsistentCut cut = cuts.get(i);
            for (FileWALPointer ptr : ptrs) {
                if (!this.isPointerBetweenFuzzyBorderAndCutPtr(cut, ptr)) continue;
                return i;
            }
        }
        return -1;
    }

    private Set<Object> getAllNodesWithTx(Map<Object, Map<GridCacheVersion, NavigableSet<FileWALPointer>>> nodeTxs, GridCacheVersion tx) {
        return nodeTxs.entrySet().stream().filter(e -> ((Map)e.getValue()).get(tx) != null).map(Map.Entry::getKey).collect(Collectors.toSet());
    }

    private boolean isPointerBetweenFuzzyBorderAndCutPtr(ConsistentCut cut, FileWALPointer ptr) {
        return ptr.compareTo((FileWALPointer)cut.fuzzyBorderStartPtr()) > 0 && ptr.compareTo((FileWALPointer)cut.cutPtr()) < 0;
    }

    private Map<Object, List<ConsistentCut>> getConsistentCutsByNodes() throws Exception {
        HashMap<Object, List<ConsistentCut>> nodeCuts = new HashMap<Object, List<ConsistentCut>>(this.nodesCnt);
        List<Long> cutIds = this.listCutIds();
        for (int i = 0; i < this.nodesCnt; ++i) {
            IgniteEx ignite = this.ignite(i);
            Serializable consistentId = ignite.configuration().getConsistentId();
            for (Long cutId : cutIds) {
                ConsistentCut consistentCut = this.loadConsistentCut(cutId, (Ignite)ignite);
                List cuts = nodeCuts.computeIfAbsent(consistentId, k -> new ArrayList());
                cuts.add(consistentCut);
            }
        }
        return nodeCuts;
    }

    private Map<Object, Map<GridCacheVersion, NavigableSet<FileWALPointer>>> getTxsByNodes() throws Exception {
        LinkedHashMap<Object, Map<GridCacheVersion, NavigableSet<FileWALPointer>>> nodeTxs = new LinkedHashMap<Object, Map<GridCacheVersion, NavigableSet<FileWALPointer>>>(this.nodesCnt);
        for (int i = 0; i < this.nodesCnt; ++i) {
            IgniteEx ignite = this.ignite(i);
            HashMap<GridCacheVersion, NavigableSet> dataRecords = new HashMap<GridCacheVersion, NavigableSet>();
            nodeTxs.put(ignite.configuration().getConsistentId(), dataRecords);
            try (WALIterator it = this.walIterator((Ignite)ignite);){
                block13: for (IgniteBiTuple next : it) {
                    FileWALPointer ptr = (FileWALPointer)next.get1();
                    WALRecord rec = (WALRecord)next.get2();
                    switch (rec.type()) {
                        case DATA_RECORD_V2: {
                            DataEntry e;
                            GridCacheVersion txId;
                            DataRecord dataRec = (DataRecord)rec;
                            Iterator iterator = dataRec.writeEntries().iterator();
                            if (!iterator.hasNext() || (txId = (e = (DataEntry)iterator.next()).nearXidVersion()) == null || e.op() != GridCacheOperation.UPDATE) continue block13;
                            Set ptrs = dataRecords.computeIfAbsent(txId, k -> new TreeSet());
                            ptrs.add(ptr);
                            break;
                        }
                    }
                }
                continue;
            }
        }
        return nodeTxs;
    }

    private Map<Integer, Set<Object>> groupByInteger(Map<Object, Integer> srcMap) {
        return srcMap.entrySet().stream().collect(Collectors.groupingBy(Map.Entry::getValue, Collectors.mapping(Map.Entry::getKey, Collectors.toSet())));
    }

    private static /* synthetic */ boolean lambda$listenWALSegmentArchivedEvent$fc51f1ce$1(Ignite ignite, Map archivedWalSegments, Event evt) {
        if (evt.type() == 128) {
            WalSegmentArchivedEvent walSegEvt = (WalSegmentArchivedEvent)evt;
            Serializable consistentId = ignite.configuration().getConsistentId();
            long idx = walSegEvt.getAbsWalSegmentIdx();
            archivedWalSegments.put(consistentId, idx);
        }
        return true;
    }
}

