/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.table.distributed.replicator.secondary;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite3.internal.partition.replicator.network.replication.BinaryRowWithTombstoneMessage;
import org.apache.ignite3.internal.partition.replicator.network.replication.TransferRowVersionsToSecondaryReplicaRequest;
import org.apache.ignite3.internal.placementdriver.ReplicaMeta;
import org.apache.ignite3.internal.replicator.ZonePartitionId;
import org.apache.ignite3.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite3.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite3.internal.storage.TimedBinaryRowAndRowId;
import org.apache.ignite3.internal.table.TableViewInternal;
import org.apache.ignite3.internal.table.distributed.replicator.secondary.ReplicaServiceWrapper;
import org.apache.ignite3.internal.table.distributed.replicator.secondary.SecondaryStorageSubscriber;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;

/**
 * Subscriber that receives row versions ({@link TimedBinaryRowAndRowId}) from a partition scan and transfers them to the
 * secondary zone replica in batches of up to {@link #BATCH_SIZE} rows.
 *
 * <p>Batches are sent strictly one after another: each send is chained onto {@link #replicationFuture}. Once the scan
 * completes and the last batch has been sent successfully, the {@code result} future passed to the constructor is completed.
 *
 * <p>NOTE(review): the accumulator is only touched from {@link #onNext} and {@link #onComplete}; this relies on the
 * publisher delivering signals serially, as the {@link Flow.Subscriber} contract requires - confirm for the actual publisher.
 */
