/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.query.h2;

import java.io.File;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.stream.LongStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.index.DynamicIndexAbstractSelfTest;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.junit.Test;

public class GridIndexRebuildSelfTest
extends DynamicIndexAbstractSelfTest {
    protected static final int AMOUNT = 50;
    protected static final String CACHE_NAME = "T";
    private static GridIndexRebuildSelfTest INSTANCE;
    private final CountDownLatch rebuildLatch = new CountDownLatch(1);
    private Integer buildIdxThreadPoolSize;
    private Class<? extends GridQueryIndexing> qryIndexingCls = BlockingIndexing.class;

    @Override
    protected IgniteConfiguration commonConfiguration(int idx) throws Exception {
        IgniteConfiguration cfg = super.commonConfiguration(idx);
        cfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration().setMaxSize(314572800L).setPersistenceEnabled(true);
        if (Objects.nonNull(this.buildIdxThreadPoolSize)) {
            cfg.setBuildIndexThreadPoolSize(this.buildIdxThreadPoolSize.intValue());
        }
        return cfg;
    }

    @Override
    protected IgniteConfiguration serverConfiguration(int idx) throws Exception {
        IgniteConfiguration cfg = super.serverConfiguration(idx);
        if (Objects.nonNull(this.qryIndexingCls)) {
            GridQueryProcessor.idxCls = this.qryIndexingCls;
        }
        return cfg;
    }

    protected void beforeTest() throws Exception {
        super.beforeTest();
        this.cleanPersistenceDir();
        INSTANCE = this;
        BlockingIndexing.slowRebuildIdxFut = false;
    }

    protected void afterTest() throws Exception {
        super.afterTest();
        this.stopAllGrids();
        this.cleanPersistenceDir();
        GridQueryProcessor.idxCls = null;
    }

    @Override
    protected void afterTestsStopped() throws Exception {
        this.cleanPersistenceDir();
        super.afterTestsStopped();
    }

    @Test
    public void testIndexRebuild() throws Exception {
        IgniteEx srv = this.startServer();
        IgniteInternalCache cc = this.createAndFillTableWithIndex(srv);
        this.checkDataState(srv, false);
        File idxPath = this.indexFile(cc);
        this.stopAllGrids();
        GridIndexRebuildSelfTest.assertTrue((boolean)IgniteUtils.delete((File)idxPath));
        srv = this.startServer();
        this.putData((Ignite)srv, true);
        this.checkDataState(srv, true);
    }

    @Test
    public void testDefaultCntThreadForRebuildIdx() throws Exception {
        this.checkCntThreadForRebuildIdx(IgniteConfiguration.DFLT_BUILD_IDX_THREAD_POOL_SIZE);
    }

    @Test
    public void testCustomCntThreadForRebuildIdx() throws Exception {
        this.checkCntThreadForRebuildIdx(6);
    }

    @Test
    public void testDataRaceWhenMarkIdxRebuild() throws Exception {
        IgniteEx srv = this.startServer();
        IgniteInternalCache internalCache = this.createAndFillTableWithIndex(srv);
        File idxFile = this.indexFile(internalCache);
        this.stopAllGrids();
        GridIndexRebuildSelfTest.assertTrue((boolean)IgniteUtils.delete((File)idxFile));
        BlockingIndexing.slowRebuildIdxFut = true;
        srv = this.startServer();
        srv.cache(CACHE_NAME).indexReadyFuture().get();
        IgniteH2Indexing idx = (IgniteH2Indexing)srv.context().query().getIndexing();
        GridH2Table tbl = idx.schemaManager().dataTable("PUBLIC", CACHE_NAME);
        GridIndexRebuildSelfTest.assertNotNull((Object)tbl);
        GridIndexRebuildSelfTest.assertFalse((boolean)tbl.rebuildFromHashInProgress());
    }

    private void checkCntThreadForRebuildIdx(int buildIdxThreadCnt) throws Exception {
        this.qryIndexingCls = null;
        IgniteEx srv = this.startServer();
        IgniteInternalCache internalCache = this.createAndFillTableWithIndex(srv);
        int partCnt = internalCache.configuration().getAffinity().partitions();
        GridIndexRebuildSelfTest.assertTrue((partCnt > buildIdxThreadCnt ? 1 : 0) != 0);
        File idxPath = this.indexFile(internalCache);
        this.stopAllGrids();
        GridIndexRebuildSelfTest.assertTrue((boolean)IgniteUtils.delete((File)idxPath));
        this.buildIdxThreadPoolSize = buildIdxThreadCnt;
        srv = this.startServer();
        srv.cache(CACHE_NAME).indexReadyFuture().get();
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        long buildIdxRunnerCnt = LongStream.of(threadMXBean.getAllThreadIds()).mapToObj(threadMXBean::getThreadInfo).filter(threadInfo -> threadInfo.getThreadName().startsWith("build-idx-runner")).count();
        GridIndexRebuildSelfTest.assertEquals((long)buildIdxThreadCnt, (long)buildIdxRunnerCnt);
    }

    private IgniteInternalCache createAndFillTableWithIndex(IgniteEx node) throws Exception {
        Objects.requireNonNull(node);
        String cacheName = CACHE_NAME;
        this.execute((Ignite)node, "CREATE TABLE T(k int primary key, v int) WITH \"cache_name=" + cacheName + ",wrap_value=false,atomicity=transactional\"");
        this.execute((Ignite)node, "CREATE INDEX IDX ON T(v)");
        IgniteInternalCache cc = node.cachex(cacheName);
        GridIndexRebuildSelfTest.assertNotNull((Object)cc);
        this.putData((Ignite)node, false);
        return cc;
    }

    protected File indexFile(IgniteInternalCache internalCache) {
        Objects.requireNonNull(internalCache);
        File cacheWorkDir = ((FilePageStoreManager)internalCache.context().shared().pageStore()).cacheWorkDir(internalCache.configuration());
        return cacheWorkDir.toPath().resolve("index.bin").toFile();
    }

    protected void checkDataState(IgniteEx srv, boolean afterRebuild) throws IgniteCheckedException {
        IgniteInternalCache icache = srv.cachex(CACHE_NAME);
        IgniteCache cache = srv.cache(CACHE_NAME);
        GridIndexRebuildSelfTest.assertNotNull((Object)icache);
        for (IgniteCacheOffheapManager.CacheDataStore store : icache.context().offheap().cacheDataStores()) {
            GridCursor cur = store.cursor();
            while (cur.next()) {
                CacheDataRow row = (CacheDataRow)cur.get();
                int key = (Integer)row.key().value((CacheObjectValueContext)icache.context().cacheObjectContext(), false);
                if (!afterRebuild || key <= 25) {
                    GridIndexRebuildSelfTest.assertEquals((Object)key, (Object)cache.get((Object)key));
                    continue;
                }
                GridIndexRebuildSelfTest.assertEquals((Object)-1, (Object)cache.get((Object)key));
            }
        }
    }

    protected void putData(Ignite node, boolean forConcurrentPut) throws Exception {
        IgniteCache cache = node.cache(CACHE_NAME);
        GridIndexRebuildSelfTest.assertNotNull((Object)cache);
        for (int i = 1; i <= 50; ++i) {
            if (forConcurrentPut) {
                if (i <= 25) continue;
                cache.put((Object)i, (Object)-1);
                this.rebuildLatch.countDown();
                continue;
            }
            for (int j = 1; j <= i; ++j) {
                cache.put((Object)i, (Object)j);
            }
        }
    }

    protected IgniteEx startServer() throws Exception {
        IgniteEx srvNode = this.startGrid(this.serverConfiguration(0));
        srvNode.active(true);
        return srvNode;
    }

    private static class BlockingIndexing
    extends IgniteH2Indexing {
        private boolean firstRbld = true;
        static boolean slowRebuildIdxFut;

        private BlockingIndexing() {
        }

        protected void rebuildIndexesFromHash0(GridCacheContext cctx, SchemaIndexCacheVisitorClosure clo, GridFutureAdapter<Void> rebuildIdxFut, SchemaIndexOperationCancellationToken cancel) {
            if (!this.firstRbld) {
                try {
                    U.await((CountDownLatch)INSTANCE.rebuildLatch);
                }
                catch (IgniteInterruptedCheckedException e) {
                    throw new IgniteException((Throwable)e);
                }
            } else {
                this.firstRbld = false;
            }
            if (slowRebuildIdxFut) {
                rebuildIdxFut.listen((IgniteInClosure & Serializable)fut -> {
                    try {
                        U.sleep((long)1000L);
                    }
                    catch (IgniteInterruptedCheckedException e) {
                        log.error("Error while slow down " + fut, (Throwable)e);
                    }
                });
            }
            super.rebuildIndexesFromHash0(cctx, clo, rebuildIdxFut, cancel);
        }
    }
}

