/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.table.distributed.replicator.secondary;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.BitSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.function.Supplier;
import org.apache.ignite3.internal.catalog.Catalog;
import org.apache.ignite3.internal.catalog.CatalogCommand;
import org.apache.ignite3.internal.catalog.CatalogManager;
import org.apache.ignite3.internal.catalog.CatalogService;
import org.apache.ignite3.internal.catalog.commands.SecondaryStorageAvailableCommand;
import org.apache.ignite3.internal.catalog.descriptors.CatalogSecondaryStorageState;
import org.apache.ignite3.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.lang.ComponentStoppingException;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.MetaStorageManager;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.placementdriver.ReplicaMeta;
import org.apache.ignite3.internal.replicator.ReplicaService;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.storage.RowId;
import org.apache.ignite3.internal.storage.TimedBinaryRowAndRowId;
import org.apache.ignite3.internal.table.InternalTable;
import org.apache.ignite3.internal.table.TableViewInternal;
import org.apache.ignite3.internal.table.distributed.replicator.secondary.ReplicaServiceWrapper;
import org.apache.ignite3.internal.table.distributed.replicator.secondary.SecondaryStorageSubscriber;
import org.apache.ignite3.internal.table.distributed.replicator.secondary.SecondaryStorageVersionScanSubscriber;
import org.apache.ignite3.internal.tx.ReadOnlyTransactionsHelper;
import org.apache.ignite3.internal.tx.TxManager;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.util.TrackerClosedException;
import org.apache.ignite3.lang.TableNotFoundException;
import org.apache.ignite3.table.QualifiedName;
import org.jetbrains.annotations.Nullable;

class SecondaryFullStateTransferHandler {
    private static final IgniteLogger LOG = Loggers.forClass(SecondaryFullStateTransferHandler.class);
    static final String SECONDARY_SYNC_STATE_PREFIX = "secondary.sync.state.";
    private static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
    private static final int SECONDARY_SYNC_STATE_SIZE = 9;
    private static final byte FST_IS_RUNNING = 0;
    static final byte FST_IS_COMPLETE = 1;
    private static final ByteBuffer FST_COMPLETED_VALUE = ByteBuffer.allocate(9).order(ORDER).clear().put((byte)1).putLong(Long.MIN_VALUE);
    private final TableViewInternal table;
    private final BitSet partitionReadinessSet;
    private final ClockService clockService;
    private final CatalogService catalogService;
    private final ReplicaService replicaService;
    private final MetaStorageManager metaStorageManager;
    private final TopologyService topologyService;
    private final FailureProcessor failureProcessor;
    private final Executor ioExecutor;
    private final ReadOnlyTransactionsHelper transactions;
    private final ConcurrentMap<Integer, SecondaryStorageVersionScanSubscriber> subscribers;
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    SecondaryFullStateTransferHandler(TableViewInternal table, TxManager txManager, ClockService clockService, CatalogService catalogService, ReplicaService replicaService, MetaStorageManager metaStorageManager, TopologyService topologyService, FailureProcessor failureProcessor, Executor ioExecutor) {
        this.table = table;
        this.clockService = clockService;
        this.catalogService = catalogService;
        this.replicaService = replicaService;
        this.metaStorageManager = metaStorageManager;
        this.topologyService = topologyService;
        this.failureProcessor = failureProcessor;
        this.ioExecutor = ioExecutor;
        int partitions = table.internalTable().partitions();
        this.partitionReadinessSet = new BitSet(partitions);
        this.subscribers = new ConcurrentHashMap<Integer, SecondaryStorageVersionScanSubscriber>(IgniteUtils.capacity(partitions));
        this.transactions = new ReadOnlyTransactionsHelper(txManager);
    }

    CompletableFuture<?> startFullStateTransfer(long operationTime) {
        return this.executeWithBusyLockInFuture(() -> {
            int partitions = this.table.internalTable().partitions();
            byte[] initialSyncState = ByteBuffer.allocate(9).order(ORDER).put((byte)0).putLong(operationTime).array();
            CompletableFuture[] futures = new CompletableFuture[partitions];
            for (int partitionId = 0; partitionId < partitions; ++partitionId) {
                futures[partitionId] = this.startFullStateTransferForPartition(partitionId, operationTime, initialSyncState);
            }
            return CompletableFuture.allOf(futures);
        });
    }

