/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.kafka.source;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.EventType;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.gridgain.kafka.CacheEntry;
import org.gridgain.kafka.LogFormat;
import org.gridgain.kafka.SystemEvent;
import org.gridgain.kafka.source.CacheEntryRetriever;
import org.gridgain.kafka.source.FailoverPolicy;
import org.gridgain.kafka.source.Offsets;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class QueryRetriever
implements CacheEntryRetriever {
    private static final Logger log = LoggerFactory.getLogger(QueryRetriever.class);
    private final Function<String, IgniteCache<BinaryObject, BinaryObject>> cacheGetter;
    private final Map<String, CacheQuery> cacheQryMap = Collections.synchronizedMap(new HashMap());
    private final Consumer<CacheEntry> entryProcessor;
    private final boolean shallProcRemovals;
    private final boolean shallProcExpired;
    private final boolean shallLoadInitData;
    private final Predicate<CacheEntry> customFilter;

    QueryRetriever(Collection<String> cacheNames, Offsets offsets, Function<String, IgniteCache<BinaryObject, BinaryObject>> cacheGetter, FailoverPolicy failoverPlc, boolean shallLoadInitData, boolean shallProcRemovals, boolean shallProcExpired, Consumer<CacheEntry> entryProcessor, @Nullable Predicate<CacheEntry> filter) {
        this.cacheGetter = cacheGetter;
        this.shallProcRemovals = shallProcRemovals;
        this.shallProcExpired = shallProcExpired;
        this.entryProcessor = entryProcessor;
        this.shallLoadInitData = shallLoadInitData;
        this.customFilter = filter;
        Predicate<String> shallReallyLoadInitData = cache -> offsets.isDefined((String)cache) ? failoverPlc == FailoverPolicy.FULL_SNAPSHOT : shallLoadInitData;
        for (String cache2 : cacheNames) {
            CacheQuery q = null;
            try {
                q = new CacheQuery(cacheGetter.apply(cache2), entryProcessor, this.createRemoteFilter(filter, cache2), shallReallyLoadInitData.test(cache2), this.shallProcExpired, filter, cache2);
            }
            catch (Exception ex) {
                log.warn(LogFormat.message(SystemEvent.CACHE_QUERY_FAILURE, ex));
            }
            if (q == null) continue;
            this.cacheQryMap.put(cache2, q);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void reconfigure(Collection<String> rmvCaches, Collection<String> addedCaches) {
        Map<String, CacheQuery> map = this.cacheQryMap;
        synchronized (map) {
            rmvCaches.forEach(c -> {
                CacheQuery q = this.cacheQryMap.remove(c);
                if (q != null) {
                    q.close();
                }
            });
            addedCaches.forEach(c -> {
                if (!this.cacheQryMap.containsKey(c)) {
                    this.cacheQryMap.put((String)c, new CacheQuery(this.cacheGetter.apply((String)c), this.entryProcessor, this.createRemoteFilter(this.customFilter, (String)c), this.shallLoadInitData, this.shallProcExpired, this.customFilter, (String)c));
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        Map<String, CacheQuery> map = this.cacheQryMap;
        synchronized (map) {
            for (CacheQuery q : this.cacheQryMap.values()) {
                U.closeQuiet((AutoCloseable)q);
            }
        }
    }

    @NotNull
    private CacheEntryEventFilter<Object, Object> createRemoteFilter(@Nullable Predicate<CacheEntry> customFilter, String cacheName) {
        CacheEntryEventFilter evtTypeFilter = this.shallProcRemovals ? (CacheEntryEventFilter & Serializable)evt -> true : (CacheEntryEventFilter & Serializable)evt -> evt.getEventType() != EventType.REMOVED;
        return customFilter == null ? evtTypeFilter : (CacheEntryEventFilter & Serializable)evt -> evtTypeFilter.evaluate(evt) && customFilter.test(new CacheEntry(cacheName, (Cache.Entry<?, ?>)evt));
    }

    private static class CacheQuery
    implements AutoCloseable {
        private final QueryCursor<Cache.Entry<Object, Object>> cur;

        CacheQuery(IgniteCache<BinaryObject, BinaryObject> cache, Consumer<CacheEntry> entryProcessor, @NotNull CacheEntryEventFilter<Object, Object> remoteFilter, boolean shallLoadInitData, boolean shallProcExpired, @Nullable Predicate<CacheEntry> customFilter, String cacheName) {
            Query<Cache.Entry<Object, Object>> qry = CacheQuery.createCacheQuery(entryProcessor, shallLoadInitData, shallProcExpired, remoteFilter, customFilter, cacheName);
            this.cur = this.executeCacheQuery(entryProcessor, qry, cache);
        }

        @Override
        public void close() {
            this.cur.close();
        }

        private static Query<Cache.Entry<Object, Object>> createCacheQuery(Consumer<CacheEntry> entryProcessor, boolean shallLoadInitData, boolean shallProcExpired, @NotNull CacheEntryEventFilter<Object, Object> remoteFilter, @Nullable Predicate<CacheEntry> customFilter, String cacheName) {
            ContinuousQuery qry = new ContinuousQuery();
            qry.setRemoteFilterFactory((Factory & Serializable)() -> remoteFilter);
            qry.setIncludeExpired(shallProcExpired);
            qry.setLocalListener(evts -> {
                for (CacheEntryEvent evt : evts) {
                    Object key = evt.getKey();
                    Object val = evt.getEventType() == EventType.REMOVED || evt.getEventType() == EventType.EXPIRED ? null : evt.getValue();
                    entryProcessor.accept(new CacheEntry(evt.getSource().getName(), key, val));
                }
            });
            if (shallLoadInitData) {
                qry.setInitialQuery((Query)new ScanQuery((IgniteBiPredicate & Serializable)(key, val) -> customFilter == null || customFilter.test(new CacheEntry(cacheName, key, val))));
            }
            return qry;
        }

        private QueryCursor<Cache.Entry<Object, Object>> executeCacheQuery(Consumer<CacheEntry> entryProcessor, Query<Cache.Entry<Object, Object>> qry, IgniteCache<BinaryObject, BinaryObject> cache) {
            QueryCursor cur = cache.query(qry);
            for (Cache.Entry cacheEntry : cur) {
                entryProcessor.accept(new CacheEntry(cache.getName(), cacheEntry));
            }
            return cur;
        }
    }
}

