package org.apache.ignite.messaging;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.internal.DiscoverySpiTestListener;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.config.GridTestProperties;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/ignite/messaging/GridMessagingSelfTest.class */
public class GridMessagingSelfTest extends GridCommonAbstractTest implements Serializable {
    /** First test message payload. */
    private static final String MSG_1 = "MSG-1";

    /** Second test message payload. */
    private static final String MSG_2 = "MSG-2";

    /** Third test message payload. */
    private static final String MSG_3 = "MSG-3";

    /** First string topic. */
    private static final String S_TOPIC_1 = "TOPIC-1";

    /** Second string topic. */
    private static final String S_TOPIC_2 = "TOPIC-2";

    /** First integer topic. */
    private static final Integer I_TOPIC_1 = 1;

    /** Second integer topic. */
    private static final Integer I_TOPIC_2 = 2;

    /** Counter re-created in {@code beforeTest()}; its readers are not visible in this part of the file. */
    private static AtomicInteger MSG_CNT;

    /** Fully qualified name of the class loaded through a separate {@link URLClassLoader} by the external class loader test. */
    public static final String EXT_RESOURCE_CLS_NAME = "org.apache.ignite.tests.p2p.TestUserResource";

    /** Latch shared with remote listeners; each test that uses it assigns a fresh instance first. */
    protected static CountDownLatch rcvLatch;

    /** Node started as grid 1 in {@code beforeTest()}. */
    protected Ignite ignite1;

    /** Node started as grid 2 in {@code beforeTest()}. */
    protected Ignite ignite2;

    /* loaded from: input_file:org/apache/ignite/messaging/GridMessagingSelfTest$TestMessage.class */
    /**
     * Externalizable message used by the ordered-messaging test.
     * <p>
     * Only {@link #body} travels over the wire; {@link #delayMs} is a sender-side knob that makes
     * {@link #writeExternal(ObjectOutput)} sleep before writing, and is not restored on read.
     * Equality and hash code are based on {@link #body} alone and tolerate a {@code null} body,
     * which is the state produced by the public no-arg constructor.
     */
    private static class TestMessage implements Externalizable {
        /** Payload; the only state written to and read from the stream. */
        private Object body;

        /** Delay in milliseconds applied before the body is written; never serialized. */
        private long delayMs;

        /**
         * No-arg constructor required by {@link Externalizable}; leaves the body {@code null}.
         */
        public TestMessage() {
        }

        /**
         * @param body Message payload.
         */
        TestMessage(Object body) {
            this.body = body;
        }

        /**
         * @param body Message payload.
         * @param delayMs Delay in milliseconds to sleep in {@link #writeExternal(ObjectOutput)}.
         */
        TestMessage(Object body, long delayMs) {
            this.body = body;
            this.delayMs = delayMs;
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            return "TestMessage [body=" + this.body + "]";
        }

        /**
         * @return Hash code of the body, or {@code 0} when the body is {@code null}.
         */
        @Override
        public int hashCode() {
            return this.body == null ? 0 : this.body.hashCode();
        }

        /**
         * Two messages are equal when their bodies are equal (both {@code null} counts as equal).
         *
         * @param obj Object to compare with.
         * @return {@code true} if {@code obj} is a {@code TestMessage} with an equal body.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof TestMessage)) {
                return false;
            }
            Object otherBody = ((TestMessage) obj).body;
            // A freshly constructed (not yet deserialized) instance has a null body.
            return this.body == null ? otherBody == null : this.body.equals(otherBody);
        }

        /**
         * Optionally sleeps for {@link #delayMs} and then writes the body.
         *
         * @param objectOutput Stream to write to.
         * @throws IOException If writing the body fails.
         */
        @Override // java.io.Externalizable
        public void writeExternal(ObjectOutput objectOutput) throws IOException {
            if (this.delayMs > 0) {
                try {
                    Thread.sleep(this.delayMs);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller; the body is still written below.
                    Thread.currentThread().interrupt();
                }
            }
            objectOutput.writeObject(this.body);
        }

