package org.gridgain.grid.kernal.processors.cache.distributed.replicated.preloader;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReadWriteLock;
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridInterruptedException;
import org.gridgain.grid.GridNode;
import org.gridgain.grid.GridRichNode;
import org.gridgain.grid.GridTopologyException;
import org.gridgain.grid.kernal.managers.deployment.GridDeploymentInfo;
import org.gridgain.grid.kernal.processors.cache.GridCacheContext;
import org.gridgain.grid.kernal.processors.cache.GridCacheEntryEx;
import org.gridgain.grid.kernal.processors.cache.GridCacheEntryInfo;
import org.gridgain.grid.kernal.processors.cache.GridCacheSwapEntry;
import org.gridgain.grid.kernal.processors.cache.GridCacheSwapListener;
import org.gridgain.grid.kernal.processors.cache.GridCacheVersion;
import org.gridgain.grid.lang.GridAbsPredicate;
import org.gridgain.grid.lang.GridCloseableIterator;
import org.gridgain.grid.lang.GridPredicate;
import org.gridgain.grid.lang.GridTuple2;
import org.gridgain.grid.lang.utils.GridConcurrentHashMap;
import org.gridgain.grid.logger.GridLogger;
import org.gridgain.grid.thread.GridThread;
import org.gridgain.grid.typedef.CI2;
import org.gridgain.grid.typedef.internal.U;
import org.gridgain.grid.util.worker.GridWorker;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/gridgain/grid/kernal/processors/cache/distributed/replicated/preloader/GridReplicatedPreloadSupplyPool.class */
public class GridReplicatedPreloadSupplyPool<K, V> {
    /** Cache context this supply pool works for. */
    private final GridCacheContext<K, V> cctx;

    /** Logger obtained from the cache context for this class. */
    private final GridLogger log;

    /**
     * Predicate evaluated by a supply worker for every demand; entries are supplied only
     * when it returns {@code true}, otherwise a single short reply is sent.
     */
    private final GridAbsPredicate preloadFinished;

    /** Lock whose read side is try-locked around demand intake (see {@code enterBusy()}). */
    private final ReadWriteLock busyLock;

    /** Supply workers; created in the constructor, started by {@code start()}, cancelled by {@code stop()}. */
    private final Collection<GridReplicatedPreloadSupplyPool<K, V>.SupplyWorker> workers = new LinkedList<>();

    /** Demand messages waiting to be served by the workers. */
    private final BlockingQueue<DemandMessage<K, V>> queue = new LinkedBlockingQueue<>();

    /** Result of {@code gridDeploy().enabled()} captured at construction time. */
    private final boolean depEnabled;

    /** Optional filter applied to every entry before it is supplied; {@code null} means "supply everything". */
    private GridPredicate<GridCacheEntryInfo<K, V>> preloadPred;

    /** Assertion flag surfaced by the decompiler; set in the static initializer, {@code true} when assertions are off. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gridgain/grid/kernal/processors/cache/distributed/replicated/preloader/GridReplicatedPreloadSupplyPool$DemandMessage.class */
    /**
     * Pairs a demand message with the ID of the node that sent it.
     * The sender ID is stored as the first tuple element and the message as the second.
     */
    public static class DemandMessage<K, V> extends GridTuple2<UUID, GridReplicatedPreloadDemandMessage<K, V>> {
        /**
         * Creates an empty tuple.
         * NOTE(review): presumably needed so the tuple can be re-created reflectively - confirm.
         */
        public DemandMessage() {
        }

        /**
         * @param sndId ID of the node the demand came from.
         * @param msg Demand message itself.
         */
        DemandMessage(UUID sndId, GridReplicatedPreloadDemandMessage<K, V> msg) {
            super(sndId, msg);
        }

        /**
         * @return First tuple element: the sender node ID.
         */
        UUID senderId() {
            return get1();
        }

        /**
         * @return Second tuple element: the demand message.
         */
        public GridReplicatedPreloadDemandMessage<K, V> message() {
            return get2();
        }

