package org.apache.ignite.internal.network.file;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileAttribute;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.TopologyEventHandler;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.network.annotations.Transferable;
import org.apache.ignite.internal.network.configuration.FileTransferConfiguration;
import org.apache.ignite.internal.network.configuration.FileTransferView;
import org.apache.ignite.internal.network.file.exception.FileTransferException;
import org.apache.ignite.internal.network.file.messages.FileChunkMessage;
import org.apache.ignite.internal.network.file.messages.FileDownloadRequest;
import org.apache.ignite.internal.network.file.messages.FileDownloadResponse;
import org.apache.ignite.internal.network.file.messages.FileHeader;
import org.apache.ignite.internal.network.file.messages.FileTransferError;
import org.apache.ignite.internal.network.file.messages.FileTransferErrorMessage;
import org.apache.ignite.internal.network.file.messages.FileTransferFactory;
import org.apache.ignite.internal.network.file.messages.FileTransferInitMessage;
import org.apache.ignite.internal.network.file.messages.FileTransferInitResponse;
import org.apache.ignite.internal.network.file.messages.FileTransferMessageType;
import org.apache.ignite.internal.network.file.messages.Identifier;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.ClusterNode;

/* loaded from: input_file:org/apache/ignite/internal/network/file/FileTransferServiceImpl.class */
public class FileTransferServiceImpl implements FileTransferService {
    /** Logger shared by all instances of this service. */
    private static final IgniteLogger LOG = Loggers.forClass(FileTransferServiceImpl.class);
    /** Timeout value passed to {@code MessagingService.invoke} for download and transfer-init requests. */
    private final long responseTimeout;
    /** Topology service; used to subscribe to node-disappearance events. */
    private final TopologyService topologyService;
    /** Messaging service used to receive file-transfer messages and to send requests and responses. */
    private final MessagingService messagingService;
    /** Root directory under which one sub-directory per incoming transfer (named by the transfer id) is created. */
    private final Path transferDirectory;
    /** Sends files to a remote node. */
    private final FileSender fileSender;
    /** Tracks incoming transfers and receives their chunks. */
    private final FileReceiver fileReceiver;
    /** Executor on which message processing and follow-up stages are run; shut down in {@code stopAsync}. */
    private final ExecutorService executorService;
    /** Registered file providers, keyed by the {@code @Transferable} message type of the identifier class. */
    private final Map<Short, FileProvider<Identifier>> identifierToProvider;
    /** Registered file consumers, keyed by the {@code @Transferable} message type of the identifier class. */
    private final Map<Short, FileConsumer<Identifier>> identifierToConsumer;
    /** Consumers of in-flight {@code download} calls, keyed by transfer id. */
    private final Map<UUID, DownloadRequestConsumer> transferIdToDownloadConsumer;
    /** Factory for all file-transfer network messages built by this service. */
    private final FileTransferFactory messageFactory;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/network/file/FileTransferServiceImpl$DownloadRequestConsumer.class */
    /**
     * Consumer registered for a single {@code download} call. When the requested files have been
     * received, it moves the directory that holds them to the caller's target directory and
     * completes the download future with the files found there.
     */
    public static class DownloadRequestConsumer implements FileConsumer<Identifier> {
        /** Future handed back to the caller of {@code download}. */
        private final CompletableFuture<List<Path>> downloadedFiles;
        /** Directory the received files are moved to. */
        private final Path targetDir;

        /**
         * Creates a consumer.
         *
         * @param completableFuture Future to complete with the downloaded files or with the failure.
         * @param path Directory the received files are moved to.
         */
        private DownloadRequestConsumer(CompletableFuture<List<Path>> completableFuture, Path path) {
            this.downloadedFiles = completableFuture;
            this.targetDir = path;
        }

