/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.cdc.core;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.catalog.CatalogCommand;
import org.apache.ignite3.internal.catalog.CatalogManager;
import org.apache.ignite3.internal.catalog.commands.ColumnParams;
import org.apache.ignite3.internal.catalog.commands.CreateTableCommand;
import org.apache.ignite3.internal.catalog.commands.CreateTableCommandBuilder;
import org.apache.ignite3.internal.catalog.commands.CreateZoneCommand;
import org.apache.ignite3.internal.catalog.commands.CreateZoneCommandBuilder;
import org.apache.ignite3.internal.catalog.commands.StorageProfileParams;
import org.apache.ignite3.internal.catalog.commands.TableHashPrimaryKey;
import org.apache.ignite3.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite3.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite3.internal.configuration.SystemDistributedView;
import org.apache.ignite3.internal.configuration.SystemPropertyView;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.lang.AsyncCursor;
import org.apache.ignite3.sql.ColumnType;
import org.apache.ignite3.table.IgniteTables;
import org.apache.ignite3.table.KeyValueView;
import org.apache.ignite3.table.QualifiedName;
import org.apache.ignite3.table.RecordView;
import org.apache.ignite3.table.Table;
import org.apache.ignite3.table.mapper.Mapper;
import org.apache.ignite3.tx.IgniteTransactions;
import org.apache.ignite3.tx.Transaction;
import org.apache.ignite3.tx.TransactionOptions;
import org.gridgain.internal.cdc.api.CdcManager;
import org.gridgain.internal.cdc.api.exception.CdcDisabledException;
import org.gridgain.internal.cdc.api.exception.replication.CdcReplicationAlreadyExistsException;
import org.gridgain.internal.cdc.api.exception.replication.CdcReplicationNotFoundException;
import org.gridgain.internal.cdc.api.exception.replication.CdcReplicationStartException;
import org.gridgain.internal.cdc.api.exception.replication.CdcReplicationStopException;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkAlreadyExistsException;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkNotFoundException;
import org.gridgain.internal.cdc.api.exception.source.CdcSourceAlreadyExistsException;
import org.gridgain.internal.cdc.api.exception.source.CdcSourceNotFoundException;
import org.gridgain.internal.cdc.api.replication.CdcReplicationDefinition;
import org.gridgain.internal.cdc.api.replication.CdcReplicationExecNodes;
import org.gridgain.internal.cdc.api.replication.CdcReplicationInstance;
import org.gridgain.internal.cdc.api.replication.CdcReplicationStatus;
import org.gridgain.internal.cdc.api.sink.SinkDefinition;
import org.gridgain.internal.cdc.api.sink.SinkStatus;
import org.gridgain.internal.cdc.api.source.SourceDefinition;
import org.gridgain.internal.cdc.api.source.SourceStatus;
import org.gridgain.internal.cdc.core.exec.CdcReplicationHandle;
import org.gridgain.internal.cdc.core.exec.LocalCdcReplicationExecutor;
import org.gridgain.internal.cdc.core.failover.CdcFailover;
import org.gridgain.internal.cdc.core.replication.CdcReplicationInstanceMappings;
import org.gridgain.internal.cdc.core.replication.CdcReplicationInstanceValue;
import org.gridgain.internal.cdc.core.sink.CdcSinkFactory;
import org.gridgain.internal.cdc.core.sink.CdcSinkValidatorFactory;
import org.gridgain.internal.cdc.core.sink.SinkDefinitionMappings;
import org.gridgain.internal.cdc.core.sink.SinkDefinitionValue;
import org.gridgain.internal.cdc.core.source.SourceDefinitionMappings;
import org.gridgain.internal.cdc.core.source.SourceDefinitionValue;
import org.jetbrains.annotations.Nullable;

