/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.pitr;

import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.timebag.TimeBag;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.gridgain.internal.pitr.CoordinatorState;
import org.gridgain.internal.pitr.PitrManagerContext;
import org.gridgain.internal.pitr.PitrOperation;
import org.gridgain.internal.pitr.PitrUtils;
import org.gridgain.internal.pitr.metastorage.PitrGlobalState;
import org.gridgain.internal.pitr.metastorage.PitrLocalStateWatch;
import org.gridgain.internal.pitr.metastorage.PitrMetaStorageKeys;

class CoordinatorFailoverSubscriber
implements Flow.Subscriber<Entry> {
    private static final IgniteLogger LOG = Loggers.forClass(CoordinatorFailoverSubscriber.class);
    private final PitrManagerContext context;
    private final CoordinatorState coordinatorState;

    CoordinatorFailoverSubscriber(PitrManagerContext context, CoordinatorState coordinatorState) {
        this.context = context;
        this.coordinatorState = coordinatorState;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onError(Throwable throwable) {
        LOG.error("Error while performing PITR Coordinator failover", throwable);
    }

    @Override
    public void onComplete() {
    }

    @Override
    public void onNext(Entry entry) {
        PitrGlobalState globalState = (PitrGlobalState)ByteUtils.fromBytes((byte[])entry.value());
        if (this.coordinatorState.hasOngoingOperation(globalState.operationId())) {
            return;
        }
        switch (globalState.status()) {
            case COMPLETED: 
            case FAILED: {
                break;
            }
            case PREPARED: {
                this.performRecovery(globalState, localStateWatch -> {
                    int catalogVersion = this.context.catalogManager().activeCatalogVersion(globalState.timestampLong());
                    Catalog catalogToRestore = this.context.catalogManager().catalog(catalogVersion);
                    PitrOperation pitrProcess = new PitrOperation(this.context, this.coordinatorState);
                    return pitrProcess.start((PitrLocalStateWatch)localStateWatch, globalState, catalogToRestore);
                });
                break;
            }
            case STARTED: {
                this.performRecovery(globalState, localStateWatch -> this.replayEvents(globalState, (PitrLocalStateWatch)localStateWatch));
                break;
            }
            default: {
                LOG.error("Unexpected PITR status: {}", new Object[]{globalState.status()});
            }
        }
    }

    private void performRecovery(PitrGlobalState globalState, Function<PitrLocalStateWatch, CompletableFuture<Void>> action) {
        LOG.info("Starting PITR recovery for operation {}, current state: {}", new Object[]{globalState.operationId(), globalState});
        PitrLocalStateWatch localStateWatch = new PitrLocalStateWatch(this.context, this.coordinatorState, globalState);
        CompletableFuture logicalTopologyFuture = this.context.logicalTopologyService().logicalTopologyOnLeader();
        ((CompletableFuture)((CompletableFuture)logicalTopologyFuture.thenComposeAsync(topology -> {
            Set topologyNodes = topology.nodes().stream().map(ClusterNodeImpl::name).collect(Collectors.toSet());
            Set<String> recoveryNodes = globalState.nodeNames();
            Set missingNodes = CollectionUtils.difference(recoveryNodes, topologyNodes);
            if (!missingNodes.isEmpty()) {
                LOG.info("Nodes {} are no longer present in the topology, cancelling PITR operation [ID = {}]", new Object[]{missingNodes, globalState.operationId()});
                String errorMessage = "Nodes are missing from the topology: " + missingNodes;
                return localStateWatch.onFail(this.context.nodeName(), errorMessage);
            }
            return (CompletionStage)action.apply(localStateWatch);
        }, (Executor)this.context.threadPool())).thenRunAsync(() -> {
            if (globalState.catalogVersion() != -1) {
                Catalog catalog = this.context.catalogManager().catalog(globalState.catalogVersion());
                List<CatalogTableDescriptor> tempTables = PitrUtils.tmpTableDescriptors(globalState, catalog);
                this.coordinatorState.rebalanceWatch().addOperation(globalState.operationId(), tempTables);
            }
        }, this.context.threadPool())).whenComplete((unused, ex) -> {
            if (ex == null) {
                LOG.info("Recovery completed for PITR operation [ID = {}]", new Object[]{globalState.operationId()});
            } else {
                String errorMessage = IgniteStringFormatter.format((String)"Failed to recover PITR operation [ID = {}] reason: {}", (Object[])new Object[]{globalState.operationId(), ex.getMessage()});
                LOG.error(errorMessage, ex);
                localStateWatch.onFail(this.context.nodeName(), errorMessage);
            }
        });
    }

    private CompletableFuture<Void> replayEvents(PitrGlobalState globalState, PitrLocalStateWatch localStateWatch) {
        ByteArray prefix = PitrMetaStorageKeys.pitrLocalStatePrefix(globalState.operationId());
        this.context.metaStorageManager().registerPrefixWatch(prefix, (WatchListener)localStateWatch);
        return IgniteUtils.inBusyLockAsync((IgniteSpinBusyLock)this.context.busyLock(), () -> {
            long currentLocalRevision = this.context.metaStorageManager().appliedRevision();
            HybridTimestamp revisionTimestamp = this.context.metaStorageManager().timestampByRevisionLocally(currentLocalRevision);
            CompletionStage recoveryFuture = CompletableFutures.nullCompletedFuture();
            try (Cursor cursor = this.context.metaStorageManager().prefixLocally(prefix, currentLocalRevision);){
                for (Entry entry : cursor) {
                    recoveryFuture = recoveryFuture.thenComposeAsync(v -> IgniteUtils.inBusyLockAsync((IgniteSpinBusyLock)this.context.busyLock(), () -> {
                        EntryEvent surrogateEntryEvent = new EntryEvent(null, entry);
                        WatchEvent surrogateEvent = new WatchEvent(List.of(surrogateEntryEvent), currentLocalRevision, revisionTimestamp, TimeBag.createTimeBag((boolean)false, (boolean)false));
                        return localStateWatch.onUpdate(surrogateEvent);
                    }), (Executor)this.context.threadPool());
                }
            }
            return recoveryFuture;
        });
    }
}

