/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.migrationtools.cli.persistence.calls;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.migrationtools.cli.persistence.commands.PersistenceBaseCmd;
import org.apache.ignite.migrationtools.cli.persistence.params.IgniteClientAuthenticatorParams;
import org.apache.ignite.migrationtools.cli.persistence.params.MigrateCacheParams;
import org.apache.ignite.migrationtools.cli.persistence.params.MigrationMode;
import org.apache.ignite.migrationtools.cli.persistence.params.PersistenceParams;
import org.apache.ignite.migrationtools.persistence.Ignite2PersistentCacheTools;
import org.apache.ignite.migrationtools.persistence.MigrationKernalContext;
import org.apache.ignite.migrationtools.persistence.mappers.AbstractSchemaColumnsProcessor;
import org.apache.ignite.migrationtools.persistence.mappers.IgnoreMismatchesSchemaColumnProcessor;
import org.apache.ignite.migrationtools.persistence.mappers.SchemaColumnProcessorStats;
import org.apache.ignite.migrationtools.persistence.mappers.SimpleSchemaColumnsProcessor;
import org.apache.ignite.migrationtools.persistence.mappers.SkipRecordsSchemaColumnsProcessor;
import org.apache.ignite.migrationtools.persistence.utils.pubsub.RateLimiterProcessor;
import org.apache.ignite.migrationtools.sql.SqlDdlGenerator;
import org.apache.ignite.migrationtools.tablemanagement.PersistentTableTypeRegistryImpl;
import org.apache.ignite.migrationtools.tablemanagement.RegisterOnlyTableTypeRegistry;
import org.apache.ignite.migrationtools.tablemanagement.TableTypeRegistry;
import org.apache.ignite3.client.BasicAuthenticator;
import org.apache.ignite3.client.IgniteClient;
import org.apache.ignite3.client.IgniteClientAuthenticator;
import org.apache.ignite3.client.IgniteClientConnectionException;
import org.apache.ignite3.internal.cli.core.call.Call;
import org.apache.ignite3.internal.cli.core.call.CallInput;
import org.apache.ignite3.internal.cli.core.call.CallOutput;
import org.apache.ignite3.internal.cli.core.call.CallOutputStatus;
import org.apache.ignite3.internal.cli.core.call.DefaultCallOutput;
import org.apache.ignite3.internal.cli.core.exception.IgniteCliException;
import org.apache.ignite3.internal.cli.logger.CliLoggers;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.gridgain.ignite.migrationtools.adapter.internal.ClientAdapter;
import org.gridgain.ignite.migrationtools.adapter.internal.mapper.MapperUtils;
import org.jetbrains.annotations.Nullable;

