/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.kafka.source;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.SchemaBuilderException;
import org.apache.kafka.connect.source.SourceRecord;
import org.gridgain.kafka.LogFormat;
import org.gridgain.kafka.SystemEvent;
import org.gridgain.kafka.schema.ObjectFieldsVisitor;
import org.gridgain.kafka.schema.SchemaFieldVisitor;
import org.gridgain.kafka.schema.SchemaResolver;
import org.gridgain.kafka.schema.SchemaUtils;
import org.gridgain.kafka.schema.cache.ResolvedSchemasCache;
import org.gridgain.kafka.source.CacheEntryOffset;
import org.gridgain.kafka.source.IgniteSourceConnectorConfig;
import org.gridgain.kafka.source.Offsets;
import org.gridgain.kafka.source.SourceEnumPolicy;
import org.gridgain.kafka.source.SourceEnumSchemaMapper;
import org.gridgain.kafka.source.SourceFieldNullabilityPolicy;
import org.gridgain.kafka.source.SourceRecursiveSchemaPolicy;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class SourceRecordBuilder {
    private static final Logger log = LoggerFactory.getLogger(SourceRecordBuilder.class);
    private final Context ctx;
    private Schema keySchema;
    private Schema valSchema;

    public SourceRecordBuilder(@NotNull Context ctx) {
        this.ctx = ctx;
    }

    SourceRecord build(Map<String, ?> srcOff, Object key, Object val) throws DataException {
        Object keyObj = key;
        Object valObj = val;
        if (!this.ctx.isSchemaless()) {
            if (this.keySchema == null || this.ctx.isSchemaDynamic()) {
                this.keySchema = this.getSchema(key);
            }
            if (this.valSchema == null || this.ctx.isSchemaDynamic()) {
                this.valSchema = this.getSchema(val);
            }
            keyObj = this.getObject(key, this.keySchema);
            valObj = this.getObject(val, this.valSchema);
        }
        return new SourceRecord(Offsets.kafkaPartition(this.cacheName()), srcOff, this.ctx.getTopic(), this.keySchema, keyObj, this.valSchema, valObj);
    }

    SourceRecord build(CacheEntryOffset e) {
        return this.build(e.offset(), e.entry().getKey(), e.entry().getValue());
    }

    String cacheName() {
        return this.ctx.getCacheName();
    }

    @Nullable
    public Schema getSchema(@Nullable Object obj) throws SchemaBuilderException {
        return obj != null ? new SchemaResolver(this.ctx).getSchema(obj) : null;
    }

    @Nullable
    public Object getObject(@Nullable Object obj, @Nullable Schema schema) {
        Object res = this.getObject0(obj, schema);
        if (schema != null && this.ctx.isValidationEnabled()) {
            ConnectSchema.validateValue((Schema)schema, (Object)obj);
        }
        return res;
    }

    @Nullable
    public Object getObject0(@Nullable Object obj, @Nullable Schema schema) {
        if (obj == null || schema == null || SchemaUtils.isUndefined(schema)) {
            return null;
        }
        if (obj instanceof Enum) {
            return this.ctx.getEnumMapper().map((Enum)obj);
        }
        if (obj instanceof BinaryEnumObjectImpl) {
            return this.ctx.getEnumMapper().mapBinary((BinaryEnumObjectImpl)obj);
        }
        switch (schema.type()) {
            case STRUCT: {
                return this.getAsStruct(schema, obj);
            }
            case ARRAY: {
                return this.getAsArray(schema, obj);
            }
            case MAP: {
                return this.getAsMap(schema, obj);
            }
            case STRING: {
                return this.getAsString(obj);
            }
        }
        return SourceRecordBuilder.getAsBasicTypes(schema, obj);
    }

    @NotNull
    private String getAsString(@NotNull Object obj) {
        if (obj instanceof Class) {
            return ((Class)obj).getName();
        }
        return String.valueOf(obj);
    }

    @NotNull
    private static Object getAsBasicTypes(@NotNull Schema schema, @NotNull Object obj) {
        if (obj instanceof Character) {
            return Character.getNumericValue(((Character)obj).charValue());
        }
        if (obj instanceof BigInteger) {
            return new BigDecimal((BigInteger)obj, 0);
        }
        if (obj instanceof byte[]) {
            return ByteBuffer.wrap((byte[])obj);
        }
        return obj;
    }

    @NotNull
    private Map<?, ?> getAsMap(@NotNull Schema schema, @NotNull Object obj) {
        HashMap result = new HashMap();
        ((Map)obj).forEach((key, value) -> result.put(this.getObject0(key, schema.keySchema()), this.getObject0(value, schema.valueSchema())));
        return result;
    }

    @NotNull
    private List<?> getAsArray(@NotNull Schema schema, @NotNull Object obj) {
        Schema elementSchema = schema.valueSchema();
        Class<?> cls = obj.getClass();
        if (cls.isArray()) {
            Class<?> elemType = cls.getComponentType();
            if (elemType.isPrimitive()) {
                if (elemType.equals(Boolean.TYPE)) {
                    boolean[] arr = (boolean[])obj;
                    ArrayList<Boolean> res = new ArrayList<Boolean>(arr.length);
                    for (boolean i : arr) {
                        res.add(i);
                    }
                    return res;
                }
                if (elemType.equals(Character.TYPE)) {
                    char[] arr = (char[])obj;
                    ArrayList<Integer> res = new ArrayList<Integer>(arr.length);
                    for (char i : arr) {
                        res.add(Character.getNumericValue(i));
                    }
                    return res;
                }
                if (elemType.equals(Short.TYPE)) {
                    short[] arr = (short[])obj;
                    ArrayList<Short> res = new ArrayList<Short>(arr.length);
                    for (short i : arr) {
                        res.add(i);
                    }
                    return res;
                }
                if (elemType.equals(Integer.TYPE)) {
                    return Arrays.stream((int[])obj).boxed().collect(Collectors.toList());
                }
                if (elemType.equals(Long.TYPE)) {
                    return Arrays.stream((long[])obj).boxed().collect(Collectors.toList());
                }
                if (elemType.equals(Float.TYPE)) {
                    float[] arr = (float[])obj;
                    ArrayList<Float> res = new ArrayList<Float>(arr.length);
                    for (float i : arr) {
                        res.add(Float.valueOf(i));
                    }
                    return res;
                }
                if (elemType.equals(Double.TYPE)) {
                    return Arrays.stream((double[])obj).boxed().collect(Collectors.toList());
                }
            }
            return Stream.of((Object[])obj).map(o -> this.getObject0(o, elementSchema)).collect(Collectors.toList());
        }
        return ((Collection)obj).stream().map(o -> this.getObject0(o, elementSchema)).collect(Collectors.toList());
    }

    @NotNull
    private Object getAsStruct(@NotNull Schema schema, @NotNull Object obj) {
        Struct struct = new Struct(schema);
        if (obj instanceof BinaryObject) {
            BinaryObject binObj = (BinaryObject)obj;
            String typeName = binObj.type().typeName();
            if (!typeName.equals(schema.name())) {
                throw new DataException("Binary value [" + typeName + "] doesn't match provided Kafka schema [" + schema.name() + "]");
            }
            SchemaFieldVisitor.withStructFields(schema, f -> {
                Object origVal = binObj.field(f.name());
                Object kafkaFieldVal = this.getObject0(origVal, f.schema());
                struct.put(f, kafkaFieldVal);
            });
            return struct;
        }
        Class<?> cls = obj.getClass();
        if (!cls.getName().equals(schema.name())) {
            throw new DataException("Value [" + cls.getName() + "] doesn't match provided Kafka schema [" + schema.name() + "]");
        }
        Map<String, Field> schemaFields = schema.fields().stream().collect(Collectors.toMap(Field::name, f -> f));
        ObjectFieldsVisitor.withClassFields(obj, (clsField, instance) -> {
            Field kafkaField = (Field)schemaFields.get(clsField.getName());
            if (kafkaField != null) {
                Object fieldVal = clsField.get(obj);
                struct.put(kafkaField, this.getObject0(fieldVal, kafkaField.schema()));
                schemaFields.remove(clsField.getName());
            }
        });
        if (!schemaFields.isEmpty()) {
            log.warn(LogFormat.message(SystemEvent.KAFKA_SCHEMA_BUILD_FAILED, "Failed to map " + cls.getName() + " to kafka struct [" + schema.name() + "]. Failed to get value of following schema fields=" + schemaFields.keySet()));
        }
        return struct;
    }

    public static Context.Builder newContextBuilder() {
        return new Context.Builder();
    }

    public static class Context {
        private String cacheName;
        private String topic;
        private boolean isSchemaless = IgniteSourceConnectorConfig.IS_SCHEMALESS.dflt();
        private boolean isSchemaDynamic = IgniteSourceConnectorConfig.IS_SCHEMA_DYNAMIC.dflt();
        private boolean isValidationEnabled = IgniteSourceConnectorConfig.ENABLE_ENTRIES_VALIDATION.dflt();
        private ResolvedSchemasCache resolvedSchemasCache;
        private SourceEnumSchemaMapper enumMapper = SourceEnumSchemaMapper.of((SourceEnumPolicy)((Object)IgniteSourceConnectorConfig.ENUM_POLICY.dflt()));
        private SourceFieldNullabilityPolicy fieldNullabilityPolicy = (SourceFieldNullabilityPolicy)((Object)IgniteSourceConnectorConfig.NULLABILITY_POLICY.dflt());
        private SourceRecursiveSchemaPolicy recursiveSchemaPolicy = SourceRecursiveSchemaPolicy.SKIP;
        private int maxRecursionLevel = 5;

        public String getCacheName() {
            return this.cacheName;
        }

        public String getTopic() {
            return this.topic;
        }

        public boolean isSchemaless() {
            return this.isSchemaless;
        }

        public boolean isSchemaDynamic() {
            return this.isSchemaDynamic;
        }

        public boolean isValidationEnabled() {
            return this.isValidationEnabled;
        }

        public ResolvedSchemasCache getResolvedSchemasCache() {
            return this.resolvedSchemasCache;
        }

        public SourceEnumSchemaMapper getEnumMapper() {
            return this.enumMapper;
        }

        public SourceFieldNullabilityPolicy getFieldNullabilityPolicy() {
            return this.fieldNullabilityPolicy;
        }

        public SourceRecursiveSchemaPolicy getRecursiveSchemaPolicy() {
            return this.recursiveSchemaPolicy;
        }

        public int getMaxRecursionLevel() {
            return this.maxRecursionLevel;
        }

        public static class Builder {
            protected final Context delegate;

            public Builder() {
                this(new Context());
            }

            protected Builder(Context delegate) {
                this.delegate = delegate;
            }

            public Builder cacheName(String cacheName) {
                this.delegate.cacheName = cacheName;
                return this;
            }

            public Builder topic(String topic) {
                this.delegate.topic = topic;
                return this;
            }

            public Builder schemaless(boolean isSchemaless) {
                this.delegate.isSchemaless = isSchemaless;
                return this;
            }

            public Builder dynamicSchema(boolean isSchemaDynamic) {
                this.delegate.isSchemaDynamic = isSchemaDynamic;
                return this;
            }

            public Builder validateEntries(boolean isValidationEnabled) {
                this.delegate.isValidationEnabled = isValidationEnabled;
                return this;
            }

            public Builder resolvedSchemasCache(@Nullable ResolvedSchemasCache resolvedSchemasCache) {
                this.delegate.resolvedSchemasCache = resolvedSchemasCache;
                return this;
            }

            public Builder enumPolicy(SourceEnumSchemaMapper enumPolicy) {
                this.delegate.enumMapper = enumPolicy;
                return this;
            }

            public Builder nullabilityPolicy(SourceFieldNullabilityPolicy nullabilityPolicy) {
                this.delegate.fieldNullabilityPolicy = nullabilityPolicy;
                return this;
            }

            public Builder recursiveSchemaPolicy(SourceRecursiveSchemaPolicy recursiveSchemaPolicy) {
                this.delegate.recursiveSchemaPolicy = recursiveSchemaPolicy;
                return this;
            }

            public Builder maxRecursionLevel(int maxRecursionLevel) {
                this.delegate.maxRecursionLevel = maxRecursionLevel;
                return this;
            }

            public Context build() {
                return this.delegate;
            }

            public SourceRecordBuilder toRecordBuilder() {
                return new SourceRecordBuilder(this.delegate);
            }
        }
    }
}

