/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.sql.engine.exec.rel;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite3.internal.lang.IgniteStringBuilder;
import org.apache.ignite3.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite3.internal.sql.engine.exec.RowHandler;
import org.apache.ignite3.internal.sql.engine.exec.exp.agg.AccumulatorWrapper;
import org.apache.ignite3.internal.sql.engine.exec.exp.agg.AccumulatorsState;
import org.apache.ignite3.internal.sql.engine.exec.exp.agg.AggregateRow;
import org.apache.ignite3.internal.sql.engine.exec.exp.agg.AggregateType;
import org.apache.ignite3.internal.sql.engine.exec.exp.agg.GroupKey;
import org.apache.ignite3.internal.sql.engine.exec.exp.agg.GroupState;
import org.apache.ignite3.internal.sql.engine.exec.memory.structures.DistinctSetElementCodec;
import org.apache.ignite3.internal.sql.engine.exec.memory.structures.RowHashTable;
import org.apache.ignite3.internal.sql.engine.exec.memory.structures.RowSet;
import org.apache.ignite3.internal.sql.engine.exec.memory.structures.file.HashAggregateNodeKeyValueCodec;
import org.apache.ignite3.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite3.internal.sql.engine.exec.rel.Downstream;
import org.apache.ignite3.internal.sql.engine.exec.rel.SingleNode;
import org.apache.ignite3.internal.sql.engine.exec.row.RowSchema;
import org.apache.ignite3.internal.util.CollectionUtils;

/**
 * Execution node that aggregates input rows using hash tables keyed by the grouping columns.
 *
 * <p>One {@link Grouping} is kept per grouping set. Every pushed row is offered to each grouping; once the
 * source signals {@link #end()}, the accumulated groups are converted to output rows and pushed downstream
 * in batches of at most {@code inBufSize} rows per {@link #flush()} invocation.
 *
 * <p>The {@code waiting} counter has three states, as used by the methods below:
 * <ul>
 *     <li>{@code 0} - nothing has been requested from the source yet;</li>
 *     <li>{@code > 0} - number of rows still expected from the source;</li>
 *     <li>{@code -1} - the source has ended and the node is in the output (flush) phase.</li>
 * </ul>
 *
 * @param <RowT> Row type.
 */
public class HashAggregateNode<RowT> extends AbstractNode<RowT> implements SingleNode<RowT>, Downstream<RowT> {
    private final AggregateType type;

    private final RowHandler.RowFactory<RowT> rowFactory;

    private final RowSchema inputRowSchema;

    /** Union of the fields of all grouping sets. */
    private final ImmutableBitSet allFields;

    private final List<Grouping> groupings;

    private final List<AccumulatorWrapper<RowT>> accs;

    /** Number of rows the downstream still expects. */
    private int requested;

    /** See the class description for the meaning of the values. */
    private int waiting;

    /** Set while {@link #flush()} is pushing rows, so a re-entrant {@link #request(int)} does not schedule another flush. */
    private boolean inLoop;

    /**
     * Creates the node and one grouping per grouping set.
     *
     * @param ctx Execution context.
     * @param type Aggregate type.
     * @param grpSets Grouping sets; at most 127, because the grouping id is stored as a {@code byte}.
     * @param accumulators Accumulator wrappers shared by all groupings.
     * @param rowFactory Factory used to build output rows.
     * @param inputRowSchema Schema of the input rows; used to derive the schema of group keys.
     */
    public HashAggregateNode(
            ExecutionContext<RowT> ctx,
            AggregateType type,
            List<ImmutableBitSet> grpSets,
            List<AccumulatorWrapper<RowT>> accumulators,
            RowHandler.RowFactory<RowT> rowFactory,
            RowSchema inputRowSchema
    ) {
        super(ctx);

        this.type = type;
        this.rowFactory = rowFactory;
        this.inputRowSchema = inputRowSchema;

        assert grpSets.size() <= Byte.MAX_VALUE : "Too many grouping sets";

        this.groupings = new ArrayList<>(grpSets.size());
        this.accs = accumulators;

        // allFields must be complete before any Grouping is created: the Grouping constructor reads it.
        ImmutableBitSet.Builder allFieldsBuilder = ImmutableBitSet.builder();
        for (ImmutableBitSet grpFields : grpSets) {
            allFieldsBuilder.addAll(grpFields);
        }
        this.allFields = allFieldsBuilder.build();

        for (byte i = 0; i < grpSets.size(); i++) {
            Grouping grouping = new Grouping(i, grpSets.get(i));
            grouping.init();
            this.groupings.add(grouping);
        }
    }

