/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.persistence.checkpoint;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupContextSupplier;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkpoint;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistoryResult;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.EarliestCheckpointMapSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.ReservationReason;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.util.lang.IgniteThrowableBiPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;

public class CheckpointHistory {
    /** Default value used for the in-memory checkpoint history size limit. */
    public static final int DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE = 100;
    /** Logger produced by the logger factory handed to the constructor. */
    private final IgniteLogger log;
    /** Checkpoint entries keyed by their timestamp; the sorted map keeps them in timestamp order. */
    private final NavigableMap<Long, CheckpointEntry> histMap = new ConcurrentSkipListMap<Long, CheckpointEntry>();
    /** History size limit applied in {@code onCheckpointFinished} when WAL truncation is disabled. */
    private final int maxCpHistMemSize;
    /** {@code true} when the configured max WAL archive size is not {@code -1}. */
    private final boolean isWalTruncationEnabled;
    /** WAL manager used to read checkpoint data and to check/take WAL reservations. */
    private final IgniteWriteAheadLogManager wal;
    /** Predicate over (checkpoint timestamp, group id); a {@code true} result marks the checkpoint as not applicable. */
    private final IgniteThrowableBiPredicate<Long, Integer> checkpointInapplicable;
    /** {@code true} when the WAL mode is {@code WALMode.NONE}; reservation search is skipped in that case. */
    private final boolean reservationDisabled;
    /**
     * Ids of cache groups whose local partitions carry an "earliest checkpoint timestamp".
     * The set instance is also used as the monitor in the {@code synchronized} sections of this class.
     */
    private final Set<Integer> earliestCpGrps = ConcurrentHashMap.newKeySet();
    /** Resolves a cache group context by group id; callers in this class treat a {@code null} result as "group unavailable". */
    private final CacheGroupContextSupplier cacheGrpCtxSupplier;
    /** Pending earliest-checkpoint snapshot: installed by {@code initialize}, consumed (reset to {@code null}) by {@code start}. */
    private final AtomicReference<EarliestCheckpointMapSnapshot> earliestCpSnapshot = new AtomicReference();

    /**
     * Creates a checkpoint history.
     *
     * @param dsCfg Data storage configuration; supplies the max WAL archive size and the WAL mode.
     * @param logFun Logger factory, invoked with this object's class.
     * @param wal WAL manager.
     * @param inapplicable Predicate over (checkpoint timestamp, group id) telling that a checkpoint
     *      must not be used for the group.
     * @param cacheGrpCtxSupplier Resolver of cache group contexts by group id.
     */
    CheckpointHistory(DataStorageConfiguration dsCfg, Function<Class<?>, IgniteLogger> logFun, IgniteWriteAheadLogManager wal, IgniteThrowableBiPredicate<Long, Integer> inapplicable, CacheGroupContextSupplier cacheGrpCtxSupplier) {
        this.log = logFun.apply(this.getClass());
        this.wal = wal;
        this.checkpointInapplicable = inapplicable;
        this.cacheGrpCtxSupplier = cacheGrpCtxSupplier;

        // -1 is the "unlimited archive" marker: truncation is considered enabled for any other value.
        this.isWalTruncationEnabled = dsCfg.getMaxWalArchiveSize() != -1L;

        // Use the declared default constant instead of repeating its literal value.
        this.maxCpHistMemSize = IgniteSystemProperties.getInteger(
            "IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE",
            DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE
        );

        this.reservationDisabled = dsCfg.getWalMode() == WALMode.NONE;
    }

    /**
     * Seeds the history with already known checkpoints and installs the earliest-checkpoint snapshot.
     *
     * @param checkpoints Checkpoint entries to register, keyed by their timestamp.
     * @param snapshot Snapshot to install; it is expected that no snapshot has been installed before.
     */
    void initialize(List<CheckpointEntry> checkpoints, EarliestCheckpointMapSnapshot snapshot) {
        checkpoints.forEach(cp -> histMap.put(cp.timestamp(), cp));

        // The CAS itself always runs; only the check of its outcome depends on assertions being enabled.
        boolean installed = earliestCpSnapshot.compareAndSet(null, snapshot);

        assert installed;
    }

    /**
     * Starts the history by applying the pending earliest-checkpoint snapshot, if one was installed.
     */
    void start() {
        applyEarliestCpSnapshot();
    }

    /**
     * Looks up a checkpoint entry by timestamp.
     *
     * @param cpTs Checkpoint timestamp.
     * @return Entry registered under the given timestamp.
     * @throws IgniteCheckedException If the history holds no entry for the timestamp.
     */
    private CheckpointEntry entry(Long cpTs) throws IgniteCheckedException {
        CheckpointEntry found = histMap.get(cpTs);

        if (found != null) {
            return found;
        }

        throw new IgniteCheckedException("Checkpoint entry was removed: " + cpTs);
    }

