/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.sql.engine.exec.memory.structures.offload;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactoryFactory;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.memory.MemoryContext;
import org.apache.ignite.internal.sql.engine.exec.memory.NoOpMemoryContext;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.KeyValueCodec;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.RowHashJoinIndex;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.RowHashTable;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.RowList;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.RowQueue;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.RowSet;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.RowStorageFactory;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.SingleNativeTypeValueCodec;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.file.DataDirectory;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.file.FileIoTracker;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.file.FileRowStorageFactoryImpl;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.file.NoOpFileIoTracker;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.inmemory.RowQueueColumnlessImpl;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.inmemory.RowStorageFactoryImpl;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.offload.OffloadAwareCollection;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.offload.OffloadAwareHashTable;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.offload.OffloadAwareListAdapter;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.offload.OffloadAwareQueueAdapter;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.offload.OffloadAwareSetAdapter;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.offload.OffloadAwareSortedQueueAdapter;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.offload.OffloadHashJoinIndexAdapter;
import org.apache.ignite.internal.sql.engine.exec.memory.structures.offload.QueryFragmentOffloadingEvent;
import org.apache.ignite.internal.type.NativeType;
import org.apache.ignite.internal.type.StructNativeType;

/**
 * {@link RowStorageFactory} that pairs every in-memory structure it creates with a supplier of a
 * file-backed counterpart and wraps both into an offload-aware adapter.
 *
 * <p>Each adapter is tracked by this factory so that {@link #spillToDisk()} can notify all of them and
 * {@link #close()} can release all of them. The in-memory delegate is created with a
 * {@link NoOpMemoryContext} and does not track the structures it creates; tracking is done here on the
 * adapters instead.
 *
 * <p>{@link #register(ManuallyCloseable)}, {@link #spillToDisk()} and {@link #close()} synchronize on this
 * instance because they share the list of tracked structures.
 *
 * @param <RowT> Row type handled by the created structures.
 */
public class OffloadAwareStorageFactoryImpl<RowT> implements RowStorageFactory<RowT> {
    /** Delegate producing the in-memory structures; it does not track what it creates. */
    private final RowStorageFactory<RowT> inMemoryStorageFactory;

    /** Delegate producing the file-backed structures; closed together with this factory. */
    private final RowStorageFactory<RowT> fileStorageFactory;

    /** Memory context handed to every offload-aware adapter. */
    private final MemoryContext<RowT> memoryContext;

    /** Adapters created by this factory; guarded by {@code this}. */
    private final List<ManuallyCloseable> managedStructures = new ArrayList<>();

    /** Receives {@link QueryFragmentOffloadingEvent}s emitted by {@link #spillToDisk()} and {@link #close()}. */
    private final Consumer<QueryFragmentOffloadingEvent> offloadingEventsListener;

    /**
     * Creates a factory with a no-op file I/O tracker and an event listener that ignores every event.
     *
     * @param workDir Directory passed to the file-backed storage factory.
     * @param memoryContext Memory context handed to the created adapters.
     */
    public OffloadAwareStorageFactoryImpl(DataDirectory workDir, MemoryContext<RowT> memoryContext) {
        this(workDir, memoryContext, NoOpFileIoTracker.instance(), ignore -> {});
    }

    /**
     * Creates a factory.
     *
     * @param workDir Directory passed to the file-backed storage factory.
     * @param memoryContext Memory context handed to the created adapters.
     * @param fileIoTracker Tracker passed to the file-backed storage factory.
     * @param offloadingEventsListener Listener notified about spilling and closing of this factory.
     */
    public OffloadAwareStorageFactoryImpl(
            DataDirectory workDir,
            MemoryContext<RowT> memoryContext,
            FileIoTracker fileIoTracker,
            Consumer<QueryFragmentOffloadingEvent> offloadingEventsListener
    ) {
        this.memoryContext = memoryContext;
        this.offloadingEventsListener = offloadingEventsListener;
        // NOTE(review): kept exactly as in the original; the generic signature of FileRowStorageFactoryImpl is
        // not visible from this file.
        this.fileStorageFactory = new FileRowStorageFactoryImpl(workDir, fileIoTracker);
        this.inMemoryStorageFactory = new RowStorageFactoryImpl<RowT>(NoOpMemoryContext.instance()) {
            /** Returns the structure untouched: the enclosing factory tracks the wrapping adapter instead. */
            @Override
            protected synchronized <T extends ManuallyCloseable> T register(T dataStructure) {
                return dataStructure;
            }
        };
    }

    /**
     * Creates an offload-aware list backed by an in-memory list and a supplier of a file-backed list.
     *
     * @param rowHandler Row handler passed to both delegates.
     * @param rowFactory Row factory passed to both delegates.
     * @return Tracked offload-aware list.
     */
    @Override
    public RowList<RowT> list(RowHandler<RowT> rowHandler, RowFactory<RowT> rowFactory) {
        RowList<RowT> inMemoryList = this.inMemoryStorageFactory.list(rowHandler, rowFactory);

        return this.register(new OffloadAwareListAdapter<RowT>(
                this.memoryContext,
                inMemoryList,
                this::spillToDisk,
                () -> this.fileStorageFactory.list(rowHandler, rowFactory)
        ));
    }

