/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.spi.discovery.zk.internal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.curator.test.TestingZooKeeperServer;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
import org.apache.ignite.internal.processors.query.DummyQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.services.Service;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClient;
import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoverySpiTestBase;
import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoverySpiTestHelper;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Ignore;
import org.junit.Test;

public class ZookeeperDiscoveryTopologyChangeAndReconnectTest
extends ZookeeperDiscoverySpiTestBase {
    private boolean indexingDisabled;

    /**
     * Builds the node configuration used by this test class: every event type is included and,
     * when {@code indexingDisabled} is set, the query processor is pointed at the dummy indexing class.
     *
     * @param igniteInstanceName Ignite instance name.
     * @return Configuration for the node.
     * @throws Exception If the parent configuration could not be built.
     */
    @Override
    protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration nodeCfg = super.getConfiguration(igniteInstanceName);

        nodeCfg.setIncludeEventTypes(EventType.EVTS_ALL);

        if (indexingDisabled) {
            GridQueryProcessor.idxCls = DummyQueryIndexing.class;
        }

        return nodeCfg;
    }

    /**
     * Runs the parent cleanup and then resets the indexing override, both the static
     * indexing class and the per-test {@code indexingDisabled} flag.
     *
     * @throws Exception If the parent cleanup failed.
     */
    @Override
    protected void afterTest() throws Exception {
        super.afterTest();

        GridQueryProcessor.idxCls = null;
        indexingDisabled = false;
    }

    /**
     * Multithreaded node start/stop scenario with no ZooKeeper restarts and no client socket closing.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTopologyChangeMultithreaded() throws Exception {
        final boolean restartZk = false;
        final boolean closeClientSock = false;

        topologyChangeWithRestarts(restartZk, closeClientSock);
    }

    /**
     * Multithreaded node start/stop scenario with background ZooKeeper server restarts.
     * The shared ZooKeeper cluster is closed and cleared afterwards, whatever the outcome.
     *
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9138")
    @Test
    public void testTopologyChangeMultithreaded_RestartZk() throws Exception {
        final boolean restartZk = true;
        final boolean closeClientSock = false;

        try {
            topologyChangeWithRestarts(restartZk, closeClientSock);
        }
        finally {
            zkCluster.close();

            zkCluster = null;
        }
    }

    /**
     * Multithreaded node start/stop scenario with both background ZooKeeper server restarts and
     * background closing of ZooKeeper client sockets. The shared ZooKeeper cluster is closed and
     * cleared afterwards, whatever the outcome.
     *
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9138")
    @Test
    public void testTopologyChangeMultithreaded_RestartZk_CloseClients() throws Exception {
        final boolean restartZk = true;
        final boolean closeClientSock = true;

        try {
            topologyChangeWithRestarts(restartZk, closeClientSock);
        }
        finally {
            zkCluster.close();

            zkCluster = null;
        }
    }

    /**
     * Starts {@code INIT_NODES} nodes and then, until a scaled deadline passes, keeps changing the
     * topology from several threads at once: while fewer than {@code MAX_NODES} nodes are tracked a
     * random batch of 1..5 new nodes is started, otherwise a random batch of 1..5 distinct nodes is stopped.
     *
     * @param restartZk If {@code true}, a background task restarts ZooKeeper servers meanwhile.
     * @param closeClientSock If {@code true}, a background task closes ZooKeeper client sockets meanwhile.
     * @throws Exception If failed.
     */
    private void topologyChangeWithRestarts(boolean restartZk, boolean closeClientSock) throws Exception {
        sesTimeout = 30000L;

        // Has to be set before startCloseZkClientSocket() is called: that method asserts on the flag.
        if (closeClientSock) {
            testSockNio = true;
        }

        long stopTime = System.currentTimeMillis() + (long)GridTestUtils.SF.applyLB(30000, 5000);

        AtomicBoolean stop = new AtomicBoolean();

        IgniteInternalFuture<?> fut1;
        IgniteInternalFuture<?> fut2;

        try {
            fut1 = restartZk ? startRestartZkServers(stopTime, stop) : null;
            fut2 = closeClientSock ? startCloseZkClientSocket(stopTime, stop) : null;

            final int INIT_NODES = 10;
            final int MAX_NODES = 20;

            startGridsMultiThreaded(INIT_NODES);

            final List<Integer> startedNodes = new ArrayList<>();

            for (int i = 0; i < INIT_NODES; ++i) {
                startedNodes.add(i);
            }

            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            // incrementAndGet() is used below, so the first newly started index is INIT_NODES + 1.
            final AtomicInteger startIdx = new AtomicInteger(INIT_NODES);

            while (System.currentTimeMillis() < stopTime) {
                if (startedNodes.size() >= MAX_NODES) {
                    int stopNodes = rnd.nextInt(5) + 1;

                    log.info("Next, stop nodes: " + stopNodes);

                    final List<Integer> idxs = new ArrayList<>();

                    while (idxs.size() < stopNodes) {
                        // Compare node indices, not list positions: 'idxs' holds node indices, so checking
                        // the position would let the same node be selected (and stopped) twice.
                        Integer nodeIdx = startedNodes.get(rnd.nextInt(startedNodes.size()));

                        if (!idxs.contains(nodeIdx)) {
                            idxs.add(nodeIdx);
                        }
                    }

                    // One thread per selected node; the thread index picks the node to stop.
                    GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
                        @Override
                        public void apply(Integer threadIdx) {
                            int stopNodeIdx = idxs.get(threadIdx);

                            info("Stop node: " + stopNodeIdx);

                            stopGrid(stopNodeIdx);
                        }
                    }, stopNodes, "stop-node");

                    startedNodes.removeAll(idxs);
                }
                else {
                    int startNodes = rnd.nextInt(5) + 1;

                    log.info("Next, start nodes: " + startNodes);

                    GridTestUtils.runMultiThreaded(new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            int idx = startIdx.incrementAndGet();

                            log.info("Start node: " + idx);

                            startGrid(idx);

                            // Several starter threads append concurrently to a plain ArrayList.
                            synchronized (startedNodes) {
                                startedNodes.add(idx);
                            }

                            return null;
                        }
                    }, startNodes, "start-node");
                }

                U.sleep(rnd.nextInt(100) + 1);
            }
        }
        finally {
            // Tell the background tasks to finish even if the scenario failed.
            stop.set(true);
        }

        if (fut1 != null) {
            fut1.get();
        }

        if (fut2 != null) {
            fut2.get();
        }
    }

    /**
     * Random node/cache start and stop scenario with no ZooKeeper restarts and no client socket closing.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRandomTopologyChanges() throws Exception {
        final boolean restartZk = false;
        final boolean closeClientSock = false;

        randomTopologyChanges(restartZk, closeClientSock);
    }

    /**
     * Connects to the test ZooKeeper cluster with a dedicated client, asserts that at least one znode
     * exists under the alive-nodes directory, and then waits up to 10 seconds until the only nested
     * znodes left under the base path are alive-node entries or entries under {@code jd/}.
     *
     * @throws Exception If failed.
     */
    private void checkZkNodesCleanup() throws Exception {
        final String root = "/apacheIgnite";
        final String basePath = "/apacheIgnite/";
        final String aliveDir = "/apacheIgnite/n/";

        final ZookeeperClient zkClient =
            new ZookeeperClient(getTestResources().getLogger(), zkCluster.getConnectString(), 30000, null);

        try {
            boolean foundAlive = listSubTree(zkClient.zk(), root).stream()
                .anyMatch(path -> path.startsWith(aliveDir));

            assertTrue(foundAlive);

            GridAbsPredicate onlyExpectedLeft = new GridAbsPredicate() {
                @Override
                public boolean apply() {
                    try {
                        for (String path : listSubTree(zkClient.zk(), root)) {
                            // Alive-node entries and paths shorter than the base prefix are always fine.
                            if (path.startsWith(aliveDir) || path.length() < basePath.length()) {
                                continue;
                            }

                            String rel = path.substring(basePath.length());

                            // Only nested entries matter, and those under "jd/" are tolerated.
                            if (!rel.contains("/") || rel.startsWith("jd/")) {
                                continue;
                            }

                            log.info("Found unexpected znode: " + rel);

                            return false;
                        }

                        return true;
                    }
                    catch (Exception e) {
                        error("Unexpected error: " + e, e);

                        fail("Unexpected error: " + e);

                        return false;
                    }
                }
            };

            assertTrue("Failed to wait for unused znodes cleanup",
                GridTestUtils.waitForCondition(onlyExpectedLeft, 10000L));
        }
        finally {
            zkClient.close();
        }
    }

    /**
     * Random node/cache start and stop scenario with background ZooKeeper server restarts.
     *
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9138")
    @Test
    public void testRandomTopologyChanges_RestartZk() throws Exception {
        final boolean restartZk = true;
        final boolean closeClientSock = false;

        randomTopologyChanges(restartZk, closeClientSock);
    }

    /**
     * Random node/cache start and stop scenario with background closing of ZooKeeper client sockets.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRandomTopologyChanges_CloseClients() throws Exception {
        final boolean restartZk = false;
        final boolean closeClientSock = true;

        randomTopologyChanges(restartZk, closeClientSock);
    }

    /**
     * Starts three nodes concurrently and deploys a node-singleton service from node 0.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDeployService1() throws Exception {
        startGridsMultiThreaded(3);

        IgniteEx deployer = grid(0);

        deployer.services(deployer.cluster())
            .deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl());
    }

    /**
     * Starts node 0 with client mode off and node 1 with client mode on, then deploys a
     * node-singleton service from node 0.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDeployService2() throws Exception {
        helper.clientMode(false);
        startGrid(0);

        helper.clientMode(true);
        startGrid(1);

        IgniteEx deployer = grid(0);

        deployer.services(deployer.cluster())
            .deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl());
    }

    /**
     * Starts node 0 from a background thread with the thread-local client mode on, while the test
     * thread starts node 1 with the thread-local client mode off; once both are up, deploys a
     * node-singleton service from node 0.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDeployService3() throws Exception {
        IgniteInternalFuture<?> clientStartFut = GridTestUtils.runAsync((Callable<Object>)() -> {
            helper.clientModeThreadLocal(true);

            startGrid(0);

            return null;
        }, "start-node");

        helper.clientModeThreadLocal(false);

        startGrid(1);

        clientStartFut.get();

        IgniteEx deployer = grid(0);

        deployer.services(deployer.cluster())
            .deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl());
    }

    /**
     * First node is started with a large user attribute and the znode cleanup is verified;
     * a second node is then started without user attributes and the topology of two is awaited.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLargeUserAttribute1() throws Exception {
        // Node 0: large attribute.
        initLargeAttribute();
        startGrid(0);

        checkZkNodesCleanup();

        // Node 1: no user attributes.
        userAttrs = null;
        startGrid(1);

        helper.waitForEventsAcks(ignite(0));

        waitForTopology(2);
    }

    /**
     * First node is started as is; the large user attribute is set up only before the second node
     * starts. Event acknowledgements are awaited and the znode cleanup is verified.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLargeUserAttribute2() throws Exception {
        startGrid(0);

        // Only the second node gets the large attribute.
        initLargeAttribute();
        startGrid(1);

        helper.waitForEventsAcks(ignite(0));

        checkZkNodesCleanup();
    }

    /**
     * Starts ten nodes one by one (client mode on for indices above 5); three randomly chosen
     * indices get a large user attribute, the rest get none. Finally waits for a topology of ten.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLargeUserAttribute3() throws Exception {
        Set<Integer> largeAttrIdxs = ThreadLocalRandom.current()
            .ints(0, 10)
            .distinct()
            .limit(3)
            .boxed()
            .collect(Collectors.toSet());

        for (int i = 0; i < 10; i++) {
            info("Iteration: " + i);

            if (!largeAttrIdxs.contains(i)) {
                userAttrs = null;
            }
            else {
                initLargeAttribute();
            }

            helper.clientMode(i > 5);

            startGrid(i);
        }

        waitForTopology(10);
    }

    /**
     * Replaces {@code userAttrs} with a fresh map holding a single {@code "testAttr"} entry: an
     * {@code int} array of 1M to 1.5M elements where each element equals its own index.
     */
    private void initLargeAttribute() {
        int len = 1024 * 1024 + ThreadLocalRandom.current().nextInt(512 * 1024);

        int[] largeAttr = new int[len];

        int pos = 0;

        while (pos < len) {
            largeAttr[pos] = pos;

            pos++;
        }

        userAttrs = new HashMap<>();

        userAttrs.put("testAttr", largeAttr);
    }

    /**
     * Creates a cache whose configuration carries a large affinity function on a single node, fills
     * it, then grows the cluster to four nodes, re-creates the same cache and creates one more large
     * cache from the last node.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLargeCustomEvent() throws Exception {
        IgniteEx srv0 = startGrid(0);

        IgniteCache<Object, Object> cache = srv0.createCache(largeCacheConfiguration("c1"));

        for (int key = 0; key < 100; key++) {
            cache.put(key, key);
        }

        assertEquals(Integer.valueOf(1), cache.get(1));

        helper.waitForEventsAcks(ignite(0));

        startGridsMultiThreaded(1, 3);

        // Destroy and re-create the cache with the same large configuration.
        srv0.destroyCache("c1");

        cache = srv0.createCache(largeCacheConfiguration("c1"));

        for (int key = 0; key < 100; key++) {
            cache.put(key, key);
        }

        waitForTopology(4);

        ignite(3).createCache(largeCacheConfiguration("c2"));
    }

    /**
     * Client reconnect scenario in which the client socket is not closed.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testClientReconnectSessionExpire1_1() throws Exception {
        final boolean closeSock = false;

        clientReconnectSessionExpire(closeSock);
    }

    /**
     * Client reconnect scenario in which the client socket is closed.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testClientReconnectSessionExpire1_2() throws Exception {
        final boolean closeSock = true;

        clientReconnectSessionExpire(closeSock);
    }

    /**
     * Starts one node, then a second node in client mode with a 2 second session timeout and the
     * test socket enabled, writes a value through the client, forces the client through a
     * disconnect/reconnect cycle and checks that the value is still readable and compute still works.
     *
     * @param closeSock Passed through to {@code reconnectClientNodes}: whether the client socket is closed.
     * @throws Exception If failed.
     */
    private void clientReconnectSessionExpire(boolean closeSock) throws Exception {
        startGrid(0);

        // Settings below apply to the client node only.
        sesTimeout = 2000L;
        helper.clientMode(true);
        testSockNio = true;

        IgniteEx client = startGrid(1);

        client.cache("default").put(1, 1);

        reconnectClientNodes(log, Collections.<Ignite>singletonList(client), closeSock);

        // The cache is looked up again after the reconnect rather than reusing the earlier reference.
        assertEquals(Integer.valueOf(1), client.cache("default").get(1));

        client.compute().broadcast(new ZookeeperDiscoverySpiTestHelper.DummyCallable(null));
    }

    /**
     * Starts three nodes plus one node in client mode, calls {@code clientReconnect()} on that
     * node's discovery SPI, waits for its disconnect and reconnect events and then for a topology of four.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testForceClientReconnect() throws Exception {
        final int SRVS = 3;

        startGrids(SRVS);

        helper.clientMode(true);

        startGrid(SRVS);

        reconnectClientNodes(Collections.<Ignite>singletonList(ignite(SRVS)), () -> {
            ZookeeperDiscoverySpi clientSpi = helper.waitSpi(getTestIgniteInstanceName(SRVS), spis);

            clientSpi.clientReconnect();

            return null;
        });

        waitForTopology(SRVS + 1);
    }

    /**
     * Starts three nodes plus one node in client mode, asks node 0's discovery SPI to fail the
     * client-mode node, waits for that node's disconnect and reconnect events and then for a topology of four.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testForcibleClientFail() throws Exception {
        final int SRVS = 3;

        startGrids(SRVS);

        helper.clientMode(true);

        startGrid(SRVS);

        reconnectClientNodes(Collections.<Ignite>singletonList(ignite(SRVS)), () -> {
            ZookeeperDiscoverySpi srvSpi = helper.waitSpi(getTestIgniteInstanceName(0), spis);

            srvSpi.failNode(ignite(SRVS).cluster().localNode().id(), "Test forcible node fail");

            return null;
        });

        waitForTopology(SRVS + 1);
    }

    /**
     * With indexing disabled, starts a node with a random ID and then twice tries to start another
     * node reusing the ID of the most recently started node, expecting a failure each time, followed
     * by a successful start of the same node index with no explicit ID.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDuplicatedNodeId() throws Exception {
        indexingDisabled = true;

        UUID takenId = UUID.randomUUID();

        nodeId = takenId;

        startGrid(0);

        for (int iter = 0; iter < 2; iter++) {
            final int idx = 100 + iter;

            // Attempt with an ID that is already in use.
            nodeId = takenId;

            info("Start node with duplicated ID [iter=" + iter + ", nodeId=" + nodeId + ']');

            GridTestUtils.assertThrowsAnyCause(
                log,
                () -> startGrid(idx),
                IgniteSpiException.class,
                "Node with the same ID already exists");

            // Same node index again, this time without an explicit ID.
            nodeId = null;

            info("Start node with unique ID [iter=" + iter + ']');

            IgniteEx started = startGrid(idx);

            takenId = started.cluster().localNode().id();

            waitForTopology(iter + 2);
        }
    }

    /**
     * Pings node 2 through node 1's discovery SPI from 32 threads in three phases: while the target
     * is alive (ping must succeed), while the target is being stopped (result ignored), and after
     * it has been stopped (ping must fail).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testPing() throws Exception {
        sesTimeout = 5000L;

        startGrids(3);

        final ZookeeperDiscoverySpi spi = helper.waitSpi(getTestIgniteInstanceName(1), spis);
        final UUID targetId = ignite(2).cluster().localNode().id();

        // Phase 1: target is alive.
        GridTestUtils.runMultiThreadedAsync(
            (Runnable)() -> assertTrue(spi.pingNode(targetId)), 32, "ping").get();

        // Phase 2: pings race with the target being stopped, so the outcome is not asserted.
        IgniteInternalFuture<?> racingPings = GridTestUtils.runMultiThreadedAsync(
            (Runnable)() -> spi.pingNode(targetId), 32, "ping");

        U.sleep(100L);

        stopGrid(2);

        racingPings.get();

        // Phase 3: target is gone.
        GridTestUtils.runMultiThreadedAsync(
            (Runnable)() -> assertFalse(spi.pingNode(targetId)), 32, "ping").get();
    }

    /**
     * Persistence scenario with the {@code dfltConsistenId} flag off.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testWithPersistence1() throws Exception {
        final boolean dfltConsistentId = false;

        startWithPersistence(dfltConsistentId);
    }

    /**
     * Persistence scenario with the {@code dfltConsistenId} flag on.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testWithPersistence2() throws Exception {
        final boolean dfltConsistentId = true;

        startWithPersistence(dfltConsistentId);
    }

    /**
     * Forces the given clients through a disconnect/reconnect cycle and waits for both events on
     * every client. The cycle is triggered either by closing the clients' test sockets or by opening
     * and closing extra ZooKeeper connections that reuse each client's session ID and password.
     *
     * @param log Logger.
     * @param clients Client nodes to reconnect.
     * @param closeSock If {@code true}, the client sockets are closed; otherwise extra ZooKeeper
     *      connections sharing the clients' sessions are opened and closed.
     * @throws Exception If failed.
     */
    private static void reconnectClientNodes(final IgniteLogger log, List<Ignite> clients, boolean closeSock) throws Exception {
        final CountDownLatch disconnectLatch = new CountDownLatch(clients.size());
        final CountDownLatch reconnectLatch = new CountDownLatch(clients.size());

        IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() {
            @Override
            public boolean apply(Event evt) {
                int type = evt.type();

                if (type == EventType.EVT_CLIENT_NODE_DISCONNECTED) {
                    log.info("Disconnected: " + evt);

                    disconnectLatch.countDown();
                }
                else if (type == EventType.EVT_CLIENT_NODE_RECONNECTED) {
                    log.info("Reconnected: " + evt);

                    reconnectLatch.countDown();
                }

                return true;
            }
        };

        List<String> zkNodes = new ArrayList<>();

        for (Ignite client : clients) {
            client.events().localListen(lsnr,
                EventType.EVT_CLIENT_NODE_DISCONNECTED, EventType.EVT_CLIENT_NODE_RECONNECTED);

            zkNodes.add(ZookeeperDiscoverySpiTestHelper.aliveZkNodePath(client));
        }

        long timeout = 15000L;

        if (closeSock) {
            for (Ignite client : clients) {
                ZookeeperDiscoverySpi spi = (ZookeeperDiscoverySpi)client.configuration().getDiscoverySpi();

                ZkTestClientCnxnSocketNIO.forNode(client.name()).closeSocket(true);

                // Wait at least one and a half session timeouts.
                timeout = Math.max(timeout, (long)(spi.getSessionTimeout() * 1.5f));
            }
        }
        else {
            List<ZooKeeper> dummyClients = new ArrayList<>();

            for (Ignite client : clients) {
                ZookeeperDiscoverySpi spi = (ZookeeperDiscoverySpi)client.configuration().getDiscoverySpi();

                ZooKeeper zk = ZookeeperDiscoverySpiTestHelper.zkClient(spi);

                // Try the servers in turn; the first successful connection is enough for this client.
                for (String addr : spi.getZkConnectionString().split(",")) {
                    try {
                        ZooKeeper dummyZk = new ZooKeeper(addr, 10000, null, zk.getSessionId(), zk.getSessionPasswd());

                        dummyZk.exists("/a", false);

                        dummyClients.add(dummyZk);

                        break;
                    }
                    catch (Exception e) {
                        log.warning("Can't connect to server " + addr + " [err=" + e + ']');
                    }
                }
            }

            for (ZooKeeper dummyZk : dummyClients) {
                dummyZk.close();
            }
        }

        String connStr = ((ZookeeperDiscoverySpi)clients.get(0).configuration().getDiscoverySpi()).getZkConnectionString();

        ZookeeperDiscoverySpiTestHelper.waitNoAliveZkNodes(log, connStr, zkNodes, timeout);

        if (closeSock) {
            for (Ignite client : clients) {
                ZkTestClientCnxnSocketNIO.forNode(client.name()).allowConnect();
            }
        }

        ZookeeperDiscoverySpiTestHelper.waitReconnectEvent(log, disconnectLatch);
        ZookeeperDiscoverySpiTestHelper.waitReconnectEvent(log, reconnectLatch);

        for (Ignite client : clients) {
            client.events().stopLocalListen(lsnr);
        }
    }

    /**
     * Lists all znodes under {@code root}, retrying up to 30 times when ZooKeeper reports
     * {@code NoNodeException} during the traversal.
     *
     * @param zk ZooKeeper client.
     * @param root Root path of the subtree to list.
     * @return Paths returned by {@code ZKUtil.listSubTreeBFS} for the given root.
     * @throws Exception If all attempts failed (the last {@code NoNodeException} is attached as the
     *      cause) or if listing failed with any other error.
     */
    private List<String> listSubTree(ZooKeeper zk, String root) throws Exception {
        KeeperException.NoNodeException lastErr = null;

        for (int attempt = 0; attempt < 30; ++attempt) {
            try {
                return ZKUtil.listSubTreeBFS(zk, root);
            }
            catch (KeeperException.NoNodeException e) {
                info("NoNodeException when get znodes, will retry: " + e);

                lastErr = e;
            }
        }

        // Keep the last failure as the cause so the final error is diagnosable.
        throw new Exception("Failed to get znodes: " + root, lastErr);
    }

    /**
     * Creates a {@code FULL_SYNC} cache configuration whose affinity function carries a
     * 1M-element {@code int} array.
     *
     * @param cacheName Cache name.
     * @return Cache configuration.
     */
    private CacheConfiguration<Object, Object> largeCacheConfiguration(String cacheName) {
        CacheConfiguration<Object, Object> largeCfg = new CacheConfiguration<>(cacheName);

        largeCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
        largeCfg.setAffinity(new TestAffinityFunction(1024 * 1024));

        return largeCfg;
    }

    /**
     * Registers disconnect/reconnect listeners on the given clients, runs the supplied action and
     * waits until every client has reported both events; the listeners are then removed.
     *
     * @param clients Client nodes expected to reconnect.
     * @param c Action that triggers the reconnect.
     * @throws Exception If failed.
     */
    private void reconnectClientNodes(List<Ignite> clients, Callable<Void> c) throws Exception {
        final CountDownLatch disconnectLatch = new CountDownLatch(clients.size());
        final CountDownLatch reconnectLatch = new CountDownLatch(clients.size());

        IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() {
            @Override
            public boolean apply(Event evt) {
                int type = evt.type();

                if (type == EventType.EVT_CLIENT_NODE_DISCONNECTED) {
                    log.info("Disconnected: " + evt);

                    disconnectLatch.countDown();
                }
                else if (type == EventType.EVT_CLIENT_NODE_RECONNECTED) {
                    log.info("Reconnected: " + evt);

                    reconnectLatch.countDown();
                }

                return true;
            }
        };

        for (Ignite client : clients) {
            client.events().localListen(lsnr,
                EventType.EVT_CLIENT_NODE_DISCONNECTED, EventType.EVT_CLIENT_NODE_RECONNECTED);
        }

        c.call();

        ZookeeperDiscoverySpiTestHelper.waitReconnectEvent(log, disconnectLatch);
        ZookeeperDiscoverySpiTestHelper.waitReconnectEvent(log, reconnectLatch);

        for (Ignite client : clients) {
            client.events().stopLocalListen(lsnr);
        }
    }

    /**
     * Until a scaled deadline passes, performs one random action per iteration from the test thread:
     * with probability 1/10 (once a node exists) a cache is created or destroyed on a random node,
     * otherwise a node is started or stopped. At most {@code MAX_NODES} nodes and {@code MAX_CACHES}
     * caches are tracked, and the last node and the last cache are never removed.
     *
     * @param restartZk If {@code true}, a background task restarts ZooKeeper servers meanwhile.
     * @param closeClientSock If {@code true}, a background task closes ZooKeeper client sockets meanwhile.
     * @throws Exception If failed.
     */
    private void randomTopologyChanges(boolean restartZk, boolean closeClientSock) throws Exception {
        sesTimeout = 30000L;

        // Has to be set before startCloseZkClientSocket() is called: that method asserts on the flag.
        if (closeClientSock) {
            testSockNio = true;
        }

        List<Integer> startedNodes = new ArrayList<>();
        List<String> startedCaches = new ArrayList<>();

        int nextNodeIdx = 0;
        int nextCacheIdx = 0;

        long stopTime = System.currentTimeMillis() + (long)GridTestUtils.SF.applyLB(30000, 5000);

        final int MAX_NODES = 20;
        final int MAX_CACHES = 10;

        AtomicBoolean stop = new AtomicBoolean();

        IgniteInternalFuture<?> fut1 = restartZk ? startRestartZkServers(stopTime, stop) : null;
        IgniteInternalFuture<?> fut2 = closeClientSock ? startCloseZkClientSocket(stopTime, stop) : null;

        try {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            while (System.currentTimeMillis() < stopTime) {
                if (!startedNodes.isEmpty() && rnd.nextInt(10) == 0) {
                    boolean startCache = startedCaches.size() < 2 ||
                        (startedCaches.size() < MAX_CACHES && rnd.nextInt(5) != 0);

                    int nodeIdx = startedNodes.get(rnd.nextInt(startedNodes.size()));

                    if (startCache) {
                        String cacheName = "cache-" + nextCacheIdx++;

                        log.info("Next, start new cache [cacheName=" + cacheName +
                            ", node=" + nodeIdx +
                            ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) +
                            ", curCaches=" + startedCaches.size() + ']');

                        ignite(nodeIdx).createCache(new CacheConfiguration<>(cacheName));

                        startedCaches.add(cacheName);
                    }
                    else if (startedCaches.size() > 1) {
                        String cacheName = startedCaches.get(rnd.nextInt(startedCaches.size()));

                        // Log the actual cache name; the cache count goes under its own label.
                        log.info("Next, stop cache [cacheName=" + cacheName +
                            ", node=" + nodeIdx +
                            ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) +
                            ", curCaches=" + startedCaches.size() + ']');

                        ignite(nodeIdx).destroyCache(cacheName);

                        assertTrue(startedCaches.remove(cacheName));
                    }
                }
                else {
                    boolean startNode = startedNodes.size() < 2 ||
                        (startedNodes.size() < MAX_NODES && rnd.nextInt(5) != 0);

                    if (startNode) {
                        int nodeIdx = nextNodeIdx++;

                        log.info("Next, start new node [nodeIdx=" + nodeIdx +
                            ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) +
                            ", curNodes=" + startedNodes.size() + ']');

                        startGrid(nodeIdx);

                        assertTrue(startedNodes.add(nodeIdx));
                    }
                    else if (startedNodes.size() > 1) {
                        int nodeIdx = startedNodes.get(rnd.nextInt(startedNodes.size()));

                        log.info("Next, stop [nodeIdx=" + nodeIdx +
                            ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) +
                            ", curNodes=" + startedNodes.size() + ']');

                        stopGrid(nodeIdx);

                        // Remove by value, not by list position.
                        assertTrue(startedNodes.remove(Integer.valueOf(nodeIdx)));
                    }
                }

                U.sleep(rnd.nextInt(100) + 1);
            }
        }
        finally {
            // Tell the background tasks to finish even if the scenario failed.
            stop.set(true);
        }

        if (fut1 != null) {
            fut1.get();
        }

        if (fut2 != null) {
            fut2.get();
        }
    }

    /**
     * Starts a background task that, until {@code stop} is set or {@code stopTime} passes, sleeps
     * for a random time below 2.5 seconds, restarts one of the first three ZooKeeper servers chosen
     * at random and waits for the cluster to become ready again.
     *
     * @param stopTime Deadline as a {@code System.currentTimeMillis()} value.
     * @param stop Flag that ends the task early.
     * @return Future of the background task.
     */
    private IgniteInternalFuture<?> startRestartZkServers(final long stopTime, final AtomicBoolean stop) {
        Callable<Void> restartLoop = () -> {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            while (!stop.get() && System.currentTimeMillis() < stopTime) {
                U.sleep(rnd.nextLong(2500L));

                int idx = rnd.nextInt(3);

                log.info("Restart ZK server: " + idx);

                TestingZooKeeperServer srv = (TestingZooKeeperServer)zkCluster.getServers().get(idx);

                srv.restart();

                waitForZkClusterReady(zkCluster);
            }

            return null;
        };

        return GridTestUtils.runAsync(restartLoop, "zk-restart-thread");
    }

    /**
     * Starts a background task that, until {@code stop} is set or {@code stopTime} passes, sleeps
     * 50..149 ms, picks a random started node and closes its test ZooKeeper client socket when one is
     * registered for that node. Failures to close the socket are logged and otherwise ignored.
     * Requires {@code testSockNio} to be set.
     *
     * @param stopTime Deadline as a {@code System.currentTimeMillis()} value.
     * @param stop Flag that ends the task early.
     * @return Future of the background task.
     */
    private IgniteInternalFuture<?> startCloseZkClientSocket(final long stopTime, final AtomicBoolean stop) {
        assert testSockNio;

        Callable<Void> closeLoop = () -> {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            while (!stop.get() && System.currentTimeMillis() < stopTime) {
                U.sleep(rnd.nextLong(100L) + 50L);

                List<Ignite> nodes = G.allGrids();

                if (nodes.isEmpty()) {
                    continue;
                }

                Ignite node = nodes.get(rnd.nextInt(nodes.size()));

                ZkTestClientCnxnSocketNIO nio = ZkTestClientCnxnSocketNIO.forNode(node);

                if (nio == null) {
                    continue;
                }

                info("Close zk client socket for node: " + node.name());

                try {
                    nio.closeSocket(false);
                }
                catch (Exception e) {
                    // Best effort: a failed close is only reported.
                    info("Failed to close zk client socket for node: " + node.name());
                }
            }

            return null;
        };

        return GridTestUtils.runAsync(closeLoop, "zk-restart-thread");
    }

    /**
     * Runs three iterations with persistence enabled: four nodes are started with client mode off,
     * three more with client mode on, then nodes 1, 4 and 0 are stopped one after another (waiting
     * for the expected topology size each time), event consistency is checked and everything is stopped.
     *
     * @param dfltConsistenId Value stored in the {@code dfltConsistenId} field before nodes start.
     * @throws Exception If failed.
     */
    private void startWithPersistence(boolean dfltConsistenId) throws Exception {
        this.dfltConsistenId = dfltConsistenId;

        persistence = true;

        for (int iter = 0; iter < 3; iter++) {
            info("Iteration: " + iter);

            helper.clientMode(false);

            // The boolean flag is true on the first iteration only.
            startGridsMultiThreaded(4, iter == 0);

            helper.clientMode(true);

            startGridsMultiThreaded(4, 3);

            waitForTopology(7);

            stopGrid(1);
            waitForTopology(6);

            stopGrid(4);
            waitForTopology(5);

            stopGrid(0);
            waitForTopology(4);

            checkEventsConsistency();

            stopAllGrids();

            evts.clear();
        }
    }

    /**
     * Rendezvous affinity function that additionally holds an {@code int} array of the requested
     * size, filled so that each element equals its own index.
     */
    private static class TestAffinityFunction
    extends RendezvousAffinityFunction {
        /** Serial version UID. */
        private static final long serialVersionUID = 0L;

        /** Payload array; it is only written by the constructor. */
        private int[] dummyData;

        /**
         * @param dataSize Number of elements in the payload array.
         */
        TestAffinityFunction(int dataSize) {
            int[] data = new int[dataSize];

            int pos = 0;

            while (pos < dataSize) {
                data[pos] = pos;

                pos++;
            }

            this.dummyData = data;
        }
    }
}

