package org.apache.ignite3.internal.metastorage.server;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.close.ManuallyCloseable;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureManager;
import org.apache.ignite3.internal.failure.FailureType;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.CompactionRevisionUpdateListener;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.EntryEvent;
import org.apache.ignite3.internal.metastorage.RevisionUpdateListener;
import org.apache.ignite3.internal.metastorage.WatchEvent;
import org.apache.ignite3.internal.metastorage.WatchListener;
import org.apache.ignite3.internal.metastorage.server.raft.MetaStorageWriteHandler;
import org.apache.ignite3.internal.metrics.DistributionMetric;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.IgniteUtils;

/* loaded from: input_file:org/apache/ignite3/internal/metastorage/server/WatchProcessor.class */
/**
 * Stores Meta Storage watches and listeners and notifies them about updates.
 *
 * <p>All notifications are appended to a single chain of futures ({@link #notificationFuture}), so a notification
 * for a given call starts only after the stage appended by the previous call has completed. Every stage is executed
 * on {@link #watchExecutor}.
 *
 * <p>If a stage of the chain completes exceptionally, the {@link FailureManager} is notified, at most once per
 * instance (see {@link #notifyFailureHandlerOnFirstFailureInNotificationChain(Throwable)}).
 *
 * <p>NOTE(review): the chain is extended with a non-atomic read-then-write of a {@code volatile} field, so the
 * mutating methods are presumably expected to be called from a single thread - confirm against callers.
 */
public class WatchProcessor implements ManuallyCloseable {
    private static final IgniteLogger LOG = Loggers.forClass(WatchProcessor.class);

    /** Processing duration, in milliseconds, above which a warning is logged. */
    private static final int WATCH_EVENT_PROCESSING_LOG_THRESHOLD_MILLIS = 100;

    /** Maximum number of keys that are printed in the "processing has been too long" warning. */
    private static final int WATCH_EVENT_PROCESSING_LOG_KEYS = 10;

    /** Reads an entry by key and revision. */
    @FunctionalInterface
    public interface EntryReader {
        /**
         * Returns the entry stored under the given key for the given revision.
         *
         * @param bArr Key bytes.
         * @param j Revision to read the entry at.
         * @return Entry for the given key and revision.
         */
        Entry get(byte[] bArr, long j);
    }

    /** Registered watches. */
    private final List<Watch> watches = new CopyOnWriteArrayList<>();

    /** Tail of the notification chain; every new notification is appended to it. */
    private volatile CompletableFuture<Void> notificationFuture = CompletableFutures.nullCompletedFuture();

    /** Used to read the previous value of an updated entry. */
    private final EntryReader entryReader;

    /** Callback invoked after a revision has been processed; set via {@link #setWatchEventHandlingCallback}. */
    private volatile WatchEventHandlingCallback watchEventHandlingCallback;

    /** Executor on which all stages of the notification chain are run. */
    private final ExecutorService watchExecutor;

    /** Listeners notified about every new revision. */
    private final List<RevisionUpdateListener> revisionUpdateListeners = new CopyOnWriteArrayList<>();

    /** Listeners notified about compaction revision updates. */
    private final List<CompactionRevisionUpdateListener> compactionRevisionUpdateListeners = new CopyOnWriteArrayList<>();

    private final FailureManager failureManager;

    /** Becomes {@code true} once the failure manager has been notified about a failure of the notification chain. */
    private final AtomicBoolean firedFailureOnChain = new AtomicBoolean(false);

    /**
     * Creates a new instance together with its fixed pool of 4 watch executor threads.
     *
     * @param str Passed as the first argument to the thread factory of the watch executor (presumably the node
     *         name - verify against {@code IgniteThreadFactory.create}).
     * @param entryReader Function used to read an entry by key and revision.
     * @param failureManager Failure manager that is notified when the notification chain fails.
     */
    public WatchProcessor(String str, EntryReader entryReader, FailureManager failureManager) {
        this.entryReader = entryReader;
        this.watchExecutor = Executors.newFixedThreadPool(
                4,
                IgniteThreadFactory.create(str, "metastorage-watch-executor", LOG, ThreadOperation.NOTHING_ALLOWED)
        );
        this.failureManager = failureManager;
    }

    /**
     * Registers a watch.
     *
     * @param watch Watch to add.
     */
    public void addWatch(Watch watch) {
        watches.add(watch);
    }