    /**
     * Accepts a request for {@code rowsCnt} output rows.
     *
     * <p>If nothing was requested from the source yet, the first input batch is requested. If the source has
     * already ended, a flush is scheduled unless one is currently running.
     *
     * @param rowsCnt Number of rows requested by the downstream; must be positive.
     * @throws Exception If requesting the source or scheduling the flush fails.
     */
    @Override
    public void request(int rowsCnt) throws Exception {
        assert !CollectionUtils.nullOrEmpty(this.sources()) && this.sources().size() == 1;
        assert rowsCnt > 0 && this.requested == 0;
        assert this.waiting <= 0;

        this.requested = rowsCnt;

        if (this.waiting == 0) {
            this.waiting = this.inBufSize;
            this.source().request(this.waiting);
        } else if (!this.inLoop) {
            // waiting == -1 here: input is exhausted, continue emitting results.
            this.execute(this::doFlush);
        }
    }

    /**
     * Consumes one input row by adding it to every grouping, and requests the next input batch once the
     * current one is fully received.
     *
     * @param row Input row.
     * @throws Exception If requesting the source fails.
     */
    @Override
    public void push(RowT row) throws Exception {
        assert this.downstream() != null;
        assert this.waiting > 0;

        --this.waiting;

        // NOTE(review): acquireRow/releaseRow are inherited; presumably row memory accounting - confirm in AbstractNode.
        this.acquireRow(row);
        for (Grouping grouping : this.groupings) {
            grouping.add(row);
        }
        this.releaseRow(row);

        if (this.waiting == 0) {
            this.waiting = this.inBufSize;
            this.source().request(this.waiting);
        }
    }

    /**
     * Marks the input as exhausted and starts emitting aggregated rows.
     *
     * @throws Exception If flushing fails.
     */
    @Override
    public void end() throws Exception {
        assert this.downstream() != null;
        assert this.waiting > 0;

        this.waiting = -1;
        this.flush();
    }

    /** Resets the counters and every grouping so the node can be executed again. */
    @Override
    protected void rewindInternal() {
        this.requested = 0;
        this.waiting = 0;
        this.groupings.forEach(Grouping::reset);
    }

    /**
     * Returns this node as the downstream of its only source.
     *
     * @param idx Source index; only {@code 0} is valid.
     * @return This node.
     * @throws IndexOutOfBoundsException If {@code idx} is not {@code 0}.
     */
    @Override
    protected Downstream<RowT> requestDownstream(int idx) {
        if (idx != 0) {
            throw new IndexOutOfBoundsException();
        }
        return this;
    }

    /** Appends the class name and the {@code requested}/{@code waiting} counters to the debug buffer. */
    @Override
    protected void dumpDebugInfo0(IgniteStringBuilder buf) {
        buf.app("class=").app(this.getClass().getSimpleName())
                .app(", requested=").app(this.requested)
                .app(", waiting=").app(this.waiting);
    }

    /** Task body used when a flush is scheduled through {@code execute}. */
    private void doFlush() throws Exception {
        this.flush();
    }

    /**
     * Pushes aggregated rows downstream while rows are requested and groups remain.
     *
     * <p>At most {@code inBufSize} rows are pushed per invocation; if more are still requested after that, the
     * remainder is rescheduled via {@code execute}. When every grouping is drained and the downstream still
     * expects rows, the downstream is notified about the end of data.
     *
     * @throws Exception If pushing to the downstream fails.
     */
    private void flush() throws Exception {
        assert this.waiting == -1;

        int processed = 0;
        ArrayDeque<Grouping> groupingsQueue = this.groupingsQueue();

        this.inLoop = true;
        try {
            while (this.requested > 0 && !groupingsQueue.isEmpty()) {
                Grouping grouping = groupingsQueue.peek();
                int toSnd = Math.min(this.requested, this.inBufSize - processed);

                for (RowT row : grouping.getRows(toSnd)) {
                    --this.requested;
                    this.acquireRow(row);
                    this.downstream().push(row);
                    this.releaseRow(row);
                    ++processed;
                }

                // Release a drained grouping before a possible yield below: the next flush rebuilds the queue
                // from non-empty groupings only, so a grouping skipped here would not be released until rewind.
                if (grouping.isEmpty()) {
                    groupingsQueue.remove().releaseGroups();
                }

                if (processed >= this.inBufSize && this.requested > 0) {
                    // Batch limit reached: reschedule the rest instead of continuing in this invocation.
                    this.execute(this::doFlush);
                    return;
                }
            }
        } finally {
            this.inLoop = false;
        }

        if (this.requested > 0) {
            // All groupings are drained but the downstream still expects rows.
            this.requested = 0;
            this.downstream().end();
        }
    }

    /** Returns the groupings that still hold groups, in their original order. */
    private ArrayDeque<Grouping> groupingsQueue() {
        return this.groupings.stream()
                .filter(g -> !g.isEmpty())
                .collect(Collectors.toCollection(ArrayDeque::new));
    }

    /** State of a single grouping set: a hash table of groups plus a set used for distinct aggregates. */
    private class Grouping {
        private final byte grpId;

        private final ImmutableBitSet grpFields;

        private final RowHashTable<GroupKey, GroupState> groups;

        private final RowSet<GroupKey> distinctSet;

        private final AggregateRow<RowT> aggRow;