    /**
     * @return Entry with the smallest timestamp in the history, or {@code null} if the history is empty.
     */
    public CheckpointEntry firstCheckpoint() {
        Map.Entry<Long, CheckpointEntry> oldest = histMap.firstEntry();

        if (oldest == null) {
            return null;
        }

        return oldest.getValue();
    }

    /**
     * @return Entry with the greatest timestamp in the history, or {@code null} if the history is empty.
     */
    @Nullable
    public CheckpointEntry lastCheckpoint() {
        Map.Entry<Long, CheckpointEntry> newest = histMap.lastEntry();

        if (newest == null) {
            return null;
        }

        return newest.getValue();
    }

    /**
     * @return Checkpoint mark of the first (smallest timestamp) checkpoint, or {@code null} if the history is empty.
     */
    public WALPointer firstCheckpointPointer() {
        CheckpointEntry oldest = firstCheckpoint();

        if (oldest == null) {
            return null;
        }

        return oldest.checkpointMark();
    }

    /**
     * Returns a view of the checkpoint timestamps held by the history.
     *
     * @param descending {@code true} to get the timestamps from newest to oldest, {@code false} for oldest to newest.
     * @return Key-set view of the history map in the requested order.
     */
    public Collection<Long> checkpoints(boolean descending) {
        return descending ? histMap.descendingKeySet() : histMap.keySet();
    }

    /**
     * @return Checkpoint timestamps in ascending order; same as {@code checkpoints(false)}.
     */
    public Collection<Long> checkpoints() {
        boolean descending = false;

        return checkpoints(descending);
    }

    /**
     * Registers a new checkpoint in the history.
     *
     * @param entry Checkpoint entry to add; it is stored under its timestamp.
     * @param cacheStates Per-group cache states of the checkpoint, keyed by group id.
     */
    public void addCheckpoint(CheckpointEntry entry, Map<Integer, CacheState> cacheStates) {
        // Partition timestamps are updated before the entry becomes visible in the history map.
        addCpCacheStatesToEarliestCpMap(entry, cacheStates);

        long cpTs = entry.timestamp();

        histMap.put(cpTs, entry);
    }

    /**
     * Re-evaluates the tracked groups against the given checkpoint and then records the checkpoint's
     * group states as earliest-checkpoint candidates.
     * <p>
     * Groups for which the checkpoint is not applicable are dropped from tracking and their partitions'
     * earliest checkpoint timestamp is reset to {@code 0}. For the remaining groups, every current local
     * partition that is absent from the checkpoint's group state is reset to {@code 0} as well.
     * On {@link IgniteCheckedException} a warning is logged and the tracked group set is cleared.
     *
     * @param cp Checkpoint entry being processed.
     * @param statesFromSnapshot Group states taken from a snapshot, or {@code null} to read them via the WAL.
     */
    private void updateEarliestCpMap(CheckpointEntry cp, @Nullable Map<Integer, CheckpointEntry.GroupState> statesFromSnapshot) {
        try {
            Map<Integer, CheckpointEntry.GroupState> states = statesFromSnapshot;

            if (states == null) {
                states = cp.groupState(wal);
            }

            for (Iterator<Integer> grpIt = earliestCpGrps.iterator(); grpIt.hasNext(); ) {
                int grpId = grpIt.next();

                if (isCheckpointApplicableForGroup(grpId, cp)) {
                    CacheGroupContext grpCtx = cacheGrpCtxSupplier.get(grpId);

                    if (grpCtx != null) {
                        for (GridDhtLocalPartition part : grpCtx.topology().currentLocalPartitions()) {
                            // A negative index means the partition is not part of this checkpoint's group state.
                            if (states.get(grpId).indexByPartition(part.id()) < 0) {
                                part.earliestCpTs(0L);
                            }
                        }
                    }
                }
                else {
                    grpIt.remove();

                    clearEarliestCpTsOfGrp(grpId);
                }
            }

            addCpGroupStatesToEarliestCpMap(cp, states);
        }
        catch (IgniteCheckedException ex) {
            U.warn(log, "Failed to process checkpoint: " + cp, ex);

            earliestCpGrps.clear();
        }
    }

    /**
     * Stops tracking the group and resets the earliest checkpoint timestamp of its current local partitions.
     * Runs under the {@code earliestCpGrps} monitor.
     *
     * @param grpId Group id.
     * @return Last checkpoint in the history as observed before the group was removed, or {@code null}
     *      if the history was empty.
     */
    public CheckpointEntry removeFromEarliestCheckpoints(Integer grpId) {
        synchronized (earliestCpGrps) {
            CheckpointEntry newestCp = lastCheckpoint();

            earliestCpGrps.remove(grpId);

            clearEarliestCpTsOfGrp(grpId);

            return newestCp;
        }
    }

