/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.database.txdr;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteState;
import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.gridgain.grid.internal.processors.cache.database.txdr.AbstractReplicationTest;
import org.gridgain.grid.internal.processors.cache.database.txdr.ConsistentCut;
import org.gridgain.grid.internal.processors.cache.database.txdr.ConsistentCutStore;
import org.gridgain.grid.internal.txdr.ClusterRole;
import org.gridgain.grid.internal.txdr.ReplicationSessionDescriptor;
import org.gridgain.grid.internal.txdr.ReplicationState;
import org.gridgain.grid.internal.txdr.TransactionalDrMaster;
import org.junit.Test;

public class TxDrBaselineTopologyScenarios
extends AbstractReplicationTest {
    private static final long WAIT_TIMEOUT = 60000L;
    protected List<IgniteEx> masterCluster;
    protected List<IgniteEx> replicaCluster;
    protected long uploadTime;
    protected boolean useRecordingCommSpi;
    protected int rebalanceBatchSize;
    protected long rebalanceThrottle;

    public TxDrBaselineTopologyScenarios() {
        this.uploadTime = this.consistentCutInterval * 3L;
        this.rebalanceBatchSize = 524288;
        this.rebalanceThrottle = 0L;
    }

    @Override
    protected IgniteConfiguration getConfiguration(String igniteInstanceName, String consistentId, ClusterRole role) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName, consistentId, role);
        cfg.setFailureDetectionTimeout(3600000L);
        if (ClusterRole.MASTER == role) {
            for (CacheConfiguration ccfg : cfg.getCacheConfiguration()) {
                ccfg.setRebalanceBatchSize(this.rebalanceBatchSize);
                ccfg.setRebalanceThrottle(this.rebalanceThrottle);
            }
        }
        if (this.useRecordingCommSpi) {
            TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
            commSpi.setLocalPort(GridTestUtils.getNextCommPort(((Object)((Object)this)).getClass()));
            cfg.setCommunicationSpi((CommunicationSpi)commSpi);
        }
        cfg.setFailureHandler((FailureHandler)new StopNodeFailureHandler());
        return cfg;
    }

    @Override
    protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();
        System.setProperty("CONSISTENT_CUT_GC_DISABLED", "true");
    }

    @Override
    protected void afterTestsStopped() throws Exception {
        super.afterTestsStopped();
        System.clearProperty("CONSISTENT_CUT_GC_DISABLED");
    }

    @Override
    protected void beforeTest() throws Exception {
        super.beforeTest();
        this.useRecordingCommSpi = false;
        this.consistentCutInterval = 2000L;
        this.nodesCnt = 5;
        this.backupsCnt = 2;
        this.clientsCnt = 0;
        this.rebalanceBatchSize = 524288;
        this.rebalanceThrottle = 0L;
    }

    @Override
    protected void afterTest() throws Exception {
        super.afterTest();
        this.masterCluster = null;
        this.replicaCluster = null;
    }

    @Test
    public void testAddNodeJoinedBeforeBootstrap() throws Exception {
        this.addNodesToBaseline(true, 1);
    }

    @Test
    public void testAddNodesJoinedBeforeBootstrap() throws Exception {
        this.addNodesToBaseline(true, 3);
    }

    @Test
    public void testAddNodeJoinedAfterBootstrap() throws Exception {
        this.addNodesToBaseline(false, 1);
    }

    @Test
    public void testAddNodesJoinedAfterBootstrap() throws Exception {
        this.addNodesToBaseline(false, 3);
    }

    @Test
    public void testRemoveNodeLeftBeforeBootstrap() throws Exception {
        this.removeNodeFromBaseline(true, 1, false);
    }

    @Test
    public void testRemoveCrdNodeLeftBeforeBootstrap() throws Exception {
        this.removeNodeFromBaseline(true, 1, true);
    }

    @Test
    public void testRemoveNodesLeftBeforeBootstrap() throws Exception {
        this.removeNodeFromBaseline(true, this.backupsCnt, false);
    }

    @Test
    public void testRemoveNodeLeftAfterBootstrap() throws Exception {
        this.removeNodeFromBaseline(false, 1, false);
    }

    @Test
    public void testRemoveCrdNodeLeftAfterBootstrap() throws Exception {
        this.removeNodeFromBaseline(false, 1, true);
    }

    @Test
    public void testRemoveNodesLeftAfterBootstrap() throws Exception {
        this.removeNodeFromBaseline(false, this.backupsCnt, false);
    }

    @Test
    public void testDeanOnMasterAliveOnReplica() throws Exception {
        this.masterCluster = this.startCluster(ClusterRole.MASTER);
        IgniteEx master = this.masterCluster.get(0);
        master.cluster().baselineAutoAdjustEnabled(false);
        master.cluster().active(true);
        this.populateData((Ignite)master, "txCache");
        this.populateData((Ignite)master, "atomicCache");
        this.replicaCluster = this.startCluster(ClusterRole.REPLICA);
        IgniteEx replica = this.replicaCluster.get(0);
        replica.cluster().active(true);
        long sesId = (Long)TxDrBaselineTopologyScenarios.bootstrapMaster((TransactionalDrMaster)this.txdr((Ignite)master), this.snapshotFolder()).get();
        this.txdr((Ignite)replica).bootstrap(this.snapshotFolder(), sesId).get();
        this.stopClusterNodes(ClusterRole.MASTER, 1, false);
        IgniteInternalFuture txLoadFut = this.startTxLoad(4, ClusterRole.MASTER, (Ignite)master);
        IgniteInternalFuture atomicLoadFut = this.startAtomicLoad(4, ClusterRole.MASTER, (Ignite)master);
        log.info("Load started");
        U.sleep((long)this.uploadTime);
        master.cluster().setBaselineTopology(master.cluster().forServers().nodes());
        log.info("Baseline topology changed on MASTER cluster");
        this.assertClusterState(this.masterCluster, ClusterRole.MASTER, ReplicationState.RUNNING, sesId);
        this.waitForBltCutCreated(this.masterCluster, -1L, 60000L);
        U.sleep((long)this.uploadTime);
        this.stopTxLoad(txLoadFut);
        this.stopAtomicLoad(atomicLoadFut);
        log.info("Load stopped.");
        this.txdr((Ignite)master).stop().get();
        long lastCreatedCutId = this.txdr((Ignite)master).localState().lastCreatedCutId();
        log.info("MASTER cluster replication stopped. [lastCreatedCutId=" + lastCreatedCutId + ']');
        this.assertClusterState(this.replicaCluster, ClusterRole.REPLICA, ReplicationState.RUNNING, sesId);
        boolean awaited = GridTestUtils.waitForCondition(() -> {
            for (IgniteEx node : this.replicaCluster) {
                if (this.txdr((Ignite)node).localState().lastSuccessfullyAppliedCutId() != lastCreatedCutId) continue;
                return true;
            }
            return false;
        }, (long)(this.consistentCutInterval * 5L));
        TxDrBaselineTopologyScenarios.assertFalse((boolean)awaited);
        TxDrBaselineTopologyScenarios.assertFalse((boolean)this.idleVerifyReplica(replica));
        this.txdr((Ignite)replica).pause().get();
        this.awaitPartitionMapExchange(false, false, replica.context().discovery().aliveServerNodes());
        TxDrBaselineTopologyScenarios.assertTrue((boolean)this.idleVerifyReplica(replica));
        this.txdr((Ignite)replica).stop().get();
        log.info("REPLICA cluster replication stopped.");
    }

    @Test
    public void testBaselineChangeInMixedWay() throws Exception {
        this.masterCluster = this.startCluster(ClusterRole.MASTER);
        IgniteEx master = this.masterCluster.get(1);
        master.cluster().baselineAutoAdjustEnabled(false);
        master.cluster().active(true);
        this.populateData((Ignite)master, "txCache");
        this.populateData((Ignite)master, "atomicCache");
        this.replicaCluster = this.startCluster(ClusterRole.REPLICA);
        IgniteEx replica = this.replicaCluster.get(1);
        replica.cluster().active(true);
        long sesId = (Long)TxDrBaselineTopologyScenarios.bootstrapMaster((TransactionalDrMaster)this.txdr((Ignite)master), this.snapshotFolder()).get();
        this.txdr((Ignite)replica).bootstrap(this.snapshotFolder(), sesId).get();
        IgniteInternalFuture txLoadFut = this.startTxLoad(4, ClusterRole.MASTER, (Ignite)master);
        IgniteInternalFuture atomicLoadFut = this.startAtomicLoad(4, ClusterRole.MASTER, (Ignite)master);
        log.info("Load started");
        U.sleep((long)this.uploadTime);
        this.stopClusterNodes(ClusterRole.MASTER, 1, true);
        master.cluster().setBaselineTopology(master.cluster().forServers().nodes());
        long bltCutId = this.waitForBltCutCreated(this.masterCluster, -1L, 60000L);
        U.sleep((long)this.uploadTime);
        this.startClusterNodes(ClusterRole.MASTER, 1);
        this.assertClusterState(this.masterCluster, ClusterRole.MASTER, ReplicationState.RUNNING, sesId);
        master.cluster().setBaselineTopology(master.cluster().forServers().nodes());
        this.waitForBltCutCreated(this.masterCluster, bltCutId, 60000L);
        U.sleep((long)this.uploadTime);
        this.stopTxLoad(txLoadFut);
        this.stopAtomicLoad(atomicLoadFut);
        log.info("Load stopped.");
        this.txdr((Ignite)master).stop().get();
        long lastCreatedCutId = this.txdr((Ignite)master).localState().lastCreatedCutId();
        log.info("MASTER cluster replication stopped. [lastCreatedCutId=" + lastCreatedCutId + ']');
        U.sleep((long)this.uploadTime);
        this.stopClusterNodes(ClusterRole.REPLICA, 1, true);
        this.assertClusterState(this.replicaCluster, ClusterRole.REPLICA, ReplicationState.RUNNING, sesId);
        this.startClusterNodes(ClusterRole.REPLICA, 1);
        this.waitForApplyingCut(this.replicaCluster, lastCreatedCutId, 60000L);
        this.awaitPartitionMapExchange(false, false, replica.context().discovery().aliveServerNodes());
        this.checkCacheDumps((Ignite)master, (Ignite)replica, 60000L);
        this.txdr((Ignite)replica).stop().get();
        log.info("REPLICA cluster replication stopped.");
    }

    @Test
    public void testFailoverOnMaster() throws Exception {
        this.useRecordingCommSpi = true;
        this.rebalanceBatchSize = 16;
        this.rebalanceThrottle = 500L;
        boolean rmvCrd = false;
        int remNodesCntr = 1;
        this.masterCluster = this.startCluster(ClusterRole.MASTER);
        int initiatorId = 0;
        IgniteEx master = this.masterCluster.get(initiatorId);
        master.cluster().baselineAutoAdjustEnabled(false);
        master.cluster().active(true);
        this.populateData((Ignite)master, "txCache");
        this.populateData((Ignite)master, "atomicCache");
        this.replicaCluster = this.startCluster(ClusterRole.REPLICA);
        IgniteEx replica = this.replicaCluster.get(initiatorId);
        replica.cluster().active(true);
        this.stopClusterNodes(ClusterRole.MASTER, remNodesCntr, rmvCrd);
        this.stopClusterNodes(ClusterRole.REPLICA, remNodesCntr, rmvCrd);
        long sesId = (Long)TxDrBaselineTopologyScenarios.bootstrapMaster((TransactionalDrMaster)this.txdr((Ignite)master), this.snapshotFolder()).get();
        this.assertClusterState(this.masterCluster, ClusterRole.MASTER, ReplicationState.RUNNING, sesId);
        this.txdr((Ignite)replica).bootstrap(this.snapshotFolder(), sesId).get();
        IgniteInternalFuture txLoadFut = this.startTxLoad(4, ClusterRole.MASTER, (Ignite)master);
        IgniteInternalFuture atomicLoadFut = this.startAtomicLoad(4, ClusterRole.MASTER, (Ignite)master);
        log.info("Load started");
        U.sleep((long)this.uploadTime);
        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi((Ignite)master);
        spi.blockMessages((IgniteBiPredicate & Serializable)(node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
        master.cluster().setBaselineTopology(master.cluster().forServers().nodes());
        log.info(">>>>> Baseline topology changed on MASTER cluster");
        spi.waitForBlocked();
        spi.stopBlock();
        this.stopClusterNodes(ClusterRole.MASTER, remNodesCntr, rmvCrd);
        this.stopTxLoad(txLoadFut);
        this.stopAtomicLoad(atomicLoadFut);
        this.txdr((Ignite)master).stop().get();
        long lastCreatedCutId = this.txdr((Ignite)master).localState().lastCreatedCutId();
        log.info("MASTER cluster replication stopped. [lastCreatedCutId=" + lastCreatedCutId + ']');
        this.waitForApplyingCut(this.replicaCluster, lastCreatedCutId, 60000L);
        this.awaitPartitionMapExchange(false, false, replica.context().discovery().aliveServerNodes());
        this.checkCacheDumps((Ignite)master, (Ignite)replica, 60000L);
        TxDrBaselineTopologyScenarios.assertTrue((boolean)this.idleVerifyReplica(replica));
        this.txdr((Ignite)replica).stop().get();
        log.info("REPLICA cluster replication stopped.");
    }

    @Test
    public void testNodeFailureWhenBltChangeIsInProgress() throws Exception {
        this.nodesCnt = 2;
        this.backupsCnt = 1;
        this.clientsCnt = 0;
        this.useRecordingCommSpi = true;
        this.consistentCutInterval = Long.MAX_VALUE;
        this.masterCluster = this.startCluster(ClusterRole.MASTER);
        IgniteEx master = this.masterCluster.get(0);
        master.cluster().baselineAutoAdjustEnabled(false);
        master.cluster().active(true);
        this.populateData((Ignite)master, "txCache");
        this.populateData((Ignite)master, "atomicCache");
        TxDrBaselineTopologyScenarios.bootstrapMaster((TransactionalDrMaster)this.txdr((Ignite)master), this.snapshotFolder()).get();
        this.stopClusterNode(ClusterRole.MASTER, 1);
        master.cluster().setBaselineTopology(master.cluster().forServers().nodes());
        IgniteEx restartedNode = this.startClusterNode(ClusterRole.MASTER, 1);
        this.awaitPartitionMapExchange(false, false, master.context().discovery().aliveServerNodes());
        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi((Ignite)restartedNode);
        spi.blockMessages((IgniteBiPredicate & Serializable)(node, msg) -> msg instanceof GridDhtPartitionsSingleMessage && ((GridDhtPartitionsSingleMessage)msg).exchangeId() != null);
        GridTestUtils.runAsync(() -> master.cluster().setBaselineTopology(master.cluster().forServers().nodes()));
        spi.waitForBlocked();
        this.stopClusterNode(ClusterRole.MASTER, 1);
        TxDrBaselineTopologyScenarios.assertFalse((String)"The kernel context must be in valid state", (boolean)master.context().invalid());
        TxDrBaselineTopologyScenarios.assertTrue((IgnitionEx.state((String)master.context().igniteInstanceName()) == IgniteState.STARTED ? 1 : 0) != 0);
        TxDrBaselineTopologyScenarios.assertNotNull((Object)TestRecordingCommunicationSpi.spi((Ignite)master).ignite());
        this.txdr((Ignite)master).stop().get();
    }

    @Test
    public void testReturnNodeToBLTAfterRemoving() throws Exception {
        this.masterCluster = this.startCluster(ClusterRole.MASTER);
        int nodeIdx = this.nodesCnt - 1;
        IgniteEx master = this.masterCluster.get(0);
        master.cluster().baselineAutoAdjustEnabled(false);
        master.cluster().active(true);
        this.populateData((Ignite)master, "txCache");
        this.populateData((Ignite)master, "atomicCache");
        this.replicaCluster = this.startCluster(ClusterRole.REPLICA);
        IgniteEx replica = this.replicaCluster.get(0);
        replica.cluster().active(true);
        long sesId = (Long)TxDrBaselineTopologyScenarios.bootstrapMaster((TransactionalDrMaster)this.txdr((Ignite)master), this.snapshotFolder()).get();
        this.txdr((Ignite)replica).bootstrap(this.snapshotFolder(), sesId).get();
        IgniteInternalFuture txLoadFut = this.startTxLoad(4, ClusterRole.MASTER, (Ignite)master);
        IgniteInternalFuture atomicLoadFut = this.startAtomicLoad(4, ClusterRole.MASTER, (Ignite)master);
        log.info("Load started");
        U.sleep((long)this.uploadTime);
        this.stopClusterNode(ClusterRole.MASTER, nodeIdx);
        this.stopClusterNode(ClusterRole.REPLICA, nodeIdx);
        master.cluster().setBaselineTopology(master.cluster().forServers().nodes());
        long bltCutId = this.waitForBltCutCreated(this.masterCluster, -1L, 60000L);
        this.waitForApplyingCut(this.replicaCluster, bltCutId, 60000L);
        this.startClusterNode(ClusterRole.MASTER, nodeIdx);
        master.cluster().setBaselineTopology(master.cluster().forServers().nodes());
        this.waitForBltCutCreated(this.masterCluster, bltCutId, 60000L);
        this.startClusterNode(ClusterRole.REPLICA, nodeIdx);
        U.sleep((long)this.uploadTime);
        this.stopTxLoad(txLoadFut);
        this.stopAtomicLoad(atomicLoadFut);
        log.info("Load stopped.");
        this.txdr((Ignite)master).stop().get();
        long lastCreatedCutId = this.txdr((Ignite)master).localState().lastCreatedCutId();
        log.info("MASTER cluster replication stopped. [lastCreatedCutId=" + lastCreatedCutId + ']');
        this.waitForApplyingCut(this.replicaCluster, lastCreatedCutId, 60000L);
        this.awaitPartitionMapExchange(false, false, replica.context().discovery().aliveServerNodes());
        this.checkCacheDumps((Ignite)master, (Ignite)replica, 60000L);
        TxDrBaselineTopologyScenarios.assertTrue((boolean)this.idleVerifyReplica(replica));
        this.txdr((Ignite)replica).stop().get();
        log.info("REPLICA cluster replication stopped.");
    }

    private void addNodesToBaseline(boolean beforeBootstrap, int newNodesCntr) throws Exception {
        this.masterCluster = this.startCluster(ClusterRole.MASTER);
        IgniteEx master = this.masterCluster.get(0);
        master.cluster().baselineAutoAdjustEnabled(false);
        master.cluster().active(true);
        this.populateData((Ignite)master, "txCache");
        this.populateData((Ignite)master, "atomicCache");
        this.replicaCluster = this.startCluster(ClusterRole.REPLICA);
        IgniteEx replica = this.replicaCluster.get(0);
        replica.cluster().active(true);
        this.assertClusterState(this.masterCluster, ClusterRole.DISABLED, ReplicationState.STOPPED, 0L);
        this.assertClusterState(this.replicaCluster, ClusterRole.DISABLED, ReplicationState.STOPPED, 0L);
        List<IgniteEx> newMasterNodes = null;
        List<IgniteEx> newReplicaNodes = null;
        if (beforeBootstrap) {
            newMasterNodes = this.startClusterNodes(ClusterRole.MASTER, newNodesCntr);
            newReplicaNodes = this.startClusterNodes(ClusterRole.REPLICA, newNodesCntr);
            this.assertClusterState(newMasterNodes, ClusterRole.DISABLED, ReplicationState.STOPPED, 0L);
            this.assertClusterState(newReplicaNodes, ClusterRole.DISABLED, ReplicationState.STOPPED, 0L);
        }
        long sesId = (Long)TxDrBaselineTopologyScenarios.bootstrapMaster((TransactionalDrMaster)this.txdr((Ignite)master), this.snapshotFolder()).get();
        this.txdr((Ignite)replica).bootstrap(this.snapshotFolder(), sesId).get();
        if (beforeBootstrap) {
            Iterator<IgniteEx> newMasterNodes0 = newMasterNodes;
            boolean awaited = GridTestUtils.waitForCondition(() -> newMasterNodes0.stream().allMatch(n -> this.txdr((Ignite)n).localState().role() == ClusterRole.MASTER && this.txdr((Ignite)n).localState().state() == ReplicationState.RUNNING), (long)60000L);
            TxDrBaselineTopologyScenarios.assertTrue((boolean)awaited);
            List<IgniteEx> newReplicaNodes0 = newReplicaNodes;
            awaited = GridTestUtils.waitForCondition(() -> newReplicaNodes0.stream().allMatch(n -> this.txdr((Ignite)n).localState().role() == ClusterRole.REPLICA && this.txdr((Ignite)n).localState().state() == ReplicationState.RUNNING), (long)60000L);
            TxDrBaselineTopologyScenarios.assertTrue((boolean)awaited);
            this.assertClusterState(newMasterNodes, ClusterRole.MASTER, ReplicationState.RUNNING, sesId);
            this.assertClusterState(newReplicaNodes, ClusterRole.REPLICA, ReplicationState.RUNNING, sesId);
            for (IgniteEx node : newMasterNodes) {
                TxDrBaselineTopologyScenarios.assertNull((Object)this.txdr((Ignite)node).consistentCutScheduler());
                TxDrBaselineTopologyScenarios.assertNull((Object)this.txdr((Ignite)node).walSender());
            }
            for (IgniteEx node : newReplicaNodes) {
                TxDrBaselineTopologyScenarios.assertNull((Object)this.txdr((Ignite)node).consistentCutWatcher());
            }
        }
        if (!beforeBootstrap) {
            newMasterNodes = this.startClusterNodes(ClusterRole.MASTER, newNodesCntr);
            newReplicaNodes = this.startClusterNodes(ClusterRole.REPLICA, newNodesCntr);
            this.assertClusterState(newMasterNodes, ClusterRole.MASTER, ReplicationState.RUNNING, sesId);
            this.assertClusterState(newReplicaNodes, ClusterRole.REPLICA, ReplicationState.RUNNING, sesId);
            for (IgniteEx node : newMasterNodes) {
                TxDrBaselineTopologyScenarios.assertNull((Object)this.txdr((Ignite)node).consistentCutScheduler());
                TxDrBaselineTopologyScenarios.assertNull((Object)this.txdr((Ignite)node).walSender());
            }
            for (IgniteEx node : newReplicaNodes) {
                TxDrBaselineTopologyScenarios.assertNull((Object)this.txdr((Ignite)node).consistentCutWatcher());
            }
        }
        IgniteInternalFuture txLoadFut = this.startTxLoad(4, ClusterRole.MASTER);
        IgniteInternalFuture atomicLoadFut = this.startAtomicLoad(4, ClusterRole.MASTER);
        log.info("Load started");
        U.sleep((long)this.uploadTime);
        master.cluster().setBaselineTopology(master.cluster().forServers().nodes());
        log.info("Baseline topology changed on MASTER cluster");
        this.assertNodesState(this.masterCluster, ClusterRole.MASTER, ReplicationState.RUNNING, sesId, "All master-nodes must be included into the current baseline topology.", (IgniteBiPredicate<Collection<BaselineNode>, Object>)(IgniteBiPredicate & Serializable)(baseline, consistentId) -> baseline.stream().anyMatch(n -> n.consistentId().equals(consistentId)));
        this.waitForBltCutCreated(this.masterCluster, -1L, 60000L);
        for (IgniteEx node : newMasterNodes) {
            TxDrBaselineTopologyScenarios.assertNotNull((Object)this.txdr((Ignite)node).walSender());
            TxDrBaselineTopologyScenarios.assertNotNull((Object)this.txdr((Ignite)node).walSender().runner().isAlive());
            TxDrBaselineTopologyScenarios.assertNotNull((Object)this.txdr((Ignite)node).topologyTracker());
        }
        U.sleep((long)this.uploadTime);
        this.stopTxLoad(txLoadFut);
        this.stopAtomicLoad(atomicLoadFut);
        log.info("Load stopped.");
        long lastCreatedCutId = this.txdr((Ignite)master).localState().lastCreatedCutId();
        this.txdr((Ignite)master).stop().get();
        log.info("MASTER cluster replication stopped. [lastCreatedCutId=" + lastCreatedCutId + ']');
        this.waitForApplyingCut(this.replicaCluster, lastCreatedCutId, 60000L);
        this.assertNodesState(this.replicaCluster, ClusterRole.REPLICA, ReplicationState.RUNNING, sesId, "All replica-nodes must be included into the current baseline topology.", (IgniteBiPredicate<Collection<BaselineNode>, Object>)(IgniteBiPredicate & Serializable)(baseline, consistentId) -> baseline.stream().anyMatch(n -> n.consistentId().equals(consistentId)));
        for (IgniteEx node : newReplicaNodes) {
            TxDrBaselineTopologyScenarios.assertNotNull((Object)this.txdr((Ignite)node).consistentCutWatcher());
            TxDrBaselineTopologyScenarios.assertTrue((boolean)GridTestUtils.waitForCondition(() -> node.context().cache().context().readOnlyMode(), (long)60000L));
        }
        this.awaitPartitionMapExchange(false, false, replica.context().discovery().aliveServerNodes());
        this.checkCacheDumps((Ignite)master, (Ignite)replica, 60000L);
        TxDrBaselineTopologyScenarios.assertTrue((boolean)this.idleVerifyReplica(replica));
        this.txdr((Ignite)replica).stop().get();
        log.info("REPLICA cluster replication stopped.");
    }

    private void removeNodeFromBaseline(boolean beforeBootstrap, int remNodesCntr, boolean rmvCrd) throws Exception {
        this.masterCluster = this.startCluster(ClusterRole.MASTER);
        int initiatorId = rmvCrd ? this.nodesCnt - remNodesCntr : 0;
        IgniteEx master = this.masterCluster.get(initiatorId);
        master.cluster().baselineAutoAdjustEnabled(false);
        master.cluster().active(true);
        this.populateData((Ignite)master, "txCache");
        this.populateData((Ignite)master, "atomicCache");
        this.replicaCluster = this.startCluster(ClusterRole.REPLICA);
        IgniteEx replica = this.replicaCluster.get(initiatorId);
        replica.cluster().active(true);
        if (beforeBootstrap) {
            this.stopClusterNodes(ClusterRole.MASTER, remNodesCntr, rmvCrd);
            this.stopClusterNodes(ClusterRole.REPLICA, remNodesCntr, rmvCrd);
        }
        long sesId = (Long)TxDrBaselineTopologyScenarios.bootstrapMaster((TransactionalDrMaster)this.txdr((Ignite)master), this.snapshotFolder()).get();
        this.txdr((Ignite)replica).bootstrap(this.snapshotFolder(), sesId).get();
        if (!beforeBootstrap) {
            this.stopClusterNodes(ClusterRole.MASTER, remNodesCntr, rmvCrd);
            this.stopClusterNodes(ClusterRole.REPLICA, remNodesCntr, rmvCrd);
        }
        IgniteInternalFuture txLoadFut = this.startTxLoad(4, ClusterRole.MASTER, (Ignite)master);
        IgniteInternalFuture atomicLoadFut = this.startAtomicLoad(4, ClusterRole.MASTER, (Ignite)master);
        log.info("Load started");
        U.sleep((long)this.uploadTime);
        master.cluster().setBaselineTopology(master.cluster().forServers().nodes());
        log.info("Baseline topology changed on MASTER cluster");
        this.assertClusterState(this.masterCluster, ClusterRole.MASTER, ReplicationState.RUNNING, sesId);
        this.waitForBltCutCreated(this.masterCluster, -1L, 60000L);
        U.sleep((long)this.uploadTime);
        this.stopTxLoad(txLoadFut);
        this.stopAtomicLoad(atomicLoadFut);
        log.info("Load stopped.");
        this.txdr((Ignite)master).stop().get();
        long lastCreatedCutId = this.txdr((Ignite)master).localState().lastCreatedCutId();
        log.info("MASTER cluster replication stopped. [lastCreatedCutId=" + lastCreatedCutId + ']');
        this.waitForApplyingCut(this.replicaCluster, lastCreatedCutId, 60000L);
        this.assertClusterState(this.replicaCluster, ClusterRole.REPLICA, ReplicationState.RUNNING, sesId);
        this.awaitPartitionMapExchange(false, false, replica.context().discovery().aliveServerNodes());
        this.checkCacheDumps((Ignite)master, (Ignite)replica, 60000L);
        TxDrBaselineTopologyScenarios.assertTrue((boolean)this.idleVerifyReplica(replica));
        this.txdr((Ignite)replica).stop().get();
        log.info("REPLICA cluster replication stopped.");
    }

    private long findBltCutId(IgniteEx node, long startId) throws IgniteCheckedException {
        ConsistentCutStore store = this.txdr((Ignite)node).consistentCutStore();
        for (Long cutId : store.list(startId)) {
            ConsistentCut cut = store.restore(cutId.longValue());
            if (cutId <= startId || cut.baselineTopology() == null) continue;
            return cutId;
        }
        throw new IgniteCheckedException("Baseline consistent cut is not found.");
    }

    private long waitForBltCutCreated(List<IgniteEx> nodes, long startId, long timeout) throws IgniteCheckedException {
        TxDrBaselineTopologyScenarios.assertTrue((String)"Failed to wait for a baseline consistent cut.", (boolean)GridTestUtils.waitForCondition(() -> {
            try {
                ConsistentCutStore store = this.txdr((Ignite)nodes.get(0)).consistentCutStore();
                for (Long cutId : store.list(startId)) {
                    ConsistentCut cut = store.restore(cutId.longValue());
                    if (cutId <= startId || cut.baselineTopology() == null) continue;
                    return true;
                }
            }
            catch (IgniteCheckedException e) {
                return false;
            }
            return false;
        }, (long)timeout));
        long bltCutId = this.findBltCutId(nodes.get(0), startId);
        ArrayList<Long> consistentIds = new ArrayList<Long>(Collections.nCopies(nodes.size(), -1L));
        boolean awaited = GridTestUtils.waitForCondition(() -> {
            for (int i = 0; i < nodes.size(); ++i) {
                IgniteEx node = (IgniteEx)nodes.get(i);
                long lastCutId = this.txdr((Ignite)node).localState().lastCreatedCutId();
                consistentIds.set(i, lastCutId);
                if (lastCutId >= bltCutId) continue;
                return false;
            }
            return true;
        }, (long)timeout);
        TxDrBaselineTopologyScenarios.assertTrue((String)("Failed to wait for the baseline consistent cut [expected=" + bltCutId + ", actual=[" + String.join((CharSequence)", ", consistentIds.stream().map(l -> Long.toString(l)).collect(Collectors.toList())) + ']' + ']'), (boolean)awaited);
        return bltCutId;
    }

    protected List<IgniteEx> startClusterNodes(ClusterRole role, int numOfNodes) throws Exception {
        ArrayList<IgniteEx> res = new ArrayList<IgniteEx>(this.nodesCnt + numOfNodes);
        for (int i = this.nodesCnt; i < this.nodesCnt + numOfNodes; ++i) {
            res.add(this.startClusterNode(role, i));
        }
        log.info("New node(s) joined the " + (role == ClusterRole.MASTER ? "master" : "replica") + " cluster consistentIds=[" + res.stream().map(n -> n.cluster().localNode().consistentId().toString()).collect(Collectors.joining(", ")) + ']');
        return res;
    }

    @Override
    protected IgniteEx startClusterNode(ClusterRole role, int nodeIdx) throws Exception {
        IgniteEx node = super.startClusterNode(role, nodeIdx);
        if (role == ClusterRole.MASTER) {
            this.masterCluster.add(node);
        } else {
            this.replicaCluster.add(node);
        }
        return node;
    }

    protected void stopClusterNodes(ClusterRole role, int numOfNodes, boolean stopCrd) {
        List<IgniteEx> cluster = role == ClusterRole.MASTER ? this.masterCluster : this.replicaCluster;
        int startIdx = stopCrd ? 0 : 1;
        int stopIdx = stopCrd ? numOfNodes : numOfNodes + 1;
        ArrayList<IgniteEx> toBeStopped = new ArrayList<IgniteEx>(cluster.subList(startIdx, stopIdx));
        ArrayList<Object> toBeStoppedIds = new ArrayList<Object>(toBeStopped.size());
        for (IgniteEx node : toBeStopped) {
            toBeStoppedIds.add(node.localNode().consistentId());
            this.stopClusterNode(role, startIdx++);
        }
        log.info("Node(s) left the " + (role == ClusterRole.MASTER ? "master" : "replica") + " cluster consistentIds=[" + toBeStoppedIds.stream().map(Object::toString).collect(Collectors.joining(", ")) + ']');
    }

    private void stopClusterNode(ClusterRole role, int nodeIdx) {
        this.stopClusterNode(role, nodeIdx, AbstractReplicationTest.NodeType.BLT_NODE);
    }

    private void stopClusterNode(ClusterRole role, int nodeIdx, AbstractReplicationTest.NodeType nodeType) {
        String instanceName = this.igniteInstanceNameWithRole(role, nodeIdx, nodeType);
        List<IgniteEx> cluster = role == ClusterRole.MASTER ? this.masterCluster : this.replicaCluster;
        IgniteEx node = null;
        for (IgniteEx ignite : cluster) {
            if (!instanceName.equals(ignite.name())) continue;
            node = ignite;
        }
        if (node != null) {
            this.stopClusterNode(role, node);
            cluster.remove(node);
        }
    }

    private void checkCacheDumps(Ignite master, Ignite replica, long timeout) throws Exception {
        IgniteCache masterTxCache = master.cache("txCache");
        IgniteCache masterAtomicCache = master.cache("atomicCache");
        IgniteCache replicaTxCache = replica.cache("txCache");
        IgniteCache replicaAtomicCache = replica.cache("atomicCache");
        Map<Integer, Long> dumpMasterTx = this.dumpCache((IgniteCache<Integer, Long>)masterTxCache);
        Map<Integer, Long> dumpMasterAtomic = this.dumpCache((IgniteCache<Integer, Long>)masterAtomicCache);
        TxDrBaselineTopologyScenarios.assertTrue((boolean)GridTestUtils.waitForCondition(() -> {
            Map<Integer, Long> dumpReplicaTx = this.dumpCache((IgniteCache<Integer, Long>)replicaTxCache);
            Map<Integer, Long> dumpReplicaAtomic = this.dumpCache((IgniteCache<Integer, Long>)replicaAtomicCache);
            return dumpMasterTx.equals(dumpReplicaTx) && dumpMasterAtomic.equals(dumpReplicaAtomic);
        }, (long)60000L));
    }

    protected void assertNodesState(List<IgniteEx> cluster, ClusterRole role, ReplicationState state, long bootstrapSesId, String msg, IgniteBiPredicate<Collection<BaselineNode>, Object> baselinePred) {
        Collection currBaseline = this.node(role).cluster().currentBaselineTopology();
        for (IgniteEx ignite : cluster) {
            if (baselinePred != null) {
                TxDrBaselineTopologyScenarios.assertTrue((String)msg, (boolean)baselinePred.apply((Object)currBaseline, ignite.cluster().localNode().consistentId()));
            }
            ReplicationSessionDescriptor locState = this.txdr((Ignite)ignite).localState();
            TxDrBaselineTopologyScenarios.assertEquals((String)("Wrong role for node: " + ignite.name()), (Object)role, (Object)locState.role());
            TxDrBaselineTopologyScenarios.assertEquals((String)("Wrong process state for node: " + ignite.name()), (Object)state, (Object)locState.state());
            TxDrBaselineTopologyScenarios.assertEquals((String)("Wrong session ID for node: " + ignite.name()), (long)bootstrapSesId, (long)locState.sessionId());
        }
    }
}

