/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.kafka.source;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.continuousquery.TableRowEventImpl;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.TableRowEvent;
import org.apache.ignite.table.TableRowEventBatch;
import org.apache.ignite.table.Tuple;
import org.gridgain.kafka.source.SourceEvent;
import org.jetbrains.annotations.Nullable;

/**
 * {@link Flow.Subscriber} for continuous-query row batches of a single table.
 *
 * <p>Every row event of every received batch is wrapped in a {@link SourceEvent} and added to the
 * queue supplied at construction. The subscriber keeps its own count of outstanding (requested but
 * not yet delivered) batches so that {@link #ensureRequested(int)} can top the demand up without
 * over-requesting.
 *
 * <p>The subscription and error fields are {@code volatile} and the demand counter is atomic;
 * presumably publisher signals and the {@code ensureRequested}/{@code error}/{@code reset} calls
 * arrive on different threads (NOTE(review): confirm against the caller).
 */
class ContinuousQuerySubscriber
implements Flow.Subscriber<TableRowEventBatch<Tuple>> {
    /** Table name passed through unchanged to every produced {@link SourceEvent}. */
    private final String originalTableName;

    /** Table this subscriber was created for; only exposed via {@link #table()}. */
    private final Table table;

    /** Destination for the converted row events. */
    private final BlockingQueue<SourceEvent> events;

    /** Number of batches requested from the subscription and not yet delivered to {@link #onNext}. */
    private final AtomicLong requested = new AtomicLong();

    /** When {@code false}, produced events carry a {@code null} watermark. */
    private final boolean saveWatermark;

    /** Currently active subscription, or {@code null} before {@link #onSubscribe} / after {@link #reset()}. */
    @Nullable
    private volatile Flow.Subscription subscription;

    /** Last failure reported through {@link #onError}, or {@code null}. */
    @Nullable
    private volatile Throwable error;

    /**
     * Creates a subscriber.
     *
     * @param originalTableName table name to attach to every produced {@link SourceEvent}
     * @param table table returned by {@link #table()}
     * @param events queue that receives one {@link SourceEvent} per row event
     * @param saveWatermark whether to extract watermark bytes from each row event
     */
    ContinuousQuerySubscriber(String originalTableName, Table table, BlockingQueue<SourceEvent> events, boolean saveWatermark) {
        this.originalTableName = originalTableName;
        this.table = table;
        this.events = events;
        this.saveWatermark = saveWatermark;
    }

    /**
     * Returns the table supplied at construction.
     *
     * @return the table
     */
    Table table() {
        return this.table;
    }

    /**
     * Stores the subscription and requests the first batch.
     *
     * <p>If a subscription is already active, the incoming one is cancelled and the active one is
     * kept (Reactive Streams rule 2.5). Previously this case was only guarded by an assertion, so
     * with assertions disabled the active subscription was overwritten without being cancelled.
     *
     * @param subscription subscription handed over by the publisher
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        Flow.Subscription active = this.subscription;
        assert (active == null) : "onSubscribe called multiple times";
        if (active != null) {
            // Only reachable with assertions disabled: reject the duplicate instead of leaking the active one.
            subscription.cancel();
            return;
        }
        this.subscription = subscription;
        this.ensureRequested(1);
    }

    /**
     * Accounts for one delivered batch and enqueues a {@link SourceEvent} for each of its rows.
     *
     * <p>Uses {@link BlockingQueue#add}, which throws {@link IllegalStateException} when a
     * capacity-restricted queue is full. NOTE(review): confirm the queue is unbounded or that the
     * demand limit keeps it from filling up.
     *
     * @param batch batch of row events
     */
    @Override
    public void onNext(TableRowEventBatch<Tuple> batch) {
        // One batch of the outstanding demand has been consumed.
        this.requested.decrementAndGet();
        for (TableRowEvent<Tuple> event : batch.rows()) {
            this.events.add(new SourceEvent(this.originalTableName, event, this.getWatermarkBytes(event)));
        }
    }

    /**
     * Remembers the failure so that it can be retrieved through {@link #error()}.
     *
     * @param throwable failure reported by the publisher
     */
    @Override
    public void onError(Throwable throwable) {
        this.error = throwable;
    }

    /** Does nothing; completion is not tracked. */
    @Override
    public void onComplete() {
    }

    /**
     * Returns the failure last stored by {@link #onError}.
     *
     * @return the stored failure, or {@code null} if none was stored since construction or the
     *     last {@link #reset()}
     */
    @Nullable
    Throwable error() {
        return this.error;
    }

    /** Cancels the current subscription, if there is one. The stored reference is left in place. */
    void cancel() {
        Flow.Subscription sub = this.subscription;
        if (sub == null) {
            return;
        }
        sub.cancel();
    }

    /**
     * Raises the outstanding demand to {@code batches} if it is currently lower.
     *
     * <p>Does nothing when there is no subscription or when at least {@code batches} batches are
     * already outstanding. Otherwise the counter is atomically set to {@code batches} and only the
     * missing difference is requested from the subscription.
     *
     * @param batches desired number of outstanding batches
     */
    void ensureRequested(int batches) {
        Flow.Subscription sub = this.subscription;
        if (sub == null) {
            return;
        }
        long target = batches;
        long current;
        do {
            current = this.requested.get();
            if (current >= target) {
                // Enough demand is already outstanding.
                return;
            }
            // Retry if another thread changed the counter between the read and the CAS.
        } while (!this.requested.compareAndSet(current, target));
        sub.request(target - current);
    }

    /**
     * Clears the stored error, the subscription reference and the demand counter.
     *
     * <p>The subscription itself is not cancelled here; call {@link #cancel()} first if the
     * stream must be stopped.
     */
    void reset() {
        this.error = null;
        this.subscription = null;
        this.requested.set(0L);
    }

    /**
     * Extracts the watermark bytes of a row event.
     *
     * @param event row event; must be a {@link TableRowEventImpl} when watermarks are saved,
     *     otherwise the cast fails with {@link ClassCastException}
     * @return the event's watermark bytes, or {@code null} when watermark saving is disabled
     */
    private byte @Nullable [] getWatermarkBytes(TableRowEvent<Tuple> event) {
        if (!this.saveWatermark) {
            return null;
        }
        // NOTE(review): TableRowEventImpl's type parameters are not visible from this file, so the
        // cast is kept exactly as in the original.
        TableRowEventImpl eventInternal = (TableRowEventImpl)event;
        return eventInternal.watermarkBytes();
    }
}

