package org.apache.ignite.stream.rocketmq;

import java.util.List;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.stream.StreamAdapter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/* loaded from: input_file:org/apache/ignite/stream/rocketmq/RocketMQStreamer.class */
/**
 * Streamer that subscribes to a RocketMQ topic through a {@link DefaultMQPushConsumer} and forwards
 * every received batch of messages to {@code addMessage}.
 *
 * <p>The topic, consumer group and name server address must be set before {@link #start()} is
 * called, and a multiple tuple extractor must be configured on the adapter. The {@code stopped}
 * flag is {@code volatile}; {@link #start()} and {@link #stop()} are not otherwise synchronized.
 *
 * @param <K> cache key type
 * @param <V> cache value type
 */
public class RocketMQStreamer<K, V> extends StreamAdapter<List<MessageExt>, K, V> implements MessageListenerConcurrently {
    /** Logger obtained from the Ignite instance in {@link #start()}. */
    private IgniteLogger log;

    /** Running consumer, or {@code null} when the streamer is not started. */
    private DefaultMQPushConsumer consumer;

    /** {@code true} until {@link #start()} succeeds, and again after {@link #stop()}. */
    private volatile boolean stopped = true;

    /** Topic to subscribe to. */
    private String topic;

    /** Consumer group passed to the {@link DefaultMQPushConsumer} constructor. */
    private String consumerGrp;

    /** Name server address passed to the consumer. */
    private String nameSrvAddr;

    /**
     * Validates the configuration, creates the push consumer, subscribes to the topic with the
     * {@code "*"} sub-expression and starts consuming.
     *
     * <p>The consumer is only stored in the instance once it has started successfully, so a failed
     * attempt leaves the streamer in its stopped state with no dangling consumer reference.
     *
     * @throws IgniteException if the streamer is already started, or if subscribing to the topic or
     *     starting the consumer fails
     */
    public void start() {
        if (!this.stopped) {
            throw new IgniteException("Attempted to start an already started RocketMQ streamer");
        }

        // Validate the configuration before touching RocketMQ.
        A.notNull(getStreamer(), "streamer");
        A.notNull(getIgnite(), "ignite");
        A.notNull(this.topic, "topic");
        A.notNull(this.consumerGrp, "consumer group");
        A.notNullOrEmpty(this.nameSrvAddr, "nameserver address");
        A.ensure(null != getMultipleTupleExtractor(), "Multiple tuple extractor must be configured");

        // The listener callback uses the logger, so it has to be in place before the consumer starts.
        this.log = getIgnite().log();

        DefaultMQPushConsumer newConsumer = new DefaultMQPushConsumer(this.consumerGrp);
        newConsumer.setNamesrvAddr(this.nameSrvAddr);
        newConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        try {
            newConsumer.subscribe(this.topic, "*");
        } catch (MQClientException e) {
            throw new IgniteException("Failed to subscribe to " + this.topic, e);
        }

        newConsumer.registerMessageListener(this);

        try {
            newConsumer.start();
        } catch (MQClientException e) {
            // Best-effort release of whatever the consumer acquired before failing; otherwise the
            // next start() would replace it without it ever being shut down.
            shutdownAfterFailure(newConsumer, e);
            throw new IgniteException("Failed to start the streamer", e);
        }

        this.consumer = newConsumer;
        this.stopped = false;
    }

    /**
     * Shuts down the consumer, if there is one, and marks the streamer as stopped.
     *
     * <p>The streamer is marked as stopped even when the shutdown itself throws, so that it can be
     * started again afterwards. Calling this method on a streamer that is not started is a no-op.
     */
    public void stop() {
        DefaultMQPushConsumer current = this.consumer;

        // Drop the reference first so a repeated stop() does not shut the same consumer down twice.
        this.consumer = null;

        try {
            if (current != null) {
                current.shutdown();
            }
        } finally {
            this.stopped = true;
        }
    }

    /**
     * Listener callback: logs the batch size at debug level, passes the batch to
     * {@code addMessage} and reports the batch as consumed.
     *
     * @param msgs batch of messages delivered by the consumer
     * @param context consumption context (not used)
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS} when {@code addMessage}
     *     returns normally
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Received " + msgs.size() + " messages");
        }
        addMessage(msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Sets the topic to subscribe to.
     *
     * @param topic topic name; must be non-null when {@link #start()} is called
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /**
     * Sets the consumer group.
     *
     * @param consumerGrp consumer group name; must be non-null when {@link #start()} is called
     */
    public void setConsumerGrp(String consumerGrp) {
        this.consumerGrp = consumerGrp;
    }

    /**
     * Sets the name server address.
     *
     * @param nameSrvAddr name server address; must be non-null and non-empty when {@link #start()}
     *     is called
     */
    public void setNameSrvAddr(String nameSrvAddr) {
        this.nameSrvAddr = nameSrvAddr;
    }

    /**
     * Shuts down a consumer whose start failed, attaching any shutdown failure to the original
     * cause instead of letting it mask that cause.
     */
    private static void shutdownAfterFailure(DefaultMQPushConsumer failed, Throwable cause) {
        try {
            failed.shutdown();
        } catch (RuntimeException shutdownError) {
            cause.addSuppressed(shutdownError);
        }
    }
}
