/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.sql.engine.exec.rel;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiPredicate;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.ignite3.internal.lang.IgniteStringBuilder;
import org.apache.ignite3.internal.sql.engine.api.expressions.RowFactory;
import org.apache.ignite3.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite3.internal.sql.engine.exec.RowHandler;
import org.apache.ignite3.internal.sql.engine.exec.exp.SqlJoinProjection;
import org.apache.ignite3.internal.sql.engine.exec.memory.structures.RowList;
import org.apache.ignite3.internal.sql.engine.exec.memory.structures.RowQueue;
import org.apache.ignite3.internal.sql.engine.exec.memory.structures.RowStorageFactory;
import org.apache.ignite3.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite3.internal.sql.engine.exec.rel.Downstream;
import org.apache.ignite3.internal.sql.engine.exec.rel.Node;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.IgniteUtils;

/**
 * Execution node implementing a correlated nested-loop join ({@code INNER} or {@code LEFT}).
 *
 * <p>The node works in batches. It first buffers up to {@code leftInBufferSize} rows from the left
 * source (index 0), publishes them to the execution context as correlated variables, then pulls the
 * right source (index 1) in chunks of {@code rightInBufferSize} rows and tests every buffered left
 * row against the head of the right queue. For {@code LEFT} joins, left rows that never matched are
 * emitted together with {@link #rightEmptyRow} once the right source has ended for the batch.
 *
 * <p>The {@code waitingLeft}/{@code waitingRight} counters use the following convention, as can be
 * seen from the push/end handlers: a positive value is the number of rows still expected from the
 * source, {@code 0} means no request is outstanding, and {@code -1} means the source reported end.
 *
 * @param <RowT> Row type.
 */
