/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.ml.dataset.impl.cache;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.ml.dataset.Dataset;
import org.apache.ignite.ml.dataset.PartitionDataBuilder;
import org.apache.ignite.ml.dataset.UpstreamTransformerBuilder;
import org.apache.ignite.ml.dataset.impl.cache.util.ComputeUtils;
import org.apache.ignite.ml.environment.LearningEnvironment;
import org.apache.ignite.ml.environment.LearningEnvironmentBuilder;
import org.apache.ignite.ml.math.functions.IgniteBiFunction;
import org.apache.ignite.ml.math.functions.IgniteBinaryOperator;
import org.apache.ignite.ml.math.functions.IgniteFunction;
import org.apache.ignite.ml.math.functions.IgniteTriFunction;

public class CacheBasedDataset<K, V, C extends Serializable, D extends AutoCloseable>
implements Dataset<C, D> {
    private static final int RETRY_INTERVAL = 1000;
    private final Ignite ignite;
    private final IgniteCache<K, V> upstreamCache;
    private final IgniteBiPredicate<K, V> filter;
    private final UpstreamTransformerBuilder upstreamTransformerBuilder;
    private final IgniteCache<Integer, C> datasetCache;
    private final PartitionDataBuilder<K, V, C, D> partDataBuilder;
    private final UUID datasetId;
    private final LearningEnvironmentBuilder envBuilder;
    private final boolean upstreamKeepBinary;
    private final int retries;
    private final LearningEnvironment localLearningEnv;

    public CacheBasedDataset(Ignite ignite, IgniteCache<K, V> upstreamCache, IgniteBiPredicate<K, V> filter, UpstreamTransformerBuilder upstreamTransformerBuilder, IgniteCache<Integer, C> datasetCache, LearningEnvironmentBuilder envBuilder, PartitionDataBuilder<K, V, C, D> partDataBuilder, UUID datasetId, boolean upstreamKeepBinary, LearningEnvironment localLearningEnv, int retriesCnt) {
        this.ignite = ignite;
        this.upstreamCache = upstreamCache;
        this.filter = filter;
        this.upstreamTransformerBuilder = upstreamTransformerBuilder;
        this.datasetCache = datasetCache;
        this.partDataBuilder = partDataBuilder;
        this.envBuilder = envBuilder;
        this.datasetId = datasetId;
        this.upstreamKeepBinary = upstreamKeepBinary;
        this.localLearningEnv = localLearningEnv;
        this.retries = retriesCnt;
    }

    @Override
    public <R> R computeWithCtx(IgniteTriFunction<C, D, LearningEnvironment, R> map, IgniteBinaryOperator<R> reduce, R identity) {
        String upstreamCacheName = this.upstreamCache.getName();
        String datasetCacheName = this.datasetCache.getName();
        return this.computeForAllPartitions(part -> {
            LearningEnvironment env = ComputeUtils.getLearningEnvironment(this.ignite, this.datasetId, part, this.envBuilder);
            Object ctx = ComputeUtils.getContext(Ignition.localIgnite(), datasetCacheName, part);
            D data = ComputeUtils.getData(Ignition.localIgnite(), upstreamCacheName, this.filter, this.upstreamTransformerBuilder, datasetCacheName, this.datasetId, this.partDataBuilder, env, this.upstreamKeepBinary);
            if (data != null) {
                Object res = map.apply(ctx, data, env);
                ComputeUtils.saveContext(Ignition.localIgnite(), datasetCacheName, part, ctx);
                return res;
            }
            return null;
        }, reduce, identity);
    }

    @Override
    public <R> R compute(IgniteBiFunction<D, LearningEnvironment, R> map, IgniteBinaryOperator<R> reduce, R identity) {
        String upstreamCacheName = this.upstreamCache.getName();
        String datasetCacheName = this.datasetCache.getName();
        return this.computeForAllPartitions(part -> {
            LearningEnvironment env = ComputeUtils.getLearningEnvironment(Ignition.localIgnite(), this.datasetId, part, this.envBuilder);
            D data = ComputeUtils.getData(Ignition.localIgnite(), upstreamCacheName, this.filter, this.upstreamTransformerBuilder, datasetCacheName, this.datasetId, this.partDataBuilder, env, this.upstreamKeepBinary);
            return data != null ? map.apply(data, env) : null;
        }, reduce, identity);
    }

    @Override
    public void close() {
        this.datasetCache.destroy();
        ComputeUtils.removeData(this.ignite, this.datasetId);
        ComputeUtils.removeLearningEnv(this.ignite, this.datasetId);
    }

    private <R> R computeForAllPartitions(IgniteFunction<Integer, R> fun, IgniteBinaryOperator<R> reduce, R identity) {
        List<String> cacheNames = Arrays.asList(this.datasetCache.getName(), this.upstreamCache.getName());
        Collection<R> results = ComputeUtils.affinityCallWithRetries(this.ignite, cacheNames, fun, this.retries, 1000, this.localLearningEnv.deployingContext());
        R res = identity;
        for (R partRes : results) {
            if (partRes == null) continue;
            res = reduce.apply(res, partRes);
        }
        return res;
    }

    public IgniteCache<K, V> getUpstreamCache() {
        return this.upstreamCache;
    }

    public IgniteCache<Integer, C> getDatasetCache() {
        return this.datasetCache;
    }
}

