/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.kafka.source;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.ignite.internal.client.table.ClientColumn;
import org.apache.ignite.internal.client.table.ClientSchema;
import org.apache.ignite.internal.client.table.ClientTuple;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.table.TableRowEvent;
import org.apache.ignite.table.TableRowEventType;
import org.apache.ignite.table.Tuple;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.source.SourceRecord;
import org.gridgain.kafka.source.KafkaColumnSchema;
import org.gridgain.kafka.source.KafkaSchema;
import org.gridgain.kafka.source.OffsetUtils;
import org.gridgain.kafka.source.SourceEvent;
import org.gridgain.kafka.source.SourceRecordConverterConfig;
import org.gridgain.kafka.source.SourceValueConverter;
import org.gridgain.kafka.util.TopicTableNameConverter;

final class SourceRecordConverter {
    private static final Struct EMPTY_STRUCT = new Struct(SchemaBuilder.struct().build());
    private static final Map<ColumnType, KafkaColumnSchema> COLUMN_TYPE_MAP = new EnumMap<ColumnType, KafkaColumnSchema>(ColumnType.class);
    private static final Map<ColumnType, KafkaColumnSchema> COLUMN_TYPE_MAP_NULLABLE = new EnumMap<ColumnType, KafkaColumnSchema>(ColumnType.class);
    private static final KafkaColumnSchema STRING_COLUMN_SCHEMA = new KafkaColumnSchema(Schema.STRING_SCHEMA, SourceValueConverter.TO_STRING);
    private static final KafkaColumnSchema STRING_COLUMN_SCHEMA_NULLABLE = new KafkaColumnSchema(Schema.OPTIONAL_STRING_SCHEMA, SourceValueConverter.TO_STRING);
    private static final KafkaColumnSchema TIMESTAMP_INT64_COLUMN_SCHEMA = new KafkaColumnSchema(Schema.INT64_SCHEMA, o -> ((Instant)o).toEpochMilli());
    private static final KafkaColumnSchema TIMESTAMP_INT64_COLUMN_SCHEMA_NULLABLE = new KafkaColumnSchema(Schema.OPTIONAL_INT64_SCHEMA, o -> ((Instant)o).toEpochMilli());
    private static final KafkaColumnSchema TIMESTAMP_KAFKA_COLUMN_SCHEMA = new KafkaColumnSchema(Timestamp.SCHEMA, SourceRecordConverter::convertInstantToKafkaTimestamp);
    private static final KafkaColumnSchema TIMESTAMP_KAFKA_COLUMN_SCHEMA_NULLABLE = new KafkaColumnSchema(Timestamp.builder().optional().build(), SourceRecordConverter::convertInstantToKafkaTimestamp);
    private static final KafkaColumnSchema TIME_KAFKA_COLUMN_SCHEMA = new KafkaColumnSchema(Time.SCHEMA, SourceRecordConverter::convertLocalTimeToKafkaTime);
    private static final KafkaColumnSchema TIME_KAFKA_COLUMN_SCHEMA_NULLABLE = new KafkaColumnSchema(Time.builder().optional().build(), SourceRecordConverter::convertLocalTimeToKafkaTime);
    private static final KafkaColumnSchema DATE_KAFKA_COLUMN_SCHEMA = new KafkaColumnSchema(Date.SCHEMA, SourceRecordConverter::convertLocalDateToKafkaDate);
    private static final KafkaColumnSchema DATE_KAFKA_COLUMN_SCHEMA_NULLABLE = new KafkaColumnSchema(Date.builder().optional().build(), SourceRecordConverter::convertLocalDateToKafkaDate);
    private static final KafkaColumnSchema DATETIME_KAFKA_COLUMN_SCHEMA = new KafkaColumnSchema(Timestamp.SCHEMA, SourceRecordConverter::convertLocalDateTimeToKafkaTimestamp);
    private static final KafkaColumnSchema DATETIME_KAFKA_COLUMN_SCHEMA_NULLABLE = new KafkaColumnSchema(Timestamp.builder().optional().build(), SourceRecordConverter::convertLocalDateTimeToKafkaTimestamp);
    private static final Pattern SCHEMA_NAME_PATTERN = Pattern.compile("[^a-zA-Z0-9_]");

    SourceRecordConverter() {
    }