public class CorrelatedNestedLoopJoinNode<RowT>
extends AbstractNode<RowT> {
    /** Join condition, tested as {@code cond.test(left, right)}. */
    private final BiPredicate<RowT, RowT> cond;
    /** Correlation ids; the i-th id is bound to the i-th buffered left row. */
    private final List<CorrelationId> correlationIds;
    /** Join type, either {@code INNER} or {@code LEFT} (asserted in the constructor). */
    private final JoinRelType joinType;
    /** Projection producing the output row from a (left, right) pair. */
    private final SqlJoinProjection joinProjection;
    /** Size of a left batch; equals the number of correlation ids. */
    private final int leftInBufferSize;
    /** Size of a single right-source request; equals {@code inBufSize}. */
    private final int rightInBufferSize;
    /** Bit i is set once the i-th row of the current left batch has been emitted at least once. */
    private final BitSet leftMatched = new BitSet();
    /** Row created by the right row factory and used as the right side of unmatched LEFT-join rows. */
    private final RowT rightEmptyRow;
    /** Factory of the row containers backing the input buffers. */
    private final RowStorageFactory<RowT> storageFactory;
    private final RowFactory<RowT> leftRowFactory;
    private final RowFactory<RowT> rightRowFactory;
    /** Number of rows the downstream still expects. */
    private int requested;
    /** Rows still expected from the left source; {@code 0} - not waiting, {@code -1} - source ended. */
    private int waitingLeft;
    /** Rows still expected from the right source; {@code 0} - not waiting, {@code -1} - source ended. */
    private int waitingRight;
    /** Current batch of left rows, or {@code null} when no batch is buffered. */
    private RowList<RowT> leftInBuf;
    /** Pending right rows for the current left batch, or {@code null} when nothing is buffered. */
    private RowQueue<RowT> rightInBuf;
    /** Index of the next left row to test against the head of {@link #rightInBuf}. */
    private int leftIdx;
    private State state = State.INITIAL;

    /**
     * Creates the node.
     *
     * @param ctx Execution context.
     * @param cond Join condition.
     * @param correlationIds Non-empty set of correlation ids; its size defines the left batch size.
     * @param joinType Join type; only {@code LEFT} and {@code INNER} are accepted.
     * @param leftRowFactory Row factory passed to the storage factory when creating the left buffer.
     * @param rightRowFactory Row factory for the right buffer; also creates the empty right row.
     * @param joinProjection Projection applied to each emitted (left, right) pair.
     */
    public CorrelatedNestedLoopJoinNode(ExecutionContext<RowT> ctx, BiPredicate<RowT, RowT> cond, Set<CorrelationId> correlationIds, JoinRelType joinType, RowFactory<RowT> leftRowFactory, RowFactory<RowT> rightRowFactory, SqlJoinProjection joinProjection) {
        super(ctx);
        assert (!CollectionUtils.nullOrEmpty(correlationIds));
        assert (joinType == JoinRelType.LEFT || joinType == JoinRelType.INNER) : joinType;
        this.cond = cond;
        this.correlationIds = new ArrayList<CorrelationId>(correlationIds);
        this.joinType = joinType;
        this.leftRowFactory = leftRowFactory;
        this.rightRowFactory = rightRowFactory;
        this.joinProjection = joinProjection;
        this.leftInBufferSize = correlationIds.size();
        this.rightInBufferSize = this.inBufSize;
        this.rightEmptyRow = rightRowFactory.create();
        this.storageFactory = ctx.storageFactory();
    }

    /**
     * Accepts a downstream demand for {@code rowsCnt} rows and advances the state machine.
     *
     * @param rowsCnt Positive number of rows; the previous demand must be fully satisfied.
     */
    @Override
    public void request(int rowsCnt) throws Exception {
        assert (!CollectionUtils.nullOrEmpty(this.sources()) && this.sources().size() == 2);
        assert (rowsCnt > 0 && this.requested == 0);
        this.requested = rowsCnt;
        this.onRequest();
    }

    /** Releases the buffers and resets all per-execution state back to {@link State#INITIAL}. */
    @Override
    protected void rewindInternal() {
        // releaseBuffers() closes both buffers and nulls the references.
        this.releaseBuffers();
        // Match bits belong to the batch that has just been dropped; if they survived the rewind,
        // a LEFT join could skip the null-padded row for an unmatched row of the next batch.
        this.leftMatched.clear();
        this.leftIdx = 0;
        this.requested = 0;
        this.waitingLeft = 0;
        this.waitingRight = 0;
        this.state = State.INITIAL;
    }

    /** Closes the node and releases both input buffers. */
    @Override
    protected void closeInternal() {
        super.closeInternal();
        this.releaseBuffers();
    }

    /**
     * Returns the downstream adapter for the given source.
     *
     * @param idx {@code 0} for the left source, {@code 1} for the right source.
     * @throws IndexOutOfBoundsException For any other index.
     */
    @Override
    protected Downstream<RowT> requestDownstream(int idx) {
        if (idx == 0) {
            return new Downstream<RowT>(){

                @Override
                public void push(RowT row) throws Exception {
                    CorrelatedNestedLoopJoinNode.this.pushLeft(row);
                }

                @Override
                public void end() throws Exception {
                    CorrelatedNestedLoopJoinNode.this.endLeft();
                }

                @Override
                public void onError(Throwable e) {
                    CorrelatedNestedLoopJoinNode.this.onError(e);
                }
            };
        }
        if (idx == 1) {
            return new Downstream<RowT>(){

                @Override
                public void push(RowT row) throws Exception {
                    CorrelatedNestedLoopJoinNode.this.pushRight(row);
                }

                @Override
                public void end() throws Exception {
                    CorrelatedNestedLoopJoinNode.this.endRight();
                }

                @Override
                public void onError(Throwable e) {
                    CorrelatedNestedLoopJoinNode.this.onError(e);
                }
            };
        }
        throw new IndexOutOfBoundsException();
    }

    /** Appends the class name, outstanding demand, waiting counters and state to {@code buf}. */
    @Override
    protected void dumpDebugInfo0(IgniteStringBuilder buf) {
        buf.app("class=").app(this.getClass().getSimpleName()).app(", requested=").app(this.requested).app(", waitingLeft=").app(this.waitingLeft).app(", waitingRight=").app(this.waitingRight).app(", state=").app((Object)this.state);
    }

    /** Buffers a row received from the left source, creating the left buffer on first use. */
    private void pushLeft(RowT row) throws Exception {
        assert (this.downstream() != null);
        assert (this.waitingLeft > 0);
        --this.waitingLeft;
        if (this.leftInBuf == null) {
            this.leftInBuf = this.storageFactory.list((RowHandler<RowT>)this.context().rowAccessor(), this.leftRowFactory, this.leftInBufferSize);
        }
        this.leftInBuf.add(row);
        this.onPushLeft();
    }

    /** Buffers a row received from the right source, creating the right queue on first use. */
    private void pushRight(RowT row) throws Exception {
        assert (this.downstream() != null);
        assert (this.waitingRight > 0);
        --this.waitingRight;
        if (this.rightInBuf == null) {
            this.rightInBuf = this.storageFactory.queue((RowHandler<RowT>)this.context().rowAccessor(), this.rightRowFactory, this.rightInBufferSize);
        }
        this.rightInBuf.add(row);
        this.onPushRight();
    }

    /** Marks the left source as ended; substitutes an empty list if no row was buffered. */
    private void endLeft() throws Exception {
        assert (this.downstream() != null);
        assert (this.waitingLeft > 0);
        this.waitingLeft = -1;
        if (this.leftInBuf == null) {
            this.leftInBuf = this.storageFactory.emptyList();
        }
        this.onEndLeft();
    }

    /** Marks the right source as ended; substitutes an empty queue if no row was buffered. */
    private void endRight() throws Exception {
        assert (this.downstream() != null);
        assert (this.waitingRight > 0);
        this.waitingRight = -1;
        if (this.rightInBuf == null) {
            this.rightInBuf = this.storageFactory.emptyQueue();
        }
        this.onEndRight();
    }

    /** Reacts to a new downstream demand according to the current state. */
    private void onRequest() throws Exception {
        switch (this.state) {
            case IN_LOOP: 
            case FILLING_RIGHT: 
            case FILLING_LEFT: {
                // Work is already in progress; the new demand is picked up via `requested`.
                break;
            }
            case INITIAL: {
                assert (this.waitingLeft == 0);
                assert (this.waitingRight == 0);
                assert (CollectionUtils.nullOrEmpty(this.leftInBuf));
                assert (CollectionUtils.nullOrEmpty(this.rightInBuf));
                this.execute(() -> {
                    this.state = State.FILLING_LEFT;
                    this.waitingLeft = this.leftInBufferSize;
                    this.leftSource().request(this.waitingLeft);
                });
                break;
            }
            case IDLE: {
                assert (this.rightInBuf != null);
                assert (this.leftInBuf != null);
                assert (this.waitingRight == -1 || this.waitingRight == 0);
                assert (this.waitingLeft == -1 || this.waitingLeft == 0);
                this.execute(this::join);
                break;
            }
            case END: {
                this.downstream().end();
                break;
            }
            default: {
                throw new AssertionError((Object)("Unexpected state:" + this.state));
            }
        }
    }

    /** Once the left batch is full, binds correlations and starts pulling the right source. */
    private void onPushLeft() throws Exception {
        assert (this.state == State.FILLING_LEFT) : "Unexpected state:" + this.state;
        assert (this.waitingRight == 0 || this.waitingRight == -1);
        assert (CollectionUtils.nullOrEmpty(this.rightInBuf));
        if (this.leftInBuf.size() == this.leftInBufferSize) {
            assert (this.waitingLeft == 0);
            this.prepareCorrelations();
            this.rightSource().rewind();
            this.state = State.FILLING_RIGHT;
            this.waitingRight = this.rightInBufferSize;
            this.rightSource().request(this.waitingRight);
        }
    }

    /** Once a full chunk of right rows is buffered, runs the join loop. */
    private void onPushRight() throws Exception {
        assert (this.state == State.FILLING_RIGHT) : "Unexpected state:" + this.state;
        assert (!CollectionUtils.nullOrEmpty(this.leftInBuf));
        assert (this.waitingLeft == -1 || this.waitingLeft == 0 && this.leftInBuf.size() == this.leftInBufferSize);
        if (this.rightInBuf.size() == this.rightInBufferSize) {
            assert (this.waitingRight == 0);
            this.state = State.IDLE;
            this.join();
        }
    }

    /**
     * Handles the end of the left source: finishes the node if no left row is buffered, otherwise
     * processes the final (possibly partial) batch against the right source.
     */
    private void onEndLeft() throws Exception {
        assert (this.state == State.FILLING_LEFT) : "Unexpected state:" + this.state;
        assert (this.waitingLeft == -1);
        assert (this.waitingRight == 0 || this.waitingRight == -1);
        assert (CollectionUtils.nullOrEmpty(this.rightInBuf));
        if (CollectionUtils.nullOrEmpty(this.leftInBuf)) {
            this.waitingRight = -1;
            this.state = State.END;
            // With no outstanding demand the end is reported on the next request (END branch of onRequest).
            if (this.requested > 0) {
                this.downstream().end();
            }
        } else {
            this.prepareCorrelations();
            // The right source is rewound only if it has already run to its end for a previous batch.
            if (this.waitingRight == -1) {
                this.rightSource().rewind();
            }
            this.state = State.FILLING_RIGHT;
            this.waitingRight = this.rightInBufferSize;
            this.rightSource().request(this.waitingRight);
        }
    }

    /** Handles the end of the right source for the current left batch and runs the join loop. */
    private void onEndRight() throws Exception {
        assert (this.state == State.FILLING_RIGHT) : "Unexpected state:" + this.state;
        assert (this.waitingRight == -1);
        assert (!CollectionUtils.nullOrEmpty(this.leftInBuf));
        assert (this.waitingLeft == -1 || this.waitingLeft == 0 && this.leftInBuf.size() == this.leftInBufferSize);
        this.state = State.IDLE;
        this.join();
    }

    /**
     * Core join loop. Tests buffered left rows against the head of the right queue and pushes the
     * projected matches downstream while demand remains. When the right queue is drained it, in
     * order: requests the next right chunk, emits unmatched left rows (LEFT join only), requests
     * the next left batch, or finishes the node.
     *
     * <p>After more than {@code inBufSize} loop iterations in a single invocation the method
     * reschedules itself through {@code execute} and returns.
     */
    private void join() throws Exception {
        assert (this.state == State.IDLE);
        this.state = State.IN_LOOP;
        int processed = 0;
        try {
            while (this.requested > 0 && !this.rightInBuf.isEmpty()) {
                // A full pass over the left batch was completed for the previous right row.
                if (this.leftIdx == this.leftInBuf.size()) {
                    this.leftIdx = 0;
                }
                while (this.requested > 0 && this.leftIdx < this.leftInBuf.size()) {
                    RowT right;
                    if (processed++ > this.inBufSize) {
                        this.execute(this::join);
                        // NOTE(review): the loop guard implies leftIdx < leftInBuf.size() here, so this
                        // poll looks unreachable (possibly a decompiler duplicate of the post-loop
                        // cleanup). Kept as in the original.
                        if (this.leftIdx == this.leftInBuf.size()) {
                            this.rightInBuf.poll();
                        }
                        return;
                    }
                    RowT left = this.leftInBuf.get(this.leftIdx);
                    if (this.cond.test(left, right = this.rightInBuf.peek())) {
                        this.leftMatched.set(this.leftIdx);
                        --this.requested;
                        Object row = this.joinProjection.project(this.context(), left, right);
                        this.acquireRow(row);
                        this.downstream().push(row);
                        this.releaseRow(row);
                    }
                    ++this.leftIdx;
                }
                // The right row is dropped only after every left row has been tested against it;
                // otherwise demand ran out mid-pass and the pass resumes from leftIdx later.
                if (this.leftIdx != this.leftInBuf.size()) continue;
                this.rightInBuf.poll();
            }
        }
        finally {
            this.state = State.IDLE;
        }
        if (this.rightInBuf.isEmpty()) {
            this.leftIdx = 0;
            if (this.waitingRight == 0) {
                // The right source has not ended yet: fetch its next chunk for the same left batch.
                this.rightInBuf.close();
                this.rightInBuf = null;
                this.state = State.FILLING_RIGHT;
                this.waitingRight = this.rightInBufferSize;
                this.rightSource().request(this.waitingRight);
                return;
            }
            if (this.joinType == JoinRelType.LEFT && !CollectionUtils.nullOrEmpty(this.leftInBuf)) {
                int notMatchedIdx = this.leftMatched.nextClearBit(0);
                this.state = State.IN_LOOP;
                try {
                    while (this.requested > 0 && notMatchedIdx < this.leftInBuf.size()) {
                        if (processed++ > this.inBufSize) {
                            this.execute(this::join);
                            return;
                        }
                        --this.requested;
                        Object row = this.joinProjection.project(this.context(), this.leftInBuf.get(notMatchedIdx), this.rightEmptyRow);
                        this.acquireRow(row);
                        this.downstream().push(row);
                        this.releaseRow(row);
                        // Mark as emitted so the row is not produced again when join() is re-entered.
                        this.leftMatched.set(notMatchedIdx);
                        notMatchedIdx = this.leftMatched.nextClearBit(notMatchedIdx + 1);
                    }
                }
                finally {
                    this.state = State.IDLE;
                }
                if (this.requested == 0 && notMatchedIdx < this.leftInBuf.size()) {
                    // Unmatched rows remain but there is no demand; resume on the next request.
                    return;
                }
            }
            if (this.waitingLeft == 0) {
                // The left source has not ended yet: drop the processed batch and fetch the next one.
                this.releaseBuffers();
                this.leftMatched.clear();
                this.state = State.FILLING_LEFT;
                this.waitingLeft = this.leftInBufferSize;
                this.leftSource().request(this.waitingLeft);
                return;
            }
            assert (this.waitingLeft == -1 && this.waitingRight == -1);
            if (this.requested > 0) {
                this.releaseBuffers();
                this.state = State.END;
                this.downstream().end();
                return;
            }
            // Both sources are exhausted but there is no demand to report the end against, so the
            // node stays IDLE until the next request. The IDLE branch of onRequest() and the loop
            // above dereference both buffers, so they must remain non-null: only the drained right
            // queue is closed and replaced with an empty one.
            IgniteUtils.closeQuiet(this.rightInBuf::close);
            this.rightInBuf = this.storageFactory.emptyQueue();
        }
    }

    /** Returns the left source (index 0). */
    private Node<RowT> leftSource() {
        return this.sources().get(0);
    }

    /** Returns the right source (index 1). */
    private Node<RowT> rightSource() {
        return this.sources().get(1);
    }

    /**
     * Binds every correlation id to a buffered left row. If the batch holds fewer rows than there
     * are correlation ids, the remaining ids are bound to the first buffered row.
     */
    private void prepareCorrelations() {
        for (int i = 0; i < this.correlationIds.size(); ++i) {
            RowT row = i < this.leftInBuf.size() ? this.leftInBuf.get(i) : CollectionUtils.first(this.leftInBuf);
            this.context().correlatedVariable(row, this.correlationIds.get(i).getId());
        }
    }

    /** Closes both input buffers via {@code IgniteUtils.closeQuiet} and nulls their references. */
    private void releaseBuffers() {
        if (this.leftInBuf != null) {
            IgniteUtils.closeQuiet(this.leftInBuf::close);
            this.leftInBuf = null;
        }
        if (this.rightInBuf != null) {
            IgniteUtils.closeQuiet(this.rightInBuf::close);
            this.rightInBuf = null;
        }
    }

    /** States of the join state machine. */
    private static enum State {
        /** Nothing requested from the sources yet (also the state after a rewind). */
        INITIAL,
        /** A batch of left rows is being collected. */
        FILLING_LEFT,
        /** A chunk of right rows is being collected for the current left batch. */
        FILLING_RIGHT,
        /** Buffers are available; waiting for demand or for {@code join()} to run. */
        IDLE,
        /** {@code join()} is currently pushing rows downstream. */
        IN_LOOP,
        /** Both sources are exhausted and the buffers have been released. */
        END;

    }
}

