/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.cdc.api.sink;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.Tuple;
import org.apache.parquet.schema.MessageType;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkAlreadyInitializedInternalException;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkCloseInternalException;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkFlushInternalException;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkInitException;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkNotInitializedInternalException;
import org.gridgain.internal.cdc.api.exception.sink.CdcSinkWriteBatchInternalException;
import org.gridgain.internal.cdc.api.sink.IcebergUtils;
import org.gridgain.internal.cdc.api.sink.SinkDefinition;
import org.gridgain.internal.cdc.api.sink.TableSink;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * {@link TableSink} that writes CDC batches into an Apache Iceberg table.
 *
 * <p>Rows are buffered in a single Parquet {@link DataWriter}; the resulting data file is appended
 * to the table through an Iceberg {@link Transaction} when {@link #flush()} or {@link #close()} is
 * called. Deletes are expressed as row filters built from the key columns.
 *
 * <p>{@link #init} is serialized by {@code initGuard}; every write/flush/close operation runs under
 * {@code dataWriterGuard}.
 */
public class IcebergTableSink
implements TableSink {
    /** Catalog implementation used when the sink parameters do not name one. */
    static final String DEFAULT_CATALOG_IMPL = "org.apache.iceberg.hadoop.HadoopCatalog";

    /** Sink parameter holding the catalog implementation class name. */
    private static final String CATALOG_IMPL_PROPERTY = "catalog-impl";

    /** Timestamp part of generated data file names (UTC, millisecond resolution). */
    private static final DateTimeFormatter FILE_TIMESTAMP_FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss.SSS").withZone(ZoneOffset.UTC);

    /** Set to {@code true} once {@link #init} has completed successfully. */
    private volatile boolean initialized = false;

    /** Serializes {@link #init} calls. */
    private final Lock initGuard = new ReentrantLock();

    /** Protects the writer, the running transaction and the closeable group. */
    private final Lock dataWriterGuard = new ReentrantLock();

    /** Writer for the data file currently being filled; {@code null} when nothing is buffered. */
    @Nullable
    private DataWriter<Record> dataWriter;

    /** Transaction that will receive the current data file; {@code null} when nothing is pending. */
    @Nullable
    private Transaction runningTx;

    /** Resources released by {@link #close()}; {@code null} once the sink has been closed. */
    private CloseableGroup closeableGroup;

    /** Record template handed to {@link IcebergUtils} when converting tuples. */
    private GenericRecord rec;

    private FileIO tableIo;
    private String sinkName;
    private Table table;
    private Schema schema;

    /** Names of the columns whose {@code positionInKey()} is not {@code -1}. */
    private List<String> keyColumnNames;

    /**
     * Initializes the sink for the given table. May complete successfully only once.
     *
     * <p>The {@code initialized} flag is checked before taking {@code initGuard} and again while
     * holding it, so concurrent callers cannot both run the initialization.
     *
     * @param tableName Source table name; its schema and object name form the Iceberg identifier.
     * @param columns Source table columns.
     * @param sinkDefinition Sink name, parameters and the create-if-missing flag.
     * @throws CdcSinkAlreadyInitializedInternalException If the sink is already initialized.
     * @throws CdcSinkInitException If the target table cannot be loaded.
     */
    public void init(QualifiedName tableName, List<Column> columns, SinkDefinition sinkDefinition) {
        if (this.initialized) {
            throw new CdcSinkAlreadyInitializedInternalException(IgniteStringFormatter.format("Sink {} is already initialized", new Object[]{this.sinkName}));
        }
        this.initGuard.lock();
        try {
            if (this.initialized) {
                throw new CdcSinkAlreadyInitializedInternalException(IgniteStringFormatter.format("Sink {} is already initialized", new Object[]{this.sinkName}));
            }
            this.initGuarded(tableName, columns, sinkDefinition);
            this.initialized = true;
        }
        finally {
            this.initGuard.unlock();
        }
    }

    /**
     * Closes the catalog when it implements {@link Closeable}; otherwise does nothing.
     *
     * @param catalog Catalog to close.
     * @throws IOException If closing the catalog fails.
     */
    static void closeCatalog(Catalog catalog) throws IOException {
        if (catalog instanceof Closeable) {
            ((Closeable)catalog).close();
        }
    }

    /**
     * Builds a catalog from the sink parameters, falling back to {@link #DEFAULT_CATALOG_IMPL}.
     *
     * <p>Works on a copy of the parameters so the definition's own map is never modified.
     *
     * @param definition Sink definition providing the catalog parameters.
     * @return Newly built catalog.
     */
    static Catalog createCatalog(SinkDefinition definition) {
        Map<String, String> props = new HashMap<>(definition.parameters().parameters());
        return buildCatalog(props);
    }

    /**
     * Resolves the catalog implementation (writing the resolved value back into {@code props})
     * and builds the catalog.
     *
     * @param props Mutable copy of the sink parameters.
     * @return Newly built catalog.
     */
    private static Catalog buildCatalog(Map<String, String> props) {
        String catalogImpl = props.get(CATALOG_IMPL_PROPERTY);
        if (catalogImpl == null) {
            catalogImpl = DEFAULT_CATALOG_IMPL;
        }
        props.put(CATALOG_IMPL_PROPERTY, catalogImpl);
        Configuration conf = IcebergUtils.combineConfiguration(props);
        // The implementation class name is passed as the first argument as well, as before.
        return CatalogUtil.buildIcebergCatalog(catalogImpl, props, conf);
    }

    /**
     * Performs the actual initialization; must be called with {@code initGuard} held.
     *
     * <p>If any step after the catalog has been built fails, the resources collected so far are
     * closed before the failure propagates, so a failed initialization does not leak the catalog.
     */
    private void initGuarded(QualifiedName tableName, List<Column> columns, SinkDefinition sinkDefinition) {
        this.sinkName = sinkDefinition.name();
        Map<String, String> props = new HashMap<>(sinkDefinition.parameters().parameters());
        // Default "client.region" to "s3.client-region" when only the latter is configured.
        String s3Region = props.get("s3.client-region");
        if (s3Region != null) {
            props.putIfAbsent("client.region", s3Region);
        }
        Catalog catalog = buildCatalog(props);
        CloseableGroup resources = new CloseableGroup();
        resources.setSuppressCloseFailure(true);
        if (catalog instanceof Closeable) {
            resources.addCloseable((Closeable)catalog);
        }
        boolean ready = false;
        try {
            TableIdentifier name = TableIdentifier.of(new String[]{tableName.schemaName(), tableName.objectName()});
            this.schema = IcebergUtils.asIcebergSchema(columns);
            this.keyColumnNames = this.extractKeyColumns(columns);
            this.rec = GenericRecord.create(this.schema);
            this.table = this.loadTableOrThrow(catalog, name, sinkDefinition.createTableIfNotExists(), props);
            this.tableIo = this.table.io();
            resources.addCloseable((Closeable)this.tableIo);
            this.closeableGroup = resources;
            ready = true;
        }
        finally {
            if (!ready) {
                closeQuietly(resources);
            }
        }
    }

    /** Closes the group without letting a close failure replace the exception already in flight. */
    private static void closeQuietly(CloseableGroup resources) {
        try {
            resources.close();
        }
        catch (Exception ignored) {
            // Best-effort cleanup on a failure path: the original failure is the one to report.
        }
    }

    /** Returns the names of all columns that take part in the key, in column order. */
    private List<String> extractKeyColumns(List<Column> columns) {
        List<String> result = new ArrayList<>();
        for (Column col : columns) {
            if (col.positionInKey() != -1) {
                result.add(col.name());
            }
        }
        return result;
    }

    /**
     * Loads the table, creating it (unpartitioned, with {@code props} as table properties) when it
     * does not exist and {@code createIfNotExists} is set.
     *
     * <p>The catalog is not closed here; the caller releases it when initialization fails.
     *
     * @throws CdcSinkInitException If the table is missing and must not be created, or if loading
     *      fails for any other reason.
     */
    private Table loadTableOrThrow(Catalog catalog, TableIdentifier name, boolean createIfNotExists, Map<String, String> props) {
        try {
            catalog.invalidateTable(name);
            return catalog.loadTable(name);
        }
        catch (NoSuchTableException e) {
            if (!createIfNotExists) {
                throw new CdcSinkInitException(IgniteStringFormatter.format("Table {} does not exist and `createIfNotExists` is false. If you want to create the table, set `createIfNotExists` to true.", new Object[]{name}));
            }
            return catalog.createTable(name, this.schema, PartitionSpec.unpartitioned(), props);
        }
        catch (Throwable e) {
            throw new CdcSinkInitException(e);
        }
    }

    /**
     * Buffers the given rows for the next flush. An empty batch is a no-op.
     *
     * @param batch Rows to write.
     * @throws CdcSinkNotInitializedInternalException If the sink is not initialized.
     * @throws CdcSinkWriteBatchInternalException If the rows cannot be written.
     */
    public void writeBatch(List<Tuple> batch) {
        this.ensureInitialized();
        // Same guard as the other batch operations: avoids opening a writer (and later
        // committing an empty data file) for a batch that carries no rows.
        if (batch.isEmpty()) {
            return;
        }
        this.dataWriterGuard.lock();
        try {
            this.ensureNotClosedGuarded();
            this.initWriterAndTxIfNot();
            this.dataWriter.write(this.toRecords(batch));
        }
        catch (Exception e) {
            throw new CdcSinkWriteBatchInternalException((Throwable)e);
        }
        finally {
            this.dataWriterGuard.unlock();
        }
    }

    /**
     * Buffers the given key/value rows for the next flush. An empty batch is a no-op.
     *
     * @param kvBatch Key/value pairs to write.
     * @throws CdcSinkNotInitializedInternalException If the sink is not initialized.
     * @throws CdcSinkWriteBatchInternalException If the rows cannot be written.
     */
    public void writeKvBatch(List<Map.Entry<Tuple, Tuple>> kvBatch) {
        this.ensureInitialized();
        if (kvBatch.isEmpty()) {
            return;
        }
        this.dataWriterGuard.lock();
        try {
            this.ensureNotClosedGuarded();
            this.initWriterAndTxIfNot();
            this.dataWriter.write(this.toRecordsKv(kvBatch));
        }
        catch (Exception e) {
            throw new CdcSinkWriteBatchInternalException((Throwable)e);
        }
        finally {
            this.dataWriterGuard.unlock();
        }
    }

    /**
     * Replaces rows: deletes the rows matching the batch keys inside the running transaction,
     * then buffers the new versions. An empty batch is a no-op.
     *
     * @param kvBatch Key/value pairs to upsert.
     * @throws CdcSinkNotInitializedInternalException If the sink is not initialized.
     * @throws CdcSinkWriteBatchInternalException If the delete or the write fails.
     */
    public void updateKvBatch(List<Map.Entry<Tuple, Tuple>> kvBatch) {
        this.ensureInitialized();
        if (kvBatch.isEmpty()) {
            return;
        }
        this.dataWriterGuard.lock();
        try {
            this.ensureNotClosedGuarded();
            this.initWriterAndTxIfNot();
            List<Tuple> keyTuples = kvBatch.stream().map(Map.Entry::getKey).collect(Collectors.toList());
            Expression removeExpr = this.buildExpression(keyTuples);
            // NOTE(review): the delete only affects files already in the table; rows for the same
            // keys still buffered in the current writer are appended at flush time. Confirm that
            // callers flush between conflicting batches.
            this.runningTx.newDelete().caseSensitive(false).deleteFromRowFilter(removeExpr).commit();
            this.dataWriter.write(this.toRecordsKv(kvBatch));
        }
        catch (Exception e) {
            throw new CdcSinkWriteBatchInternalException((Throwable)e);
        }
        finally {
            this.dataWriterGuard.unlock();
        }
    }

    /**
     * Lazily opens the Parquet data writer and the table transaction.
     *
     * @throws UncheckedIOException If the data writer cannot be created.
     */
    private void initWriterAndTxIfNot() {
        if (this.dataWriter == null) {
            try {
                OutputFile outputFile = this.tableIo.newOutputFile(this.outputFileName());
                this.dataWriter = Parquet.writeData(outputFile)
                    .schema(this.schema)
                    .createWriterFunc(messageType -> GenericParquetWriter.create(this.schema, (MessageType)messageType))
                    .overwrite()
                    .withSpec(PartitionSpec.unpartitioned())
                    .build();
            }
            catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        }
        if (this.runningTx == null) {
            this.runningTx = this.table.newTransaction();
        }
    }

    /**
     * Converts key/value pairs to Iceberg records.
     *
     * <p>NOTE(review): every conversion receives the same {@code rec} template; presumably
     * {@code IcebergUtils.asRecord} copies it rather than mutating it in place - verify.
     */
    @NotNull
    private List<Record> toRecordsKv(List<Map.Entry<Tuple, Tuple>> kvBatch) {
        return kvBatch.stream().map(e -> IcebergUtils.asRecord((Tuple)e.getKey(), (Tuple)e.getValue(), this.rec, this.schema)).collect(Collectors.toList());
    }

    /** Converts full-row tuples to Iceberg records (same template caveat as {@link #toRecordsKv}). */
    @NotNull
    private List<Record> toRecords(List<Tuple> kvBatch) {
        return kvBatch.stream().map(e -> IcebergUtils.asRecord(e, this.rec, this.schema)).collect(Collectors.toList());
    }

    /**
     * Deletes the rows matching the given keys and commits the delete on the table right away.
     * An empty batch is a no-op.
     *
     * @param entriesToDelete Key tuples of the rows to remove.
     * @throws CdcSinkNotInitializedInternalException If the sink is not initialized.
     * @throws CdcSinkWriteBatchInternalException If the delete fails or a key value is null.
     */
    public void removeBatch(List<Tuple> entriesToDelete) {
        this.ensureInitialized();
        if (entriesToDelete.isEmpty()) {
            return;
        }
        this.dataWriterGuard.lock();
        try {
            this.ensureNotClosedGuarded();
            Expression expr = this.buildExpression(entriesToDelete);
            // NOTE(review): this commits directly on the table, whereas buffered writes become
            // visible only at flush; a remove issued after an unflushed write of the same key
            // is therefore applied before that write. Confirm callers flush in between.
            this.table.newDelete().caseSensitive(false).deleteFromRowFilter(expr).commit();
        }
        catch (Exception e) {
            throw new CdcSinkWriteBatchInternalException((Throwable)e);
        }
        finally {
            this.dataWriterGuard.unlock();
        }
    }

    /**
     * Builds a row filter that matches exactly the given key tuples.
     *
     * <p>With a single key column this is {@code col IN (v1, v2, ...)}. With a composite key each
     * tuple becomes {@code k1 = v1 AND k2 = v2 ...} and the tuples are OR-ed together; AND-ing one
     * IN list per column instead would match the cross product of the values and select rows that
     * were never requested.
     *
     * @param entriesToDelete Tuples carrying a value for every key column.
     * @return Filter expression, or {@code alwaysFalse()} when there are no key columns or tuples.
     * @throws CdcSinkWriteBatchInternalException If a key column value is null.
     */
    private Expression buildExpression(List<Tuple> entriesToDelete) {
        if (this.keyColumnNames.isEmpty() || entriesToDelete.isEmpty()) {
            return Expressions.alwaysFalse();
        }
        if (this.keyColumnNames.size() == 1) {
            String keyColumn = this.keyColumnNames.get(0);
            List<Object> values = new ArrayList<>(entriesToDelete.size());
            for (Tuple tup : entriesToDelete) {
                values.add(this.requireKeyValue(tup, keyColumn));
            }
            return Expressions.in(keyColumn, values);
        }
        List<Expression> rowMatchers = new ArrayList<>(entriesToDelete.size());
        for (Tuple tup : entriesToDelete) {
            Expression matcher = null;
            for (String colName : this.keyColumnNames) {
                Expression columnEquals = Expressions.equal(colName, this.requireKeyValue(tup, colName));
                matcher = matcher == null ? columnEquals : Expressions.and(matcher, columnEquals);
            }
            rowMatchers.add(matcher);
        }
        return anyOf(rowMatchers, 0, rowMatchers.size());
    }

    /** Returns the non-null value of the key column or fails with a descriptive error. */
    private Object requireKeyValue(Tuple tup, String colName) {
        Object value = tup.value(colName);
        if (value == null) {
            throw new CdcSinkWriteBatchInternalException(IgniteStringFormatter.format("Key column {} contains null value. Full key columns list: {}", new Object[]{colName, this.keyColumnNames}));
        }
        return value;
    }

    /**
     * ORs the expressions in {@code [from, to)} as a balanced tree, so the nesting depth grows
     * logarithmically with the batch size instead of linearly.
     */
    private static Expression anyOf(List<Expression> expressions, int from, int to) {
        if (to - from == 1) {
            return expressions.get(from);
        }
        int mid = (from + to) >>> 1;
        return Expressions.or(anyOf(expressions, from, mid), anyOf(expressions, mid, to));
    }

    /** Fails when {@link #init} has not completed. */
    private void ensureInitialized() {
        if (!this.initialized) {
            throw new CdcSinkNotInitializedInternalException(IgniteStringFormatter.format("Sink {} is not initialized", new Object[]{this.sinkName}));
        }
    }

    /** Fails when the sink has been closed; must be called with {@code dataWriterGuard} held. */
    private void ensureNotClosedGuarded() {
        if (this.closeableGroup == null) {
            throw new CdcSinkCloseInternalException(IgniteStringFormatter.format("Sink {} is closed", new Object[]{this.sinkName}));
        }
    }

    /**
     * Commits the buffered rows to the table. Does nothing when no rows are buffered.
     *
     * @throws CdcSinkNotInitializedInternalException If the sink is not initialized.
     * @throws CdcSinkCloseInternalException If the sink is closed.
     * @throws CdcSinkFlushInternalException If closing the data file fails with an I/O error.
     */
    public void flush() {
        this.ensureInitialized();
        this.dataWriterGuard.lock();
        try {
            this.ensureNotClosedGuarded();
            this.flushGuarded();
        }
        catch (IOException e) {
            throw new CdcSinkFlushInternalException((Throwable)e);
        }
        finally {
            this.dataWriterGuard.unlock();
        }
    }

    /**
     * Closes the current data file, appends it within the running transaction and commits the
     * transaction; must be called with {@code dataWriterGuard} held.
     *
     * <p>The writer and the transaction are detached from the sink before any I/O happens, so a
     * failure at any step leaves the sink ready to open a fresh writer and transaction instead of
     * holding on to an already closed writer.
     *
     * @throws IOException If closing the data writer fails.
     */
    private void flushGuarded() throws IOException {
        DataWriter<Record> writer = this.dataWriter;
        Transaction tx = this.runningTx;
        if (writer == null || tx == null) {
            return;
        }
        this.dataWriter = null;
        this.runningTx = null;
        writer.close();
        tx.newAppend().appendFile(writer.toDataFile()).commit();
        tx.commitTransaction();
    }

    /**
     * Flushes buffered rows and releases the catalog and table I/O resources. The resources are
     * released, and the sink is marked closed, even when the final flush fails.
     *
     * @throws CdcSinkNotInitializedInternalException If the sink is not initialized.
     * @throws CdcSinkCloseInternalException If the sink is already closed.
     * @throws CdcSinkFlushInternalException If flushing or closing fails with an I/O error.
     */
    public void close() {
        this.ensureInitialized();
        this.dataWriterGuard.lock();
        try {
            this.ensureNotClosedGuarded();
            try {
                this.flushGuarded();
            }
            finally {
                CloseableGroup resources = this.closeableGroup;
                this.closeableGroup = null;
                resources.close();
            }
        }
        catch (IOException e) {
            throw new CdcSinkFlushInternalException((Throwable)e);
        }
        finally {
            this.dataWriterGuard.unlock();
        }
    }

    /**
     * Generates the location of a new data file under the table's {@code data} directory.
     *
     * <p>The writer is opened with {@code overwrite()}, so the name carries a random UUID next to
     * the millisecond timestamp: two files created within the same millisecond must not end up
     * at the same location.
     */
    private String outputFileName() {
        String fileName = FILE_TIMESTAMP_FORMAT.format(Instant.now()) + "-UTC-" + UUID.randomUUID() + ".parquet";
        return this.table.location() + "/data/" + fileName;
    }
}