    static SourceRecord toSourceRecord(SourceEvent event, TopicTableNameConverter tableTopicNameConverter, Map<ClientSchema, KafkaSchema> schemaCache, int schemaCacheMaxSize, SourceRecordConverterConfig config) {
        Map<String, Object> sourcePartition = OffsetUtils.kafkaSourcePartition(event.tableName());
        Map<String, Object> sourceOffset = OffsetUtils.watermarkToOffset(event.watermark());
        String topic = tableTopicNameConverter.convert(event.tableName());
        TableRowEvent<Tuple> row = event.row();
        boolean remove = row.type() == TableRowEventType.REMOVED;
        ClientTuple igniteValue = (ClientTuple)(remove ? (Tuple)row.oldEntry() : (Tuple)row.entry());
        assert (igniteValue != null) : "Unexpected null value for event type: " + row.type();
        KafkaSchema kafkaSchema = SourceRecordConverter.igniteSchemaToKafkaSchemaCached(igniteValue.schema(), event.tableName(), schemaCache, schemaCacheMaxSize, config);
        Map.Entry<Struct, Struct> kafkaKeyValue = SourceRecordConverter.igniteTupleToKafkaStruct(igniteValue, kafkaSchema, remove);
        return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaSchema.keySchema(), (Object)kafkaKeyValue.getKey(), remove ? null : kafkaSchema.valueSchema(), (Object)(remove ? null : kafkaKeyValue.getValue()));
    }

    static String getSchemaName(String tableName, boolean isKey) {
        String cleanTableName = SCHEMA_NAME_PATTERN.matcher(tableName).replaceAll("_");
        return cleanTableName + (isKey ? "_KEY" : "_VAL");
    }

    private static KafkaSchema igniteSchemaToKafkaSchemaCached(ClientSchema schema, String tableName, Map<ClientSchema, KafkaSchema> schemaCache, int schemaCacheMaxSize, SourceRecordConverterConfig config) {
        if (schemaCache.size() > schemaCacheMaxSize) {
            schemaCache.clear();
        }
        return schemaCache.computeIfAbsent(schema, s -> SourceRecordConverter.igniteSchemaToKafkaSchema(s, tableName, config));
    }

    private static KafkaSchema igniteSchemaToKafkaSchema(ClientSchema schema, String tableName, SourceRecordConverterConfig config) {
        SchemaBuilder keySchema = new SchemaBuilder(Schema.Type.STRUCT).name(SourceRecordConverter.getSchemaName(tableName, true)).version(Integer.valueOf(schema.version()));
        SchemaBuilder valSchema = new SchemaBuilder(Schema.Type.STRUCT).name(SourceRecordConverter.getSchemaName(tableName, false)).version(Integer.valueOf(schema.version()));
        ClientColumn[] columns = schema.columns();
        ArrayList<SourceValueConverter> converters = new ArrayList<SourceValueConverter>();
        for (ClientColumn column : columns) {
            KafkaColumnSchema columnSchema = SourceRecordConverter.igniteColumnTypeToKafkaType(column, config);
            SchemaBuilder targetSchema = column.key() ? keySchema : valSchema;
            targetSchema.field(column.name(), columnSchema.schema());
            converters.add(columnSchema.converter());
        }
        return new KafkaSchema(keySchema.build(), valSchema.build(), converters);
    }

    private static KafkaColumnSchema igniteColumnTypeToKafkaType(ClientColumn column, SourceRecordConverterConfig config) {
        if (column.type() == ColumnType.TIMESTAMP) {
            switch (config.timestampMode()) {
                case INT64: {
                    return column.nullable() ? TIMESTAMP_INT64_COLUMN_SCHEMA_NULLABLE : TIMESTAMP_INT64_COLUMN_SCHEMA;
                }
                case STRING: {
                    return column.nullable() ? STRING_COLUMN_SCHEMA_NULLABLE : STRING_COLUMN_SCHEMA;
                }
                case KAFKA_TIMESTAMP: {
                    return column.nullable() ? TIMESTAMP_KAFKA_COLUMN_SCHEMA_NULLABLE : TIMESTAMP_KAFKA_COLUMN_SCHEMA;
                }
            }
        } else if (column.type() == ColumnType.TIME) {
            switch (config.timeMode()) {
                case KAFKA_TIME: {
                    return column.nullable() ? TIME_KAFKA_COLUMN_SCHEMA_NULLABLE : TIME_KAFKA_COLUMN_SCHEMA;
                }
                case STRING: {
                    return column.nullable() ? STRING_COLUMN_SCHEMA_NULLABLE : STRING_COLUMN_SCHEMA;
                }
            }
        } else if (column.type() == ColumnType.DATE) {
            switch (config.dateMode()) {
                case STRING: {
                    return column.nullable() ? STRING_COLUMN_SCHEMA_NULLABLE : STRING_COLUMN_SCHEMA;
                }
                case KAFKA_DATE: {
                    return column.nullable() ? DATE_KAFKA_COLUMN_SCHEMA_NULLABLE : DATE_KAFKA_COLUMN_SCHEMA;
                }
            }
        } else if (column.type() == ColumnType.DATETIME) {
            switch (config.dateTimeMode()) {
                case STRING: {
                    return column.nullable() ? STRING_COLUMN_SCHEMA_NULLABLE : STRING_COLUMN_SCHEMA;
                }
                case KAFKA_TIMESTAMP: {
                    return column.nullable() ? DATETIME_KAFKA_COLUMN_SCHEMA_NULLABLE : DATETIME_KAFKA_COLUMN_SCHEMA;
                }
            }
        }
        Map<ColumnType, KafkaColumnSchema> columnTypeMap = column.nullable() ? COLUMN_TYPE_MAP_NULLABLE : COLUMN_TYPE_MAP;
        KafkaColumnSchema columnSchema = columnTypeMap.get(column.type());
        if (columnSchema != null) {
            return columnSchema;
        }
        return column.nullable() ? STRING_COLUMN_SCHEMA_NULLABLE : STRING_COLUMN_SCHEMA;
    }

    private static Map.Entry<Struct, Struct> igniteTupleToKafkaStruct(ClientTuple igniteTuple, KafkaSchema kafkaSchema, boolean keyOnly) {
        Struct kafkaKey = new Struct(kafkaSchema.keySchema());
        Struct kafkaValue = keyOnly ? EMPTY_STRUCT : new Struct(kafkaSchema.valueSchema());
        ClientColumn[] columns = igniteTuple.schema().columns();
        for (int i = 0; i < igniteTuple.columnCount(); ++i) {
            ClientColumn column = columns[i];
            Object columnValue = igniteTuple.value(i);
            if (keyOnly && !column.key()) continue;
            if (columnValue != null) {
                columnValue = kafkaSchema.convert(columnValue, i);
            }
            Struct targetStruct = column.key() ? kafkaKey : kafkaValue;
            targetStruct.put(column.name(), columnValue);
        }
        return Map.entry(kafkaKey, kafkaValue);
    }

    private static double convertInstantToDouble(Object columnValue) {
        Instant instant = (Instant)columnValue;
        return (double)instant.getEpochSecond() + (double)instant.getNano() / 1.0E9;
    }

    private static Object convertInstantToKafkaTimestamp(Object columnValue) {
        Instant instant = (Instant)columnValue;
        long epochMillis = instant.getEpochSecond() * 1000L;
        return Timestamp.toLogical((Schema)Timestamp.SCHEMA, (long)(epochMillis += (long)(instant.getNano() / 1000000)));
    }

    private static List<Integer> convertLocalTimeToArray(Object columnValue) {
        LocalTime time = (LocalTime)columnValue;
        return List.of(Integer.valueOf(time.getHour()), Integer.valueOf(time.getMinute()), Integer.valueOf(time.getSecond()), Integer.valueOf(time.getNano()));
    }

    private static Object convertLocalTimeToKafkaTime(Object columnValue) {
        LocalTime time = (LocalTime)columnValue;
        return Time.toLogical((Schema)Time.SCHEMA, (int)((int)(time.toNanoOfDay() / 1000000L)));
    }

    private static List<Integer> convertLocalDateToArray(Object columnValue) {
        LocalDate localDate = (LocalDate)columnValue;
        return List.of(Integer.valueOf(localDate.getYear()), Integer.valueOf(localDate.getMonthValue()), Integer.valueOf(localDate.getDayOfMonth()));
    }

    private static Object convertLocalDateToKafkaDate(Object columnValue) {
        LocalDate localDate = (LocalDate)columnValue;
        return Date.toLogical((Schema)Date.SCHEMA, (int)((int)localDate.toEpochDay()));
    }

    private static List<Integer> convertLocalDateTimeToArray(Object columnValue) {
        LocalDateTime dateTime = (LocalDateTime)columnValue;
        return List.of(Integer.valueOf(dateTime.getYear()), Integer.valueOf(dateTime.getMonthValue()), Integer.valueOf(dateTime.getDayOfMonth()), Integer.valueOf(dateTime.getHour()), Integer.valueOf(dateTime.getMinute()), Integer.valueOf(dateTime.getSecond()), Integer.valueOf(dateTime.getNano()));
    }

    private static Object convertLocalDateTimeToKafkaTimestamp(Object columnValue) {
        LocalDateTime dateTime = (LocalDateTime)columnValue;
        long epochMillis = dateTime.toEpochSecond(ZoneOffset.UTC) * 1000L;
        return Timestamp.toLogical((Schema)Timestamp.SCHEMA, (long)(epochMillis += (long)(dateTime.getNano() / 1000000)));
    }

    private static Schema arraySchema(boolean optional, Schema schema) {
        SchemaBuilder arr = SchemaBuilder.array((Schema)schema);
        return optional ? arr.optional().build() : arr.build();
    }

    static {
        COLUMN_TYPE_MAP.put(ColumnType.BOOLEAN, new KafkaColumnSchema(Schema.BOOLEAN_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP_NULLABLE.put(ColumnType.BOOLEAN, new KafkaColumnSchema(Schema.OPTIONAL_BOOLEAN_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP.put(ColumnType.INT8, new KafkaColumnSchema(Schema.INT8_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP_NULLABLE.put(ColumnType.INT8, new KafkaColumnSchema(Schema.OPTIONAL_INT8_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP.put(ColumnType.INT16, new KafkaColumnSchema(Schema.INT16_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP_NULLABLE.put(ColumnType.INT16, new KafkaColumnSchema(Schema.OPTIONAL_INT16_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP.put(ColumnType.INT32, new KafkaColumnSchema(Schema.INT32_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP_NULLABLE.put(ColumnType.INT32, new KafkaColumnSchema(Schema.OPTIONAL_INT32_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP.put(ColumnType.INT64, new KafkaColumnSchema(Schema.INT64_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP_NULLABLE.put(ColumnType.INT64, new KafkaColumnSchema(Schema.OPTIONAL_INT64_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP.put(ColumnType.FLOAT, new KafkaColumnSchema(Schema.FLOAT32_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP_NULLABLE.put(ColumnType.FLOAT, new KafkaColumnSchema(Schema.OPTIONAL_FLOAT32_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP.put(ColumnType.DOUBLE, new KafkaColumnSchema(Schema.FLOAT64_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP_NULLABLE.put(ColumnType.DOUBLE, new KafkaColumnSchema(Schema.OPTIONAL_FLOAT64_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP.put(ColumnType.BYTE_ARRAY, new KafkaColumnSchema(Schema.BYTES_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP_NULLABLE.put(ColumnType.BYTE_ARRAY, new KafkaColumnSchema(Schema.OPTIONAL_BYTES_SCHEMA, SourceValueConverter.IDENTITY));
        COLUMN_TYPE_MAP.put(ColumnType.DECIMAL, new KafkaColumnSchema(Schema.STRING_SCHEMA, SourceValueConverter.TO_STRING));
        COLUMN_TYPE_MAP_NULLABLE.put(ColumnType.DECIMAL, new KafkaColumnSchema(Schema.OPTIONAL_STRING_SCHEMA, SourceValueConverter.TO_STRING));
        COLUMN_TYPE_MAP.put(ColumnType.DATE, new KafkaColumnSchema(SourceRecordConverter.arraySchema(false, Schema.INT32_SCHEMA), SourceRecordConverter::convertLocalDateToArray));
        COLUMN_TYPE_MAP_NULLABLE.put(ColumnType.DATE, new KafkaColumnSchema(SourceRecordConverter.arraySchema(true, Schema.INT32_SCHEMA), SourceRecordConverter::convertLocalDateToArray));
        COLUMN_TYPE_MAP.put(ColumnType.TIME, new KafkaColumnSchema(SourceRecordConverter.arraySchema(false, Schema.INT32_SCHEMA), SourceRecordConverter::convertLocalTimeToArray));
        COLUMN_TYPE_MAP_NULLABLE.put(ColumnType.TIME, new KafkaColumnSchema(SourceRecordConverter.arraySchema(true, Schema.INT32_SCHEMA), SourceRecordConverter::convertLocalTimeToArray));
        COLUMN_TYPE_MAP.put(ColumnType.TIMESTAMP, new KafkaColumnSchema(Schema.FLOAT64_SCHEMA, SourceRecordConverter::convertInstantToDouble));
        COLUMN_TYPE_MAP_NULLABLE.put(ColumnType.TIMESTAMP, new KafkaColumnSchema(Schema.OPTIONAL_FLOAT64_SCHEMA, SourceRecordConverter::convertInstantToDouble));
        COLUMN_TYPE_MAP.put(ColumnType.DATETIME, new KafkaColumnSchema(SourceRecordConverter.arraySchema(false, Schema.INT32_SCHEMA), SourceRecordConverter::convertLocalDateTimeToArray));
        COLUMN_TYPE_MAP_NULLABLE.put(ColumnType.DATETIME, new KafkaColumnSchema(SourceRecordConverter.arraySchema(true, Schema.INT32_SCHEMA), SourceRecordConverter::convertLocalDateTimeToArray));
    }
}

