package org.apache.ignite.messaging;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.class */
/**
 * Tests message delivery through {@link IgniteMessaging}: plain {@code send}, {@code send} issued through
 * {@code withAsync()}, and {@code sendOrdered}, each with one and with two started grids.
 * <p>
 * Besides checking that the payload arrives unchanged (and, for ordered sends, in the sending order), the tests
 * record the thread on which the listener was invoked and compare it with the test thread.
 */
public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest implements Serializable {
    /** Number of sender threads started by the multi-threaded tests. */
    private static final int THREADS = 10;

    /** Timeout argument passed to every {@code sendOrdered} call (milliseconds per the IgniteMessaging API). */
    private static final long ORDERED_TIMEOUT = 1000L;

    /** Number of messages produced by {@link #orderedMessages()}. */
    private static final int ORDERED_MSGS_CNT = 1000;

    /** Topic every test sends to and listens on. */
    private final String TOPIC = "topic";

    /** Payload used by the single-message tests. */
    private final String msgStr = "message";

    /**
     * Payload of the multi-threaded tests: a message text tagged with the name of the thread that sent it, so the
     * receiving side can group messages per sender.
     */
    public static class Message implements Serializable {
        /** Name of the sending thread. */
        private final String threadName;

        /** Message text. */
        private final String msg;

        /**
         * @param threadName Name of the sending thread.
         * @param msg Message text.
         */
        private Message(String threadName, String msg) {
            this.threadName = threadName;
            this.msg = msg;
        }
    }

    /**
     * Stops all grids started by the test and then runs the parent clean-up.
     *
     * @throws Exception If failed.
     */
    @Override
    public void afterTest() throws Exception {
        stopAllGrids();
        super.afterTest();
    }

    /**
     * Plain {@code send} on a single grid: the listener must have run on the test thread and received the
     * payload unchanged.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSendDefaultMode() throws Exception {
        Ignite ignite = startGrid(1);

        send(ignite.message(), msgStr, new IgniteBiInClosure<String, Thread>() {
            @Override
            public void apply(String msg, Thread lsnrThread) {
                Assert.assertEquals(Thread.currentThread(), lsnrThread);
                Assert.assertEquals(msgStr, msg);
            }
        }, false);
    }

    /**
     * {@code send} through {@code withAsync()} on a single grid: the listener must have run on a thread other
     * than the test thread and received the payload unchanged.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSendAsyncMode() throws Exception {
        Ignite ignite = startGrid(1);

        send(ignite.message(), msgStr, new IgniteBiInClosure<String, Thread>() {
            @Override
            public void apply(String msg, Thread lsnrThread) {
                Assert.assertNotEquals(Thread.currentThread(), lsnrThread);
                Assert.assertEquals(msgStr, msg);
            }
        }, true);
    }

    /**
     * Same as {@link #testSendDefaultMode()}, with a second grid that must also receive the message.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSendDefaultMode2Nodes() throws Exception {
        // Grid 2 is started before grid 1; grid 1 is the sender.
        Ignite ignite2 = startGrid(2);
        Ignite ignite1 = startGrid(1);

        sendWith2Nodes(ignite2, ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread>() {
            @Override
            public void apply(String msg, Thread lsnrThread) {
                Assert.assertEquals(Thread.currentThread(), lsnrThread);
                Assert.assertEquals(msgStr, msg);
            }
        }, false);
    }

    /**
     * Same as {@link #testSendAsyncMode()}, with a second grid that must also receive the message.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSendAsyncMode2Node() throws Exception {
        // Grid 2 is started before grid 1; grid 1 is the sender.
        Ignite ignite2 = startGrid(2);
        Ignite ignite1 = startGrid(1);

        sendWith2Nodes(ignite2, ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread>() {
            @Override
            public void apply(String msg, Thread lsnrThread) {
                Assert.assertNotEquals(Thread.currentThread(), lsnrThread);
                Assert.assertEquals(msgStr, msg);
            }
        }, true);
    }

    /**
     * {@code sendOrdered} on a single grid: no listener invocation may have happened on the test thread, and the
     * messages must arrive in the order they were sent.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSendOrderedDefaultMode() throws Exception {
        Ignite ignite = startGrid(1);

        final List<String> msgs = orderedMessages();

        sendOrdered(ignite.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
            @Override
            public void apply(List<String> rcvd, List<Thread> lsnrThreads) {
                Assert.assertFalse(lsnrThreads.contains(Thread.currentThread()));
                Assert.assertEquals(msgs, rcvd);
            }
        });
    }

    /**
     * Same as {@link #testSendOrderedDefaultMode()}, with a second grid that must receive the same sequence.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSendOrderedDefaultMode2Node() throws Exception {
        Ignite ignite1 = startGrid(1);
        Ignite ignite2 = startGrid(2);

        final List<String> msgs = orderedMessages();

        sendOrderedWith2Node(ignite2, ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() {
            @Override
            public void apply(List<String> rcvd, List<Thread> lsnrThreads) {
                Assert.assertFalse(lsnrThreads.contains(Thread.currentThread()));
                Assert.assertEquals(msgs, rcvd);
            }
        });
    }

    /**
     * {@code sendOrdered} from {@link #THREADS} concurrent threads on a single grid.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSendOrderedDefaultModeMultiThreads() throws Exception {
        Ignite ignite = startGrid(1);

        sendOrderedMultiThreads(ignite.message());
    }

    /**
     * {@code sendOrdered} from {@link #THREADS} concurrent threads with a second grid listening as well.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSendOrderedDefaultModeMultiThreadsWith2Node() throws Exception {
        // Grid 2 is started before grid 1; grid 1 is the sender.
        Ignite ignite2 = startGrid(2);
        Ignite ignite1 = startGrid(1);

        sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message());
    }

    /**
     * Runs the two-grid multi-threaded scenario with fresh bookkeeping maps and a fresh message list.
     *
     * @param ignite2 Grid on which the additional listener is registered.
     * @param igniteMsg Messaging facade used for sending.
     * @throws Exception If failed.
     */
    private void sendOrderedMultiThreadsWith2Node(Ignite ignite2, IgniteMessaging igniteMsg) throws Exception {
        ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap();
        ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap();

        sendOrderedMultiThreadsWith2Node(ignite2, igniteMsg, expMsg, actlMsg, orderedMessages());
    }

    /**
     * Runs the single-grid multi-threaded scenario with fresh bookkeeping maps and a fresh message list.
     *
     * @param igniteMsg Messaging facade used for sending and listening.
     * @throws Exception If failed.
     */
    private void sendOrderedMultiThreads(IgniteMessaging igniteMsg) throws Exception {
        ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap();
        ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap();

        sendOrderedMultiThreads(igniteMsg, expMsg, actlMsg, orderedMessages());
    }

    /**
     * Registers a listener on {@code ignite2}, runs {@link #sendOrderedMultiThreads(IgniteMessaging,
     * ConcurrentMap, ConcurrentMap, List)} and then checks that {@code ignite2} received, per sender thread,
     * exactly the sequence that thread sent.
     *
     * @param ignite2 Grid on which the additional listener is registered.
     * @param igniteMsg Messaging facade used for sending.
     * @param expMsg Filled by the sender threads: thread name to messages sent, in sending order.
     * @param actlMsg Filled by the listener registered on {@code igniteMsg}: thread name to messages received.
     * @param msgs Messages each sender thread sends.
     * @throws Exception If failed.
     */
    private void sendOrderedMultiThreadsWith2Node(
        Ignite ignite2,
        IgniteMessaging igniteMsg,
        ConcurrentMap<String, List<String>> expMsg,
        ConcurrentMap<String, List<String>> actlMsg,
        List<String> msgs
    ) throws Exception {
        // One count per message per sender thread.
        final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size());

        final ConcurrentMap<String, List<String>> rmtRcvd = Maps.newConcurrentMap();

        ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
            @Override
            public boolean apply(UUID nodeId, Message msg) {
                rmtRcvd.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
                rmtRcvd.get(msg.threadName).add(msg.msg);

                latch.countDown();

                return true;
            }
        });

        sendOrderedMultiThreads(igniteMsg, expMsg, actlMsg, msgs);

        latch.await();

        assertEquals(expMsg.size(), rmtRcvd.size());

        for (Map.Entry<String, List<String>> sent : expMsg.entrySet()) {
            Assert.assertEquals(sent.getValue(), rmtRcvd.get(sent.getKey()));
        }
    }

    /**
     * Registers a listener on {@code igniteMsg}, starts {@link #THREADS} threads that each send every element of
     * {@code msgs} with {@code sendOrdered}, waits until the listener has seen all of them and checks that, per
     * sender thread, the received sequence equals the sent one.
     *
     * @param igniteMsg Messaging facade used for sending and listening.
     * @param expMsg Filled by the sender threads: thread name to messages sent, in sending order.
     * @param actlMsg Filled by the listener: thread name to messages received, in arrival order.
     * @param msgs Messages each sender thread sends.
     * @throws Exception If failed.
     */
    private void sendOrderedMultiThreads(
        final IgniteMessaging igniteMsg,
        final ConcurrentMap<String, List<String>> expMsg,
        final ConcurrentMap<String, List<String>> actlMsg,
        final List<String> msgs
    ) throws Exception {
        // One count per message per sender thread.
        final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size());

        igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
            @Override
            public boolean apply(UUID nodeId, Message msg) {
                actlMsg.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
                actlMsg.get(msg.threadName).add(msg.msg);

                latch.countDown();

                return true;
            }
        });

        for (int i = 0; i < THREADS; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    String thName = Thread.currentThread().getName();

                    List<String> sent = Lists.newArrayList();

                    expMsg.put(thName, sent);

                    for (String msg : msgs) {
                        // Record the message before sending it so the expected order mirrors the send order.
                        sent.add(msg);

                        igniteMsg.sendOrdered(TOPIC, new Message(thName, msg), ORDERED_TIMEOUT);
                    }
                }
            }).start();
        }

        latch.await();

        assertEquals(expMsg.size(), actlMsg.size());

        for (Map.Entry<String, List<String>> sent : expMsg.entrySet()) {
            Assert.assertEquals(sent.getValue(), actlMsg.get(sent.getKey()));
        }
    }

    /**
     * Registers a listener on {@code ignite2} that checks the payload, runs {@link #send(IgniteMessaging, String,
     * IgniteBiInClosure, boolean)} and waits until the listener on {@code ignite2} has fired.
     *
     * @param ignite2 Grid on which the additional listener is registered.
     * @param igniteMsg Messaging facade used for sending.
     * @param msg Message to send.
     * @param cls Check applied to the message and listener thread observed on the sender side.
     * @param async Whether to send through {@code withAsync()}.
     * @throws Exception If failed.
     */
    private void sendWith2Nodes(
        Ignite ignite2,
        IgniteMessaging igniteMsg,
        final String msg,
        IgniteBiInClosure<String, Thread> cls,
        boolean async
    ) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);

        ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
            @Override
            public boolean apply(UUID nodeId, String rcvd) {
                Assert.assertEquals(msg, rcvd);

                latch.countDown();

                return true;
            }
        });

        send(igniteMsg, msg, cls, async);

        latch.await();
    }

    /**
     * Registers a listener on {@code igniteMsg}, sends {@code msg}, waits for the listener to fire and hands the
     * received message together with the listener's thread to {@code cls}. The closure is applied on the calling
     * thread.
     *
     * @param igniteMsg Messaging facade used for sending and listening.
     * @param msg Message to send.
     * @param cls Check applied to the received message and the thread the listener ran on.
     * @param async Whether to send through {@code withAsync()}.
     * @throws Exception If failed.
     */
    private void send(
        IgniteMessaging igniteMsg,
        String msg,
        IgniteBiInClosure<String, Thread> cls,
        boolean async
    ) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);

        final AtomicReference<Thread> lsnrThread = new AtomicReference<>();
        final AtomicReference<String> rcvdMsg = new AtomicReference<>();

        igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
            @Override
            public boolean apply(UUID nodeId, String rcvd) {
                lsnrThread.set(Thread.currentThread());
                rcvdMsg.set(rcvd);

                latch.countDown();

                return true;
            }
        });

        if (async) {
            igniteMsg.withAsync().send(TOPIC, msg);
        } else {
            igniteMsg.send(TOPIC, msg);
        }

        latch.await();

        cls.apply(rcvdMsg.get(), lsnrThread.get());
    }

    /**
     * Registers a collecting listener on {@code ignite2}, runs {@link #sendOrdered(IgniteMessaging, List,
     * IgniteBiInClosure)} and checks that {@code ignite2} received the messages in the order they were sent.
     *
     * @param ignite2 Grid on which the additional listener is registered.
     * @param igniteMsg Messaging facade used for sending.
     * @param msgs Messages to send, in order.
     * @param cls Check applied to the messages and listener threads observed on the sender side.
     * @throws Exception If failed.
     */
    private void sendOrderedWith2Node(
        Ignite ignite2,
        IgniteMessaging igniteMsg,
        List<String> msgs,
        IgniteBiInClosure<List<String>, List<Thread>> cls
    ) throws Exception {
        final CountDownLatch latch = new CountDownLatch(msgs.size());

        final List<String> rmtRcvd = Lists.newArrayList();

        ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
            @Override
            public boolean apply(UUID nodeId, String rcvd) {
                rmtRcvd.add(rcvd);

                latch.countDown();

                return true;
            }
        });

        sendOrdered(igniteMsg, msgs, cls);

        latch.await();

        Assert.assertEquals(msgs, rmtRcvd);
    }

    /**
     * Sends every element of {@code msgs} with {@code sendOrdered}, then registers a listener that collects each
     * received message and the thread it was delivered on, waits for all of them and hands both lists to
     * {@code cls}.
     *
     * @param igniteMsg Messaging facade used for sending and listening.
     * @param msgs Messages to send, in order.
     * @param cls Check applied to the received messages and the threads the listener ran on.
     * @param <T> Message type.
     * @throws Exception If failed.
     */
    private <T> void sendOrdered(
        IgniteMessaging igniteMsg,
        List<T> msgs,
        IgniteBiInClosure<List<T>, List<Thread>> cls
    ) throws Exception {
        final CountDownLatch latch = new CountDownLatch(msgs.size());

        final List<T> rcvd = Lists.newArrayList();
        final List<Thread> lsnrThreads = Lists.newArrayList();

        // NOTE(review): the messages are sent BEFORE the listener is registered. This presumably relies on
        // ordered messages being retained until a listener appears or the timeout elapses - confirm against the
        // IgniteMessaging documentation before reordering.
        for (T msg : msgs) {
            igniteMsg.sendOrdered(TOPIC, msg, ORDERED_TIMEOUT);
        }

        igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, T>() {
            @Override
            public boolean apply(UUID nodeId, T msg) {
                rcvd.add(msg);
                lsnrThreads.add(Thread.currentThread());

                latch.countDown();

                return true;
            }
        });

        latch.await();

        cls.apply(rcvd, lsnrThreads);
    }

    /**
     * Builds the message sequence used by the ordered tests.
     *
     * @return {@link #ORDERED_MSGS_CNT} random integers rendered as strings.
     */
    private List<String> orderedMessages() {
        List<String> msgs = Lists.newArrayList();

        for (int i = 0; i < ORDERED_MSGS_CNT; i++) {
            msgs.add(String.valueOf(ThreadLocalRandom.current().nextInt()));
        }

        return msgs;
    }
}
