/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.checker.tasks;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.checker.objects.ExecutionResult;
import org.apache.ignite.internal.processors.cache.checker.objects.RecheckRequest;
import org.apache.ignite.internal.processors.cache.checker.objects.VersionedEntry;
import org.apache.ignite.internal.processors.cache.checker.objects.VersionedValue;
import org.apache.ignite.internal.processors.cache.checker.tasks.ReconciliationResourceLimitedJob;
import org.apache.ignite.internal.processors.cache.checker.util.ConsistencyCheckUtils;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;

@GridInternal
public class CollectPartitionKeysByRecheckRequestTask
extends ComputeTaskAdapter<RecheckRequest, ExecutionResult<Map<KeyCacheObject, Map<UUID, VersionedValue>>>> {
    private static final long serialVersionUID = 0L;
    @LoggerResource
    private IgniteLogger log;
    @IgniteInstanceResource
    private IgniteEx ignite;
    private RecheckRequest recheckReq;

    @Override
    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, RecheckRequest arg) throws IgniteException {
        HashMap<CollectRecheckJob, ClusterNode> jobs = new HashMap<CollectRecheckJob, ClusterNode>();
        this.recheckReq = arg;
        for (ClusterNode node : subgrid) {
            jobs.put(new CollectRecheckJob(arg), node);
        }
        return jobs;
    }

    @Override
    public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
        ComputeJobResultPolicy superRes = super.result(res, rcvd);
        if (superRes == ComputeJobResultPolicy.FAILOVER) {
            superRes = ComputeJobResultPolicy.WAIT;
            this.log.warning("CollectPartitionEntryHashesJob failed on node [consistentId=" + res.getNode().consistentId() + "]", res.getException());
        }
        return superRes;
    }

    @Override
    public ExecutionResult<Map<KeyCacheObject, Map<UUID, VersionedValue>>> reduce(List<ComputeJobResult> results) throws IgniteException {
        IgniteInternalCache cache = this.ignite.context().cache().cache(this.recheckReq.cacheName());
        if (cache == null) {
            return new ExecutionResult<Map<KeyCacheObject, Map<UUID, VersionedValue>>>("Cache not found (was stopped) [name=" + this.recheckReq.cacheName() + ']');
        }
        HashMap<KeyCacheObject, Map> res = new HashMap<KeyCacheObject, Map>();
        GridCacheContext<Object, Object> ctx = cache.context();
        for (ComputeJobResult result : results) {
            if (result.getException() != null) {
                return new ExecutionResult<Map<KeyCacheObject, Map<UUID, VersionedValue>>>(result.getException().getMessage());
            }
            ExecutionResult excRes = (ExecutionResult)result.getData();
            if (excRes.errorMessage() != null) {
                return new ExecutionResult<Map<KeyCacheObject, Map<UUID, VersionedValue>>>(excRes.errorMessage());
            }
            List partKeys = (List)excRes.result();
            for (VersionedEntry key : partKeys) {
                try {
                    KeyCacheObject keyObj = ConsistencyCheckUtils.unmarshalKey(key.key(), ctx);
                    res.computeIfAbsent(keyObj, k -> new HashMap()).put(key.nodeId(), new VersionedValue(key.val(), key.ver(), key.updateCntr(), key.recheckStartTime()));
                }
                catch (Exception e) {
                    U.error(this.log, e.getMessage(), e);
                    return new ExecutionResult<Map<KeyCacheObject, Map<UUID, VersionedValue>>>(e.getMessage());
                }
            }
        }
        return new ExecutionResult<Map<KeyCacheObject, Map<UUID, VersionedValue>>>(res);
    }

    public static class CollectRecheckJob
    extends ReconciliationResourceLimitedJob {
        private static final long serialVersionUID = 0L;
        private final RecheckRequest recheckReq;

        public CollectRecheckJob(RecheckRequest recheckReq) {
            this.recheckReq = recheckReq;
        }

        @Override
        protected long sessionId() {
            return this.recheckReq.sessionId();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected ExecutionResult<List<VersionedEntry>> execute0() {
            IgniteInternalCache cache = this.ignite.context().cache().cache(this.recheckReq.cacheName());
            if (cache == null) {
                return new ExecutionResult<List<VersionedEntry>>("Cache not found (was stopped) [name=" + this.recheckReq.cacheName() + ']');
            }
            GridCacheContext<Object, Object> cctx = cache.context();
            CacheGroupContext grpCtx = cctx.group();
            GridDhtLocalPartition part = grpCtx.topology().localPartition(this.recheckReq.partitionId());
            assert (part != null);
            part.reserve();
            ArrayList<VersionedEntry> recheckedKeys = new ArrayList<VersionedEntry>();
            long updateCntr = part.updateCounter();
            long recheckStartTime = System.currentTimeMillis();
            try {
                for (KeyCacheObject recheckKey : this.recheckReq.recheckKeys()) {
                    try {
                        KeyCacheObject key = ConsistencyCheckUtils.unmarshalKey(recheckKey, cctx);
                        CacheDataRow row = grpCtx.offheap().dataStore(part).find(cctx, key);
                        if (row == null) continue;
                        recheckedKeys.add(new VersionedEntry(this.ignite.localNode().id(), row.key(), row.version(), row.value(), updateCntr, recheckStartTime));
                    }
                    catch (IgniteCheckedException e) {
                        String errMsg = "Recheck key [key=" + recheckKey + "] was skipped.";
                        U.error(this.log, errMsg, e);
                        ExecutionResult<List<VersionedEntry>> executionResult = new ExecutionResult<List<VersionedEntry>>(errMsg);
                        part.release();
                        return executionResult;
                    }
                }
                ExecutionResult executionResult = new ExecutionResult(recheckedKeys);
                return executionResult;
            }
            finally {
                part.release();
            }
        }
    }
}

