/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.partition.replicator.raft.snapshot.outgoing;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.catalog.CatalogService;
import org.apache.ignite3.internal.lang.IgniteBiTuple;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite3.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
import org.apache.ignite3.internal.partition.replicator.network.raft.SnapshotMetaRequest;
import org.apache.ignite3.internal.partition.replicator.network.raft.SnapshotMetaResponse;
import org.apache.ignite3.internal.partition.replicator.network.raft.SnapshotMvDataRequest;
import org.apache.ignite3.internal.partition.replicator.network.raft.SnapshotMvDataResponse;
import org.apache.ignite3.internal.partition.replicator.network.raft.SnapshotTxDataRequest;
import org.apache.ignite3.internal.partition.replicator.network.raft.SnapshotTxDataResponse;
import org.apache.ignite3.internal.partition.replicator.network.replication.BinaryRowMessage;
import org.apache.ignite3.internal.partition.replicator.network.replication.BinaryRowVersionMessage;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionKey;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.outgoing.MvPartitionDeliveryState;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.outgoing.SnapshotMetaUtils;
import org.apache.ignite3.internal.raft.RaftGroupConfiguration;
import org.apache.ignite3.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite3.internal.schema.BinaryRow;
import org.apache.ignite3.internal.storage.ReadResult;
import org.apache.ignite3.internal.storage.RowId;
import org.apache.ignite3.internal.tx.TxMeta;
import org.apache.ignite3.internal.tx.message.TxMessagesFactory;
import org.apache.ignite3.internal.tx.message.TxMetaMessage;
import org.apache.ignite3.internal.util.Cursor;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;

public class OutgoingSnapshot {
    private static final IgniteLogger LOG = Loggers.forClass(OutgoingSnapshot.class);
    private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
    private final UUID id;
    private final PartitionKey partitionKey;
    private final Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId;
    private final PartitionTxStateAccess txState;
    private final CatalogService catalogService;
    private final ReentrantLock mvOperationsLock = new ReentrantLock();
    @Nullable
    private volatile PartitionSnapshotMeta frozenMeta;
    private final Set<RowId> rowIdsToSkip = new HashSet<RowId>();
    private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData = new ArrayDeque<SnapshotMvDataResponse.ResponseEntry>();
    @Nullable
    private MvPartitionDeliveryState mvPartitionDeliveryState;
    @Nullable
    private Cursor<IgniteBiTuple<UUID, TxMeta>> txDataCursor;
    private volatile boolean finishedTxData;
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final AtomicBoolean closedGuard = new AtomicBoolean();

    public OutgoingSnapshot(UUID id, PartitionKey partitionKey, Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId, PartitionTxStateAccess txState, CatalogService catalogService) {
        this.id = id;
        this.partitionKey = partitionKey;
        this.partitionsByTableId = partitionsByTableId;
        this.txState = txState;
        this.catalogService = catalogService;
    }

    public UUID id() {
        return this.id;
    }

    public PartitionKey partitionKey() {
        return this.partitionKey;
    }

    void freezeScopeUnderMvLock() {
        this.acquireMvLock();
        try {
            int catalogVersion = this.catalogService.latestCatalogVersion();
            List<PartitionMvStorageAccess> partitionStorages = this.freezePartitionStorages();
            this.frozenMeta = this.takeSnapshotMeta(catalogVersion, partitionStorages);
            this.txDataCursor = this.txState.getAllTxMeta();
            this.finishedTxData = false;
            this.mvPartitionDeliveryState = new MvPartitionDeliveryState(partitionStorages);
        }
        finally {
            this.releaseMvLock();
        }
    }

