/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.index;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.index.FinalTransactionStateResolver;
import org.apache.ignite.internal.index.IndexBuildCompletionListener;
import org.apache.ignite.internal.index.IndexBuildTaskId;
import org.apache.ignite.internal.index.IndexBuildTaskStatisticsLoggingListener;
import org.apache.ignite.internal.index.IndexBuilderMetricSource;
import org.apache.ignite.internal.index.IndexManagementUtils;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
import org.apache.ignite.internal.raft.GroupOverloadedException;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import org.apache.ignite.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.RowMeta;
import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.tx.TransactionIds;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.TrackerClosedException;
import org.jetbrains.annotations.Nullable;

/**
 * Task that builds an index over one partition by repeatedly sending batches of row IDs to a replica.
 *
 * <p>Each iteration reads up to {@code batchSize} rows from the partition storage starting at the position reported by
 * {@link IndexStorage#getNextRowIdToBuild()}, resolves the final state of the relevant pending transactions, and sends a
 * {@link BuildIndexReplicaRequest} through the {@link ReplicaService}. Iterations continue until the index storage reports that
 * there is no next row to build, or until the task fails or is stopped.
 *
 * <p>The outcome is published through {@link #getTaskFuture()}.
 */
class IndexBuildTask {
    private static final IgniteLogger LOG = Loggers.forClass(IndexBuildTask.class);
    private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    private final IndexBuildTaskId taskId;
    private final HybridTimestamp indexCreationActivationTs;
    private final IndexStorage indexStorage;
    private final MvPartitionStorage partitionStorage;
    private final ReplicaService replicaService;
    private final FailureProcessor failureProcessor;
    private final FinalTransactionStateResolver finalTransactionStateResolver;
    private final Executor executor;
    /** Busy lock supplied by the owner of this task; entered together with {@link #taskBusyLock}. */
    private final IgniteSpinBusyLock busyLock;
    private final int batchSize;
    private final InternalClusterNode node;
    private final List<IndexBuildCompletionListener> buildCompletionListeners;
    private final IndexBuildTaskStatisticsLoggingListener statisticsLoggingListener;
    private final long enlistmentConsistencyToken;
    private final boolean afterDisasterRecovery;
    /** Busy lock owned by this task; blocked by {@link #stop()}. */
    private final IgniteSpinBusyLock taskBusyLock = new IgniteSpinBusyLock();
    /** Ensures that {@link #stop()} blocks {@link #taskBusyLock} only once. */
    private final AtomicBoolean taskStopGuard = new AtomicBoolean();
    private final CompletableFuture<Void> taskFuture = new CompletableFuture<>();
    private final HybridTimestamp initialOperationTimestamp;
    private final IndexBuilderMetricSource indexBuilderMetricSource;

    IndexBuildTask(IndexBuildTaskId taskId, HybridTimestamp indexCreationActivationTs, IndexStorage indexStorage, MvPartitionStorage partitionStorage, ReplicaService replicaService, FailureProcessor failureProcessor, FinalTransactionStateResolver finalTransactionStateResolver, Executor executor, IgniteSpinBusyLock busyLock, int batchSize, InternalClusterNode node, List<IndexBuildCompletionListener> buildCompletionListeners, long enlistmentConsistencyToken, boolean afterDisasterRecovery, HybridTimestamp initialOperationTimestamp, IndexBuilderMetricSource indexBuilderMetricSource) {
        this.taskId = taskId;
        this.indexCreationActivationTs = indexCreationActivationTs;
        this.indexStorage = indexStorage;
        this.partitionStorage = partitionStorage;
        this.replicaService = replicaService;
        this.failureProcessor = failureProcessor;
        this.finalTransactionStateResolver = finalTransactionStateResolver;
        this.executor = executor;
        this.busyLock = busyLock;
        this.batchSize = batchSize;
        this.node = node;
        this.buildCompletionListeners = buildCompletionListeners;
        this.statisticsLoggingListener = new IndexBuildTaskStatisticsLoggingListener(taskId, afterDisasterRecovery);
        this.enlistmentConsistencyToken = enlistmentConsistencyToken;
        this.afterDisasterRecovery = afterDisasterRecovery;
        this.initialOperationTimestamp = initialOperationTimestamp;
        this.indexBuilderMetricSource = indexBuilderMetricSource;
    }

