package org.gridgain.ignite.migrationtools.persistence;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheType;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite3.internal.client.table.ClientSchema;
import org.apache.ignite3.internal.client.table.ClientTable;
import org.apache.ignite3.table.DataStreamerItem;
import org.apache.ignite3.table.DataStreamerOptions;
import org.apache.ignite3.table.KeyValueView;
import org.apache.ignite3.table.Tuple;
import org.gridgain.ignite.migrationtools.adapter.internal.ClientAdapter;
import org.gridgain.ignite.migrationtools.adapter.internal.mapper.converters.StaticTypeConverterFactory;
import org.gridgain.ignite.migrationtools.adapter.internal.mapper.converters.TypeConverterFactory;
import org.gridgain.ignite.migrationtools.persistence.mappers.CacheDataRowProcessor;
import org.gridgain.ignite.migrationtools.persistence.utils.pubsub.StreamerPublisher;
import org.gridgain.ignite.migrationtools.sql.SQLDDLGenerator;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/gridgain/ignite/migrationtools/persistence/Ignite2PersistentCacheTools.class */
public class Ignite2PersistentCacheTools {

    /* loaded from: input_file:org/gridgain/ignite/migrationtools/persistence/Ignite2PersistentCacheTools$ColumnsProcessorFactory.class */
    /**
     * Factory for the processing stage that turns a stream of key/value tuple entries into
     * {@link DataStreamerItem}s suitable for a data streamer.
     *
     * <p>Within this class it is invoked by {@code migrateCache}, once per value handed to the
     * callback of {@code publishCacheCursor}.
     */
    public interface ColumnsProcessorFactory {
        /**
         * Creates a processor over the supplied publisher.
         *
         * <p>As called from {@code migrateCache}: {@code i} is the integer passed by the cursor publisher
         * (the partition id of the data store being read), {@code clientSchema} is the latest schema of
         * the target table, {@code map} holds the field-name-for-column mappings produced by the DDL
         * generator and {@code typeConverterFactory} is {@code StaticTypeConverterFactory.DEFAULT_INSTANCE}.
         * Judging by the method name, the returned processor is expected to already be subscribed to
         * {@code publisher} - confirm against the implementations.
         *
         * @param publisher source of key/value tuple entries.
         * @param i integer identifying the stream; {@code migrateCache} passes the partition id.
         * @param clientSchema schema of the target table.
         * @param map field-name-for-column mappings.
         * @param typeConverterFactory factory of the type converters to apply.
         * @return publisher of streamer items, or {@code null}; {@code migrateCache} treats {@code null}
         *         as "do not stream this partition".
         */
        @Nullable
        Flow.Publisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>> createSubscribed(Flow.Publisher<Map.Entry<Tuple, Tuple>> publisher, int i, ClientSchema clientSchema, Map<String, String> map, TypeConverterFactory typeConverterFactory);
    }

    /**
     * Collects the persistent user caches known to the given node contexts.
     *
     * <p>For every node whose cache processor is a {@link MigrationCacheProcessor}, all descriptors are
     * loaded first. Only descriptors whose cache type is {@link CacheType#USER} are reported; the same
     * cache found on several nodes appears once because the result is a set.
     *
     * @param list node contexts to inspect.
     * @return set of (cache id, cache name) pairs, possibly empty.
     * @throws IgniteCheckedException if loading the descriptors fails.
     */
    public static Set<Pair<Integer, String>> persistentCaches(List<MigrationKernalContext> list) throws IgniteCheckedException {
        Set<Pair<Integer, String>> userCaches = new HashSet<>();
        for (MigrationKernalContext nodeCtx : list) {
            GridCacheProcessor cacheProcessor = nodeCtx.cache();
            if (cacheProcessor instanceof MigrationCacheProcessor) {
                ((MigrationCacheProcessor) cacheProcessor).loadAllDescriptors();
            }
            for (DynamicCacheDescriptor descriptor : cacheProcessor.persistentCaches()) {
                if (descriptor.cacheType() == CacheType.USER) {
                    userCaches.add(Pair.of(descriptor.cacheId(), descriptor.cacheName()));
                }
            }
        }
        return userCaches;
    }

