package org.apache.ignite3.internal.pagememory.persistence.checkpoint;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.lang.IgniteBiTuple;
import org.apache.ignite3.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.pagememory.DataRegion;
import org.apache.ignite3.internal.pagememory.FullPageId;
import org.apache.ignite3.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite3.internal.pagememory.persistence.PartitionMeta;
import org.apache.ignite3.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite3.internal.thread.NamedThreadFactory;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite3/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.class */
/**
 * Drives the begin and end phases of a checkpoint.
 *
 * <p>At the beginning of a checkpoint this class notifies the registered {@link CheckpointListener}s, collects the
 * dirty pages of every data region while holding the checkpoint write lock, and then sorts the collected page IDs.
 * At the end of a checkpoint it tells every page memory to finish the checkpoint and notifies the listeners again.
 */
class CheckpointWorkflow {
    /** Minimum length of a dirty-page array at which sorting is delegated to {@link #parallelSortThreadPool}. */
    static final int PARALLEL_SORT_THRESHOLD = 40000;

    private static final IgniteLogger LOG = Loggers.forClass(CheckpointWorkflow.class);

    /** Lock whose read side guards the "before begin" callbacks and whose write side guards the dirty-page snapshot. */
    private final CheckpointReadWriteLock checkpointReadWriteLock;

    /** Data regions whose page memories take part in every checkpoint. */
    private final Collection<? extends DataRegion<PersistentPageMemory>> dataRegions;

    /** Pool used to sort dirty-page arrays of at least {@link #PARALLEL_SORT_THRESHOLD} elements. */
    private final ForkJoinPool parallelSortThreadPool;

    /** Pool handed to listeners for asynchronous callback work; {@code null} when a single thread is configured. */
    @Nullable
    private final ThreadPoolExecutor callbackListenerThreadPool;

    /** Registered listeners paired with the data region they are bound to ({@code null} region means "all"). */
    private final List<IgniteBiTuple<CheckpointListener, DataRegion<PersistentPageMemory>>> listeners = new CopyOnWriteArrayList<>();

    /**
     * Partition meta page IDs explicitly marked dirty via {@link #markPartitionAsDirty(DataRegion, int, int)}, per data region.
     *
     * <p>The field is swapped for a fresh map in {@link #beginCheckpoint(CheckpointProgressImpl)} while the checkpoint
     * write lock is held. NOTE(review): it is not {@code volatile}; presumably writers call
     * {@link #markPartitionAsDirty(DataRegion, int, int)} under the checkpoint read lock - confirm against callers.
     */
    private Map<DataRegion<?>, Set<FullPageId>> dirtyPartitionsMap = new ConcurrentHashMap<>();

    /**
     * Creates the workflow and its thread pools.
     *
     * @param namePrefix Value passed to {@link NamedThreadFactory#threadPrefix} to build the names of the sorter threads.
     * @param checkpointReadWriteLock Checkpoint read/write lock.
     * @param dataRegions Data regions that take part in checkpoints.
     * @param callbackThreads Size of the listener callback pool; the pool is created only when this is greater than one.
     */
    public CheckpointWorkflow(
            String namePrefix,
            CheckpointReadWriteLock checkpointReadWriteLock,
            Collection<? extends DataRegion<PersistentPageMemory>> dataRegions,
            int callbackThreads
    ) {
        this.checkpointReadWriteLock = checkpointReadWriteLock;
        this.dataRegions = dataRegions;

        int sorterParallelism = Math.min(Runtime.getRuntime().availableProcessors(), 8) + 1;

        this.parallelSortThreadPool = new ForkJoinPool(
                sorterParallelism,
                pool -> {
                    ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);

                    worker.setName(NamedThreadFactory.threadPrefix(namePrefix, "checkpoint-pages-sorter") + worker.getPoolIndex());

                    return worker;
                },
                null,
                false
        );