    /**
     * Starts the asynchronous index build.
     *
     * <p>If the busy locks cannot be entered, the task future is completed normally and nothing is started. Otherwise the
     * highest row ID of the partition is read on the executor and batches are processed until the build finishes or fails.
     * On failure the task future is completed exceptionally; errors that are not {@link #ignorable(Throwable) ignorable} are
     * additionally reported to the failure processor.
     */
    void start() {
        if (!enterBusy()) {
            taskFuture.complete(null);
            return;
        }

        String indexInfo = createCommonIndexInfo();

        if (afterDisasterRecovery) {
            LOG.warn("Start building the index due to disaster recovery of an AVAILABLE index. This shouldn't normally occur [{}]", indexInfo);
        } else {
            LOG.info("Start building the index [{}]", indexInfo);
        }

        try {
            statisticsLoggingListener.onIndexBuildStarted();

            CompletableFuture.supplyAsync(() -> partitionStorage.highestRowId(), executor)
                    .thenApplyAsync(this::handleNextBatch, executor)
                    .thenCompose(Function.identity())
                    .whenComplete((unused, throwable) -> {
                        if (throwable != null) {
                            if (ignorable(throwable)) {
                                LOG.info("Ignorable index build error [{}, error={}]", indexInfo, ExceptionUtils.unwrapCause(throwable));
                            } else {
                                String message = String.format("Index build error [%s, error=%s]", indexInfo, ExceptionUtils.unwrapCause(throwable));
                                failureProcessor.process(new FailureContext(throwable, message));
                            }

                            taskFuture.completeExceptionally(throwable);
                            statisticsLoggingListener.onIndexBuildFailure(throwable);
                        } else {
                            taskFuture.complete(null);
                            statisticsLoggingListener.onIndexBuildSuccess();
                        }
                    });
        } catch (Throwable t) {
            // The asynchronous chain could not be set up, so nothing else will ever complete the task future.
            taskFuture.completeExceptionally(t);

            throw t;
        } finally {
            leaveBusy();
        }
    }

    /**
     * Tells whether a build failure is only logged at INFO level by {@link #start()} instead of being reported to the
     * failure processor.
     *
     * @param throwable Failure of the build chain.
     * @return {@code true} if the cause chain contains a primary replica miss, a closed tracker, a closed storage or a node
     *      stop.
     */
    private static boolean ignorable(Throwable throwable) {
        return ExceptionUtils.hasCause(
                throwable,
                PrimaryReplicaMissException.class,
                TrackerClosedException.class,
                StorageClosedException.class,
                NodeStoppingException.class
        );
    }

    /**
     * Stops the task by blocking its own busy lock. Repeated calls are no-ops.
     */
    void stop() {
        if (!taskStopGuard.compareAndSet(false, true)) {
            return;
        }

        taskBusyLock.block();
    }

    /**
     * Returns the future that {@link #start()} completes when the build finishes, fails, or is not started.
     */
    CompletableFuture<Void> getTaskFuture() {
        return taskFuture;
    }

    /**
     * Builds and sends one batch, then schedules the next one if the index is not built yet.
     *
     * @param highestRowId Highest row ID of the partition captured when the task started, {@code null} if there was none.
     * @return Future that completes when the whole remaining build completes; completes normally right away if the busy
     *      locks cannot be entered.
     */
    private CompletableFuture<Void> handleNextBatch(@Nullable RowId highestRowId) {
        if (!enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }

        indexBuilderMetricSource.onBatchProcessingStarted(taskId);

        try {
            return createBatchToIndex(highestRowId)
                    .thenCompose(this::processBatch)
                    .handleAsync((unused, throwable) -> {
                        indexBuilderMetricSource.onBatchProcessingFinished(taskId);

                        if (throwable != null) {
                            Throwable cause = ExceptionUtils.unwrapRootCause(throwable);

                            // Replication timeouts and group overload fall through to the retry below;
                            // any other error fails the build.
                            if (!(cause instanceof ReplicationTimeoutException) && !(cause instanceof GroupOverloadedException)) {
                                LOG.error("Index build error: [indexInfo={}]", throwable, createCommonIndexInfo());

                                return CompletableFuture.<Void>failedFuture(cause);
                            }
                        } else if (indexStorage.getNextRowIdToBuild() == null) {
                            // No next row to build: the index is complete.
                            LOG.info("Index build completed [{}]", createCommonIndexInfo());

                            notifyBuildCompletionListeners(taskId);

                            return CompletableFutures.<Void>nullCompletedFuture();
                        }

                        return handleNextBatch(highestRowId);
                    }, executor)
                    .thenCompose(Function.identity());
        } catch (Throwable t) {
            // The chain was not assembled, so the callback above will never report the end of the batch.
            indexBuilderMetricSource.onBatchProcessingFinished(taskId);

            return CompletableFuture.failedFuture(t);
        } finally {
            // The lock is held only while the chain is assembled; the recursive call re-enters it.
            leaveBusy();
        }
    }