    /**
     * Offers the checkpoint's timestamp as the earliest checkpoint timestamp to every partition
     * listed in the given cache states.
     *
     * @param entry Checkpoint entry whose timestamp is offered.
     * @param cacheStates Cache states keyed by group id.
     */
    private void addCpCacheStatesToEarliestCpMap(CheckpointEntry entry, Map<Integer, CacheState> cacheStates) {
        for (Map.Entry<Integer, CacheState> grpAndState : cacheStates.entrySet()) {
            Integer grpId = grpAndState.getKey();
            CacheState state = grpAndState.getValue();

            int idx = 0;

            while (idx < state.size()) {
                setLocalPartitionEarliestCpTs(grpId, state.partitionByIndex(idx), entry.timestamp());

                idx++;
            }
        }
    }

    /**
     * Offers the checkpoint's timestamp as the earliest checkpoint timestamp to every partition
     * listed in the given group states.
     *
     * @param entry Checkpoint entry whose timestamp is offered.
     * @param cacheGrpStates Group states keyed by group id.
     */
    private void addCpGroupStatesToEarliestCpMap(CheckpointEntry entry, Map<Integer, CheckpointEntry.GroupState> cacheGrpStates) {
        for (Map.Entry<Integer, CheckpointEntry.GroupState> grpAndState : cacheGrpStates.entrySet()) {
            Integer grpId = grpAndState.getKey();
            CheckpointEntry.GroupState state = grpAndState.getValue();

            int idx = 0;

            while (idx < state.size()) {
                setLocalPartitionEarliestCpTs(grpId, state.getPartitionByIndex(idx), entry.timestamp());

                idx++;
            }
        }
    }

    /**
     * Removes, oldest first, the checkpoints whose mark lies strictly below the given pointer.
     * Stops at the first checkpoint that is not below the pointer or that cannot be removed.
     *
     * @param ptr Upper bound pointer; it is cast to {@link FileWALPointer}.
     * @return Entries that were removed from the history, in removal order.
     */
    public List<CheckpointEntry> onWalTruncated(WALPointer ptr) {
        FileWALPointer highBound = (FileWALPointer)ptr;

        List<CheckpointEntry> removed = new ArrayList<>();

        for (CheckpointEntry cpEntry : histMap.values()) {
            FileWALPointer cpMark = (FileWALPointer)cpEntry.checkpointMark();

            // Only marks strictly below the bound are eligible.
            if (highBound.compareTo(cpMark) <= 0) {
                break;
            }

            if (!removeCheckpoint(cpEntry)) {
                break;
            }

            removed.add(cpEntry);
        }

        return removed;
    }

    /**
     * Removes up to {@code countToRemove} checkpoints from the history, oldest first.
     * Removal stops early at the first checkpoint that cannot be removed.
     *
     * @param countToRemove Number of checkpoints to remove. For {@code 0} an immutable empty list is
     *      returned; for a negative value nothing is removed and an empty list is returned.
     * @return Entries that were removed from the history, in removal order.
     */
    public List<CheckpointEntry> removeCheckpoints(int countToRemove) {
        if (countToRemove == 0) {
            return Collections.emptyList();
        }

        List<CheckpointEntry> removed = new ArrayList<>();

        // Typed iteration over the values replaces the raw Iterator/Map.Entry pair.
        for (CheckpointEntry checkpoint : histMap.values()) {
            if (removed.size() >= countToRemove || !removeCheckpoint(checkpoint)) {
                break;
            }

            removed.add(checkpoint);
        }

        return removed;
    }

    /**
     * Removes a single checkpoint from the history unless its mark is reserved in the WAL.
     * <p>
     * Under the {@code earliestCpGrps} monitor, every current local partition of the tracked groups whose
     * earliest checkpoint timestamp equals the removed checkpoint's timestamp is re-pointed to the oldest
     * checkpoint remaining in the history.
     *
     * @param checkpoint Checkpoint entry to remove.
     * @return {@code false} if the checkpoint mark is reserved and nothing was removed, {@code true} otherwise.
     */
    private boolean removeCheckpoint(CheckpointEntry checkpoint) {
        if (wal.reserved(checkpoint.checkpointMark())) {
            U.warn(log, "Could not clear historyMap due to WAL reservation on cp: " + checkpoint + ", history map size is " + histMap.size());

            return false;
        }

        synchronized (earliestCpGrps) {
            // The map key is the entry timestamp, so it identifies the removed checkpoint even when
            // remove() returns null because the entry is already gone.
            long removedTs = checkpoint.timestamp();

            histMap.remove(removedTs);

            CheckpointEntry oldestCpInHist = firstCheckpoint();

            // NOTE(review): when the history becomes empty there is no checkpoint to re-point to; 0 is the
            // value this class uses elsewhere for "no earliest checkpoint" (see the == 0L checks). Previously
            // this case failed with a NullPointerException.
            long newEarliestTs = oldestCpInHist != null ? oldestCpInHist.timestamp() : 0L;

            for (Integer grpId : earliestCpGrps) {
                CacheGroupContext grpCtx = cacheGrpCtxSupplier.get(grpId);

                if (grpCtx == null) {
                    continue;
                }

                for (GridDhtLocalPartition localPart : grpCtx.topology().currentLocalPartitions()) {
                    if (localPart.earliestCpTs() == removedTs) {
                        localPart.earliestCpTs(newEarliestTs);
                    }
                }
            }
        }

        return true;
    }

