/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.dr.maintenance;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointManager;
import org.apache.ignite.internal.processors.cache.tree.SearchRow;
import org.apache.ignite.internal.processors.cache.tree.updatelog.UpdateLog;
import org.apache.ignite.internal.processors.cache.tree.updatelog.UpdateLogRow;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.maintenance.MaintenanceAction;
import org.gridgain.grid.internal.processors.cache.dr.ist.messages.DrStateRecord;
import org.gridgain.grid.internal.processors.dr.DrUtils;
import org.gridgain.grid.internal.processors.dr.maintenance.RepairUpdateLogTarget;
import org.jetbrains.annotations.Nullable;

public class RebuildUpdateLogAction
implements MaintenanceAction<Boolean> {
    private final List<RepairUpdateLogTarget> targets;
    private final IgniteLogger log;
    private final GridKernalContext kernalContext;

    public RebuildUpdateLogAction(List<RepairUpdateLogTarget> targets, GridKernalContext kernalContext) {
        this.targets = targets;
        this.kernalContext = kernalContext;
        this.log = kernalContext.log(RebuildUpdateLogAction.class);
    }

    @Override
    public String name() {
        return "rebuild";
    }

    @Override
    @Nullable
    public String description() {
        return "Rebuilding partition log trees for cache group";
    }

    @Override
    public Boolean execute() {
        if (this.log.isInfoEnabled()) {
            this.log.info("Start rebuilding partition log trees.");
        }
        GridCacheProcessor cacheProc = this.kernalContext.cache();
        GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)cacheProc.context().database();
        CheckpointManager manager = database.getCheckpointManager();
        assert (manager != null);
        try {
            this.prepareForRebuild(database);
        }
        catch (IgniteCheckedException e) {
            this.log.error("Failed to prepare for the rebuild of partition log trees", e);
            return false;
        }
        try {
            for (RepairUpdateLogTarget params : this.targets) {
                CacheGroupContext grpCtx = cacheProc.cacheGroup(params.groupId());
                assert (grpCtx != null);
                List<Integer> drCachesIds = this.drCachesInGroup(grpCtx);
                if (drCachesIds.isEmpty()) {
                    this.log.warning("No caches found in the group: " + params.groupId());
                    continue;
                }
                Map<Integer, Map> partDrState = drCachesIds.stream().collect(Collectors.toMap(c -> c, c -> this.partitionDrState(grpCtx.shared().cacheContext((int)c))));
                for (GridDhtLocalPartition part : grpCtx.topology().localPartitions()) {
                    this.repairPartitionLogTree(grpCtx, part.dataStore(), drCachesIds, r -> partDrState.getOrDefault(r.cacheId(), Collections.emptyMap()).getOrDefault(r.partition(), 0L) < r.version().updateCounter());
                }
                manager.forceCheckpoint("afterPartitionLogTreeRebuild", null).futureFor(CheckpointState.FINISHED).get();
            }
        }
        catch (Exception e) {
            this.log.error("Failed to rebuild partition log tree.", e);
            return false;
        }
        this.unregisterMaintenanceTask(this.kernalContext);
        if (this.log.isInfoEnabled()) {
            this.log.info("Rebuilding partition log trees done.");
        }
        return true;
    }

    private Map<Integer, Long> partitionDrState(GridCacheContext cctx) {
        try {
            Collection<DrStateRecord> drStateRecords = DrUtils.readDrState(cctx.shared().database(), DrUtils.drStateMetastorageKey(cctx.name()));
            HashMap<Integer, Long> res = new HashMap<Integer, Long>(drStateRecords.size());
            for (DrStateRecord rec : drStateRecords) {
                res.put(rec.part(), rec.lwm());
            }
            return res;
        }
        catch (IgniteCheckedException e) {
            this.log.warning("Failed to restore DR state.", e);
            return Collections.emptyMap();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void repairPartitionLogTree(CacheGroupContext grpCtx, IgniteCacheOffheapManager.CacheDataStore dataStore, List<Integer> drCachesIds, Predicate<CacheDataRow> filter) throws IgniteCheckedException {
        IgniteCacheDatabaseSharedManager database = grpCtx.shared().database();
        database.checkpointReadLock();
        try {
            ((GridCacheOffheapManager.GridCacheDataStore)dataStore).dropPartitionLogTree();
        }
        finally {
            database.checkpointReadUnlock();
        }
        UpdateLog updateLog = dataStore.logTree();
        if (!updateLog.hasTree()) {
            return;
        }
        assert (updateLog.tree().size() == 0L) : updateLog.tree().name();
        for (Integer cacheId : drCachesIds) {
            try {
                GridCursor cur = dataStore.tree().find(grpCtx.sharedGroup() ? new SearchRow(cacheId) : null, grpCtx.sharedGroup() ? new SearchRow(cacheId) : null, (Object)CacheDataRowAdapter.RowData.FULL);
                Throwable throwable = null;
                try {
                    database.checkpointReadLock();
                    try {
                        while (cur.next()) {
                            CacheDataRow row = (CacheDataRow)cur.get();
                            if (!filter.test(row)) continue;
                            updateLog.put(new UpdateLogRow(cacheId, row.version().updateCounter(), row.link()));
                        }
                    }
                    finally {
                        database.checkpointReadUnlock();
                    }
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (cur == null) continue;
                    if (throwable != null) {
                        try {
                            cur.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    cur.close();
                }
            }
            catch (Exception e) {
                throw new IgniteException(e);
            }
        }
        this.log.info("Log tree has been built for partition: cacheGroup=" + grpCtx.cacheOrGroupName() + ", part=" + dataStore.partId());
    }

    private List<Integer> drCachesInGroup(CacheGroupContext grpCtx) {
        if (!grpCtx.hasCaches()) {
            return Collections.emptyList();
        }
        return grpCtx.caches().stream().filter(ctx -> DrUtils.isDrSenderEnabled(ctx.config())).map(GridCacheContext::cacheId).sorted(Integer::compareTo).collect(Collectors.toList());
    }

    private void prepareForRebuild(GridCacheDatabaseSharedManager database) throws IgniteCheckedException {
        database.resumeWalLogging();
        database.onStateRestored(null);
    }

    private void unregisterMaintenanceTask(GridKernalContext kernalContext) {
        kernalContext.maintenanceRegistry().unregisterMaintenanceTask("PartitionLogTreeRebuildMaintenanceTask");
    }
}

