/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.metastorage.server;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.CompactionRevisionUpdateListener;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.RevisionUpdateListener;
import org.apache.ignite3.internal.metastorage.Revisions;
import org.apache.ignite3.internal.metastorage.WatchListener;
import org.apache.ignite3.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite3.internal.metastorage.impl.EntryImpl;
import org.apache.ignite3.internal.metastorage.server.AdvanceSafeTimeEvent;
import org.apache.ignite3.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite3.internal.metastorage.server.KeyValueStorageUtils;
import org.apache.ignite3.internal.metastorage.server.KeyValueUpdateContext;
import org.apache.ignite3.internal.metastorage.server.NotificationEnqueuedListener;
import org.apache.ignite3.internal.metastorage.server.NotifyWatchProcessorEvent;
import org.apache.ignite3.internal.metastorage.server.ReadOperationForCompactionTracker;
import org.apache.ignite3.internal.metastorage.server.RecoveryRevisionsListener;
import org.apache.ignite3.internal.metastorage.server.Value;
import org.apache.ignite3.internal.metastorage.server.Watch;
import org.apache.ignite3.internal.metastorage.server.WatchProcessor;
import org.apache.ignite3.internal.rocksdb.RocksUtils;
import org.jetbrains.annotations.Nullable;

public abstract class AbstractKeyValueStorage
implements KeyValueStorage {
    protected static final Comparator<byte[]> KEY_COMPARATOR = Arrays::compareUnsigned;
    protected final IgniteLogger log = Loggers.forClass(this.getClass());
    protected final FailureProcessor failureProcessor;
    protected final WatchProcessor watchProcessor;
    protected final Object watchProcessorMutex = new Object();
    @Nullable
    private volatile RecoveryRevisionsListener recoveryRevisionListener;
    protected volatile long rev;
    protected volatile long compactionRevision = -1L;
    private volatile long plannedUpdateCompactionRevision = -1L;
    protected final AtomicBoolean stopCompaction = new AtomicBoolean();
    protected final ReadOperationForCompactionTracker readOperationForCompactionTracker;
    @Nullable
    protected TreeSet<NotifyWatchProcessorEvent> notifyWatchProcessorEventsBeforeStartingWatches = new TreeSet();
    private final List<CompactionRevisionUpdateListener> compactionRevisionUpdateListeners = new CopyOnWriteArrayList<CompactionRevisionUpdateListener>();

    protected AbstractKeyValueStorage(String nodeName, FailureProcessor failureProcessor, ReadOperationForCompactionTracker readOperationForCompactionTracker) {
        this.failureProcessor = failureProcessor;
        this.readOperationForCompactionTracker = readOperationForCompactionTracker;
        this.watchProcessor = new WatchProcessor(nodeName, this::get, failureProcessor);
    }

    protected abstract long[] keyRevisionsForOperation(byte[] var1);

    @Nullable
    protected abstract Value valueForOperation(byte[] var1, long var2);

    private boolean isInRecoveryState() {
        return this.recoveryRevisionListener != null;
    }

    protected abstract boolean areWatchesStarted();

    @Override
    public void registerNotificationEnqueuedListener(NotificationEnqueuedListener listener) {
        this.watchProcessor.registerNotificationEnqueuedListener(listener);
    }

    @Override
    public Entry get(byte[] key) {
        return this.doGet(key, this.rev);
    }

    @Override
    public Entry get(byte[] key, long revUpperBound) {
        return this.doGet(key, revUpperBound);
    }

    @Override
    public List<Entry> getAll(List<byte[]> keys) {
        return this.doGetAll(keys, this.rev);
    }

    @Override
    public List<Entry> getAll(List<byte[]> keys, long revUpperBound) {
        return this.doGetAll(keys, revUpperBound);
    }

    @Override
    public long revision() {
        return this.rev;
    }

    @Override
    public void saveCompactionRevision(long revision, KeyValueUpdateContext context) {
        assert (revision >= 0L) : revision;
        KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent(revision, this.rev);
        this.saveCompactionRevision(revision, context, true);
    }

    protected abstract void saveCompactionRevision(long var1, KeyValueUpdateContext var3, boolean var4);

    @Override
    public void setCompactionRevision(long revision) {
        assert (revision >= 0L) : revision;
        KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent(revision, this.rev);
        this.compactionRevision = revision;
        this.notifyRevisionsUpdate();
    }

    @Override
    public long getCompactionRevision() {
        return this.compactionRevision;
    }

    @Override
    public void updateCompactionRevision(long compactionRevision, KeyValueUpdateContext context) {
        assert (compactionRevision >= 0L) : compactionRevision;
        KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent(compactionRevision, this.rev);
        this.saveCompactionRevision(compactionRevision, context, false);
        if (this.isInRecoveryState()) {
            this.setCompactionRevision(compactionRevision);
        } else if (compactionRevision > this.plannedUpdateCompactionRevision) {
            this.plannedUpdateCompactionRevision = compactionRevision;
            this.notifyWatchProcessor(new AdvanceSafeTimeEvent(() -> {
                this.setCompactionRevision(compactionRevision);
                this.compactionRevisionUpdateListeners.forEach(listener -> listener.onUpdate(compactionRevision));
            }, context.timestamp));
        } else if (this.areWatchesStarted()) {
            this.watchProcessor.advanceSafeTime(() -> {}, context.timestamp);
        }
    }

    @Override
    public void stopCompaction() {
        this.stopCompaction.set(true);
    }

    @Override
    public byte @Nullable [] nextKey(byte[] key) {
        return RocksUtils.incrementPrefix(key);
    }

    @Override
    public void registerRevisionUpdateListener(RevisionUpdateListener listener) {
        this.watchProcessor.registerRevisionUpdateListener(listener);
    }

    @Override
    public void unregisterRevisionUpdateListener(RevisionUpdateListener listener) {
        this.watchProcessor.unregisterRevisionUpdateListener(listener);
    }

    @Override
    public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long newRevision) {
        return this.watchProcessor.notifyUpdateRevisionListeners(newRevision);
    }

    @Override
    public void registerCompactionRevisionUpdateListener(CompactionRevisionUpdateListener listener) {
        this.compactionRevisionUpdateListeners.add(listener);
    }

    @Override
    public void unregisterCompactionRevisionUpdateListener(CompactionRevisionUpdateListener listener) {
        this.compactionRevisionUpdateListeners.remove(listener);
    }

    @Override
    public void setRecoveryRevisionsListener(@Nullable RecoveryRevisionsListener listener) {
        this.recoveryRevisionListener = listener;
    }

    @Override
    public void removeWatch(WatchListener listener) {
        this.watchProcessor.removeWatch(listener);
    }

    @Override
    public void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev, WatchListener listener) {
        assert (rev > 0L) : rev;
        Predicate<byte[]> rangePredicate = keyTo == null ? k -> KEY_COMPARATOR.compare(keyFrom, (byte[])k) <= 0 : k -> KEY_COMPARATOR.compare(keyFrom, (byte[])k) <= 0 && KEY_COMPARATOR.compare(keyTo, (byte[])k) > 0;
        this.watchProcessor.addWatch(new Watch(rev, listener, rangePredicate));
    }

    @Override
    public void watchExact(Collection<byte[]> keys, long rev, WatchListener listener) {
        assert (rev > 0L) : rev;
        assert (!keys.isEmpty());
        TreeSet<byte[]> keySet = new TreeSet<byte[]>(KEY_COMPARATOR);
        keySet.addAll(keys);
        Predicate<byte[]> inPredicate = keySet::contains;
        this.watchProcessor.addWatch(new Watch(rev, listener, inPredicate));
    }

    @Override
    public void watchExact(byte[] key, long rev, WatchListener listener) {
        assert (rev > 0L) : rev;
        Predicate<byte[]> exactPredicate = k -> KEY_COMPARATOR.compare((byte[])k, key) == 0;
        this.watchProcessor.addWatch(new Watch(rev, listener, exactPredicate));
    }

    protected void notifyRevisionsUpdate() {
        RecoveryRevisionsListener listener = this.recoveryRevisionListener;
        if (listener != null) {
            listener.onUpdate(this.createCurrentRevisions());
        }
    }

    protected Entry doGet(byte[] key, long revUpperBound) {
        assert (revUpperBound >= 0L) : revUpperBound;
        long[] keyRevisions = this.keyRevisionsForOperation(key);
        int maxRevisionIndex = KeyValueStorageUtils.maxRevisionIndex(keyRevisions, revUpperBound);
        if (maxRevisionIndex == -1) {
            CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound, this.compactionRevision);
            return EntryImpl.empty(key);
        }
        long revision = keyRevisions[maxRevisionIndex];
        Value value = this.valueForOperation(key, revision);
        if (value == null) {
            CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound, this.compactionRevision);
            return EntryImpl.empty(key);
        }
        if (!KeyValueStorageUtils.isLastIndex(keyRevisions, maxRevisionIndex) || value.tombstone()) {
            CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound, this.compactionRevision);
        }
        return EntryImpl.toEntry(key, revision, value);
    }

    private List<Entry> doGetAll(List<byte[]> keys, long revUpperBound) {
        assert (!keys.isEmpty());
        assert (revUpperBound >= 0L) : revUpperBound;
        ArrayList<Entry> res = new ArrayList<Entry>(keys.size());
        for (byte[] key : keys) {
            res.add(this.doGet(key, revUpperBound));
        }
        return res;
    }

    @Override
    public void advanceSafeTime(KeyValueUpdateContext context) {
        this.setIndexAndTerm(context.index, context.term);
        if (this.areWatchesStarted()) {
            this.watchProcessor.advanceSafeTime(() -> {}, context.timestamp);
        }
    }

    @Override
    public Revisions revisions() {
        return this.createCurrentRevisions();
    }

    private Revisions createCurrentRevisions() {
        return new Revisions(this.rev, this.compactionRevision);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void notifyWatchProcessor(NotifyWatchProcessorEvent event) {
        Object object = this.watchProcessorMutex;
        synchronized (object) {
            if (this.areWatchesStarted()) {
                event.notify(this.watchProcessor);
            } else {
                boolean added = this.notifyWatchProcessorEventsBeforeStartingWatches.add(event);
                assert (added) : event;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void drainNotifyWatchProcessorEventsBeforeStartingWatches() {
        Object object = this.watchProcessorMutex;
        synchronized (object) {
            assert (!this.areWatchesStarted());
            this.notifyWatchProcessorEventsBeforeStartingWatches.forEach(event -> event.notify(this.watchProcessor));
            this.notifyWatchProcessorEventsBeforeStartingWatches = null;
        }
    }
}

