/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.dcr.table;

import java.time.Instant;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite3.internal.continuousquery.ContinuousQueryEventWatermark;
import org.apache.ignite3.internal.continuousquery.TableRowEventImpl;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.table.TimedTuple;
import org.apache.ignite3.table.ContinuousQueryPhysicalTimeWatermark;
import org.apache.ignite3.table.ContinuousQueryWatermark;
import org.apache.ignite3.table.DataStreamerItem;
import org.apache.ignite3.table.TableRowEvent;
import org.apache.ignite3.table.TableRowEventBatch;
import org.apache.ignite3.table.Tuple;
import org.gridgain.internal.dcr.metrics.DcrMetricSource;
import org.gridgain.internal.dcr.table.TableManager;
import org.gridgain.internal.dcr.table.schema.SchemaSyncPolicy;
import org.jetbrains.annotations.Nullable;

public class ReplicationSubscriber
implements Flow.Subscriber<TableRowEventBatch<Map.Entry<Tuple, Tuple>>> {
    private static final IgniteLogger LOG = Loggers.forClass(ReplicationSubscriber.class);
    private final AtomicReference<Flow.Subscription> subscription = new AtomicReference();
    @Nullable
    private Instant flushPoint;
    private final TableManager tableManager;
    private final SchemaSyncPolicy<Map.Entry<Tuple, Tuple>> schemaSyncPolicy;
    private final CompletableFuture<Void> finishFuture;
    private final DcrMetricSource metricSource;
    private final ClockService clockService;
    private final AtomicBoolean stopped = new AtomicBoolean();
    private SubmissionPublisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>> publisher;

    public ReplicationSubscriber(TableManager tableManager, SchemaSyncPolicy<Map.Entry<Tuple, Tuple>> schemaSyncPolicy, @Nullable Instant flushPoint, CompletableFuture<Void> finishFuture, DcrMetricSource metricSource, ClockService clockService) {
        this.tableManager = tableManager;
        this.schemaSyncPolicy = schemaSyncPolicy;
        this.flushPoint = flushPoint;
        this.finishFuture = finishFuture;
        this.metricSource = metricSource;
        this.clockService = clockService;
    }

    public void flushPoint(Instant flushPoint) {
        this.flushPoint = flushPoint;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        if (this.subscription.compareAndSet(null, subscription)) {
            subscription.request(Long.MAX_VALUE);
            this.publisher = new SubmissionPublisher();
            this.tableManager.localTable().streamData(this.publisher, null);
        }
    }

    @Override
    public void onNext(TableRowEventBatch<Map.Entry<Tuple, Tuple>> sourceEventBatch) {
        try {
            if (this.stopped.get()) {
                return;
            }
            long minWatermark = ReplicationSubscriber.minWatermark(sourceEventBatch);
            if (sourceEventBatch.rows().isEmpty() && this.isFlushPointPassed(minWatermark)) {
                this.stopReplicationOnFlush();
                return;
            }
            this.metricSource.lag(this.clockService.now().getPhysical() - HybridTimestamp.hybridTimestamp(minWatermark).getPhysical());
            boolean schemaChecked = false;
            for (TableRowEvent<Map.Entry<Tuple, Tuple>> sourceEvent : sourceEventBatch.rows()) {
                Map.Entry<Tuple, Tuple> tupleEntry;
                if (this.isFlushPointPassed(sourceEvent)) {
                    this.stopReplicationOnFlush();
                    return;
                }
                if (!schemaChecked && (tupleEntry = sourceEvent.entry()) != null) {
                    this.schemaSyncPolicy.isSchemaSync(tupleEntry);
                    schemaChecked = true;
                }
                this.metricSource.entriesObservedIncrement();
                this.doOnNext(sourceEvent);
            }
        }
        catch (Exception e) {
            LOG.error("Failed to process CQ entry.", (Throwable)e);
            throw e;
        }
    }

    private void stopReplicationOnFlush() {
        LOG.info("Replication for table " + this.tableManager.tableName() + " finished because flush point " + this.flushPoint + " is reached.", new Object[0]);
        this.stop();
    }

    private static long minWatermark(TableRowEventBatch<Map.Entry<Tuple, Tuple>> sourceEventBatch) {
        ContinuousQueryWatermark watermark = sourceEventBatch.watermark();
        if (watermark instanceof ContinuousQueryEventWatermark) {
            long[] timestamps = ((ContinuousQueryEventWatermark)watermark).timestamps();
            return Arrays.stream(timestamps).min().orElseThrow(() -> new IllegalStateException("No timestamps in : " + watermark));
        }
        if (watermark instanceof ContinuousQueryPhysicalTimeWatermark) {
            Instant timestampLong = ((ContinuousQueryPhysicalTimeWatermark)watermark).startTime();
            return timestampLong.toEpochMilli();
        }
        throw new IllegalStateException("Unknown watermark type: " + watermark);
    }

    private boolean isFlushPointPassed(long minWatermark) {
        if (this.flushPoint == null) {
            return false;
        }
        long flushMillis = HybridTimestamp.physicalToLong(this.flushPoint.toEpochMilli());
        return flushMillis < minWatermark;
    }

    private boolean isFlushPointPassed(TableRowEvent<Map.Entry<Tuple, Tuple>> sourceEvent) {
        if (this.flushPoint == null) {
            return false;
        }
        ContinuousQueryWatermark watermark = sourceEvent.watermark();
        long flushMillis = HybridTimestamp.physicalToLong(this.flushPoint.toEpochMilli());
        if (watermark instanceof ContinuousQueryEventWatermark) {
            long[] timestamps;
            for (long timestampLong : timestamps = ((ContinuousQueryEventWatermark)watermark).timestamps()) {
                if (timestampLong >= flushMillis) continue;
                return false;
            }
            return true;
        }
        if (watermark instanceof ContinuousQueryPhysicalTimeWatermark) {
            Instant timestampLong = ((ContinuousQueryPhysicalTimeWatermark)watermark).startTime();
            return timestampLong.toEpochMilli() < flushMillis;
        }
        throw new IllegalStateException("Unknown watermark type: " + watermark);
    }

    private void doOnNext(TableRowEvent<Map.Entry<Tuple, Tuple>> sourceEvent) {
        Map.Entry<Tuple, Tuple> newEntry = sourceEvent.entry();
        Map.Entry<Tuple, Tuple> oldEntry = sourceEvent.oldEntry();
        long timestamp = ((TableRowEventImpl)sourceEvent).timestamp();
        switch (sourceEvent.type()) {
            case CREATED: {
                LOG.debug("Created entry: " + newEntry, new Object[0]);
                Map.Entry<Tuple, TimedTuple> timedEntry = Map.entry(newEntry.getKey(), new TimedTuple(newEntry.getValue(), timestamp));
                this.publisher.submit(DataStreamerItem.of(timedEntry));
                this.metricSource.entriesSentIncrement();
                break;
            }
            case UPDATED: {
                LOG.debug("Updated entry: " + sourceEvent.entry(), new Object[0]);
                Map.Entry<Tuple, TimedTuple> timedEntry = Map.entry(newEntry.getKey(), new TimedTuple(newEntry.getValue(), timestamp));
                this.publisher.submit(DataStreamerItem.of(timedEntry));
                this.metricSource.entriesSentIncrement();
                break;
            }
            case REMOVED: {
                LOG.debug("Removed entry: " + oldEntry, new Object[0]);
                Map.Entry<Tuple, TimedTuple> timedEntry = Map.entry(oldEntry.getKey(), new TimedTuple(oldEntry.getValue(), timestamp));
                this.publisher.submit(DataStreamerItem.removed(timedEntry));
                this.metricSource.entriesSentIncrement();
                break;
            }
            default: {
                LOG.error("Unknown event: " + sourceEvent, new Object[0]);
            }
        }
    }

    @Override
    public void onError(Throwable throwable) {
        LOG.error("Replication error for table " + this.tableManager.tableName(), throwable);
        this.finishFuture.completeExceptionally(throwable);
        this.publisher.closeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        LOG.info("Replication for table " + this.tableManager.tableName() + " completed.", new Object[0]);
        this.finishFuture.complete(null);
        this.publisher.close();
    }

    public void stop() {
        LOG.info("Replication for table " + this.tableManager.tableName() + " stopped.", new Object[0]);
        if (!this.stopped.compareAndSet(false, true)) {
            return;
        }
        Flow.Subscription sub = this.subscription.getAndSet(null);
        if (sub != null) {
            sub.cancel();
            this.publisher.close();
        }
    }
}

