package org.apache.ignite.internal.processors.cache.persistence.db.wal;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.nio.ByteBuffer;
import java.nio.file.OpenOption;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCachePreloader;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.class */
public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
    /** Cache name constant; its value matches the name of the cache configured and accessed by the tests in this class. */
    private static final String CACHE_NAME = "cache";
    /** Partition count constant; its value matches the partition count given to the cache affinity function. */
    private static final int PARTS_CNT = 32;
    /** Optional message-blocking predicate; when non-null, {@code getConfiguration} installs it on the communication SPI it creates. */
    private IgniteBiPredicate<ClusterNode, Message> blockMessagePredicate;

    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest$FailingIOFactory.class */
    /**
     * File I/O factory for tests: opens files through a wrapped factory and, while the failure
     * flag is raised, hands out I/O objects for {@code *.wal} files whose read always throws.
     */
    static class FailingIOFactory implements FileIOFactory {
        /** Failure flag, consulted at the moment an I/O object is created. */
        private volatile boolean failRead;

        /** Factory that really opens the files. */
        private final FileIOFactory delegate;

        /**
         * @param fileIOFactory Factory every {@code create} call is forwarded to.
         */
        FailingIOFactory(FileIOFactory fileIOFactory) {
            delegate = fileIOFactory;
        }

        /**
         * Opens the file via the delegate. The delegate's I/O is returned untouched unless the
         * file name ends with {@code .wal} and the failure flag is currently set; in that case
         * a decorator is returned whose {@code read(ByteBuffer)} throws {@link IOException}.
         *
         * @param file File to open.
         * @param openOptionArr Open options passed through to the delegate.
         * @return Plain or failing file I/O.
         * @throws IOException If the delegate fails to open the file.
         */
        public FileIO create(File file, OpenOption... openOptionArr) throws IOException {
            FileIO delegateIo = delegate.create(file, openOptionArr);

            boolean walFile = file.getName().endsWith(".wal");

            // The flag is sampled here, not on every read: I/Os created earlier keep working.
            if (!walFile || !failRead) {
                return delegateIo;
            }

            return new FileIODecorator(delegateIo) {
                public int read(ByteBuffer byteBuffer) throws IOException {
                    throw new IOException("Test exception.");
                }
            };
        }

        /** Raises the failure flag: WAL I/Os created from now on fail on read. */
        public void throwExceptionOnWalRead() {
            failRead = true;
        }

        /** Lowers the failure flag: I/Os created from now on read normally. */
        public void reset() {
            failRead = false;
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest$IndexedObject.class */
    /**
     * Cache value used by the tests: an indexed integer plus a 1 KiB byte payload.
     * Equality and hash code depend on the integer only.
     */
    private static class IndexedObject {
        /** Indexed value; the sole component of equality. */
        @QuerySqlField(index = true)
        private int iVal;

        /** Filler bytes; not part of equality. */
        private byte[] payload;

        /**
         * @param i Value to store in the indexed field.
         */
        private IndexedObject(int i) {
            iVal = i;
            payload = new byte[1024];
        }

        /** Two instances are equal when their indexed values are equal. */
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }

            if (!(obj instanceof IndexedObject)) {
                return false;
            }

            IndexedObject that = (IndexedObject) obj;

            return iVal == that.iVal;
        }

        /** Hash code is the indexed value itself, consistent with {@code equals}. */
        public int hashCode() {
            return iVal;
        }

        /** String form produced by the {@code S.toString} helper. */
        public String toString() {
            return S.toString(IndexedObject.class, this);
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest$WalRebalanceCheckingCommunicationSpi.class */
    /**
     * Communication SPI that records, per cache group id, the topology versions for which a
     * partition demand message with a non-empty historical partition map was sent. The record is
     * static, i.e. shared by every SPI instance in the JVM, and guarded by a single monitor.
     */
    public static class WalRebalanceCheckingCommunicationSpi extends TestRecordingCommunicationSpi {
        /** Group id -> topology versions recorded so far. Guarded by {@link #mux}. */
        private static final Map<Integer, Set<Long>> topVers = new HashMap<>();

        /** Monitor guarding {@link #topVers}. */
        private static final Object mux = new Object();

        /**
         * Returns the topology versions recorded for the given group.
         *
         * @param i Cache group id.
         * @return Unmodifiable snapshot of the recorded versions; empty if nothing was recorded.
         */
        Set<Long> walRebalanceVersions(int i) {
            synchronized (mux) {
                Set<Long> recorded = topVers.get(i);

                if (recorded == null) {
                    return Collections.<Long>emptySet();
                }

                // Copy under the lock: a view of the live set would be read by the caller
                // without synchronization while sendMessage() may still be adding to it.
                return Collections.unmodifiableSet(new HashSet<>(recorded));
            }
        }

        /**
         * Returns everything recorded so far.
         *
         * @return Unmodifiable snapshot of the group id -> topology versions mapping.
         */
        public static Map<Integer, Set<Long>> allRebalances() {
            synchronized (mux) {
                Map<Integer, Set<Long>> snapshot = new HashMap<>();

                // Deep copy for the same reason as in walRebalanceVersions().
                for (Map.Entry<Integer, Set<Long>> entry : topVers.entrySet()) {
                    snapshot.put(entry.getKey(), Collections.unmodifiableSet(new HashSet<>(entry.getValue())));
                }

                return Collections.unmodifiableMap(snapshot);
            }
        }

        /** Drops everything recorded so far. */
        public static void cleanup() {
            synchronized (mux) {
                topVers.clear();
            }
        }

        /**
         * Records the group id and topology version of outgoing demand messages that carry a
         * non-empty historical partition map, then forwards the message to the parent SPI.
         *
         * @param clusterNode Destination node.
         * @param message Message being sent.
         * @param igniteInClosure Closure passed through to the parent implementation.
         * @throws IgniteSpiException If thrown by the parent implementation.
         */
        @Override // org.apache.ignite.internal.TestRecordingCommunicationSpi
        public void sendMessage(ClusterNode clusterNode, Message message, IgniteInClosure<IgniteException> igniteInClosure) throws IgniteSpiException {
            // Only wrapped I/O messages can carry a demand message; anything else is just forwarded.
            if (message instanceof GridIoMessage) {
                Message payload = ((GridIoMessage) message).message();

                if (payload instanceof GridDhtPartitionDemandMessage) {
                    GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage) payload;

                    if (!demandMsg.partitions().historicalMap().isEmpty()) {
                        int grpId = demandMsg.groupId();
                        long topVer = demandMsg.topologyVersion().topologyVersion();

                        synchronized (mux) {
                            topVers.computeIfAbsent(grpId, key -> new HashSet<>()).add(topVer);
                        }
                    }
                }
            }

            super.sendMessage(clusterNode, message, igniteInClosure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the node configuration used by every test in this class:
     * <ul>
     * <li>sets the {@code IGNITE_PDS_WAL_REBALANCE_THRESHOLD} system property to {@code "0"};</li>
     * <li>uses the instance name as consistent id;</li>
     * <li>configures one replicated, atomic, asynchronously rebalanced cache with {@link #PARTS_CNT} partitions;</li>
     * <li>enables persistence with {@code LOG_ONLY} WAL, maximum WAL history and a 15 minute checkpoint frequency;</li>
     * <li>installs a {@link WalRebalanceCheckingCommunicationSpi}, with {@link #blockMessagePredicate} applied when set.</li>
     * </ul>
     *
     * @param str Ignite instance name.
     * @return Configuration for the node.
     * @throws Exception If the parent implementation fails.
     */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public IgniteConfiguration getConfiguration(String str) throws Exception {
        System.setProperty("IGNITE_PDS_WAL_REBALANCE_THRESHOLD", "0");

        IgniteConfiguration configuration = super.getConfiguration(str);

        configuration.setConsistentId(str);

        CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<Object, Object>(CACHE_NAME)
            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
            .setRebalanceMode(CacheRebalanceMode.ASYNC)
            .setCacheMode(CacheMode.REPLICATED)
            .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));

        configuration.setCacheConfiguration(cacheCfg);

        DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration()
            .setPersistenceEnabled(true)
            .setMaxSize(256L * 1024 * 1024); // 268435456 bytes.

        DataStorageConfiguration storageCfg = new DataStorageConfiguration()
            .setWalHistorySize(Integer.MAX_VALUE)
            .setWalMode(WALMode.LOG_ONLY)
            .setCheckpointFrequency(15L * 60 * 1000) // 900000 ms.
            .setDefaultDataRegionConfiguration(dfltRegionCfg);

        configuration.setDataStorageConfiguration(storageCfg);

        // Keep a typed reference: IgniteConfiguration#getCommunicationSpi() is declared to return
        // the base SPI interface, which does not expose blockMessages().
        WalRebalanceCheckingCommunicationSpi commSpi = new WalRebalanceCheckingCommunicationSpi();

        configuration.setCommunicationSpi(commSpi);

        if (blockMessagePredicate != null) {
            commSpi.blockMessages(blockMessagePredicate);
        }

        return configuration;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Per-test set-up: stops all grids and then cleans the persistence directory.
     *
     * @throws Exception If stopping the grids or cleaning the directory fails.
     */
    @Override // org.apache.ignite.testframework.junits.common.GridCommonAbstractTest, org.apache.ignite.testframework.junits.GridAbstractTest
    public void beforeTest() throws Exception {
        this.stopAllGrids();

        this.cleanPersistenceDir();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Per-test tear-down: clears the system properties set by the tests, resets the recorded
     * rebalances, stops all grids and cleans the persistence directory. After the clean-up it
     * fails the test if the communication SPI recorded no rebalance at all during the test.
     *
     * @throws Exception If stopping the grids or cleaning the directory fails.
     */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public void afterTest() throws Exception {
        System.clearProperty("IGNITE_PDS_WAL_REBALANCE_THRESHOLD");
        System.clearProperty("IGNITE_DISABLE_WAL_DURING_REBALANCING");

        // Must be sampled before cleanup() wipes the record.
        boolean nothingRecorded = WalRebalanceCheckingCommunicationSpi.allRebalances().isEmpty();

        WalRebalanceCheckingCommunicationSpi.cleanup();

        stopAllGrids();
        cleanPersistenceDir();

        // Checked last so that the clean-up above always runs first.
        if (nothingRecorded) {
            throw new AssertionError("WAL rebalance hasn't been invoked.");
        }
    }

    /**
     * Two-node scenario: load data, checkpoint, stop node 1, overwrite every key, checkpoint,
     * restart node 1 and check that every running node returns the overwritten values.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSimple() throws Exception {
        final int entryCnt = PARTS_CNT * 100;

        IgniteEx crd = startGrid(0);

        startGrid(1);

        crd.cluster().active(true);

        IgniteCache<Object, Object> crdCache = crd.cache(CACHE_NAME);

        // Initial load.
        for (int key = 0; key < entryCnt; key++) {
            crdCache.put(key, new IndexedObject(key));
        }

        forceCheckpoint();

        stopGrid(1, false);

        // Updates that node 1 misses while it is down.
        for (int key = 0; key < entryCnt; key++) {
            crdCache.put(key, new IndexedObject(key + 1));
        }

        forceCheckpoint();

        startGrid(1);

        awaitPartitionMapExchange();

        for (Ignite node : G.allGrids()) {
            IgniteCache<Object, Object> nodeCache = node.cache(CACHE_NAME);

            for (int key = 0; key < entryCnt; key++) {
                assertEquals(new IndexedObject(key + 1), nodeCache.get(key));
            }
        }
    }

    /**
     * Same flow as the simple scenario, but while node 1 is down every third key
     * ({@code key % 3 == 2}) is removed and the remaining keys are overwritten. After node 1
     * rejoins, every running node must see the removals and the overwritten values.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRebalanceRemoves() throws Exception {
        final int entryCnt = PARTS_CNT * 100;

        IgniteEx crd = startGrid(0);

        startGrid(1);

        crd.cluster().active(true);

        IgniteCache<Object, Object> crdCache = crd.cache(CACHE_NAME);

        // Initial load.
        for (int key = 0; key < entryCnt; key++) {
            crdCache.put(key, new IndexedObject(key));
        }

        forceCheckpoint();

        stopGrid(1, false);

        // Changes that node 1 misses while it is down.
        for (int key = 0; key < entryCnt; key++) {
            if (key % 3 == 2) {
                crdCache.remove(key);
            } else {
                crdCache.put(key, new IndexedObject(key + 1));
            }
        }

        forceCheckpoint();

        startGrid(1);

        awaitPartitionMapExchange();

        for (Ignite node : G.allGrids()) {
            IgniteCache<Object, Object> nodeCache = node.cache(CACHE_NAME);

            for (int key = 0; key < entryCnt; key++) {
                if (key % 3 == 2) {
                    assertNull(nodeCache.get(key));
                } else {
                    assertEquals(new IndexedObject(key + 1), nodeCache.get(key));
                }
            }
        }
    }

    /**
     * Scenario with {@code IGNITE_DISABLE_WAL_DURING_REBALANCING} set to {@code "true"}:
     * <ol>
     * <li>start four nodes, load data, checkpoint, stop everything;</li>
     * <li>start nodes 0 and 1, overwrite the data, checkpoint;</li>
     * <li>start node 2 and assert that its SPI recorded a rebalance for the current topology version;</li>
     * <li>update/remove part of the data, checkpoint, stop nodes 0 and 1;</li>
     * <li>start node 3 and assert that its SPI recorded no rebalance for the current topology version;</li>
     * <li>check the data on every running node.</li>
     * </ol>
     *
     * @throws Exception If failed.
     */
    @Test
    public void testWithLocalWalChange() throws Exception {
        System.setProperty("IGNITE_DISABLE_WAL_DURING_REBALANCING", "true");

        final int entryCnt = PARTS_CNT * 10;

        IgniteEx crd = startGrids(4);

        crd.cluster().active(true);

        IgniteCache<Object, Object> initCache = crd.cache(CACHE_NAME);

        for (int key = 0; key < entryCnt; key++) {
            initCache.put(key, new IndexedObject(key - 1));
        }

        forceCheckpoint();

        stopAllGrids();

        IgniteEx restartedCrd = startGrids(2);

        restartedCrd.cluster().active(true);

        IgniteCache<Object, Object> cache = restartedCrd.cache(CACHE_NAME);

        int grpId = restartedCrd.cachex(CACHE_NAME).context().groupId();

        for (int key = 0; key < entryCnt; key++) {
            cache.put(key, new IndexedObject(key));
        }

        forceCheckpoint();

        IgniteEx thirdNode = startGrid(2);

        awaitPartitionMapExchange();

        // getCommunicationSpi() is declared to return the base SPI interface, so the cast to the
        // test SPI is required to reach walRebalanceVersions().
        WalRebalanceCheckingCommunicationSpi thirdSpi =
            (WalRebalanceCheckingCommunicationSpi) thirdNode.configuration().getCommunicationSpi();

        Assert.assertTrue(thirdSpi.walRebalanceVersions(grpId).contains(thirdNode.cluster().topologyVersion()));

        // Update keys with remainder 0, remove keys with remainder 1, leave the rest untouched.
        for (int key = 0; key < entryCnt; key++) {
            if (key % 3 == 0) {
                cache.put(key, new IndexedObject(key + 1));
            } else if (key % 3 == 1) {
                cache.remove(key);
            }
        }

        forceCheckpoint();

        stopGrid(0);
        stopGrid(1);

        IgniteEx fourthNode = startGrid(3);

        awaitPartitionMapExchange();

        WalRebalanceCheckingCommunicationSpi fourthSpi =
            (WalRebalanceCheckingCommunicationSpi) fourthNode.configuration().getCommunicationSpi();

        Assert.assertFalse(fourthSpi.walRebalanceVersions(grpId).contains(fourthNode.cluster().topologyVersion()));

        for (Ignite node : G.allGrids()) {
            IgniteCache<Object, Object> nodeCache = node.cache(CACHE_NAME);

            for (int key = 0; key < entryCnt; key++) {
                if (key % 3 == 0) {
                    assertEquals(new IndexedObject(key + 1), nodeCache.get(key));
                } else if (key % 3 == 1) {
                    assertNull(nodeCache.get(key));
                } else {
                    assertEquals(new IndexedObject(key), nodeCache.get(key));
                }
            }
        }
    }

    /**
     * Scenario with WAL disabled and re-enabled for the cache through the cluster API:
     * <ol>
     * <li>start three nodes, load data, checkpoint, stop everything;</li>
     * <li>start nodes 0 and 1, disable WAL for the cache, overwrite the data, checkpoint, enable WAL again;</li>
     * <li>start node 2 and assert that its SPI recorded no rebalance for the current topology version;</li>
     * <li>stop node 2, checkpoint, overwrite the data once more;</li>
     * <li>start node 2 again and assert that its SPI recorded a rebalance for the current topology version;</li>
     * <li>check the data on every running node.</li>
     * </ol>
     *
     * @throws Exception If failed.
     */
    @Test
    public void testWithGlobalWalChange() throws Exception {
        final int entryCnt = PARTS_CNT * 10;

        IgniteEx crd = startGrids(3);

        crd.cluster().active(true);

        IgniteCache<Object, Object> initCache = crd.cache(CACHE_NAME);

        for (int key = 0; key < entryCnt; key++) {
            initCache.put(key, new IndexedObject(key - 1));
        }

        forceCheckpoint();

        stopAllGrids();

        IgniteEx restartedCrd = startGrids(2);

        restartedCrd.cluster().active(true);

        restartedCrd.cluster().disableWal(CACHE_NAME);

        IgniteCache<Object, Object> cache = restartedCrd.cache(CACHE_NAME);

        int grpId = restartedCrd.cachex(CACHE_NAME).context().groupId();

        // These updates are made while WAL is disabled for the cache.
        for (int key = 0; key < entryCnt; key++) {
            cache.put(key, new IndexedObject(key));
        }

        forceCheckpoint();

        restartedCrd.cluster().enableWal(CACHE_NAME);

        IgniteEx joined = startGrid(2);

        awaitPartitionMapExchange();

        // getCommunicationSpi() is declared to return the base SPI interface, so the cast to the
        // test SPI is required to reach walRebalanceVersions().
        WalRebalanceCheckingCommunicationSpi joinedSpi =
            (WalRebalanceCheckingCommunicationSpi) joined.configuration().getCommunicationSpi();

        Assert.assertFalse(joinedSpi.walRebalanceVersions(grpId).contains(joined.cluster().topologyVersion()));

        stopGrid(2);

        forceCheckpoint();

        // These updates are made with WAL enabled again.
        for (int key = 0; key < entryCnt; key++) {
            cache.put(key, new IndexedObject(key + 1));
        }

        IgniteEx rejoined = startGrid(2);

        awaitPartitionMapExchange();

        WalRebalanceCheckingCommunicationSpi rejoinedSpi =
            (WalRebalanceCheckingCommunicationSpi) rejoined.configuration().getCommunicationSpi();

        Assert.assertTrue(rejoinedSpi.walRebalanceVersions(grpId).contains(rejoined.cluster().topologyVersion()));

        for (Ignite node : G.allGrids()) {
            IgniteCache<Object, Object> nodeCache = node.cache(CACHE_NAME);

            for (int key = 0; key < entryCnt; key++) {
                assertEquals(new IndexedObject(key + 1), nodeCache.get(key));
            }
        }
    }

    /**
     * Scenario in which the supplier fails while reading WAL:
     * <ol>
     * <li>start three nodes, load data, checkpoint, stop everything;</li>
     * <li>start node 0 alone, overwrite the data, checkpoint;</li>
     * <li>start node 2 with demand messages for the cache group blocked and wait until its
     * rebalance future is bound to the current topology version;</li>
     * <li>install a failing file I/O factory on node 0's WAL, switch it to fail reads and unblock
     * the demand messages;</li>
     * <li>assert that the rebalance future on node 2 completes with {@code false};</li>
     * <li>restore normal reads, start node 1 and check the data on every running node.</li>
     * </ol>
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRebalanceCancelOnSupplyError() throws Exception {
        final int entryCnt = PARTS_CNT * 10;

        IgniteEx crd = startGrids(3);

        crd.cluster().active(true);

        IgniteCache<Object, Object> initCache = crd.cache(CACHE_NAME);

        for (int key = 0; key < entryCnt; key++) {
            initCache.put(key, new IndexedObject(key - 1));
        }

        forceCheckpoint();

        stopAllGrids();

        IgniteEx supplier = startGrid(0);

        supplier.cluster().active(true);

        IgniteCache<Object, Object> cache = supplier.cache(CACHE_NAME);

        for (int key = 0; key < entryCnt; key++) {
            cache.put(key, new IndexedObject(key));
        }

        forceCheckpoint();

        final int grpId = supplier.cachex(CACHE_NAME).context().groupId();

        // Picked up by getConfiguration() when the demander node is started below.
        this.blockMessagePredicate = (clusterNode, message) -> {
            return (message instanceof GridDhtPartitionDemandMessage) && ((GridDhtPartitionDemandMessage) message).groupId() == grpId;
        };

        IgniteEx demander = startGrid(2);

        AffinityTopologyVersion curTopVer = demander.context().discovery().topologyVersionEx();

        GridCachePreloader preloader = demander.cachex(CACHE_NAME).context().group().preloader();

        // Fail fast instead of silently continuing if the wait gives up.
        boolean rebalanceStarted = GridTestUtils.waitForCondition(() -> {
            return preloader.rebalanceFuture().topologyVersion().equals(curTopVer);
        }, getTestTimeout());

        Assert.assertTrue("Rebalance future has not been bound to the current topology version: " + curTopVer, rebalanceStarted);

        FailingIOFactory failingIOFactory = new FailingIOFactory(new RandomAccessFileIOFactory());

        supplier.cachex(CACHE_NAME).context().shared().wal().setFileIOFactory(failingIOFactory);

        failingIOFactory.throwExceptionOnWalRead();

        // getCommunicationSpi() is declared to return the base SPI interface, so the cast to the
        // recording SPI is required to reach stopBlock().
        ((TestRecordingCommunicationSpi) demander.configuration().getCommunicationSpi()).stopBlock();

        Assert.assertEquals("Rebalance should be cancelled on demander node: " + preloader.rebalanceFuture(), false, (Boolean) preloader.rebalanceFuture().get());

        // Nodes started from here on must not block demand messages, and WAL reads work again.
        this.blockMessagePredicate = null;

        failingIOFactory.reset();

        startGrid(1);

        awaitPartitionMapExchange();

        for (Ignite node : G.allGrids()) {
            IgniteCache<Object, Object> nodeCache = node.cache(CACHE_NAME);

            for (int key = 0; key < entryCnt; key++) {
                assertEquals(new IndexedObject(key), nodeCache.get(key));
            }
        }
    }

    // NOTE(review): this looks like the decompiled form of the compiler-generated lambda
    // deserialization hook (see the "synthetic" marker below); presumably it is not meant to be
    // maintained by hand and may clash with the hook the compiler generates itself — confirm.
    //
    // Given a SerializedLambda, it matches the implementation method name against the one lambda
    // listed here and, when every recorded attribute matches, rebuilds that lambda from captured
    // argument 0. Every other input ends in IllegalArgumentException.
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // NOTE(review): "boolean z = -1" and the switch on a boolean below are not valid Java;
        // presumably the decompiler mistyped an int selector (-1 = no match, 0 = first case) — confirm.
        boolean z = -1;
        // First stage: narrow down by hash code, then confirm with equals().
        switch (implMethodName.hashCode()) {
            case -1436084194:
                if (implMethodName.equals("lambda$testRebalanceCancelOnSupplyError$93f20b50$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: validate the remaining attributes of the serialized form and rebuild the lambda.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/lang/IgniteBiPredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest") && serializedLambda.getImplMethodSignature().equals("(ILorg/apache/ignite/cluster/ClusterNode;Lorg/apache/ignite/plugin/extensions/communication/Message;)Z")) {
                    // The single captured argument is the cache group id the predicate compares against.
                    int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return (clusterNode, message) -> {
                        return (message instanceof GridDhtPartitionDemandMessage) && ((GridDhtPartitionDemandMessage) message).groupId() == intValue;
                    };
                }
                break;
        }
        // Reached when the method name or any attribute did not match.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