        /**
         * Replaces the target directory with the directory that contains the received files and
         * completes the download future with the listing of the target directory.
         *
         * @param identifier Identifier of the transferred files (not used here).
         * @param list Received files; the parent of the first one is the directory that is moved.
         * @return A completed future on success (or when {@code list} is empty), a failed future when
         *         moving or listing the files fails with an {@link IOException}.
         */
        @Override // org.apache.ignite.internal.network.file.FileConsumer
        public CompletableFuture<Void> consume(Identifier identifier, List<Path> list) {
            IgniteUtils.deleteIfExists(this.targetDir);
            // NOTE(review): with an empty list the download future is left untouched here - confirm
            // that callers never deliver an empty list for a download.
            if (list.isEmpty()) {
                return CompletableFutures.nullCompletedFuture();
            }
            try {
                Path receivedDir = list.get(0).getParent();
                Files.move(receivedDir, this.targetDir, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
                // Files.list keeps a directory handle open; try-with-resources closes it even if
                // collecting the entries fails.
                try (Stream<Path> movedFiles = Files.list(this.targetDir)) {
                    this.downloadedFiles.complete(movedFiles.collect(Collectors.toList()));
                }
            } catch (IOException e) {
                this.downloadedFiles.completeExceptionally(e);
                return CompletableFuture.failedFuture(e);
            }
            return CompletableFutures.nullCompletedFuture();
        }

        /**
         * Fails the download future.
         *
         * @param th Cause of the failure.
         */
        private void onError(Throwable th) {
            this.downloadedFiles.completeExceptionally(th);
        }
    }

    /**
     * Creates the service with an executor built from the given configuration.
     *
     * @param str Name passed to the thread factory of the executor (presumably the node name - confirm with callers).
     * @param topologyService Topology service.
     * @param messagingService Messaging service.
     * @param fileTransferConfiguration File transfer configuration.
     * @param path Root directory for per-transfer directories.
     */
    FileTransferServiceImpl(
            String str,
            TopologyService topologyService,
            MessagingService messagingService,
            FileTransferConfiguration fileTransferConfiguration,
            Path path
    ) {
        this(
                topologyService,
                messagingService,
                fileTransferConfiguration,
                path,
                createExecutor(str, fileTransferConfiguration)
        );
    }

    /**
     * Creates the thread pool used for file-transfer work. Core and maximum pool size are equal,
     * the queue is unbounded, and idle threads (core threads included) time out after 10 seconds.
     *
     * @param str Name passed to the thread factory together with the {@code "file-transfer"} pool name.
     * @param fileTransferConfiguration Configuration that supplies the pool size.
     * @return The new executor.
     */
    private static ExecutorService createExecutor(String str, FileTransferConfiguration fileTransferConfiguration) {
        // Read the configured size once: two separate reads could observe different values, and a
        // maximum smaller than the core size makes the ThreadPoolExecutor constructor throw.
        int poolSize = ((FileTransferView) fileTransferConfiguration.value()).threadPoolSize();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                poolSize,
                poolSize,
                10L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                NamedThreadFactory.create(str, "file-transfer", LOG)
        );
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        return threadPoolExecutor;
    }

    /**
     * Creates the service from configuration: the response timeout, chunk size and the number of
     * permits of the sender's semaphore are all read from {@code fileTransferConfiguration}.
     *
     * @param topologyService Topology service.
     * @param messagingService Messaging service.
     * @param fileTransferConfiguration File transfer configuration.
     * @param path Root directory for per-transfer directories.
     * @param executorService Executor shared by the service and its file sender.
     */
    private FileTransferServiceImpl(
            TopologyService topologyService,
            MessagingService messagingService,
            FileTransferConfiguration fileTransferConfiguration,
            Path path,
            ExecutorService executorService
    ) {
        this(
                ((FileTransferView) fileTransferConfiguration.value()).responseTimeoutMillis(),
                topologyService,
                messagingService,
                path,
                new FileSender(
                        ((FileTransferView) fileTransferConfiguration.value()).chunkSizeBytes(),
                        new Semaphore(((FileTransferView) fileTransferConfiguration.value()).maxConcurrentRequests()),
                        ((FileTransferView) fileTransferConfiguration.value()).responseTimeoutMillis(),
                        messagingService,
                        executorService
                ),
                new FileReceiver(),
                executorService
        );
    }

