/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.migrationtools.persistence;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.cache.CacheType;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.migrationtools.persistence.MigrationCacheProcessor;
import org.apache.ignite.migrationtools.persistence.MigrationKernalContext;
import org.apache.ignite.migrationtools.persistence.exceptions.MigrateCacheException;
import org.apache.ignite.migrationtools.persistence.utils.pubsub.BasicProcessor;
import org.apache.ignite.migrationtools.sql.SqlDdlGenerator;
import org.apache.ignite.migrationtools.tablemanagement.SchemaUtils;
import org.apache.ignite.migrationtools.types.converters.StaticTypeConverterFactory;
import org.apache.ignite.migrationtools.types.converters.TypeConverterFactory;
import org.apache.ignite3.internal.cli.logger.CliLoggers;
import org.apache.ignite3.internal.client.table.ClientSchema;
import org.apache.ignite3.internal.client.table.ClientTable;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.table.DataStreamerItem;
import org.apache.ignite3.table.KeyValueView;
import org.apache.ignite3.table.QualifiedName;
import org.apache.ignite3.table.Tuple;
import org.gridgain.ignite.migrationtools.adapter.internal.ClientAdapter;
import org.jetbrains.annotations.Nullable;

/**
 * Static helpers for enumerating Ignite 2 persistent caches and pushing their contents into
 * Ignite 3 tables through a {@link ClientAdapter}.
 *
 * <p>NOTE: this source was produced by a decompiler. {@link #publishCacheCursorAtNode} could not
 * be recovered and always throws, which means {@link #publishCacheCursor} and
 * {@link #migrateCache} fail as soon as they are given a non-empty node list.
 */
public class Ignite2PersistentCacheTools {
    private static final IgniteLogger LOGGER = CliLoggers.forClass(Ignite2PersistentCacheTools.class);

    /**
     * Collects the (cache id, cache name) pairs of all persistent caches whose type is
     * {@link CacheType#USER} across the supplied nodes.
     *
     * <p>When a node's cache processor is a {@link MigrationCacheProcessor}, its descriptors are
     * loaded via {@code loadAllDescriptors()} before the node's persistent caches are listed.
     *
     * @param nodes Node contexts to inspect.
     * @return A new mutable set with one pair per distinct user cache; empty when {@code nodes} is empty.
     * @throws IgniteCheckedException Declared by this method; propagated from the underlying cache processor calls.
     */
    public static Set<Pair<Integer, String>> persistentCaches(List<MigrationKernalContext> nodes) throws IgniteCheckedException {
        Set<Pair<Integer, String>> userCaches = new HashSet<>();
        for (MigrationKernalContext node : nodes) {
            GridCacheProcessor processor = node.cache();
            if (processor instanceof MigrationCacheProcessor) {
                ((MigrationCacheProcessor) processor).loadAllDescriptors();
            }
            for (DynamicCacheDescriptor descriptor : node.cache().persistentCaches()) {
                if (CacheType.USER.equals(descriptor.cacheType())) {
                    userCaches.add(Pair.of(descriptor.cacheId(), descriptor.cacheName()));
                }
            }
        }
        return userCaches;
    }

    /**
     * Runs {@link #publishCacheCursorAtNode} once per node, in list order, using each node's own
     * descriptor for {@code cacheName}.
     *
     * <p>The descriptor is passed through as returned by {@code cacheDescriptor(cacheName)}; no
     * null check is performed here.
     *
     * @param nodes Node contexts to read from.
     * @param cacheName Name of the cache to publish.
     * @param partitionStreamFactory Callback receiving a publisher of cache entries and an integer
     *      (named {@code partId} at the call site in {@link #migrateCache}).
     * @throws IgniteCheckedException Propagated from {@link #publishCacheCursorAtNode}.
     */
    public static void publishCacheCursor(List<MigrationKernalContext> nodes, String cacheName, BiFunction<Flow.Publisher<Map.Entry<Object, Object>>, Integer, Optional<CompletableFuture<Void>>> partitionStreamFactory) throws IgniteCheckedException {
        for (MigrationKernalContext node : nodes) {
            publishCacheCursorAtNode(node, node.cache().cacheDescriptor(cacheName), partitionStreamFactory);
        }
    }