    /**
     * Completes checkpoint bookkeeping: stores the covered WAL segment range into the checkpoint and
     * trims the in-memory history.
     *
     * @param chp Finished checkpoint.
     * @return Entries removed from the history. With WAL truncation enabled nothing is removed here;
     *      otherwise the number to remove is the history size minus the configured maximum.
     */
    public List<CheckpointEntry> onCheckpointFinished(Checkpoint chp) {
        chp.walSegsCoveredRange(calculateWalSegmentsCovered());

        int cntToRemove;

        if (isWalTruncationEnabled) {
            cntToRemove = 0;
        }
        else {
            cntToRemove = histMap.size() - maxCpHistMemSize;
        }

        return removeCheckpoints(cntToRemove);
    }

    /**
     * Computes the range of WAL segment indexes between the previous and the last checkpoint.
     *
     * @return {@code (-1, -1)} if the history is empty. Otherwise a tuple whose first value is the segment
     *      index of the previous checkpoint mark ({@code 0} if there is none) and whose second value is the
     *      segment index of the last checkpoint mark minus one. If the last mark is not a
     *      {@link FileWALPointer}, both indexes are taken as {@code 0}, i.e. the result is {@code (0, -1)}.
     */
    private IgniteBiTuple<Long, Long> calculateWalSegmentsCovered() {
        Map.Entry<Long, CheckpointEntry> newest = histMap.lastEntry();

        if (newest == null) {
            return new IgniteBiTuple<Long, Long>(-1L, -1L);
        }

        Map.Entry<Long, CheckpointEntry> beforeNewest = histMap.lowerEntry(newest.getKey());

        WALPointer newestMark = newest.getValue().checkpointMark();

        long fromIdx = 0L;
        long toIdx = 0L;

        if (newestMark instanceof FileWALPointer) {
            toIdx = ((FileWALPointer)newestMark).index();

            if (beforeNewest != null) {
                fromIdx = ((FileWALPointer)beforeNewest.getValue().checkpointMark()).index();
            }
        }

        return new IgniteBiTuple<Long, Long>(fromIdx, toIdx - 1L);
    }

    /**
     * Searches the history, newest checkpoint first, for the earliest WAL pointer from which all the given
     * partitions of a group can be replayed starting at the requested counters.
     * <p>
     * Note that {@code partsCounter} is updated in place with the counters actually chosen for the
     * partitions (directly here and through {@code WalPointerCandidate.choose}).
     *
     * @param grpId Group id.
     * @param partsCounter Requested update counter per partition id; modified by this method.
     * @param margin Counter margin: a checkpoint is accepted right away for a partition only when its
     *      counter plus the margin does not exceed the requested counter, otherwise it becomes a candidate
     *      that is re-evaluated against the next (older) checkpoint.
     * @return Minimal WAL pointer found, or {@code null} if {@code partsCounter} is empty.
     * @throws IgniteCheckedException If a visited checkpoint's mark is not reserved in the WAL, if a
     *      checkpoint entry has been removed from the history, or if no suitable checkpoint exists for
     *      some partition.
     */
    @Nullable
    public FileWALPointer searchEarliestWalPointer(int grpId, Map<Integer, Long> partsCounter, long margin) throws IgniteCheckedException {
        if (F.isEmpty(partsCounter)) {
            return null;
        }

        // Working copy: partitions are dropped from it as soon as a checkpoint is found for them.
        Map<Integer, Long> modifiedPartsCounter = new HashMap<>(partsCounter);

        FileWALPointer minPtr = null;

        LinkedList<WalPointerCandidate> historyPointerCandidate = new LinkedList<>();

        for (Long cpTs : checkpoints(true)) {
            CheckpointEntry cpEntry = entry(cpTs);

            // Resolve the candidates collected on the previous (newer) checkpoint against this one.
            minPtr = getMinimalPointer(partsCounter, margin, minPtr, historyPointerCandidate, cpEntry);

            FileWALPointer ptr = (FileWALPointer)cpEntry.checkpointMark();

            if (!wal.reserved(ptr)) {
                throw new IgniteCheckedException("WAL pointer appropriate to the checkpoint was not reserved [cp=(" + cpEntry.checkpointId() + ", " + U.format(cpEntry.timestamp()) + "), ptr=" + ptr + ']');
            }

            Iterator<Map.Entry<Integer, Long>> iter = modifiedPartsCounter.entrySet().iterator();

            while (iter.hasNext()) {
                Map.Entry<Integer, Long> entry = iter.next();

                Integer part = entry.getKey();
                Long partCntrSince = entry.getValue();

                Long foundCntr = cpEntry.partitionCounter(wal, grpId, part);

                if (foundCntr == null || foundCntr > partCntrSince) {
                    continue;
                }

                iter.remove();

                if (ptr == null) {
                    throw new IgniteCheckedException("Could not find start pointer for partition [part=" + part + ", partCntrSince=" + partCntrSince + "]");
                }

                if (foundCntr + margin > partCntrSince) {
                    // Too close to the requested counter: decide after looking at the next older checkpoint.
                    historyPointerCandidate.add(new WalPointerCandidate(grpId, part, partCntrSince, ptr, foundCntr));

                    continue;
                }

                partsCounter.put(part, partCntrSince - margin);

                if (minPtr == null || ptr.compareTo(minPtr) < 0) {
                    minPtr = ptr;
                }
            }

            if (F.isEmpty(modifiedPartsCounter)) {
                break;
            }
        }

        if (!F.isEmpty(modifiedPartsCounter)) {
            Map.Entry<Integer, Long> entry = modifiedPartsCounter.entrySet().iterator().next();

            throw new IgniteCheckedException("Could not find start pointer for partition [part=" + entry.getKey() + ", partCntrSince=" + entry.getValue() + "]");
        }

        // Candidates left after the oldest visited checkpoint have no older checkpoint to compare with.
        minPtr = getMinimalPointer(partsCounter, margin, minPtr, historyPointerCandidate, null);

        return minPtr;
    }

