/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.table.distributed;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.partition.replicator.network.TimedBinaryRow;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
import org.apache.ignite3.internal.replicator.PartitionGroupId;
import org.apache.ignite3.internal.replicator.ReplicationGroupId;
import org.apache.ignite3.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite3.internal.schema.BinaryRow;
import org.apache.ignite3.internal.storage.AbortResult;
import org.apache.ignite3.internal.storage.AbortResultStatus;
import org.apache.ignite3.internal.storage.AddWriteCommittedResult;
import org.apache.ignite3.internal.storage.AddWriteCommittedResultStatus;
import org.apache.ignite3.internal.storage.AddWriteResult;
import org.apache.ignite3.internal.storage.AddWriteResultStatus;
import org.apache.ignite3.internal.storage.MvPartitionStorage;
import org.apache.ignite3.internal.storage.ReadResult;
import org.apache.ignite3.internal.storage.RowId;
import org.apache.ignite3.internal.storage.StorageException;
import org.apache.ignite3.internal.storage.TxIdMismatchException;
import org.apache.ignite3.internal.table.distributed.PartitionModificationCounter;
import org.apache.ignite3.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite3.internal.table.distributed.replicator.PendingRows;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
 * Applies row updates of a single partition to its {@link PartitionDataStorage} and keeps the
 * partition indexes in step with those updates.
 *
 * <p>The handler also remembers, per transaction, which row IDs received write intents
 * (see {@link PendingRows}) so that the intents can later be committed, aborted or discarded.
 */
public class StorageUpdateHandler {
    /** Partition ID used to build {@link RowId}s. */
    private final int partitionId;

    /** Storage all writes are applied to. */
    private final PartitionDataStorage storage;

    /** Handler used to add rows to / remove rows from indexes. */
    private final IndexUpdateHandler indexUpdateHandler;

    /** Row IDs with tracked write intents, grouped by transaction ID. */
    private final PendingRows pendingRows = new PendingRows();

    /** Source of the {@code batchSizeBytes} limit used by {@link #handleUpdateAll}. */
    private final ReplicationConfiguration replicationConfiguration;

    /** Counter that is advanced whenever committed row versions are written. */
    private final PartitionModificationCounter modificationCounter;

    /**
     * Creates the handler.
     *
     * @param partitionId Partition ID.
     * @param storage Partition data storage.
     * @param indexUpdateHandler Index update handler.
     * @param replicationConfiguration Replication configuration (provides the batch size limit).
     * @param modificationCounter Partition modification counter.
     */
    public StorageUpdateHandler(
            int partitionId,
            PartitionDataStorage storage,
            IndexUpdateHandler indexUpdateHandler,
            ReplicationConfiguration replicationConfiguration,
            PartitionModificationCounter modificationCounter
    ) {
        this.partitionId = partitionId;
        this.storage = storage;
        this.indexUpdateHandler = indexUpdateHandler;
        this.replicationConfiguration = replicationConfiguration;
        this.modificationCounter = modificationCounter;
    }

    /** Returns the partition ID this handler was created with. */
    public int partitionId() {
        return partitionId;
    }

    /**
     * Applies a single row update inside one {@code storage.runConsistently} closure.
     *
     * <p>The row is locked, written either as a write intent or (when {@code commitTs} is non-null) as a
     * committed version, and added to the indexes. Afterwards the row is either registered as pending for
     * {@code txId} (when {@code trackWriteIntent} is set) or the modification counter is advanced by one.
     *
     * @param txId Transaction ID.
     * @param rowUuid UUID part of the row ID.
     * @param commitPartitionId Commit partition ID; cast to {@link PartitionGroupId}.
     * @param row Row to write, {@code null} is passed through to the storage as is.
     * @param trackWriteIntent Whether to remember the row as pending for {@code txId}.
     * @param onApplication Callback run inside the same closure after the update, may be {@code null}.
     * @param commitTs Commit timestamp; when non-null the row is written as already committed.
     * @param lastCommitTs Commit timestamp used to resolve a conflicting write intent, may be {@code null}.
     * @param indexIds IDs of the indexes to update, passed through to the index handler.
     */
    public void handleUpdate(
            UUID txId,
            UUID rowUuid,
            ReplicationGroupId commitPartitionId,
            @Nullable BinaryRow row,
            boolean trackWriteIntent,
            @Nullable Runnable onApplication,
            @Nullable HybridTimestamp commitTs,
            @Nullable HybridTimestamp lastCommitTs,
            @Nullable List<Integer> indexIds
    ) {
        assert (trackWriteIntent || commitTs != null) : "either trackWriteIntent must be true or commitTs must be non-null";

        storage.runConsistently(locker -> {
            RowId rowId = new RowId(partitionId, rowUuid);

            tryProcessRow(locker, (PartitionGroupId) commitPartitionId, rowId, txId, row, lastCommitTs, commitTs, false, indexIds, false);

            if (trackWriteIntent) {
                pendingRows.addPendingRowId(txId, rowId);
            } else {
                modificationCounter.updateValue(1, commitTs);
            }

            if (onApplication != null) {
                onApplication.run();
            }

            return null;
        });
    }

