package org.gridgain.internal.processors.dr.hubs;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PAX;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.grid.dr.store.DrSenderStore;
import org.gridgain.grid.dr.store.DrSenderStoreCorruptedException;
import org.gridgain.grid.dr.store.DrSenderStoreCursor;
import org.gridgain.grid.dr.store.DrSenderStoreEntry;
import org.gridgain.grid.dr.store.DrSenderStoreOverflowMode;
import org.gridgain.grid.dr.store.DurableStore;
import org.gridgain.grid.dr.store.memory.DrSenderInMemoryStore;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrSenderHubStopInfo;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrSenderHubStopKey;
import org.gridgain.internal.processors.dr.DrAbstractTest;
import org.gridgain.plugin.security.SecurityServicePermissionsTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

@WithSystemProperty(key = "GG_INCREMENTAL_STATE_TRANSFER", value = "false")
/* loaded from: input_file:org/gridgain/internal/processors/dr/hubs/DrSenderImplStoreFailureTest.class */
public class DrSenderImplStoreFailureTest extends DrAbstractTest {
    /** Failure switch passed to the store of sender hub {@code TOP1_NODE_SND}; while {@code true} that store's {@code store(...)} throws. */
    private final AtomicBoolean storeError1 = new AtomicBoolean();
    /** Failure switch passed to the store of sender hub {@code TOP1_NODE_SND}; while {@code true} that store's cursors throw from {@code next()}. */
    private final AtomicBoolean cursorError1 = new AtomicBoolean();
    /** Same as {@link #storeError1}, but wired to the second sender hub ({@code "top1_node_snd_2"}). */
    private final AtomicBoolean storeError2 = new AtomicBoolean();
    /** Same as {@link #cursorError1}, but wired to the second sender hub ({@code "top1_node_snd_2"}). */
    private final AtomicBoolean cursorError2 = new AtomicBoolean();
    /**
     * Chooses which {@code config(...)} overload {@code senderHubConfig} uses; each test sets it before running the scenario.
     * NOTE(review): presumably {@code true} places the sender hub on a client node - confirm against {@code DrAbstractTest.config}.
     */
    protected boolean senderOnClient;

    /* Decompiler note: access modifiers of this class were widened from package-private. */
    /**
     * Sender store used to inject failures: it forwards every operation to an in-memory store
     * unless one of the two externally owned flags is raised.
     * <ul>
     *     <li>{@code storeError} raised: {@link #store} throws {@link DrSenderStoreCorruptedException};</li>
     *     <li>{@code cursorError} raised: cursors obtained from {@link #cursor} throw
     *     {@link IgniteCheckedException} from {@code next()}.</li>
     * </ul>
     */
    @DurableStore
    public static class TestDrSenderStore implements DrSenderStore, LifecycleAware {
        /** While {@code true}, {@link #store} fails instead of delegating. */
        private final AtomicBoolean storeError;

        /** While {@code true}, {@code next()} of cursors created by this store fails instead of delegating. */
        private final AtomicBoolean cursorError;

        /** Backing store; its overflow mode is set to {@code STOP} exactly once, here. */
        private final DrSenderInMemoryStore delegate =
            new DrSenderInMemoryStore().setOverflowMode(DrSenderStoreOverflowMode.STOP);

        /* Decompiler note: access modifiers of this constructor were widened from package-private. */
        /**
         * @param atomicBoolean Flag that switches {@link #store} into failure mode.
         * @param atomicBoolean2 Flag that switches cursor iteration into failure mode.
         */
        public TestDrSenderStore(AtomicBoolean atomicBoolean, AtomicBoolean atomicBoolean2) {
            // The delegate is already configured with the STOP overflow mode by its field
            // initializer, so the mode is not applied a second time here.
            this.storeError = atomicBoolean;
            this.cursorError = atomicBoolean2;
        }

        /**
         * Stores the entry in the delegate unless the store-failure flag is raised.
         *
         * @throws IgniteCheckedException {@link DrSenderStoreCorruptedException} when the store-failure flag is raised,
         *      otherwise whatever the delegate throws.
         */
        public void store(byte[] bArr, byte[] bArr2, int i, @Nullable IgniteUuid igniteUuid) throws IgniteCheckedException {
            if (storeError.get()) {
                throw new DrSenderStoreCorruptedException("Test store corrupted exception.");
            }

            delegate.store(bArr, bArr2, i, igniteUuid);
        }

