/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache;

import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

/**
 * Runs a data streamer concurrently with put/remove operations against a cache that has
 * indexed types configured, for both atomic and transactional atomicity modes.
 */
public class CacheIndexStreamerTest
extends GridCommonAbstractTest {
    /** Name of the cache created by the test. */
    private static final String CACHE_NAME = "default";

    /** Exclusive upper bound of the random keys used by both the streamer and the updater. */
    private static final int KEYS = 10_000;

    /** Number of entries added through each streamer instance before it is closed. */
    private static final int ENTRIES_PER_STREAMER = 1;

    /** Number of put/remove pairs performed between checks of the stop flag. */
    private static final int UPDATES_PER_BATCH = 100;

    /** How long the concurrent load is kept running, in milliseconds. */
    private static final long TEST_DURATION_MS = 30_000L;

    /**
     * Checks the streamer against an atomic cache.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testStreamerAtomic() throws Exception {
        this.checkStreamer(CacheAtomicityMode.ATOMIC);
    }

    /**
     * Checks the streamer against a transactional cache.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testStreamerTx() throws Exception {
        this.checkStreamer(CacheAtomicityMode.TRANSACTIONAL);
    }

    /**
     * Starts a single node, creates the indexed cache and runs a streamer thread and an
     * update thread concurrently for {@link #TEST_DURATION_MS}, then waits for both to finish.
     * Any exception thrown by either thread is propagated through the corresponding future.
     *
     * @param atomicityMode Cache atomicity mode.
     * @throws Exception If failed.
     */
    public void checkStreamer(CacheAtomicityMode atomicityMode) throws Exception {
        final IgniteEx ignite = this.startGrid(0);
        final IgniteCache<Integer, String> cache = ignite.createCache(this.cacheConfiguration(atomicityMode));
        final AtomicBoolean stop = new AtomicBoolean();

        // Overwrite is allowed only for the transactional cache.
        final boolean allowOverwrite = atomicityMode == CacheAtomicityMode.TRANSACTIONAL;

        try {
            IgniteInternalFuture<?> streamerFut = GridTestUtils.runAsync(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    ThreadLocalRandom rnd = ThreadLocalRandom.current();

                    while (!stop.get()) {
                        // A fresh streamer per iteration; try-with-resources closes it and lets
                        // any failure escape the loop instead of being swallowed.
                        try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(CACHE_NAME)) {
                            streamer.allowOverwrite(allowOverwrite);

                            for (int i = 0; i < ENTRIES_PER_STREAMER; ++i) {
                                streamer.addData(rnd.nextInt(KEYS), String.valueOf(i));
                            }
                        }
                    }

                    return null;
                }
            }, "streamer-thread");

            IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    ThreadLocalRandom rnd = ThreadLocalRandom.current();

                    while (!stop.get()) {
                        for (int i = 0; i < UPDATES_PER_BATCH; ++i) {
                            Integer key = rnd.nextInt(KEYS);

                            cache.put(key, String.valueOf(key));
                            cache.remove(key);
                        }
                    }

                    return null;
                }
            }, 1, "update-thread");

            U.sleep(TEST_DURATION_MS);

            stop.set(true);

            streamerFut.get();
            updateFut.get();
        }
        finally {
            // Make sure worker threads terminate even if the body above failed early.
            stop.set(true);
            this.stopAllGrids();
        }
    }

    /**
     * Builds the configuration of the cache under test: FULL_SYNC writes, one backup and
     * {@code Integer -> String} indexed types.
     *
     * @param atomicityMode Cache atomicity mode.
     * @return Cache configuration.
     */
    private CacheConfiguration<Integer, String> cacheConfiguration(CacheAtomicityMode atomicityMode) {
        CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>(CACHE_NAME);

        ccfg.setAtomicityMode(atomicityMode);
        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
        ccfg.setBackups(1);
        ccfg.setIndexedTypes(Integer.class, String.class);

        return ccfg;
    }
}