    /**
     * Locks a row, writes it and adds it to the indexes.
     *
     * @param useTryLock When {@code true} the lock is taken with {@code tryLock} and the method gives up
     *     if the lock is not acquired; otherwise {@code lock} is used.
     * @return {@code false} only if {@code useTryLock} was set and the lock could not be acquired,
     *     {@code true} once the row has been written and indexed.
     */
    private boolean tryProcessRow(
            MvPartitionStorage.Locker locker,
            PartitionGroupId commitPartitionId,
            RowId rowId,
            UUID txId,
            @Nullable BinaryRow row,
            @Nullable HybridTimestamp lastCommitTs,
            @Nullable HybridTimestamp commitTs,
            boolean useTryLock,
            @Nullable List<Integer> indexIds,
            boolean isArchivation
    ) {
        if (useTryLock) {
            if (!locker.tryLock(rowId)) {
                return false;
            }
        } else {
            locker.lock(rowId);
        }

        if (commitTs != null) {
            performAddWriteCommittedWithCleanup(rowId, row, commitTs, txId, lastCommitTs, indexIds);
        } else {
            performAddWriteWithCleanup(rowId, row, txId, commitPartitionId, lastCommitTs, indexIds, isArchivation);
        }

        indexUpdateHandler.addToIndexes(row, rowId, indexIds);

        return true;
    }

    /**
     * Applies a collection of row updates, splitting the work into several {@code runConsistently}
     * closures bounded by the configured {@code batchSizeBytes}.
     *
     * <p>Does nothing (and does not run {@code onApplication}) when {@code rowsToUpdate} is null or empty.
     *
     * @param txId Transaction ID.
     * @param rowsToUpdate Rows to write, keyed by row UUID.
     * @param commitPartitionId Commit partition ID; cast to {@link PartitionGroupId}.
     * @param trackWriteIntent Whether to remember the rows as pending for {@code txId}.
     * @param onApplication Callback run inside the closure that processes the last entry, may be {@code null}.
     * @param commitTs Commit timestamp; when non-null the rows are written as already committed.
     * @param indexIds IDs of the indexes to update, passed through to the index handler.
     * @param isArchivation Flag passed through to {@code storage.addWrite}.
     */
    public void handleUpdateAll(
            UUID txId,
            Map<UUID, TimedBinaryRow> rowsToUpdate,
            ReplicationGroupId commitPartitionId,
            boolean trackWriteIntent,
            @Nullable Runnable onApplication,
            @Nullable HybridTimestamp commitTs,
            @Nullable List<Integer> indexIds,
            boolean isArchivation
    ) {
        assert (trackWriteIntent || commitTs != null) : "either trackWriteIntent must be true or commitTs must be non-null";

        if (CollectionUtils.nullOrEmpty(rowsToUpdate)) {
            return;
        }

        Iterator<Map.Entry<UUID, TimedBinaryRow>> it = rowsToUpdate.entrySet().iterator();
        Map.Entry<UUID, TimedBinaryRow> lastUnprocessedEntry = it.next();

        // Each call handles one batch and hands back the first entry it did not process (or null when done).
        while (lastUnprocessedEntry != null) {
            lastUnprocessedEntry = processEntriesUntilBatchLimit(
                    lastUnprocessedEntry,
                    txId,
                    trackWriteIntent,
                    commitTs,
                    (PartitionGroupId) commitPartitionId,
                    it,
                    onApplication,
                    (Integer) replicationConfiguration.batchSizeBytes().value(),
                    indexIds,
                    isArchivation
            );
        }
    }

