/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.query.h2.opt.join;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedLookupBatch;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.gridgain.internal.h2.index.Cursor;
import org.gridgain.internal.h2.result.Row;
import org.gridgain.internal.h2.value.Value;

public class RangeStream {
    private final GridKernalContext ctx;
    private final H2TreeIndex idx;
    private final DistributedJoinContext joinCtx;
    private final ClusterNode node;
    private GridH2IndexRangeRequest req;
    private int remainingRanges;
    private final BlockingQueue<GridH2IndexRangeResponse> respQueue = new LinkedBlockingQueue<GridH2IndexRangeResponse>();
    private Iterator<GridH2RowRange> ranges = Collections.emptyIterator();
    private Cursor cursor = GridH2Cursor.EMPTY;
    private int cursorRangeId = -1;

    public RangeStream(GridKernalContext ctx, H2TreeIndex idx, DistributedJoinContext joinCtx, ClusterNode node) {
        this.ctx = ctx;
        this.idx = idx;
        this.node = node;
        this.joinCtx = joinCtx;
    }

    public void start() {
        this.remainingRanges = this.req.bounds().size();
        assert (this.remainingRanges > 0);
        this.idx.send(Collections.singletonList(this.node), this.req);
    }

    public void onResponse(GridH2IndexRangeResponse msg) {
        this.respQueue.add(msg);
    }

    public void request(GridH2IndexRangeRequest req) {
        this.req = req;
    }

    public GridH2IndexRangeRequest request() {
        return this.req;
    }

    private GridH2IndexRangeResponse awaitForResponse() {
        assert (this.remainingRanges > 0);
        long start = U.currentTimeMillis();
        int attempt = 0;
        while (true) {
            GridH2IndexRangeResponse res;
            if (this.joinCtx.isCancelled()) {
                throw H2Utils.retryException("Query is cancelled.");
            }
            if (this.ctx.isStopping()) {
                throw H2Utils.retryException("Local node is stopping.");
            }
            try {
                res = this.respQueue.poll(500L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException ignored) {
                throw H2Utils.retryException("Interrupted while waiting for reply.");
            }
            if (res != null) {
                switch (res.status()) {
                    case 0: {
                        List<GridH2RowRange> ranges0 = res.ranges();
                        this.remainingRanges -= ranges0.size();
                        if (ranges0.get(ranges0.size() - 1).isPartial()) {
                            ++this.remainingRanges;
                        }
                        if (this.remainingRanges > 0) {
                            if (this.req.bounds() != null) {
                                this.req = DistributedLookupBatch.createRequest(this.joinCtx, this.req.batchLookupId(), this.req.segment());
                            }
                            this.idx.send(Collections.singletonList(this.node), this.req);
                        } else {
                            this.req = null;
                        }
                        return res;
                    }
                    case 2: {
                        if (this.req == null || this.req.bounds() == null) {
                            throw H2Utils.retryException("Failure on remote node.");
                        }
                        if (U.currentTimeMillis() - start > 30000L) {
                            throw H2Utils.retryException("Timeout reached.");
                        }
                        try {
                            U.sleep(20 * attempt);
                        }
                        catch (IgniteInterruptedCheckedException e) {
                            throw new IgniteInterruptedException(e.getMessage());
                        }
                        this.idx.send(Collections.singletonList(this.node), this.req);
                        break;
                    }
                    case 1: {
                        throw new CacheException(res.error());
                    }
                    default: {
                        throw new IllegalStateException();
                    }
                }
            }
            if (!this.ctx.discovery().alive(this.node)) {
                throw H2Utils.retryException("Node has left topology: " + this.node.id());
            }
            ++attempt;
        }
    }

    public boolean next(int rangeId) {
        while (true) {
            Iterator<GridH2RowMessage> it;
            if (rangeId == this.cursorRangeId) {
                if (this.cursor.next()) {
                    return true;
                }
            } else if (rangeId < this.cursorRangeId) {
                return false;
            }
            this.cursor = GridH2Cursor.EMPTY;
            while (!this.ranges.hasNext()) {
                if (this.remainingRanges == 0) {
                    this.ranges = Collections.emptyIterator();
                    return false;
                }
                this.ranges = this.awaitForResponse().ranges().iterator();
            }
            GridH2RowRange range = this.ranges.next();
            this.cursorRangeId = range.rangeId();
            if (F.isEmpty(range.rows()) || !(it = range.rows().iterator()).hasNext()) continue;
            this.cursor = new GridH2Cursor((Iterator<? extends Row>)new Iterator<Row>(){

                @Override
                public boolean hasNext() {
                    return it.hasNext();
                }

                @Override
                public Row next() {
                    return RangeStream.this.toRow((GridH2RowMessage)it.next());
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            });
        }
    }

    private Row toRow(GridH2RowMessage msg) {
        if (msg == null) {
            return null;
        }
        List<GridH2ValueMessage> vals = msg.values();
        assert (!F.isEmpty(vals)) : vals;
        Value[] vals0 = new Value[vals.size()];
        for (int i = 0; i < vals0.length; ++i) {
            try {
                vals0[i] = vals.get(i).value(this.ctx);
                continue;
            }
            catch (IgniteCheckedException e) {
                throw new CacheException(e);
            }
        }
        return this.idx.getDatabase().createRow(vals0, -1);
    }

    public Row get(int rangeId) {
        assert (rangeId == this.cursorRangeId);
        return this.cursor.get();
    }
}

