/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.managers.encryption;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.encryption.ReencryptStateUtils;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.util.BasicRateLimiter;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.apache.ignite.thread.OomExceptionHandler;

/**
 * Schedules and runs page scans of cache groups for re-encryption and tracks their progress.
 * <p>
 * Each scheduled cache group is represented by a {@link GroupScanTask} that is executed on a
 * single-threaded executor. When a task has visited all of its partitions it is parked in
 * {@link #cpWaitGrps}; the task's future is completed only after a subsequent checkpoint
 * finishes without error (see {@link #beforeCheckpointBegin}).
 * <p>
 * On client nodes, or when persistence is disabled, the executor and the rate limiter are not
 * created (both are {@code null}).
 */
public class CacheGroupPageScanner
implements CheckpointListener {
    /** Thread name prefix passed to the executor that runs scan tasks. */
    private static final String REENCRYPT_THREAD_PREFIX = "reencrypt";

    /** Number of bytes in a megabyte; used to convert between a megabyte rate and a page rate. */
    private static final double MB = 1048576.0;

    /** Page size (in bytes) assumed when the data storage configuration reports a page size of 0. */
    private static final int FALLBACK_PAGE_SIZE = 4096;

    /**
     * Partition id that is always passed to the page store handler after the regular partitions.
     * NOTE(review): presumably the index partition id (0xFFFF) - confirm against the page id allocator.
     */
    private static final int EXTRA_PART_ID = 65535;

    /** Kernal context. */
    private final GridKernalContext ctx;

    /** Logger. */
    private final IgniteLogger log;

    /** Guards scheduling, stopping and completion of scan tasks. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Mapping of cache group id to its scan task. */
    private final Map<Integer, GroupScanTask> grps = new ConcurrentHashMap<Integer, GroupScanTask>();

    /** Tasks that have finished scanning and wait for a checkpoint to be completed. */
    private final Collection<GroupScanTask> cpWaitGrps = new ConcurrentLinkedQueue<GroupScanTask>();

    /** Executor running scan tasks ({@code null} on client nodes or without persistence). */
    private final ThreadPoolExecutor singleExecSvc;

    /** Maximum number of pages scanned in one batch under the checkpoint read lock ({@code -1} if unused). */
    private final int batchSize;

    /** Page scan rate limiter ({@code null} on client nodes or without persistence). */
    private final BasicRateLimiter limiter;

    /** Stop flag; read and written under {@link #lock}. */
    private boolean stopped;

    /**
     * Creates the scanner. The rate limiter, batch size and executor are initialised from the
     * encryption configuration only on non-client nodes with persistence enabled.
     *
     * @param ctx Kernal context.
     */
    public CacheGroupPageScanner(GridKernalContext ctx) {
        this.ctx = ctx;
        this.log = ctx.log(this.getClass());

        DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration();

        if (ctx.clientNode() || !CU.isPersistenceEnabled(dsCfg)) {
            this.batchSize = -1;
            this.limiter = null;
            this.singleExecSvc = null;

            return;
        }

        double rateLimit = dsCfg.getEncryptionConfiguration().getReencryptionRateLimit();

        this.limiter = new BasicRateLimiter(calcPermits(rateLimit, dsCfg));
        this.batchSize = dsCfg.getEncryptionConfiguration().getReencryptionBatchSize();

        // NOTE(review): the literals 60000L and 2 are presumably the keep-alive time (ms) and the
        // IO policy of the pool - confirm against the IgniteThreadPoolExecutor constructor.
        this.singleExecSvc = new IgniteThreadPoolExecutor(
            REENCRYPT_THREAD_PREFIX,
            ctx.igniteInstanceName(),
            1,
            1,
            60000L,
            new LinkedBlockingQueue<Runnable>(),
            2,
            new OomExceptionHandler(ctx));

        this.singleExecSvc.allowCoreThreadTimeOut(true);
    }

    /** No-op. */
    @Override
    public void onCheckpointBegin(CheckpointListener.Context cpCtx) {
    }

    /**
     * Drains the tasks waiting for a checkpoint and completes them once the checkpoint's
     * finished-state future completes successfully. If that future fails or is cancelled, the
     * drained tasks are put back so that the next checkpoint retries them. When no scheduled
     * groups remain, this listener is removed from the database manager.
     *
     * @param cpCtx Checkpoint context.
     */
    @Override
    public void beforeCheckpointBegin(CheckpointListener.Context cpCtx) {
        final Set<GroupScanTask> completeCandidates = new HashSet<GroupScanTask>();

        // Move every waiting task into the candidate set (Set.add returns true, so it is removed).
        cpWaitGrps.removeIf(completeCandidates::add);

        cpCtx.finishedStateFut().listen(f -> {
            // Retry on the next checkpoint if this one did not finish normally.
            if (f.error() != null || f.isCancelled()) {
                cpWaitGrps.addAll(completeCandidates);

                return;
            }

            lock.lock();

            try {
                for (GroupScanTask grpScanTask : completeCandidates) {
                    grps.remove(grpScanTask.groupId());
                    grpScanTask.onDone();

                    if (log.isInfoEnabled()) {
                        log.info("Cache group reencryption is finished [grpId=" + grpScanTask.groupId() + "]");
                    }
                }

                if (grps.isEmpty()) {
                    ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).removeCheckpointListener(this);
                }
            }
            finally {
                lock.unlock();
            }
        });
    }

    /** No-op. */
    @Override
    public void onMarkCheckpointBegin(CheckpointListener.Context ctx) {
    }

    /**
     * Schedules a scan of the given cache group.
     * <p>
     * Partitions whose encryption state is {@code 0} are skipped. If a not yet completed task
     * already exists for the group, that task is returned instead of creating a new one.
     * <p>
     * NOTE(review): the decompiler reported that it removed a try/catch from this method -
     * compare with the upstream source before relying on the exact exception flow.
     *
     * @param grpId Cache group id.
     * @return Future that is completed when the scan of the group is done; an already finished
     *      future if the group does not exist locally or the local node is not an affinity node.
     * @throws IgniteCheckedException If the scanner has been stopped ({@link NodeStoppingException})
     *      or if enumerating page stores failed.
     */
    public IgniteInternalFuture<Void> schedule(final int grpId) throws IgniteCheckedException {
        CacheGroupContext grp = ctx.cache().cacheGroup(grpId);

        if (grp == null || !grp.affinityNode()) {
            if (log.isInfoEnabled()) {
                log.info("Skip reencryption, cache group doesn't exist on the local node [grp=" + grpId + "]");
            }

            return new GridFinishedFuture<Void>();
        }

        lock.lock();

        try {
            if (stopped) {
                throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
            }

            // The first scheduled group registers this scanner as a checkpoint listener.
            if (grps.isEmpty()) {
                ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
            }

            GroupScanTask prevState = grps.get(grpId);

            if (prevState != null && !prevState.isDone()) {
                if (log.isDebugEnabled()) {
                    log.debug("Reencryption already scheduled [grpId=" + grpId + "]");
                }

                return prevState;
            }

            final Set<Integer> parts = new HashSet<Integer>();

            // Single-element array so that the anonymous handler can accumulate into it.
            final long[] pagesLeft = new long[1];

            forEachPageStore(grp, new IgniteInClosureX<Integer>() {
                @Override
                public void applyx(Integer partId) {
                    long encState = CacheGroupPageScanner.this.ctx.encryption().getEncryptionState(grpId, partId);

                    if (encState == 0L) {
                        if (CacheGroupPageScanner.this.log.isDebugEnabled()) {
                            CacheGroupPageScanner.this.log.debug("Skipping partition reencryption [grp=" + grpId + ", p=" + partId + "]");
                        }

                        return;
                    }

                    parts.add(partId);

                    pagesLeft[0] += (long)(ReencryptStateUtils.pageCount(encState) - ReencryptStateUtils.pageIndex(encState));
                }
            });

            GroupScanTask grpScan = new GroupScanTask(grp, parts, pagesLeft[0]);

            singleExecSvc.submit(grpScan);

            if (log.isInfoEnabled()) {
                log.info("Scheduled reencryption [grpId=" + grpId + "]");
            }

            grps.put(grpId, grpScan);

            return grpScan;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * @param grpId Cache group id.
     * @return The scan task registered for the group, or an already finished future if there is none.
     */
    public IgniteInternalFuture<Void> statusFuture(int grpId) {
        GroupScanTask grpScanTask = grps.get(grpId);

        if (grpScanTask == null) {
            return new GridFinishedFuture<Void>();
        }

        return grpScanTask;
    }

    /**
     * Marks the scanner as stopped, cancels every registered scan task and shuts the executor
     * down (if it was created).
     *
     * @throws IgniteCheckedException If cancelling a task failed.
     */
    public void stop() throws IgniteCheckedException {
        lock.lock();

        try {
            stopped = true;

            for (GroupScanTask grpScanTask : grps.values()) {
                grpScanTask.cancel();
            }

            if (singleExecSvc != null) {
                singleExecSvc.shutdownNow();
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Excludes a partition from the scan task of the given group.
     *
     * @param grpId Cache group id.
     * @param partId Partition id.
     * @return {@code true} if the partition was removed from the task's partition set;
     *      {@code false} if it was not in the set or no task is registered for the group.
     */
    public boolean excludePartition(int grpId, int partId) {
        GroupScanTask grpScanTask = grps.get(grpId);

        return grpScanTask != null && grpScanTask.excludePartition(partId);
    }

    /**
     * Collects the page count of every existing page store of the group while holding the
     * checkpoint read lock.
     *
     * @param grp Cache group.
     * @return Array of {@code partitions + 1} elements; a partition's page count is stored at the
     *      index equal to its id, and ids beyond the array (the extra partition that is always
     *      visited last) are stored in the last element.
     * @throws IgniteCheckedException If reading a page count failed.
     */
    public long[] pagesCount(final CacheGroupContext grp) throws IgniteCheckedException {
        final long[] partStates = new long[grp.affinity().partitions() + 1];

        ctx.cache().context().database().checkpointReadLock();

        try {
            forEachPageStore(grp, new IgniteInClosureX<Integer>() {
                @Override
                public void applyx(Integer partId) throws IgniteCheckedException {
                    int pagesCnt = CacheGroupPageScanner.this.ctx.cache().context().pageStore().pages(grp.groupId(), partId);

                    // Ids larger than the last index are clamped into the last slot.
                    partStates[Math.min(partId, partStates.length - 1)] = pagesCnt;
                }
            });
        }
        finally {
            ctx.cache().context().database().checkpointReadUnlock();
        }

        return partStates;
    }

    /**
     * @param grpId Cache group id.
     * @return Value of the remaining pages counter of the group's scan task, or {@code 0} if no
     *      task is registered for the group.
     */
    public long remainingPagesCount(int grpId) {
        GroupScanTask grpScanTask = grps.get(grpId);

        return grpScanTask == null ? 0L : grpScanTask.remainingPagesCount();
    }

    /**
     * Converts the limiter's current rate (pages) back into megabytes using the same page size
     * that {@link #setRate(double)} uses, so a rate that was set is reported unchanged.
     *
     * @return Current rate limit in megabytes, or {@code 0} if persistence is disabled or the
     *      limiter was not created (client node).
     */
    public double getRate() {
        DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration();

        // The limiter is null on client nodes even if persistence is configured.
        if (limiter == null || !CU.isPersistenceEnabled(dsCfg)) {
            return 0.0;
        }

        return (double)effectivePageSize(dsCfg) * limiter.getRate() / MB;
    }

    /**
     * Updates the rate limit. Does nothing if persistence is disabled or the limiter was not
     * created (client node).
     *
     * @param rate New rate limit in megabytes.
     */
    public void setRate(double rate) {
        DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration();

        if (limiter != null && CU.isPersistenceEnabled(dsCfg)) {
            limiter.setRate(calcPermits(rate, dsCfg));
        }
    }

    /**
     * @param rate Rate in megabytes.
     * @param dsCfg Data storage configuration.
     * @return The same rate expressed in pages.
     */
    private double calcPermits(double rate, DataStorageConfiguration dsCfg) {
        return rate * MB / (double)effectivePageSize(dsCfg);
    }

    /**
     * @param dsCfg Data storage configuration.
     * @return Configured page size, or {@link #FALLBACK_PAGE_SIZE} if the configured value is 0.
     */
    private static int effectivePageSize(DataStorageConfiguration dsCfg) {
        int pageSize = dsCfg.getPageSize();

        return pageSize == 0 ? FALLBACK_PAGE_SIZE : pageSize;
    }

    /**
     * Invokes the handler for every partition of the group whose page store exists and then,
     * unconditionally, for {@link #EXTRA_PART_ID}.
     *
     * @param grp Cache group.
     * @param hnd Handler that receives partition ids.
     * @throws IgniteCheckedException If the existence check or the handler failed.
     */
    private void forEachPageStore(CacheGroupContext grp, IgniteInClosureX<Integer> hnd) throws IgniteCheckedException {
        int parts = grp.affinity().partitions();

        IgnitePageStoreManager pageStoreMgr = ctx.cache().context().pageStore();

        for (int p = 0; p < parts; ++p) {
            if (pageStoreMgr.exists(grp.groupId(), p)) {
                hnd.applyx(p);
            }
        }

        hnd.applyx(EXTRA_PART_ID);
    }

    /**
     * Scan task of a single cache group. The task is also the future that reports completion of
     * the scan to callers.
     */
    private class GroupScanTask
    extends GridFutureAdapter<Void>
    implements Runnable {
        /** Cache group being scanned. */
        private final CacheGroupContext grp;

        /** Ids of partitions that still belong to this scan. */
        private final Set<Integer> parts;

        /** Page memory of the group's data region. */
        private final PageMemoryEx pageMem;

        /** Counter of pages left to scan. */
        private final AtomicLong remainingPagesCntr;

        /**
         * @param grp Cache group.
         * @param parts Ids of partitions to scan (copied into a concurrent set).
         * @param remainingPagesCnt Initial value of the remaining pages counter.
         */
        public GroupScanTask(CacheGroupContext grp, Set<Integer> parts, long remainingPagesCnt) {
            this.grp = grp;
            this.parts = new GridConcurrentHashSet<Integer>(parts);
            this.remainingPagesCntr = new AtomicLong(remainingPagesCnt);
            this.pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
        }

        /**
         * Unregisters the task from the scanner and marks the future as cancelled.
         *
         * @return Result of {@code onCancelled()}.
         */
        @Override
        public synchronized boolean cancel() throws IgniteCheckedException {
            CacheGroupPageScanner.this.grps.remove(this.grp.groupId());

            return this.onCancelled();
        }

        /**
         * Removes the partition from the scan and subtracts its not yet scanned pages (page count
         * minus page index of its current encryption state) from the remaining pages counter.
         * <p>
         * NOTE(review): the counter is adjusted even when the partition is not in the set.
         *
         * @param partId Partition id.
         * @return {@code true} if the partition was in the set.
         */
        public synchronized boolean excludePartition(int partId) {
            long state = CacheGroupPageScanner.this.ctx.encryption().getEncryptionState(this.groupId(), partId);

            this.remainingPagesCntr.addAndGet(ReencryptStateUtils.pageIndex(state) - ReencryptStateUtils.pageCount(state));

            return this.parts.remove(partId);
        }

        /** @return Cache group id. */
        public int groupId() {
            return this.grp.groupId();
        }

        /** @return Current value of the remaining pages counter. */
        public long remainingPagesCount() {
            return this.remainingPagesCntr.get();
        }

        /**
         * Scans every partition of the task whose encryption state is not {@code 0}. Stops early
         * if the future becomes done in the meantime; otherwise the task is queued to wait for a
         * checkpoint. Any failure completes the future with that error.
         */
        @Override
        public void run() {
            try {
                for (int partId : this.parts) {
                    long state = CacheGroupPageScanner.this.ctx.encryption().getEncryptionState(this.grp.groupId(), partId);

                    if (state == 0L) {
                        continue;
                    }

                    this.scanPartition(partId, ReencryptStateUtils.pageIndex(state), ReencryptStateUtils.pageCount(state));

                    if (this.isDone()) {
                        return;
                    }
                }

                boolean added = CacheGroupPageScanner.this.cpWaitGrps.add(this);

                assert added;
            }
            catch (Throwable t) {
                if (X.hasCause(t, NodeStoppingException.class)) {
                    this.onCancelled();
                }

                // NOTE(review): also invoked after onCancelled(); presumably a no-op on an already
                // completed future - confirm against GridFutureAdapter.
                this.onDone(t);
            }
        }

        /**
         * Scans the pages {@code [off, cnt)} of a partition in batches of at most
         * {@code batchSize} pages. Before each batch the rate limiter is asked for as many permits
         * as there are pages in the batch; the batch itself runs under this task's monitor and
         * the checkpoint read lock. After each batch the remaining pages counter and the
         * partition's encryption state are updated. The loop ends early if the future is done or
         * the partition has been excluded.
         * <p>
         * NOTE(review): the decompiler reported that it removed a try/catch from this method -
         * compare with the upstream source before relying on the exact exception flow.
         *
         * @param partId Partition id.
         * @param off Index of the first page to scan.
         * @param cnt Total number of pages in the partition.
         * @throws IgniteCheckedException If scanning failed.
         */
        private void scanPartition(int partId, int off, int cnt) throws IgniteCheckedException {
            if (CacheGroupPageScanner.this.log.isDebugEnabled()) {
                CacheGroupPageScanner.this.log.debug("Partition reencryption is started [grpId=" + this.grp.groupId() + ", p=" + partId + ", remain=" + (cnt - off) + ", total=" + cnt + "]");
            }

            while (off < cnt) {
                int pagesCnt = Math.min(CacheGroupPageScanner.this.batchSize, cnt - off);

                CacheGroupPageScanner.this.limiter.acquire(pagesCnt);

                synchronized (this) {
                    if (this.isDone() || !this.parts.contains(partId)) {
                        break;
                    }

                    CacheGroupPageScanner.this.ctx.cache().context().database().checkpointReadLock();

                    try {
                        off += this.scanPages(partId, off, pagesCnt);
                    }
                    finally {
                        CacheGroupPageScanner.this.ctx.cache().context().database().checkpointReadUnlock();
                    }
                }

                this.remainingPagesCntr.addAndGet(-pagesCnt);

                CacheGroupPageScanner.this.ctx.encryption().setEncryptionState(this.grp, partId, off, cnt);
            }

            if (CacheGroupPageScanner.this.log.isDebugEnabled()) {
                CacheGroupPageScanner.this.log.debug("Partition reencryption is finished [grpId=" + this.grp.groupId() + ", p=" + partId + ", remain=" + (cnt - off) + ", total=" + cnt + "]");
            }
        }

        /**
         * Visits {@code cnt} pages of a partition starting at index {@code off}. Every page is
         * acquired and released; pages that are not dirty are additionally write-locked and
         * immediately write-unlocked.
         * <p>
         * NOTE(review): the trailing {@code true} passed to {@code writeUnlock} presumably marks
         * the page dirty so that it is rewritten by a checkpoint - confirm against PageMemoryEx.
         * The decompiler also reported that it removed a try/catch from this method.
         *
         * @param partId Partition id.
         * @param off Index of the first page to visit.
         * @param cnt Number of pages to visit.
         * @return Number of visited pages (always {@code cnt}).
         * @throws IgniteCheckedException If a page could not be acquired.
         */
        private int scanPages(int partId, int off, int cnt) throws IgniteCheckedException {
            int grpId = this.grp.groupId();
            byte flag = GroupPartitionId.getFlagByPartId(partId);

            for (int pageIdx = off; pageIdx < off + cnt; ++pageIdx) {
                long pageId = PageIdUtils.pageId(partId, flag, pageIdx);
                long page = this.pageMem.acquirePage(grpId, pageId);

                try {
                    if (!this.pageMem.isDirty(grpId, pageId, page)) {
                        this.pageMem.writeLock(grpId, pageId, page, true);
                        this.pageMem.writeUnlock(grpId, pageId, page, null, true);
                    }
                }
                finally {
                    this.pageMem.releasePage(grpId, pageId, page);
                }
            }

            return cnt;
        }
    }
}