    /**
     * Processes entries, starting from {@code lastUnprocessedEntry} and continuing with {@code it}, inside a
     * single {@code runConsistently} closure.
     *
     * <p>The batch ends when the accumulated {@code tupleSliceLength} exceeds {@code maxBatchLength}
     * (the first row of a batch is always accepted), when a row lock cannot be acquired with
     * {@code tryLock} (only attempted for rows after the first), or when the entries run out.
     *
     * @return The first entry that was not processed, or {@code null} if everything was processed.
     */
    private Map.Entry<UUID, TimedBinaryRow> processEntriesUntilBatchLimit(
            Map.Entry<UUID, TimedBinaryRow> lastUnprocessedEntry,
            UUID txId,
            boolean trackWriteIntent,
            @Nullable HybridTimestamp commitTs,
            PartitionGroupId commitPartitionId,
            Iterator<Map.Entry<UUID, TimedBinaryRow>> it,
            @Nullable Runnable onApplication,
            int maxBatchLength,
            @Nullable List<Integer> indexIds,
            boolean isArchivation
    ) {
        return storage.runConsistently(locker -> {
            List<RowId> processedRowIds = new ArrayList<>();
            int batchLength = 0;
            Map.Entry<UUID, TimedBinaryRow> entryToProcess = lastUnprocessedEntry;

            while (entryToProcess != null) {
                RowId rowId = new RowId(partitionId, entryToProcess.getKey());
                TimedBinaryRow timedRow = entryToProcess.getValue();
                BinaryRow row = timedRow == null ? null : timedRow.binaryRow();

                if (row != null) {
                    batchLength += row.tupleSliceLength();
                }

                // The first row of a batch is always processed and uses a blocking lock.
                boolean notFirstInBatch = !processedRowIds.isEmpty();

                if (notFirstInBatch && batchLength > maxBatchLength) {
                    break;
                }

                HybridTimestamp lastCommitTs = timedRow == null ? null : timedRow.commitTimestamp();

                boolean rowProcessed = tryProcessRow(
                        locker,
                        commitPartitionId,
                        rowId,
                        txId,
                        row,
                        lastCommitTs,
                        commitTs,
                        notFirstInBatch,
                        indexIds,
                        isArchivation
                );

                if (!rowProcessed) {
                    break;
                }

                entryToProcess = it.hasNext() ? it.next() : null;
                processedRowIds.add(rowId);
            }

            if (trackWriteIntent) {
                pendingRows.addPendingRowIds(txId, processedRowIds);
            } else {
                modificationCounter.updateValue(processedRowIds.size(), commitTs);
            }

            // The callback fires only in the closure that consumed the last entry.
            if (entryToProcess == null && onApplication != null) {
                onApplication.run();
            }

            return entryToProcess;
        });
    }