        /**
         * Wraps the delegate's cursor so that iteration can be made to fail on demand.
         *
         * @param b Argument forwarded unchanged to the delegate's {@code cursor(byte)}.
         * @return Cursor whose {@code next()} checks the cursor-failure flag on every call.
         * @throws IgniteCheckedException If the delegate fails to create a cursor.
         */
        public DrSenderStoreCursor cursor(byte b) throws IgniteCheckedException {
            final DrSenderStoreCursor delegateCursor = delegate.cursor(b);

            return new DrSenderStoreCursor() {
                @Nullable
                public DrSenderStoreEntry next() throws IgniteCheckedException {
                    // The flag is re-read on each call, so a failure can be injected mid-iteration.
                    if (cursorError.get()) {
                        throw new IgniteCheckedException("Test store cursor exception.");
                    }

                    return delegateCursor.next();
                }

                public void close() throws Exception {
                    delegateCursor.close();
                }
            };
        }

        /** Clears the delegate store. */
        public void clear() throws IgniteCheckedException {
            delegate.clear();
        }

        /** @return Always {@code false}; the delegate is not consulted. */
        public boolean isOverflow() {
            return false;
        }

        /** @return Size in bytes as reported by the delegate. */
        public long sizeBytes() {
            return delegate.sizeBytes();
        }

        /** Starts the delegate store. */
        public void start() throws IgniteException {
            delegate.start();
        }

        /** Stops the delegate store. */
        public void stop() throws IgniteException {
            delegate.stop();
        }
    }

    /* Decompiler note: access modifiers of this method were widened from protected. */
    /**
     * Runs the parent set-up and then lowers all four failure-injection flags so that
     * every test starts with healthy sender stores.
     */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public void beforeTest() throws Exception {
        super.beforeTest();

        AtomicBoolean[] failureFlags = {storeError1, storeError2, cursorError1, cursorError2};

        for (AtomicBoolean flag : failureFlags) {
            flag.set(false);
        }
    }

    /**
     * Registers the receiving topology (data center id {@code 2}): one node configured with a
     * receiver hub and one node hosting the partitioned test cache.
     *
     * @return IP finder the topology was registered under.
     * @throws Exception If configuration fails.
     */
    private TcpDiscoveryIpFinder createTopologyReceiver() throws Exception {
        TcpDiscoveryIpFinder finder = ipFinder();

        IgniteConfiguration rcvHubCfg = config(
            new GridGainConfiguration(),
            DrAbstractTest.TOP2_NODE_RCV,
            (byte) 2,
            finder,
            null,
            receiverHubConfig(DrAbstractTest.RCV_PORT_1),
            new CacheConfiguration[0]);

        IgniteConfiguration dataNodeCfg = config(
            new GridGainConfiguration(),
            DrAbstractTest.TOP2_NODE,
            (byte) 2,
            finder,
            null,
            null,
            cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, true, null, null));

        addTopology(finder, rcvHubCfg, dataNodeCfg);