public class MigrateCacheCall
implements Call<Input, Ouput> {
    private static final IgniteLogger LOGGER = CliLoggers.forClass(MigrateCacheCall.class);

    /**
     * Appends every partition whose processor did not report an error to {@code resumeFrom} and serializes the
     * resulting progress as JSON into a newly created temporary file in the current working directory.
     *
     * @param nodeConsistentId Node consistent id; used only as part of the file name prefix.
     * @param cacheName Cache name; used only as part of the file name prefix.
     * @param perPartitionColumnProcessors Pairs of partition id and the processor registered for that partition.
     * @param resumeFrom Progress collected so far; its completed-partitions list is extended in place.
     * @return Path of the written progress file, or {@code null} when the file could not be created or written.
     */
    private static Path writeProgressToFile(String nodeConsistentId, String cacheName, List<Map.Entry<Integer, AbstractSchemaColumnsProcessor>> perPartitionColumnProcessors, ProgressData resumeFrom) {
        // Only partitions that finished without an error are recorded as completed.
        perPartitionColumnProcessors.stream()
                .filter(partition -> !partition.getValue().hasReceivedError())
                .map(partition -> new PartitionProgressEntry(partition.getKey(), partition.getValue().getStats()))
                .forEach(resumeFrom.getCompletedPartitions()::add);

        Path savedFile;
        try {
            String fileNamePrefix = "migrate-cache_" + nodeConsistentId + "_" + cacheName + "_";
            savedFile = Files.createTempFile(Path.of("."), fileNamePrefix, "_progress.json");
            try (OutputStream out = Files.newOutputStream(savedFile)) {
                ObjectMapper jsonMapper = new ObjectMapper();
                jsonMapper.writeValue(out, resumeFrom);
            }
            LOGGER.info("ProgressFile saved: {}", savedFile);
        }
        catch (IOException ex) {
            // Saving the progress is best effort: the failure is logged and reported to the caller as null.
            LOGGER.error("Could not save progress file: {}", ex);
            return null;
        }
        return savedFile;
    }

    /**
     * Loads the progress of a previous run from the progress file named in the parameters, or creates an empty
     * progress record when no progress file was supplied.
     *
     * <p>A loaded file is accepted only if its node consistent id, cache name and migration mode all equal the
     * values of the current invocation. A loaded file without a completed-partitions list is treated as having
     * no completed partitions.
     *
     * @param i Call input providing the persistence and cache migration parameters.
     * @return Progress to resume from; never {@code null}.
     * @throws InvalidProgressFileException If the file belongs to a different node, cache or migration mode.
     * @throws IOException If the file cannot be opened or parsed, or contains no progress object.
     */
    private static ProgressData loadOrInitProgress(Input i) throws InvalidProgressFileException, IOException {
        String consistentNodeId = i.persistenceParams().nodeConsistentId();
        String cacheName = i.migrateCacheParams().cacheName();
        Path progressFile = i.migrateCacheParams().progressFileToRead();
        if (progressFile == null) {
            // Nothing to resume from: start with an empty list of completed partitions.
            return new ProgressData(consistentNodeId, cacheName, i.migrateCacheParams().migrationMode().toString(), new ArrayList<PartitionProgressEntry>());
        }

        ProgressData resumeFrom;
        try (InputStream is = Files.newInputStream(progressFile)) {
            ObjectMapper objectMapper = new ObjectMapper();
            resumeFrom = objectMapper.readValue(is, ProgressData.class);
        }
        if (resumeFrom == null) {
            // A document holding only the JSON literal null maps to a null object; report it as an unreadable
            // file instead of failing with a NullPointerException on the checks below.
            throw new IOException("Progress file does not contain progress data: " + progressFile);
        }

        if (!consistentNodeId.equals(resumeFrom.nodeId)) {
            throw InvalidProgressFileException.forNodeConsistentId(resumeFrom.nodeId, consistentNodeId);
        }
        if (!cacheName.equals(resumeFrom.cacheName)) {
            // Use the cache-specific factory so the message names the cache rather than the node.
            throw InvalidProgressFileException.forCacheName(resumeFrom.cacheName, cacheName);
        }
        String migrationModeStr = i.migrateCacheParams().migrationMode().toString();
        if (!migrationModeStr.equals(resumeFrom.migrationMode)) {
            throw InvalidProgressFileException.forMode(resumeFrom.migrationMode, migrationModeStr);
        }

        if (resumeFrom.completedPartitions == null) {
            // Callers stream over and append to this list, so it must not stay null.
            resumeFrom.completedPartitions = new ArrayList<>();
        }
        return resumeFrom;
    }

    /**
     * Folds the statistics of all per-partition processors into a single stats object.
     *
     * <p>The concrete type of the result follows the migration mode: skipped-record stats for
     * {@code SKIP_RECORD}, ignored-column stats for {@code IGNORE_COLUMN}, and plain processed-element stats for
     * every other mode. The per-partition stats are cast to the type matching the mode.
     *
     * @param migrationMode Mode the processors were created for.
     * @param schemaColumnsProcessors Pairs of partition id and processor; only the processors are read.
     * @param <T> Processor type.
     * @return Aggregated statistics.
     */
    private static <T extends AbstractSchemaColumnsProcessor> SchemaColumnProcessorStats collectStats(MigrationMode migrationMode, List<Map.Entry<Integer, T>> schemaColumnsProcessors) {
        long totalProcessed = 0L;
        switch (migrationMode) {
            case SKIP_RECORD: {
                long totalSkipped = 0L;
                for (Map.Entry<Integer, T> partition : schemaColumnsProcessors) {
                    SkipRecordsSchemaColumnsProcessor.SkippedRecordsStats partitionStats =
                            (SkipRecordsSchemaColumnsProcessor.SkippedRecordsStats) partition.getValue().getStats();
                    totalProcessed += partitionStats.getProcessedElements();
                    totalSkipped += partitionStats.getNumSkippedRecords();
                }
                return new SkipRecordsSchemaColumnsProcessor.SkippedRecordsStats(totalProcessed, totalSkipped);
            }
            case IGNORE_COLUMN: {
                // Raw map on purpose: the element types of the dropped-columns map are not visible in this file.
                // Entries of later partitions replace entries with the same key (putAll semantics).
                HashMap mergedDroppedColumns = new HashMap();
                for (Map.Entry<Integer, T> partition : schemaColumnsProcessors) {
                    IgnoreMismatchesSchemaColumnProcessor.IgnoredColumnsStats partitionStats =
                            (IgnoreMismatchesSchemaColumnProcessor.IgnoredColumnsStats) partition.getValue().getStats();
                    totalProcessed += partitionStats.getProcessedElements();
                    mergedDroppedColumns.putAll(partitionStats.getDroppedColumns());
                }
                return new IgnoreMismatchesSchemaColumnProcessor.IgnoredColumnsStats(totalProcessed, mergedDroppedColumns);
            }
            default: {
                // Every other mode only tracks the number of processed elements.
                for (Map.Entry<Integer, T> partition : schemaColumnsProcessors) {
                    SchemaColumnProcessorStats partitionStats = partition.getValue().getStats();
                    totalProcessed += partitionStats.getProcessedElements();
                }
                return new SchemaColumnProcessorStats(totalProcessed);
            }
        }
    }

    /**
     * Runs the migration of a single cache and reports the outcome.
     *
     * <p>Steps visible in this body: load or initialise the progress record, build an {@link IgniteClient},
     * create the Ignite configuration and the migration contexts, hand a per-partition processor factory to
     * {@code Ignite2PersistentCacheTools.migrateCache}, then optionally write a progress file and log the
     * aggregated statistics. All started migration contexts are stopped in the {@code finally} block.
     *
     * <p>NOTE(review): this body is CFR decompiler output. CFR emitted the warning "Removed try catching itself -
     * possible behaviour change.", and the labeled blocks appear to emulate a try-with-resources around the
     * client. The exact scope of the exception handler should be confirmed against the original source.
     *
     * @param i Call input with persistence, cache migration and client authentication parameters.
     * @return Success output with the message "Migration finished successfully" and the progress file path
     *      (which may be {@code null}), or a failure/error output describing what went wrong.
     */
    public CallOutput<Ouput> execute(Input i) {
        ProgressData resumeFrom;
        String cacheName = i.migrateCacheParams().cacheName();
        String[] addresses = i.migrateCacheParams().addresses();
        // Drops the first address when it is equal to the cache name.
        // NOTE(review): presumably compensates for the cache name also being captured as an address by the
        // argument parser - confirm. An empty addresses array makes addresses[0] throw.
        if (addresses[0].equals(cacheName)) {
            addresses = Arrays.copyOfRange(addresses, 1, addresses.length);
        }
        @Nullable BasicAuthenticator authenticator = i.clientAuthenticatorParams().authenticator();
        // Either resume from the supplied progress file or start with an empty progress record.
        try {
            resumeFrom = MigrateCacheCall.loadOrInitProgress(i);
        }
        catch (InvalidProgressFileException e) {
            LOGGER.error(e.getMessage(), new Object[0]);
            return DefaultCallOutput.failure((Throwable)e);
        }
        catch (IOException e) {
            LOGGER.error("Could not read from progress file: {}", new Object[]{i.migrateCacheParams().progressFileToRead(), e});
            return DefaultCallOutput.failure((Throwable)new IgniteCliException("Cannot read from progress file: " + i.migrateCacheParams().progressFileToRead(), (Throwable)e));
        }
        // Starts empty so the finally block has nothing to stop if context creation is never reached.
        List<Object> persistentCtx = Collections.emptyList();
        // Filled by the processor factory below: one entry per partition a processor was created for.
        ArrayList<Map.Entry<Integer, AbstractSchemaColumnsProcessor>> perPartitionColumnProcessors = new ArrayList<Map.Entry<Integer, AbstractSchemaColumnsProcessor>>();
        try {
            DefaultCallOutput defaultCallOutput;
            block41: {
                IgniteClient client;
                block39: {
                    DefaultCallOutput defaultCallOutput2;
                    block40: {
                        IgniteConfiguration cfg22;
                        block37: {
                            DefaultCallOutput defaultCallOutput3;
                            block38: {
                                client = IgniteClient.builder().addresses(addresses).authenticator((IgniteClientAuthenticator)authenticator).build();
                                try {
                                    cfg22 = PersistenceBaseCmd.createValidIgniteCfg(i.persistenceParams());
                                    // A non-null configuration continues after block37; null becomes a failure.
                                    if (cfg22 != null) break block37;
                                    defaultCallOutput3 = DefaultCallOutput.failure((Throwable)new IgniteCliException("Unable to read ignite configuration"));
                                    if (client == null) break block38;
                                }
                                catch (Throwable cfg22) {
                                    try {
                                        // Closes the client and rethrows, attaching a close failure as suppressed.
                                        if (client != null) {
                                            try {
                                                client.close();
                                            }
                                            catch (Throwable throwable) {
                                                cfg22.addSuppressed(throwable);
                                            }
                                        }
                                        throw cfg22;
                                    }
                                    catch (Exception e) {
                                        Path outputProgressFile;
                                        // Connection problems are reported as a plain failure without a progress file.
                                        if (e instanceof IgniteClientConnectionException) {
                                            LOGGER.error("Could not connect to the cluster", (Throwable)e);
                                            cfg22 = DefaultCallOutput.failure((Throwable)e);
                                            return cfg22;
                                        }
                                        if (!i.migrateCacheParams().saveProgressFileDisabled()) {
                                            // The most recently registered processor is dropped before saving.
                                            // NOTE(review): presumably because that partition may be incomplete - confirm.
                                            if (!perPartitionColumnProcessors.isEmpty()) {
                                                perPartitionColumnProcessors.remove(perPartitionColumnProcessors.size() - 1);
                                            }
                                            outputProgressFile = MigrateCacheCall.writeProgressToFile(i.persistenceParams().nodeConsistentId(), i.migrateCacheParams().cacheName(), perPartitionColumnProcessors, resumeFrom);
                                        } else {
                                            outputProgressFile = null;
                                        }
                                        LOGGER.error("Error while migration persistence folder", (Throwable)e);
                                        DefaultCallOutput defaultCallOutput4 = DefaultCallOutput.builder().status(CallOutputStatus.ERROR).cause(ExceptionUtils.unwrapCause((Throwable)e)).body((Object)new Ouput("Migration finished unsuccessfully", outputProgressFile)).build();
                                        return defaultCallOutput4;
                                    }
                                }
                                client.close();
                            }
                            return defaultCallOutput3;
                        }
                        persistentCtx = PersistenceBaseCmd.createAndStartMigrationContext(i.persistenceParams(), cfg22);
                        // At least one context continues after block39; an empty list becomes a failure.
                        if (!persistentCtx.isEmpty()) break block39;
                        defaultCallOutput2 = DefaultCallOutput.failure((Throwable)new IgniteCliException(String.format("Could not find node (consistentId:%s) folder in '%s'", i.persistenceParams().nodeConsistentId(), i.persistenceParams().workDir().toString())));
                        if (client == null) break block40;
                        client.close();
                    }
                    return defaultCallOutput2;
                }
                MigrationMode migrationMode = i.migrateCacheParams().migrationMode();
                boolean packExtraFields = migrationMode == MigrationMode.PACK_EXTRA;
                // Factory invoked with a partition id: builds the processor chain for that partition.
                Ignite2PersistentCacheTools.ColumnsProcessorFactory columnsProcessorFactory = (cacheTuplePublisher, partitionId, clientSchema, columnsToFieldsMappings, typeConverters) -> {
                    SkipRecordsSchemaColumnsProcessor processor;
                    // Partitions already listed in the progress record get no processor (null is returned).
                    if (resumeFrom.getCompletedPartitions().stream().anyMatch(e -> e.getPartitionId() == partitionId)) {
                        return null;
                    }
                    switch (migrationMode) {
                        case SKIP_RECORD: {
                            processor = new SkipRecordsSchemaColumnsProcessor(clientSchema, columnsToFieldsMappings, typeConverters);
                            break;
                        }
                        case IGNORE_COLUMN: {
                            processor = new IgnoreMismatchesSchemaColumnProcessor(clientSchema, columnsToFieldsMappings, typeConverters);
                            break;
                        }
                        default: {
                            processor = new SimpleSchemaColumnsProcessor(clientSchema, columnsToFieldsMappings, typeConverters, packExtraFields);
                        }
                    }
                    // Remember the processor so its stats can be saved and aggregated later.
                    perPartitionColumnProcessors.add(Map.entry(partitionId, processor));
                    cacheTuplePublisher.subscribe(processor);
                    SkipRecordsSchemaColumnsProcessor tail = processor;
                    // With a positive rate limit, a RateLimiterProcessor is subscribed and becomes the returned tail.
                    if (i.migrateCacheParams().rateLimiter() > 0) {
                        RateLimiterProcessor rl = new RateLimiterProcessor(1L, TimeUnit.SECONDS, i.migrateCacheParams().rateLimiter());
                        tail.subscribe(rl);
                        tail = rl;
                    }
                    return tail;
                };
                boolean allowDfltConstructors = false;
                RegisterOnlyTableTypeRegistry registry = new RegisterOnlyTableTypeRegistry((TableTypeRegistry)new PersistentTableTypeRegistryImpl(client));
                ClientAdapter adapter = new ClientAdapter(client, (TableTypeRegistry)registry, new MapperUtils(packExtraFields, allowDfltConstructors));
                SqlDdlGenerator sqlGenerator = new SqlDdlGenerator((TableTypeRegistry)registry, packExtraFields);
                LOGGER.info("Starting the migration process", new Object[0]);
                Ignite2PersistentCacheTools.migrateCache((ClientAdapter)adapter, (SqlDdlGenerator)sqlGenerator, persistentCtx, (String)cacheName, (Ignite2PersistentCacheTools.ColumnsProcessorFactory)columnsProcessorFactory);
                LOGGER.info("Waiting for results to be persisted", new Object[0]);
                // The progress file is written on success as well, unless saving was disabled.
                Path outputProgressFile = !i.migrateCacheParams().saveProgressFileDisabled() ? MigrateCacheCall.writeProgressToFile(i.persistenceParams().nodeConsistentId(), i.migrateCacheParams().cacheName(), perPartitionColumnProcessors, resumeFrom) : null;
                SchemaColumnProcessorStats stats = MigrateCacheCall.collectStats(migrationMode, perPartitionColumnProcessors);
                LOGGER.info("Finished persisting records: {}", new Object[]{stats});
                defaultCallOutput = DefaultCallOutput.success((Object)new Ouput("Migration finished successfully", outputProgressFile));
                if (client == null) break block41;
                client.close();
            }
            return defaultCallOutput;
        }
        finally {
            // Stop every started migration context; a failing stop is logged and does not abort the loop.
            for (MigrationKernalContext migrationKernalContext : persistentCtx) {
                try {
                    migrationKernalContext.stop();
                }
                catch (IgniteCheckedException e) {
                    LOGGER.warn("Error while closing persistent contexts", (Throwable)e);
                }
            }
        }
    }

    /**
     * Progress record of a migration run: the node, cache and migration mode it belongs to, plus the partitions
     * that were already migrated. Instances are written to and read from the JSON progress file.
     */
    private static class ProgressData {
        /** Consistent id of the node the progress belongs to. */
        private String nodeId;

        /** Name of the migrated cache. */
        private String cacheName;

        /** String form of the migration mode used by the run. */
        private String migrationMode;

        /** Partitions that were fully processed. */
        private List<PartitionProgressEntry> completedPartitions;

        /** Creates an instance with all fields unset. */
        private ProgressData() {
        }

        /**
         * Creates a fully populated progress record.
         *
         * @param nodeId Node consistent id.
         * @param cacheName Cache name.
         * @param migrationMode Migration mode as string.
         * @param completedPartitions Completed partitions; the list is stored as is, not copied.
         */
        public ProgressData(String nodeId, String cacheName, String migrationMode, List<PartitionProgressEntry> completedPartitions) {
            this.nodeId = nodeId;
            this.cacheName = cacheName;
            this.migrationMode = migrationMode;
            this.completedPartitions = completedPartitions;
        }

        /** @return Node consistent id. */
        public String getNodeId() {
            return nodeId;
        }

        /** @return Cache name. */
        public String getCacheName() {
            return cacheName;
        }

        /** @return Migration mode as string. */
        public String getMigrationMode() {
            return migrationMode;
        }

        /** @return The stored list of completed partitions (the internal list itself). */
        public List<PartitionProgressEntry> getCompletedPartitions() {
            return completedPartitions;
        }
    }

    /**
     * Input of {@link MigrateCacheCall}: bundles the persistence, cache migration and client authentication
     * parameters.
     */
    public static class Input
    implements CallInput {
        /** Parameters describing the persistence to read from. */
        private final PersistenceParams persistenceParams;

        /** Parameters of the cache migration itself. */
        private final MigrateCacheParams migrateCacheParams;

        /** Parameters used to obtain the client authenticator. */
        private final IgniteClientAuthenticatorParams clientAuthenticatorParams;

        /**
         * Creates the input.
         *
         * @param persistenceParams Persistence parameters.
         * @param migrateCacheParams Cache migration parameters.
         * @param clientAuthenticatorParams Client authentication parameters.
         */
        public Input(PersistenceParams persistenceParams, MigrateCacheParams migrateCacheParams, IgniteClientAuthenticatorParams clientAuthenticatorParams) {
            this.persistenceParams = persistenceParams;
            this.migrateCacheParams = migrateCacheParams;
            this.clientAuthenticatorParams = clientAuthenticatorParams;
        }

        /** @return Persistence parameters. */
        PersistenceParams persistenceParams() {
            return persistenceParams;
        }

        /** @return Cache migration parameters. */
        MigrateCacheParams migrateCacheParams() {
            return migrateCacheParams;
        }

        /** @return Client authentication parameters. */
        IgniteClientAuthenticatorParams clientAuthenticatorParams() {
            return clientAuthenticatorParams;
        }
    }

    /**
     * Signals that a progress file does not belong to the run described by the command line arguments.
     */
    public static class InvalidProgressFileException
    extends Exception {
        /**
         * Creates the exception; the given detail is appended to a fixed prefix.
         *
         * @param message Description of the mismatch.
         */
        public InvalidProgressFileException(String message) {
            super("Progress file does not match the provided arguments: " + message);
        }

        /**
         * Builds the exception for a node consistent id mismatch.
         *
         * @param expected Value quoted first in the message (the one found in the progress file).
         * @param actual Value quoted second in the message (the one that was requested).
         * @return New exception instance.
         */
        static InvalidProgressFileException forNodeConsistentId(String expected, String actual) {
            String detail = "Progress File is from node '" + expected + "', but '" + actual + "' was requested.";
            return new InvalidProgressFileException(detail);
        }

        /**
         * Builds the exception for a cache name mismatch.
         *
         * @param expected Value quoted first in the message (the one found in the progress file).
         * @param actual Value quoted second in the message (the one that was requested).
         * @return New exception instance.
         */
        static InvalidProgressFileException forCacheName(String expected, String actual) {
            String detail = "Progress File is from cache '" + expected + "', but '" + actual + "' was requested.";
            return new InvalidProgressFileException(detail);
        }

        /**
         * Builds the exception for a migration mode mismatch.
         *
         * @param expected Value quoted first in the message (the one found in the progress file).
         * @param actual Value quoted second in the message (the one that was requested).
         * @return New exception instance.
         */
        static InvalidProgressFileException forMode(String expected, String actual) {
            String detail = "Progress File has migration mode '" + expected + "', but '" + actual + "' was requested.";
            return new InvalidProgressFileException(detail);
        }
    }

    /**
     * Result body of {@link MigrateCacheCall}: a human readable message and the path of the progress file, if
     * one was written.
     */
    public static class Ouput {
        /** Outcome message. */
        private String msg;

        /** Path of the written progress file; may be {@code null}. */
        private Path progressFilePath;

        /**
         * Creates the output.
         *
         * @param msg Outcome message.
         * @param progressFilePath Path of the progress file, or {@code null} when none was written.
         */
        public Ouput(String msg, Path progressFilePath) {
            this.progressFilePath = progressFilePath;
            this.msg = msg;
        }

        /** @return Outcome message. */
        public String getMsg() {
            return msg;
        }

        /** @return Path of the progress file, or {@code null} when none was written. */
        public Path getProgressFilePath() {
            return progressFilePath;
        }
    }

    /**
     * Progress entry of a single partition: the partition id together with the statistics of the processor
     * that handled it.
     */
    static class PartitionProgressEntry {
        /** Partition id. */
        private int partitionId;

        /** Statistics of the processor that handled the partition. */
        private SchemaColumnProcessorStats stats;

        /** Creates an entry with default field values. */
        private PartitionProgressEntry() {
        }

        /**
         * Creates a populated entry.
         *
         * @param partitionId Partition id.
         * @param stats Processor statistics for the partition.
         */
        private PartitionProgressEntry(int partitionId, SchemaColumnProcessorStats stats) {
            this.stats = stats;
            this.partitionId = partitionId;
        }

        /** @return Partition id. */
        public int getPartitionId() {
            return partitionId;
        }

        /** @return Processor statistics for the partition. */
        public SchemaColumnProcessorStats getStats() {
            return stats;
        }
    }
}

