package org.apache.ignite.internal;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterGroupEmptyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

/**
 * Checks that {@link TcpCommunicationSpi} acknowledges received messages under each of its
 * ack-triggering settings (message count, accrued bytes, elapsed time, idle connection timeout).
 *
 * <p>Every test first pushes all ack-related settings to very large values and then lowers exactly
 * one of them. Large values are then put through a cache obtained from a node running in a separate
 * JVM with a small, fixed heap. The test fails if that node is reported as failed.
 *
 * <p>NOTE(review): the failure mode being guarded against is presumably an out-of-memory crash of
 * the remote JVM caused by unacknowledged messages being retained — confirm against the
 * communication SPI implementation.
 */
public class CommunicationMessageAcknowledgeTest extends GridCommonAbstractTest {
    /** Size in bytes of the value written by a single put (100 MiB). */
    private static final int SINGLE_PUT_SIZE = 100 * 1024 * 1024;

    /** Number of put/remove rounds performed by each test. */
    private static final int PUT_COUNT = 5;

    /** Heap size, in megabytes, given to the remote JVM. */
    private static final int REMOTE_HEAP_SIZE_MB = 512;

    /** Pause, in milliseconds, after every put/remove round. */
    private static final int MILLIS_BETWEEN_PUTS = 100;

    /** Instance name of the node started in the test JVM. */
    private static final String LOCAL_NODE_NAME = "local";

    /** Instance name (and name prefix) of the node started in a separate JVM. */
    private static final String REMOTE_NODE_NAME = "remote";

    /** Name of the cache used by the tests. */
    private static final String CACHE_NAME = "cache1";

    /** Per-test tweak applied to the communication SPI after all ack triggers were pushed away. */
    private Consumer<TcpCommunicationSpi> communicationSpiCustomizer;

    /**
     * Builds a node configuration with ack triggers pushed away, the per-test customization applied
     * and a partitioned cache with one backup.
     *
     * @param igniteInstanceName Name of the node being configured.
     * @return Node configuration.
     * @throws Exception If the base configuration cannot be created.
     */
    @Override
    public IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        // getCommunicationSpi() is declared with the SPI interface type, so an explicit cast is needed.
        TcpCommunicationSpi commSpi = (TcpCommunicationSpi) cfg.getCommunicationSpi();

        disableAcksForTestDuration(commSpi);
        communicationSpiCustomizer.accept(commSpi);

        CacheConfiguration<Integer, byte[]> cacheCfg = new CacheConfiguration<>(CACHE_NAME);
        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
        cacheCfg.setBackups(1);

        cfg.setCacheConfiguration(cacheCfg);

