/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.table.distributed.gc;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.ignite3.internal.close.ManuallyCloseable;
import org.apache.ignite3.internal.event.EventListener;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.lowwatermark.LowWatermark;
import org.apache.ignite3.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
import org.apache.ignite3.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.schema.configuration.GcConfiguration;
import org.apache.ignite3.internal.schema.configuration.GcView;
import org.apache.ignite3.internal.table.distributed.gc.GcStorageHandler;
import org.apache.ignite3.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite3.internal.table.distributed.gc.StorageRemovedException;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.util.TrackerClosedException;
import org.apache.ignite3.lang.ErrorGroups;
import org.jetbrains.annotations.TestOnly;

public class MvGc
implements ManuallyCloseable {
    private static final IgniteLogger LOG = Loggers.forClass(MvGc.class);
    private final String nodeName;
    private final GcConfiguration gcConfig;
    private volatile ThreadPoolExecutor executor;
    private final AtomicBoolean closeGuard = new AtomicBoolean();
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final LowWatermark lowWatermark;
    private final FailureProcessor failureProcessor;
    private final ConcurrentMap<TablePartitionId, GcStorageHandler> storageHandlerByPartitionId = new ConcurrentHashMap<TablePartitionId, GcStorageHandler>();

    public MvGc(String nodeName, GcConfiguration gcConfig, LowWatermark lowWatermark, FailureProcessor failureProcessor) {
        this.nodeName = nodeName;
        this.gcConfig = gcConfig;
        this.lowWatermark = lowWatermark;
        this.failureProcessor = failureProcessor;
    }

    public void start() {
        this.inBusyLock(() -> {
            int threadCount = (Integer)this.gcConfig.threads().value();
            this.executor = new ThreadPoolExecutor(threadCount, threadCount, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), IgniteThreadFactory.create(this.nodeName, "mv-gc", LOG, ThreadOperation.STORAGE_READ, ThreadOperation.STORAGE_WRITE));
            this.executor.allowCoreThreadTimeOut(true);
            this.lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED, EventListener.fromConsumer(this::onLwmChanged));
        });
    }

    public void addStorage(TablePartitionId tablePartitionId, GcUpdateHandler gcUpdateHandler) {
        this.inBusyLock(() -> {
            GcStorageHandler previous = this.storageHandlerByPartitionId.putIfAbsent(tablePartitionId, new GcStorageHandler(gcUpdateHandler));
            if (previous == null && this.lowWatermark.getLowWatermark() != null) {
                this.scheduleGcForStorage(tablePartitionId);
            }
        });
    }

    public CompletableFuture<Void> removeStorage(TablePartitionId tablePartitionId) {
        return this.inBusyLock(() -> {
            CompletableFuture<Void> gcInProgressFuture;
            GcStorageHandler removed = (GcStorageHandler)this.storageHandlerByPartitionId.remove(tablePartitionId);
            if (removed == null) {
                return CompletableFutures.nullCompletedFuture();
            }
            CompletableFuture<Void> awaitSafeTimeFuture = removed.awaitSafeTimeFuture.get();
            if (awaitSafeTimeFuture != null && !awaitSafeTimeFuture.isDone()) {
                awaitSafeTimeFuture.completeExceptionally(new StorageRemovedException());
            }
            return (gcInProgressFuture = removed.gcInProgressFuture.get()) == null ? CompletableFutures.nullCompletedFuture() : gcInProgressFuture;
        });
    }

    private void onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
        IgniteUtils.inBusyLockSafe(this.busyLock, () -> this.executor.submit(() -> this.inBusyLock(this::initNewGcBusy)));
    }

    @Override
    public void close() throws Exception {
        if (!this.closeGuard.compareAndSet(false, true)) {
            return;
        }
        this.busyLock.block();
        if (this.executor != null) {
            IgniteUtils.shutdownAndAwaitTermination(this.executor, 10L, TimeUnit.SECONDS);
        }
    }

    private void initNewGcBusy() {
        this.storageHandlerByPartitionId.keySet().forEach(this::scheduleGcForStorage);
    }

    private void scheduleGcForStorage(TablePartitionId tablePartitionId) {
        this.inBusyLock(() -> {
            CompletableFuture currentGcFuture = new CompletableFuture();
            CompletableFuture currentAwaitSafeTimeFuture = new CompletableFuture();
            GcStorageHandler storageHandler = this.storageHandlerByPartitionId.compute(tablePartitionId, (tablePartId, gcStorageHandler) -> {
                if (gcStorageHandler == null) {
                    return null;
                }
                CompletableFuture<Void> inProgressFuture = gcStorageHandler.gcInProgressFuture.get();
                CompletableFuture<Void> awaitProgressFuture = gcStorageHandler.awaitSafeTimeFuture.get();
                if (inProgressFuture == null || inProgressFuture.isDone()) {
                    boolean casResult = gcStorageHandler.gcInProgressFuture.compareAndSet(inProgressFuture, currentGcFuture);
                    assert (casResult) : tablePartId;
                    casResult = gcStorageHandler.awaitSafeTimeFuture.compareAndSet(awaitProgressFuture, currentAwaitSafeTimeFuture);
                    assert (casResult) : tablePartId;
                } else {
                    inProgressFuture.whenComplete((unused, throwable) -> this.scheduleGcForStorage(tablePartitionId));
                }
                return gcStorageHandler;
            });
            if (storageHandler == null) {
                return;
            }
            if (storageHandler.gcInProgressFuture.get() != currentGcFuture) {
                return;
            }
            try {
                HybridTimestamp lowWatermark = this.lowWatermark.getLowWatermark();
                assert (lowWatermark != null) : tablePartitionId;
                if (!this.storageHandlerByPartitionId.containsKey(tablePartitionId)) {
                    currentGcFuture.complete(null);
                    return;
                }
                GcUpdateHandler gcUpdateHandler = storageHandler.gcUpdateHandler;
                gcUpdateHandler.getSafeTimeTracker().waitFor(lowWatermark).whenComplete((unused, throwable) -> {
                    if (throwable == null) {
                        currentAwaitSafeTimeFuture.complete(null);
                    } else {
                        currentAwaitSafeTimeFuture.completeExceptionally((Throwable)throwable);
                    }
                });
                ((CompletableFuture)currentAwaitSafeTimeFuture.thenApplyAsync(unused -> gcUpdateHandler.vacuumBatch(lowWatermark, ((GcView)this.gcConfig.value()).batchSize()), (Executor)this.executor)).whenComplete((isGarbageLeft, throwable) -> {
                    if (throwable != null) {
                        if (ExceptionUtils.hasCause(throwable, TrackerClosedException.class, StorageRemovedException.class)) {
                            LOG.debug("Caught an expected exception", (Throwable)throwable);
                            currentGcFuture.complete(null);
                        } else {
                            this.failureProcessor.process(new FailureContext((Throwable)throwable, "Error when running GC"));
                            currentGcFuture.completeExceptionally((Throwable)throwable);
                        }
                        return;
                    }
                    currentGcFuture.complete(null);
                    if (isGarbageLeft.booleanValue() && this.storageHandlerByPartitionId.containsKey(tablePartitionId)) {
                        this.scheduleGcForStorage(tablePartitionId);
                    }
                });
            }
            catch (Throwable t) {
                this.failureProcessor.process(new FailureContext(t, "Error when running GC"));
                currentGcFuture.completeExceptionally(t);
            }
        });
    }

    private <T> T inBusyLock(Supplier<T> supplier) {
        if (!this.busyLock.enterBusy()) {
            throw new IgniteInternalException(ErrorGroups.GarbageCollector.CLOSED_ERR);
        }
        try {
            T t = supplier.get();
            return t;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    private void inBusyLock(Runnable runnable) {
        this.inBusyLock(() -> {
            runnable.run();
            return null;
        });
    }

    @TestOnly
    void scheduleGcForAllStorages() {
        this.inBusyLock(this::initNewGcBusy);
    }
}

