/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.snapshots;

import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.schema.BinaryRowUpgrader;
import org.apache.ignite3.internal.storage.BinaryRowAndRowId;
import org.apache.ignite3.internal.storage.MvPartitionStorage;
import org.apache.ignite3.internal.storage.RowId;
import org.apache.ignite3.internal.table.TableViewInternal;
import org.apache.ignite3.internal.table.distributed.PartitionSet;
import org.apache.ignite3.internal.tx.InternalTransaction;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.gridgain.internal.snapshots.PartitionResponsibilityTrigger;
import org.gridgain.internal.snapshots.RowSnapshotWriter;
import org.gridgain.internal.snapshots.SnapshotCancelledException;
import org.gridgain.internal.snapshots.SnapshotContext;
import org.gridgain.internal.snapshots.TableTombstoneResolver;
import org.gridgain.internal.snapshots.TombstoneResolverFactory;
import org.gridgain.internal.snapshots.communication.metastorage.CreateSnapshotGlobalState;
import org.gridgain.internal.snapshots.filesystem.SnapshotPath;

class TableSnapshotWriter {
    private static final IgniteLogger LOG = Loggers.forClass(TableSnapshotWriter.class);
    static final int ROWS_BATCH_SIZE = 1000;
    private final TableViewInternal table;
    private final PartitionSet partitions;
    private final InternalClusterNode thisNode;
    private final ExecutorService threadPool;
    private final SnapshotContext<CreateSnapshotGlobalState> snapshotContext;
    private final TombstoneResolverFactory tombstoneResolverFactory;
    private final int targetTableVersion;
    private final InternalTransaction transaction;

    TableSnapshotWriter(SnapshotContext<CreateSnapshotGlobalState> snapshotContext, InternalClusterNode thisNode, TableViewInternal table, PartitionSet partitions, ExecutorService threadPool, int targetTableVersion, InternalTransaction transaction) {
        this.thisNode = thisNode;
        this.table = table;
        this.partitions = partitions;
        this.threadPool = threadPool;
        this.snapshotContext = snapshotContext;
        this.targetTableVersion = targetTableVersion;
        this.transaction = transaction;
        this.tombstoneResolverFactory = new TombstoneResolverFactory((partitionId, from, to) -> {
            MvPartitionStorage partition = table.internalTable().storage().getMvPartition(partitionId);
            assert (partition != null) : "Accessing local partition. Partition with id=" + partitionId + " should be present.";
            return partition.scanSnapshotTombstones(from, to);
        });
    }

    CompletableFuture<Long> createPartitionSnapshots() {
        LOG.info("Snapshot creation {}: Started processing table {} ({}).", this.snapshotContext.operationId(), this.table.name(), this.table.tableId());
        return this.snapshotContext.inBusyLockAsync(() -> {
            CompletableFuture[] partitionSnapshotFutures = (CompletableFuture[])this.partitions.stream().mapToObj(this::createPartitionSnapshot).toArray(CompletableFuture[]::new);
            return this.aggregateSnapshotFutures(partitionSnapshotFutures);
        });
    }

    CompletableFuture<Long> createSingleCopyPartitionSnapshots(PartitionResponsibilityTrigger responsibilityTrigger) {
        int tableId = this.table.tableId();
        return this.snapshotContext.inBusyLockAsync(() -> {
            CompletableFuture[] partitionSnapshotFutures = (CompletableFuture[])this.partitions.stream().mapToObj(partition -> responsibilityTrigger.tryBecomeResponsible(new TablePartitionId(tableId, partition)).thenComposeAsync(isResponsible -> isResponsible != false ? this.createPartitionSnapshot(partition) : CompletableFuture.completedFuture(0L), (Executor)this.threadPool)).toArray(CompletableFuture[]::new);
            return this.aggregateSnapshotFutures(partitionSnapshotFutures);
        });
    }

    private CompletableFuture<Long> aggregateSnapshotFutures(CompletableFuture<Long>[] partitionSnapshotFutures) {
        return ((CompletableFuture)CompletableFutures.allOfToList(partitionSnapshotFutures).thenApply(numRowsList -> numRowsList.stream().mapToLong(Long::longValue).sum())).whenComplete((numRows, e) -> {
            if (e != null) {
                this.snapshotContext.cancel();
            } else if (LOG.isInfoEnabled()) {
                LOG.info("Snapshot creation {}: Table {} ({}) has been successfully saved, number of rows: {}.", this.snapshotContext.operationId(), this.table.name(), this.table.tableId(), numRows);
            }
        });
    }

