package org.gridgain.internal.processors.dr.ist;

import java.util.Arrays;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.management.ObjectName;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.metric.MetricExporterSpi;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.gridgain.grid.GridDr;
import org.gridgain.grid.cache.dr.CacheDrEntry;
import org.gridgain.grid.cache.dr.CacheDrEntryFilter;
import org.gridgain.grid.cache.dr.CacheDrReceiverMetrics;
import org.gridgain.grid.cache.dr.CacheDrSenderMetrics;
import org.gridgain.grid.configuration.DrReceiverConfiguration;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.grid.dr.DrReceiverInMetrics;
import org.gridgain.grid.dr.DrReceiverOutMetrics;
import org.gridgain.grid.dr.DrSenderInMetrics;
import org.gridgain.grid.dr.DrSenderOutMetrics;
import org.gridgain.internal.processors.dr.DrAbstractTest;
import org.gridgain.internal.processors.dr.qa.DrJmxMetricsAbstractTest;
import org.gridgain.plugin.security.SecurityServicePermissionsTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

/* loaded from: input_file:org/gridgain/internal/processors/dr/ist/IncrementalDRMetricsTest.class */
public class IncrementalDRMetricsTest extends DrAbstractTest {
    private static final int FILTERED_ENTRY_VALUE = Integer.MAX_VALUE;
    private static final long WAIT_TIMEOUT = 10000;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/gridgain/internal/processors/dr/ist/IncrementalDRMetricsTest$DrEntryFilter.class */
    /**
     * DR entry filter over {@code Integer} keys and values that rejects every entry whose value equals
     * {@link IncrementalDRMetricsTest#FILTERED_ENTRY_VALUE} and accepts everything else (including
     * entries whose value is {@code null}).
     */
    public static final class DrEntryFilter implements CacheDrEntryFilter<Integer, Integer> {
        /** Serial version UID. */
        private static final long serialVersionUID = 0L;

        /** Creates the filter; it holds no state. */
        protected DrEntryFilter() {
        }