    /**
     * Publishes the contents of the named cache from every given node.
     *
     * <p>Each node is processed in list order by {@link #publishCacheCursorAtNode}. Nodes that have no
     * descriptor for the cache are skipped: {@code cacheDescriptor} may return {@code null} (the lookup
     * in {@code migrateCache} already guards against that), and such a node has nothing to publish.
     *
     * @param list node contexts to read from.
     * @param str name of the cache to publish.
     * @param biFunction callback receiving the entry publisher and the partition id of each data store;
     *        an empty result skips that data store, otherwise the returned future is awaited.
     * @throws IgniteCheckedException if reading or publishing the cache data fails.
     */
    public static void publishCacheCursor(List<MigrationKernalContext> list, String str, BiFunction<Flow.Publisher<Map.Entry<Tuple, Tuple>>, Integer, Optional<CompletableFuture<Void>>> biFunction) throws IgniteCheckedException {
        for (MigrationKernalContext nodeCtx : list) {
            DynamicCacheDescriptor descriptor = nodeCtx.cache().cacheDescriptor(str);
            if (descriptor == null) {
                // This node does not know the cache; dereferencing the descriptor would throw a NullPointerException.
                continue;
            }
            publishCacheCursorAtNode(nodeCtx, descriptor, biFunction);
        }
    }

