/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.metastorage.server;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.close.ManuallyCloseable;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.failure.FailureType;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.IgniteSystemProperties;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.EntryEvent;
import org.apache.ignite3.internal.metastorage.RevisionUpdateListener;
import org.apache.ignite3.internal.metastorage.WatchEvent;
import org.apache.ignite3.internal.metastorage.WatchListener;
import org.apache.ignite3.internal.metastorage.server.NotificationEnqueuedListener;
import org.apache.ignite3.internal.metastorage.server.Watch;
import org.apache.ignite3.internal.metastorage.server.WatchAndEvents;
import org.apache.ignite3.internal.metastorage.server.WatchEventHandlingCallback;
import org.apache.ignite3.internal.metastorage.server.raft.MetaStorageWriteHandler;
import org.apache.ignite3.internal.metastorage.timebag.TimeBag;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteBusyLock;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.jetbrains.annotations.VisibleForTesting;

public class WatchProcessor
implements ManuallyCloseable {
    private final boolean longHandlingLoggingEnabled = IgniteSystemProperties.getBoolean("IGNITE_LONG_HANDLING_LOGGING_ENABLED", false);
    private static final IgniteLogger LOG = Loggers.forClass(WatchProcessor.class);
    private static final int WATCH_EVENT_PROCESSING_LOG_THRESHOLD_MILLIS = 100;
    private static final int WATCH_EVENT_PROCESSING_LOG_KEYS = 10;
    private final List<Watch> watches = new CopyOnWriteArrayList<Watch>();
    private CompletableFuture<Void> notificationFuture = CompletableFutures.nullCompletedFuture();
    private final Object notificationFutureMutex = new Object();
    private final List<NotificationEnqueuedListener> notificationEnqueuedListeners = new CopyOnWriteArrayList<NotificationEnqueuedListener>();
    private final EntryReader entryReader;
    private volatile WatchEventHandlingCallback watchEventHandlingCallback;
    private final ExecutorService watchExecutor;
    private final List<RevisionUpdateListener> revisionUpdateListeners = new CopyOnWriteArrayList<RevisionUpdateListener>();
    private final FailureProcessor failureProcessor;
    private final AtomicBoolean firedFailureOnChain = new AtomicBoolean(false);
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    public WatchProcessor(String nodeName, EntryReader entryReader, FailureProcessor failureProcessor) {
        this.entryReader = entryReader;
        IgniteThreadFactory threadFactory = IgniteThreadFactory.create(nodeName, "metastorage-watch-executor", LOG, ThreadOperation.NOTHING_ALLOWED);
        this.watchExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory, new ThreadPoolExecutor.DiscardPolicy());
        this.failureProcessor = failureProcessor;
    }

    public void addWatch(Watch watch) {
        this.watches.add(watch);
    }

    void removeWatch(WatchListener listener) {
        this.watches.removeIf(watch -> watch.listener() == listener);
    }

    public OptionalLong minWatchRevision() {
        return this.watches.stream().mapToLong(Watch::startRevision).min();
    }

    public void setWatchEventHandlingCallback(WatchEventHandlingCallback callback) {
        assert (this.watchEventHandlingCallback == null);
        this.watchEventHandlingCallback = callback;
    }

    public void registerNotificationEnqueuedListener(NotificationEnqueuedListener listener) {
        this.notificationEnqueuedListeners.add(listener);
    }

    private <T> CompletableFuture<T> inBusyLockAsync(Supplier<CompletableFuture<T>> fn) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, fn);
    }

    private void inBusyLock(Runnable fn) {
        IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, fn);
    }

    private void inBusyLockSafe(Runnable fn) {
        IgniteUtils.inBusyLockSafe(this.busyLock, fn);
    }

    public CompletableFuture<Void> notifyWatches(long newRevision, List<Entry> updatedEntries, HybridTimestamp time) {
        return this.inBusyLockAsync(() -> this.notifyWatchesInternal(newRevision, updatedEntries, time));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    CompletableFuture<Void> enqueue(Supplier<CompletableFuture<Void>> asyncAction, Consumer<CompletableFuture<Void>> afterEnqueuing, Supplier<String> additionalInfoSupplier) {
        Object object = this.notificationFutureMutex;
        synchronized (object) {
            this.notificationFuture = ((CompletableFuture)this.notificationFuture.thenComposeAsync(v -> this.inBusyLockAsync(asyncAction), (Executor)this.watchExecutor)).whenComplete((unused, e) -> {
                if (e != null) {
                    this.notifyFailureHandlerOnFirstFailureInNotificationChain((Throwable)e, additionalInfoSupplier);
                }
            });
            afterEnqueuing.accept(this.notificationFuture);
            return this.notificationFuture;
        }
    }

    private CompletableFuture<Void> notifyWatchesInternal(long newRevision, List<Entry> updatedEntries, HybridTimestamp time) {
        assert (time != null);
        List filteredUpdatedEntries = updatedEntries.isEmpty() ? Collections.emptyList() : updatedEntries.stream().filter(WatchProcessor::isNotIdempotentCacheCommand).collect(Collectors.toList());
        return this.enqueue(() -> {
            List<WatchAndEvents> watchAndEvents = this.collectWatchesAndEvents(filteredUpdatedEntries, newRevision);
            long startTimeNanos = this.longHandlingLoggingEnabled ? System.nanoTime() : 0L;
            CompletableFuture<Void> notifyWatchesFuture = WatchProcessor.performWatchesNotifications(watchAndEvents, newRevision, time);
            CompletableFuture<Void> notifyUpdateRevisionFuture = this.notifyUpdateRevisionListeners(newRevision);
            CompletionStage newNotificationFuture = CompletableFuture.allOf(notifyWatchesFuture, notifyUpdateRevisionFuture).thenRunAsync(() -> this.inBusyLock(() -> this.invokeOnRevisionCallback(newRevision, time)), this.watchExecutor);
            ((CompletableFuture)newNotificationFuture).whenComplete((u, e) -> this.maybeLogLongProcessing(filteredUpdatedEntries, watchAndEvents, startTimeNanos));
            return newNotificationFuture;
        }, newNotificationFuture -> this.invokeNotificationFutureListeners((CompletableFuture<Void>)newNotificationFuture, filteredUpdatedEntries, time), WatchProcessor.updatedEntriesKeysInfo(updatedEntries));
    }

    private void invokeNotificationFutureListeners(CompletableFuture<Void> newNotificationFuture, List<Entry> filteredUpdatedEntries, HybridTimestamp time) {
        for (NotificationEnqueuedListener listener : this.notificationEnqueuedListeners) {
            listener.onEnqueued(newNotificationFuture, filteredUpdatedEntries, time);
        }
    }

    private static Supplier<String> updatedEntriesKeysInfo(List<Entry> updatedEntries) {
        return () -> updatedEntries.stream().map(entry -> new String(entry.key(), StandardCharsets.UTF_8)).collect(Collectors.joining(",", "Keys of updated entries: ", ""));
    }

    private static CompletableFuture<Void> performWatchesNotifications(List<WatchAndEvents> watchAndEventsList, long revision, HybridTimestamp time) {
        if (watchAndEventsList.isEmpty()) {
            return CompletableFutures.nullCompletedFuture();
        }
        CompletableFuture[] notifyWatchFutures = new CompletableFuture[watchAndEventsList.size()];
        for (int i = 0; i < watchAndEventsList.size(); ++i) {
            CompletionStage<Object> notifyWatchFuture;
            WatchAndEvents watchAndEvents = watchAndEventsList.get(i);
            try {
                WatchEvent event = new WatchEvent(watchAndEvents.events, revision, time, watchAndEvents.timeBag);
                event.timeBag().start();
                notifyWatchFuture = watchAndEvents.watch.onUpdate(event);
                event.timeBag().finishGlobalStage("Sync notification");
                notifyWatchFuture = notifyWatchFuture.whenComplete((unused, e) -> event.timeBag().finishGlobalStage("Async notification"));
            }
            catch (Throwable throwable) {
                notifyWatchFuture = CompletableFuture.failedFuture(throwable);
            }
            notifyWatchFutures[i] = notifyWatchFuture;
        }
        return CompletableFuture.allOf(notifyWatchFutures);
    }

    private void maybeLogLongProcessing(List<Entry> updatedEntries, List<WatchAndEvents> watchAndEvents, long startTimeNanos) {
        if (!this.longHandlingLoggingEnabled) {
            return;
        }
        long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
        if (durationMillis > 100L) {
            String keysHead = updatedEntries.stream().limit(10L).map(entry -> new String(entry.key(), StandardCharsets.UTF_8)).collect(Collectors.joining(", "));
            String keysTail = updatedEntries.size() > 10 ? ", ..." : "";
            LOG.warn("Watch event processing has been too long [duration={}, keys=[{}{}]]", durationMillis, keysHead, keysTail);
            String timingsHead = watchAndEvents.stream().limit(10L).map(watchAndEventsItem -> {
                String listenerName = watchAndEventsItem.watch.listener().getClass().getName();
                String stages = watchAndEventsItem.timeBag.stagesTimings().stream().collect(Collectors.joining(",", "[", "]"));
                return "lsnr=" + listenerName + ", stages=" + stages;
            }).collect(Collectors.joining(", "));
            LOG.warn("Watch event processing timings [{}{}]", timingsHead, keysTail);
        }
    }

    private List<WatchAndEvents> collectWatchesAndEvents(List<Entry> updatedEntries, long revision) {
        if (this.watches.isEmpty() || updatedEntries.isEmpty()) {
            return List.of();
        }
        ArrayList<WatchAndEvents> watchAndEvents = new ArrayList<WatchAndEvents>();
        for (Watch watch : this.watches) {
            List<EntryEvent> events = List.of();
            for (Entry newEntry : updatedEntries) {
                byte[] newKey = newEntry.key();
                assert (newEntry.revision() == revision);
                if (!watch.matches(newKey, revision)) continue;
                Entry oldEntry = this.entryReader.get(newKey, revision - 1L);
                if (events.isEmpty()) {
                    events = new ArrayList<EntryEvent>();
                }
                events.add(new EntryEvent(oldEntry, newEntry));
            }
            if (events.isEmpty()) continue;
            watchAndEvents.add(new WatchAndEvents(watch, events, TimeBag.createTimeBag(this.longHandlingLoggingEnabled, false)));
        }
        return watchAndEvents;
    }

    private void invokeOnRevisionCallback(long revision, HybridTimestamp time) {
        this.watchEventHandlingCallback.onSafeTimeAdvanced(time);
        this.watchEventHandlingCallback.onRevisionApplied(revision);
    }

    public void advanceSafeTime(Runnable callback, HybridTimestamp time) {
        this.inBusyLockSafe(() -> this.advanceSafeTimeInternal(callback, time));
    }

    private void advanceSafeTimeInternal(Runnable callback, HybridTimestamp time) {
        assert (time != null);
        this.enqueue(() -> {
            callback.run();
            this.watchEventHandlingCallback.onSafeTimeAdvanced(time);
            return CompletableFutures.nullCompletedFuture();
        }, newNotificationFuture -> this.invokeNotificationFutureListeners((CompletableFuture<Void>)newNotificationFuture, List.of(), time), () -> "<nothing>");
    }

    private void notifyFailureHandlerOnFirstFailureInNotificationChain(Throwable e, Supplier<String> additionalInfoSupplier) {
        if (this.firedFailureOnChain.compareAndSet(false, true)) {
            boolean nodeStopping = ExceptionUtils.hasCause(e, NodeStoppingException.class);
            if (!nodeStopping) {
                LOG.error("Notification chain encountered an error, so no notifications will be ever fired for subsequent revisions until a restart. Notifying the FailureManager. Additional info: '{}'", additionalInfoSupplier.get());
                this.failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, e));
            } else {
                LOG.info("Notification chain encountered a NodeStoppingException, so no notifications will be ever fired for subsequent revisions until a restart.", new Object[0]);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        if (!this.stopped.compareAndSet(false, true)) {
            return;
        }
        this.busyLock.block();
        Object object = this.notificationFutureMutex;
        synchronized (object) {
            this.notificationFuture.completeExceptionally(new NodeStoppingException());
        }
        IgniteUtils.shutdownAndAwaitTermination(this.watchExecutor, 10L, TimeUnit.SECONDS);
    }

    void registerRevisionUpdateListener(RevisionUpdateListener listener) {
        this.revisionUpdateListeners.add(listener);
    }

    void unregisterRevisionUpdateListener(RevisionUpdateListener listener) {
        this.revisionUpdateListeners.remove(listener);
    }

    CompletableFuture<Void> notifyUpdateRevisionListeners(long newRevision) {
        List<CompletableFuture<?>> futures = List.of();
        for (RevisionUpdateListener listener : this.revisionUpdateListeners) {
            if (futures.isEmpty()) {
                futures = new ArrayList();
            }
            futures.add(listener.onUpdated(newRevision));
        }
        return futures.isEmpty() ? CompletableFutures.nullCompletedFuture() : CompletableFuture.allOf((CompletableFuture[])futures.toArray(CompletableFuture[]::new));
    }

    private static boolean isNotIdempotentCacheCommand(Entry entry) {
        int prefixLength = MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES.length;
        if (entry.key().length <= prefixLength) {
            return true;
        }
        return !Arrays.equals(entry.key(), 0, prefixLength, MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES, 0, prefixLength);
    }

    @FunctionalInterface
    public static interface EntryReader {
        public Entry get(byte[] var1, long var2);
    }
}

