package org.apache.ignite.internal.sql.engine.exec.rel;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.class */
public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> {
    private static final int NOT_WAITING = -1;
    private Queue<RowT> inBuff;

    @Nullable
    private final Predicate<RowT> filters;

    @Nullable
    private final Function<RowT, RowT> rowTransformer;
    private int requested;
    private int waiting;
    private boolean inLoop;

    @Nullable
    private Flow.Subscription activeSubscription;
    private boolean dataRequested;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode$SubscriberImpl.class */
    public class SubscriberImpl implements Flow.Subscriber<RowT> {
        private Queue<RowT> inBuffInner;
        static final /* synthetic */ boolean $assertionsDisabled;

        private SubscriberImpl() {
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onSubscribe(Flow.Subscription subscription) {
            if (!$assertionsDisabled && StorageScanNode.this.activeSubscription != null) {
                throw new AssertionError();
            }
            this.inBuffInner = StorageScanNode.this.inBuff;
            StorageScanNode.this.activeSubscription = subscription;
            subscription.request(StorageScanNode.this.waiting);
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onNext(RowT rowt) {
            this.inBuffInner.add(rowt);
            if (this.inBuffInner.size() == 512) {
                StorageScanNode.this.execute(() -> {
                    StorageScanNode.this.waiting = 0;
                    StorageScanNode.this.push();
                });
            }
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onError(Throwable th) {
            StorageScanNode.this.execute(() -> {
                throw th;
            });
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onComplete() {
            StorageScanNode.this.execute(() -> {
                StorageScanNode.this.activeSubscription = null;
                StorageScanNode.this.waiting = 0;
                StorageScanNode.this.push();
            });
        }

        static {
            $assertionsDisabled = !StorageScanNode.class.desiredAssertionStatus();
        }
    }

    public StorageScanNode(ExecutionContext<RowT> executionContext, RowHandler.RowFactory<RowT> rowFactory, @Nullable Predicate<RowT> predicate, @Nullable Function<RowT, RowT> function) {
        super(executionContext);
        this.inBuff = new LinkedBlockingQueue(512);
        if (!$assertionsDisabled && executionContext.txAttributes() == null) {
            throw new AssertionError("Transaction not initialized.");
        }
        this.filters = predicate;
        this.rowTransformer = function;
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.rel.Node
    public void request(int i) throws Exception {
        if (!$assertionsDisabled && (i <= 0 || this.requested != 0)) {
            throw new AssertionError("rowsCnt=" + i + ", requested=" + this.requested);
        }
        checkState();
        this.requested = i;
        if (this.inLoop) {
            return;
        }
        execute(this::push);
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode
    public void closeInternal() {
        super.closeInternal();
        if (this.activeSubscription != null) {
            this.activeSubscription.cancel();
            this.activeSubscription = null;
        }
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode
    protected void rewindInternal() {
        this.requested = 0;
        this.waiting = 0;
        this.dataRequested = false;
        this.inBuff = new LinkedBlockingQueue(512);
        if (this.activeSubscription != null) {
            this.activeSubscription.cancel();
            this.activeSubscription = null;
        }
    }

    protected abstract Flow.Publisher<RowT> scan();

    private void push() throws Exception {
        if (isClosed()) {
            return;
        }
        checkState();
        if (this.requested > 0 && !this.inBuff.isEmpty()) {
            this.inLoop = true;
            while (this.requested > 0 && !this.inBuff.isEmpty()) {
                try {
                    checkState();
                    RowT remove = this.inBuff.remove();
                    if (this.filters == null || this.filters.test(remove)) {
                        if (this.rowTransformer != null) {
                            remove = this.rowTransformer.apply(remove);
                        }
                        acquireRow(remove);
                        this.requested--;
                        downstream().push(remove);
                        releaseRow(remove);
                    }
                } finally {
                    this.inLoop = false;
                }
            }
        }
        if (this.requested > 0 && (this.waiting == 0 || this.activeSubscription == null)) {
            requestNextBatch();
        }
        if (this.requested <= 0 || this.waiting != -1) {
            return;
        }
        if (!this.inBuff.isEmpty()) {
            execute(this::push);
        } else {
            this.requested = 0;
            downstream().end();
        }
    }

    private void requestNextBatch() {
        if (this.waiting == -1) {
            return;
        }
        if (this.waiting == 0) {
            this.waiting = 512 - this.inBuff.size();
        }
        Flow.Subscription subscription = this.activeSubscription;
        if (subscription != null) {
            subscription.request(this.waiting);
        } else if (this.dataRequested) {
            this.waiting = -1;
        } else {
            scan().subscribe(new SubscriberImpl());
            this.dataRequested = true;
        }
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode, org.apache.ignite.internal.sql.engine.exec.rel.Node
    public void register(List<Node<RowT>> list) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode
    protected Downstream<RowT> requestDownstream(int i) {
        throw new UnsupportedOperationException();
    }

    static {
        $assertionsDisabled = !StorageScanNode.class.desiredAssertionStatus();
    }
}