    /**
     * Creates an offload-aware list whose in-memory part is created with the given initial capacity.
     *
     * @param rowHandler Row handler passed to both delegates.
     * @param rowFactory Row factory passed to both delegates.
     * @param initialCapacity Initial capacity of the in-memory list; not passed to the file-backed list.
     * @return Tracked offload-aware list.
     */
    @Override
    public RowList<RowT> list(RowHandler<RowT> rowHandler, RowFactory<RowT> rowFactory, int initialCapacity) {
        RowList<RowT> inMemoryList = this.inMemoryStorageFactory.list(rowHandler, rowFactory, initialCapacity);

        return this.register(new OffloadAwareListAdapter<RowT>(
                this.memoryContext,
                inMemoryList,
                this::spillToDisk,
                () -> this.fileStorageFactory.list(rowHandler, rowFactory)
        ));
    }

    /**
     * Creates a queue. When the row schema has no fields a {@link RowQueueColumnlessImpl} is returned directly
     * and is not tracked by this factory; otherwise a tracked offload-aware queue is returned.
     *
     * @param rowHandler Row handler passed to both delegates.
     * @param rowFactory Row factory passed to both delegates; its schema decides which queue is created.
     * @param initialCapacity Initial capacity passed to both delegates.
     * @return Queue instance.
     */
    @Override
    public RowQueue<RowT> queue(RowHandler<RowT> rowHandler, RowFactory<RowT> rowFactory, int initialCapacity) {
        if (rowFactory.rowSchema().fieldsCount() == 0) {
            // NOTE(review): kept as in the original; the generic signature of RowQueueColumnlessImpl is not
            // visible from this file.
            return new RowQueueColumnlessImpl();
        }

        RowQueue<RowT> inMemoryQueue = this.inMemoryStorageFactory.queue(rowHandler, rowFactory, initialCapacity);

        return this.register(new OffloadAwareQueueAdapter<RowT>(
                this.memoryContext,
                inMemoryQueue,
                this::spillToDisk,
                () -> this.fileStorageFactory.queue(rowHandler, rowFactory, initialCapacity)
        ));
    }

    /**
     * Creates an offload-aware priority queue.
     *
     * @param rowHandler Row handler passed to both delegates.
     * @param rowFactory Row factory passed to both delegates.
     * @param valueComparator Row comparator passed to both delegates.
     * @param tupleComp Binary tuple comparator passed to both delegates.
     * @return Tracked offload-aware sorted queue.
     */
    @Override
    public RowQueue<RowT> priorityQueue(
            RowHandler<RowT> rowHandler,
            RowFactory<RowT> rowFactory,
            Comparator<RowT> valueComparator,
            Comparator<ByteBuffer> tupleComp
    ) {
        RowQueue<RowT> inMemoryQueue =
                this.inMemoryStorageFactory.priorityQueue(rowHandler, rowFactory, valueComparator, tupleComp);

        return this.register(new OffloadAwareSortedQueueAdapter<RowT>(
                this.memoryContext,
                inMemoryQueue,
                this::spillToDisk,
                () -> this.fileStorageFactory.priorityQueue(rowHandler, rowFactory, valueComparator, tupleComp)
        ));
    }

    /**
     * Creates an offload-aware bounded priority queue.
     *
     * @param rowHandler Row handler passed to both delegates.
     * @param rowFactory Row factory passed to both delegates.
     * @param capacity Capacity passed to both delegates.
     * @param valueComparator Row comparator passed to both delegates.
     * @param tupleComp Binary tuple comparator passed to both delegates.
     * @return Tracked offload-aware sorted queue.
     */
    @Override
    public RowQueue<RowT> boundedPriorityQueue(
            RowHandler<RowT> rowHandler,
            RowFactory<RowT> rowFactory,
            int capacity,
            Comparator<RowT> valueComparator,
            Comparator<ByteBuffer> tupleComp
    ) {
        RowQueue<RowT> inMemoryQueue = this.inMemoryStorageFactory.boundedPriorityQueue(
                rowHandler, rowFactory, capacity, valueComparator, tupleComp);

        return this.register(new OffloadAwareSortedQueueAdapter<RowT>(
                this.memoryContext,
                inMemoryQueue,
                this::spillToDisk,
                () -> this.fileStorageFactory.boundedPriorityQueue(
                        rowHandler, rowFactory, capacity, valueComparator, tupleComp)
        ));
    }

    /**
     * Creates an offload-aware hash table.
     *
     * @param keyValueCodec Codec passed to both delegates.
     * @param <K> Key type.
     * @param <V> Value type.
     * @return Tracked offload-aware hash table.
     */
    @Override
    public <K, V> RowHashTable<K, V> hashTable(KeyValueCodec<K, V> keyValueCodec) {
        RowHashTable<K, V> inMemoryHashTable = this.inMemoryStorageFactory.hashTable(keyValueCodec);

        return this.register(new OffloadAwareHashTable<RowT, K, V>(
                inMemoryHashTable,
                () -> this.fileStorageFactory.hashTable(keyValueCodec),
                this.memoryContext,
                this::spillToDisk
        ));
    }

