package org.gridgain.grid.kernal.processors.cache.query.continuous;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridInterruptedException;
import org.gridgain.grid.GridSystemProperties;
import org.gridgain.grid.cache.GridCacheEntry;
import org.gridgain.grid.cache.query.GridCacheContinuousQuery;
import org.gridgain.grid.kernal.GridTopic;
import org.gridgain.grid.kernal.processors.cache.GridCacheManager;
import org.gridgain.grid.lang.utils.GridConcurrentHashMap;
import org.gridgain.grid.lang.utils.GridConcurrentLinkedDeque;
import org.gridgain.grid.thread.GridThread;
import org.gridgain.grid.typedef.internal.U;
import org.gridgain.grid.util.worker.GridWorker;

/* loaded from: input_file:org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager.class */
public class GridCacheContinuousQueryManager<K, V> extends GridCacheManager<K, V> {
    private static final String TOPIC_PREFIX = "CONTINUOUS_QUERY";
    private static final long FORCE_UNWIND_INTERVAL = 1000;
    private static final int MAX_BUF_SIZE;
    private final ConcurrentMap<UUID, ListenerInfo<K, V>> lsnrs = new GridConcurrentHashMap();
    private final AtomicInteger lsnrCnt = new AtomicInteger();
    private final AtomicLong seq = new AtomicLong();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Collection<GridThread> threads = new HashSet();
    private volatile GridConcurrentLinkedDeque<GridCacheContinuousQueryEntry<K, V>> buf = new GridConcurrentLinkedDeque<>();
    private BlockingQueue<Queue<GridCacheContinuousQueryEntry<K, V>>> queue;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryManager$ListenerInfo.class */
    private static class ListenerInfo<K, V> {
        private final GridCacheContinuousQueryListener<K, V> lsnr;
        private Collection<GridCacheContinuousQueryEntry<K, V>> pending;

        private ListenerInfo(GridCacheContinuousQueryListener<K, V> gridCacheContinuousQueryListener) {
            this.pending = new LinkedList();
            this.lsnr = gridCacheContinuousQueryListener;
        }

        void onEntryUpdate(GridCacheContinuousQueryEntry<K, V> gridCacheContinuousQueryEntry) {
            boolean z = true;
            synchronized (this) {
                if (this.pending != null) {
                    this.pending.add(gridCacheContinuousQueryEntry);
                    z = false;
                }
            }
            if (z) {
                this.lsnr.onEntryUpdate(gridCacheContinuousQueryEntry);
            }
        }

        void onIterate(GridCacheContinuousQueryEntry<K, V> gridCacheContinuousQueryEntry) {
            this.lsnr.onEntryUpdate(gridCacheContinuousQueryEntry);
        }

        void flushPending() {
            Collection<GridCacheContinuousQueryEntry<K, V>> collection;
            synchronized (this) {
                collection = this.pending;
                this.pending = null;
            }
            Iterator<GridCacheContinuousQueryEntry<K, V>> it = collection.iterator();
            while (it.hasNext()) {
                this.lsnr.onEntryUpdate(it.next());
            }
        }
    }

    @Override // org.gridgain.grid.kernal.processors.cache.GridCacheManager
    protected void start0() throws GridException {
        this.queue = new LinkedBlockingQueue(this.cctx.config().getContinuousQueryQueueSize() / MAX_BUF_SIZE);
        this.threads.add(new GridThread(new GridWorker(this.cctx.gridName(), "continuous-query-notifier", this.log) { // from class: org.gridgain.grid.kernal.processors.cache.query.continuous.GridCacheContinuousQueryManager.1
            @Override // org.gridgain.grid.util.worker.GridWorker
            protected void body() {
                while (!isCancelled()) {
                    try {
                        Queue queue = (Queue) GridCacheContinuousQueryManager.this.queue.take();
                        while (true) {
                            GridCacheContinuousQueryEntry<K, V> gridCacheContinuousQueryEntry = (GridCacheContinuousQueryEntry) queue.poll();
                            if (gridCacheContinuousQueryEntry != null) {
                                Iterator<V> it = GridCacheContinuousQueryManager.this.lsnrs.values().iterator();
                                while (it.hasNext()) {
                                    ((ListenerInfo) it.next()).onEntryUpdate(gridCacheContinuousQueryEntry);
                                }
                            }
                        }
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        }));
        this.threads.add(new GridThread(new GridWorker(this.cctx.gridName(), "continuous-query-unwinder", this.log) { // from class: org.gridgain.grid.kernal.processors.cache.query.continuous.GridCacheContinuousQueryManager.2
            @Override // org.gridgain.grid.util.worker.GridWorker
            protected void body() {
                while (!isCancelled()) {
                    try {
                        U.sleep(1000L);
                        GridCacheContinuousQueryManager.this.unwind(true);
                    } catch (GridInterruptedException e) {
                        return;
                    }
                }
            }
        }));
        U.startThreads(this.threads);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.gridgain.grid.kernal.processors.cache.GridCacheManager
    public void stop0(boolean z, boolean z2) {
        U.interrupt(this.threads);
        U.joinThreads(this.threads, this.log);
    }

    public GridCacheContinuousQuery<K, V> createQuery() {
        return new GridCacheContinuousQueryAdapter(this.cctx, GridTopic.TOPIC_CACHE.topic(TOPIC_PREFIX, this.cctx.localNodeId(), this.seq.getAndIncrement()));
    }

    public void onEntryUpdate(K k, V v) {
        if (this.lsnrCnt.get() > 0) {
            this.lock.readLock().lock();
            try {
                this.buf.add(new GridCacheContinuousQueryEntry<>(k, v));
                this.lock.readLock().unlock();
            } catch (Throwable th) {
                this.lock.readLock().unlock();
                throw th;
            }
        }
    }

    public void unwind(boolean z) {
        if (z || this.buf.sizex() >= MAX_BUF_SIZE) {
            this.lock.writeLock().lock();
            try {
                try {
                    this.queue.put(this.buf);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                this.buf = new GridConcurrentLinkedDeque<>();
                this.lock.writeLock().unlock();
            } catch (Throwable th) {
                this.lock.writeLock().unlock();
                throw th;
            }
        }
    }

    public int currentQueueSize() {
        return (MAX_BUF_SIZE * this.queue.size()) + this.buf.sizex();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean registerListener(UUID uuid, GridCacheContinuousQueryListener<K, V> gridCacheContinuousQueryListener) {
        boolean z = this.lsnrs.putIfAbsent(uuid, new ListenerInfo<>(gridCacheContinuousQueryListener)) == null;
        if (z) {
            this.lsnrCnt.incrementAndGet();
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void iterate(UUID uuid) {
        ListenerInfo<K, V> listenerInfo = this.lsnrs.get(uuid);
        if (!$assertionsDisabled && listenerInfo == null) {
            throw new AssertionError();
        }
        for (GridCacheEntry<K, V> gridCacheEntry : this.cctx.cache().primaryEntrySet()) {
            listenerInfo.onIterate(new GridCacheContinuousQueryEntry<>(gridCacheEntry.getKey(), gridCacheEntry.getValue()));
        }
        listenerInfo.flushPending();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void unregisterListener(UUID uuid) {
        if (this.lsnrs.remove(uuid) != null) {
            this.lsnrCnt.decrementAndGet();
        }
    }

    static {
        $assertionsDisabled = !GridCacheContinuousQueryManager.class.desiredAssertionStatus();
        MAX_BUF_SIZE = Integer.getInteger(GridSystemProperties.GG_CONT_QUERY_MAX_BUF_SIZE, 1024).intValue();
    }
}