    private CompletableFuture<Long> createPartitionSnapshot(int partitionId) {
        try {
            PartitionSnapshotWriter partitionWriter = new PartitionSnapshotWriter(partitionId);
            return partitionWriter.createPartitionSnapshot();
        }
        catch (IOException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    private class PartitionSnapshotWriter {
        private final int partitionId;
        private final CompletableFuture<RowSnapshotWriter> rowSnapshotWriterFuture;

        PartitionSnapshotWriter(int partitionId) throws IOException {
            this.partitionId = partitionId;
            SnapshotPath partitionSnapshotPath = TableSnapshotWriter.this.snapshotContext.snapshotFileSystem().partitionFile(TableSnapshotWriter.this.table.tableId(), partitionId);
            BinaryRowUpgrader rowUpgrader = new BinaryRowUpgrader(TableSnapshotWriter.this.table.schemaView(), TableSnapshotWriter.this.targetTableVersion);
            this.rowSnapshotWriterFuture = partitionSnapshotPath.writeChannel().thenApply(writableByteChannel -> new RowSnapshotWriter((WritableByteChannel)writableByteChannel, rowUpgrader));
        }

        CompletableFuture<Long> createPartitionSnapshot() {
            try {
                LOG.info("Snapshot creation {}: Started processing partition {} of table {} ({}).", TableSnapshotWriter.this.snapshotContext.operationId(), this.partitionId, TableSnapshotWriter.this.table.name(), TableSnapshotWriter.this.table.tableId());
                return ((CompletableFuture)this.rowSnapshotWriterFuture.thenCompose(rowSnapshotWriter -> ((CompletableFuture)this.saveRows((RowSnapshotWriter)rowSnapshotWriter).whenComplete((unused, throwable) -> TableSnapshotWriter.this.snapshotContext.inBusyLock(rowSnapshotWriter::close))).thenApply(unused -> rowSnapshotWriter.rowsWritten()))).whenComplete((rowsWritten, e) -> {
                    if (e == null) {
                        LOG.info("Snapshot creation {}: Partition {} of table {} ({}) saved, number of rows: {}.", TableSnapshotWriter.this.snapshotContext.operationId(), this.partitionId, TableSnapshotWriter.this.table.name(), TableSnapshotWriter.this.table.tableId(), rowsWritten);
                    } else if (!(ExceptionUtils.unwrapCause(e) instanceof SnapshotCancelledException)) {
                        LOG.error("Snapshot creation {}: Failed to save partition {} of table {} ({}).", (Throwable)e, (Object)TableSnapshotWriter.this.snapshotContext.operationId(), (Object)this.partitionId, (Object)TableSnapshotWriter.this.table.name(), (Object)TableSnapshotWriter.this.table.tableId());
                    }
                });
            }
            catch (Throwable e2) {
                return CompletableFuture.failedFuture(e2);
            }
        }

        private CompletableFuture<Void> saveRows(final RowSnapshotWriter rowSnapshotWriter) {
            final CompletableFuture<Void> result = new CompletableFuture<Void>();
            final CreateSnapshotGlobalState snapshotState = TableSnapshotWriter.this.snapshotContext.snapshotState();
            Flow.Publisher<BinaryRowAndRowId> publisher = TableSnapshotWriter.this.table.internalTable().scanInterval(this.partitionId, TableSnapshotWriter.this.transaction.id(), snapshotState.fromTimestamp(), snapshotState.timestamp(), TableSnapshotWriter.this.thisNode, TableSnapshotWriter.this.transaction.coordinatorId());
            publisher.subscribe(new Flow.Subscriber<BinaryRowAndRowId>(){
                private Flow.Subscription subscription;
                private List<BinaryRowAndRowId> rowAccumulator = new ArrayList<BinaryRowAndRowId>(1000);
                private final TableTombstoneResolver tombstoneResolver;
                private CompletableFuture<?> lastWriteFuture;
                {
                    this.tombstoneResolver = TableSnapshotWriter.this.tombstoneResolverFactory.create(PartitionSnapshotWriter.this.partitionId, snapshotState.fromTimestamp(), snapshotState.timestamp());
                    this.lastWriteFuture = CompletableFutures.nullCompletedFuture();
                }

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1000L);
                }

                @Override
                public void onNext(BinaryRowAndRowId item) {
                    this.rowAccumulator.add(item);
                    if (this.rowAccumulator.size() == 1000) {
                        List<BinaryRowAndRowId> oldAccumulator = this.rowAccumulator;
                        this.rowAccumulator = new ArrayList<BinaryRowAndRowId>(1000);
                        this.lastWriteFuture = CompletableFuture.runAsync(() -> TableSnapshotWriter.this.snapshotContext.inBusyLock(() -> this.saveBinaryRows(rowSnapshotWriter, oldAccumulator)), TableSnapshotWriter.this.threadPool).whenComplete((v, e) -> {
                            if (e == null) {
                                this.subscription.request(1000L);
                            } else {
                                this.subscription.cancel();
                                result.completeExceptionally((Throwable)e);
                            }
                        });
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    result.completeExceptionally(throwable);
                    this.tombstoneResolver.close();
                }

                @Override
                public void onComplete() {
                    ((CompletableFuture)this.lastWriteFuture.thenRunAsync(() -> TableSnapshotWriter.this.snapshotContext.inBusyLock(() -> {
                        this.saveBinaryRows(rowSnapshotWriter, this.rowAccumulator);
                        this.saveTombstonesUntilRowId(rowSnapshotWriter, RowId.highestRowId(PartitionSnapshotWriter.this.partitionId));
                    }), TableSnapshotWriter.this.threadPool)).whenComplete((unused, e) -> {
                        this.tombstoneResolver.close();
                        if (e == null) {
                            result.complete(null);
                        } else {
                            result.completeExceptionally((Throwable)e);
                        }
                    });
                }

                private void saveBinaryRows(RowSnapshotWriter writer, List<BinaryRowAndRowId> rows) throws IOException {
                    if (rows.isEmpty()) {
                        return;
                    }
                    for (BinaryRowAndRowId row : rows) {
                        this.saveTombstonesUntilRowId(writer, row.rowId());
                        writer.saveRowIdAndBinaryRow(row.rowId(), row.binaryRow());
                    }
                }

                private void saveTombstonesUntilRowId(RowSnapshotWriter writer, RowId rowId) throws IOException {
                    for (RowId tombstoneRowId : this.tombstoneResolver.resolveUntil(rowId)) {
                        writer.saveTombstone(tombstoneRowId);
                    }
                }
            });
            return result;
        }
    }
}

