package org.apache.ignite3.internal.sql.engine.exec.rel;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import org.apache.ignite3.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite3.internal.schema.BinaryTuple;
import org.apache.ignite3.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite3.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite3.internal.sql.engine.exec.MailboxRegistry;
import org.apache.ignite3.internal.sql.engine.exec.MemoryContext;
import org.apache.ignite3.internal.sql.engine.exec.RowHandler;
import org.apache.ignite3.internal.sql.engine.exec.SharedState;
import org.apache.ignite3.internal.sql.engine.trait.Destination;
import org.apache.ignite3.internal.sql.engine.util.Commons;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.lang.ErrorGroups;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite3/internal/sql/engine/exec/rel/Outbox.class */
public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, SingleNode<RowT>, Downstream<RowT> {
    /** Logger; used to warn when an error could not be delivered to the originating node. */
    private static final IgniteLogger LOG;
    /** Factory for the binary tuple messages assembled when a batch is sent. */
    private static final PartitionReplicationMessagesFactory TABLE_MESSAGES_FACTORY;
    /** Exchange identifier supplied at construction; returned by exchangeId() and passed along with every sent batch. */
    private final long exchangeId;
    /** Target fragment identifier supplied at construction; passed along with every sent batch. */
    private final long targetFragmentId;
    /** Service used to send batches and errors. */
    private final ExchangeService exchange;
    /** Registry this outbox unregisters itself from when it is closed. */
    private final MailboxRegistry registry;
    /** Maps a row to the names of the nodes that must receive it. */
    private final Destination<RowT> dest;
    /** Immutable map from node name to its output buffer; one entry per node returned by dest.targets() at construction. */
    private final Map<String, RemoteDownstream<RowT>> nodeBuffers;
    /** Rows pushed by the source that have not yet been moved into the per-node buffers. */
    private final Deque<RowT> inBuf;
    /** Pending rewind requests; created lazily on the first rewind request, so it may be null. */
    private Queue<RewindRequest> rewindQueue;
    /** Rows still expected from the source: 0 means nothing is requested, -1 means the source has ended. */
    private int waiting;

    /** Node whose rewind request is currently being served, or null when no rewind is in progress. */
    @Nullable
    private String currentNode;
    /** Decompiled form of the compiler-generated assertion switch; true when assertions are disabled for this class. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite3/internal/sql/engine/exec/rel/Outbox$RemoteDownstream.class */
    /**
     * Output buffer for a single node. Rows are accumulated into a batch and handed to a
     * {@link BatchSender} once the batch is full (or is the last one) and at least one batch
     * has been requested.
     *
     * <p>Each buffered row is acquired in the {@link MemoryContext} when it is added and
     * released when its batch is sent, or when the buffer is reset or closed.
     */
    public static final class RemoteDownstream<RowT> {
        /** Number of rows at which the current batch is considered full. */
        private static final int BATCH_SIZE = 256;

        private final MemoryContext<RowT> memoryContext;
        private final String nodeName;
        private final BatchSender<RowT> sender;
        private State state = State.FILLING;
        /** Identifier of the most recently sent batch; -1 before the first batch is sent. */
        private int lastSentBatchId = -1;

        /** Batch currently being filled; null once the buffer has reached {@link State#END}. */
        @Nullable
        private List<RowT> curr = new ArrayList<>(BATCH_SIZE);
        /** Number of batches that were requested but not sent yet. */
        private int pendingCount;

        /** Callback that delivers a completed batch. */
        @FunctionalInterface
        public interface BatchSender<RowT> {
            /**
             * Sends a batch.
             *
             * @param nodeName Name of the node the batch is sent to.
             * @param batchId Identifier of the batch.
             * @param last {@code true} if this is the last batch.
             * @param rows Rows of the batch.
             * @throws IgniteInternalCheckedException If the batch could not be sent.
             */
            void send(String nodeName, int batchId, boolean last, List<RowT> rows) throws IgniteInternalCheckedException;
        }

        /** Life-cycle states of the buffer. */
        public enum State {
            /** Rows can be added to the current batch. */
            FILLING,
            /** The current batch reached its size limit and waits to be sent. */
            FULL,
            /** The input ended; the current batch is the final one and waits to be sent. */
            LAST_BATCH,
            /** The final batch was sent or the buffer was closed. */
            END
        }

        private RemoteDownstream(String nodeName, BatchSender<RowT> sender, MemoryContext<RowT> memoryContext) {
            this.nodeName = nodeName;
            this.sender = sender;
            this.memoryContext = memoryContext;
        }

        /**
         * Drops the current batch and returns the buffer to the {@link State#FILLING} state.
         * Batch identifiers of all still-pending requests are skipped.
         */
        void reset() {
            if (this.curr != null) {
                this.memoryContext.release(this.curr);
            }
            this.state = State.FILLING;
            this.lastSentBatchId += this.pendingCount;
            this.pendingCount = 0;
            this.curr = new ArrayList<>(BATCH_SIZE);
        }

        /**
         * Registers a request for more batches and sends the current batch right away
         * if it is already complete.
         *
         * @param amountOfBatches Number of requested batches, must be positive.
         * @throws Exception If sending the batch failed.
         */
        void onBatchRequested(int amountOfBatches) throws Exception {
            assert amountOfBatches > 0 : amountOfBatches;

            this.pendingCount += amountOfBatches;
            if (this.state == State.FULL || this.state == State.LAST_BATCH) {
                sendBatch();
            }
        }

        /** Returns {@code true} if a row can be added to the current batch. */
        boolean ready() {
            return this.state == State.FILLING;
        }

        /**
         * Appends a row to the current batch; sends the batch when it becomes full
         * and a batch request is pending.
         *
         * @param row Row to buffer.
         * @throws Exception If sending the batch failed.
         */
        void add(RowT row) throws Exception {
            assert ready() : this.state;
            assert this.curr != null;

            // The row itself is what gets accounted for; the decompiler-inserted cast of the
            // row to MemoryContext was bogus and has been removed.
            this.memoryContext.acquire(row);
            this.curr.add(row);
            if (this.curr.size() == BATCH_SIZE) {
                this.state = State.FULL;
                if (this.pendingCount > 0) {
                    sendBatch();
                }
            }
        }

        /**
         * Sends the current batch and either starts a new one or, after the last batch,
         * moves the buffer to the {@link State#END} state.
         *
         * @throws Exception If the sender failed.
         */
        void sendBatch() throws Exception {
            assert this.pendingCount > 0;
            assert this.state == State.FULL || this.state == State.LAST_BATCH : this.state;
            assert this.curr != null;

            boolean last = this.state == State.LAST_BATCH;
            this.sender.send(this.nodeName, ++this.lastSentBatchId, last, this.curr);
            this.pendingCount--;
            this.memoryContext.release(this.curr);
            if (last) {
                this.state = State.END;
                this.curr = null;
                // Nothing more will be sent: skip the identifiers of the remaining requests.
                this.lastSentBatchId += this.pendingCount;
                this.pendingCount = 0;
            } else {
                this.state = State.FILLING;
                this.curr = new ArrayList<>(BATCH_SIZE);
            }
        }

        /**
         * Marks the current batch as the last one and sends it if a batch request is pending.
         *
         * @throws Exception If sending the batch failed.
         */
        void end() throws Exception {
            assert this.state == State.FILLING || this.state == State.FULL : this.state;

            this.state = State.LAST_BATCH;
            if (this.pendingCount > 0) {
                sendBatch();
            }
        }

        /** Releases the rows of the unsent batch, if any, and moves the buffer to the {@link State#END} state. */
        void close() {
            if (this.curr != null) {
                this.memoryContext.release(this.curr);
            }
            this.curr = null;
            this.state = State.END;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite3/internal/sql/engine/exec/rel/Outbox$RewindRequest.class */
    /** Immutable holder of the arguments of a rewind request that is waiting to be processed. */
    public static class RewindRequest {
        /** Name of the node that asked for the rewind. */
        final String nodeName;
        /** Shared state to install into the execution context before rewinding. */
        final SharedState state;
        /** Number of batches requested together with the rewind. */
        final int amountOfBatches;

        RewindRequest(String nodeName, SharedState state, int amountOfBatches) {
            this.amountOfBatches = amountOfBatches;
            this.state = state;
            this.nodeName = nodeName;
        }
    }

    /**
     * Creates an outbox with one output buffer per node returned by {@code destination.targets()}.
     *
     * @param executionContext Execution context; also provides the memory context shared by the buffers.
     * @param exchangeService Service used to send batches and errors.
     * @param mailboxRegistry Registry this outbox unregisters itself from on close.
     * @param j Exchange identifier.
     * @param j2 Target fragment identifier.
     * @param destination Maps rows to the nodes that must receive them.
     */
    public Outbox(ExecutionContext<RowT> executionContext, ExchangeService exchangeService, MailboxRegistry mailboxRegistry, long j, long j2, Destination<RowT> destination) {
        super(executionContext);
        this.inBuf = new ArrayDeque<>(512);
        this.exchange = exchangeService;
        this.registry = mailboxRegistry;
        this.targetFragmentId = j2;
        this.exchangeId = j;
        this.dest = destination;
        Map<String, RemoteDownstream<RowT>> buffers = new HashMap<>();
        for (String nodeName : destination.targets()) {
            buffers.put(nodeName, new RemoteDownstream<>(nodeName, this::sendBatch, executionContext.memoryContext()));
        }
        // The set of buffers never changes after construction, so keep an immutable snapshot.
        this.nodeBuffers = Map.copyOf(buffers);
    }

    /** Returns the exchange identifier this outbox was created with. */
    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.Mailbox
    public long exchangeId() {
        return exchangeId;
    }

    /**
     * Handles a request for more batches coming from a node.
     *
     * @param str Name of the requesting node.
     * @param i Number of requested batches.
     * @throws Exception If sending a batch or flushing the input buffer failed.
     */
    public void onRequest(String str, int i) throws Exception {
        checkState();

        RemoteDownstream<RowT> buffer = this.nodeBuffers.get(str);
        buffer.onBatchRequested(i);

        // Once the source has ended and every buffered row is distributed there is nothing left to flush.
        boolean sourceEnded = this.waiting == -1;
        if (!sourceEnded || !this.inBuf.isEmpty()) {
            flush();
        }
    }

    /**
     * Requests the first portion of rows from the source ahead of any batch request, provided that
     * the fragment description enables prefetch and nothing has been requested yet. Any failure is
     * routed to {@code onError}.
     */
    public void prefetch() {
        if (!context().description().prefetch()) {
            return;
        }

        try {
            checkState();
            if (this.waiting != 0) {
                return;
            }

            Node<RowT> upstream = source();
            this.waiting = 512;
            upstream.request(512);
        } catch (Throwable failure) {
            onError(failure);
        }
    }

    /**
     * Not supported by this node.
     *
     * @throws UnsupportedOperationException Always.
     */
    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.Node
    public void request(int i) {
        throw new UnsupportedOperationException();
    }

    /**
     * Accepts a row from the source. While a rewind for a particular node is being served, rows
     * that are not targeted at that node are dropped; every other row is buffered. The input
     * buffer is flushed afterwards in either case.
     *
     * @param rowt Row produced by the source.
     * @throws Exception If flushing the input buffer failed.
     */
    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.Downstream
    public void push(RowT rowt) throws Exception {
        if (!$assertionsDisabled && this.waiting <= 0) {
            throw new AssertionError(this.waiting);
        }
        checkState();
        this.waiting--;

        String rewindTarget = this.currentNode;
        boolean accepted = rewindTarget == null || this.dest.targets(rowt).contains(rewindTarget);
        if (accepted) {
            acquireRow(rowt);
            this.inBuf.add(rowt);
        }

        flush();
    }

    /**
     * Called when the source has no more rows: marks the input as finished and flushes
     * the input buffer.
     *
     * @throws Exception If flushing the input buffer failed.
     */
    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.Downstream
    public void end() throws Exception {
        if (!$assertionsDisabled && this.waiting <= 0) {
            throw new AssertionError(this.waiting);
        }
        checkState();
        // -1 marks the input as finished; flush() ends the node buffers once inBuf has drained.
        this.waiting = -1;
        flush();
    }

    /**
     * Reports the error to the originating node and then closes this outbox quietly.
     *
     * @param th Error to report.
     */
    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.AbstractNode, org.apache.ignite3.internal.sql.engine.exec.rel.Downstream
    public void onError(Throwable th) {
        sendError(th);
        Commons.closeQuiet(this);
    }

    /**
     * Closes the node: runs the parent close logic, unregisters this outbox from the mailbox
     * registry and closes the output buffer of every target node.
     */
    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.AbstractNode
    public void closeInternal() {
        super.closeInternal();
        this.registry.unregister((Outbox<?>) this);

        for (String nodeName : this.dest.targets()) {
            RemoteDownstream<RowT> buffer = this.nodeBuffers.get(nodeName);
            buffer.close();
        }
    }

    /**
     * Not supported by this node.
     *
     * @throws UnsupportedOperationException Always.
     */
    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.AbstractNode, org.apache.ignite3.internal.sql.engine.exec.rel.Node
    public void onRegister(Downstream<RowT> downstream) {
        throw new UnsupportedOperationException();
    }

    /**
     * Discards all buffered input and resets the output buffers: only the buffer of the node
     * whose rewind is being served, or every buffer when no such node is set.
     */
    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.AbstractNode
    protected void rewindInternal() {
        releaseRows(this.inBuf);
        this.inBuf.clear();
        this.waiting = 0;

        String rewindTarget = this.currentNode;
        if (rewindTarget != null) {
            this.nodeBuffers.get(rewindTarget).reset();
            return;
        }

        for (String nodeName : this.dest.targets()) {
            RemoteDownstream<RowT> buffer = this.nodeBuffers.get(nodeName);
            if (!$assertionsDisabled && buffer == null) {
                throw new AssertionError();
            }
            buffer.reset();
        }
    }

    /**
     * Returns the downstream for the given source index. This node has a single source, so only
     * index 0 is valid and the outbox itself is the downstream.
     *
     * @param i Source index.
     * @return This outbox.
     * @throws IndexOutOfBoundsException If the index is not 0.
     */
    @Override // org.apache.ignite3.internal.sql.engine.exec.rel.AbstractNode
    protected Downstream<RowT> requestDownstream(int i) {
        if (i == 0) {
            return this;
        }
        throw new IndexOutOfBoundsException();
    }

    /**
     * Converts the rows to binary tuple messages and sends them through the exchange service.
     * If the send fails, the failure is wrapped into an {@link IgniteInternalException} and passed
     * to {@code onError} via the execution context.
     *
     * @param nodeName Name of the node the batch is sent to.
     * @param batchId Identifier of the batch.
     * @param last Whether this is the last batch.
     * @param rows Rows of the batch.
     */
    private void sendBatch(String nodeName, int batchId, boolean last, List<RowT> rows) {
        RowHandler<RowT> handler = context().rowHandler();

        // The message element type is not imported in this file, so the list is left raw.
        ArrayList messages = new ArrayList(rows.size());
        for (RowT row : rows) {
            BinaryTuple tuple = handler.toBinaryTuple(row);
            messages.add(TABLE_MESSAGES_FACTORY.binaryTupleMessage()
                    .elementCount(tuple.elementCount())
                    .tuple(tuple.byteBuffer())
                    .build());
        }

        this.exchange.sendBatch(nodeName, queryId(), this.targetFragmentId, this.exchangeId, batchId, last, messages)
                .whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        IgniteInternalException wrapped = (IgniteInternalException) ExceptionUtils.withCause(
                                (v1, v2, v3, v4) -> new IgniteInternalException(v1, v2, v3, v4),
                                ErrorGroups.Common.INTERNAL_ERR,
                                "Unable to send batch: " + failure.getMessage(),
                                failure);
                        context().execute(() -> onError(wrapped), this::onError);
                    }
                });
    }

    /**
     * Sends the error to the originating node. If that fails as well, the failure is logged as a
     * warning with the original error attached as suppressed.
     *
     * @param th Error to deliver.
     */
    private void sendError(Throwable th) {
        String targetNode = context().originatingNodeName();
        UUID qryId = queryId();
        long fragId = fragmentId();

        this.exchange.sendError(targetNode, qryId, fragId, th).whenComplete((ignored, failure) -> {
            if (failure != null) {
                IgniteInternalException wrapped = (IgniteInternalException) ExceptionUtils.withCause(
                        (v1, v2, v3, v4) -> new IgniteInternalException(v1, v2, v3, v4),
                        ErrorGroups.Common.INTERNAL_ERR,
                        "Unable to send error: " + failure.getMessage(),
                        failure);
                wrapped.addSuppressed(th);
                LOG.warn("Unable to send error to a remote node [queryId={}, fragmentId={}, targetNode={}]", qryId, Long.valueOf(fragId), targetNode, wrapped);
            }
        });
    }

    /**
     * Moves rows from the input buffer into the output buffers of their target nodes for as long
     * as every target of the head row can accept it. Once the input buffer is drained, either
     * requests more rows from the source or, if the source has ended, finishes the output buffers.
     *
     * @throws Exception If sending a batch failed.
     */
    private void flush() throws Exception {
        while (!this.inBuf.isEmpty()) {
            checkState();

            List<String> targets = this.dest.targets(this.inBuf.peek());
            List<RemoteDownstream<RowT>> buffers = new ArrayList<>(targets.size());
            for (String nodeName : targets) {
                RemoteDownstream<RowT> buffer = this.nodeBuffers.get(nodeName);
                if (!buffer.ready()) {
                    // A target cannot take the row now; leave it at the head of the input buffer.
                    return;
                }
                buffers.add(buffer);
            }
            if (!$assertionsDisabled && CollectionUtils.nullOrEmpty((Collection<?>) buffers)) {
                throw new AssertionError();
            }

            RowT row = this.inBuf.remove();
            for (RemoteDownstream<RowT> buffer : buffers) {
                buffer.add(row);
            }
            // Each buffer acquired the row on its own; drop the input buffer's hold on it.
            releaseRow(row);
        }
        if (!$assertionsDisabled && !this.inBuf.isEmpty()) {
            throw new AssertionError();
        }

        if (this.waiting == 0) {
            Node<RowT> upstream = source();
            this.waiting = 512;
            upstream.request(512);
            return;
        }
        if (this.waiting != -1) {
            return;
        }

        // The source has ended and every buffered row has been distributed.
        if (this.currentNode == null) {
            for (RemoteDownstream<RowT> buffer : this.nodeBuffers.values()) {
                buffer.end();
            }
            return;
        }

        // A rewind was being served for a single node: finish it and move on to the next request.
        this.nodeBuffers.get(this.currentNode).end();
        this.currentNode = null;
        processRewindQueue();
    }

    /**
     * Schedules closing of this outbox if the node that left is the originating node of the
     * query; other nodes are ignored.
     *
     * @param str Name of the node that left.
     */
    public void onNodeLeft(String str) {
        boolean originatorLeft = str.equals(context().originatingNodeName());
        if (!originatorLeft) {
            return;
        }
        context().execute(this::close, this::onError);
    }

    /**
     * Enqueues a rewind request. The queue is processed immediately when no rewind is in progress
     * or when the request comes from the node whose rewind is currently being served.
     *
     * @param str Name of the requesting node.
     * @param sharedState Shared state to apply before rewinding.
     * @param i Number of batches requested together with the rewind.
     * @throws Exception If processing the rewind failed.
     */
    public void onRewindRequest(String str, SharedState sharedState, int i) throws Exception {
        checkState();
        if (this.rewindQueue == null) {
            this.rewindQueue = new ArrayDeque<>(this.nodeBuffers.size());
        }
        this.rewindQueue.offer(new RewindRequest(str, sharedState, i));
        if (this.currentNode == null || this.currentNode.equals(str)) {
            this.currentNode = null;
            processRewindQueue();
        }
    }

    /**
     * Takes the next rewind request from the queue, if any, and starts serving it: installs its
     * shared state, rewinds the node and handles the batch request that came with it.
     *
     * @throws Exception If rewinding or handling the batch request failed.
     */
    private void processRewindQueue() throws Exception {
        if (!$assertionsDisabled && this.currentNode != null) {
            throw new AssertionError();
        }

        RewindRequest next = this.rewindQueue.poll();
        if (next != null) {
            this.currentNode = next.nodeName;
            context().sharedState(next.state);
            rewind();
            onRequest(this.currentNode, next.amountOfBatches);
        }
    }

    // Static initialization: resolves the assertion switch for this class, then creates the
    // logger and the message factory.
    static {
        $assertionsDisabled = !Outbox.class.desiredAssertionStatus();
        LOG = Loggers.forClass(Outbox.class);
        TABLE_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
    }
}
