package org.apache.ignite3.internal.sql.engine.exec.rel;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import org.apache.ignite3.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.partition.replicator.network.replication.BinaryTupleMessage;
import org.apache.ignite3.internal.sql.engine.NodeLeftException;
import org.apache.ignite3.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite3.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite3.internal.sql.engine.exec.MailboxRegistry;
import org.apache.ignite3.internal.sql.engine.exec.MemoryContext;
import org.apache.ignite3.internal.sql.engine.exec.RowHandler;
import org.apache.ignite3.internal.sql.engine.exec.SharedState;
import org.apache.ignite3.internal.sql.engine.exec.structures.RowQueue;
import org.apache.ignite3.internal.sql.engine.exec.structures.RowStorageFactory;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.lang.ErrorGroups;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite3/internal/sql/engine/exec/rel/Inbox.class */
public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, SingleNode<RowT> {
    private final ExchangeService exchange;
    private final MailboxRegistry registry;
    private final long exchangeId;
    private final long srcFragmentId;
    private final Collection<String> srcNodeNames;

    @Nullable
    private final Comparator<RowT> comp;
    private final Map<String, RemoteSource<RowT>> perNodeBuffers;
    private final RowHandler.RowFactory<RowT> rowFactory;
    private final MemoryContext<RowT> memoryContext;

    @Nullable
    private List<RemoteSource<RowT>> remoteSources;
    private int requested;
    private boolean inLoop;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite3/internal/sql/engine/exec/rel/Inbox$Batch.class */
    public static final class Batch<RowT> implements Comparable<Batch<RowT>> {
        private final int batchId;
        private final boolean last;
        private final RowQueue<RowT> rows;

        private Batch(int i, boolean z, RowQueue<RowT> rowQueue) {
            this.batchId = i;
            this.last = z;
            this.rows = rowQueue;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.batchId == ((Batch) obj).batchId;
        }

        public int hashCode() {
            return this.batchId;
        }

        @Override // java.lang.Comparable
        public int compareTo(Batch<RowT> batch) {
            return Integer.compare(this.batchId, batch.batchId);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ignite3/internal/sql/engine/exec/rel/Inbox$RemoteSource.class */
    public static final class RemoteSource<RowT> {
        private final BatchRequester batchRequester;
        private final RowStorageFactory<RowT> storageFactory;
        private final RowHandler.RowFactory<RowT> rowFactory;
        static final /* synthetic */ boolean $assertionsDisabled;
        private final PriorityQueue<Batch<RowT>> batches = new PriorityQueue<>(4);
        private State state = State.WAITING;
        private int lastEnqueued = -1;
        private int lastRequested = -1;

        @Nullable
        private Batch<RowT> curr = null;

        @Nullable
        private SharedState sharedStateHolder = null;

        /* JADX INFO: Access modifiers changed from: private */
        @FunctionalInterface
        /* loaded from: input_file:org/apache/ignite3/internal/sql/engine/exec/rel/Inbox$RemoteSource$BatchRequester.class */
        public interface BatchRequester {
            void request(int i, @Nullable SharedState sharedState) throws IgniteInternalCheckedException;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/ignite3/internal/sql/engine/exec/rel/Inbox$RemoteSource$State.class */
        public enum State {
            END,
            READY,
            WAITING
        }

        private RemoteSource(BatchRequester batchRequester, RowStorageFactory<RowT> rowStorageFactory, RowHandler.RowFactory<RowT> rowFactory) {
            this.batchRequester = batchRequester;
            this.storageFactory = rowStorageFactory;
            this.rowFactory = rowFactory;
        }

        void reset(SharedState sharedState) {
            releaseRows();
            this.sharedStateHolder = sharedState;
            this.batches.clear();
            this.lastEnqueued = this.lastRequested;
            this.state = State.WAITING;
            this.curr = null;
        }

        void onBatchReceived(int i, boolean z, List<RowT> list) {
            RowQueue<RowT> queue;
            if (i <= this.lastEnqueued) {
                return;
            }
            if (list.isEmpty()) {
                queue = this.storageFactory.emptyQueue();
            } else {
                queue = this.storageFactory.queue(this.rowFactory, list.size());
                Objects.requireNonNull(queue);
                list.forEach(queue::add);
            }
            this.batches.add(new Batch<>(i, z, queue));
            if (this.state == State.WAITING && i == this.lastEnqueued + 1) {
                advanceBatch();
            }
        }

        void requestNextBatchIfNeeded() throws IgniteInternalCheckedException {
            int max = Math.max(4, 1);
            int i = this.lastRequested - this.lastEnqueued;
            if (max / 2 >= i) {
                int i2 = max - i;
                this.lastRequested += i2;
                this.batchRequester.request(i2, this.sharedStateHolder);
                this.sharedStateHolder = null;
            }
        }

        State check() {
            return this.state;
        }

        RowT peek() {
            if (!$assertionsDisabled && this.state != State.READY) {
                throw new AssertionError();
            }
            if ($assertionsDisabled || this.curr != null) {
                return ((Batch) this.curr).rows.peek();
            }
            throw new AssertionError();
        }

        RowT remove() {
            if (!$assertionsDisabled && this.state != State.READY) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && this.curr == null) {
                throw new AssertionError();
            }
            RowT remove = ((Batch) this.curr).rows.remove();
            if (((Batch) this.curr).rows.isEmpty()) {
                if (((Batch) this.curr).last) {
                    this.state = State.END;
                } else {
                    advanceBatch();
                }
            }
            return remove;
        }

        private boolean hasNextBatch() {
            return !this.batches.isEmpty() && ((Batch) this.batches.peek()).batchId == this.lastEnqueued + 1;
        }

        private void advanceBatch() {
            if (!hasNextBatch()) {
                this.state = State.WAITING;
                return;
            }
            this.curr = this.batches.poll();
            if (!$assertionsDisabled && this.curr == null) {
                throw new AssertionError();
            }
            this.state = ((Batch) this.curr).rows.isEmpty() ? State.END : State.READY;
            this.lastEnqueued = ((Batch) this.curr).batchId;
        }

        void close() {
            releaseRows();
            this.batches.clear();
            this.curr = null;
        }

        private void releaseRows() {
            Iterator<Batch<RowT>> it = this.batches.iterator();
            while (it.hasNext()) {
                releaseBatchRows(it.next());
            }
            if (this.curr != null) {
                releaseBatchRows(this.curr);
            }
        }

        private void releaseBatchRows(Batch<RowT> batch) {
            ((Batch) batch).rows.clear();
            RowQueue<RowT> rowQueue = ((Batch) batch).rows;
            Objects.requireNonNull(rowQueue);
            IgniteUtils.closeQuiet(rowQueue::close);
        }

        static {
            $assertionsDisabled = !Inbox.class.desiredAssertionStatus();
        }
    }

    public Inbox(ExecutionContext<RowT> executionContext, ExchangeService exchangeService, MailboxRegistry mailboxRegistry, Collection<String> collection, @Nullable Comparator<RowT> comparator, RowHandler.RowFactory<RowT> rowFactory, long j, long j2) {
        super(executionContext);
        if (!$assertionsDisabled && CollectionUtils.nullOrEmpty((Collection<?>) collection)) {
            throw new AssertionError();
        }
        this.exchange = exchangeService;
        this.registry = mailboxRegistry;
        this.srcNodeNames = collection;
        this.comp = comparator;
        this.rowFactory = rowFactory;
        this.memoryContext = executionContext.memoryContext();
        this.srcFragmentId = j2;
        this.exchangeId = j;
        HashMap hashMap = new HashMap();
        for (String str : collection) {
            hashMap.put(str, new RemoteSource((i, sharedState) -> {
                requestBatches(str, i, sharedState);
            }, executionContext.storageFactory(), rowFactory));
        }
        this.perNodeBuffers = Map.copyOf(hashMap);
    }

    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.Mailbox
    public long exchangeId() {
        return this.exchangeId;
    }

    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.Node
    public void request(int i) throws Exception {
        if (!$assertionsDisabled && (i <= 0 || this.requested != 0)) {
            throw new AssertionError();
        }
        checkState();
        this.requested = i;
        if (this.inLoop) {
            return;
        }
        context().execute(this::doPush, this::onError);
    }

    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.AbstractNode
    public void closeInternal() {
        super.closeInternal();
        this.registry.unregister((Inbox<?>) this);
        Iterator<RemoteSource<RowT>> it = this.perNodeBuffers.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.AbstractNode
    protected Downstream<RowT> requestDownstream(int i) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.AbstractNode, org.apache.ignite3.internal.sql.engine.exec.rel.Node
    public void register(List<Node<RowT>> list) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.AbstractNode
    protected void rewindInternal() {
        this.remoteSources = null;
        this.requested = 0;
        Iterator<String> it = this.srcNodeNames.iterator();
        while (it.hasNext()) {
            RemoteSource<RowT> remoteSource = this.perNodeBuffers.get(it.next());
            if (!$assertionsDisabled && remoteSource == null) {
                throw new AssertionError();
            }
            remoteSource.reset(context().sharedState());
        }
    }

    public void onBatchReceived(String str, int i, boolean z, List<BinaryTupleMessage> list) throws Exception {
        RemoteSource<RowT> remoteSource = this.perNodeBuffers.get(str);
        boolean z2 = remoteSource.check() == RemoteSource.State.WAITING;
        ArrayList arrayList = new ArrayList(list.size());
        Iterator<BinaryTupleMessage> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(this.rowFactory.create(it.next().asBinaryTuple()));
        }
        remoteSource.onBatchReceived(i, z, arrayList);
        if (this.requested <= 0 || !z2 || remoteSource.check() == RemoteSource.State.WAITING) {
            return;
        }
        push();
    }

    private void doPush() throws Exception {
        checkState();
        push();
    }

    private void push() throws Exception {
        if (this.remoteSources == null) {
            this.remoteSources = new ArrayList(this.srcNodeNames.size());
            Iterator<String> it = this.srcNodeNames.iterator();
            while (it.hasNext()) {
                this.remoteSources.add(this.perNodeBuffers.get(it.next()));
            }
        }
        if (this.comp != null) {
            pushOrdered();
        } else {
            pushUnordered();
        }
    }

    private boolean checkAllBuffsReady(Iterator<RemoteSource<RowT>> it) {
        while (it.hasNext()) {
            RemoteSource.State check = it.next().check();
            switch (check) {
                case READY:
                    break;
                case END:
                    it.remove();
                    break;
                case WAITING:
                    return false;
                default:
                    throw Util.unexpected(check);
            }
        }
        return true;
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:33:0x0114. Please report as an issue. */
    /* JADX WARN: Multi-variable type inference failed */
    private void pushOrdered() throws Exception {
        if (!checkAllBuffsReady(this.remoteSources.iterator())) {
            Iterator<RemoteSource<RowT>> it = this.remoteSources.iterator();
            while (it.hasNext()) {
                it.next().requestNextBatchIfNeeded();
            }
            return;
        }
        if (!$assertionsDisabled && this.comp == null) {
            throw new AssertionError();
        }
        PriorityQueue priorityQueue = new PriorityQueue(Math.max(this.remoteSources.size(), 1), Map.Entry.comparingByKey(this.comp));
        for (RemoteSource<RowT> remoteSource : this.remoteSources) {
            RemoteSource.State check = remoteSource.check();
            if (check != RemoteSource.State.READY) {
                throw new AssertionError("Unexpected buffer state: " + check);
            }
            priorityQueue.offer(Pair.of(remoteSource.peek(), remoteSource));
        }
        this.inLoop = true;
        while (this.requested > 0 && !priorityQueue.isEmpty()) {
            try {
                checkState();
                RemoteSource remoteSource2 = (RemoteSource) ((Pair) priorityQueue.poll()).right;
                this.requested--;
                Object remove = remoteSource2.remove();
                this.memoryContext.acquire((MemoryContext<RowT>) remove);
                downstream().push(remove);
                this.memoryContext.release((MemoryContext<RowT>) remove);
                RemoteSource.State check2 = remoteSource2.check();
                switch (check2) {
                    case READY:
                        priorityQueue.offer(Pair.of(remoteSource2.peek(), remoteSource2));
                    case END:
                        remoteSource2.close();
                        this.remoteSources.remove(remoteSource2);
                    case WAITING:
                        break;
                    default:
                        throw Util.unexpected(check2);
                }
            } finally {
                this.inLoop = false;
            }
        }
        Iterator<RemoteSource<RowT>> it2 = this.remoteSources.iterator();
        while (it2.hasNext()) {
            it2.next().requestNextBatchIfNeeded();
        }
        if (this.requested > 0 && this.remoteSources.isEmpty() && priorityQueue.isEmpty()) {
            this.requested = 0;
            downstream().end();
        }
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:8:0x0039. Please report as an issue. */
    private void pushUnordered() throws Exception {
        int i = 0;
        int i2 = 0;
        this.inLoop = true;
        while (this.requested > 0 && !this.remoteSources.isEmpty()) {
            try {
                checkState();
                RemoteSource<RowT> remoteSource = this.remoteSources.get(i);
                switch (remoteSource.check()) {
                    case READY:
                        i2 = 0;
                        this.requested--;
                        RowT remove = remoteSource.remove();
                        this.memoryContext.acquire((MemoryContext<RowT>) remove);
                        downstream().push(remove);
                        this.memoryContext.release((MemoryContext<RowT>) remove);
                        break;
                    case END:
                        remoteSource.close();
                        this.remoteSources.remove(i);
                        break;
                    case WAITING:
                        i2++;
                        i++;
                        break;
                }
                if (i2 < this.remoteSources.size()) {
                    if (i == this.remoteSources.size()) {
                        i = 0;
                    }
                }
            } finally {
                this.inLoop = false;
            }
        }
        Iterator<RemoteSource<RowT>> it = this.remoteSources.iterator();
        while (it.hasNext()) {
            it.next().requestNextBatchIfNeeded();
        }
        if (this.requested <= 0 || !this.remoteSources.isEmpty()) {
            return;
        }
        this.requested = 0;
        downstream().end();
    }

    private void requestBatches(String str, int i, @Nullable SharedState sharedState) {
        this.exchange.request(str, executionId(), this.srcFragmentId, this.exchangeId, i, sharedState).whenComplete((r6, th) -> {
            if (th != null) {
                IgniteInternalException igniteInternalException = (IgniteInternalException) ExceptionUtils.withCause((v1, v2, v3, v4) -> {
                    return new IgniteInternalException(v1, v2, v3, v4);
                }, ErrorGroups.Common.INTERNAL_ERR, "Unable to request next batch: " + th.getMessage(), th);
                context().execute(() -> {
                    onError(igniteInternalException);
                }, this::onError);
            }
        });
    }

    public void onNodeLeft(String str) {
        if (context().originatingNodeName().equals(str) && this.srcNodeNames == null) {
            context().execute(this::close, this::onError);
        } else {
            if (this.srcNodeNames == null || !this.srcNodeNames.contains(str)) {
                return;
            }
            context().execute(() -> {
                onNodeLeft0(str);
            }, this::onError);
        }
    }

    private void onNodeLeft0(String str) throws Exception {
        checkState();
        if (this.perNodeBuffers.get(str).check() != RemoteSource.State.END) {
            throw new NodeLeftException(str);
        }
    }

    static {
        $assertionsDisabled = !Inbox.class.desiredAssertionStatus();
    }
}