        /** {@inheritDoc} */
        @Override // org.gridgain.grid.lang.GridTuple2
        public String toString() {
            StringBuilder sb = new StringBuilder("DemandMessage [senderId=");
            sb.append(senderId());
            sb.append(", msg=");
            sb.append(message());
            sb.append(']');
            return sb.toString();
        }
    }

    /* loaded from: input_file:org/gridgain/grid/kernal/processors/cache/distributed/replicated/preloader/GridReplicatedPreloadSupplyPool$SupplyWorker.class */
    /**
     * Worker that serves queued demand messages. For every demand it replies with batches of
     * cache entries taken from three sources, in this order: the in-memory cache entries,
     * the swap iterator of the demanded partition, and the entries reported to a
     * {@link SwapListener} while the first two were being walked.
     */
    private class SupplyWorker extends GridWorker {
        /** Creates a worker named after the grid that logs to the pool's logger. */
        private SupplyWorker() {
            super(GridReplicatedPreloadSupplyPool.this.cctx.gridName(), "preloader-supply-worker", GridReplicatedPreloadSupplyPool.this.log);
        }

        /**
         * Takes demand messages off the queue until the worker is cancelled and answers each one.
         *
         * @throws InterruptedException If the thread is interrupted while waiting on the queue.
         * @throws GridInterruptedException If the thread is interrupted while supplying entries.
         */
        @Override // org.gridgain.grid.util.worker.GridWorker
        protected void body() throws InterruptedException, GridInterruptedException {
            // Outer members are reached through an explicit reference so that nothing can be
            // shadowed by same-named members inherited from GridWorker.
            final GridReplicatedPreloadSupplyPool<K, V> pool = GridReplicatedPreloadSupplyPool.this;
            final long throttle = pool.cctx.config().getPreloadThrottle();

            while (!isCancelled()) {
                DemandMessage<K, V> demand = pool.take(pool.queue, this);
                GridRichNode node = pool.cctx.discovery().richNode(demand.senderId());

                if (node == null) {
                    if (pool.log.isDebugEnabled()) {
                        pool.log.debug("Received message from non-existing node (will ignore): " + demand);
                    }
                    continue;
                }

                GridReplicatedPreloadDemandMessage<K, V> d = demand.message();

                if (!pool.preloadFinished.apply()) {
                    // Nothing is supplied in this state; the demander only gets one short reply.
                    try {
                        reply(node, d, new GridReplicatedPreloadSupplyMessage<K, V>(d.workerId(), true));
                    } catch (GridException e) {
                        U.error(pool.log, "Failed to send supply message to node: " + node.id(), e);
                    }
                    continue;
                }

                // This catch covers the wait only, so that failures (and interrupts) raised while
                // supplying are not reported as a failed partition release.
                try {
                    pool.cctx.partitionReleaseFuture(Collections.singleton(Integer.valueOf(d.partition())), node.order()).get(d.timeout());
                } catch (GridException e) {
                    U.error(pool.log, "Failed to wait until partition is released: " + d.partition(), e);
                    continue;
                }

                try {
                    supplyPartition(node, d, throttle);
                } catch (GridInterruptedException e) {
                    // Interruption must end the worker rather than be logged as a send failure.
                    throw e;
                } catch (GridException e) {
                    U.error(pool.log, "Failed to send supply message to node: " + node.id(), e);
                }
            }
        }