    /**
     * Removes every watch whose listener is the given instance (compared by identity).
     *
     * <p>NOTE: the decompiler reports that this method was originally package-private.
     *
     * @param watchListener Listener whose watches must be removed.
     */
    public void removeWatch(WatchListener watchListener) {
        watches.removeIf(watch -> watch.listener() == watchListener);
    }

    /**
     * Returns the minimal start revision among the registered watches.
     *
     * @return Minimal start revision, or an empty optional if there are no watches.
     */
    public OptionalLong minWatchRevision() {
        return watches.stream().mapToLong(Watch::startRevision).min();
    }

    /**
     * Sets the callback that is invoked after watch events have been handled. Expected to be called only once: with
     * assertions enabled, a second call fails.
     *
     * @param watchEventHandlingCallback Callback to set.
     */
    public void setWatchEventHandlingCallback(WatchEventHandlingCallback watchEventHandlingCallback) {
        assert this.watchEventHandlingCallback == null;

        this.watchEventHandlingCallback = watchEventHandlingCallback;
    }

    /**
     * Appends to the notification chain a stage that notifies matching watches and revision update listeners about
     * the given entries and then invokes the watch event handling callback.
     *
     * <p>Entries whose keys are longer than, and start with, the idempotent command prefix are not passed to watches.
     *
     * @param list Entries updated under a single revision. The revision is taken from the first element, so an empty
     *         list makes the returned future (and hence the chain) complete exceptionally.
     * @param hybridTimestamp Timestamp associated with the update, must not be {@code null}.
     * @return Future that completes when the notification for this revision has been processed; it also becomes the
     *         new tail of the notification chain.
     */
    public CompletableFuture<Void> notifyWatches(List<Entry> list, HybridTimestamp hybridTimestamp) {
        assert hybridTimestamp != null;

        CompletableFuture<Void> newFuture = notificationFuture.thenComposeAsync(ignored -> {
            long revision = list.get(0).revision();

            List<Entry> filteredEntries = list.stream()
                    .filter(entry -> !isIdempotentCommandKey(entry.key()))
                    .collect(Collectors.toList());

            return collectWatchesAndEvents(filteredEntries, revision).thenComposeAsync(watchesAndEvents -> {
                long startNanos = System.nanoTime();

                CompletableFuture<Void> revisionFuture = CompletableFuture
                        .allOf(
                                notifyWatches(watchesAndEvents, revision, hybridTimestamp),
                                notifyUpdateRevisionListeners(revision)
                        )
                        .thenRunAsync(() -> invokeOnRevisionCallback(revision, hybridTimestamp), watchExecutor);

                // The handler is attached on the side: the chain continues from revisionFuture itself.
                revisionFuture.whenComplete((unused, th) -> {
                    maybeLogLongProcessing(filteredEntries, startNanos);

                    if (th != null) {
                        notifyFailureHandlerOnFirstFailureInNotificationChain(th);
                    }
                });

                return revisionFuture;
            }, watchExecutor);
        }, watchExecutor);

        notificationFuture = newFuture;

        return newFuture;
    }

    /**
     * Tells whether the key is strictly longer than the idempotent command prefix and starts with it. A key that is
     * exactly equal to the prefix is not considered an idempotent command key.
     */
    private static boolean isIdempotentCommandKey(byte[] key) {
        byte[] prefix = MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES;

        return key.length > prefix.length
                && ByteBuffer.wrap(key, 0, prefix.length).equals(ByteBuffer.wrap(prefix));
    }

    /**
     * Invokes {@code onUpdate} on every given watch with the events collected for it.
     *
     * <p>If a watch fails (either by throwing or by returning a future that completes exceptionally), its
     * {@code onError} is invoked and the failure is reported to the failure manager. A {@link NodeStoppingException}
     * delivered through the returned future is not reported by this method.
     *
     * @return Future that completes when all watches have completed; it is failed if any of them failed.
     */
    private CompletableFuture<Void> notifyWatches(List<WatchAndEvents> watchesAndEvents, long revision, HybridTimestamp time) {
        if (watchesAndEvents.isEmpty()) {
            return CompletableFutures.nullCompletedFuture();
        }

        CompletableFuture<?>[] futures = new CompletableFuture<?>[watchesAndEvents.size()];

        for (int i = 0; i < watchesAndEvents.size(); i++) {
            WatchAndEvents watchAndEvents = watchesAndEvents.get(i);

            CompletableFuture<Void> future;

            try {
                future = watchAndEvents.watch
                        .onUpdate(new WatchEvent(watchAndEvents.events, revision, time))
                        .whenComplete((unused, th) -> {
                            if (th == null) {
                                return;
                            }

                            // Unwrap one level of CompletionException to get to the actual cause.
                            Throwable cause = th instanceof CompletionException ? th.getCause() : th;

                            if (!(cause instanceof NodeStoppingException)) {
                                notifyFailureHandlerOnFirstFailureInNotificationChain(cause);
                            }

                            watchAndEvents.watch.onError(cause);
                        });
            } catch (Throwable th) {
                // onUpdate() threw synchronously instead of returning a failed future.
                watchAndEvents.watch.onError(th);

                future = CompletableFuture.failedFuture(th);

                notifyFailureHandlerOnFirstFailureInNotificationChain(th);
            }

            futures[i] = future;
        }

        return CompletableFuture.allOf(futures);
    }