    /**
     * Creates the service from fully constructed collaborators.
     *
     * @param j Timeout passed to {@code MessagingService.invoke} when this service sends requests.
     * @param topologyService Topology service.
     * @param messagingService Messaging service.
     * @param path Root directory for per-transfer directories.
     * @param fileSender Sender used for outgoing transfers.
     * @param fileReceiver Receiver used for incoming transfers.
     * @param executorService Executor on which message processing is run.
     */
    FileTransferServiceImpl(
            long j,
            TopologyService topologyService,
            MessagingService messagingService,
            Path path,
            FileSender fileSender,
            FileReceiver fileReceiver,
            ExecutorService executorService
    ) {
        // Diamond instead of raw types, so the maps are created with their declared type arguments.
        this.identifierToProvider = new ConcurrentHashMap<>();
        this.identifierToConsumer = new ConcurrentHashMap<>();
        this.transferIdToDownloadConsumer = new ConcurrentHashMap<>();
        this.messageFactory = new FileTransferFactory();
        this.responseTimeout = j;
        this.topologyService = topologyService;
        this.messagingService = messagingService;
        this.transferDirectory = path;
        this.fileSender = fileSender;
        this.fileReceiver = fileReceiver;
        this.executorService = executorService;
    }

    /**
     * Starts the service: subscribes to node-disappearance events so that transfers from a vanished
     * sender are cancelled, and registers the handler that dispatches file-transfer messages by type.
     *
     * @param componentContext Component context (not used here).
     * @return An already completed future.
     */
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        TopologyEventHandler disappearanceHandler = new TopologyEventHandler() {
            public void onDisappeared(ClusterNode clusterNode) {
                FileTransferServiceImpl.this.fileReceiver.cancelTransfersFromSender(clusterNode.name());
            }
        };
        this.topologyService.addEventHandler(disappearanceHandler);

        this.messagingService.addMessageHandler(FileTransferMessageType.class, (message, sender, correlationId) -> {
            String senderName = sender.name();
            if (message instanceof FileDownloadRequest) {
                processDownloadRequest((FileDownloadRequest) message, senderName, correlationId);
            } else if (message instanceof FileTransferInitMessage) {
                processFileTransferInitMessage((FileTransferInitMessage) message, senderName, correlationId.longValue());
            } else if (message instanceof FileChunkMessage) {
                processFileChunkMessage((FileChunkMessage) message, senderName, correlationId.longValue());
            } else if (message instanceof FileTransferErrorMessage) {
                processFileTransferErrorMessage((FileTransferErrorMessage) message);
            } else {
                LOG.error("Unexpected message received: {}", new Object[]{message});
            }
        });

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Stops the service by shutting down its executor, waiting up to 10 seconds for termination.
     *
     * @param componentContext Component context (not used here).
     * @return An already completed future.
     */
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        ExecutorService pool = this.executorService;
        IgniteUtils.shutdownAndAwaitTermination(pool, 10L, TimeUnit.SECONDS);

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Handles a transfer-init message from a sender. Creates the directory for the transfer,
     * registers the transfer with the file receiver, answers the sender with an init response
     * (carrying an error if any of the previous steps failed) and, once the files have been
     * collected, hands them to the matching file consumer. The transfer directory is deleted
     * when processing finishes, whether it succeeded or failed.
     *
     * @param fileTransferInitMessage Init message with the transfer id, identifier and file headers.
     * @param str Name of the sender node, as taken from the message handler.
     * @param j Correlation id used to respond to the sender.
     */
    private void processFileTransferInitMessage(FileTransferInitMessage fileTransferInitMessage, String str, long j) {
        UUID transferId = fileTransferInitMessage.transferId();
        Identifier identifier = fileTransferInitMessage.identifier();
        // Stage 1: create the transfer directory on the service executor. A failure is logged here
        // and still propagates to every dependent stage below.
        CompletableFuture whenComplete = CompletableFuture.supplyAsync(() -> {
            return createTransferDirectory(transferId);
        }, this.executorService).whenComplete((path, th) -> {
            if (th != null) {
                LOG.error("Failed to create transfer directory [transferId={}, identifier={}]", th, new Object[]{transferId, identifier});
            }
        });
        // Stage 2: register the transfer with the receiver, using the directory created above.
        CompletableFuture thenApply = whenComplete.thenApply(path2 -> {
            return this.fileReceiver.registerTransfer(str, transferId, fileTransferInitMessage.headers(), path2);
        });
        // Branch A: reply to the sender. The response carries an error when directory creation or
        // registration failed; if the reply itself cannot be sent, the transfer is cancelled.
        thenApply.handle((transferredFilesCollector, th2) -> {
            return this.messageFactory.fileTransferInitResponse().error(th2 != null ? FileTransferError.fromThrowable(this.messageFactory, th2) : null).build();
        }).thenCompose(fileTransferInitResponse -> {
            return this.messagingService.respond(str, Channel.FILE_TRANSFER_CHANNEL, fileTransferInitResponse, j);
        }).whenComplete((r11, th3) -> {
            if (th3 != null) {
                LOG.error("Failed to send file transfer response [transferId={}, identifier={}]", th3, new Object[]{transferId, identifier});
                this.fileReceiver.cancelTransfer(transferId, th3);
            }
        });
        // Branch B: wait for the files of the transfer and pass them to the consumer.
        thenApply.thenCompose((v0) -> {
            return v0.collectedFiles();
        }).whenComplete((list, th4) -> {
            if (th4 != null) {
                LOG.error("Failed to collect transferred files [transferId={}, identifier={}]", th4, new Object[]{transferId, identifier});
                // If this transfer belongs to a local download call, fail its future. Returning
                // null from the remapping function also removes the entry from the map.
                this.transferIdToDownloadConsumer.computeIfPresent(transferId, (uuid, downloadRequestConsumer) -> {
                    downloadRequestConsumer.onError(th4);
                    return null;
                });
            }
        }).thenCompose(list2 -> {
            return getFileConsumer(transferId, identifier).consume(identifier, list2).whenComplete((r10, th5) -> {
                if (th5 != null) {
                    LOG.error("Failed to process file transfer [transferId={}, identifier={}]", th5, new Object[]{transferId, identifier});
                }
            });
        }).whenComplete((r6, th5) -> {
            // Cleanup runs on success and on failure. The directory is deleted only if it was
            // actually created: thenAccept does not run when stage 1 failed.
            this.transferIdToDownloadConsumer.remove(transferId);
            whenComplete.thenAccept(IgniteUtils::deleteIfExists);
        });
    }

