/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.pitr.metastorage;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.catalog.CatalogCommand;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.EntryEvent;
import org.apache.ignite3.internal.metastorage.WatchEvent;
import org.apache.ignite3.internal.metastorage.WatchListener;
import org.apache.ignite3.internal.metastorage.dsl.Condition;
import org.apache.ignite3.internal.metastorage.dsl.Conditions;
import org.apache.ignite3.internal.metastorage.dsl.Operation;
import org.apache.ignite3.internal.metastorage.dsl.Operations;
import org.apache.ignite3.internal.util.ByteUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.gridgain.internal.pitr.CoordinatorState;
import org.gridgain.internal.pitr.PitrManagerContext;
import org.gridgain.internal.pitr.PitrUtils;
import org.gridgain.internal.pitr.TableName;
import org.gridgain.internal.pitr.catalog.DropExistingTablesCommand;
import org.gridgain.internal.pitr.catalog.RenameTempTables;
import org.gridgain.internal.pitr.catalog.TablesAccessCommand;
import org.gridgain.internal.pitr.metastorage.PitrGlobalState;
import org.gridgain.internal.pitr.metastorage.PitrLocalState;
import org.gridgain.internal.pitr.metastorage.PitrMetaStorageKeys;
import org.gridgain.internal.pitr.metastorage.PitrProgress;
import org.gridgain.internal.pitr.metastorage.PitrStatus;
import org.jetbrains.annotations.Nullable;

/**
 * Meta storage watch listener that tracks the per-node local states of a single point in time recovery operation.
 *
 * <p>Every node listed in the initial global state is expected to report a non-{@code STARTED} local state. When the
 * last expected node reports {@code COMPLETED}, or as soon as any expected node reports {@code FAILED}, the listener
 * unregisters itself, executes the finalizing catalog commands and writes the terminal global state to the meta
 * storage.
 */
