/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
import org.apache.ignite.internal.processors.cache.TombstoneCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIteratorException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.IgniteSpiException;

/**
 * Supplying side of partition rebalancing for a single cache group.
 * <p>
 * Demand messages are processed by {@link #handleDemandMessage(int, UUID, GridDhtPartitionDemandMessage)}:
 * rows are read from a rebalance iterator, packed into supply messages and sent back to the demander.
 * When the per-request batch limit is reached, the iterator and the bookkeeping that goes with it are
 * parked in a {@link SupplyContext} (keyed by demander id, topic id and topology version) so the next
 * demand message can continue from the same position.
 * <p>
 * All modifications of the context map are performed while holding the monitor of the map itself.
 */
public class GridDhtPartitionSupplier {
    /**
     * Event type checked before {@code addRebalanceSupplyEvent} is called.
     * NOTE(review): presumably {@code EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED} - confirm.
     */
    private static final int EVT_PART_SUPPLIED = 87;

    /**
     * Event type checked before {@code addRebalanceMissEvent} is called.
     * NOTE(review): presumably {@code EventType.EVT_CACHE_REBALANCE_PART_MISSED} - confirm.
     */
    private static final int EVT_PART_MISSED = 88;

    /** Cache group this supplier serves. */
    private final CacheGroupContext grp;

    /** Logger. */
    private final IgniteLogger log;

    /** Partition topology of the cache group. */
    private GridDhtPartitionTopology top;

    /** Optional filter; entries for which it returns {@code false} are not added to supply messages. */
    private IgnitePredicate<GridCacheEntryInfo> preloadPred;

    /** Saved supply contexts keyed by (demander node id, topic id, topology version). Guarded by itself. */
    private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>();

    /**
     * Pause (passed to {@code U.sleep}) applied after each sent supply message when positive;
     * takes precedence over the preloader's own throttle. Read from the
     * {@code IGNITE_REBALANCE_THROTTLE_OVERRIDE} system property.
     */
    @Deprecated
    private long rebalanceThrottleOverride = IgniteSystemProperties.getLong("IGNITE_REBALANCE_THROTTLE_OVERRIDE", 0L);

    /**
     * @param grp Cache group, must not be {@code null}.
     */
    GridDhtPartitionSupplier(CacheGroupContext grp) {
        assert grp != null;

        this.grp = grp;
        this.log = grp.shared().logger(getClass());
        this.top = grp.topology();

        if (rebalanceThrottleOverride > 0L) {
            LT.info(log, "Using rebalance throttle override: " + rebalanceThrottleOverride);
        }
    }

    /**
     * Closes the iterators of all saved supply contexts and removes the contexts.
     */
    void stop() {
        synchronized (scMap) {
            Iterator<Map.Entry<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext>> it =
                scMap.entrySet().iterator();

            while (it.hasNext()) {
                clearContext(it.next().getValue(), log);

                it.remove();
            }
        }
    }

    /**
     * Closes the iterator held by the given supply context, if there is one and it is still open.
     * A failure to close is logged and not propagated.
     *
     * @param sc Supply context, may be {@code null}.
     * @param log Logger used to report a failed close.
     */
    private static void clearContext(SupplyContext sc, IgniteLogger log) {
        if (sc == null) {
            return;
        }

        IgniteRebalanceIterator it = sc.iterator;

        if (it != null && !it.isClosed()) {
            try {
                it.close();
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Iterator close failed.", e);
            }
        }
    }

    /**
     * Drops the supply contexts whose demander is no longer among the alive server nodes
     * reported by discovery, closing their iterators.
     */
    void onTopologyChanged() {
        synchronized (scMap) {
            // A set gives constant-time membership checks for every saved context.
            Set<UUID> aliveNodeIds = grp.shared().discovery().aliveServerNodes().stream()
                .map(ClusterNode::id)
                .collect(Collectors.toSet());

            Iterator<Map.Entry<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext>> it =
                scMap.entrySet().iterator();

            while (it.hasNext()) {
                Map.Entry<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> entry = it.next();

                UUID demanderId = entry.getKey().get1();

                if (aliveNodeIds.contains(demanderId)) {
                    continue;
                }

                clearContext(entry.getValue(), log);

                it.remove();

                if (log.isDebugEnabled()) {
                    log.debug("Supply context removed [grp=" + grp.cacheOrGroupName() + ", demander=" + demanderId + "]");
                }
            }
        }
    }

