package org.gridgain.internal.processors.dr.hubs;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.Serializable;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.gridgain.grid.cache.dr.CacheDrSenderConfiguration;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.grid.dr.DrSenderConnectionConfiguration;
import org.gridgain.grid.dr.store.DrAbstractSenderStore;
import org.gridgain.grid.dr.store.DrSenderStore;
import org.gridgain.grid.dr.store.DrSenderStoreCursor;
import org.gridgain.grid.dr.store.DrSenderStoreCursorClosedException;
import org.gridgain.grid.dr.store.DrSenderStoreEntry;
import org.gridgain.grid.dr.store.DrSenderStoreOverflowMode;
import org.gridgain.grid.dr.store.DurableStore;
import org.gridgain.grid.dr.store.fs.DrSenderFsStore;
import org.gridgain.grid.internal.processors.dr.DrSenderRemoteDataCenter;
import org.gridgain.grid.internal.processors.dr.DrUtils;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalBatchRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrExternalBatchResponse;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalRequestEntry;
import org.gridgain.grid.internal.processors.dr.store.DrCommonStoreManager;
import org.gridgain.grid.internal.processors.dr.store.DrReplicaStoreManager;
import org.gridgain.internal.processors.dr.DrAbstractTest;
import org.gridgain.internal.processors.dr.util.DrTestCommunicationSpi;
import org.gridgain.internal.processors.dr.util.DrTestQueuedCommunicationSpiListener;
import org.gridgain.internal.processors.dr.util.DrTestQueuedReceiverHubListener;
import org.gridgain.plugin.security.SecurityServicePermissionsTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

@WithSystemProperty(key = "IGNITE_DISABLE_SMART_DR_THROTTLING", value = "true")
/* loaded from: input_file:org/gridgain/internal/processors/dr/hubs/DrSenderImplStoreSelfTest.class */
public class DrSenderImplStoreSelfTest extends DrAbstractTest {
    private TcpDiscoveryIpFinder ipFinder;
    private DrTestQueuedCommunicationSpiListener sndHubLsnr;
    private IgniteCache<Integer, Integer> cache;
    private DrTestReceiverListener rcvHubLsnr;
    private volatile boolean stopped;
    private DrSenderFsStore store;
    private static final String STORE_PATH;
    static final /* synthetic */ boolean $assertionsDisabled;
    private int sndHubMaxQueueSize = 1000;
    private final Map<DrExternalBatchRequest, CountDownLatch> latchMap = Collections.synchronizedMap(new IdentityHashMap());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gridgain/internal/processors/dr/hubs/DrSenderImplStoreSelfTest$DrTestReceiverListener.class */
    public class DrTestReceiverListener extends DrTestQueuedReceiverHubListener {
        private DrTestReceiverListener() {
        }

