package org.gridgain.grid.internal.processors.dr;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.plugin.CachePluginConfiguration;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.gridgain.grid.cache.dr.CacheDrSenderConfiguration;
import org.gridgain.grid.configuration.DrReceiverConfiguration;
import org.gridgain.grid.configuration.GridGainCacheConfiguration;
import org.gridgain.grid.dr.DrSender;
import org.gridgain.grid.dr.DrSenderConnectionState;
import org.gridgain.internal.processors.dr.DrAbstractTest;
import org.gridgain.plugin.security.SecurityServicePermissionsTest;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/gridgain/grid/internal/processors/dr/DrSenderMBeanTest.class */
public class DrSenderMBeanTest extends DrAbstractTest {
    private final Map<String, String> mtdNameToLogWarning = new HashMap<String, String>() { // from class: org.gridgain.grid.internal.processors.dr.DrSenderMBeanTest.1
        {
            put("pause", "Can't pause replication to remote DC. Data center is not configured: dcId=");
            put("paused", "Can't acquire state for remote DC. Data center is not configured: dcId=");
            put("resume", "Can't resume replication to remote DC. Data center is not configured: dcId=");
        }
    };
    private final ListeningTestLogger testLogger = new ListeningTestLogger(false, GridAbstractTest.log);

    public DrSenderMBeanTest() {
        log = this.testLogger;
    }

    @Test
    public void testPauseResumeViaJmx1() throws Exception {
        CacheConfiguration cacheConfig = cacheConfig(SecurityServicePermissionsTest.CACHE_NAME);
        startCluster((byte) 2, DrAbstractTest.ConfigurationBuilder.node(DrAbstractTest.TOP2_NODE_RCV).withReceiver(receiverHubConfig(DrAbstractTest.RCV_PORT_2)).addCache(cacheConfig).asClient().build(), DrAbstractTest.ConfigurationBuilder.node(DrAbstractTest.TOP2_NODE).addCache(cacheConfig).asServer().build());
        startCluster((byte) 3, DrAbstractTest.ConfigurationBuilder.node("top3_node_rcv").withReceiver(receiverHubConfig(12313)).addCache(cacheConfig).asClient().build(), DrAbstractTest.ConfigurationBuilder.node("top3_node").addCache(cacheConfig).asServer().build());
        startCluster((byte) 1, DrAbstractTest.ConfigurationBuilder.node(DrAbstractTest.TOP1_NODE_SND).withSender(senderHubConfig(senderHubReplicaConfig((byte) 2, "127.0.0.1:12312"), senderHubReplicaConfig((byte) 3, "127.0.0.1:12313"))).addCache(cacheConfig).asClient().build(), DrAbstractTest.ConfigurationBuilder.node(DrAbstractTest.TOP1_NODE).addCache(cacheConfig).asServer().build());
        verifyPauseAndResumeViaJmx();
    }

    @Test
    public void testPauseResumeViaJmx2() throws Exception {
        CacheConfiguration cacheConfig = cacheConfig(SecurityServicePermissionsTest.CACHE_NAME);
        startCluster((byte) 2, DrAbstractTest.ConfigurationBuilder.node(DrAbstractTest.TOP2_NODE_RCV).withReceiver(receiverHubConfig(DrAbstractTest.RCV_PORT_2)).addCache(cacheConfig).asServer().build(), DrAbstractTest.ConfigurationBuilder.node(DrAbstractTest.TOP2_NODE).addCache(cacheConfig).asServer().build());
        startCluster((byte) 3, DrAbstractTest.ConfigurationBuilder.node("top3_node_rcv").withReceiver(receiverHubConfig(12313)).addCache(cacheConfig).asServer().build(), DrAbstractTest.ConfigurationBuilder.node("top3_node").addCache(cacheConfig).asServer().build());
        startCluster((byte) 1, DrAbstractTest.ConfigurationBuilder.node(DrAbstractTest.TOP1_NODE_SND).withSender(senderHubConfig(senderHubReplicaConfig((byte) 2, "127.0.0.1:12312"), senderHubReplicaConfig((byte) 3, "127.0.0.1:12313"))).addCache(cacheConfig).asServer().build(), DrAbstractTest.ConfigurationBuilder.node(DrAbstractTest.TOP1_NODE).addCache(cacheConfig).asServer().build());
        verifyPauseAndResumeViaJmx();
    }

    public void verifyPauseAndResumeViaJmx() throws Exception {
        IgniteEx grid = grid(DrAbstractTest.TOP1_NODE_SND);
        GridTestUtils.waitForCondition(() -> {
            return isBothDcConnected(grid);
        }, 5000L);
        checkReplState(grid, (byte) 2, false);
        checkReplState(grid, (byte) 3, false);
        Map<? extends Object, ? extends Object> putData = putData(0, 10, DrAbstractTest.TOP1_NODE);
        verifyData(DrAbstractTest.TOP2_NODE, putData);
        verifyData("top3_node", putData);
        invokeMethod(grid, "pause", new T2<>((byte) 2, "byte"));
        checkReplState(grid, (byte) 2, true);
        checkReplState(grid, (byte) 3, false);
        Map<? extends Object, ? extends Object> putData2 = putData(10, 20, DrAbstractTest.TOP1_NODE);
        HashMap hashMap = new HashMap();
        hashMap.putAll(putData);
        hashMap.putAll(putData2);
        verifyData("top3_node", hashMap);
        Assert.assertTrue(grid(DrAbstractTest.TOP2_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME).getAll(putData2.keySet()).isEmpty());
        invokeMethod(grid, "resume", new T2<>((byte) 2, "byte"));
        checkReplState(grid, (byte) 2, false);
        checkReplState(grid, (byte) 3, false);
        verifyData(DrAbstractTest.TOP2_NODE, hashMap);
    }

