/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.snapshots.coordinator;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.catalog.Catalog;
import org.apache.ignite3.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite3.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.dsl.Condition;
import org.apache.ignite3.internal.metastorage.dsl.Conditions;
import org.apache.ignite3.internal.metastorage.dsl.Operations;
import org.apache.ignite3.internal.network.ClusterNodeImpl;
import org.apache.ignite3.internal.rest.api.snapshot.SnapshotType;
import org.apache.ignite3.internal.util.ByteUtils;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.table.QualifiedName;
import org.apache.ignite3.table.QualifiedNameHelper;
import org.gridgain.internal.snapshots.DataKeyGenerator;
import org.gridgain.internal.snapshots.SnapshotException;
import org.gridgain.internal.snapshots.SnapshotIllegalArgumentException;
import org.gridgain.internal.snapshots.SnapshotManager;
import org.gridgain.internal.snapshots.SnapshotManagerContext;
import org.gridgain.internal.snapshots.SnapshotNotFoundException;
import org.gridgain.internal.snapshots.SnapshotTablesNotFoundException;
import org.gridgain.internal.snapshots.SnapshotUtils;
import org.gridgain.internal.snapshots.communication.metastorage.CreateSnapshotGlobalState;
import org.gridgain.internal.snapshots.communication.metastorage.CreateSnapshotGlobalStateSerializer;
import org.gridgain.internal.snapshots.communication.metastorage.GlobalSnapshotState;
import org.gridgain.internal.snapshots.communication.metastorage.MetaStorageKeys;
import org.gridgain.internal.snapshots.communication.metastorage.RestoreSnapshotGlobalState;
import org.gridgain.internal.snapshots.communication.metastorage.SnapshotStatus;
import org.gridgain.internal.snapshots.coordinator.CreateSnapshotLocalStateWatch;
import org.gridgain.internal.snapshots.coordinator.CreateSnapshotRecoveryHandler;
import org.gridgain.internal.snapshots.coordinator.DeleteSnapshotRecoveryHandler;
import org.gridgain.internal.snapshots.coordinator.LocalSnapshotStateListener;
import org.gridgain.internal.snapshots.coordinator.RestoreSnapshotRecoveryHandler;
import org.gridgain.internal.snapshots.coordinator.SnapshotCoordinatorState;
import org.gridgain.internal.snapshots.coordinator.SnapshotDeletionProcess;
import org.gridgain.internal.snapshots.coordinator.SnapshotRebalanceWatch;
import org.gridgain.internal.snapshots.coordinator.SnapshotRestorationProcess;
import org.gridgain.internal.snapshots.coordinator.SnapshotsCache;
import org.gridgain.internal.snapshots.filesystem.SnapshotUri;
import org.gridgain.internal.snapshots.signature.SnapshotSignature;
import org.jetbrains.annotations.Nullable;

public class SnapshotCoordinatorRole {
    /** Logger. Note that it is obtained for {@link SnapshotManager}, not for this class. */
    private static final IgniteLogger LOG = Loggers.forClass(SnapshotManager.class);

    /** Retry budget for the parent snapshot compare-and-set; presumably the initial {@code retriesLeft} of snapshot creation (confirm). */
    private static final int MAX_CAS_RETRIES = 3;

    /** Context giving access to the services used by this role (clock, catalog, meta storage, topology, thread pool, ...). */
    private final SnapshotManagerContext context;

    /** Snapshots cache; initialized in {@link #onBecomeCoordinator(long)} and queried for the latest snapshot of a table. */
    private final SnapshotsCache cache;

    /** Listeners of in-flight operations keyed by UUID; shared with the rebalance watch and the coordinator state objects. */
    private final ConcurrentMap<UUID, LocalSnapshotStateListener> ongoingSnapshots = new ConcurrentHashMap<>();

    /** Rebalance watch; snapshot creation adds and removes its operations here. */
    private final SnapshotRebalanceWatch rebalanceWatch;

    /**
     * Creates the coordinator role with a new {@link SnapshotsCache} built from the same context.
     *
     * @param context Snapshot manager context.
     */
    public SnapshotCoordinatorRole(SnapshotManagerContext context) {
        this(context, new SnapshotsCache(context));
    }

