package org.apache.ignite.stream.twitter;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.DefaultStreamingEndpoint;
import com.twitter.hbc.core.endpoint.SitestreamEndpoint;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.endpoint.UserstreamEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.OAuth1;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.stream.StreamAdapter;

/* loaded from: input_file:org/apache/ignite/stream/twitter/TwitterStreamer.class */
/**
 * Streamer that reads messages from a Twitter Streaming API endpoint through an hbc {@link Client}
 * and hands every received message to {@code addMessage(String)}.
 * <p>
 * The hbc client writes raw messages into a bounded queue (see {@link #setBufferCapacity(Integer)});
 * a fixed pool of worker threads (see {@link #setThreadsCount(int)}) drains that queue.
 * <p>
 * Configuration setters are expected to be called before {@link #start()}.
 *
 * @param <K> Cache key type.
 * @param <V> Cache value type.
 */
public class TwitterStreamer<K, V> extends StreamAdapter<String, K, V> {
    /** Name of the API parameter that carries the comma separated user ids for the site stream. */
    private static final String SITE_USER_ID_KEY = "follow";

    /** Logger, obtained from the Ignite instance when the streamer is started. */
    protected IgniteLogger log;

    /** Twitter Streaming API parameters, added to the endpoint as POST parameters. */
    private Map<String, String> apiParams;

    /** Twitter Streaming API endpoint path, for example {@code /statuses/filter.json}. */
    private String endpointUrl;

    /** OAuth credentials used to authenticate the hbc client. */
    private OAuthSettings oAuthSettings;

    /** hbc client of the current run. */
    private Client client;

    /** Pool of worker threads that drain the tweet queue. */
    private ExecutorService tweetStreamProcessor;

    /** Number of worker threads draining the tweet queue. */
    private int threadsCount = 1;

    /** Run state: {@code 0} when stopped, {@code 1} when started. */
    private final AtomicInteger running = new AtomicInteger();

    /** Capacity of the queue between the hbc client and the worker threads. */
    private Integer bufferCapacity = 100000;

    /**
     * Creates a streamer that authenticates with the given OAuth settings.
     *
     * @param oAuthSettings OAuth consumer key/secret and access token/secret.
     */
    public TwitterStreamer(OAuthSettings oAuthSettings) {
        this.oAuthSettings = oAuthSettings;
    }

    /**
     * Validates the configuration, connects the hbc client and starts the worker threads.
     * <p>
     * If start-up fails, everything created so far is released and the streamer is put back into the
     * stopped state, so a corrected configuration can be started later.
     *
     * @throws IgniteException If the streamer is already started.
     */
    public void start() {
        if (!running.compareAndSet(0, 1)) {
            throw new IgniteException("Attempted to start an already started Twitter Streamer");
        }

        ExecutorService processor = null;
        Client connectedClient = null;

        try {
            validateConfig();

            log = getIgnite().log();

            final BlockingQueue<String> tweetQueue = new LinkedBlockingQueue<>(bufferCapacity);

            client = getClient(tweetQueue);
            client.connect();
            connectedClient = client;

            processor = Executors.newFixedThreadPool(threadsCount);
            tweetStreamProcessor = processor;

            for (int i = 0; i < threadsCount; i++) {
                processor.submit(newTweetConsumer(tweetQueue));
            }
        }
        catch (RuntimeException | Error e) {
            // Roll back: without this the flag stays at 1 and the streamer could neither be
            // restarted nor stopped cleanly after a failed start.
            if (processor != null) {
                processor.shutdownNow();
            }

            if (connectedClient != null) {
                connectedClient.stop();
            }

            running.set(0);

            throw e;
        }
    }

    /**
     * Creates the task run by each worker thread: it takes messages from the queue and passes
     * them to {@code addMessage(String)} until the thread is interrupted.
     *
     * @param tweetQueue Queue filled by the hbc client.
     * @return Task that returns {@code true} once it has been interrupted.
     */
    private Callable<Boolean> newTweetConsumer(final BlockingQueue<String> tweetQueue) {
        return new Callable<Boolean>() {
            @Override
            public Boolean call() {
                while (true) {
                    try {
                        addMessage(tweetQueue.take());
                    }
                    catch (InterruptedException e) {
                        U.warn(log, "Tweets transformation was interrupted", e);

                        // Keep the interrupt status visible to the executor that owns this thread.
                        Thread.currentThread().interrupt();

                        return true;
                    }
                }
            }
        };
    }

    /**
     * Interrupts the worker threads and stops the hbc client.
     *
     * @throws IgniteException If the streamer is not started.
     */
    public void stop() {
        if (running.get() == 0) {
            throw new IgniteException("Attempted to stop an already stopped Twitter Streamer");
        }

        tweetStreamProcessor.shutdownNow();
        client.stop();

        running.compareAndSet(1, 0);
    }

    /**
     * Checks that everything needed to start is configured: the data streamer, the Ignite instance,
     * the endpoint URL, the API parameters, at least one tuple extractor and a well-formed
     * {@code follow} parameter.
     */
    protected void validateConfig() {
        A.notNull(getStreamer(), "Streamer");
        A.notNull(getIgnite(), "Ignite");
        A.notNull(endpointUrl, "Twitter Streaming API endpoint");
        A.notNull(apiParams, "Twitter API params");
        A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null, "Twitter extractor");