        /**
         * Sends all matching entries of the demanded partition to the demander in batches and
         * finishes with a message marked as last. If the demander leaves the grid, the rest of
         * the demand is abandoned without the final message; the worker itself keeps running.
         *
         * @param node Demanding node.
         * @param d Demand being served.
         * @param throttle Pause in milliseconds between batches; ignored when not positive.
         * @throws GridException If sending, sleeping or swap iteration fails.
         */
        private void supplyPartition(GridRichNode node, GridReplicatedPreloadDemandMessage<K, V> d, long throttle) throws GridException {
            final GridCacheContext<K, V> ctx = GridReplicatedPreloadSupplyPool.this.cctx;
            final int part = d.partition();

            GridReplicatedPreloadSupplyMessage<K, V> batch = new GridReplicatedPreloadSupplyMessage<>(d.workerId());
            SwapListener<K, V> swapLsnr = null;

            try {
                if (ctx.isSwapOrOffheapEnabled()) {
                    swapLsnr = new SwapListener<>();
                    ctx.swap().addOffHeapListener(part, swapLsnr);
                    ctx.swap().addSwapListener(part, swapLsnr);
                }

                boolean nodeLeft = false;

                // Phase 1: entries currently held in the cache.
                for (GridCacheEntryEx<K, V> entry : ctx.cache().allEntries()) {
                    if (U.safeAbs(entry.hashCode() % d.nodeCount()) != d.mod() || ctx.affinity().partition(entry.key()) != part) {
                        continue;
                    }

                    GridCacheEntryInfo<K, V> info = entry.info();

                    if (batch.size() >= ctx.config().getPreloadBatchSize()) {
                        if (!reply(node, d, batch)) {
                            nodeLeft = true;
                            break;
                        }
                        if (throttle > 0) {
                            U.sleep(throttle);
                        }
                        batch = new GridReplicatedPreloadSupplyMessage<>(d.workerId());
                    }

                    if (info != null && (info.value() != null || info.valueBytes() != null)) {
                        long expireTime = info.expireTime();

                        if ((expireTime == 0 || expireTime > U.currentTimeMillis()) && accepted(info)) {
                            batch.addEntry(info, ctx);
                        }
                    }
                }

                // Phase 2: entries read from the swap iterator of the partition.
                if (swapLsnr != null && !nodeLeft) {
                    GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry<V>>> swapIt = ctx.swap().iterator(part);

                    if (swapIt != null) {
                        try {
                            // Deployment info is attached at most once per demand.
                            boolean prepared = false;
                            Iterator<Map.Entry<byte[], GridCacheSwapEntry<V>>> swapEntries = swapIt.iterator();

                            while (swapEntries.hasNext()) {
                                Map.Entry<byte[], GridCacheSwapEntry<V>> e = swapEntries.next();
                                GridCacheSwapEntry<V> swapEntry = e.getValue();

                                if (U.safeAbs(swapEntry.keyHash() % d.nodeCount()) != d.mod()) {
                                    continue;
                                }

                                if (batch.size() >= ctx.config().getPreloadBatchSize()) {
                                    if (!reply(node, d, batch)) {
                                        nodeLeft = true;
                                        break;
                                    }
                                    if (throttle > 0) {
                                        U.sleep(throttle);
                                    }
                                    batch = new GridReplicatedPreloadSupplyMessage<>(d.workerId());
                                }

                                long expireTime = swapEntry.expireTime();

                                if (expireTime != 0 && expireTime <= U.currentTimeMillis()) {
                                    continue;
                                }

                                GridCacheEntryInfo<K, V> info = new GridCacheEntryInfo<>();
                                info.keyBytes(e.getKey());
                                info.valueBytes(swapEntry.valueBytes());
                                info.ttl(swapEntry.ttl());
                                info.expireTime(expireTime);
                                info.version(swapEntry.version());

                                if (!accepted(info)) {
                                    continue;
                                }

                                batch.addEntry0(info, ctx);

                                if (GridReplicatedPreloadSupplyPool.this.depEnabled && !prepared) {
                                    // Held as Object: only the GridDeploymentInfo view of the loader is used.
                                    Object ldr = null;

                                    if (swapEntry.keyClassLoaderId() != null) {
                                        ldr = ctx.deploy().getClassLoader(swapEntry.keyClassLoaderId());
                                    } else if (swapEntry.valueClassLoaderId() != null) {
                                        ldr = ctx.deploy().getClassLoader(swapEntry.valueClassLoaderId());
                                    }

                                    if (ldr instanceof GridDeploymentInfo) {
                                        batch.prepare((GridDeploymentInfo) ldr);
                                        prepared = true;
                                    }
                                }
                            }
                        } finally {
                            swapIt.close();
                        }
                    }
                }

                // Phase 3: entries the listener recorded while phases 1 and 2 were running.
                if (swapLsnr != null && !nodeLeft) {
                    ctx.swap().removeSwapListener(part, swapLsnr);

                    Collection<GridCacheEntryInfo<K, V>> unswapped = swapLsnr.entries();

                    // Already detached; keeps the finally block from removing it a second time.
                    swapLsnr = null;

                    for (GridCacheEntryInfo<K, V> info : unswapped) {
                        if (batch.size() >= ctx.config().getPreloadBatchSize()) {
                            if (!reply(node, d, batch)) {
                                // Abandon this demand only; the worker must keep serving the queue.
                                nodeLeft = true;
                                break;
                            }
                            batch = new GridReplicatedPreloadSupplyMessage<>(d.workerId());
                        }

                        if (accepted(info)) {
                            batch.addEntry(info, ctx);
                        }
                    }
                }

                if (!nodeLeft) {
                    batch.last(true);
                    reply(node, d, batch);
                }
            } finally {
                // NOTE(review): the listener is also registered through addOffHeapListener, but only
                // removeSwapListener is ever called - confirm this detaches both registrations.
                if (swapLsnr != null) {
                    ctx.swap().removeSwapListener(part, swapLsnr);
                }
            }
        }