    /**
     * Drains the candidate queue, letting every candidate choose its pointer against the given checkpoint,
     * and returns the smallest pointer seen.
     *
     * @param partsCounter Partition counters map handed to the candidates for update.
     * @param margin Counter margin handed to the candidates.
     * @param minPtr Minimal pointer known so far, may be {@code null}.
     * @param historyPointerCandidate Queue of candidates; it is empty when the method returns.
     * @param cpEntry Checkpoint to compare the candidates with, may be {@code null}.
     * @return The smaller of {@code minPtr} and the pointers chosen by the candidates.
     */
    private FileWALPointer getMinimalPointer(Map<Integer, Long> partsCounter, long margin, FileWALPointer minPtr, LinkedList<WalPointerCandidate> historyPointerCandidate, CheckpointEntry cpEntry) {
        FileWALPointer smallest = minPtr;

        while (!F.isEmpty(historyPointerCandidate)) {
            WalPointerCandidate cand = historyPointerCandidate.poll();

            FileWALPointer chosen = cand.choose(cpEntry, margin, partsCounter);

            if (smallest == null || chosen.compareTo(smallest) < 0) {
                smallest = chosen;
            }
        }

        return smallest;
    }

    /**
     * For every requested (group id, partition id) pair finds the newest checkpoint whose partition
     * counter does not exceed the requested counter.
     *
     * @param searchCntrMap Requested counters keyed by (group id, partition id); the map is not modified.
     * @return Checkpoint entry per group partition if a checkpoint was found for <b>every</b> requested pair;
     *      an empty map if the input is empty, if some pair has no suitable checkpoint, or if checkpoint
     *      data could not be read (a warning is logged in that case).
     */
    public Map<GroupPartitionId, CheckpointEntry> searchCheckpointEntry(Map<T2<Integer, Integer>, Long> searchCntrMap) {
        if (F.isEmpty(searchCntrMap)) {
            return Collections.emptyMap();
        }

        // Working copy: pairs are dropped from it as soon as a checkpoint is found for them.
        Map<T2<Integer, Integer>, Long> modifiedSearchMap = new HashMap<>(searchCntrMap);

        Map<GroupPartitionId, CheckpointEntry> res = new HashMap<>();

        for (Long cpTs : checkpoints(true)) {
            try {
                CheckpointEntry cpEntry = entry(cpTs);

                Iterator<Map.Entry<T2<Integer, Integer>, Long>> iter = modifiedSearchMap.entrySet().iterator();

                while (iter.hasNext()) {
                    Map.Entry<T2<Integer, Integer>, Long> entry = iter.next();

                    T2<Integer, Integer> grpAndPart = entry.getKey();

                    Long foundCntr = cpEntry.partitionCounter(wal, grpAndPart.get1(), grpAndPart.get2());

                    if (foundCntr == null || foundCntr > entry.getValue()) {
                        continue;
                    }

                    iter.remove();

                    res.put(new GroupPartitionId(grpAndPart.get1(), grpAndPart.get2()), cpEntry);
                }

                if (F.isEmpty(modifiedSearchMap)) {
                    return res;
                }
            }
            catch (IgniteCheckedException e) {
                log.warning("Checkpoint data is unavailable in WAL [cpTs=" + U.format(cpTs) + ']', e);

                break;
            }
        }

        // A partial result is never returned: it is all requested pairs or nothing.
        if (!F.isEmpty(modifiedSearchMap)) {
            return Collections.emptyMap();
        }

        return res;
    }

