package org.gridgain.internal.pitr;

import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.gridgain.internal.pitr.metastorage.PitrGlobalState;
import org.gridgain.internal.pitr.metastorage.PitrLocalStateWatch;
import org.gridgain.internal.pitr.metastorage.PitrMetaStorageKeys;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/gridgain/internal/pitr/CoordinatorFailoverSubscriber.class */
/**
 * Subscriber over PITR global state entries that resumes coordination of operations which the local
 * {@link CoordinatorState} is not already tracking.
 *
 * <p>For every received entry the global state is deserialized and, depending on its status:
 * <ul>
 *     <li>{@code COMPLETED} / {@code FAILED} - nothing is done;</li>
 *     <li>{@code PREPARED} - a new {@link PitrOperation} is started for the operation;</li>
 *     <li>{@code STARTED} - a local state watch is registered and the local state entries already present in the
 *     metastorage are replayed into it.</li>
 * </ul>
 *
 * <p>Before either recovery action is executed, the node names recorded in the global state are compared with the current
 * logical topology; if some of them are missing, the operation is failed instead of being recovered.
 */
public class CoordinatorFailoverSubscriber implements Flow.Subscriber<Entry> {
    private static final IgniteLogger LOG = Loggers.forClass(CoordinatorFailoverSubscriber.class);

    private final PitrManagerContext context;

    private final CoordinatorState coordinatorState;

    /**
     * Creates the subscriber.
     *
     * @param pitrManagerContext Context giving access to the services used during recovery.
     * @param coordinatorState Coordinator state used to detect operations that are already being handled.
     */
    public CoordinatorFailoverSubscriber(PitrManagerContext pitrManagerContext, CoordinatorState coordinatorState) {
        this.context = pitrManagerContext;
        this.coordinatorState = coordinatorState;
    }

    /** Requests an unbounded number of entries: every published global state must be inspected. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    /** Logs the failure of the upstream publisher. */
    @Override
    public void onError(Throwable th) {
        LOG.error("Error while performing PITR Coordinator failover", th);
    }

    /** Nothing to do on completion. */
    @Override
    public void onComplete() {
        // No-op.
    }

    /**
     * Inspects a single global state entry and triggers recovery if the operation is unfinished and not tracked locally.
     *
     * @param entry Metastorage entry holding a serialized {@link PitrGlobalState}.
     */
    @Override
    public void onNext(Entry entry) {
        byte[] serializedState = entry.value();

        // An entry may carry no value; deserializing null would throw out of onNext,
        // which a Flow.Subscriber is expected never to do.
        if (serializedState == null) {
            LOG.info("Skipping PITR global state entry without a value");

            return;
        }

        PitrGlobalState globalState = (PitrGlobalState) ByteUtils.fromBytes(serializedState);

        if (this.coordinatorState.hasOngoingOperation(globalState.operationId())) {
            return;
        }

        switch (globalState.status()) {
            case COMPLETED:
            case FAILED:
                // Terminal states: nothing to recover.
                return;

            case PREPARED:
                performRecovery(globalState, watch -> {
                    int catalogVersion = this.context.catalogManager().activeCatalogVersion(globalState.timestampLong());

                    return new PitrOperation(this.context, this.coordinatorState)
                            .start(watch, globalState, this.context.catalogManager().catalog(catalogVersion));
                });

                return;

            case STARTED:
                performRecovery(globalState, watch -> replayEvents(globalState, watch));

                return;

            default:
                LOG.error("Unexpected PITR status: {}", globalState.status());
        }
    }

