package org.apache.ignite.stream.jms11;

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.stream.StreamAdapter;

/* loaded from: input_file:org/apache/ignite/stream/jms11/JmsStreamer.class */
/**
 * Streamer that consumes messages from a JMS queue or topic and feeds them into an
 * {@link IgniteDataStreamer}.
 * <p>
 * Every received message is converted by the configured {@link MessageTransformer} into a map of
 * entries, which is then handed to the data streamer. When consuming from a queue, {@code threads}
 * sessions (each with its own consumer) are created; when consuming from a topic, a single session
 * is created and messages are processed on a fixed thread pool of {@code threads} threads.
 * <p>
 * In batched mode the JMS sessions are committed once {@code batchClosureSize} messages have been
 * processed by a listener and/or every {@code batchClosureMillis} milliseconds.
 *
 * @param <T> Type of JMS message consumed.
 * @param <K> Cache key type.
 * @param <V> Cache value type.
 */
public class JmsStreamer<T extends Message, K, V> extends StreamAdapter<T, K, V> {
    /** Logger, obtained from the Ignite instance while starting. */
    private IgniteLogger log;

    /** Converts a JMS message into the entries to stream. */
    private MessageTransformer<T, K, V> transformer;

    /** Factory used to create the JMS connection. */
    private ConnectionFactory connectionFactory;

    /** Whether a durable subscriber is created when consuming from a topic. */
    private boolean durableSubscription;

    /** Name of the durable subscription. */
    private String durableSubscriptionName;

    /** Client ID set on the JMS connection when not blank. */
    private String clientId;

    /** Explicit destination; when {@code null} the destination is created from its name. */
    private Destination destination;

    /** Destination name, used when no explicit destination is set. */
    private String destinationName;

    /** Whether JMS sessions are created as transacted. */
    private boolean transacted;

    /** Whether sessions are committed in batches rather than per message. */
    private boolean batched;

    /** JMS connection; non-null only between a successful start and the following stop. */
    private Connection connection;

    /** Optional user listener notified of JMS exceptions after they have been logged. */
    private ExceptionListener exceptionListener;

    /** Scheduler driving the time-based batch commit. */
    private ScheduledExecutorService scheduler;

    /** Number of messages after which a listener commits its session in batched mode. */
    private int batchClosureSize = 50;

    /** Interval in milliseconds of the time-based batch commit. */
    private long batchClosureMillis = 1000;

    /** Destination type (queue or topic). */
    private Class<? extends Destination> destinationType = Queue.class;

    /** Number of queue sessions, or size of the topic processing pool. */
    private int threads = 1;

    /** Lifecycle flag; {@code true} until started and again after stopping. */
    private volatile boolean stopped = true;

    /** Sessions created by this streamer. */
    private final Set<Session> sessions = Collections.newSetFromMap(new ConcurrentHashMap<Session, Boolean>());

    /** Consumers created by this streamer. */
    private final Set<MessageConsumer> consumers =
        Collections.newSetFromMap(new ConcurrentHashMap<MessageConsumer, Boolean>());

    /** Message listeners created by this streamer. */
    private final Set<IgniteJmsMessageListener> listeners =
        Collections.newSetFromMap(new ConcurrentHashMap<IgniteJmsMessageListener, Boolean>());

