package org.gridgain.grid.internal.processors.cache.database.snapshot;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.gridgain.grid.persistentstore.SnapshotFuture;

/* loaded from: input_file:org/gridgain/grid/internal/processors/cache/database/snapshot/FutureTaskQueue.class */
/**
 * Collects pending tasks and the futures of work submitted to an executor, and lets a caller
 * wait until all of those futures are done.
 *
 * <p>Pending tasks of type {@code T} are stored until {@link #submitPendingTasks()} converts each
 * of them into a {@link Runnable} via the configured task processor and submits it. Both internal
 * queues are {@link ConcurrentLinkedQueue}s and the task processor reference is {@code volatile}.
 *
 * @param <T> type of the pending task descriptors.
 */
public class FutureTaskQueue<T> {
    /** Pause between two checks of an unfinished future in {@link #awaitCompletion}. */
    private static final long AWAIT_PARK_NANOS = TimeUnit.MILLISECONDS.toNanos(300);

    /** Tasks that were added but not yet converted to runnables and submitted. */
    private final Queue<T> taskQueue = new ConcurrentLinkedQueue<>();

    /** Futures of everything submitted to the executor and not yet awaited. */
    private final Queue<Future<?>> futureQueue = new ConcurrentLinkedQueue<>();

    /** Executor that runs the submitted runnables. */
    private final ExecutorService executorService;

    /** Logger. */
    private final IgniteLogger logger;

    /** Converts a pending task into a runnable; a {@code null} result means "nothing to run". */
    private volatile Function<T, Runnable> taskProcessor;

    /**
     * @param executorService Executor used to run submitted work.
     * @param igniteLogger Logger.
     */
    public FutureTaskQueue(ExecutorService executorService, IgniteLogger igniteLogger) {
        this.executorService = executorService;
        this.logger = igniteLogger;
    }

    /**
     * Sets the function used by {@link #submitPendingTasks()} to turn a pending task into a runnable.
     *
     * @param function Task processor.
     */
    public void setTaskProcessor(Function<T, Runnable> function) {
        this.taskProcessor = function;
    }

    /**
     * Enqueues a task for a later {@link #submitPendingTasks()} call.
     *
     * @param t Task to enqueue.
     */
    public void addPendingTask(T t) {
        this.taskQueue.add(t);
    }

    /**
     * Drains the pending task queue: every task is passed to the task processor and the resulting
     * runnable, if not {@code null}, is submitted to the executor; its future is remembered for
     * {@link #awaitCompletion}.
     *
     * @throws IllegalStateException If there are pending tasks but no task processor has been set.
     *      The pending tasks are left in the queue in that case.
     */
    public void submitPendingTasks() {
        while (true) {
            // Re-read the volatile field on every iteration so a processor replaced mid-drain is honoured.
            Function<T, Runnable> processor = this.taskProcessor;

            if (processor == null) {
                // Nothing to do is still fine; otherwise fail BEFORE polling so that no task is lost.
                if (this.taskQueue.isEmpty()) {
                    return;
                }

                throw new IllegalStateException("Task processor is not set, pending tasks cannot be submitted");
            }

            T task = this.taskQueue.poll();

            if (task == null) {
                return;
            }

            Runnable runnable = processor.apply(task);

            if (runnable != null) {
                this.futureQueue.add(this.executorService.submit(runnable));
            }
        }
    }

    /**
     * Submits the runnable to the executor right away and remembers its future for
     * {@link #awaitCompletion}.
     *
     * @param runnable Work to run.
     */
    public void submit(Runnable runnable) {
        this.futureQueue.add(this.executorService.submit(runnable));
    }

    /**
     * Waits, one future at a time in queue order, until every remembered future is done.
     *
     * <p>While a future is not done the operation context is checked for cancellation roughly every
     * {@link #AWAIT_PARK_NANOS} nanoseconds. When this method throws, the futures still in the queue
     * are neither removed nor cancelled.
     *
     * @param snapshotOperationContext Context whose cancellation flag aborts the wait.
     * @throws IgniteCheckedException If the context reports cancellation while a future is still
     *      running, or if obtaining the result of a future fails (the failure is set as the cause).
     */
    public void awaitCompletion(SnapshotOperationContext snapshotOperationContext) throws IgniteCheckedException {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Await completion of all pending futures, size=" + this.futureQueue.size());
        }

        Future<?> future;

        while ((future = this.futureQueue.poll()) != null) {
            while (!future.isDone()) {
                if (snapshotOperationContext.isCancelled()) {
                    throw new IgniteCheckedException(SnapshotFuture.SNAPSHOT_OPERATION_CANCEL_ERROR_MSG);
                }

                // NOTE(review): parkNanos returns immediately while the thread's interrupt flag is set,
                // so an interrupted caller polls without pausing until the future completes.
                LockSupport.parkNanos(AWAIT_PARK_NANOS);
            }

            try {
                // The future is done, so this only surfaces its outcome.
                future.get();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();

                throw new IgniteCheckedException(e);
            } catch (Exception e) {
                throw new IgniteCheckedException(e);
            }
        }
    }
}