    /**
     * Sets the entry filter applied while supplying.
     *
     * @param preloadPred Predicate; entries it rejects are skipped. {@code null} disables filtering.
     */
    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
        this.preloadPred = preloadPred;
    }

    /**
     * NOTE(review): the context map is read here without the monitor that every other access takes,
     * so the result is only a best-effort snapshot - confirm that this is acceptable for callers.
     *
     * @return {@code true} if at least one supply context is currently saved.
     */
    public boolean isSupply() {
        return !F.isEmpty(scMap);
    }

    /**
     * Processes a demand message from a rebalancing node.
     * <ul>
     * <li>A negative rebalance id is a cleanup request: the saved context is closed and removed when its
     * rebalance id equals the negated id of the message; nothing is supplied.</li>
     * <li>Otherwise rows are read from a new or a previously saved rebalance iterator and sent to the
     * demander in supply messages. Once the number of full batches reaches the limit, the state is saved
     * as a supply context and the method returns; the next demand message continues from there.</li>
     * <li>On failure the iterator and the context are closed. Unless the node is stopping, the failure is
     * an {@link IgniteSpiException}, or the demander's version is older than
     * {@code GridDhtPartitionSupplyMessageV2.AVAILABLE_SINCE}, a reply is sent: either the current message
     * with the remaining partitions marked as missed (historical iterator failure) or an error message.
     * Any failure other than a historical iterator failure is then handed to the failure processor
     * as a critical error.</li>
     * </ul>
     *
     * @param topicId Id of the topic (part of the supply context key).
     * @param nodeId Id of the demander node, must not be {@code null}.
     * @param demandMsg Demand message, must not be {@code null}.
     */
    public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemandMessage demandMsg) {
        assert demandMsg != null;
        assert nodeId != null;

        T3<UUID, Integer, AffinityTopologyVersion> ctxId =
            new T3<UUID, Integer, AffinityTopologyVersion>(nodeId, topicId, demandMsg.topologyVersion());

        if (demandMsg.rebalanceId() < 0L) {
            // Cleanup request: the demander passes the negated id of the rebalance to drop.
            synchronized (scMap) {
                SupplyContext sctx = scMap.get(ctxId);

                if (sctx != null && sctx.rebalanceId == -demandMsg.rebalanceId()) {
                    clearContext(scMap.remove(ctxId), log);

                    if (log.isDebugEnabled()) {
                        log.debug("Supply context cleaned [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + ", supplyContext=" + sctx + "]");
                    }
                }
                else if (log.isDebugEnabled()) {
                    log.debug("Stale supply context cleanup message [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + ", supplyContext=" + sctx + "]");
                }

                return;
            }
        }

        ClusterNode demanderNode = grp.shared().discovery().node(nodeId);

        if (demanderNode == null) {
            if (log.isDebugEnabled()) {
                log.debug("Demand message rejected (demander left cluster) [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]");
            }

            return;
        }

        IgniteRebalanceIterator iter = null;
        SupplyContext sctx = null;
        Set<Integer> remainingParts = null;
        GridDhtPartitionSupplyMessage supplyMsg = newSupplyMessage(demandMsg);

        try {
            synchronized (scMap) {
                sctx = scMap.remove(ctxId);

                if (sctx != null && demandMsg.rebalanceId() < sctx.rebalanceId) {
                    // The saved context belongs to a newer rebalance: put it back and ignore the message.
                    scMap.put(ctxId, sctx);

                    if (log.isDebugEnabled()) {
                        log.debug("Stale demand message [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + ", actualContext=" + sctx + "]");
                    }

                    return;
                }
            }

            if (sctx == null && (demandMsg.partitions() == null || demandMsg.partitions().isEmpty())) {
                if (log.isDebugEnabled()) {
                    log.debug("Empty demand message (no context and partitions) [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]");
                }

                return;
            }

            if (log.isDebugEnabled()) {
                log.debug("Demand message accepted [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]");
            }

            assert sctx == null || demandMsg.partitions().isEmpty() : "sctx=" + sctx + ", topicId=" + topicId + ", demanderId=" + nodeId + ", msg=" + demandMsg;

            Integer rmtThreadPoolSize = (Integer)demanderNode.attribute("org.apache.ignite.rebalance.pool.size");

            if (rmtThreadPoolSize == null) {
                rmtThreadPoolSize = 1;
            }

            long maxBatchesCnt = grp.preloader().batchesPrefetchCount() * (long)rmtThreadPoolSize.intValue();

            if (sctx == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Starting supplying rebalancing [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + ", fullPartitions=" + S.compact(demandMsg.partitions().fullSet()) + ", histPartitions=" + S.compact(demandMsg.partitions().historicalSet()) + "]");
                }
            }
            else {
                // Continuation of a saved context: a single batch per demand message.
                maxBatchesCnt = 1L;
            }

            Map<Integer, Long> initUpdateCntrs;

            if (sctx == null || sctx.iterator == null) {
                remainingParts = new HashSet<Integer>(demandMsg.partitions().fullSet());

                Set<Integer> demandParts = new HashSet<Integer>(demandMsg.partitions().fullSet());

                CachePartitionPartialCountersMap histMap = demandMsg.partitions().historicalMap();

                for (int i = 0; i < histMap.size(); ++i) {
                    demandParts.add(histMap.partitionAt(i));
                }

                // Remember the update counters of the locally OWNING partitions before the iterator is created.
                initUpdateCntrs = new HashMap<Integer, Long>(demandParts.size());

                for (Integer part : demandParts) {
                    GridDhtLocalPartition loc = top.localPartition(part, demandMsg.topologyVersion(), false);

                    if (loc == null || loc.state() != GridDhtPartitionState.OWNING) {
                        continue;
                    }

                    initUpdateCntrs.put(part, loc.updateCounter());
                }

                for (int i = 0; i < histMap.size(); ++i) {
                    remainingParts.add(histMap.partitionAt(i));
                }

                iter = grp.offheap().rebalanceIterator(demandMsg.partitions(), demandMsg.topologyVersion());

                // Estimated keys count: full store size for full partitions, counter delta for historical ones.
                for (Integer part : demandMsg.partitions().fullSet()) {
                    if (iter.isPartitionMissing(part)) {
                        continue;
                    }

                    GridDhtLocalPartition loc = top.localPartition(part, demandMsg.topologyVersion(), false);

                    assert loc != null && loc.state() == GridDhtPartitionState.OWNING : "Partition should be in OWNING state: " + loc;

                    supplyMsg.addEstimatedKeysCount(loc.dataStore().fullSize());
                }

                for (int i = 0; i < histMap.size(); ++i) {
                    if (iter.isPartitionMissing(histMap.partitionAt(i))) {
                        continue;
                    }

                    supplyMsg.addEstimatedKeysCount(histMap.updateCounterAt(i) - histMap.initialUpdateCounterAt(i));
                }
            }
            else {
                iter = sctx.iterator;
                remainingParts = sctx.remainingParts;
                initUpdateCntrs = sctx.initUpdateCntrs;
            }

            int msgMaxSize = grp.preloader().batchSize();
            long batchesCnt = 0L;
            CacheSearchRow prevRow = null;

            while (iter.hasNext()) {
                CacheDataRow row = iter.peek();

                // With MVCC enabled a message is flushed only between rows of different keys
                // (or different caches of a shared group), never before the first row of this call.
                boolean canFlushHistory = !grp.mvccEnabled()
                    || (prevRow != null
                        && ((grp.sharedGroup() && row.cacheId() != prevRow.cacheId())
                            || !row.key().equals(prevRow.key())));

                if (canFlushHistory && supplyMsg.messageSize() >= msgMaxSize) {
                    if (++batchesCnt >= maxBatchesCnt) {
                        // Batch limit reached: park the state and wait for the next demand message.
                        saveSupplyContext(ctxId, iter, remainingParts, demandMsg.rebalanceId(), initUpdateCntrs);

                        reply(topicId, demanderNode, demandMsg, supplyMsg, ctxId);

                        return;
                    }

                    if (!reply(topicId, demanderNode, demandMsg, supplyMsg, ctxId)) {
                        return;
                    }

                    supplyMsg = newSupplyMessage(demandMsg);
                }

                row = iter.next();
                prevRow = row;

                int part = row.partition();

                GridDhtLocalPartition loc = top.localPartition(part, demandMsg.topologyVersion(), false);

                assert (loc != null && loc.state() == GridDhtPartitionState.OWNING && loc.reservations() > 0) || iter.isPartitionMissing(part) : "Partition should be in OWNING state and has at least 1 reservation " + loc;

                if (iter.isPartitionMissing(part) && remainingParts.contains(part)) {
                    supplyMsg.missed(part);

                    remainingParts.remove(part);

                    if (grp.eventRecordable(EVT_PART_MISSED)) {
                        grp.addRebalanceMissEvent(part);
                    }

                    if (log.isDebugEnabled()) {
                        log.debug("Requested partition is marked as missing [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + ", p=" + part + "]");
                    }

                    continue;
                }

                if (!remainingParts.contains(part)) {
                    continue;
                }

                GridCacheEntryInfo info = extractEntryInfo(row);

                if (info == null) {
                    continue;
                }

                // Read the field once so the null check and the call see the same predicate.
                IgnitePredicate<GridCacheEntryInfo> pred = preloadPred;

                if (pred == null || pred.apply(info)) {
                    supplyMsg.addEntry0(part, iter.historical(part), info, grp.shared(), grp.cacheObjectContext());
                }
                else if (log.isTraceEnabled()) {
                    log.trace("Rebalance predicate evaluated to false (will not send cache entry): " + info);
                }

                if (!iter.isPartitionDone(part)) {
                    continue;
                }

                supplyMsg.last(part, initUpdateCntrs.get(part));

                remainingParts.remove(part);

                if (grp.eventRecordable(EVT_PART_SUPPLIED)) {
                    grp.addRebalanceSupplyEvent(part);
                }
            }

            // The iterator is exhausted: every partition still listed must be either done or missing.
            Iterator<Integer> remainingIter = remainingParts.iterator();

            while (remainingIter.hasNext()) {
                int p = remainingIter.next();

                if (iter.isPartitionDone(p)) {
                    GridDhtLocalPartition loc = top.localPartition(p, demandMsg.topologyVersion(), false);

                    assert loc != null : "Supply partition is gone: grp=" + grp.cacheOrGroupName() + ", p=" + p;

                    supplyMsg.last(p, loc.updateCounter());

                    remainingIter.remove();

                    if (grp.eventRecordable(EVT_PART_SUPPLIED)) {
                        grp.addRebalanceSupplyEvent(p);
                    }
                }
                else if (iter.isPartitionMissing(p)) {
                    supplyMsg.missed(p);

                    remainingIter.remove();

                    if (grp.eventRecordable(EVT_PART_MISSED)) {
                        grp.addRebalanceMissEvent(p);
                    }
                }
            }

            assert remainingParts.isEmpty() : "Partitions after rebalance should be either done or missing: " + remainingParts;

            if (sctx != null) {
                clearContext(sctx, log);
            }
            else {
                iter.close();
            }

            reply(topicId, demanderNode, demandMsg, supplyMsg, ctxId);

            if (log.isInfoEnabled()) {
                log.info("Finished supplying rebalancing [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]");
            }
        }
        catch (Throwable t) {
            if (iter != null && !iter.isClosed()) {
                try {
                    iter.close();
                }
                catch (IgniteCheckedException e) {
                    t.addSuppressed(e);
                }
            }

            if (grp.shared().kernalContext().isStopping()) {
                return;
            }

            // The error reply is sent only to demanders whose version is at least AVAILABLE_SINCE.
            boolean sendErrMsg = demanderNode.version().compareTo(GridDhtPartitionSupplyMessageV2.AVAILABLE_SINCE) >= 0;

            if (t instanceof IgniteSpiException) {
                if (log.isDebugEnabled()) {
                    log.debug("Failed to send message to node (current node is stopping?) [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + ", msg=" + t.getMessage() + ']');
                }

                sendErrMsg = false;
            }
            else {
                U.error(log, "Failed to continue supplying [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t);
            }

            try {
                if (sctx != null) {
                    clearContext(sctx, log);
                }
            }
            catch (Throwable t1) {
                U.error(log, "Failed to cleanup supplying context [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t1);
            }

            if (!sendErrMsg) {
                return;
            }

            boolean fallbackToFullRebalance = X.hasCause(t, IgniteHistoricalIteratorException.class);

            try {
                GridDhtPartitionSupplyMessage errMsg;

                if (fallbackToFullRebalance) {
                    grp.shared().database().lastCheckpointInapplicableForWalRebalance(grp.groupId());

                    // The iterator was never created: every demanded partition is reported as missed.
                    if (iter == null && F.isEmpty(remainingParts)) {
                        remainingParts = new HashSet<Integer>(demandMsg.partitions().fullSet());
                        remainingParts.addAll(demandMsg.partitions().historicalSet());
                    }

                    if (remainingParts != null) {
                        for (int p : remainingParts) {
                            supplyMsg.missed(p);
                        }
                    }

                    errMsg = supplyMsg;
                }
                else {
                    errMsg = new GridDhtPartitionSupplyMessageV2(demandMsg.rebalanceId(), grp.groupId(), demandMsg.topologyVersion(), grp.deploymentEnabled(), t);
                }

                reply(topicId, demanderNode, demandMsg, errMsg, ctxId);
            }
            catch (Throwable t1) {
                U.error(log, "Failed to send supply error message [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t1);
            }

            if (!fallbackToFullRebalance) {
                grp.shared().kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, new IgniteCheckedException("Failed to continue supplying [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t)));
            }
        }
    }

    /**
     * Creates an empty supply message answering the given demand message.
     *
     * @param demandMsg Demand message that is being answered.
     * @return New supply message carrying the rebalance id and topology version of the demand message.
     */
    private GridDhtPartitionSupplyMessage newSupplyMessage(GridDhtPartitionDemandMessage demandMsg) {
        return new GridDhtPartitionSupplyMessage(demandMsg.rebalanceId(), grp.groupId(), demandMsg.topologyVersion(), grp.deploymentEnabled());
    }

    /**
     * Converts a data row into the entry info that is put into a supply message.
     *
     * @param row Data row.
     * @return Entry info, or {@code null} if MVCC is enabled for the group and the row's MVCC transaction
     *      state is not {@code 3}.
     */
    private GridCacheEntryInfo extractEntryInfo(CacheDataRow row) {
        boolean mvcc = grp.mvccEnabled();

        GridCacheEntryInfo info = mvcc ? new GridCacheMvccEntryInfo() : new GridCacheEntryInfo();

        info.key(row.key());
        info.cacheId(row.cacheId());

        if (mvcc) {
            assert row.mvccCoordinatorVersion() != 0L;

            // NOTE(review): state 3 is presumably "committed" - confirm against the MVCC TxState constants.
            if (row.mvccTxState() != 3) {
                return null;
            }

            ((MvccVersionAware)info).mvccVersion(row);
            ((GridCacheMvccEntryInfo)info).mvccTxState((byte)3);

            if (row.newMvccCoordinatorVersion() != 0L && row.newMvccTxState() == 3) {
                ((MvccUpdateVersionAware)info).newMvccVersion(row);
                ((GridCacheMvccEntryInfo)info).newMvccTxState((byte)3);
            }
        }

        // The tombstone marker is sent as a null value.
        info.value(row.value() == TombstoneCacheObject.INSTANCE ? null : row.value());
        info.version(row.version());
        info.expireTime(row.expireTime());

        return info;
    }

    /**
     * Sends a supply message to the demander and then sleeps for the configured throttle, if any.
     * If sending fails with {@link ClusterTopologyCheckedException}, the supply context stored under
     * {@code ctxId} is removed and closed.
     *
     * @param topicId Topic id (used for logging only).
     * @param demander Demander node.
     * @param demandMsg Demand message that is being answered; provides the topic and the timeout.
     * @param supplyMsg Supply message to send.
     * @param ctxId Supply context id.
     * @return {@code true} if the message was sent, {@code false} if sending failed with
     *      {@link ClusterTopologyCheckedException}.
     * @throws IgniteCheckedException If sending or sleeping failed for any other reason.
     */
    private boolean reply(int topicId, ClusterNode demander, GridDhtPartitionDemandMessage demandMsg, GridDhtPartitionSupplyMessage supplyMsg, T3<UUID, Integer, AffinityTopologyVersion> ctxId) throws IgniteCheckedException {
        try {
            if (log.isDebugEnabled()) {
                log.debug("Send next supply message [" + supplyRoutineInfo(topicId, demander.id(), demandMsg) + "]");
            }

            grp.shared().io().sendOrderedMessage(demander, demandMsg.topic(), supplyMsg, grp.ioPolicy(), demandMsg.timeout());

            // The deprecated system-property override wins over the preloader throttle.
            if (rebalanceThrottleOverride > 0L) {
                U.sleep(rebalanceThrottleOverride);
            }
            else if (grp.preloader().throttle() > 0L) {
                U.sleep(grp.preloader().throttle());
            }

            return true;
        }
        catch (ClusterTopologyCheckedException ignore) {
            if (log.isDebugEnabled()) {
                log.debug("Failed to send supply message (demander left): [" + supplyRoutineInfo(topicId, demander.id(), demandMsg) + "]");
            }

            synchronized (scMap) {
                clearContext(scMap.remove(ctxId), log);
            }

            return false;
        }
    }

    /**
     * Builds the common part of log messages describing a supply routine.
     *
     * @param topicId Topic id; appended only when positive.
     * @param demander Demander node id.
     * @param demandMsg Demand message.
     * @return String with the group name, demander id, topology version and, optionally, the topic id.
     */
    private String supplyRoutineInfo(int topicId, UUID demander, GridDhtPartitionDemandMessage demandMsg) {
        return "grp=" + grp.cacheOrGroupName() + ", demander=" + demander + ", topVer=" + demandMsg.topologyVersion() + (topicId > 0 ? ", topic=" + topicId : "");
    }

    /**
     * Stores the supplying state so that the next demand message can continue from it.
     * No context may be stored under the same id at that moment.
     *
     * @param ctxId Supply context id.
     * @param entryIt Rebalance iterator positioned at the next row to supply.
     * @param remainingParts Partitions that are not yet reported as done or missed.
     * @param rebalanceId Rebalance id of the demand message being processed.
     * @param initUpdateCntrs Update counters captured when the iterator was created, by partition id.
     */
    private void saveSupplyContext(T3<UUID, Integer, AffinityTopologyVersion> ctxId, IgniteRebalanceIterator entryIt, Set<Integer> remainingParts, long rebalanceId, Map<Integer, Long> initUpdateCntrs) {
        synchronized (scMap) {
            assert scMap.get(ctxId) == null;

            scMap.put(ctxId, new SupplyContext(entryIt, remainingParts, rebalanceId, initUpdateCntrs));
        }
    }

    /**
     * State of a supplying routine that is kept between two demand messages.
     */
    private static class SupplyContext {
        /** Rebalance iterator to continue reading from. */
        @GridToStringExclude
        private final IgniteRebalanceIterator iterator;

        /** Partitions that are not yet reported as done or missed. */
        private final Set<Integer> remainingParts;

        /** Rebalance id of the demand message that created this context. */
        private final long rebalanceId;

        /** Update counters captured when the iterator was created, by partition id. */
        private final Map<Integer, Long> initUpdateCntrs;

        /**
         * @param iterator Rebalance iterator.
         * @param remainingParts Remaining partitions.
         * @param rebalanceId Rebalance id.
         * @param initUpdateCntrs Initial update counters by partition id.
         */
        SupplyContext(IgniteRebalanceIterator iterator, Set<Integer> remainingParts, long rebalanceId, Map<Integer, Long> initUpdateCntrs) {
            this.iterator = iterator;
            this.remainingParts = remainingParts;
            this.rebalanceId = rebalanceId;
            this.initUpdateCntrs = initUpdateCntrs;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(SupplyContext.class, this);
        }
    }
}

