/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.checker.tasks;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.checker.objects.ExecutionResult;
import org.apache.ignite.internal.processors.cache.checker.objects.RepairRequest;
import org.apache.ignite.internal.processors.cache.checker.objects.RepairResult;
import org.apache.ignite.internal.processors.cache.checker.objects.VersionedKey;
import org.apache.ignite.internal.processors.cache.checker.objects.VersionedValue;
import org.apache.ignite.internal.processors.cache.checker.tasks.RepairEntryProcessor;
import org.apache.ignite.internal.processors.cache.checker.util.ConsistencyCheckUtils;
import org.apache.ignite.internal.processors.cache.verify.RepairAlgorithm;
import org.apache.ignite.internal.processors.cache.verify.RepairMeta;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;

@GridInternal
public class RepairRequestTask
extends ComputeTaskAdapter<RepairRequest, ExecutionResult<RepairResult>> {
    private static final long serialVersionUID = 0L;
    public static final int MAX_REPAIR_ATTEMPTS = 3;
    @LoggerResource
    private IgniteLogger log;
    @IgniteInstanceResource
    private IgniteEx ignite;
    private RepairRequest repairReq;

    @Override
    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, RepairRequest arg) throws IgniteException {
        HashMap<RepairJob, ClusterNode> jobs = new HashMap<RepairJob, ClusterNode>();
        this.repairReq = arg;
        HashMap<UUID, Map> targetNodesToData = new HashMap<UUID, Map>();
        for (Map.Entry<KeyCacheObject, Map<UUID, VersionedValue>> dataEntry : this.repairReq.data().entrySet()) {
            KeyCacheObject keyCacheObj;
            GridCacheContext<Object, Object> ctx = this.ignite.cachex(this.repairReq.cacheName()).context();
            try {
                keyCacheObj = ConsistencyCheckUtils.unmarshalKey(dataEntry.getKey(), ctx);
            }
            catch (IgniteCheckedException e) {
                U.error(this.log, "Unable to unmarshal key=[" + dataEntry.getKey() + "], key is skipped.", e);
                continue;
            }
            int part = ctx.affinity().partition(keyCacheObj);
            UUID primaryNodeId = ctx.affinity().nodesByPartition(part, AffinityTopologyVersion.NONE).get(0).id();
            Map primaryKeyValuesMapping = targetNodesToData.computeIfAbsent(primaryNodeId, nodeId -> new HashMap());
            primaryKeyValuesMapping.put(keyCacheObj, dataEntry.getValue());
        }
        for (ClusterNode node : subgrid) {
            Map data = (Map)targetNodesToData.remove(node.id());
            if (data == null || data.isEmpty()) continue;
            jobs.put(new RepairJob(data.entrySet().stream().collect(Collectors.toMap(entry -> new VersionedKey(null, (KeyCacheObject)entry.getKey(), null), Map.Entry::getValue)), arg.cacheName(), this.repairReq.repairAlg(), this.repairReq.repairAttempt(), this.repairReq.startTopologyVersion(), this.repairReq.partitionId()), node);
        }
        if (!targetNodesToData.isEmpty()) {
            for (Map data : targetNodesToData.values()) {
                ClusterNode node = subgrid.iterator().next();
                jobs.put(new RepairJob(data.entrySet().stream().collect(Collectors.toMap(entry -> new VersionedKey(null, (KeyCacheObject)entry.getKey(), null), Map.Entry::getValue)), arg.cacheName(), this.repairReq.repairAlg(), this.repairReq.repairAttempt(), this.repairReq.startTopologyVersion(), this.repairReq.partitionId()), node);
            }
        }
        return jobs;
    }

    @Override
    public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
        ComputeJobResultPolicy superRes = super.result(res, rcvd);
        if (superRes == ComputeJobResultPolicy.FAILOVER) {
            superRes = ComputeJobResultPolicy.WAIT;
            this.log.warning("CollectPartitionEntryHashesJob failed on node [consistentId=" + res.getNode().consistentId() + "]", res.getException());
        }
        return superRes;
    }

    @Override
    public ExecutionResult<RepairResult> reduce(List<ComputeJobResult> results) throws IgniteException {
        RepairResult aggregatedRepairRes = new RepairResult();
        for (ComputeJobResult result : results) {
            if (result.getException() != null) {
                return new ExecutionResult<RepairResult>(result.getException().getMessage());
            }
            ExecutionResult excRes = (ExecutionResult)result.getData();
            if (excRes.errorMessage() != null) {
                return new ExecutionResult<RepairResult>(excRes.errorMessage());
            }
            RepairResult repairRes = (RepairResult)excRes.result();
            aggregatedRepairRes.keysToRepair().putAll(repairRes.keysToRepair());
            aggregatedRepairRes.repairedKeys().putAll(repairRes.repairedKeys());
        }
        return new ExecutionResult<RepairResult>(aggregatedRepairRes);
    }

    protected static class RepairJob
    extends ComputeJobAdapter {
        private static final long serialVersionUID = 0L;
        @IgniteInstanceResource
        private IgniteEx ignite;
        @LoggerResource
        private IgniteLogger log;
        private final Map<VersionedKey, Map<UUID, VersionedValue>> data;
        private String cacheName;
        private int repairAttempt;
        private RepairAlgorithm repairAlg;
        private AffinityTopologyVersion startTopVer;
        private int partId;

        public RepairJob(Map<VersionedKey, Map<UUID, VersionedValue>> data, String cacheName, RepairAlgorithm repairAlg, int repairAttempt, AffinityTopologyVersion startTopVer, int partId) {
            this.data = data;
            this.cacheName = cacheName;
            this.repairAlg = repairAlg;
            this.repairAttempt = repairAttempt;
            this.startTopVer = startTopVer;
            this.partId = partId;
        }

        @Override
        public ExecutionResult<RepairResult> execute() throws IgniteException {
            HashMap<VersionedKey, Map<UUID, VersionedValue>> keysToRepairWithNextAttempt = new HashMap<VersionedKey, Map<UUID, VersionedValue>>();
            HashMap<VersionedKey, RepairMeta> repairedKeys = new HashMap<VersionedKey, RepairMeta>();
            GridCacheContext ctx = this.ignite.cachex(this.cacheName).context();
            CacheObjectContext cacheObjCtx = ctx.cacheObjectContext();
            int ownersNodesSize = this.owners(ctx);
            for (Map.Entry<VersionedKey, Map<UUID, VersionedValue>> dataEntry : this.data.entrySet()) {
                try {
                    EntryProcessorResult res;
                    RepairEntryProcessor.RepairStatus keyWasSuccessfullyFixed;
                    Object key = this.keyValue(ctx, dataEntry.getKey().key());
                    Map<UUID, VersionedValue> nodeToVersionedValues = dataEntry.getValue();
                    UUID primaryUUID = this.primaryNodeId(ctx, key);
                    CacheObject valToFixWith = null;
                    RepairAlgorithm usedRepairAlg = this.repairAlg;
                    if (dataEntry.getValue().size() != ownersNodesSize && this.repairAttempt != 3) {
                        if (this.repairAlg == RepairAlgorithm.PRINT_ONLY) {
                            keyWasSuccessfullyFixed = RepairEntryProcessor.RepairStatus.SUCCESS;
                        } else {
                            valToFixWith = ConsistencyCheckUtils.calculateValueToFixWith(this.repairAlg, nodeToVersionedValues, primaryUUID, cacheObjCtx, ownersNodesSize);
                            res = this.ignite.cachex(this.cacheName).keepBinary().invoke(key, new RepairEntryProcessor(valToFixWith, nodeToVersionedValues, false, this.startTopVer), new Object[0]);
                            assert (res != null);
                            keyWasSuccessfullyFixed = (RepairEntryProcessor.RepairStatus)((Object)res.get());
                        }
                    } else if (this.repairAttempt == 3) {
                        valToFixWith = ConsistencyCheckUtils.calculateValueToFixWith(this.repairAlg, nodeToVersionedValues, primaryUUID, cacheObjCtx, ownersNodesSize);
                        res = this.ignite.cachex(this.cacheName).keepBinary().invoke(key, new RepairEntryProcessor(valToFixWith, nodeToVersionedValues, true, this.startTopVer), new Object[0]);
                        assert (res != null);
                        keyWasSuccessfullyFixed = (RepairEntryProcessor.RepairStatus)((Object)res.get());
                    } else {
                        usedRepairAlg = RepairAlgorithm.LATEST;
                        valToFixWith = ConsistencyCheckUtils.calculateValueToFixWith(RepairAlgorithm.LATEST, nodeToVersionedValues, primaryUUID, cacheObjCtx, ownersNodesSize);
                        res = this.ignite.cachex(this.cacheName).keepBinary().invoke(key, new RepairEntryProcessor(valToFixWith, nodeToVersionedValues, false, this.startTopVer), new Object[0]);
                        assert (res != null);
                        keyWasSuccessfullyFixed = (RepairEntryProcessor.RepairStatus)((Object)res.get());
                    }
                    if (keyWasSuccessfullyFixed == RepairEntryProcessor.RepairStatus.FAIL) {
                        keysToRepairWithNextAttempt.put(dataEntry.getKey(), dataEntry.getValue());
                        continue;
                    }
                    repairedKeys.put(dataEntry.getKey(), new RepairMeta(true, valToFixWith, usedRepairAlg, dataEntry.getValue()));
                }
                catch (IgniteCheckedException e) {
                    U.error(this.log, "Key [" + dataEntry.getKey().key() + "] was skipped during repair phase.", e);
                }
            }
            return new ExecutionResult<RepairResult>(new RepairResult(keysToRepairWithNextAttempt, repairedKeys));
        }

        protected UUID primaryNodeId(GridCacheContext ctx, Object key) {
            return ctx.affinity().nodesByKey(key, this.startTopVer).get(0).id();
        }

        protected int owners(GridCacheContext ctx) {
            return ctx.topology().owners(this.partId, this.startTopVer).size();
        }

        protected Object keyValue(GridCacheContext ctx, KeyCacheObject key) throws IgniteCheckedException {
            KeyCacheObject unmarshalledKey = ConsistencyCheckUtils.unmarshalKey(key, ctx);
            if (unmarshalledKey instanceof KeyCacheObjectImpl) {
                return unmarshalledKey.value(ctx.cacheObjectContext(), false);
            }
            return key;
        }
    }
}