        // NOTE(review): this check runs for every endpoint although the message mentions only the
        // site streaming endpoint; kept unchanged to preserve existing behavior - confirm intent.
        String followParam = apiParams.get(SITE_USER_ID_KEY);

        A.ensure(followParam != null && followParam.matches("^(\\d+,? ?)+$"),
            "Site streaming endpoint must provide 'follow' param with value as comma separated numbers");
    }

    /**
     * Builds the hbc client for the configured endpoint.
     * <p>
     * Well-known endpoint paths are matched case-insensitively and mapped to their dedicated hbc
     * endpoint class and host; any other path becomes a generic GET streaming endpoint on the
     * stream host. All API parameters are added as POST parameters, except that the site stream
     * receives the {@code follow} ids through its constructor instead.
     * The configured parameter map is only read, never modified.
     *
     * @param blockingQueue Queue the client writes received messages to.
     * @return Client built by {@link #buildClient(BlockingQueue, HttpHosts, StreamingEndpoint)}.
     */
    protected Client getClient(BlockingQueue<String> blockingQueue) {
        StreamingEndpoint endpoint;
        HttpHosts hosts;

        // Set when a parameter is passed to the endpoint constructor rather than as a POST parameter.
        String consumedParam = null;

        // Locale.ROOT keeps the match independent of the JVM default locale.
        switch (endpointUrl.toLowerCase(Locale.ROOT)) {
            case "/statuses/filter.json":
                endpoint = new StatusesFilterEndpoint();
                hosts = HttpHosts.STREAM_HOST;
                break;

            case "/statuses/firehose.json":
                endpoint = new StatusesFirehoseEndpoint();
                hosts = HttpHosts.STREAM_HOST;
                break;

            case "/statuses/sample.json":
                endpoint = new StatusesSampleEndpoint();
                hosts = HttpHosts.STREAM_HOST;
                break;

            case "/user.json":
                endpoint = new UserstreamEndpoint();
                hosts = HttpHosts.USERSTREAM_HOST;
                break;

            case "/site.json":
                List<Long> userIds = new ArrayList<>();

                for (String userId : Splitter.on(',').trimResults().omitEmptyStrings()
                    .split(apiParams.get(SITE_USER_ID_KEY))) {
                    userIds.add(Long.valueOf(userId));
                }

                endpoint = new SitestreamEndpoint(userIds);
                hosts = HttpHosts.SITESTREAM_HOST;
                consumedParam = SITE_USER_ID_KEY;
                break;

            default:
                endpoint = new DefaultStreamingEndpoint(endpointUrl, "GET", false);
                hosts = HttpHosts.STREAM_HOST;
                break;
        }

        for (Map.Entry<String, String> param : apiParams.entrySet()) {
            if (!param.getKey().equals(consumedParam)) {
                endpoint.addPostParameter(param.getKey(), param.getValue());
            }
        }

        return buildClient(blockingQueue, hosts, endpoint);
    }

    /**
     * Assembles the hbc client from its parts; protected so that subclasses can substitute
     * another client.
     *
     * @param blockingQueue Queue the client's message processor writes to.
     * @param httpHosts Hosts to connect to.
     * @param streamingEndpoint Endpoint to stream from.
     * @return OAuth1-authenticated client named {@code Ignite-Twitter-Client}.
     */
    protected Client buildClient(BlockingQueue<String> blockingQueue, HttpHosts httpHosts,
        StreamingEndpoint streamingEndpoint) {
        OAuth1 auth = new OAuth1(
            oAuthSettings.getConsumerKey(),
            oAuthSettings.getConsumerSecret(),
            oAuthSettings.getAccessToken(),
            oAuthSettings.getAccessTokenSecret());

        return new ClientBuilder()
            .name("Ignite-Twitter-Client")
            .hosts(httpHosts)
            .authentication(auth)
            .endpoint(streamingEndpoint)
            .processor(new StringDelimitedProcessor(blockingQueue))
            .build();
    }

    /**
     * Sets the Twitter Streaming API parameters.
     *
     * @param map Parameter names mapped to their values; the map is stored by reference.
     */
    public void setApiParams(Map<String, String> map) {
        this.apiParams = map;
    }

    /**
     * Sets the Twitter Streaming API endpoint.
     *
     * @param str Endpoint path, for example {@code /statuses/filter.json}.
     */
    public void setEndpointUrl(String str) {
        this.endpointUrl = str;
    }

    /**
     * Sets the capacity of the queue between the hbc client and the worker threads.
     *
     * @param num Queue capacity; defaults to 100000.
     */
    public void setBufferCapacity(Integer num) {
        this.bufferCapacity = num;
    }

    /**
     * Sets the number of worker threads that drain the tweet queue.
     *
     * @param i Thread count; defaults to 1.
     */
    public void setThreadsCount(int i) {
        this.threadsCount = i;
    }
}
