package org.gridgain.internal.dcr.table;

import java.time.Instant;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.continuousquery.ContinuousQueryEventWatermark;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.table.ContinuousQueryPhysicalTimeWatermark;
import org.apache.ignite.table.DataStreamerItem;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.TableRowEvent;
import org.apache.ignite.table.TableRowEventBatch;
import org.apache.ignite.table.TableRowEventType;
import org.apache.ignite.table.Tuple;
import org.gridgain.internal.dcr.table.schema.SchemaSyncPolicy;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/gridgain/internal/dcr/table/ReplicationSubscriber.class */
public class ReplicationSubscriber implements Flow.Subscriber<TableRowEventBatch<Map.Entry<Tuple, Tuple>>> {
    private static final IgniteLogger LOG = Loggers.forClass(ReplicationSubscriber.class);
    private Flow.Subscription subscription;

    @Nullable
    private Instant flushPoint;
    private final TableManager tableManager;
    private final SchemaSyncPolicy<Map.Entry<Tuple, Tuple>> schemaSyncPolicy;
    private final CompletableFuture<Void> finishFuture;
    private final AtomicBoolean stopped = new AtomicBoolean();
    private SubmissionPublisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>> publisher;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.gridgain.internal.dcr.table.ReplicationSubscriber$1, reason: invalid class name */
    /* loaded from: input_file:org/gridgain/internal/dcr/table/ReplicationSubscriber$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$ignite$table$TableRowEventType = new int[TableRowEventType.values().length];

        static {
            try {
                $SwitchMap$org$apache$ignite$table$TableRowEventType[TableRowEventType.CREATED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$ignite$table$TableRowEventType[TableRowEventType.UPDATED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$ignite$table$TableRowEventType[TableRowEventType.REMOVED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public ReplicationSubscriber(TableManager tableManager, SchemaSyncPolicy<Map.Entry<Tuple, Tuple>> schemaSyncPolicy, @Nullable Instant instant, CompletableFuture<Void> completableFuture) {
        this.tableManager = tableManager;
        this.schemaSyncPolicy = schemaSyncPolicy;
        this.flushPoint = instant;
        this.finishFuture = completableFuture;
    }

    public void flushPoint(Instant instant) {
        this.flushPoint = instant;
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(Long.MAX_VALUE);
        this.publisher = new SubmissionPublisher<>();
        this.tableManager.localTable().streamData(this.publisher, (DataStreamerOptions) null);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(TableRowEventBatch<Map.Entry<Tuple, Tuple>> tableRowEventBatch) {
        Map.Entry<Tuple, Tuple> entry;
        try {
            boolean z = false;
            for (TableRowEvent<Map.Entry<Tuple, Tuple>> tableRowEvent : tableRowEventBatch.rows()) {
                if (this.stopped.get()) {
                    return;
                }
                if (!z && (entry = (Map.Entry) tableRowEvent.entry()) != null) {
                    this.schemaSyncPolicy.isSchemaSync(entry);
                    z = true;
                }
                if (isFlushPointPassed(tableRowEvent)) {
                    LOG.info("Replication for table " + this.tableManager.tableName() + " finished because flush point " + this.flushPoint + " is reached.", new Object[0]);
                    stop();
                    return;
                }
                doOnNext(tableRowEvent);
            }
        } catch (Exception e) {
            LOG.error("Failed to process CQ entry.", e);
            throw e;
        }
    }

    private boolean isFlushPointPassed(TableRowEvent<Map.Entry<Tuple, Tuple>> tableRowEvent) {
        if (this.flushPoint == null) {
            return false;
        }
        ContinuousQueryEventWatermark watermark = tableRowEvent.watermark();
        long physicalToLong = HybridTimestamp.physicalToLong(this.flushPoint.toEpochMilli());
        if (!(watermark instanceof ContinuousQueryEventWatermark)) {
            if (watermark instanceof ContinuousQueryPhysicalTimeWatermark) {
                return ((ContinuousQueryPhysicalTimeWatermark) watermark).startTime().toEpochMilli() < physicalToLong;
            }
            throw new IllegalStateException("Unknown watermark type: " + watermark);
        }
        for (long j : watermark.timestamps()) {
            if (j < physicalToLong) {
                return false;
            }
        }
        return true;
    }

    private void doOnNext(TableRowEvent<Map.Entry<Tuple, Tuple>> tableRowEvent) {
        Map.Entry entry = (Map.Entry) tableRowEvent.entry();
        Map.Entry entry2 = (Map.Entry) tableRowEvent.oldEntry();
        switch (AnonymousClass1.$SwitchMap$org$apache$ignite$table$TableRowEventType[tableRowEvent.type().ordinal()]) {
            case 1:
                LOG.debug("Created entry: " + entry, new Object[0]);
                this.publisher.submit(DataStreamerItem.of(entry));
                return;
            case 2:
                LOG.debug("Updated entry: " + tableRowEvent.entry(), new Object[0]);
                this.publisher.submit(DataStreamerItem.of(entry));
                return;
            case 3:
                LOG.debug("Removed entry: " + entry2, new Object[0]);
                this.publisher.submit(DataStreamerItem.removed(entry2));
                return;
            default:
                LOG.error("Unknown event: " + tableRowEvent, new Object[0]);
                return;
        }
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        LOG.error("Replication error for table " + this.tableManager.tableName(), th);
        this.finishFuture.completeExceptionally(th);
        this.publisher.closeExceptionally(th);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        LOG.info("Replication for table " + this.tableManager.tableName() + " completed.", new Object[0]);
        this.finishFuture.complete(null);
        this.publisher.close();
    }

    public void stop() {
        this.stopped.set(true);
        this.subscription.cancel();
        this.publisher.close();
    }
}
