/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.catalog;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.apache.ignite3.internal.catalog.BulkUpdateProducer;
import org.apache.ignite3.internal.catalog.Catalog;
import org.apache.ignite3.internal.catalog.CatalogApplyResult;
import org.apache.ignite3.internal.catalog.CatalogCommand;
import org.apache.ignite3.internal.catalog.CatalogManager;
import org.apache.ignite3.internal.catalog.CatalogNotFoundException;
import org.apache.ignite3.internal.catalog.CatalogSystemViewRegistry;
import org.apache.ignite3.internal.catalog.CatalogValidationException;
import org.apache.ignite3.internal.catalog.CatalogVersionAwareValidationException;
import org.apache.ignite3.internal.catalog.SchemaChangeIsForbiddenException;
import org.apache.ignite3.internal.catalog.UpdateContext;
import org.apache.ignite3.internal.catalog.UpdateProducer;
import org.apache.ignite3.internal.catalog.commands.AlterZoneSetDefaultCommand;
import org.apache.ignite3.internal.catalog.commands.CatalogUtils;
import org.apache.ignite3.internal.catalog.commands.CreateSchemaCommand;
import org.apache.ignite3.internal.catalog.commands.CreateZoneCommand;
import org.apache.ignite3.internal.catalog.commands.CreateZoneCommandBuilder;
import org.apache.ignite3.internal.catalog.commands.StorageProfileParams;
import org.apache.ignite3.internal.catalog.events.CatalogEvent;
import org.apache.ignite3.internal.catalog.events.CatalogEventParameters;
import org.apache.ignite3.internal.catalog.storage.Fireable;
import org.apache.ignite3.internal.catalog.storage.SnapshotEntry;
import org.apache.ignite3.internal.catalog.storage.UpdateEntry;
import org.apache.ignite3.internal.catalog.storage.UpdateLog;
import org.apache.ignite3.internal.catalog.storage.UpdateLogEvent;
import org.apache.ignite3.internal.catalog.storage.VersionedUpdate;
import org.apache.ignite3.internal.components.NodeProperties;
import org.apache.ignite3.internal.components.SystemPropertiesNodeProperties;
import org.apache.ignite3.internal.event.AbstractEventProducer;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.sql.SqlCommon;
import org.apache.ignite3.internal.systemview.api.SystemView;
import org.apache.ignite3.internal.systemview.api.SystemViewProvider;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.util.PendingComparableValuesTracker;
import org.apache.ignite3.lang.ErrorGroups;
import org.jetbrains.annotations.TestOnly;

