/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.cdc.api.sink;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.schema.Column;
import org.apache.ignite3.table.QualifiedName;
import org.apache.ignite3.table.TableRowEvent;
import org.apache.ignite3.table.Tuple;
import org.apache.parquet.schema.MessageType;
import org.gridgain.internal.cdc.api.exception.CdcInternalException;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkAlreadyInitializedInternalException;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkCloseInternalException;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkFlushInternalException;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkInitException;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkNotInitializedInternalException;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkWriteBatchInternalException;
import org.gridgain.internal.cdc.api.sink.IcebergUtils;
import org.gridgain.internal.cdc.api.sink.SinkDefinition;
import org.gridgain.internal.cdc.api.sink.TableSink;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class IcebergTableSink
implements TableSink {
    static final String DEFAULT_CATALOG_IMPL = "org.apache.iceberg.hadoop.HadoopCatalog";
    private volatile boolean initialized = false;
    private final Lock initGuard = new ReentrantLock();
    private final Lock dataWriterGuard = new ReentrantLock();
    @Nullable
    private DataWriter<Record> dataWriter;
    @Nullable
    private Transaction runningTx;
    private CloseableGroup closeableGroup;
    private GenericRecord rec;
    private FileIO tableIo;
    private String sinkName;
    private Table table;
    private Schema schema;
    private List<String> keyColumnNames;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void init(QualifiedName tableName, List<Column> columns, SinkDefinition sinkDefinition) {
        if (this.initialized) {
            throw new CdcSinkAlreadyInitializedInternalException(IgniteStringFormatter.format("Sink {} is already initialized", this.sinkName));
        }
        this.initGuard.lock();
        try {
            if (this.initialized) {
                throw new CdcSinkAlreadyInitializedInternalException(IgniteStringFormatter.format("Sink {} is already initialized", this.sinkName));
            }
            this.initGuarded(tableName, columns, sinkDefinition);
            this.initialized = true;
        }
        finally {
            this.initGuard.unlock();
        }
    }

    static void closeCatalog(Catalog catalog) throws IOException {
        if (catalog instanceof Closeable) {
            ((Closeable)catalog).close();
        }
    }

    static Catalog createCatalog(SinkDefinition definition) {
        Map<String, String> props = definition.parameters().parameters();
        String catalogImpl = props.get("catalog-impl");
        catalogImpl = catalogImpl == null ? DEFAULT_CATALOG_IMPL : catalogImpl;
        props.put("catalog-impl", catalogImpl);
        Configuration conf = IcebergUtils.combineConfiguration(props);
        return CatalogUtil.buildIcebergCatalog((String)catalogImpl, props, (Object)conf);
    }

    private void initGuarded(QualifiedName tableName, List<Column> columns, SinkDefinition sinkDefinition) {
        this.sinkName = sinkDefinition.name();
        HashMap<String, String> props = new HashMap<String, String>(sinkDefinition.parameters().parameters());
        props.computeIfAbsent("client.region", k -> (String)props.get("s3.client-region"));
        Catalog catalog = IcebergTableSink.createCatalog(SinkDefinition.builderFrom(sinkDefinition).parameters(props).build());
        this.closeableGroup = new CloseableGroup();
        this.closeableGroup.setSuppressCloseFailure(true);
        if (catalog instanceof Closeable) {
            this.closeableGroup.addCloseable((Closeable)catalog);
        }
        TableIdentifier name = TableIdentifier.of((String[])new String[]{tableName.schemaName(), tableName.objectName()});
        this.schema = IcebergUtils.asIcebergSchema(columns);
        this.keyColumnNames = this.extractKeyColumns(columns);
        this.rec = GenericRecord.create((Schema)this.schema);
        this.table = this.loadTableOrThrow(catalog, name, sinkDefinition.createTableIfNotExists(), props);
        this.tableIo = this.table.io();
        this.closeableGroup.addCloseable((Closeable)this.tableIo);
    }

    private List<String> extractKeyColumns(List<Column> columns) {
        ArrayList<String> result = new ArrayList<String>();
        for (Column col : columns) {
            if (-1 == col.positionInKey()) continue;
            result.add(col.name());
        }
        return result;
    }

    private Table loadTableOrThrow(Catalog catalog, TableIdentifier name, boolean createIfNotExists, Map<String, String> props) {
        try {
            catalog.invalidateTable(name);
            return catalog.loadTable(name);
        }
        catch (Throwable e) {
            if (e instanceof NoSuchTableException) {
                if (createIfNotExists) {
                    return catalog.createTable(name, this.schema, PartitionSpec.unpartitioned(), props);
                }
                throw new CdcSinkInitException(IgniteStringFormatter.format("Table {} does not exist and `createIfNotExists` is false. If you want to create the table, set `createIfNotExists` to true.", name));
            }
            if (catalog instanceof Closeable) {
                try {
                    ((Closeable)catalog).close();
                }
                catch (IOException ex) {
                    throw new CdcSinkInitException(ex);
                }
            }
            throw new CdcSinkInitException(e);
        }
    }

    @Override
    public void writeBatch(List<TableRowEvent<Map.Entry<Tuple, Tuple>>> events) {
        this.ensureInitialized();
        if (events.isEmpty()) {
            return;
        }
        this.dataWriterGuard.lock();
        try {
            this.ensureNotClosedGuarded();
            this.ensureWriterAndTxInitialized();
            if (this.keyColumnNames.isEmpty()) {
                this.processEventsWithoutKeyCollapsing(events);
                return;
            }
            this.processEvents(events);
        }
        catch (Exception e) {
            throw new CdcSinkWriteBatchInternalException(e);
        }
        finally {
            this.dataWriterGuard.unlock();
        }
    }

    private void processEvents(List<TableRowEvent<Map.Entry<Tuple, Tuple>>> events) {
        LinkedHashMap<Tuple, KeyState> keyStates = new LinkedHashMap<Tuple, KeyState>();
        block9: for (TableRowEvent<Map.Entry<Tuple, Tuple>> event : events) {
            Object key;
            switch (event.type()) {
                case CREATED: 
                case UPDATED: {
                    key = event.entry().getKey();
                    break;
                }
                case REMOVED: 
                case ARCHIVED: {
                    key = event.oldEntry().getKey();
                    break;
                }
                default: {
                    throw new IllegalStateException("Unexpected event type: " + event.type());
                }
            }
            KeyState state = keyStates.computeIfAbsent((Tuple)key, KeyState::new);
            switch (event.type()) {
                case CREATED: {
                    state.exists = true;
                    state.value = event.entry();
                    continue block9;
                }
                case UPDATED: {
                    state.needsDelete = true;
                    state.exists = true;
                    state.value = event.entry();
                    continue block9;
                }
                case REMOVED: 
                case ARCHIVED: {
                    state.exists = false;
                    state.value = null;
                    state.needsDelete = true;
                    continue block9;
                }
            }
            throw new IllegalStateException("Unexpected event type: " + event.type());
        }
        ArrayList<Tuple> keysToDelete = new ArrayList<Tuple>();
        ArrayList<Map.Entry<Tuple, Tuple>> entriesToWrite = new ArrayList<Map.Entry<Tuple, Tuple>>();
        for (KeyState state : keyStates.values()) {
            if (state.needsDelete) {
                keysToDelete.add(state.key);
            }
            if (!state.exists || state.value == null) continue;
            entriesToWrite.add(state.value);
        }
        if (!keysToDelete.isEmpty()) {
            Expression expr = this.buildExpression(keysToDelete);
            this.runningTx.newDelete().caseSensitive(false).deleteFromRowFilter(expr).commit();
        }
        if (!entriesToWrite.isEmpty()) {
            this.dataWriter.write(this.toRecordsKv(entriesToWrite));
        }
    }

    private void processEventsWithoutKeyCollapsing(List<TableRowEvent<Map.Entry<Tuple, Tuple>>> events) {
        try {
            ArrayList<GenericRecord> records = new ArrayList<GenericRecord>();
            block6: for (TableRowEvent<Map.Entry<Tuple, Tuple>> event : events) {
                switch (event.type()) {
                    case CREATED: 
                    case UPDATED: {
                        Tuple allColumnsTuple = event.entry().getKey();
                        records.add(IcebergUtils.asRecord(allColumnsTuple, this.rec, this.schema));
                        continue block6;
                    }
                    case REMOVED: 
                    case ARCHIVED: {
                        continue block6;
                    }
                }
                throw new IllegalStateException("Unexpected event type: " + event.type());
            }
            if (!records.isEmpty()) {
                this.dataWriter.write(records);
            }
        }
        catch (Exception e) {
            throw new CdcSinkWriteBatchInternalException(e);
        }
    }

    private void ensureWriterAndTxInitialized() {
        if (this.dataWriter == null) {
            try {
                OutputFile outputFile = this.tableIo.newOutputFile(this.outputFileName());
                this.dataWriter = Parquet.writeData((OutputFile)outputFile).schema(this.schema).createWriterFunc(messageType -> GenericParquetWriter.create((Schema)this.schema, (MessageType)messageType)).overwrite().withSpec(PartitionSpec.unpartitioned()).build();
            }
            catch (IOException ex) {
                throw new CdcInternalException(ex);
            }
        }
        if (this.runningTx == null) {
            this.runningTx = this.table.newTransaction();
        }
    }

    @NotNull
    private List<Record> toRecordsKv(List<Map.Entry<Tuple, Tuple>> kvBatch) {
        return kvBatch.stream().map(e -> IcebergUtils.asRecord((Tuple)e.getKey(), (Tuple)e.getValue(), this.rec, this.schema)).collect(Collectors.toList());
    }

    private Expression buildExpression(List<Tuple> entriesToDelete) {
        HashMap<String, List> keyColumnToValues = new HashMap<String, List>();
        for (Tuple tup : entriesToDelete) {
            for (String colName : this.keyColumnNames) {
                if (tup.value(colName) == null) {
                    throw new CdcSinkWriteBatchInternalException(IgniteStringFormatter.format("Key column {} contains null value. Full key columns list: {}", colName, this.keyColumnNames));
                }
                keyColumnToValues.computeIfAbsent(colName, k -> new ArrayList()).add(tup.value(colName));
            }
        }
        ArrayList<UnboundPredicate> inExpressions = new ArrayList<UnboundPredicate>();
        for (Map.Entry e : keyColumnToValues.entrySet()) {
            inExpressions.add(Expressions.in((String)((String)e.getKey()), (Iterable)((Iterable)e.getValue())));
        }
        return (Expression)inExpressions.stream().reduce(Expressions::and).orElse(Expressions.alwaysFalse());
    }

    private void ensureInitialized() {
        if (!this.initialized) {
            throw new CdcSinkNotInitializedInternalException(IgniteStringFormatter.format("Sink {} is not initialized", this.sinkName));
        }
    }

    private void ensureNotClosedGuarded() {
        if (this.closeableGroup == null) {
            throw new CdcSinkCloseInternalException(IgniteStringFormatter.format("Sink {} is closed", this.sinkName));
        }
    }

    @Override
    public void flush() {
        this.ensureInitialized();
        this.dataWriterGuard.lock();
        try {
            this.ensureNotClosedGuarded();
            this.flushGuarded();
        }
        catch (IOException e) {
            throw new CdcSinkFlushInternalException(e);
        }
        finally {
            this.dataWriterGuard.unlock();
        }
    }

    private void flushGuarded() throws IOException {
        if (this.dataWriter == null || this.runningTx == null) {
            return;
        }
        try {
            this.dataWriter.close();
            this.runningTx.newAppend().appendFile(this.dataWriter.toDataFile()).commit();
            this.dataWriter = null;
            this.runningTx.commitTransaction();
        }
        finally {
            this.runningTx = null;
        }
    }

    @Override
    public void close() {
        this.ensureInitialized();
        this.dataWriterGuard.lock();
        try {
            this.ensureNotClosedGuarded();
            this.flushGuarded();
            this.closeableGroup.close();
            this.closeableGroup = null;
        }
        catch (IOException e) {
            throw new CdcSinkFlushInternalException(e);
        }
        finally {
            this.dataWriterGuard.unlock();
        }
    }

    private String outputFileName() {
        String fileName = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss.SSS").withZone(ZoneOffset.UTC).format(Instant.now()) + "-UTC.parquet";
        return this.table.location() + "/data/" + fileName;
    }

    private static class KeyState {
        final Tuple key;
        boolean exists;
        Map.Entry<Tuple, Tuple> value;
        boolean needsDelete;

        KeyState(Tuple key) {
            this.key = key;
            this.exists = false;
            this.value = null;
            this.needsDelete = false;
        }
    }
}