    /**
     * Handles a download request from another node. Looks up the provider for the requested
     * identifier and asks it for the files. On failure, or when there are no files, responds with
     * an error; otherwise responds successfully and then starts transferring the files to the requester.
     *
     * @param fileDownloadRequest Download request with the transfer id and identifier.
     * @param str Name of the requesting node.
     * @param l Correlation id used to respond to the requester.
     */
    private void processDownloadRequest(FileDownloadRequest fileDownloadRequest, String str, Long l) {
        CompletableFuture
                .supplyAsync(() -> getFileProvider(fileDownloadRequest.identifier()), this.executorService)
                .thenCompose(provider -> provider.files(fileDownloadRequest.identifier()))
                .whenComplete((files, failure) -> {
                    // Provider lookup or file resolution failed: report the failure to the requester.
                    if (failure != null) {
                        LOG.error("Failed to get files for download [transferId={}, identifier={}]", failure, new Object[]{fileDownloadRequest.transferId(), fileDownloadRequest.identifier()});
                        this.messagingService.respond(
                                str,
                                Channel.FILE_TRANSFER_CHANNEL,
                                this.messageFactory.fileDownloadResponse().error(FileTransferError.fromThrowable(this.messageFactory, failure)).build(),
                                l.longValue()
                        );
                        return;
                    }

                    // Nothing to send is reported to the requester as an error as well.
                    if (files.isEmpty()) {
                        LOG.warn("No files to download [transferId={}, identifier={}]", new Object[]{fileDownloadRequest.transferId(), fileDownloadRequest.identifier()});
                        this.messagingService.respond(
                                str,
                                Channel.FILE_TRANSFER_CHANNEL,
                                this.messageFactory.fileDownloadResponse().error(FileTransferError.fromThrowable(this.messageFactory, new FileTransferException("No files to download"))).build(),
                                l.longValue()
                        );
                        return;
                    }

                    // Acknowledge the request first, then push the files on the service executor.
                    this.messagingService
                            .respond(str, Channel.FILE_TRANSFER_CHANNEL, this.messageFactory.fileDownloadResponse().build(), l.longValue())
                            .thenComposeAsync(
                                    ignored -> transferFilesToNode(str, fileDownloadRequest.transferId(), fileDownloadRequest.identifier(), files),
                                    this.executorService
                            );
                });
    }

