package org.apache.ignite.stream.kafka.connect;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/ignite/stream/kafka/connect/IgniteSourceTask.class */
public class IgniteSourceTask extends SourceTask {
    private static String igniteCfgFile;
    private static String cacheName;
    private static UUID rmtLsnrId;
    private static IgnitePredicate<CacheEvent> filter;
    private static String[] topics;
    private static final Logger log = LoggerFactory.getLogger(IgniteSourceTask.class);
    private static final Object lock = new Object();
    private static int evtBufSize = 100000;
    private static BlockingQueue<CacheEvent> evtBuf = new LinkedBlockingQueue(evtBufSize);
    private static int evtBatchSize = 100;
    private static volatile boolean stopped = true;
    private static TaskLocalListener locLsnr = new TaskLocalListener();
    private static final Map<String, Long> offset = Collections.singletonMap("offset", 0L);
    private static final Map<String, String> srcPartition = Collections.singletonMap("cache", null);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/stream/kafka/connect/IgniteSourceTask$CacheEvt.class */
    public enum CacheEvt {
        CREATED(60),
        DESTROYED(61),
        PUT(63),
        READ(64),
        REMOVED(65),
        LOCKED(66),
        UNLOCKED(67),
        EXPIRED(70);

        private final int id;

        CacheEvt(int i) {
            this.id = i;
        }

        int getId() {
            return this.id;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/stream/kafka/connect/IgniteSourceTask$IgniteGrid.class */
    public static class IgniteGrid {

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/ignite/stream/kafka/connect/IgniteSourceTask$IgniteGrid$Holder.class */
        public static class Holder {
            private static final Ignite IGNITE = Ignition.start(IgniteSourceTask.igniteCfgFile);

            private Holder() {
            }
        }

        private IgniteGrid() {
        }

        private static Ignite getIgnite() {
            return Holder.IGNITE;
        }

        static /* synthetic */ Ignite access$100() {
            return getIgnite();
        }
    }

    /* loaded from: input_file:org/apache/ignite/stream/kafka/connect/IgniteSourceTask$TaskLocalListener.class */
    private static class TaskLocalListener implements IgniteBiPredicate<UUID, CacheEvent> {
        private TaskLocalListener() {
        }

        public boolean apply(UUID uuid, CacheEvent cacheEvent) {
            try {
                if (!IgniteSourceTask.evtBuf.offer(cacheEvent, 10L, TimeUnit.MILLISECONDS)) {
                    IgniteSourceTask.log.error("Failed to buffer event {}", cacheEvent.name());
                }
                return true;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return true;
            }
        }
    }

    /* loaded from: input_file:org/apache/ignite/stream/kafka/connect/IgniteSourceTask$TaskRemoteFilter.class */
    private static class TaskRemoteFilter implements IgnitePredicate<CacheEvent> {

        @IgniteInstanceResource
        Ignite ignite;
        private final String cacheName;

        TaskRemoteFilter(String str) {
            this.cacheName = str;
        }

        public boolean apply(CacheEvent cacheEvent) {
            if (this.ignite.affinity(this.cacheName).isPrimary(this.ignite.cluster().localNode(), cacheEvent.key())) {
                return IgniteSourceTask.filter == null || !IgniteSourceTask.filter.apply(cacheEvent);
            }
            return false;
        }
    }

    public String version() {
        return new IgniteSinkConnector().version();
    }

    public void start(Map<String, String> map) {
        String str;
        synchronized (lock) {
            if (stopped) {
                cacheName = map.get("cacheName");
                igniteCfgFile = map.get("igniteCfg");
                topics = map.get(IgniteSourceConstants.TOPIC_NAMES).split("\\s*,\\s*");
                if (map.containsKey(IgniteSourceConstants.INTL_BUF_SIZE)) {
                    evtBufSize = Integer.parseInt(map.get(IgniteSourceConstants.INTL_BUF_SIZE));
                }
                if (map.containsKey(IgniteSourceConstants.INTL_BATCH_SIZE)) {
                    evtBatchSize = Integer.parseInt(map.get(IgniteSourceConstants.INTL_BATCH_SIZE));
                }
                if (map.containsKey(IgniteSourceConstants.CACHE_FILTER_CLASS) && (str = map.get(IgniteSourceConstants.CACHE_FILTER_CLASS)) != null && !str.isEmpty()) {
                    try {
                        filter = (IgnitePredicate) Class.forName(str).newInstance();
                    } catch (Exception e) {
                        log.error("Failed to instantiate the provided filter! User-enabled filtering is ignored!", e);
                    }
                }
                try {
                    try {
                        rmtLsnrId = IgniteGrid.access$100().events(IgniteGrid.access$100().cluster().forCacheNodes(cacheName)).remoteListen(locLsnr, new TaskRemoteFilter(cacheName), cacheEvents(map.get(IgniteSourceConstants.CACHE_EVENTS)));
                        stopped = false;
                    } catch (Throwable th) {
                        stopped = false;
                        throw th;
                    }
                } catch (Exception e2) {
                    log.error("Failed to register event listener!", e2);
                    throw new ConnectException(e2);
                }
            }
        }
    }

    public List<SourceRecord> poll() throws InterruptedException {
        ArrayList arrayList = new ArrayList(evtBatchSize);
        ArrayList arrayList2 = new ArrayList(evtBatchSize);
        if (stopped) {
            return arrayList;
        }
        try {
            if (evtBuf.drainTo(arrayList2, evtBatchSize) <= 0) {
                return null;
            }
            Iterator it = arrayList2.iterator();
            while (it.hasNext()) {
                CacheEvent cacheEvent = (CacheEvent) it.next();
                for (String str : topics) {
                    arrayList.add(new SourceRecord(srcPartition, offset, str, (Schema) null, cacheEvent));
                }
            }
            return arrayList;
        } catch (IgniteException e) {
            log.error("Error when polling event queue!", e);
            return null;
        }
    }

    private int[] cacheEvents(String str) throws Exception {
        String[] split = str.split("\\s*,\\s*");
        if (split.length == 0) {
            return EventType.EVTS_CACHE;
        }
        int[] iArr = new int[split.length];
        for (int i = 0; i < split.length; i++) {
            try {
                iArr[i] = CacheEvt.valueOf(split[i].toUpperCase()).getId();
            } catch (Exception e) {
                log.error("Failed to recognize the provided cache event!", e);
                throw new Exception(e);
            }
        }
        return iArr;
    }

    public synchronized void stop() {
        if (stopped) {
            return;
        }
        stopped = true;
        stopRemoteListen();
        IgniteGrid.access$100().close();
    }

    protected void stopRemoteListen() {
        if (rmtLsnrId != null) {
            IgniteGrid.access$100().events(IgniteGrid.access$100().cluster().forCacheNodes(cacheName)).stopRemoteListen(rmtLsnrId);
        }
        rmtLsnrId = null;
    }

    protected static void setStopped(boolean z) {
        stopped = z;
    }
}
