package org.gridgain.internal.processors.dr.ist;

import java.util.Arrays;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.gridgain.grid.configuration.DrReceiverConfiguration;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.internal.processors.dr.DrAbstractTest;
import org.gridgain.plugin.security.SecurityServicePermissionsTest;
import org.junit.Test;

@WithSystemProperty(key = "GG_INCREMENTAL_STATE_TRANSFER", value = "true")
/* loaded from: input_file:org/gridgain/internal/processors/dr/ist/IncrementalDRMetricsTest.class */
public class IncrementalDRMetricsTest extends DrAbstractTest {
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public boolean useSenderGroups() {
        return true;
    }

    @Test
    public void testDrBuffersMetrics() throws Exception {
        TcpDiscoveryIpFinder createSenderTopology = createSenderTopology();
        startTopology(createSenderTopology);
        IgniteEx grid = grid("top1_node");
        IgniteEx grid2 = grid("top1_node_2");
        IgniteEx startGrid = startGrid(senderNodeConfig(createSenderTopology));
        IgniteCache cache = grid.cache(SecurityServicePermissionsTest.CACHE_NAME);
        assertEquals(0L, dr(startGrid).senderAggregatedOutMetrics().stateTransferBufferSize());
        assertEquals(0L, dr(grid).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).backupQueueSize() + dr(grid2).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).backupQueueSize());
        for (int i = 0; i < 200; i++) {
            cache.put(Integer.valueOf(i), Integer.valueOf(i));
        }
        assertTrue(dr(startGrid).senderAggregatedOutMetrics().stateTransferBufferSize() > 0);
        assertEquals(0L, dr(startGrid).senderAggregatedOutMetrics().storeSize());
        assertEquals(200L, dr(grid).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).pendingQueueSize() + dr(grid2).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).pendingQueueSize());
        assertEquals(200L, dr(grid).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).backupQueueSize() + dr(grid2).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).backupQueueSize());
        startTopology(createReceiverTopology());
        waitForCacheReplicated(SecurityServicePermissionsTest.CACHE_NAME, "top1_node", "top1_node_2");
        assertEquals(0L, dr(startGrid).senderAggregatedOutMetrics().stateTransferBufferSize());
        assertEquals(0L, dr(startGrid).senderAggregatedOutMetrics().storeSize());
        assertEquals(0L, dr(grid).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).pendingQueueSize() + dr(grid2).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).pendingQueueSize());
        assertEquals(200L, dr(grid).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).backupQueueSize() + dr(grid2).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).backupQueueSize());
        grid.cachex(SecurityServicePermissionsTest.CACHE_NAME).cache().context().dr().syncDrState();
        grid2.cachex(SecurityServicePermissionsTest.CACHE_NAME).cache().context().dr().syncDrState();
        waitForEmptyBackupQueue(SecurityServicePermissionsTest.CACHE_NAME, "top1_node", "top1_node_2");
        assertEquals(0L, dr(startGrid).senderAggregatedOutMetrics().stateTransferBufferSize());
        assertEquals(0L, dr(startGrid).senderAggregatedOutMetrics().storeSize());
        assertEquals(0L, dr(grid).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).pendingQueueSize() + dr(grid2).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).pendingQueueSize());
        assertEquals(0L, dr(grid).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).backupQueueSize() + dr(grid2).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).backupQueueSize());
        stopGrid("top2_node");
        assertEquals(0L, dr(startGrid).senderAggregatedOutMetrics().stateTransferBufferSize());
        assertEquals(0L, dr(grid).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).pendingQueueSize() + dr(grid2).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).pendingQueueSize());
        assertEquals(0L, dr(grid).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).backupQueueSize() + dr(grid2).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).backupQueueSize());
        IgniteFuture stateTransfer = dr(grid).stateTransfer(SecurityServicePermissionsTest.CACHE_NAME, new byte[0]);
        GridTestUtils.waitForCondition(() -> {
            return dr(startGrid).senderAggregatedOutMetrics().stateTransferBufferSize() > 0;
        }, 15000L);
        assertTrue(dr(startGrid).senderAggregatedOutMetrics().stateTransferBufferSize() > 0);
        assertEquals(0L, dr(startGrid).senderAggregatedOutMetrics().storeSize());
        assertEquals(0L, dr(grid).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).pendingQueueSize() + dr(grid2).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).pendingQueueSize());
        assertEquals(0L, dr(grid).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).backupQueueSize() + dr(grid2).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).backupQueueSize());
        startTopology(createReceiverTopology());
        stateTransfer.get();
        assertEquals(0L, dr(startGrid).senderAggregatedOutMetrics().stateTransferBufferSize());
        assertEquals(0L, dr(startGrid).senderAggregatedOutMetrics().storeSize());
        assertEquals(0L, dr(grid).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).pendingQueueSize() + dr(grid2).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).pendingQueueSize());
        assertEquals(0L, dr(grid).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).backupQueueSize() + dr(grid2).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).backupQueueSize());
    }

    protected TcpDiscoveryIpFinder createSenderTopology() throws Exception {
        TcpDiscoveryIpFinder ipFinder = ipFinder();
        GridGainConfiguration batchSendSizeBytes = new GridGainConfiguration().setBatchSendSizeBytes(256);
        GridGainConfiguration batchSendSizeBytes2 = new GridGainConfiguration().setBatchSendSizeBytes(256);
        CacheConfiguration atomicityMode = senderCacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, 1, 1000L).setBackups(2).setAffinity(new RendezvousAffinityFunction().setPartitions(8)).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
        ggCacheConfig(atomicityMode).getDrSenderConfiguration().setBackupSyncFrequency(2147483647L);
        addTopology(ipFinder, config(batchSendSizeBytes, "top1_node", (byte) 1, ipFinder, null, null, atomicityMode), config(batchSendSizeBytes2, "top1_node_2", (byte) 1, ipFinder, null, null, atomicityMode));
        return ipFinder;
    }

    protected TcpDiscoveryIpFinder createReceiverTopology() throws Exception {
        TcpDiscoveryIpFinder ipFinder = ipFinder();
        addTopology(ipFinder, config(new GridGainConfiguration(), "top2_node", (byte) 2, ipFinder, null, new DrReceiverConfiguration().setFlushFrequency(500L).setLocalInboundPort(12311), cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, false)));
        return ipFinder;
    }

    IgniteConfiguration senderNodeConfig(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws Exception {
        GridGainConfiguration gridGainConfiguration = new GridGainConfiguration();
        DrSenderConfiguration senderHubConfig = senderHubConfig(senderHubReplicaConfig((byte) 2, "127.0.0.1:12311"));
        senderHubConfig.setFullStateTransferBufferSize(33554432L);
        return config(gridGainConfiguration, "top1_node_snd", (byte) 1, tcpDiscoveryIpFinder, senderHubConfig, null, new CacheConfiguration[0]);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public void waitForCacheReplicated(String str, String... strArr) throws IgniteInterruptedCheckedException {
        assertTrue(GridTestUtils.waitForCondition(() -> {
            return Arrays.stream(strArr).allMatch(str2 -> {
                return dr(grid(str2)).senderCacheMetrics(str).pendingQueueSize() == 0;
            });
        }, 15000L));
    }

    private void waitForEmptyBackupQueue(String str, String... strArr) throws IgniteInterruptedCheckedException {
        assertTrue(GridTestUtils.waitForCondition(() -> {
            return Arrays.stream(strArr).allMatch(str2 -> {
                return dr(grid(str2)).senderCacheMetrics(str).backupQueueSize() == 0;
            });
        }, 15000L));
    }
}
