/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.table.distributed.replicator.secondary;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.ignite3.internal.continuousquery.ContinuousQueryEventWatermark;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.lowwatermark.LowWatermark;
import org.apache.ignite3.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite3.internal.partition.replicator.network.replication.BinaryRowWithTombstoneMessage;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadWriteMultipleRowsSecondaryReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.RequestType;
import org.apache.ignite3.internal.placementdriver.ReplicaMeta;
import org.apache.ignite3.internal.replicator.ReplicationGroupId;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite3.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite3.internal.replicator.message.SecondaryReplicaSafeTimeSyncRequest;
import org.apache.ignite3.internal.schema.BinaryRow;
import org.apache.ignite3.internal.schema.BinaryRowConverter;
import org.apache.ignite3.internal.schema.SchemaDescriptor;
import org.apache.ignite3.internal.schema.SchemaManager;
import org.apache.ignite3.internal.table.distributed.replicator.secondary.ReplicaServiceWrapper;
import org.apache.ignite3.internal.table.distributed.replicator.secondary.SecondaryStorageSubscriber;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.table.QualifiedName;
import org.apache.ignite3.table.TableRowEvent;
import org.apache.ignite3.table.TableRowEventBatch;
import org.apache.ignite3.table.TableRowEventType;
import org.jetbrains.annotations.Nullable;

/**
 * Subscriber that receives batches of row events for a single table partition and forwards them,
 * as replica requests, to the secondary storage replication group.
 *
 * <p>Outgoing requests are chained onto {@link #replicationFuture}, so every request is issued only after
 * the previously scheduled one has completed.
 *
 * <p>The subscriber may be created with a low watermark lock id; that lock is released on the first
 * delivered batch, on completion, or on {@link #stop()}, whichever happens first.
 */
