/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.kafka.source;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.kafka.connect.errors.ConnectException;
import org.gridgain.kafka.CacheEntry;
import org.gridgain.kafka.DataGrid;
import org.gridgain.kafka.LogFormat;
import org.gridgain.kafka.SystemEvent;
import org.gridgain.kafka.source.CacheEntryOffset;
import org.gridgain.kafka.source.Offsets;
import org.gridgain.kafka.source.QueueingCacheEntryRetriever;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Retrieves cache entries from the backlog cache in batches by running a SQL
 * fields query against it.
 *
 * <p>The set of cache names to query can be changed at runtime through
 * {@link #reconfigure(Collection, Collection)}. Both {@link #get()} and
 * {@code reconfigure} synchronize on the {@code cacheNames} collection, which
 * is the very instance handed to the constructor (no defensive copy is made).
 */
final class BacklogRetriever
implements QueueingCacheEntryRetriever {
    private static final Logger log = LoggerFactory.getLogger(BacklogRetriever.class);

    /** Maximum number of attempts to look up the backlog cache before giving up. */
    private static final int MAX_LOOKUP_ATTEMPTS = 600;

    /** A warning is logged on every N-th unsuccessful lookup attempt. */
    private static final int WARN_EVERY_ATTEMPTS = 10;

    /** Pause between two consecutive lookup attempts, in milliseconds. */
    private static final long LOOKUP_RETRY_DELAY_MS = 1000L;

    private final IgniteCache<BinaryObject, BinaryObject> backlogCache;
    private final Collection<String> cacheNames;
    private final Offsets offsets;
    private final int batchSize;

    /**
     * Creates a retriever, waiting for the backlog cache to become available.
     *
     * @param cacheNames names of the caches whose entries are selected; the
     *     collection is stored as-is and later mutated by {@code reconfigure}
     * @param backlogCacheName name of the backlog cache to look up
     * @param batchSize value bound to the {@code LIMIT} clause of the query
     * @param offsets offsets tracker used to position the query and to record
     *     the offset of each returned row
     * @throws ConnectException if the backlog cache is still unavailable after
     *     all lookup attempts, or if the waiting thread is interrupted
     */
    BacklogRetriever(Collection<String> cacheNames, String backlogCacheName, int batchSize, Offsets offsets) {
        this.cacheNames = cacheNames;
        this.batchSize = batchSize;
        this.offsets = offsets;
        this.backlogCache = BacklogRetriever.awaitBacklogCache(backlogCacheName);
    }

    /**
     * Polls the data grid for the backlog cache until it is found or the
     * attempt budget is exhausted.
     */
    private static IgniteCache<BinaryObject, BinaryObject> awaitBacklogCache(String backlogCacheName) {
        for (int attempt = 1; attempt <= MAX_LOOKUP_ATTEMPTS; ++attempt) {
            IgniteCache<BinaryObject, BinaryObject> cache = DataGrid.SOURCE.cache(backlogCacheName);
            if (cache != null) {
                return cache;
            }
            if (attempt % WARN_EVERY_ATTEMPTS == 0) {
                log.warn(LogFormat.message(SystemEvent.BACKLOG_FAILURE, "Kafka Backlog is still unavailable."));
            }
            if (attempt == MAX_LOOKUP_ATTEMPTS) {
                // No point in sleeping when there will be no further attempt.
                break;
            }
            try {
                Thread.sleep(LOOKUP_RETRY_DELAY_MS);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can
                // still observe that the thread was asked to stop.
                Thread.currentThread().interrupt();
                throw new ConnectException(e);
            }
        }
        throw new ConnectException(LogFormat.message(SystemEvent.BACKLOG_FAILURE, "Kafka Backlog is unavailable"));
    }

    /**
     * Queries the backlog cache for the next batch of entries.
     *
     * <p>Selects rows whose {@code cache} column matches one of the configured
     * cache names and whose {@code _key} is greater than
     * {@code offsets.last()}, limited to {@code batchSize} rows. For every row
     * the offset is recorded via {@code offsets.set(cache, offset)}.
     *
     * @return one {@link CacheEntryOffset} per returned row; empty if the
     *     query yields no rows
     */
    @Override
    public List<CacheEntryOffset> get() {
        SqlFieldsQuery qry;
        // Snapshot the cache names under the lock so a concurrent reconfigure
        // cannot modify the collection while it is being copied into the query.
        synchronized (this.cacheNames) {
            qry = new SqlFieldsQuery("SELECT _key, cache, key, val FROM CacheEntry c JOIN TABLE(id VARCHAR = ?) i ON c.cache = i.id WHERE _key > ? LIMIT ?").setArgs(this.cacheNames.toArray(), this.offsets.last(), this.batchSize);
        }
        List<List<?>> rows = this.backlogCache.query(qry).getAll();
        List<CacheEntryOffset> entries = new ArrayList<CacheEntryOffset>(rows.size());
        for (List<?> row : rows) {
            // Column order follows the SELECT list: _key, cache, key, val.
            long off = (Long)row.get(0);
            String cache = row.get(1).toString();
            Object key = row.get(2);
            Object val = row.get(3);
            entries.add(new CacheEntryOffset(new CacheEntry(cache, key, val), this.offsets.set(cache, off)));
        }
        return entries;
    }

    /**
     * Updates the set of cache names used by subsequent {@link #get()} calls.
     * Removals are applied before additions.
     *
     * @param rmvCaches cache names to stop querying; may be empty
     * @param addedCaches cache names to start querying; may be empty
     */
    @Override
    public void reconfigure(Collection<String> rmvCaches, Collection<String> addedCaches) {
        synchronized (this.cacheNames) {
            if (!rmvCaches.isEmpty()) {
                this.cacheNames.removeAll(rmvCaches);
            }
            if (!addedCaches.isEmpty()) {
                this.cacheNames.addAll(addedCaches);
            }
        }
    }

    /** Closes the backlog cache handle held by this retriever. */
    @Override
    public void close() {
        this.backlogCache.close();
    }
}