    /**
     * Finds the newest checkpoint whose counter for the given partition does not exceed the requested one.
     *
     * @param grpId Group id.
     * @param part Partition id.
     * @param partCntrSince Requested partition counter.
     * @return Matching checkpoint entry, or {@code null} if none exists or checkpoint data could not be
     *      read (a warning is logged in the latter case and the search stops).
     */
    @Nullable
    public CheckpointEntry searchCheckpointEntry(int grpId, int part, long partCntrSince) {
        for (Long cpTs : checkpoints(true)) {
            try {
                CheckpointEntry candidate = entry(cpTs);

                Long foundCntr = candidate.partitionCounter(wal, grpId, part);

                if (foundCntr != null && foundCntr <= partCntrSince) {
                    return candidate;
                }
            }
            catch (IgniteCheckedException e) {
                log.warning("Checkpoint data is unavailable in WAL [grpId=" + grpId + ", part=" + part + ", cntr=" + partCntrSince + ", cpTs=" + U.format(cpTs) + ']', e);

                return null;
            }
        }

        return null;
    }

    /**
     * Collects the earliest known checkpoint for each requested partition and reserves the WAL starting
     * from the oldest of those checkpoints.
     * <p>
     * Groups that are not tracked or have no available context are skipped. The per-group reason is
     * {@code NO_MORE_HISTORY} when at least one partition has a checkpoint and the group's oldest
     * checkpoint is the first checkpoint of the history; otherwise it is {@code CHECKPOINT_NOT_APPLICABLE}.
     * If the WAL reservation fails, every collected group is reported with {@code WAL_RESERVATION_ERROR}
     * and no checkpoints.
     *
     * @param groupsAndPartitions Partition ids to look up, keyed by group id.
     * @return Search result holding the per-group data and the reserved checkpoint ({@code null} if
     *      nothing was reserved). An empty result is returned when the input is empty or reservation
     *      is disabled.
     */
    public CheckpointHistoryResult searchAndReserveCheckpoints(Map<Integer, Set<Integer>> groupsAndPartitions) {
        if (F.isEmpty(groupsAndPartitions) || reservationDisabled) {
            return new CheckpointHistoryResult(Collections.emptyMap(), null);
        }

        Map<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> res = new HashMap<>();

        CheckpointEntry oldestCpForReservation = null;

        synchronized (earliestCpGrps) {
            CheckpointEntry oldestHistCpEntry = firstCheckpoint();

            for (Map.Entry<Integer, Set<Integer>> e0 : groupsAndPartitions.entrySet()) {
                Integer grpId = e0.getKey();

                CacheGroupContext grpCtx = cacheGrpCtxSupplier.get(grpId);

                if (grpCtx == null || !earliestCpGrps.contains(grpId)) {
                    continue;
                }

                CheckpointEntry oldestGrpCpEntry = null;

                for (Integer partId : e0.getValue()) {
                    GridDhtLocalPartition locPart = grpCtx.topology().localPartition(partId);

                    if (locPart == null) {
                        continue;
                    }

                    // Read the timestamp once so that the zero check and the lookup use the same value.
                    long earliestCpTs = locPart.earliestCpTs();

                    if (earliestCpTs == 0L) {
                        continue;
                    }

                    CheckpointEntry cp = histMap.get(earliestCpTs);

                    if (cp == null) {
                        continue;
                    }

                    if (oldestCpForReservation == null || oldestCpForReservation.timestamp() > cp.timestamp()) {
                        oldestCpForReservation = cp;
                    }

                    if (oldestGrpCpEntry == null || oldestGrpCpEntry.timestamp() > cp.timestamp()) {
                        oldestGrpCpEntry = cp;
                    }

                    res.computeIfAbsent(
                        grpId,
                        k -> new T2<ReservationReason, Map<Integer, CheckpointEntry>>(
                            ReservationReason.NO_MORE_HISTORY,
                            new HashMap<Integer, CheckpointEntry>()
                        )
                    ).get2().put(partId, cp);
                }

                // The group's history does not reach the first checkpoint of the whole history.
                if (oldestGrpCpEntry == null || oldestGrpCpEntry != oldestHistCpEntry) {
                    res.computeIfAbsent(
                        grpId,
                        k -> new T2<ReservationReason, Map<Integer, CheckpointEntry>>(
                            ReservationReason.CHECKPOINT_NOT_APPLICABLE,
                            null
                        )
                    ).set1(ReservationReason.CHECKPOINT_NOT_APPLICABLE);
                }
            }
        }

        if (oldestCpForReservation != null && !wal.reserve(oldestCpForReservation.checkpointMark())) {
            log.warning("Could not reserve cp " + oldestCpForReservation.checkpointMark());

            for (Map.Entry<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> entry : res.entrySet()) {
                entry.setValue(new T2<ReservationReason, Map<Integer, CheckpointEntry>>(ReservationReason.WAL_RESERVATION_ERROR, null));
            }

            oldestCpForReservation = null;
        }

        return new CheckpointHistoryResult(res, oldestCpForReservation);
    }

