/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.sql.engine.exec.rel;

import java.util.Comparator;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite3.internal.close.ManuallyCloseable;
import org.apache.ignite3.internal.lang.IgniteStringBuilder;
import org.apache.ignite3.internal.sql.engine.api.expressions.RowFactory;
import org.apache.ignite3.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite3.internal.sql.engine.exec.RowHandler;
import org.apache.ignite3.internal.sql.engine.exec.exp.SqlJoinProjection;
import org.apache.ignite3.internal.sql.engine.exec.memory.structures.RowCollection;
import org.apache.ignite3.internal.sql.engine.exec.memory.structures.RowList;
import org.apache.ignite3.internal.sql.engine.exec.memory.structures.RowQueue;
import org.apache.ignite3.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite3.internal.sql.engine.exec.rel.Downstream;
import org.apache.ignite3.internal.sql.engine.exec.rel.Node;
import org.apache.ignite3.internal.sql.engine.util.TypeUtils;
import org.apache.ignite3.internal.type.StructNativeType;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;

public abstract class MergeJoinNode<RowT>
extends AbstractNode<RowT> {
    /** Comparator invoked as {@code compare(leftRow, rightRow)} by the join loops to decide which input to advance. */
    protected final Comparator<RowT> comp;
    /** Rows asked for through {@link #request(int)} that have not been pushed downstream yet. */
    protected int requested;
    /**
     * Rows still awaited from the left source: positive while a batch is outstanding, {@code 0} when nothing is
     * outstanding, {@code -1} once the left source has signalled its end.
     */
    protected int waitingLeft;
    /** Same bookkeeping as {@link #waitingLeft}, but for the right source. */
    protected int waitingRight;
    /** Rows received from the right source and not yet consumed by the join loop. */
    protected final RowQueue<RowT> rightInBuf;
    /** Rows received from the left source and not yet consumed by the join loop. */
    protected final RowQueue<RowT> leftInBuf;
    /** Row factory for the left input; used for the left queue and by subclasses that need a left-side placeholder row. */
    protected final RowFactory<RowT> leftRowFactory;
    /** Row factory for the right input; used for the right queue, the right materialization and right-side placeholder rows. */
    protected final RowFactory<RowT> rightRowFactory;
    /** {@code true} while a {@code join()} loop is executing; {@link #request(int)} does not schedule another run then. */
    protected boolean inLoop;

    private MergeJoinNode(ExecutionContext<RowT> ctx, Comparator<RowT> comp, RowFactory<RowT> leftRowFactory, RowFactory<RowT> rightRowFactory) {
        super(ctx);
        this.comp = comp;
        this.leftRowFactory = leftRowFactory;
        this.rightRowFactory = rightRowFactory;
        this.leftInBuf = ctx.storageFactory().queue((RowHandler<RowT>)ctx.rowAccessor(), leftRowFactory, this.inBufSize);
        this.rightInBuf = ctx.storageFactory().queue((RowHandler<RowT>)ctx.rowAccessor(), rightRowFactory, this.inBufSize);
    }

    /**
     * Records the downstream demand and, unless a join loop is already running, schedules one.
     *
     * @param rowsCnt Number of rows requested; must be positive, and no earlier demand may be pending.
     * @throws Exception If scheduling the join fails.
     */
    @Override
    public void request(int rowsCnt) throws Exception {
        assert !CollectionUtils.nullOrEmpty(sources()) && sources().size() == 2;
        assert rowsCnt > 0 && requested == 0;

        requested = rowsCnt;

        if (inLoop) {
            // A join loop is in progress; its loop condition re-reads 'requested' on the next iteration.
            return;
        }

        execute(this::join);
    }

    /** Resets the demand and waiting counters to zero and empties both input queues. */
    @Override
    protected void rewindInternal() {
        requested = 0;
        waitingLeft = 0;
        waitingRight = 0;

        // Same clearing order as before: right queue first, then left.
        rightInBuf.clear();
        leftInBuf.clear();
    }

    /** Closes the base node first, then closes the left and the right input queue through {@code closeQuiet}. */
    @Override
    protected void closeInternal() {
        super.closeInternal();

        IgniteUtils.closeQuiet(leftInBuf::close);
        IgniteUtils.closeQuiet(rightInBuf::close);
    }

    /**
     * Closes the given resource through {@code closeQuiet}; a {@code null} argument is ignored.
     *
     * @param closeable Resource to close, may be {@code null}.
     */
    void close(@Nullable ManuallyCloseable closeable) {
        if (closeable == null) {
            return;
        }

        IgniteUtils.closeQuiet(closeable::close);
    }

    /**
     * Empties the given row collection; a {@code null} argument is ignored.
     *
     * @param collection Collection to clear, may be {@code null}.
     */
    void clear(@Nullable RowCollection<?> collection) {
        if (collection == null) {
            return;
        }

        collection.clear();
    }

    /**
     * Builds the sink through which the source at position {@code idx} delivers its rows to this node.
     *
     * @param idx Source position: {@code 0} is wired to the left-side handlers, {@code 1} to the right-side handlers.
     * @return Downstream that forwards push, end and error notifications to this join.
     * @throws IndexOutOfBoundsException If {@code idx} is neither {@code 0} nor {@code 1}.
     */
    @Override
    protected Downstream<RowT> requestDownstream(int idx) {
        switch (idx) {
            case 0:
                // Left input.
                return new Downstream<RowT>() {
                    @Override
                    public void push(RowT row) throws Exception {
                        MergeJoinNode.this.pushLeft(row);
                    }

                    @Override
                    public void end() throws Exception {
                        MergeJoinNode.this.endLeft();
                    }

                    @Override
                    public void onError(Throwable e) {
                        MergeJoinNode.this.onError(e);
                    }
                };

            case 1:
                // Right input.
                return new Downstream<RowT>() {
                    @Override
                    public void push(RowT row) throws Exception {
                        MergeJoinNode.this.pushRight(row);
                    }

                    @Override
                    public void end() throws Exception {
                        MergeJoinNode.this.endRight();
                    }

                    @Override
                    public void onError(Throwable e) {
                        MergeJoinNode.this.onError(e);
                    }
                };

            default:
                throw new IndexOutOfBoundsException();
        }
    }

    /**
     * Appends the node's class name and its demand/waiting counters to the debug buffer.
     *
     * @param buf Builder that receives the debug text.
     */
    @Override
    protected void dumpDebugInfo0(IgniteStringBuilder buf) {
        String nodeName = getClass().getSimpleName();

        buf.app("class=")
            .app(nodeName)
            .app(", requested=")
            .app(requested)
            .app(", waitingLeft=")
            .app(waitingLeft)
            .app(", waitingRight=")
            .app(waitingRight);
    }

    /**
     * Accepts one row from the left source and runs the join once the left batch is complete and the right side is
     * not awaiting rows.
     *
     * @param row Row delivered by the left source.
     * @throws Exception If the join fails.
     */
    private void pushLeft(RowT row) throws Exception {
        assert downstream() != null;
        assert waitingLeft > 0;

        waitingLeft--;
        leftInBuf.add(row);

        boolean leftBatchComplete = waitingLeft == 0;
        boolean rightNotAwaited = waitingRight <= 0;

        if (leftBatchComplete && rightNotAwaited) {
            join();
        }
    }

    /**
     * Accepts one row from the right source and runs the join once the right batch is complete and the left side is
     * not awaiting rows.
     *
     * @param row Row delivered by the right source.
     * @throws Exception If the join fails.
     */
    private void pushRight(RowT row) throws Exception {
        assert downstream() != null;
        assert waitingRight > 0;

        waitingRight--;
        rightInBuf.add(row);

        boolean rightBatchComplete = waitingRight == 0;
        boolean leftNotAwaited = waitingLeft <= 0;

        if (rightBatchComplete && leftNotAwaited) {
            join();
        }
    }

    /**
     * Marks the left source as finished ({@code waitingLeft = -1}) and runs the join unless right rows are still awaited.
     *
     * @throws Exception If the join fails.
     */
    private void endLeft() throws Exception {
        assert downstream() != null;
        assert waitingLeft > 0;

        waitingLeft = -1;

        if (waitingRight > 0) {
            // A right batch is still in flight; the join is triggered when it completes.
            return;
        }

        join();
    }

    /**
     * Marks the right source as finished ({@code waitingRight = -1}) and runs the join unless left rows are still awaited.
     *
     * @throws Exception If the join fails.
     */
    private void endRight() throws Exception {
        assert downstream() != null;
        assert waitingRight > 0;

        waitingRight = -1;

        if (waitingLeft > 0) {
            // A left batch is still in flight; the join is triggered when it completes.
            return;
        }

        join();
    }

    /** Returns the source registered at position 0, which this node treats as the left input. */
    protected Node<RowT> leftSource() {
        final int leftPosition = 0;

        return sources().get(leftPosition);
    }

    /** Returns the source registered at position 1, which this node treats as the right input. */
    protected Node<RowT> rightSource() {
        final int rightPosition = 1;

        return sources().get(rightPosition);
    }

    /**
     * Runs the join-type specific merge step over the currently buffered rows.
     *
     * <p>It is scheduled from {@link #request(int)} and invoked directly by the push/end handlers once neither input
     * has rows outstanding. The implementations visible in this file push result rows downstream, signal the end of
     * the stream, and request further batches from the sources.
     *
     * @throws Exception If producing rows, notifying the downstream or requesting input fails.
     */
    protected abstract void join() throws Exception;

    /**
     * Creates the merge-join node implementation that corresponds to the given join type.
     *
     * @param ctx Execution context.
     * @param leftRowType Row type of the left input.
     * @param rightRowType Row type of the right input.
     * @param joinType Join type selecting the implementation.
     * @param comp Comparator used to order a left row against a right row.
     * @param outputProjection Projection combining a left and a right row; asserted non-null for INNER, LEFT, RIGHT
     *      and FULL, and asserted null for SEMI and ANTI.
     * @param <RowT> Row type handled by the execution context.
     * @return Merge-join node for the requested join type.
     * @throws IllegalStateException If the join type has no implementation here.
     */
    public static <RowT> MergeJoinNode<RowT> create(ExecutionContext<RowT> ctx, RelDataType leftRowType, RelDataType rightRowType, JoinRelType joinType, Comparator<RowT> comp, @Nullable SqlJoinProjection outputProjection) {
        StructNativeType leftSchema = TypeUtils.convertStructuredType(leftRowType);
        StructNativeType rightSchema = TypeUtils.convertStructuredType(rightRowType);

        RowFactory<RowT> leftFactory = ctx.rowFactoryFactory().create(leftSchema);
        RowFactory<RowT> rightFactory = ctx.rowFactoryFactory().create(rightSchema);

        switch (joinType) {
            case INNER:
                assert outputProjection != null;

                return new InnerJoin<>(ctx, comp, outputProjection, leftFactory, rightFactory);

            case LEFT:
                assert outputProjection != null;

                return new LeftJoin<>(ctx, comp, outputProjection, leftFactory, rightFactory);

            case RIGHT:
                assert outputProjection != null;

                return new RightJoin<>(ctx, comp, outputProjection, leftFactory, rightFactory);

            case FULL:
                assert outputProjection != null;

                return new FullOuterJoin<>(ctx, comp, outputProjection, leftFactory, rightFactory);

            case SEMI:
                assert outputProjection == null;

                return new SemiJoin<>(ctx, comp, leftFactory, rightFactory);

            case ANTI:
                assert outputProjection == null;

                return new AntiJoin<>(ctx, comp, leftFactory, rightFactory);

            default:
                throw new IllegalStateException("Join type \"" + joinType + "\" is not supported yet");
        }
    }

    /**
     * Merge-join node returned by {@code create(...)} for {@code JoinRelType.INNER}.
     *
     * <p>It advances through both inputs using {@code comp}. Whenever the comparator returns {@code 0} for the current
     * left and right rows it pushes {@code outputProjection.project(context, left, right)} downstream. Right rows that
     * match the same left row are collected in {@code rightMaterialization} so that they can be replayed ("drained")
     * against the following left rows.
     *
     * <p>NOTE(review): {@code acquireRow}, {@code releaseRow} and {@code releaseNullableRow} are inherited helpers that
     * are not visible in this file - presumably row reference accounting; confirm against the base class.
     */
    private static class InnerJoin<RowT>
    extends MergeJoinNode<RowT> {
        /** Projection that combines a left and a right row into an output row. */
        private final SqlJoinProjection outputProjection;
        /** Left row currently being processed, or {@code null} when the next one has to be pulled from the left queue. */
        private RowT left;
        /** Right row currently being processed, or {@code null} when the next one has to be pulled from the right queue. */
        private RowT right;
        /** Lazily created list of right rows that matched the current left row; {@code null} until first needed. */
        private RowList<RowT> rightMaterialization;
        /** Position of the next row to read from {@code rightMaterialization} while draining. */
        private int rightIdx;
        /** {@code true} while right rows are taken from {@code rightMaterialization} instead of {@code right}. */
        private boolean drainMaterialization;

        /**
         * Creates the node.
         *
         * @param ctx Execution context.
         * @param comp Comparator used to order a left row against a right row.
         * @param outputProjection Projection producing the output row from a matching pair.
         * @param leftRowFactory Row factory for the left input.
         * @param rightRowFactory Row factory for the right input.
         */
        private InnerJoin(ExecutionContext<RowT> ctx, Comparator<RowT> comp, SqlJoinProjection outputProjection, RowFactory<RowT> leftRowFactory, RowFactory<RowT> rightRowFactory) {
            super(ctx, comp, leftRowFactory, rightRowFactory);
            this.outputProjection = outputProjection;
        }

        /**
         * Releases the held rows, empties the materialization if it exists, resets the cursor state and then
         * delegates to the base rewind.
         */
        @Override
        protected void rewindInternal() {
            this.releaseNullableRow(this.left);
            this.releaseNullableRow(this.right);
            this.clear(this.rightMaterialization);
            this.left = null;
            this.right = null;
            this.rightIdx = 0;
            this.drainMaterialization = false;
            super.rewindInternal();
        }

        /** Closes the base resources, then releases the held rows and closes the materialization if it exists. */
        @Override
        protected void closeInternal() {
            super.closeInternal();
            this.releaseNullableRow(this.left);
            this.releaseNullableRow(this.right);
            this.close(this.rightMaterialization);
        }

        /**
         * Runs the inner merge loop over the buffered rows, then either ends the stream or requests more input.
         *
         * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
         *
         * @throws Exception If projecting, pushing downstream or requesting input fails.
         */
        @Override
        protected void join() throws Exception {
            // Number of loop iterations in this invocation; used to yield after a bounded amount of work.
            int processed = 0;
            this.inLoop = true;
            try {
                // Continue while rows are still requested, a left row is at hand (held or buffered) and a right row
                // is at hand (held, buffered or materialized).
                while (!(this.requested <= 0 || this.left == null && this.leftInBuf.isEmpty() || this.right == null && this.rightInBuf.isEmpty() && CollectionUtils.nullOrEmpty(this.rightMaterialization))) {
                    Object row;
                    // After more than inBufSize iterations, reschedule join() through execute() and leave; the
                    // finally block still resets inLoop, and the tail of this method is skipped.
                    if (processed++ > this.inBufSize) {
                        this.execute(this::join);
                        return;
                    }
                    // Pull the next left row; the loop condition guarantees the left queue is non-empty here.
                    if (this.left == null) {
                        this.left = this.leftInBuf.remove();
                        this.acquireRow(this.left);
                    }
                    if (this.right == null) {
                        // Nothing buffered and the right source has not ended (-1): stop and wait for more rows.
                        if (this.rightInBuf.isEmpty() && this.waitingRight != -1) {
                            break;
                        }
                        if (!this.rightInBuf.isEmpty()) {
                            this.right = this.rightInBuf.remove();
                            this.acquireRow(this.right);
                        }
                    }
                    // No right row could be obtained but materialized rows exist and are not being replayed yet:
                    // switch to drain mode and drop the current left row.
                    // NOTE(review): presumably the dropped left row was already paired with every materialized row
                    // while they were being collected - confirm.
                    if (this.right == null && !CollectionUtils.nullOrEmpty(this.rightMaterialization) && !this.drainMaterialization) {
                        this.drainMaterialization = true;
                        this.releaseRow(this.left);
                        this.left = null;
                        continue;
                    }
                    if (!this.drainMaterialization) {
                        // Direct mode: compare the current left row with the current right row.
                        int cmp = this.comp.compare(this.left, this.right);
                        if (cmp < 0) {
                            // Left sorts before right: drop the left row and restart the replay cursor. If rows
                            // were materialized, replay them against the next left row.
                            this.releaseRow(this.left);
                            this.left = null;
                            this.rightIdx = 0;
                            if (CollectionUtils.nullOrEmpty(this.rightMaterialization)) continue;
                            this.drainMaterialization = true;
                            continue;
                        }
                        if (cmp > 0) {
                            // Left sorts after right: drop the right row together with any materialized rows.
                            this.releaseRow(this.right);
                            this.right = null;
                            this.rightIdx = 0;
                            this.clear(this.rightMaterialization);
                            continue;
                        }
                        // Match. With nothing materialized and more right rows possible (buffered or still
                        // expected), look one row ahead to decide whether a materialization list is needed.
                        if (CollectionUtils.nullOrEmpty(this.rightMaterialization) && (!this.rightInBuf.isEmpty() || this.waitingRight != -1)) {
                            // The look-ahead needs a buffered row; wait for the next right batch.
                            if (this.rightInBuf.isEmpty()) {
                                break;
                            }
                            // The list is created only once; after a clear() it stays non-null and is reused.
                            if (this.rightMaterialization == null && this.comp.compare(this.left, this.rightInBuf.peek()) == 0) {
                                this.rightMaterialization = this.context().storageFactory().list(this.context().rowAccessor(), this.rightRowFactory);
                            }
                        }
                        row = this.outputProjection.project(this.context(), this.left, this.right);
                        this.acquireRow(row);
                        if (this.rightMaterialization != null) {
                            // A list exists: remember the right row for later left rows and advance the right side,
                            // keeping the current left row.
                            this.rightMaterialization.add(this.right);
                            this.releaseRow(this.right);
                            this.right = null;
                        } else {
                            // No list: advance the left side and keep the current right row.
                            this.releaseRow(this.left);
                            this.left = null;
                        }
                    } else {
                        // Drain mode: pair the current left row with the materialized right rows.
                        RowT right;
                        int cmp;
                        if (this.rightIdx >= this.rightMaterialization.size()) {
                            // All materialized rows were replayed for this left row; move to the next left row.
                            this.rightIdx = 0;
                            this.releaseRow(this.left);
                            this.left = null;
                            continue;
                        }
                        if ((cmp = this.comp.compare(this.left, right = this.rightMaterialization.get(this.rightIdx++))) > 0) {
                            // Left sorts after the materialized rows: discard them and return to direct mode,
                            // keeping the current left row.
                            this.rightIdx = 0;
                            this.rightMaterialization.clear();
                            this.drainMaterialization = false;
                            continue;
                        }
                        row = this.outputProjection.project(this.context(), this.left, right);
                        this.acquireRow(row);
                    }
                    // Common emit path: account for the produced row, hand it downstream, release it.
                    --this.requested;
                    this.downstream().push(row);
                    this.releaseRow(row);
                }
            }
            finally {
                this.inLoop = false;
            }
            // End of stream: demand remains, but either the left side is exhausted (source ended, nothing held or
            // buffered) or the right side is exhausted (source ended, nothing held, buffered or materialized).
            if (this.requested > 0 && (this.waitingLeft == -1 && this.left == null && this.leftInBuf.isEmpty() || this.waitingRight == -1 && this.right == null && this.rightInBuf.isEmpty() && CollectionUtils.nullOrEmpty(this.rightMaterialization))) {
                this.requested = 0;
                this.rightInBuf.clear();
                this.leftInBuf.clear();
                this.downstream().end();
                return;
            }
            // Ask for a new batch of inBufSize rows from every source that has nothing outstanding.
            if (this.waitingRight == 0) {
                this.waitingRight = this.inBufSize;
                this.rightSource().request(this.waitingRight);
            }
            if (this.waitingLeft == 0) {
                this.waitingLeft = this.inBufSize;
                this.leftSource().request(this.waitingLeft);
            }
        }
    }

    /**
     * Merge-join node returned by {@code create(...)} for {@code JoinRelType.LEFT}.
     *
     * <p>It pushes a projected row for every left/right pair the comparator reports as equal. For a left row that found
     * no match it pushes the left row projected with a row obtained from {@code rightRowFactory.create()}.
     * Right rows that match the same left row are collected in {@code rightMaterialization} so they can be replayed
     * against the following left rows.
     *
     * <p>NOTE(review): {@code acquireRow}, {@code releaseRow} and {@code releaseNullableRow} are inherited helpers that
     * are not visible in this file - presumably row reference accounting; confirm against the base class.
     */
    private static class LeftJoin<RowT>
    extends MergeJoinNode<RowT> {
        /** Projection that combines a left and a right row into an output row. */
        private final SqlJoinProjection outputProjection;
        /** Left row currently being processed, or {@code null} when the next one has to be pulled from the left queue. */
        private RowT left;
        /** Right row currently being processed, or {@code null} when none is held. */
        private RowT right;
        /** Lazily created list of right rows that matched the current left row; {@code null} until first needed. */
        private RowList<RowT> rightMaterialization;
        /** Position of the next row to read from {@code rightMaterialization} while draining. */
        private int rightIdx;
        /** {@code true} while right rows are taken from {@code rightMaterialization} instead of {@code right}. */
        private boolean drainMaterialization;
        /** Whether the current left row has matched a right row in direct mode; reset each time a left row is pulled. */
        private boolean matched;

        /**
         * Creates the node.
         *
         * @param ctx Execution context.
         * @param comp Comparator used to order a left row against a right row.
         * @param outputProjection Projection producing the output row.
         * @param leftRowFactory Row factory for the left input.
         * @param rightRowFactory Row factory for the right input.
         */
        private LeftJoin(ExecutionContext<RowT> ctx, Comparator<RowT> comp, SqlJoinProjection outputProjection, RowFactory<RowT> leftRowFactory, RowFactory<RowT> rightRowFactory) {
            super(ctx, comp, leftRowFactory, rightRowFactory);
            this.outputProjection = outputProjection;
        }

        /**
         * Releases the held rows, empties the materialization if it exists, resets the cursor state and then
         * delegates to the base rewind. {@code matched} is not touched here; {@code join()} resets it whenever it
         * pulls a new left row.
         */
        @Override
        protected void rewindInternal() {
            this.releaseNullableRow(this.left);
            this.releaseNullableRow(this.right);
            this.clear(this.rightMaterialization);
            this.left = null;
            this.right = null;
            this.rightIdx = 0;
            this.drainMaterialization = false;
            super.rewindInternal();
        }

        /** Closes the base resources, then releases the held rows and closes the materialization if it exists. */
        @Override
        protected void closeInternal() {
            super.closeInternal();
            this.releaseNullableRow(this.left);
            this.releaseNullableRow(this.right);
            this.close(this.rightMaterialization);
        }

        /**
         * Runs the left-outer merge loop over the buffered rows, then ends the stream if the left side is exhausted
         * and requests more input where nothing is outstanding.
         *
         * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
         *
         * @throws Exception If projecting, pushing downstream or requesting input fails.
         */
        @Override
        protected void join() throws Exception {
            // Number of loop iterations in this invocation; used to yield after a bounded amount of work.
            int processed = 0;
            this.inLoop = true;
            try {
                // Continue while rows are still requested and a left row is at hand. A missing right row (nothing
                // held, buffered or materialized) stops the loop only while the right source has not ended.
                while (!(this.requested <= 0 || this.left == null && this.leftInBuf.isEmpty() || this.right == null && this.rightInBuf.isEmpty() && CollectionUtils.nullOrEmpty(this.rightMaterialization) && this.waitingRight != -1)) {
                    Object row;
                    // After more than inBufSize iterations, reschedule join() through execute() and leave; the
                    // finally block still resets inLoop, and the tail of this method is skipped.
                    if (processed++ > this.inBufSize) {
                        this.execute(this::join);
                        return;
                    }
                    // Pull the next left row; the loop condition guarantees the left queue is non-empty here.
                    if (this.left == null) {
                        this.left = this.leftInBuf.remove();
                        this.acquireRow(this.left);
                        this.matched = false;
                    }
                    if (this.right == null) {
                        // Nothing buffered and the right source has not ended (-1): stop and wait for more rows.
                        if (this.rightInBuf.isEmpty() && this.waitingRight != -1) {
                            break;
                        }
                        if (!this.rightInBuf.isEmpty()) {
                            this.right = this.rightInBuf.remove();
                            this.acquireRow(this.right);
                        }
                    }
                    // No right row could be obtained but materialized rows exist and are not being replayed yet:
                    // switch to drain mode and drop the current left row.
                    // NOTE(review): presumably the dropped left row was already paired with every materialized row
                    // while they were being collected - confirm.
                    if (this.right == null && !CollectionUtils.nullOrEmpty(this.rightMaterialization) && !this.drainMaterialization) {
                        this.drainMaterialization = true;
                        this.releaseRow(this.left);
                        this.left = null;
                        continue;
                    }
                    if (!this.drainMaterialization) {
                        if (this.right == null) {
                            // No right row is available at all: emit the left row combined with a row freshly
                            // created by the right row factory, then advance the left side.
                            // NOTE(review): presumably create() yields an all-null row - confirm against RowFactory.
                            row = this.outputProjection.project(this.context(), this.left, this.rightRowFactory.create());
                            this.acquireRow(row);
                            --this.requested;
                            this.downstream().push(row);
                            this.releaseRow(row);
                            this.releaseRow(this.left);
                            this.left = null;
                            continue;
                        }
                        // Direct mode: compare the current left row with the current right row.
                        int cmp = this.comp.compare(this.left, this.right);
                        if (cmp < 0) {
                            // Left sorts before right. If this left row never matched, emit it combined with a
                            // freshly created right row before dropping it.
                            if (!this.matched) {
                                row = this.outputProjection.project(this.context(), this.left, this.rightRowFactory.create());
                                this.acquireRow(row);
                                --this.requested;
                                this.downstream().push(row);
                                this.releaseRow(row);
                            }
                            this.releaseRow(this.left);
                            this.left = null;
                            this.rightIdx = 0;
                            // If rows were materialized, replay them against the next left row.
                            if (CollectionUtils.nullOrEmpty(this.rightMaterialization)) continue;
                            this.drainMaterialization = true;
                            continue;
                        }
                        if (cmp > 0) {
                            // Left sorts after right: drop the right row together with any materialized rows.
                            this.releaseRow(this.right);
                            this.right = null;
                            this.rightIdx = 0;
                            this.clear(this.rightMaterialization);
                            continue;
                        }
                        // Match. Note that matched is set before the look-ahead below may break out of the loop.
                        this.matched = true;
                        // With nothing materialized and more right rows possible (buffered or still expected),
                        // look one row ahead to decide whether a materialization list is needed.
                        if (CollectionUtils.nullOrEmpty(this.rightMaterialization) && (!this.rightInBuf.isEmpty() || this.waitingRight != -1)) {
                            // The look-ahead needs a buffered row; wait for the next right batch.
                            if (this.rightInBuf.isEmpty()) {
                                break;
                            }
                            // The list is created only once; after a clear() it stays non-null and is reused.
                            if (this.rightMaterialization == null && this.comp.compare(this.left, this.rightInBuf.peek()) == 0) {
                                this.rightMaterialization = this.context().storageFactory().list(this.context().rowAccessor(), this.rightRowFactory);
                            }
                        }
                        row = this.outputProjection.project(this.context(), this.left, this.right);
                        this.acquireRow(row);
                        if (this.rightMaterialization != null) {
                            // A list exists: remember the right row for later left rows and advance the right side,
                            // keeping the current left row.
                            this.rightMaterialization.add(this.right);
                            this.releaseRow(this.right);
                            this.right = null;
                        } else {
                            // No list: advance the left side and keep the current right row.
                            this.releaseRow(this.left);
                            this.left = null;
                        }
                    } else {
                        // Drain mode: pair the current left row with the materialized right rows.
                        RowT right;
                        int cmp;
                        if (this.rightIdx >= this.rightMaterialization.size()) {
                            // All materialized rows were replayed for this left row; move to the next left row.
                            this.rightIdx = 0;
                            this.releaseRow(this.left);
                            this.left = null;
                            continue;
                        }
                        if ((cmp = this.comp.compare(this.left, right = this.rightMaterialization.get(this.rightIdx++))) > 0) {
                            // Left sorts after the materialized rows: discard them and return to direct mode,
                            // keeping the current left row (its matched flag is still false at this point).
                            this.rightIdx = 0;
                            this.rightMaterialization.clear();
                            this.drainMaterialization = false;
                            continue;
                        }
                        row = this.outputProjection.project(this.context(), this.left, right);
                        this.acquireRow(row);
                    }
                    // Common emit path: account for the produced row, hand it downstream, release it.
                    --this.requested;
                    this.downstream().push(row);
                    this.releaseRow(row);
                }
            }
            finally {
                this.inLoop = false;
            }
            // End of stream: demand remains but the left side is exhausted (source ended, nothing held or buffered).
            // NOTE(review): unlike InnerJoin there is no return after end(), so the source requests below still
            // run after the stream has been ended - confirm this is intended.
            if (this.requested > 0 && this.waitingLeft == -1 && this.left == null && this.leftInBuf.isEmpty()) {
                this.requested = 0;
                this.rightInBuf.clear();
                this.downstream().end();
            }
            // Ask for a new batch of inBufSize rows from every source that has nothing outstanding.
            if (this.waitingRight == 0) {
                this.waitingRight = this.inBufSize;
                this.rightSource().request(this.waitingRight);
            }
            if (this.waitingLeft == 0) {
                this.waitingLeft = this.inBufSize;
                this.leftSource().request(this.waitingLeft);
            }
        }
    }

    /**
     * Merge-join node returned by {@code create(...)} for {@code JoinRelType.RIGHT}.
     *
     * <p>It pushes a projected row for every left/right pair the comparator reports as equal. For a right row that
     * found no match it pushes a row obtained from {@code leftRowFactory.create()} projected with that right row.
     * Right rows that match the same left row are collected in {@code rightMaterialization} so they can be replayed
     * against the following left rows.
     *
     * <p>NOTE(review): {@code acquireRow}, {@code releaseRow} and {@code releaseNullableRow} are inherited helpers that
     * are not visible in this file - presumably row reference accounting; confirm against the base class.
     */
    private static class RightJoin<RowT>
    extends MergeJoinNode<RowT> {
        /** Projection that combines a left and a right row into an output row. */
        private final SqlJoinProjection outputProjection;
        /** Left row currently being processed, or {@code null} when none is held. */
        private RowT left;
        /** Right row currently being processed, or {@code null} when the next one has to be pulled from the right queue. */
        private RowT right;
        /** Lazily created list of right rows that matched the current left row; {@code null} until first needed. */
        private RowList<RowT> rightMaterialization;
        /** Position of the next row to read from {@code rightMaterialization} while draining. */
        private int rightIdx;
        /** {@code true} while right rows are taken from {@code rightMaterialization} instead of {@code right}. */
        private boolean drainMaterialization;
        /** Whether the current right row has matched a left row; reset each time a right row is pulled. */
        private boolean matched;

        /**
         * Creates the node.
         *
         * @param ctx Execution context.
         * @param comp Comparator used to order a left row against a right row.
         * @param outputProjection Projection producing the output row.
         * @param leftRowFactory Row factory for the left input.
         * @param rightRowFactory Row factory for the right input.
         */
        private RightJoin(ExecutionContext<RowT> ctx, Comparator<RowT> comp, SqlJoinProjection outputProjection, RowFactory<RowT> leftRowFactory, RowFactory<RowT> rightRowFactory) {
            super(ctx, comp, leftRowFactory, rightRowFactory);
            this.outputProjection = outputProjection;
        }

        /**
         * Releases the held rows, empties the materialization if it exists, resets the cursor state and then
         * delegates to the base rewind. {@code matched} is not touched here; {@code join()} resets it whenever it
         * pulls a new right row.
         */
        @Override
        protected void rewindInternal() {
            this.releaseNullableRow(this.left);
            this.releaseNullableRow(this.right);
            this.clear(this.rightMaterialization);
            this.left = null;
            this.right = null;
            this.rightIdx = 0;
            this.drainMaterialization = false;
            super.rewindInternal();
        }

        /** Closes the base resources, then releases the held rows and closes the materialization if it exists. */
        @Override
        protected void closeInternal() {
            super.closeInternal();
            this.releaseNullableRow(this.left);
            this.releaseNullableRow(this.right);
            this.close(this.rightMaterialization);
        }

        /**
         * Runs the right-outer merge loop over the buffered rows, then ends the stream if the right side is
         * exhausted and requests more input where nothing is outstanding.
         *
         * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
         *
         * @throws Exception If projecting, pushing downstream or requesting input fails.
         */
        @Override
        protected void join() throws Exception {
            // Number of loop iterations in this invocation; used to yield after a bounded amount of work.
            int processed = 0;
            this.inLoop = true;
            try {
                // Continue while rows are still requested and a right row is at hand (held, buffered or
                // materialized). A missing left row stops the loop only while the left source has not ended.
                while (!(this.requested <= 0 || this.left == null && this.leftInBuf.isEmpty() && this.waitingLeft != -1 || this.right == null && this.rightInBuf.isEmpty() && CollectionUtils.nullOrEmpty(this.rightMaterialization))) {
                    Object row;
                    // After more than inBufSize iterations, reschedule join() through execute() and leave; the
                    // finally block still resets inLoop, and the tail of this method is skipped.
                    if (processed++ > this.inBufSize) {
                        this.execute(this::join);
                        return;
                    }
                    // Pull the next left row if one is buffered; left stays null once the left side is exhausted.
                    if (this.left == null && !this.leftInBuf.isEmpty()) {
                        this.left = this.leftInBuf.remove();
                        this.acquireRow(this.left);
                    }
                    if (this.right == null) {
                        // Nothing buffered and the right source has not ended (-1): stop and wait for more rows.
                        if (this.rightInBuf.isEmpty() && this.waitingRight != -1) {
                            break;
                        }
                        if (!this.rightInBuf.isEmpty()) {
                            this.right = this.rightInBuf.remove();
                            this.acquireRow(this.right);
                            this.matched = false;
                        }
                    }
                    // No right row could be obtained but materialized rows exist and are not being replayed yet:
                    // switch to drain mode and drop the current left row.
                    // NOTE(review): presumably the dropped left row was already paired with every materialized row
                    // while they were being collected - confirm.
                    if (this.right == null && !CollectionUtils.nullOrEmpty(this.rightMaterialization) && !this.drainMaterialization) {
                        this.drainMaterialization = true;
                        this.releaseRow(this.left);
                        this.left = null;
                        continue;
                    }
                    if (!this.drainMaterialization) {
                        if (this.left == null) {
                            // No left row is available at all: if the right row never matched, emit a row freshly
                            // created by the left row factory combined with it; then advance the right side.
                            // NOTE(review): presumably create() yields an all-null row - confirm against RowFactory.
                            if (!this.matched) {
                                row = this.outputProjection.project(this.context(), this.leftRowFactory.create(), this.right);
                                this.acquireRow(row);
                                --this.requested;
                                this.downstream().push(row);
                                this.releaseRow(row);
                            }
                            this.releaseRow(this.right);
                            this.right = null;
                            continue;
                        }
                        // Direct mode: compare the current left row with the current right row.
                        int cmp = this.comp.compare(this.left, this.right);
                        if (cmp < 0) {
                            // Left sorts before right: drop the left row and restart the replay cursor. If rows
                            // were materialized, replay them against the next left row.
                            this.releaseRow(this.left);
                            this.left = null;
                            this.rightIdx = 0;
                            if (CollectionUtils.nullOrEmpty(this.rightMaterialization)) continue;
                            this.drainMaterialization = true;
                            continue;
                        }
                        if (cmp > 0) {
                            // Left sorts after right. If this right row never matched, emit it combined with a
                            // freshly created left row, then drop it together with any materialized rows.
                            if (!this.matched) {
                                row = this.outputProjection.project(this.context(), this.leftRowFactory.create(), this.right);
                                this.acquireRow(row);
                                --this.requested;
                                this.downstream().push(row);
                                this.releaseRow(row);
                            }
                            this.releaseRow(this.right);
                            this.right = null;
                            this.rightIdx = 0;
                            this.clear(this.rightMaterialization);
                            continue;
                        }
                        // Match. With nothing materialized and more right rows possible (buffered or still
                        // expected), look one row ahead to decide whether a materialization list is needed.
                        if (CollectionUtils.nullOrEmpty(this.rightMaterialization) && (!this.rightInBuf.isEmpty() || this.waitingRight != -1)) {
                            // The look-ahead needs a buffered row; wait for the next right batch.
                            if (this.rightInBuf.isEmpty()) {
                                break;
                            }
                            // The list is created only once; after a clear() it stays non-null and is reused.
                            if (this.rightMaterialization == null && this.comp.compare(this.left, this.rightInBuf.peek()) == 0) {
                                this.rightMaterialization = this.context().storageFactory().list(this.context().rowAccessor(), this.rightRowFactory);
                            }
                        }
                        // Here matched is set only after the look-ahead, i.e. not when the loop breaks above.
                        this.matched = true;
                        row = this.outputProjection.project(this.context(), this.left, this.right);
                        this.acquireRow(row);
                        if (this.rightMaterialization != null) {
                            // A list exists: remember the right row for later left rows and advance the right side,
                            // keeping the current left row.
                            this.rightMaterialization.add(this.right);
                            this.releaseRow(this.right);
                            this.right = null;
                        } else {
                            // No list: advance the left side and keep the current right row.
                            this.releaseRow(this.left);
                            this.left = null;
                        }
                    } else {
                        // Drain mode: pair the current left row with the materialized right rows.
                        RowT right;
                        int cmp;
                        if (this.left == null) {
                            // No left row to replay against. While the left source has not ended, go back to the
                            // loop condition; once it has ended, discard the materialized rows and leave drain mode.
                            if (this.waitingLeft != -1) continue;
                            this.rightIdx = 0;
                            this.rightMaterialization.clear();
                            this.drainMaterialization = false;
                            continue;
                        }
                        if (this.rightIdx >= this.rightMaterialization.size()) {
                            // All materialized rows were replayed for this left row; move to the next left row.
                            this.rightIdx = 0;
                            this.releaseRow(this.left);
                            this.left = null;
                            continue;
                        }
                        if ((cmp = this.comp.compare(this.left, right = this.rightMaterialization.get(this.rightIdx++))) > 0) {
                            // Left sorts after the materialized rows: discard them and return to direct mode,
                            // keeping the current left row.
                            this.rightIdx = 0;
                            this.rightMaterialization.clear();
                            this.drainMaterialization = false;
                            continue;
                        }
                        row = this.outputProjection.project(this.context(), this.left, right);
                        this.acquireRow(row);
                    }
                    // Common emit path: account for the produced row, hand it downstream, release it.
                    --this.requested;
                    this.downstream().push(row);
                    this.releaseRow(row);
                }
            }
            finally {
                this.inLoop = false;
            }
            // End of stream: demand remains but the right side is exhausted (source ended, nothing held, buffered
            // or materialized).
            // NOTE(review): unlike InnerJoin there is no return after end(), so the source requests below still
            // run after the stream has been ended - confirm this is intended.
            if (this.requested > 0 && this.waitingRight == -1 && this.right == null && this.rightInBuf.isEmpty() && CollectionUtils.nullOrEmpty(this.rightMaterialization)) {
                this.requested = 0;
                this.leftInBuf.clear();
                this.downstream().end();
            }
            // Ask for a new batch of inBufSize rows from every source that has nothing outstanding.
            if (this.waitingRight == 0) {
                this.waitingRight = this.inBufSize;
                this.rightSource().request(this.waitingRight);
            }
            if (this.waitingLeft == 0) {
                this.waitingLeft = this.inBufSize;
                this.leftSource().request(this.waitingLeft);
            }
        }
    }

    /**
     * Merge join variant that emits matched pairs as well as the unmatched rows of both inputs.
     *
     * <p>An unmatched row is projected together with a row freshly created by the opposite side's
     * {@link RowFactory} (presumably a null-filled placeholder; confirm against the factory contract).
     * Right rows that compare equal to the current left row are collected into
     * {@code rightMaterialization} whenever the next buffered right row compares equal as well, so that
     * following left rows can be joined against the same group again ("drain" mode).
     */
    private static class FullOuterJoin<RowT>
    extends MergeJoinNode<RowT> {
        /** Combines a left and a right row into the row pushed downstream. */
        private final SqlJoinProjection outputProjection;
        /** Left row currently being processed, or {@code null} when the next one has to be fetched. */
        private RowT left;
        /** Right row currently being processed, or {@code null} when the next one has to be fetched. */
        private RowT right;
        /** Right rows that compared equal to a left row; created lazily inside {@link #join()}. */
        private RowList<RowT> rightMaterialization;
        /** Index of the next row of {@code rightMaterialization} to join while draining. */
        private int rightIdx;
        /** When {@code true}, left rows are joined against {@code rightMaterialization} instead of {@code right}. */
        private boolean drainMaterialization;
        /** Whether the current left row has already produced at least one joined row. */
        private boolean leftMatched;
        /** Whether the current right row has already produced at least one joined row. */
        private boolean rightMatched;

        private FullOuterJoin(ExecutionContext<RowT> ctx, Comparator<RowT> comp, SqlJoinProjection outputProjection, RowFactory<RowT> leftRowFactory, RowFactory<RowT> rightRowFactory) {
            super(ctx, comp, leftRowFactory, rightRowFactory);
            this.outputProjection = outputProjection;
        }

        /**
         * Releases the rows currently held, empties the materialized right rows and resets the drain
         * state before delegating to the parent implementation.
         */
        @Override
        protected void rewindInternal() {
            this.releaseNullableRow(this.left);
            this.releaseNullableRow(this.right);
            this.clear(this.rightMaterialization);
            this.left = null;
            this.right = null;
            this.rightIdx = 0;
            this.drainMaterialization = false;
            super.rewindInternal();
        }

        /**
         * Delegates to the parent implementation, then releases the rows currently held and closes the
         * materialized right rows.
         */
        @Override
        protected void closeInternal() {
            super.closeInternal();
            this.releaseNullableRow(this.left);
            this.releaseNullableRow(this.right);
            this.close(this.rightMaterialization);
        }

        /**
         * Consumes buffered left and right rows and pushes joined rows downstream while rows are still
         * requested, then either signals the end of the stream or requests more input.
         *
         * @throws Exception as declared by the overridden method.
         */
        @Override
        protected void join() throws Exception {
            int processed = 0;
            this.inLoop = true;
            try {
                while (!(this.requested <= 0 || this.left == null && this.leftInBuf.isEmpty() && this.waitingLeft != -1 || this.right == null && this.rightInBuf.isEmpty() && !CollectionUtils.nullOrEmpty(this.rightMaterialization) && this.waitingRight != -1)) {
                    // Declared as RowT (not Object) so it can be handed to the RowT-typed row and downstream APIs.
                    RowT row;
                    if (processed++ > this.inBufSize) {
                        // Enough iterations for one pass: reschedule the remaining work and leave.
                        this.execute(this::join);
                        return;
                    }
                    if (this.left == null && !this.leftInBuf.isEmpty()) {
                        this.left = this.leftInBuf.remove();
                        this.acquireRow(this.left);
                        this.leftMatched = false;
                    }
                    if (this.right == null) {
                        // Nothing buffered on the right, and waitingRight is not -1 (the value the
                        // end-of-stream check below requires): stop and wait for more right rows.
                        if (this.rightInBuf.isEmpty() && this.waitingRight != -1) {
                            break;
                        }
                        if (!this.rightInBuf.isEmpty()) {
                            this.right = this.rightInBuf.remove();
                            this.acquireRow(this.right);
                            this.rightMatched = false;
                        }
                    }
                    // No right row could be fetched but a materialized group exists: switch to drain
                    // mode and drop the current left row.
                    if (this.right == null && !CollectionUtils.nullOrEmpty(this.rightMaterialization) && !this.drainMaterialization) {
                        this.drainMaterialization = true;
                        this.releaseRow(this.left);
                        this.left = null;
                        continue;
                    }
                    if (!this.drainMaterialization) {
                        if (this.left == null || this.right == null) {
                            if (this.left == null && this.right != null) {
                                // Only a right row is available: emit it padded on the left unless it already matched.
                                if (!this.rightMatched) {
                                    row = this.outputProjection.project(this.context(), this.leftRowFactory.create(), this.right);
                                    this.acquireRow(row);
                                    --this.requested;
                                    this.downstream().push(row);
                                    this.releaseRow(row);
                                }
                                this.releaseRow(this.right);
                                this.right = null;
                                continue;
                            }
                            if (this.left != null && this.right == null) {
                                // Only a left row is available: emit it padded on the right unless it already matched.
                                if (!this.leftMatched) {
                                    row = this.outputProjection.project(this.context(), this.left, this.rightRowFactory.create());
                                    this.acquireRow(row);
                                    --this.requested;
                                    this.downstream().push(row);
                                    this.releaseRow(row);
                                }
                                this.releaseRow(this.left);
                                this.left = null;
                                continue;
                            }
                            // Neither side has a row to work with.
                            break;
                        }
                        int cmp = this.comp.compare(this.left, this.right);
                        if (cmp < 0) {
                            // Left row sorts before the right row: flush it (padded if unmatched) and advance left.
                            if (!this.leftMatched) {
                                row = this.outputProjection.project(this.context(), this.left, this.rightRowFactory.create());
                                this.acquireRow(row);
                                --this.requested;
                                this.downstream().push(row);
                                this.releaseRow(row);
                            }
                            this.releaseRow(this.left);
                            this.left = null;
                            this.rightIdx = 0;
                            if (CollectionUtils.nullOrEmpty(this.rightMaterialization)) continue;
                            // The next left row has to be checked against the stored right rows first.
                            this.drainMaterialization = true;
                            continue;
                        }
                        if (cmp > 0) {
                            // Right row sorts before the left row: flush it (padded if unmatched) and advance right.
                            if (!this.rightMatched) {
                                row = this.outputProjection.project(this.context(), this.leftRowFactory.create(), this.right);
                                this.acquireRow(row);
                                --this.requested;
                                this.downstream().push(row);
                                this.releaseRow(row);
                            }
                            this.releaseRow(this.right);
                            this.right = null;
                            this.rightIdx = 0;
                            this.clear(this.rightMaterialization);
                            continue;
                        }
                        // Rows compare equal. Before emitting, look at the next right row to decide
                        // whether the right rows of this key have to be stored.
                        if (CollectionUtils.nullOrEmpty(this.rightMaterialization) && (!this.rightInBuf.isEmpty() || this.waitingRight != -1)) {
                            if (this.rightInBuf.isEmpty()) {
                                // The next right row is not known yet; wait for it.
                                break;
                            }
                            if (this.rightMaterialization == null && this.comp.compare(this.left, this.rightInBuf.peek()) == 0) {
                                this.rightMaterialization = this.context().storageFactory().list(this.context().rowAccessor(), this.rightRowFactory);
                            }
                        }
                        this.leftMatched = true;
                        this.rightMatched = true;
                        row = this.outputProjection.project(this.context(), this.left, this.right);
                        this.acquireRow(row);
                        if (this.rightMaterialization != null) {
                            // Keep the left row and store the right one so later left rows can reuse it.
                            this.rightMaterialization.add(this.right);
                            this.releaseRow(this.right);
                            this.right = null;
                        } else {
                            this.releaseRow(this.left);
                            this.left = null;
                        }
                    } else {
                        if (this.left == null) {
                            // waitingLeft is not -1: go back to the loop guard, which decides whether to stop.
                            if (this.waitingLeft != -1) continue;
                            // waitingLeft is -1: the stored right rows are discarded and drain mode ends.
                            this.rightIdx = 0;
                            this.rightMaterialization.clear();
                            this.drainMaterialization = false;
                            continue;
                        }
                        if (this.rightIdx >= this.rightMaterialization.size()) {
                            // Current left row was joined with every stored right row; move to the next left row.
                            this.rightIdx = 0;
                            this.releaseRow(this.left);
                            this.left = null;
                            continue;
                        }
                        RowT storedRight = this.rightMaterialization.get(this.rightIdx++);
                        if (this.comp.compare(this.left, storedRight) > 0) {
                            // Left row sorts after the stored rows: discard them and resume the regular merge.
                            this.rightIdx = 0;
                            this.rightMaterialization.clear();
                            this.drainMaterialization = false;
                            continue;
                        }
                        this.leftMatched = true;
                        row = this.outputProjection.project(this.context(), this.left, storedRight);
                        this.acquireRow(row);
                    }
                    --this.requested;
                    this.downstream().push(row);
                    this.releaseRow(row);
                }
            }
            finally {
                this.inLoop = false;
            }
            // Both waiting counters are -1 and nothing is held, buffered or materialized: finish the stream.
            if (this.requested > 0 && this.waitingLeft == -1 && this.left == null && this.leftInBuf.isEmpty() && this.waitingRight == -1 && this.right == null && this.rightInBuf.isEmpty() && CollectionUtils.nullOrEmpty(this.rightMaterialization)) {
                this.requested = 0;
                this.rightInBuf.clear();
                this.leftInBuf.clear();
                this.downstream().end();
                return;
            }
            if (this.waitingRight == 0) {
                this.waitingRight = this.inBufSize;
                this.rightSource().request(this.waitingRight);
            }
            if (this.waitingLeft == 0) {
                this.waitingLeft = this.inBufSize;
                this.leftSource().request(this.waitingLeft);
            }
        }
    }

    /**
     * Merge join variant that pushes a left row downstream when the current right row compares equal
     * to it. Right rows themselves are never emitted.
     */
    private static class SemiJoin<RowT>
    extends MergeJoinNode<RowT> {
        /** Left row under comparison, or {@code null} when the next one must be taken from the buffer. */
        private RowT left;
        /** Right row under comparison, or {@code null} when the next one must be taken from the buffer. */
        private RowT right;

        private SemiJoin(ExecutionContext<RowT> ctx, Comparator<RowT> comp, RowFactory<RowT> leftRowFactory, RowFactory<RowT> rightRowFactory) {
            super(ctx, comp, leftRowFactory, rightRowFactory);
        }

        /** Releases and forgets the rows currently held, then delegates to the parent implementation. */
        @Override
        protected void rewindInternal() {
            this.releaseNullableRow(this.left);
            this.left = null;
            this.releaseNullableRow(this.right);
            this.right = null;
            super.rewindInternal();
        }

        /** Delegates to the parent implementation, then releases the rows currently held. */
        @Override
        protected void closeInternal() {
            super.closeInternal();
            this.releaseNullableRow(this.left);
            this.releaseNullableRow(this.right);
        }

        /**
         * Advances both inputs in merge order, pushing every left row that meets an equal right row,
         * then either signals the end of the stream or requests more input.
         *
         * @throws Exception as declared by the overridden method.
         */
        @Override
        protected void join() throws Exception {
            this.inLoop = true;
            try {
                int iterations = 0;
                while (this.requested > 0 && (this.left != null || !this.leftInBuf.isEmpty()) && (this.right != null || !this.rightInBuf.isEmpty())) {
                    if (iterations++ > this.inBufSize) {
                        // Yield: continue in a freshly scheduled task.
                        this.execute(this::join);
                        return;
                    }
                    if (this.left == null) {
                        this.left = this.leftInBuf.remove();
                        this.acquireRow(this.left);
                    }
                    if (this.right == null) {
                        this.right = this.rightInBuf.remove();
                        this.acquireRow(this.right);
                    }
                    int order = this.comp.compare(this.left, this.right);
                    if (order > 0) {
                        // Right row is behind the left one: drop it and fetch the next right row.
                        this.releaseRow(this.right);
                        this.right = null;
                    } else {
                        if (order == 0) {
                            // Match found: the left row goes downstream; the right row is kept.
                            --this.requested;
                            this.downstream().push(this.left);
                        }
                        // Matched or not, this left row is finished.
                        this.releaseRow(this.left);
                        this.left = null;
                    }
                }
            }
            finally {
                this.inLoop = false;
            }
            if (this.requested > 0 && (this.leftExhausted() || this.rightExhausted())) {
                this.requested = 0;
                this.rightInBuf.clear();
                this.leftInBuf.clear();
                this.downstream().end();
                return;
            }
            if (this.waitingRight == 0) {
                this.waitingRight = this.inBufSize;
                this.rightSource().request(this.waitingRight);
            }
            if (this.waitingLeft == 0) {
                this.waitingLeft = this.inBufSize;
                this.leftSource().request(this.waitingLeft);
            }
        }

        /** Returns {@code true} when {@code waitingLeft} is -1 and no left row is held or buffered. */
        private boolean leftExhausted() {
            return this.waitingLeft == -1 && this.left == null && this.leftInBuf.isEmpty();
        }

        /** Returns {@code true} when {@code waitingRight} is -1 and no right row is held or buffered. */
        private boolean rightExhausted() {
            return this.waitingRight == -1 && this.right == null && this.rightInBuf.isEmpty();
        }
    }

    /**
     * Merge join variant that pushes a left row downstream only when no right row compares equal to
     * it. Right rows themselves are never emitted.
     */
    private static class AntiJoin<RowT>
    extends MergeJoinNode<RowT> {
        /** Left row under comparison, or {@code null} when the next one must be taken from the buffer. */
        private RowT left;
        /** Right row under comparison, or {@code null} when none is currently held. */
        private RowT right;

        private AntiJoin(ExecutionContext<RowT> ctx, Comparator<RowT> comp, RowFactory<RowT> leftRowFactory, RowFactory<RowT> rightRowFactory) {
            super(ctx, comp, leftRowFactory, rightRowFactory);
        }

        /** Releases and forgets the rows currently held, then delegates to the parent implementation. */
        @Override
        protected void rewindInternal() {
            this.releaseNullableRow(this.left);
            this.releaseNullableRow(this.right);
            this.left = null;
            this.right = null;
            super.rewindInternal();
        }

        /** Delegates to the parent implementation, then releases the rows currently held. */
        @Override
        protected void closeInternal() {
            super.closeInternal();
            this.releaseNullableRow(this.left);
            this.releaseNullableRow(this.right);
        }

        /**
         * Advances both inputs in merge order, pushing every left row for which no equal right row
         * exists, then either signals the end of the stream or requests more input.
         *
         * @throws Exception as declared by the overridden method.
         */
        @Override
        protected void join() throws Exception {
            int processed = 0;
            this.inLoop = true;
            try {
                // Keep going while rows are requested, a left row is available, and a right row is
                // held or buffered or waitingRight is -1.
                while (!(this.requested <= 0 || this.left == null && this.leftInBuf.isEmpty() || this.right == null && this.rightInBuf.isEmpty() && this.waitingRight != -1)) {
                    if (processed++ > this.inBufSize) {
                        // Enough iterations for one pass: reschedule the remaining work and leave.
                        this.execute(this::join);
                        return;
                    }
                    if (this.left == null) {
                        this.left = this.leftInBuf.remove();
                        this.acquireRow(this.left);
                    }
                    if (this.right == null && !this.rightInBuf.isEmpty()) {
                        this.right = this.rightInBuf.remove();
                        this.acquireRow(this.right);
                    }
                    if (this.right != null) {
                        int cmp = this.comp.compare(this.left, this.right);
                        if (cmp == 0) {
                            // A match exists, so this left row must not be emitted.
                            this.releaseRow(this.left);
                            this.left = null;
                            continue;
                        }
                        if (cmp > 0) {
                            // Right row is behind the left one: drop it and try the next right row.
                            this.releaseRow(this.right);
                            this.right = null;
                            continue;
                        }
                    }
                    // Either no right row is left or the left row sorts before it: no match can exist.
                    --this.requested;
                    this.downstream().push(this.left);
                    this.releaseRow(this.left);
                    this.left = null;
                }
            }
            finally {
                this.inLoop = false;
            }
            if (this.requested > 0 && this.waitingLeft == -1 && this.left == null && this.leftInBuf.isEmpty()) {
                this.requested = 0;
                this.rightInBuf.clear();
                this.downstream().end();
                // The stream is finished: do not fall through and request more right rows that
                // would only refill the buffer that was just cleared.
                return;
            }
            if (this.waitingRight == 0) {
                this.waitingRight = this.inBufSize;
                this.rightSource().request(this.waitingRight);
            }
            if (this.waitingLeft == 0) {
                this.waitingLeft = this.inBufSize;
                this.leftSource().request(this.waitingLeft);
            }
        }
    }
}