        @Override // org.gridgain.internal.processors.dr.util.DrTestQueuedReceiverHubListener, org.gridgain.internal.processors.dr.util.DrTestReceiverHubListener
        public DrExternalBatchResponse onBatch(DrExternalBatchRequest drExternalBatchRequest) throws Exception {
            DrExternalBatchResponse onBatch = super.onBatch(drExternalBatchRequest);
            CountDownLatch countDownLatch = new CountDownLatch(1);
            DrSenderImplStoreSelfTest.this.latchMap.put(drExternalBatchRequest, countDownLatch);
            while (!DrSenderImplStoreSelfTest.this.stopped && !countDownLatch.await(100L, TimeUnit.MILLISECONDS)) {
            }
            return onBatch;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/gridgain/internal/processors/dr/hubs/DrSenderImplStoreSelfTest$DummyDrStore.class */
    public static class DummyDrStore implements DrSenderStore {
        DummyDrStore() {
        }

        public void store(byte[] bArr, byte[] bArr2, int i, @Nullable IgniteUuid igniteUuid) {
            throw new UnsupportedOperationException();
        }

        public DrSenderStoreCursor cursor(byte b) {
            throw new UnsupportedOperationException();
        }

        public void clear() {
            throw new UnsupportedOperationException();
        }

        public boolean isOverflow() {
            throw new UnsupportedOperationException();
        }

        public long sizeBytes() {
            throw new UnsupportedOperationException();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @DurableStore
    /* loaded from: input_file:org/gridgain/internal/processors/dr/hubs/DrSenderImplStoreSelfTest$DurableDrStore.class */
    public static class DurableDrStore extends NonDurableDrStore {
        DurableDrStore() {
        }
    }

    /* loaded from: input_file:org/gridgain/internal/processors/dr/hubs/DrSenderImplStoreSelfTest$HugeValue.class */
    static class HugeValue implements Serializable {
        byte[] data;

        public HugeValue(int i) {
            this.data = new byte[i];
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Arrays.equals(this.data, ((HugeValue) obj).data);
        }

        public int hashCode() {
            return 42;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/gridgain/internal/processors/dr/hubs/DrSenderImplStoreSelfTest$NonDurableDrStore.class */
    public static class NonDurableDrStore extends DrAbstractSenderStore {
        private Map<Byte, Collection<DrSenderStoreEntry>> store = new ConcurrentHashMap();

        /* loaded from: input_file:org/gridgain/internal/processors/dr/hubs/DrSenderImplStoreSelfTest$NonDurableDrStore$Entry.class */
        private class Entry implements DrSenderStoreEntry {
            private final byte[] data;
            private final IgniteUuid fstId;

            public Entry(byte[] bArr, IgniteUuid igniteUuid) {
                this.data = bArr;
                this.fstId = igniteUuid;
            }

            public byte[] data() {
                return this.data;
            }

            public void acknowledge(byte b) {
                Collection collection = (Collection) NonDurableDrStore.this.store.get(Byte.valueOf(b));
                if (collection != null) {
                    collection.remove(this);
                }
            }

            public IgniteUuid stateTransferId() {
                return this.fstId;
            }
        }

        NonDurableDrStore() {
        }

        public void store0(byte[] bArr, byte[] bArr2, int i, @Nullable IgniteUuid igniteUuid) {
            for (byte b : bArr) {
                Byte valueOf = Byte.valueOf(b);
                Collection<DrSenderStoreEntry> collection = this.store.get(valueOf);
                if (collection == null) {
                    ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
                    concurrentLinkedQueue.add(new Entry(bArr2, igniteUuid));
                    this.store.put(valueOf, concurrentLinkedQueue);
                } else {
                    collection.add(new Entry(bArr2, igniteUuid));
                }
            }
        }

        public DrSenderStoreCursor cursor0(byte b) {
            Collection<DrSenderStoreEntry> collection = this.store.get(Byte.valueOf(b));
            final Iterator<DrSenderStoreEntry> it = collection != null ? collection.iterator() : null;
            return new DrSenderStoreCursor() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.NonDurableDrStore.1
                public DrSenderStoreEntry next() throws IgniteCheckedException {
                    if (it == null) {
                        throw new DrSenderStoreCursorClosedException("closed");
                    }
                    if (it.hasNext()) {
                        return (DrSenderStoreEntry) it.next();
                    }
                    close();
                    return null;
                }

                public void close() {
                }
            };
        }

        public void clear0() {
            this.store.clear();
        }

        public long sizeBytes() {
            long j = 0;
            Iterator<Collection<DrSenderStoreEntry>> it = this.store.values().iterator();
            while (it.hasNext()) {
                while (it.next().iterator().hasNext()) {
                    j += r0.next().data().length;
                }
            }
            return j;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public void beforeTest() throws Exception {
        super.beforeTest();
        U.delete(Paths.get(STORE_PATH, new String[0]));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public boolean useSenderGroups() {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public void afterTest() throws Exception {
        this.stopped = true;
        super.afterTest();
        this.cache = null;
        this.sndHubLsnr = null;
        this.rcvHubLsnr = null;
    }

    @Test
    public void testDataNodeAcknowledge() throws Exception {
        startUp();
        this.cache.put(1, 1);
        assertNotNull(this.sndHubLsnr.nextInRequest(500L));
        assertNotNull(this.sndHubLsnr.nextOutResponse(500L));
    }

    @Test
    public void testSenderAcknowledge() throws Exception {
        final NonDurableDrStore nonDurableDrStore = new NonDurableDrStore();
        DrTestQueuedReceiverHubListener drTestQueuedReceiverHubListener = new DrTestQueuedReceiverHubListener() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.1
            private boolean first = true;

            @Override // org.gridgain.internal.processors.dr.util.DrTestQueuedReceiverHubListener, org.gridgain.internal.processors.dr.util.DrTestReceiverHubListener
            public DrExternalBatchResponse onBatch(DrExternalBatchRequest drExternalBatchRequest) throws Exception {
                DrExternalBatchResponse onBatch = super.onBatch(drExternalBatchRequest);
                if (this.first) {
                    this.first = false;
                    return new DrExternalBatchResponse(drExternalBatchRequest.requestId(), "Simulate error");
                }
                CountDownLatch countDownLatch = new CountDownLatch(1);
                DrSenderImplStoreSelfTest.this.latchMap.put(drExternalBatchRequest, countDownLatch);
                while (!DrSenderImplStoreSelfTest.this.stopped && !countDownLatch.await(100L, TimeUnit.MILLISECONDS)) {
                }
                return onBatch;
            }
        };
        receiverHub(startTopology(createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.2
            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config(new GridGainConfiguration(), "top2_node_rcv", (byte) 2, tcpDiscoveryIpFinder, null, null, new CacheConfiguration[0]));
            }
        })).get(0), 12311, drTestQueuedReceiverHubListener);
        startTopology(createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.3
            private static final long serialVersionUID = 0;

            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                CacheConfiguration cacheConfig = DrSenderImplStoreSelfTest.this.cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, true);
                IgniteConfiguration config = DrSenderImplStoreSelfTest.this.config("top1_node_snd", (byte) 1, tcpDiscoveryIpFinder, new DrSenderConfiguration().setStore(nonDurableDrStore).setMaxErrors(1).setReconnectOnFailureTimeout(10L).setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 2, "127.0.0.1:12311")}), null, new CacheConfiguration[0]);
                return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config("top1_node", (byte) 1, tcpDiscoveryIpFinder, null, null, cacheConfig), config);
            }
        }));
        IgniteCache cache = G.ignite("top1_node").cache(SecurityServicePermissionsTest.CACHE_NAME);
        assertEquals("Store is not empty.", 0L, nonDurableDrStore.sizeBytes());
        cache.put(1, 1);
        assertNotNull(drTestQueuedReceiverHubListener.nextBatchRequest(5000L));
        assertEquals(116L, nonDurableDrStore.sizeBytes());
        assertNotNull(nonDurableDrStore.cursor((byte) 2).next());
        DrExternalBatchRequest nextBatchRequest = drTestQueuedReceiverHubListener.nextBatchRequest(5000L);
        assertNotNull(nextBatchRequest);
        GridTestUtils.waitForCondition(() -> {
            return this.latchMap.get(nextBatchRequest) != null;
        }, 2000L);
        assertEquals(116L, nonDurableDrStore.sizeBytes());
        assertNotNull(nonDurableDrStore.cursor((byte) 2).next());
        this.latchMap.get(nextBatchRequest).countDown();
        GridTestUtils.waitForCondition(() -> {
            return nonDurableDrStore.sizeBytes() == 0;
        }, getTestTimeout());
    }

    @Test
    public void testSingleBatchSend() throws Exception {
        startUp();
        this.cache.put(1, 1);
        checkExternalRequest(cache(this.cache), this.rcvHubLsnr.nextBatchRequest(500L), 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(1, 1));
    }

    @Test
    public void testSingleBatchMetrics() throws Exception {
        startUp();
        assertEquals(0L, this.store.totalBytes());
        IgniteCache cache = G.ignite("top1_node").cache(SecurityServicePermissionsTest.CACHE_NAME);
        HugeValue hugeValue = new HugeValue(524288);
        cache.put(1, hugeValue);
        DrInternalRequest drInternalRequest = (DrInternalRequest) this.sndHubLsnr.nextInRequest(500L).get2();
        DrExternalBatchRequest nextBatchRequest = this.rcvHubLsnr.nextBatchRequest(500L);
        DrInternalRequestEntry drInternalRequestEntry = (DrInternalRequestEntry) drInternalRequest.entries().iterator().next();
        GridCacheRawVersionedEntry readDrEntry = DrUtils.readDrEntry(new DataInputStream(new ByteArrayInputStream(drInternalRequestEntry.dataBytes())), drInternalRequestEntry.dataCenterId());
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DrUtils.writeDrEntry(new DataOutputStream(byteArrayOutputStream), readDrEntry);
        byte[] byteArray = byteArrayOutputStream.toByteArray();
        assertTrue("Dr entry overhead looks too high.", byteArray.length - 524288 < 200);
        assertTrue("Stored Dr entry overhead looks too high.", this.store.totalBytes() - ((long) 524288) < ((long) 200));
        byte[] copyOf = Arrays.copyOf(nextBatchRequest.dataBytes(), nextBatchRequest.dataBytes().length);
        checkExternalRequest(cache(cache), nextBatchRequest, 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(1, hugeValue));
        assertEquals(byteArray.length, drInternalRequestEntry.dataLength());
        assertEquals(byteArray.length, nextBatchRequest.dataSize());
        assertEquals(byteArray.length, copyOf.length);
    }

    @Test
    public void testMultipleBatchesSend() throws Exception {
        this.sndHubMaxQueueSize = 2;
        startUp();
        this.cache.put(1, 1);
        if (!$assertionsDisabled && this.sndHubLsnr.nextInRequest(500L) == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.sndHubLsnr.nextOutResponse(500L) == null) {
            throw new AssertionError();
        }
        DrExternalBatchRequest nextBatchRequest = this.rcvHubLsnr.nextBatchRequest(500L);
        checkExternalRequest(cache(this.cache), nextBatchRequest, 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(1, 1));
        this.cache.put(2, 2);
        if (!$assertionsDisabled && this.sndHubLsnr.nextInRequest(500L) == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.sndHubLsnr.nextOutResponse(500L) == null) {
            throw new AssertionError();
        }
        checkExternalRequest(cache(this.cache), this.rcvHubLsnr.nextBatchRequest(500L), 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(2, 2));
        this.cache.put(3, 3);
        if (!$assertionsDisabled && this.sndHubLsnr.nextInRequest(500L) == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.sndHubLsnr.nextOutResponse(500L) == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.rcvHubLsnr.nextBatchRequest(500L) != null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.latchMap.size() != 2) {
            throw new AssertionError();
        }
        this.latchMap.get(nextBatchRequest).countDown();
        checkExternalRequest(cache(this.cache), this.rcvHubLsnr.nextBatchRequest(500L), 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(3, 3));
        if (!$assertionsDisabled && this.rcvHubLsnr.nextBatchRequest(500L) != null) {
            throw new AssertionError();
        }
    }

    @Test
    public void testClear() throws Exception {
        this.sndHubMaxQueueSize = 1;
        checkClear();
        for (int i = 0; i < 5; i++) {
            checkClearStability();
        }
    }

    @Test
    public void testDefaultStore() throws Exception {
        startTopology(createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.4
            private static final long serialVersionUID = 0;

            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config("top1_node_snd", (byte) 1, tcpDiscoveryIpFinder, new DrSenderConfiguration().setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 2, "127.0.0.1:12311")}), null, new CacheConfiguration[0]));
            }
        }));
        DrAbstractSenderStore drGlobalStore = drGlobalStore(G.ignite("top1_node_snd"));
        assertTrue(dr(G.ignite("top1_node_snd")).localSender().isGlobalStore());
        assertEquals(DrSenderFsStore.class, drGlobalStore.getClass());
        assertEquals(DrSenderStoreOverflowMode.STOP, drGlobalStore.getOverflowMode());
        assertNull(drReplicaStores(G.ignite("top1_node_snd")));
    }

    @Test
    public void testGlobalDrFsStoreDefaults() throws Exception {
        startTopology(createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.5
            private static final long serialVersionUID = 0;

            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config("top1_node_snd", (byte) 1, tcpDiscoveryIpFinder, new DrSenderConfiguration().setStore(new DrSenderFsStore()).setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 2, "127.0.0.1:12311")}), null, new CacheConfiguration[0]));
            }
        }));
        DrSenderFsStore drGlobalStore = drGlobalStore(G.ignite("top1_node_snd"));
        assertTrue(dr(G.ignite("top1_node_snd")).localSender().isGlobalStore());
        assertNotNull(drGlobalStore);
        assertEquals(DrSenderStoreOverflowMode.STOP, drGlobalStore.getOverflowMode());
        assertNull(drReplicaStores(G.ignite("top1_node_snd")));
    }

    @Test
    public void testPerReplicaDrFsStoreDefaults() throws Exception {
        startTopology(createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.6
            private static final long serialVersionUID = 0;

            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config("top1_node_snd", (byte) 1, tcpDiscoveryIpFinder, new DrSenderConfiguration().setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 2, "127.0.0.1:12311").setStore(new DrSenderFsStore()), DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 3, "127.0.0.1:12312").setStore(new DrSenderFsStore())}), null, new CacheConfiguration[0]));
            }
        }));
        assertFalse(dr(G.ignite("top1_node_snd")).localSender().isGlobalStore());
        assertNull(drGlobalStore(G.ignite("top1_node_snd")));
        for (DrSenderFsStore drSenderFsStore : drReplicaStores(G.ignite("top1_node_snd"))) {
            assertTrue(drSenderFsStore instanceof DrSenderFsStore);
            assertEquals(DrSenderStoreOverflowMode.STOP, drSenderFsStore.getOverflowMode());
        }
    }

    @Test
    public void testUserGlobalStoreImpl() throws Exception {
        startTopology(createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.7
            private static final long serialVersionUID = 0;

            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config("top1_node_snd", (byte) 1, tcpDiscoveryIpFinder, new DrSenderConfiguration().setStore(new DummyDrStore()).setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 2, "127.0.0.1:12311")}), null, new CacheConfiguration[0]));
            }
        }));
        DrSenderStore drGlobalStore = drGlobalStore(G.ignite("top1_node_snd"));
        assertTrue(dr(G.ignite("top1_node_snd")).localSender().isGlobalStore());
        assertEquals("Incorrect default sender store class", DummyDrStore.class, drGlobalStore.getClass());
    }

    @Test
    public void testDurableGlobalStoreImpl() throws Exception {
        checkGlobalStoreImpl(true);
    }

    @Test
    public void testNonDurableGlobalStoreImpl() throws Exception {
        checkGlobalStoreImpl(false);
    }

    public void checkGlobalStoreImpl(boolean z) throws Exception {
        DrTestReceiverListener drTestReceiverListener = new DrTestReceiverListener();
        receiverHub(startTopology(createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.8
            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config(new GridGainConfiguration(), "top2_node_rcv", (byte) 2, tcpDiscoveryIpFinder, null, null, new CacheConfiguration[0]));
            }
        })).get(0), 12311, drTestReceiverListener);
        final DrSenderStore durableDrStore = z ? new DurableDrStore() : new NonDurableDrStore();
        TcpDiscoveryIpFinder createTopology = createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.9
            private static final long serialVersionUID = 0;

            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                CacheConfiguration cacheConfig = DrSenderImplStoreSelfTest.this.cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, true);
                DrSenderConfiguration connectionConfiguration = new DrSenderConfiguration().setStore(durableDrStore).setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 2, "127.0.0.1:12311")});
                DrSenderConfiguration connectionConfiguration2 = new DrSenderConfiguration().setStore(durableDrStore).setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 2, "127.0.0.1:12311")});
                IgniteConfiguration config = DrSenderImplStoreSelfTest.this.config("top1_node_snd", (byte) 1, tcpDiscoveryIpFinder, connectionConfiguration, null, new CacheConfiguration[0]);
                IgniteConfiguration config2 = DrSenderImplStoreSelfTest.this.config("top1_node_snd_2", (byte) 1, tcpDiscoveryIpFinder, connectionConfiguration2, null, new CacheConfiguration[0]);
                return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config("top1_node", (byte) 1, tcpDiscoveryIpFinder, null, null, cacheConfig), config, config2);
            }
        });
        startTopology(createTopology);
        IgniteCache cache = G.ignite("top1_node").cache(SecurityServicePermissionsTest.CACHE_NAME);
        cache.put(1, 1);
        DrExternalBatchRequest nextBatchRequest = drTestReceiverListener.nextBatchRequest(500L);
        checkExternalRequest(cache(cache), nextBatchRequest, 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(1, 1));
        checkExternalRequest(cache(cache), nextBatchRequest, 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(1, 1));
        G.stop("top1_node_snd", false);
        G.start(config("top1_node_snd", (byte) 1, createTopology, new DrSenderConfiguration().setStore(durableDrStore).setConnectionConfiguration(new DrSenderConnectionConfiguration[]{senderHubReplicaConfig((byte) 2, "127.0.0.1:12311")}), null, new CacheConfiguration[0]));
        if (z) {
            assertFalse(dr(G.ignite("top1_node")).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
            checkExternalRequest(cache(cache), drTestReceiverListener.nextBatchRequest(500L), 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(1, 1));
        } else {
            assertTrue(dr(G.ignite("top1_node")).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
            assertEquals("Sender with non-persistent sender store has gone.", dr(G.ignite("top1_node")).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).error());
        }
    }

    @Test
    public void testDurablePerReplicaStoreImpl() throws Exception {
        checkPerReplicaStoreImpl(true);
    }

    @Test
    public void testNonDurablePerReplicaStoreImpl() throws Exception {
        checkPerReplicaStoreImpl(false);
    }

    public void checkPerReplicaStoreImpl(boolean z) throws Exception {
        TcpDiscoveryIpFinder createTopology = createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.10
            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config(new GridGainConfiguration(), "top2_node_rcv", (byte) 2, tcpDiscoveryIpFinder, null, null, new CacheConfiguration[0]));
            }
        });
        TcpDiscoveryIpFinder createTopology2 = createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.11
            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config(new GridGainConfiguration(), "top3_node_rcv", (byte) 3, tcpDiscoveryIpFinder, null, null, new CacheConfiguration[0]));
            }
        });
        Ignite ignite = startTopology(createTopology).get(0);
        Ignite ignite2 = startTopology(createTopology2).get(0);
        DrTestReceiverListener drTestReceiverListener = new DrTestReceiverListener();
        DrTestReceiverListener drTestReceiverListener2 = new DrTestReceiverListener();
        receiverHub(ignite, 12311, drTestReceiverListener);
        receiverHub(ignite2, 12312, drTestReceiverListener2);
        final DrSenderStore durableDrStore = z ? new DurableDrStore() : new NonDurableDrStore();
        final DrSenderStore durableDrStore2 = z ? new DurableDrStore() : new NonDurableDrStore();
        TcpDiscoveryIpFinder createTopology3 = createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.12
            private static final long serialVersionUID = 0;

            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                CacheConfiguration cacheConfig = DrSenderImplStoreSelfTest.this.cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, true);
                DrSenderConfiguration connectionConfiguration = new DrSenderConfiguration().setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 2, "127.0.0.1:12311").setStore(durableDrStore), DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 3, "127.0.0.1:12312").setStore(durableDrStore2)});
                DrSenderConfiguration connectionConfiguration2 = new DrSenderConfiguration().setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 2, "127.0.0.1:12311").setStore(durableDrStore), DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 3, "127.0.0.1:12312").setStore(durableDrStore2)});
                IgniteConfiguration config = DrSenderImplStoreSelfTest.this.config("top1_node_snd", (byte) 1, tcpDiscoveryIpFinder, connectionConfiguration, null, new CacheConfiguration[0]);
                IgniteConfiguration config2 = DrSenderImplStoreSelfTest.this.config("top1_node_snd_2", (byte) 1, tcpDiscoveryIpFinder, connectionConfiguration2, null, new CacheConfiguration[0]);
                return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config("top1_node", (byte) 1, tcpDiscoveryIpFinder, null, null, cacheConfig), config, config2);
            }
        });
        startTopology(createTopology3);
        this.cache = G.ignite("top1_node").cache(SecurityServicePermissionsTest.CACHE_NAME);
        this.cache.put(1, 1);
        DrExternalBatchRequest nextBatchRequest = drTestReceiverListener.nextBatchRequest(500L);
        DrExternalBatchRequest nextBatchRequest2 = drTestReceiverListener2.nextBatchRequest(500L);
        checkExternalRequest(cache(this.cache), nextBatchRequest, 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(1, 1));
        checkExternalRequest(cache(this.cache), nextBatchRequest2, 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(1, 1));
        G.stop("top1_node_snd", false);
        G.start(config("top1_node_snd", (byte) 1, createTopology3, new DrSenderConfiguration().setConnectionConfiguration(new DrSenderConnectionConfiguration[]{senderHubReplicaConfig((byte) 2, "127.0.0.1:12311").setStore(durableDrStore), senderHubReplicaConfig((byte) 3, "127.0.0.1:12312").setStore(durableDrStore2)}), null, new CacheConfiguration[0]));
        if (!z) {
            assertTrue(dr(G.ignite("top1_node")).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
            assertEquals("Sender with non-persistent sender store has gone.", dr(G.ignite("top1_node")).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).error());
            return;
        }
        assertFalse(dr(G.ignite("top1_node")).senderCacheStatus(SecurityServicePermissionsTest.CACHE_NAME).stopped());
        DrExternalBatchRequest nextBatchRequest3 = drTestReceiverListener.nextBatchRequest(500L);
        DrExternalBatchRequest nextBatchRequest4 = drTestReceiverListener2.nextBatchRequest(500L);
        checkExternalRequest(cache(this.cache), nextBatchRequest3, 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(1, 1));
        checkExternalRequest(cache(this.cache), nextBatchRequest4, 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(1, 1));
    }

    @Test
    public void testUserPerReplicaStoreImpl() throws Exception {
        startTopology(createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.13
            private static final long serialVersionUID = 0;

            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config("top1_node_snd", (byte) 1, tcpDiscoveryIpFinder, new DrSenderConfiguration().setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 2, "127.0.0.1:12311").setStore(new DummyDrStore()), DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 3, "127.0.0.1:12312").setStore(new DummyDrStore())}), null, new CacheConfiguration[0]));
            }
        }));
        assertFalse(dr(G.ignite("top1_node_snd")).localSender().isGlobalStore());
        assertNull(drGlobalStore(G.ignite("top1_node_snd")));
        for (DrSenderRemoteDataCenter drSenderRemoteDataCenter : dr(G.ignite("top1_node_snd")).localSender().connections()) {
            assertTrue(drSenderRemoteDataCenter instanceof DrSenderRemoteDataCenter);
            assertEquals("Incorrect default sender store class", DummyDrStore.class, drSenderRemoteDataCenter.getStore().getClass());
        }
    }

    @Test
    public void testSameDefaultWorkDirGlogalStore() throws Exception {
        GridTestUtils.assertThrowsAnyCause(log, () -> {
            return startTopology(createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.14
                private static final long serialVersionUID = 0;

                public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                    DrSenderConfiguration connectionConfiguration = new DrSenderConfiguration().setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 2, "127.0.0.1:12311")});
                    DrSenderConfiguration connectionConfiguration2 = new DrSenderConfiguration().setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 3, "127.0.0.1:12311")});
                    return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config("top1_node_snd", (byte) 1, tcpDiscoveryIpFinder, connectionConfiguration, null, new CacheConfiguration[0]), DrSenderImplStoreSelfTest.this.config("top1_node_snd_2", (byte) 1, tcpDiscoveryIpFinder, connectionConfiguration2, null, new CacheConfiguration[0]));
                }
            }));
        }, IgniteException.class, "Failed to acquire lock in directory: ");
    }

    @Test
    public void testGlobalFsStoresAtTheSameWorkDir() throws Exception {
        GridTestUtils.assertThrowsAnyCause(log, () -> {
            return startTopology(createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.15
                private static final long serialVersionUID = 0;

                public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                    DrSenderConfiguration store = new DrSenderConfiguration().setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 2, "127.0.0.1:12311")}).setStore(new DrSenderFsStore().setDirectoryPath(DrSenderImplStoreSelfTest.STORE_PATH));
                    DrSenderConfiguration store2 = new DrSenderConfiguration().setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 3, "127.0.0.1:12311")}).setStore(new DrSenderFsStore().setDirectoryPath(DrSenderImplStoreSelfTest.STORE_PATH));
                    return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config("top1_node_snd", (byte) 1, tcpDiscoveryIpFinder, store, null, new CacheConfiguration[0]), DrSenderImplStoreSelfTest.this.config("top1_node_snd_2", (byte) 1, tcpDiscoveryIpFinder, store2, null, new CacheConfiguration[0]));
                }
            }));
        }, IgniteException.class, "Failed to acquire lock in directory: ");
    }

    @Test
    public void testPerConnectionFsStoresAtTheSameWorkDir() throws Exception {
        GridTestUtils.assertThrowsAnyCause(log, () -> {
            return startTopology(createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.16
                private static final long serialVersionUID = 0;

                public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                    return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config("top1_node_snd", (byte) 1, tcpDiscoveryIpFinder, new DrSenderConfiguration().setConnectionConfiguration(new DrSenderConnectionConfiguration[]{DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 2, "127.0.0.1:12311").setStore(new DrSenderFsStore().setDirectoryPath(DrSenderImplStoreSelfTest.STORE_PATH)), DrSenderImplStoreSelfTest.this.senderHubReplicaConfig((byte) 3, "127.0.0.1:12311").setStore(new DrSenderFsStore().setDirectoryPath(DrSenderImplStoreSelfTest.STORE_PATH))}), null, new CacheConfiguration[0]));
                }
            }));
        }, IgniteException.class, "Failed to acquire lock in directory: ");
    }

    private DrSenderStore drGlobalStore(Ignite ignite) {
        Object fieldValue;
        Object fieldValue2 = GridTestUtils.getFieldValue(dr(ignite).localSender(), new String[]{"storeMgr"});
        if ((fieldValue2 instanceof DrCommonStoreManager) && (fieldValue = GridTestUtils.getFieldValue(fieldValue2, new String[]{"drStore"})) != null) {
            return (DrSenderStore) GridTestUtils.getFieldValue(fieldValue, new String[]{"store"});
        }
        return null;
    }

    private DrSenderStore[] drReplicaStores(Ignite ignite) {
        Object fieldValue = GridTestUtils.getFieldValue(dr(ignite).localSender(), new String[]{"storeMgr"});
        if (fieldValue instanceof DrReplicaStoreManager) {
            return (DrSenderStore[]) Arrays.stream((Object[]) GridTestUtils.getFieldValue(fieldValue, new String[]{"storesPerReplica"})).filter(Objects::nonNull).map(obj -> {
                return (DrSenderStore) GridTestUtils.getFieldValue(obj, new String[]{"store"});
            }).toArray(i -> {
                return new DrSenderStore[i];
            });
        }
        return null;
    }

    private IgniteConfiguration dataNodeConfig(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws Exception {
        return config("top1_node", (byte) 1, tcpDiscoveryIpFinder, null, null, cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, true));
    }

    private void checkClear() throws Exception {
        startUp();
        for (int i = 0; i < 10; i++) {
            this.cache.put(Integer.valueOf(i), Integer.valueOf(i));
        }
        if (!$assertionsDisabled && !GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.17
            public boolean apply() {
                try {
                    DrSenderStoreCursor cursor = DrSenderImplStoreSelfTest.this.store.cursor((byte) 2);
                    Throwable th = null;
                    try {
                        return cursor.next() != null;
                    } finally {
                        if (cursor != null) {
                            if (0 != 0) {
                                try {
                                    cursor.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                cursor.close();
                            }
                        }
                    }
                } catch (Exception e) {
                    throw new IgniteException(e);
                }
            }
        }, 5000L)) {
            throw new AssertionError();
        }
        this.store.clear();
        if (!$assertionsDisabled && !GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.18
            public boolean apply() {
                try {
                    DrSenderStoreCursor cursor = DrSenderImplStoreSelfTest.this.store.cursor((byte) 2);
                    Throwable th = null;
                    try {
                        return cursor.next() == null;
                    } finally {
                        if (cursor != null) {
                            if (0 != 0) {
                                try {
                                    cursor.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                cursor.close();
                            }
                        }
                    }
                } catch (Exception e) {
                    throw new IgniteException(e);
                }
            }
        }, 5000L)) {
            throw new AssertionError();
        }
        afterTest();
    }

    private void checkClearStability() throws Exception {
        startUp();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        ArrayList arrayList = new ArrayList();
        arrayList.add(multithreadedAsync(new Runnable() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.19
            @Override // java.lang.Runnable
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    try {
                        DrSenderImplStoreSelfTest.this.cache.put(Integer.valueOf(i), Integer.valueOf(i));
                    } catch (Exception e) {
                        atomicInteger.incrementAndGet();
                        return;
                    }
                }
            }
        }, 5));
        arrayList.add(multithreadedAsync(new Runnable() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.20
            @Override // java.lang.Runnable
            public void run() {
                ThreadLocalRandom current = ThreadLocalRandom.current();
                for (int i = 0; i < 1000; i++) {
                    try {
                        DrSenderImplStoreSelfTest.this.cache.remove(Integer.valueOf(current.nextInt(99)));
                    } catch (Exception e) {
                        atomicInteger.incrementAndGet();
                        return;
                    }
                }
            }
        }, 5));
        arrayList.add(multithreadedAsync(new Runnable() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.21
            @Override // java.lang.Runnable
            public void run() {
                for (int i = 0; i < 10; i++) {
                    try {
                        DrSenderImplStoreSelfTest.this.store.clear();
                        try {
                            Thread.sleep(5L);
                        } catch (InterruptedException e) {
                            throw new IgniteException(e);
                        }
                    } catch (IgniteCheckedException e2) {
                        atomicInteger.incrementAndGet();
                        return;
                    }
                }
            }
        }, 5));
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((IgniteInternalFuture) it.next()).get();
        }
        if (!$assertionsDisabled && atomicInteger.get() != 0) {
            throw new AssertionError();
        }
        afterTest();
    }

    @Test
    public void testCursorActiveCollectionsAreFreed() throws Exception {
        startUp();
        assertEquals("In the beginning.", 0L, this.store.getActiveEntriesCount());
        DrExternalBatchRequest drExternalBatchRequest = null;
        for (int i = 0; i < 2; i++) {
            X.println("k = " + i, new Object[0]);
            this.cache.put(Integer.valueOf(i), Integer.valueOf(i));
            if (!$assertionsDisabled && this.sndHubLsnr.nextInRequest(500L) == null) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && this.sndHubLsnr.nextOutResponse(500L) == null) {
                throw new AssertionError();
            }
            final DrExternalBatchRequest nextBatchRequest = this.rcvHubLsnr.nextBatchRequest(500L);
            checkExternalRequest(cache(this.cache), nextBatchRequest, 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(Integer.valueOf(i), Integer.valueOf(i)));
            if (i == 0) {
                drExternalBatchRequest = nextBatchRequest;
            } else {
                if (!$assertionsDisabled && !GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.22
                    public boolean apply() {
                        return DrSenderImplStoreSelfTest.this.latchMap.containsKey(nextBatchRequest);
                    }
                }, 1000L)) {
                    throw new AssertionError();
                }
                CountDownLatch countDownLatch = this.latchMap.get(nextBatchRequest);
                if (!$assertionsDisabled && countDownLatch == null) {
                    throw new AssertionError();
                }
                countDownLatch.countDown();
            }
            if (!$assertionsDisabled && !GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.23
                public boolean apply() {
                    return DrSenderImplStoreSelfTest.this.store.getActiveEntriesCount() == 1;
                }
            }, 1000L)) {
                throw new AssertionError();
            }
        }
        if (!$assertionsDisabled && drExternalBatchRequest == null) {
            throw new AssertionError();
        }
        this.latchMap.get(drExternalBatchRequest).countDown();
        if (!$assertionsDisabled && !GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.24
            public boolean apply() {
                return DrSenderImplStoreSelfTest.this.store.getActiveEntriesCount() == 0;
            }
        }, 1000L)) {
            throw new AssertionError();
        }
    }

    @Test
    public void testFailover() throws Exception {
        startUp();
        this.cache.put(1, 1);
        if (!$assertionsDisabled && this.sndHubLsnr.nextInRequest(500L) == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.sndHubLsnr.nextOutResponse(500L) == null) {
            throw new AssertionError();
        }
        checkExternalRequest(cache(this.cache), this.rcvHubLsnr.nextBatchRequest(500L), 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(1, 1));
        U.sleep(1000L);
        G.stop("top1_node_snd", false);
        G.start(senderHubConfiguration(this.ipFinder));
        checkExternalRequest(cache(this.cache), this.rcvHubLsnr.nextBatchRequest(500L), 1, SecurityServicePermissionsTest.CACHE_NAME, 1, F.t(1, 1));
    }

    private void startUp() throws Exception {
        this.sndHubLsnr = new DrTestQueuedCommunicationSpiListener();
        this.rcvHubLsnr = new DrTestReceiverListener();
        TcpDiscoveryIpFinder createTopology = createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.25
            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                return DrSenderImplStoreSelfTest.this.wrap(DrSenderImplStoreSelfTest.this.config(new GridGainConfiguration(), "top2_node_rcv", (byte) 2, tcpDiscoveryIpFinder, null, null, new CacheConfiguration[0]));
            }
        });
        TcpDiscoveryIpFinder createTopology2 = createTopology(new IgniteClosureX<TcpDiscoveryIpFinder, IgniteConfiguration[]>() { // from class: org.gridgain.internal.processors.dr.hubs.DrSenderImplStoreSelfTest.26
            public IgniteConfiguration[] applyx(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
                DrSenderImplStoreSelfTest.this.ipFinder = tcpDiscoveryIpFinder;
                GridGainConfiguration gridGainConfiguration = new GridGainConfiguration();
                CacheConfiguration cacheConfiguration = new CacheConfiguration();
                cacheConfiguration.setName(SecurityServicePermissionsTest.CACHE_NAME);
                cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);
                cacheConfiguration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
                cacheConfiguration.setNearConfiguration(new NearCacheConfiguration());
                cacheConfiguration.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
                CacheDrSenderConfiguration cacheDrSenderConfiguration = new CacheDrSenderConfiguration();
                cacheDrSenderConfiguration.setBatchSendSize(1);
                cacheDrSenderConfiguration.setBatchSendFrequency(0L);
                DrSenderImplStoreSelfTest.this.ggCacheConfig(cacheConfiguration).setDrSenderConfiguration(cacheDrSenderConfiguration);
                IgniteConfiguration config = DrSenderImplStoreSelfTest.this.config(gridGainConfiguration, "top1_node", (byte) 1, tcpDiscoveryIpFinder, null, null, cacheConfiguration);
                config.setCommunicationSpi(new DrTestCommunicationSpi());
                return DrSenderImplStoreSelfTest.this.wrap(config, DrSenderImplStoreSelfTest.this.senderHubConfiguration(tcpDiscoveryIpFinder));
            }
        });
        receiverHub(startTopology(createTopology).get(0), 12311, this.rcvHubLsnr);
        startTopology(createTopology2);
        this.cache = G.ignite("top1_node").cache(SecurityServicePermissionsTest.CACHE_NAME);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public IgniteConfiguration senderHubConfiguration(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
        DrSenderConfiguration drSenderConfiguration = new DrSenderConfiguration();
        drSenderConfiguration.setConnectionConfiguration(new DrSenderConnectionConfiguration[]{senderHubReplicaConfig((byte) 2, "127.0.0.1:12311")});
        drSenderConfiguration.setMaxQueueSize(this.sndHubMaxQueueSize);
        drSenderConfiguration.setHealthCheckFrequency(100L);
        this.store = new DrSenderFsStore();
        this.store.setDirectoryPath(storePath());
        this.store.setOverflowMode(DrSenderStoreOverflowMode.REMOVE_OLDEST);
        drSenderConfiguration.setStore(this.store);
        IgniteConfiguration config = config(new GridGainConfiguration(), "top1_node_snd", (byte) 1, tcpDiscoveryIpFinder, drSenderConfiguration, null, new CacheConfiguration[0]);
        config.setCommunicationSpi(new DrTestCommunicationSpi(this.sndHubLsnr));
        return config;
    }

    static {
        $assertionsDisabled = !DrSenderImplStoreSelfTest.class.desiredAssertionStatus();
        STORE_PATH = U.getIgniteHome() + "/work/some_test_dr_store";
    }
}
