/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.network.file;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileAttribute;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.TopologyEventHandler;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.network.annotations.Transferable;
import org.apache.ignite.internal.network.configuration.FileTransferConfiguration;
import org.apache.ignite.internal.network.configuration.FileTransferView;
import org.apache.ignite.internal.network.file.Channel;
import org.apache.ignite.internal.network.file.FileConsumer;
import org.apache.ignite.internal.network.file.FileProvider;
import org.apache.ignite.internal.network.file.FileReceiver;
import org.apache.ignite.internal.network.file.FileSender;
import org.apache.ignite.internal.network.file.FileTransferService;
import org.apache.ignite.internal.network.file.TransferredFilesCollector;
import org.apache.ignite.internal.network.file.exception.FileTransferException;
import org.apache.ignite.internal.network.file.messages.FileChunkMessage;
import org.apache.ignite.internal.network.file.messages.FileChunkResponse;
import org.apache.ignite.internal.network.file.messages.FileDownloadRequest;
import org.apache.ignite.internal.network.file.messages.FileDownloadResponse;
import org.apache.ignite.internal.network.file.messages.FileHeader;
import org.apache.ignite.internal.network.file.messages.FileTransferError;
import org.apache.ignite.internal.network.file.messages.FileTransferErrorMessage;
import org.apache.ignite.internal.network.file.messages.FileTransferFactory;
import org.apache.ignite.internal.network.file.messages.FileTransferInitMessage;
import org.apache.ignite.internal.network.file.messages.FileTransferInitResponse;
import org.apache.ignite.internal.network.file.messages.FileTransferMessageType;
import org.apache.ignite.internal.network.file.messages.Identifier;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.ThreadOperation;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteUtils;

