/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.pitr.metastorage;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.gridgain.internal.pitr.PitrManagerContext;
import org.gridgain.internal.pitr.PitrReader;
import org.gridgain.internal.pitr.exception.PitrCancelledException;
import org.gridgain.internal.pitr.metastorage.PitrGlobalState;
import org.gridgain.internal.pitr.metastorage.PitrLocalProgressHandler;
import org.gridgain.internal.pitr.metastorage.PitrLocalState;
import org.gridgain.internal.pitr.metastorage.PitrMetaStorageKeys;
import org.gridgain.internal.pitr.metastorage.PitrStateContext;
import org.gridgain.internal.pitr.metastorage.PitrStatus;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class PitrGlobalStateWatch
implements WatchListener {
    private static final IgniteLogger LOG = Loggers.forClass(PitrGlobalStateWatch.class);
    private final PitrManagerContext context;
    private final Map<UUID, PitrStateContext> ongoingPitrOperationById = new ConcurrentHashMap<UUID, PitrStateContext>();
    private PitrReader pitrReader;

    public PitrGlobalStateWatch(PitrManagerContext context) {
        this.context = context;
        this.pitrReader = new PitrReader(context);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> onUpdate(WatchEvent event) {
        if (!this.context.busyLock().enterBusy()) {
            LOG.debug("Skipping Global point in time recovery state update because the node is stopping", new Object[0]);
            return CompletableFutures.nullCompletedFuture();
        }
        try {
            for (EntryEvent entryEvent : event.entryEvents()) {
                Entry entry = entryEvent.newEntry();
                if (entry.value() == null) continue;
                this.processNewStateEntry((PitrGlobalState)ByteUtils.fromBytes((byte[])entry.value()), entry.revision());
            }
            CompletableFuture completableFuture = CompletableFutures.nullCompletedFuture();
            return completableFuture;
        }
        finally {
            this.context.busyLock().leaveBusy();
        }
    }

    private void processNewStateEntry(PitrGlobalState globalState, long causalityToken) {
        LOG.info("Point in time recovery {} [ID = {}, node = {}]", new Object[]{globalState.status(), globalState.operationId(), this.context.nodeName()});
        switch (globalState.status()) {
            case PREPARED: 
            case COMPLETED: {
                break;
            }
            case STARTED: {
                if (!globalState.nodeNames().contains(this.context.nodeName())) {
                    LOG.info("Received a point in time recovery request, but the node is not in the list of target nodes, skipping.", new Object[0]);
                    break;
                }
                this.context.threadPool().execute(() -> {
                    LOG.debug("Starting local point in time recovery [ID = {}, node = {}]", new Object[]{globalState.operationId(), this.context.nodeName()});
                    this.start(globalState, causalityToken);
                });
                break;
            }
            case FAILED: {
                this.context.threadPool().execute(() -> {
                    LOG.error("Point in time recovery failed [ID = {}, node = {}] reason: {}", new Object[]{globalState.operationId(), this.context.nodeName(), globalState.description()});
                    this.cancel(globalState);
                });
                break;
            }
            default: {
                throw new AssertionError((Object)("Unexpected point in time recovery status: " + String.valueOf((Object)globalState.status())));
            }
        }
    }

    private void start(PitrGlobalState globalState, long causalityToken) {
        PitrStateContext pitrContext = new PitrStateContext(globalState, this.context.busyLock(), causalityToken);
        this.registerPitrOperation(pitrContext);
        PitrLocalState initialState = new PitrLocalState(PitrStatus.STARTED);
        PitrLocalProgressHandler handler = new PitrLocalProgressHandler(initialState);
        ((CompletableFuture)this.initLocalState(pitrContext, initialState).thenComposeAsync(unused -> this.pitrReader.recover(pitrContext, handler), (Executor)this.context.threadPool())).whenComplete((unused, ex) -> {
            if (ex != null) {
                handler.handleRecoveryException((Throwable)ex);
            }
        });
        ((CompletableFuture)handler.localState().handleAsync((localState, ex) -> this.finalizeLocalState(pitrContext, (PitrLocalState)localState, (Throwable)ex), (Executor)this.context.threadPool())).whenComplete((v, ex) -> pitrContext.complete());
    }

    private CompletableFuture<Void> initLocalState(PitrStateContext pitrContext, PitrLocalState localState) {
        return pitrContext.inBusyLockAsync(() -> {
            ByteArray localStateKey = PitrMetaStorageKeys.pitrLocalStateKey(pitrContext.operationId(), this.context.nodeName());
            return this.context.metaStorageManager().put(localStateKey, ByteUtils.toBytes((Object)localState)).whenComplete((v, e) -> {
                if (e != null) {
                    LOG.error("Unable to initialize local point in time recovery state", new Object[0]);
                }
            });
        });
    }

    private CompletableFuture<Boolean> finalizeLocalState(PitrStateContext pitrContext, PitrLocalState localState, @Nullable Throwable ex) {
        return IgniteUtils.inBusyLockAsync((IgniteSpinBusyLock)this.context.busyLock(), () -> {
            byte[] finalState;
            if (ex == null) {
                finalState = ByteUtils.toBytes((Object)localState);
                LOG.info("Point in time recovered rows = {} [ID = {}, node = {}]", new Object[]{localState.rowsUpdatedTotal(), pitrContext.operationId(), this.context.nodeName()});
            } else {
                Throwable unwrapped = ExceptionUtils.unwrapCause((Throwable)ex);
                if (unwrapped instanceof PitrCancelledException) {
                    LOG.info("Point in time recovery has been cancelled [ID = {}]", new Object[]{pitrContext.operationId()});
                } else {
                    LOG.error("Point in time recovery error [ID = {}]", unwrapped, new Object[]{pitrContext.operationId()});
                }
                if (pitrContext.isCancelledByCoordinator()) {
                    return CompletableFutures.nullCompletedFuture();
                }
                finalState = ByteUtils.toBytes((Object)localState);
            }
            ByteArray localStateKey = PitrMetaStorageKeys.pitrLocalStateKey(pitrContext.operationId(), this.context.nodeName());
            CompletableFuture invokeFuture = this.context.metaStorageManager().invoke((Condition)Conditions.exists((ByteArray)localStateKey), List.of(Operations.put((ByteArray)localStateKey, (byte[])finalState)), List.of());
            return invokeFuture.whenComplete((success, err) -> {
                if (err != null) {
                    LOG.error("Unable to finalize local point in time recovery state", err);
                }
                if (!success.booleanValue()) {
                    LOG.error("Unable to update metastorage LOCAL key for point in time recovery [ID = {}, node = {}]", new Object[]{pitrContext.operationId(), this.context.nodeName()});
                }
            });
        });
    }

    private void cancel(PitrGlobalState globalState) {
        CompletableFuture<Void> cancellationFuture;
        PitrStateContext pitrContext = this.ongoingPitrOperationById.get(globalState.operationId());
        if (pitrContext == null) {
            cancellationFuture = CompletableFutures.nullCompletedFuture();
        } else {
            pitrContext.cancelByCoordinator();
            cancellationFuture = pitrContext.completionFuture();
        }
        cancellationFuture.whenComplete((v, e) -> {
            ByteArray localStateKey = PitrMetaStorageKeys.pitrLocalStateKey(globalState.operationId(), this.context.nodeName());
            this.context.metaStorageManager().remove(localStateKey);
        });
    }

    public CompletableFuture<Void> cancelAllOngoingPitrOperationsDueToLocalFailure() {
        CompletableFuture[] futures = (CompletableFuture[])this.ongoingPitrOperationById.values().stream().map(ctx -> {
            ctx.cancel();
            return ctx.completionFuture();
        }).toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }

    private void registerPitrOperation(PitrStateContext pitrContext) {
        this.ongoingPitrOperationById.put(pitrContext.operationId(), pitrContext);
        pitrContext.completionFuture().whenComplete((v, e) -> this.ongoingPitrOperationById.remove(pitrContext.operationId()));
    }

    @TestOnly
    public void updatePitrReader(Function<PitrManagerContext, PitrReader> mapper) {
        this.pitrReader = mapper.apply(this.context);
    }
}

