/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.pitr;

import java.util.HashMap;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.ignite3.internal.catalog.Catalog;
import org.apache.ignite3.internal.catalog.CatalogManager;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite3.internal.components.NodeProperties;
import org.apache.ignite3.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite3.internal.hlc.HybridClock;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.manager.IgniteComponent;
import org.apache.ignite3.internal.metastorage.impl.MetaStorageEvent;
import org.apache.ignite3.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite3.internal.metastorage.impl.MetaStorageServiceImpl;
import org.apache.ignite3.internal.network.ClusterService;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.NetworkMessage;
import org.apache.ignite3.internal.partition.replicator.PartitionReplicaLifecycleManager;
import org.apache.ignite3.internal.replicator.ReplicaManager;
import org.apache.ignite3.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite3.internal.table.distributed.TableManager;
import org.apache.ignite3.internal.tx.TxManager;
import org.apache.ignite3.internal.util.ByteUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.util.Lazy;
import org.gridgain.internal.license.LicenseFeatureChecker;
import org.gridgain.internal.pitr.CoordinatorRole;
import org.gridgain.internal.pitr.PitrFacade;
import org.gridgain.internal.pitr.PitrManagerContext;
import org.gridgain.internal.pitr.PitrReader;
import org.gridgain.internal.pitr.configuration.NodePitrConfiguration;
import org.gridgain.internal.pitr.exception.PitrException;
import org.gridgain.internal.pitr.message.ErrorResponseMessage;
import org.gridgain.internal.pitr.message.PitrMessageGroup;
import org.gridgain.internal.pitr.message.RecoveryRequestMessage;
import org.gridgain.internal.pitr.message.StateRequestMessage;
import org.gridgain.internal.pitr.metastorage.PitrGlobalState;
import org.gridgain.internal.pitr.metastorage.PitrGlobalStateWatch;
import org.gridgain.internal.pitr.metastorage.PitrMetaStorageKeys;
import org.gridgain.internal.pitr.metastorage.PitrProgress;
import org.gridgain.internal.rbac.authorization.Authorizer;
import org.jetbrains.annotations.TestOnly;

