/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.pitr;

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.network.ClusterNodeImpl;
import org.apache.ignite3.internal.util.ByteUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.gridgain.internal.pitr.CoordinatorFailoverSubscriber;
import org.gridgain.internal.pitr.CoordinatorState;
import org.gridgain.internal.pitr.PitrManagerContext;
import org.gridgain.internal.pitr.PitrOperation;
import org.gridgain.internal.pitr.RebalanceWatch;
import org.gridgain.internal.pitr.exception.PitrException;
import org.gridgain.internal.pitr.message.RecoveryRequestMessage;
import org.gridgain.internal.pitr.metastorage.PitrGlobalState;
import org.gridgain.internal.pitr.metastorage.PitrLocalStateWatch;
import org.gridgain.internal.pitr.metastorage.PitrMetaStorageKeys;

/**
 * Coordinator-side entry points of point-in-time recovery (PITR).
 *
 * <p>Keeps the map of operations currently tracked by this node (operation ID to its local state watch) and shares
 * it with the {@link RebalanceWatch} and with every {@link CoordinatorState} created here.
 */
class CoordinatorRole {
    public static final IgniteLogger LOG = Loggers.forClass(CoordinatorRole.class);

    /** Access point to node services used here: node name, logical topology, meta storage and thread pool. */
    private final PitrManagerContext context;

    /** Rebalance watch built over the same {@link #ongoingOperations} map. */
    private final RebalanceWatch rebalanceWatch;

    /** Operations currently tracked by this coordinator, keyed by operation ID. */
    private final ConcurrentMap<UUID, PitrLocalStateWatch> ongoingOperations = new ConcurrentHashMap<>();

    /**
     * Creates the role and wires the rebalance watch to the shared map of ongoing operations.
     *
     * @param context PITR manager context.
     */
    CoordinatorRole(PitrManagerContext context) {
        this.context = context;
        this.rebalanceWatch = new RebalanceWatch(context, this.ongoingOperations);
    }

    /**
     * Starts the failover process after this node has been elected PITR coordinator.
     *
     * <p>Registers a logical topology listener that fails every tracked operation expecting a node that has left,
     * registers the rebalance watch, and subscribes a {@link CoordinatorFailoverSubscriber} to the meta storage
     * entries under the PITR global state prefix.
     *
     * <p>NOTE(review): a new topology listener is added on every call and is never removed in this class; confirm
     * that this method runs at most once per instance, otherwise listeners accumulate.
     *
     * @param term Coordinator term passed to the created {@link CoordinatorState}.
     * @return An already completed future.
     */
    CompletableFuture<Void> onBecomeCoordinator(long term) {
        LOG.info("PITR coordinator elected, starting failover process [node = {}]", this.context.nodeName());

        this.context.logicalTopologyService().addEventListener(new LogicalTopologyEventListener() {
            @Override
            public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) {
                CoordinatorRole.this.failOperationsExpecting(leftNode.name());
            }
        });

        this.rebalanceWatch.register();

        CoordinatorState coordinatorState = new CoordinatorState(term, this.ongoingOperations, this.rebalanceWatch);
        CoordinatorFailoverSubscriber failoverSubscriber = new CoordinatorFailoverSubscriber(this.context, coordinatorState);

        this.context.metaStorageManager()
                .prefix(PitrMetaStorageKeys.pitrGlobalStatePrefix())
                .subscribe(failoverSubscriber);

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Prepares a recovery operation.
     *
     * <p>Steps, in order: read the global state stored for {@code operationId} and, if one exists, validate it;
     * fetch the logical topology from the leader; then, on the context thread pool, call
     * {@link PitrOperation#prepare} with the names of all nodes in that topology.
     *
     * @param term Coordinator term passed to the created {@link CoordinatorState}.
     * @param operationId Operation ID.
     * @param message Recovery request.
     * @return Future of the preparation. It completes exceptionally if any step fails, including a
     *         {@link PitrException} raised by validation of a previously stored state.
     */
    CompletableFuture<Void> prepareOperation(long term, UUID operationId, RecoveryRequestMessage message) {
        // The chain is kept fully typed: raw CompletableFuture casts would erase the stage types and leave
        // the 'topology' lambda parameter as Object.
        return this.context.metaStorageManager()
                .get(PitrMetaStorageKeys.pitrGlobalStateKey(operationId))
                .thenAccept(entry -> {
                    // No stored state means nothing to validate for this ID.
                    if (entry != null && entry.value() != null) {
                        CoordinatorRole.validateForRestoration((PitrGlobalState) ByteUtils.fromBytes(entry.value()));
                    }
                })
                .thenCompose(unused -> this.context.logicalTopologyService().logicalTopologyOnLeader())
                .thenComposeAsync(topology -> {
                    Set<String> nodeNames = topology.nodes().stream()
                            .map(ClusterNodeImpl::name)
                            .collect(Collectors.toSet());

                    CoordinatorState coordinatorState = new CoordinatorState(term, this.ongoingOperations, this.rebalanceWatch);
                    PitrOperation operation = new PitrOperation(this.context, coordinatorState);

                    return operation.prepare(message, operationId, nodeNames);
                }, (Executor) this.context.threadPool());
    }

    /**
     * Fails every tracked operation whose expected node set contains the node that has left the topology.
     *
     * @param leftNodeName Name of the node that has left.
     */
    private void failOperationsExpecting(String leftNodeName) {
        for (PitrLocalStateWatch localStateWatch : this.ongoingOperations.values()) {
            if (localStateWatch.expectedNodeNames().contains(leftNodeName)) {
                localStateWatch.onFail(
                        this.context.nodeName(),
                        IgniteStringFormatter.format(
                                "Node has left the topology [ID = {}, node = {}]",
                                localStateWatch.operationId(),
                                leftNodeName
                        )
                );
            }
        }
    }

    /**
     * Checks that a previously stored global state allows the operation to proceed; only {@code COMPLETED} does.
     *
     * @param globalState Stored global state of the operation.
     * @throws PitrException If the status is {@code FAILED}, {@code PREPARED} or {@code STARTED}.
     * @throws IllegalStateException If the status is not one of the values handled here.
     */
    private static void validateForRestoration(PitrGlobalState globalState) {
        switch (globalState.status()) {
            case COMPLETED: {
                break;
            }
            case FAILED: {
                throw new PitrException(IgniteStringFormatter.format("Point in time recovery is in FAILED state, can not proceed [ID = {}]", globalState.operationId()));
            }
            case PREPARED:
            case STARTED: {
                throw new PitrException(IgniteStringFormatter.format("Point in time recovery is not in COMPLETED state [ID = {}]", globalState.operationId()));
            }
            default: {
                throw new IllegalStateException(IgniteStringFormatter.format("Unknown point in time recovery status: {} [ID = {}]", globalState.status(), globalState.operationId()));
            }
        }
    }
}