    /**
     * Runs the given recovery action if every node recorded in the global state is still in the logical topology,
     * otherwise reports the operation as failed through the local state watch.
     *
     * <p>The method is fire-and-forget: the outcome is only logged, and a failure of the chain is additionally reported
     * via {@link PitrLocalStateWatch#onFail}.
     *
     * @param pitrGlobalState Global state of the operation being recovered.
     * @param function Recovery action; receives the freshly created local state watch.
     */
    private void performRecovery(PitrGlobalState pitrGlobalState, Function<PitrLocalStateWatch, CompletableFuture<Void>> function) {
        LOG.info("Starting PITR recovery for operation {}, current state: {}", pitrGlobalState.operationId(), pitrGlobalState);

        PitrLocalStateWatch localStateWatch = new PitrLocalStateWatch(this.context, this.coordinatorState, pitrGlobalState);

        this.context.logicalTopologyService().logicalTopologyOnLeader()
                .thenComposeAsync(topology -> {
                    Set<String> expectedNodes = pitrGlobalState.nodeNames();

                    Set<String> presentNodes = topology.nodes().stream()
                            .map(node -> node.name())
                            .collect(Collectors.toSet());

                    Set<String> missingNodes = CollectionUtils.difference(expectedNodes, presentNodes);

                    if (missingNodes.isEmpty()) {
                        return function.apply(localStateWatch);
                    }

                    LOG.info(
                            "Nodes {} are no longer present in the topology, cancelling PITR operation [ID = {}]",
                            missingNodes,
                            pitrGlobalState.operationId()
                    );

                    return localStateWatch.onFail(this.context.nodeName(), "Nodes are missing from the topology: " + missingNodes);
                }, this.context.threadPool())
                .thenRunAsync(() -> {
                    // -1 is treated as "no catalog version recorded"; only then is the rebalance watch left untouched.
                    if (pitrGlobalState.catalogVersion() != -1) {
                        this.coordinatorState.rebalanceWatch().addOperation(
                                pitrGlobalState.operationId(),
                                PitrUtils.tmpTableDescriptors(
                                        pitrGlobalState,
                                        this.context.catalogManager().catalog(pitrGlobalState.catalogVersion())
                                )
                        );
                    }
                }, this.context.threadPool())
                .whenComplete((unused, th) -> {
                    if (th == null) {
                        LOG.info("Recovery completed for PITR operation [ID = {}]", pitrGlobalState.operationId());

                        return;
                    }

                    String message = IgniteStringFormatter.format(
                            "Failed to recover PITR operation [ID = {}] reason: {}",
                            pitrGlobalState.operationId(),
                            th.getMessage()
                    );

                    LOG.error(message, th);

                    localStateWatch.onFail(this.context.nodeName(), message);
                });
    }

    /**
     * Registers the local state watch for the operation and feeds it, one by one and in cursor order, the local state
     * entries that are already stored under the operation's prefix at the currently applied revision.
     *
     * @param pitrGlobalState Global state of the operation being recovered.
     * @param pitrLocalStateWatch Watch that receives both the replayed and the subsequent live events.
     * @return Future that completes when all replayed events have been processed by the watch.
     */
    private CompletableFuture<Void> replayEvents(PitrGlobalState pitrGlobalState, PitrLocalStateWatch pitrLocalStateWatch) {
        ByteArray localStatePrefix = PitrMetaStorageKeys.pitrLocalStatePrefix(pitrGlobalState.operationId());

        // The watch is registered before the snapshot is read, as in the original ordering.
        this.context.metaStorageManager().registerPrefixWatch(localStatePrefix, pitrLocalStateWatch);

        return IgniteUtils.inBusyLockAsync(this.context.busyLock(), () -> {
            long appliedRevision = this.context.metaStorageManager().appliedRevision();

            HybridTimestamp revisionTimestamp = this.context.metaStorageManager().timestampByRevisionLocally(appliedRevision);

            CompletableFuture<Void> replayChain = CompletableFutures.nullCompletedFuture();

            // The cursor is fully drained while building the chain, so it can be closed before the chain executes.
            try (Cursor<Entry> existingEntries = this.context.metaStorageManager().prefixLocally(localStatePrefix, appliedRevision)) {
                for (Entry existingEntry : existingEntries) {
                    replayChain = replayChain.thenComposeAsync(
                            unused -> IgniteUtils.inBusyLockAsync(
                                    this.context.busyLock(),
                                    () -> pitrLocalStateWatch.onUpdate(new WatchEvent(
                                            List.of(new EntryEvent((Entry) null, existingEntry)),
                                            appliedRevision,
                                            revisionTimestamp
                                    ))
                            ),
                            this.context.threadPool()
                    );
                }

                return replayChain;
            }
        });
    }
}