    /**
     * Logs a warning if the processing that started at {@code startNanos} took longer than
     * {@link #WATCH_EVENT_PROCESSING_LOG_THRESHOLD_MILLIS}. At most {@link #WATCH_EVENT_PROCESSING_LOG_KEYS} keys are
     * printed.
     */
    private static void maybeLogLongProcessing(List<Entry> entries, long startNanos) {
        long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);

        if (durationMillis <= WATCH_EVENT_PROCESSING_LOG_THRESHOLD_MILLIS) {
            return;
        }

        // NOTE(review): the separator constant comes from the metrics module, probably substituted by the
        // decompiler for an equal string literal - confirm.
        String keysHead = entries.stream()
                .limit(WATCH_EVENT_PROCESSING_LOG_KEYS)
                .map(entry -> new String(entry.key(), StandardCharsets.UTF_8))
                .collect(Collectors.joining(DistributionMetric.BUCKET_DIVIDER));

        String keysTail = entries.size() > WATCH_EVENT_PROCESSING_LOG_KEYS ? ", ..." : "";

        LOG.warn(
                "Watch event processing has been too long [duration={}, keys=[{}{}]]",
                durationMillis,
                keysHead,
                keysTail
        );
    }

    /**
     * For every registered watch collects the events (previous entry + new entry) for the entries the watch matches.
     * Watches that match no entry are not included in the result. The previous entry is read at {@code revision - 1}.
     *
     * @return Future with the matching watches and their events; completed immediately with an empty list if there
     *         are no watches, otherwise computed on the watch executor.
     */
    private CompletableFuture<List<WatchAndEvents>> collectWatchesAndEvents(List<Entry> updatedEntries, long revision) {
        if (watches.isEmpty()) {
            return CompletableFutures.emptyListCompletedFuture();
        }

        return CompletableFuture.supplyAsync(() -> {
            List<WatchAndEvents> result = new ArrayList<>();

            for (Watch watch : watches) {
                // Allocated lazily: most watches are not expected to match.
                List<EntryEvent> events = List.of();

                for (Entry newEntry : updatedEntries) {
                    byte[] key = newEntry.key();

                    assert newEntry.revision() == revision;

                    if (watch.matches(key, revision)) {
                        Entry oldEntry = entryReader.get(key, revision - 1);

                        if (events.isEmpty()) {
                            events = new ArrayList<>();
                        }

                        events.add(new EntryEvent(oldEntry, newEntry));
                    }
                }

                if (!events.isEmpty()) {
                    result.add(new WatchAndEvents(watch, events));
                }
            }

            return result;
        }, watchExecutor);
    }

    /** Reports the new safe time and then the applied revision to the watch event handling callback. */
    private void invokeOnRevisionCallback(long revision, HybridTimestamp time) {
        watchEventHandlingCallback.onSafeTimeAdvanced(time);

        watchEventHandlingCallback.onRevisionApplied(revision);
    }

    /**
     * Appends to the notification chain a stage that reports the given safe time to the watch event handling
     * callback.
     *
     * @param hybridTimestamp New safe time, must not be {@code null}.
     */
    public void advanceSafeTime(HybridTimestamp hybridTimestamp) {
        assert hybridTimestamp != null;

        notificationFuture = notificationFuture
                .thenRunAsync(() -> watchEventHandlingCallback.onSafeTimeAdvanced(hybridTimestamp), watchExecutor)
                .whenComplete((unused, th) -> {
                    if (th != null) {
                        notifyFailureHandlerOnFirstFailureInNotificationChain(th);
                    }
                });
    }

    /**
     * Notifies the failure manager about a failure of the notification chain. Only the first invocation per instance
     * has an effect: later stages of a failed chain fail with the same error, so repeated reports are suppressed.
     */
    private void notifyFailureHandlerOnFirstFailureInNotificationChain(Throwable th) {
        if (firedFailureOnChain.compareAndSet(false, true)) {
            LOG.info("Notification chain encountered an error, so no notifications will be ever fired for subsequent revisions until a restart. Notifying the FailureManager");

            failureManager.process(new FailureContext(FailureType.CRITICAL_ERROR, th));
        }
    }

    /** Cancels the tail of the notification chain and shuts down the watch executor (10 seconds timeout is passed). */
    @Override
    public void close() {
        notificationFuture.cancel(true);

        IgniteUtils.shutdownAndAwaitTermination(watchExecutor, 10L, TimeUnit.SECONDS);
    }

    /**
     * Registers a listener that is notified about every new revision.
     *
     * <p>NOTE: the decompiler reports that this method was originally package-private.
     *
     * @param revisionUpdateListener Listener to register.
     */
    public void registerRevisionUpdateListener(RevisionUpdateListener revisionUpdateListener) {
        revisionUpdateListeners.add(revisionUpdateListener);
    }

    /**
     * Unregisters a revision update listener.
     *
     * <p>NOTE: the decompiler reports that this method was originally package-private.
     *
     * @param revisionUpdateListener Listener to unregister.
     */
    public void unregisterRevisionUpdateListener(RevisionUpdateListener revisionUpdateListener) {
        revisionUpdateListeners.remove(revisionUpdateListener);
    }

    /**
     * Registers a listener that is notified about compaction revision updates.
     *
     * <p>NOTE: the decompiler reports that this method was originally package-private.
     *
     * @param compactionRevisionUpdateListener Listener to register.
     */
    public void registerCompactionRevisionUpdateListener(CompactionRevisionUpdateListener compactionRevisionUpdateListener) {
        compactionRevisionUpdateListeners.add(compactionRevisionUpdateListener);
    }

    /**
     * Unregisters a compaction revision update listener.
     *
     * <p>NOTE: the decompiler reports that this method was originally package-private.
     *
     * @param compactionRevisionUpdateListener Listener to unregister.
     */
    public void unregisterCompactionRevisionUpdateListener(CompactionRevisionUpdateListener compactionRevisionUpdateListener) {
        compactionRevisionUpdateListeners.remove(compactionRevisionUpdateListener);
    }

    /**
     * Invokes {@code onUpdated} on every registered revision update listener.
     *
     * <p>NOTE: the decompiler reports that this method was originally package-private.
     *
     * @param j New revision.
     * @return Future that completes when the futures returned by all listeners have completed; an already completed
     *         future if there are no listeners.
     */
    public CompletableFuture<Void> notifyUpdateRevisionListeners(long j) {
        // Allocated lazily to avoid garbage when there are no listeners.
        List<CompletableFuture<?>> futures = List.of();

        for (RevisionUpdateListener listener : revisionUpdateListeners) {
            if (futures.isEmpty()) {
                futures = new ArrayList<>();
            }

            futures.add(listener.onUpdated(j));
        }

        if (futures.isEmpty()) {
            return CompletableFutures.nullCompletedFuture();
        }

        return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
    }

    /**
     * Appends to the notification chain a stage that notifies compaction revision update listeners and then reports
     * the given safe time to the watch event handling callback.
     *
     * <p>A failure is reported through the same once-only path as for the other stages of the chain, so that a chain
     * that has already failed does not re-report the same error on every subsequent call.
     *
     * <p>NOTE: the decompiler reports that this method was originally package-private.
     *
     * @param j New compaction revision.
     * @param hybridTimestamp Safe time to report after the listeners have been notified.
     */
    public void updateCompactionRevision(long j, HybridTimestamp hybridTimestamp) {
        notificationFuture = notificationFuture
                .thenRunAsync(() -> {
                    compactionRevisionUpdateListeners.forEach(listener -> listener.onUpdate(j));

                    watchEventHandlingCallback.onSafeTimeAdvanced(hybridTimestamp);
                }, watchExecutor)
                .whenComplete((unused, th) -> {
                    if (th != null) {
                        notifyFailureHandlerOnFirstFailureInNotificationChain(th);
                    }
                });
    }
}
