/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.network.file;

import java.io.IOException;
import java.nio.file.Path;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.file.Channel;
import org.apache.ignite.internal.network.file.FileChunkMessagesStream;
import org.apache.ignite.internal.network.file.exception.FileTransferException;
import org.apache.ignite.internal.network.file.messages.FileChunkResponse;
import org.apache.ignite.internal.network.file.messages.FileTransferError;
import org.apache.ignite.internal.util.CompletableFutures;

/**
 * Sends files to a remote node as a sequence of chunk messages over {@link Channel#FILE_TRANSFER_CHANNEL}.
 *
 * <p>Every file is represented by a {@link FileTransfer}. A transfer starts immediately when a permit can be taken
 * from the rate limiter; otherwise it is parked in an internal queue. When a running transfer finishes (successfully
 * or not), the same permit is reused for the next queued transfer, and it is released only once the queue is empty.
 *
 * <p>The queue and the acquire/release decisions on the rate limiter are guarded by {@link #lock}.
 */
class FileSender {
    /** Chunk size handed to {@link FileChunkMessagesStream#fromPath}. */
    private final int chunkSize;

    /** Limits how many transfer chains may be in flight at the same time. */
    private final Semaphore rateLimiter;

    /** Timeout passed to {@link MessagingService#invoke} for every chunk (unit is defined by that API). */
    private final long responseTimeout;

    /** Service used to send chunk messages and receive their acknowledgements. */
    private final MessagingService messagingService;

    /** Executor that opens the file stream and runs the continuation stages. */
    private final ExecutorService executorService;

    /** Transfers waiting for a rate limiter permit. Guarded by {@link #lock}. */
    private final Queue<FileTransfer> queue = new LinkedList<>();

    /** Guards {@link #queue} together with the acquire/release decisions on {@link #rateLimiter}. */
    private final Object lock = new Object();

    /**
     * Creates a sender.
     *
     * @param chunkSize Chunk size used when a file is split into messages.
     * @param rateLimiter Semaphore that bounds the number of concurrently running transfers.
     * @param responseTimeout Timeout passed to the messaging service for every chunk.
     * @param messagingService Messaging service used to deliver the chunks.
     * @param executorService Executor for file access and continuation stages.
     */
    FileSender(int chunkSize, Semaphore rateLimiter, long responseTimeout, MessagingService messagingService, ExecutorService executorService) {
        this.chunkSize = chunkSize;
        this.rateLimiter = rateLimiter;
        this.responseTimeout = responseTimeout;
        this.messagingService = messagingService;
        this.executorService = executorService;
    }

    /**
     * Sends the given files to the target node.
     *
     * @param targetNodeConsistentId Consistent ID of the receiving node.
     * @param transferId Identifier shared by all files of this call.
     * @param paths Files to send.
     * @return Future that completes when all files have been processed; it completes exceptionally if any of them
     *         failed.
     */
    CompletableFuture<Void> send(String targetNodeConsistentId, UUID transferId, List<Path> paths) {
        // Shared by all files of this call: the first failure flips it, after which the remaining files
        // stop sending further chunks.
        AtomicBoolean shouldBeCancelled = new AtomicBoolean(false);

        // All transfers are created before any of them is started.
        List<FileTransfer> transfers = paths.stream()
                .map(path -> new FileTransfer(targetNodeConsistentId, transferId, path, shouldBeCancelled))
                .collect(Collectors.toList());

        CompletableFuture<?>[] results = transfers.stream()
                .map(this::processTransferAsync)
                .map(result -> result.whenComplete((v, e) -> {
                    if (e != null) {
                        shouldBeCancelled.set(true);
                    }
                }))
                .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(results);
    }

    /**
     * Starts the transfer right away if a permit is available, otherwise queues it.
     *
     * @param transfer Transfer to process.
     * @return The transfer's result future.
     */
    private CompletableFuture<Void> processTransferAsync(FileTransfer transfer) {
        boolean permitAcquired;

        // Only the "acquire or enqueue" decision has to be atomic with respect to the "poll or release" decision
        // in processTransferWithNextAsync; the transfer itself is started outside the lock.
        synchronized (lock) {
            permitAcquired = rateLimiter.tryAcquire();

            if (!permitAcquired) {
                queue.add(transfer);
            }
        }

        if (permitAcquired) {
            processTransferWithNextAsync(transfer);
        }

        return transfer.result;
    }

    /**
     * Sends the transfer and then, reusing the same permit, the next queued transfer. The permit is released when
     * the queue is found empty.
     *
     * @param transfer Transfer to send first.
     * @return Future of the whole chain.
     */
    private CompletableFuture<Void> processTransferWithNextAsync(FileTransfer transfer) {
        return sendTransfer(transfer).thenComposeAsync(v -> {
            FileTransfer nextTransfer;

            synchronized (lock) {
                nextTransfer = queue.poll();

                if (nextTransfer == null) {
                    rateLimiter.release();
                }
            }

            if (nextTransfer == null) {
                return CompletableFutures.nullCompletedFuture();
            }

            return processTransferWithNextAsync(nextTransfer);
        }, executorService);
    }

