/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.sql.engine.exec.rel;

import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.RowQueue;
import org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite.internal.sql.engine.exec.rel.Downstream;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.CursorClosedException;
import org.jetbrains.annotations.Nullable;

/**
 * Root of an execution tree that exposes rows pushed by an upstream {@link AbstractNode}
 * through the {@link AsyncCursor} interface.
 *
 * <p>Rows received via {@link #push(Object)} are stored in a {@link RowQueue}. Calls to
 * {@link #requestNextAsync(int)} are queued as pending requests and served in FIFO order by
 * {@link #flush()}, which converts each buffered row with the supplied converter function.
 *
 * @param <InRowT> Type of the rows produced by the source node.
 * @param <OutRowT> Type of the rows handed out to the cursor consumer.
 */
public class AsyncRootNode<InRowT, OutRowT>
implements Downstream<InRowT>,
AsyncCursor<OutRowT> {
    /**
     * Number of rows requested from the source in one go; the same value is passed as the
     * third argument when the row queue is created.
     */
    private static final int IN_BUFFER_SIZE = 512;

    /** Completed by the close task once the source has been closed (or failed to close). */
    private final CompletableFuture<Void> cancelFut = new CompletableFuture<>();

    /** Guards registration of new requests against a concurrent {@link #closeAsync()}. */
    private final Object lock = new Object();

    /** First error reported via {@link #onError(Throwable)}; later errors are attached as suppressed. */
    private final AtomicReference<Throwable> ex = new AtomicReference<>();

    private final AbstractNode<InRowT> source;

    private final Function<InRowT, OutRowT> converter;

    /** Rows received from the source and not yet handed out. */
    private final RowQueue<InRowT> buff;

    /** Set while a flush task is scheduled but has not started yet; prevents duplicate scheduling. */
    private final AtomicBoolean taskScheduled = new AtomicBoolean();

    private final Queue<PendingRequest<OutRowT>> pendingRequests = new ConcurrentLinkedQueue<>();

    /** Completed when the first batch has fully arrived, the source has ended, or the node is closed. */
    private final CompletableFuture<Void> prefetchFut = new CompletableFuture<>();

    private volatile boolean closed = false;

    /**
     * Number of rows still expected from the source for the last request: {@code 0} when nothing
     * is outstanding, {@code -1} once {@link #end()} has been called.
     *
     * <p>NOTE(review): the field is neither volatile nor lock-protected; presumably it is only
     * touched from the source's execution task - confirm against callers of {@link #startPrefetch()}.
     */
    private int waiting;

    /**
     * Creates the root node.
     *
     * @param ctx Execution context; supplies the storage factory and row accessor used to create the row buffer.
     * @param rowFactory Row factory passed to the row buffer.
     * @param source Upstream node that produces rows.
     * @param converter Function applied to every row before it is returned to the consumer.
     */
    public AsyncRootNode(ExecutionContext<InRowT> ctx, RowFactory<InRowT> rowFactory, AbstractNode<InRowT> source, Function<InRowT, OutRowT> converter) {
        this.source = source;
        this.converter = converter;
        this.buff = ctx.storageFactory().queue((RowHandler<InRowT>)ctx.rowAccessor(), rowFactory, IN_BUFFER_SIZE);
    }

    /**
     * Buffers a row delivered by the source. When the last row of the outstanding request
     * arrives, the prefetch future is completed and pending requests are flushed.
     *
     * @param row Row produced by the source.
     * @throws Exception If flushing pending requests fails.
     */
    @Override
    public void push(InRowT row) throws Exception {
        assert (waiting > 0) : waiting;
        buff.add(row);
        if (--waiting == 0) {
            completePrefetchFuture(null);
            flush();
        }
    }

    /**
     * Marks the source as exhausted ({@code waiting = -1}), completes the prefetch future and
     * flushes pending requests.
     *
     * @throws Exception If flushing pending requests fails.
     */
    @Override
    public void end() throws Exception {
        assert (waiting > 0) : waiting;
        waiting = -1;
        completePrefetchFuture(null);
        flush();
    }

    /**
     * Records an error and closes the node. Does nothing if the node is already closed.
     * The first error is stored; any subsequent one is added to it as suppressed.
     *
     * @param e Error to record.
     */
    @Override
    public void onError(Throwable e) {
        if (closed) {
            return;
        }
        if (!ex.compareAndSet(null, e)) {
            ex.get().addSuppressed(e);
        }
        closeAsync();
    }

    /**
     * Requests the next batch of rows.
     *
     * @param rows Maximum number of rows to return in the batch.
     * @return Future completed with the batch; completed exceptionally with the recorded error if
     *     one exists, or with {@link CursorClosedException} if the node is already closed.
     */
    public CompletableFuture<AsyncCursor.BatchedResult<OutRowT>> requestNextAsync(int rows) {
        CompletableFuture<AsyncCursor.BatchedResult<OutRowT>> next = new CompletableFuture<>();
        synchronized (lock) {
            Throwable t = ex.get();
            if (t != null) {
                next.completeExceptionally(t);
                return next;
            }
            if (closed) {
                next.completeExceptionally(new CursorClosedException());
                return next;
            }
            pendingRequests.add(new PendingRequest<>(rows, next));
            scheduleTask();
        }
        return next;
    }

    /**
     * Closes the node. The first call fails all pending requests (with the recorded error, or a
     * {@link QueryCancelledException} if none was recorded), schedules a task that closes the row
     * buffer and the source, and completes the prefetch future. Later calls only return the future.
     *
     * @return Future completed when the scheduled close task has closed the source, or completed
     *     exceptionally if closing the source threw.
     */
    public CompletableFuture<Void> closeAsync() {
        if (!closed) {
            synchronized (lock) {
                if (!closed) {
                    Throwable th = ex.get();
                    if (!pendingRequests.isEmpty()) {
                        if (th == null) {
                            th = new QueryCancelledException();
                        }
                        // Lambdas may only capture effectively final locals.
                        Throwable th0 = th;
                        pendingRequests.forEach(req -> req.fut.completeExceptionally(th0));
                        pendingRequests.clear();
                    }
                    source.context().execute(() -> {
                        IgniteUtils.closeQuiet(() -> this.buff.close());
                        try {
                            source.close();
                            cancelFut.complete(null);
                        }
                        catch (Throwable t) {
                            cancelFut.completeExceptionally(t);
                            throw t;
                        }
                    }, source::onError);
                    // th is null here when no error was recorded and no request was pending,
                    // in which case the prefetch future completes normally.
                    completePrefetchFuture(th);
                    // Set last, still under the lock, so requestNextAsync cannot enqueue in between.
                    closed = true;
                }
            }
        }
        return cancelFut;
    }

    /**
     * Starts fetching the first batch from the source if nothing has been requested yet.
     * Any exception thrown while doing so is routed to {@link #onError(Throwable)}.
     *
     * @return Future completed when the first batch has fully arrived, the source has ended,
     *     or the node has been closed.
     */
    public CompletableFuture<Void> startPrefetch() {
        assert (source.context().description().prefetch());
        if (waiting == 0) {
            try {
                source.checkState();
                waiting = IN_BUFFER_SIZE;
                source.request(IN_BUFFER_SIZE);
            }
            catch (Exception e) {
                onError(e);
            }
        }
        return prefetchFut;
    }

    /**
     * Tells whether closing has finished.
     *
     * @return {@code true} once the future returned by {@link #closeAsync()} is done; note this
     *     reflects completion of the close task, not merely that close was requested.
     */
    public boolean isClosed() {
        return cancelFut.isDone();
    }

    /**
     * Serves the oldest pending request from the buffer if possible, then decides what to do next:
     * request more rows, close the node, or schedule another flush.
     *
     * @throws Exception If draining the buffer, converting a row, or requesting rows from the source fails.
     */
    private void flush() throws Exception {
        PendingRequest<OutRowT> currentReq = pendingRequests.peek();
        if (currentReq == null) {
            // May be triggered by prefetching when nobody has asked for rows yet.
            return;
        }
        // More rows exist if the source has not ended, or the buffer holds more than this request takes.
        boolean hasMoreRows = waiting != -1 || buff.size() > currentReq.requested;
        if (buff.size() > currentReq.requested || !hasMoreRows) {
            int size = Math.min(buff.size(), currentReq.requested);
            ArrayList<OutRowT> converted = new ArrayList<>(size);
            for (int i = 0; i < size; ++i) {
                InRowT row = buff.remove();
                converted.add(converter.apply(row));
            }
            // Dequeue only after the batch is built: if draining or conversion throws, the request
            // is still in the queue and closeAsync() can fail its future instead of leaving it
            // incomplete forever. poll() tolerates the queue having been cleared concurrently.
            pendingRequests.poll();
            currentReq.fut.complete(new AsyncCursor.BatchedResult<>(converted, hasMoreRows));
        }
        if (waiting == 0) {
            waiting = IN_BUFFER_SIZE;
            source.request(IN_BUFFER_SIZE);
        } else if (!hasMoreRows) {
            closeAsync();
        } else if (!pendingRequests.isEmpty()) {
            scheduleTask();
        }
    }

    /**
     * Schedules a flush on the source's executor, unless no request is pending or a flush task
     * is already scheduled. The flag is cleared at the start of the task so that the flush
     * itself can schedule a follow-up.
     */
    private void scheduleTask() {
        if (!pendingRequests.isEmpty() && taskScheduled.compareAndSet(false, true)) {
            source.execute(() -> {
                taskScheduled.set(false);
                flush();
            });
        }
    }

    /**
     * Completes the prefetch future if it is not done yet.
     *
     * @param ex Error to complete the future with, or {@code null} to complete it normally.
     */
    private void completePrefetchFuture(@Nullable Throwable ex) {
        if (!prefetchFut.isDone()) {
            if (ex != null) {
                prefetchFut.completeExceptionally(ex);
            } else {
                prefetchFut.complete(null);
            }
        }
    }

    /**
     * A queued {@link #requestNextAsync(int)} call: the requested row count and the future to complete.
     *
     * @param <OutRowT> Type of the rows in the resulting batch.
     */
    private static class PendingRequest<OutRowT> {
        private final CompletableFuture<AsyncCursor.BatchedResult<OutRowT>> fut;
        private final int requested;

        private PendingRequest(int requested, CompletableFuture<AsyncCursor.BatchedResult<OutRowT>> fut) {
            this.requested = requested;
            this.fut = fut;
        }
    }
}