    /**
     * Handles a file chunk: passes it to the file receiver on the service executor and then
     * responds to the sender, attaching an error to the response if receiving the chunk failed.
     *
     * @param fileChunkMessage Received chunk.
     * @param str Name of the sender node.
     * @param j Correlation id used to respond to the sender.
     */
    private void processFileChunkMessage(FileChunkMessage fileChunkMessage, String str, long j) {
        CompletableFuture
                .runAsync(() -> this.fileReceiver.receiveFileChunk(fileChunkMessage), this.executorService)
                .whenComplete((ignored, failure) -> {
                    boolean failed = failure != null;
                    if (failed) {
                        LOG.error("Failed to process file chunk [transferId={}]", failure, new Object[]{fileChunkMessage.transferId()});
                    }
                    // The response is sent in both cases; its error field is null on success.
                    this.messagingService.respond(
                            str,
                            Channel.FILE_TRANSFER_CHANNEL,
                            this.messageFactory.fileChunkResponse().error(failed ? FileTransferError.fromThrowable(this.messageFactory, failure) : null).build(),
                            j
                    );
                });
    }

    /**
     * Handles an error message from the sending side: logs it and cancels the corresponding
     * incoming transfer on the service executor.
     *
     * @param fileTransferErrorMessage Error message with the transfer id and the error.
     */
    private void processFileTransferErrorMessage(FileTransferErrorMessage fileTransferErrorMessage) {
        // The closing bracket was missing from the format string; every other log message in this
        // class uses the balanced "[key=value, ...]" form.
        LOG.error("Received file transfer error message. Transfer will be cancelled [transferId={}, error={}]", new Object[]{fileTransferErrorMessage.transferId(), fileTransferErrorMessage.error()});
        CompletableFuture.runAsync(() -> {
            this.fileReceiver.cancelTransfer(fileTransferErrorMessage.transferId(), FileTransferError.toException(fileTransferErrorMessage.error()));
        }, this.executorService);
    }

    /**
     * Sends the files to the node through the file sender. If sending fails, the failure is logged
     * and an error message for the transfer is sent to the node.
     *
     * @param str Name of the target node.
     * @param uuid Transfer id.
     * @param list Files to send.
     * @return Future that completes when sending has finished, exceptionally if it failed.
     */
    private CompletableFuture<Void> sendFiles(String str, UUID uuid, List<Path> list) {
        return this.fileSender.send(str, uuid, list).whenComplete((ignored, failure) -> {
            if (failure == null) {
                return;
            }

            LOG.error("Failed to send files to node [nodeConsistentId={}, transferId={}]", failure, new Object[]{str, uuid});
            this.messagingService.send(
                    str,
                    Channel.FILE_TRANSFER_CHANNEL,
                    this.messageFactory.fileTransferErrorMessage().transferId(uuid).error(FileTransferError.fromThrowable(this.messageFactory, failure)).build()
            );
        });
    }

    /**
     * Registers the file provider for an identifier class. Only one provider may be registered
     * per identifier class.
     *
     * @param cls Identifier class; must be annotated with {@code @Transferable}.
     * @param fileProvider Provider to register.
     * @throws IllegalArgumentException If the class is not annotated with {@code @Transferable} or a
     *         provider for it is already registered.
     */
    @Override // org.apache.ignite.internal.network.file.FileTransferService
    @SuppressWarnings("unchecked")
    public <M extends Identifier> void addFileProvider(Class<M> cls, FileProvider<M> fileProvider) {
        this.identifierToProvider.compute(Short.valueOf(getMessageType(cls)), (messageType, existing) -> {
            if (existing != null) {
                throw new IllegalArgumentException("File provider for identifier " + cls.getName() + " already exists");
            }
            // The map stores providers as FileProvider<Identifier>; the entry is keyed by the message
            // type of M, so the cast only widens the declared type argument.
            return (FileProvider<Identifier>) fileProvider;
        });
    }