    /**
     * Sends the file of the given transfer and propagates the outcome to the transfer's result future.
     *
     * @param transfer Transfer to send.
     * @return Future that always completes normally once the outcome has been recorded, so that the chain in
     *         {@link #processTransferWithNextAsync} continues after a failed transfer as well.
     */
    private CompletableFuture<Void> sendTransfer(FileTransfer transfer) {
        CompletableFuture<Void> sent;

        try {
            sent = sendFile(transfer.receiverConsistentId, transfer.transferId, transfer.path, transfer.shouldBeCancelled);
        } catch (RuntimeException e) {
            // A synchronous failure (for example a rejected task submission) must still complete the transfer's
            // result and let the chain go on; otherwise the result would never complete and the permit would
            // never be released.
            sent = CompletableFuture.failedFuture(e);
        }

        return sent.handle((v, e) -> {
            if (e == null) {
                transfer.result.complete(null);
            } else {
                transfer.result.completeExceptionally(e);
            }

            return null;
        });
    }

    /**
     * Sends a single file chunk by chunk.
     *
     * <p>A file whose reported length is zero completes immediately without sending any chunk. Note that
     * {@link java.io.File#length()} also reports zero for a file that does not exist.
     *
     * @param receiverConsistentId Consistent ID of the receiving node.
     * @param id Transfer ID.
     * @param path File to send.
     * @param shouldBeCancelled Flag that stops the sending of further chunks once set.
     * @return Future that completes when the file has been sent and its stream has been closed.
     */
    private CompletableFuture<Void> sendFile(String receiverConsistentId, UUID id, Path path, AtomicBoolean shouldBeCancelled) {
        if (path.toFile().length() == 0L) {
            return CompletableFutures.nullCompletedFuture();
        }

        return CompletableFuture.supplyAsync(() -> {
            try {
                FileChunkMessagesStream stream = FileChunkMessagesStream.fromPath(chunkSize, id, path);

                // sendMessagesFromStream reports I/O and unchecked failures through the returned future,
                // so this callback is attached and the stream is closed in those cases too.
                return sendMessagesFromStream(receiverConsistentId, stream, shouldBeCancelled).whenComplete((v, e) -> {
                    try {
                        stream.close();
                    } catch (IOException ex) {
                        throw new FileTransferException("Failed to close the file transfer stream", ex);
                    }
                });
            } catch (IOException e) {
                throw new FileTransferException("Failed to create a file transfer stream", e);
            }
        }, executorService).thenCompose(Function.identity());
    }

    /**
     * Sends the remaining messages of the stream one at a time; the next message is sent only after the previous
     * one has been acknowledged without an error.
     *
     * @param receiverConsistentId Consistent ID of the receiving node.
     * @param stream Stream of chunk messages.
     * @param shouldBeCancelled Flag that stops the sending of further chunks once set.
     * @return Future that completes normally when the stream is exhausted or the flag is set, and exceptionally
     *         when reading, sending, or the receiver's acknowledgement reports an error. I/O and unchecked
     *         exceptions are never thrown to the caller directly.
     */
    private CompletableFuture<Void> sendMessagesFromStream(String receiverConsistentId, FileChunkMessagesStream stream, AtomicBoolean shouldBeCancelled) {
        try {
            if (!stream.hasNextMessage() || shouldBeCancelled.get()) {
                return CompletableFutures.nullCompletedFuture();
            }

            return messagingService.invoke(receiverConsistentId, Channel.FILE_TRANSFER_CHANNEL, stream.nextMessage(), responseTimeout)
                    .thenApply(FileChunkResponse.class::cast)
                    .thenComposeAsync(ack -> {
                        if (ack.error() != null) {
                            return CompletableFuture.failedFuture(FileTransferError.toException(ack.error()));
                        }

                        return sendMessagesFromStream(receiverConsistentId, stream, shouldBeCancelled);
                    }, executorService);
        } catch (IOException | RuntimeException e) {
            // Reported through the future so that the caller's completion callback (which closes the stream)
            // always runs.
            return CompletableFuture.failedFuture(e);
        }
    }

    /** State of a single file being transferred. */
    private static class FileTransfer {
        /** Consistent ID of the receiving node. */
        private final String receiverConsistentId;

        /** Transfer ID shared by all files sent in one {@link FileSender#send} call. */
        private final UUID transferId;

        /** File to send. */
        private final Path path;

        /** Cancellation flag shared by all files sent in one {@link FileSender#send} call. */
        private final AtomicBoolean shouldBeCancelled;

        /** Completed with the outcome of this file's transfer. */
        private final CompletableFuture<Void> result = new CompletableFuture<>();

        private FileTransfer(String receiverConsistentId, UUID transferId, Path path, AtomicBoolean shouldBeCancelled) {
            this.receiverConsistentId = receiverConsistentId;
            this.transferId = transferId;
            this.path = path;
            this.shouldBeCancelled = shouldBeCancelled;
        }
    }
}

