/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.persistence.pagemem;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointBufferOverflowWatchdog;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.ExponentialBackoffThrottlingStrategy;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.IntervalBasedMeasurement;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottlePolicy;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.SpeedBasedMemoryConsumptionThrottlingStrategy;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteOutClosure;

/**
 * Write-throttling policy that is consulted every time a page is marked dirty and that parks the
 * calling thread when page modifications need to be slowed down.
 *
 * <p>Two protection strategies are combined:
 * <ul>
 *   <li>an exponential-backoff strategy, used when the page being marked is part of the current
 *       checkpoint and the checkpoint buffer watchdog reports the danger zone;</li>
 *   <li>a speed-based strategy, used in all other cases.</li>
 * </ul>
 *
 * <p>Threads that are currently parked are tracked so that they can be released early when a
 * checkpoint finishes or when {@link #wakeupThrottledThreads()} is called.
 */
public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
    /** Park-time value meaning "no throttling at all": the mark-dirty call is not even measured. */
    static final long NO_THROTTLING_MARKER = Long.MIN_VALUE;

    /** Minimal delay between two consecutive throttling log messages, in nanoseconds. */
    private static final long WARN_MIN_DELAY_NS = TimeUnit.SECONDS.toNanos(10L);

    /** Throttle weight that must be exceeded before the throttling log message is printed. */
    static final double WARN_THRESHOLD = 0.2;

    /** Page memory whose metrics and checkpoint buffer counters are read. */
    private final PageMemoryImpl pageMemory;

    /** Supplier of the current checkpoint progress; may yield {@code null} (see {@link #cpWrittenPages()}). */
    private final IgniteOutClosure<CheckpointProgress> cpProgress;

    /** Threads that are parked by this policy right now. */
    private final GridConcurrentHashSet<Thread> parkedThreads = new GridConcurrentHashSet<>();

    /**
     * Measurement used both for the mark-dirty speed and for the average park time.
     * NOTE(review): arguments are presumably the interval length in ms and the number of kept
     * intervals - confirm against {@code IntervalBasedMeasurement}.
     */
    private final IntervalBasedMeasurement markSpeedAndAvgParkTime = new IntervalBasedMeasurement(250, 3);

    /** Used to assert that the checkpoint lock is held when a page is marked dirty. */
    private final CheckpointLockStateChecker cpLockStateChecker;

    /** Logger. */
    private final IgniteLogger log;

    /** {@link System#nanoTime()} of the previous throttling log message; {@code 0} if none was printed yet. */
    private final AtomicLong prevWarnTime = new AtomicLong();

    /** Strategy protecting the checkpoint buffer from overflow. */
    private final ExponentialBackoffThrottlingStrategy cpBufferProtector = new ExponentialBackoffThrottlingStrategy();

    /** Strategy protecting clean pages; driven by marking and checkpoint write speeds. */
    private final SpeedBasedMemoryConsumptionThrottlingStrategy cleanPagesProtector;

    /** Watchdog telling whether the checkpoint buffer is in the danger zone. */
    private final CheckpointBufferOverflowWatchdog cpBufferWatchdog;

    /**
     * Creates the policy and registers its metrics in the given registry.
     *
     * @param pageMemory Page memory.
     * @param cpProgress Supplier of the current checkpoint progress.
     * @param stateChecker Checkpoint lock state checker.
     * @param log Logger.
     * @param mreg Metric registry that receives the throttling metrics.
     */
    public PagesWriteSpeedBasedThrottle(PageMemoryImpl pageMemory, IgniteOutClosure<CheckpointProgress> cpProgress, CheckpointLockStateChecker stateChecker, IgniteLogger log, MetricRegistry mreg) {
        this.pageMemory = pageMemory;
        this.cpProgress = cpProgress;
        this.cpLockStateChecker = stateChecker;
        this.log = log;

        this.cleanPagesProtector = new SpeedBasedMemoryConsumptionThrottlingStrategy(pageMemory, cpProgress, markSpeedAndAvgParkTime);
        this.cpBufferWatchdog = new CheckpointBufferOverflowWatchdog(pageMemory);

        mreg.register("SpeedBasedThrottlingPercentage", this::throttleWeight, "Measurement shows how much throttling time is involved into average marking time.");
        mreg.register("MarkDirtySpeed", this::getMarkDirtySpeed, "Speed of marking pages dirty. Value from past 750-1000 millis only. Pages/second.");
        mreg.register("CpWriteSpeed", this::getCpWriteSpeed, "Speed average checkpoint write speed. Current and 3 past checkpoints used. Pages/second.");
        mreg.register("LastEstimatedSpeedForMarkAll", this::getLastEstimatedSpeedForMarkAll, "Last estimated speed for marking all clear pages as dirty till the end of checkpoint.");
        mreg.register("CurrDirtyRatio", this::getCurrDirtyRatio, "Current dirty pages ratio.");
        mreg.register("TargetDirtyRatio", this::getTargetDirtyRatio, "Target (maximum) dirty pages ratio, after which throttling will start.");
        mreg.register("ThrottleParkTime", this::throttleParkTime, "Exponential backoff counter.");
        mreg.register("CpTotalPages", cleanPagesProtector::cpTotalPages, "Number of pages in current checkpoint.");
        mreg.register("CpEvictedPages", cleanPagesProtector::cpEvictedPages, "Number of evicted pages.");
        mreg.register("CpWrittenPages", this::cpWrittenPages, "Number of written pages.");
        mreg.register("CpSyncedPages", cleanPagesProtector::cpSyncedPages, "Counter for fsynced checkpoint pages.");
        mreg.register("CheckpointBufferPagesCount", pageMemory::checkpointBufferPagesCount, "Number of pages used in checkpoint buffer.");
        // This value is reported as "cpBufTotal" in the throttling log message, i.e. it is the
        // total size rather than the used count (which is the metric registered just above).
        mreg.register("CheckpointBufferPagesSize", pageMemory::checkpointBufferPagesSize, "Total number of pages in checkpoint buffer.");
    }

    /**
     * Computes the park time for the current mark-dirty operation and parks the calling thread if
     * that time is positive. Unless throttling is disabled for this call (the computed time equals
     * {@link #NO_THROTTLING_MARKER}), the elapsed time is added to the page memory throttling-time
     * metric and the park time is fed into the average park time measurement - this also happens
     * for a zero park time.
     *
     * @param isPageInCheckpoint Whether the page being marked dirty belongs to the current checkpoint.
     */
    @Override
    public void onMarkDirty(boolean isPageInCheckpoint) {
        assert cpLockStateChecker.checkpointLockIsHeldByThread();

        long curNanoTime = System.nanoTime();
        long throttleParkTimeNs = computeThrottlingParkTime(isPageInCheckpoint, curNanoTime);

        if (throttleParkTimeNs == NO_THROTTLING_MARKER) {
            return;
        }

        if (throttleParkTimeNs > 0L) {
            recurrentLogIfNeeded();
            doPark(throttleParkTimeNs);
        }

        pageMemory.metrics().addThrottlingTime(U.nanosToMillis(System.nanoTime() - curNanoTime));
        markSpeedAndAvgParkTime.addMeasurementForAverageCalculation(throttleParkTimeNs);
    }

    /**
     * Chooses the strategy for the current operation and asks it for the park time.
     *
     * @param isPageInCheckpoint Whether the page being marked dirty belongs to the current checkpoint.
     * @param curNanoTime Current {@link System#nanoTime()} value.
     * @return Park time in nanoseconds as returned by the chosen strategy; callers treat
     *      {@link #NO_THROTTLING_MARKER} as "no throttling".
     */
    private long computeThrottlingParkTime(boolean isPageInCheckpoint, long curNanoTime) {
        if (isPageInCheckpoint) {
            if (isCpBufferOverflowThresholdExceeded()) {
                return cpBufferProtector.protectionParkTime();
            }

            // Checkpoint buffer is out of the danger zone: start the next backoff from scratch.
            cpBufferProtector.resetBackoff();
        }

        return cleanPagesProtector.protectionParkTime(curNanoTime);
    }

    /**
     * Parks the current thread for the given time, registering it as parked for the duration so
     * that it can be released early. A warning is logged when the time exceeds
     * {@code LOGGING_THRESHOLD} (not declared in this class; presumably inherited from
     * {@link PagesWriteThrottlePolicy}).
     *
     * @param throttleParkTimeNs Park time in nanoseconds.
     */
    protected void doPark(long throttleParkTimeNs) {
        Thread curThread = Thread.currentThread();

        if (throttleParkTimeNs > LOGGING_THRESHOLD) {
            U.warn(log, "Parking thread=" + curThread.getName() + " for timeout(ms)=" + throttleParkTimeNs / 1_000_000L);
        }

        parkedThreads.add(curThread);

        try {
            LockSupport.parkNanos(throttleParkTimeNs);
        }
        finally {
            // Always deregister, whatever the reason parkNanos() returned.
            parkedThreads.remove(curThread);
        }
    }

    /**
     * @return Value of the written pages counter of the current checkpoint, or {@code 0} if there
     *      is no checkpoint progress or it has no such counter.
     */
    private int cpWrittenPages() {
        CheckpointProgress progress = cpProgress.apply();

        if (progress == null) {
            return 0;
        }

        AtomicInteger writtenPagesCntr = progress.writtenPagesCounter();

        return writtenPagesCntr == null ? 0 : writtenPagesCntr.get();
    }

    /**
     * Prints the throttling summary at info level, but not more often than once per
     * {@link #WARN_MIN_DELAY_NS} and only when the throttle weight exceeds {@link #WARN_THRESHOLD}.
     */
    private void recurrentLogIfNeeded() {
        long prevWarningNs = prevWarnTime.get();
        long curNs = System.nanoTime();

        // Zero means that nothing was logged yet, so the delay check does not apply.
        if (prevWarningNs != 0L && curNs - prevWarningNs <= WARN_MIN_DELAY_NS) {
            return;
        }

        double weight = throttleWeight();

        if (weight <= WARN_THRESHOLD) {
            return;
        }

        // Only the thread that wins the CAS logs; the timestamp is updated even if info is disabled.
        if (!prevWarnTime.compareAndSet(prevWarningNs, curNs) || !log.isInfoEnabled()) {
            return;
        }

        String msg = String.format("Throttling is applied to page modifications [fractionOfParkTime=%.2f, markDirty=%d pages/sec, checkpointWrite=%d pages/sec, estIdealMarkDirty=%d pages/sec, curDirty=%.2f, maxDirty=%.2f, avgParkTime=%d ns, pages: (total=%d, evicted=%d, written=%d, synced=%d, cpBufUsed=%d, cpBufTotal=%d)]",
            weight,
            getMarkDirtySpeed(),
            getCpWriteSpeed(),
            getLastEstimatedSpeedForMarkAll(),
            getCurrDirtyRatio(),
            getTargetDirtyRatio(),
            throttleParkTime(),
            cleanPagesProtector.cpTotalPages(),
            cleanPagesProtector.cpEvictedPages(),
            cpWrittenPages(),
            cleanPagesProtector.cpSyncedPages(),
            pageMemory.checkpointBufferPagesCount(),
            pageMemory.checkpointBufferPagesSize());

        log.info(msg);
    }

    /**
     * Delegates to the speed-based strategy's park time calculation. Package-private, presumably
     * for tests.
     *
     * @param dirtyPagesRatio Dirty pages ratio.
     * @param fullyCompletedPages Number of fully completed pages.
     * @param cpTotalPages Total number of pages in the checkpoint.
     * @param nThreads Number of threads.
     * @param markDirtySpeed Speed of marking pages dirty.
     * @param curCpWriteSpeed Current checkpoint write speed.
     * @return Park time as computed by the speed-based strategy.
     */
    long getCleanPagesProtectionParkTime(double dirtyPagesRatio, long fullyCompletedPages, int cpTotalPages, int nThreads, long markDirtySpeed, long curCpWriteSpeed) {
        return cleanPagesProtector.getParkTime(dirtyPagesRatio, fullyCompletedPages, cpTotalPages, nThreads, markDirtySpeed, curCpWriteSpeed);
    }

    /** Resets the speed-based strategy at the beginning of a checkpoint. */
    @Override
    public void onBeginCheckpoint() {
        cleanPagesProtector.reset();
    }

    /**
     * Resets the backoff, finishes the speed-based strategy and the current measurement interval,
     * and releases all threads parked by this policy.
     */
    @Override
    public void onFinishCheckpoint() {
        cpBufferProtector.resetBackoff();
        cleanPagesProtector.finish();
        markSpeedAndAvgParkTime.finishInterval();
        unparkParkedThreads();
    }

    /** Unparks every thread that is currently registered as parked. */
    private void unparkParkedThreads() {
        parkedThreads.forEach(LockSupport::unpark);
    }

    /**
     * @return Average park time taken from the measurement (reported as nanoseconds in the
     *      throttling log message).
     */
    public long throttleParkTime() {
        return markSpeedAndAvgParkTime.getAverage();
    }

    /**
     * @return Target dirty pages ratio reported by the speed-based strategy.
     */
    public double getTargetDirtyRatio() {
        return cleanPagesProtector.getTargetDirtyRatio();
    }

    /**
     * @return Current dirty pages ratio reported by the speed-based strategy.
     */
    public double getCurrDirtyRatio() {
        return cleanPagesProtector.getCurrDirtyRatio();
    }

    /**
     * @return Speed of marking pages dirty, in operations per second, as of now.
     */
    public long getMarkDirtySpeed() {
        return markSpeedAndAvgParkTime.getSpeedOpsPerSec(System.nanoTime());
    }

    /**
     * @return Checkpoint write speed reported by the speed-based strategy.
     */
    public long getCpWriteSpeed() {
        return cleanPagesProtector.getCpWriteSpeed();
    }

    /**
     * @return Last estimated speed for marking all pages, reported by the speed-based strategy.
     */
    public long getLastEstimatedSpeedForMarkAll() {
        return cleanPagesProtector.getLastEstimatedSpeedForMarkAll();
    }

    /**
     * Computes the ratio of the average park time to the time one mark-dirty operation takes at
     * the current marking speed.
     *
     * @return The ratio, or {@code 0.0} if the current speed is not positive or the time per
     *      operation is zero.
     */
    public double throttleWeight() {
        long speed = markSpeedAndAvgParkTime.getSpeedOpsPerSec(System.nanoTime());

        if (speed <= 0L) {
            return 0.0;
        }

        long timeForOnePage = cleanPagesProtector.nsPerOperation(speed);

        // Guard against division by zero.
        if (timeForOnePage == 0L) {
            return 0.0;
        }

        return (double) throttleParkTime() / (double) timeForOnePage;
    }

    /**
     * Resets the backoff and releases all parked threads, but only if the checkpoint buffer is
     * not in the danger zone.
     */
    @Override
    public void wakeupThrottledThreads() {
        if (isCpBufferOverflowThresholdExceeded()) {
            return;
        }

        cpBufferProtector.resetBackoff();
        unparkParkedThreads();
    }

    /**
     * @return {@code true} if the checkpoint buffer watchdog reports the danger zone.
     */
    @Override
    public boolean isCpBufferOverflowThresholdExceeded() {
        return cpBufferWatchdog.isInDangerZone();
    }
}