        /**
         * Checks an entry against the pool's preload predicate, logging rejected entries at debug level.
         *
         * @param info Entry about to be supplied.
         * @return {@code true} if no predicate is set or the predicate accepts the entry.
         */
        private boolean accepted(GridCacheEntryInfo<K, V> info) {
            // The field is mutable; read it once so the null check and the call see the same value.
            GridPredicate<GridCacheEntryInfo<K, V>> pred = GridReplicatedPreloadSupplyPool.this.preloadPred;

            if (pred == null || pred.apply(info)) {
                return true;
            }

            if (GridReplicatedPreloadSupplyPool.this.log.isDebugEnabled()) {
                GridReplicatedPreloadSupplyPool.this.log.debug("Preload predicate evaluated to false (will not send cache entry): " + info);
            }

            return false;
        }

        /**
         * Sends one supply message to the demander as an ordered message on the demand's topic.
         *
         * @param gridNode Demanding node.
         * @param d Demand being answered; provides the topic and the timeout.
         * @param s Supply message to send.
         * @return {@code true} if the message was sent, {@code false} if sending failed with a
         *      {@link GridTopologyException} (logged as "node left grid").
         * @throws GridException If sending fails for any other reason.
         */
        private boolean reply(GridNode gridNode, GridReplicatedPreloadDemandMessage<K, V> d, GridReplicatedPreloadSupplyMessage<K, V> s) throws GridException {
            final GridCacheContext<K, V> ctx = GridReplicatedPreloadSupplyPool.this.cctx;
            final GridLogger logger = GridReplicatedPreloadSupplyPool.this.log;

            try {
                ctx.io().sendOrderedMessage(gridNode, d.topic(), ctx.io().messageId(d.topic(), gridNode.id()), s, d.timeout());

                if (logger.isDebugEnabled()) {
                    logger.debug("Replied to demand message [node=" + gridNode.id() + ", demand=" + d + ", supply=" + s + ']');
                }

                return true;
            } catch (GridTopologyException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Failed to send supply message because node left grid: " + gridNode.id());
                }

                return false;
            }
        }
    }

    /* loaded from: input_file:org/gridgain/grid/kernal/processors/cache/distributed/replicated/preloader/GridReplicatedPreloadSupplyPool$SwapListener.class */
    /**
     * Swap listener that remembers every unswapped entry it is told about, keyed by the entry key,
     * provided the entry carries a non-null value.
     */
    private class SwapListener<K, V> implements GridCacheSwapListener<K, V> {
        /** Recorded entries; a later notification for the same key replaces the earlier one. */
        private final Map<K, GridCacheEntryInfo<K, V>> swappedEntries = new GridConcurrentHashMap<>();

        /** Creates a listener with no recorded entries. */
        private SwapListener() {
        }

        /**
         * Records the unswapped entry.
         *
         * @param part Not used by this listener (presumably the partition - confirm against the interface).
         * @param key Entry key; used as the map key.
         * @param keyBytes Serialized key stored into the entry info.
         * @param val Entry value; entries whose info reports a {@code null} value are not recorded.
         * @param valBytes Serialized value stored into the entry info.
         * @param ver Entry version.
         * @param ttl Entry time-to-live.
         * @param expireTime Entry expiration time.
         */
        @Override // org.gridgain.grid.kernal.processors.cache.GridCacheSwapListener
        public void onEntryUnswapped(int part, K key, byte[] keyBytes, V val, byte[] valBytes, GridCacheVersion ver, long ttl, long expireTime) {
            if (GridReplicatedPreloadSupplyPool.this.log.isDebugEnabled()) {
                GridReplicatedPreloadSupplyPool.this.log.debug("Received unswapped event for key: " + key);
            }

            GridCacheEntryInfo<K, V> info = new GridCacheEntryInfo<>();
            info.keyBytes(keyBytes);
            info.value(val);
            info.valueBytes(valBytes);
            info.ttl(ttl);
            info.expireTime(expireTime);
            info.version(ver);

            // The info object was just created, so only its value needs checking.
            if (info.value() != null) {
                this.swappedEntries.put(key, info);
            }
        }

        /**
         * @return Live view of the entries recorded so far.
         */
        Collection<GridCacheEntryInfo<K, V>> entries() {
            return this.swappedEntries.values();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the pool: builds the supply workers (none when preloading is disabled) and registers
     * the handler that queues incoming demand messages. Workers are not started here.
     *
     * @param gridCacheContext Cache context.
     * @param gridAbsPredicate Predicate the workers consult before supplying entries.
     * @param readWriteLock Lock whose read side guards demand intake.
     */
    public GridReplicatedPreloadSupplyPool(GridCacheContext<K, V> gridCacheContext, GridAbsPredicate gridAbsPredicate, ReadWriteLock readWriteLock) {
        // Decompiled form of three assert statements on the arguments.
        if (!$assertionsDisabled && gridCacheContext == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && gridAbsPredicate == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && readWriteLock == null) {
            throw new AssertionError();
        }

        cctx = gridCacheContext;
        preloadFinished = gridAbsPredicate;
        busyLock = readWriteLock;
        log = gridCacheContext.logger(getClass());

        // One worker per configured pool thread, or none at all when preloading is off.
        int workerCnt = 0;
        if (gridCacheContext.preloadEnabled()) {
            workerCnt = gridCacheContext.config().getPreloadThreadPoolSize();
        }

        int created = 0;
        while (created < workerCnt) {
            workers.add(new SupplyWorker());
            created++;
        }

        CI2<UUID, GridReplicatedPreloadDemandMessage<K, V>> demandHandler = new CI2<UUID, GridReplicatedPreloadDemandMessage<K, V>>() {
            @Override // org.gridgain.grid.lang.GridInClosure2
            public void apply(UUID nodeId, GridReplicatedPreloadDemandMessage<K, V> msg) {
                GridReplicatedPreloadSupplyPool.this.addMessage(nodeId, msg);
            }
        };
        gridCacheContext.io().addHandler(GridReplicatedPreloadDemandMessage.class, demandHandler);

        depEnabled = gridCacheContext.gridDeploy().enabled();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts one {@link GridThread} per supply worker and logs the number of workers at debug level.
     */
    public void start() {
        for (SupplyWorker worker : workers) {
            GridThread thread = new GridThread(cctx.gridName(), "preloader-supply-worker", worker);
            thread.start();
        }

        if (!log.isDebugEnabled()) {
            return;
        }
        log.debug("Started supply pool: " + workers.size());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Cancels every supply worker and then joins them, passing the pool's logger to the join helper.
     */
    public void stop() {
        U.cancel(workers);
        U.join(workers, log);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sets the filter applied to entries before they are supplied.
     *
     * @param gridPredicate New predicate; {@code null} makes the workers supply every entry.
     */
    public void preloadPredicate(GridPredicate<GridCacheEntryInfo<K, V>> gridPredicate) {
        preloadPred = gridPredicate;
    }

    /**
     * Returns the configured preload thread pool size. This is read from the cache configuration,
     * not from the worker collection, which is empty when preloading is disabled.
     *
     * @return Configured preload thread pool size.
     */
    int poolSize() {
        int configured = cctx.config().getPreloadThreadPoolSize();
        return configured;
    }

    /**
     * Tries to take the read side of the busy lock without blocking.
     *
     * @return {@code true} if the lock was taken (the caller must then call {@code leaveBusy()}),
     *      {@code false} otherwise; the failure is logged at debug level.
     */
    private boolean enterBusy() {
        boolean locked = busyLock.readLock().tryLock();

        if (!locked && log.isDebugEnabled()) {
            log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
        }

        return locked;
    }

    /**
     * Releases the read side of the busy lock taken by a successful {@code enterBusy()}.
     */
    private void leaveBusy() {
        busyLock.readLock().unlock();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Queues a demand message for the supply workers. The message is dropped silently when the
     * busy lock cannot be taken, and dropped with a warning when preloading is disabled.
     *
     * @param uuid ID of the node that sent the demand.
     * @param gridReplicatedPreloadDemandMessage Demand message.
     */
    public void addMessage(UUID uuid, GridReplicatedPreloadDemandMessage<K, V> gridReplicatedPreloadDemandMessage) {
        if (!enterBusy()) {
            return;
        }

        try {
            if (!cctx.preloadEnabled()) {
                U.warn(log, "Received demand message when preloading is disabled (will ignore): " + gridReplicatedPreloadDemandMessage);
                return;
            }

            if (log.isDebugEnabled()) {
                log.debug("Received demand message [node=" + uuid + ", msg=" + gridReplicatedPreloadDemandMessage + ']');
            }

            DemandMessage<K, V> queued = new DemandMessage<>(uuid, gridReplicatedPreloadDemandMessage);
            queue.offer(queued);
        } finally {
            leaveBusy();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Takes the next element from the queue on behalf of a worker, blocking until one is available.
     * If the worker is already cancelled the current thread is interrupted first, before the
     * blocking {@code take()} is attempted.
     *
     * @param blockingQueue Queue to take from.
     * @param gridWorker Worker doing the take; must not be {@code null}.
     * @return Element taken from the queue.
     * @throws InterruptedException If the thread is interrupted while waiting.
     */
    public <T> T take(BlockingQueue<T> blockingQueue, GridWorker gridWorker) throws InterruptedException {
        // Decompiled form of an assert on the worker argument.
        if (!$assertionsDisabled && gridWorker == null) {
            throw new AssertionError();
        }

        boolean cancelled = gridWorker.isCancelled();
        if (cancelled) {
            Thread me = Thread.currentThread();
            me.interrupt();
        }

        T next = blockingQueue.take();
        return next;
    }

    // Decompiler rendering of the compiler-generated assertion setup: the flag is true when
    // assertions are NOT enabled for this class, which turns the explicit
    // "if (!$assertionsDisabled && ...) throw new AssertionError()" checks in this file into no-ops.
    static {
        $assertionsDisabled = !GridReplicatedPreloadSupplyPool.class.desiredAssertionStatus();
    }
}