    /**
     * Registers the file consumer for an identifier class. Only one consumer may be registered
     * per identifier class.
     *
     * @param cls Identifier class; must be annotated with {@code @Transferable}.
     * @param fileConsumer Consumer to register.
     * @throws IllegalArgumentException If the class is not annotated with {@code @Transferable} or a
     *         consumer for it is already registered.
     */
    @Override // org.apache.ignite.internal.network.file.FileTransferService
    @SuppressWarnings("unchecked")
    public <M extends Identifier> void addFileConsumer(Class<M> cls, FileConsumer<M> fileConsumer) {
        this.identifierToConsumer.compute(Short.valueOf(getMessageType(cls)), (messageType, existing) -> {
            if (existing != null) {
                throw new IllegalArgumentException("File handler for identifier " + cls.getName() + " already exists");
            }
            // The map stores consumers as FileConsumer<Identifier>; the entry is keyed by the message
            // type of M, so the cast only widens the declared type argument.
            return (FileConsumer<Identifier>) fileConsumer;
        });
    }

    /**
     * Requests the files identified by {@code identifier} from another node. A consumer for the
     * transfer is registered before the request is sent; it completes the returned future once the
     * files have been received and moved to {@code path}.
     *
     * @param str Name of the node to download from.
     * @param identifier Identifier of the files to download.
     * @param path Directory the downloaded files are placed in.
     * @return Future with the downloaded files; completed exceptionally when the request fails or
     *         the remote node answers with an error.
     */
    @Override // org.apache.ignite.internal.network.file.FileTransferService
    public CompletableFuture<List<Path>> download(String str, Identifier identifier, Path path) {
        UUID transferId = UUID.randomUUID();
        FileDownloadRequest request = this.messageFactory.fileDownloadRequest().transferId(transferId).identifier(identifier).build();

        CompletableFuture<List<Path>> downloadedFiles = new CompletableFuture<>();
        this.transferIdToDownloadConsumer.put(transferId, new DownloadRequestConsumer(downloadedFiles, path));

        this.messagingService.invoke(str, Channel.FILE_TRANSFER_CHANNEL, request, this.responseTimeout)
                .thenApply(FileDownloadResponse.class::cast)
                .whenComplete((response, failure) -> {
                    // When the request fails or is rejected, no transfer will arrive to clean up the
                    // registration, so it is removed here. (A whenComplete callback attached to a
                    // separate source future would never run, because only the dependent future
                    // is ever completed.)
                    if (failure != null) {
                        this.transferIdToDownloadConsumer.remove(transferId);
                        downloadedFiles.completeExceptionally(failure);
                    } else if (response.error() != null) {
                        this.transferIdToDownloadConsumer.remove(transferId);
                        downloadedFiles.completeExceptionally(FileTransferError.toException(response.error()));
                    }
                });

        return downloadedFiles;
    }

    /**
     * Uploads the files identified by {@code identifier} to another node, using the registered
     * provider to obtain them and a fresh random transfer id.
     *
     * @param str Name of the target node.
     * @param identifier Identifier of the files to upload.
     * @return Future that completes when the transfer has finished.
     * @throws IllegalArgumentException If no provider is registered for the identifier.
     */
    @Override // org.apache.ignite.internal.network.file.FileTransferService
    public CompletableFuture<Void> upload(String str, Identifier identifier) {
        FileProvider<Identifier> provider = getFileProvider(identifier);

        return provider.files(identifier)
                .thenCompose(files -> transferFilesToNode(str, UUID.randomUUID(), identifier, files));
    }

