package org.gridgain.kafka.source;

import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.gridgain.kafka.CacheEntry;
import org.gridgain.kafka.LogFormat;
import org.gridgain.kafka.SystemEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/gridgain/kafka/source/BacklogServiceImpl.class */
/**
 * Ignite {@link Service} that copies cache entries into a backlog cache.
 *
 * <p>On {@link #init(ServiceContext)} the service creates (or reuses) the backlog cache named by
 * {@code cfg.backlogCacheName()}, opens a data streamer into it and starts an
 * {@link AvailableCachesMonitor}. Entries delivered by the {@link QueryRetriever} are written to
 * the backlog through the streamer, keyed by consecutive values of an Ignite atomic sequence.
 * On {@link #cancel(ServiceContext)} everything created in {@code init} is released and the
 * backlog cache is destroyed.
 */
class BacklogServiceImpl implements BacklogService, Service {
    private static final long serialVersionUID = 0;

    private static final Logger log = LoggerFactory.getLogger(BacklogServiceImpl.class);

    /** Name of the atomic sequence that supplies backlog keys. */
    private static final String OFFSET_SEQUENCE_NAME = "offset";

    /** Initial value used when the offset sequence has to be created. */
    private static final long INITIAL_OFFSET = 0L;

    /** Maximum time, in milliseconds, to wait for the caches monitor to finish on cancel. */
    private static final long MONITOR_JOIN_TIMEOUT_MS = 60000L;

    private final IgniteSourceConnectorConfig cfg;

    @IgniteInstanceResource
    private Ignite ignite;

    private CacheEntryRetriever cacheEntryRetriever;
    private IgniteDataStreamer<Long, CacheEntry> backlogStreamer;
    private IgniteAtomicSequence offSeq;
    private AvailableCachesMonitor availableCachesMonitor;

    /**
     * Creates the service.
     *
     * @param igniteSourceConnectorConfig connector configuration read by this service
     */
    public BacklogServiceImpl(IgniteSourceConnectorConfig igniteSourceConnectorConfig) {
        this.cfg = igniteSourceConnectorConfig;
    }

    /**
     * Creates the backlog resources, then builds the caches monitor and the entry retriever and
     * starts the monitor.
     *
     * @param serviceContext service context (not used)
     */
    @Override
    public void init(ServiceContext serviceContext) {
        createBacklog();
        // The lambda is passed inline so its parameter type is inferred from the constructor;
        // it reads the retriever field lazily, which is assigned before the monitor is started.
        this.availableCachesMonitor = new AvailableCachesMonitor(
                diff -> this.cacheEntryRetriever.reconfigure(diff.removed(), diff.added()),
                this.ignite::cacheNames,
                this.cfg.cacheListPollInterval(),
                this.cfg.cacheWhitelist(),
                this.cfg.cacheBlacklist(),
                this.cfg.backlogCacheName());
        Set<String> cacheNames = this.availableCachesMonitor.cacheNames();
        this.cacheEntryRetriever = new QueryRetriever(
                cacheNames,
                new Offsets(),
                this.ignite::cache,
                FailoverPolicy.BACKLOG,
                this.cfg.shallLoadInitialData(),
                this.cfg.shallProcessRemovals(),
                this::addEntry,
                createFilter(this.cfg.cacheFilter()));
        this.availableCachesMonitor.start();
    }

