/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.QueryIndexType;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.BlockTcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Test;

public class BinaryMetadataConcurrentUpdateWithIndexesTest
extends GridCommonAbstractTest {
    private static final int FIELDS = 2;
    private static final int MB = 0x100000;
    private volatile boolean syncMeta;
    private CountDownLatch initMetaReq = new CountDownLatch(2);
    private ThreadLocal<Boolean> delayMetadataUpdateThreadLoc = new ThreadLocal();
    public static final CountDownLatch localMetaUpdatedLatch = new CountDownLatch(1);

    protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
        cfg.setIncludeEventTypes(EventType.EVTS_DISCOVERY);
        BlockTcpDiscoverySpi spi = new BlockTcpDiscoverySpi();
        Field rndAddrsField = U.findField(BlockTcpDiscoverySpi.class, (String)"skipAddrsRandomization");
        BinaryMetadataConcurrentUpdateWithIndexesTest.assertNotNull((Object)rndAddrsField);
        rndAddrsField.set(spi, true);
        cfg.setDiscoverySpi((DiscoverySpi)spi.setIpFinder(sharedStaticIpFinder));
        cfg.setClientMode(igniteInstanceName.startsWith("client"));
        QueryEntity qryEntity = new QueryEntity("java.lang.Integer", "Value");
        LinkedHashMap<String, String> fields = new LinkedHashMap<String, String>();
        ArrayList<QueryIndex> indexes = new ArrayList<QueryIndex>(2);
        for (int i = 0; i < 2; ++i) {
            String name = "s" + i;
            fields.put(name, "java.lang.String");
            indexes.add(new QueryIndex(name, QueryIndexType.SORTED));
        }
        qryEntity.setFields(fields);
        qryEntity.setIndexes(indexes);
        cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(0x3200000L)));
        cfg.setCacheConfiguration(new CacheConfiguration[]{new CacheConfiguration("default").setBackups(0).setQueryEntities(Collections.singleton(qryEntity)).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC).setCacheMode(CacheMode.PARTITIONED)});
        return cfg;
    }

    @Test
    public void testMissingSchemaUpdate() throws Exception {
        IgniteEx node0 = this.startGrid("node0");
        IgniteEx node1 = this.startGrid("node1");
        IgniteEx client0 = this.startGrid("client0");
        CacheObjectBinaryProcessorImpl.TestBinaryContext clientCtx = (CacheObjectBinaryProcessorImpl.TestBinaryContext)((CacheObjectBinaryProcessorImpl)client0.context().cacheObjects()).binaryContext();
        clientCtx.addListener(new CacheObjectBinaryProcessorImpl.TestBinaryContext.TestBinaryContextListener(){

            public void onAfterMetadataRequest(int typeId, BinaryType type) {
                if (BinaryMetadataConcurrentUpdateWithIndexesTest.this.syncMeta) {
                    try {
                        BinaryMetadataConcurrentUpdateWithIndexesTest.this.initMetaReq.countDown();
                        BinaryMetadataConcurrentUpdateWithIndexesTest.this.initMetaReq.await();
                    }
                    catch (Exception e) {
                        throw new BinaryObjectException((Throwable)e);
                    }
                }
            }

            public void onBeforeMetadataUpdate(int typeId, BinaryMetadata metadata) {
                if (BinaryMetadataConcurrentUpdateWithIndexesTest.this.delayMetadataUpdateThreadLoc.get() != null) {
                    BinaryMetadataConcurrentUpdateWithIndexesTest.this.await(localMetaUpdatedLatch, 5000L);
                }
            }
        });
        IgniteEx node2 = this.startGrid("node2");
        IgniteEx node3 = this.startGrid("node3");
        this.startGrid("node4");
        node0.cluster().active(true);
        this.awaitPartitionMapExchange();
        this.syncMeta = true;
        CountDownLatch clientProposeMsgBlockedLatch = new CountDownLatch(1);
        AtomicBoolean clientWait = new AtomicBoolean();
        Object clientMux = new Object();
        AtomicBoolean srvWait = new AtomicBoolean();
        Object srvMux = new Object();
        ((BlockTcpDiscoverySpi)node1.configuration().getDiscoverySpi()).setClosure((IgniteBiClosure & Serializable)(snd, msg) -> {
            if (msg instanceof MetadataUpdateProposedMessage && Thread.currentThread().getName().contains("client")) {
                log.info("Block custom message to client0: [locNode=" + snd + ", msg=" + msg + ']');
                clientProposeMsgBlockedLatch.countDown();
                Object object = clientMux;
                synchronized (object) {
                    while (!clientWait.get()) {
                        try {
                            clientMux.wait();
                        }
                        catch (InterruptedException e) {
                            BinaryMetadataConcurrentUpdateWithIndexesTest.fail();
                        }
                    }
                }
            }
            return null;
        });
        ((BlockTcpDiscoverySpi)node2.configuration().getDiscoverySpi()).setClosure((IgniteBiClosure & Serializable)(snd, msg) -> {
            if (msg instanceof MetadataUpdateProposedMessage) {
                MetadataUpdateProposedMessage msg0 = (MetadataUpdateProposedMessage)msg;
                int pendingVer = (Integer)U.field((Object)msg0, (String)"pendingVer");
                if (pendingVer == 0) {
                    return null;
                }
                log.info("Block custom message to next server: [locNode=" + snd + ", msg=" + msg + ']');
                Object object = srvMux;
                synchronized (object) {
                    while (!srvWait.get()) {
                        try {
                            srvMux.wait();
                        }
                        catch (InterruptedException e) {
                            BinaryMetadataConcurrentUpdateWithIndexesTest.fail();
                        }
                    }
                }
            }
            return null;
        });
        Integer key = this.primaryKey(node3.cache("default"));
        IgniteInternalFuture fut0 = GridTestUtils.runAsync(() -> {
            try (Transaction tx = client0.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);){
                client0.cache("default").put((Object)key, (Object)this.build((Ignite)client0, "val", 0));
                tx.commit();
            }
            catch (Throwable t) {
                log.error("err", t);
            }
        });
        IgniteInternalFuture fut1 = GridTestUtils.runAsync(() -> {
            this.await(this.initMetaReq, 5000L);
            this.await(clientProposeMsgBlockedLatch, 5000L);
            clientWait.set(true);
            Object object = clientMux;
            synchronized (object) {
                clientMux.notify();
            }
            BinaryMetadataConcurrentUpdateWithIndexesTest.doSleep((long)3000L);
            localMetaUpdatedLatch.countDown();
            BinaryMetadataConcurrentUpdateWithIndexesTest.doSleep((long)3000L);
            srvWait.set(true);
            object = srvMux;
            synchronized (object) {
                srvMux.notify();
            }
        });
        IgniteInternalFuture fut2 = GridTestUtils.runAsync(() -> {
            this.delayMetadataUpdateThreadLoc.set(true);
            try (Transaction tx = client0.transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ, 0L, 1);){
                client0.cache("default").put((Object)key, (Object)this.build((Ignite)client0, "val", 0));
                tx.commit();
            }
        });
        fut0.get();
        fut1.get();
        fut2.get();
    }

    private void await(CountDownLatch latch, long timeout) {
        try {
            latch.await(5000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long cnt = this.initMetaReq.getCount();
        if (cnt != 0L) {
            throw new RuntimeException("Invalid latch count after wait: " + cnt);
        }
    }

    protected BinaryObject build(Ignite ignite, String prefix, int ... fields) {
        BinaryObjectBuilder builder = ignite.binary().builder("Value");
        for (int field : fields) {
            BinaryMetadataConcurrentUpdateWithIndexesTest.assertTrue((field < 2 ? 1 : 0) != 0);
            builder.setField("i" + field, (Object)field);
            builder.setField("s" + field, (Object)(prefix + field));
        }
        return builder.build();
    }

    protected void beforeTest() throws Exception {
        super.beforeTest();
        CacheObjectBinaryProcessorImpl.useTestBinaryCtx = true;
    }

    protected void afterTest() throws Exception {
        super.afterTest();
        CacheObjectBinaryProcessorImpl.useTestBinaryCtx = false;
        this.stopAllGrids();
    }
}

