/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.mvcc;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.junit.Test;

public class CacheMvccTxRecoveryTest
extends CacheMvccAbstractTest {
    /**
     * Cache mode hook that this test does not support: every cache in this class is created from an
     * explicit {@code CacheConfiguration}, and calling this method always fails.
     *
     * @return Never returns normally.
     * @throws UnsupportedOperationException Always.
     */
    @Override
    protected CacheMode cacheMode() {
        throw new UnsupportedOperationException("Is not supposed to be used");
    }

    /**
     * Builds the node configuration from the parent one and installs a
     * {@link TestRecordingCommunicationSpi}, which the tests in this class use to block selected messages.
     *
     * @param gridName Name of the node being configured.
     * @return Parent configuration with the recording communication SPI set.
     * @throws Exception If the parent configuration cannot be created.
     */
    @Override
    protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());

        return cfg;
    }

    /**
     * Near-node failure scenario: the near node is a client and the transaction is expected to be committed.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRecoveryCommitNearFailure1() throws Exception {
        checkRecoveryNearFailure(TxEndResult.COMMIT, NodeMode.CLIENT);
    }

    /**
     * Near-node failure scenario: the near node is a server and the transaction is expected to be committed.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRecoveryCommitNearFailure2() throws Exception {
        checkRecoveryNearFailure(TxEndResult.COMMIT, NodeMode.SERVER);
    }

    /**
     * Near-node failure scenario: the near node is a client and the transaction is expected to be rolled back.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRecoveryRollbackNearFailure1() throws Exception {
        checkRecoveryNearFailure(TxEndResult.ROLLBAK, NodeMode.CLIENT);
    }

    /**
     * Near-node failure scenario: the near node is a server and the transaction is expected to be rolled back.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRecoveryRollbackNearFailure2() throws Exception {
        checkRecoveryNearFailure(TxEndResult.ROLLBAK, NodeMode.SERVER);
    }

    /**
     * Primary-node failure scenario with {@code mvccCrd == false} (grid 1 is closed); the transaction is
     * expected to be committed.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRecoveryCommitPrimaryFailure1() throws Exception {
        checkRecoveryPrimaryFailure(TxEndResult.COMMIT, false);
    }

    /**
     * Primary-node failure scenario with {@code mvccCrd == false} (grid 1 is closed); the transaction is
     * expected to be rolled back.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRecoveryRollbackPrimaryFailure1() throws Exception {
        checkRecoveryPrimaryFailure(TxEndResult.ROLLBAK, false);
    }

    /**
     * Primary-node failure scenario with {@code mvccCrd == true} (grid 0 is closed); the transaction is
     * expected to be committed.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRecoveryCommitPrimaryFailure2() throws Exception {
        checkRecoveryPrimaryFailure(TxEndResult.COMMIT, true);
    }

    /**
     * Primary-node failure scenario with {@code mvccCrd == true} (grid 0 is closed); the transaction is
     * expected to be rolled back.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRecoveryRollbackPrimaryFailure2() throws Exception {
        checkRecoveryPrimaryFailure(TxEndResult.ROLLBAK, true);
    }

    /**
     * Checks transaction recovery when the near (transaction-originating) node is closed after prepare has
     * been started.
     * <p>
     * Three server nodes are started, followed by a fourth near node that is a client or a server depending
     * on {@code nearNodeMode}. A cache with one backup is created and two keys below 100 are picked: one with
     * grid 0 as primary and grid 1 as backup, the other with grid 1 as primary and grid 2 as backup.
     * A pessimistic REPEATABLE_READ transaction inserts both keys through SQL.
     * <ul>
     * <li>Commit case: prepare is awaited, then the near node is closed; all server-side transactions must
     * reach {@code COMMITTED} and both rows must become visible.</li>
     * <li>Rollback case: {@code GridNearTxPrepareRequest} to grid 1 is blocked beforehand; the near node is
     * closed once any server-side transaction is {@code PREPARED}; all server-side transactions must reach
     * {@code ROLLED_BACK} and no rows must be visible.</li>
     * </ul>
     * Finally the partition update counters of both keys are compared across the three server nodes.
     *
     * @param endRes Expected transaction outcome.
     * @param nearNodeMode Whether the near node is started as a client or as a server.
     * @throws Exception If failed.
     */
    private void checkRecoveryNearFailure(TxEndResult endRes, NodeMode nearNodeMode) throws Exception {
        int gridCnt = 4;
        int baseCnt = gridCnt - 1;

        boolean commit = endRes == TxEndResult.COMMIT;

        startGridsMultiThreaded(baseCnt);

        client = nearNodeMode == NodeMode.CLIENT;

        IgniteEx nearNode = startGrid(baseCnt);

        IgniteCache<Object, Object> cache = nearNode.getOrCreateCache(basicCcfg().setBackups(1));

        Affinity<Object> aff = nearNode.affinity("default");

        List<Integer> keys = new ArrayList<>();

        // First key: primary on grid 0, backup on grid 1.
        for (int i = 0; i < 100; i++) {
            if (aff.isPrimary(grid(0).localNode(), i) && aff.isBackup(grid(1).localNode(), i)) {
                keys.add(i);

                break;
            }
        }

        // Second key: primary on grid 1, backup on grid 2.
        for (int i = 0; i < 100; i++) {
            if (aff.isPrimary(grid(1).localNode(), i) && aff.isBackup(grid(2).localNode(), i)) {
                keys.add(i);

                break;
            }
        }

        assert keys.size() == 2;

        TestRecordingCommunicationSpi nearComm =
            (TestRecordingCommunicationSpi)nearNode.configuration().getCommunicationSpi();

        if (!commit) {
            // Grid 1 never receives the near prepare request in the rollback case.
            nearComm.blockMessages(GridNearTxPrepareRequest.class, grid(1).name());
        }

        GridTestUtils.runAsync(() -> {
            GridNearTxLocal nearTx = ((TransactionProxyImpl)nearNode.transactions()
                .txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)).tx();

            for (Integer k : keys) {
                cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(k));
            }

            // Typed list: with a raw List the state-checking lambdas below would see plain Objects.
            List<IgniteInternalTx> txs = IntStream.range(0, baseCnt)
                .mapToObj(i -> txsOnNode(grid(i), nearTx.xidVersion()))
                .flatMap(Collection::stream)
                .collect(Collectors.toList());

            IgniteInternalFuture<?> prepareFut = nearTx.prepareNearTxLocal();

            if (commit) {
                prepareFut.get();
            }
            else {
                assertConditionEventually(
                    () -> txs.stream().anyMatch(tx -> tx.state() == TransactionState.PREPARED));
            }

            // Drop the near node; the remaining nodes have to finish the transaction on their own.
            nearNode.close();

            TransactionState expState = commit ? TransactionState.COMMITTED : TransactionState.ROLLED_BACK;

            assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == expState));

            return null;
        }).get();

        if (commit) {
            assertConditionEventually(() -> {
                int rowsCnt = grid(0).cache("default")
                    .query(new SqlFieldsQuery("select * from Integer")).getAll().size();

                return rowsCnt == keys.size();
            });
        }
        else {
            int rowsCnt = G.allGrids().get(0).cache("default")
                .query(new SqlFieldsQuery("select * from Integer")).getAll().size();

            assertEquals(0, rowsCnt);
        }

        assertPartitionCountersAreConsistent(keys, grids(baseCnt, i -> true));
    }

    /**
     * Checks transaction recovery when a primary (server) node is closed while the near client commits.
     * <p>
     * Three server nodes and one client near node are started; a cache with one backup is created and two
     * keys below 100 are picked: one with grid 0 as primary and grid 1 as backup, the other with grid 1 as
     * primary and grid 2 as backup. The victim is grid 0 when {@code mvccCrd} is {@code true} and grid 1
     * otherwise; the "victim backup" is the next grid index.
     * <ul>
     * <li>Commit case: {@code GridNearTxFinishResponse} from the victim to the near node is blocked; all
     * transactions on the other servers must reach {@code COMMITTED} and both rows must be visible.</li>
     * <li>Rollback case: {@code GridDhtTxPrepareRequest} from the victim to its backup is blocked; the victim
     * is closed once any transaction on the other servers is {@code PREPARED}; those transactions must reach
     * {@code ROLLED_BACK} and no rows must be visible.</li>
     * </ul>
     * The commit future on the near node must be done at the end, and partition update counters of both keys
     * must agree across the surviving servers.
     *
     * @param endRes Expected transaction outcome.
     * @param mvccCrd If {@code true} grid 0 is closed, otherwise grid 1. NOTE(review): the name suggests
     *      grid 0 acts as the MVCC coordinator; this is not visible in this method, confirm against the base class.
     * @throws Exception If failed.
     */
    private void checkRecoveryPrimaryFailure(TxEndResult endRes, boolean mvccCrd) throws Exception {
        int gridCnt = 4;
        int baseCnt = gridCnt - 1;

        boolean commit = endRes == TxEndResult.COMMIT;

        startGridsMultiThreaded(baseCnt);

        client = true;

        IgniteEx nearNode = startGrid(baseCnt);

        IgniteCache<Object, Object> cache = nearNode.getOrCreateCache(basicCcfg().setBackups(1));

        Affinity<Object> aff = nearNode.affinity("default");

        List<Integer> keys = new ArrayList<>();

        // First key: primary on grid 0, backup on grid 1.
        for (int i = 0; i < 100; i++) {
            if (aff.isPrimary(grid(0).localNode(), i) && aff.isBackup(grid(1).localNode(), i)) {
                keys.add(i);

                break;
            }
        }

        // Second key: primary on grid 1, backup on grid 2.
        for (int i = 0; i < 100; i++) {
            if (aff.isPrimary(grid(1).localNode(), i) && aff.isBackup(grid(2).localNode(), i)) {
                keys.add(i);

                break;
            }
        }

        assert keys.size() == 2;

        int victim = mvccCrd ? 0 : 1;
        int victimBackup = victim + 1;

        TestRecordingCommunicationSpi victimComm =
            (TestRecordingCommunicationSpi)grid(victim).configuration().getCommunicationSpi();

        if (commit) {
            victimComm.blockMessages(GridNearTxFinishResponse.class, nearNode.name());
        }
        else {
            victimComm.blockMessages(GridDhtTxPrepareRequest.class, grid(victimBackup).name());
        }

        GridNearTxLocal nearTx = ((TransactionProxyImpl)nearNode.transactions()
            .txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)).tx();

        for (Integer k : keys) {
            cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(k));
        }

        // Typed list: with a raw List the state-checking lambdas below would see plain Objects.
        List<IgniteInternalTx> txs = IntStream.range(0, baseCnt)
            .filter(i -> i != victim)
            .mapToObj(i -> txsOnNode(grid(i), nearTx.xidVersion()))
            .flatMap(Collection::stream)
            .collect(Collectors.toList());

        IgniteInternalFuture<?> commitFut = nearTx.commitAsync();

        if (commit) {
            assertConditionEventually(
                () -> txs.stream().allMatch(tx -> tx.state() == TransactionState.COMMITTED));
        }
        else {
            assertConditionEventually(
                () -> txs.stream().anyMatch(tx -> tx.state() == TransactionState.PREPARED));
        }

        grid(victim).close();

        awaitPartitionMapExchange();

        TransactionState expState = commit ? TransactionState.COMMITTED : TransactionState.ROLLED_BACK;

        assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == expState));

        // The blocked message must have been produced, i.e. the scenario really went through the blocked step.
        assert victimComm.hasBlockedMessages();

        if (commit) {
            assertConditionEventually(() -> {
                int rowsCnt = G.allGrids().get(0).cache("default")
                    .query(new SqlFieldsQuery("select * from Integer")).getAll().size();

                return rowsCnt == keys.size();
            });
        }
        else {
            int rowsCnt = G.allGrids().get(0).cache("default")
                .query(new SqlFieldsQuery("select * from Integer")).getAll().size();

            assertEquals(0, rowsCnt);
        }

        assertTrue(commitFut.isDone());

        assertPartitionCountersAreConsistent(keys, grids(baseCnt, i -> i != victim));
    }

    /**
     * Starts two servers and a client, inserts one key per server node inside a pessimistic REPEATABLE_READ
     * transaction, prepares the transaction on the client and then stops the client node. Both rows are
     * expected to become visible on grid 0, and partition update counters of the keys must agree on all
     * remaining nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRecoveryCommit() throws Exception {
        startGridsMultiThreaded(2);

        client = true;

        IgniteEx clientNode = startGrid(2);

        IgniteCache<Object, Object> sqlCache = clientNode.getOrCreateCache(basicCcfg());

        AtomicInteger nextKey = new AtomicInteger();

        ArrayList<Integer> keys = new ArrayList<>();

        // One key for every server node.
        for (ClusterNode srv : clientNode.cluster().forServers().nodes()) {
            keys.add(keyForNode(clientNode.affinity("default"), nextKey, srv));
        }

        GridTestUtils.runAsync(() -> {
            // The transaction is intentionally left open: the client is stopped right after prepare.
            Transaction tx = clientNode.transactions()
                .txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);

            for (Integer key : keys) {
                SqlFieldsQuery insert = new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)");

                sqlCache.query(insert.setArgs(new Object[] {key}));
            }

            GridNearTxLocal nearTx = ((TransactionProxyImpl)tx).tx();

            nearTx.prepareNearTxLocal().get();

            return null;
        }).get();

        stopGrid(2, true);

        IgniteEx srvNode = grid(0);

        assertConditionEventually(() -> {
            int rows = srvNode.cache("default").query(new SqlFieldsQuery("select * from Integer")).getAll().size();

            return rows == 2;
        });

        assertPartitionCountersAreConsistent(keys, G.allGrids());
    }

    /**
     * Four servers and a client are started with a cache that has two backups. Two keys below 100 whose
     * primary is grid 3 (the victim) are picked: the first is not backed up on grid 0, the second is not
     * backed up on grid 1. {@code GridDhtTxPrepareRequest} from the victim to grid 0 is blocked, the
     * transaction inserting both keys is committed asynchronously, and once some transaction on the other
     * servers is {@code PREPARED} a second, independent transaction is opened in the background and kept
     * open. The victim is then closed.
     * <p>
     * Expected: all transactions collected on the surviving servers become {@code ROLLED_BACK}, partition
     * update counters of the keys agree across the surviving servers, and after the background transaction
     * is released no rows are visible on any surviving server.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCountersNeighborcastServerFailed() throws Exception {
        int srvCnt = 4;

        startGridsMultiThreaded(srvCnt);

        client = true;

        IgniteEx ign = startGrid(srvCnt);

        IgniteCache<Object, Object> cache = ign.getOrCreateCache(basicCcfg().setBackups(2));

        List<Integer> keys = new ArrayList<>();

        int vid = 3;

        IgniteEx victim = grid(vid);

        Affinity<Object> aff = ign.affinity("default");

        // First key: primary on the victim, not backed up on grid 0.
        for (int i = 0; i < 100; i++) {
            if (aff.isPrimary(victim.localNode(), i) && !aff.isBackup(grid(0).localNode(), i)) {
                keys.add(i);

                break;
            }
        }

        // Second key: primary on the victim, not backed up on grid 1.
        for (int i = 0; i < 100; i++) {
            if (aff.isPrimary(victim.localNode(), i) && !aff.isBackup(grid(1).localNode(), i)) {
                keys.add(i);

                break;
            }
        }

        assert keys.size() == 2 && !keys.contains(99);

        ((TestRecordingCommunicationSpi)victim.configuration().getCommunicationSpi())
            .blockMessages(GridDhtTxPrepareRequest.class, grid(0).name());

        GridNearTxLocal nearTx = ((TransactionProxyImpl)ign.transactions()
            .txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)).tx();

        for (Integer k : keys) {
            cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(k));
        }

        // Typed list: with a raw List the state-checking lambdas below would see plain Objects.
        List<IgniteInternalTx> txs = IntStream.range(0, srvCnt)
            .mapToObj(this::grid)
            .filter(g -> g != victim)
            .map(g -> txsOnNode(g, nearTx.xidVersion()))
            .flatMap(Collection::stream)
            .collect(Collectors.toList());

        nearTx.commitAsync();

        assertConditionEventually(() -> txs.stream().anyMatch(tx -> tx.state() == TransactionState.PREPARED));

        CountDownLatch latch1 = new CountDownLatch(1);
        CountDownLatch latch2 = new CountDownLatch(1);

        IgniteInternalFuture<?> backgroundTxFut = GridTestUtils.runAsync(() -> {
            try (Transaction ignored = ign.transactions().txStart()) {
                boolean upd = false;

                // Touch any key from [100, 200) that is not primary on the victim.
                for (int i = 100; i < 200; i++) {
                    if (!aff.isPrimary(victim.localNode(), i)) {
                        cache.put(i, 11);

                        upd = true;

                        break;
                    }
                }

                assert upd;

                latch1.countDown();

                // Keep the transaction open until the main thread has finished its checks.
                latch2.await(getTestTimeout(), TimeUnit.MILLISECONDS);
            }

            return null;
        });

        // A failure of the background task is reported later by backgroundTxFut.get().
        latch1.await(getTestTimeout(), TimeUnit.MILLISECONDS);

        List<IgniteEx> liveNodes;

        try {
            victim.close();

            assertConditionEventually(
                () -> txs.stream().allMatch(tx -> tx.state() == TransactionState.ROLLED_BACK));

            liveNodes = grids(srvCnt, i -> i != vid);

            assertPartitionCountersAreConsistent(keys, liveNodes);
        }
        finally {
            // Always release the background transaction, even if a check above failed,
            // so its thread is not left parked until the test timeout.
            latch2.countDown();
        }

        backgroundTxFut.get(getTestTimeout());

        assertTrue(liveNodes.stream()
            .map(node -> node.cache("default").query(new SqlFieldsQuery("select * from Integer")).getAll())
            .allMatch(Collection::isEmpty));
    }

    /**
     * Grid 0 and grid 1 are started and a TRANSACTIONAL partitioned cache with two backups is created on
     * grid 1 (the partition owner). {@code GridDhtPartitionsFullMessage} from grid 0 to the joining grid 2 is
     * blocked and grid 2 is started in the background. Once the cluster reports three nodes, a key whose
     * primary is grid 1 is put inside a pessimistic REPEATABLE_READ transaction while
     * {@code GridDhtTxPrepareRequest} from grid 1 to grid 2 is blocked. When the first active transaction on
     * grid 0 is {@code PREPARED}, grid 1 is closed and the blocked messages on grid 0 are released.
     * <p>
     * Expected: grid 2 finishes joining, partition map exchange completes and the cluster has two nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxRecoveryWithLostFullMassageOnJoiningBackupNode() throws Exception {
        int joiningBackupNodeId = 2;

        IgniteEx crd = startGrid(0);

        IgniteEx partOwner = startGrid(1);

        IgniteCache<Object, Object> cache = partOwner.getOrCreateCache(
            new CacheConfiguration<>("default")
                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
                .setCacheMode(CacheMode.PARTITIONED)
                .setIndexedTypes(Integer.class, Integer.class)
                .setBackups(2));

        TestRecordingCommunicationSpi crdComm =
            (TestRecordingCommunicationSpi)crd.configuration().getCommunicationSpi();

        crdComm.blockMessages(GridDhtPartitionsFullMessage.class, getTestIgniteInstanceName(joiningBackupNodeId));

        // Start the joining node in the background; a start failure is rethrown by joinFut.get() below
        // instead of being printed and ignored.
        IgniteInternalFuture<?> joinFut = GridTestUtils.runAsync(() -> {
            startGrid(joiningBackupNodeId);

            return null;
        });

        assertTrue(GridTestUtils.waitForCondition(() -> crd.cluster().nodes().size() == 3, 10000L));

        List<Integer> keys = new ArrayList<>();

        Affinity<Object> aff = crd.affinity("default");

        for (int i = 0; i < 100; i++) {
            if (aff.isPrimary(partOwner.localNode(), i)) {
                keys.add(i);

                break;
            }
        }

        ((TestRecordingCommunicationSpi)partOwner.configuration().getCommunicationSpi())
            .blockMessages(GridDhtTxPrepareRequest.class, getTestIgniteInstanceName(joiningBackupNodeId));

        GridNearTxLocal nearTx = ((TransactionProxyImpl)partOwner.transactions()
            .txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)).tx();

        for (Integer k : keys) {
            cache.put(k, k);
        }

        nearTx.commitAsync();

        IgniteTxManager tm = crd.context().cache().context().tm();

        assertTrue(GridTestUtils.waitForCondition(() -> !tm.activeTransactions().isEmpty(), 10000L));

        // findFirst() instead of iterator().next(): an empty collection makes the poll return false
        // rather than throw.
        assertTrue(GridTestUtils.waitForCondition(
            () -> tm.activeTransactions().stream()
                .findFirst()
                .map(tx -> tx.state() == TransactionState.PREPARED)
                .orElse(false),
            10000L));

        partOwner.close();

        crdComm.stopBlock();

        // Bounded wait for the joining node.
        joinFut.get(getTestTimeout());

        awaitPartitionMapExchange();

        assertEquals(2, crd.cluster().nodes().size());
    }

    /**
     * Three servers and a client are started with a cache that has two backups. Two keys of the same
     * partition whose primary is grid 1 (the victim) are picked. Transaction A inserts the first key and is
     * committed asynchronously while the first two {@code GridDhtTxPrepareRequest} messages sent by the
     * victim are blocked; once A is {@code PREPARING} on the victim, transaction B inserts the second key
     * and commits. The victim's update counter is captured and the victim is closed.
     * <p>
     * Expected: transaction A is {@code ROLLED_BACK} on the backups, each backup sees exactly one row, and
     * the update counter for both keys on every backup equals the counter captured on the victim.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testUpdateCountersGapIsClosed() throws Exception {
        int srvCnt = 3;

        startGridsMultiThreaded(srvCnt);

        client = true;

        IgniteEx ign = startGrid(srvCnt);

        IgniteCache<Object, Object> cache = ign.getOrCreateCache(basicCcfg().setBackups(2));

        int vid = 1;

        IgniteEx victim = grid(vid);

        List<Integer> keys = new ArrayList<>();

        Integer part = null;

        Affinity<Object> aff = ign.affinity("default");

        // Collect two keys that are primary on the victim and belong to one and the same partition.
        for (int i = 0; i < 2000; i++) {
            int p = aff.partition(i);

            if (!aff.isPrimary(victim.localNode(), i)) {
                continue;
            }

            if (part == null) {
                part = p;
            }

            if (p == part) {
                keys.add(i);
            }

            if (keys.size() == 2) {
                break;
            }
        }

        assert keys.size() == 2;

        Transaction txA = ign.transactions()
            .txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);

        // Block only the first two DHT prepare requests sent by the victim.
        ((TestRecordingCommunicationSpi)victim.configuration().getCommunicationSpi())
            .blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
                private final AtomicInteger limiter = new AtomicInteger();

                @Override
                public boolean apply(ClusterNode node, Message msg) {
                    return msg instanceof GridDhtTxPrepareRequest && limiter.getAndIncrement() < 2;
                }
            });

        cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(keys.get(0)));

        txA.commitAsync();

        GridCacheVersion aXidVer = ((TransactionProxyImpl)txA).tx().xidVersion();

        assertConditionEventually(
            () -> txsOnNode(victim, aXidVer).stream().anyMatch(tx -> tx.state() == TransactionState.PREPARING));

        GridTestUtils.runAsync(() -> {
            try (Transaction txB = ign.transactions()
                .txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
                cache.query(
                    new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(keys.get(1)));

                txB.commit();
            }
        }).get();

        long victimUpdCntr = updateCounter(victim.cachex("default").context(), keys.get(0));

        List<IgniteEx> backupNodes = grids(srvCnt, i -> i != vid);

        // Typed list: with a raw List the state-checking lambda below would see plain Objects.
        List<IgniteInternalTx> backupTxsA = backupNodes.stream()
            .map(node -> txsOnNode(node, aXidVer))
            .flatMap(Collection::stream)
            .collect(Collectors.toList());

        victim.close();

        assertConditionEventually(
            () -> backupTxsA.stream().allMatch(tx -> tx.state() == TransactionState.ROLLED_BACK));

        for (IgniteEx node : backupNodes) {
            assertEquals(1, node.cache("default").query(new SqlFieldsQuery("select * from Integer")).getAll().size());
        }

        for (IgniteEx node : backupNodes) {
            for (Integer k : keys) {
                assertEquals(victimUpdCntr, updateCounter(node.cachex("default").context(), k));
            }
        }
    }

    /**
     * Creates the cache configuration shared by most tests in this class: cache name {@code "default"},
     * {@code TRANSACTIONAL_SNAPSHOT} atomicity, {@code PARTITIONED} mode and {@code Integer -> Integer}
     * indexed types. Callers adjust the number of backups on the returned instance.
     *
     * @return New cache configuration.
     */
    private static CacheConfiguration<Object, Object> basicCcfg() {
        // Diamond instead of a raw type, so the chain stays typed and no unchecked conversion is needed.
        return new CacheConfiguration<>("default")
            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT)
            .setCacheMode(CacheMode.PARTITIONED)
            .setIndexedTypes(Integer.class, Integer.class);
    }

    /**
     * Snapshots the transactions currently active on the given node and checks that every one of them was
     * started by the expected near transaction.
     *
     * @param node Node whose transaction manager is inspected.
     * @param xidVer Expected near transaction version of every active transaction.
     * @return Non-empty list of active transactions on the node (emptiness is checked with {@code assert}).
     */
    private static List<IgniteInternalTx> txsOnNode(IgniteEx node, GridCacheVersion xidVer) {
        List<IgniteInternalTx> txs = new ArrayList<>(node.context().cache().context().tm().activeTransactions());

        // Explicit loop: the check is a real step, not a debugging side effect of Stream.peek.
        for (IgniteInternalTx tx : txs) {
            assertEquals(xidVer, tx.nearXidVersion());
        }

        assert !txs.isEmpty();

        return txs;
    }

    /**
     * Polls the predicate through {@code GridTestUtils.waitForCondition} for up to 5 seconds and fails the
     * test if it did not become {@code true}.
     *
     * @param p Condition to wait for.
     * @throws IgniteInterruptedCheckedException If the waiting thread was interrupted.
     */
    private static void assertConditionEventually(GridAbsPredicate p) throws IgniteInterruptedCheckedException {
        long timeoutMs = 5000L;

        if (!GridTestUtils.waitForCondition(p, timeoutMs)) {
            // A message makes the failure distinguishable from other bare assertion errors.
            fail("Condition was not satisfied within " + timeoutMs + " ms");
        }
    }

    /**
     * Returns the started grids with indexes from {@code 0} (inclusive) to {@code cnt} (exclusive) whose
     * index is accepted by the predicate, in ascending index order.
     *
     * @param cnt Upper bound (exclusive) of grid indexes to consider.
     * @param p Index filter.
     * @return Matching grids.
     */
    private List<IgniteEx> grids(int cnt, IntPredicate p) {
        List<IgniteEx> selected = new ArrayList<>();

        for (int idx = 0; idx < cnt; idx++) {
            if (p.test(idx)) {
                selected.add(grid(idx));
            }
        }

        return selected;
    }

    /**
     * For every key, compares the partition update counter on all given nodes that are primary or backup for
     * that key in cache {@code "default"}. The counter seen on the first owner is the reference value for
     * the remaining owners. Nodes that do not own the key are skipped.
     *
     * @param keys Keys whose partitions are checked.
     * @param nodes Nodes to inspect; every element is cast to {@link IgniteEx}.
     */
    private void assertPartitionCountersAreConsistent(Iterable<Integer> keys, Iterable<? extends Ignite> nodes) {
        for (Integer key : keys) {
            // -1 marks "no reference counter captured yet".
            long expected = -1L;

            for (Ignite candidate : nodes) {
                IgniteEx owner = (IgniteEx)candidate;

                boolean ownsKey = owner.affinity("default").isPrimaryOrBackup(owner.localNode(), key);

                if (ownsKey) {
                    long actual = updateCounter(owner.cachex("default").context(), key);

                    expected = expected == -1L ? actual : expected;

                    assertEquals(expected, actual);
                }
            }
        }
    }

    /**
     * Reads the update counter of the local data store that holds the partition of the given key.
     *
     * @param cctx Cache context of the node being inspected.
     * @param key Key whose partition is looked up.
     * @return Update counter of the matching data store.
     * @throws AssertionError If the node has no data store for the key's partition.
     */
    private static long updateCounter(GridCacheContext<?, ?> cctx, Object key) {
        return dataStore(cctx, key)
            .map(IgniteCacheOffheapManager.CacheDataStore::updateCounter)
            // Fail with context instead of the bare NoSuchElementException from Optional.get().
            .orElseThrow(() -> new AssertionError("No local data store found for the partition of key: " + key));
    }

    /**
     * Finds the first local data store whose partition id equals the partition of the given key.
     *
     * @param cctx Cache context of the node being inspected.
     * @param key Key whose partition is looked up.
     * @return Matching data store, or an empty {@code Optional} if there is none.
     */
    private static Optional<IgniteCacheOffheapManager.CacheDataStore> dataStore(GridCacheContext<?, ?> cctx, Object key) {
        int partId = cctx.affinity().partition(key);

        for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) {
            if (store.partId() == partId) {
                return Optional.of(store);
            }
        }

        return Optional.empty();
    }

    /** Mode in which the near node is started in the near-failure scenarios. */
    public enum NodeMode {
        /** Near node is started as a server. */
        SERVER,

        /** Near node is started as a client. */
        CLIENT
    }

    /** Expected outcome of the transaction after recovery. */
    public enum TxEndResult {
        /** Transaction is expected to be committed. */
        COMMIT,

        /** Transaction is expected to be rolled back (constant name is kept as is, it is referenced by the tests). */
        ROLLBAK
    }
}