        return finder;
    }

    /**
     * Registers the sending topology (data center id {@code 1}): two sender hubs, each wired to its
     * own pair of failure flags, and one node hosting the partitioned test cache with two backups.
     *
     * @return IP finder the topology was registered under.
     * @throws Exception If configuration fails.
     */
    private TcpDiscoveryIpFinder createTopologySender() throws Exception {
        TcpDiscoveryIpFinder finder = ipFinder();

        IgniteConfiguration sndHub1Cfg =
            senderHubConfig(finder, DrAbstractTest.TOP1_NODE_SND, storeError1, cursorError1);

        IgniteConfiguration sndHub2Cfg =
            senderHubConfig(finder, "top1_node_snd_2", storeError2, cursorError2);

        IgniteConfiguration dataNodeCfg = config(
            new GridGainConfiguration(),
            DrAbstractTest.TOP1_NODE,
            (byte) 1,
            finder,
            null,
            null,
            cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, true).setBackups(2));

        addTopology(finder, sndHub1Cfg, sndHub2Cfg, dataNodeCfg);

        return finder;
    }

    /**
     * Builds the node configuration for a sender hub that replicates the test cache to data center
     * {@code 2} through a {@link TestDrSenderStore} driven by the given failure flags.
     *
     * @param tcpDiscoveryIpFinder IP finder of the sending topology.
     * @param str Name of the node to configure.
     * @param atomicBoolean Flag that makes the hub's store fail on {@code store(...)}.
     * @param atomicBoolean2 Flag that makes the hub's store cursors fail on {@code next()}.
     * @return Node configuration; which {@code config(...)} overload produces it depends on {@link #senderOnClient}.
     * @throws Exception If configuration fails.
     */
    protected IgniteConfiguration senderHubConfig(TcpDiscoveryIpFinder tcpDiscoveryIpFinder, String str, AtomicBoolean atomicBoolean, AtomicBoolean atomicBoolean2) throws Exception {
        DrSenderConfiguration sndCfg = senderHubConfig(senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1))
            .setCacheNames(new String[]{SecurityServicePermissionsTest.CACHE_NAME})
            .setStore(new TestDrSenderStore(atomicBoolean, atomicBoolean2));

        if (senderOnClient) {
            return config(new GridGainConfiguration(), str, (byte) 1, tcpDiscoveryIpFinder, sndCfg, null, new CacheConfiguration[0]);
        }

        // Same configuration, but through the overload that takes an explicit boolean flag (false).
        return config(new GridGainConfiguration(), str, (byte) 1, tcpDiscoveryIpFinder, sndCfg, null, false, new CacheConfiguration[0]);
    }

    /**
     * Runs the store-failure scenario with {@link #senderOnClient} switched on.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testStoreFailure() throws Exception {
        senderOnClient = true;

        checkStoreFailure();
    }

    /**
     * Runs the store-failure scenario with {@link #senderOnClient} switched off.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testStoreFailureOnLocalSender() throws Exception {
        senderOnClient = false;

        checkStoreFailure();
    }

    /**
     * Store-failure scenario shared by both tests:
     * <ol>
     *     <li>start both topologies and verify that an initial batch is replicated;</li>
     *     <li>raise the store-failure flag of the first hub, write a second batch, expect a stop entry for
     *     that hub while the batch still reaches the receiver; stop the hub and expect the entry to go away;</li>
     *     <li>raise the cursor-failure flag of the second hub, write more data, expect a stop entry for it and
     *     replication of the cache to be reported as stopped; stop the hub and expect the entry to go away;</li>
     *     <li>with no hubs left, expect {@code startReplication} to fail with {@link IllegalStateException};</li>
     *     <li>lower the first hub's store-failure flag, start that hub again, start replication and verify
     *     that a final batch is replicated.</li>
     * </ol>
     *
     * @throws Exception If failed.
     */
    private void checkStoreFailure() throws Exception {
        TcpDiscoveryIpFinder rcvIpFinder = createTopologyReceiver();
        TcpDiscoveryIpFinder sndIpFinder = createTopologySender();

        startTopology(rcvIpFinder);
        startTopology(sndIpFinder);

        UUID sndHub1Id = G.ignite(DrAbstractTest.TOP1_NODE_SND).cluster().localNode().id();
        UUID sndHub2Id = G.ignite("top1_node_snd_2").cluster().localNode().id();

        IgniteInternalCache<Object, Object> utilCache = G.ignite(DrAbstractTest.TOP1_NODE).context().cache().utilityCache();

        IgniteCache srcCache = G.ignite(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME);
        IgniteCache dstCache = G.ignite(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME);

        // Step 1: healthy replication.
        Map initialBatch = stringKeysMappedTo(10, 0);

        srcCache.putAll(initialBatch);
        compareCaches(dstCache, initialBatch, 20000L);

        // Step 2: the first hub's store starts throwing on store(...).
        storeError1.set(true);

        Map secondBatch = stringKeysMappedTo(20, 1);

        srcCache.putAll(secondBatch);
        waitStopEntry(utilCache, sndHub1Id);
        compareCaches(dstCache, secondBatch, 20000L);

        stopGrid(DrAbstractTest.TOP1_NODE_SND);
        waitStopEntryRemoved(utilCache, sndHub1Id);

        // Step 3: the second hub's store cursors start throwing on next().
        cursorError2.set(true);

        for (int key = 0; key < 10; key++) {
            srcCache.put(key, 2);
        }

        waitStopEntry(utilCache, sndHub2Id);

        boolean drStopped = GridTestUtils.waitForCondition(() -> {
            return isDrStopped(DrAbstractTest.TOP1_NODE, SecurityServicePermissionsTest.CACHE_NAME);
        }, 5000L);

        assertTrue(drStopped);

        stopGrid("top1_node_snd_2");
        waitStopEntryRemoved(utilCache, sndHub2Id);

        // Step 4: both hubs are down, so replication cannot be started.
        GridTestUtils.assertThrows(log, () -> {
            dr(G.ignite(DrAbstractTest.TOP1_NODE)).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        }, IllegalStateException.class, "Failed to start replication because there are no sender hubs available.");

        // Step 5: bring the first hub back with a healthy store and resume replication.
        storeError1.set(false);

        G.start(senderHubConfig(sndIpFinder, DrAbstractTest.TOP1_NODE_SND, storeError1, cursorError1));

        // The second hub stays stopped, hence one node fewer than the registered topology size.
        int expNodes = topologySize(sndIpFinder) - 1;

        awaitDiscovery(sndIpFinder, expNodes);

        dr(G.ignite(DrAbstractTest.TOP1_NODE)).startReplication(SecurityServicePermissionsTest.CACHE_NAME);
        waitDrStarted(DrAbstractTest.TOP1_NODE);

        if (!senderOnClient) {
            waitDrStarted(DrAbstractTest.TOP1_NODE_SND);
        }

        Map finalBatch = stringKeysMappedTo(20, 3);

        srcCache.putAll(finalBatch);
        compareCaches(dstCache, finalBatch, 20000L);
    }

    /**
     * Builds a map whose keys are the decimal strings {@code "0"} .. {@code keyCnt - 1}, all mapped to the same value.
     *
     * @param keyCnt Number of keys.
     * @param val Value assigned to every key.
     * @return New map with {@code keyCnt} entries.
     */
    private static Map<String, Integer> stringKeysMappedTo(int keyCnt, int val) {
        return IntStream.range(0, keyCnt)
            .mapToObj(String::valueOf)
            .collect(Collectors.toMap(key -> key, key -> val));
    }

    /**
     * Waits up to 5 seconds for the stop entry of the given sender hub to appear in the supplied cache
     * and validates the error stored in it: it must be a plain {@link Exception} with no cause, an empty
     * stack trace and a message starting with {@code "SenderHubFailure: "}.
     *
     * @param igniteInternalCache Cache in which the stop entry is looked up.
     * @param uuid Node id of the sender hub.
     * @return Stop info that was found.
     * @throws Exception If failed.
     */
    protected CacheDrSenderHubStopInfo waitStopEntry(final IgniteInternalCache<Object, Object> igniteInternalCache, UUID uuid) throws Exception {
        final CacheDrSenderHubStopKey stopKey = new CacheDrSenderHubStopKey(SecurityServicePermissionsTest.CACHE_NAME, uuid);

        // The wait result is not checked here; a timeout surfaces through the not-null assertion below.
        GridTestUtils.waitForCondition(new PAX() {
            public boolean applyx() throws IgniteCheckedException {
                Object found = igniteInternalCache.get(stopKey);

                return found != null;
            }
        }, 5000L);

        CacheDrSenderHubStopInfo stopInfo = (CacheDrSenderHubStopInfo) igniteInternalCache.get(stopKey);

        assertNotNull("Stop entry not created.", stopInfo);

        Throwable err = stopInfo.error();

        assertSame(Exception.class, err.getClass());
        assertNull(err.getCause());
        assertTrue(F.isEmpty(err.getStackTrace()));
        assertTrue(err.getMessage().startsWith("SenderHubFailure: "));

        return stopInfo;
    }

    /**
     * Asserts that the stop entry of the given sender hub disappears from the supplied cache within 5 seconds.
     *
     * @param igniteInternalCache Cache in which the stop entry is looked up.
     * @param uuid Node id of the sender hub.
     * @throws Exception If failed.
     */
    protected void waitStopEntryRemoved(final IgniteInternalCache<Object, Object> igniteInternalCache, UUID uuid) throws Exception {
        final CacheDrSenderHubStopKey stopKey = new CacheDrSenderHubStopKey(SecurityServicePermissionsTest.CACHE_NAME, uuid);

        boolean removed = GridTestUtils.waitForCondition(new PAX() {
            public boolean applyx() throws IgniteCheckedException {
                Object found = igniteInternalCache.get(stopKey);

                return found == null;
            }
        }, 5000L);

        assertTrue("Stop entry not removed.", removed);
    }
}