public class PitrLocalStateWatch
implements WatchListener {
    private static final IgniteLogger LOG = Loggers.forClass(PitrLocalStateWatch.class);

    private final PitrManagerContext context;

    /** Global state the operation was started with; source of the node names, tables, timestamp and catalog version. */
    private final PitrGlobalState initialState;

    private final CoordinatorState coordinatorState;

    private final UUID operationId;

    /** Nodes that have not yet reported a non-{@code STARTED} local state. Backed by a concurrent set. */
    private final Set<String> expectedNodeNames;

    /** Flipped exactly once by whichever of {@link #onComplete} / {@link #onFail} starts the finalization first. */
    private final AtomicBoolean completed = new AtomicBoolean(false);

    /**
     * Creates the watch and registers it in the coordinator state via {@code onOperationStarted}.
     *
     * @param context Point in time recovery manager context.
     * @param coordinatorState Coordinator state that is notified about the operation start and completion.
     * @param initialState Global state of the operation being tracked.
     */
    public PitrLocalStateWatch(PitrManagerContext context, CoordinatorState coordinatorState, PitrGlobalState initialState) {
        this.context = context;
        this.initialState = initialState;
        this.operationId = initialState.operationId();
        this.expectedNodeNames = ConcurrentHashMap.newKeySet(IgniteUtils.capacity(initialState.nodeNames().size()));
        this.coordinatorState = coordinatorState;
        this.expectedNodeNames.addAll(initialState.nodeNames());
        // Registration is intentionally the last step: all fields are assigned before 'this' becomes visible.
        coordinatorState.onOperationStarted(this.operationId, this);
    }

    /**
     * Returns the live (not copied) set of node names that still have to report their local state.
     *
     * @return Remaining expected node names.
     */
    public Set<String> expectedNodeNames() {
        return expectedNodeNames;
    }

    /**
     * Returns the ID of the tracked operation.
     *
     * @return Operation ID.
     */
    public UUID operationId() {
        return operationId;
    }

    /**
     * Handles a meta storage update of the watched keys.
     *
     * <p>An event with more than one entry is expected to consist of tombstones only (asserted); in that case the
     * watch just unregisters itself. A single-entry event is deserialized into a {@link PitrLocalState} and processed
     * asynchronously on the context thread pool, so the returned future never waits for that processing.
     *
     * @param event Watch event.
     * @return Already completed future.
     */
    @Override
    public CompletableFuture<Void> onUpdate(WatchEvent event) {
        Collection<EntryEvent> entryEvents = event.entryEvents();
        if (entryEvents.size() > 1) {
            assert entryEvents.stream().allMatch(e -> e.newEntry().tombstone()) : entryEvents;
            context.metaStorageManager().unregisterWatch(this);
            return CompletableFutures.nullCompletedFuture();
        }

        Entry entry = event.entryEvent().newEntry();
        String nodeName = PitrMetaStorageKeys.nodeNameFromPitrLocalStateKey(entry.key());
        byte[] stateBytes = entry.value();
        // NOTE(review): a single-entry event is assumed to always carry a value (never a tombstone) - confirm.
        assert stateBytes != null;
        PitrLocalState localState = (PitrLocalState) ByteUtils.fromBytes(stateBytes);

        CompletableFuture
                .supplyAsync(() -> onStateUpdate(nodeName, localState), context.threadPool())
                // onStateUpdate returns a future itself; flatten it so that asynchronous failures
                // (not only exceptions thrown synchronously by onStateUpdate) reach the logger below.
                .thenCompose(stateUpdateFuture -> stateUpdateFuture)
                .whenComplete((v, e) -> {
                    if (e != null) {
                        LOG.error("Error when handling local state update from node {}: {}", e, nodeName, localState);
                    }
                });

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Applies a local state reported by a node.
     *
     * <p>Updates are ignored when no nodes are expected anymore, when the status is {@code STARTED}, or when the node
     * is not (or no longer) in the expected set.
     *
     * @param nodeName Name of the reporting node.
     * @param newState Reported local state.
     * @return Future of the triggered completion or failure handling; a failed future for an unexpected status.
     */
    private CompletableFuture<Void> onStateUpdate(String nodeName, PitrLocalState newState) {
        if (expectedNodeNames.isEmpty()) {
            return CompletableFutures.nullCompletedFuture();
        }

        PitrStatus status = newState.status();
        LOG.info("Point in time recovery (local) {} [ID = {}, nodeName = {}]", status, operationId, nodeName);
        if (status == PitrStatus.STARTED) {
            return CompletableFutures.nullCompletedFuture();
        }

        // Each expected node is accounted for at most once.
        if (!expectedNodeNames.remove(nodeName)) {
            return CompletableFutures.nullCompletedFuture();
        }

        switch (status) {
            case COMPLETED:
                return onComplete(nodeName);
            case FAILED:
                return onFail(nodeName, newState.description());
            default:
                return CompletableFuture.failedFuture(new AssertionError("Unexpected status: " + status));
        }
    }

    /**
     * Finalizes the operation as completed, provided that no expected nodes remain and the operation has not been
     * finalized yet.
     *
     * @param nodeName Name of the node whose report triggered the call (used for logging only).
     * @return Future that completes when the terminal state transition attempt is finished, or an already completed
     *         future if nothing was done.
     */
    public CompletableFuture<Void> onComplete(String nodeName) {
        if (!expectedNodeNames.isEmpty()) {
            return CompletableFutures.nullCompletedFuture();
        }
        if (!completed.compareAndSet(false, true)) {
            LOG.warn("Point in time recovery has been already completed [ID = {}, nodeName = {}]", operationId, nodeName);
            return CompletableFutures.nullCompletedFuture();
        }

        LOG.info("Point in time recovery completed [ID = {}, node = {}]", operationId, nodeName);
        context.metaStorageManager().unregisterWatch(this);
        return onCompleteImpl().thenCompose(this::transitionToTerminalState);
    }

    /**
     * Finalizes the operation as failed unless it has already been finalized. Remaining expected nodes are discarded.
     *
     * @param nodeName Name of the node the failure is attributed to (used in the failure description).
     * @param message Optional failure details appended to the failure description.
     * @return Future that completes when the terminal state transition attempt is finished, or an already completed
     *         future if the operation was finalized earlier.
     */
    public CompletableFuture<Void> onFail(String nodeName, @Nullable String message) {
        if (!completed.compareAndSet(false, true)) {
            LOG.warn("Point in time recovery has been already completed [ID = {}]", operationId);
            return CompletableFutures.nullCompletedFuture();
        }

        // Makes subsequent local state updates no-ops (see the emptiness check in onStateUpdate).
        expectedNodeNames.clear();
        String msg = IgniteStringFormatter.format("Point in time recovery failed [ID = {}, node = {}] {}", operationId, nodeName, message);
        LOG.error(msg);
        context.metaStorageManager().unregisterWatch(this);
        return onFailImpl(msg).thenCompose(this::transitionToTerminalState);
    }

    /**
     * Executes the catalog commands of a successful recovery (unlock tables, drop existing tables, rename temporary
     * tables) and builds the meta storage operations for the {@code COMPLETED} terminal state.
     */
    private CompletableFuture<List<Operation>> onCompleteImpl() {
        List<CatalogCommand> commands = List.of(
                TablesAccessCommand.unlock(initialState.tables()),
                new DropExistingTablesCommand(initialState.tables()),
                new RenameTempTables("__RECOVERY", initialState.tables())
        );
        return executeAndBuildTerminalOperations(commands, PitrStatus.COMPLETED, "");
    }

    /**
     * Executes the catalog commands of a failed recovery (unlock tables, drop the temporary counterparts of the
     * tables) and builds the meta storage operations for the {@code FAILED} terminal state.
     *
     * @param message Failure description stored in the terminal state.
     */
    private CompletableFuture<List<Operation>> onFailImpl(String message) {
        Set<TableName> tmpTables = initialState.tables().stream()
                .map(table -> new TableName(table.schema(), PitrUtils.tmpTableName(table.name())))
                .collect(Collectors.toSet());
        List<CatalogCommand> commands = List.of(
                TablesAccessCommand.unlock(initialState.tables()),
                new DropExistingTablesCommand(tmpTables)
        );
        return executeAndBuildTerminalOperations(commands, PitrStatus.FAILED, message);
    }

    /**
     * Executes the catalog commands, then merges the local states into a progress object and converts it into the
     * terminal meta storage operations. The post-catalog steps run on the context thread pool.
     */
    private CompletableFuture<List<Operation>> executeAndBuildTerminalOperations(
            List<CatalogCommand> commands,
            PitrStatus terminalStatus,
            String description
    ) {
        Executor executor = context.threadPool();
        return context.catalogManager().execute(commands)
                .thenComposeAsync(v -> PitrProgress.mergeLocalStates(context, initialState.operationId(), -1L), executor)
                .thenApplyAsync(progress -> toTerminalOperations(terminalStatus, description, (PitrProgress) progress), executor);
    }

    /**
     * Builds the operations that store the terminal global state and remove the table lock keys and the per-node
     * local state keys of the operation.
     *
     * @param newStatus Terminal status.
     * @param description Terminal state description.
     * @param progress Merged progress; its status and description are overwritten with the given values.
     * @return Mutable list of meta storage operations.
     */
    private List<Operation> toTerminalOperations(PitrStatus newStatus, String description, PitrProgress progress) {
        progress.setStatus(newStatus);
        progress.setDescription(description);
        PitrGlobalState newGlobalState = new PitrGlobalState(
                newStatus,
                initialState.operationId(),
                initialState.nodeNames(),
                initialState.tables(),
                initialState.timestamp(),
                description,
                initialState.catalogVersion(),
                progress
        );

        // One put for the global state plus one removal per table lock and per node local state.
        List<Operation> operations = new ArrayList<>(1 + newGlobalState.tables().size() + initialState.nodeNames().size());
        operations.add(Operations.put(PitrMetaStorageKeys.pitrGlobalStateKey(operationId), ByteUtils.toBytes(newGlobalState)));
        for (TableName tableName : newGlobalState.tables()) {
            operations.add(Operations.remove(PitrMetaStorageKeys.buildTableLockKey(tableName)));
        }
        for (String nodeName : initialState.nodeNames()) {
            operations.add(Operations.remove(PitrMetaStorageKeys.pitrLocalStateKey(operationId, nodeName)));
        }
        return operations;
    }

    /**
     * Applies the terminal operations together with a coordinator term update in a single conditional meta storage
     * invoke, guarded by an {@code le} condition between the stored coordinator term and this coordinator's term.
     *
     * <p>The outcome is only logged; the returned future always completes normally. The coordinator state is notified
     * about the operation completion only when the invoke reports success.
     *
     * @param operations Terminal operations.
     * @return Future that completes when the invoke result has been handled.
     */
    private CompletableFuture<Void> transitionToTerminalState(List<Operation> operations) {
        ByteArray coordinatorTermKey = PitrMetaStorageKeys.pitrCoordinatorTermKey(operationId);
        byte[] termBytes = ByteUtils.longToBytesKeepingOrder(coordinatorState.term());

        List<Operation> operationsWithTermUpdate = new ArrayList<>(operations.size() + 1);
        operationsWithTermUpdate.addAll(operations);
        operationsWithTermUpdate.add(Operations.put(coordinatorTermKey, termBytes));

        return context.metaStorageManager()
                .invoke(Conditions.value(coordinatorTermKey).le(termBytes), operationsWithTermUpdate, List.of())
                .handle((success, e) -> {
                    if (e != null) {
                        LOG.error("Point in time recovery failed [ID = {}]", e, operationId);
                    } else if (success) {
                        LOG.info("Point in time recovery completed successfully [ID = {}]", operationId);
                        coordinatorState.onOperationComplete(operationId);
                    } else {
                        LOG.info("Point in time recovery coordinator has changed, process will be completed by the new coordinator.");
                    }
                    return null;
                });
    }
}