        /**
         * Creates the grouping and its storage structures through the context's storage factory.
         *
         * @param grpId Grouping set id.
         * @param grpFields Fields of the input row that form the group key.
         */
        private Grouping(byte grpId, ImmutableBitSet grpFields) {
            this.grpId = grpId;
            this.grpFields = grpFields;
            this.aggRow = new AggregateRow<>(
                    grpId, HashAggregateNode.this.allFields, HashAggregateNode.this.type, HashAggregateNode.this.accs);

            // The key schema consists of the input fields selected by grpFields.
            RowSchema.Builder keySchemaBuilder = RowSchema.builder();
            for (int fieldIdx : grpFields.asList()) {
                keySchemaBuilder.addField(HashAggregateNode.this.inputRowSchema.fields().get(fieldIdx));
            }
            RowSchema keySchema = keySchemaBuilder.build();

            HashAggregateNodeKeyValueCodec keyValueCodec =
                    new HashAggregateNodeKeyValueCodec(keySchema, grpFields.cardinality(), HashAggregateNode.this.accs);

            this.groups = HashAggregateNode.this.context().storageFactory().hashTable(keyValueCodec);
            this.distinctSet = HashAggregateNode.this.context().storageFactory().hashSet(
                    new DistinctSetElementCodec(HashAggregateNode.this.inputRowSchema, grpFields, HashAggregateNode.this.accs));
        }

        /** Pre-creates the group for the empty key when {@code AggregateRow.addEmptyGroup} requires it. */
        private void init() {
            if (AggregateRow.addEmptyGroup(this.grpFields, HashAggregateNode.this.type)) {
                this.groups.put(GroupKey.EMPTY_GRP_KEY, this.create(GroupKey.EMPTY_GRP_KEY));
            }
        }

        /** Drops all accumulated state and re-initializes the grouping. */
        private void reset() {
            // releaseGroups() already clears the distinct set, so no second clear is needed here.
            this.releaseGroups();
            this.groups.clear();
            this.init();
        }

        /**
         * Adds the row to its group, if the row matches this grouping set.
         *
         * @param row Input row.
         */
        private void add(RowT row) {
            RowHandler<RowT> handler = HashAggregateNode.this.context().rowHandler();

            if (!AggregateRow.groupMatches(handler, row, HashAggregateNode.this.type, this.grpId)) {
                return;
            }

            GroupKey.Builder keyBuilder = GroupKey.builder(this.grpFields.cardinality());
            for (int field : this.grpFields) {
                keyBuilder.add(handler.get(field, row));
            }
            GroupKey grpKey = keyBuilder.build();

            GroupState groupState = this.addGroup(grpKey);
            AccumulatorsState state = groupState.state();

            this.aggRow.updateSingleDistinctSet(this.grpId, grpKey, state, this.grpFields, handler, row,
                    this.distinctSet, HashAggregateNode.this.context().memoryContext());

            // Store the state back after the update.
            // NOTE(review): presumably required because the table may keep a serialized copy - confirm in RowHashTable.
            this.groups.put(grpKey, groupState);
        }

        /**
         * Removes up to {@code cnt} groups from the table and converts them to output rows.
         *
         * @param cnt Maximum number of rows to produce.
         * @return Produced rows; the corresponding groups are no longer present in the table.
         */
        private List<RowT> getRows(int cnt) {
            Iterator<Map.Entry<GroupKey, GroupState>> it = this.groups.entryIterator();
            int rowNum = Math.min(cnt, this.groups.size());
            List<RowT> res = new ArrayList<>(rowNum);

            for (int i = 0; i < rowNum; ++i) {
                Map.Entry<GroupKey, GroupState> entry = it.next();
                GroupKey grpKey = entry.getKey();
                GroupState groupRow = entry.getValue();

                Object[] fields = this.aggRow.createOutput();

                // Fill one slot per field of allFields: the key value when this grouping set
                // contains the field, null otherwise.
                int outIdx = 0;
                int keyIdx = 0;
                for (int field : HashAggregateNode.this.allFields) {
                    fields[outIdx++] = this.grpFields.get(field) ? grpKey.field(keyIdx++) : null;
                }

                AccumulatorsState state = groupRow.state();
                this.aggRow.writeTo(state, this.grpFields, fields);

                res.add(HashAggregateNode.this.rowFactory.create(fields));
                it.remove();
            }

            return res;
        }

        /** Returns the state of the group with the given key, creating it if absent. */
        private GroupState addGroup(GroupKey grpKey) {
            return this.groups.computeIfAbsent(grpKey, this::create);
        }

        /** Clears the distinct set of this grouping. */
        private void releaseGroups() {
            this.distinctSet.clear();
        }

        /** Creates a fresh group state with one accumulator slot per accumulator; the key is not used. */
        private GroupState create(GroupKey key) {
            AccumulatorsState state = new AccumulatorsState(HashAggregateNode.this.accs.size());
            return new GroupState(state);
        }

        /** Returns {@code true} if the grouping holds no groups. */
        private boolean isEmpty() {
            return this.groups.isEmpty();
        }
    }
}