        /**
         * Restores the body; the delay is not part of the serialized form.
         *
         * @param objectInput Stream to read from.
         * @throws IOException If reading fails.
         * @throws ClassNotFoundException If the body class cannot be resolved.
         */
        @Override // java.io.Externalizable
        public void readExternal(ObjectInput objectInput) throws IOException, ClassNotFoundException {
            this.body = objectInput.readObject();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/messaging/GridMessagingSelfTest$TestTopic.class */
    /** Enum-typed topics used by the enum-topic messaging test. */
    public enum TestTopic {
        /** First enum topic. */
        TOPIC_1,

        /** Second enum topic. */
        TOPIC_2;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resets the message counter and starts grids 1 and 2 before every test.
     *
     * @throws Exception If a grid fails to start.
     */
    @Override
    public void beforeTest() throws Exception {
        MSG_CNT = new AtomicInteger();

        ignite1 = startGrid(1);
        ignite2 = startGrid(2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops every grid after each test and drops the node references.
     *
     * @throws Exception If stopping fails.
     */
    @Override
    public void afterTest() throws Exception {
        stopAllGrids();

        ignite1 = null;
        ignite2 = null;
    }

    /**
     * Sends three messages from the second node to its remote projection on the default
     * ({@code null}) topic and checks that a local listener on the first node receives all of them
     * from the expected sender.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSendReceiveMessage() throws Exception {
        final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
        final AtomicBoolean error = new AtomicBoolean(false);
        final CountDownLatch latch = new CountDownLatch(3);

        ignite1.message().localListen((Object) null, new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                try {
                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');

                    if (!nodeId.equals(ignite2.cluster().localNode().id())) {
                        log.error("Unexpected sender node: " + nodeId);
                        error.set(true);

                        return false;
                    }

                    rcvMsgs.add(msg);

                    return true;
                } finally {
                    // Counted on every path, including when an exception propagates.
                    latch.countDown();
                }
            }
        });

        ClusterGroup rmtPrj = ignite2.cluster().forRemotes();

        message(rmtPrj).send((Object) null, MSG_1);
        message(rmtPrj).send((Object) null, MSG_2);
        message(rmtPrj).send((Object) null, MSG_3);

        assertTrue(latch.await(3, TimeUnit.SECONDS));
        assertFalse(error.get());

        assertTrue(rcvMsgs.contains(MSG_1));
        assertTrue(rcvMsgs.contains(MSG_2));
        assertTrue(rcvMsgs.contains(MSG_3));
    }

    /**
     * Registers three local listeners (default topic, {@code "top1"}, {@code "top3"}) on the first
     * node and verifies, round by round, which of them still receive messages as listeners are
     * removed with {@code stopLocalListen}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testStopLocalListen() throws Exception {
        final AtomicInteger cnt1 = new AtomicInteger();
        final AtomicInteger cnt2 = new AtomicInteger();
        final AtomicInteger cnt3 = new AtomicInteger();

        P2<UUID, Object> lsnr1 = new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                log.info("Listener1 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
                cnt1.incrementAndGet();

                return true;
            }
        };

        P2<UUID, Object> lsnr2 = new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                log.info("Listener2 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
                cnt2.incrementAndGet();

                return true;
            }
        };

        P2<UUID, Object> lsnr3 = new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                log.info("Listener3 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
                cnt3.incrementAndGet();

                return true;
            }
        };

        final String top1 = "top1";
        final String top3 = "top3";

        ignite1.message().localListen((Object) null, lsnr1);
        ignite1.message().localListen(top1, lsnr2);
        ignite1.message().localListen(top3, lsnr3);

        ClusterGroup rmtPrj = ignite2.cluster().forRemotes();

        // Round 1: all three listeners are registered.
        message(rmtPrj).send((Object) null, "msg1-1");
        message(rmtPrj).send(top1, "msg1-2");
        message(rmtPrj).send(top3, "msg1-3");

        GridTestUtils.waitForCondition(new PA() {
            @Override
            public boolean apply() {
                return cnt1.get() > 0 && cnt2.get() > 0 && cnt3.get() > 0;
            }
        }, 5000);

        assertEquals(1, cnt1.get());
        assertEquals(1, cnt2.get());
        assertEquals(1, cnt3.get());

        // Round 2: the second listener is removed from its own topic.
        ignite1.message().stopLocalListen(top1, lsnr2);

        message(rmtPrj).send((Object) null, "msg2-1");
        message(rmtPrj).send(top1, "msg2-2");
        message(rmtPrj).send(top3, "msg2-3");

        GridTestUtils.waitForCondition(new PA() {
            @Override
            public boolean apply() {
                return cnt1.get() > 1 && cnt3.get() > 1;
            }
        }, 5000);

        assertEquals(2, cnt1.get());
        assertEquals(1, cnt2.get());
        assertEquals(2, cnt3.get());

        // Round 3: the first listener is "removed" using a topic it was not registered for;
        // the assertions below expect it to keep receiving messages.
        ignite1.message().stopLocalListen(top1, lsnr1);

        message(rmtPrj).send((Object) null, "msg3-1");
        message(rmtPrj).send(top1, "msg3-2");
        message(rmtPrj).send(top3, "msg3-3");

        GridTestUtils.waitForCondition(new PA() {
            @Override
            public boolean apply() {
                return cnt1.get() > 2 && cnt3.get() > 2;
            }
        }, 5000);

        assertEquals(3, cnt1.get());
        assertEquals(1, cnt2.get());
        assertEquals(3, cnt3.get());

        // Round 4: remaining listeners are removed from their actual topics.
        ignite1.message().stopLocalListen((Object) null, lsnr1);
        ignite1.message().stopLocalListen(top3, lsnr3);

        message(rmtPrj).send((Object) null, "msg4-1");
        message(rmtPrj).send(top1, "msg4-2");
        message(rmtPrj).send(top3, "msg4-3");

        // Nothing to wait on here, so give stray deliveries a chance to arrive.
        U.sleep(1000);

        assertEquals(3, cnt1.get());
        assertEquals(1, cnt2.get());
        assertEquals(3, cnt3.get());
    }

    /**
     * Registers local listeners on the first node for two string topics and the default topic,
     * sends one message per topic to the local projection and checks that each listener gets
     * exactly the message intended for its topic.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSendReceiveMessageWithStringTopic() throws Exception {
        final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
        final AtomicBoolean error = new AtomicBoolean(false);
        final CountDownLatch latch = new CountDownLatch(3);

        ignite1.message().localListen(S_TOPIC_1, new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                try {
                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ", topic=" + S_TOPIC_1 + ']');

                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
                        log.error("Unexpected sender node: " + nodeId);
                        error.set(true);

                        return false;
                    }

                    if (!MSG_1.equals(msg)) {
                        log.error("Unexpected message " + msg + " for topic: " + S_TOPIC_1);
                        error.set(true);

                        return false;
                    }

                    rcvMsgs.add(msg);

                    return true;
                } finally {
                    latch.countDown();
                }
            }
        });

        ignite1.message().localListen(S_TOPIC_2, new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                try {
                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ", topic=" + S_TOPIC_2 + ']');

                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
                        log.error("Unexpected sender node: " + nodeId);
                        error.set(true);

                        return false;
                    }

                    if (!MSG_2.equals(msg)) {
                        log.error("Unexpected message " + msg + " for topic: " + S_TOPIC_2);
                        error.set(true);

                        return false;
                    }

                    rcvMsgs.add(msg);

                    return true;
                } finally {
                    latch.countDown();
                }
            }
        });

        ignite1.message().localListen((Object) null, new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                try {
                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ", topic=default]");

                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
                        log.error("Unexpected sender node: " + nodeId);
                        error.set(true);

                        return false;
                    }

                    if (!MSG_3.equals(msg)) {
                        log.error("Unexpected message " + msg + " for topic: default");
                        error.set(true);

                        return false;
                    }

                    rcvMsgs.add(msg);

                    return true;
                } finally {
                    latch.countDown();
                }
            }
        });

        ClusterGroup locPrj = ignite1.cluster().forLocal();

        message(locPrj).send(S_TOPIC_1, MSG_1);
        message(locPrj).send(S_TOPIC_2, MSG_2);
        message(locPrj).send((Object) null, MSG_3);

        assertTrue(latch.await(3, TimeUnit.SECONDS));
        assertFalse(error.get());

        assertTrue(rcvMsgs.contains(MSG_1));
        assertTrue(rcvMsgs.contains(MSG_2));
        assertTrue(rcvMsgs.contains(MSG_3));
    }

    /**
     * Same scenario as the string-topic test, but the two explicit topics are {@link TestTopic}
     * enum constants: one message per topic is sent to the local projection of the first node and
     * each listener must see only its own message.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSendReceiveMessageWithEnumTopic() throws Exception {
        final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
        final AtomicBoolean error = new AtomicBoolean(false);
        final CountDownLatch latch = new CountDownLatch(3);

        ignite1.message().localListen(TestTopic.TOPIC_1, new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                try {
                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ", topic=" + TestTopic.TOPIC_1 + ']');

                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
                        log.error("Unexpected sender node: " + nodeId);
                        error.set(true);

                        return false;
                    }

                    if (!MSG_1.equals(msg)) {
                        log.error("Unexpected message " + msg + " for topic: " + TestTopic.TOPIC_1);
                        error.set(true);

                        return false;
                    }

                    rcvMsgs.add(msg);

                    return true;
                } finally {
                    latch.countDown();
                }
            }
        });

        ignite1.message().localListen(TestTopic.TOPIC_2, new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                try {
                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ", topic=" + TestTopic.TOPIC_2 + ']');

                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
                        log.error("Unexpected sender node: " + nodeId);
                        error.set(true);

                        return false;
                    }

                    if (!MSG_2.equals(msg)) {
                        log.error("Unexpected message " + msg + " for topic: " + TestTopic.TOPIC_2);
                        error.set(true);

                        return false;
                    }

                    rcvMsgs.add(msg);

                    return true;
                } finally {
                    latch.countDown();
                }
            }
        });

        ignite1.message().localListen((Object) null, new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                try {
                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ", topic=default]");

                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
                        log.error("Unexpected sender node: " + nodeId);
                        error.set(true);

                        return false;
                    }

                    if (!MSG_3.equals(msg)) {
                        log.error("Unexpected message " + msg + " for topic: default");
                        error.set(true);

                        return false;
                    }

                    rcvMsgs.add(msg);

                    return true;
                } finally {
                    latch.countDown();
                }
            }
        });

        ClusterGroup locPrj = ignite1.cluster().forLocal();

        message(locPrj).send(TestTopic.TOPIC_1, MSG_1);
        message(locPrj).send(TestTopic.TOPIC_2, MSG_2);
        message(locPrj).send((Object) null, MSG_3);

        assertTrue(latch.await(3, TimeUnit.SECONDS));
        assertFalse(error.get());

        assertTrue(rcvMsgs.contains(MSG_1));
        assertTrue(rcvMsgs.contains(MSG_2));
        assertTrue(rcvMsgs.contains(MSG_3));
    }

    /**
     * Registers a remote listener for the default topic through the second node, sends two
     * messages from the first node to its remote projection plus one from the second node to
     * itself, and checks that exactly those three messages were collected.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRemoteListen() throws Exception {
        final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();

        // One more than the number of messages sent: the latch must NOT reach zero.
        rcvLatch = new CountDownLatch(4);

        ignite2.message().remoteListen((Object) null, new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                try {
                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');

                    rcvMsgs.add(msg);

                    return true;
                } finally {
                    rcvLatch.countDown();
                }
            }
        });

        ClusterGroup rmtPrj = ignite1.cluster().forRemotes();

        message(rmtPrj).send((Object) null, MSG_1);
        message(rmtPrj).send((Object) null, MSG_2);
        message(ignite2.cluster().forLocal()).send((Object) null, MSG_3);

        // Only three messages were sent, so waiting for a fourth is expected to time out.
        assertFalse(rcvLatch.await(3, TimeUnit.SECONDS));

        assertTrue(rcvMsgs.contains(MSG_1));
        assertTrue(rcvMsgs.contains(MSG_2));
        assertTrue(rcvMsgs.contains(MSG_3));
    }

    /**
     * Registers three remote listeners (default topic, {@code "top2"}, {@code "top3"}) through the
     * second node and verifies, round by round, which of them still receive messages as they are
     * removed with {@code stopRemoteListen}, including a repeated removal of the same listener id.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testStopRemoteListen() throws Exception {
        final AtomicInteger cnt1 = new AtomicInteger();
        final AtomicInteger cnt2 = new AtomicInteger();
        final AtomicInteger cnt3 = new AtomicInteger();

        final String top2 = "top2";
        final String top3 = "top3";

        UUID id1 = ignite2.message().remoteListen((Object) null, new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                System.out.println(Thread.currentThread().getName() + " Listener1 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
                cnt1.incrementAndGet();

                return true;
            }
        });

        UUID id2 = ignite2.message().remoteListen(top2, new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                System.out.println(Thread.currentThread().getName() + " Listener2 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
                cnt2.incrementAndGet();

                return true;
            }
        });

        UUID id3 = ignite2.message().remoteListen(top3, new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                System.out.println(Thread.currentThread().getName() + " Listener3 received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');
                cnt3.incrementAndGet();

                return true;
            }
        });

        // Round 1: all three listeners are registered.
        message(ignite1.cluster().forRemotes()).send((Object) null, "msg1-1");
        message(ignite1.cluster().forRemotes()).send(top2, "msg1-2");
        message(ignite1.cluster().forRemotes()).send(top3, "msg1-3");

        GridTestUtils.waitForCondition(new PA() {
            @Override
            public boolean apply() {
                return cnt1.get() > 0 && cnt2.get() > 0 && cnt3.get() > 0;
            }
        }, 5000);

        assertEquals(1, cnt1.get());
        assertEquals(1, cnt2.get());
        assertEquals(1, cnt3.get());

        // Round 2: the second listener is stopped.
        ignite2.message().stopRemoteListen(id2);

        message(ignite1.cluster().forRemotes()).send((Object) null, "msg2-1");
        message(ignite1.cluster().forRemotes()).send(top2, "msg2-2");
        message(ignite1.cluster().forRemotes()).send(top3, "msg2-3");

        GridTestUtils.waitForCondition(new PA() {
            @Override
            public boolean apply() {
                return cnt1.get() > 1 && cnt3.get() > 1;
            }
        }, 5000);

        assertEquals(2, cnt1.get());
        assertEquals(1, cnt2.get());
        assertEquals(2, cnt3.get());

        // Round 3: stop the already-stopped listener once more, then the remaining two.
        ignite2.message().stopRemoteListen(id2);
        ignite2.message().stopRemoteListen(id1);
        ignite2.message().stopRemoteListen(id3);

        message(ignite1.cluster().forRemotes()).send((Object) null, "msg3-1");
        message(ignite1.cluster().forRemotes()).send(top2, "msg3-2");
        message(ignite1.cluster().forRemotes()).send(top3, "msg3-3");

        // Nothing to wait on here, so give stray deliveries a chance to arrive.
        U.sleep(1000);

        assertEquals(2, cnt1.get());
        assertEquals(1, cnt2.get());
        assertEquals(2, cnt3.get());
    }

    /**
     * Sends three {@link TestMessage}s with {@code sendOrdered}, the second of which sleeps for
     * three seconds while being written, and checks that the remote listener collected them in
     * the order they were sent.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRemoteListenOrderedMessages() throws Exception {
        List<TestMessage> msgs = Arrays.asList(
            new TestMessage(MSG_1),
            new TestMessage(MSG_2, 3000),
            new TestMessage(MSG_3));

        final ConcurrentLinkedDeque<Object> rcvMsgs = new ConcurrentLinkedDeque<>();
        final AtomicBoolean error = new AtomicBoolean(false);

        rcvLatch = new CountDownLatch(3);

        ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() {
            @Override
            public boolean apply(UUID nodeId, Object msg) {
                try {
                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');

                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
                        log.error("Unexpected sender node: " + nodeId);
                        error.set(true);

                        return false;
                    }

                    rcvMsgs.add(msg);

                    return true;
                } finally {
                    rcvLatch.countDown();
                }
            }
        });

        ClusterGroup rmtPrj = ignite1.cluster().forRemotes();

        for (TestMessage msg : msgs) {
            message(rmtPrj).sendOrdered(S_TOPIC_1, msg, 15000);
        }

        assertTrue(rcvLatch.await(6, TimeUnit.SECONDS));
        assertFalse(error.get());

        // Compare as lists so that arrival order is part of the check.
        assertEquals(msgs, Arrays.asList(rcvMsgs.toArray()));
    }

    /**
     * Registers remote listeners through the second node for two integer topics and the default
     * topic. Each listener asserts that its injected {@link Ignite} equals the second node,
     * validates the sender and the message, and records accepted messages. One message per topic
     * is then sent from the first node to its remote projection.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRemoteListenWithIntTopic() throws Exception {
        final Collection<Object> rcvMsgs = new GridConcurrentHashSet<>();
        final AtomicBoolean error = new AtomicBoolean(false);

        rcvLatch = new CountDownLatch(3);

        ignite2.message().remoteListen(I_TOPIC_1, new P2<UUID, Object>() {
            /** Injected Ignite instance. */
            @IgniteInstanceResource
            private transient Ignite g;

            @Override
            public boolean apply(UUID nodeId, Object msg) {
                // Deliberately outside the try block: a failure here does not touch the latch.
                Assert.assertEquals(ignite2, g);

                try {
                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ", topic=" + I_TOPIC_1 + ']');

                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
                        log.error("Unexpected sender node: " + nodeId);
                        error.set(true);

                        return false;
                    }

                    if (!MSG_1.equals(msg)) {
                        log.error("Unexpected message " + msg + " for topic: " + I_TOPIC_1);
                        error.set(true);

                        return false;
                    }

                    rcvMsgs.add(msg);

                    return true;
                } finally {
                    rcvLatch.countDown();
                }
            }
        });

