/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.kafka.sink;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.binarytuple.inlineschema.TupleWithSchemaMarshalling;
import org.apache.ignite.internal.client.table.ClientColumn;
import org.apache.ignite.internal.client.table.ClientSchema;
import org.apache.ignite.lang.ColumnNotFoundException;
import org.apache.ignite.lang.util.IgniteNameUtils;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.table.Tuple;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.gridgain.kafka.sink.NestedStructMode;
import org.gridgain.kafka.sink.SinkRecordConvertOptions;
import org.gridgain.kafka.sink.SinkRecordTuple;
import org.gridgain.kafka.sink.UnmappedFieldPolicy;
import org.jetbrains.annotations.Nullable;

/**
 * Static helpers that turn a Kafka Connect {@link SinkRecord} into an Ignite {@link Tuple}.
 *
 * <p>Two entry points are offered:
 * <ul>
 *   <li>{@link #convert} maps the record key and value onto the columns of a {@link ClientSchema},
 *       converting each field to the Java type matching the target column type;</li>
 *   <li>{@link #convertForReceiver} builds a schema-independent tuple with at most two fields,
 *       {@code key} and {@code val}, in which nested structures become nested tuples.</li>
 * </ul>
 *
 * <p>The class holds no state; every method is static.
 */
final class SinkRecordConverter {
    /** Column/field name used for a record key that is not a map or struct. */
    private static final String PRIMITIVE_COLUMN_NAME_KEY = "key";

    /** Column/field name used for a record value that is not a map or struct. */
    private static final String PRIMITIVE_COLUMN_NAME_VAL = "val";

    /** Number of milliseconds in one day; used to reduce a {@link Date} to its time-of-day part. */
    private static final long MILLIS_PER_DAY = 86_400_000L;

    /** Number of nanoseconds in one millisecond. */
    private static final long NANOS_PER_MILLI = 1_000_000L;

    /** Not meant to be instantiated; kept package-private to leave the existing interface untouched. */
    SinkRecordConverter() {
    }

    /**
     * Converts a sink record into a tuple whose columns are resolved against {@code schema}.
     *
     * <p>The key is processed unless {@code options.dropKey()} is set. The value is processed only
     * when it is non-null and {@code options.dropVal()} is not set.
     *
     * @param record  record to convert
     * @param schema  target table schema used to look up columns and their types
     * @param options conversion options (dropped parts/fields, nested struct mode, unmapped field policy)
     * @return a {@link SinkRecordTuple} wrapping the populated tuple together with {@code record}
     * @throws IllegalArgumentException if the record shape is not supported by the configured options
     * @throws ConnectException if a field has no matching column and the unmapped field policy is {@code FAIL}
     */
    static Tuple convert(SinkRecord record, ClientSchema schema, SinkRecordConvertOptions options) {
        Tuple tuple = Tuple.create();
        if (!options.dropKey()) {
            populateTuple(tuple, record.key(), record.keySchema(), PRIMITIVE_COLUMN_NAME_KEY, schema, options, null, options.dropKeyFields());
        }
        if (record.value() != null && !options.dropVal()) {
            populateTuple(tuple, record.value(), record.valueSchema(), PRIMITIVE_COLUMN_NAME_VAL, schema, options, null, options.dropValFields());
        }
        return new SinkRecordTuple(tuple, record);
    }

    /**
     * Converts a sink record into a tuple that does not depend on any table schema.
     *
     * <p>The result contains a {@code key} field when the record key is non-null and a {@code val}
     * field when the record value is non-null; each is produced by
     * {@link #convertValueForReceiver(Object, Schema)}.
     *
     * @param record record to convert
     * @return a new tuple with zero, one or two fields
     */
    static Tuple convertForReceiver(SinkRecord record) {
        Tuple tuple = Tuple.create();
        if (record.key() != null) {
            tuple.set(PRIMITIVE_COLUMN_NAME_KEY, convertValueForReceiver(record.key(), record.keySchema()));
        }
        if (record.value() != null) {
            tuple.set(PRIMITIVE_COLUMN_NAME_VAL, convertValueForReceiver(record.value(), record.valueSchema()));
        }
        return tuple;
    }

