/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.kafka.source;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.ignite.internal.util.GridStringBuilder;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.gridgain.kafka.DataGrid;
import org.gridgain.kafka.IgniteConnectUtils;
import org.gridgain.kafka.LogFormat;
import org.gridgain.kafka.SystemEvent;
import org.gridgain.kafka.Version;
import org.gridgain.kafka.schema.cache.ResolvedSchemasCache;
import org.gridgain.kafka.source.BacklogRetriever;
import org.gridgain.kafka.source.CacheEntryOffset;
import org.gridgain.kafka.source.CacheToTopicMapper;
import org.gridgain.kafka.source.CacheToTopicMapperFactory;
import org.gridgain.kafka.source.FailoverPolicy;
import org.gridgain.kafka.source.IgniteSourceTaskConfig;
import org.gridgain.kafka.source.Offsets;
import org.gridgain.kafka.source.QueueingCacheEntryRetriever;
import org.gridgain.kafka.source.QueueingQueryRetriever;
import org.gridgain.kafka.source.SchemaCacheFactory;
import org.gridgain.kafka.source.SourceEnumSchemaMapper;
import org.gridgain.kafka.source.SourceRecordBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IgniteSourceTask
extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(IgniteSourceTask.class);
    private volatile boolean isStarted;
    private IgniteSourceTaskConfig cfg;
    private Map<String, SourceRecordBuilder> recordBuilders;
    private QueueingCacheEntryRetriever cacheEntryRetriever;
    private CacheToTopicMapper cacheToTopicMapper;
    private ResolvedSchemasCache resolvedSchemasCache;

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> cfgMap) {
        if (this.isStarted) {
            return;
        }
        this.cfg = new IgniteSourceTaskConfig(cfgMap);
        DataGrid.SOURCE.init(this.cfg.igniteCfg());
        List<String> cacheNames = this.cfg.caches();
        if (cacheNames.isEmpty()) {
            throw new ConnectException("IgniteSourceTask must be assigned at least one cache.");
        }
        this.cacheToTopicMapper = CacheToTopicMapperFactory.factorySingleton().newMapper(this.cfg.cacheToTopicMapper(), cfgMap);
        if (this.cfg.isSchemaCacheEnabled()) {
            this.resolvedSchemasCache = SchemaCacheFactory.getCache(this.cfg);
        }
        this.recordBuilders = cacheNames.stream().map(cacheName -> SourceRecordBuilder.newContextBuilder().cacheName((String)cacheName).topic(this.cacheToTopicMapper.map((String)cacheName)).schemaless(this.cfg.isSchemaless()).dynamicSchema(this.cfg.isSchemaDynamic()).validateEntries(this.cfg.isEntriesValidationEnabled()).resolvedSchemasCache(this.resolvedSchemasCache).nullabilityPolicy(this.cfg.nullabilityPolicy()).enumPolicy(SourceEnumSchemaMapper.from(this.cfg)).toRecordBuilder()).collect(Collectors.toMap(SourceRecordBuilder::cacheName, rb -> rb));
        Offsets offs = new Offsets(this.context.offsetStorageReader().offsets(Offsets.kafkaPartitions(cacheNames)));
        this.cacheEntryRetriever = this.cfg.failoverPolicy() == FailoverPolicy.BACKLOG ? new BacklogRetriever(cacheNames, this.cfg.backlogCacheName(), this.cfg.batchSize(), offs) : new QueueingQueryRetriever(cacheNames, DataGrid.SOURCE::cache, this.cfg.failoverPolicy(), this.cfg.shallLoadInitialData(), this.cfg.shallProcessRemovals(), this.cfg.shallProcessExpired(), this.cfg.batchSize(), offs, IgniteConnectUtils.createFilter(this.cfg.cacheFilter(), log));
        this.isStarted = true;
        log.info(LogFormat.message(SystemEvent.SOURCE_TASK_STARTED, "Source task started, caches=" + this.cfg.caches()));
    }

    public List<SourceRecord> poll() throws InterruptedException {
        while (this.isStarted) {
            List<CacheEntryOffset> entries = this.cacheEntryRetriever.get();
            if (!entries.isEmpty()) {
                if (log.isTraceEnabled()) {
                    GridStringBuilder logMsg = new GridStringBuilder("Polled ").a(entries.size()).a(" cache entries");
                    if (S.includeSensitive()) {
                        logMsg.a(" ").a(entries);
                    }
                    log.trace(logMsg.toString());
                }
                ArrayList<SourceRecord> records = new ArrayList<SourceRecord>(entries.size());
                block7: for (CacheEntryOffset e : entries) {
                    SourceRecordBuilder recordBuilder = this.recordBuilders.get(e.entry().getCache());
                    try {
                        records.add(recordBuilder.build(e));
                    }
                    catch (RetriableException re) {
                        throw re;
                    }
                    catch (Exception exception) {
                        switch (this.cfg.errorHandlingPolicy()) {
                            case LOG_ONLY: {
                                log.warn(IgniteSourceTask.formatEntryFailureMsg(e), (Throwable)exception);
                                continue block7;
                            }
                        }
                        log.warn(IgniteSourceTask.formatEntryFailureMsg(e));
                        throw exception;
                    }
                }
                if (!records.isEmpty()) {
                    return records;
                }
            }
            Thread.sleep(this.cfg.pollInterval());
        }
        return null;
    }

    private static String formatEntryFailureMsg(CacheEntryOffset e) {
        return LogFormat.message(SystemEvent.FAILED_TO_PROCESS_SOURCE_ENTRY, "Failed to process cache entry: " + e);
    }

    public void stop() {
        if (!this.isStarted) {
            return;
        }
        try {
            this.cacheEntryRetriever.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
        DataGrid.SOURCE.close();
        this.isStarted = false;
        log.info(LogFormat.message(SystemEvent.SOURCE_TASK_STOPPED, "Source task stopped"));
    }
}

