/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.migrationtools.persistence;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheType;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.migrationtools.persistence.MigrationCacheProcessor;
import org.apache.ignite.migrationtools.persistence.MigrationKernalContext;
import org.apache.ignite.migrationtools.persistence.exceptions.MigrateCacheException;
import org.apache.ignite.migrationtools.persistence.mappers.CacheDataRowProcessor;
import org.apache.ignite.migrationtools.persistence.utils.pubsub.BasicProcessor;
import org.apache.ignite.migrationtools.persistence.utils.pubsub.StreamerPublisher;
import org.apache.ignite.migrationtools.sql.SqlDdlGenerator;
import org.apache.ignite.migrationtools.tablemanagement.SchemaUtils;
import org.apache.ignite.migrationtools.types.converters.StaticTypeConverterFactory;
import org.apache.ignite.migrationtools.types.converters.TypeConverterFactory;
import org.apache.ignite3.internal.client.table.ClientSchema;
import org.apache.ignite3.internal.client.table.ClientTable;
import org.apache.ignite3.table.DataStreamerItem;
import org.apache.ignite3.table.KeyValueView;
import org.apache.ignite3.table.Tuple;
import org.gridgain.ignite.migrationtools.adapter.internal.ClientAdapter;
import org.jetbrains.annotations.Nullable;

public class Ignite2PersistentCacheTools {
    /**
     * Collects the id/name pairs of the persistent user caches known to the given nodes.
     *
     * <p>Only descriptors whose cache type is {@link CacheType#USER} are reported. Because the
     * result is a set, a cache seen on several nodes appears once.
     *
     * @param nodes node contexts to inspect.
     * @return a mutable set of {@code (cacheId, cacheName)} pairs; empty when nothing matches.
     * @throws IgniteCheckedException if loading the cache descriptors fails.
     */
    public static Set<Pair<Integer, String>> persistentCaches(List<MigrationKernalContext> nodes) throws IgniteCheckedException {
        Set<Pair<Integer, String>> userCaches = new HashSet<>();
        for (MigrationKernalContext node : nodes) {
            GridCacheProcessor processor = node.cache();
            if (processor instanceof MigrationCacheProcessor) {
                // The migration-specific processor is asked to load every descriptor before the
                // persistent caches are listed (presumably they are not loaded eagerly - confirm).
                ((MigrationCacheProcessor) processor).loadAllDescriptors();
            }
            for (DynamicCacheDescriptor descriptor : node.cache().persistentCaches()) {
                if (descriptor.cacheType() == CacheType.USER) {
                    userCaches.add(Pair.of(descriptor.cacheId(), descriptor.cacheName()));
                }
            }
        }
        return userCaches;
    }

    /**
     * Publishes the persisted entries of a cache from every node that holds a descriptor for it.
     *
     * <p>Nodes are processed one after another, in list order, by delegating to
     * {@link #publishCacheCursorAtNode}. A node whose cache processor has no descriptor for
     * {@code cacheName} is skipped instead of failing with a {@link NullPointerException}; this
     * matches {@link #migrateCache}, which also tolerates nodes without the descriptor.
     *
     * @param nodes node contexts to read from; an empty list is a no-op.
     * @param cacheName name of the cache to publish.
     * @param partitionStreamFactory receives the entry publisher and the partition id of each
     *     partition; returns the future of the job consuming that partition, or an empty optional
     *     to skip the partition.
     * @throws IgniteCheckedException if reading a node fails, or if {@code nodes} is not empty and
     *     none of the nodes has a descriptor for {@code cacheName}.
     */
    public static void publishCacheCursor(List<MigrationKernalContext> nodes, String cacheName, BiFunction<Flow.Publisher<Map.Entry<Object, Object>>, Integer, Optional<CompletableFuture<Void>>> partitionStreamFactory) throws IgniteCheckedException {
        boolean foundOnAnyNode = false;
        for (MigrationKernalContext nodeCtx : nodes) {
            DynamicCacheDescriptor cacheDescriptor = nodeCtx.cache().cacheDescriptor(cacheName);
            if (cacheDescriptor == null) {
                // This node does not know the cache, so there is nothing to publish from it.
                continue;
            }
            foundOnAnyNode = true;
            Ignite2PersistentCacheTools.publishCacheCursorAtNode(nodeCtx, cacheDescriptor, partitionStreamFactory);
        }
        if (!nodes.isEmpty() && !foundOnAnyNode) {
            // Never turn an unknown cache into a silent no-op.
            throw new IgniteCheckedException("Could not find the requested cache: " + cacheName);
        }
    }