    /**
     * Creates the coordinator role with an explicitly supplied cache.
     *
     * @param context Snapshot manager context.
     * @param cache Snapshots cache to use.
     */
    private SnapshotCoordinatorRole(SnapshotManagerContext context, SnapshotsCache cache) {
        this.context = context;
        this.cache = cache;
        // The watch receives the very same map instance that this role keeps as its registry of ongoing operations.
        this.rebalanceWatch = new SnapshotRebalanceWatch(context, ongoingSnapshots);
    }

    /**
     * Activates the coordinator role on this node.
     *
     * <p>In order: adds a logical topology listener that reports a failure to every ongoing operation whose expected nodes
     * include the node that left, registers the rebalance watch, starts the snapshots cache initialization and subscribes the
     * create/restore/delete recovery handlers to the corresponding global state prefixes of the meta storage.
     *
     * <p>NOTE(review): the topology listener is added on every call and never removed here - confirm that repeated
     * coordinator elections do not accumulate listeners.
     *
     * @param term Coordinator term handed to the state shared by the recovery handlers.
     * @return Future returned by the snapshots cache initialization.
     */
    public CompletableFuture<Void> onBecomeCoordinator(long term) {
        LOG.info("Node {} became Snapshot Coordinator, starting the failover process.", this.context.nodeName());

        this.context.logicalTopologyService().addEventListener(new LogicalTopologyEventListener() {
            @Override
            public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) {
                for (LocalSnapshotStateListener listener : SnapshotCoordinatorRole.this.ongoingSnapshots.values()) {
                    if (!listener.expectedNodeNames().contains(leftNode.name())) {
                        // This operation does not involve the departed node.
                        continue;
                    }

                    listener.onSnapshotFailed(SnapshotCoordinatorRole.this.context.nodeName(), String.format("Node %s has left the topology", leftNode.name()));
                }
            }
        });

        this.rebalanceWatch.register();

        CompletableFuture<Void> cacheReady = this.cache.init();

        // All three recovery handlers share a single coordinator state bound to the given term.
        SnapshotCoordinatorState sharedState = new SnapshotCoordinatorState(term, this.ongoingSnapshots, this.rebalanceWatch);

        CreateSnapshotRecoveryHandler createRecovery = new CreateSnapshotRecoveryHandler(this.context, sharedState);
        this.context.metaStorageManager().prefix(MetaStorageKeys.createSnapshotGlobalStatePrefix()).subscribe(createRecovery);

        RestoreSnapshotRecoveryHandler restoreRecovery = new RestoreSnapshotRecoveryHandler(this.context, sharedState);
        this.context.metaStorageManager().prefix(MetaStorageKeys.restoreSnapshotGlobalStatePrefix()).subscribe(restoreRecovery);

        DeleteSnapshotRecoveryHandler deleteRecovery = new DeleteSnapshotRecoveryHandler(this.context, sharedState);
        this.context.metaStorageManager().prefix(MetaStorageKeys.deleteSnapshotGlobalStatePrefix()).subscribe(deleteRecovery);