    @Test
    public void testPauseOnNonConfiguredDc() throws Exception {
        CacheConfiguration cacheConfig = cacheConfig(SecurityServicePermissionsTest.CACHE_NAME);
        startCluster((byte) 2, DrAbstractTest.ConfigurationBuilder.node(DrAbstractTest.TOP2_NODE).withReceiver(receiverHubConfig(DrAbstractTest.RCV_PORT_2)).addCache(cacheConfig).asServer().build());
        startCluster((byte) 1, DrAbstractTest.ConfigurationBuilder.node(DrAbstractTest.TOP1_NODE).withSender(senderHubConfig(senderHubReplicaConfig((byte) 2, "127.0.0.1:12312"))).addCache(cacheConfig).asServer().build());
        IgniteEx grid = grid(DrAbstractTest.TOP1_NODE);
        for (String str : this.mtdNameToLogWarning.keySet()) {
            Iterator it = Arrays.asList((byte) 1, (byte) 3).iterator();
            while (it.hasNext()) {
                verifyLog(grid, ((Byte) it.next()).byteValue(), str);
            }
            Iterator it2 = Arrays.asList((byte) 0, (byte) -3).iterator();
            while (it2.hasNext()) {
                byte byteValue = ((Byte) it2.next()).byteValue();
                GridTestUtils.assertThrowsWithCause(() -> {
                    return invokeMethod(grid, str, new T2<>(Byte.valueOf(byteValue), "byte"));
                }, IllegalArgumentException.class);
            }
        }
        verifyData(DrAbstractTest.TOP2_NODE, putData(0, 10, DrAbstractTest.TOP1_NODE));
    }

    private boolean isBothDcConnected(Ignite ignite) {
        DrSender localSender = dr(ignite).localSender();
        return localSender.connection((byte) 2).connectionState() == DrSenderConnectionState.CONNECTED && localSender.connection((byte) 3).connectionState() == DrSenderConnectionState.CONNECTED;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public DrReceiverConfiguration receiverHubConfig(int i) {
        return super.receiverHubConfig(i).setFlushFrequency(100L);
    }

    private void verifyLog(Ignite ignite, byte b, String str) throws Exception {
        LogListener build = LogListener.matches(this.mtdNameToLogWarning.get(str) + ((int) b)).times(1).build();
        this.testLogger.registerListener(build);
        invokeMethod(ignite, str, new T2<>(Byte.valueOf(b), "byte"));
        Assert.assertTrue(build.check(1000L));
        this.testLogger.unregisterListener(build);
    }

    private void checkReplState(Ignite ignite, byte b, boolean z) throws IgniteInterruptedCheckedException {
        Assert.assertTrue(GridTestUtils.waitForCondition(() -> {
            try {
                return z == Boolean.parseBoolean(invokeMethod(ignite, "paused", new T2<>(Byte.valueOf(b), "byte")).toString());
            } catch (Exception e) {
                return false;
            }
        }, 5000L));
    }

    private Map<Object, Object> putData(int i, int i2, String str) {
        HashMap hashMap = new HashMap();
        for (int i3 = i; i3 < i + i2; i3++) {
            hashMap.put(Integer.valueOf(i3), Integer.valueOf(i3));
        }
        grid(str).cache(SecurityServicePermissionsTest.CACHE_NAME).putAll(hashMap);
        return hashMap;
    }

    private void verifyData(String str, Map<Object, Object> map) throws IgniteInterruptedCheckedException {
        HashMap hashMap = new HashMap();
        GridTestUtils.waitForCondition(() -> {
            hashMap.clear();
            IgniteCache cache = grid(str).cache(SecurityServicePermissionsTest.CACHE_NAME);
            hashMap.putAll(cache.getAll(map.keySet()));
            return map.equals(hashMap) && map.size() == cache.size(new CachePeekMode[0]);
        }, 10000L);
        Assert.assertEquals(map, hashMap);
        Assert.assertEquals(map.size(), grid(str).cache(SecurityServicePermissionsTest.CACHE_NAME).size(new CachePeekMode[0]));
    }

    public CacheConfiguration cacheConfig(String str) {
        return cacheConfig(str, CacheMode.PARTITIONED).setPluginConfigurations(new CachePluginConfiguration[]{new GridGainCacheConfiguration().setDrReceiverEnabled(true).setDrSenderConfiguration(new CacheDrSenderConfiguration().setBatchSendSize(1).setBatchSendFrequency(100L))});
    }

    @SafeVarargs
    private final Object invokeMethod(Ignite ignite, String str, T2<Object, String>... t2Arr) throws Exception {
        return ignite.configuration().getMBeanServer().invoke(IgniteUtils.makeMBeanName(ignite.name(), "Data center replication", "Sender hub"), str, t2Arr == null ? null : Stream.of((Object[]) t2Arr).map((v0) -> {
            return v0.getKey();
        }).toArray(), t2Arr == null ? null : (String[]) Stream.of((Object[]) t2Arr).map((v0) -> {
            return v0.getValue();
        }).toArray(i -> {
            return new String[i];
        }));
    }
}