    /**
     * Publishes the rows of one cache, as stored on one node, data store by data store.
     *
     * <p>If the node's cache processor is a {@link MigrationCacheProcessor}, the cache is started first.
     * For every data store of the cache a fresh {@link StreamerPublisher} feeding a
     * {@link CacheDataRowProcessor} is created and handed to {@code biFunction} together with the data
     * store's partition id. When the callback returns an empty {@link Optional} the data store is skipped;
     * otherwise every row whose cache id is {@code 0} or equal to the descriptor's cache id is offered to
     * the publisher, the publisher is closed and the returned future is joined before moving on.
     *
     * @param migrationKernalContext context of the node to read from.
     * @param dynamicCacheDescriptor descriptor of the cache to publish; must not be {@code null}.
     * @param biFunction callback receiving the entry publisher and the partition id; returns the future
     *        to wait for, or an empty optional to skip the data store.
     * @throws IgniteCheckedException if the cursor fails, the publisher rejects a row, or the calling
     *         thread is interrupted while offering a row (the interrupt flag is restored).
     */
    public static void publishCacheCursorAtNode(MigrationKernalContext migrationKernalContext, DynamicCacheDescriptor dynamicCacheDescriptor, BiFunction<Flow.Publisher<Map.Entry<Tuple, Tuple>>, Integer, Optional<CompletableFuture<Void>>> biFunction) throws IgniteCheckedException {
        GridCacheProcessor cacheProcessor = migrationKernalContext.cache();
        if (cacheProcessor instanceof MigrationCacheProcessor) {
            ((MigrationCacheProcessor) cacheProcessor).startCache(dynamicCacheDescriptor);
        }

        int cacheId = dynamicCacheDescriptor.cacheId();
        GridCacheContext cacheContext = cacheProcessor.context().cacheContext(cacheId);
        IgniteCacheOffheapManager offheap = cacheContext.offheap();
        CacheObjectContext cacheObjectContext = cacheContext.cacheObjectContext();

        for (IgniteCacheOffheapManager.CacheDataStore dataStore : offheap.cacheDataStores()) {
            // StreamerPublisher's type parameters are not visible from this file, so it is left raw.
            StreamerPublisher publisher = new StreamerPublisher();
            CacheDataRowProcessor rowProcessor = new CacheDataRowProcessor(cacheObjectContext);
            publisher.subscribe(rowProcessor);

            Optional<CompletableFuture<Void>> streamerFuture = biFunction.apply(rowProcessor, dataStore.partId());
            if (!streamerFuture.isPresent()) {
                // NOTE(review): the publisher is not closed on this path - confirm that is harmless.
                continue;
            }
            CompletableFuture<Void> completion = streamerFuture.get();

            // NOTE(review): the finally block closes the publisher before the outer catch calls
            // closeExceptionally(); whether subscribers still see the failure depends on
            // StreamerPublisher - confirm. The ordering is intentionally left unchanged here.
            try {
                try {
                    GridCursor<? extends CacheDataRow> cursor = dataStore.cursor();
                    while (cursor.next()) {
                        CacheDataRow row = cursor.get();
                        // Rows carry either no cache id (0) or the id of the cache they belong to.
                        if (row.cacheId() != 0 && row.cacheId() != cacheId) {
                            continue;
                        }
                        try {
                            if (!publisher.offer(row)) {
                                throw new IgniteCheckedException("Failed to offer element to publisher");
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            // Keep the interruption as the cause so the stack trace is not lost.
                            throw new IgniteCheckedException("Interrupted publishing tuple", e);
                        }
                    }
                } finally {
                    publisher.close();
                    completion.join();
                }
            } catch (IgniteCheckedException e) {
                publisher.closeExceptionally(e);
                throw e;
            }
        }
    }

    /**
     * Migrates one cache into the table of the same name.
     *
     * <p>The cache configuration is taken from the first node that has a descriptor for the cache. If
     * the client adapter has no table for the cache yet, the table is created together with its type
     * hints; otherwise the column mappings are regenerated with {@code sQLDDLGenerator}. The cache is
     * then published from all nodes via {@code publishCacheCursor}, and every partition for which
     * {@code columnsProcessorFactory} returns a non-null publisher is streamed into the table's
     * key-value view.
     *
     * @param clientAdapter adapter used to look up or create the target table.
     * @param sQLDDLGenerator generator used to derive the column mappings of an already existing table.
     * @param list node contexts to read the cache from.
     * @param str name of the cache, also used as the table lookup key.
     * @param columnsProcessorFactory factory of the per-partition processing stage; a {@code null}
     *        result skips the partition.
     * @throws IgniteCheckedException if reading or publishing the cache data fails.
     * @throws RuntimeException if none of the nodes has a descriptor for the cache.
     */
    public static void migrateCache(ClientAdapter clientAdapter, SQLDDLGenerator sQLDDLGenerator, List<MigrationKernalContext> list, String str, ColumnsProcessorFactory columnsProcessorFactory) throws IgniteCheckedException {
        CacheConfiguration<?, ?> cacheCfg = list.stream()
                .flatMap(nodeCtx -> Stream.ofNullable(nodeCtx.cache().cacheDescriptor(str)))
                .map(DynamicCacheDescriptor::cacheConfiguration)
                .findFirst()
                .orElseThrow(() -> new RuntimeException("Could not find the requested cache: " + str));

        ClientTable table = clientAdapter.getClientTable(str);
        final Map<String, String> columnMappings;
        if (table == null) {
            Map.Entry<?, ?> created = (Map.Entry<?, ?>) clientAdapter.createTableAndTypeHints(cacheCfg).join();
            table = (ClientTable) created.getKey();
            columnMappings = ((SQLDDLGenerator.GenerateTableResult) created.getValue()).fieldNameForColumnMappings();
        } else {
            columnMappings = sQLDDLGenerator.generate(cacheCfg).fieldNameForColumnMappings();
        }

        KeyValueView<Tuple, Tuple> kvView = table.keyValueView();
        ClientSchema schema = (ClientSchema) ClientAdapter.getLatestSchemaForTable(table).join();

        publishCacheCursor(list, str, (entryPublisher, partitionId) -> {
            Flow.Publisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>> itemPublisher = columnsProcessorFactory.createSubscribed(
                    entryPublisher, partitionId, schema, columnMappings, StaticTypeConverterFactory.DEFAULT_INSTANCE);
            // The inner lambda parameter needs its own name: a lambda parameter may not redeclare
            // a parameter of the enclosing lambda.
            return Optional.ofNullable(itemPublisher)
                    .map(items -> kvView.streamData(items, (DataStreamerOptions) null));
        });
    }
}
