/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.table.distributed.replicator.secondary;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.BitSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.commands.SecondaryStorageAvailableCommand;
import org.apache.ignite.internal.catalog.descriptors.CatalogSecondaryStorageState;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.table.QualifiedName;
import org.jetbrains.annotations.Nullable;

class SecondaryFullStateTransferHandler {
    private static final IgniteLogger LOG = Loggers.forClass(SecondaryFullStateTransferHandler.class);
    static final String SECONDARY_SYNC_STATE_PREFIX = "secondary.sync.state.";
    private static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
    private static final int SECONDARY_SYNC_STATE_SIZE = 9;
    private static final byte FST_IS_RUNNING = 0;
    static final byte FST_IS_COMPLETE = 1;
    private final TableViewInternal table;
    private final BitSet partitionReadinessSet;
    private final CatalogService catalogService;
    private final MetaStorageManager metaStorageManager;
    private final TopologyService topologyService;
    private final FailureProcessor failureProcessor;

    SecondaryFullStateTransferHandler(TableViewInternal table, CatalogService catalogService, MetaStorageManager metaStorageManager, TopologyService topologyService, FailureProcessor failureProcessor, Executor ioExecutor) {
        this.table = table;
        this.catalogService = catalogService;
        this.metaStorageManager = metaStorageManager;
        this.topologyService = topologyService;
        this.failureProcessor = failureProcessor;
        int partitions = table.internalTable().partitions();
        this.partitionReadinessSet = new BitSet(partitions);
    }

    void startFullStateTransfer(long operationTime, CompletableFuture<Void> catalogActivationFuture) {
        int tableId = this.table.tableId();
        ((CompletableFuture)catalogActivationFuture.thenCompose(v -> this.initializeAndStartFullStateTransfer(operationTime))).whenComplete((v, error) -> {
            if (error != null && !ExceptionUtils.hasCause((Throwable)error, (Class[])new Class[]{NodeStoppingException.class})) {
                String errorMessage = String.format("Unable to start full state transfer state for table [tableId=%d, tableName=%s].", tableId, this.table.qualifiedName());
                this.failureProcessor.process(new FailureContext(error, errorMessage));
            }
        });
    }

    private CompletableFuture<Void> initializeAndStartFullStateTransfer(long operationTime) {
        int tableId = this.table.tableId();
        int partitions = this.table.internalTable().partitions();
        byte[] initialSyncState = ByteBuffer.allocate(9).order(ORDER).put((byte)0).putLong(operationTime).array();
        CompletableFuture[] futures = new CompletableFuture[partitions];
        int i = 0;
        while (i < partitions) {
            int partitionId = i++;
            futures[partitionId] = ((CompletableFuture)this.awaitPrimaryReplica(partitionId).thenCompose(replicaMeta -> {
                if (this.nodeIsNotLeaseHolder(replicaMeta.getLeaseholderId())) {
                    return CompletableFutures.nullCompletedFuture();
                }
                return this.metaStorageManager.put(SecondaryFullStateTransferHandler.getSecondarySyncStateKey(tableId, partitionId), initialSyncState).thenCompose(ignore -> this.startFullStateTransferForPartition(partitionId));
            })).whenComplete((v, error) -> {
                if (error != null && !ExceptionUtils.hasCause((Throwable)error, (Class[])new Class[]{NodeStoppingException.class})) {
                    String errorMessage = String.format("Unable to start initial full state transfer for partition [tableId=%d, tableName=%s, partitionId=%d].", tableId, this.table.qualifiedName(), partitionId);
                    this.failureProcessor.process(new FailureContext(error, errorMessage));
                }
            });
        }
        return CompletableFuture.allOf(futures);
    }

    private CompletableFuture<Void> startFullStateTransferForPartition(int partitionId) {
        int tableId = this.table.tableId();
        ByteBuffer value = ByteBuffer.allocate(9).order(ORDER).clear().put((byte)1).putLong(Long.MIN_VALUE);
        return this.metaStorageManager.put(SecondaryFullStateTransferHandler.getSecondarySyncStateKey(tableId, partitionId), value.array());
    }

    boolean completeFullStateTransferForPartition(int partitionId) {
        this.partitionReadinessSet.set(partitionId);
        boolean isFstComplete = this.partitionReadinessSet.cardinality() == this.table.internalTable().partitions();
        int tableId = this.table.tableId();
        if (isFstComplete) {
            this.sendCatalogAvailable(tableId);
        }
        return isFstComplete;
    }

    private void sendCatalogAvailable(int tableId) {
        Catalog latestCatalog = this.catalogService.catalog(this.catalogService.latestCatalogVersion());
        CatalogTableDescriptor tableDescriptor = latestCatalog.table(tableId);
        if (tableDescriptor == null || tableDescriptor.secondaryStorageState() == CatalogSecondaryStorageState.AVAILABLE) {
            return;
        }
        LOG.info("Marking secondary storage for table as 'Available' [tableId={}, tableName={}]", new Object[]{tableId, tableDescriptor.name()});
        this.awaitPrimaryReplica(0).thenAccept(replicaMeta -> {
            if (this.nodeIsNotLeaseHolder(replicaMeta.getLeaseholderId())) {
                return;
            }
            this.metaStorageManager.removeByPrefix(SecondaryFullStateTransferHandler.getSecondarySyncStateKeyPrefix(tableId)).whenComplete((c, error) -> {
                if (error != null && !ExceptionUtils.hasCause((Throwable)error, (Class[])new Class[]{NodeStoppingException.class})) {
                    String errorMessage = String.format("Failed to enable secondary zone for the table [tableId=%s]", tableId);
                    this.failureProcessor.process(new FailureContext(error, errorMessage));
                }
            });
            QualifiedName tableName = this.table.qualifiedName();
            CatalogCommand secondaryStorageAvailableCommand = SecondaryStorageAvailableCommand.builder().tableName(tableDescriptor.name()).schemaName(tableName.schemaName()).build();
            ((CatalogManager)this.catalogService).execute(secondaryStorageAvailableCommand).whenComplete((c, error) -> {
                if (error != null && !ExceptionUtils.hasCause((Throwable)error, (Class[])new Class[]{NodeStoppingException.class})) {
                    String errorMessage = String.format("Failed to enable secondary zone for the table [tableId=%s]", tableId);
                    this.failureProcessor.process(new FailureContext(error, errorMessage));
                }
            });
        });
    }

    private CompletableFuture<ReplicaMeta> awaitPrimaryReplica(int partitionId) {
        return this.table.internalTable().secondaryPartitionLocation(partitionId);
    }

    private boolean nodeIsNotLeaseHolder(@Nullable UUID leaseholderId) {
        return !this.topologyService.localMember().id().equals(leaseholderId);
    }

    private static ByteArray getSecondarySyncStateKeyPrefix(int tableId) {
        String key = SECONDARY_SYNC_STATE_PREFIX + tableId;
        return new ByteArray(key.getBytes(StandardCharsets.UTF_8));
    }

    private static ByteArray getSecondarySyncStateKey(int tableId, int partitionId) {
        String key = SECONDARY_SYNC_STATE_PREFIX + new TablePartitionId(tableId, partitionId);
        return new ByteArray(key.getBytes(StandardCharsets.UTF_8));
    }
}