    /**
     * Creates an offload-aware hash set for elements of a single native type by delegating to
     * {@link #hashSet(KeyValueCodec)} with a {@link SingleNativeTypeValueCodec}.
     *
     * @param elementClass Element class.
     * @param elementType Native type of the element.
     * @param <E> Element type.
     * @return Tracked offload-aware set.
     */
    @Override
    public <E> RowSet<E> hashSet(Class<E> elementClass, NativeType elementType) {
        return this.hashSet(new SingleNativeTypeValueCodec<E>(elementClass, elementType));
    }

    /**
     * Creates an offload-aware hash set.
     *
     * @param codec Codec passed to both delegates.
     * @param <E> Element type.
     * @return Tracked offload-aware set.
     */
    @Override
    public <E> RowSet<E> hashSet(KeyValueCodec<E, Void> codec) {
        RowSet<E> inMemorySet = this.inMemoryStorageFactory.hashSet(codec);

        return this.register(new OffloadAwareSetAdapter<RowT, E>(
                this.memoryContext,
                inMemorySet,
                () -> this.fileStorageFactory.hashSet(codec),
                this::spillToDisk
        ));
    }

    /**
     * Creates an offload-aware hash join index.
     *
     * @param rowFactory Row factory factory passed to both delegates.
     * @param rowHandler Row handler passed to both delegates.
     * @param rowType Row type passed to both delegates.
     * @param keyFields Key field indexes passed to both delegates.
     * @return Tracked offload-aware hash join index.
     */
    @Override
    public RowHashJoinIndex<RowT, RowT> hashJoinIndex(
            RowFactoryFactory<RowT> rowFactory,
            RowHandler<RowT> rowHandler,
            StructNativeType rowType,
            int[] keyFields
    ) {
        RowHashJoinIndex<RowT, RowT> inMemoryIndex =
                this.inMemoryStorageFactory.hashJoinIndex(rowFactory, rowHandler, rowType, keyFields);

        return this.register(new OffloadHashJoinIndexAdapter<RowT, RowT>(
                this.memoryContext,
                inMemoryIndex,
                () -> this.fileStorageFactory.hashJoinIndex(rowFactory, rowHandler, rowType, keyFields),
                this::spillToDisk
        ));
    }

    /**
     * Invokes {@code onSpillToDisk()} on every tracked {@link OffloadAwareCollection} and, if at least one was
     * notified, emits {@link QueryFragmentOffloadingEvent#SPILLING} to the listener.
     */
    @Override
    public synchronized void spillToDisk() {
        boolean anyNotified = false;

        for (ManuallyCloseable structure : this.managedStructures) {
            if (structure instanceof OffloadAwareCollection) {
                ((OffloadAwareCollection) structure).onSpillToDisk();
                anyNotified = true;
            }
        }

        if (anyNotified) {
            this.offloadingEventsListener.accept(QueryFragmentOffloadingEvent.SPILLING);
        }
    }

    /**
     * Closes every tracked structure and then the file-backed factory.
     *
     * <p>A failure while closing one resource does not prevent the remaining ones from being closed: all
     * resources are attempted, the tracking list is cleared, and then the first failure is rethrown with any
     * later failures attached as suppressed exceptions. {@link QueryFragmentOffloadingEvent#CLOSE} is emitted
     * only when everything was closed without an error.
     *
     * @throws Exception First failure raised while closing the tracked structures or the file-backed factory.
     */
    @Override
    public synchronized void close() throws Exception {
        Exception failure = null;

        for (ManuallyCloseable structure : this.managedStructures) {
            try {
                structure.close();
            } catch (Exception e) {
                failure = chain(failure, e);
            }
        }

        try {
            this.fileStorageFactory.close();
        } catch (Exception e) {
            failure = chain(failure, e);
        }

        // Drop the references even on failure so a repeated close() does not close the same structures again.
        this.managedStructures.clear();

        if (failure != null) {
            throw failure;
        }

        this.offloadingEventsListener.accept(QueryFragmentOffloadingEvent.CLOSE);
    }

    /**
     * Starts tracking the given structure so it takes part in {@link #spillToDisk()} and {@link #close()}.
     *
     * @param dataStructure Structure to track.
     * @param <T> Structure type.
     * @return The same instance that was passed in.
     */
    protected synchronized <T extends ManuallyCloseable> T register(T dataStructure) {
        this.managedStructures.add(dataStructure);
        return dataStructure;
    }

    /** Returns {@code first} with {@code next} attached as suppressed, or {@code next} when there is no first failure. */
    private static Exception chain(Exception first, Exception next) {
        if (first == null) {
            return next;
        }

        // Throwable.addSuppressed rejects self-suppression, so skip it if the same instance is seen twice.
        if (first != next) {
            first.addSuppressed(next);
        }

        return first;
    }
}