    private PartitionSnapshotMeta takeSnapshotMeta(int catalogVersion, Collection<PartitionMvStorageAccess> partitionStorages) {
        Map<Integer, UUID> nextRowIdToBuildByIndexId = SnapshotMetaUtils.collectNextRowIdToBuildIndexes(this.catalogService, partitionStorages, catalogVersion);
        PartitionMvStorageAccess partitionStorageWithMaxAppliedIndex = partitionStorages.stream().max(Comparator.comparingLong(PartitionMvStorageAccess::lastAppliedIndex)).orElse(null);
        if (partitionStorageWithMaxAppliedIndex == null || this.txState.lastAppliedIndex() > partitionStorageWithMaxAppliedIndex.lastAppliedIndex()) {
            RaftGroupConfiguration config = this.txState.committedGroupConfiguration();
            assert (config != null) : "Configuration should never be null when installing a snapshot";
            return SnapshotMetaUtils.snapshotMetaAt(this.txState.lastAppliedIndex(), this.txState.lastAppliedTerm(), config, catalogVersion, nextRowIdToBuildByIndexId, this.txState.leaseInfo());
        }
        RaftGroupConfiguration config = partitionStorageWithMaxAppliedIndex.committedGroupConfiguration();
        assert (config != null) : "Configuration should never be null when installing a snapshot";
        return SnapshotMetaUtils.snapshotMetaAt(partitionStorageWithMaxAppliedIndex.lastAppliedIndex(), partitionStorageWithMaxAppliedIndex.lastAppliedTerm(), config, catalogVersion, nextRowIdToBuildByIndexId, partitionStorageWithMaxAppliedIndex.leaseInfo());
    }

    private List<PartitionMvStorageAccess> freezePartitionStorages() {
        if (this.partitionKey instanceof ZonePartitionKey) {
            return this.partitionsByTableId.values().stream().sorted(Comparator.comparingInt(PartitionMvStorageAccess::tableId)).collect(Collectors.toList());
        }
        assert (this.partitionsByTableId.size() == 1);
        return List.copyOf(this.partitionsByTableId.values());
    }

