/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.database.txdr;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.cache.Cache;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingCluster;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteState;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.GridKernalContextImpl;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorTaskArgument;
import org.apache.ignite.internal.visor.verify.CacheFilterEnum;
import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg;
import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskV2;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.plugin.PluginConfiguration;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpiTestUtil;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionOptimisticException;
import org.gridgain.grid.configuration.SnapshotConfiguration;
import org.gridgain.grid.internal.processors.cache.database.snapshot.CompressionOption;
import org.gridgain.grid.internal.processors.cache.database.snapshot.GridCacheSnapshotManager;
import org.gridgain.grid.internal.processors.cache.database.txdr.ConsistentCut;
import org.gridgain.grid.internal.processors.cache.database.txdr.ConsistentCutStore;
import org.gridgain.grid.internal.processors.cache.database.txdr.ConsistentCutWatcher;
import org.gridgain.grid.internal.processors.cache.database.txdr.TopologyEventsSnapshot;
import org.gridgain.grid.internal.processors.cache.database.txdr.TopologyEventsTracker;
import org.gridgain.grid.internal.processors.cache.database.txdr.TransactionalDrProcessorImpl;
import org.gridgain.grid.internal.txdr.BootstrapMasterParameters;
import org.gridgain.grid.internal.txdr.ClusterRole;
import org.gridgain.grid.internal.txdr.GridGainTxDrConfiguration;
import org.gridgain.grid.internal.txdr.ReplicationSessionDescriptor;
import org.gridgain.grid.internal.txdr.ReplicationState;
import org.gridgain.grid.internal.txdr.TransactionalDrConfiguration;
import org.gridgain.grid.internal.txdr.TransactionalDrMaster;
import org.gridgain.grid.persistentstore.SnapshotFuture;
import org.mockito.Matchers;
import org.mockito.Mockito;