        return cacheReady;
    }

    /**
     * Starts creation of a snapshot as of the given timestamp.
     *
     * <p>The request is rejected with a failed future if {@code timestamp} is after the current clock value. Otherwise the
     * method waits for schema metadata completeness at {@code timestamp} and for the logical topology from the leader, then,
     * under the busy lock, resolves the tables in the catalog active at {@code timestamp}, awaits their primary replicas and
     * launches the creation with {@link #MAX_CAS_RETRIES} retries for the parent snapshot update.
     *
     * @param term Coordinator term.
     * @param snapshotType Snapshot type (full or incremental).
     * @param startTime Start time recorded in the snapshot state.
     * @param timestamp Timestamp the snapshot is taken at.
     * @param tablesToSave Names of the tables to include; an empty set means all tables of the catalog.
     * @param destination Destination passed to the snapshot file system manager to build the snapshot URI.
     * @param encryptionProviderName Encryption provider name, or {@code null} if no snapshot signature should be created.
     * @return Future completed with the ID of the new snapshot.
     */
    public CompletableFuture<UUID> startSnapshotCreation(long term, SnapshotType snapshotType, Instant startTime, HybridTimestamp timestamp, Set<String> tablesToSave, @Nullable String destination, @Nullable String encryptionProviderName) {
        HybridTimestamp current = this.context.clock().current();
        if (this.context.clockService().after(timestamp, current)) {
            return CompletableFuture.failedFuture(new SnapshotException(IgniteStringFormatter.format("Timestamp provided is in the future: {}. Current time is {}", timestamp, current)));
        }

        CompletableFuture<Void> schemaSyncFuture = this.context.schemaSyncService().waitForMetadataCompleteness(timestamp);
        CompletableFuture<LogicalTopologySnapshot> logicalTopologyFuture = this.context.logicalTopologyService().logicalTopologyOnLeader();
        SnapshotUri snapshotUri = this.context.snapshotFileSystemManager().snapshotUri(destination);

        // Explicitly typed so that the chain keeps its generic type instead of degrading to a raw CompletableFuture.
        CompletableFuture<CompletableFuture<UUID>> creationFuture = schemaSyncFuture.thenCombine(
                logicalTopologyFuture,
                (none, topology) -> IgniteUtils.inBusyLockAsync(this.context.busyLock(), () -> {
                    Catalog catalog = this.context.catalogManager().activeCatalog(timestamp.longValue());
                    Collection<CatalogTableDescriptor> tables = tablesToSave.isEmpty() ? catalog.tables() : SnapshotCoordinatorRole.getTables(tablesToSave, catalog);

                    return SnapshotUtils.awaitPrimaryReplicas(this.context, tables, catalog).thenComposeAsync(v -> {
                        Set<Integer> tableIds = new HashSet<>(IgniteUtils.capacity(tables.size()));
                        Set<QualifiedName> tableNames = new HashSet<>(IgniteUtils.capacity(tables.size()));

                        for (CatalogTableDescriptor table : tables) {
                            CatalogSchemaDescriptor schema = catalog.schema(table.schemaId());
                            assert (schema != null);

                            tableIds.add(table.id());
                            tableNames.add(QualifiedNameHelper.fromNormalized(schema.name(), table.name()));
                        }

                        return this.startSnapshotCreation(MAX_CAS_RETRIES, term, snapshotType, startTime, timestamp, tables, tableIds, tableNames, topology, snapshotUri, encryptionProviderName);
                    }, (Executor) this.context.threadPool());
                }));

        return creationFuture.thenCompose(Function.identity());
    }

    /**
     * Performs one attempt to register a new snapshot, retrying when the parent snapshot is concurrently modified.
     *
     * <p>For each attempt a new snapshot ID is generated, the operation is added to the rebalance watch, a local state watch
     * is registered and the snapshot meta is saved. Without a parent the global state and the coordinator term are written
     * unconditionally. With a parent they are written together with the updated parent state, guarded by the parent entry
     * revision. If that guarded write is rejected, the watch and the rebalance operation of this attempt are removed and
     * the method calls itself with {@code retriesLeft - 1} until {@code retriesLeft <= 0}.
     *
     * @param retriesLeft Number of retries still allowed after this attempt.
     * @param term Coordinator term.
     * @param snapshotType Snapshot type.
     * @param startTime Start time recorded in the snapshot state.
     * @param timestamp Timestamp the snapshot is taken at.
     * @param tables Descriptors of the tables being saved.
     * @param tableIds IDs of the tables being saved.
     * @param tableNames Qualified names of the tables being saved.
     * @param topology Logical topology whose node names are recorded in the snapshot state.
     * @param destination Snapshot URI.
     * @param encryptionProviderName Encryption provider name, or {@code null}.
     * @return Future completed with the new snapshot ID, or failed with {@link SnapshotException} when retries are exhausted.
     */
    private CompletableFuture<UUID> startSnapshotCreation(int retriesLeft, long term, SnapshotType snapshotType, Instant startTime, HybridTimestamp timestamp, Collection<CatalogTableDescriptor> tables, Set<Integer> tableIds, Set<QualifiedName> tableNames, LogicalTopologySnapshot topology, SnapshotUri destination, @Nullable String encryptionProviderName) {
        return this.getParentSnapshotIfNeeded(snapshotType, tableIds, tableNames).thenComposeAsync(parentSnapshotEntry -> {
            // A null entry means that no parent is required for this snapshot type.
            CreateSnapshotGlobalState parentSnapshot = parentSnapshotEntry == null ? null : CreateSnapshotGlobalStateSerializer.deserialize(parentSnapshotEntry.value());
            Set<String> nodeNames = topology.nodes().stream().map(ClusterNodeImpl::name).collect(Collectors.toSet());
            UUID snapshotId = UUID.randomUUID();
            CreateSnapshotGlobalState newSnapshot = new CreateSnapshotGlobalState(snapshotId, SnapshotStatus.STARTED, nodeNames, startTime, tableIds, tableNames, parentSnapshot == null ? HybridTimestamp.MIN_VALUE : parentSnapshot.timestamp(), timestamp, "", parentSnapshot == null ? null : parentSnapshot.snapshotId(), Collections.emptySet(), destination, encryptionProviderName);

            LOG.info("Starting snapshot creation, initial state: {}", newSnapshot);

            // The watches are registered before the global state is written.
            // NOTE(review): if saving the meta or the meta storage write below completes exceptionally, the watch and the
            // rebalance operation registered here are not removed by this method - confirm whether cleanup happens elsewhere.
            this.rebalanceWatch.addOperation(snapshotId, tables);
            CreateSnapshotLocalStateWatch localStateWatch = new CreateSnapshotLocalStateWatch(this.context, new SnapshotCoordinatorState(term, this.ongoingSnapshots, this.rebalanceWatch), newSnapshot);
            this.context.metaStorageManager().registerPrefixWatch(MetaStorageKeys.createSnapshotLocalStatePrefix(snapshotId), localStateWatch);

            CompletableFuture<Void> snapshotMetaFuture = this.saveSnapshotMeta(newSnapshot, destination, encryptionProviderName);

            if (parentSnapshot == null) {
                return snapshotMetaFuture
                        .thenCompose(v -> this.context.metaStorageManager().putAll(Map.of(MetaStorageKeys.createSnapshotGlobalStateKey(snapshotId), CreateSnapshotGlobalStateSerializer.serialize(newSnapshot), MetaStorageKeys.coordinatorTermKey(newSnapshot.operationId()), ByteUtils.longToBytesKeepingOrder(term))))
                        .thenApply(v -> snapshotId);
            }

            return snapshotMetaFuture
                    .thenCompose(v -> this.trySaveSnapshotState(term, parentSnapshot, parentSnapshotEntry.revision(), newSnapshot))
                    .thenComposeAsync(success -> {
                        if (success) {
                            return CompletableFuture.completedFuture(snapshotId);
                        }

                        // The parent entry changed since it was read: undo the registrations of this attempt.
                        this.context.metaStorageManager().unregisterWatch(localStateWatch);
                        localStateWatch.cancel();
                        this.rebalanceWatch.removeOperation(snapshotId);

                        if (retriesLeft <= 0) {
                            LOG.warn("Failed to update parent snapshot, can not CAS parent snapshot state");
                            return CompletableFuture.failedFuture(new SnapshotException("Failed to update parent snapshot after all retries, parent snapshot is updated by another process"));
                        }

                        return this.startSnapshotCreation(retriesLeft - 1, term, snapshotType, startTime, timestamp, tables, tableIds, tableNames, topology, destination, encryptionProviderName);
                    }, (Executor) this.context.threadPool());
        }, (Executor) this.context.threadPool());
    }

    /**
     * Saves the snapshot meta through the snapshot meta writer, but only for single-copy destinations.
     *
     * @param snapshotState Global state of the snapshot being created.
     * @param destination Snapshot URI; when it is not single-copy nothing is written.
     * @param encryptionProviderName Encryption provider name, or {@code null} to save the meta with a {@code null} signature.
     * @return Future of the write, or an already completed future when nothing has to be written.
     */
    private CompletableFuture<Void> saveSnapshotMeta(CreateSnapshotGlobalState snapshotState, SnapshotUri destination, @Nullable String encryptionProviderName) {
        if (destination.isSingleCopy()) {
            SnapshotSignature snapshotSignature = this.createSnapshotSignature(snapshotState.snapshotId(), encryptionProviderName);

            return this.context.snapshotMetaWriter().saveSnapshotMeta(snapshotState, snapshotSignature);
        }

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Creates a key chain named after the snapshot ID and builds a signature from its active key.
     *
     * @param snapshotId Snapshot ID; its string form is used as the key chain ID.
     * @param encryptionProviderName Encryption provider name, or {@code null} if no signature is needed.
     * @return Signature with the active key of the new chain and the default cipher algorithm, or {@code null} when no
     *         provider name is given.
     */
    @Nullable
    private SnapshotSignature createSnapshotSignature(UUID snapshotId, @Nullable String encryptionProviderName) {
        if (encryptionProviderName != null) {
            String keyChainId = snapshotId.toString();

            this.context.keyManager().createKeyChain(keyChainId, DataKeyGenerator.create(0), encryptionProviderName);

            return new SnapshotSignature(this.context.keyManager().activeKey(keyChainId), DataKeyGenerator.defaultCipherAlgorithm());
        }

        return null;
    }

    /**
     * Writes the new snapshot state, the parent state extended with the new dependent snapshot ID and the coordinator term
     * in a single meta storage invocation that is conditioned on the parent entry revision.
     *
     * @param term Coordinator term stored under the coordinator term key of the new operation.
     * @param parentSnapshot Parent snapshot state as it was read.
     * @param parentSnapshotRevision Revision of the parent entry that the condition compares against.
     * @param newSnapshot State of the snapshot being created.
     * @return Future with the result of the meta storage invocation.
     */
    private CompletableFuture<Boolean> trySaveSnapshotState(long term, CreateSnapshotGlobalState parentSnapshot, long parentSnapshotRevision, CreateSnapshotGlobalState newSnapshot) {
        Set<UUID> dependents = new HashSet<>(parentSnapshot.dependentSnapshotIds());
        dependents.add(newSnapshot.snapshotId());

        // Copy of the parent state that differs only in the set of dependent snapshot IDs.
        CreateSnapshotGlobalState updatedParent = new CreateSnapshotGlobalState(
                parentSnapshot.operationId(),
                parentSnapshot.status(),
                parentSnapshot.nodeNames(),
                parentSnapshot.startTime(),
                parentSnapshot.tableIds(),
                parentSnapshot.tableNames(),
                parentSnapshot.fromTimestamp(),
                parentSnapshot.timestamp(),
                parentSnapshot.description(),
                parentSnapshot.parentSnapshotId(),
                dependents,
                parentSnapshot.snapshotUri(),
                parentSnapshot.encryptionProviderName());

        ByteArray parentKey = MetaStorageKeys.createSnapshotGlobalStateKey(parentSnapshot.snapshotId());
        ByteArray newSnapshotKey = MetaStorageKeys.createSnapshotGlobalStateKey(newSnapshot.snapshotId());
        ByteArray termKey = MetaStorageKeys.coordinatorTermKey(newSnapshot.operationId());

        Condition parentUnchanged = Conditions.revision(parentKey).eq(parentSnapshotRevision);

        return this.context.metaStorageManager().invoke(
                parentUnchanged,
                List.of(
                        Operations.put(newSnapshotKey, CreateSnapshotGlobalStateSerializer.serialize(newSnapshot)),
                        Operations.put(parentKey, CreateSnapshotGlobalStateSerializer.serialize(updatedParent)),
                        Operations.put(termKey, ByteUtils.longToBytesKeepingOrder(term))),
                List.of());
    }

    /**
     * Resolves the parent snapshot entry depending on the snapshot type.
     *
     * @param snapshotType Snapshot type.
     * @param tableIds IDs of the tables being saved.
     * @param tableNames Qualified names of the tables being saved.
     * @return Future completed with {@code null} for a full snapshot, or the parent lookup future for an incremental one.
     * @throws SnapshotIllegalArgumentException If the snapshot type is neither full nor incremental.
     */
    private CompletableFuture<@Nullable Entry> getParentSnapshotIfNeeded(SnapshotType snapshotType, Set<Integer> tableIds, Set<QualifiedName> tableNames) {
        switch (snapshotType) {
            case FULL:
                return CompletableFutures.nullCompletedFuture();

            case INCREMENTAL:
                return this.getParentSnapshot(tableIds, tableNames);

            default:
                throw new SnapshotIllegalArgumentException("Unknown snapshot type: " + snapshotType);
        }
    }

    /**
     * Finds the snapshot that is the latest one for every given table according to the snapshots cache.
     *
     * @param tableIds IDs of the tables being saved.
     * @return The common latest snapshot ID, or {@code null} if the set is empty, the tables disagree on their latest
     *         snapshot, or the cache reports no latest snapshot for them.
     */
    @Nullable
    private UUID findParentSnapshotId(Set<Integer> tableIds) {
        if (tableIds.isEmpty()) {
            return null;
        }

        SnapshotsCache.CacheFrame frame = this.cache.get();
        UUID candidate = null;
        boolean firstTable = true;

        for (Integer tableId : tableIds) {
            UUID latest = frame.getLatestSnapshotId(tableId);

            if (firstTable) {
                // The first table defines the candidate that all the other tables must agree with.
                candidate = latest;
                firstTable = false;
            } else if (!Objects.equals(latest, candidate)) {
                return null;
            }
        }

        return candidate;
    }

    /**
     * Looks up the meta storage entry of the parent snapshot for an incremental snapshot of the given tables.
     *
     * @param tableIds IDs of the tables being saved; used to find the common latest snapshot.
     * @param tableNames Qualified names of the same tables; used only for the error message.
     * @return Future completed with the parent snapshot entry. It fails with {@link SnapshotException} when there is no
     *         common parent, when the parent entry is missing from the meta storage, or when the parent is not in the
     *         {@code COMPLETED} status.
     */
    private CompletableFuture<Entry> getParentSnapshot(Set<Integer> tableIds, Set<QualifiedName> tableNames) {
        UUID parentSnapshotId = this.findParentSnapshotId(tableIds);

        if (parentSnapshotId == null) {
            // Sorted so that the message does not depend on the iteration order of the set.
            List<String> orderedCanonicalTableNames = tableNames.stream().map(QualifiedName::toCanonicalForm).sorted().collect(Collectors.toList());

            return CompletableFuture.failedFuture(new SnapshotException("No parent snapshot found for the tables: " + orderedCanonicalTableNames));
        }

        return this.context.metaStorageManager().get(MetaStorageKeys.createSnapshotGlobalStateKey(parentSnapshotId)).thenApply(entry -> {
            if (SnapshotUtils.isMissing(entry)) {
                LOG.warn("Parent snapshot with ID {} is missing, can not use it for the new snapshot", parentSnapshotId);
                throw new SnapshotException("Parent snapshot is no longer available");
            }

            CreateSnapshotGlobalState state = CreateSnapshotGlobalStateSerializer.deserialize(entry.value());

            if (state.status() != SnapshotStatus.COMPLETED) {
                LOG.warn("Parent snapshot with ID {} is in {} state, can not use it for new incremental snapshot", parentSnapshotId, state.status());
                throw new SnapshotException("Parent snapshot is in inconsistent state");
            }

            return entry;
        });
    }

    /**
     * Prepares restoration of the given snapshot.
     *
     * <p>If the creation state of the target snapshot is present in the meta storage it must be {@code COMPLETED}. When the
     * entry is missing the validation is skipped and the preparation proceeds (presumably the snapshot can still be
     * available at {@code source} - confirm). Afterwards the logical topology is requested from the leader and a
     * restoration state in the {@code PREPARED} status is handed to a {@link SnapshotRestorationProcess}.
     *
     * @param term Coordinator term.
     * @param targetSnapshotId ID of the snapshot to restore.
     * @param tablesToRestore Names of the tables to restore; each one is parsed as a qualified name.
     * @param source Source passed to the snapshot file system manager to build the snapshot URI.
     * @param encryptionProviderName Encryption provider name, or {@code null}.
     * @return Future completed with the ID of the restoration operation.
     */
    public CompletableFuture<UUID> prepareSnapshotRestoration(long term, UUID targetSnapshotId, Set<String> tablesToRestore, @Nullable String source, @Nullable String encryptionProviderName) {
        return this.context.metaStorageManager().get(MetaStorageKeys.createSnapshotGlobalStateKey(targetSnapshotId))
                .thenAccept(entry -> {
                    if (!SnapshotUtils.isMissing(entry)) {
                        SnapshotCoordinatorRole.validateForRestoration(CreateSnapshotGlobalStateSerializer.deserialize(entry.value()));
                    }
                })
                .thenCompose(ignored -> this.context.logicalTopologyService().logicalTopologyOnLeader())
                .thenComposeAsync(topology -> {
                    Set<String> nodeNames = topology.nodes().stream().map(ClusterNodeImpl::name).collect(Collectors.toSet());
                    Set<QualifiedName> qualifiedTableNames = CollectionUtils.mapToImmutableSet(tablesToRestore, QualifiedName::parse);

                    RestoreSnapshotGlobalState restoreGlobalState = new RestoreSnapshotGlobalState(UUID.randomUUID(), SnapshotStatus.PREPARED, nodeNames, qualifiedTableNames, targetSnapshotId, -1, this.startTime(), "", this.context.snapshotFileSystemManager().snapshotUri(source), encryptionProviderName);

                    SnapshotCoordinatorState coordinatorState = new SnapshotCoordinatorState(term, this.ongoingSnapshots, this.rebalanceWatch);
                    SnapshotRestorationProcess restorationProcess = new SnapshotRestorationProcess(this.context, coordinatorState);

                    return restorationProcess.prepareSnapshotRestoration(restoreGlobalState).thenApply(v -> restoreGlobalState.operationId());
                }, (Executor) this.context.threadPool());
    }

    /**
     * Resolves table names to the table descriptors of the given catalog.
     *
     * @param tableNames Table names; each one is parsed as a qualified name.
     * @param catalog Catalog to look the schemas and tables up in.
     * @return Descriptors of all the requested tables.
     * @throws SnapshotException As soon as a name refers to a schema that does not exist in the catalog.
     * @throws SnapshotTablesNotFoundException After all names are processed, if some tables were not found; carries their
     *         canonical names in sorted order.
     */
    private static Set<CatalogTableDescriptor> getTables(Collection<String> tableNames, Catalog catalog) {
        Set<CatalogTableDescriptor> resolved = new HashSet<>(IgniteUtils.capacity(tableNames.size()));
        List<String> unresolved = new ArrayList<>();

        for (String rawName : tableNames) {
            QualifiedName name = QualifiedName.parse(rawName);
            CatalogSchemaDescriptor schema = catalog.schema(name.schemaName());

            if (schema == null) {
                throw new SnapshotException(IgniteStringFormatter.format("Schema does not exist [schema={}, catalogVersion={}].", name.schemaName(), catalog.version()));
            }

            CatalogTableDescriptor table = schema.table(name.objectName());

            if (table != null) {
                resolved.add(table);
            } else {
                // Keep collecting so that every missing table is reported at once.
                unresolved.add(name.toCanonicalForm());
            }
        }

        if (!unresolved.isEmpty()) {
            Collections.sort(unresolved);
            throw new SnapshotTablesNotFoundException(unresolved);
        }

        return resolved;
    }

    /**
     * Prepares deletion of the given snapshot.
     *
     * <p>The creation state of the snapshot is read from the meta storage and validated, then, under the busy lock, the
     * deletion is handed to a {@link SnapshotDeletionProcess}.
     *
     * @param term Coordinator term.
     * @param targetSnapshotId ID of the snapshot to delete.
     * @return Future completed with the ID of the deletion operation. It fails with {@link SnapshotNotFoundException} when
     *         the snapshot state is missing and with {@link SnapshotIllegalArgumentException} when the snapshot status
     *         does not allow deletion.
     */
    public CompletableFuture<UUID> prepareSnapshotDeletion(long term, UUID targetSnapshotId) {
        ByteArray createStateKey = MetaStorageKeys.createSnapshotGlobalStateKey(targetSnapshotId);

        CompletableFuture<CreateSnapshotGlobalState> createGlobalStateFuture = this.context.metaStorageManager().get(createStateKey).thenApply(createGlobalStateEntry -> {
            if (SnapshotUtils.isMissing(createGlobalStateEntry)) {
                throw new SnapshotNotFoundException(targetSnapshotId);
            }

            CreateSnapshotGlobalState createGlobalState = CreateSnapshotGlobalStateSerializer.deserialize(createGlobalStateEntry.value());
            SnapshotCoordinatorRole.validateForDeletion(createGlobalState);

            return createGlobalState;
        });

        return createGlobalStateFuture.thenComposeAsync(createGlobalState -> IgniteUtils.inBusyLockAsync(this.context.busyLock(), () -> {
            SnapshotCoordinatorState coordinatorState = new SnapshotCoordinatorState(term, this.ongoingSnapshots, this.rebalanceWatch);
            SnapshotDeletionProcess deletionProcess = new SnapshotDeletionProcess(this.context, coordinatorState);

            return deletionProcess.prepareSnapshotDeletion(createGlobalState, this.startTime()).thenApply(GlobalSnapshotState::operationId);
        }), (Executor) this.context.threadPool());
    }

    /**
     * Checks that a snapshot in the given state may be deleted: only {@code FAILED} and {@code COMPLETED} snapshots pass.
     *
     * @param createGlobalState Creation state of the snapshot.
     * @throws SnapshotIllegalArgumentException If the snapshot is {@code STARTED}, {@code PREPARED} or in any other status.
     */
    private static void validateForDeletion(CreateSnapshotGlobalState createGlobalState) {
        SnapshotStatus status = createGlobalState.status();

        switch (status) {
            case FAILED:
            case COMPLETED:
                return;

            case STARTED:
            case PREPARED:
                throw new SnapshotIllegalArgumentException(String.format("Snapshot with ID %s is not in COMPLETED state.", createGlobalState.snapshotId()));

            default:
                throw new SnapshotIllegalArgumentException(String.format("Snapshot with ID %s has an unknown status: %s", createGlobalState.snapshotId(), status));
        }
    }

    /**
     * Checks that a snapshot in the given state may be restored: only {@code COMPLETED} snapshots pass.
     *
     * @param createGlobalState Creation state of the snapshot.
     * @throws SnapshotIllegalArgumentException If the snapshot is {@code FAILED}, {@code STARTED}, {@code PREPARED} or in any
     *         other status; the message depends on the status.
     */
    private static void validateForRestoration(CreateSnapshotGlobalState createGlobalState) {
        SnapshotStatus status = createGlobalState.status();

        switch (status) {
            case COMPLETED:
                return;

            case FAILED:
                throw new SnapshotIllegalArgumentException(String.format("Snapshot with ID %s is in FAILED state, can not restore.", createGlobalState.snapshotId()));

            case STARTED:
            case PREPARED:
                throw new SnapshotIllegalArgumentException(String.format("Snapshot with ID %s is not in COMPLETED state.", createGlobalState.snapshotId()));

            default:
                throw new SnapshotIllegalArgumentException(String.format("Snapshot with ID %s has an unknown status: %s", createGlobalState.snapshotId(), status));
        }
    }

    /**
     * Returns the operation start time taken from the context clock.
     *
     * @return Instant built from the physical component of the current clock value, treated as epoch milliseconds.
     */
    private Instant startTime() {
        HybridTimestamp now = this.context.clock().current();
        long physicalMillis = now.getPhysical();

        return Instant.ofEpochMilli(physicalMillis);
    }
}

