/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.sql.engine.exec.rel;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.ignite3.internal.lang.IgniteStringBuilder;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite3.internal.sql.engine.exec.RowHandler;
import org.apache.ignite3.internal.sql.engine.exec.exp.agg.AggregateType;
import org.apache.ignite3.internal.sql.engine.exec.exp.agg.GroupKey;
import org.apache.ignite3.internal.sql.engine.exec.memory.MemoryContext;
import org.apache.ignite3.internal.sql.engine.exec.memory.structures.RowHashTable;
import org.apache.ignite3.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite3.internal.sql.engine.exec.rel.Downstream;
import org.apache.ignite3.internal.sql.engine.exec.rel.SetNodeKeyValueCodec;
import org.apache.ignite3.internal.sql.engine.exec.row.RowSchema;
import org.apache.ignite3.internal.util.CollectionUtils;

public abstract class AbstractSetOpNode<RowT>
extends AbstractNode<RowT> {
    private final AggregateType type;
    private final Grouping<RowT> grouping;
    private int requested;
    private int waiting;
    private int curSrcIdx;
    private boolean inLoop;

    protected AbstractSetOpNode(ExecutionContext<RowT> ctx, AggregateType type, boolean all, RowHandler.RowFactory<RowT> rowFactory, Grouping<RowT> grouping) {
        super(ctx);
        this.type = type;
        this.grouping = grouping;
    }

    @Override
    public void request(int rowsCnt) throws Exception {
        assert (!CollectionUtils.nullOrEmpty(this.sources()));
        assert (rowsCnt > 0 && this.requested == 0);
        assert (this.waiting <= 0);
        this.requested = rowsCnt;
        if (this.waiting == 0) {
            this.waiting = this.inBufSize;
            this.sources().get(this.curSrcIdx).request(this.waiting);
        } else if (!this.inLoop) {
            this.execute(this::flush);
        }
    }

    public void push(RowT row, int idx) throws Exception {
        assert (this.downstream() != null);
        assert (this.waiting > 0);
        --this.waiting;
        this.grouping.add(row, idx);
        if (this.waiting == 0) {
            this.waiting = this.inBufSize;
            this.sources().get(this.curSrcIdx).request(this.waiting);
        }
    }

    public void end(int idx) throws Exception {
        assert (this.downstream() != null);
        assert (this.waiting > 0);
        assert (this.curSrcIdx == idx);
        this.grouping.endOfSet(idx);
        this.curSrcIdx = this.type == AggregateType.SINGLE && this.grouping.isEmpty() ? this.sources().size() : ++this.curSrcIdx;
        if (this.curSrcIdx >= this.sources().size()) {
            this.waiting = -1;
            this.flush();
        } else {
            this.sources().get(this.curSrcIdx).request(this.waiting);
        }
    }

    @Override
    protected void rewindInternal() {
        this.requested = 0;
        this.waiting = 0;
        this.curSrcIdx = 0;
        this.grouping.groups.clear();
    }

    @Override
    protected void closeInternal() {
        super.closeInternal();
        this.grouping.groups.clear();
    }

    @Override
    protected Downstream<RowT> requestDownstream(final int idx) {
        return new Downstream<RowT>(){

            @Override
            public void push(RowT row) throws Exception {
                AbstractSetOpNode.this.push(row, idx);
            }

            @Override
            public void end() throws Exception {
                AbstractSetOpNode.this.end(idx);
            }

            @Override
            public void onError(Throwable e) {
                AbstractSetOpNode.this.onError(e);
            }
        };
    }

    @Override
    protected void dumpDebugInfo0(IgniteStringBuilder buf) {
        buf.app("class=").app(this.getClass().getSimpleName()).app(", requested=").app(this.requested).app(", waiting=").app(this.waiting);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flush() throws Exception {
        assert (this.waiting == -1);
        int processed = 0;
        this.inLoop = true;
        try {
            if (this.requested > 0 && !this.grouping.isEmpty()) {
                int toSnd = Math.min(this.requested, this.inBufSize - processed);
                for (RowT row : this.grouping.getRows(toSnd)) {
                    --this.requested;
                    this.acquireRow(row);
                    this.downstream().push(row);
                    this.releaseRow(row);
                    ++processed;
                }
                if (processed >= this.inBufSize && this.requested > 0) {
                    this.execute(this::flush);
                    return;
                }
            }
        }
        finally {
            this.inLoop = false;
        }
        if (this.requested > 0) {
            this.requested = 0;
            this.downstream().end();
        }
    }

    protected static abstract class Grouping<RowT> {
        protected final RowHashTable<GroupKey, int[]> groups;
        protected final AggregateType type;
        protected final boolean all;
        protected final RowHandler.RowFactory<RowT> rowFactory;
        private final RowHandler<RowT> hnd;
        protected final MemoryContext<RowT> memoryContext;
        private final int columnCnt;

        protected Grouping(ExecutionContext<RowT> ctx, RowHandler.RowFactory<RowT> rowFactory, int columnCnt, AggregateType type, boolean all, int inputsCnt) {
            this.hnd = ctx.rowHandler();
            this.memoryContext = ctx.memoryContext();
            this.columnCnt = columnCnt;
            this.type = type;
            this.all = all;
            this.rowFactory = rowFactory;
            RowSchema rowSchema = rowFactory.rowSchema();
            SetNodeKeyValueCodec keyValueCodec = type == AggregateType.SINGLE ? new SetNodeKeyValueCodec(rowSchema, rowFactory.columnsCount(), inputsCnt) : new SetNodeKeyValueCodec(rowSchema, columnCnt, inputsCnt);
            this.groups = ctx.storageFactory().hashTable(keyValueCodec);
        }

        protected void add(RowT row, int setIdx) {
            switch (this.type) {
                case MAP: {
                    this.addOnMapper(row, setIdx);
                    break;
                }
                case REDUCE: {
                    assert (setIdx == 0) : "Unexpected set index: " + setIdx;
                    this.addOnReducer(row);
                    break;
                }
                case SINGLE: {
                    this.addOnSingle(row, setIdx);
                    break;
                }
                default: {
                    throw new IllegalStateException("Unexpected type " + this.type);
                }
            }
        }

        private List<RowT> getRows(int cnt) {
            if (this.groups.isEmpty()) {
                return Collections.emptyList();
            }
            switch (this.type) {
                case MAP: {
                    return this.getOnMapper(cnt);
                }
                case REDUCE: 
                case SINGLE: {
                    return this.getResultRows(cnt);
                }
            }
            throw new IllegalStateException("Unexpected type " + this.type);
        }

        protected GroupKey createKey(RowT row) {
            int size = this.hnd.columnCount(row);
            Object[] fields = new Object[size];
            for (int i = 0; i < size; ++i) {
                fields[i] = this.hnd.get(i, row);
            }
            return new GroupKey(fields);
        }

        protected abstract void endOfSet(int var1);

        protected abstract void addOnSingle(RowT var1, int var2);

        protected abstract void addOnMapper(RowT var1, int var2);

        protected abstract int getCounterFieldsCount();

        private void addOnReducer(RowT row) {
            GroupKey.Builder grpKeyBuilder = GroupKey.builder(this.columnCnt);
            for (int i = 0; i < this.columnCnt; ++i) {
                Object field = this.hnd.get(i, row);
                grpKeyBuilder.add(field);
            }
            GroupKey grpKey = grpKeyBuilder.build();
            int inputsCnt = this.getCounterFieldsCount();
            int[] cntrs = this.addGroup(grpKey, inputsCnt);
            for (int i = 0; i < inputsCnt; ++i) {
                int n = i;
                cntrs[n] = cntrs[n] + (Integer)this.hnd.get(i + this.columnCnt, row);
            }
            this.groups.put(grpKey, cntrs);
        }

        private List<RowT> getOnMapper(int cnt) {
            Iterator<Map.Entry<GroupKey, int[]>> it = this.groups.entrySetIterator();
            int amount = Math.min(cnt, this.groups.size());
            ArrayList<RowT> res = new ArrayList<RowT>(amount);
            while (amount > 0 && it.hasNext()) {
                Map.Entry<GroupKey, int[]> entry = it.next();
                if (this.affectResult(entry.getValue())) {
                    RowT row = this.createOutputRow(entry);
                    res.add(row);
                    --amount;
                }
                it.remove();
            }
            return res;
        }

        private List<RowT> getResultRows(int cnt) {
            Iterator<Map.Entry<GroupKey, int[]>> it = this.groups.entrySetIterator();
            ArrayList<RowT> res = new ArrayList<RowT>(cnt);
            while (it.hasNext() && cnt > 0) {
                Map.Entry<GroupKey, int[]> entry = it.next();
                RowT row = this.createOutputRow(entry);
                int availableRows = this.availableRows(entry.getValue());
                this.updateAvailableRows(entry.getValue(), availableRows);
                if (availableRows <= cnt) {
                    it.remove();
                    cnt -= availableRows;
                } else {
                    availableRows = cnt;
                    assert (availableRows > 0) : IgniteStringFormatter.format("Number of available rows is negative: {}", entry);
                    assert (this.all) : IgniteStringFormatter.format("This branch should only be accessible for non distinct variant of a set operator: {}", entry);
                    this.decrementAvailableRows(entry.getValue(), availableRows);
                    this.groups.put(entry.getKey(), entry.getValue());
                    cnt = 0;
                }
                for (int i = 0; i < availableRows; ++i) {
                    res.add(row);
                }
            }
            return res;
        }

        protected abstract boolean affectResult(int[] var1);

        protected abstract int availableRows(int[] var1);

        protected abstract void updateAvailableRows(int[] var1, int var2);

        protected abstract void decrementAvailableRows(int[] var1, int var2);

        private boolean isEmpty() {
            return this.groups.isEmpty();
        }

        private RowT createOutputRow(Map.Entry<GroupKey, int[]> entry) {
            Object[] fields;
            int counts;
            boolean appendCounts;
            GroupKey groupKey = entry.getKey();
            int[] cnts = entry.getValue();
            assert (groupKey.fieldsCount() == this.columnCnt) : IgniteStringFormatter.format("Invalid key {} columnNum: {}", groupKey, this.columnCnt);
            boolean bl = appendCounts = this.type == AggregateType.MAP;
            if (appendCounts) {
                counts = cnts.length;
                fields = new Object[this.columnCnt + counts];
            } else {
                counts = 0;
                fields = new Object[this.columnCnt];
            }
            for (int i = 0; i < groupKey.fieldsCount(); ++i) {
                fields[i] = groupKey.field(i);
            }
            for (int j = 0; j < counts; ++j) {
                fields[this.columnCnt + j] = cnts[j];
            }
            return this.rowFactory.create(fields);
        }

        protected final int[] addGroup(GroupKey key, int numCntrs) {
            int[] cntrs;
            for (int c : cntrs = this.groups.computeIfAbsent(key, k -> new int[numCntrs])) {
                if (c == 0) continue;
                return cntrs;
            }
            return cntrs;
        }

        protected final int[] getGroup(GroupKey key) {
            return this.groups.get(key);
        }
    }
}

