package org.apache.ignite.stream.flume;

import java.util.ArrayList;
import java.util.List;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/ignite/stream/flume/IgniteSink.class */
public class IgniteSink extends AbstractSink implements Configurable {
    private static final Logger log = LoggerFactory.getLogger(IgniteSink.class);
    private static final int DFLT_BATCH_SIZE = 100;
    private String springCfgPath;
    private String cacheName;
    private String eventTransformerCls;
    private int batchSize;
    private SinkCounter sinkCounter;
    private EventTransformer<Event, Object, Object> eventTransformer;
    private Ignite ignite;

    public void configure(Context context) {
        this.springCfgPath = context.getString(IgniteSinkConstants.CFG_PATH);
        this.cacheName = context.getString(IgniteSinkConstants.CFG_CACHE_NAME);
        this.eventTransformerCls = context.getString(IgniteSinkConstants.CFG_EVENT_TRANSFORMER);
        this.batchSize = context.getInteger(IgniteSinkConstants.CFG_BATCH_SIZE, Integer.valueOf(DFLT_BATCH_SIZE)).intValue();
        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(getName());
        }
    }

    public synchronized void start() {
        A.notNull(this.springCfgPath, "Ignite config file");
        A.notNull(this.cacheName, "Cache name");
        A.notNull(this.eventTransformerCls, "Event transformer class");
        this.sinkCounter.start();
        try {
            if (this.ignite == null) {
                this.ignite = Ignition.start(this.springCfgPath);
            }
            if (this.eventTransformerCls != null && !this.eventTransformerCls.isEmpty()) {
                this.eventTransformer = (EventTransformer) Class.forName(this.eventTransformerCls).newInstance();
            }
            super.start();
        } catch (Exception e) {
            log.error("Failed to start grid", e);
            throw new FlumeException("Failed to start grid", e);
        }
    }

    public synchronized void stop() {
        if (this.ignite != null) {
            this.ignite.close();
        }
        this.sinkCounter.stop();
        super.stop();
    }

    public Sink.Status process() throws EventDeliveryException {
        Event take;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        int i = 0;
        try {
            try {
                transaction.begin();
                ArrayList arrayList = new ArrayList(this.batchSize);
                while (i < this.batchSize && (take = channel.take()) != null) {
                    arrayList.add(take);
                    i++;
                }
                if (arrayList.isEmpty()) {
                    this.sinkCounter.incrementBatchEmptyCount();
                } else {
                    this.ignite.cache(this.cacheName).putAll(this.eventTransformer.transform(arrayList));
                    if (arrayList.size() < this.batchSize) {
                        this.sinkCounter.incrementBatchUnderflowCount();
                    } else {
                        this.sinkCounter.incrementBatchCompleteCount();
                    }
                }
                this.sinkCounter.addToEventDrainAttemptCount(arrayList.size());
                transaction.commit();
                this.sinkCounter.addToEventDrainSuccessCount(arrayList.size());
                transaction.close();
                return i == 0 ? Sink.Status.BACKOFF : Sink.Status.READY;
            } catch (Exception e) {
                log.error("Failed to process events", e);
                transaction.rollback();
                throw new EventDeliveryException(e);
            }
        } catch (Throwable th) {
            transaction.close();
            throw th;
        }
    }
}
