package org.apache.ignite.stream.zeromq;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.stream.StreamAdapter;
import org.jetbrains.annotations.NotNull;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;

/* loaded from: input_file:org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.class */
public class IgniteZeroMqStreamer<K, V> extends StreamAdapter<byte[], K, V> implements AutoCloseable {
    protected IgniteLogger log;
    private ExecutorService zeroMqExSrv;
    private ZMQ.Context ctx;
    private int ioThreads;
    private int socketType;
    private String addr;
    private byte[] topic;
    private volatile boolean isStarted = false;
    private long timeout = 5000;

    public IgniteZeroMqStreamer(int i, ZeroMqTypeSocket zeroMqTypeSocket, @NotNull String str, byte[] bArr) {
        A.ensure(i > 0, "ioThreads has to larger than 0.");
        A.ensure(!"".equals(str), "addr cannot be empty.");
        A.ensure(zeroMqTypeSocket != null, "socketType has to be specified.");
        this.ioThreads = i;
        this.addr = str;
        this.topic = bArr;
        this.socketType = zeroMqTypeSocket.getType();
    }

    public void start() {
        A.ensure((getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null) ? false : true, "ZeroMq extractor.");
        this.log = getIgnite().log();
        if (this.isStarted) {
            this.log.warning("Attempted to start an already started ZeroMQ streamer");
            return;
        }
        this.isStarted = true;
        this.zeroMqExSrv = Executors.newSingleThreadExecutor();
        this.ctx = ZMQ.context(this.ioThreads);
        this.zeroMqExSrv.execute(() -> {
            ZMQ.Socket socket = this.ctx.socket(this.socketType);
            socket.connect(this.addr);
            if (ZeroMqTypeSocket.SUB.getType() == this.socketType) {
                socket.subscribe(this.topic);
            }
            while (this.isStarted) {
                try {
                    byte[] recv = socket.recv(0);
                    if (ZeroMqTypeSocket.SUB.getType() == this.socketType && socket.hasReceiveMore()) {
                        addMessage(socket.recv(0));
                    } else {
                        addMessage(recv);
                    }
                } catch (ZMQException e) {
                    if (e.getErrorCode() == ZMQ.Error.ETERM.getCode()) {
                        break;
                    }
                }
            }
            socket.close();
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.isStarted = false;
        if (this.ctx != null) {
            this.ctx.close();
        }
        if (this.zeroMqExSrv != null) {
            this.zeroMqExSrv.shutdown();
            try {
                if (!this.zeroMqExSrv.awaitTermination(this.timeout, TimeUnit.MILLISECONDS)) {
                    this.log.warning("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
                }
            } catch (InterruptedException e) {
                this.zeroMqExSrv.shutdownNow();
                this.log.error("Interrupted during shutdown, exiting uncleanly.");
            }
        }
    }
}
