package org.gridgain.kafka.source;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.ignite.internal.util.GridStringBuilder;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.gridgain.kafka.DataGrid;
import org.gridgain.kafka.IgniteConnectUtils;
import org.gridgain.kafka.LogFormat;
import org.gridgain.kafka.SystemEvent;
import org.gridgain.kafka.Version;
import org.gridgain.kafka.schema.cache.ResolvedSchemasCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/gridgain/kafka/source/IgniteSourceTask.class */
public class IgniteSourceTask extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(IgniteSourceTask.class);
    private volatile boolean isStarted;
    private IgniteSourceTaskConfig cfg;
    private Map<String, SourceRecordBuilder> recordBuilders;
    private QueueingCacheEntryRetriever cacheEntryRetriever;
    private CacheToTopicMapper cacheToTopicMapper;
    private ResolvedSchemasCache resolvedSchemasCache;

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> map) {
        QueueingCacheEntryRetriever queueingQueryRetriever;
        if (this.isStarted) {
            return;
        }
        this.cfg = new IgniteSourceTaskConfig(map);
        DataGrid.SOURCE.init(this.cfg.igniteCfg());
        List<String> caches = this.cfg.caches();
        if (caches.isEmpty()) {
            throw new ConnectException("IgniteSourceTask must be assigned at least one cache.");
        }
        this.cacheToTopicMapper = CacheToTopicMapperFactory.factorySingleton().newMapper(this.cfg.cacheToTopicMapper(), map);
        if (this.cfg.isSchemaCacheEnabled()) {
            this.resolvedSchemasCache = SchemaCacheFactory.getCache(this.cfg);
        }
        this.recordBuilders = (Map) caches.stream().map(str -> {
            return SourceRecordBuilder.newContextBuilder().cacheName(str).topic(this.cacheToTopicMapper.map(str)).schemaless(this.cfg.isSchemaless()).dynamicSchema(this.cfg.isSchemaDynamic()).validateEntries(this.cfg.isEntriesValidationEnabled()).resolvedSchemasCache(this.resolvedSchemasCache).nullabilityPolicy(this.cfg.nullabilityPolicy()).enumPolicy(SourceEnumSchemaMapper.from(this.cfg)).toRecordBuilder();
        }).collect(Collectors.toMap((v0) -> {
            return v0.cacheName();
        }, sourceRecordBuilder -> {
            return sourceRecordBuilder;
        }));
        Offsets offsets = new Offsets(this.context.offsetStorageReader().offsets(Offsets.kafkaPartitions(caches)));
        if (this.cfg.failoverPolicy() == FailoverPolicy.BACKLOG) {
            queueingQueryRetriever = new BacklogRetriever(caches, this.cfg.backlogCacheName(), this.cfg.batchSize(), offsets);
        } else {
            DataGrid dataGrid = DataGrid.SOURCE;
            dataGrid.getClass();
            queueingQueryRetriever = new QueueingQueryRetriever(caches, dataGrid::cache, this.cfg.failoverPolicy(), this.cfg.shallLoadInitialData(), this.cfg.shallProcessRemovals(), this.cfg.shallProcessExpired(), this.cfg.batchSize(), offsets, IgniteConnectUtils.createFilter(this.cfg.cacheFilter(), log));
        }
        this.cacheEntryRetriever = queueingQueryRetriever;
        this.isStarted = true;
        log.info(LogFormat.message(SystemEvent.SOURCE_TASK_STARTED, "Source task started, caches=" + this.cfg.caches()));
    }

    public List<SourceRecord> poll() throws InterruptedException {
        while (this.isStarted) {
            List<CacheEntryOffset> list = this.cacheEntryRetriever.get();
            if (!list.isEmpty()) {
                if (log.isTraceEnabled()) {
                    GridStringBuilder a = new GridStringBuilder("Polled ").a(list.size()).a(" cache entries");
                    if (S.includeSensitive()) {
                        a.a(" ").a(list);
                    }
                    log.trace(a.toString());
                }
                ArrayList arrayList = new ArrayList(list.size());
                for (CacheEntryOffset cacheEntryOffset : list) {
                    try {
                        arrayList.add(this.recordBuilders.get(cacheEntryOffset.entry().getCache()).build(cacheEntryOffset));
                    } catch (Exception e) {
                        switch (this.cfg.errorHandlingPolicy()) {
                            case LOG_ONLY:
                                log.warn(formatEntryFailureMsg(cacheEntryOffset), e);
                                break;
                            case BACKUP:
                            case STOP_TASK:
                            default:
                                log.warn(formatEntryFailureMsg(cacheEntryOffset));
                                throw e;
                        }
                    } catch (RetriableException e2) {
                        throw e2;
                    }
                }
                if (!arrayList.isEmpty()) {
                    return arrayList;
                }
            }
            Thread.sleep(this.cfg.pollInterval());
        }
        return null;
    }

    private static String formatEntryFailureMsg(CacheEntryOffset cacheEntryOffset) {
        return LogFormat.message(SystemEvent.FAILED_TO_PROCESS_SOURCE_ENTRY, "Failed to process cache entry: " + cacheEntryOffset);
    }

    public void stop() {
        if (this.isStarted) {
            try {
                this.cacheEntryRetriever.close();
            } catch (Exception e) {
            }
            DataGrid.SOURCE.close();
            this.isStarted = false;
            log.info(LogFormat.message(SystemEvent.SOURCE_TASK_STOPPED, "Source task stopped"));
        }
    }
}
