/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.table.distributed.replicator.secondary;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.ignite3.internal.continuousquery.ContinuousQueryEventWatermark;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.ComponentStoppingException;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.lowwatermark.LowWatermark;
import org.apache.ignite3.internal.replicator.ReplicaService;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.replicator.ZonePartitionId;
import org.apache.ignite3.internal.schema.SchemaManager;
import org.apache.ignite3.internal.storage.RowId;
import org.apache.ignite3.internal.storage.secondary.TimestampAndRowId;
import org.apache.ignite3.internal.table.TableViewInternal;
import org.apache.ignite3.internal.table.distributed.replicator.secondary.ReplicaServiceWrapper;
import org.apache.ignite3.internal.table.distributed.replicator.secondary.SecondaryStorageReplicationSubscriber;
import org.apache.ignite3.internal.table.partition.HashPartition;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.table.ContinuousQueryOptions;
import org.apache.ignite3.table.ContinuousQueryWatermark;

class SecondaryStorageReplicator {
    private static final IgniteLogger LOG = Loggers.forClass(SecondaryStorageReplicator.class);
    private final LowWatermark lowWatermark;
    private final TableViewInternal table;
    private final int partitionId;
    private final SecondaryStorageReplicationSubscriber subscriber;
    private final ZonePartitionId targetSecondaryZonePartitionId;
    private final FailureProcessor failureProcessor;
    private final Executor executor;
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final UUID lowWatermarkLockId = UUID.randomUUID();

    SecondaryStorageReplicator(ReplicaService replicaService, SchemaManager schemaManager, ClockService clockService, LowWatermark lowWatermark, FailureProcessor failureProcessor, TableViewInternal table, int partitionId, Executor executor) {
        this.lowWatermark = lowWatermark;
        this.failureProcessor = failureProcessor;
        this.table = table;
        this.partitionId = partitionId;
        this.executor = executor;
        ReplicaServiceWrapper replicaServiceWrapper = new ReplicaServiceWrapper(partitionId, table.internalTable(), replicaService, this.busyLock);
        Integer secondaryZoneId = table.internalTable().secondaryZoneId();
        assert (secondaryZoneId != null) : IgniteStringFormatter.format("No secondary zone id provided for table with secondary storage: [table={}]", table.qualifiedName());
        this.targetSecondaryZonePartitionId = new ZonePartitionId(secondaryZoneId, partitionId);
        this.subscriber = new SecondaryStorageReplicationSubscriber(table.qualifiedName(), new TablePartitionId(table.tableId(), partitionId), this.targetSecondaryZonePartitionId, replicaServiceWrapper, schemaManager, clockService, failureProcessor, this.busyLock, lowWatermark, this.lowWatermarkLockId);
    }

    void start() {
        LOG.info("Starting secondary replication for [tableName={}, groupId={}]", this.table.qualifiedName(), this.targetSecondaryZonePartitionId);
        ((CompletableFuture)this.table.internalTable().latestProcessedRowOfSecondaryStorage(this.partitionId).thenAccept(this::startCqReplication)).whenComplete((i, e) -> {
            if (e != null && !ExceptionUtils.hasCause(e, NodeStoppingException.class, ComponentStoppingException.class)) {
                String message = String.format("Failed to start secondary storage replication for [tableName=%s, groupId=%s].", this.table.qualifiedName(), this.targetSecondaryZonePartitionId);
                this.failureProcessor.process(new FailureContext((Throwable)e, message));
            }
        });
    }

    void stop() {
        this.busyLock.block();
        this.subscriber.stop();
    }

    private void startCqReplication(TimestampAndRowId latestTimestampAndRowId) {
        IgniteUtils.inBusyLock(this.busyLock, () -> {
            ContinuousQueryWatermark cqWatermark = this.createCqWatermark(latestTimestampAndRowId);
            ContinuousQueryOptions options = ContinuousQueryOptions.builder().watermark(cqWatermark).enableEmptyBatches(true).executor(this.executor).partitions(new HashPartition(this.partitionId)).build();
            this.table.internalTable().queryContinuously(this.subscriber, options, (row, schema) -> row);
        });
    }

    private ContinuousQueryWatermark createCqWatermark(TimestampAndRowId latestTimestampAndRowId) {
        return latestTimestampAndRowId.timestamp().equals(HybridTimestamp.MIN_VALUE) ? this.initialCqWatermark() : this.resumeCqWatermark(latestTimestampAndRowId);
    }

    private ContinuousQueryWatermark initialCqWatermark() {
        int partitions = this.table.internalTable().partitions();
        UUID[] uuids = new UUID[partitions];
        long[] timestamps = new long[partitions];
        timestamps[this.partitionId] = this.lockAndGetCurrentLwm().longValue();
        uuids[this.partitionId] = RowId.lowestRowId(this.partitionId).uuid();
        return ContinuousQueryEventWatermark.cqEventWatermark(uuids, timestamps);
    }

    private ContinuousQueryWatermark resumeCqWatermark(TimestampAndRowId latestTimestampAndRowId) {
        int partitions = this.table.internalTable().partitions();
        UUID[] uuids = new UUID[partitions];
        long[] timestamps = new long[partitions];
        timestamps[this.partitionId] = latestTimestampAndRowId.timestamp().longValue();
        uuids[this.partitionId] = latestTimestampAndRowId.rowId().uuid();
        return ContinuousQueryEventWatermark.cqEventWatermark(uuids, timestamps);
    }

    private HybridTimestamp lockAndGetCurrentLwm() {
        HybridTimestamp currentLwm;
        HybridTimestamp result;
        while (!this.lowWatermark.tryLock(this.lowWatermarkLockId, result = (currentLwm = this.lowWatermark.getLowWatermark()) == null ? HybridTimestamp.MIN_VALUE.addPhysicalTime(1L) : currentLwm)) {
        }
        return result;
    }
}

