package org.gridgain.grid.internal.processors.cache.database.snapshot;

import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridStringBuilder;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.T2;
import org.gridgain.grid.internal.util.GridUtils;
import org.gridgain.grid.persistentstore.SnapshotFuture;

/* loaded from: input_file:org/gridgain/grid/internal/processors/cache/database/snapshot/FutureTaskQueue.class */
public class FutureTaskQueue<T> {
    private static final long AWAIT_QUEUE_TIMEOUT = 500;
    private static final String SNAPSHOT_STACK_TRACE_PRINT_TIMEOUT_PROP = "GRIDGAIN_SNAPSHOT_STACK_TRACE_PRINT_TIMEOUT";
    private static final long TASK_STACK_TRACE_PRINT_TIMEOUT = IgniteSystemProperties.getLong(SNAPSHOT_STACK_TRACE_PRINT_TIMEOUT_PROP, 300000);
    private final ExecutorService executorService;
    private final IgniteLogger logger;
    private volatile Function<T, Runnable> taskProcessor;
    private final Queue<Future<?>> futureQueue = new ConcurrentLinkedQueue();
    private final Set<Long> activeThreadIds = new GridConcurrentHashSet();
    private final Set<T2<Thread, StackTraceElement[]>> futureSourceThreads = new GridConcurrentHashSet();

    public FutureTaskQueue(ExecutorService executorService, IgniteLogger igniteLogger) {
        this.executorService = executorService;
        this.logger = igniteLogger;
    }

    public void setTaskProcessor(Function<T, Runnable> function) {
        this.taskProcessor = function;
    }

    public void submitTask(T t) {
        Runnable apply = this.taskProcessor.apply(t);
        if (apply != null) {
            T2<Thread, StackTraceElement[]> t2 = new T2<>(Thread.currentThread(), Thread.currentThread().getStackTrace());
            this.futureSourceThreads.add(t2);
            try {
                this.futureQueue.add(this.executorService.submit(() -> {
                    long id = Thread.currentThread().getId();
                    this.activeThreadIds.add(Long.valueOf(id));
                    try {
                        apply.run();
                        this.activeThreadIds.remove(Long.valueOf(id));
                        this.futureSourceThreads.remove(t2);
                    } catch (Throwable th) {
                        this.activeThreadIds.remove(Long.valueOf(id));
                        this.futureSourceThreads.remove(t2);
                        throw th;
                    }
                }));
            } catch (RejectedExecutionException e) {
                this.futureSourceThreads.remove(t2);
                throw e;
            }
        }
    }

    public int size() {
        return this.futureQueue.size();
    }

    public void submit(Runnable runnable) {
        this.futureQueue.add(this.executorService.submit(runnable));
    }

    public void awaitCompletionWithShutdown(SnapshotOperationContext snapshotOperationContext, boolean z) throws IgniteCheckedException {
        this.executorService.shutdown();
        int size = this.futureQueue.size();
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Await completion of all pending futures, size=" + size);
        }
        while (!this.executorService.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
            try {
                int i = 0;
                if (snapshotOperationContext.isCancelled()) {
                    throw new IgniteCheckedException(SnapshotFuture.SNAPSHOT_OPERATION_CANCEL_ERROR_MSG);
                }
                if (z) {
                    Iterator<Future<?>> it = this.futureQueue.iterator();
                    while (it.hasNext()) {
                        if (it.next().isDone()) {
                            i++;
                        }
                    }
                    snapshotOperationContext.reportWork(i);
                }
            } catch (InterruptedException e) {
                throw new IgniteCheckedException(e);
            }
        }
        for (Future<?> future : this.futureQueue) {
            if (!future.isDone()) {
                try {
                    future.get(TASK_STACK_TRACE_PRINT_TIMEOUT, TimeUnit.MILLISECONDS);
                } catch (InterruptedException | ExecutionException e2) {
                } catch (TimeoutException e3) {
                    printUnfinishedFuturesWarningOnTimeout();
                }
            }
        }
        Iterator<Future<?>> it2 = this.futureQueue.iterator();
        while (it2.hasNext()) {
            try {
                it2.next().get();
            } catch (InterruptedException | ExecutionException e4) {
                throw new IgniteCheckedException(e4);
            }
        }
    }

    private void printUnfinishedFuturesWarningOnTimeout() {
        GridStringBuilder a = new GridStringBuilder("Timed out while waiting for unfinished snapshot futures.").a(IgniteKernal.NL);
        GridUtils.printStackTraces(this.activeThreadIds, a);
        a.a("Originating stack traces for incomplete futures:").a(IgniteKernal.NL);
        for (T2<Thread, StackTraceElement[]> t2 : this.futureSourceThreads) {
            a.a("Thread [name=\"").a(t2.getKey().getName()).a("\"]").a(IgniteKernal.NL);
            IgniteUtils.printStackTraceElements(a, t2.getValue(), null);
        }
        this.logger.warning(a.toString());
    }

    public void awaitCompletion(SnapshotOperationContext snapshotOperationContext) throws IgniteCheckedException {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Await completion of all pending futures, size=" + this.futureQueue.size());
        }
        while (true) {
            Future<?> poll = this.futureQueue.poll();
            if (poll == null) {
                return;
            }
            while (!poll.isDone()) {
                if (snapshotOperationContext.isCancelled()) {
                    throw new IgniteCheckedException(SnapshotFuture.SNAPSHOT_OPERATION_CANCEL_ERROR_MSG);
                }
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500L));
            }
            try {
                poll.get();
            } catch (Exception e) {
                throw new IgniteCheckedException(e);
            }
        }
    }
}