class SecondaryStorageVersionScanSubscriber
extends SecondaryStorageSubscriber<TimedBinaryRowAndRowId> {
    private static final IgniteLogger LOG = Loggers.forClass(SecondaryStorageVersionScanSubscriber.class);

    /** Number of accumulated rows that triggers sending a batch. */
    private static final int BATCH_SIZE = 1000;

    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();

    private static final PartitionReplicationMessagesFactory TABLE_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();

    private final TableViewInternal table;

    private final int partitionId;

    /** Replication group of the secondary zone partition that receives the rows. */
    private final ZonePartitionId secondaryZonePartitionId;

    private final ClockService clockService;

    private final ReplicaServiceWrapper replicaService;

    private final IgniteSpinBusyLock busyLock;

    /** Completed when the transfer finishes or when the subscriber is stopped. */
    private final CompletableFuture<Void> result;

    /** Rows accumulated since the last batch was handed over for sending. */
    private final List<BinaryRowWithTombstoneMessage> batch = new ArrayList<>(BATCH_SIZE);

    /** Tail of the chain of batch sends; every new send is composed onto it so that batches go out in order. */
    private CompletableFuture<?> replicationFuture = CompletableFutures.nullCompletedFuture();

    /**
     * Constructor.
     *
     * @param table Table whose rows are transferred; its internal table must have a secondary zone ID.
     * @param partitionId Partition ID.
     * @param clockService Clock service used to timestamp transfer requests.
     * @param replicaService Replica service used to send requests with retries.
     * @param failureProcessor Failure processor, passed to the parent subscriber.
     * @param lock Busy lock, shared with the parent subscriber and blocked on {@link #stop()}.
     * @param result Future to complete when the transfer finishes or the subscriber is stopped.
     */
    SecondaryStorageVersionScanSubscriber(TableViewInternal table, int partitionId, ClockService clockService, ReplicaServiceWrapper replicaService, FailureProcessor failureProcessor, IgniteSpinBusyLock lock, CompletableFuture<Void> result) {
        super(table.qualifiedName(), partitionId, failureProcessor, lock);
        this.table = table;
        this.partitionId = partitionId;
        this.clockService = clockService;
        this.replicaService = replicaService;
        this.result = result;
        this.busyLock = lock;
        Integer secondaryZoneId = table.internalTable().secondaryZoneId();
        assert (secondaryZoneId != null);
        this.secondaryZonePartitionId = new ZonePartitionId(secondaryZoneId, partitionId);
    }

    /**
     * Blocks the busy lock and, if this call is the one that actually stops the subscriber, cancels the subscription
     * (when there is one) and completes the result future normally.
     */
    @Override
    void stop() {
        busyLock.block();
        if (tryStop()) {
            LOG.info("Stopping full state transfer into secondary storage [tableName={}, partitionId={}].", table.qualifiedName(), partitionId);
            Flow.Subscription localSubscription = subscription();
            if (localSubscription != null) {
                localSubscription.cancel();
            }
            result.complete(null);
        }
    }

    /**
     * Adds the row to the current batch, schedules the batch for sending once it reaches {@link #BATCH_SIZE} rows,
     * and requests the next item.
     *
     * @param item Row version received from the scan.
     */
    @Override
    public void onNext(TimedBinaryRowAndRowId item) {
        executeWithBusyLock(() -> {
            batch.add(createRowMessage(item));
            if (batch.size() >= BATCH_SIZE) {
                enqueueBatch();
            }
            subscription().request(1L);
        });
    }

    /**
     * Schedules the remaining rows (if any) for sending and arranges for the result future to be completed after all
     * scheduled batches have been sent successfully.
     */
    @Override
    public void onComplete() {
        executeWithBusyLock(() -> {
            if (!batch.isEmpty()) {
                enqueueBatch();
            }
            // Report completion only after the last batch has actually been sent, not merely when the scan has ended.
            // NOTE(review): if a send fails this continuation is skipped; the failure is passed to handleError instead.
            replicationFuture = replicationFuture.thenRun(() -> {
                LOG.info("Full state transfer into secondary storage complete [tableName={}, partitionId={}].", table.qualifiedName(), partitionId);
                result.complete(null);
            });
        });
    }

    /**
     * Detaches the accumulated rows from the accumulator and chains their send after all previously scheduled sends.
     *
     * <p>The rows are copied and the accumulator is cleared immediately, so the asynchronous send (and any retry of it)
     * works on a stable list and the same rows are never sent as part of a later batch.
     */
    private void enqueueBatch() {
        List<BinaryRowWithTombstoneMessage> rows = new ArrayList<>(batch);
        batch.clear();
        replicationFuture = replicationFuture.thenCompose(v -> sendBatch(rows));
    }

    /**
     * Sends the given rows to the secondary replica, passing the outcome to the parent's error handler.
     *
     * @param rows Rows to send; must not be modified after being passed here.
     * @return Future of the send operation.
     */
    private CompletableFuture<?> sendBatch(List<BinaryRowWithTombstoneMessage> rows) {
        return this.executeWithBusyLock(() -> this.replicaService.invokeWithRetry(replicaMeta -> this.writeRowsRequest((ReplicaMeta)replicaMeta, rows)).whenComplete((v, e) -> this.handleError((Throwable)e, "Failed to send new batch to the secondary storage")));
    }

    /**
     * Builds a request that transfers the given rows to the secondary zone partition.
     *
     * @param replicaMeta Replica meta whose start time is used as the enlistment consistency token.
     * @param batch Rows to include in the request.
     * @return Transfer request.
     */
    private TransferRowVersionsToSecondaryReplicaRequest writeRowsRequest(ReplicaMeta replicaMeta, List<BinaryRowWithTombstoneMessage> batch) {
        return TABLE_MESSAGES_FACTORY.transferRowVersionsToSecondaryReplicaRequest()
                .groupId(ReplicaMessageUtils.toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, secondaryZonePartitionId))
                .enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
                .tableId(table.tableId())
                .timestamp(clockService.current())
                .rows(batch)
                .build();
    }

    /**
     * Converts a scanned row version into its network message form.
     *
     * @param event Scanned row version.
     * @return Message carrying the row ID, tuple bytes, schema version, tombstone flag and commit timestamp.
     */
    private static BinaryRowWithTombstoneMessage createRowMessage(TimedBinaryRowAndRowId event) {
        return TABLE_MESSAGES_FACTORY.binaryRowWithTombstoneMessage()
                .rowId(event.rowId().uuid())
                .binaryTuple(event.binaryRow().tupleSlice())
                .schemaVersion(event.binaryRow().schemaVersion())
                .tombstone(event.tombstone())
                .commitTimestamp(event.commitTimestamp().longValue())
                .build();
    }
}