        /**
         * @param cacheDrEntry Entry to check.
         * @return {@code false} only when the entry value equals the filtered marker value.
         */
        public boolean accept(CacheDrEntry<Integer, Integer> cacheDrEntry) {
            Integer val = cacheDrEntry.value();

            // A null value can never equal the marker, so it is accepted.
            return val == null || val.intValue() != IncrementalDRMetricsTest.FILTERED_ENTRY_VALUE;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Overrides the base-class hook so that it always reports {@code true}.
     * NOTE(review): presumably this makes the base test configure DR through sender groups — confirm
     * against {@code DrAbstractTest}.
     *
     * @return Always {@code true}.
     */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public boolean useSenderGroups() {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the node configuration by delegating to the base implementation with the same arguments
     * and then installing a single {@link JmxMetricExporterSpi} as the metric exporter, so that the
     * tests in this class can read metrics through JMX.
     *
     * @return Configuration produced by the base class with the JMX metric exporter set.
     * @throws IgniteCheckedException If the base implementation fails.
     */
    @Override // org.gridgain.internal.processors.dr.DrAbstractTest
    public IgniteConfiguration config(GridGainConfiguration gridGainConfiguration, String str, byte b, TcpDiscoveryIpFinder tcpDiscoveryIpFinder, @Nullable DrSenderConfiguration drSenderConfiguration, @Nullable DrReceiverConfiguration drReceiverConfiguration, boolean z, @Nullable CacheConfiguration... cacheConfigurationArr) throws IgniteCheckedException {
        return super.config(gridGainConfiguration, str, b, tcpDiscoveryIpFinder, drSenderConfiguration, drReceiverConfiguration, z, cacheConfigurationArr).setMetricExporterSpi(new MetricExporterSpi[]{new JmxMetricExporterSpi()});
    }

    /**
     * Follows the sender-side buffer metrics (hub state-transfer buffer, per-cache pending and backup
     * queues) through these phases: 200 puts with only the sender topology running, start of the
     * receiver topology, an explicit DR state sync, stop of {@code TOP2_NODE_RCV}, a full state
     * transfer started while that node is down, and restart of the receiver node.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDrBuffersMetrics() throws Exception {
        final String cacheName = SecurityServicePermissionsTest.CACHE_NAME;
        final int keyCnt = 200;

        startTopology(createSenderTopology());

        IgniteEx dataNode1 = grid(DrAbstractTest.TOP1_NODE);
        IgniteEx dataNode2 = grid(DrAbstractTest.TOP1_NODE_2);
        IgniteEx senderHub = grid(DrAbstractTest.TOP1_NODE_SND);

        GridDr dr1 = dr(dataNode1);
        GridDr dr2 = dr(dataNode2);
        GridDr hubDr = dr(senderHub);

        // Replication for the cache must be active, with no pause reason reported.
        assertNotNull(dr1.senderCacheMetrics(cacheName).status());
        assertNull(dr1.senderCacheMetrics(cacheName).status().reason());

        IgniteCache senderCache = dataNode1.cache(cacheName);

        // Nothing has been written yet.
        assertEquals(0L, hubDr.senderAggregatedOutMetrics().stateTransferBufferSize());
        assertEquals(0L, dr1.senderCacheMetrics(cacheName).backupQueueSize() + dr2.senderCacheMetrics(cacheName).backupQueueSize());

        for (int key = 0; key < keyCnt; key++) {
            senderCache.put(Integer.valueOf(key), Integer.valueOf(key));
        }

        // Only the sender topology is running: all entries are expected to be reported as queued.
        assertTrue(hubDr.senderAggregatedOutMetrics().stateTransferBufferSize() > 0);
        assertEquals(200L, dr1.senderCacheMetrics(cacheName).pendingQueueSize() + dr2.senderCacheMetrics(cacheName).pendingQueueSize());
        assertEquals(200L, dr1.senderCacheMetrics(cacheName).backupQueueSize() + dr2.senderCacheMetrics(cacheName).backupQueueSize());

        TcpDiscoveryIpFinder rcvIpFinder = createReceiverTopology();
        startTopology(rcvIpFinder);
        waitForCacheReplicated(cacheName, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        // After replication the hub buffer and pending queues are empty; backup queues still hold the entries.
        assertEquals(0L, hubDr.senderAggregatedOutMetrics().stateTransferBufferSize());
        assertEquals(0L, dr1.senderCacheMetrics(cacheName).pendingQueueSize() + dr2.senderCacheMetrics(cacheName).pendingQueueSize());
        assertEquals(200L, dr1.senderCacheMetrics(cacheName).backupQueueSize() + dr2.senderCacheMetrics(cacheName).backupQueueSize());

        dataNode1.cachex(cacheName).cache().context().dr().syncDrState();
        dataNode2.cachex(cacheName).cache().context().dr().syncDrState();
        waitForEmptyBackupQueue(cacheName, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        // After the state sync every queue is expected to be empty.
        assertEquals(0L, hubDr.senderAggregatedOutMetrics().stateTransferBufferSize());
        assertEquals(0L, dr1.senderCacheMetrics(cacheName).pendingQueueSize() + dr2.senderCacheMetrics(cacheName).pendingQueueSize());
        assertEquals(0L, dr1.senderCacheMetrics(cacheName).backupQueueSize() + dr2.senderCacheMetrics(cacheName).backupQueueSize());

        stopGrid(DrAbstractTest.TOP2_NODE_RCV);

        // Stopping the receiver node alone must not change any of the metrics.
        assertEquals(0L, hubDr.senderAggregatedOutMetrics().stateTransferBufferSize());
        assertEquals(0L, dr1.senderCacheMetrics(cacheName).pendingQueueSize() + dr2.senderCacheMetrics(cacheName).pendingQueueSize());
        assertEquals(0L, dr1.senderCacheMetrics(cacheName).backupQueueSize() + dr2.senderCacheMetrics(cacheName).backupQueueSize());

        IgniteFuture transferFut = dr1.stateTransfer(cacheName, new byte[0]);

        // Give the state transfer time to reach the hub; the outcome is asserted right below.
        GridTestUtils.waitForCondition(() -> hubDr.senderAggregatedOutMetrics().stateTransferBufferSize() > 0, 15000L);

        assertTrue(hubDr.senderAggregatedOutMetrics().stateTransferBufferSize() > 0);
        assertEquals(0L, dr1.senderCacheMetrics(cacheName).pendingQueueSize() + dr2.senderCacheMetrics(cacheName).pendingQueueSize());
        assertEquals(0L, dr1.senderCacheMetrics(cacheName).backupQueueSize() + dr2.senderCacheMetrics(cacheName).backupQueueSize());

        startGrid(receiverConfig(rcvIpFinder));
        transferFut.get();

        // Once the transfer has completed, everything is drained again.
        assertEquals(0L, hubDr.senderAggregatedOutMetrics().stateTransferBufferSize());
        assertEquals(0L, dr1.senderCacheMetrics(cacheName).pendingQueueSize() + dr2.senderCacheMetrics(cacheName).pendingQueueSize());
        assertEquals(0L, dr1.senderCacheMetrics(cacheName).backupQueueSize() + dr2.senderCacheMetrics(cacheName).backupQueueSize());
    }

    /**
     * Checks the sender hub buffer and receiver hub message-queue metrics, both through the public
     * API and through JMX.
     * <p>
     * A background task opens a pessimistic, repeatable-read transaction on {@code TOP2_NODE}, puts
     * key {@code 1} and then parks on a {@link Phaser}. While it is parked, the main thread updates
     * the same key ten times on the sender side and waits for the sender buffer and the receiver
     * message queue to become non-empty. The transaction is then released (rolled back) and both
     * metrics are expected to return to zero.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testMessageQueuesMetric() throws Exception {
        final String cacheName = SecurityServicePermissionsTest.CACHE_NAME;

        startTopology(createSenderTopology());
        startTopology(createReceiverTopology());

        IgniteCache sndCache = grid(DrAbstractTest.TOP1_NODE).cache(cacheName);
        IgniteEx rcvDataNode = grid(DrAbstractTest.TOP2_NODE);
        IgniteCache rcvCache = rcvDataNode.cache(cacheName);

        // Two parties: the main thread and the background transaction.
        Phaser barrier = new Phaser(2);

        GridTestUtils.runAsync(() -> {
            // try-with-resources guarantees the transaction is closed even when a wait below
            // is interrupted or times out.
            try (Transaction tx = rcvDataNode.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
                rcvCache.put(1, 1);

                // Phase 1: tell the main thread that the key has been written inside the transaction.
                barrier.awaitAdvanceInterruptibly(barrier.arrive(), WAIT_TIMEOUT, TimeUnit.MILLISECONDS);

                // Phase 2: stay inside the transaction until the main thread has seen non-empty queues.
                barrier.awaitAdvanceInterruptibly(barrier.arrive(), WAIT_TIMEOUT, TimeUnit.MILLISECONDS);

                tx.rollback();
            } catch (InterruptedException | TimeoutException e) {
                throw new GridClosureException(e);
            }
        });

        barrier.awaitAdvanceInterruptibly(barrier.arrive(), WAIT_TIMEOUT, TimeUnit.MILLISECONDS);

        for (int i = 0; i < 10; i++) {
            sndCache.put(1, Integer.valueOf(i));
        }

        ObjectName sndBufMBean = IgniteUtils.makeMBeanName(DrAbstractTest.TOP1_NODE_SND, "dr", "sender.buffer.global");

        assertTrue(GridTestUtils.waitForCondition(() ->
            dr(grid(DrAbstractTest.TOP1_NODE_SND)).senderAggregatedOutMetrics(SecurityServicePermissionsTest.CACHE_NAME).stateTransferBufferSize() > 0
                && DrJmxMetricsAbstractTest.metrics(grid(DrAbstractTest.TOP1_NODE_SND), sndBufMBean).metric("StoreSize") > 0,
            WAIT_TIMEOUT));

        ObjectName rcvMBean = IgniteUtils.makeMBeanName(DrAbstractTest.TOP2_NODE_RCV, "dr", "receiver");

        assertTrue(GridTestUtils.waitForCondition(() ->
            dr(grid(DrAbstractTest.TOP2_NODE_RCV)).receiverAggregatedInMetrics(SecurityServicePermissionsTest.CACHE_NAME).messageQueueSizeBytes() > 0
                && DrJmxMetricsAbstractTest.metrics(grid(DrAbstractTest.TOP2_NODE_RCV), rcvMBean).metric("MessageQueueSizeBytes") > 0,
            WAIT_TIMEOUT));

        // Release the background transaction so that it rolls back.
        barrier.awaitAdvanceInterruptibly(barrier.arrive(), WAIT_TIMEOUT, TimeUnit.MILLISECONDS);

        assertTrue(GridTestUtils.waitForCondition(() ->
            dr(grid(DrAbstractTest.TOP1_NODE_SND)).senderAggregatedOutMetrics(SecurityServicePermissionsTest.CACHE_NAME).stateTransferBufferSize() == 0
                && DrJmxMetricsAbstractTest.metrics(grid(DrAbstractTest.TOP1_NODE_SND), sndBufMBean).metric("StoreSize") == 0,
            WAIT_TIMEOUT));

        assertTrue(GridTestUtils.waitForCondition(() ->
            dr(grid(DrAbstractTest.TOP2_NODE_RCV)).receiverAggregatedInMetrics(SecurityServicePermissionsTest.CACHE_NAME).messageQueueSizeBytes() == 0
                && DrJmxMetricsAbstractTest.metrics(grid(DrAbstractTest.TOP2_NODE_RCV), rcvMBean).metric("MessageQueueSizeBytes") == 0,
            WAIT_TIMEOUT));
    }

    /**
     * Verifies sender hub metrics (API and JMX) and per-node sender cache metrics after two puts:
     * value {@code 1} under a key primary on {@code TOP1_NODE} and {@link #FILTERED_ENTRY_VALUE}
     * under a key for which {@code TOP1_NODE} is a backup. Exactly one batch with one entry is
     * expected to pass through the hub, and the other entry is expected to be counted as filtered on
     * {@code TOP1_NODE_2}. Receiver-side accessors must fail on the sender hub. Finishes with the
     * reset and data streamer checks.
     * NOTE(review): presumably the cache is configured with {@link DrEntryFilter} by the base test
     * topology — confirm.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSenderHubMetrics() throws Exception {
        final String cacheName = SecurityServicePermissionsTest.CACHE_NAME;
        final String hubName = DrAbstractTest.TOP1_NODE_SND;

        startTopology(createSenderTopology());
        startTopology(createReceiverTopology());

        IgniteCache sndCache = grid(DrAbstractTest.TOP1_NODE).cache(cacheName);

        sndCache.put(primaryKey(sndCache), 1);
        sndCache.put(backupKey(sndCache), Integer.valueOf(FILTERED_ENTRY_VALUE));

        waitForCacheReplicated(cacheName, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        GridDr hubDr = dr(grid(hubName));

        // Receiver-side metrics are not available on a sender hub.
        GridTestUtils.assertThrows(log, () -> {
            return hubDr.receiverAggregatedInMetrics((byte) 2);
        }, IllegalStateException.class, (String) null);
        GridTestUtils.assertThrows(log, () -> {
            return hubDr.receiverAggregatedInMetrics();
        }, IllegalStateException.class, (String) null);
        GridTestUtils.assertThrows(log, () -> {
            return hubDr.receiverAggregatedOutMetrics();
        }, IllegalStateException.class, (String) null);

        // Incoming side of the hub, aggregated over all caches.
        DrSenderInMetrics aggIn = hubDr.senderAggregatedInMetrics();
        assertEquals(1, aggIn.batchesReceived());
        assertEquals(1L, aggIn.entriesReceived());
        assertTrue(aggIn.bytesReceived() > 0);

        // Same numbers through JMX.
        DrJmxMetricsAbstractTest.MetricsStateHolder jmxIn = DrJmxMetricsAbstractTest.metrics(grid(hubName), IgniteUtils.makeMBeanName(hubName, "dr", "sender.cache"));
        assertEquals(1L, jmxIn.metric("BatchesReceived"));
        assertEquals(1L, jmxIn.metric("EntriesReceived"));
        assertTrue(jmxIn.metric("BytesReceived") > 0);

        // Incoming side, per cache.
        DrSenderInMetrics cacheIn = hubDr.senderInMetrics(cacheName);
        assertEquals(1, cacheIn.batchesReceived());
        assertEquals(1L, cacheIn.entriesReceived());
        assertTrue(cacheIn.bytesReceived() > 0);

        // Outgoing side, aggregated over everything.
        DrSenderOutMetrics aggOut = hubDr.senderAggregatedOutMetrics();
        assertTrue(aggOut.bytesSent() > 0);
        assertEquals(1, aggOut.batchesSent());
        assertEquals(1L, aggOut.entriesSent());
        assertEquals(1, aggOut.batchesAcked());
        assertEquals(1L, aggOut.entriesAcked());
        assertTrue(aggOut.bytesAcked() > 0);
        assertTrue(aggOut.averageBatchAckTime() > 0.1d);

        // Outgoing side, aggregated for data center 2.
        DrSenderOutMetrics dcOut = hubDr.senderAggregatedOutMetrics((byte) 2);
        assertTrue(dcOut.bytesSent() > 0);
        assertEquals(1, dcOut.batchesSent());
        assertEquals(1L, dcOut.entriesSent());
        assertEquals(1, dcOut.batchesAcked());
        assertEquals(1L, dcOut.entriesAcked());
        assertTrue(dcOut.bytesAcked() > 0);
        assertTrue(dcOut.averageBatchAckTime() > 0.1d);

        // Outgoing side, aggregated for the cache.
        DrSenderOutMetrics cacheOut = hubDr.senderAggregatedOutMetrics(cacheName);
        assertTrue(cacheOut.bytesSent() > 0);
        assertEquals(1, cacheOut.batchesSent());
        assertEquals(1L, cacheOut.entriesSent());
        assertEquals(1, cacheOut.batchesAcked());
        assertEquals(1L, cacheOut.entriesAcked());
        assertTrue(cacheOut.bytesAcked() > 0);
        assertTrue(cacheOut.averageBatchAckTime() > 0.1d);

        // Outgoing side for the (data center 2, cache) pair.
        DrSenderOutMetrics dcCacheOut = hubDr.senderOutMetrics((byte) 2, cacheName);
        assertTrue(dcCacheOut.bytesSent() > 0);
        assertEquals(1, dcCacheOut.batchesSent());
        assertEquals(1L, dcCacheOut.entriesSent());
        assertEquals(1, dcCacheOut.batchesAcked());
        assertEquals(1L, dcCacheOut.entriesAcked());
        assertTrue(dcCacheOut.bytesAcked() > 0);
        assertTrue(dcCacheOut.averageBatchAckTime() > 0.1d);

        // Sender cache metrics on the node that owns the replicated entry.
        CacheDrSenderMetrics node1Metrics = dr(grid(DrAbstractTest.TOP1_NODE)).senderCacheMetrics(cacheName);

        GridTestUtils.assertThrows(log, () -> {
            return dr(grid(DrAbstractTest.TOP1_NODE)).receiverCacheMetrics("cache_3");
        }, IllegalArgumentException.class, (String) null);

        assertEquals(1, node1Metrics.batchesSent());
        assertEquals(1L, node1Metrics.entriesSent());
        assertEquals(1, node1Metrics.batchesAcked());
        assertEquals(0, node1Metrics.batchesFailed());
        assertEquals(0L, node1Metrics.entriesFiltered());
        assertEquals(1L, node1Metrics.backupQueueSize());

        // Sender cache metrics on the node where the entry was filtered out.
        CacheDrSenderMetrics node2Metrics = dr(grid(DrAbstractTest.TOP1_NODE_2)).senderCacheMetrics(cacheName);
        assertEquals(0, node2Metrics.batchesSent());
        assertEquals(0L, node2Metrics.entriesSent());
        assertEquals(0, node2Metrics.batchesAcked());
        assertEquals(0, node2Metrics.batchesFailed());
        assertEquals(1L, node2Metrics.entriesFiltered());
        assertEquals(1L, node2Metrics.backupQueueSize());

        // Outgoing numbers through JMX.
        DrJmxMetricsAbstractTest.MetricsStateHolder jmxOut = DrJmxMetricsAbstractTest.metrics(grid(hubName), IgniteUtils.makeMBeanName(hubName, "dr", "sender.dc2.cache"));
        assertEquals(1L, jmxOut.metric("BatchesSent"));
        assertEquals(1L, jmxOut.metric("EntriesSent"));
        assertTrue(jmxOut.metric("BytesSent") > 0);
        assertEquals(1L, jmxOut.metric("BatchesAcked"));
        assertEquals(1L, jmxOut.metric("EntriesAcked"));
        assertTrue(jmxOut.metric("BytesAcked") > 0);
        assertTrue(jmxOut.metric("AverageBatchAckTime") > 0);

        checkSenderHubResetMetrics(hubDr);
        checkStream();
    }

    /**
     * Clears the cache on {@code TOP1_NODE} and {@code TOP2_NODE}, enables statistics on both,
     * streams 100 entries through a data streamer on the sender side and then checks that both
     * caches report 100 entries and that the sender hub reports 100 entries sent.
     * <p>
     * The streamer is managed by try-with-resources: it is closed exactly once, and a failure inside
     * the block is not masked by a failure of {@code close()}. The size and metric checks run after
     * the streamer has been closed.
     *
     * @throws Exception If failed.
     */
    private void checkStream() throws Exception {
        final String cacheName = SecurityServicePermissionsTest.CACHE_NAME;
        final int entryCnt = 100;

        Ignite sndNode = grid(DrAbstractTest.TOP1_NODE);
        Ignite rcvNode = grid(DrAbstractTest.TOP2_NODE);

        IgniteCache sndCache = sndNode.getOrCreateCache(cacheName);
        IgniteCache rcvCache = rcvNode.getOrCreateCache(cacheName);

        sndCache.clear();
        sndCache.enableStatistics(true);
        rcvCache.clear();
        rcvCache.enableStatistics(true);

        assertEquals(0L, sndCache.metrics().getCacheSize());
        assertEquals(0L, rcvCache.metrics().getCacheSize());

        try (IgniteDataStreamer<String, Integer> streamer = sndNode.dataStreamer(cacheName)) {
            for (int i = 0; i < entryCnt; i++) {
                streamer.addData(String.valueOf(i), Integer.valueOf(i));
            }

            // The streamer future is expected to stay incomplete until the streamer is closed.
            assertFalse(GridTestUtils.waitForCondition(() -> streamer.future().isDone(), 5000L));
        }

        awaitMetricsUpdate(1, Arrays.asList(sndNode, rcvNode));

        assertEquals(100L, sndCache.metrics().getCacheSize());
        assertEquals(100L, rcvCache.metrics().getCacheSize());
        assertEquals(100L, dr(grid(DrAbstractTest.TOP1_NODE_SND)).senderAggregatedOutMetrics().entriesSent());
    }

    /**
     * Resets the metrics of the given sender hub DR facade and checks that every sender-side counter
     * (API and JMX) reads zero afterwards, while receiver-side accessors still fail.
     *
     * @param gridDr DR facade of the sender hub node.
     * @throws Exception If failed.
     */
    private void checkSenderHubResetMetrics(GridDr gridDr) throws Exception {
        final String cacheName = SecurityServicePermissionsTest.CACHE_NAME;
        final String hubName = DrAbstractTest.TOP1_NODE_SND;

        gridDr.resetMetrics();

        // Receiver-side metrics remain unavailable on a sender hub.
        GridTestUtils.assertThrows(log, () -> {
            return gridDr.receiverAggregatedInMetrics((byte) 2);
        }, IllegalStateException.class, (String) null);
        GridTestUtils.assertThrows(log, () -> {
            return gridDr.receiverAggregatedInMetrics();
        }, IllegalStateException.class, (String) null);
        GridTestUtils.assertThrows(log, () -> {
            return gridDr.receiverAggregatedOutMetrics();
        }, IllegalStateException.class, (String) null);

        // Incoming side, aggregated.
        DrSenderInMetrics aggIn = gridDr.senderAggregatedInMetrics();
        assertEquals(0, aggIn.batchesReceived());
        assertEquals(0L, aggIn.entriesReceived());
        assertEquals(0L, aggIn.bytesReceived());

        // Incoming side through JMX.
        DrJmxMetricsAbstractTest.MetricsStateHolder jmxIn = DrJmxMetricsAbstractTest.metrics(grid(hubName), IgniteUtils.makeMBeanName(hubName, "dr", "sender.cache"));
        assertEquals(0L, jmxIn.metric("BatchesReceived"));
        assertEquals(0L, jmxIn.metric("EntriesReceived"));
        assertEquals(0L, jmxIn.metric("BytesReceived"));

        // Outgoing side, aggregated over everything.
        DrSenderOutMetrics aggOut = gridDr.senderAggregatedOutMetrics();
        assertEquals(0L, aggOut.bytesSent());
        assertEquals(0, aggOut.batchesSent());
        assertEquals(0L, aggOut.entriesSent());
        assertEquals(0, aggOut.batchesAcked());
        assertEquals(0L, aggOut.entriesAcked());
        assertEquals(0L, aggOut.bytesAcked());

        // Incoming side, per cache.
        DrSenderInMetrics cacheIn = gridDr.senderInMetrics(cacheName);
        assertEquals(0, cacheIn.batchesReceived());
        assertEquals(0L, cacheIn.entriesReceived());
        assertEquals(0L, cacheIn.bytesReceived());

        // Outgoing side for data center 2.
        DrSenderOutMetrics dcOut = gridDr.senderAggregatedOutMetrics((byte) 2);
        assertEquals(0L, dcOut.bytesSent());
        assertEquals(0, dcOut.batchesSent());
        assertEquals(0L, dcOut.entriesSent());
        assertEquals(0, dcOut.batchesAcked());
        assertEquals(0L, dcOut.entriesAcked());
        assertEquals(0L, dcOut.bytesAcked());

        // NOTE(review): this MBean name is built from the literal "top1_node_snd_2", whereas the
        // bean is looked up through the TOP1_NODE_SND grid and the other MBean names in this class
        // use DrAbstractTest.TOP1_NODE_SND — confirm that the different instance name is intended.
        DrJmxMetricsAbstractTest.MetricsStateHolder jmxOut = DrJmxMetricsAbstractTest.metrics(grid(hubName), IgniteUtils.makeMBeanName("top1_node_snd_2", "dr", "sender.dc2.cache"));
        assertEquals(0L, jmxOut.metric("BatchesSent"));
        assertEquals(0L, jmxOut.metric("EntriesSent"));
        assertEquals(0L, jmxOut.metric("BytesSent"));
        assertEquals(0L, jmxOut.metric("BatchesAcked"));
        assertEquals(0L, jmxOut.metric("EntriesAcked"));
        assertEquals(0L, jmxOut.metric("BytesAcked"));
        assertEquals(0L, jmxOut.metric("AverageBatchAckTime"));
    }

    /**
     * Verifies receiver hub metrics (API and JMX) after a single entry has been replicated: incoming
     * counters at every aggregation level, outgoing counters towards the receiver data nodes, the
     * receiver cache metrics on {@code TOP2_NODE}, and that sender cache metrics cannot be obtained
     * on that node. Finishes with the reset check.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testReceiverHubMetrics() throws Exception {
        final String cacheName = SecurityServicePermissionsTest.CACHE_NAME;
        final String hubName = DrAbstractTest.TOP2_NODE_RCV;

        startTopology(createSenderTopology());
        startTopology(createReceiverTopology());

        IgniteCache sndCache = grid(DrAbstractTest.TOP1_NODE).cache(cacheName);
        GridDr hubDr = dr(grid(hubName));

        sndCache.put("test_key", 1);

        waitForCacheReplicated(cacheName, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        // Incoming side, aggregated over everything.
        DrReceiverInMetrics aggIn = hubDr.receiverAggregatedInMetrics();
        assertTrue(aggIn.bytesReceived() > 0);
        assertEquals(1, aggIn.batchesReceived());
        assertEquals(1L, aggIn.entriesReceived());

        // Incoming side through JMX.
        DrJmxMetricsAbstractTest.MetricsStateHolder jmxIn = DrJmxMetricsAbstractTest.metrics(grid(hubName), IgniteUtils.makeMBeanName(hubName, "dr", "receiver.dc1.cache"));
        assertEquals(1L, jmxIn.metric("BatchesReceived"));
        assertEquals(1L, jmxIn.metric("EntriesReceived"));
        assertTrue(jmxIn.metric("BytesReceived") > 0);

        // Incoming side for data center 1.
        DrReceiverInMetrics dcIn = hubDr.receiverAggregatedInMetrics((byte) 1);
        assertEquals(1, dcIn.batchesReceived());
        assertEquals(1L, dcIn.entriesReceived());
        assertTrue(dcIn.bytesReceived() > 0);

        // Incoming side for the cache.
        DrReceiverInMetrics cacheIn = hubDr.receiverAggregatedInMetrics(cacheName);
        assertEquals(1, cacheIn.batchesReceived());
        assertEquals(1L, cacheIn.entriesReceived());
        assertTrue(cacheIn.bytesReceived() > 0);

        // Incoming side for the (data center 1, cache) pair.
        DrReceiverInMetrics dcCacheIn = hubDr.receiverInMetrics((byte) 1, cacheName);
        assertEquals(1, dcCacheIn.batchesReceived());
        assertEquals(1L, dcCacheIn.entriesReceived());
        assertTrue(dcCacheIn.bytesReceived() > 0);

        // With a single remote data center the per-DC numbers must match the overall aggregate.
        DrReceiverInMetrics dcInAgain = hubDr.receiverAggregatedInMetrics((byte) 1);
        assertEquals(aggIn.batchesReceived(), dcInAgain.batchesReceived());
        assertEquals(aggIn.entriesReceived(), dcInAgain.entriesReceived());
        assertEquals(aggIn.bytesReceived(), dcInAgain.bytesReceived());

        // Outgoing side, aggregated.
        DrReceiverOutMetrics aggOut = hubDr.receiverAggregatedOutMetrics();
        assertEquals(1, aggOut.batchesAcked());
        assertEquals(1L, aggOut.entriesAcked());
        assertTrue(aggOut.bytesAcked() > 0);
        assertEquals(1, aggOut.batchesSent());
        assertEquals(1L, aggOut.entriesSent());
        assertTrue(aggOut.bytesSent() > 0);
        assertTrue(aggOut.averageBatchAckTime() > 0.1d);

        // Outgoing side through JMX.
        DrJmxMetricsAbstractTest.MetricsStateHolder jmxOut = DrJmxMetricsAbstractTest.metrics(grid(hubName), IgniteUtils.makeMBeanName(hubName, "dr", "receiver.cache"));
        assertEquals(1L, jmxOut.metric("BatchesSent"));
        assertEquals(1L, jmxOut.metric("EntriesSent"));
        assertTrue(jmxOut.metric("BytesSent") > 0);
        assertEquals(1L, jmxOut.metric("BatchesAcked"));
        assertEquals(1L, jmxOut.metric("EntriesAcked"));
        assertTrue(jmxOut.metric("BytesAcked") > 0);
        assertTrue(jmxOut.metric("AverageBatchAckTime") > 0);

        // Receiver data node: receiver cache metrics are present, sender cache metrics are not.
        assertEquals(1L, dr(grid(DrAbstractTest.TOP2_NODE)).receiverCacheMetrics(cacheName).entriesReceived());

        GridTestUtils.assertThrows(log, () -> {
            return dr(grid(DrAbstractTest.TOP2_NODE)).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME);
        }, IllegalStateException.class, (String) null);

        checkReceiverHubResetMetrics(hubDr);
    }

    /**
     * Verifies receiver hub metrics when two caches are replicated at the same time: 100 entries are
     * put one by one into each cache, after which the aggregated counters must equal 200 and the
     * per-cache JMX and receiver cache counters must equal 100 each.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testReceiverHubMetricsWithTwoCaches() throws Exception {
        final String cacheName1 = SecurityServicePermissionsTest.CACHE_NAME;
        final String cacheName2 = "cache_2";
        final String hubName = DrAbstractTest.TOP2_NODE_RCV;
        final int perCache = 100;
        final int total = 2 * perCache;

        startTopology(createSenderTopology());
        startTopology(createReceiverTopology());

        IgniteCache sndCache1 = grid(DrAbstractTest.TOP1_NODE).cache(cacheName1);
        IgniteCache sndCache2 = grid(DrAbstractTest.TOP1_NODE).cache(cacheName2);
        GridDr hubDr = dr(grid(hubName));

        for (int i = 0; i < perCache; i++) {
            sndCache1.put(String.valueOf(i), Integer.valueOf(i));
            sndCache2.put(String.valueOf(i), Integer.valueOf(i));
        }

        waitForCacheReplicated(cacheName1, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        waitForCacheReplicated(cacheName2, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        // Incoming side, aggregated over both caches.
        DrReceiverInMetrics aggIn = hubDr.receiverAggregatedInMetrics();
        assertEquals(total, aggIn.batchesReceived());
        assertEquals(total, aggIn.entriesReceived());
        assertTrue(aggIn.bytesReceived() > 0);

        // Incoming side through JMX, one bean per cache.
        DrJmxMetricsAbstractTest.MetricsStateHolder jmxIn1 = DrJmxMetricsAbstractTest.metrics(grid(hubName), IgniteUtils.makeMBeanName(hubName, "dr", "receiver.dc1.cache"));
        DrJmxMetricsAbstractTest.MetricsStateHolder jmxIn2 = DrJmxMetricsAbstractTest.metrics(grid(hubName), IgniteUtils.makeMBeanName(hubName, "dr", "receiver.dc1.cache_2"));
        assertEquals(perCache, jmxIn1.metric("BatchesReceived"));
        assertEquals(perCache, jmxIn2.metric("BatchesReceived"));
        assertEquals(perCache, jmxIn1.metric("EntriesReceived"));
        assertEquals(perCache, jmxIn2.metric("EntriesReceived"));
        assertTrue(jmxIn1.metric("BytesReceived") > 0);
        assertTrue(jmxIn2.metric("BytesReceived") > 0);

        // Outgoing side, aggregated over both caches.
        DrReceiverOutMetrics aggOut = hubDr.receiverAggregatedOutMetrics();
        assertEquals(total, aggOut.batchesSent());
        assertEquals(total, aggOut.entriesSent());
        assertTrue(aggOut.bytesSent() > 0);
        assertEquals(total, aggOut.batchesAcked());
        assertEquals(total, aggOut.entriesAcked());
        assertTrue(aggOut.bytesAcked() > 0);
        assertTrue(aggOut.averageBatchAckTime() > 0.1d);

        // Outgoing side through JMX, one bean per cache.
        DrJmxMetricsAbstractTest.MetricsStateHolder jmxOut1 = DrJmxMetricsAbstractTest.metrics(grid(hubName), IgniteUtils.makeMBeanName(hubName, "dr", "receiver.cache"));
        DrJmxMetricsAbstractTest.MetricsStateHolder jmxOut2 = DrJmxMetricsAbstractTest.metrics(grid(hubName), IgniteUtils.makeMBeanName(hubName, "dr", "receiver.cache_2"));
        assertEquals(perCache, jmxOut1.metric("BatchesSent"));
        assertEquals(perCache, jmxOut2.metric("BatchesSent"));
        assertEquals(perCache, jmxOut1.metric("EntriesSent"));
        assertEquals(perCache, jmxOut2.metric("EntriesSent"));
        assertTrue(jmxOut1.metric("BytesSent") > 0);
        assertTrue(jmxOut2.metric("BytesSent") > 0);
        assertEquals(perCache, jmxOut1.metric("BatchesAcked"));
        assertEquals(perCache, jmxOut2.metric("BatchesAcked"));
        assertEquals(perCache, jmxOut1.metric("EntriesAcked"));
        assertEquals(perCache, jmxOut2.metric("EntriesAcked"));
        assertTrue(jmxOut1.metric("BytesAcked") > 0);
        assertTrue(jmxOut2.metric("BytesAcked") > 0);
        assertTrue(jmxOut1.metric("AverageBatchAckTime") > 0);
        assertTrue(jmxOut2.metric("AverageBatchAckTime") > 0);

        // Receiver cache metrics on the receiver data node.
        CacheDrReceiverMetrics rcvMetrics1 = dr(grid(DrAbstractTest.TOP2_NODE)).receiverCacheMetrics(cacheName1);
        CacheDrReceiverMetrics rcvMetrics2 = dr(grid(DrAbstractTest.TOP2_NODE)).receiverCacheMetrics(cacheName2);
        assertEquals(perCache, rcvMetrics1.entriesReceived());
        assertEquals(perCache, rcvMetrics2.entriesReceived());
    }

    /**
     * Resets the metrics of the given DR facade and verifies that every receiver hub counter reads back as zero,
     * both through the aggregated metrics API and through the MBean-backed metrics of the
     * {@code TOP2_NODE_RCV} grid.
     *
     * @param gridDr DR facade whose metrics are reset and then inspected.
     * @throws Exception If any metric lookup fails.
     */
    private void checkReceiverHubResetMetrics(GridDr gridDr) throws Exception {
        gridDr.resetMetrics();

        // Aggregated incoming counters.
        DrReceiverInMetrics aggregatedIn = gridDr.receiverAggregatedInMetrics();
        assertEquals(0, aggregatedIn.batchesReceived());
        assertEquals(0L, aggregatedIn.entriesReceived());
        assertEquals(0L, aggregatedIn.bytesReceived());

        // Incoming counters exposed under the "receiver.dc1.cache" MBean name.
        DrJmxMetricsAbstractTest.MetricsStateHolder jmxIn = DrJmxMetricsAbstractTest.metrics(
            grid(DrAbstractTest.TOP2_NODE_RCV),
            IgniteUtils.makeMBeanName(DrAbstractTest.TOP2_NODE_RCV, "dr", "receiver.dc1.cache"));
        for (String metricName : new String[] {"BatchesReceived", "EntriesReceived", "BytesReceived"}) {
            assertEquals(0L, jmxIn.metric(metricName));
        }

        // Aggregated outgoing counters.
        DrReceiverOutMetrics aggregatedOut = gridDr.receiverAggregatedOutMetrics();
        assertEquals(0, aggregatedOut.batchesAcked());
        assertEquals(0L, aggregatedOut.entriesAcked());
        assertEquals(0L, aggregatedOut.bytesAcked());
        assertEquals(0, aggregatedOut.batchesSent());
        assertEquals(0L, aggregatedOut.entriesSent());
        assertEquals(0L, aggregatedOut.bytesSent());

        // Outgoing counters exposed under the "receiver.cache" MBean name.
        DrJmxMetricsAbstractTest.MetricsStateHolder jmxOut = DrJmxMetricsAbstractTest.metrics(
            grid(DrAbstractTest.TOP2_NODE_RCV),
            IgniteUtils.makeMBeanName(DrAbstractTest.TOP2_NODE_RCV, "dr", "receiver.cache"));
        String[] outMetricNames = {
            "BatchesSent", "EntriesSent", "BytesSent",
            "BatchesAcked", "EntriesAcked", "BytesAcked",
            "AverageBatchAckTime"
        };
        for (String metricName : outMetricNames) {
            assertEquals(0L, jmxOut.metric(metricName));
        }
    }

    /**
     * Puts a single entry into each of the two sender caches and verifies the sender hub metrics:
     * the aggregated in/out metrics, the out metrics for remote data center {@code 2}, the per-cache
     * in metrics, and the corresponding MBean-backed metrics on the {@code TOP1_NODE_SND} grid.
     *
     * @throws Exception If the topology cannot be started or a metric lookup fails.
     */
    @Test
    public void testSenderHubMetricsWithTwoCaches() throws Exception {
        startTopology(createSenderTopology());
        startTopology(createReceiverTopology());
        IgniteCache<String, Integer> cache = grid(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME);
        IgniteCache<String, Integer> cache2 = grid(DrAbstractTest.TOP1_NODE).cache("cache_2");
        GridDr dr = dr(grid(DrAbstractTest.TOP1_NODE_SND));
        cache.put("1", 1);
        cache2.put("2", 2);

        // Both caches are asserted on below, so wait until the pending queue of each one has drained.
        waitForCacheReplicated(SecurityServicePermissionsTest.CACHE_NAME, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);
        waitForCacheReplicated("cache_2", DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        // Aggregated incoming metrics: one batch with one entry per cache.
        DrSenderInMetrics senderAggregatedInMetrics = dr.senderAggregatedInMetrics();
        assertEquals(2, senderAggregatedInMetrics.batchesReceived());
        assertEquals(2L, senderAggregatedInMetrics.entriesReceived());
        assertTrue(senderAggregatedInMetrics.bytesReceived() > 0);

        // Per-cache incoming metrics exposed through MBeans.
        DrJmxMetricsAbstractTest.MetricsStateHolder metrics = DrJmxMetricsAbstractTest.metrics(grid(DrAbstractTest.TOP1_NODE_SND), IgniteUtils.makeMBeanName(DrAbstractTest.TOP1_NODE_SND, "dr", "sender.cache"));
        DrJmxMetricsAbstractTest.MetricsStateHolder metrics2 = DrJmxMetricsAbstractTest.metrics(grid(DrAbstractTest.TOP1_NODE_SND), IgniteUtils.makeMBeanName(DrAbstractTest.TOP1_NODE_SND, "dr", "sender.cache_2"));
        assertEquals(1L, metrics.metric("BatchesReceived"));
        assertEquals(1L, metrics.metric("EntriesReceived"));
        assertTrue(metrics.metric("BytesReceived") > 0);
        assertEquals(1L, metrics2.metric("BatchesReceived"));
        assertEquals(1L, metrics2.metric("EntriesReceived"));
        assertTrue(metrics2.metric("BytesReceived") > 0);

        // Aggregated outgoing metrics across all remote data centers.
        DrSenderOutMetrics senderAggregatedOutMetrics = dr.senderAggregatedOutMetrics();
        assertEquals(2, senderAggregatedOutMetrics.batchesSent());
        assertEquals(2L, senderAggregatedOutMetrics.entriesSent());
        assertTrue(senderAggregatedOutMetrics.bytesSent() > 0);
        assertEquals(2, senderAggregatedOutMetrics.batchesAcked());
        assertEquals(2L, senderAggregatedOutMetrics.entriesAcked());
        assertTrue(senderAggregatedOutMetrics.bytesAcked() > 0);
        assertTrue(senderAggregatedOutMetrics.averageBatchAckTime() > 0.1d);

        // Outgoing metrics for remote data center 2. The sent counters must be read from the
        // per-data-center object, not from the global aggregate checked above.
        DrSenderOutMetrics senderAggregatedOutMetrics2 = dr.senderAggregatedOutMetrics((byte) 2);
        assertEquals(2, senderAggregatedOutMetrics2.batchesSent());
        assertEquals(2L, senderAggregatedOutMetrics2.entriesSent());
        assertTrue(senderAggregatedOutMetrics2.bytesSent() > 0);
        assertEquals(2, senderAggregatedOutMetrics2.batchesAcked());
        assertEquals(2L, senderAggregatedOutMetrics2.entriesAcked());
        assertTrue(senderAggregatedOutMetrics2.bytesAcked() > 0);
        assertTrue(senderAggregatedOutMetrics2.averageBatchAckTime() > 0.1d);

        // Per-cache incoming metrics through the DR API.
        DrSenderInMetrics senderInMetrics = dr.senderInMetrics(SecurityServicePermissionsTest.CACHE_NAME);
        assertEquals(1, senderInMetrics.batchesReceived());
        assertEquals(1L, senderInMetrics.entriesReceived());
        assertTrue(senderInMetrics.bytesReceived() > 0);
        DrSenderInMetrics senderInMetrics2 = dr.senderInMetrics("cache_2");
        assertEquals(1, senderInMetrics2.batchesReceived());
        assertEquals(1L, senderInMetrics2.entriesReceived());
        assertTrue(senderInMetrics2.bytesReceived() > 0);

        // Per-cache outgoing metrics for data center 2 exposed through MBeans.
        DrJmxMetricsAbstractTest.MetricsStateHolder metrics3 = DrJmxMetricsAbstractTest.metrics(grid(DrAbstractTest.TOP1_NODE_SND), IgniteUtils.makeMBeanName(DrAbstractTest.TOP1_NODE_SND, "dr", "sender.dc2.cache"));
        DrJmxMetricsAbstractTest.MetricsStateHolder metrics4 = DrJmxMetricsAbstractTest.metrics(grid(DrAbstractTest.TOP1_NODE_SND), IgniteUtils.makeMBeanName(DrAbstractTest.TOP1_NODE_SND, "dr", "sender.dc2.cache_2"));
        assertEquals(1L, metrics3.metric("BatchesSent"));
        assertEquals(1L, metrics4.metric("BatchesSent"));
        assertEquals(1L, metrics3.metric("EntriesSent"));
        assertEquals(1L, metrics4.metric("EntriesSent"));
        assertTrue(metrics3.metric("BytesSent") > 0);
        assertTrue(metrics4.metric("BytesSent") > 0);
        assertEquals(1L, metrics3.metric("BatchesAcked"));
        assertEquals(1L, metrics4.metric("BatchesAcked"));
        assertEquals(1L, metrics3.metric("EntriesAcked"));
        assertEquals(1L, metrics4.metric("EntriesAcked"));
        assertTrue(metrics3.metric("BytesAcked") > 0);
        assertTrue(metrics4.metric("BytesAcked") > 0);
        assertTrue(metrics3.metric("AverageBatchAckTime") > 0);
        assertTrue(metrics4.metric("AverageBatchAckTime") > 0);
    }

    /**
     * Replicates two entries of the first cache, checks the sender and receiver side metrics, then destroys
     * the cache in both topologies and verifies that the related metric registries on the sender and
     * receiver hub nodes become empty.
     *
     * @throws Exception If the topology cannot be started or a metric lookup fails.
     */
    @Test
    public void testRemoveMetricsOnCacheDestroy() throws Exception {
        startTopology(createSenderTopology());
        startTopology(createReceiverTopology());

        // One entry under a primary key, one under a backup key carrying FILTERED_ENTRY_VALUE.
        IgniteCache srcCache = grid(DrAbstractTest.TOP1_NODE).cache(SecurityServicePermissionsTest.CACHE_NAME);
        srcCache.put(primaryKey(srcCache), 1);
        srcCache.put(backupKey(srcCache), Integer.valueOf(FILTERED_ENTRY_VALUE));

        GridDr senderHubDr = dr(grid(DrAbstractTest.TOP1_NODE_SND));
        waitForCacheReplicated(SecurityServicePermissionsTest.CACHE_NAME, DrAbstractTest.TOP1_NODE, DrAbstractTest.TOP1_NODE_2);

        // Sender hub: aggregated incoming metrics.
        DrSenderInMetrics hubIn = senderHubDr.senderAggregatedInMetrics();
        assertEquals(1, hubIn.batchesReceived());
        assertEquals(1L, hubIn.entriesReceived());
        assertTrue(hubIn.bytesReceived() > 0);

        // Sender hub: aggregated outgoing metrics.
        DrSenderOutMetrics hubOut = senderHubDr.senderAggregatedOutMetrics();
        assertEquals(1, hubOut.batchesSent());
        assertEquals(1L, hubOut.entriesSent());
        assertTrue(hubOut.bytesSent() > 0);
        assertEquals(1, hubOut.batchesAcked());
        assertEquals(1L, hubOut.entriesAcked());
        assertTrue(hubOut.bytesAcked() > 0);
        assertTrue(hubOut.averageBatchAckTime() > 0.1d);

        // Sender cache metrics on the first data node.
        CacheDrSenderMetrics firstNodeMetrics = dr(grid(DrAbstractTest.TOP1_NODE)).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME);
        assertEquals(1, firstNodeMetrics.batchesSent());
        assertEquals(1L, firstNodeMetrics.entriesSent());
        assertEquals(1, firstNodeMetrics.batchesAcked());
        assertEquals(0, firstNodeMetrics.batchesFailed());
        assertEquals(0L, firstNodeMetrics.entriesFiltered());
        assertEquals(1L, firstNodeMetrics.backupQueueSize());

        // Sender cache metrics on the second data node.
        CacheDrSenderMetrics secondNodeMetrics = dr(grid(DrAbstractTest.TOP1_NODE_2)).senderCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME);
        assertEquals(0, secondNodeMetrics.batchesSent());
        assertEquals(0L, secondNodeMetrics.entriesSent());
        assertEquals(0, secondNodeMetrics.batchesAcked());
        assertEquals(0, secondNodeMetrics.batchesFailed());
        assertEquals(1L, secondNodeMetrics.entriesFiltered());
        assertEquals(1L, secondNodeMetrics.backupQueueSize());

        // Receiver hub: aggregated incoming and outgoing metrics.
        GridDr receiverHubDr = dr(grid(DrAbstractTest.TOP2_NODE_RCV));
        DrReceiverInMetrics rcvIn = receiverHubDr.receiverAggregatedInMetrics();
        assertEquals(1, rcvIn.batchesReceived());
        assertEquals(1L, rcvIn.entriesReceived());
        assertTrue(rcvIn.bytesReceived() > 0);
        DrReceiverOutMetrics rcvOut = receiverHubDr.receiverAggregatedOutMetrics();
        assertEquals(1, rcvOut.batchesAcked());
        assertEquals(1L, rcvOut.entriesAcked());
        assertTrue(rcvOut.bytesAcked() > 0);
        assertEquals(1, rcvOut.batchesSent());
        assertEquals(1L, rcvOut.entriesSent());
        assertTrue(rcvOut.bytesSent() > 0);
        assertTrue(rcvOut.averageBatchAckTime() > 0.1d);

        // Receiver cache metrics on the receiving data node.
        assertEquals(1L, dr(grid(DrAbstractTest.TOP2_NODE)).receiverCacheMetrics(SecurityServicePermissionsTest.CACHE_NAME).entriesReceived());

        // Destroy the cache in both topologies; the per-cache registries are expected to empty out.
        grid(DrAbstractTest.TOP1_NODE).destroyCache(SecurityServicePermissionsTest.CACHE_NAME);
        grid(DrAbstractTest.TOP2_NODE).destroyCache(SecurityServicePermissionsTest.CACHE_NAME);

        assertMetricRegistryEmpty(DrAbstractTest.TOP1_NODE_SND, "dr.sender", SecurityServicePermissionsTest.CACHE_NAME);
        assertMetricRegistryEmpty(DrAbstractTest.TOP1_NODE_SND, "dr.sender", (byte) 2, SecurityServicePermissionsTest.CACHE_NAME);
        assertMetricRegistryEmpty(DrAbstractTest.TOP2_NODE_RCV, "dr.receiver", SecurityServicePermissionsTest.CACHE_NAME);
        assertMetricRegistryEmpty(DrAbstractTest.TOP2_NODE_RCV, "dr.receiver", (byte) 1, SecurityServicePermissionsTest.CACHE_NAME);
    }

    /**
     * Waits up to 5 seconds for the metric registry named {@code <str2>.dc<b>.<str3>} on the given grid
     * to contain no metrics, and fails the test if it still has some.
     *
     * @param str Name of the grid whose metric manager is queried.
     * @param str2 Registry name prefix.
     * @param b Data center id appended after {@code ".dc"}.
     * @param str3 Registry name suffix (a cache name at the visible call sites).
     * @throws IgniteInterruptedCheckedException If the wait is interrupted.
     */
    private void assertMetricRegistryEmpty(String str, String str2, byte b, String str3) throws IgniteInterruptedCheckedException {
        String registryName = str2 + ".dc" + (int) b + "." + str3;

        boolean emptied = GridTestUtils.waitForCondition(
            () -> !grid(str).context().metric().registry(registryName).iterator().hasNext(),
            5000L);

        assertTrue(emptied);
    }

    /**
     * Waits up to 5 seconds for the metric registry named {@code <str2>.<str3>} on the given grid
     * to contain no metrics, and fails the test if it still has some.
     *
     * @param str Name of the grid whose metric manager is queried.
     * @param str2 Registry name prefix.
     * @param str3 Registry name suffix (a cache name at the visible call sites).
     * @throws IgniteInterruptedCheckedException If the wait is interrupted.
     */
    private void assertMetricRegistryEmpty(String str, String str2, String str3) throws IgniteInterruptedCheckedException {
        String registryName = str2 + "." + str3;

        boolean emptied = GridTestUtils.waitForCondition(
            () -> !grid(str).context().metric().registry(registryName).iterator().hasNext(),
            5000L);

        assertTrue(emptied);
    }

    /**
     * Registers the sender topology: two data nodes ({@code TOP1_NODE}, {@code TOP1_NODE_2}) hosting the
     * two partitioned sender caches, plus the sender hub node produced by {@link #senderNodeConfig}.
     *
     * @return IP finder the topology was registered with.
     * @throws Exception If a node configuration cannot be built.
     */
    protected TcpDiscoveryIpFinder createSenderTopology() throws Exception {
        TcpDiscoveryIpFinder finder = ipFinder();

        GridGainConfiguration firstNodeGgCfg = new GridGainConfiguration().setBatchSendSizeBytes(256);
        GridGainConfiguration secondNodeGgCfg = new GridGainConfiguration().setBatchSendSizeBytes(256);

        CacheConfiguration firstCacheCfg = senderCacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, 1, 1000L)
            .setBackups(1)
            .setAffinity(new RendezvousAffinityFunction().setPartitions(8));
        CacheConfiguration secondCacheCfg = senderCacheConfig("cache_2", CacheMode.PARTITIONED, 1, 1000L)
            .setBackups(1)
            .setAffinity(new RendezvousAffinityFunction().setPartitions(8));

        // 2147483647L equals Integer.MAX_VALUE.
        // NOTE(review): both calls below configure the first cache; the second one looks like it was meant
        // for the "cache_2" configuration. Kept as-is to preserve current fixture behavior - confirm intent.
        ggCacheConfig(firstCacheCfg).getDrSenderConfiguration()
            .setBackupSyncFrequency(2147483647L)
            .setEntryFilter(new DrEntryFilter());
        ggCacheConfig(firstCacheCfg).getDrSenderConfiguration()
            .setBackupSyncFrequency(2147483647L)
            .setEntryFilter(new DrEntryFilter());

        IgniteConfiguration firstNodeCfg =
            config(firstNodeGgCfg, DrAbstractTest.TOP1_NODE, (byte) 1, finder, null, null, firstCacheCfg, secondCacheCfg);
        IgniteConfiguration secondNodeCfg =
            config(secondNodeGgCfg, DrAbstractTest.TOP1_NODE_2, (byte) 1, finder, null, null, firstCacheCfg, secondCacheCfg);

        addTopology(finder, firstNodeCfg, secondNodeCfg, senderNodeConfig(finder));

        return finder;
    }

