/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.table.distributed.replicator.secondary;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
import org.apache.ignite.internal.continuousquery.ContinuousQueryEventWatermark;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ComponentStoppingException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowWithTombstoneMessage;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteMultipleRowsSecondaryReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.SecondaryReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.table.distributed.replicator.secondary.ReplicaServiceWrapper;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.TrackerClosedException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.TableRowEvent;
import org.apache.ignite.table.TableRowEventBatch;
import org.apache.ignite.table.TableRowEventType;
import org.jetbrains.annotations.Nullable;

public class SecondaryStorageReplicationSubscriber
implements Flow.Subscriber<TableRowEventBatch<BinaryRow>> {
    private static final IgniteLogger LOG = Loggers.forClass(SecondaryStorageReplicationSubscriber.class);
    private static final PartitionReplicationMessagesFactory TABLE_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    private static final AtomicReferenceFieldUpdater<SecondaryStorageReplicationSubscriber, UUID> LOW_WATERMARK_LOCK_ID = AtomicReferenceFieldUpdater.newUpdater(SecondaryStorageReplicationSubscriber.class, UUID.class, "lowWatermarkLockId");
    private final QualifiedName tableName;
    private final TablePartitionId tablePartitionId;
    private final ReplicationGroupId secondaryZonePartitionId;
    private final ReplicaServiceWrapper replicaService;
    private final SchemaManager schemaManager;
    private final FailureProcessor failureProcessor;
    private final ClockService clockService;
    private final IgniteSpinBusyLock busyLock;
    private final LowWatermark lowWatermark;
    private final AtomicBoolean isStopped = new AtomicBoolean();
    private volatile Flow.Subscription subscription;
    @Nullable
    private volatile UUID lowWatermarkLockId;
    private CompletableFuture<?> replicationFuture = CompletableFutures.nullCompletedFuture();

    SecondaryStorageReplicationSubscriber(QualifiedName tableName, TablePartitionId tablePartitionId, ReplicationGroupId secondaryZonePartitionId, ReplicaServiceWrapper replicaService, SchemaManager schemaManager, ClockService clockService, FailureProcessor failureProcessor, IgniteSpinBusyLock lock, LowWatermark lowWatermark, UUID lowWatermarkLockId) {
        this.tableName = tableName;
        this.tablePartitionId = tablePartitionId;
        this.secondaryZonePartitionId = secondaryZonePartitionId;
        this.replicaService = replicaService;
        this.schemaManager = schemaManager;
        this.clockService = clockService;
        this.failureProcessor = failureProcessor;
        this.busyLock = lock;
        this.lowWatermark = lowWatermark;
        this.lowWatermarkLockId = lowWatermarkLockId;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.executeWithBusyLock(() -> {
            this.subscription = subscription;
            subscription.request(1L);
        });
    }

    @Override
    public void onNext(TableRowEventBatch<BinaryRow> sourceEventBatch) {
        this.executeWithBusyLock(() -> {
            this.unlockLowWatermarkIfNeeded();
            if (sourceEventBatch.rows().isEmpty()) {
                ContinuousQueryEventWatermark eventWatermark = (ContinuousQueryEventWatermark)sourceEventBatch.watermark();
                long safeTime = eventWatermark.timestamps()[this.tablePartitionId.partitionId()];
                this.doOnSafeTimeUpdate(safeTime);
                return;
            }
            ArrayList<BinaryRowWithTombstoneMessage> batch = new ArrayList<BinaryRowWithTombstoneMessage>(sourceEventBatch.rows().size());
            for (TableRowEvent event : sourceEventBatch.rows()) {
                if (event.type() == TableRowEventType.ARCHIVED) continue;
                BinaryRowWithTombstoneMessage rowMessage = this.createRowMessage((TableRowEvent<BinaryRow>)event);
                batch.add(rowMessage);
            }
            if (!batch.isEmpty()) {
                this.replicationFuture = this.replicationFuture.thenCompose(ignore -> this.doOnNext(batch));
            }
            this.subscription.request(1L);
        });
    }

    @Override
    public void onError(Throwable throwable) {
        this.executeWithBusyLock(() -> this.handleError(throwable, "Replication error"));
    }

    @Override
    public void onComplete() {
        this.executeWithBusyLock(() -> {
            this.unlockLowWatermarkIfNeeded();
            LOG.info("Replication into secondary storage complete [tableName={}, partitionId={}].", new Object[]{this.tableName, this.tablePartitionId.partitionId()});
        });
    }

    private void unlockLowWatermarkIfNeeded() {
        UUID lockId = LOW_WATERMARK_LOCK_ID.getAndSet(this, null);
        if (lockId != null) {
            this.lowWatermark.unlock(lockId);
        }
    }

    private BinaryRowWithTombstoneMessage createRowMessage(TableRowEvent<BinaryRow> event) {
        ByteBuffer rowTuple;
        BinaryRow row;
        ContinuousQueryEventWatermark eventWatermark = (ContinuousQueryEventWatermark)event.watermark();
        int partitionId = this.tablePartitionId.partitionId();
        UUID rowId = eventWatermark.rowIds()[partitionId];
        long commitTimestamp = eventWatermark.timestamps()[partitionId];
        TableRowEventType eventType = event.type();
        boolean isTombstone = eventType == TableRowEventType.REMOVED;
        BinaryRow binaryRow = row = isTombstone ? (BinaryRow)event.oldEntry() : (BinaryRow)event.entry();
        assert (row != null) : "Row must not be null";
        ByteBuffer byteBuffer = rowTuple = isTombstone ? this.extractPkColumnsBytes((BinaryRow)event.oldEntry()) : row.tupleSlice();
        assert (rowTuple != null) : "Binary tuple is not expected to be null for operation: " + eventType;
        return TABLE_MESSAGES_FACTORY.binaryRowWithTombstoneMessage().rowId(rowId).binaryTuple(rowTuple).schemaVersion(row.schemaVersion()).tombstone(isTombstone).commitTimestamp(commitTimestamp).build();
    }

    private void doOnSafeTimeUpdate(long safeTime) {
        this.replicationFuture = this.replicationFuture.thenCompose(ignore -> this.updateSafeTime(safeTime));
    }

    private CompletableFuture<?> doOnNext(List<BinaryRowWithTombstoneMessage> batch) {
        return this.executeWithBusyLock(() -> this.replicaService.invokeWithRetry(replicaMeta -> this.writeRowsRequest((ReplicaMeta)replicaMeta, batch)).whenComplete((v, e) -> this.handleError((Throwable)e, "Failed to send new batch to the secondary storage")));
    }

    private ReadWriteMultipleRowsSecondaryReplicaRequest writeRowsRequest(ReplicaMeta replicaMeta, List<BinaryRowWithTombstoneMessage> batch) {
        return TABLE_MESSAGES_FACTORY.readWriteMultipleRowsSecondaryReplicaRequest().groupId(ReplicaMessageUtils.toReplicationGroupIdMessage((ReplicaMessagesFactory)REPLICA_MESSAGES_FACTORY, (ReplicationGroupId)this.secondaryZonePartitionId)).enlistmentConsistencyToken(Long.valueOf(replicaMeta.getStartTime().longValue())).tableId(this.tablePartitionId.tableId()).requestType(RequestType.RW_INSERT).timestamp(this.clockService.current()).rows(batch).build();
    }

    private CompletableFuture<?> updateSafeTime(long safeTimestamp) {
        return this.executeWithBusyLock(() -> {
            SecondaryReplicaSafeTimeSyncRequest request = this.safeTimeSyncRequest(safeTimestamp);
            return this.replicaService.invokeWithRetry(replicaMeta -> request).whenComplete((v, e) -> this.handleError((Throwable)e, "Failed to send safe time update for secondary storage"));
        });
    }

    private SecondaryReplicaSafeTimeSyncRequest safeTimeSyncRequest(long safeTimestamp) {
        return REPLICA_MESSAGES_FACTORY.secondaryReplicaSafeTimeSyncRequest().groupId(ReplicaMessageUtils.toReplicationGroupIdMessage((ReplicaMessagesFactory)REPLICA_MESSAGES_FACTORY, (ReplicationGroupId)this.secondaryZonePartitionId)).tableId(this.tablePartitionId.tableId()).proposedSafeTime(HybridTimestamp.hybridTimestamp((long)safeTimestamp)).build();
    }

    void stop() {
        if (this.isStopped.compareAndSet(false, true)) {
            LOG.info("Stopping replication into secondary storage [tableName={}, partitionId={}].", new Object[]{this.tableName, this.tablePartitionId.partitionId()});
            this.unlockLowWatermarkIfNeeded();
            Flow.Subscription localSubscription = this.subscription;
            if (localSubscription != null) {
                localSubscription.cancel();
            }
        }
    }

    @Nullable
    private ByteBuffer extractPkColumnsBytes(@Nullable BinaryRow row) {
        if (row == null) {
            return null;
        }
        int tableId = this.tablePartitionId.tableId();
        SchemaDescriptor schemaDescriptor = this.schemaManager.schemaRegistry(tableId).schema(row.schemaVersion());
        BinaryRowConverter converter = BinaryRowConverter.keyExtractor((SchemaDescriptor)schemaDescriptor);
        return converter.extractColumns(row).byteBuffer();
    }

    private void handleError(@Nullable Throwable throwable, String baseMessage) {
        if (throwable == null) {
            return;
        }
        this.executeWithBusyLock(() -> {
            boolean shouldNotifyFailureHandler;
            boolean bl = shouldNotifyFailureHandler = !ExceptionUtils.hasCause((Throwable)throwable, (Class[])new Class[]{NodeStoppingException.class, TrackerClosedException.class, ComponentStoppingException.class, TableNotFoundException.class});
            if (shouldNotifyFailureHandler) {
                String message = String.format("%s [tableName=%s, partitionId=%d].", baseMessage, this.tableName, this.tablePartitionId.partitionId());
                this.failureProcessor.process(new FailureContext(throwable, message));
            }
            this.stop();
        });
    }

    private void executeWithBusyLock(Runnable action) {
        if (!this.busyLock.enterBusy()) {
            return;
        }
        try {
            if (!this.isStopped.get()) {
                action.run();
            }
        }
        catch (Throwable e) {
            this.handleError(e, "Replication error");
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> CompletableFuture<T> executeWithBusyLock(Supplier<CompletableFuture<T>> action) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }
        try {
            CompletableFuture<T> completableFuture = this.isStopped.get() ? CompletableFutures.nullCompletedFuture() : action.get();
            return completableFuture;
        }
        catch (Throwable e) {
            this.handleError(e, "Replication error");
            CompletableFuture completableFuture = CompletableFutures.nullCompletedFuture();
            return completableFuture;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }
}

