/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.control.agent.processor.lifecycle;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.gridgain.control.agent.AgentCommonAbstractTest;
import org.gridgain.control.agent.test.TestUtils;
import org.junit.Test;

/**
 * Test that starts two nodes whose communication SPI holds back every
 * "state changed" user message until the test releases a latch.
 *
 * <p>NOTE(review): judging by the test method name, the intent is to show that
 * node join keeps working while those messages are stuck in the sender —
 * confirm against the agent's message-processing implementation.
 */
public class AsyncMessageProcessingTest
extends AgentCommonAbstractTest {
    /** Topic of the user messages that {@link BlockingCommunicationSpi} holds back. */
    private static final String STATE_CHANGED_TOPIC = "topic-state-changed";

    /** Number of messages intercepted by {@link BlockingCommunicationSpi} since the last {@link #setup()}. */
    private final AtomicInteger blockedMessages = new AtomicInteger();

    /** Latch the intercepted senders wait on; counted down at the end of the test. */
    private CountDownLatch unlockLatch;

    /**
     * Runs the parent setup, then resets the blocked-message counter and
     * installs a fresh single-count latch.
     *
     * @throws Exception if the parent setup fails.
     */
    @Override
    public void setup() throws Exception {
        super.setup();
        this.blockedMessages.set(0);
        this.unlockLatch = new CountDownLatch(1);
    }

    /**
     * Returns the parent configuration with the communication SPI replaced by
     * a {@link BlockingCommunicationSpi}.
     *
     * @param instanceName Ignite instance name.
     * @return configuration using the blocking communication SPI.
     */
    @Override
    protected IgniteConfiguration getConfiguration(String instanceName) {
        IgniteConfiguration cfg = super.getConfiguration(instanceName);
        cfg.setCommunicationSpi((CommunicationSpi)new BlockingCommunicationSpi());
        return cfg;
    }

    /**
     * Starts node 1, activates the cluster and changes the agent configuration,
     * then starts node 2, waits until at least one message has been intercepted
     * and finally creates a cache from node 2 while senders are still blocked.
     */
    @Test
    public void discoveryThreadsShouldNotBlockOnNodeJoin() {
        try {
            IgniteEx node1 = this.startGrid(1);
            node1.cluster().state(ClusterState.ACTIVE);
            this.changeAgentConfiguration(node1);
            IgniteEx node2 = this.startGrid(2);
            TestUtils.assertWithPoll(() -> this.blockedMessages.get() > 0);
            node2.createCache("cache");
        }
        finally {
            // Always release the blocked sender threads, even when the test body fails.
            this.unlockLatch.countDown();
        }
    }

    /**
     * Communication SPI that forwards every message to {@link TcpCommunicationSpi}
     * except user messages sent to the {@code "topic-state-changed"} topic. Those are
     * counted, the calling thread is parked on the test latch, and the message
     * is never forwarded.
     */
    public class BlockingCommunicationSpi
    extends TcpCommunicationSpi {
        /**
         * Sends the message through the parent SPI unless it is a state-changed
         * message; in that case increments the blocked counter and waits for the
         * test latch without sending anything.
         *
         * @param node destination node.
         * @param msg message to send.
         * @param ackC acknowledgement closure passed through to the parent SPI.
         * @throws IgniteSpiException if the parent SPI fails to send.
         */
        public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
            if (!this.isStateChangedMessage(msg)) {
                super.sendMessage(node, msg, ackC);
                return;
            }
            AsyncMessageProcessingTest.this.blockedMessages.incrementAndGet();
            try {
                AsyncMessageProcessingTest.this.unlockLatch.await();
            }
            catch (InterruptedException ignored) {
                // Keep the interrupt visible to the caller instead of swallowing it,
                // so an interrupted sender thread can still terminate normally.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Tells whether the message is a {@link GridIoMessage} wrapping a
         * {@link GridIoUserMessage} whose topic equals {@code "topic-state-changed"}.
         *
         * @param msg message to inspect.
         * @return {@code true} if the message should be held back.
         */
        private boolean isStateChangedMessage(Message msg) {
            if (!(msg instanceof GridIoMessage)) {
                return false;
            }
            Object payload = ((GridIoMessage)msg).message();
            if (!(payload instanceof GridIoUserMessage)) {
                return false;
            }
            Object topic = ((GridIoUserMessage)payload).topic();
            return STATE_CHANGED_TOPIC.equals(topic);
        }
    }
}