    /**
     * Transfers files to a node: sends the transfer-init message with the file headers, waits for
     * the init response and, unless the response carries an error, sends the files.
     *
     * @param str Name of the target node.
     * @param uuid Transfer id.
     * @param identifier Identifier of the transferred files.
     * @param list Files to transfer.
     * @return Future that completes when the transfer has finished; failed with
     *         {@link FileTransferException} when {@code list} is empty or the target node rejects the transfer.
     */
    private CompletableFuture<Void> transferFilesToNode(String str, UUID uuid, Identifier identifier, List<Path> list) {
        if (list.isEmpty()) {
            return CompletableFuture.failedFuture(new FileTransferException("No files to upload"));
        }

        FileTransferInitMessage initMessage = this.messageFactory.fileTransferInitMessage()
                .transferId(uuid)
                .identifier(identifier)
                .headers(FileHeader.fromPaths(this.messageFactory, list))
                .build();

        return this.messagingService.invoke(str, Channel.FILE_TRANSFER_CHANNEL, initMessage, this.responseTimeout)
                .thenApply(FileTransferInitResponse.class::cast)
                .thenComposeAsync(initResponse -> {
                    // The receiver reports setup failures through the error field of its response.
                    if (initResponse.error() != null) {
                        return CompletableFuture.<Void>failedFuture(
                                new FileTransferException("Failed to upload files: " + initResponse.error().message()));
                    }
                    return sendFiles(str, uuid, list);
                }, this.executorService)
                .whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        LOG.error("Failed to transfer files to node [nodeConsistentId={}, transferId={}]", failure, new Object[]{str, uuid});
                    }
                });
    }

    /**
     * Reads the message type from the {@code @Transferable} annotation of a class.
     *
     * @param cls Class to inspect.
     * @return Value of the class's {@code @Transferable} annotation.
     * @throws IllegalArgumentException If the class is not annotated with {@code @Transferable}.
     */
    private static short getMessageType(Class<?> cls) {
        Transferable transferable = cls.getAnnotation(Transferable.class);
        if (transferable != null) {
            return transferable.value();
        }

        throw new IllegalArgumentException("Class " + cls.getName() + " is not annotated with @Transferable");
    }

    /**
     * Looks up the provider registered for the message type of the given identifier.
     *
     * @param m Identifier whose provider is requested.
     * @return The registered provider.
     * @throws IllegalArgumentException If no provider is registered for the identifier's message type.
     */
    private <M extends Identifier> FileProvider<M> getFileProvider(M m) {
        Short messageType = Short.valueOf(m.messageType());
        FileProvider<M> registered = (FileProvider) this.identifierToProvider.get(messageType);
        if (registered != null) {
            return registered;
        }

        throw new IllegalArgumentException("File provider for identifier " + m.getClass().getName() + " not found");
    }

    /**
     * Resolves the consumer for a transfer: the consumer of a pending {@code download} call with
     * this transfer id if there is one, otherwise the consumer registered for the identifier.
     *
     * @param uuid Transfer id.
     * @param m Identifier of the transferred files.
     * @return The consumer that should receive the files.
     * @throws IllegalArgumentException If there is neither a pending download nor a registered consumer.
     */
    @SuppressWarnings("unchecked")
    private <M extends Identifier> FileConsumer<M> getFileConsumer(UUID uuid, M m) {
        // Single lookup: with containsKey followed by get, the entry could be removed in between
        // and the caller would receive null.
        FileConsumer<Identifier> downloadConsumer = this.transferIdToDownloadConsumer.get(uuid);
        if (downloadConsumer != null) {
            return (FileConsumer<M>) downloadConsumer;
        }

        return getFileConsumer(m);
    }

    /**
     * Looks up the consumer registered for the message type of the given identifier.
     *
     * @param m Identifier whose consumer is requested.
     * @return The registered consumer.
     * @throws IllegalArgumentException If no consumer is registered for the identifier's message type.
     */
    private <M extends Identifier> FileConsumer<M> getFileConsumer(M m) {
        Short messageType = Short.valueOf(m.messageType());
        FileConsumer<M> registered = (FileConsumer) this.identifierToConsumer.get(messageType);
        if (registered != null) {
            return registered;
        }

        throw new IllegalArgumentException("File consumer for identifier " + m.getClass().getName() + " not found");
    }

    /**
     * Creates the directory for a transfer: a sub-directory of the transfer root named after the
     * transfer id. Missing parent directories are created as well.
     *
     * @param uuid Transfer id.
     * @return Path of the created directory.
     * @throws FileTransferException If the directory cannot be created.
     */
    private Path createTransferDirectory(UUID uuid) {
        Path directory = this.transferDirectory.resolve(uuid.toString());
        try {
            return Files.createDirectories(directory);
        } catch (IOException e) {
            throw new FileTransferException("Failed to create the transfer directory with transferId: " + uuid, e);
        }
    }
}