    /**
     * Writes {@code obj} into {@code tuple}, descending into maps and structs.
     *
     * <ul>
     *   <li>{@link Map}: every entry is treated as a field; keys must be strings.</li>
     *   <li>{@link Struct}: every field declared by the struct's own schema is processed.</li>
     *   <li>Anything else is stored under {@code primitiveColumnName}, provided {@code kafkaSchema}
     *       is absent, primitive or an array.</li>
     * </ul>
     *
     * @param tuple               tuple being populated
     * @param obj                 key, value or nested value to write
     * @param kafkaSchema         Kafka schema of {@code obj}, if known
     * @param primitiveColumnName column used when {@code obj} is neither a map nor a struct;
     *                            {@code null} for nested values
     * @param igniteSchema        target table schema
     * @param options             conversion options
     * @param columnNamePrefix    prefix prepended to field names, or {@code null} at the top level
     * @param ignoredFields       prefixed field names that must not be written
     * @throws IllegalArgumentException on a non-string map key, a missing primitive column name or
     *                                  an unsupported schema type
     */
    private static void populateTuple(Tuple tuple, Object obj, @Nullable Schema kafkaSchema, @Nullable String primitiveColumnName, ClientSchema igniteSchema, SinkRecordConvertOptions options, @Nullable String columnNamePrefix, Set<String> ignoredFields) {
        if (obj instanceof Map) {
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) obj).entrySet()) {
                Object key = entry.getKey();
                if (!(key instanceof String)) {
                    throw new IllegalArgumentException("Map key must be a string: " + key);
                }
                String fieldName = (String) key;
                Object fieldValue = entry.getValue();
                // Without a per-field schema, nesting is detected from the runtime type of the value.
                boolean nested = fieldValue instanceof Map || fieldValue instanceof Struct;
                Schema fieldSchema = nested ? getFieldSchema(kafkaSchema, fieldName) : null;
                populateField(tuple, fieldName, fieldValue, nested, fieldSchema, igniteSchema, options, columnNamePrefix, ignoredFields);
            }
            return;
        }
        if (obj instanceof Struct) {
            Struct struct = (Struct) obj;
            for (Field field : struct.schema().fields()) {
                String fieldName = field.name();
                Object fieldValue = struct.get(fieldName);
                // For structs nesting is decided by the declared field type, so the value may be null here.
                Schema.Type fieldType = field.schema().type();
                boolean nested = fieldType == Schema.Type.MAP || fieldType == Schema.Type.STRUCT;
                populateField(tuple, fieldName, fieldValue, nested, field.schema(), igniteSchema, options, columnNamePrefix, ignoredFields);
            }
            return;
        }
        if (kafkaSchema == null || kafkaSchema.type().isPrimitive() || kafkaSchema.type() == Schema.Type.ARRAY) {
            if (primitiveColumnName == null) {
                throw new IllegalArgumentException("Primitive column name not provided - unexpected schema.");
            }
            setColumnValue(tuple, primitiveColumnName, igniteSchema, obj, options.unmappedFieldPolicy());
            return;
        }
        throw new IllegalArgumentException("Unsupported schema type: " + kafkaSchema.type());
    }

    /**
     * Handles a single field of a map or struct: recurses into nested structures according to the
     * configured nested struct mode, otherwise writes the value to its column.
     *
     * @param nested      whether the field is a nested map/struct
     * @param fieldSchema Kafka schema of the field, if known (only used for nested fields)
     */
    private static void populateField(Tuple tuple, String fieldName, @Nullable Object fieldValue, boolean nested, @Nullable Schema fieldSchema, ClientSchema igniteSchema, SinkRecordConvertOptions options, @Nullable String columnNamePrefix, Set<String> ignoredFields) {
        if (nested) {
            if (options.nestedStructMode() == NestedStructMode.IGNORE) {
                return;
            }
            // Resolved before the null check so that DISALLOW keeps rejecting nested fields.
            String nestedPrefix = getColumnNamePrefix(options, columnNamePrefix, fieldName);
            if (fieldValue == null) {
                // A null nested struct/map has no fields to write; recursing would end in
                // "Unsupported schema type" because null is neither a Map nor a Struct.
                return;
            }
            populateTuple(tuple, fieldValue, fieldSchema, null, igniteSchema, options, nestedPrefix, ignoredFields);
            return;
        }
        String columnNameRaw = columnNamePrefix != null ? columnNamePrefix + fieldName : fieldName;
        if (ignoredFields.contains(columnNameRaw)) {
            return;
        }
        setColumnValue(tuple, columnNameRaw, igniteSchema, fieldValue, options.unmappedFieldPolicy());
    }

    /**
     * Converts {@code val} to the Java type expected for a column of the given {@code type}.
     * Values of a type that has no conversion rule are returned unchanged.
     *
     * <p>NOTE(review): narrowing to INT8/INT16/INT32 relies on {@code Number.byteValue()},
     * {@code shortValue()} and {@code intValue()}, which wrap silently when the value does not fit.
     *
     * @param val  value to convert, may be {@code null}
     * @param type target column type
     * @return converted value, or {@code val} itself when no rule applies
     */
    private static Object convertValue(Object val, ColumnType type) {
        switch (type) {
            case INT8: {
                if (val instanceof Long || val instanceof Integer || val instanceof Short) {
                    return ((Number) val).byteValue();
                }
                return val;
            }
            case INT16: {
                if (val instanceof Long || val instanceof Integer || val instanceof Byte) {
                    return ((Number) val).shortValue();
                }
                return val;
            }
            case INT32: {
                if (val instanceof Long || val instanceof Byte || val instanceof Short) {
                    return ((Number) val).intValue();
                }
                return val;
            }
            case INT64: {
                // Widen smaller integer types, mirroring what the other integer cases do.
                if (val instanceof Integer || val instanceof Short || val instanceof Byte) {
                    return ((Number) val).longValue();
                }
                return val;
            }
            case FLOAT: {
                if (val instanceof Double) {
                    return ((Double) val).floatValue();
                }
                return val;
            }
            case DOUBLE: {
                if (val instanceof Float) {
                    return ((Float) val).doubleValue();
                }
                return val;
            }
            case DATE: {
                return convertDate(val);
            }
            case TIME: {
                return convertTime(val);
            }
            case DATETIME: {
                return convertDateTime(val);
            }
            case TIMESTAMP: {
                return convertTimestamp(val);
            }
            case UUID: {
                if (val instanceof String) {
                    return UUID.fromString((String) val);
                }
                return val;
            }
            case BYTE_ARRAY: {
                return toByteArray(val);
            }
            case DECIMAL: {
                if (val instanceof String) {
                    return new BigDecimal((String) val);
                }
                if (val instanceof Double) {
                    return BigDecimal.valueOf((Double) val);
                }
                if (val instanceof Float) {
                    // Go through the float's own decimal text: widening to double first would
                    // expose binary artifacts (0.1f would become 0.10000000149011612).
                    return new BigDecimal(val.toString());
                }
                if (val instanceof Long || val instanceof Integer || val instanceof Short || val instanceof Byte) {
                    return BigDecimal.valueOf(((Number) val).longValue());
                }
                return val;
            }
            default: {
                return val;
            }
        }
    }

    /**
     * Reads an integral value as an {@code int}.
     *
     * @param val a {@link Byte}, {@link Short}, {@link Integer} or {@link Long}
     * @return the value as an {@code int}
     * @throws IllegalArgumentException if {@code val} is {@code null}, of another type, or a
     *                                  {@link Long} outside the {@code int} range
     */
    private static int getInt(Object val) {
        if (val instanceof Byte || val instanceof Short || val instanceof Integer) {
            return ((Number) val).intValue();
        }
        if (val instanceof Long) {
            long longVal = (Long) val;
            if (longVal < Integer.MIN_VALUE || longVal > Integer.MAX_VALUE) {
                throw new IllegalArgumentException("Value out of int range: " + longVal);
            }
            return (int) longVal;
        }
        throw new IllegalArgumentException("Unsupported value type: " + (val == null ? "null" : val.getClass()));
    }

    /**
     * Looks up the column for {@code rawColumnName} and, if it exists, stores the converted value
     * in {@code tuple}. A missing column is handled according to {@code unmappedFieldPolicy}.
     *
     * @param rawColumnName column name as derived from the Kafka field (not yet parsed)
     * @throws ConnectException if the column is missing and the policy is {@code FAIL}
     * @throws IllegalArgumentException if the column is missing and the policy is not recognised
     */
    private static void setColumnValue(Tuple tuple, String rawColumnName, ClientSchema igniteSchema, Object value, UnmappedFieldPolicy unmappedFieldPolicy) {
        String columnName = IgniteNameUtils.parseIdentifier(rawColumnName);
        ClientColumn igniteColumn = igniteSchema.columnSafe(columnName);
        if (igniteColumn == null) {
            switch (unmappedFieldPolicy) {
                case IGNORE: {
                    return;
                }
                case FAIL: {
                    throw new ConnectException("Column not found in GridGain for Kafka field '" + rawColumnName + "'", new ColumnNotFoundException(columnName));
                }
            }
            throw new IllegalArgumentException("Unsupported unmapped field policy: " + unmappedFieldPolicy);
        }
        Object convertedValue = convertValue(value, igniteColumn.type());
        // The raw (unparsed) name is handed to the tuple on purpose, exactly as before;
        // presumably Tuple.set parses identifiers itself - TODO confirm.
        tuple.set(rawColumnName, convertedValue);
    }

    /**
     * Computes the column name prefix to use for the fields of a nested structure.
     *
     * @param columnNamePrefix prefix of the enclosing structure, or {@code null} at the top level
     * @param fieldName        name of the field holding the nested structure
     * @return for {@code CONCAT}: enclosing prefix + field name + configured separator;
     *         for {@code FLATTEN}: an empty string
     * @throws IllegalArgumentException for {@code IGNORE} (callers must handle it), {@code DISALLOW}
     *                                  and any unrecognised mode
     */
    private static String getColumnNamePrefix(SinkRecordConvertOptions options, @Nullable String columnNamePrefix, String fieldName) {
        switch (options.nestedStructMode()) {
            case CONCAT: {
                String qualifiedName = columnNamePrefix != null ? columnNamePrefix + fieldName : fieldName;
                return qualifiedName + options.nestedStructConcatSeparator();
            }
            case FLATTEN: {
                return "";
            }
            case IGNORE: {
                throw new IllegalArgumentException("IGNORE must be handled by the caller");
            }
            case DISALLOW: {
                throw new IllegalArgumentException("Nested structures are not allowed by configuration (nested.struct.mode=DISALLOW): " + fieldName);
            }
        }
        throw new IllegalArgumentException("Unsupported nested struct mode: " + options.nestedStructMode());
    }

    /**
     * Returns the schema of {@code fieldName} within {@code kafkaSchema}.
     *
     * @return the field schema, or {@code null} when {@code kafkaSchema} is absent, is not a
     *         struct schema, or declares no such field
     */
    @Nullable
    private static Schema getFieldSchema(@Nullable Schema kafkaSchema, String fieldName) {
        if (kafkaSchema == null || kafkaSchema.type() != Schema.Type.STRUCT) {
            return null;
        }
        Field field = kafkaSchema.field(fieldName);
        return field != null ? field.schema() : null;
    }

    /**
     * Converts a key/value (or a nested element of one) into a form suitable for a
     * schema-independent tuple.
     *
     * <p>{@link ByteBuffer}s become byte arrays; maps, structs and lists become nested tuples
     * (lists are keyed by element index); the Kafka {@code Timestamp}, {@code Time} and
     * {@code Date} logical types are routed through the matching temporal converter. Values whose
     * type {@link TupleWithSchemaMarshalling#inferType} does not recognise are replaced by their
     * {@code toString()}.
     *
     * @param val         value to convert, may be {@code null}
     * @param kafkaSchema schema of {@code val}, if known
     * @return converted value, or {@code null} when {@code val} is {@code null}
     */
    @Nullable
    private static Object convertValueForReceiver(Object val, @Nullable Schema kafkaSchema) {
        if (val == null) {
            return null;
        }
        if (val instanceof ByteBuffer) {
            return byteBufferToByteArray((ByteBuffer) val);
        }
        if (kafkaSchema == null) {
            // Schemaless data: decide by runtime type. Any Map is accepted, in line with populateTuple.
            if (val instanceof Map) {
                return mapToTuple((Map<?, ?>) val, null);
            }
            if (val instanceof Date) {
                return ((Date) val).toInstant();
            }
            if (val instanceof List) {
                return listToTuple((List<?>) val, null);
            }
            return toStringIfUnknownType(val);
        }
        String logicalName = kafkaSchema.name();
        if ("org.apache.kafka.connect.data.Timestamp".equals(logicalName)) {
            return toStringIfUnknownType(convertTimestamp(val));
        }
        if ("org.apache.kafka.connect.data.Time".equals(logicalName)) {
            return toStringIfUnknownType(convertTime(val));
        }
        if ("org.apache.kafka.connect.data.Date".equals(logicalName)) {
            return toStringIfUnknownType(convertDate(val));
        }
        switch (kafkaSchema.type()) {
            case INT8:
            case INT16:
            case INT32:
            case INT64:
            case FLOAT32:
            case FLOAT64:
            case BOOLEAN:
            case STRING: {
                return val;
            }
            case BYTES: {
                return toByteArray(val);
            }
            case ARRAY: {
                return listToTuple((List<?>) val, kafkaSchema.valueSchema());
            }
            case MAP: {
                // Propagate the declared value schema (as ARRAY does) so struct values and
                // logical types inside the map are converted instead of being stringified.
                return mapToTuple((Map<?, ?>) val, kafkaSchema.valueSchema());
            }
            case STRUCT: {
                Struct struct = (Struct) val;
                Tuple res = Tuple.create();
                for (Field field : struct.schema().fields()) {
                    res.set(toValidTupleFieldName(field.name()), convertValueForReceiver(struct.get(field), field.schema()));
                }
                return res;
            }
            default: {
                return toStringIfUnknownType(val);
            }
        }
    }

    /**
     * Builds a tuple with one field per map entry; field names are the keys' {@code toString()}.
     *
     * @param valueSchema schema shared by all map values, if known
     */
    private static Tuple mapToTuple(Map<?, ?> map, @Nullable Schema valueSchema) {
        Tuple res = Tuple.create();
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            res.set(toValidTupleFieldName(entry.getKey().toString()), convertValueForReceiver(entry.getValue(), valueSchema));
        }
        return res;
    }

    /**
     * Converts the supported representations of binary data to {@code byte[]}: a list of numbers
     * (one per byte), a Base64 string, or a {@link ByteBuffer}. Other values are returned unchanged.
     *
     * @throws IllegalArgumentException if a list element is not a number, or a string is not valid Base64
     */
    private static Object toByteArray(Object val) {
        if (val instanceof List) {
            List<?> elements = (List<?>) val;
            byte[] arr = new byte[elements.size()];
            for (int i = 0; i < arr.length; ++i) {
                Object element = elements.get(i);
                // Elements are not necessarily Byte (e.g. Long or Integer), so cast via Number.
                if (!(element instanceof Number)) {
                    throw new IllegalArgumentException("Byte array element must be a number: " + element);
                }
                arr[i] = ((Number) element).byteValue();
            }
            return arr;
        }
        if (val instanceof String) {
            return Base64.getDecoder().decode((String) val);
        }
        if (val instanceof ByteBuffer) {
            return byteBufferToByteArray((ByteBuffer) val);
        }
        return val;
    }

    /**
     * Copies the remaining bytes of {@code buf} into a new array.
     * Reads through a duplicate so the position of {@code buf} itself is never moved.
     */
    private static byte[] byteBufferToByteArray(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()];
        buf.duplicate().get(bytes);
        return bytes;
    }

    /**
     * Builds a tuple with one field per list element, named after the element index.
     *
     * @param valueSchema schema shared by all elements, if known
     */
    private static Tuple listToTuple(List<?> val, @Nullable Schema valueSchema) {
        Tuple res = Tuple.create();
        for (int i = 0; i < val.size(); ++i) {
            res.set(toValidTupleFieldName(Integer.toString(i)), convertValueForReceiver(val.get(i), valueSchema));
        }
        return res;
    }

    /** Quotes {@code fieldName} when {@link IgniteNameUtils#quoteIfNeeded} deems it necessary. */
    private static String toValidTupleFieldName(String fieldName) {
        return IgniteNameUtils.quoteIfNeeded(fieldName);
    }

    /** Converts every element of {@code parts} with {@link #getInt(Object)}, in order. */
    private static int[] toInts(List<?> parts) {
        int[] ints = new int[parts.size()];
        for (int i = 0; i < ints.length; ++i) {
            ints[i] = getInt(parts.get(i));
        }
        return ints;
    }

    /**
     * Converts a value to {@link LocalDateTime}. Accepted inputs: a list of 5 to 7 integers
     * (year, month, day, hour, minute[, second[, nano]]), an ISO-8601 string, a {@link Long}
     * holding epoch milliseconds, or a {@link Date}; the last two are interpreted in UTC.
     * Other values are returned unchanged.
     *
     * @throws IllegalArgumentException if a list has an unexpected number of parts
     */
    private static Object convertDateTime(Object val) {
        if (val instanceof List) {
            List<?> dateTimeParts = (List<?>) val;
            int size = dateTimeParts.size();
            if (size < 5 || size > 7) {
                throw new IllegalArgumentException("Unexpected DateTime format: " + dateTimeParts);
            }
            int[] p = toInts(dateTimeParts);
            if (size == 7) {
                return LocalDateTime.of(p[0], p[1], p[2], p[3], p[4], p[5], p[6]);
            }
            if (size == 6) {
                return LocalDateTime.of(p[0], p[1], p[2], p[3], p[4], p[5]);
            }
            return LocalDateTime.of(p[0], p[1], p[2], p[3], p[4]);
        }
        if (val instanceof String) {
            return LocalDateTime.parse((String) val);
        }
        if (val instanceof Long) {
            return LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) val), ZoneOffset.UTC);
        }
        if (val instanceof Date) {
            // Handled like the other temporal converters, which all accept java.util.Date.
            return LocalDateTime.ofInstant(((Date) val).toInstant(), ZoneOffset.UTC);
        }
        return val;
    }

    /**
     * Converts a value to {@link Instant}. Accepted inputs: a {@link Double} holding epoch seconds
     * with a fractional part, an ISO-8601 string, a {@link Date}, or a {@link Long} holding epoch
     * milliseconds (the same interpretation {@link #convertDateTime(Object)} applies to longs).
     * Other values are returned unchanged.
     */
    private static Object convertTimestamp(Object val) {
        if (val instanceof Double) {
            double seconds = (Double) val;
            long epochSecond = (long) seconds;
            long nanoAdjustment = (long) ((seconds - (double) epochSecond) * 1.0E9);
            return Instant.ofEpochSecond(epochSecond, nanoAdjustment);
        }
        if (val instanceof String) {
            return Instant.parse((String) val);
        }
        if (val instanceof Date) {
            return ((Date) val).toInstant();
        }
        if (val instanceof Long) {
            return Instant.ofEpochMilli((Long) val);
        }
        return val;
    }

    /**
     * Converts a value to {@link LocalTime}. Accepted inputs: a list of 2 to 4 integers
     * (hour, minute[, second[, nano]]), an ISO-8601 string, or a {@link Date}, of which only the
     * UTC time-of-day is used. Other values are returned unchanged.
     *
     * @throws IllegalArgumentException if a list has an unexpected number of parts
     */
    private static Object convertTime(Object val) {
        if (val instanceof List) {
            List<?> timeParts = (List<?>) val;
            int size = timeParts.size();
            if (size < 2 || size > 4) {
                throw new IllegalArgumentException("Unexpected Time format: " + timeParts);
            }
            int[] p = toInts(timeParts);
            if (size == 4) {
                return LocalTime.of(p[0], p[1], p[2], p[3]);
            }
            if (size == 3) {
                return LocalTime.of(p[0], p[1], p[2]);
            }
            return LocalTime.of(p[0], p[1]);
        }
        if (val instanceof String) {
            return LocalTime.parse((String) val);
        }
        if (val instanceof Date) {
            // Reduce to milliseconds within the day first: ofNanoOfDay rejects anything beyond
            // 24h, which every Date carrying a date part would exceed.
            long millisOfDay = Math.floorMod(((Date) val).getTime(), MILLIS_PER_DAY);
            return LocalTime.ofNanoOfDay(millisOfDay * NANOS_PER_MILLI);
        }
        return val;
    }

    /**
     * Converts a value to {@link LocalDate}. Accepted inputs: a list of exactly 3 integers
     * (year, month, day), an ISO-8601 string, or a {@link Date} interpreted in UTC.
     * Other values are returned unchanged.
     *
     * @throws IllegalArgumentException if a list does not have exactly 3 parts
     */
    private static Object convertDate(Object val) {
        if (val instanceof List) {
            List<?> dateParts = (List<?>) val;
            if (dateParts.size() != 3) {
                throw new IllegalArgumentException("Date must have 3 parts: " + dateParts);
            }
            int[] p = toInts(dateParts);
            return LocalDate.of(p[0], p[1], p[2]);
        }
        if (val instanceof String) {
            return LocalDate.parse((String) val);
        }
        if (val instanceof Date) {
            return LocalDate.ofInstant(((Date) val).toInstant(), ZoneOffset.UTC);
        }
        return val;
    }

    /**
     * Returns {@code val} unchanged when {@link TupleWithSchemaMarshalling#inferType} yields a
     * type for it, otherwise its {@code toString()}.
     */
    private static Object toStringIfUnknownType(Object val) {
        boolean unknownType = TupleWithSchemaMarshalling.inferType(val) == null;
        return unknownType ? val.toString() : val;
    }
}