public abstract class AbstractReplicationTest
extends GridCommonAbstractTest {
    protected static final String TX_CACHE_NAME = "txCache";
    protected static final String ATOMIC_CACHE_NAME = "atomicCache";
    protected static final String TRANSFER_FOLDER_NAME = "transfer-folder";
    protected static final String SNAPSHOT_FOLDER_NAME = "snapshot-folder";
    protected static final String USE_ZOOKEEPER_DISCOVERY_SPI = "USE_ZOOKEEPER_DISCOVERY_SPI";
    private static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT";
    protected static final long DEFAULT_CONSISTENT_CUT_INTERVAL = 2000L;
    private static final int TX_TIMEOUT_ON_PME = 5000;
    protected static final int KEYS_CNT = 50;
    protected static final AtomicBoolean txStop = new AtomicBoolean();
    protected static final AtomicBoolean atomicStop = new AtomicBoolean();
    protected static final long ACCOUNT_VAL_BOUND = 1000L;
    protected static final long ACCOUNT_VAL_ORIGIN = 100L;
    protected static final BiPredicate<Transaction, Throwable> TX_OPTIMISTIC_EXCEPTION_FILTER = (tx, e) -> e instanceof TransactionOptimisticException;
    private static final int ZK_SRVS = 3;
    private static final Map<ClusterRole, TestingCluster> zkClusters = new ConcurrentHashMap<ClusterRole, TestingCluster>();
    private static final int FORCE_CONSISTENT_CUT_RETRY_CNT = 5;
    protected TcpDiscoveryIpFinder masterIpFinder = new TcpDiscoveryVmIpFinder(true);
    protected TcpDiscoveryIpFinder replicaIpFinder = new TcpDiscoveryVmIpFinder(true);
    protected TcpDiscoveryIpFinder dfltIpFinder = new TcpDiscoveryVmIpFinder(true);
    protected int nodesCnt;
    protected int backupsCnt;
    protected int clientsCnt;
    protected int nonBltNodesCnt;
    protected boolean useClientsForOps;
    protected boolean useZookeeperDiscoverySpi = IgniteSystemProperties.getBoolean((String)"USE_ZOOKEEPER_DISCOVERY_SPI", (boolean)false);
    protected long zkSpiSesTimeout = 10000L;
    protected long consistentCutInterval = 2000L;
    protected final Map<ClusterRole, List<IgniteEx>> clusterMap = new ConcurrentHashMap<ClusterRole, List<IgniteEx>>();
    private final Map<ClusterRole, List<IgniteEx>> clientsMap = new ConcurrentHashMap<ClusterRole, List<IgniteEx>>();
    private int clientSelectionIdx;
    protected volatile BiPredicate<Transaction, Throwable> txErrorFilter = TX_OPTIMISTIC_EXCEPTION_FILTER;

    protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();
        System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT, "1000");
    }

    protected void beforeTest() throws Exception {
        super.beforeTest();
        this.stopAllGrids();
        this.deleteWorkFiles();
        this.nodesCnt = 5;
        this.backupsCnt = 2;
        this.clientsCnt = 1;
        this.useClientsForOps = false;
        this.clientSelectionIdx = 0;
    }

    protected void afterTest() throws Exception {
        this.clusterMap.clear();
        this.clientsMap.clear();
        this.stopAllGrids();
        this.deleteWorkFiles();
        super.afterTest();
    }

    protected void afterTestsStopped() throws Exception {
        super.afterTestsStopped();
        for (ClusterRole role : ClusterRole.values()) {
            this.stopZkCluster(zkClusters.remove(role));
        }
    }

    protected IgniteConfiguration getConfiguration(String igniteInstanceName, String consistentId, ClusterRole role) throws Exception {
        IgniteConfiguration cfg = this.getConfiguration(igniteInstanceName);
        DataStorageConfiguration memCfg = new DataStorageConfiguration().setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(0x3200000L).setPersistenceEnabled(true)).setWalMode(WALMode.LOG_ONLY).setWalSegments(4).setWalSegmentSize(0x100000).setWalCompactionEnabled(true);
        cfg.setDataStorageConfiguration(memCfg);
        if (role != ClusterRole.DISABLED) {
            cfg.setWorkDirectory(new File(U.defaultWorkDirectory(), role == ClusterRole.MASTER ? "master" : "replica").getAbsolutePath());
        }
        cfg.setPluginConfigurations(new PluginConfiguration[]{new GridGainTxDrConfiguration().setTxDrConfiguration(this.getTxDrConfiguration()).setSnapshotConfiguration(this.getSnapshotConfiguration())});
        cfg.setConsistentId((Serializable)((Object)consistentId));
        cfg.setDiscoverySpi(this.getDiscoverySpi(role));
        CacheConfiguration txCacheCfg = new CacheConfiguration(TX_CACHE_NAME).setBackups(this.backupsCnt).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).setAffinity((AffinityFunction)new RendezvousAffinityFunction(false, 32)).setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC).setQueryEntities(Collections.singletonList(new QueryEntity(Integer.class, Long.class)));
        CacheConfiguration atomicCacheCfg = new CacheConfiguration(ATOMIC_CACHE_NAME).setBackups(this.backupsCnt).setAtomicityMode(CacheAtomicityMode.ATOMIC).setAffinity((AffinityFunction)new RendezvousAffinityFunction(false, 32)).setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC).setQueryEntities(Collections.singletonList(new QueryEntity(Integer.class, Long.class)));
        cfg.setCacheConfiguration(new CacheConfiguration[]{txCacheCfg, atomicCacheCfg});
        cfg.setTransactionConfiguration(new TransactionConfiguration().setTxTimeoutOnPartitionMapExchange(5000L));
        return cfg;
    }

    protected DiscoverySpi getDiscoverySpi(ClusterRole role) throws Exception {
        return this.useZookeeperDiscoverySpi ? this.getZookeeperDiscoverySpi(role) : this.getTcpDiscoverySpi(role);
    }

    protected TcpDiscoverySpi getTcpDiscoverySpi(ClusterRole role) {
        TcpDiscoveryIpFinder ipFinder = role == ClusterRole.DISABLED ? this.dfltIpFinder : (role == ClusterRole.MASTER ? this.masterIpFinder : this.replicaIpFinder);
        return new TcpDiscoverySpi().setIpFinder(ipFinder);
    }

    protected ZookeeperDiscoverySpi getZookeeperDiscoverySpi(ClusterRole role) throws Exception {
        this.ensureZkClusterStarted(role);
        TestingCluster zkCluster = zkClusters.get(role);
        assert (zkCluster != null);
        return new ZookeeperDiscoverySpi().setSessionTimeout(this.zkSpiSesTimeout).setZkConnectionString(zkCluster.getConnectString());
    }

    protected SnapshotConfiguration getSnapshotConfiguration() {
        return new SnapshotConfiguration().setCompressionOption(CompressionOption.ZIP);
    }

    protected TransactionalDrConfiguration getTxDrConfiguration() throws Exception {
        return new TransactionalDrConfiguration().setTransferFolderPath(this.transferFolder().getAbsolutePath()).setConsistentCutInterval(this.consistentCutInterval);
    }

    protected List<IgniteEx> startCluster(ClusterRole role) throws Exception {
        return this.startCluster(role, this.nodesCnt);
    }

    protected List<IgniteEx> startCluster(ClusterRole role, int nodesCnt) throws Exception {
        ArrayList<String> consIds = new ArrayList<String>();
        for (int i = 0; i < nodesCnt; ++i) {
            consIds.add(this.nodeConsistentId(i));
        }
        return this.startCluster(role, consIds.toArray(new String[nodesCnt]));
    }

    protected List<IgniteEx> startCluster(ClusterRole role, String ... consistentIds) throws Exception {
        int i;
        AbstractReplicationTest.assertNotNull((Object)consistentIds);
        AbstractReplicationTest.assertTrue((consistentIds.length > 0 ? 1 : 0) != 0);
        List<IgniteEx> ignites = new ArrayList<IgniteEx>(consistentIds.length);
        for (i = 0; i < consistentIds.length; ++i) {
            ignites.add(this.startClusterNode(role, i, consistentIds[i], NodeType.BLT_NODE));
        }
        ignites = new CopyOnWriteArrayList(ignites);
        this.clusterMap.put(role, ignites);
        ((IgniteEx)ignites.get(0)).cluster().active(true);
        if (this.nonBltNodesCnt > 0) {
            for (i = consistentIds.length; i < this.nonBltNodesCnt + consistentIds.length; ++i) {
                this.startClusterNode(role, i, this.nodeConsistentId(i), NodeType.NON_BLT_NODE);
            }
        }
        if (this.clientsCnt > 0) {
            this.clientsMap.put(role, this.startClients(role, this.clientsCnt));
        }
        return ignites;
    }

    private String instanceName(IgniteEx ignite) {
        return ignite.configuration().getIgniteInstanceName();
    }

    protected IgniteEx startClusterNode(ClusterRole role, int idx, String consistentId) throws Exception {
        return this.startClusterNode(role, idx, consistentId, NodeType.BLT_NODE);
    }

    protected IgniteEx startClusterNode(ClusterRole role, int idx, String consistentId, NodeType nodeType) throws Exception {
        AbstractReplicationTest.assertNotNull((Object)consistentId);
        String igniteInstanceName = this.igniteInstanceNameWithRole(role, idx, nodeType);
        IgniteEx ignite = this.startGrid(this.getConfiguration(igniteInstanceName, consistentId, role));
        DiscoverySpi discoSpi = ignite.context().discovery().getInjectedDiscoverySpi();
        if (this.useZookeeperDiscoverySpi) {
            AbstractReplicationTest.assertTrue((boolean)(discoSpi instanceof ZookeeperDiscoverySpi));
        } else {
            AbstractReplicationTest.assertTrue((boolean)(discoSpi instanceof TcpDiscoverySpi));
        }
        List<IgniteEx> ignites = this.clusterMap.get(role);
        if (ignites != null) {
            ignites.add(ignite);
        }
        return ignite;
    }

    protected IgniteEx startClusterNode(ClusterRole role, int nodeIdx) throws Exception {
        return this.startClusterNode(role, nodeIdx, this.nodeConsistentId(nodeIdx));
    }

    protected IgniteEx startClient(ClusterRole role, int idx) throws Exception {
        return this.startGrid(this.getConfiguration(this.igniteInstanceNameWithRole(role, idx, NodeType.CLIENT), "client" + idx, role).setClientMode(true));
    }

    protected List<IgniteEx> startClients(ClusterRole role, int nodesCnt) throws Exception {
        int i;
        ArrayList<IgniteEx> clients = new ArrayList<IgniteEx>(nodesCnt);
        int n = i + nodesCnt;
        for (i = 0; i < n; ++i) {
            clients.add(this.startClient(role, i));
        }
        return new CopyOnWriteArrayList<IgniteEx>(clients);
    }

    protected void stopCluster(ClusterRole role) throws Exception {
        this.stopClients(role);
        List<IgniteEx> cluster = this.clusterMap.get(role);
        if (cluster == null) {
            return;
        }
        ListIterator<IgniteEx> it = cluster.listIterator(cluster.size());
        while (it.hasPrevious()) {
            IgniteEx grid = it.previous();
            this.stopGrid(grid.name(), true, false);
        }
        cluster.clear();
    }

    protected void stopClients(ClusterRole role) {
        List<IgniteEx> clients = this.clientsMap.get(role);
        if (clients == null) {
            return;
        }
        while (!clients.isEmpty()) {
            IgniteEx grid = clients.remove(0);
            this.stopGrid(grid.name(), true, false);
        }
    }

    protected void stopClusterNode(ClusterRole role, IgniteEx node) {
        List<IgniteEx> servers;
        Object consistentId = node.localNode().consistentId();
        List<IgniteEx> clients = this.clientsMap.get(role);
        if (!F.isEmpty(clients)) {
            ArrayList<IgniteEx> aliveClients = new ArrayList<IgniteEx>(clients.size());
            for (IgniteEx n : clients) {
                if (n.localNode().consistentId().equals(consistentId)) {
                    this.stopGrid(n.name(), true, false);
                    continue;
                }
                aliveClients.add(n);
            }
            if (clients.size() != aliveClients.size()) {
                this.clientsMap.put(role, new CopyOnWriteArrayList(aliveClients));
                return;
            }
        }
        if (!F.isEmpty(servers = this.clusterMap.get(role))) {
            ArrayList<IgniteEx> aliveServers = new ArrayList<IgniteEx>(servers.size());
            for (IgniteEx n : servers) {
                if (n.localNode().consistentId().equals(consistentId)) {
                    this.stopGrid(n.name(), true, false);
                    continue;
                }
                aliveServers.add(n);
            }
            if (servers.size() != aliveServers.size()) {
                this.clusterMap.put(role, new CopyOnWriteArrayList(aliveServers));
            }
        }
    }

    protected void stopGrids(Iterable<? extends Ignite> grids) {
        if (grids == null) {
            return;
        }
        for (Ignite ignite : grids) {
            this.stopGrid(ignite.name(), true, false);
        }
    }

    protected String igniteInstanceNameWithRole(ClusterRole role, int i) {
        return this.igniteInstanceNameWithRole(role, i, NodeType.BLT_NODE);
    }

    protected String igniteInstanceNameWithRole(ClusterRole role, int i, NodeType nodeType) {
        return this.igniteInstanceNameWithRoleAndType(role, i, nodeType);
    }

    protected String igniteInstanceNameWithRoleAndType(ClusterRole role, int i, NodeType nodeType) {
        StringBuilder nameBuilder = new StringBuilder();
        if (role == ClusterRole.MASTER) {
            nameBuilder.append("master.");
        } else if (role == ClusterRole.REPLICA) {
            nameBuilder.append("replica.");
        }
        if (nodeType == NodeType.CLIENT) {
            nameBuilder.append("client.");
        } else if (nodeType == NodeType.NON_BLT_NODE) {
            nameBuilder.append("nonblt.");
        }
        return nameBuilder.append(this.getTestIgniteInstanceName(i)).toString();
    }

    protected String nodeConsistentId(int nodeIdx) {
        return "node" + nodeIdx;
    }

    protected File transferFolder() throws IgniteCheckedException {
        return U.resolveWorkDirectory((String)U.defaultWorkDirectory(), (String)TRANSFER_FOLDER_NAME, (boolean)false);
    }

    protected File snapshotFolder() throws IgniteCheckedException {
        return U.resolveWorkDirectory((String)U.defaultWorkDirectory(), (String)SNAPSHOT_FOLDER_NAME, (boolean)false);
    }

    protected void deleteWorkFiles() throws Exception {
        this.cleanPersistenceDir();
        U.delete((File)U.resolveWorkDirectory((String)U.defaultWorkDirectory(), (String)"master", (boolean)false));
        U.delete((File)U.resolveWorkDirectory((String)U.defaultWorkDirectory(), (String)"replica", (boolean)false));
        U.delete((File)this.transferFolder());
        U.delete((File)this.snapshotFolder());
        U.delete((File)U.resolveWorkDirectory((String)U.defaultWorkDirectory(), (String)"snapshot", (boolean)false));
    }

    protected Map<Integer, Long> dumpCache(IgniteCache<Integer, Long> cache) {
        HashMap<Integer, Long> map = new HashMap<Integer, Long>(50);
        for (Cache.Entry e : cache) {
            map.put((Integer)e.getKey(), (Long)e.getValue());
        }
        return map;
    }

    protected long sumOf(Map<Integer, Long> map) {
        return map.values().stream().mapToLong(v -> v).sum();
    }

    protected long sumOf(IgniteCache<Integer, Long> cache) {
        return (Long)cache.getAll(IntStream.range(0, 50).boxed().collect(Collectors.toSet())).values().stream().reduce((val1, val2) -> val1 + val2).get();
    }

    protected long forceConsistentCutNoRetry(Ignite ignite) {
        GridCacheSnapshotManager snapshotMgr = this.snapMgr(ignite);
        SnapshotFuture fut = snapshotMgr.startGlobalConsistentCut();
        fut.get();
        return fut.snapshotOperation().snapshotId();
    }

    protected long forceConsistentCut(Ignite ignite) {
        for (int i = 1; i <= 5; ++i) {
            try {
                return this.forceConsistentCutNoRetry(ignite);
            }
            catch (IllegalStateException ignored) {
                try {
                    U.sleep((long)(100 * i));
                    continue;
                }
                catch (IgniteInterruptedCheckedException e) {
                    throw new IgniteException((Throwable)e);
                }
            }
        }
        return this.forceConsistentCutNoRetry(ignite);
    }

    protected ConsistentCut loadConsistentCut(long consistentCutId, Ignite ignite) throws IgniteCheckedException {
        ConsistentCutStore store = this.txdr(ignite).consistentCutStore();
        return store.restore(consistentCutId);
    }

    protected IgniteInternalFuture startTxLoad(int threads, ClusterRole role) {
        return this.startTxLoad(threads, role, null);
    }

    protected IgniteInternalFuture startTxLoad(int threads, ClusterRole role, Ignite ignite) {
        txStop.set(false);
        return GridTestUtils.runMultiThreadedAsync(() -> {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();
            while (!txStop.get()) {
                Ignite ig;
                Transaction tx = null;
                Ignite ignite2 = ig = ignite == null ? this.tryGetRandomInstance(role, rnd) : ignite;
                if (ig == null) continue;
                IgniteCache cache = ig.cache(TX_CACHE_NAME);
                TransactionConcurrency concurrency = rnd.nextBoolean() ? TransactionConcurrency.PESSIMISTIC : TransactionConcurrency.OPTIMISTIC;
                TransactionIsolation isolation = concurrency == TransactionConcurrency.PESSIMISTIC ? TransactionIsolation.REPEATABLE_READ : TransactionIsolation.SERIALIZABLE;
                try {
                    Transaction tx0 = tx = ig.transactions().txStart(concurrency, isolation, 0L, 100);
                    Throwable throwable = null;
                    try {
                        int acc1;
                        int acc0 = rnd.nextInt(50);
                        while ((acc1 = rnd.nextInt(50)) == acc0) {
                        }
                        if (acc0 > acc1) {
                            int tmp = acc0;
                            acc0 = acc1;
                            acc1 = tmp;
                        }
                        long val0 = (Long)cache.get((Object)acc0);
                        long val1 = (Long)cache.get((Object)acc1);
                        long delta = rnd.nextLong(Math.max(val0, val1));
                        if (val0 < val1) {
                            cache.put((Object)acc0, (Object)(val0 + delta));
                            cache.put((Object)acc1, (Object)(val1 - delta));
                        } else {
                            cache.put((Object)acc0, (Object)(val0 - delta));
                            cache.put((Object)acc1, (Object)(val1 + delta));
                        }
                        if (rnd.nextInt(10) == 0) {
                            tx.rollback();
                            continue;
                        }
                        tx.commit();
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    finally {
                        if (tx0 == null) continue;
                        if (throwable != null) {
                            try {
                                tx0.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                            continue;
                        }
                        tx0.close();
                    }
                }
                catch (Throwable e) {
                    if (this.txErrorFilter.test(tx, e)) continue;
                    log.error("Unexpected error [tx=" + tx + "]", e);
                }
            }
        }, (int)threads, (String)"tx-load-thread");
    }

    protected IgniteInternalFuture startAtomicLoad(int threads, ClusterRole role) {
        return this.startAtomicLoad(threads, role, null);
    }

    protected IgniteInternalFuture startAtomicLoad(int threads, ClusterRole role, Ignite ignite) {
        atomicStop.set(false);
        return GridTestUtils.runMultiThreadedAsync(() -> {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();
            while (!atomicStop.get()) {
                Ignite ig;
                int nodeIdx = rnd.nextInt(this.nodesCnt);
                Ignite ignite2 = ig = ignite == null ? this.tryGetRandomInstance(role, rnd) : ignite;
                if (ig == null) continue;
                IgniteCache cache = ig.cache(ATOMIC_CACHE_NAME);
                int cnt = rnd.nextInt(5);
                for (int i = 0; i < cnt; ++i) {
                    try {
                        cache.put((Object)rnd.nextInt(50), (Object)rnd.nextLong(100L, 1001L));
                        continue;
                    }
                    catch (Throwable e) {
                        log.error("Unexpected error", e);
                    }
                }
            }
        }, (int)threads, (String)"atomic-load-thread");
    }

    protected void stopTxLoad(IgniteInternalFuture fut) throws IgniteCheckedException {
        txStop.set(true);
        fut.get();
    }

    protected void stopAtomicLoad(IgniteInternalFuture fut) throws IgniteCheckedException {
        atomicStop.set(true);
        fut.get();
    }

    protected long populateData(Ignite ignite, String cacheName) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        long total = 0L;
        try (IgniteDataStreamer dataStreamer = ignite.dataStreamer(cacheName);){
            for (int i = 0; i < 50; ++i) {
                long val = rnd.nextLong(100L, 1001L);
                dataStreamer.addData((Object)i, (Object)val);
                total += val;
            }
            dataStreamer.flush();
        }
        log.info("Total sum for cache '" + cacheName + "': " + total);
        return total;
    }

    protected TransactionalDrProcessorImpl txdr(Ignite ignite) {
        return (TransactionalDrProcessorImpl)((IgniteEx)ignite).context().txDr();
    }

    protected TransactionalDrProcessorImpl txdr(ClusterRole role) {
        return this.txdr((Ignite)this.node(role));
    }

    protected GridCacheSnapshotManager snapMgr(Ignite ignite) {
        return (GridCacheSnapshotManager)((IgniteEx)ignite).context().cache().context().snapshot();
    }

    protected GridCacheSnapshotManager snapMgr(ClusterRole role) {
        return this.snapMgr((Ignite)this.node(role));
    }

    protected File walDir(Ignite ignite) throws IgniteCheckedException {
        TransactionalDrProcessorImpl txdr = this.txdr(ignite);
        return txdr.walDir(txdr.spawnId());
    }

    protected IgniteEx node(ClusterRole role) {
        return this.useClientsForOps ? this.clientNode(role) : this.clusterMap.get(role).get(0);
    }

    protected IgniteEx clientNode(ClusterRole role) {
        AbstractReplicationTest.assertTrue((this.clientsCnt > 0 ? 1 : 0) != 0);
        List<IgniteEx> clients = this.clientsMap.get(role);
        AbstractReplicationTest.assertNotNull(clients);
        AbstractReplicationTest.assertEquals((int)clients.size(), (int)this.clientsCnt);
        int clientIdx = this.clientSelectionIdx;
        if (clientIdx >= clients.size()) {
            clientIdx = 0;
        }
        this.clientSelectionIdx = clientIdx + 1;
        return clients.get(clientIdx);
    }

    protected void assertClusterState(List<IgniteEx> cluster, ClusterRole role, ReplicationState state, long bootstrapSesId) {
        for (IgniteEx ignite : cluster) {
            ReplicationSessionDescriptor locState = this.txdr((Ignite)ignite).localState();
            AbstractReplicationTest.assertEquals((String)("Wrong role for node: " + ignite.name()), (Object)role, (Object)locState.role());
            AbstractReplicationTest.assertEquals((String)("Wrong process state for node: " + ignite.name()), (Object)state, (Object)locState.state());
            AbstractReplicationTest.assertEquals((String)("Wrong session ID for node: " + ignite.name()), (long)bootstrapSesId, (long)locState.sessionId());
        }
    }

    protected void awakeCutsWatcher(IgniteEx ignite) {
        ConsistentCutWatcher watcher = this.txdr((Ignite)ignite).consistentCutWatcher();
        if (watcher != null) {
            watcher.awake();
        }
    }

    protected void awakeCutsWatcher(List<IgniteEx> cluster) {
        for (IgniteEx ignite : cluster) {
            this.awakeCutsWatcher(ignite);
        }
    }

    protected void waitForApplyingCut(final List<IgniteEx> cluster, final long cutId, long timeout) throws Exception {
        log.info("Waiting for applying cut locally on all cluster nodes, cutId=" + cutId);
        AbstractReplicationTest.assertTrue((String)("Failed to wait for applying cut " + cutId), (boolean)GridTestUtils.waitForCondition((GridAbsPredicate)new GridAbsPredicate(){

            public boolean apply() {
                boolean res = true;
                for (IgniteEx ignite : cluster) {
                    if (AbstractReplicationTest.this.txdr((Ignite)ignite).localState().lastSuccessfullyAppliedCutId() >= cutId) continue;
                    try {
                        AbstractReplicationTest.this.awakeCutsWatcher(ignite);
                    }
                    catch (Exception e) {
                        log.error("Failed to awake watcher", (Throwable)e);
                    }
                    res = false;
                }
                return res;
            }
        }, (long)timeout));
    }

    protected void assertClusterReadOnly(Collection<IgniteEx> cluster) throws IgniteInterruptedCheckedException {
        for (IgniteEx ignite : cluster) {
            AbstractReplicationTest.assertTrue((boolean)GridTestUtils.waitForCondition(() -> ignite.context().cache().context().readOnlyMode(), (long)30000L));
        }
    }

    private void ensureZkClusterStarted(ClusterRole role) throws Exception {
        if (zkClusters.containsKey(role)) {
            return;
        }
        TestingCluster zkCluster = ZookeeperDiscoverySpiTestUtil.createTestingCluster((int)3, (int)(3 * role.ordinal()));
        zkClusters.put(role, zkCluster);
        zkCluster.start();
        AbstractReplicationTest.waitForZkClusterReady(zkCluster);
    }

    private static void waitForZkClusterReady(TestingCluster zkCluster) throws InterruptedException {
        try (CuratorFramework curator = CuratorFrameworkFactory.newClient((String)zkCluster.getConnectString(), (RetryPolicy)new RetryNTimes(10, 1000));){
            curator.start();
            AbstractReplicationTest.assertTrue((String)"Failed to wait for Zookeeper testing cluster ready.", (boolean)curator.blockUntilConnected(30, TimeUnit.SECONDS));
        }
    }

    private void stopZkCluster(TestingCluster zkCluster) {
        if (zkCluster != null) {
            try {
                zkCluster.close();
            }
            catch (Exception e) {
                U.error((IgniteLogger)log, (Object)"Failed to stop Zookeeper cluster", (Throwable)e);
            }
        }
    }

    static IgniteFuture<Long> bootstrapMaster(TransactionalDrMaster txDr, File snapshotFolder) {
        return txDr.bootstrap(new BootstrapMasterParameters.Builder().withSnapshotFolder(snapshotFolder).build());
    }

    protected long bootstrapMaster() throws IgniteCheckedException {
        IgniteFuture<Long> fut = AbstractReplicationTest.bootstrapMaster((TransactionalDrMaster)this.txdr(ClusterRole.MASTER), this.snapshotFolder());
        long bootstrapSesId = (Long)fut.get();
        log.info(">>> Master cluster bootstrapped successfully, sessionId=" + bootstrapSesId);
        return bootstrapSesId;
    }

    protected void bootstrapReplica(long bootstrapSesId) throws IgniteCheckedException {
        this.txdr(ClusterRole.REPLICA).bootstrap(this.snapshotFolder(), bootstrapSesId).get();
        log.info(">>> Replica cluster bootstrapped successfully, sessionId=" + bootstrapSesId);
    }

    public static void replaceTransactionalProcessor(List<IgniteEx> cluster) throws IgniteCheckedException {
        for (IgniteEx ig : cluster) {
            TransactionalDrProcessorImpl txdr = (TransactionalDrProcessorImpl)ig.context().txDr();
            Iterator it = ig.context().iterator();
            while (it.hasNext()) {
                GridComponent comp = (GridComponent)it.next();
                if (!(comp instanceof TransactionalDrProcessorImpl)) continue;
                it.remove();
                break;
            }
            TopologyEventsTracker mockedTracker = (TopologyEventsTracker)Mockito.mock(TopologyEventsTracker.class);
            ((TopologyEventsTracker)Mockito.doNothing().when((Object)mockedTracker)).merge((TopologyEventsSnapshot)Matchers.any(TopologyEventsSnapshot.class));
            ((TopologyEventsTracker)Mockito.doAnswer(invocation -> {
                Long cutId = (Long)invocation.getArguments()[0];
                AffinityTopologyVersion topVer = (AffinityTopologyVersion)invocation.getArguments()[1];
                return new TopologyEventsSnapshot(cutId.longValue(), topVer, Collections.EMPTY_MAP);
            }).when((Object)mockedTracker)).snapshot(Matchers.anyLong(), (AffinityTopologyVersion)Matchers.any(AffinityTopologyVersion.class));
            TransactionalDrProcessorImpl mockedProcessor = (TransactionalDrProcessorImpl)Mockito.spy((Object)txdr);
            ((TransactionalDrProcessorImpl)Mockito.doNothing().when((Object)mockedProcessor)).lastCreatedConsistentCut(Matchers.anyLong());
            ((TransactionalDrProcessorImpl)Mockito.doNothing().when((Object)mockedProcessor)).lastAppliedConsistentCut(Matchers.anyLong());
            ((TransactionalDrProcessorImpl)Mockito.doNothing().when((Object)mockedProcessor)).lastSentWalSegment(Matchers.anyLong());
            ((TransactionalDrProcessorImpl)Mockito.doAnswer(invocation -> mockedTracker).when((Object)mockedProcessor)).topologyTracker();
            ((GridKernalContextImpl)ig.context()).add((GridComponent)mockedProcessor);
        }
    }

    protected boolean idleVerifyReplica(IgniteEx ignite) {
        HashSet<String> caches = new HashSet<String>();
        caches.add(TX_CACHE_NAME);
        VisorIdleVerifyTaskArg taskArg = new VisorIdleVerifyTaskArg(caches, null, false, CacheFilterEnum.ALL, false);
        IdleVerifyResultV2 res = (IdleVerifyResultV2)ignite.compute().execute(VisorIdleVerifyTaskV2.class.getName(), (Object)new VisorTaskArgument(ignite.cluster().localNode().id(), (Object)taskArg, false));
        boolean ok = res.hashConflicts().isEmpty();
        if (!ok) {
            StringBuilder sb = new StringBuilder(">>>> ");
            res.print(sb::append);
            log.info(sb.toString());
        }
        return ok;
    }

    protected Ignite tryGetRandomInstance(ClusterRole role, ThreadLocalRandom rnd) {
        List<IgniteEx> nodes;
        int nodesCnt0 = this.useClientsForOps ? this.clientsCnt : this.nodesCnt;
        int nodeIdx = rnd.nextInt(nodesCnt0);
        String instanceName = this.igniteInstanceNameWithRole(role, nodeIdx, this.useClientsForOps ? NodeType.CLIENT : NodeType.BLT_NODE);
        List<IgniteEx> list = nodes = this.useClientsForOps ? this.clientsMap.get(role) : this.clusterMap.get(role);
        if (nodes != null && nodes.stream().noneMatch(grid -> grid.name().equals(instanceName))) {
            return null;
        }
        return Ignition.state((String)instanceName) == IgniteState.STARTED ? this.grid(instanceName) : null;
    }

    static enum NodeType {
        CLIENT,
        BLT_NODE,
        NON_BLT_NODE;

    }
}

