package org.gridgain.internal.snapshots.coordinator;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.EntryEvent;
import org.apache.ignite3.internal.metastorage.WatchEvent;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.Cursor;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.gridgain.internal.snapshots.SnapshotManager;
import org.gridgain.internal.snapshots.SnapshotManagerContext;

/* loaded from: input_file:org/gridgain/internal/snapshots/coordinator/RecoverySubscriber.class */
abstract class RecoverySubscriber implements Flow.Subscriber<Entry> {
    private static final IgniteLogger LOG = Loggers.forClass(SnapshotManager.class);
    protected final SnapshotManagerContext context;

    /* JADX INFO: Access modifiers changed from: package-private */
    public RecoverySubscriber(SnapshotManagerContext snapshotManagerContext) {
        this.context = snapshotManagerContext;
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public final void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public final void onError(Throwable th) {
        LOG.error("Error when performing Snapshot Coordinator failover.", th);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public final void onComplete() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final CompletableFuture<Void> replayEvents(LocalSnapshotStateListener localSnapshotStateListener, ByteArray byteArray) {
        return IgniteUtils.inBusyLockAsync(this.context.busyLock(), () -> {
            long appliedRevision = this.context.metaStorageManager().appliedRevision();
            HybridTimestamp timestampByRevisionLocally = this.context.metaStorageManager().timestampByRevisionLocally(appliedRevision);
            CompletableFuture nullCompletedFuture = CompletableFutures.nullCompletedFuture();
            Cursor<Entry> prefixLocally = this.context.metaStorageManager().prefixLocally(byteArray, appliedRevision);
            try {
                for (Entry entry : prefixLocally) {
                    nullCompletedFuture = nullCompletedFuture.thenComposeAsync(r13 -> {
                        return IgniteUtils.inBusyLockAsync(this.context.busyLock(), () -> {
                            return localSnapshotStateListener.onUpdate(new WatchEvent(List.of(new EntryEvent(null, entry)), appliedRevision, timestampByRevisionLocally));
                        });
                    }, (Executor) this.context.threadPool());
                }
                if (prefixLocally != null) {
                    prefixLocally.close();
                }
                return nullCompletedFuture;
            } catch (Throwable th) {
                if (prefixLocally != null) {
                    try {
                        prefixLocally.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        });
    }
}