    /**
     * Stops the monitor, closes the retriever, the streamer and the offset sequence, and
     * destroys the backlog cache.
     *
     * <p>Each cleanup step is isolated: a failure is logged and the remaining steps still run.
     * If the thread is interrupted while waiting for the monitor, the interrupt flag is restored
     * once cleanup has finished.
     *
     * @param serviceContext service context (not used)
     */
    @Override
    public void cancel(ServiceContext serviceContext) {
        boolean interrupted = false;
        try {
            if (this.availableCachesMonitor != null) {
                this.availableCachesMonitor.shutdown();
                try {
                    this.availableCachesMonitor.join(MONITOR_JOIN_TIMEOUT_MS);
                } catch (InterruptedException e) {
                    // Remember the interrupt; it is restored after the remaining cleanup so the
                    // close calls below are not attempted on an already-interrupted thread.
                    interrupted = true;
                }
            }
            if (this.cacheEntryRetriever != null) {
                try {
                    this.cacheEntryRetriever.close();
                } catch (Exception e) {
                    log.warn(LogFormat.message(SystemEvent.BACKLOG_FAILURE, e));
                }
            }
            if (this.backlogStreamer != null) {
                try {
                    this.backlogStreamer.close();
                } catch (Exception e) {
                    log.warn(LogFormat.message(SystemEvent.BACKLOG_FAILURE, e));
                }
            }
            if (this.offSeq != null) {
                try {
                    this.offSeq.close();
                } catch (Exception e) {
                    log.warn(LogFormat.message(SystemEvent.BACKLOG_FAILURE, e));
                }
            }
            try {
                this.ignite.destroyCache(this.cfg.backlogCacheName());
            } catch (Exception e) {
                log.warn(LogFormat.message(SystemEvent.BACKLOG_FAILURE, e));
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Does nothing; the work is driven by the objects started in {@link #init(ServiceContext)}.
     *
     * @param serviceContext service context (not used)
     */
    @Override
    public void execute(ServiceContext serviceContext) {
    }

    /** No-op. */
    @Override // org.gridgain.kafka.source.BacklogService
    public void test() {
    }

    /**
     * Writes one entry to the backlog under the next value of the offset sequence. Failures are
     * logged as warnings and not rethrown.
     *
     * @param cacheEntry entry to store
     */
    private void addEntry(CacheEntry cacheEntry) {
        try {
            long offset = this.offSeq.getAndIncrement();
            this.backlogStreamer.addData(offset, cacheEntry);
        } catch (Exception e) {
            log.warn(LogFormat.message(SystemEvent.BACKLOG_FAILURE, e));
        }
    }

    /**
     * Creates (or reuses) the backlog cache, obtains the offset sequence and opens the data
     * streamer with the configured auto-flush frequency.
     */
    private void createBacklog() {
        String backlogCacheName = this.cfg.backlogCacheName();
        CacheConfiguration<Long, CacheEntry> backlogCfg =
                new CacheConfiguration<Long, CacheEntry>(backlogCacheName)
                        .setSqlSchema("public")
                        .setIndexedTypes(Long.class, CacheEntry.class)
                        .setAtomicityMode(CacheAtomicityMode.ATOMIC)
                        .setBackups(0)
                        .setCacheMode(CacheMode.PARTITIONED)
                        .setDataRegionName(this.cfg.backlogDataRegionName());
        // Only the cache's existence is needed here; the returned handle is closed right away
        // and all writes go through the streamer opened below.
        this.ignite.getOrCreateCache(backlogCfg).close();
        this.offSeq = this.ignite.atomicSequence(OFFSET_SEQUENCE_NAME, INITIAL_OFFSET, true);
        this.backlogStreamer = this.ignite.dataStreamer(backlogCacheName);
        this.backlogStreamer.autoFlushFrequency(this.cfg.backlogFlushFreq());
    }

    /**
     * Instantiates the configured cache-entry filter through its no-argument constructor.
     *
     * @param cls filter class, may be {@code null}
     * @return the filter instance, or {@code null} when {@code cls} is {@code null} or the
     *     instance cannot be created or is not a {@link Predicate} (the failure is logged)
     */
    private static Predicate<CacheEntry> createFilter(Class<?> cls) {
        if (cls == null) {
            return null;
        }
        try {
            // The generic argument cannot be verified at runtime; a class that is not a
            // Predicate at all still fails the cast and is handled by the catch below.
            @SuppressWarnings("unchecked")
            Predicate<CacheEntry> filter =
                    (Predicate<CacheEntry>) cls.getDeclaredConstructor().newInstance();
            return filter;
        } catch (Exception e) {
            log.error(LogFormat.message(SystemEvent.CREATE_FILTER_FAILURE, e));
            return null;
        }
    }
}