public class CdcManagerImpl
implements CdcManager {
    private static final IgniteLogger LOG = Loggers.forClass(CdcManagerImpl.class);
    public static final String SCHEMA_NAME = "SYSTEM";
    private static final String SOURCES_TABLE_NAME = "CDC_SOURCES";
    private static final String SINKS_TABLE_NAME = "CDC_SINKS";
    private static final String REPLICATIONS_TABLE_NAME = "CDC_REPLICATIONS";
    public static final String REPLICATIONS_PROGRESS_TABLE_NAME = "CDC_REPLICATIONS_PROGRESS";
    private static final String CDC_SYSTEM_ZONE_NAME = "CDC_SYSTEM_ZONE";
    private static final int TIMEOUT_MILLIS_RW_TX = 10000;
    private static final int TIMEOUT_MILLIS_RO_TX = 30000;
    private static final int THREAD_POOL_SIZE = 4;
    private static final int CDC_ZONE_PARTITIONS_COUNT = 8;
    private static final String CDC_ENABLED_PROPERTY_NAME = "cdcEnabled";
    private final IgniteTables tables;
    private final CatalogManager catalogManager;
    private final IgniteTransactions transactions;
    private final String localNodeName;
    private final CdcFailover failover;
    private final AtomicBoolean cdcConfigEnabled;
    private final ClockService clockService;
    private final LongSupplier idleSafeTimePropagationPeriodMsSupplier;
    private final LocalCdcReplicationExecutor localExecutor;
    private final Map<String, CdcReplicationHandle> handles = new ConcurrentHashMap<String, CdcReplicationHandle>();
    private final Executor executor;
    private final AtomicReference<CompletableFuture<Void>> tablesInitFuture = new AtomicReference();
    private volatile KeyValueView<String, SourceDefinitionValue> sourcesKv;
    private volatile RecordView<SourceDefinition> sourcesRv;
    private volatile KeyValueView<String, SinkDefinitionValue> sinksKv;
    private volatile RecordView<SinkDefinition> sinksRv;
    private volatile KeyValueView<String, CdcReplicationInstanceValue> replicationsKv;
    private volatile RecordView<CdcReplicationInstance> replicationsRv;

    public CdcManagerImpl(String localNodeName, IgniteTables igniteTables, IgniteTransactions transactions, CatalogManager catalogManager, CdcSinkFactory sinkFactory, CdcSinkValidatorFactory validatorFactory, LogicalTopologyService logicalTopology, TopologyService topologyService, SystemDistributedConfiguration systemPropertyConfiguration, ClockService clockService, LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier) {
        this.localNodeName = localNodeName;
        this.tables = igniteTables;
        this.catalogManager = catalogManager;
        this.transactions = transactions;
        this.idleSafeTimePropagationPeriodMsSupplier = partitionIdleSafeTimePropagationPeriodMsSupplier;
        this.clockService = clockService;
        this.executor = Executors.newFixedThreadPool(4, IgniteThreadFactory.create(localNodeName, "cdc-manager", LOG, new ThreadOperation[0]));
        this.localExecutor = new LocalCdcReplicationExecutor(localNodeName, sinkFactory, validatorFactory, igniteTables);
        this.failover = new CdcFailover(localNodeName, logicalTopology, topologyService, this);
        this.cdcConfigEnabled = new AtomicBoolean(false);
        this.handeEnableProperty(systemPropertyConfiguration);
    }

    private void handeEnableProperty(SystemDistributedConfiguration systemPropertyConfiguration) {
        systemPropertyConfiguration.listen(newConfig -> {
            SystemPropertyView cdcEnabledNewProperty = ((SystemDistributedView)newConfig.newValue()).properties().get(CDC_ENABLED_PROPERTY_NAME);
            if (cdcEnabledNewProperty != null && cdcEnabledNewProperty.name().equals(CDC_ENABLED_PROPERTY_NAME)) {
                this.cdcConfigEnabled.set(Boolean.parseBoolean(cdcEnabledNewProperty.propertyValue()));
                if (this.cdcConfigEnabled.get()) {
                    this.failover.start();
                } else {
                    this.failover.stop();
                }
            }
            return CompletableFutures.nullCompletedFuture();
        });
    }

    private <T> CompletableFuture<T> runInTransaction(Function<Transaction, CompletableFuture<T>> transactionFunction) {
        this.checkCdcEnabled();
        return this.createTablesIfNeeded().thenComposeAsync(v -> this.transactions.runInTransactionAsync(transactionFunction, new TransactionOptions().timeoutMillis(10000L).readOnly(false)), this.executor);
    }

    private <T> CompletableFuture<T> runInReadTransaction(Function<Transaction, CompletableFuture<T>> transactionFunction) {
        this.checkCdcEnabled();
        return this.createTablesIfNeeded().thenComposeAsync(v -> this.transactions.runInTransactionAsync(transactionFunction, new TransactionOptions().timeoutMillis(30000L).readOnly(true)), this.executor);
    }

    @Override
    public CompletableFuture<Void> createSource(SourceDefinition sourceDefinition) {
        return this.runInTransaction(tx -> {
            if (this.sourcesKv.get((Transaction)tx, sourceDefinition.name()) != null) {
                return CompletableFuture.failedFuture(new CdcSourceAlreadyExistsException(sourceDefinition.name()));
            }
            return this.sourcesKv.putAsync((Transaction)tx, sourceDefinition.name(), SourceDefinitionValue.from(sourceDefinition));
        });
    }

    private void checkCdcEnabled() {
        if (!this.cdcConfigEnabled.get()) {
            throw new CdcDisabledException();
        }
    }

    @Override
    public CompletableFuture<Void> deleteSource(String sourceName) {
        return this.runInTransaction(tx -> this.sourcesKv.removeAsync((Transaction)tx, sourceName).thenCompose(wasRemoved -> wasRemoved != false ? CompletableFutures.nullCompletedFuture() : CompletableFuture.failedFuture(new CdcSourceNotFoundException(sourceName))));
    }

    @Override
    public CompletableFuture<SourceDefinition> getSource(String sourceName) {
        return this.runInReadTransaction(tx -> this.sourcesKv.getAsync((Transaction)tx, sourceName).thenCompose(s -> s == null ? CompletableFuture.failedFuture(new CdcSourceNotFoundException(sourceName)) : CompletableFuture.completedFuture(s.toSourceDefinition(sourceName))));
    }

    @Override
    public CompletableFuture<Collection<SourceDefinition>> listSources() {
        return this.runInReadTransaction(tx -> this.sourcesRv.queryAsync((Transaction)tx, null).thenApply(CdcManagerImpl::readAll));
    }

    @Override
    public CompletableFuture<Void> updateSource(SourceDefinition sourceToUpdate) {
        return this.runInTransaction(tx -> this.sourcesKv.getAsync((Transaction)tx, sourceToUpdate.name()).thenCompose(s -> {
            if (s == null) {
                return CompletableFuture.failedFuture(new CdcSourceNotFoundException(sourceToUpdate.name()));
            }
            return this.sourcesKv.putAsync((Transaction)tx, sourceToUpdate.name(), SourceDefinitionValue.from(sourceToUpdate));
        }));
    }

    @Override
    public CompletableFuture<SourceStatus> getSourceStatus(String name) {
        return this.runInReadTransaction(tx -> this.sourcesKv.getAsync((Transaction)tx, name).thenCompose(s -> s == null ? CompletableFuture.failedFuture(new CdcSourceNotFoundException(name)) : this.localExecutor.getSourceStatus(s.toSourceDefinition(name))));
    }

    @Override
    public CompletableFuture<Void> createSink(SinkDefinition sinkDefinition) {
        return this.runInTransaction(tx -> {
            if (this.sinksKv.get((Transaction)tx, sinkDefinition.name()) != null) {
                return CompletableFuture.failedFuture(new CdcSinkAlreadyExistsException(sinkDefinition.name()));
            }
            return this.sinksKv.putAsync((Transaction)tx, sinkDefinition.name(), SinkDefinitionValue.from(sinkDefinition));
        });
    }

    @Override
    public CompletableFuture<Void> deleteSink(String sinkName) {
        return this.runInTransaction(tx -> {
            if (this.sinksKv.get((Transaction)tx, sinkName) == null) {
                return CompletableFuture.failedFuture(new CdcSinkNotFoundException(sinkName));
            }
            return this.sinksKv.removeAsync((Transaction)tx, sinkName).thenApply(r -> null);
        });
    }

    @Override
    public CompletableFuture<SinkDefinition> getSink(String sinkName) {
        return this.runInReadTransaction(tx -> this.sinksKv.getAsync((Transaction)tx, sinkName).thenApply(s -> {
            if (s == null) {
                throw new CdcSinkNotFoundException(sinkName);
            }
            return s.toSinkDefinition(sinkName);
        }));
    }

    @Override
    public CompletableFuture<Collection<SinkDefinition>> listSinks() {
        return this.runInReadTransaction(tx -> this.sinksRv.queryAsync((Transaction)tx, null).thenApply(CdcManagerImpl::readAll));
    }

    @Override
    public CompletableFuture<Void> updateSink(SinkDefinition sink) {
        return this.runInTransaction(tx -> this.sinksKv.getAsync((Transaction)tx, sink.name()).thenCompose(s -> {
            if (s == null) {
                return CompletableFuture.failedFuture(new CdcSinkNotFoundException(sink.name()));
            }
            return this.sinksKv.putAsync((Transaction)tx, sink.name(), SinkDefinitionValue.from(sink));
        }));
    }

    @Override
    public CompletableFuture<SinkStatus> getSinkStatus(String name) {
        return this.runInReadTransaction(tx -> this.sinksKv.getAsync((Transaction)tx, name).thenCompose(s -> s == null ? CompletableFuture.failedFuture(new CdcSinkNotFoundException(name)) : this.localExecutor.getSinkStatus(s.toSinkDefinition(name))));
    }

    @Override
    public CompletableFuture<CdcReplicationInstance> createReplication(CdcReplicationDefinition replicationDefinition) {
        return this.runInTransaction(tx -> {
            if (this.replicationsKv.get((Transaction)tx, replicationDefinition.name()) != null) {
                return CompletableFuture.failedFuture(new CdcReplicationAlreadyExistsException(replicationDefinition.name()));
            }
            if (this.sourcesKv.get((Transaction)tx, replicationDefinition.sourceName()) == null) {
                return CompletableFuture.failedFuture(new CdcSourceNotFoundException(replicationDefinition.sourceName()));
            }
            if (this.sinksKv.get((Transaction)tx, replicationDefinition.sinkName()) == null) {
                return CompletableFuture.failedFuture(new CdcSinkNotFoundException(replicationDefinition.sinkName()));
            }
            CdcReplicationDefinition replicationDefinitionWithExecNodes = this.setExecutionNodesIfEmpty(replicationDefinition);
            return this.replicationsKv.putAsync((Transaction)tx, replicationDefinition.name(), CdcReplicationInstanceValue.from(replicationDefinitionWithExecNodes)).thenCompose(v2 -> this.replicationsKv.getAsync((Transaction)tx, replicationDefinition.name()).thenApply(r2 -> r2.toReplicationInstance(replicationDefinition.name())));
        });
    }

    private CdcReplicationDefinition setExecutionNodesIfEmpty(CdcReplicationDefinition replicationDefinition) {
        List<String> nodes = replicationDefinition.executionNodes().nodes();
        if (nodes == null || nodes.isEmpty()) {
            return replicationDefinition.toBuilder().executionNodes(new CdcReplicationExecNodes(List.of(this.localNodeName))).build();
        }
        if (nodes.contains(this.localNodeName)) {
            return replicationDefinition;
        }
        throw new IllegalStateException(String.format("CDC replication instance must be executed on the local node (nodeId=%s), but replication defined executionNodes=%s.", this.localNodeName, nodes));
    }

    @Override
    public CompletableFuture<Void> updateReplication(CdcReplicationInstance replicationInstance) {
        return this.runInTransaction(tx -> this.updateReplication((Transaction)tx, replicationInstance));
    }

    private CompletableFuture<Void> updateReplication(@Nullable Transaction tx, CdcReplicationInstance replicationInstance) {
        return this.replicationsKv.getAsync(tx, replicationInstance.name()).thenCompose(r -> {
            if (r == null) {
                return CompletableFuture.failedFuture(new CdcReplicationNotFoundException(replicationInstance.name()));
            }
            return this.replicationsKv.putAsync(tx, replicationInstance.name(), CdcReplicationInstanceValue.from(replicationInstance));
        });
    }

    @Override
    public CompletableFuture<Void> deleteReplication(String replicationName) {
        return this.runInTransaction(tx -> {
            if (this.replicationsKv.get((Transaction)tx, replicationName) == null) {
                return CompletableFuture.failedFuture(new CdcReplicationNotFoundException(replicationName));
            }
            return this.replicationsKv.removeAsync((Transaction)tx, replicationName).thenApply(r -> null);
        });
    }

    @Override
    public CompletableFuture<CdcReplicationInstance> getReplication(String replicationName) {
        return this.runInReadTransaction(tx -> this.getReplication((Transaction)tx, replicationName));
    }

    private CompletableFuture<CdcReplicationInstance> getReplication(@Nullable Transaction tx, String replicationName) {
        return this.replicationsKv.getAsync(tx, replicationName).thenCompose(r -> {
            if (r == null) {
                if (tx == null) {
                    return CompletableFuture.failedFuture(new CdcReplicationNotFoundException(replicationName));
                }
                return tx.rollbackAsync().thenCompose(ignore -> CompletableFuture.failedFuture(new CdcReplicationNotFoundException(replicationName)));
            }
            return CompletableFuture.completedFuture(r.toReplicationInstance(replicationName));
        });
    }

    @Override
    public CompletableFuture<Collection<CdcReplicationInstance>> listReplications() {
        return this.runInReadTransaction(tx -> this.replicationsRv.queryAsync((Transaction)tx, null).thenApply(CdcManagerImpl::readAll));
    }

    @Override
    public CompletableFuture<Collection<CdcReplicationInstance>> listReplicationsByStatus(CdcReplicationStatus status) {
        return this.runInReadTransaction(tx -> this.replicationsRv.queryAsync((Transaction)tx, null).thenApply(CdcManagerImpl::readAll)).thenApply(list -> list.stream().filter(r -> r.status() == status).collect(Collectors.toList()));
    }

    @Override
    public CompletableFuture<CdcReplicationInstance> startReplication(String replicationName) {
        return this.runInTransaction(tx -> this.getReplication((Transaction)tx, replicationName).thenCompose(instance -> {
            if (instance == null) {
                return CompletableFuture.failedFuture(new CdcReplicationNotFoundException(replicationName));
            }
            if (instance.status() == CdcReplicationStatus.RUNNING) {
                return CompletableFuture.failedFuture(new CdcReplicationStartException(String.format("Invalid replication state: %s", new Object[]{instance.status()})));
            }
            return ((CompletableFuture)this.sinksKv.getAsync((Transaction)tx, instance.sinkName()).thenComposeAsync(sink -> this.sourcesKv.getAsync((Transaction)tx, instance.sourceName()).thenCompose(source -> this.localExecutor.startReplication((Transaction)tx, (CdcReplicationInstance)instance, source.toSourceDefinition(instance.sourceName()), sink.toSinkDefinition(instance.sinkName()))), this.executor)).thenCompose(handle -> {
                this.handles.put(replicationName, (CdcReplicationHandle)handle);
                CdcReplicationInstance instanceWithNodeId = handle.replicationInstance().toBuilder().runningOnNodeId(this.localNodeName).build();
                return this.updateReplication((Transaction)tx, instanceWithNodeId).thenApply(vv -> instanceWithNodeId);
            });
        }));
    }

    @Override
    public CompletableFuture<CdcReplicationInstance> stopReplication(String replicationName) {
        CompletableFuture fut = this.runInTransaction(tx -> this.getReplication((Transaction)tx, replicationName).thenCompose(instance -> {
            if (instance.status() == CdcReplicationStatus.STOPPED || instance.status() == CdcReplicationStatus.CREATED) {
                return CompletableFuture.failedFuture(new CdcReplicationStopException(String.format("Invalid replication state: %s", new Object[]{instance.status()})));
            }
            CdcReplicationHandle handle = this.handles.get(replicationName);
            return ((CompletableFuture)handle.stop().thenCompose(vv -> {
                CdcReplicationInstance updatedInstance = instance.toBuilder().status(CdcReplicationStatus.STOPPED).build();
                return this.updateReplication((Transaction)tx, updatedInstance);
            })).exceptionally(th -> {
                LOG.warn("Unable to stop replication: {}", (Throwable)th, (Object)replicationName);
                CdcReplicationInstance updatedInstance = instance.toBuilder().status(CdcReplicationStatus.FAILED).errorContext(th.getMessage()).build();
                CdcManagerImpl.get(this.updateReplication((Transaction)tx, updatedInstance));
                return null;
            });
        }));
        return fut.thenCompose(v -> this.getReplication(replicationName));
    }

    private static void get(CompletableFuture<Void> fut) {
        try {
            fut.get(10L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IgniteInternalException(e);
        }
        catch (ExecutionException | TimeoutException e) {
            throw new IgniteInternalException(e);
        }
    }

    private static <R> Collection<R> readAll(AsyncCursor<R> cursor) {
        ArrayList<R> result = new ArrayList<R>();
        do {
            Iterable<R> page = cursor.currentPage();
            for (R record : page) {
                result.add(record);
            }
            cursor.fetchNextPage();
        } while (cursor.hasMorePages());
        return result;
    }

    private CompletableFuture<Void> createTablesIfNeeded() {
        this.checkCdcEnabled();
        if (this.sourcesKv != null) {
            return CompletableFutures.nullCompletedFuture();
        }
        CompletableFuture<Void> result = this.tablesInitFuture.updateAndGet(existing -> {
            if (existing != null) {
                return existing;
            }
            return this.createTablesAndInitFields();
        });
        result.whenComplete((v, th) -> {
            if (th != null) {
                LOG.error("Failed to create CDC tables", (Throwable)th);
                this.tablesInitFuture.compareAndSet(result, null);
            }
        });
        return result;
    }

    private CompletableFuture<Void> createTablesAndInitFields() {
        Table sinksTable = this.tables.table(QualifiedName.of(SCHEMA_NAME, SINKS_TABLE_NAME));
        if (sinksTable != null) {
            this.sinksKv = sinksTable.keyValueView(Mapper.of(String.class), SinkDefinitionMappings.valueMapper());
            this.sinksRv = sinksTable.recordView(SinkDefinitionMappings.recordMapper());
            Table sourcesTable = this.tables.table(QualifiedName.of(SCHEMA_NAME, SOURCES_TABLE_NAME));
            if (sourcesTable == null) {
                return CompletableFuture.failedFuture(new IllegalStateException("CDC tables were not created properly."));
            }
            this.sourcesKv = sourcesTable.keyValueView(Mapper.of(String.class), SourceDefinitionMappings.valueMapper());
            this.sourcesRv = sourcesTable.recordView(SourceDefinitionMappings.recordMapper());
            Table replicationsTable = this.tables.table(QualifiedName.of(SCHEMA_NAME, REPLICATIONS_TABLE_NAME));
            if (replicationsTable == null) {
                return CompletableFuture.failedFuture(new IllegalStateException("CDC tables were not created properly."));
            }
            this.replicationsKv = replicationsTable.keyValueView(Mapper.of(String.class), CdcReplicationInstanceMappings.valueMapper());
            this.replicationsRv = replicationsTable.recordView(CdcReplicationInstanceMappings.recordMapper());
            return CompletableFutures.nullCompletedFuture();
        }
        return ((CompletableFuture)((CompletableFuture)this.catalogManager.execute(List.of(this.createCdcZoneCommand())).thenCompose(zoneResult -> this.catalogManager.execute(List.of(CdcManagerImpl.createSourceTableCommand(), CdcManagerImpl.createSinkTableCommand(), CdcManagerImpl.createReplicationTableCommand(), CdcManagerImpl.createReplicationProgressTableCommand())))).thenComposeAsync(res -> {
            HybridTimestamp waitTime = HybridTimestamp.hybridTimestamp(res.getCatalogTime()).addPhysicalTime(this.idleSafeTimePropagationPeriodMsSupplier.getAsLong() + this.clockService.maxClockSkewMillis());
            return this.clockService.waitFor(waitTime);
        }, this.executor)).thenCompose(res -> CompletableFuture.allOf(new CompletableFuture[]{this.tables.tableAsync(QualifiedName.of(SCHEMA_NAME, SINKS_TABLE_NAME)).thenApply(t -> {
            this.sinksKv = t.keyValueView(Mapper.of(String.class), SinkDefinitionMappings.valueMapper());
            this.sinksRv = t.recordView(SinkDefinitionMappings.recordMapper());
            return null;
        }), this.tables.tableAsync(QualifiedName.of(SCHEMA_NAME, SOURCES_TABLE_NAME)).thenApply(t -> {
            this.sourcesKv = t.keyValueView(Mapper.of(String.class), SourceDefinitionMappings.valueMapper());
            this.sourcesRv = t.recordView(SourceDefinitionMappings.recordMapper());
            return null;
        }), this.tables.tableAsync(QualifiedName.of(SCHEMA_NAME, REPLICATIONS_TABLE_NAME)).thenApply(t -> {
            this.replicationsKv = t.keyValueView(Mapper.of(String.class), CdcReplicationInstanceMappings.valueMapper());
            this.replicationsRv = t.recordView(CdcReplicationInstanceMappings.recordMapper());
            return null;
        })}));
    }

    private CatalogCommand createCdcZoneCommand() {
        return ((CreateZoneCommandBuilder)CreateZoneCommand.builder((f, sp, r) -> 8).zoneName(CDC_SYSTEM_ZONE_NAME)).replicas(Integer.MAX_VALUE).ifNotExists(true).dataNodesAutoAdjustScaleDown(0).consistencyModeParams(ConsistencyMode.HIGH_AVAILABILITY).storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile("default").build())).build();
    }

    private static CatalogCommand createSourceTableCommand() {
        return ((CreateTableCommandBuilder)((CreateTableCommandBuilder)((CreateTableCommandBuilder)CreateTableCommand.builder().schemaName(SCHEMA_NAME)).tableName(SOURCES_TABLE_NAME)).validateSystemSchemas(false).ifTableExists(true)).zone(CDC_SYSTEM_ZONE_NAME).columns(List.of(ColumnParams.builder().name("NAME").type(ColumnType.STRING).length(256).build(), ColumnParams.builder().name("TYPE").type(ColumnType.INT16).build(), ColumnParams.builder().name("PARAMETERS").type(ColumnType.STRING).length(Integer.MAX_VALUE).build(), ColumnParams.builder().name("TABLES").nullable(true).type(ColumnType.STRING).length(Integer.MAX_VALUE).build())).primaryKey(((TableHashPrimaryKey.Builder)TableHashPrimaryKey.builder().columns((List)List.of("NAME"))).build()).build();
    }

    private static CatalogCommand createSinkTableCommand() {
        return ((CreateTableCommandBuilder)((CreateTableCommandBuilder)((CreateTableCommandBuilder)CreateTableCommand.builder().schemaName(SCHEMA_NAME)).tableName(SINKS_TABLE_NAME)).validateSystemSchemas(false).ifTableExists(true)).zone(CDC_SYSTEM_ZONE_NAME).columns(List.of(ColumnParams.builder().name("NAME").type(ColumnType.STRING).length(256).build(), ColumnParams.builder().name("TYPE").type(ColumnType.INT16).build(), ColumnParams.builder().name("PARAMETERS").type(ColumnType.STRING).length(Integer.MAX_VALUE).build(), ColumnParams.builder().name("CREATE_TABLE_IF_NOT_EXISTS").type(ColumnType.BOOLEAN).build())).primaryKey(((TableHashPrimaryKey.Builder)TableHashPrimaryKey.builder().columns((List)List.of("NAME"))).build()).build();
    }

    private static CatalogCommand createReplicationTableCommand() {
        return ((CreateTableCommandBuilder)((CreateTableCommandBuilder)((CreateTableCommandBuilder)CreateTableCommand.builder().schemaName(SCHEMA_NAME)).tableName(REPLICATIONS_TABLE_NAME)).validateSystemSchemas(false).ifTableExists(true)).zone(CDC_SYSTEM_ZONE_NAME).columns(List.of(ColumnParams.builder().name("NAME").type(ColumnType.STRING).length(256).build(), ColumnParams.builder().name("STATUS").type(ColumnType.INT16).build(), ColumnParams.builder().name("MODE").type(ColumnType.INT16).build(), ColumnParams.builder().name("SINK_NAME").type(ColumnType.STRING).length(256).build(), ColumnParams.builder().name("SOURCE_NAME").type(ColumnType.STRING).length(256).build(), ColumnParams.builder().name("EXECUTION_NODES").type(ColumnType.STRING).length(256).build(), ColumnParams.builder().name("RUNNING_ON_NODE_ID").type(ColumnType.STRING).length(256).build(), ColumnParams.builder().name("ERROR_CONTEXT").nullable(true).type(ColumnType.STRING).length(Integer.MAX_VALUE).build())).primaryKey(((TableHashPrimaryKey.Builder)TableHashPrimaryKey.builder().columns((List)List.of("NAME"))).build()).build();
    }

    private static CatalogCommand createReplicationProgressTableCommand() {
        return ((CreateTableCommandBuilder)((CreateTableCommandBuilder)((CreateTableCommandBuilder)CreateTableCommand.builder().schemaName(SCHEMA_NAME)).tableName(REPLICATIONS_PROGRESS_TABLE_NAME)).validateSystemSchemas(false).ifTableExists(true)).zone(CDC_SYSTEM_ZONE_NAME).columns(List.of(ColumnParams.builder().name("REPLICATION_NAME").type(ColumnType.STRING).length(256).build(), ColumnParams.builder().name("TABLE_NAME").type(ColumnType.STRING).length(256).build(), ColumnParams.builder().name("LAST_PROCESSED_WATERMARK_COL").type(ColumnType.BYTE_ARRAY).length(Integer.MAX_VALUE).build())).primaryKey(((TableHashPrimaryKey.Builder)TableHashPrimaryKey.builder().columns((List)List.of("REPLICATION_NAME", "TABLE_NAME"))).build()).build();
    }

    @Override
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        if (this.cdcConfigEnabled.get()) {
            return CompletableFuture.runAsync(this.failover::start, this.executor);
        }
        return CompletableFutures.nullCompletedFuture();
    }

    @Override
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        if (this.cdcConfigEnabled.get()) {
            return CompletableFuture.runAsync(this.failover::stop, this.executor);
        }
        return CompletableFutures.nullCompletedFuture();
    }
}