    /**
     * Tells whether the checkpoint can be used for the group.
     *
     * @param grpId Group id.
     * @param cp Checkpoint entry.
     * @return {@code true} if the inapplicability predicate does not reject the (timestamp, group) pair
     *      and the checkpoint's group state contains the group.
     * @throws IgniteCheckedException If thrown by the predicate or while reading the group state.
     */
    public boolean isCheckpointApplicableForGroup(int grpId, CheckpointEntry cp) throws IgniteCheckedException {
        if (checkpointInapplicable.test(cp.timestamp(), grpId)) {
            return false;
        }

        return cp.groupState(wal).containsKey(grpId);
    }

    /**
     * Returns the pending snapshot if one is installed; otherwise builds a snapshot from the current state.
     * <p>
     * The built snapshot contains the ids of all checkpoints in the history and, for every checkpoint that
     * is the earliest one of some local partition of a tracked group and has group states available, a copy
     * of those group states. Partition scanning runs under the {@code earliestCpGrps} monitor.
     *
     * @return Earliest checkpoint map snapshot.
     */
    public EarliestCheckpointMapSnapshot earliestCheckpointsMapSnapshot() {
        EarliestCheckpointMapSnapshot pending = earliestCpSnapshot.get();

        if (pending != null) {
            return pending;
        }

        Map<UUID, Map<Integer, EarliestCheckpointMapSnapshot.GroupStateSnapshot>> earliestCp = new HashMap<>();

        synchronized (earliestCpGrps) {
            for (Integer grpId : earliestCpGrps) {
                CacheGroupContext grpCtx = cacheGrpCtxSupplier.get(grpId);

                if (grpCtx == null) {
                    continue;
                }

                for (GridDhtLocalPartition part : grpCtx.topology().currentLocalPartitions()) {
                    long partEarliestTs = part.earliestCpTs();

                    if (partEarliestTs == 0L) {
                        continue;
                    }

                    CheckpointEntry cp = histMap.get(partEarliestTs);

                    if (cp == null || cp.groupStates() == null) {
                        continue;
                    }

                    // Each checkpoint is captured once, however many partitions refer to it.
                    if (earliestCp.containsKey(cp.checkpointId())) {
                        continue;
                    }

                    earliestCp.put(cp.checkpointId(), CheckpointHistory.createSnapshot(cp.groupStates()));
                }
            }
        }

        Set<UUID> histCpIds = histMap.values().stream()
            .map(CheckpointEntry::checkpointId)
            .collect(Collectors.toSet());

        return new EarliestCheckpointMapSnapshot(histCpIds, earliestCp);
    }

    /**
     * Drops all checkpoint entries and forgets all tracked groups.
     */
    void clear() {
        histMap.clear();

        earliestCpGrps.clear();
    }

    /**
     * Converts group states into their snapshot form.
     *
     * @param stateByGrpId Group states keyed by group id.
     * @return New map with a snapshot (partition ids, partition counters, size) per group id.
     */
    private static Map<Integer, EarliestCheckpointMapSnapshot.GroupStateSnapshot> createSnapshot(Map<Integer, CheckpointEntry.GroupState> stateByGrpId) {
        Map<Integer, EarliestCheckpointMapSnapshot.GroupStateSnapshot> res = new HashMap<>();

        stateByGrpId.forEach((grpId, state) -> res.put(
            grpId,
            new EarliestCheckpointMapSnapshot.GroupStateSnapshot(state.partitionIds(), state.partitionCounters(), state.size())
        ));

        return res;
    }

    /**
     * Resets the earliest checkpoint timestamp to {@code 0} on all current local partitions of the group.
     * Does nothing if the group context is not available.
     *
     * @param grpId Group id.
     */
    private void clearEarliestCpTsOfGrp(int grpId) {
        CacheGroupContext grpCtx = cacheGrpCtxSupplier.get(grpId);

        if (grpCtx == null) {
            return;
        }

        for (GridDhtLocalPartition part : grpCtx.topology().currentLocalPartitions()) {
            part.earliestCpTs(0L);
        }
    }

