/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.snapshots.coordinator;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.EntryEvent;
import org.apache.ignite3.internal.metastorage.WatchEvent;
import org.apache.ignite3.internal.metastorage.timebag.TimeBag;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.Cursor;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.gridgain.internal.snapshots.SnapshotManager;
import org.gridgain.internal.snapshots.SnapshotManagerContext;
import org.gridgain.internal.snapshots.coordinator.LocalSnapshotStateListener;

/**
 * Base {@link Flow.Subscriber} of meta storage {@link Entry} items used during Snapshot Coordinator failover.
 *
 * <p>This class fixes the subscription lifecycle callbacks: it requests an unbounded number of items as soon as it
 * is subscribed, logs any error signalled by the publisher and ignores completion. {@code onNext} is not
 * implemented here and must be provided by concrete subclasses.
 */
abstract class RecoverySubscriber
implements Flow.Subscriber<Entry> {
    // NOTE(review): the logger is created for SnapshotManager rather than for this class - presumably so that
    // failover messages are reported under the manager's category; confirm this is intentional.
    private static final IgniteLogger LOG = Loggers.forClass(SnapshotManager.class);

    /** Shared snapshot manager context; this class reads its busy lock, meta storage manager and thread pool. */
    protected final SnapshotManagerContext context;

    /**
     * Creates a subscriber bound to the given context.
     *
     * @param context Snapshot manager context used by this subscriber and its subclasses.
     */
    RecoverySubscriber(SnapshotManagerContext context) {
        this.context = context;
    }

    /**
     * Requests {@link Long#MAX_VALUE} items, i.e. effectively unbounded demand.
     *
     * @param subscription Subscription handed over by the publisher.
     */
    @Override
    public final void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    /**
     * Logs the failure; the error is not rethrown or otherwise propagated from here.
     *
     * @param throwable Error signalled by the publisher.
     */
    @Override
    public final void onError(Throwable throwable) {
        LOG.error("Error when performing Snapshot Coordinator failover.", throwable);
    }

    /** Does nothing: completion of the stream requires no action in this base class. */
    @Override
    public final void onComplete() {
    }

    /**
     * Replays every locally stored entry under {@code prefix} to the given listener as a surrogate watch event.
     *
     * <p>The entries are read at the locally applied revision. For each of them a {@link WatchEvent} holding a
     * single {@link EntryEvent} is built and passed to {@link LocalSnapshotStateListener#onUpdate}. The calls are
     * chained one after another on {@code context.threadPool()}, so a listener invocation is only started once the
     * future returned for the previous entry has completed.
     *
     * @param localStateListener Listener that receives one surrogate event per entry.
     * @param prefix Key prefix selecting the entries to replay.
     * @return Future of the last stage of the chain (an already completed future when no entry matches); a failed
     *     stage fails the returned future because the stages are composed.
     */
    final CompletableFuture<Void> replayEvents(LocalSnapshotStateListener localStateListener, ByteArray prefix) {
        var busyLock = this.context.busyLock();

        // NOTE(review): the exact busy-lock semantics (e.g. what happens when the component is stopping) are
        // defined by IgniteUtils.inBusyLockAsync and are not visible here.
        return IgniteUtils.inBusyLockAsync(busyLock, () -> {
            var metaStorage = this.context.metaStorageManager();
            long appliedRevision = metaStorage.appliedRevision();
            HybridTimestamp appliedRevisionTimestamp = metaStorage.timestampByRevisionLocally(appliedRevision);

            // Typed accumulator: a raw CompletionStage cannot be returned where CompletableFuture<Void> is
            // required and would also switch off generic checks on the composition below.
            CompletableFuture<Void> replayChain = CompletableFutures.nullCompletedFuture();

            // The cursor is only needed while the chain is being built: every stage captures its own entry,
            // so closing the cursor before the stages run is fine.
            try (Cursor<Entry> entries = metaStorage.prefixLocally(prefix, appliedRevision)) {
                for (Entry entry : entries) {
                    replayChain = replayChain.thenComposeAsync(
                            ignored -> IgniteUtils.inBusyLockAsync(busyLock, () -> {
                                // First constructor argument is null: only the current entry is supplied.
                                EntryEvent entryEvent = new EntryEvent(null, entry);
                                WatchEvent watchEvent = new WatchEvent(
                                        List.of(entryEvent),
                                        appliedRevision,
                                        appliedRevisionTimestamp,
                                        TimeBag.createTimeBag(false, false));

                                return localStateListener.onUpdate(watchEvent);
                            }),
                            (Executor)this.context.threadPool());
                }
            }

            return replayChain;
        });
    }
}

