package org.apache.ignite.stream.mqtt;

import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.StopStrategy;
import com.github.rholder.retry.WaitStrategies;
import com.github.rholder.retry.WaitStrategy;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.stream.StreamAdapter;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/* loaded from: input_file:org/apache/ignite/stream/mqtt/MqttStreamer.class */
public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback {
    private IgniteLogger log;
    private MqttClient client;
    private String brokerUrl;
    private String topic;
    private Integer qualityOfService;
    private List<String> topics;
    private List<Integer> qualitiesOfService;
    private String clientId;
    private MqttClientPersistence persistence;
    private MqttConnectOptions connectOptions;
    private Integer disconnectQuiesceTimeout;
    private boolean disconnectForcibly;
    private Integer disconnectForciblyTimeout;
    private MqttStreamer<K, V>.MqttConnectionRetrier connectionRetrier;
    private boolean blockUntilConnected;
    private String cachedLogValues;
    private WaitStrategy retryWaitStrategy = WaitStrategies.fibonacciWait();
    private StopStrategy retryStopStrategy = StopStrategies.neverStop();
    private volatile boolean stopped = true;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/stream/mqtt/MqttStreamer$MqttConnectionRetrier.class */
    public class MqttConnectionRetrier {
        private final Retryer<Void> retrier;
        private final ExecutorService exec = Executors.newSingleThreadExecutor();

        public MqttConnectionRetrier(Retryer<Void> retryer) {
            this.retrier = retryer;
        }

        public void connect() {
            Future submit = this.exec.submit((Callable) this.retrier.wrap(new Callable<Void>() { // from class: org.apache.ignite.stream.mqtt.MqttStreamer.MqttConnectionRetrier.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    if (MqttStreamer.this.client.isConnected() || MqttStreamer.this.stopped) {
                        return null;
                    }
                    if (MqttStreamer.this.connectOptions == null) {
                        MqttStreamer.this.client.connect();
                    } else {
                        MqttStreamer.this.client.connect(MqttStreamer.this.connectOptions);
                    }
                    if (MqttStreamer.this.qualitiesOfService.isEmpty()) {
                        MqttStreamer.this.client.subscribe((String[]) MqttStreamer.this.topics.toArray(new String[0]));
                    } else {
                        int[] iArr = new int[MqttStreamer.this.qualitiesOfService.size()];
                        for (int i = 0; i < MqttStreamer.this.qualitiesOfService.size(); i++) {
                            iArr[i] = ((Integer) MqttStreamer.this.qualitiesOfService.get(i)).intValue();
                        }
                        MqttStreamer.this.client.subscribe((String[]) MqttStreamer.this.topics.toArray(new String[0]), iArr);
                    }
                    if (!MqttStreamer.this.log.isInfoEnabled()) {
                        return null;
                    }
                    MqttStreamer.this.log.info("MQTT Streamer (re-)connected and subscribed " + MqttStreamer.this.cachedLogValues);
                    return null;
                }
            }));
            if (MqttStreamer.this.blockUntilConnected) {
                try {
                    submit.get();
                } catch (Throwable th) {
                    throw new RuntimeException(th);
                }
            }
        }

        public void stop() {
            this.exec.shutdownNow();
        }
    }

    public void start() throws IgniteException {
        if (!this.stopped) {
            throw new IgniteException("Attempted to start an already started MQTT Streamer");
        }
        this.topics = this.topics == null ? new ArrayList<>() : this.topics;
        this.qualitiesOfService = this.qualitiesOfService == null ? new ArrayList<>() : this.qualitiesOfService;
        try {
            HashMap hashMap = new HashMap();
            A.notNull(getStreamer(), "streamer");
            A.notNull(getIgnite(), "ignite");
            A.ensure((getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null) ? false : true, "tuple extractor missing");
            A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide both single and multiple tuple extractor");
            A.notNullOrEmpty(this.brokerUrl, "broker URL");
            if (this.clientId == null || this.clientId.length() == 0) {
                this.clientId = MqttClient.generateClientId();
            }
            if (this.topic != null && this.topic.length() > 0 && !this.topics.isEmpty() && this.topics.size() != 1 && !this.topics.get(0).equals(this.topic)) {
                throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time.");
            }
            if (this.qualityOfService != null && !this.qualitiesOfService.isEmpty() && this.qualitiesOfService.size() != 1 && !this.qualitiesOfService.get(0).equals(this.qualityOfService)) {
                throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time.");
            }
            if (this.disconnectForcibly && this.disconnectQuiesceTimeout != null) {
                A.notNull(this.disconnectForciblyTimeout, "disconnect timeout cannot be null when disconnecting forcibly with quiesce");
            }
            if (this.topics.isEmpty()) {
                this.topics.add(this.topic);
                if (this.qualityOfService != null) {
                    this.qualitiesOfService.add(this.qualityOfService);
                }
                hashMap.put("topic", this.topic);
            } else {
                Iterator<String> it = this.topics.iterator();
                while (it.hasNext()) {
                    A.notNullOrEmpty(it.next(), "topic in list of topics");
                }
                A.ensure(this.qualitiesOfService.isEmpty() || this.qualitiesOfService.size() == this.topics.size(), "qualities of service must be either empty or have the same size as topics list");
                hashMap.put("topics", this.topics);
            }
            hashMap.put("brokerUrl", this.brokerUrl);
            hashMap.put("clientId", this.clientId);
            this.cachedLogValues = "[" + Joiner.on(", ").withKeyValueSeparator("=").join(hashMap) + "]";
            this.log = getIgnite().log();
            if (this.persistence == null) {
                this.client = new MqttClient(this.brokerUrl, this.clientId);
            } else {
                this.client = new MqttClient(this.brokerUrl, this.clientId, this.persistence);
            }
            this.client.setCallback(this);
            this.stopped = false;
            this.connectionRetrier = new MqttConnectionRetrier(RetryerBuilder.newBuilder().retryIfResult(new Predicate<Void>() { // from class: org.apache.ignite.stream.mqtt.MqttStreamer.1
                public boolean apply(Void r3) {
                    return (MqttStreamer.this.client.isConnected() || MqttStreamer.this.stopped) ? false : true;
                }
            }).retryIfException().retryIfRuntimeException().withWaitStrategy(this.retryWaitStrategy).withStopStrategy(this.retryStopStrategy).build());
            if (this.log.isInfoEnabled()) {
                this.log.info("Starting MQTT Streamer " + this.cachedLogValues);
            }
            this.connectionRetrier.connect();
        } catch (Exception e) {
            throw new IgniteException("Failed to initialize MQTT Streamer.", e);
        }
    }

    public void stop() throws IgniteException {
        if (this.stopped) {
            throw new IgniteException("Failed to stop MQTT Streamer (already stopped).");
        }
        this.connectionRetrier.stop();
        try {
            if (this.disconnectForcibly) {
                if (this.disconnectQuiesceTimeout == null && this.disconnectForciblyTimeout == null) {
                    this.client.disconnectForcibly();
                } else if (this.disconnectForciblyTimeout == null || this.disconnectQuiesceTimeout != null) {
                    this.client.disconnectForcibly(this.disconnectQuiesceTimeout.intValue(), this.disconnectForciblyTimeout.intValue());
                } else {
                    this.client.disconnectForcibly(this.disconnectForciblyTimeout.intValue());
                }
            } else if (this.disconnectQuiesceTimeout == null) {
                this.client.disconnect();
            } else {
                this.client.disconnect(this.disconnectQuiesceTimeout.intValue());
            }
            this.client.close();
            this.stopped = true;
        } catch (Exception e) {
            throw new IgniteException("Failed to stop Exception while stopping MQTT Streamer.", e);
        }
    }

    public void connectionLost(Throwable th) {
        if (this.stopped) {
            return;
        }
        this.log.warning(String.format("MQTT Connection to broker was lost [brokerUrl=%s, type=%s, err=%s]", this.brokerUrl, th.getClass(), th.getMessage()));
        this.connectionRetrier.connect();
    }

    public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
        if (getMultipleTupleExtractor() != null) {
            Map extract = getMultipleTupleExtractor().extract(mqttMessage);
            if (this.log.isTraceEnabled()) {
                this.log.trace("Adding cache entries: " + extract);
            }
            getStreamer().addData(extract);
            return;
        }
        Map.Entry extract2 = getSingleTupleExtractor().extract(mqttMessage);
        if (this.log.isTraceEnabled()) {
            this.log.trace("Adding cache entry: " + extract2);
        }
        getStreamer().addData(extract2);
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }

    public void setBrokerUrl(String str) {
        this.brokerUrl = str;
    }

    public String getBrokerUrl() {
        return this.brokerUrl;
    }

    public void setTopic(String str) {
        this.topic = str;
    }

    public String getTopic() {
        return this.topic;
    }

    public void setQualityOfService(Integer num) {
        this.qualityOfService = num;
    }

    public Integer getQualityOfService() {
        return this.qualityOfService;
    }

    public void setTopics(List<String> list) {
        this.topics = list;
    }

    public List<String> getTopics() {
        return this.topics;
    }

    public void setQualitiesOfService(List<Integer> list) {
        this.qualitiesOfService = list;
    }

    public List<Integer> getQualitiesOfService() {
        return this.qualitiesOfService;
    }

    public void setClientId(String str) {
        this.clientId = str;
    }

    public String getClientId() {
        return this.clientId;
    }

    public MqttClientPersistence getPersistence() {
        return this.persistence;
    }

    public void setPersistence(MqttClientPersistence mqttClientPersistence) {
        this.persistence = mqttClientPersistence;
    }

    public MqttConnectOptions getConnectOptions() {
        return this.connectOptions;
    }

    public void setConnectOptions(MqttConnectOptions mqttConnectOptions) {
        this.connectOptions = mqttConnectOptions;
    }

    public void setDisconnectForcibly(boolean z) {
        this.disconnectForcibly = z;
    }

    public boolean isDisconnectForcibly() {
        return this.disconnectForcibly;
    }

    public void setDisconnectQuiesceTimeout(Integer num) {
        this.disconnectQuiesceTimeout = num;
    }

    public Integer getDisconnectQuiesceTimeout() {
        return this.disconnectQuiesceTimeout;
    }

    public void setDisconnectForciblyTimeout(Integer num) {
        this.disconnectForciblyTimeout = num;
    }

    public Integer getDisconnectForciblyTimeout() {
        return this.disconnectForciblyTimeout;
    }

    public void setRetryWaitStrategy(WaitStrategy waitStrategy) {
        this.retryWaitStrategy = waitStrategy;
    }

    public WaitStrategy getRetryWaitStrategy() {
        return this.retryWaitStrategy;
    }

    public void setRetryStopStrategy(StopStrategy stopStrategy) {
        this.retryStopStrategy = stopStrategy;
    }

    public StopStrategy getRetryStopStrategy() {
        return this.retryStopStrategy;
    }

    public void setBlockUntilConnected(boolean z) {
        this.blockUntilConnected = z;
    }

    public boolean isBlockUntilConnected() {
        return this.blockUntilConnected;
    }

    public boolean isStopped() {
        return this.stopped;
    }

    public boolean isConnected() {
        return this.client.isConnected();
    }
}