public class FileTransferServiceImpl
implements FileTransferService {
    private static final IgniteLogger LOG = Loggers.forClass(FileTransferServiceImpl.class);
    private final long responseTimeout;
    private final TopologyService topologyService;
    private final MessagingService messagingService;
    private final Path transferDirectory;
    private final FileSender fileSender;
    private final FileReceiver fileReceiver;
    private final ExecutorService executorService;
    private final Map<Short, FileProvider<Identifier>> identifierToProvider = new ConcurrentHashMap<Short, FileProvider<Identifier>>();
    private final Map<Short, FileConsumer<Identifier>> identifierToConsumer = new ConcurrentHashMap<Short, FileConsumer<Identifier>>();
    private final Map<UUID, DownloadRequestConsumer> transferIdToDownloadConsumer = new ConcurrentHashMap<UUID, DownloadRequestConsumer>();
    private final FileTransferFactory messageFactory = new FileTransferFactory();

    FileTransferServiceImpl(String nodeName, TopologyService topologyService, MessagingService messagingService, FileTransferConfiguration configuration, Path transferDirectory) {
        this(topologyService, messagingService, configuration, transferDirectory, FileTransferServiceImpl.createExecutor(nodeName, configuration));
    }

    private static ExecutorService createExecutor(String nodeName, FileTransferConfiguration configuration) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(((FileTransferView)configuration.value()).threadPoolSize(), ((FileTransferView)configuration.value()).threadPoolSize(), 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory)IgniteThreadFactory.create((String)nodeName, (String)"file-transfer", (IgniteLogger)LOG, (ThreadOperation[])new ThreadOperation[0]));
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }

    private FileTransferServiceImpl(TopologyService topologyService, MessagingService messagingService, FileTransferConfiguration configuration, Path transferDirectory, ExecutorService executorService) {
        this(((FileTransferView)configuration.value()).responseTimeoutMillis(), topologyService, messagingService, transferDirectory, new FileSender(((FileTransferView)configuration.value()).chunkSizeBytes(), new Semaphore(((FileTransferView)configuration.value()).maxConcurrentRequests()), ((FileTransferView)configuration.value()).responseTimeoutMillis(), messagingService, executorService), new FileReceiver(), executorService);
    }

    FileTransferServiceImpl(long responseTimeout, TopologyService topologyService, MessagingService messagingService, Path transferDirectory, FileSender fileSender, FileReceiver fileReceiver, ExecutorService executorService) {
        this.responseTimeout = responseTimeout;
        this.topologyService = topologyService;
        this.messagingService = messagingService;
        this.transferDirectory = transferDirectory;
        this.fileSender = fileSender;
        this.fileReceiver = fileReceiver;
        this.executorService = executorService;
    }

    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        this.topologyService.addEventHandler(new TopologyEventHandler(){

            public void onDisappeared(InternalClusterNode member) {
                FileTransferServiceImpl.this.fileReceiver.cancelTransfersFromSender(member.name());
            }
        });
        this.messagingService.addMessageHandler(FileTransferMessageType.class, (message, sender, correlationId) -> {
            String senderConsistentId = sender.name();
            if (message instanceof FileDownloadRequest) {
                this.processDownloadRequest((FileDownloadRequest)message, senderConsistentId, correlationId);
            } else if (message instanceof FileTransferInitMessage) {
                this.processFileTransferInitMessage((FileTransferInitMessage)message, senderConsistentId, correlationId);
            } else if (message instanceof FileChunkMessage) {
                this.processFileChunkMessage((FileChunkMessage)message, senderConsistentId, correlationId);
            } else if (message instanceof FileTransferErrorMessage) {
                this.processFileTransferErrorMessage((FileTransferErrorMessage)message);
            } else {
                LOG.error("Unexpected message received: {}", new Object[]{message});
            }
        });
        return CompletableFutures.nullCompletedFuture();
    }

    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        IgniteUtils.shutdownAndAwaitTermination((ExecutorService)this.executorService, (long)10L, (TimeUnit)TimeUnit.SECONDS);
        return CompletableFutures.nullCompletedFuture();
    }

    private void processFileTransferInitMessage(FileTransferInitMessage message, String senderConsistentId, long correlationId) {
        UUID transferId = message.transferId();
        Identifier identifier = message.identifier();
        CompletionStage directoryFuture = CompletableFuture.supplyAsync(() -> this.createTransferDirectory(transferId), this.executorService).whenComplete((directory, e) -> {
            if (e != null) {
                LOG.error("Failed to create transfer directory [transferId={}, identifier={}]", e, new Object[]{transferId, identifier});
            }
        });
        CompletionStage collectorFuture = ((CompletableFuture)directoryFuture).thenApply(directory -> this.fileReceiver.registerTransfer(senderConsistentId, transferId, message.headers(), (Path)directory));
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)collectorFuture).handle((collector, throwable) -> this.messageFactory.fileTransferInitResponse().error(throwable != null ? FileTransferError.fromThrowable(this.messageFactory, throwable) : null).build())).thenCompose(response -> this.messagingService.respond(senderConsistentId, Channel.FILE_TRANSFER_CHANNEL, (NetworkMessage)response, correlationId))).whenComplete((v, e) -> {
            if (e != null) {
                LOG.error("Failed to send file transfer response [transferId={}, identifier={}]", e, new Object[]{transferId, identifier});
                this.fileReceiver.cancelTransfer(transferId, (Throwable)e);
            }
        });
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)((CompletableFuture)collectorFuture).thenCompose(TransferredFilesCollector::collectedFiles)).whenComplete((files, throwable) -> {
            if (throwable != null) {
                LOG.error("Failed to collect transferred files [transferId={}, identifier={}]", throwable, new Object[]{transferId, identifier});
                this.transferIdToDownloadConsumer.computeIfPresent(transferId, (k, v) -> {
                    v.onError((Throwable)throwable);
                    return null;
                });
            }
        })).thenCompose(files -> this.getFileConsumer(transferId, identifier).consume(identifier, (List<Path>)files).whenComplete((v, e) -> {
            if (e != null) {
                LOG.error("Failed to process file transfer [transferId={}, identifier={}]", e, new Object[]{transferId, identifier});
            }
        }))).whenComplete((arg_0, arg_1) -> this.lambda$processFileTransferInitMessage$11(transferId, (CompletableFuture)directoryFuture, arg_0, arg_1));
    }

    private void processDownloadRequest(FileDownloadRequest message, String senderConsistentId, Long correlationId) {
        ((CompletableFuture)CompletableFuture.supplyAsync(() -> this.getFileProvider(message.identifier()), this.executorService).thenCompose(provider -> provider.files(message.identifier()))).whenComplete((files, e) -> {
            if (e != null) {
                LOG.error("Failed to get files for download [transferId={}, identifier={}]", e, new Object[]{message.transferId(), message.identifier()});
                FileDownloadResponse response = this.messageFactory.fileDownloadResponse().error(FileTransferError.fromThrowable(this.messageFactory, e)).build();
                this.messagingService.respond(senderConsistentId, Channel.FILE_TRANSFER_CHANNEL, (NetworkMessage)response, correlationId.longValue());
            } else if (files.isEmpty()) {
                LOG.warn("No files to download [transferId={}, identifier={}]", new Object[]{message.transferId(), message.identifier()});
                FileDownloadResponse response = this.messageFactory.fileDownloadResponse().error(FileTransferError.fromThrowable(this.messageFactory, (Throwable)((Object)new FileTransferException("No files to download")))).build();
                this.messagingService.respond(senderConsistentId, Channel.FILE_TRANSFER_CHANNEL, (NetworkMessage)response, correlationId.longValue());
            } else {
                FileDownloadResponse response = this.messageFactory.fileDownloadResponse().build();
                this.messagingService.respond(senderConsistentId, Channel.FILE_TRANSFER_CHANNEL, (NetworkMessage)response, correlationId.longValue()).thenComposeAsync(v -> this.transferFilesToNode(senderConsistentId, message.transferId(), message.identifier(), (List<Path>)files), (Executor)this.executorService);
            }
        });
    }

    private void processFileChunkMessage(FileChunkMessage message, String senderConsistentId, long correlationId) {
        CompletableFuture.runAsync(() -> this.fileReceiver.receiveFileChunk(message), this.executorService).whenComplete((v, e) -> {
            if (e != null) {
                LOG.error("Failed to process file chunk [transferId={}]", e, new Object[]{message.transferId()});
            }
            FileChunkResponse ack = this.messageFactory.fileChunkResponse().error(e != null ? FileTransferError.fromThrowable(this.messageFactory, e) : null).build();
            this.messagingService.respond(senderConsistentId, Channel.FILE_TRANSFER_CHANNEL, (NetworkMessage)ack, correlationId);
        });
    }

    private void processFileTransferErrorMessage(FileTransferErrorMessage message) {
        LOG.error("Received file transfer error message. Transfer will be cancelled [transferId={}, error={}", new Object[]{message.transferId(), message.error()});
        CompletableFuture.runAsync(() -> this.fileReceiver.cancelTransfer(message.transferId(), FileTransferError.toException(message.error())), this.executorService);
    }

    private CompletableFuture<Void> sendFiles(String targetNodeConsistentId, UUID transferId, List<Path> paths) {
        return this.fileSender.send(targetNodeConsistentId, transferId, paths).whenComplete((v, e) -> {
            if (e != null) {
                LOG.error("Failed to send files to node [nodeConsistentId={}, transferId={}]", e, new Object[]{targetNodeConsistentId, transferId});
                FileTransferErrorMessage message = this.messageFactory.fileTransferErrorMessage().transferId(transferId).error(FileTransferError.fromThrowable(this.messageFactory, e)).build();
                this.messagingService.send(targetNodeConsistentId, Channel.FILE_TRANSFER_CHANNEL, (NetworkMessage)message);
            }
        });
    }

    @Override
    public <M extends Identifier> void addFileProvider(Class<M> identifier, FileProvider<M> provider) {
        this.identifierToProvider.compute(FileTransferServiceImpl.getMessageType(identifier), (k, v) -> {
            if (v != null) {
                throw new IllegalArgumentException("File provider for identifier " + identifier.getName() + " already exists");
            }
            return provider;
        });
    }

    @Override
    public <M extends Identifier> void addFileConsumer(Class<M> identifier, FileConsumer<M> consumer) {
        this.identifierToConsumer.compute(FileTransferServiceImpl.getMessageType(identifier), (k, v) -> {
            if (v != null) {
                throw new IllegalArgumentException("File handler for identifier " + identifier.getName() + " already exists");
            }
            return consumer;
        });
    }

    @Override
    public CompletableFuture<List<Path>> download(String sourceNodeConsistentId, Identifier identifier, Path targetDir) {
        UUID transferId = UUID.randomUUID();
        FileDownloadRequest downloadRequest = this.messageFactory.fileDownloadRequest().transferId(transferId).identifier(identifier).build();
        CompletionStage downloadedFiles = new CompletableFuture().whenComplete((v, e) -> this.transferIdToDownloadConsumer.remove(transferId));
        this.transferIdToDownloadConsumer.put(transferId, new DownloadRequestConsumer((CompletableFuture<List<Path>>)downloadedFiles, targetDir));
        ((CompletableFuture)this.messagingService.invoke(sourceNodeConsistentId, Channel.FILE_TRANSFER_CHANNEL, (NetworkMessage)downloadRequest, this.responseTimeout).thenApply(FileDownloadResponse.class::cast)).whenComplete((arg_0, arg_1) -> FileTransferServiceImpl.lambda$download$23((CompletableFuture)downloadedFiles, arg_0, arg_1));
        return downloadedFiles;
    }

    @Override
    public CompletableFuture<Void> upload(String targetNodeConsistentId, Identifier identifier) {
        return this.getFileProvider(identifier).files(identifier).thenCompose(files -> this.transferFilesToNode(targetNodeConsistentId, UUID.randomUUID(), identifier, (List<Path>)files));
    }

    private CompletableFuture<Void> transferFilesToNode(String targetNodeConsistentId, UUID transferId, Identifier identifier, List<Path> paths) {
        if (paths.isEmpty()) {
            return CompletableFuture.failedFuture((Throwable)((Object)new FileTransferException("No files to upload")));
        }
        FileTransferInitMessage message = this.messageFactory.fileTransferInitMessage().transferId(transferId).identifier(identifier).headers(FileHeader.fromPaths(this.messageFactory, paths)).build();
        return ((CompletableFuture)((CompletableFuture)this.messagingService.invoke(targetNodeConsistentId, Channel.FILE_TRANSFER_CHANNEL, (NetworkMessage)message, this.responseTimeout).thenApply(FileTransferInitResponse.class::cast)).thenComposeAsync(response -> {
            if (response.error() != null) {
                return CompletableFuture.failedFuture((Throwable)((Object)new FileTransferException("Failed to upload files: " + response.error().message())));
            }
            return this.sendFiles(targetNodeConsistentId, transferId, paths);
        }, (Executor)this.executorService)).whenComplete((v, e) -> {
            if (e != null) {
                LOG.error("Failed to transfer files to node [nodeConsistentId={}, transferId={}]", e, new Object[]{targetNodeConsistentId, transferId});
            }
        });
    }

    private static short getMessageType(Class<?> identifier) {
        Transferable annotation = identifier.getAnnotation(Transferable.class);
        if (annotation == null) {
            throw new IllegalArgumentException("Class " + identifier.getName() + " is not annotated with @Transferable");
        }
        return annotation.value();
    }

    private <M extends Identifier> FileProvider<M> getFileProvider(M identifier) {
        FileProvider<Identifier> provider = this.identifierToProvider.get(identifier.messageType());
        if (provider == null) {
            throw new IllegalArgumentException("File provider for identifier " + identifier.getClass().getName() + " not found");
        }
        return provider;
    }

    private <M extends Identifier> FileConsumer<M> getFileConsumer(UUID transferId, M identifier) {
        return this.transferIdToDownloadConsumer.containsKey(transferId) ? (FileConsumer)this.transferIdToDownloadConsumer.get(transferId) : this.getFileConsumer(identifier);
    }

    private <M extends Identifier> FileConsumer<M> getFileConsumer(M identifier) {
        FileConsumer<Identifier> consumer = this.identifierToConsumer.get(identifier.messageType());
        if (consumer == null) {
            throw new IllegalArgumentException("File consumer for identifier " + identifier.getClass().getName() + " not found");
        }
        return consumer;
    }

    private Path createTransferDirectory(UUID transferId) {
        try {
            return Files.createDirectories(this.transferDirectory.resolve(transferId.toString()), new FileAttribute[0]);
        }
        catch (IOException e) {
            throw new FileTransferException("Failed to create the transfer directory with transferId: " + transferId, e);
        }
    }

    private static /* synthetic */ void lambda$download$23(CompletableFuture downloadedFiles, FileDownloadResponse response, Throwable e) {
        if (e != null) {
            downloadedFiles.completeExceptionally(e);
        } else if (response.error() != null) {
            downloadedFiles.completeExceptionally(FileTransferError.toException(response.error()));
        }
    }

    private /* synthetic */ void lambda$processFileTransferInitMessage$11(UUID transferId, CompletableFuture directoryFuture, Void v, Throwable e) {
        this.transferIdToDownloadConsumer.remove(transferId);
        directoryFuture.thenAccept(IgniteUtils::deleteIfExists);
    }

    private static class DownloadRequestConsumer
    implements FileConsumer<Identifier> {
        private final CompletableFuture<List<Path>> downloadedFiles;
        private final Path targetDir;

        private DownloadRequestConsumer(CompletableFuture<List<Path>> downloadedFiles, Path targetDir) {
            this.downloadedFiles = downloadedFiles;
            this.targetDir = targetDir;
        }

        @Override
        public CompletableFuture<Void> consume(Identifier identifier, List<Path> uploadedFiles) {
            IgniteUtils.deleteIfExists((Path)this.targetDir);
            if (!uploadedFiles.isEmpty()) {
                Path directory = uploadedFiles.get(0).getParent();
                try {
                    Files.move(directory, this.targetDir, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
                    try (Stream<Path> stream = Files.list(this.targetDir);){
                        this.downloadedFiles.complete(stream.collect(Collectors.toList()));
                    }
                }
                catch (IOException e) {
                    this.downloadedFiles.completeExceptionally(e);
                    return CompletableFuture.failedFuture(e);
                }
            }
            return CompletableFutures.nullCompletedFuture();
        }

        private void onError(Throwable e) {
            this.downloadedFiles.completeExceptionally(e);
        }
    }
}