    /**
     * Collects the next batch of row IDs and the IDs of aborted transactions among the write intents of that batch.
     *
     * <p>Only write intents whose transaction began before {@code indexCreationActivationTs} have their final transaction
     * state resolved.
     *
     * @param highestRowId Upper bound passed to the partition storage scan, {@code null} yields an empty batch.
     * @return Future with the batch, completed once all required transaction states are resolved.
     */
    private CompletableFuture<BatchToIndex> createBatchToIndex(@Nullable RowId highestRowId) {
        if (highestRowId == null) {
            return CompletableFuture.completedFuture(new BatchToIndex(List.of(), Set.of()));
        }

        RowId nextRowIdToBuild = indexStorage.getNextRowIdToBuild();

        List<RowId> rowIds = new ArrayList<>(batchSize);
        Map<UUID, CommitPartitionId> transactionsToResolve = new HashMap<>();

        List<RowMeta> rows = nextRowIdToBuild == null
                ? List.of()
                : partitionStorage.rowsStartingWith(nextRowIdToBuild, highestRowId, batchSize);

        for (RowMeta row : rows) {
            rowIds.add(row.rowId());

            if (!row.isWriteIntent()) {
                continue;
            }

            UUID transactionId = row.transactionId();

            assert transactionId != null;

            // Transactions that began at or after the activation timestamp are not resolved here.
            if (TransactionIds.beginTimestamp(transactionId).compareTo(indexCreationActivationTs) >= 0) {
                continue;
            }

            transactionsToResolve.put(transactionId, new CommitPartitionId(row.commitZoneId(), row.commitPartitionId()));
        }

        Map<UUID, CompletableFuture<TxState>> txStateResolveFutures = transactionsToResolve.entrySet().stream()
                .map(entry -> Map.entry(entry.getKey(), resolveFinalTxStateIfNeeded(entry.getKey(), entry.getValue())))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        indexBuilderMetricSource.onTransitionToWaitingForTransactions(taskId, txStateResolveFutures.size());

        return CompletableFutures.allOf(txStateResolveFutures.values()).thenApply(unused -> {
            // All futures are already complete here, so join() does not block.
            Set<UUID> abortedTransactionIds = txStateResolveFutures.entrySet().stream()
                    .filter(entry -> entry.getValue().join() == TxState.ABORTED)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toUnmodifiableSet());

            return new BatchToIndex(rowIds, abortedTransactionIds);
        });
    }

    /**
     * Resolves the final state of a transaction through the {@link FinalTransactionStateResolver} and passes the result
     * to the statistics listener.
     *
     * @param transactionId Transaction ID.
     * @param commitPartitionId Commit partition of the transaction; its zone ID must not be {@code null}.
     * @return Future with the value returned by the statistics listener for the resolved state.
     */
    private CompletableFuture<TxState> resolveFinalTxStateIfNeeded(UUID transactionId, CommitPartitionId commitPartitionId) {
        assert commitPartitionId.commitZoneId != null;

        ZonePartitionId commitGroupId = new ZonePartitionId(commitPartitionId.commitZoneId, commitPartitionId.commitPartitionId);

        return finalTransactionStateResolver.resolveFinalTxState(transactionId, commitGroupId)
                .thenApply(statisticsLoggingListener::onWriteIntentResolved);
    }

    /**
     * Sends the batch to the replica and records the call outcome in the statistics listener.
     *
     * @param batch Batch to send.
     * @return Future that completes when the replica has responded and the batch has been counted as processed.
     */
    private CompletableFuture<Void> processBatch(BatchToIndex batch) {
        BuildIndexReplicaRequest request = createBuildIndexReplicaRequest(batch, initialOperationTimestamp);

        indexBuilderMetricSource.onTransitionToWaitingForReplicaResponse(taskId);

        return replicaService.invoke(node, request)
                .whenComplete((unused, throwable) -> {
                    if (throwable == null) {
                        statisticsLoggingListener.onRaftCallSuccess();
                    } else {
                        statisticsLoggingListener.onRaftCallFailure();
                    }
                })
                .thenAccept(unused -> statisticsLoggingListener.onBatchProcessed(batch.rowIds.size()));
    }

    /**
     * Creates the replica request for a batch.
     *
     * <p>The request is flagged as the finishing one when the batch holds fewer rows than {@code batchSize}.
     *
     * @param batch Batch to send.
     * @param initialOperationTimestamp Timestamp put into the request.
     * @return Request for the zone partition identified by the task ID.
     */
    private BuildIndexReplicaRequest createBuildIndexReplicaRequest(BatchToIndex batch, HybridTimestamp initialOperationTimestamp) {
        List<RowId> rowIds = batch.rowIds;

        boolean finish = rowIds.size() < batchSize;

        ZonePartitionIdMessage groupIdMessage = ReplicaMessageUtils.toZonePartitionIdMessage(
                REPLICA_MESSAGES_FACTORY,
                new ZonePartitionId(taskId.getZoneId(), taskId.getPartitionId())
        );

        return PARTITION_REPLICATION_MESSAGES_FACTORY.buildIndexReplicaRequest()
                .groupId(groupIdMessage)
                .tableId(taskId.getTableId())
                .indexId(taskId.getIndexId())
                .rowIds(rowIds.stream().map(RowId::uuid).collect(Collectors.toList()))
                .finish(finish)
                .abortedTransactionIds(batch.abortedTransactionIds)
                .enlistmentConsistencyToken(enlistmentConsistencyToken)
                .timestamp(initialOperationTimestamp)
                .build();
    }

    /** Enters both the shared busy lock and the task busy lock; see {@link IndexManagementUtils}. */
    private boolean enterBusy() {
        return IndexManagementUtils.enterBusy(busyLock, taskBusyLock);
    }

    /** Leaves both the shared busy lock and the task busy lock; see {@link IndexManagementUtils}. */
    private void leaveBusy() {
        IndexManagementUtils.leaveBusy(busyLock, taskBusyLock);
    }

    /** Formats the zone, table, partition and index IDs of this task for log messages. */
    private String createCommonIndexInfo() {
        return IgniteStringFormatter.format(
                "zoneId={}, tableId={}, partitionId={}, indexId={}",
                taskId.getZoneId(), taskId.getTableId(), taskId.getPartitionId(), taskId.getIndexId()
        );
    }

    /**
     * Notifies every completion listener, using the disaster-recovery callback when the task was created with
     * {@code afterDisasterRecovery} set.
     *
     * @param taskId ID of the finished task.
     */
    private void notifyBuildCompletionListeners(IndexBuildTaskId taskId) {
        for (IndexBuildCompletionListener listener : buildCompletionListeners) {
            if (afterDisasterRecovery) {
                listener.onBuildCompletionAfterDisasterRecovery(taskId.getIndexId(), taskId.getTableId(), taskId.getPartitionId());
            } else {
                listener.onBuildCompletion(taskId.getIndexId(), taskId.getTableId(), taskId.getPartitionId());
            }
        }
    }

    /** Row IDs of one batch together with the aborted transactions found among its write intents. */
    private static class BatchToIndex {
        private final List<RowId> rowIds;
        private final Set<UUID> abortedTransactionIds;

        private BatchToIndex(List<RowId> rowIds, Set<UUID> abortedTransactionIds) {
            this.rowIds = rowIds;
            this.abortedTransactionIds = abortedTransactionIds;
        }
    }

    /** Commit zone and partition recorded on a write intent. */
    private static class CommitPartitionId {
        @Nullable
        private final Integer commitZoneId;
        private final int commitPartitionId;

        private CommitPartitionId(@Nullable Integer commitZoneId, int commitPartitionId) {
            this.commitZoneId = commitZoneId;
            this.commitPartitionId = commitPartitionId;
        }
    }
}

