package org.gridgain.internal.pitr.metastorage;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.gridgain.internal.pitr.PitrManagerContext;
import org.gridgain.internal.pitr.PitrReader;
import org.gridgain.internal.pitr.exception.PitrCancelledException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/* loaded from: input_file:org/gridgain/internal/pitr/metastorage/PitrGlobalStateWatch.class */
public class PitrGlobalStateWatch implements WatchListener {
    private static final IgniteLogger LOG = Loggers.forClass(PitrGlobalStateWatch.class);
    private final PitrManagerContext context;
    private final Map<UUID, PitrStateContext> ongoingPitrOperationById = new ConcurrentHashMap();
    private PitrReader pitrReader;

    public PitrGlobalStateWatch(PitrManagerContext pitrManagerContext) {
        this.context = pitrManagerContext;
        this.pitrReader = new PitrReader(pitrManagerContext);
    }

    public CompletableFuture<Void> onUpdate(WatchEvent watchEvent) {
        if (!this.context.busyLock().enterBusy()) {
            LOG.debug("Skipping Global point in time recovery state update because the node is stopping", new Object[0]);
            return CompletableFutures.nullCompletedFuture();
        }
        try {
            Iterator it = watchEvent.entryEvents().iterator();
            while (it.hasNext()) {
                Entry newEntry = ((EntryEvent) it.next()).newEntry();
                if (newEntry.value() != null) {
                    processNewStateEntry((PitrGlobalState) ByteUtils.fromBytes(newEntry.value()), newEntry.revision());
                }
            }
            CompletableFuture<Void> nullCompletedFuture = CompletableFutures.nullCompletedFuture();
            this.context.busyLock().leaveBusy();
            return nullCompletedFuture;
        } catch (Throwable th) {
            this.context.busyLock().leaveBusy();
            throw th;
        }
    }

    private void processNewStateEntry(PitrGlobalState pitrGlobalState, long j) {
        LOG.info("Point in time recovery {} [ID = {}, node = {}]", new Object[]{pitrGlobalState.status(), pitrGlobalState.operationId(), this.context.nodeName()});
        switch (pitrGlobalState.status()) {
            case PREPARED:
            case COMPLETED:
                return;
            case STARTED:
                if (pitrGlobalState.nodeNames().contains(this.context.nodeName())) {
                    this.context.threadPool().execute(() -> {
                        LOG.debug("Starting local point in time recovery [ID = {}, node = {}]", new Object[]{pitrGlobalState.operationId(), this.context.nodeName()});
                        start(pitrGlobalState, j);
                    });
                    return;
                } else {
                    LOG.info("Received a point in time recovery request, but the node is not in the list of target nodes, skipping.", new Object[0]);
                    return;
                }
            case FAILED:
                this.context.threadPool().execute(() -> {
                    LOG.error("Point in time recovery failed [ID = {}, node = {}] reason: {}", new Object[]{pitrGlobalState.operationId(), this.context.nodeName(), pitrGlobalState.description()});
                    cancel(pitrGlobalState);
                });
                return;
            default:
                throw new AssertionError("Unexpected point in time recovery status: " + pitrGlobalState.status());
        }
    }

    private void start(PitrGlobalState pitrGlobalState, long j) {
        PitrStateContext pitrStateContext = new PitrStateContext(pitrGlobalState, this.context.busyLock(), j);
        registerPitrOperation(pitrStateContext);
        PitrLocalState pitrLocalState = new PitrLocalState(PitrStatus.STARTED);
        PitrLocalProgressHandler pitrLocalProgressHandler = new PitrLocalProgressHandler(pitrLocalState);
        initLocalState(pitrStateContext, pitrLocalState).thenComposeAsync(r7 -> {
            return this.pitrReader.recover(pitrStateContext, pitrLocalProgressHandler);
        }, (Executor) this.context.threadPool()).whenComplete((BiConsumer<? super U, ? super Throwable>) (l, th) -> {
            if (th != null) {
                pitrLocalProgressHandler.handleRecoveryException(th);
            }
        });
        pitrLocalProgressHandler.localState().handleAsync((pitrLocalState2, th2) -> {
            return finalizeLocalState(pitrStateContext, pitrLocalState2, th2);
        }, (Executor) this.context.threadPool()).whenComplete((BiConsumer<? super U, ? super Throwable>) (completableFuture, th3) -> {
            pitrStateContext.complete();
        });
    }