    /**
     * Placeholder for a method whose bytecode could not be decompiled.
     *
     * <p>The real implementation is not available in this file; the body below is what the
     * decompiler emitted in its place and it unconditionally throws.
     *
     * @param nodeCtx Node context (unused by the placeholder).
     * @param cacheDescriptor Cache descriptor (unused by the placeholder).
     * @param partitionStreamFactory Partition stream callback (unused by the placeholder).
     * @throws IgniteCheckedException Declared by the original signature.
     * @throws IllegalStateException Always.
     */
    public static void publishCacheCursorAtNode(MigrationKernalContext nodeCtx, DynamicCacheDescriptor cacheDescriptor, BiFunction<Flow.Publisher<Map.Entry<Object, Object>>, Integer, Optional<CompletableFuture<Void>>> partitionStreamFactory) throws IgniteCheckedException {
        /*
         * Decompiler failure reported for this method:
         *   org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         * Recovering the logic requires the original class file or sources.
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Streams the content of one Ignite 2 cache into the matching Ignite 3 table.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Take the cache configuration from the first node that has a non-null descriptor for
     *       {@code cacheName}.</li>
     *   <li>Look up the client table for the qualified name derived from that configuration. If it
     *       does not exist, create it with {@code client.createTableAndTypeHints(...)} (blocking on
     *       the returned future); otherwise only generate the table definition with
     *       {@code sqlGenerator} to obtain the field mappings.</li>
     *   <li>Fetch the table's key-value view and its latest schema (blocking).</li>
     *   <li>Call {@link #publishCacheCursor} with a factory that, for every publisher it is handed,
     *       asks {@code columnsProcessorFactory} for an item publisher. A {@code null} result yields
     *       an empty {@link Optional}; otherwise the items are routed through an error-wrapping
     *       processor into {@code streamData} on the view.</li>
     * </ol>
     *
     * @param client Adapter used to look up or create the target table.
     * @param sqlGenerator Generator used to derive field mappings for an already existing table.
     * @param nodeContexts Nodes to read the cache from.
     * @param cacheName Name of the cache to migrate.
     * @param columnsProcessorFactory Factory for the per-partition item publisher.
     * @throws IgniteCheckedException Propagated from {@link #publishCacheCursor}.
     * @throws RuntimeException If no node has a descriptor for {@code cacheName}.
     */
    public static void migrateCache(ClientAdapter client, SqlDdlGenerator sqlGenerator, List<MigrationKernalContext> nodeContexts, final String cacheName, ColumnsProcessorFactory columnsProcessorFactory) throws IgniteCheckedException {
        // Raw types below are kept as emitted by the decompiler: the generic signatures of the
        // project APIs involved are not visible from this file.
        CacheConfiguration cacheCfg = nodeContexts.stream()
                .map(node -> node.cache().cacheDescriptor(cacheName))
                .filter(descriptor -> descriptor != null)
                .map(DynamicCacheDescriptor::cacheConfiguration)
                .findFirst()
                .orElseThrow(() -> new RuntimeException("Could not find the requested cache: " + cacheName));

        QualifiedName tableQualifiedName = SqlDdlGenerator.qualifiedName(cacheCfg);
        @Nullable ClientTable existingTable = client.getClientTable(tableQualifiedName);

        final ClientTable table;
        // NOTE(review): the value comes from fieldToColumnMappings() but was named
        // "columnToFieldMappings" in the decompiled code - confirm the mapping direction.
        final Map fieldMappings;
        if (existingTable != null) {
            table = existingTable;
            fieldMappings = sqlGenerator.generate(cacheCfg).fieldToColumnMappings();
        } else {
            Map.Entry created = (Map.Entry) client.createTableAndTypeHints(cacheCfg).join();
            table = (ClientTable) created.getKey();
            fieldMappings = ((SqlDdlGenerator.GenerateTableResult) created.getValue()).fieldToColumnMappings();
        }

        KeyValueView view = table.keyValueView();
        ClientSchema schema = (ClientSchema) SchemaUtils.getLatestSchemaForTable(table).join();
        final String tableName = table.name();

        publishCacheCursor(nodeContexts, cacheName, (cacheTuplesPublisher, partId) -> {
            Flow.Publisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>> columnItems = columnsProcessorFactory.createSubscribed(
                    cacheTuplesPublisher, partId, schema, fieldMappings, StaticTypeConverterFactory.DEFAULT_INSTANCE);
            if (columnItems == null) {
                // The factory declined to produce a publisher: nothing is streamed for this call.
                return Optional.empty();
            }

            BasicProcessor<DataStreamerItem<Map.Entry<Tuple, Tuple>>, DataStreamerItem<Map.Entry<Tuple, Tuple>>> errorTagger =
                    errorTaggingProcessor(cacheName, tableName);
            columnItems.subscribe(errorTagger);

            // ofNullable mirrors Optional.map semantics: a null future becomes an empty Optional.
            return Optional.ofNullable(view.streamData((Flow.Publisher) errorTagger, null));
        });
    }

    /**
     * Builds a processor that hands every item unchanged to its {@code subscriber} and replaces
     * any incoming error with a {@link MigrateCacheException} built from the cache name, the table
     * name and the original throwable.
     */
    private static BasicProcessor<DataStreamerItem<Map.Entry<Tuple, Tuple>>, DataStreamerItem<Map.Entry<Tuple, Tuple>>> errorTaggingProcessor(final String cacheName, final String tableName) {
        return new BasicProcessor<DataStreamerItem<Map.Entry<Tuple, Tuple>>, DataStreamerItem<Map.Entry<Tuple, Tuple>>>() {
            @Override
            public void onNext(DataStreamerItem<Map.Entry<Tuple, Tuple>> item) {
                this.subscriber.onNext(item);
            }

            @Override
            public void onError(Throwable throwable) {
                super.onError(new MigrateCacheException(cacheName, tableName, throwable));
            }
        };
    }

    /**
     * Factory for the publisher that turns raw cache entries into data streamer items.
     */
    public interface ColumnsProcessorFactory {
        /**
         * Creates the item publisher for one source publisher.
         *
         * <p>Parameter names are decompiler-generated; the descriptions below follow the arguments
         * passed by {@link Ignite2PersistentCacheTools#migrateCache}.
         *
         * @param var1 Publisher of cache entries.
         * @param var2 Integer handed to the partition stream factory ({@code partId} at the call site).
         * @param var3 Latest schema of the target table.
         * @param var4 Field mappings obtained from the generated table definition.
         * @param var5 Type converter factory.
         * @return The item publisher, or {@code null}; {@code migrateCache} treats {@code null} as
         *      "stream nothing" for that call.
         */
        @Nullable
        Flow.Publisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>> createSubscribed(Flow.Publisher<Map.Entry<Object, Object>> var1, int var2, ClientSchema var3, Map<String, String> var4, TypeConverterFactory var5);
    }
}