public class CatalogManagerImpl
extends AbstractEventProducer<CatalogEvent, CatalogEventParameters>
implements CatalogManager,
SystemViewProvider {
    /** Name of the zone that {@code initCatalog} creates and then marks as the default zone. */
    public static final String DEFAULT_ZONE_NAME = "Default";
    /** Partition count of the default zone; {@code initCatalog} supplies this same value (25) to the zone builder. */
    public static final int DEFAULT_ZONE_PARTITION_COUNT = 25;
    /** Attempt limit after which {@code saveUpdate} fails with "Max retry limit exceeded". */
    private static final int MAX_RETRY_COUNT = 10;
    private static final IgniteLogger LOG = Loggers.forClass(CatalogManagerImpl.class);
    /** Registered catalogs keyed by {@code Catalog.version()}; populated by {@code registerCatalog}. */
    private final NavigableMap<Integer, Catalog> catalogByVer = new ConcurrentSkipListMap<Integer, Catalog>();
    /** Registered catalogs keyed by {@code Catalog.time()}; {@code catalogAt} does floor lookups on it. */
    private final NavigableMap<Long, Catalog> catalogByTs = new ConcurrentSkipListMap<Long, Catalog>();
    /** Completed (always with {@code null}) by {@code startAsync} once initialization handling has finished. */
    private final CompletableFuture<Void> catalogInitializationFuture = new CompletableFuture();
    /** Log the catalog updates and snapshots are appended to and replayed from. */
    private final UpdateLog updateLog;
    /** Tracks the highest catalog version whose events have been fired; starts at 0. */
    private final PendingComparableValuesTracker<Integer, Void> versionTracker = new PendingComparableValuesTracker(0);
    private final ClockService clockService;
    /** Receives failures from catalog initialization and from applying catalog updates. */
    private final FailureProcessor failureProcessor;
    private final NodeProperties nodeProperties;
    /** Supplies the delay duration (ms) written into every {@code VersionedUpdate} built by {@code saveUpdate}. */
    private final LongSupplier delayDurationMsSupplier;
    /** Backs {@code systemViews()}; reads the catalog active at the current clock time. */
    private final CatalogSystemViewRegistry catalogSystemViewProvider;
    /** Guards operations against running after {@code stopAsync} has blocked the lock. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    /** Tail of the chain of local save operations; read and replaced only under {@code lastSaveUpdateFutureMutex}. */
    private CompletableFuture<Void> lastSaveUpdateFuture = CompletableFutures.nullCompletedFuture();
    private final Object lastSaveUpdateFutureMutex = new Object();
    /** Yields {@code true} while a rolling upgrade is in progress; schema changes are rejected in that state. */
    private final Supplier<CompletableFuture<Boolean>> rollingUpgradeStateSupplier;

    /**
     * Test-only constructor: uses {@link SystemPropertiesNodeProperties} for node properties and a
     * rolling-upgrade state supplier obtained from {@code CompletableFutures.falseCompletedFuture()}.
     *
     * @param updateLog Log the catalog updates are stored in.
     * @param clockService Clock service.
     * @param failureProcessor Failure processor.
     * @param delayDurationMsSupplier Supplier of the delay duration, in milliseconds.
     */
    @TestOnly
    public CatalogManagerImpl(
            UpdateLog updateLog,
            ClockService clockService,
            FailureProcessor failureProcessor,
            LongSupplier delayDurationMsSupplier
    ) {
        this(
                updateLog,
                clockService,
                failureProcessor,
                new SystemPropertiesNodeProperties(),
                delayDurationMsSupplier,
                () -> CompletableFutures.falseCompletedFuture()
        );
    }

    /**
     * Creates the manager. No catalog is registered here; that happens in {@code startAsync}.
     *
     * @param updateLog Log the catalog updates are stored in.
     * @param clockService Clock service; also used by the system view registry to pick the current catalog.
     * @param failureProcessor Failure processor.
     * @param nodeProperties Node properties.
     * @param delayDurationMsSupplier Supplier of the delay duration, in milliseconds.
     * @param rollingUpgradeStateSupplier Supplier of a future telling whether a cluster upgrade is in progress.
     */
    public CatalogManagerImpl(
            UpdateLog updateLog,
            ClockService clockService,
            FailureProcessor failureProcessor,
            NodeProperties nodeProperties,
            LongSupplier delayDurationMsSupplier,
            Supplier<CompletableFuture<Boolean>> rollingUpgradeStateSupplier
    ) {
        this.updateLog = updateLog;
        this.clockService = clockService;
        this.failureProcessor = failureProcessor;
        this.nodeProperties = nodeProperties;
        this.delayDurationMsSupplier = delayDurationMsSupplier;
        this.rollingUpgradeStateSupplier = rollingUpgradeStateSupplier;

        // System views always read the catalog that is active "now" according to the clock service.
        this.catalogSystemViewProvider = new CatalogSystemViewRegistry(() -> catalogAt(clockService.nowLong()));
    }

    /**
     * Registers an empty catalog (version 0), subscribes to the update log and starts it.
     *
     * <p>If, after the log has started, the latest known version is still the empty one, the initial
     * catalog is written via {@code initCatalog}; the initialization future is then completed once
     * version 1 is ready and its activation wait has finished (whether successfully or not).
     * Otherwise the initialization future is completed right away.
     *
     * @param componentContext Component context providing the executor for the asynchronous stages.
     * @return Future of the start procedure.
     */
    @Override
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        int initialObjectIdGenState = 0;
        Catalog emptyCatalog = new Catalog(
                0,
                HybridTimestamp.MIN_VALUE.longValue(),
                initialObjectIdGenState,
                List.of(),
                List.of(),
                null
        );

        registerCatalog(emptyCatalog);
        updateLog.registerUpdateHandler(new OnUpdateHandlerImpl());

        return updateLog.startAsync(componentContext).thenComposeAsync(unused -> {
            if (latestCatalogVersion() != emptyCatalog.version()) {
                // The log already contained updates, so there is nothing to initialize.
                catalogInitializationFuture.complete(null);

                return CompletableFutures.nullCompletedFuture();
            }

            int firstInitializedVersion = emptyCatalog.version() + 1;

            // Errors are deliberately ignored here: the future only signals that initialization handling is over.
            catalogReadyFuture(firstInitializedVersion)
                    .thenComposeAsync(ignored -> awaitVersionActivation(firstInitializedVersion), componentContext.executor())
                    .handleAsync((res, err) -> catalogInitializationFuture.complete(null), componentContext.executor());

            return initCatalog(emptyCatalog);
        }, componentContext.executor());
    }

    /**
     * Stops the manager: blocks the busy lock, closes the version tracker with a
     * {@link NodeStoppingException} and then stops the update log.
     *
     * @param componentContext Component context passed on to the update log.
     * @return Future returned by the update log's stop procedure.
     */
    @Override
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        busyLock.block();

        NodeStoppingException stopCause = new NodeStoppingException();
        versionTracker.close(stopCause);

        return updateLog.stopAsync(componentContext);
    }

    /**
     * Returns the version of the catalog found by {@code catalogAt} for the given timestamp.
     *
     * @param timestamp Timestamp to look the catalog up by.
     * @return Version of that catalog.
     * @throws CatalogNotFoundException If no catalog is registered at or before the timestamp.
     */
    @Override
    public int activeCatalogVersion(long timestamp) {
        Catalog active = catalogAt(timestamp);

        return active.version();
    }

    /**
     * Returns the smallest catalog version currently registered.
     *
     * @return Key of the first entry of the by-version map.
     */
    @Override
    public int earliestCatalogVersion() {
        Map.Entry<Integer, Catalog> oldest = catalogByVer.firstEntry();

        return oldest.getKey();
    }

    /**
     * Returns the catalog with the smallest version currently registered.
     *
     * @return Value of the first entry of the by-version map.
     */
    @Override
    public Catalog earliestCatalog() {
        Map.Entry<Integer, Catalog> oldest = catalogByVer.firstEntry();

        return oldest.getValue();
    }

    /**
     * Returns the greatest catalog version currently registered.
     *
     * @return Key of the last entry of the by-version map.
     */
    @Override
    public int latestCatalogVersion() {
        Map.Entry<Integer, Catalog> newest = catalogByVer.lastEntry();

        return newest.getKey();
    }

    /**
     * Returns the version tracker's wait future for the given version, obtained under the busy lock.
     *
     * @param version Catalog version to wait for.
     * @return Future produced by {@code IgniteUtils.inBusyLockAsync} around the tracker wait.
     */
    @Override
    public CompletableFuture<Void> catalogReadyFuture(int version) {
        Supplier<CompletableFuture<Void>> waitForVersion = () -> versionTracker.waitFor(version);

        return IgniteUtils.inBusyLockAsync(busyLock, waitForVersion);
    }

    /**
     * Returns the future that {@code startAsync} completes when catalog initialization handling is over.
     *
     * @return The initialization future (the same instance on every call).
     */
    @Override
    public CompletableFuture<Void> catalogInitializationFuture() {
        return catalogInitializationFuture;
    }

    /**
     * Returns the catalog registered under the given version.
     *
     * @param catalogVersion Catalog version.
     * @return Catalog of that version, never {@code null}.
     * @throws CatalogNotFoundException If no catalog is registered under this version.
     */
    @Override
    public Catalog catalog(int catalogVersion) {
        Catalog found = catalogByVer.get(catalogVersion);

        if (found != null) {
            return found;
        }

        throw new CatalogNotFoundException("Catalog version not found: " + catalogVersion);
    }

    /**
     * Returns the catalog found by {@code catalogAt} for the given timestamp.
     *
     * @param timestamp Timestamp to look the catalog up by.
     * @return The catalog registered at the greatest time not exceeding the timestamp.
     * @throws CatalogNotFoundException If no such catalog is registered.
     */
    @Override
    public Catalog activeCatalog(long timestamp) {
        Catalog active = catalogAt(timestamp);

        return active;
    }

    /**
     * Looks up the catalog whose registration time is the greatest one less than or equal to the timestamp.
     *
     * @param timestamp Timestamp to look the catalog up by.
     * @return The matching catalog.
     * @throws CatalogNotFoundException If the by-time map has no entry at or below the timestamp.
     */
    private Catalog catalogAt(long timestamp) {
        Map.Entry<Long, Catalog> floor = catalogByTs.floorEntry(timestamp);

        if (floor != null) {
            return floor.getValue();
        }

        throw new CatalogNotFoundException("Catalog not found for given timestamp: " + timestamp);
    }

    /**
     * Executes a single catalog command by delegating to {@code saveUpdateAndWaitForActivation}
     * with a one-element list.
     *
     * @param command Command to execute.
     * @return Future of the apply result.
     */
    @Override
    public CompletableFuture<CatalogApplyResult> execute(CatalogCommand command) {
        List<UpdateProducer> singleCommand = List.of(command);

        return saveUpdateAndWaitForActivation(singleCommand);
    }

    /**
     * Executes a batch of catalog commands as one update.
     *
     * @param commands Commands to execute; a {@code null} or empty list short-circuits.
     * @return Future of the apply result, or a future completed with {@code null} when there are no commands.
     */
    @Override
    public CompletableFuture<CatalogApplyResult> execute(List<CatalogCommand> commands) {
        if (!CollectionUtils.nullOrEmpty(commands)) {
            // Work on an immutable copy so that later changes to the caller's list cannot affect retries.
            return saveUpdateAndWaitForActivation(List.copyOf(commands));
        }

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Saves a snapshot of the catalog of the given version to the update log.
     *
     * <p>The catalog is resolved through {@link #catalog(int)}, which never returns {@code null}: an
     * unknown version is reported by a {@link CatalogNotFoundException}. The former {@code null} check
     * with an {@code IllegalArgumentException} was therefore unreachable and has been removed.
     *
     * @param version Version of the catalog to snapshot.
     * @return Future returned by {@code UpdateLog.saveSnapshot}.
     * @throws CatalogNotFoundException If no catalog is registered under this version.
     */
    public CompletableFuture<Boolean> compactCatalog(int version) {
        Catalog catalog = catalog(version);

        return updateLog.saveSnapshot(new SnapshotEntry(catalog));
    }

    /**
     * Appends the very first catalog update to the log: creation of the default zone, marking it
     * as default, and creation of the default and the system schemas.
     *
     * <p>The update is appended as version {@code emptyCatalog.version() + 1} with a zero delay duration.
     * A failure of the append is passed to the failure processor unless it was caused by a
     * {@link NodeStoppingException}; in every case the returned future completes normally with {@code null}.
     *
     * @param emptyCatalog Empty catalog the initial commands are validated against.
     * @return Future that completes with {@code null} when the append attempt is over.
     */
    private CompletableFuture<Void> initCatalog(Catalog emptyCatalog) {
        List<CatalogCommand> initCommands = List.of(
                // The builder argument always yields DEFAULT_ZONE_PARTITION_COUNT, whatever inputs it receives.
                ((CreateZoneCommandBuilder) CreateZoneCommand.builder((f, sp, r) -> DEFAULT_ZONE_PARTITION_COUNT).zoneName(DEFAULT_ZONE_NAME))
                        .replicas(1)
                        .quorumSize(1)
                        .dataNodesAutoAdjustScaleUp(
                                CatalogUtils.defaultZoneDefaultAutoAdjustScaleUpTimeoutSeconds(nodeProperties.colocationEnabled()))
                        .dataNodesAutoAdjustScaleDown(Integer.MAX_VALUE)
                        .filter("$..*")
                        .storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile("default").build()))
                        .build(),
                AlterZoneSetDefaultCommand.builder().zoneName(DEFAULT_ZONE_NAME).build(),
                CreateSchemaCommand.builder().name(SqlCommon.DEFAULT_SCHEMA_NAME).build(),
                CreateSchemaCommand.systemSchemaBuilder().name("SYSTEM").build()
        );

        List<UpdateEntry> entries = new BulkUpdateProducer(initCommands).get(new UpdateContext(emptyCatalog, this));

        return updateLog.append(new VersionedUpdate(emptyCatalog.version() + 1, 0L, entries)).handle((result, error) -> {
            if (error != null && !ExceptionUtils.hasCause(error, NodeStoppingException.class)) {
                failureProcessor.process(new FailureContext(error, "Unable to create default zone."));
            }

            return null;
        });
    }

    /**
     * Makes a catalog visible to lookups by version and by time.
     *
     * @param newCatalog Catalog to register; it is stored under {@code version()} and under {@code time()}.
     */
    private void registerCatalog(Catalog newCatalog) {
        int version = newCatalog.version();
        catalogByVer.put(version, newCatalog);

        long time = newCatalog.time();
        catalogByTs.put(time, newCatalog);
    }

    /**
     * Drops every registered catalog that is strictly older than the given one, both by version
     * and by time, and logs the truncation.
     *
     * @param catalog Catalog that becomes the earliest retained one (it is not removed itself).
     */
    private void truncateUpTo(Catalog catalog) {
        NavigableMap<Integer, Catalog> olderByVersion = catalogByVer.headMap(catalog.version(), false);
        olderByVersion.clear();

        NavigableMap<Long, Catalog> olderByTime = catalogByTs.headMap(catalog.time(), false);
        olderByTime.clear();

        String message = "Catalog history was truncated up to version=" + catalog.version();
        LOG.info(message);
    }

    /**
     * Saves the updates produced by the given producers and waits until the resulting catalog version
     * is activated.
     *
     * <p>The operation is rejected with a {@link SchemaChangeIsForbiddenException} when the rolling
     * upgrade state supplier reports an upgrade in progress.
     *
     * <p>A validation failure ({@link CatalogVersionAwareValidationException}) is reported to the caller
     * as its initial exception. Unless the catalog it was validated against has version 0, the failure
     * is reported only after the clock has reached the cluster-wide activation time of that catalog;
     * an error of that wait is attached as a suppressed exception.
     *
     * <p>If that catalog is no longer registered (the history may have been truncated in the meantime),
     * the failure is reported immediately. Previously this case threw a {@code NullPointerException}
     * inside the completion callback and left the returned future incomplete forever.
     *
     * @param updateProducers Producers of the update entries.
     * @return Future of the apply result.
     */
    private CompletableFuture<CatalogApplyResult> saveUpdateAndWaitForActivation(List<UpdateProducer> updateProducers) {
        CompletableFuture<CatalogApplyResult> resultFuture = new CompletableFuture<>();

        CompletableFuture<CatalogApplyResult> appliedFuture = rollingUpgradeStateSupplier.get().thenCompose(isUpgrading -> {
            if (isUpgrading) {
                throw new SchemaChangeIsForbiddenException("Schema modification command was aborted because a cluster upgrade is currently in progress.");
            }

            return saveUpdateEliminatingLocalConcurrency(updateProducers).thenCompose(this::awaitVersionActivation);
        });

        appliedFuture.whenComplete((result, err) -> {
            if (err == null) {
                resultFuture.complete(result);

                return;
            }

            Throwable unwrapped = ExceptionUtils.unwrapCause(err);

            if (!(unwrapped instanceof CatalogVersionAwareValidationException)) {
                resultFuture.completeExceptionally(err);

                return;
            }

            CatalogVersionAwareValidationException versionAwareError = (CatalogVersionAwareValidationException) unwrapped;
            Throwable validationError = versionAwareError.initial();
            Catalog catalog = catalogByVer.get(versionAwareError.version());

            // A missing catalog cannot be waited for; version 0 is the empty catalog and needs no wait.
            if (catalog == null || catalog.version() == 0) {
                resultFuture.completeExceptionally(validationError);

                return;
            }

            HybridTimestamp tsSafeForRoReadingInPastOptimization = calcClusterWideEnsureActivationTime(catalog);

            clockService.waitFor(tsSafeForRoReadingInPastOptimization).whenComplete((ignored, waitError) -> {
                if (waitError != null) {
                    validationError.addSuppressed(waitError);
                }

                resultFuture.completeExceptionally(validationError);
            });
        });

        return resultFuture;
    }

    /**
     * Chains a save operation after the previously submitted one, so that save operations started
     * through this method on this node run one after another.
     *
     * <p>The chain tail is replaced, under {@code lastSaveUpdateFutureMutex}, with a future that
     * completes normally regardless of the outcome of this save, so a failed save does not prevent
     * the next one from starting.
     *
     * @param updateProducer Producers of the update entries.
     * @return Future of this particular save operation (it does carry the failure, if any).
     */
    private CompletableFuture<CatalogApplyResult> saveUpdateEliminatingLocalConcurrency(List<UpdateProducer> updateProducer) {
        synchronized (lastSaveUpdateFutureMutex) {
            CompletableFuture<CatalogApplyResult> chainedFuture =
                    lastSaveUpdateFuture.thenCompose(unused -> saveUpdate(updateProducer, 0));

            // Swallow the outcome for the chain only; the caller still observes it through chainedFuture.
            lastSaveUpdateFuture = chainedFuture.handle((res, ex) -> null);

            return chainedFuture;
        }
    }

    /**
     * Waits for activation of the catalog version carried by the apply result.
     *
     * @param catalogApplyResult Result of a catalog update.
     * @return Future completing with the very same result once the activation wait is over.
     */
    private CompletableFuture<CatalogApplyResult> awaitVersionActivation(CatalogApplyResult catalogApplyResult) {
        int appliedVersion = catalogApplyResult.getCatalogVersion();

        return awaitVersionActivation(appliedVersion).thenApply(ignored -> catalogApplyResult);
    }

    /**
     * Waits until the clock reaches the cluster-wide activation time of the given catalog version.
     *
     * @param version Catalog version; it is looked up in the by-version map.
     * @return Future completing with {@code version} once the clock wait is over.
     */
    private CompletableFuture<Integer> awaitVersionActivation(int version) {
        Catalog versionCatalog = catalogByVer.get(version);
        HybridTimestamp activationTime = calcClusterWideEnsureActivationTime(versionCatalog);

        return clockService.waitFor(activationTime).thenApply(ignored -> version);
    }

    /**
     * Computes the cluster-wide ensured activation timestamp of a catalog from its time and the
     * maximum clock skew reported by the clock service.
     *
     * @param catalog Catalog whose activation time is of interest.
     * @return Result of {@code CatalogUtils.clusterWideEnsuredActivationTimestamp}.
     */
    private HybridTimestamp calcClusterWideEnsureActivationTime(Catalog catalog) {
        long catalogTime = catalog.time();

        return CatalogUtils.clusterWideEnsuredActivationTimestamp(catalogTime, clockService.maxClockSkewMillis());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    private CompletableFuture<CatalogApplyResult> saveUpdate(List<UpdateProducer> updateProducers, int attemptNo) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new NodeStoppingException());
        }
        try {
            if (attemptNo >= 10) {
                CompletableFuture<CatalogApplyResult> completableFuture = CompletableFuture.failedFuture(new IgniteInternalException(ErrorGroups.Common.INTERNAL_ERR, "Max retry limit exceeded: " + attemptNo));
                return completableFuture;
            }
            Catalog catalog = this.catalogByVer.lastEntry().getValue();
            BitSet applyResults = new BitSet(updateProducers.size());
            ArrayList<UpdateEntry> bulkUpdateEntries = new ArrayList<UpdateEntry>();
            try {
                UpdateContext updateContext = new UpdateContext(catalog, this);
                for (int i = 0; i < updateProducers.size(); ++i) {
                    UpdateProducer update = updateProducers.get(i);
                    List<UpdateEntry> entries = update.get(updateContext);
                    if (entries.isEmpty()) continue;
                    for (UpdateEntry entry : entries) {
                        updateContext.updateCatalog(cat -> entry.applyUpdate((Catalog)cat, INITIAL_TIMESTAMP));
                    }
                    applyResults.set(i);
                    bulkUpdateEntries.addAll(entries);
                }
            }
            catch (CatalogValidationException ex) {
                CompletableFuture<CatalogApplyResult> completableFuture = CompletableFuture.failedFuture(new CatalogVersionAwareValidationException(ex, catalog.version()));
                this.busyLock.leaveBusy();
                return completableFuture;
            }
            catch (Exception ex) {
                CompletableFuture<CatalogApplyResult> completableFuture = CompletableFuture.failedFuture(ex);
                this.busyLock.leaveBusy();
                return completableFuture;
            }
            if (applyResults.cardinality() == 0) {
                CompletableFuture<CatalogApplyResult> ex = CompletableFuture.completedFuture(new CatalogApplyResult(applyResults, catalog.version(), catalog.time()));
                return ex;
            }
            int newVersion = catalog.version() + 1;
            CompletionStage completionStage = ((CompletableFuture)this.updateLog.append(new VersionedUpdate(newVersion, this.delayDurationMsSupplier.getAsLong(), bulkUpdateEntries)).thenCompose(result -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.versionTracker.waitFor(newVersion).thenApply(none -> result)))).thenCompose(result -> {
                if (result.booleanValue()) {
                    long newCatalogTime = ((Catalog)this.catalogByVer.get(newVersion)).time();
                    return CompletableFuture.completedFuture(new CatalogApplyResult(applyResults, newVersion, newCatalogTime));
                }
                return this.saveUpdate(updateProducers, attemptNo + 1);
            });
            return completionStage;
            {
                catch (Throwable throwable) {
                    throw throwable;
                }
            }
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Returns the system views exposed by the catalog system view registry.
     *
     * @return Views provided by {@code CatalogSystemViewRegistry.systemViews()}.
     */
    @Override
    public List<SystemView<?>> systemViews() {
        List<SystemView<?>> views = catalogSystemViewProvider.systemViews();

        return views;
    }

    /**
     * Produces the final catalog of an update: same object id generator state, zones, schemas and
     * default zone as the given catalog, but with the update's version and a new activation time.
     *
     * <p>The activation time is the meta storage update timestamp advanced by the update's delay duration.
     *
     * @param catalog Catalog with all entries of the update already applied.
     * @param update Update that is being applied.
     * @param metaStorageUpdateTimestamp Timestamp of the update in the meta storage.
     * @return New catalog instance.
     */
    private static Catalog applyUpdateFinal(Catalog catalog, VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp) {
        HybridTimestamp activation = metaStorageUpdateTimestamp.addPhysicalTime(update.delayDurationMs());
        long activationTimestamp = activation.longValue();

        assert activationTimestamp > catalog.time()
                : "Activation timestamp " + activationTimestamp + " must be greater than previous catalog version activation timestamp " + catalog.time();

        return new Catalog(
                update.version(),
                activationTimestamp,
                catalog.objectIdGenState(),
                catalog.zones(),
                catalog.schemas(),
                CatalogUtils.defaultZoneIdOpt(catalog)
        );
    }

    /**
     * Update log listener that turns log events into registered catalogs.
     *
     * <p>A {@link SnapshotEntry} replaces the history up to its catalog; a {@link VersionedUpdate}
     * is applied on top of the previous catalog version, after which catalog events are fired.
     */
    class OnUpdateHandlerImpl
    implements UpdateLog.OnUpdateHandler {
        OnUpdateHandlerImpl() {
        }

        /**
         * Dispatches the event by its type; anything that is not a snapshot is cast to {@link VersionedUpdate}.
         *
         * @param event Update log event.
         * @param metaStorageUpdateTimestamp Timestamp of the update in the meta storage.
         * @param causalityToken Causality token passed on to the fired events.
         * @return Future of the event handling.
         */
        @Override
        public CompletableFuture<Void> handle(UpdateLogEvent event, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) {
            return event instanceof SnapshotEntry
                    ? handle((SnapshotEntry) event)
                    : handle((VersionedUpdate) event, metaStorageUpdateTimestamp, causalityToken);
        }

        /**
         * Registers the snapshot's catalog and drops every older catalog from the history.
         *
         * @param event Snapshot event.
         * @return Already completed future.
         */
        private CompletableFuture<Void> handle(SnapshotEntry event) {
            Catalog snapshotCatalog = event.snapshot();

            registerCatalog(snapshotCatalog);
            truncateUpTo(snapshotCatalog);

            return CompletableFutures.nullCompletedFuture();
        }

        /**
         * Applies the update to the previous catalog version, registers the resulting catalog and
         * fires an event for every entry that is {@link Fireable}.
         *
         * <p>When all event futures are done, a failure not caused by a {@link NodeStoppingException}
         * is handed to the failure processor, and the version tracker is advanced to the new version
         * (in the failure case as well) provided the busy lock can still be entered.
         *
         * @param update Versioned update.
         * @param metaStorageUpdateTimestamp Timestamp of the update in the meta storage.
         * @param causalityToken Causality token passed on to the fired events.
         * @return Future that completes when all fired events have been handled.
         */
        private CompletableFuture<Void> handle(VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) {
            int version = update.version();
            Catalog catalog = catalogByVer.get(version - 1);

            assert catalog != null : version - 1;

            List<UpdateEntry> updateEntries = update.entries();

            for (UpdateEntry entry : updateEntries) {
                catalog = entry.applyUpdate(catalog, metaStorageUpdateTimestamp);
            }

            catalog = applyUpdateFinal(catalog, update, metaStorageUpdateTimestamp);
            registerCatalog(catalog);

            List<CompletableFuture<?>> eventFutures = new ArrayList<>(update.entries().size());

            for (UpdateEntry entry : update.entries()) {
                if (entry instanceof Fireable) {
                    Fireable fireable = (Fireable) entry;

                    eventFutures.add(fireEvent(fireable.eventType(), fireable.createEventParameters(causalityToken, version)));
                }
            }

            CompletableFuture<?>[] pendingEvents = eventFutures.toArray(CompletableFuture[]::new);

            return CompletableFuture.allOf(pendingEvents).whenComplete((ignored, err) -> {
                boolean unexpectedFailure = err != null && !ExceptionUtils.hasCause(err, NodeStoppingException.class);

                if (unexpectedFailure) {
                    failureProcessor.process(new FailureContext(err, "Failed to apply catalog update."));
                }

                // After the node has started stopping, the tracker must not be touched any more.
                if (busyLock.enterBusy()) {
                    try {
                        versionTracker.update(version, null);
                    } finally {
                        busyLock.leaveBusy();
                    }
                }
            });
        }
    }
}

