/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.snapshots.tombstone;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.HybridClock;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.manager.IgniteComponent;
import org.apache.ignite3.internal.storage.MvPartitionStorage;
import org.apache.ignite3.internal.storage.StorageClosedException;
import org.apache.ignite3.internal.storage.StorageDestroyedException;
import org.apache.ignite3.internal.storage.StorageRebalanceException;
import org.apache.ignite3.internal.storage.engine.MvTableStorage;
import org.apache.ignite3.internal.table.TableImpl;
import org.apache.ignite3.internal.table.distributed.TableManager;
import org.apache.ignite3.internal.util.ByteUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.vault.VaultEntry;
import org.apache.ignite3.internal.vault.VaultManager;
import org.apache.ignite3.table.Table;
import org.gridgain.internal.snapshots.configuration.ClusterSnapshotConfiguration;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class SnapshotTombstoneManager
implements IgniteComponent {
    public static final int DEFAULT_SNAPSHOT_TOMBSTONES_TTL_MINUTES = 1440;
    private static final ByteArray TOMBSTONE_LOW_WATERMARK_VAULT_KEY = new ByteArray("snapshot-tombstone-low-watermark");
    private static final long CLEAR_INTERVAL_SECONDS = 300L;
    private final ClusterSnapshotConfiguration snapshotConfiguration;
    private final TableManager tableManager;
    private final HybridClock clock;
    private final FailureProcessor failureProcessor;
    private final ScheduledExecutorService clearScheduler;
    private final ExecutorService clearExecutor;
    private final Set<UUID> ongoingIncrementalSnapshots = new HashSet<UUID>();
    private final VaultManager vaultManager;
    @Nullable
    private HybridTimestamp lowWatermark;
    private volatile ScheduledFuture<?> schedulerFuture;
    private CompletableFuture<Void> clearFuture;
    private final Object clearFutureMutex = new Object();
    private final Object lowWatermarkMutex = new Object();

    public SnapshotTombstoneManager(ClusterSnapshotConfiguration snapshotConfiguration, TableManager tableManager, ScheduledExecutorService scheduledExecutorService, ExecutorService clearExecutor, VaultManager vaultManager, HybridClock clock, FailureProcessor failureProcessor) {
        this.snapshotConfiguration = snapshotConfiguration;
        this.tableManager = tableManager;
        this.clearScheduler = scheduledExecutorService;
        this.clearExecutor = clearExecutor;
        this.vaultManager = vaultManager;
        this.clock = clock;
        this.failureProcessor = failureProcessor;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        Object object = this.lowWatermarkMutex;
        synchronized (object) {
            this.lowWatermark = this.readLowWatermarkFromVault();
        }
        this.schedulerFuture = this.clearScheduler.scheduleAtFixedRate(() -> {
            Object object = this.clearFutureMutex;
            synchronized (object) {
                if (this.clearFuture == null || this.clearFuture.isDone()) {
                    this.clearFuture = CompletableFuture.supplyAsync(() -> this.runClear(this.getLowWatermark()), this.clearExecutor).thenCompose(Function.identity());
                }
            }
        }, 0L, 300L, TimeUnit.SECONDS);
        return CompletableFutures.nullCompletedFuture();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        ScheduledFuture<?> schedulerFuture = this.schedulerFuture;
        if (schedulerFuture != null) {
            schedulerFuture.cancel(true);
        }
        Object object = this.clearFutureMutex;
        synchronized (object) {
            CompletableFuture<Void> clearFuture = this.clearFuture;
            if (clearFuture != null) {
                clearFuture.cancel(true);
            }
        }
        return CompletableFutures.nullCompletedFuture();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public HybridTimestamp addOngoingIncrementalSnapshot(UUID snapshotId) {
        Object object = this.lowWatermarkMutex;
        synchronized (object) {
            assert (!this.ongoingIncrementalSnapshots.contains(snapshotId)) : snapshotId;
            this.ongoingIncrementalSnapshots.add(snapshotId);
            return this.getLowWatermark();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removeOngoingIncrementalSnapshot(UUID snapshotId) {
        Object object = this.lowWatermarkMutex;
        synchronized (object) {
            assert (this.ongoingIncrementalSnapshots.contains(snapshotId)) : snapshotId;
            this.ongoingIncrementalSnapshots.remove(snapshotId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestOnly
    public CompletableFuture<Void> forceClear(HybridTimestamp watermark) {
        Object object = this.clearFutureMutex;
        synchronized (object) {
            this.clearFuture = this.clearFuture.thenCompose(v -> this.runClear(watermark));
            return this.clearFuture;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestOnly
    public Set<UUID> ongoingIncrementalSnapshots() {
        Object object = this.lowWatermarkMutex;
        synchronized (object) {
            return Collections.unmodifiableSet(this.ongoingIncrementalSnapshots);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Void> runClear(HybridTimestamp watermark) {
        assert (watermark != null);
        Object object = this.lowWatermarkMutex;
        synchronized (object) {
            if (this.ongoingIncrementalSnapshots.isEmpty()) {
                if (this.lowWatermark == null || watermark.compareTo(this.lowWatermark) > 0) {
                    this.lowWatermark = watermark;
                    this.vaultManager.put(TOMBSTONE_LOW_WATERMARK_VAULT_KEY, ByteUtils.longToBytes(this.lowWatermark.longValue()));
                }
            } else {
                return CompletableFutures.nullCompletedFuture();
            }
        }
        ArrayList<CompletionStage> clearFutures = new ArrayList<CompletionStage>();
        for (Table table : this.tableManager.cachedTables()) {
            MvTableStorage tableStorage = ((TableImpl)table).internalTable().storage();
            for (int i = 0; i < tableStorage.getTableDescriptor().getPartitions(); ++i) {
                int partitionId = i;
                MvPartitionStorage mvPartition = tableStorage.getMvPartition(partitionId);
                if (mvPartition == null) continue;
                CompletionStage partitionClearFuture = CompletableFuture.runAsync(() -> mvPartition.clearSnapshotTombstones(watermark), this.clearExecutor).whenComplete((res, e) -> {
                    if (e != null && SnapshotTombstoneManager.notRelatedToStorageState(e)) {
                        String errorMessage = IgniteStringFormatter.format("Error while clearing snapshot tombstones [table={}, partitionId={}]", table.name(), partitionId);
                        this.failureProcessor.process(new FailureContext((Throwable)e, errorMessage));
                    }
                });
                clearFutures.add(partitionClearFuture);
            }
        }
        return CompletableFutures.allOf(clearFutures);
    }

    private static boolean notRelatedToStorageState(Throwable e) {
        return !ExceptionUtils.hasCause(e, StorageRebalanceException.class, StorageClosedException.class, StorageDestroyedException.class);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private HybridTimestamp getLowWatermark() {
        long ttlMillis = TimeUnit.MINUTES.toMillis((Long)this.snapshotConfiguration.snapshotTombstonesTtlMinutes().value());
        HybridTimestamp calculated = this.clock.now().subtractPhysicalTime(ttlMillis);
        Object object = this.lowWatermarkMutex;
        synchronized (object) {
            return this.lowWatermark == null ? calculated : HybridTimestamp.max(this.lowWatermark, calculated);
        }
    }

    @Nullable
    private HybridTimestamp readLowWatermarkFromVault() {
        VaultEntry vaultEntry = this.vaultManager.get(TOMBSTONE_LOW_WATERMARK_VAULT_KEY);
        return vaultEntry == null ? null : HybridTimestamp.hybridTimestamp(ByteUtils.bytesToLong(vaultEntry.value()));
    }
}

