/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.kafka.source;

import java.util.Collection;
import java.util.function.Predicate;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.gridgain.kafka.CacheEntry;
import org.gridgain.kafka.IgniteConnectUtils;
import org.gridgain.kafka.LogFormat;
import org.gridgain.kafka.SystemEvent;
import org.gridgain.kafka.source.AvailableCachesMonitor;
import org.gridgain.kafka.source.BacklogService;
import org.gridgain.kafka.source.CacheEntryRetriever;
import org.gridgain.kafka.source.FailoverPolicy;
import org.gridgain.kafka.source.IgniteSourceConnectorConfig;
import org.gridgain.kafka.source.Offsets;
import org.gridgain.kafka.source.QueryRetriever;
import org.jetbrains.annotations.Nullable;

class BacklogServiceImpl
implements BacklogService,
Service {
    private static final long serialVersionUID = 0L;
    private final long cacheListPollInterval;
    private final Collection<String> cacheWhitelist;
    private final Collection<String> cacheBlacklist;
    private final String backlogCacheName;
    private final int backlogCacheBackups;
    private final boolean shallLoadInitData;
    private final boolean shallProcRemovals;
    private final boolean shallProcExpired;
    private final Class<?> cacheFilterCls;
    private final String backlogDataRegionName;
    private final int backlogFlushFreq;
    @LoggerResource(categoryClass=BacklogService.class)
    private IgniteLogger log;
    @IgniteInstanceResource
    private Ignite ignite;
    private CacheEntryRetriever cacheEntryRetriever = null;
    private IgniteDataStreamer<Long, CacheEntry> backlogStreamer;
    private IgniteAtomicSequence offSeq;
    private AvailableCachesMonitor availableCachesMonitor;

    BacklogServiceImpl(IgniteSourceConnectorConfig cfg) {
        this.cacheListPollInterval = cfg.cacheListPollInterval();
        this.cacheWhitelist = cfg.cacheWhitelist();
        this.cacheBlacklist = cfg.cacheBlacklist();
        this.backlogCacheName = cfg.backlogCacheName();
        this.shallLoadInitData = cfg.shallLoadInitialData();
        this.shallProcRemovals = cfg.shallProcessRemovals();
        this.shallProcExpired = cfg.shallProcessExpired();
        this.cacheFilterCls = cfg.cacheFilter();
        this.backlogDataRegionName = cfg.backlogDataRegionName();
        this.backlogFlushFreq = cfg.backlogFlushFreq();
        this.backlogCacheBackups = cfg.backlogCacheBackups();
    }

    public void init(ServiceContext ctx) {
        this.createBacklog();
        this.availableCachesMonitor = new AvailableCachesMonitor(diff -> this.cacheEntryRetriever.reconfigure(diff.removed(), diff.added()), () -> ((Ignite)this.ignite).cacheNames(), this.cacheListPollInterval, this.cacheWhitelist, this.cacheBlacklist, this.backlogCacheName);
        this.cacheEntryRetriever = new QueryRetriever(this.availableCachesMonitor.cacheNames(), new Offsets(), cacheName -> this.ignite.cache(cacheName).withKeepBinary(), FailoverPolicy.BACKLOG, this.shallLoadInitData, this.shallProcRemovals, this.shallProcExpired, this::addEntry, this.createFilter(this.cacheFilterCls));
        this.availableCachesMonitor.start();
    }

    public void cancel(ServiceContext ctx) {
        if (this.availableCachesMonitor != null) {
            this.availableCachesMonitor.shutdown();
            try {
                this.availableCachesMonitor.join(60000L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
        U.close((AutoCloseable)this.cacheEntryRetriever, (IgniteLogger)this.log);
        U.close(this.backlogStreamer, (IgniteLogger)this.log);
        if (this.offSeq != null) {
            this.offSeq.close();
        }
        try {
            this.ignite.destroyCache(this.backlogCacheName);
        }
        catch (Exception ex) {
            this.log.warning(LogFormat.message(SystemEvent.BACKLOG_FAILURE, ex));
        }
    }

    public void execute(ServiceContext ctx) {
    }

    @Override
    public void test() {
    }

    private void addEntry(CacheEntry entry) {
        try {
            this.backlogStreamer.addData((Object)this.offSeq.getAndIncrement(), (Object)entry);
        }
        catch (Exception ex) {
            this.log.warning(LogFormat.message(SystemEvent.BACKLOG_FAILURE, ex));
        }
    }

    private void createBacklog() {
        CacheConfiguration cacheCfg = new CacheConfiguration(this.backlogCacheName).setSqlSchema("public").setIndexedTypes(new Class[]{Long.class, CacheEntry.class}).setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(this.backlogCacheBackups).setCacheMode(CacheMode.PARTITIONED).setDataRegionName(this.backlogDataRegionName);
        this.ignite.getOrCreateCache(cacheCfg).close();
        this.offSeq = this.ignite.atomicSequence("offset", 0L, true);
        this.backlogStreamer = this.ignite.dataStreamer(this.backlogCacheName);
        this.backlogStreamer.autoFlushFrequency((long)this.backlogFlushFreq);
    }

    @Nullable
    public Predicate<CacheEntry> createFilter(Class<?> cls) {
        try {
            return IgniteConnectUtils.createFilter(cls);
        }
        catch (Exception ex) {
            this.log.error(LogFormat.message(SystemEvent.CREATE_FILTER_FAILURE, ex));
            return null;
        }
    }
}