        return cfg;
    }

    /**
     * Sets every ack-related setting of the SPI to a very large value (one year for the time based
     * ones, {@link Long#MAX_VALUE} bytes, 1000 messages), so that a test can lower exactly one of them.
     *
     * @param commSpi Communication SPI to adjust.
     */
    private static void disableAcksForTestDuration(TcpCommunicationSpi commSpi) {
        long oneYearMillis = TimeUnit.DAYS.toMillis(365L);

        commSpi.setIdleConnectionTimeout(oneYearMillis);
        commSpi.setAckSendThreshold(1000);
        commSpi.setAckSendThresholdBytes(Long.MAX_VALUE);
        commSpi.setAckSendThresholdMillis(oneYearMillis);
    }

    /**
     * Stops any nodes still running before the test starts.
     *
     * @throws Exception If failed.
     */
    @Override
    public void beforeTest() throws Exception {
        super.beforeTest();

        stopAllGrids();
    }

    /**
     * Clears the per-test SPI customization and stops all nodes.
     *
     * @throws Exception If failed.
     */
    @Override
    public void afterTest() throws Exception {
        communicationSpiCustomizer = null;

        stopAllGrids();

        super.afterTest();
    }

    /**
     * @return {@code true}, this test always runs in multi-JVM mode.
     */
    @Override
    public boolean isMultiJvm() {
        return true;
    }

    /**
     * @return JVM arguments for remote nodes: a fixed heap of {@link #REMOTE_HEAP_SIZE_MB} megabytes
     *      and a crash on out-of-memory.
     */
    @Override
    protected List<String> additionalRemoteJvmArgs() {
        return Arrays.asList(
            "-Xmx" + REMOTE_HEAP_SIZE_MB + "m",
            "-Xms" + REMOTE_HEAP_SIZE_MB + "m",
            "-XX:+CrashOnOutOfMemoryError"
        );
    }

    /**
     * @param igniteInstanceName Node name.
     * @return {@code true} if the name starts with {@link #REMOTE_NODE_NAME}.
     */
    @Override
    public boolean isRemoteJvm(String igniteInstanceName) {
        return igniteInstanceName.startsWith(REMOTE_NODE_NAME);
    }

    /**
     * Runs the scenario with the message-count ack threshold lowered to 1.
     *
     * @throws Exception If failed.
     */
    @Test
    public void acksShouldBeSentOnCountThreshold() throws Exception {
        testMessagesAcking(commSpi -> commSpi.setAckSendThreshold(1));
    }

    /**
     * Runs the scenario with the idle connection timeout lowered to 10 ms.
     *
     * @throws Exception If failed.
     */
    @Test
    public void acksShouldBeSentOnIdleConnectionTimeout() throws Exception {
        testMessagesAcking(commSpi -> commSpi.setIdleConnectionTimeout(10L));
    }

    /**
     * Runs the scenario with the accrued-bytes ack threshold lowered to 1 MiB.
     *
     * @throws Exception If failed.
     */
    @Test
    public void acksShouldBeSentOnAccruedSizeThreshold() throws Exception {
        testMessagesAcking(commSpi -> commSpi.setAckSendThresholdBytes(1024L * 1024L));
    }

    /**
     * Runs the scenario with the time-based ack threshold lowered to 10 ms.
     *
     * @throws Exception If failed.
     */
    @Test
    public void acksShouldBeSentOnTimeout() throws Exception {
        testMessagesAcking(commSpi -> commSpi.setAckSendThresholdMillis(10L));
    }

    /**
     * Starts a local and a remote node with the given SPI customization, then repeatedly puts and
     * removes a {@link #SINGLE_PUT_SIZE}-byte value through the remote node's cache, asserting that
     * the remote node is not reported as failed.
     *
     * @param consumer Customization applied to the communication SPI of every started node.
     * @throws Exception If failed.
     */
    private void testMessagesAcking(Consumer<TcpCommunicationSpi> consumer) throws Exception {
        communicationSpiCustomizer = consumer;

        IgniteEx locNode = startGrid(LOCAL_NODE_NAME);
        IgniteEx rmtNode = startGrid(REMOTE_NODE_NAME);

        AtomicBoolean rmtNodeFailed = listenForRemoteNodeFailure(locNode);

        locNode.getOrCreateCache(CACHE_NAME);

        IgniteCache<Integer, byte[]> cache = rmtNode.cache(CACHE_NAME);

        Random rnd = new Random();
        byte[] payload = new byte[SINGLE_PUT_SIZE];

        // Stop early once a failure of the remote node has been observed.
        for (int i = 0; i < PUT_COUNT && !rmtNodeFailed.get(); i++) {
            rnd.nextBytes(payload);

            try {
                cache.put(i, payload);
                cache.remove(i);

                Thread.sleep(MILLIS_BETWEEN_PUTS);
            }
            catch (ClusterGroupEmptyException e) {
                // Only an empty topology projection is treated as "remote node is gone".
                if (!"Topology projection is empty.".equals(e.getMessage())) {
                    throw e;
                }

                throw new AssertionError("Remote node seems to have died", e);
            }
        }

        assertFalse("Remote node failed, probably an OOM", rmtNodeFailed.get());
    }

    /**
     * Registers a local listener for node-failed discovery events on the given node.
     *
     * @param igniteEx Node on which the listener is registered.
     * @return Flag that is set to {@code true} once a node named {@link #REMOTE_NODE_NAME} is
     *      reported as failed.
     */
    private static AtomicBoolean listenForRemoteNodeFailure(IgniteEx igniteEx) {
        AtomicBoolean rmtNodeFailed = new AtomicBoolean(false);

        igniteEx.events().localListen(evt -> {
            Object failedNodeName =
                ((DiscoveryEvent) evt).eventNode().attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME);

            if (!REMOTE_NODE_NAME.equals(failedNodeName)) {
                // Returning false unregisters the listener, so keep listening on unrelated failures.
                return true;
            }

            rmtNodeFailed.set(true);

            // The failure has been recorded; the listener is no longer needed.
            return false;
        }, EventType.EVT_NODE_FAILED);

        return rmtNodeFailed;
    }
}
