package org.gridgain.kafka.source;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.kafka.connect.errors.ConnectException;
import org.gridgain.kafka.CacheEntry;
import org.gridgain.kafka.DataGrid;
import org.gridgain.kafka.LogFormat;
import org.gridgain.kafka.SystemEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/gridgain/kafka/source/BacklogRetriever.class */
/**
 * Retrieves cache entries from the Kafka backlog cache with a SQL fields query, restricted to
 * a configurable set of cache names.
 *
 * <p>The cache-name collection handed to the constructor is kept by reference and mutated in
 * place by {@link #reconfigure(Collection, Collection)}. Both {@link #get()} and
 * {@code reconfigure} synchronize on that collection, so the set of names used by a query is
 * a consistent snapshot.
 */
final class BacklogRetriever implements QueueingCacheEntryRetriever {
    private static final Logger log = LoggerFactory.getLogger(BacklogRetriever.class);

    /** Maximum number of attempts to look up the backlog cache before giving up. */
    private static final int MAX_LOOKUP_ATTEMPTS = 600;

    /** A warning is logged on every N-th unsuccessful lookup attempt. */
    private static final int WARN_EVERY_ATTEMPTS = 10;

    /** Pause between two consecutive lookup attempts, in milliseconds. */
    private static final long RETRY_DELAY_MS = 1000L;

    /**
     * Selects backlog rows whose {@code cache} column matches one of the supplied names and
     * whose {@code _key} is greater than the supplied offset, limited to the batch size.
     *
     * <p>NOTE(review): the query has a LIMIT but no ORDER BY, so which rows are returned when
     * more than a batch is pending is up to the SQL engine — confirm this is intended.
     */
    private static final String BACKLOG_QUERY =
        "SELECT _key, cache, key, val FROM CacheEntry c "
            + "JOIN TABLE(id VARCHAR = ?) i ON c.cache = i.id "
            + "WHERE _key > ? LIMIT ?";

    private final IgniteCache<BinaryObject, BinaryObject> backlogCache;
    private final Collection<String> cacheNames;
    private final Offsets offsets;
    private final int batchSize;

    /**
     * Creates the retriever, blocking until the backlog cache becomes available.
     *
     * @param cacheNames names of the caches whose backlog entries should be returned; stored
     *     by reference, used as the lock object and modified by {@link #reconfigure}
     * @param backlogCacheName name of the backlog cache to look up on {@code DataGrid.SOURCE}
     * @param batchSize value bound to the query's {@code LIMIT} parameter
     * @param offsets offset tracker supplying the lower {@code _key} bound and recording the
     *     offsets of returned rows
     * @throws ConnectException if the backlog cache is still unavailable after all lookup
     *     attempts, or if the waiting thread is interrupted
     */
    public BacklogRetriever(Collection<String> cacheNames, String backlogCacheName, int batchSize, Offsets offsets) {
        this.cacheNames = cacheNames;
        this.batchSize = batchSize;
        this.offsets = offsets;
        this.backlogCache = awaitBacklogCache(backlogCacheName);
    }

    /**
     * Polls {@code DataGrid.SOURCE} for the named cache until it is returned non-null or the
     * attempt budget is exhausted.
     */
    private static IgniteCache<BinaryObject, BinaryObject> awaitBacklogCache(String backlogCacheName) {
        for (int attempt = 1; attempt <= MAX_LOOKUP_ATTEMPTS; attempt++) {
            IgniteCache<BinaryObject, BinaryObject> cache = DataGrid.SOURCE.cache(backlogCacheName);
            if (cache != null) {
                return cache;
            }
            if (attempt % WARN_EVERY_ATTEMPTS == 0) {
                log.warn(LogFormat.message(SystemEvent.BACKLOG_FAILURE, "Kafka Backlog is still unavailable."));
            }
            try {
                Thread.sleep(RETRY_DELAY_MS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe
                // the interruption after the exception has been translated.
                Thread.currentThread().interrupt();
                throw new ConnectException(e);
            }
        }
        throw new ConnectException(LogFormat.message(SystemEvent.BACKLOG_FAILURE, "Kafka Backlog is unavailable"));
    }

    /**
     * Queries the backlog cache for the next batch of entries and records the offset of every
     * returned row via {@code offsets.set(cache, key)}.
     *
     * @return one {@code CacheEntryOffset} per returned row, in the order the query produced
     *     them; empty if the query returned no rows
     */
    @Override // org.gridgain.kafka.source.QueueingCacheEntryRetriever
    public List<CacheEntryOffset> get() {
        SqlFieldsQuery query;
        // Build the arguments under the lock so the cache-name snapshot cannot be taken in the
        // middle of a reconfigure() call; the query itself runs outside the lock.
        synchronized (this.cacheNames) {
            query = new SqlFieldsQuery(BACKLOG_QUERY).setArgs(
                this.cacheNames.toArray(),
                Long.valueOf(this.offsets.last()),
                Integer.valueOf(this.batchSize));
        }

        List<List<?>> rows = this.backlogCache.query(query).getAll();
        List<CacheEntryOffset> result = new ArrayList<>(rows.size());
        for (List<?> row : rows) {
            // Column order follows the SELECT list: _key, cache, key, val.
            long backlogKey = ((Long) row.get(0)).longValue();
            String cacheName = row.get(1).toString();
            CacheEntry entry = new CacheEntry(cacheName, row.get(2), row.get(3));
            result.add(new CacheEntryOffset(entry, this.offsets.set(cacheName, backlogKey)));
        }
        return result;
    }

    /**
     * Updates the set of cache names used by subsequent {@link #get()} calls: first removes
     * {@code removedCacheNames}, then adds {@code addedCacheNames}.
     *
     * @param removedCacheNames names to remove from the tracked collection
     * @param addedCacheNames names to add to the tracked collection
     */
    @Override // org.gridgain.kafka.source.CacheEntryRetriever
    public void reconfigure(Collection<String> removedCacheNames, Collection<String> addedCacheNames) {
        synchronized (this.cacheNames) {
            if (!removedCacheNames.isEmpty()) {
                this.cacheNames.removeAll(removedCacheNames);
            }
            if (!addedCacheNames.isEmpty()) {
                this.cacheNames.addAll(addedCacheNames);
            }
        }
    }

    /** Closes the backlog cache handle obtained in the constructor. */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.backlogCache.close();
    }
}