    /**
     * Assigns the earliest checkpoint timestamp to a local partition that does not have one yet
     * (current value {@code 0}) and starts tracking its group. Partitions that already have a timestamp
     * keep it. Does nothing if the group context or the local partition is not available.
     *
     * @param grpId Group id.
     * @param partId Partition id.
     * @param earliestCpTs Timestamp to assign; asserted to be greater than the partition's current value.
     */
    private void setLocalPartitionEarliestCpTs(int grpId, int partId, long earliestCpTs) {
        CacheGroupContext grpCtx = cacheGrpCtxSupplier.get(grpId);

        GridDhtLocalPartition locPart = grpCtx == null ? null : grpCtx.topology().localPartition(partId);

        if (locPart == null) {
            return;
        }

        long curEarliestCpTs = locPart.earliestCpTs();

        assert earliestCpTs > curEarliestCpTs : String.format("Invalid new earliestCpTs: [grpId=%s, partId=%s, localEarliestCpTs=%s, earliestCpTs=%s]", grpId, partId, curEarliestCpTs, earliestCpTs);

        if (curEarliestCpTs != 0L) {
            return;
        }

        earliestCpGrps.add(grpId);

        locPart.earliestCpTs(earliestCpTs);
    }

    /**
     * Consumes the pending snapshot (resetting it to {@code null}) and replays the history, oldest
     * checkpoint first, to rebuild the earliest-checkpoint information.
     * <p>
     * A checkpoint that was known to the snapshot but has no group state stored in it is skipped.
     * When the snapshot holds a group state for a checkpoint, the state is put into the entry before
     * the entry is processed. Failures on a single checkpoint are logged and do not stop the replay.
     */
    private void applyEarliestCpSnapshot() {
        EarliestCheckpointMapSnapshot snapshot = earliestCpSnapshot.getAndSet(null);

        if (snapshot == null) {
            return;
        }

        for (Long cpTs : checkpoints(false)) {
            try {
                CheckpointEntry cpEntry = entry(cpTs);

                UUID cpId = cpEntry.checkpointId();

                Map<Integer, CheckpointEntry.GroupState> grpStates = snapshot.groupState(cpId);

                boolean knownToSnapshot = snapshot.checkpointWasPresent(cpId);

                if (knownToSnapshot && grpStates == null) {
                    continue;
                }

                if (grpStates != null) {
                    cpEntry.fillStore(grpStates);
                }

                updateEarliestCpMap(cpEntry, grpStates);
            }
            catch (IgniteCheckedException e) {
                U.warn(log, "Failed to process checkpoint, happened at " + U.format(cpTs) + '.', e);
            }
        }
    }

    /**
     * Captures the current earliest checkpoint map and stores it as the pending snapshot.
     */
    void createInMemoryEarliestCpSnapshot() {
        earliestCpSnapshot.set(earliestCheckpointsMapSnapshot());
    }

    /**
     * A WAL pointer found for a partition that is closer to the requested counter than the margin allows.
     * The final decision is made by {@link #choose} once the next (older) checkpoint is known.
     */
    private class WalPointerCandidate {
        /** Group id. */
        private final int grpId;
        /** Partition id. */
        private final int part;
        /** Partition counter that was requested. */
        private final long partContr;
        /** WAL pointer of the checkpoint this candidate was created for. */
        private final FileWALPointer walPntr;
        /** Partition counter stored in that checkpoint. */
        private final long walPntrCntr;

        /**
         * @param grpId Group id.
         * @param part Partition id.
         * @param partContr Requested partition counter.
         * @param walPntr WAL pointer of the checkpoint.
         * @param walPntrCntr Partition counter stored in the checkpoint.
         */
        public WalPointerCandidate(int grpId, int part, long partContr, FileWALPointer walPntr, long walPntrCntr) {
            this.grpId = grpId;
            this.part = part;
            this.partContr = partContr;
            this.walPntr = walPntr;
            this.walPntrCntr = walPntrCntr;
        }

        /**
         * Chooses between this candidate's pointer and the mark of the given checkpoint and records the
         * counter that goes with the choice.
         *
         * @param cpEntry Checkpoint to compare with, or {@code null} if there is none.
         * @param margin Counter margin.
         * @param partCntsForUpdate Map that receives the chosen counter for this candidate's partition.
         * @return This candidate's pointer if the checkpoint is absent, its counter is unavailable or equal
         *      to the candidate's counter; otherwise the mark of the given checkpoint.
         */
        public FileWALPointer choose(CheckpointEntry cpEntry, long margin, Map<Integer, Long> partCntsForUpdate) {
            Long foundCntr = null;

            if (cpEntry != null) {
                try {
                    foundCntr = cpEntry.partitionCounter(wal, grpId, part);
                }
                catch (IgniteCheckedException e) {
                    // The counter stays unknown and the candidate's own pointer is used below.
                    log.warning("Checkpoint cannot be chosen because counter is unavailable [grpId=" + grpId + ", part=" + part + ", cp=(" + cpEntry.checkpointId() + ", " + U.format(cpEntry.timestamp()) + ")]", e);
                }
            }

            if (foundCntr != null && foundCntr != walPntrCntr) {
                partCntsForUpdate.put(part, Math.max(foundCntr, partContr - margin));

                return (FileWALPointer)cpEntry.checkpointMark();
            }

            partCntsForUpdate.put(part, walPntrCntr);

            return walPntr;
        }
    }
}