        if (callbackThreads > 1) {
            this.callbackListenerThreadPool = new ThreadPoolExecutor(
                    callbackThreads,
                    callbackThreads,
                    30000L,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(),
                    new NamedThreadFactory("checkpoint-runner-io", LOG)
            );
        } else {
            this.callbackListenerThreadPool = null;
        }
    }

    /** Starts the workflow. Currently a no-op. */
    public void start() {
    }

    /** Drops all registered listeners and shuts down the thread pools, waiting for their termination. */
    public void stop() {
        listeners.clear();

        IgniteUtils.shutdownAndAwaitTermination(parallelSortThreadPool, 10L, TimeUnit.SECONDS);

        if (callbackListenerThreadPool != null) {
            IgniteUtils.shutdownAndAwaitTermination(callbackListenerThreadPool, 2L, TimeUnit.MINUTES);
        }
    }

    /**
     * Records the meta page of a partition as dirty so that it is included in the next checkpoint.
     *
     * @param dataRegion Data region the partition belongs to.
     * @param groupId Second argument of the created {@link FullPageId} (presumably the group ID - confirm).
     * @param partitionId Partition ID used to compute the partition meta page ID.
     */
    public void markPartitionAsDirty(DataRegion<?> dataRegion, int groupId, int partitionId) {
        Set<FullPageId> dirtyMetaPageIds = dirtyPartitionsMap.computeIfAbsent(dataRegion, region -> ConcurrentHashMap.newKeySet());

        dirtyMetaPageIds.add(new FullPageId(PartitionMeta.partitionMetaPageId(partitionId), groupId));
    }

    /**
     * Performs the first phase of a checkpoint: notifies listeners, snapshots dirty pages under the write lock and sorts them.
     *
     * <p>The read lock is released before the write lock is requested, and the write lock is released exactly once in a
     * {@code finally} block. NOTE(review): this assumes {@link CheckpointReadWriteLock} cannot be upgraded from read to
     * write by the same thread (as with {@code ReentrantReadWriteLock}) - confirm.
     *
     * @param timestamp Not read by this method (presumably the checkpoint start time - confirm against the caller).
     * @param curr Progress of the checkpoint being started; its state is advanced as the phases complete.
     * @param tracker Metrics tracker notified about the duration of individual phases.
     * @param updateHeartbeat Callback run between the individual steps of the workflow.
     * @param onReleaseWriteLock Callback run right after the checkpoint write lock has been released.
     * @return Checkpoint with the sorted dirty pages, or with {@link CheckpointDirtyPages#EMPTY} if there are none.
     * @throws IgniteInternalCheckedException If a listener or the dirty-page sorting fails.
     */
    public Checkpoint markCheckpointBegin(
            long timestamp,
            CheckpointProgressImpl curr,
            CheckpointMetricsTracker tracker,
            Runnable updateHeartbeat,
            Runnable onReleaseWriteLock
    ) throws IgniteInternalCheckedException {
        List<CheckpointListener> checkpointListeners = collectCheckpointListeners(dataRegions);

        AwaitTasksCompletionExecutor executor = callbackListenerThreadPool == null
                ? null
                : new AwaitTasksCompletionExecutor(callbackListenerThreadPool, updateHeartbeat);

        checkpointReadWriteLock.readLock();

        try {
            updateHeartbeat.run();

            for (CheckpointListener listener : checkpointListeners) {
                listener.beforeCheckpointBegin(curr, executor);

                // Without an executor the callback ran synchronously, so report progress after each listener.
                if (executor == null) {
                    updateHeartbeat.run();
                }
            }

            if (executor != null) {
                executor.awaitPendingTasksFinished();
            }
        } finally {
            // Must happen before the write lock is requested below.
            checkpointReadWriteLock.readUnlock();
        }

        tracker.onWriteLockWaitStart();

        checkpointReadWriteLock.writeLock();

        DataRegionsDirtyPages dirtyPages;

        try {
            updateHeartbeat.run();

            curr.transitTo(CheckpointState.LOCK_TAKEN);

            tracker.onMarkCheckpointBeginStart();

            for (CheckpointListener listener : checkpointListeners) {
                listener.onMarkCheckpointBegin(curr, executor);

                if (executor == null) {
                    updateHeartbeat.run();
                }
            }

            if (executor != null) {
                executor.awaitPendingTasksFinished();
            }

            tracker.onMarkCheckpointBeginEnd();

            dirtyPages = beginCheckpoint(curr);

            curr.currentCheckpointPagesCount(dirtyPages.dirtyPageCount);

            curr.transitTo(CheckpointState.PAGES_SNAPSHOT_TAKEN);
        } finally {
            checkpointReadWriteLock.writeUnlock();

            tracker.onWriteLockRelease();

            onReleaseWriteLock.run();
        }

        curr.transitTo(CheckpointState.LOCK_RELEASED);

        for (CheckpointListener listener : checkpointListeners) {
            listener.onCheckpointBegin(curr);

            updateHeartbeat.run();
        }

        if (dirtyPages.dirtyPageCount <= 0) {
            return new Checkpoint(CheckpointDirtyPages.EMPTY, curr);
        }

        tracker.onSplitAndSortCheckpointPagesStart();

        updateHeartbeat.run();

        CheckpointDirtyPages checkpointPages = createAndSortCheckpointDirtyPages(dirtyPages);

        curr.pagesToWrite(checkpointPages);

        curr.initCounters(checkpointPages.dirtyPagesCount());

        tracker.onSplitAndSortCheckpointPagesEnd();

        curr.transitTo(CheckpointState.PAGES_SORTED);

        return new Checkpoint(checkpointPages, curr);
    }

    /**
     * Performs the final phase of a checkpoint: finishes it in every page memory, clears the progress bookkeeping and
     * notifies listeners.
     *
     * @param checkpoint Checkpoint that has been written.
     * @throws IgniteInternalCheckedException If a listener fails.
     */
    public void markCheckpointEnd(Checkpoint checkpoint) throws IgniteInternalCheckedException {
        synchronized (this) {
            for (DataRegion<PersistentPageMemory> dataRegion : dataRegions) {
                dataRegion.pageMemory().finishCheckpoint();
            }
        }

        if (checkpoint.hasDelta()) {
            checkpoint.progress.pagesToWrite(null);

            checkpoint.progress.clearCounters();
        }

        for (CheckpointListener listener : collectCheckpointListeners(dataRegions)) {
            listener.afterCheckpointEnd(checkpoint.progress);
        }

        checkpoint.progress.transitTo(CheckpointState.FINISHED);
    }

    /**
     * Registers a checkpoint listener.
     *
     * @param listener Listener to register.
     * @param dataRegion Data region the listener is bound to, or {@code null} to be notified for every data region.
     */
    public void addCheckpointListener(CheckpointListener listener, @Nullable DataRegion<PersistentPageMemory> dataRegion) {
        assert dataRegion == null || dataRegions.contains(dataRegion) : dataRegion;

        listeners.add(new IgniteBiTuple<>(listener, dataRegion));
    }

    /**
     * Unregisters the first registration of the given listener (compared by reference), if any.
     *
     * @param listener Listener to remove.
     */
    public void removeCheckpointListener(final CheckpointListener listener) {
        // A probe object whose equals() matches any tuple holding the very same listener instance, whatever its data region.
        listeners.remove(new IgniteBiTuple<CheckpointListener, DataRegion<PersistentPageMemory>>() {
            @Override
            public boolean equals(Object obj) {
                return obj instanceof IgniteBiTuple && listener == ((IgniteBiTuple<?, ?>) obj).getKey();
            }

            @Override
            public int hashCode() {
                return listener.hashCode();
            }
        });
    }

    /**
     * Returns the listeners that apply to the given data regions.
     *
     * @param dataRegions Data regions of interest.
     * @return Unmodifiable list of listeners that are bound to no region or to one of {@code dataRegions}.
     */
    public List<CheckpointListener> collectCheckpointListeners(Collection<? extends DataRegion<PersistentPageMemory>> dataRegions) {
        return listeners.stream()
                .filter(tuple -> tuple.getValue() == null || dataRegions.contains(tuple.getValue()))
                .map(tuple -> tuple.getKey())
                .collect(Collectors.toUnmodifiableList());
    }

    /**
     * Asks every page memory to begin the checkpoint and gathers the dirty pages of all data regions.
     *
     * <p>Must be called with the checkpoint write lock held by the current thread.
     *
     * @param checkpointProgress Progress of the checkpoint being started.
     * @return Dirty pages of all data regions, including the explicitly marked partition meta pages.
     */
    private DataRegionsDirtyPages beginCheckpoint(CheckpointProgressImpl checkpointProgress) {
        assert checkpointReadWriteLock.isWriteLockHeldByCurrentThread();

        // Take ownership of everything marked so far; later marks go to the fresh map and the next checkpoint.
        Map<DataRegion<?>, Set<FullPageId>> markedDirtyPartitions = dirtyPartitionsMap;

        dirtyPartitionsMap = new ConcurrentHashMap<>();

        List<DataRegionDirtyPages<Collection<FullPageId>>> regionsDirtyPages = new ArrayList<>(dataRegions.size());

        for (DataRegion<PersistentPageMemory> dataRegion : dataRegions) {
            Collection<FullPageId> dirtyPages = dataRegion.pageMemory().beginCheckpoint(checkpointProgress);

            Set<FullPageId> dirtyMetaPageIds = markedDirtyPartitions.remove(dataRegion);

            if (dirtyMetaPageIds != null) {
                dirtyPages = CollectionUtils.concat(dirtyMetaPageIds, dirtyPages);
            }

            regionsDirtyPages.add(new DataRegionDirtyPages<>(dataRegion.pageMemory(), dirtyPages));
        }

        // Whatever is left was marked for data regions that are not in the configured collection.
        for (Map.Entry<DataRegion<?>, Set<FullPageId>> entry : markedDirtyPartitions.entrySet()) {
            Object pageMemory = entry.getKey().pageMemory();

            assert pageMemory instanceof PersistentPageMemory;

            regionsDirtyPages.add(new DataRegionDirtyPages<>((PersistentPageMemory) pageMemory, entry.getValue()));
        }

        return new DataRegionsDirtyPages(regionsDirtyPages);
    }

    /**
     * Copies the dirty pages of every data region into arrays, records the affected partitions and sorts each array
     * with {@link CheckpointDirtyPages#DIRTY_PAGE_COMPARATOR}.
     *
     * <p>Arrays of at least {@link #PARALLEL_SORT_THRESHOLD} elements are sorted in {@link #parallelSortThreadPool};
     * smaller arrays are sorted on the calling thread while the parallel sorts are running.
     *
     * @param dataRegionsDirtyPages Dirty pages collected from the data regions.
     * @return Sorted dirty pages; data regions without dirty pages are omitted.
     * @throws IgniteInternalCheckedException If a parallel sort fails or the calling thread is interrupted while
     *      waiting for it (the interrupt flag is restored in the latter case).
     */
    CheckpointDirtyPages createAndSortCheckpointDirtyPages(DataRegionsDirtyPages dataRegionsDirtyPages) throws IgniteInternalCheckedException {
        List<DirtyPagesAndPartitions> checkpointPages = new ArrayList<>();

        // Only maintained when assertions are enabled, see the assert below.
        int realPagesCount = 0;

        for (DataRegionDirtyPages<Collection<FullPageId>> regionPages : dataRegionsDirtyPages.dirtyPages) {
            FullPageId[] pageIds = new FullPageId[regionPages.dirtyPages.size()];

            Set<GroupPartitionId> partitionIds = new HashSet<>();

            int pagePos = 0;

            for (FullPageId dirtyPage : regionPages.dirtyPages) {
                // The increment is intentionally part of the assertion: the counter exists only to validate the estimate.
                assert realPagesCount++ != dataRegionsDirtyPages.dirtyPageCount :
                        "Incorrect estimated dirty pages number: " + dataRegionsDirtyPages.dirtyPageCount;

                pageIds[pagePos++] = dirtyPage;

                partitionIds.add(GroupPartitionId.convert(dirtyPage));
            }

            if (pagePos == 0) {
                continue;
            }

            // The collection may have yielded fewer elements than its size() promised.
            if (pagePos != pageIds.length) {
                pageIds = Arrays.copyOf(pageIds, pagePos);
            }

            checkpointPages.add(new DirtyPagesAndPartitions(regionPages.pageMemory, pageIds, partitionIds));
        }

        // First submit all large arrays so that they are sorted while this thread handles the small ones.
        List<ForkJoinTask<?>> parallelSortTasks = new ArrayList<>();

        for (DirtyPagesAndPartitions pages : checkpointPages) {
            FullPageId[] pageIds = pages.dirtyPages;

            if (pageIds.length >= PARALLEL_SORT_THRESHOLD) {
                parallelSortTasks.add(
                        parallelSortThreadPool.submit(() -> Arrays.parallelSort(pageIds, CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR))
                );
            }
        }

        for (DirtyPagesAndPartitions pages : checkpointPages) {
            if (pages.dirtyPages.length < PARALLEL_SORT_THRESHOLD) {
                Arrays.sort(pages.dirtyPages, CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR);
            }
        }

        for (ForkJoinTask<?> parallelSortTask : parallelSortTasks) {
            try {
                parallelSortTask.get();
            } catch (InterruptedException e) {
                // Keep the interruption visible to the caller's thread-management code.
                Thread.currentThread().interrupt();

                throw new IgniteInternalCheckedException("Failed to perform pages array parallel sort", e);
            } catch (ExecutionException e) {
                throw new IgniteInternalCheckedException("Failed to perform pages array parallel sort", e.getCause());
            }
        }

        return new CheckpointDirtyPages(checkpointPages);
    }
}