    void abortFullStateTransfer() {
        this.busyLock.block();
        int partitions = this.table.internalTable().partitions();
        for (int partitionId = 0; partitionId < partitions; ++partitionId) {
            SecondaryStorageVersionScanSubscriber subscriber = (SecondaryStorageVersionScanSubscriber)this.subscribers.remove(partitionId);
            if (subscriber == null) continue;
            subscriber.stop();
        }
        this.metaStorageManager.removeByPrefix(SecondaryFullStateTransferHandler.getSecondarySyncStateKeyPrefix(this.table.tableId()));
    }

    private CompletableFuture<Void> startFullStateTransferForPartition(int partitionId, long operationTime, byte[] initialSyncState) {
        int tableId = this.table.tableId();
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
        ReplicaServiceWrapper replicaServiceWrapper = new ReplicaServiceWrapper(partitionId, this.table.internalTable(), this.replicaService, busyLock);
        SecondaryStorageVersionScanSubscriber subscriber = new SecondaryStorageVersionScanSubscriber(this.table, partitionId, this.clockService, replicaServiceWrapper, this.failureProcessor, busyLock, result);
        SecondaryStorageSubscriber oldSubscriber = this.subscribers.put(partitionId, subscriber);
        assert (oldSubscriber == null);
        return ((CompletableFuture)this.executeWithBusyLockInFuture(() -> this.awaitPrimaryReplica(partitionId)).thenCompose(replicaMeta -> this.executeWithBusyLockInFuture(() -> {
            if (this.nodeIsNotLeaseHolder(replicaMeta.getLeaseholderId())) {
                this.subscribers.remove(partitionId);
                return CompletableFutures.nullCompletedFuture();
            }
            return this.metaStorageManager.put(SecondaryFullStateTransferHandler.getSecondarySyncStateKey(tableId, partitionId), initialSyncState).thenComposeAsync(ignore -> this.executeWithBusyLockInFuture(() -> this.startFullStateTransferForPartition(partitionId, operationTime, subscriber, result, busyLock)), this.ioExecutor);
        }))).whenComplete((v, error) -> this.handlePossibleError((Throwable)error, String.format("Unable to start initial full state transfer for partition [tableId=%d, tableName=%s, partitionId=%d].", tableId, this.table.qualifiedName(), partitionId)));
    }

    private CompletableFuture<Void> startFullStateTransferForPartition(int partitionId, long operationTime, SecondaryStorageVersionScanSubscriber subscriber, CompletableFuture<Void> result, IgniteSpinBusyLock busyLock) {
        int tableId = this.table.tableId();
        InternalTable internalTable = this.table.internalTable();
        Integer secondaryZoneId = internalTable.secondaryZoneId();
        assert (secondaryZoneId != null) : "It is unexpected to have no secondary zone ID while starting full state transfer for table [tableId=" + tableId + ", tableName=" + this.table.qualifiedName() + ", partitionId=" + partitionId + "]";
        HybridTimestamp readTimestamp = HybridTimestamp.hybridTimestamp(operationTime);
        return internalTable.partitionLocation(partitionId).thenCompose(primaryReplica -> IgniteUtils.inBusyLockAsync(busyLock, () -> this.runScanAllVersionsInTransaction(tableId, partitionId, (InternalClusterNode)primaryReplica, readTimestamp, subscriber, result)));
    }