    /**
     * Connection-level exception listener: logs the exception and forwards it to the user supplied
     * listener, if one was configured.
     */
    private class IgniteJmsExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException e) {
            U.error(log, "Caught JMS internal exception.", e);

            if (exceptionListener != null) {
                exceptionListener.onException(e);
            }
        }
    }

    /**
     * Message listener bound to a single JMS session. Streams each message and commits the session
     * according to the transacted / batched configuration of the enclosing streamer.
     */
    public class IgniteJmsMessageListener implements MessageListener {
        /** Session this listener commits. */
        private final Session session;

        /** Messages processed since the last batch commit. */
        private final AtomicInteger counter = new AtomicInteger(0);

        /** Worker pool, or {@code null} when messages are processed on the delivering thread. */
        private final ExecutorService pool;

        /**
         * @param session Session whose messages this listener handles.
         * @param createThreadPool {@code true} to process messages on a fixed pool of {@code threads}
         *      threads, {@code false} to process them synchronously on the calling thread.
         */
        public IgniteJmsMessageListener(Session session, boolean createThreadPool) {
            this.session = session;
            this.pool = createThreadPool ? Executors.newFixedThreadPool(threads) : null;
        }

        /**
         * Processes a delivered message unless the streamer has been stopped.
         *
         * @param message Delivered message.
         */
        @Override
        public void onMessage(Message message) {
            if (stopped) {
                return;
            }

            // T is erased at runtime, so this cast cannot fail here.
            @SuppressWarnings("unchecked")
            final T typedMsg = (T) message;

            if (pool == null) {
                processAndCommit(typedMsg);

                return;
            }

            pool.execute(new Runnable() {
                @Override
                public void run() {
                    processAndCommit(typedMsg);
                }
            });
        }

        /** Resets the count of messages processed since the last batch commit. */
        public void resetBatchCounter() {
            counter.set(0);
        }

        /**
         * Streams the message, then commits the session: after every message when transacted and not
         * batched, or once the batch size has been reached when batched.
         *
         * @param msg Message to process.
         */
        private void processAndCommit(T msg) {
            processMessage(msg);

            if (!batched) {
                if (transacted) {
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        log.warning("Could not commit JMS session (non-batched).", e);
                    }
                }

                return;
            }

            if (batchClosureSize > 0 && counter.incrementAndGet() >= batchClosureSize) {
                try {
                    session.commit();
                    counter.set(0);
                } catch (Exception e) {
                    log.warning("Could not commit JMS session upon completion of batch.", e);
                }
            }
        }

        /** Shuts down the worker pool, if this listener owns one. Already submitted tasks still run. */
        private void shutdownPool() {
            if (pool != null) {
                pool.shutdown();
            }
        }
    }

    /**
     * Validates the configuration, creates the JMS connection, sessions and consumers, starts
     * message delivery and, in batched mode with a positive interval, the periodic session commit.
     * <p>
     * If any step fails, everything created so far is released and the streamer is left stopped.
     *
     * @throws IgniteException If the streamer is already started or initialization fails.
     */
    public void start() throws IgniteException {
        if (!stopped) {
            throw new IgniteException("Attempted to start an already started JMS Streamer");
        }

        try {
            validateConfiguration();

            connection = connectionFactory.createConnection();

            if (clientId != null && clientId.trim().length() > 0) {
                connection.setClientID(clientId.trim());
            }

            connection.setExceptionListener(new IgniteJmsExceptionListener());

            if (destinationType == Queue.class) {
                initializeJmsObjectsForQueue();
            } else {
                initializeJmsObjectsForTopic();
            }

            // Flip the flag before delivery starts so that listeners do not drop the first messages.
            stopped = false;

            connection.start();

            if (batched && batchClosureMillis > 0) {
                scheduleTimedBatchCommits();
            }
        } catch (Throwable t) {
            // Do not leak the connection, sessions or thread pools of a half-initialized streamer.
            stopped = true;

            try {
                releaseResources();
            } catch (Throwable cleanupErr) {
                U.error(log, "Failed to release JMS resources after unsuccessful start.", cleanupErr);
            }

            throw new IgniteException("Exception while initializing JmsStreamer", t);
        }
    }

    /**
     * Stops message consumption and releases the scheduler, the JMS connection and sessions, and
     * the listener thread pools.
     *
     * @throws IgniteException If the streamer is already stopped or releasing a resource fails.
     */
    public void stop() throws IgniteException {
        if (stopped) {
            throw new IgniteException("Attempted to stop an already stopped JMS Streamer");
        }

        try {
            stopped = true;

            releaseResources();
        } catch (Throwable t) {
            throw new IgniteException("Exception while stopping JmsStreamer", t);
        }
    }

    /**
     * @param connectionFactory Factory used to create the JMS connection. Mandatory.
     */
    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /**
     * @param messageTransformer Transformer converting each JMS message into the entries to stream.
     *      Mandatory.
     */
    public void setTransformer(MessageTransformer<T, K, V> messageTransformer) {
        this.transformer = messageTransformer;
    }

    /**
     * @param destination Queue or topic to consume from. When set, the destination type is derived
     *      from it and the destination name is not used.
     */
    public void setDestination(Destination destination) {
        this.destination = destination;
    }

    /**
     * @param str Name of the queue or topic to consume from; required when no destination object
     *      is set.
     */
    public void setDestinationName(String str) {
        this.destinationName = str;
    }

    /**
     * @param cls Destination type, consulted when the destination is given by name. Defaults to
     *      {@link Queue}.
     */
    public void setDestinationType(Class<? extends Destination> cls) {
        this.destinationType = cls;
    }

    /**
     * @param i Number of sessions/consumers created for a queue, or size of the processing thread
     *      pool for a topic. Must be positive. Defaults to 1.
     */
    public void setThreads(int i) {
        this.threads = i;
    }

    /**
     * @param str Client ID to set on the JMS connection (trimmed; ignored when blank). Required for
     *      durable subscriptions.
     */
    public void setClientId(String str) {
        this.clientId = str;
    }

    /**
     * @param z Whether to create a durable subscriber when consuming from a topic.
     */
    public void setDurableSubscription(boolean z) {
        this.durableSubscription = z;
    }

    /**
     * @param z Whether sessions are created as transacted. Forced to {@code true} on start when
     *      batching is enabled.
     */
    public void setTransacted(boolean z) {
        this.transacted = z;
    }

    /**
     * @param z Whether sessions are committed in batches instead of after every message.
     */
    public void setBatched(boolean z) {
        this.batched = z;
    }

    /**
     * @param i Number of messages after which a listener commits its session in batched mode;
     *      size-based commits are disabled when not positive. Defaults to 50.
     */
    public void setBatchClosureSize(int i) {
        this.batchClosureSize = i;
    }

    /**
     * @param j Interval in milliseconds between time-based commits in batched mode; time-based
     *      commits are disabled when not positive. Defaults to 1000.
     */
    public void setBatchClosureMillis(long j) {
        this.batchClosureMillis = j;
    }

    /**
     * @param str Durable subscription name; required when durable subscription is enabled.
     */
    public void setDurableSubscriptionName(String str) {
        this.durableSubscriptionName = str;
    }

    /**
     * @param exceptionListener Listener notified of JMS connection exceptions after they are logged.
     */
    public void setExceptionListener(ExceptionListener exceptionListener) {
        this.exceptionListener = exceptionListener;
    }

    /**
     * Checks mandatory settings, obtains the logger and resolves the destination type.
     * Failures surface as the unchecked exceptions thrown by the argument checks.
     */
    private void validateConfiguration() {
        A.notNull(getStreamer(), "streamer");
        A.notNull(getIgnite(), "ignite");

        log = getIgnite().log();

        A.notNull(transformer, "message transformer");
        A.notNull(connectionFactory, "connection factory");
        A.ensure(threads > 0, "threads > 0");

        if (batched && !transacted) {
            log.warning("Starting a Batched JMS Streamer without transacted flag = true. Setting it automatically.");

            transacted = true;
        }

        if (batched) {
            A.ensure(batchClosureMillis > 0 || batchClosureSize > 0, "at least one of batch closure size or batch closure frequency must be specified when using batch consumption");
        }

        if (durableSubscription) {
            A.notNullOrEmpty(clientId, "client id is compulsory when using durable subscriptions");
            A.notNullOrEmpty(durableSubscriptionName, "durable subscription name is compulsory when using durable subscriptions");
        }

        if (destination == null) {
            A.notNull(destinationType, "destination type");
            // NOTE(review): this check also accepts Destination.class, which is then handled as a
            // topic by start() - confirm whether that is intended.
            A.ensure(destinationType.isAssignableFrom(Queue.class) || destinationType.isAssignableFrom(Topic.class), "this streamer can only handle Queues or Topics.");
            A.notNullOrEmpty(destinationName, "destination or destination name");
        } else if (destination instanceof Queue) {
            destinationType = Queue.class;
        } else if (destination instanceof Topic) {
            destinationType = Topic.class;
        } else {
            throw new IllegalArgumentException("Invalid destination object. Can only handle Queues or Topics.");
        }
    }

    /**
     * Starts the periodic task that commits every session and resets the listeners' batch counters
     * every {@code batchClosureMillis} milliseconds.
     */
    private void scheduleTimedBatchCommits() {
        scheduler = Executors.newScheduledThreadPool(1);

        Runnable commitTask = new Runnable() {
            @Override
            public void run() {
                for (Session ses : sessions) {
                    try {
                        ses.commit();

                        if (log.isDebugEnabled()) {
                            log.debug("Committing session from time-based batch completion [session=" + ses + "]");
                        }
                    } catch (Exception e) {
                        // An exception escaping this task would cancel all further executions.
                        log.warning("Error while committing session: from batch time-based completion [session=" + ses + "]", e);
                    }
                }

                for (IgniteJmsMessageListener lsnr : listeners) {
                    lsnr.resetBatchCounter();
                }
            }
        };

        // Recurring, not one-shot: the time-based commit must keep firing while the streamer runs.
        scheduler.scheduleWithFixedDelay(commitTask, batchClosureMillis, batchClosureMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Releases the scheduler, the connection and its sessions, and the listener thread pools.
     * Tolerates resources that were never created, so it can also clean up after a failed start.
     *
     * @throws JMSException If stopping or closing the connection, or closing a session, fails.
     */
    private void releaseResources() throws JMSException {
        ScheduledExecutorService sched = scheduler;

        scheduler = null;

        if (sched != null && !sched.isShutdown()) {
            sched.shutdown();
        }

        Connection conn = connection;

        connection = null;

        try {
            if (conn != null) {
                try {
                    conn.stop();
                } finally {
                    conn.close();
                }
            }

            for (Session ses : sessions) {
                ses.close();
            }
        } finally {
            // The pools are shut down only after the connection was asked to stop delivering.
            for (IgniteJmsMessageListener lsnr : listeners) {
                lsnr.shutdownPool();
            }

            sessions.clear();
            consumers.clear();
            listeners.clear();
        }
    }

    /**
     * Creates the single session, the (optionally durable) consumer and the pooled listener used to
     * consume from a topic.
     *
     * @throws JMSException If a JMS object cannot be created.
     */
    private void initializeJmsObjectsForTopic() throws JMSException {
        Session ses = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);

        // Registered immediately so that a failure further down can still be cleaned up.
        sessions.add(ses);

        Topic topic = (Topic) destination;

        if (topic == null) {
            topic = ses.createTopic(destinationName);
        }

        MessageConsumer consumer = durableSubscription
            ? ses.createDurableSubscriber(topic, durableSubscriptionName)
            : ses.createConsumer(topic);

        consumers.add(consumer);

        IgniteJmsMessageListener lsnr = new IgniteJmsMessageListener(ses, true);

        listeners.add(lsnr);

        consumer.setMessageListener(lsnr);
    }

    /**
     * Creates {@code threads} sessions, each with its own consumer and synchronous listener, to
     * consume from a queue.
     *
     * @throws JMSException If a JMS object cannot be created.
     */
    private void initializeJmsObjectsForQueue() throws JMSException {
        // Resolved into a local so that the configured destination field is not overwritten.
        Destination queue = destination;

        for (int i = 0; i < threads; i++) {
            Session ses = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);

            sessions.add(ses);

            if (queue == null) {
                queue = ses.createQueue(destinationName);
            }

            MessageConsumer consumer = ses.createConsumer(queue);

            consumers.add(consumer);

            IgniteJmsMessageListener lsnr = new IgniteJmsMessageListener(ses, false);

            listeners.add(lsnr);

            consumer.setMessageListener(lsnr);
        }
    }

    /**
     * Transforms the message and adds the resulting entries to the data streamer. Nothing is added
     * when the transformer returns {@code null} or an empty map.
     *
     * @param t Message to process.
     */
    public void processMessage(T t) {
        IgniteDataStreamer<K, V> streamer = getStreamer();

        Map<K, V> entries = transformer.apply(t);

        if (entries == null || entries.isEmpty()) {
            return;
        }

        streamer.addData(entries);
    }
}