    /**
     * Asks the index handler to remove {@code previousRow} from the indexes, giving it a cursor over the
     * versions of {@code rowId}. Nothing is done when the row has no versions.
     */
    private void tryRemovePreviousWritesIndex(RowId rowId, BinaryRow previousRow, @Nullable List<Integer> indexIds) {
        try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
            if (!cursor.hasNext()) {
                return;
            }

            indexUpdateHandler.tryRemoveFromIndexes(previousRow, rowId, cursor, indexIds);
        }
    }

    /**
     * Registers {@code rowId} as pending for {@code txId}.
     *
     * @param txId Transaction ID.
     * @param rowId Row ID.
     */
    public void handleWriteIntentRead(UUID txId, RowId rowId) {
        pendingRows.addPendingRowId(txId, rowId);
    }

    /**
     * Same as {@link #switchWriteIntents(UUID, boolean, HybridTimestamp, Runnable, List)} without a callback.
     *
     * @param txId Transaction ID.
     * @param commit {@code true} to commit the pending writes, {@code false} to abort them.
     * @param commitTimestamp Commit timestamp; must be non-null when committing.
     * @param indexIds IDs of the indexes to clean up on abort.
     */
    public void switchWriteIntents(
            UUID txId,
            boolean commit,
            @Nullable HybridTimestamp commitTimestamp,
            @Nullable List<Integer> indexIds
    ) {
        switchWriteIntents(txId, commit, commitTimestamp, null, indexIds);
    }

    /**
     * Removes the row IDs tracked for {@code txId} and commits or aborts their write intents inside one
     * {@code runConsistently} closure.
     *
     * <p>The closure is skipped entirely when there are no tracked rows and no callback.
     *
     * @param txId Transaction ID.
     * @param commit {@code true} to commit the pending writes, {@code false} to abort them.
     * @param commitTimestamp Commit timestamp; must be non-null when committing.
     * @param onApplication Callback run inside the closure after the switch, may be {@code null}.
     * @param indexIds IDs of the indexes to clean up on abort.
     */
    public void switchWriteIntents(
            UUID txId,
            boolean commit,
            @Nullable HybridTimestamp commitTimestamp,
            @Nullable Runnable onApplication,
            @Nullable List<Integer> indexIds
    ) {
        Set<RowId> pendingRowIds = pendingRows.removePendingRowIds(txId);

        if (pendingRowIds.isEmpty() && onApplication == null) {
            return;
        }

        storage.runConsistently(locker -> {
            pendingRowIds.forEach(locker::lock);

            if (commit) {
                performCommitWrite(txId, pendingRowIds, commitTimestamp);
            } else {
                performAbortWrite(txId, pendingRowIds, indexIds);
            }

            if (onApplication != null) {
                onApplication.run();
            }

            return null;
        });
    }

    /**
     * Commits the write intents of the given rows and, if there were any rows, advances the
     * modification counter by their number.
     */
    private void performCommitWrite(UUID txId, Set<RowId> pendingRowIds, HybridTimestamp commitTimestamp) {
        assert (commitTimestamp != null) : "Commit timestamp is null: " + txId;

        pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, commitTimestamp, txId));

        if (!pendingRowIds.isEmpty()) {
            modificationCounter.updateValue(pendingRowIds.size(), commitTimestamp);
        }
    }

    /**
     * Aborts the write intents of the given rows. For every successful abort that reports a previous
     * write intent, the index handler is asked to remove that row from the indexes.
     */
    private void performAbortWrite(UUID txId, Set<RowId> pendingRowIds, @Nullable List<Integer> indexIds) {
        for (RowId rowId : pendingRowIds) {
            AbortResult abortResult = storage.abortWrite(rowId, txId);

            // Any non-successful status (including a transaction mismatch) leaves nothing to clean up.
            if (abortResult.status() != AbortResultStatus.SUCCESS) {
                continue;
            }

            BinaryRow abortedRow = abortResult.previousWriteIntent();

            if (abortedRow == null) {
                continue;
            }

            // try-with-resources guarantees the cursor is closed without masking a failure of the index cleanup.
            try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
                indexUpdateHandler.tryRemoveFromIndexes(abortedRow, rowId, cursor, indexIds);
            }
        }
    }

    /** Returns the index update handler this instance was created with. */
    public IndexUpdateHandler getIndexUpdateHandler() {
        return indexUpdateHandler;
    }

    /**
     * Writes a committed row version. If the storage reports an existing write intent, that intent is
     * resolved via {@link #performWriteIntentCleanup} and the write is repeated once.
     *
     * @throws StorageException If a write intent exists and {@code lastCommitTs} is {@code null}.
     */
    private void performAddWriteCommittedWithCleanup(
            RowId rowId,
            @Nullable BinaryRow row,
            HybridTimestamp commitTs,
            UUID txId,
            @Nullable HybridTimestamp lastCommitTs,
            @Nullable List<Integer> indexIds
    ) {
        AddWriteCommittedResult result = storage.addWriteCommitted(rowId, row, commitTs);

        if (result.status() != AddWriteCommittedResultStatus.WRITE_INTENT_EXISTS) {
            return;
        }

        if (lastCommitTs == null) {
            throw new StorageException("Write intent exists: [rowId={}]", rowId);
        }

        UUID wiTxId = result.currentWriteIntentTxId();

        performWriteIntentCleanup(rowId, txId, wiTxId, lastCommitTs, result.latestCommitTimestamp(), indexIds);

        result = storage.addWriteCommitted(rowId, row, commitTs);

        assert (result.status() == AddWriteCommittedResultStatus.SUCCESS) : "rowId=" + rowId + ", result=" + result;
    }

    /**
     * Writes a write intent. If the storage reports a write intent of another transaction, that intent is
     * resolved via {@link #performWriteIntentCleanup} and the write is repeated once.
     *
     * @throws TxIdMismatchException If another transaction's write intent exists and {@code lastCommitTs}
     *     is {@code null}.
     */
    private void performAddWriteWithCleanup(
            RowId rowId,
            @Nullable BinaryRow row,
            UUID txId,
            PartitionGroupId commitPartitionId,
            @Nullable HybridTimestamp lastCommitTs,
            @Nullable List<Integer> indexIds,
            boolean isArchivation
    ) {
        AddWriteResult result = performAddWrite(rowId, row, txId, commitPartitionId, indexIds, isArchivation);

        if (result.status() != AddWriteResultStatus.TX_MISMATCH) {
            return;
        }

        UUID wiTxId = result.currentWriteIntentTxId();

        if (lastCommitTs == null) {
            throw new TxIdMismatchException(wiTxId, txId);
        }

        performWriteIntentCleanup(rowId, txId, wiTxId, lastCommitTs, result.latestCommitTimestamp(), indexIds);

        result = performAddWrite(rowId, row, txId, commitPartitionId, indexIds, isArchivation);

        assert (result.status() == AddWriteResultStatus.SUCCESS) : "rowId=" + rowId + ", result=" + result;
    }

    /**
     * Adds a write intent to the storage. When the write succeeds and replaces a previous write intent,
     * the replaced row is offered to the index handler for removal.
     *
     * @return Result reported by {@code storage.addWrite}.
     */
    private AddWriteResult performAddWrite(
            RowId rowId,
            @Nullable BinaryRow row,
            UUID txId,
            PartitionGroupId commitPartitionId,
            @Nullable List<Integer> indexIds,
            boolean isArchivation
    ) {
        AddWriteResult result = storage.addWrite(
                rowId,
                row,
                txId,
                commitPartitionId.objectId(),
                commitPartitionId.partitionId(),
                isArchivation
        );

        if (result.status() == AddWriteResultStatus.SUCCESS && result.previousWriteIntent() != null) {
            tryRemovePreviousWritesIndex(rowId, result.previousWriteIntent(), indexIds);
        }

        return result;
    }

    /**
     * Resolves a write intent left by {@code writeIntentTxId} on {@code rowId}.
     *
     * <p>The intent is committed with {@code lastCommitTs} when there is no locally committed version
     * ({@code latestCommittedTs} is {@code null}) or when {@code lastCommitTs} is later than
     * {@code latestCommittedTs}; it is aborted when the two timestamps are equal.
     */
    private void performWriteIntentCleanup(
            RowId rowId,
            UUID txId,
            UUID writeIntentTxId,
            HybridTimestamp lastCommitTs,
            @Nullable HybridTimestamp latestCommittedTs,
            @Nullable List<Integer> indexIds
    ) {
        assert (!txId.equals(writeIntentTxId)) : String.format("Transactions must not match: [rowId=%s, txId=%s]", rowId, txId);

        if (latestCommittedTs == null) {
            performCommitWrite(writeIntentTxId, Set.of(rowId), lastCommitTs);

            return;
        }

        assert (lastCommitTs.compareTo(latestCommittedTs) >= 0) : "Primary commit timestamp " + lastCommitTs + " is earlier than local commit timestamp " + latestCommittedTs;

        if (lastCommitTs.compareTo(latestCommittedTs) > 0) {
            performCommitWrite(writeIntentTxId, Set.of(rowId), lastCommitTs);
        } else {
            performAbortWrite(writeIntentTxId, Set.of(rowId), indexIds);
        }
    }

    /**
     * Drops the row IDs tracked for {@code txId} without touching the storage.
     *
     * @param txId Transaction ID.
     */
    @TestOnly
    public void eraseVolatileState(UUID txId) {
        pendingRows.removePendingRowIds(txId);
    }

    /** Returns the partition data storage this instance was created with. */
    public PartitionDataStorage storage() {
        return storage;
    }

    /**
     * Removes the row IDs tracked for {@code txId} and, if there are any, locks each row and calls
     * {@code storage.discard} for it inside one {@code runConsistently} closure.
     *
     * @param txId Transaction ID.
     */
    public void discardTransaction(UUID txId) {
        Set<RowId> pendingRowIds = pendingRows.removePendingRowIds(txId);

        if (pendingRowIds.isEmpty()) {
            return;
        }

        storage.runConsistently(locker -> {
            for (RowId rowId : pendingRowIds) {
                locker.lock(rowId);
                storage.discard(rowId);
            }

            return null;
        });
    }
}