    public PartitionSnapshotMeta meta() {
        PartitionSnapshotMeta meta = this.frozenMeta;
        assert (meta != null) : "No snapshot meta yet, probably the snapshot scope was not yet frozen";
        return meta;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    SnapshotMetaResponse handleSnapshotMetaRequest(SnapshotMetaRequest request) {
        assert (Objects.equals(request.id(), this.id)) : "Expected id " + this.id + " but got " + request.id();
        if (!this.busyLock.enterBusy()) {
            return (SnapshotMetaResponse)this.logThatAlreadyClosedAndReturnNull();
        }
        try {
            PartitionSnapshotMeta meta = this.frozenMeta;
            assert (meta != null) : "No snapshot meta yet, probably the snapshot scope was not yet frozen";
            SnapshotMetaResponse snapshotMetaResponse = PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotMetaResponse().meta(meta).build();
            return snapshotMetaResponse;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    @Nullable
    private <T> T logThatAlreadyClosedAndReturnNull() {
        LOG.debug("Snapshot with ID '{}' is already closed", this.id);
        return null;
    }

    @Nullable
    SnapshotMvDataResponse handleSnapshotMvDataRequest(SnapshotMvDataRequest request) {
        if (!this.busyLock.enterBusy()) {
            return (SnapshotMvDataResponse)this.logThatAlreadyClosedAndReturnNull();
        }
        try {
            SnapshotMvDataResponse snapshotMvDataResponse = this.handleSnapshotMvDataRequestInternal(request);
            return snapshotMvDataResponse;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private SnapshotMvDataResponse handleSnapshotMvDataRequestInternal(SnapshotMvDataRequest request) {
        long totalBatchSize = 0L;
        ArrayList<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<SnapshotMvDataResponse.ResponseEntry>();
        while (true) {
            this.acquireMvLock();
            try {
                totalBatchSize = this.fillWithOutOfOrderRows(batch, totalBatchSize, request);
                totalBatchSize = this.tryProcessRowFromPartition(batch, totalBatchSize, request);
                if (!this.finishedMvData() && !OutgoingSnapshot.batchIsFull(request, totalBatchSize)) continue;
                SnapshotMvDataResponse snapshotMvDataResponse = PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotMvDataResponse().rows(batch).finish(this.finishedMvData()).build();
                return snapshotMvDataResponse;
            }
            finally {
                this.releaseMvLock();
                continue;
            }
            break;
        }
    }

    private long fillWithOutOfOrderRows(List<SnapshotMvDataResponse.ResponseEntry> rowEntries, long totalBytesBefore, SnapshotMvDataRequest request) {
        long totalBytesAfter;
        SnapshotMvDataResponse.ResponseEntry rowEntry;
        assert (this.mvOperationsLock.isLocked()) : "MV operations lock must be acquired!";
        for (totalBytesAfter = totalBytesBefore; totalBytesAfter < request.batchSizeHint() && (rowEntry = this.outOfOrderMvData.poll()) != null; totalBytesAfter += OutgoingSnapshot.rowSizeInBytes(rowEntry.rowVersions())) {
            rowEntries.add(rowEntry);
        }
        return totalBytesAfter;
    }

    private static long rowSizeInBytes(List<BinaryRowMessage> rowVersions) {
        long sum = 0L;
        for (BinaryRowMessage rowMessage : rowVersions) {
            if (rowMessage == null || rowMessage.binaryTuple() == null) continue;
            sum += (long)(rowMessage.binaryTuple().remaining() + 2);
        }
        return sum;
    }

    private long tryProcessRowFromPartition(List<SnapshotMvDataResponse.ResponseEntry> batch, long totalBatchSize, SnapshotMvDataRequest request) {
        if (OutgoingSnapshot.batchIsFull(request, totalBatchSize) || this.finishedMvData()) {
            return totalBatchSize;
        }
        assert (this.mvPartitionDeliveryState != null) : "Snapshot scope has not been frozen.";
        this.mvPartitionDeliveryState.advance();
        if (!this.finishedMvData()) {
            RowId rowId = this.mvPartitionDeliveryState.currentRowId();
            PartitionMvStorageAccess partition = this.mvPartitionDeliveryState.currentPartitionStorage();
            if (!this.rowIdsToSkip.remove(rowId)) {
                SnapshotMvDataResponse.ResponseEntry rowEntry = OutgoingSnapshot.rowEntry(partition, rowId);
                assert (rowEntry != null);
                batch.add(rowEntry);
                totalBatchSize += OutgoingSnapshot.rowSizeInBytes(rowEntry.rowVersions());
            }
        }
        return totalBatchSize;
    }

    private static boolean batchIsFull(SnapshotMvDataRequest request, long totalBatchSize) {
        return totalBatchSize >= request.batchSizeHint();
    }

    @Nullable
    private static SnapshotMvDataResponse.ResponseEntry rowEntry(PartitionMvStorageAccess partition, RowId rowId) {
        List<ReadResult> rowVersionsN2O = partition.getAllRowVersions(rowId);
        if (rowVersionsN2O.isEmpty()) {
            return null;
        }
        int count = rowVersionsN2O.size();
        ArrayList<BinaryRowMessage> rowVersions = new ArrayList<BinaryRowMessage>(count);
        int commitTimestampsCount = rowVersionsN2O.get(0).isWriteIntent() ? count - 1 : count;
        long[] commitTimestamps = new long[commitTimestampsCount];
        UUID transactionId = null;
        Integer commitTableOrZoneId = null;
        int commitPartitionId = -1;
        int j = 0;
        for (int i = count - 1; i >= 0; --i) {
            ReadResult version = rowVersionsN2O.get(i);
            BinaryRow row = version.binaryRow();
            BinaryRowVersionMessage rowMessage = PARTITION_REPLICATION_MESSAGES_FACTORY.binaryRowVersionMessage().binaryTuple(row == null ? null : row.tupleSlice()).schemaVersion(row == null ? 0 : row.schemaVersion()).tombstone(version.isEmpty()).isArchived(version.isArchived()).build();
            rowVersions.add(rowMessage);
            if (version.isWriteIntent()) {
                assert (i == 0) : rowVersionsN2O;
                transactionId = version.transactionId();
                commitTableOrZoneId = version.commitTableOrZoneId();
                commitPartitionId = version.commitPartitionId();
                continue;
            }
            commitTimestamps[j++] = version.commitTimestamp().longValue();
        }
        return PARTITION_REPLICATION_MESSAGES_FACTORY.responseEntry().tableId(partition.tableId()).rowId(rowId.uuid()).rowVersions(rowVersions).timestamps(commitTimestamps).txId(transactionId).commitTableOrZoneId(commitTableOrZoneId).commitPartitionId(commitPartitionId).build();
    }

    @Nullable
    SnapshotTxDataResponse handleSnapshotTxDataRequest(SnapshotTxDataRequest request) {
        if (!this.busyLock.enterBusy()) {
            return (SnapshotTxDataResponse)this.logThatAlreadyClosedAndReturnNull();
        }
        try {
            SnapshotTxDataResponse snapshotTxDataResponse = this.handleSnapshotTxDataRequestInternal(request);
            return snapshotTxDataResponse;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    private SnapshotTxDataResponse handleSnapshotTxDataRequestInternal(SnapshotTxDataRequest request) {
        ArrayList<IgniteBiTuple<UUID, TxMeta>> rows = new ArrayList<IgniteBiTuple<UUID, TxMeta>>();
        boolean finishedTxData = this.finishedTxData;
        Cursor<IgniteBiTuple<UUID, TxMeta>> txDataCursor = this.txDataCursor;
        assert (txDataCursor != null) : "Snapshot scope has not been frozen.";
        while (!finishedTxData && rows.size() < request.maxTransactionsInBatch()) {
            if (txDataCursor.hasNext()) {
                rows.add((IgniteBiTuple)txDataCursor.next());
                continue;
            }
            finishedTxData = true;
            txDataCursor.close();
        }
        this.finishedTxData = finishedTxData;
        return OutgoingSnapshot.buildTxDataResponse(rows, finishedTxData);
    }

    private static SnapshotTxDataResponse buildTxDataResponse(List<IgniteBiTuple<UUID, TxMeta>> rows, boolean finished) {
        ArrayList<UUID> txIds = new ArrayList<UUID>(rows.size());
        ArrayList<TxMetaMessage> txMetas = new ArrayList<TxMetaMessage>(rows.size());
        for (IgniteBiTuple<UUID, TxMeta> row : rows) {
            txIds.add(row.getKey());
            txMetas.add(row.getValue().toTransactionMetaMessage(REPLICA_MESSAGES_FACTORY, TX_MESSAGES_FACTORY));
        }
        return PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotTxDataResponse().txIds(txIds).txMeta(txMetas).finish(finished).build();
    }

    public void acquireMvLock() {
        this.mvOperationsLock.lock();
    }

    public void releaseMvLock() {
        this.mvOperationsLock.unlock();
    }

    private boolean finishedMvData() {
        assert (this.mvOperationsLock.isLocked()) : "MV operations lock must be acquired!";
        return this.mvPartitionDeliveryState != null && this.mvPartitionDeliveryState.isExhausted();
    }

    public boolean addRowIdToSkip(RowId rowId) {
        assert (this.mvOperationsLock.isLocked()) : "MV operations lock must be acquired!";
        return this.rowIdsToSkip.add(rowId);
    }

    public boolean alreadyPassedOrIrrelevant(int tableId, RowId rowId) {
        assert (this.mvOperationsLock.isLocked()) : "MV operations lock must be acquired!";
        if (this.mvPartitionDeliveryState == null) {
            return false;
        }
        return !this.mvPartitionDeliveryState.isGoingToBeDelivered(tableId) || this.alreadyPassed(tableId, rowId);
    }

    private boolean alreadyPassed(int tableId, RowId rowId) {
        assert (this.mvPartitionDeliveryState != null);
        if (this.mvPartitionDeliveryState.isExhausted()) {
            return true;
        }
        if (!this.mvPartitionDeliveryState.hasIterationStarted()) {
            return false;
        }
        if (tableId == this.mvPartitionDeliveryState.currentTableId()) {
            return rowId.compareTo(this.mvPartitionDeliveryState.currentRowId()) <= 0;
        }
        return tableId < this.mvPartitionDeliveryState.currentTableId();
    }

    public void enqueueForSending(int tableId, RowId rowId) {
        assert (this.mvOperationsLock.isLocked()) : "MV operations lock must be acquired!";
        SnapshotMvDataResponse.ResponseEntry entry = OutgoingSnapshot.rowEntry((PartitionMvStorageAccess)this.partitionsByTableId.get(tableId), rowId);
        if (entry != null) {
            this.outOfOrderMvData.add(entry);
        }
    }

    public void close() {
        Cursor<IgniteBiTuple<UUID, TxMeta>> txCursor;
        if (!this.closedGuard.compareAndSet(false, true)) {
            return;
        }
        this.busyLock.block();
        if (!this.finishedTxData && (txCursor = this.txDataCursor) != null) {
            txCursor.close();
            this.finishedTxData = true;
        }
    }
}