        ignite2.message().remoteListen(I_TOPIC_2, new P2<UUID, Object>() {
            /** Injected Ignite instance. */
            @IgniteInstanceResource
            private transient Ignite g;

            @Override
            public boolean apply(UUID nodeId, Object msg) {
                // Deliberately outside the try block: a failure here does not touch the latch.
                Assert.assertEquals(ignite2, g);

                try {
                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ", topic=" + I_TOPIC_2 + ']');

                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
                        log.error("Unexpected sender node: " + nodeId);
                        error.set(true);

                        return false;
                    }

                    if (!MSG_2.equals(msg)) {
                        log.error("Unexpected message " + msg + " for topic: " + I_TOPIC_2);
                        error.set(true);

                        return false;
                    }

                    rcvMsgs.add(msg);

                    return true;
                } finally {
                    rcvLatch.countDown();
                }
            }
        });

        ignite2.message().remoteListen((Object) null, new P2<UUID, Object>() {
            /** Injected Ignite instance. */
            @IgniteInstanceResource
            private transient Ignite g;

            @Override
            public boolean apply(UUID nodeId, Object msg) {
                // Deliberately outside the try block: a failure here does not touch the latch.
                Assert.assertEquals(ignite2, g);

                try {
                    log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ", topic=default]");

                    if (!nodeId.equals(ignite1.cluster().localNode().id())) {
                        log.error("Unexpected sender node: " + nodeId);
                        error.set(true);

                        return false;
                    }

                    if (!MSG_3.equals(msg)) {
                        log.error("Unexpected message " + msg + " for topic: default");
                        error.set(true);

                        return false;
                    }

                    rcvMsgs.add(msg);

                    return true;
                } finally {
                    rcvLatch.countDown();
                }
            }
        });

        ClusterGroup rmtPrj = ignite1.cluster().forRemotes();

        message(rmtPrj).send(I_TOPIC_1, MSG_1);
        message(rmtPrj).send(I_TOPIC_2, MSG_2);
        message(rmtPrj).send((Object) null, MSG_3);

        assertTrue(rcvLatch.await(3, TimeUnit.SECONDS));
        assertFalse(error.get());

        assertTrue(rcvMsgs.contains(MSG_1));
        assertTrue(rcvMsgs.contains(MSG_2));
        assertTrue(rcvMsgs.contains(MSG_3));
    }

    /**
     * Sends, from {@code ignite1} to its remote nodes, a singleton collection holding an instance of a class
     * loaded through a dedicated {@link URLClassLoader}, and checks that the listener registered through
     * {@code ignite2} is invoked within 3 seconds with {@code ignite1} as the sender.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSendMessageWithExternalClassLoader() throws Exception {
        URL[] urls = {new URL(GridTestProperties.getProperty("p2p.uri.cls"))};

        // The loader stays open until every assertion has run and is then closed to release its resources.
        try (URLClassLoader extLdr = new URLClassLoader(urls)) {
            Class<?> extCls = extLdr.loadClass(EXT_RESOURCE_CLS_NAME);

            final AtomicBoolean error = new AtomicBoolean(false);
            final CountDownLatch msgLatch = new CountDownLatch(1);

            ignite2.message().remoteListen(S_TOPIC_1, new P2<UUID, Object>() {
                @Override public boolean apply(UUID nodeId, Object msg) {
                    try {
                        log.info("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');

                        if (!nodeId.equals(ignite1.cluster().localNode().id())) {
                            log.error("Unexpected sender node: " + nodeId);

                            error.set(true);

                            return false;
                        }

                        return true;
                    }
                    finally {
                        // Single release point: executed on every exit path, including exceptions.
                        msgLatch.countDown();
                    }
                }
            });

            // Class.newInstance() is deprecated; go through the no-arg constructor explicitly.
            Object extObj = extCls.getDeclaredConstructor().newInstance();

            message(ignite1.cluster().forRemotes()).send(S_TOPIC_1, Collections.singleton(extObj));

            assertTrue(msgLatch.await(3L, TimeUnit.SECONDS));
            assertFalse(error.get());
        }
    }

    /**
     * Checks argument validation of {@code send} on {@code ignite1}: a {@code null} or empty collection of
     * messages is expected to fail with {@link IllegalArgumentException}, while a {@code null} message
     * (passed directly or as a collection element) is expected to fail with {@link NullPointerException}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testNullMessages() throws Exception {
        final String badMsgsErr = "Ouch! Argument is invalid: msgs cannot be null or empty";
        final String nullMsgErr = "Ouch! Argument cannot be null: msg";

        // Null collection of messages.
        GridTestUtils.assertThrows(log, new Callable<Object>() {
            @Override public Object call() throws Exception {
                ignite1.message().send((Object)null, (Collection)null);

                return null;
            }
        }, IllegalArgumentException.class, badMsgsErr);

        // Empty collection of messages.
        GridTestUtils.assertThrows(log, new Callable<Object>() {
            @Override public Object call() throws Exception {
                ignite1.message().send((Object)null, Collections.emptyList());

                return null;
            }
        }, IllegalArgumentException.class, badMsgsErr);

        // Single null message.
        GridTestUtils.assertThrows(log, new Callable<Object>() {
            @Override public Object call() throws Exception {
                ignite1.message().send((Object)null, (Object)null);

                return null;
            }
        }, NullPointerException.class, nullMsgErr);

        // Collection containing a null message.
        GridTestUtils.assertThrows(log, new Callable<Object>() {
            @Override public Object call() throws Exception {
                ignite1.message().send((Object)null, Arrays.asList(null, new Object()));

                return null;
            }
        }, NullPointerException.class, nullMsgErr);
    }

    /**
     * Exercises the {@code withAsync()}-based messaging API on {@code ignite2}:
     * <ul>
     * <li>{@code future()} throws {@link IllegalStateException} when there is no future to hand out;</li>
     * <li>the futures of {@code remoteListen} and {@code stopRemoteListen} are still incomplete after 500 ms
     *     while the corresponding discovery custom events are blocked, and complete once they are unblocked;</li>
     * <li>the listener receives exactly one message: the one sent before the listener is stopped.</li>
     * </ul>
     *
     * @throws Exception If failed.
     */
    @Test
    public void testAsyncOld() throws Exception {
        final AtomicInteger msgCnt = new AtomicInteger();
        final String topic = "topic";

        // Narrow to the internal SPI interface, which is the one exposing setInternalListener().
        IgniteDiscoverySpi discoSpi = (IgniteDiscoverySpi)ignite2.configuration().getDiscoverySpi();

        DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();

        discoSpi.setInternalListener(lsnr);

        assertFalse(ignite2.message().isAsync());

        final IgniteMessaging asyncMsg = ignite2.message().withAsync();

        assertTrue(asyncMsg.isAsync());
        assertFalse(ignite2.message().isAsync());

        // No asynchronous operation has been started yet.
        assertNoAsyncFuture(asyncMsg);

        lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class);

        UUID id = asyncMsg.remoteListen(topic, new P2<UUID, Object>() {
            @Override public boolean apply(UUID nodeId, Object msg) {
                System.out.println(Thread.currentThread().getName() + " Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');

                msgCnt.incrementAndGet();

                return true;
            }
        });

        // In async mode the call itself yields null; the real result comes from the future.
        org.junit.Assert.assertNull(id);

        IgniteFuture<UUID> startFut = asyncMsg.future();

        org.junit.Assert.assertNotNull(startFut);

        U.sleep(500L);

        // Start-routine events are blocked, so the operation is expected to be still pending.
        org.junit.Assert.assertFalse(startFut.isDone());

        lsnr.stopBlockCustomEvents();

        // The future of the last operation has already been taken.
        assertNoAsyncFuture(asyncMsg);

        id = startFut.get();

        org.junit.Assert.assertNotNull(id);
        org.junit.Assert.assertTrue(startFut.isDone());

        lsnr.blockCustomEvent(StopRoutineDiscoveryMessage.class);

        message(ignite1.cluster().forRemotes()).send(topic, "msg1");

        GridTestUtils.waitForCondition(new PA() {
            @Override public boolean apply() {
                return msgCnt.get() > 0;
            }
        }, 5000L);

        assertEquals(1, msgCnt.get());

        asyncMsg.stopRemoteListen(id);

        IgniteFuture<?> stopFut = asyncMsg.future();

        org.junit.Assert.assertNotNull(stopFut);

        assertNoAsyncFuture(asyncMsg);

        U.sleep(500L);

        // Stop-routine events are blocked, so the stop is expected to be still pending.
        org.junit.Assert.assertFalse(stopFut.isDone());

        lsnr.stopBlockCustomEvents();

        stopFut.get();

        org.junit.Assert.assertTrue(stopFut.isDone());

        message(ignite1.cluster().forRemotes()).send(topic, "msg2");

        U.sleep(1000L);

        // The second message must not reach the stopped listener.
        assertEquals(1, msgCnt.get());
    }

    /**
     * Asserts that {@code future()} on the given messaging facade throws {@link IllegalStateException}.
     *
     * @param asyncMsg Messaging facade to check.
     */
    private void assertNoAsyncFuture(final IgniteMessaging asyncMsg) {
        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                asyncMsg.future();

                return null;
            }
        }, IllegalStateException.class, null);
    }

    /**
     * Exercises {@code remoteListenAsync} and {@code stopRemoteListenAsync} on {@code ignite2}: each returned
     * future is still incomplete after 500 ms while the corresponding discovery custom events are blocked and
     * completes once they are unblocked. The listener receives exactly one message: the one sent before the
     * listener is stopped.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testAsync() throws Exception {
        final AtomicInteger msgCnt = new AtomicInteger();
        final String topic = "topic";

        // Narrow to the internal SPI interface, which is the one exposing setInternalListener().
        IgniteDiscoverySpi discoSpi = (IgniteDiscoverySpi)ignite2.configuration().getDiscoverySpi();

        DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();

        discoSpi.setInternalListener(lsnr);

        lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class);

        IgniteFuture<UUID> startFut = ignite2.message().remoteListenAsync(topic, new P2<UUID, Object>() {
            @Override public boolean apply(UUID nodeId, Object msg) {
                System.out.println(Thread.currentThread().getName() + " Listener received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');

                msgCnt.incrementAndGet();

                return true;
            }
        });

        org.junit.Assert.assertNotNull(startFut);

        U.sleep(500L);

        // Start-routine events are blocked, so the operation is expected to be still pending.
        org.junit.Assert.assertFalse(startFut.isDone());

        lsnr.stopBlockCustomEvents();

        UUID id = startFut.get();

        org.junit.Assert.assertNotNull(id);
        org.junit.Assert.assertTrue(startFut.isDone());

        lsnr.blockCustomEvent(StopRoutineDiscoveryMessage.class);

        message(ignite1.cluster().forRemotes()).send(topic, "msg1");

        GridTestUtils.waitForCondition(new PA() {
            @Override public boolean apply() {
                return msgCnt.get() > 0;
            }
        }, 5000L);

        assertEquals(1, msgCnt.get());

        IgniteFuture<?> stopFut = ignite2.message().stopRemoteListenAsync(id);

        org.junit.Assert.assertNotNull(stopFut);

        U.sleep(500L);

        // Stop-routine events are blocked, so the stop is expected to be still pending.
        org.junit.Assert.assertFalse(stopFut.isDone());

        lsnr.stopBlockCustomEvents();

        stopFut.get();

        org.junit.Assert.assertTrue(stopFut.isDone());

        message(ignite1.cluster().forRemotes()).send(topic, "msg2");

        U.sleep(1000L);

        // The second message must not reach the stopped listener.
        assertEquals(1, msgCnt.get());
    }

    /**
     * Runs the oldest-node listening check twice: first expecting {@code ignite1} to be the oldest node, then,
     * after grid 1 has been restarted and the message counter reset, expecting {@code ignite2} to be the oldest.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRemoteListenForOldest() throws Exception {
        // First pass: ignite1 is expected to be the oldest node.
        remoteListenForOldest(ignite1);

        // Restart grid 1 and start counting received messages from scratch.
        stopGrid(1);

        ignite1 = startGrid(1);

        MSG_CNT.set(0);

        // Second pass: ignite2 is expected to be the oldest node.
        remoteListenForOldest(ignite2);
    }

    /**
     * Checks that the "oldest" cluster group seen from {@code ignite1} consists of exactly one node whose ID
     * matches the local node of the given instance. It then registers a counting listener on the default
     * ({@code null}) topic for that group, sends one message and, after 3 seconds, expects exactly one delivery.
     *
     * @param ignite Instance whose local node is expected to be the oldest one.
     * @throws InterruptedException If the thread is interrupted while sleeping.
     */
    private void remoteListenForOldest(Ignite ignite) throws InterruptedException {
        ClusterGroup oldestGrp = ignite1.cluster().forOldest();

        assertEquals(1, oldestGrp.nodes().size());

        UUID expId = ignite.cluster().localNode().id();
        UUID actualId = oldestGrp.node().id();

        assertEquals(expId, actualId);

        IgniteMessaging oldestMsg = ignite1.message(oldestGrp);

        oldestMsg.remoteListen((Object)null, new P2<UUID, Object>() {
            @Override public boolean apply(UUID nodeId, Object msg) {
                System.out.println("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']');

                MSG_CNT.incrementAndGet();

                return true;
            }
        });

        ignite1.message().send((Object)null, MSG_1);

        // Fixed wait rather than polling: the assertion checks for exactly one delivery.
        Thread.sleep(3000L);

        assertEquals(1, MSG_CNT.get());
    }
}