public class SecondaryStorageReplicationSubscriber
        extends SecondaryStorageSubscriber<TableRowEventBatch<BinaryRow>> {
    private static final IgniteLogger LOG = Loggers.forClass(SecondaryStorageReplicationSubscriber.class);

    /** Factory for partition replication messages (row messages and write requests). */
    private static final PartitionReplicationMessagesFactory TABLE_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();

    /** Factory for generic replica messages (group id and safe time sync requests). */
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();

    /**
     * Atomic accessor for {@link #lowWatermarkLockId}; used to swap the id with {@code null} so that a given
     * lock id is passed to {@link LowWatermark#unlock} at most once.
     */
    private static final AtomicReferenceFieldUpdater<SecondaryStorageReplicationSubscriber, UUID> LOW_WATERMARK_LOCK_ID =
            AtomicReferenceFieldUpdater.newUpdater(SecondaryStorageReplicationSubscriber.class, UUID.class, "lowWatermarkLockId");

    private final QualifiedName tableName;

    /** Source table partition; its partition id is also the index into the per-partition watermark arrays. */
    private final TablePartitionId tablePartitionId;

    /** Replication group that the built requests are addressed to. */
    private final ReplicationGroupId secondaryZonePartitionId;

    private final ReplicaServiceWrapper replicaService;

    /** Used to resolve the schema of a removed row when extracting its key columns. */
    private final SchemaManager schemaManager;

    /** Source of the timestamp put into write requests. */
    private final ClockService clockService;

    private final LowWatermark lowWatermark;

    /** Id of the low watermark lock still held by this subscriber, or {@code null} if there is nothing to release. */
    @Nullable
    private volatile UUID lowWatermarkLockId;

    /**
     * Tail of the chain of outgoing requests. The field is not volatile.
     * NOTE(review): this presumably relies on {@code onNext} calls being serialized by the publisher - confirm.
     */
    private CompletableFuture<?> replicationFuture = CompletableFutures.nullCompletedFuture();

    /**
     * Creates the subscriber.
     *
     * @param tableName Name of the source table, used for logging and passed to the parent subscriber.
     * @param tablePartitionId Source table partition.
     * @param secondaryZonePartitionId Replication group that requests are sent to.
     * @param replicaService Service used to send requests with retries.
     * @param schemaManager Schema manager used to extract key columns of removed rows.
     * @param clockService Clock used to timestamp write requests.
     * @param failureProcessor Failure processor passed to the parent subscriber.
     * @param lock Busy lock passed to the parent subscriber.
     * @param lowWatermark Low watermark whose lock is released by this subscriber.
     * @param lowWatermarkLockId Id of the low watermark lock to release, or {@code null} if none is held.
     */
    SecondaryStorageReplicationSubscriber(
            QualifiedName tableName,
            TablePartitionId tablePartitionId,
            ReplicationGroupId secondaryZonePartitionId,
            ReplicaServiceWrapper replicaService,
            SchemaManager schemaManager,
            ClockService clockService,
            FailureProcessor failureProcessor,
            IgniteSpinBusyLock lock,
            LowWatermark lowWatermark,
            @Nullable UUID lowWatermarkLockId
    ) {
        super(tableName, tablePartitionId.partitionId(), failureProcessor, lock);
        this.tableName = tableName;
        this.tablePartitionId = tablePartitionId;
        this.secondaryZonePartitionId = secondaryZonePartitionId;
        this.replicaService = replicaService;
        this.schemaManager = schemaManager;
        this.clockService = clockService;
        this.lowWatermark = lowWatermark;
        this.lowWatermarkLockId = lowWatermarkLockId;
    }

    /**
     * Handles the next batch of row events.
     *
     * <p>A batch without rows is treated as a safe time notification: the timestamp of this partition is read
     * from the batch watermark and a safe time sync request is scheduled. Otherwise every event except
     * {@link TableRowEventType#ARCHIVED} ones is converted into a row message, a write request is scheduled
     * (if anything is left to send) and one more batch is requested.
     *
     * @param sourceEventBatch Batch of row events.
     */
    @Override
    public void onNext(TableRowEventBatch<BinaryRow> sourceEventBatch) {
        this.executeWithBusyLock(() -> {
            this.unlockLowWatermarkIfNeeded();

            if (sourceEventBatch.rows().isEmpty()) {
                ContinuousQueryEventWatermark eventWatermark = (ContinuousQueryEventWatermark) sourceEventBatch.watermark();
                long safeTime = eventWatermark.timestamps()[this.tablePartitionId.partitionId()];
                this.doOnSafeTimeUpdate(safeTime);

                // NOTE(review): unlike the non-empty path, no further batch is requested here;
                // behaviour kept as is - confirm this is intended.
                return;
            }

            List<BinaryRowWithTombstoneMessage> batch = new ArrayList<>(sourceEventBatch.rows().size());
            for (TableRowEvent<BinaryRow> tableRowEvent : sourceEventBatch.rows()) {
                // ARCHIVED events are not replicated.
                if (tableRowEvent.type() != TableRowEventType.ARCHIVED) {
                    batch.add(this.createRowMessage(tableRowEvent));
                }
            }

            if (!batch.isEmpty()) {
                // Chain after the previously scheduled request to keep the sending order.
                this.replicationFuture = this.replicationFuture.thenCompose(ignore -> this.doOnNext(batch));
            }

            // The next batch is requested without waiting for the scheduled request to complete.
            this.subscription().request(1L);
        });
    }

    /** Releases the low watermark lock, if it is still held, and logs that the replication is complete. */
    @Override
    public void onComplete() {
        this.executeWithBusyLock(() -> {
            this.unlockLowWatermarkIfNeeded();
            LOG.info("Replication into secondary storage complete [tableName={}, partitionId={}].", this.tableName, this.tablePartitionId.partitionId());
        });
    }

    /**
     * Releases the low watermark lock if this subscriber still holds its id. The id is atomically replaced
     * with {@code null} first, so concurrent callers cannot unlock the same id twice.
     */
    private void unlockLowWatermarkIfNeeded() {
        UUID lockId = LOW_WATERMARK_LOCK_ID.getAndSet(this, null);
        if (lockId != null) {
            this.lowWatermark.unlock(lockId);
        }
    }

    /**
     * Converts a row event into a row message.
     *
     * <p>The row id and the commit timestamp are taken from the event watermark at the index of this partition.
     * For {@link TableRowEventType#REMOVED} events the message is a tombstone built from the old entry and
     * carries only its key columns; for any other event it carries the full tuple of the new entry.
     *
     * @param event Row event.
     * @return Row message for the event.
     */
    private BinaryRowWithTombstoneMessage createRowMessage(TableRowEvent<BinaryRow> event) {
        ContinuousQueryEventWatermark eventWatermark = (ContinuousQueryEventWatermark) event.watermark();
        int partitionId = this.tablePartitionId.partitionId();
        UUID rowId = eventWatermark.rowIds()[partitionId];
        long commitTimestamp = eventWatermark.timestamps()[partitionId];

        TableRowEventType eventType = event.type();
        boolean isTombstone = eventType == TableRowEventType.REMOVED;

        BinaryRow row = isTombstone ? event.oldEntry() : event.entry();
        assert row != null : "Row must not be null";

        ByteBuffer rowTuple = isTombstone ? this.extractPkColumnsBytes(row) : row.tupleSlice();
        assert rowTuple != null : "Binary tuple is not expected to be null for operation: " + eventType;

        return TABLE_MESSAGES_FACTORY.binaryRowWithTombstoneMessage()
                .rowId(rowId)
                .binaryTuple(rowTuple)
                .schemaVersion(row.schemaVersion())
                .tombstone(isTombstone)
                .commitTimestamp(commitTimestamp)
                .build();
    }

    /**
     * Schedules a safe time sync request after the previously scheduled request.
     *
     * @param safeTime Safe time as a {@code long} value.
     */
    private void doOnSafeTimeUpdate(long safeTime) {
        this.replicationFuture = this.replicationFuture.thenCompose(ignore -> this.updateSafeTime(safeTime));
    }

    /**
     * Sends a batch of row messages under the busy lock, reporting a failure through {@code handleError}.
     *
     * @param batch Row messages to send.
     * @return Future of the send operation.
     */
    private CompletableFuture<?> doOnNext(List<BinaryRowWithTombstoneMessage> batch) {
        return this.executeWithBusyLock(() -> {
            return this.replicaService
                    .invokeWithRetry(replicaMeta -> this.writeRowsRequest(replicaMeta, batch))
                    .whenComplete((v, e) -> this.handleError(e, "Failed to send new batch to the secondary storage"));
        });
    }

    /**
     * Builds a write request for the given rows. The enlistment consistency token is the start time
     * of the provided replica meta and the request timestamp is the current clock value.
     *
     * @param replicaMeta Replica meta to take the enlistment consistency token from.
     * @param batch Row messages to write.
     * @return Write request.
     */
    private ReadWriteMultipleRowsSecondaryReplicaRequest writeRowsRequest(ReplicaMeta replicaMeta, List<BinaryRowWithTombstoneMessage> batch) {
        return TABLE_MESSAGES_FACTORY.readWriteMultipleRowsSecondaryReplicaRequest()
                .groupId(ReplicaMessageUtils.toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, this.secondaryZonePartitionId))
                .enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
                .tableId(this.tablePartitionId.tableId())
                .requestType(RequestType.RW_INSERT)
                .timestamp(this.clockService.current())
                .rows(batch)
                .build();
    }

    /**
     * Sends a safe time sync request under the busy lock, reporting a failure through {@code handleError}.
     *
     * @param safeTimestamp Proposed safe time as a {@code long} value.
     * @return Future of the send operation.
     */
    private CompletableFuture<?> updateSafeTime(long safeTimestamp) {
        return this.executeWithBusyLock(() -> {
            return this.replicaService
                    .invokeWithRetry(replicaMeta -> this.safeTimeSyncRequest(safeTimestamp, replicaMeta.getStartTime().longValue()))
                    .whenComplete((v, e) -> this.handleError(e, "Failed to send safe time update for secondary storage"));
        });
    }

    /**
     * Builds a safe time sync request.
     *
     * @param safeTimestamp Proposed safe time as a {@code long} value.
     * @param enlistmentConsistencyToken Enlistment consistency token to put into the request.
     * @return Safe time sync request.
     */
    private SecondaryReplicaSafeTimeSyncRequest safeTimeSyncRequest(long safeTimestamp, long enlistmentConsistencyToken) {
        return REPLICA_MESSAGES_FACTORY.secondaryReplicaSafeTimeSyncRequestV2()
                .groupId(ReplicaMessageUtils.toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, this.secondaryZonePartitionId))
                .enlistmentConsistencyToken(enlistmentConsistencyToken)
                .tableId(this.tablePartitionId.tableId())
                .proposedSafeTime(HybridTimestamp.hybridTimestamp(safeTimestamp))
                .build();
    }

    /**
     * Stops the subscriber: if {@code tryStop()} succeeds, releases the low watermark lock (if still held)
     * and cancels the subscription when there is one.
     */
    @Override
    void stop() {
        if (!this.tryStop()) {
            return;
        }

        LOG.info("Stopping replication into secondary storage [tableName={}, partitionId={}].", this.tableName, this.tablePartitionId.partitionId());
        this.unlockLowWatermarkIfNeeded();

        Flow.Subscription localSubscription = this.subscription();
        if (localSubscription != null) {
            localSubscription.cancel();
        }
    }

    /**
     * Extracts the key columns of a row, using the schema of the row's own schema version.
     *
     * @param row Row to extract the key columns from, may be {@code null}.
     * @return Buffer with the key columns, or {@code null} if the row is {@code null}.
     */
    @Nullable
    private ByteBuffer extractPkColumnsBytes(@Nullable BinaryRow row) {
        if (row == null) {
            return null;
        }

        int tableId = this.tablePartitionId.tableId();
        SchemaDescriptor schemaDescriptor = this.schemaManager.schemaRegistry(tableId).schema(row.schemaVersion());
        BinaryRowConverter converter = BinaryRowConverter.keyExtractor(schemaDescriptor);

        return converter.extractColumns(row).byteBuffer();
    }
}

