/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.sql.engine.exec.rel;

import java.util.List;
import org.apache.ignite.internal.lang.IgniteStringBuilder;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.exp.func.IterableTableFunction;
import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunction;
import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionInstance;
import org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite.internal.sql.engine.exec.rel.Downstream;
import org.apache.ignite.internal.sql.engine.exec.rel.Node;
import org.apache.ignite.internal.sql.engine.exec.rel.SingleNode;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.sql.SqlException;

public class ScanNode<RowT>
extends AbstractNode<RowT>
implements SingleNode<RowT> {
    private final TableFunction<RowT> func;
    private TableFunctionInstance<RowT> inst;
    private int requested;
    private boolean inLoop;

    public ScanNode(ExecutionContext<RowT> ctx, Iterable<RowT> src) {
        this(ctx, new IterableTableFunction<RowT>(src));
    }

    public ScanNode(ExecutionContext<RowT> ctx, TableFunction<RowT> src) {
        super(ctx);
        this.func = src;
    }

    @Override
    public void request(int rowsCnt) throws Exception {
        assert (rowsCnt > 0 && this.requested == 0) : "rowsCnt=" + rowsCnt + ", requested=" + this.requested;
        this.requested = rowsCnt;
        if (!this.inLoop) {
            this.execute(this::push);
        }
    }

    @Override
    public void closeInternal() {
        super.closeInternal();
        Commons.closeQuiet(this.inst);
        this.inst = null;
        Commons.closeQuiet(this.func);
    }

    @Override
    protected void rewindInternal() {
        Commons.closeQuiet(this.inst);
        this.inst = null;
        this.requested = 0;
    }

    @Override
    public void register(List<Node<RowT>> sources) {
        throw new UnsupportedOperationException();
    }

    @Override
    protected Downstream<RowT> requestDownstream(int idx) {
        throw new UnsupportedOperationException();
    }

    private void push() throws Exception {
        this.inLoop = true;
        try {
            if (this.inst == null) {
                this.inst = this.func.createInstance(this.context());
            }
            int processed = 0;
            while (this.requested > 0 && this.inst.hasNext()) {
                --this.requested;
                Object row = this.inst.next();
                this.acquireRow(row);
                this.downstream().push(row);
                this.releaseRow(row);
                if (++processed != this.inBufSize || this.requested <= 0) continue;
                this.execute(this::push);
                return;
            }
        }
        catch (QueryCancelledException | SqlException e) {
            throw e;
        }
        finally {
            this.inLoop = false;
        }
        if (this.requested > 0 && !this.hasNext()) {
            Commons.closeQuiet(this.inst);
            this.inst = null;
            this.requested = 0;
            this.downstream().end();
        }
    }

    @Override
    protected void dumpDebugInfo0(IgniteStringBuilder buf) {
        buf.app("class=").app(this.getClass().getSimpleName()).app(", requested=").app(this.requested);
    }

    private boolean hasNext() {
        return this.inst.hasNext();
    }
}