    /**
     * Publishes the persisted rows of one cache, partition by partition, from a single node.
     *
     * <p>For every data store returned by the cache's off-heap manager, a fresh
     * {@link StreamerPublisher} is wired to a {@link CacheDataRowProcessor}, and that processor is
     * handed to {@code partitionStreamFactory} together with the partition id. When the factory
     * returns an empty optional the partition is skipped. Otherwise the store's cursor is drained
     * into the publisher on the calling thread, the publisher is closed, and the returned future
     * is joined before the next partition starts.
     *
     * @param nodeCtx node context that owns the cache.
     * @param cacheDescriptor descriptor of the cache to read; must not be {@code null}.
     * @param partitionStreamFactory receives the entry publisher and the partition id; returns the
     *     future of the job consuming that partition, or an empty optional to skip it.
     * @throws IgniteCheckedException if starting the cache or iterating the cursor fails, if the
     *     publisher rejects a row, or if the calling thread is interrupted while offering a row
     *     (the interrupt flag is restored and the {@link InterruptedException} is the cause).
     * @throws java.util.concurrent.CompletionException if the joined stream job completed
     *     exceptionally; since the join happens in a {@code finally} block, this supersedes any
     *     exception raised while reading.
     */
    public static void publishCacheCursorAtNode(MigrationKernalContext nodeCtx, DynamicCacheDescriptor cacheDescriptor, BiFunction<Flow.Publisher<Map.Entry<Object, Object>>, Integer, Optional<CompletableFuture<Void>>> partitionStreamFactory) throws IgniteCheckedException {
        GridCacheProcessor cacheProcessor = nodeCtx.cache();
        if (cacheProcessor instanceof MigrationCacheProcessor) {
            ((MigrationCacheProcessor) cacheProcessor).startCache(cacheDescriptor);
        }
        int cacheId = cacheDescriptor.cacheId();
        GridCacheContext<?, ?> cacheContext = cacheProcessor.context().cacheContext(cacheId);
        IgniteCacheOffheapManager offHeap = cacheContext.offheap();
        CacheObjectContext cacheObjectCtx = cacheContext.cacheObjectContext();
        for (IgniteCacheOffheapManager.CacheDataStore cs : offHeap.cacheDataStores()) {
            StreamerPublisher<CacheDataRow> cacheTuplesPublisher = new StreamerPublisher<>();
            CacheDataRowProcessor rowProc = new CacheDataRowProcessor(cacheObjectCtx);
            cacheTuplesPublisher.subscribe(rowProc);
            Optional<CompletableFuture<Void>> streamJobOpt = partitionStreamFactory.apply(rowProc, cs.partId());
            if (!streamJobOpt.isPresent()) {
                // The factory declined this partition.
                continue;
            }
            CompletableFuture<Void> streamJob = streamJobOpt.get();
            try {
                GridCursor<? extends CacheDataRow> cursor = cs.cursor();
                while (cursor.next()) {
                    CacheDataRow row = cursor.get();
                    // Rows tagged with a different, non-zero cache id are skipped; rows with id 0
                    // are accepted (presumably rows stored without an explicit cache id - confirm).
                    if (row.cacheId() != 0 && row.cacheId() != cacheId) {
                        continue;
                    }
                    try {
                        if (!cacheTuplesPublisher.offer(row)) {
                            throw new IgniteCheckedException("Failed to offer element to publisher");
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        // Keep the interrupt as the cause so the original stack trace is not lost.
                        throw new IgniteCheckedException("Interrupted publishing tuple", e);
                    }
                }
            } catch (IgniteCheckedException ex) {
                // Propagate the failure downstream before rethrowing it to the caller.
                cacheTuplesPublisher.closeExceptionally(ex);
                throw ex;
            } finally {
                // Always signal end-of-stream, then wait for the consumer to finish this partition.
                cacheTuplesPublisher.close();
                streamJob.join();
            }
        }
    }

    /**
     * Streams the persisted content of an Ignite 2 cache into the matching Ignite 3 table.
     *
     * <p>The cache configuration is taken from the first node that has a descriptor for
     * {@code cacheName}. If the client has no table for that name yet, the table is created from
     * the configuration; otherwise only the mappings are regenerated through {@code sqlGenerator}.
     * Each partition's entries are then converted by {@code columnsProcessorFactory} and pushed
     * through the table's key-value view data streamer. Errors travelling through the stream are
     * wrapped in a {@link MigrateCacheException} carrying the cache and table names.
     *
     * @param client client adapter used to look up or create the target table.
     * @param sqlGenerator generator used to derive the mappings when the table already exists.
     * @param nodeContexts node contexts holding the persisted cache data.
     * @param cacheName name of the cache to migrate.
     * @param columnsProcessorFactory creates the per-partition conversion stage; a {@code null}
     *     result makes the partition be skipped.
     * @throws IgniteCheckedException if reading the persisted data fails.
     * @throws RuntimeException if none of the nodes has a descriptor for {@code cacheName}.
     */
    public static void migrateCache(ClientAdapter client, SqlDdlGenerator sqlGenerator, List<MigrationKernalContext> nodeContexts, final String cacheName, ColumnsProcessorFactory columnsProcessorFactory) throws IgniteCheckedException {
        CacheConfiguration<?, ?> cacheCfg = nodeContexts.stream()
                .map(nodeCtx -> nodeCtx.cache().cacheDescriptor(cacheName))
                .filter(descriptor -> descriptor != null)
                .map(DynamicCacheDescriptor::cacheConfiguration)
                .findFirst()
                .orElseThrow(() -> new RuntimeException("Could not find the requested cache: " + cacheName));

        final ClientTable table;
        final Map<String, String> columnMappings;
        ClientTable existingTable = client.getClientTable(cacheName);
        if (existingTable != null) {
            table = existingTable;
            columnMappings = sqlGenerator.generate(cacheCfg).fieldToColumnMappings();
        } else {
            // Casts are kept because the generic signature of createTableAndTypeHints is not visible here.
            Map.Entry<?, ?> created = (Map.Entry<?, ?>) client.createTableAndTypeHints(cacheCfg).join();
            table = (ClientTable) created.getKey();
            SqlDdlGenerator.GenerateTableResult generated = (SqlDdlGenerator.GenerateTableResult) created.getValue();
            columnMappings = generated.fieldToColumnMappings();
        }

        KeyValueView<Tuple, Tuple> view = table.keyValueView();
        ClientSchema schema = (ClientSchema) SchemaUtils.getLatestSchemaForTable(table).join();
        final String tableName = table.name();

        Ignite2PersistentCacheTools.publishCacheCursor(nodeContexts, cacheName, (cacheTuplesPublisher, partId) -> {
            Flow.Publisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>> itemPublisher = columnsProcessorFactory.createSubscribed(cacheTuplesPublisher, partId, schema, columnMappings, StaticTypeConverterFactory.DEFAULT_INSTANCE);
            if (itemPublisher == null) {
                // The factory produced no stage for this partition, so it is skipped.
                return Optional.empty();
            }

            // Pass-through stage whose only job is to attach cache/table context to stream errors.
            BasicProcessor<DataStreamerItem<Map.Entry<Tuple, Tuple>>, DataStreamerItem<Map.Entry<Tuple, Tuple>>> errorTagger = new BasicProcessor<DataStreamerItem<Map.Entry<Tuple, Tuple>>, DataStreamerItem<Map.Entry<Tuple, Tuple>>>() {
                @Override
                public void onNext(DataStreamerItem<Map.Entry<Tuple, Tuple>> item) {
                    subscriber.onNext(item);
                }

                @Override
                public void onError(Throwable throwable) {
                    super.onError(new MigrateCacheException(cacheName, tableName, throwable));
                }
            };
            itemPublisher.subscribe(errorTagger);
            return Optional.ofNullable(view.streamData(errorTagger, null));
        });
    }

    /**
     * Factory for the per-partition stage that converts raw cache entries into data streamer
     * items for the target table.
     */
    @FunctionalInterface
    public interface ColumnsProcessorFactory {
        /**
         * Creates the conversion stage for one partition.
         *
         * <p>Judging by the method name, the returned publisher is expected to be already
         * subscribed to {@code publisher} - confirm against the implementations.
         *
         * @param publisher source of the cache entries read from the partition.
         * @param partitionId id of the partition being published.
         * @param schema latest client schema of the target table.
         * @param columnMappings field/column name mappings, as obtained from
         *     {@code SqlDdlGenerator.GenerateTableResult#fieldToColumnMappings()} by
         *     {@code migrateCache}.
         * @param typeConverterFactory factory of the type converters to apply.
         * @return the publisher of streamer items, or {@code null}, in which case
         *     {@code migrateCache} skips the partition.
         */
        @Nullable
        Flow.Publisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>> createSubscribed(
                Flow.Publisher<Map.Entry<Object, Object>> publisher,
                int partitionId,
                ClientSchema schema,
                Map<String, String> columnMappings,
                TypeConverterFactory typeConverterFactory);
    }
}