    private CompletableFuture<Void> initLocalState(PitrStateContext pitrStateContext, PitrLocalState pitrLocalState) {
        return pitrStateContext.inBusyLockAsync(() -> {
            return this.context.metaStorageManager().put(PitrMetaStorageKeys.pitrLocalStateKey(pitrStateContext.operationId(), this.context.nodeName()), ByteUtils.toBytes(pitrLocalState)).whenComplete((r4, th) -> {
                if (th != null) {
                    LOG.error("Unable to initialize local point in time recovery state", new Object[0]);
                }
            });
        });
    }

    private CompletableFuture<Boolean> finalizeLocalState(PitrStateContext pitrStateContext, PitrLocalState pitrLocalState, @Nullable Throwable th) {
        return IgniteUtils.inBusyLockAsync(this.context.busyLock(), () -> {
            byte[] bytes;
            if (th == null) {
                bytes = ByteUtils.toBytes(pitrLocalState);
                LOG.info("Point in time recovered rows = {} [ID = {}, node = {}]", new Object[]{Long.valueOf(pitrLocalState.rowsUpdatedTotal()), pitrStateContext.operationId(), this.context.nodeName()});
            } else {
                Throwable unwrapCause = ExceptionUtils.unwrapCause(th);
                if (unwrapCause instanceof PitrCancelledException) {
                    LOG.info("Point in time recovery has been cancelled [ID = {}]", new Object[]{pitrStateContext.operationId()});
                } else {
                    LOG.error("Point in time recovery error [ID = {}]", unwrapCause, new Object[]{pitrStateContext.operationId()});
                }
                if (pitrStateContext.isCancelledByCoordinator()) {
                    return CompletableFutures.nullCompletedFuture();
                }
                bytes = ByteUtils.toBytes(pitrLocalState);
            }
            ByteArray pitrLocalStateKey = PitrMetaStorageKeys.pitrLocalStateKey(pitrStateContext.operationId(), this.context.nodeName());
            return this.context.metaStorageManager().invoke(Conditions.exists(pitrLocalStateKey), List.of(Operations.put(pitrLocalStateKey, bytes)), List.of()).whenComplete((bool, th2) -> {
                if (th2 != null) {
                    LOG.error("Unable to finalize local point in time recovery state", th2);
                }
                if (bool.booleanValue()) {
                    return;
                }
                LOG.error("Unable to update metastorage LOCAL key for point in time recovery [ID = {}, node = {}]", new Object[]{pitrStateContext.operationId(), this.context.nodeName()});
            });
        });
    }

    private void cancel(PitrGlobalState pitrGlobalState) {
        CompletableFuture<Void> completionFuture;
        PitrStateContext pitrStateContext = this.ongoingPitrOperationById.get(pitrGlobalState.operationId());
        if (pitrStateContext == null) {
            completionFuture = CompletableFutures.nullCompletedFuture();
        } else {
            pitrStateContext.cancelByCoordinator();
            completionFuture = pitrStateContext.completionFuture();
        }
        completionFuture.whenComplete((r5, th) -> {
            this.context.metaStorageManager().remove(PitrMetaStorageKeys.pitrLocalStateKey(pitrGlobalState.operationId(), this.context.nodeName()));
        });
    }

    public CompletableFuture<Void> cancelAllOngoingPitrOperationsDueToLocalFailure() {
        return CompletableFuture.allOf((CompletableFuture[]) this.ongoingPitrOperationById.values().stream().map(pitrStateContext -> {
            pitrStateContext.cancel();
            return pitrStateContext.completionFuture();
        }).toArray(i -> {
            return new CompletableFuture[i];
        }));
    }

    private void registerPitrOperation(PitrStateContext pitrStateContext) {
        this.ongoingPitrOperationById.put(pitrStateContext.operationId(), pitrStateContext);
        pitrStateContext.completionFuture().whenComplete((r5, th) -> {
            this.ongoingPitrOperationById.remove(pitrStateContext.operationId());
        });
    }

    @TestOnly
    public void updatePitrReader(Function<PitrManagerContext, PitrReader> function) {
        this.pitrReader = function.apply(this.context);
    }
}
