package org.apache.ignite.source.flink;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/ignite/source/flink/IgniteSource.class */
public class IgniteSource extends RichParallelSourceFunction<CacheEvent> {
    private static final long serialVersionUID = 1;
    private static final Logger log = LoggerFactory.getLogger(IgniteSource.class);
    private static final int DFLT_EVT_BATCH_SIZE = 1;
    private static final int DFLT_EVT_BUFFER_TIMEOUT = 10;
    private UUID rmtLsnrId;
    private volatile boolean isRunning;

    @IgniteInstanceResource
    private transient Ignite ignite;
    private final String cacheName;
    private BlockingQueue<CacheEvent> evtBuf = new LinkedBlockingQueue();
    private int evtBatchSize = DFLT_EVT_BATCH_SIZE;
    private int evtBufTimeout = DFLT_EVT_BUFFER_TIMEOUT;
    private final TaskLocalListener locLsnr = new TaskLocalListener();

    /* loaded from: input_file:org/apache/ignite/source/flink/IgniteSource$TaskLocalListener.class */
    private class TaskLocalListener implements IgniteBiPredicate<UUID, CacheEvent> {
        private TaskLocalListener() {
        }

        public boolean apply(UUID uuid, CacheEvent cacheEvent) {
            try {
                if (!IgniteSource.this.evtBuf.offer(cacheEvent, IgniteSource.this.evtBufTimeout, TimeUnit.MILLISECONDS)) {
                    IgniteSource.log.error("Failed to buffer event {}", cacheEvent.name());
                }
                return true;
            } catch (InterruptedException e) {
                IgniteSource.log.error("Failed to buffer event using local task listener {}", cacheEvent.name());
                Thread.currentThread().interrupt();
                return true;
            }
        }
    }

    public void setIgnite(Ignite ignite) {
        this.ignite = ignite;
    }

    public void setEvtBatchSize(int i) {
        this.evtBatchSize = i;
    }

    public void setEvtBufTimeout(int i) {
        this.evtBufTimeout = i;
    }

    TaskLocalListener getLocLsnr() {
        return this.locLsnr;
    }

    public IgniteSource(String str) {
        this.cacheName = str;
    }

    public void start(IgnitePredicate<CacheEvent> ignitePredicate, int... iArr) {
        A.notNull(this.cacheName, "Cache name");
        TaskRemoteFilter taskRemoteFilter = new TaskRemoteFilter(this.cacheName, ignitePredicate);
        try {
            synchronized (this) {
                if (this.isRunning) {
                    return;
                }
                this.isRunning = true;
                this.rmtLsnrId = this.ignite.events(this.ignite.cluster().forCacheNodes(this.cacheName)).remoteListen(this.locLsnr, taskRemoteFilter, iArr);
            }
        } catch (IgniteException e) {
            log.error("Failed to register event listener!", e);
            throw e;
        }
    }

    public void run(SourceFunction.SourceContext<CacheEvent> sourceContext) {
        ArrayList arrayList = new ArrayList(this.evtBatchSize);
        while (this.isRunning) {
            try {
                CacheEvent poll = this.evtBuf.poll(serialVersionUID, TimeUnit.SECONDS);
                if (poll != null) {
                    arrayList.add(poll);
                }
                if (this.evtBuf.drainTo(arrayList, this.evtBatchSize) > 0) {
                    synchronized (sourceContext.getCheckpointLock()) {
                        Iterator it = arrayList.iterator();
                        while (it.hasNext()) {
                            sourceContext.collect((CacheEvent) it.next());
                        }
                        arrayList.clear();
                    }
                }
            } catch (Exception e) {
                if (X.hasCause(e, new Class[]{InterruptedException.class})) {
                    return;
                }
                log.error("Error while processing cache event of " + this.cacheName, e);
                return;
            }
        }
    }

    public void cancel() {
        synchronized (this) {
            if (this.isRunning) {
                this.isRunning = false;
                if (this.rmtLsnrId != null && this.ignite != null) {
                    this.ignite.events(this.ignite.cluster().forCacheNodes(this.cacheName)).stopRemoteListen(this.rmtLsnrId);
                    this.rmtLsnrId = null;
                }
            }
        }
    }
}