public class PitrManager
implements IgniteComponent {
    public static final String TMP_PREFIX = "__RECOVERY";
    private static final IgniteLogger LOG = Loggers.forClass(PitrManager.class);
    private final AtomicBoolean stopGuard = new AtomicBoolean();
    private final PitrManagerContext context;
    private final PitrGlobalStateWatch globalWatch;
    private final Lazy<CoordinatorRole> coordinatorRole;

    public PitrManager(NodePitrConfiguration pitrConfiguration, ClusterService clusterService, MetaStorageManagerImpl metaStorageManager, CatalogManager catalogManager, TableManager tableManager, TxManager txManager, DistributionZoneManager distributionZoneManager, LogicalTopologyService logicalTopologyService, HybridClock clock, ReplicationConfiguration replicationConfiguration, ReplicaManager replicaManager, PartitionReplicaLifecycleManager partitionReplicaLifecycleManager, NodeProperties nodeProperties) {
        this.context = new PitrManagerContext(pitrConfiguration, clusterService, metaStorageManager, catalogManager, tableManager, txManager, distributionZoneManager, logicalTopologyService, clock, replicationConfiguration, replicaManager, partitionReplicaLifecycleManager, nodeProperties);
        this.globalWatch = new PitrGlobalStateWatch(this.context);
        this.coordinatorRole = new Lazy<CoordinatorRole>(() -> new CoordinatorRole(this.context));
        metaStorageManager.listen(MetaStorageEvent.ON_LEADER_ELECTED, parameters -> this.coordinatorRole.get().onBecomeCoordinator(parameters.term()).thenApply(unused -> true));
    }

    public PitrFacade api(LicenseFeatureChecker licenseFeatureChecker, Authorizer authorizer) {
        return new PitrFacade(this.context, licenseFeatureChecker, authorizer);
    }

    @Override
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        this.context.messagingService().addMessageHandler(PitrMessageGroup.class, (message, sender, correlationId) -> {
            if (message instanceof RecoveryRequestMessage) {
                this.executeOnCoordinator((RecoveryRequestMessage)message, sender, Objects.requireNonNull(correlationId), this::onRecoveryMessage);
            } else if (message instanceof StateRequestMessage) {
                this.executeOnCoordinator((StateRequestMessage)message, sender, Objects.requireNonNull(correlationId), this::onStateMessage);
            }
        });
        this.context.metaStorageManager().registerPrefixWatch(PitrMetaStorageKeys.pitrGlobalStatePrefix(), this.globalWatch);
        return CompletableFutures.nullCompletedFuture();
    }

    @Override
    public void beforeNodeStop() {
        CompletableFuture<Void> cancelFuture = this.globalWatch.cancelAllOngoingPitrOperationsDueToLocalFailure();
        try {
            cancelFuture.get(5L, TimeUnit.SECONDS);
        }
        catch (ExecutionException e) {
            LOG.error("Exception while waiting for point in time recovery cancellation", (Throwable)e);
        }
        catch (InterruptedException e) {
            LOG.error("Interrupted while waiting for point in time recovery cancellation", new Object[0]);
        }
        catch (TimeoutException e) {
            LOG.error("Timeout while waiting for point in time recovery cancellation", new Object[0]);
        }
    }

    @Override
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        if (!this.stopGuard.compareAndSet(false, true)) {
            return CompletableFutures.nullCompletedFuture();
        }
        this.context.close();
        return CompletableFutures.nullCompletedFuture();
    }

    private <T, R extends NetworkMessage> void executeOnCoordinator(T message, InternalClusterNode sender, long correlationId, BiFunction<Long, T, CompletableFuture<R>> callback) {
        CompletableFuture<MetaStorageServiceImpl> serviceFuture = this.context.metaStorageManager().metaStorageService();
        if (!serviceFuture.isDone()) {
            ErrorResponseMessage response2 = this.context.messagesFactory().errorResponseMessage().errorDescription("Meta Storage Raft group is not ready").build();
            this.context.messagingService().respond(sender, (NetworkMessage)response2, correlationId);
            return;
        }
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)serviceFuture.thenCompose(service -> service.raftGroupService().refreshAndGetLeaderWithTerm())).thenComposeAsync(leaderWithTerm -> {
            String leaderName = leaderWithTerm.leader().consistentId();
            if (!leaderName.equals(this.context.nodeName())) {
                LOG.debug("Not a coordinator node: {}", this.context.nodeName());
                return CompletableFuture.completedFuture(this.context.messagesFactory().notCoordinatorMessage().build());
            }
            LOG.debug("Coordinator node found: {}", this.context.nodeName());
            return (CompletionStage)callback.apply(leaderWithTerm.term(), message);
        }, (Executor)this.context.threadPool())).exceptionally(e -> {
            LOG.error("Error when processing message: {}.", (Throwable)e, message);
            return this.context.messagesFactory().errorResponseMessage().errorDescription(e.getMessage()).build();
        })).thenAccept(response -> this.context.messagingService().respond(sender, (NetworkMessage)response, correlationId));
    }

    private CompletableFuture<NetworkMessage> onStateMessage(Long term, StateRequestMessage message) {
        return ((CompletableFuture)this.context.metaStorageManager().get(PitrMetaStorageKeys.pitrGlobalStateKey(message.operationId())).thenComposeAsync(entry -> {
            if (entry == null || entry.value() == null) {
                return CompletableFuture.failedFuture(new PitrException("Point in time recovery id not found " + message.operationId()));
            }
            PitrGlobalState globalState = (PitrGlobalState)ByteUtils.fromBytes(entry.value());
            if (globalState.progress() == null) {
                return PitrProgress.mergeLocalStates(this.context, message.operationId());
            }
            return CompletableFuture.completedFuture(globalState.progress());
        }, (Executor)this.context.threadPool())).thenApply(progress -> {
            Catalog catalog = this.context.catalogManager().catalog(this.context.catalogManager().latestCatalogVersion());
            HashMap<String, Long> progressMap = new HashMap<String, Long>();
            progress.tables().forEach((tableId, details) -> {
                long rows = details.rowsUpdated();
                progressMap.put(catalog.table((int)tableId).name(), rows);
            });
            return this.context.messagesFactory().stateResponseMessage().status(progress.status()).progress(progressMap).description(progress.description()).build();
        });
    }

    private CompletableFuture<NetworkMessage> onRecoveryMessage(Long term, RecoveryRequestMessage message) {
        return IgniteUtils.inBusyLockAsync(this.context.busyLock(), () -> {
            LOG.info("Point in time message received [ts={}, tableNames={}]", message.timestampLong(), message.tableNames());
            UUID operationId = UUID.randomUUID();
            return Objects.requireNonNull(this.coordinatorRole.get()).prepareOperation(term, operationId, message).thenApply(unused -> this.context.messagesFactory().recoveryResponseMessage().operationId(operationId).build());
        });
    }

    @TestOnly
    public PitrManagerContext context() {
        return this.context;
    }

    @TestOnly
    public void updatePitrReader(Function<PitrManagerContext, PitrReader> mapper) {
        this.globalWatch.updatePitrReader(mapper);
    }
}