    /**
     * Registers the receiver topology: one data node ({@code TOP2_NODE}) hosting the two partitioned
     * caches, plus the receiver hub node produced by {@link #receiverConfig}.
     *
     * @return IP finder the topology was registered with.
     * @throws Exception If a node configuration cannot be built.
     */
    private TcpDiscoveryIpFinder createReceiverTopology() throws Exception {
        TcpDiscoveryIpFinder finder = ipFinder();

        GridGainConfiguration dataNodeGgCfg = new GridGainConfiguration();

        CacheConfiguration firstCacheCfg = cacheConfig(SecurityServicePermissionsTest.CACHE_NAME, CacheMode.PARTITIONED, false)
            .setAffinity(new RendezvousAffinityFunction().setPartitions(8));
        CacheConfiguration secondCacheCfg = cacheConfig("cache_2", CacheMode.PARTITIONED, false)
            .setAffinity(new RendezvousAffinityFunction().setPartitions(8));

        IgniteConfiguration dataNodeCfg =
            config(dataNodeGgCfg, DrAbstractTest.TOP2_NODE, (byte) 2, finder, null, null, firstCacheCfg, secondCacheCfg);

        addTopology(finder, dataNodeCfg, receiverConfig(finder));

        return finder;
    }

    /**
     * Builds the configuration of the receiver hub node {@code TOP2_NODE_RCV} in data center {@code 2}.
     * The node hosts no caches and listens on {@code RCV_PORT_1}.
     *
     * @param tcpDiscoveryIpFinder IP finder passed through to the node configuration.
     * @return Receiver hub node configuration.
     * @throws IgniteCheckedException If the configuration cannot be built.
     */
    private IgniteConfiguration receiverConfig(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws IgniteCheckedException {
        GridGainConfiguration ggCfg = new GridGainConfiguration();

        DrReceiverConfiguration hubCfg = new DrReceiverConfiguration()
            .setPerNodeBufferSize(1)
            .setPerNodeParallelLoadOperations(1)
            .setMessageQueueLimit(3)
            .setLocalInboundPort(DrAbstractTest.RCV_PORT_1);

        CacheConfiguration[] noCaches = new CacheConfiguration[0];

        return config(ggCfg, DrAbstractTest.TOP2_NODE_RCV, (byte) 2, tcpDiscoveryIpFinder, null, hubCfg, noCaches);
    }

    /**
     * Builds the configuration of the sender hub node {@code TOP1_NODE_SND} in data center {@code 1}.
     * The node hosts no caches and replicates to data center {@code 2} at {@code SND_ADDR_1}.
     *
     * @param tcpDiscoveryIpFinder IP finder passed through to the node configuration.
     * @return Sender hub node configuration.
     * @throws Exception If the configuration cannot be built.
     */
    private IgniteConfiguration senderNodeConfig(TcpDiscoveryIpFinder tcpDiscoveryIpFinder) throws Exception {
        GridGainConfiguration ggCfg = new GridGainConfiguration();

        // 33554432 bytes = 32 * 1024 * 1024.
        DrSenderConfiguration hubCfg = senderHubConfig(senderHubReplicaConfig((byte) 2, DrAbstractTest.SND_ADDR_1))
            .setFullStateTransferBufferSize(33554432L);

        CacheConfiguration[] noCaches = new CacheConfiguration[0];

        return config(ggCfg, DrAbstractTest.TOP1_NODE_SND, (byte) 1, tcpDiscoveryIpFinder, hubCfg, null, noCaches);
    }

    /**
     * Waits up to 15 seconds until the sender cache metrics of the given cache report a pending queue
     * size of zero on every listed grid, and fails the test otherwise.
     *
     * @param str Cache name.
     * @param strArr Names of the grids to check.
     * @throws IgniteInterruptedCheckedException If the wait is interrupted.
     */
    @Override
    public void waitForCacheReplicated(String str, String... strArr) throws IgniteInterruptedCheckedException {
        boolean drained = GridTestUtils.waitForCondition(() -> {
            for (String gridName : strArr) {
                if (dr(grid(gridName)).senderCacheMetrics(str).pendingQueueSize() != 0) {
                    return false;
                }
            }

            return true;
        }, 15000L);

        assertTrue(drained);
    }

    /**
     * Waits up to 15 seconds until the sender cache metrics of the given cache report a backup queue
     * size of zero on every listed grid, and fails the test otherwise.
     *
     * @param str Cache name.
     * @param strArr Names of the grids to check.
     * @throws IgniteInterruptedCheckedException If the wait is interrupted.
     */
    private void waitForEmptyBackupQueue(String str, String... strArr) throws IgniteInterruptedCheckedException {
        boolean emptied = GridTestUtils.waitForCondition(
            () -> Arrays.stream(strArr)
                .noneMatch(gridName -> dr(grid(gridName)).senderCacheMetrics(str).backupQueueSize() != 0),
            15000L);

        assertTrue(emptied);
    }
}