    private CompletableFuture<Void> runScanAllVersionsInTransaction(int tableId, int partitionId, InternalClusterNode primaryReplica, HybridTimestamp readTimestamp, SecondaryStorageVersionScanSubscriber subscriber, CompletableFuture<Void> result) {
        return ((CompletableFuture)this.transactions.runInReadOnlyTransaction(readTimestamp, transaction -> {
            Flow.Publisher<TimedBinaryRowAndRowId> publisher = this.table.internalTable().scanAllVersions(partitionId, transaction.id(), RowId.lowestRowId(partitionId), HybridTimestamp.MIN_VALUE, readTimestamp, primaryReplica, transaction.coordinatorId());
            publisher.subscribe(subscriber);
            return result;
        }).thenCompose(v -> IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.metaStorageManager.put(SecondaryFullStateTransferHandler.getSecondarySyncStateKey(tableId, partitionId), FST_COMPLETED_VALUE.array())))).thenRun(() -> this.subscribers.remove(partitionId));
    }

    boolean completeFullStateTransferForPartition(int partitionId) {
        Boolean result = this.executeWithBusyLock(() -> {
            this.partitionReadinessSet.set(partitionId);
            boolean isFstComplete = this.partitionReadinessSet.cardinality() == this.table.internalTable().partitions();
            int tableId = this.table.tableId();
            if (isFstComplete) {
                this.sendCatalogAvailable(tableId);
            }
            return isFstComplete;
        });
        return result != null && result != false;
    }

    private void sendCatalogAvailable(int tableId) {
        Catalog latestCatalog = this.catalogService.catalog(this.catalogService.latestCatalogVersion());
        CatalogTableDescriptor tableDescriptor = latestCatalog.table(tableId);
        if (tableDescriptor == null || tableDescriptor.secondaryStorageState() == CatalogSecondaryStorageState.AVAILABLE) {
            return;
        }
        LOG.info("Marking secondary storage for table as 'Available' [tableId={}, tableName={}]", tableId, tableDescriptor.name());
        this.executeWithBusyLockInFuture(() -> this.awaitPrimaryReplica(0)).thenAccept(replicaMeta -> this.executeWithBusyLock(() -> {
            if (this.nodeIsNotLeaseHolder(replicaMeta.getLeaseholderId())) {
                return;
            }
            this.metaStorageManager.removeByPrefix(SecondaryFullStateTransferHandler.getSecondarySyncStateKeyPrefix(tableId)).whenComplete((c, error) -> {
                if (error != null && !ExceptionUtils.hasCause(error, NodeStoppingException.class)) {
                    String errorMessage = String.format("Failed to enable secondary zone for the table [tableId=%s]", tableId);
                    this.failureProcessor.process(new FailureContext((Throwable)error, errorMessage));
                }
            });
            QualifiedName tableName = this.table.qualifiedName();
            CatalogCommand secondaryStorageAvailableCommand = SecondaryStorageAvailableCommand.builder().tableName(tableDescriptor.name()).schemaName(tableName.schemaName()).build();
            ((CatalogManager)this.catalogService).execute(secondaryStorageAvailableCommand).whenComplete((c, error) -> {
                if (error != null && !ExceptionUtils.hasCause(error, NodeStoppingException.class)) {
                    String errorMessage = String.format("Failed to enable secondary zone for the table [tableId=%s]", tableId);
                    this.failureProcessor.process(new FailureContext((Throwable)error, errorMessage));
                }
            });
        }));
    }

    private CompletableFuture<ReplicaMeta> awaitPrimaryReplica(int partitionId) {
        return this.table.internalTable().secondaryPartitionLocation(partitionId);
    }

    private boolean nodeIsNotLeaseHolder(@Nullable UUID leaseholderId) {
        return !this.topologyService.localMember().id().equals(leaseholderId);
    }

    private static ByteArray getSecondarySyncStateKeyPrefix(int tableId) {
        String key = SECONDARY_SYNC_STATE_PREFIX + tableId;
        return new ByteArray(key.getBytes(StandardCharsets.UTF_8));
    }

    private static ByteArray getSecondarySyncStateKey(int tableId, int partitionId) {
        String key = SECONDARY_SYNC_STATE_PREFIX + new TablePartitionId(tableId, partitionId);
        return new ByteArray(key.getBytes(StandardCharsets.UTF_8));
    }

    private void executeWithBusyLock(Runnable action) {
        if (!this.busyLock.enterBusy()) {
            return;
        }
        try {
            action.run();
        }
        catch (Throwable e) {
            this.handlePossibleError(e, String.format("Full state transfer error [tableName=%s]", this.table.qualifiedName()));
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    private <R> R executeWithBusyLock(Supplier<R> action) {
        if (!this.busyLock.enterBusy()) {
            return null;
        }
        try {
            R r = action.get();
            return r;
        }
        catch (Throwable e) {
            this.handlePossibleError(e, String.format("Full state transfer error [tableName=%s]", this.table.qualifiedName()));
            R r = null;
            return r;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <R> CompletableFuture<R> executeWithBusyLockInFuture(Supplier<CompletableFuture<R>> action) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }
        try {
            CompletableFuture<R> completableFuture = action.get();
            return completableFuture;
        }
        catch (Throwable e) {
            this.handlePossibleError(e, String.format("Full state transfer error [tableName=%s]", this.table.qualifiedName()));
            CompletableFuture completableFuture = CompletableFutures.nullCompletedFuture();
            return completableFuture;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    private void handlePossibleError(@Nullable Throwable throwable, String message) {
        if (throwable == null) {
            return;
        }
        this.executeWithBusyLock(() -> {
            boolean shouldNotifyFailureHandler;
            boolean bl = shouldNotifyFailureHandler = !ExceptionUtils.hasCause(throwable, NodeStoppingException.class, TrackerClosedException.class, ComponentStoppingException.class, TableNotFoundException.class);
            if (shouldNotifyFailureHandler) {
                this.failureProcessor.process(new FailureContext(throwable, message));
            }
        });
    }
}

