/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.metastorage.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.ignite3.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.manager.IgniteComponent;
import org.apache.ignite3.internal.metastorage.Revisions;
import org.apache.ignite3.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite3.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration;
import org.apache.ignite3.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite3.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite3.internal.metastorage.server.ReadOperationForCompactionTracker;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteBusyLock;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;

public class MetaStorageCompactionTrigger
implements IgniteComponent {
    private static final IgniteLogger LOG = Loggers.forClass(MetaStorageCompactionTrigger.class);
    private final String localNodeName;
    private final KeyValueStorage storage;
    private final MetaStorageManagerImpl metaStorageManager;
    private final FailureProcessor failureProcessor;
    private final ReadOperationForCompactionTracker readOperationForCompactionTracker;
    private final MetaStorageCompactionTriggerConfiguration config;
    private final ScheduledExecutorService compactionExecutor;
    @Nullable
    private ScheduledFuture<?> lastScheduledFuture;
    private boolean isLocalNodeLeader;
    private boolean started;
    private final Lock lock = new ReentrantLock();
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final AtomicBoolean stopGuard = new AtomicBoolean();

    public MetaStorageCompactionTrigger(String localNodeName, KeyValueStorage storage, MetaStorageManagerImpl metaStorageManager, FailureProcessor failureProcessor, ReadOperationForCompactionTracker readOperationForCompactionTracker, SystemDistributedConfiguration systemDistributedConfig) {
        this.localNodeName = localNodeName;
        this.storage = storage;
        this.metaStorageManager = metaStorageManager;
        this.failureProcessor = failureProcessor;
        this.readOperationForCompactionTracker = readOperationForCompactionTracker;
        this.config = new MetaStorageCompactionTriggerConfiguration(systemDistributedConfig);
        this.compactionExecutor = Executors.newSingleThreadScheduledExecutor(IgniteThreadFactory.create(localNodeName, "metastorage-compaction-executor", LOG, new ThreadOperation[0]));
        storage.registerCompactionRevisionUpdateListener(this::onCompactionRevisionUpdate);
        metaStorageManager.addElectionListener(this::onLeaderElected);
    }

    @Override
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            this.lock.lock();
            try {
                this.config.init();
                this.startCompactionOnRecoveryInBackground();
                this.started = true;
                this.scheduleNextCompactionBusy();
                CompletableFuture completableFuture = CompletableFutures.nullCompletedFuture();
                return completableFuture;
            }
            finally {
                this.lock.unlock();
            }
        });
    }

    @Override
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        if (!this.stopGuard.compareAndSet(false, true)) {
            return CompletableFutures.nullCompletedFuture();
        }
        this.busyLock.block();
        this.cancelLastScheduledFutureBusy();
        IgniteUtils.shutdownAndAwaitTermination(this.compactionExecutor, 10L, TimeUnit.SECONDS);
        return CompletableFutures.nullCompletedFuture();
    }

    private void doCompactionBusy() {
        this.lock.lock();
        try {
            if (!this.isLocalNodeLeader) {
                return;
            }
            HybridTimestamp candidateCompactionRevisionTimestamp = this.createCandidateCompactionRevisionTimestampBusy();
            Long newCompactionRevision = this.calculateCandidateCompactionRevisionBusy(candidateCompactionRevisionTimestamp);
            if (newCompactionRevision == null) {
                this.scheduleNextCompactionBusy();
            } else {
                this.metaStorageManager.sendCompactionCommand(newCompactionRevision).whenComplete((unused, throwable) -> {
                    if (throwable != null && !ExceptionUtils.hasCause(throwable, NodeStoppingException.class)) {
                        String errorMessage = String.format("Unknown error occurred while sending the metastorage compaction command: [newCompactionRevision=%s]", newCompactionRevision);
                        this.failureProcessor.process(new FailureContext((Throwable)throwable, errorMessage));
                        IgniteUtils.inBusyLockSafe(this.busyLock, this::scheduleNextCompactionBusy);
                    }
                });
            }
        }
        catch (Throwable t) {
            this.failureProcessor.process(new FailureContext(t, "Unknown error on new metastorage compaction revision scheduling"));
            IgniteUtils.inBusyLockSafe(this.busyLock, this::scheduleNextCompactionBusy);
        }
        finally {
            this.lock.unlock();
        }
    }

    private HybridTimestamp createCandidateCompactionRevisionTimestampBusy() {
        HybridTimestamp safeTime = this.metaStorageManager.clusterTime().currentSafeTime();
        long dataAvailabilityTime = this.config.dataAvailabilityTime();
        return safeTime.getPhysical() <= dataAvailabilityTime ? HybridTimestamp.MIN_VALUE : safeTime.subtractPhysicalTime(dataAvailabilityTime);
    }

    @Nullable
    private Long calculateCandidateCompactionRevisionBusy(HybridTimestamp candidateTimestamp) {
        try {
            long candidateCompactionRevision = this.storage.revisionByTimestamp(candidateTimestamp);
            long currentStorageRevision = this.storage.revision();
            if (candidateCompactionRevision >= currentStorageRevision) {
                candidateCompactionRevision = currentStorageRevision - 1L;
            }
            return candidateCompactionRevision <= this.storage.getCompactionRevision() ? null : Long.valueOf(candidateCompactionRevision);
        }
        catch (CompactedException exception) {
            return null;
        }
    }

    private void scheduleNextCompactionBusy() {
        this.lock.lock();
        try {
            if (this.started && this.isLocalNodeLeader) {
                this.lastScheduledFuture = this.compactionExecutor.schedule(() -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, this::doCompactionBusy), this.config.interval(), TimeUnit.MILLISECONDS);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    private void scheduleNextCompactionIfNotScheduleBusy() {
        this.lock.lock();
        try {
            ScheduledFuture<?> lastScheduledFuture;
            if (this.started && this.isLocalNodeLeader && ((lastScheduledFuture = this.lastScheduledFuture) == null || lastScheduledFuture.isDone())) {
                this.lastScheduledFuture = this.compactionExecutor.schedule(() -> IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, this::doCompactionBusy), this.config.interval(), TimeUnit.MILLISECONDS);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    private void onCompactionRevisionUpdate(long compactionRevision) {
        IgniteUtils.inBusyLockSafe(this.busyLock, () -> this.onCompactionRevisionUpdateBusy(compactionRevision));
    }

    private void onCompactionRevisionUpdateBusy(long compactionRevision) {
        ((CompletableFuture)((CompletableFuture)CompletableFuture.supplyAsync(() -> this.readOperationForCompactionTracker.collect(compactionRevision), this.compactionExecutor).thenComposeAsync(Function.identity(), (Executor)this.compactionExecutor)).thenRunAsync(() -> this.storage.compact(compactionRevision), this.compactionExecutor)).whenComplete((unused, throwable) -> {
            if (throwable != null && !ExceptionUtils.hasCause(throwable, NodeStoppingException.class, RejectedExecutionException.class)) {
                String errorMessage = String.format("Unknown error on new metastorage compaction revision: %s", compactionRevision);
                this.failureProcessor.process(new FailureContext((Throwable)throwable, errorMessage));
            }
            IgniteUtils.inBusyLockSafe(this.busyLock, this::scheduleNextCompactionIfNotScheduleBusy);
        });
    }

    private void onLeaderElected(InternalClusterNode newLeader) {
        IgniteUtils.inBusyLockSafe(this.busyLock, () -> this.onLeaderElectedBusy(newLeader));
    }

    private void onLeaderElectedBusy(InternalClusterNode newLeader) {
        this.lock.lock();
        try {
            if (this.localNodeName.equals(newLeader.name())) {
                this.isLocalNodeLeader = true;
                this.scheduleNextCompactionBusy();
            } else {
                this.isLocalNodeLeader = false;
                this.cancelLastScheduledFutureBusy();
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    private void cancelLastScheduledFutureBusy() {
        this.lock.lock();
        try {
            ScheduledFuture<?> lastScheduledFuture = this.lastScheduledFuture;
            if (lastScheduledFuture != null) {
                lastScheduledFuture.cancel(true);
            }
            this.lastScheduledFuture = null;
        }
        finally {
            this.lock.unlock();
        }
    }

    private void startCompactionOnRecoveryInBackground() {
        CompletableFuture<Revisions> recoveryFuture = this.metaStorageManager.recoveryFinishedFuture();
        assert (recoveryFuture.isDone());
        long recoveredCompactionRevision = recoveryFuture.join().compactionRevision();
        if (recoveredCompactionRevision != -1L) {
            CompletableFuture.runAsync(() -> IgniteUtils.inBusyLockSafe(this.busyLock, () -> this.storage.compact(recoveredCompactionRevision)), this.compactionExecutor).whenComplete((unused, throwable) -> {
                if (throwable != null && !ExceptionUtils.hasCause(throwable, NodeStoppingException.class, RejectedExecutionException.class)) {
                    String errorMessage = String.format("Unknown error during metastore compaction launched on node recovery: [compactionRevision=%s]", recoveredCompactionRevision);
                    this.failureProcessor.process(new FailureContext((Throwable)throwable, errorMessage));
                }
            });
        }
    }
}

