/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.query.schema;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheStat;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationException;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.jetbrains.annotations.Nullable;

/**
 * Worker that applies an index-building closure to every key of one local partition.
 * <p>
 * The worker is named {@code parallel-idx-worker-<cache name>-part-<partition id>}. When
 * {@link #body()} finishes, successfully or not, the supplied future is completed with the
 * statistics collected by the closure wrapper (may be {@code null}) and the error, if any.
 */
public class SchemaIndexCachePartitionWorker
extends GridWorker {
    /** Default number of keys processed between two releases of the checkpoint read lock. */
    public static final int DFLT_IGNITE_INDEX_REBUILD_BATCH_SIZE = 1000;

    /**
     * Number of keys processed while holding the checkpoint read lock before it is released.
     * Always positive, see {@link #resolveBatchSize()}.
     */
    private final int batchSize = resolveBatchSize();

    /** Cache context. */
    private final GridCacheContext cctx;

    /** Shared stop flag; this worker sets it to {@code true} when processing fails. */
    private final AtomicBoolean stop;

    /** Cancellation token, {@code null} if the operation cannot be cancelled through a token. */
    @Nullable
    private final SchemaIndexOperationCancellationToken cancel;

    /** Index closure wrapped with optional statistics collection. */
    private final SchemaIndexCacheVisitorClosureWrapper wrappedClo;

    /** Partition whose keys are processed by this worker. */
    private final GridDhtLocalPartition locPart;

    /** Future completed when this worker finishes. */
    private final GridFutureAdapter<SchemaIndexCacheStat> fut;

    /** Counter mirrored into the "index build partitions left" metric of the cache group. */
    private final AtomicInteger partsCnt;

    /**
     * @param cctx Cache context.
     * @param locPart Partition to process.
     * @param stop Stop flag, must not be {@code null}.
     * @param cancel Cancellation token, may be {@code null}.
     * @param clo Index closure applied to rows, must not be {@code null}.
     * @param fut Future to complete when the worker is done, must not be {@code null}.
     * @param partsCnt Counter of partitions left, must not be {@code null}.
     */
    public SchemaIndexCachePartitionWorker(GridCacheContext cctx, GridDhtLocalPartition locPart, AtomicBoolean stop, @Nullable SchemaIndexOperationCancellationToken cancel, SchemaIndexCacheVisitorClosure clo, GridFutureAdapter<SchemaIndexCacheStat> fut, AtomicInteger partsCnt) {
        super(cctx.igniteInstanceName(), "parallel-idx-worker-" + cctx.cache().name() + "-part-" + locPart.id(), cctx.logger(SchemaIndexCachePartitionWorker.class));
        this.cctx = cctx;
        this.locPart = locPart;
        this.cancel = cancel;
        assert (Objects.nonNull(stop));
        assert (Objects.nonNull(clo));
        assert (Objects.nonNull(fut));
        assert (Objects.nonNull(partsCnt));
        this.stop = stop;
        this.wrappedClo = new SchemaIndexCacheVisitorClosureWrapper(clo);
        this.fut = fut;
        this.partsCnt = partsCnt;
    }

    /**
     * Reads the batch size from the {@code IGNITE_INDEX_REBUILD_BATCH_SIZE} system property.
     * <p>
     * The value is used as a modulo divisor, so zero would raise {@link ArithmeticException}
     * on the first processed key. Non-positive values therefore fall back to
     * {@link #DFLT_IGNITE_INDEX_REBUILD_BATCH_SIZE}.
     *
     * @return Positive batch size.
     */
    private static int resolveBatchSize() {
        int configured = IgniteSystemProperties.getInteger("IGNITE_INDEX_REBUILD_BATCH_SIZE", DFLT_IGNITE_INDEX_REBUILD_BATCH_SIZE);
        return configured > 0 ? configured : DFLT_IGNITE_INDEX_REBUILD_BATCH_SIZE;
    }

    /**
     * Processes the partition and completes the future exactly once.
     * <p>
     * On failure the error is logged, the shared stop flag is raised, the partitions-left
     * counter is reset to zero and the corresponding metric is reduced by the previous
     * counter value. An {@link Error} is wrapped into {@link IgniteException} before being
     * passed to the future.
     */
    @Override
    protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
        Throwable err = null;
        try {
            this.processPartition();
        }
        catch (Throwable e) {
            // Assigned first so that the future still receives the error if the cleanup below fails.
            err = e instanceof Error ? new IgniteException(e) : e;
            U.error(this.log, "Error during create/rebuild index for partition: " + this.locPart.id(), e);
            this.stop.set(true);
            int cnt = this.partsCnt.getAndSet(0);
            if (cnt > 0) {
                this.cctx.group().metrics().addIndexBuildCountPartitionsLeft(-cnt);
            }
        }
        finally {
            this.fut.onDone(this.wrappedClo.indexCacheStat, err);
        }
    }

    /**
     * Iterates over the keys of the partition and updates the index for each of them.
     * <p>
     * Nothing is done if the worker is stopped or the partition cannot be reserved (it is
     * reserved only in {@code OWNING}, {@code MOVING} or {@code LOST} state). Iteration ends
     * early when the worker is stopped or the partition becomes {@code RENTING}.
     * The checkpoint read lock is acquired lazily and released after every
     * {@link #batchSize} processed keys.
     *
     * @throws IgniteCheckedException If processing failed. A cancellation raised at the initial
     *      check is propagated; a cancellation raised while iterating is swallowed.
     */
    private void processPartition() throws IgniteCheckedException {
        if (this.stop()) {
            return;
        }
        this.checkCancelled();

        GridDhtPartitionState partState = this.locPart.state();
        boolean reservable = partState == GridDhtPartitionState.OWNING
            || partState == GridDhtPartitionState.MOVING
            || partState == GridDhtPartitionState.LOST;

        // reserve() must only be attempted for the states above; short-circuit keeps that order.
        if (!reservable || !this.locPart.reserve()) {
            return;
        }

        try (GridCursor<? extends CacheDataRow> cursor = this.locPart.dataStore().cursor(this.cctx.cacheId(), null, null, CacheDataRowAdapter.RowData.KEY_ONLY)) {
            boolean locked = false;
            try {
                int cntr = 0;
                while (!this.stop() && cursor.next()) {
                    KeyCacheObject key = cursor.get().key();
                    if (!locked) {
                        this.cctx.shared().database().checkpointReadLock();
                        locked = true;
                    }
                    this.processKey(key);
                    // Release the lock between batches; it is re-acquired on the next key.
                    if (++cntr % this.batchSize == 0) {
                        this.cctx.shared().database().checkpointReadUnlock();
                        locked = false;
                    }
                    this.cctx.cache().metrics0().addIndexRebuildKeyProcessed(1L);
                    if (this.locPart.state() == GridDhtPartitionState.RENTING) {
                        break;
                    }
                }
                this.wrappedClo.addNumberProcessedKeys(cntr);
            }
            finally {
                if (locked) {
                    this.cctx.shared().database().checkpointReadUnlock();
                }
            }
        }
        catch (SchemaIndexOperationCancellationException ignored) {
            // Cancellation during iteration is not reported as a failure of this partition.
        }
        catch (Exception e) {
            throw new IgniteCheckedException(e);
        }
        finally {
            this.locPart.release();
            // Decrement without going below zero; the metric follows only real decrements.
            if (this.partsCnt.getAndUpdate(v -> v > 0 ? v - 1 : 0) > 0) {
                this.cctx.group().metrics().decrementIndexBuildCountPartitionsLeft();
            }
        }
    }

    /**
     * Updates the index for a single key, retrying while the obtained entry turns out to be removed.
     *
     * @param key Key to process, must not be {@code null}.
     * @throws IgniteCheckedException If the index update failed or the operation was cancelled.
     */
    private void processKey(KeyCacheObject key) throws IgniteCheckedException {
        assert (Objects.nonNull(key));
        while (!this.stop()) {
            try {
                this.checkCancelled();
                GridCacheEntryEx entry = this.cctx.cache().entryEx(key);
                try {
                    entry.updateIndex(this.wrappedClo);
                    break;
                }
                finally {
                    entry.touch();
                }
            }
            catch (GridDhtInvalidPartitionException ignored) {
                // Partition is no longer valid for this key: give up on the key.
                break;
            }
            catch (GridCacheEntryRemovedException ignored) {
                // Intentionally empty: loop again and obtain the entry anew.
            }
        }
    }

    /**
     * @throws SchemaIndexOperationCancellationException If a cancellation token is present and cancelled.
     */
    private void checkCancelled() throws SchemaIndexOperationCancellationException {
        if (this.cancel != null && this.cancel.isCancelled()) {
            throw new SchemaIndexOperationCancellationException("Index creation was cancelled.");
        }
    }

    /**
     * @return {@code true} if the stop flag is set or the kernal context reports that it is stopping.
     */
    private boolean stop() {
        return this.stop.get() || this.cctx.kernalContext().isStopping();
    }

    /** {@inheritDoc} */
    @Override
    public String toString() {
        return S.toString(SchemaIndexCachePartitionWorker.class, this);
    }

    /**
     * Delegating closure that additionally collects statistics when the
     * {@code IGNITE_ENABLE_EXTRA_INDEX_REBUILD_LOGGING} system property is enabled.
     */
    private class SchemaIndexCacheVisitorClosureWrapper
    implements SchemaIndexCacheVisitorClosure {
        /** Closure doing the actual work. */
        private final SchemaIndexCacheVisitorClosure clo;

        /** Collected statistics, {@code null} if extra logging is disabled. */
        @Nullable
        private final SchemaIndexCacheStat indexCacheStat;

        /**
         * @param clo Closure to delegate to.
         */
        private SchemaIndexCacheVisitorClosureWrapper(SchemaIndexCacheVisitorClosure clo) {
            this.clo = clo;
            this.indexCacheStat = IgniteSystemProperties.getBoolean("IGNITE_ENABLE_EXTRA_INDEX_REBUILD_LOGGING", false) ? new SchemaIndexCacheStat() : null;
        }

        /**
         * Applies the delegate to a non-null row and, when statistics are enabled, records the
         * query type resolved for the row. {@code null} rows are ignored.
         *
         * @param row Row to process, may be {@code null}.
         * @throws IgniteCheckedException If the delegate or the type lookup failed.
         */
        @Override
        public void apply(CacheDataRow row) throws IgniteCheckedException {
            if (row == null) {
                return;
            }
            this.clo.apply(row);
            if (this.indexCacheStat == null) {
                return;
            }
            GridCacheContext ctx = SchemaIndexCachePartitionWorker.this.cctx;
            QueryTypeDescriptorImpl type = ctx.kernalContext().query().typeByValue(ctx.cache().name(), ctx.cacheObjectContext(), row.key(), row.value(), true);
            if (type != null) {
                this.indexCacheStat.addType(type);
            }
        }

        /**
         * Adds the number of processed keys to the statistics, if they are collected.
         *
         * @param cnt Number of processed keys.
         */
        private void addNumberProcessedKeys(int cnt) {
            if (this.indexCacheStat != null) {
                this.indexCacheStat.add(cnt);
            }
        }
    }
}

