package org.apache.ignite.ml.inference.builder;

import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.ml.inference.Model;
import org.apache.ignite.ml.inference.parser.ModelParser;
import org.apache.ignite.ml.inference.reader.ModelReader;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;

/* loaded from: input_file:org/apache/ignite/ml/inference/builder/IgniteDistributedModelBuilder.class */
public class IgniteDistributedModelBuilder implements AsyncModelBuilder {
    /** Name pattern of the deployed inference service; formatted with the model's unique suffix. */
    private static final String INFERENCE_SERVICE_NAME_PATTERN = "inference_service_%s";
    /** Name pattern of the Ignite queue that carries inference requests; formatted with the model's unique suffix. */
    private static final String INFERENCE_REQUEST_QUEUE_NAME_PATTERN = "inference_queue_req_%s";
    /** Name pattern of the Ignite queue that carries inference responses; formatted with the model's unique suffix. */
    private static final String INFERENCE_RESPONSE_QUEUE_NAME_PATTERN = "inference_queue_res_%s";
    /** Capacity used for the request queue, the response queue and the local queue of pending futures. */
    private static final int QUEUE_CAPACITY = 100;
    /** Default-constructed collection configuration shared by every request/response queue created here. */
    private static final CollectionConfiguration queueCfg = new CollectionConfiguration();
    /** Ignite instance used to create the queues and deploy the inference service. */
    private final Ignite ignite;
    /** Forwarded as the third argument of {@code deployMultiple} (presumably the total instance count — confirm against the Ignite API). */
    private final int instances;
    /** Forwarded as the fourth argument of {@code deployMultiple} (presumably the per-node instance limit — confirm against the Ignite API). */
    private final int maxPerNode;

    /* loaded from: input_file:org/apache/ignite/ml/inference/builder/IgniteDistributedModelBuilder$DistributedInfModel.class */
    /**
     * Client-side handle of a distributed inference model.
     *
     * <p>Requests are written to an Ignite request queue, processed by the deployed
     * {@link IgniteDistributedInfModelService} instances and read back from an Ignite response queue by a
     * single background receiver thread. Every call to {@link #predict(Serializable)} enqueues a
     * {@link CompletableFuture} locally; the receiver completes the pending futures in FIFO order as responses
     * arrive.
     *
     * @param <I> Type of model input.
     * @param <O> Type of model output.
     */
    private static class DistributedInfModel<I extends Serializable, O extends Serializable> implements Model<I, Future<O>> {
        /** Ignite instance used to create queues and to deploy/cancel the inference service. */
        private final Ignite ignite;

        /** Unique suffix that identifies the service and the queues belonging to this model. */
        private final String suffix;

        /** Distributed queue the requests are written to. */
        private final IgniteQueue<I> reqQueue;

        /** Distributed queue the responses are read from. */
        private final IgniteQueue<O> resQueue;

        /** Futures waiting for a response, in the order the requests were submitted. */
        private final BlockingQueue<CompletableFuture<O>> futures = new ArrayBlockingQueue<>(IgniteDistributedModelBuilder.QUEUE_CAPACITY);

        /** Single-threaded pool that runs the response receiver. */
        private final ExecutorService receiverThreadPool = Executors.newSingleThreadExecutor();

        /** Set to {@code true} once the model is fully started and back to {@code false} when it is closed. */
        private final AtomicBoolean running = new AtomicBoolean(false);

        /** Handle of the receiver task, used to cancel it on close. */
        private volatile Future<?> receiverFut;

        /**
         * Creates the request/response queues, starts the receiver and deploys the inference service.
         *
         * @param ignite Ignite instance.
         * @param str Unique suffix used in the service and queue names.
         * @param modelReader Reader that supplies the serialized model to the service.
         * @param modelParser Parser that turns the serialized model into a {@link Model}.
         * @param i Third argument passed to {@code deployMultiple}.
         * @param i2 Fourth argument passed to {@code deployMultiple}.
         */
        DistributedInfModel(Ignite ignite, String str, ModelReader modelReader, ModelParser<I, O, ?> modelParser, int i, int i2) {
            this.ignite = ignite;
            this.suffix = str;
            this.reqQueue = ignite.queue(String.format(IgniteDistributedModelBuilder.INFERENCE_REQUEST_QUEUE_NAME_PATTERN, str), IgniteDistributedModelBuilder.QUEUE_CAPACITY, IgniteDistributedModelBuilder.queueCfg);
            this.resQueue = ignite.queue(String.format(IgniteDistributedModelBuilder.INFERENCE_RESPONSE_QUEUE_NAME_PATTERN, str), IgniteDistributedModelBuilder.QUEUE_CAPACITY, IgniteDistributedModelBuilder.queueCfg);
            startReceiver();
            startService(modelReader, modelParser, i, i2);
            this.running.set(true);
        }

        /**
         * Submits an inference request.
         *
         * @param i Model input.
         * @return Future that is completed with the model output, or cancelled if the model is closed first.
         * @throws IllegalStateException If the model is not running.
         * @throws RuntimeException If the calling thread is interrupted while waiting for queue space; the model
         *      is closed in that case and the thread's interrupt flag is restored.
         */
        @Override
        public Future<O> predict(I i) {
            if (!this.running.get()) {
                throw new IllegalStateException("Inference model is not running");
            }

            CompletableFuture<O> pending = new CompletableFuture<>();

            try {
                // The future must be registered before the request is sent so that the receiver
                // always finds a pending future for every response.
                this.futures.put(pending);
                this.reqQueue.put(i);

                return pending;
            } catch (InterruptedException e) {
                try {
                    close();
                } finally {
                    // Restore the flag only after cleanup so that the cleanup calls are not affected by it.
                    Thread.currentThread().interrupt();
                }

                throw new RuntimeException(e);
            }
        }

        /** Deploys the inference service instances that serve this model. */
        private void startService(ModelReader modelReader, ModelParser<I, O, ?> modelParser, int i, int i2) {
            this.ignite.services().deployMultiple(String.format(IgniteDistributedModelBuilder.INFERENCE_SERVICE_NAME_PATTERN, this.suffix), new IgniteDistributedInfModelService<>(modelReader, modelParser, this.suffix), i, i2);
        }

        /** Cancels the inference service that serves this model. */
        private void stopService() {
            this.ignite.services().cancel(String.format(IgniteDistributedModelBuilder.INFERENCE_SERVICE_NAME_PATTERN, this.suffix));
        }

        /**
         * Starts the background task that takes responses from the response queue and completes the pending
         * futures in submission order. When the task ends for any reason the model is closed and every future
         * still pending is cancelled.
         */
        private void startReceiver() {
            this.receiverFut = this.receiverThreadPool.submit(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        O res;

                        try {
                            // Take the response first: a pending future only exists once a request was made.
                            res = this.resQueue.take();
                        } catch (IllegalStateException e) {
                            if (!this.resQueue.removed()) {
                                throw e;
                            }

                            // The response queue is gone, so no further responses can arrive.
                            break;
                        }

                        this.futures.remove().complete(res);
                    }
                } finally {
                    // Close the model (if not closed yet) to protect the queues from new writes.
                    close();

                    CompletableFuture<O> orphan;

                    while ((orphan = this.futures.poll()) != null) {
                        orphan.cancel(true);
                    }
                }
            });
        }

        /** Cancels the receiver task if it is still running and shuts the receiver pool down. */
        private void stopReceiver() {
            Future<?> receiver = this.receiverFut;

            if (receiver != null && !receiver.isDone()) {
                receiver.cancel(true);
            }

            this.receiverThreadPool.shutdown();
        }

        /** Closes the request and response queues. */
        private void removeQueues() {
            this.reqQueue.close();
            this.resQueue.close();
        }

        /**
         * Stops the service and the receiver and closes the queues. Only the first call after a successful
         * start has an effect; later calls are no-ops.
         */
        @Override
        public void close() {
            if (this.running.getAndSet(false)) {
                stopService();
                stopReceiver();
                removeQueues();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/ml/inference/builder/IgniteDistributedModelBuilder$IgniteDistributedInfModelService.class */
    /**
     * Ignite service that serves inference requests: it takes inputs from the request queue, runs the model on
     * them and puts the outputs into the response queue.
     *
     * @param <I> Type of model input.
     * @param <O> Type of model output.
     */
    public static class IgniteDistributedInfModelService<I extends Serializable, O extends Serializable> implements Service {
        private static final long serialVersionUID = -3596084917874395597L;

        /** Reader that supplies the serialized model. */
        private final ModelReader reader;

        /** Parser that turns the serialized model into a {@link Model}. */
        private final ModelParser<I, O, ?> parser;

        /** Unique suffix that identifies the queues this service works with. */
        private final String suffix;

        /** Request queue, resolved in {@link #init(ServiceContext)}. */
        private transient IgniteQueue<I> reqQueue;

        /** Response queue, resolved in {@link #init(ServiceContext)}. */
        private transient IgniteQueue<O> resQueue;

        /** Parsed model, created in {@link #init(ServiceContext)}. */
        private transient Model<I, O> mdl;

        /**
         * @param modelReader Reader that supplies the serialized model.
         * @param modelParser Parser that turns the serialized model into a {@link Model}.
         * @param str Unique suffix used in the queue names.
         */
        IgniteDistributedInfModelService(ModelReader modelReader, ModelParser<I, O, ?> modelParser, String str) {
            this.reader = modelReader;
            this.parser = modelParser;
            this.suffix = str;
        }

        /**
         * Resolves the request/response queues on the local Ignite instance and parses the model.
         *
         * @param serviceContext Service context (not used).
         */
        public void init(ServiceContext serviceContext) {
            Ignite localIgnite = Ignition.localIgnite();
            this.reqQueue = localIgnite.queue(String.format(IgniteDistributedModelBuilder.INFERENCE_REQUEST_QUEUE_NAME_PATTERN, this.suffix), IgniteDistributedModelBuilder.QUEUE_CAPACITY, IgniteDistributedModelBuilder.queueCfg);
            this.resQueue = localIgnite.queue(String.format(IgniteDistributedModelBuilder.INFERENCE_RESPONSE_QUEUE_NAME_PATTERN, this.suffix), IgniteDistributedModelBuilder.QUEUE_CAPACITY, IgniteDistributedModelBuilder.queueCfg);
            this.mdl = (Model<I, O>) this.parser.parse(this.reader.read());
        }

        /**
         * Serves requests until the service is cancelled or one of the queues is removed.
         *
         * @param serviceContext Service context used to detect cancellation.
         * @throws IllegalStateException If a queue operation fails while that queue has not been removed.
         */
        public void execute(ServiceContext serviceContext) {
            while (!serviceContext.isCancelled()) {
                I req;

                try {
                    req = this.reqQueue.take();
                } catch (IllegalStateException e) {
                    if (!this.reqQueue.removed()) {
                        throw e;
                    }

                    // The request queue is gone: nothing more to serve.
                    return;
                }

                // Run the model outside the queue try-blocks so its own failures are never swallowed.
                O res = this.mdl.predict(req);

                try {
                    this.resQueue.put(res);
                } catch (IllegalStateException e) {
                    if (!this.resQueue.removed()) {
                        throw e;
                    }

                    // The response queue is gone: results can no longer be delivered.
                    return;
                }
            }
        }

        /**
         * No-op: the {@link #execute(ServiceContext)} loop checks the cancellation flag itself.
         *
         * @param serviceContext Service context (not used).
         */
        public void cancel(ServiceContext serviceContext) {
            // Nothing to release here.
        }
    }

    /**
     * Creates a builder of distributed inference models.
     *
     * @param ignite Ignite instance the models are deployed on.
     * @param i Value stored as {@code instances} and handed to every model built.
     * @param i2 Value stored as {@code maxPerNode} and handed to every model built.
     */
    public IgniteDistributedModelBuilder(Ignite ignite, int i, int i2) {
        this.maxPerNode = i2;
        this.instances = i;
        this.ignite = ignite;
    }

    /**
     * Builds a distributed inference model. A random UUID is used as the suffix of the service and queue
     * names, so every built model gets its own service and queues.
     *
     * @param modelReader Reader that supplies the serialized model.
     * @param modelParser Parser that turns the serialized model into a {@link Model}.
     * @param <I> Type of model input.
     * @param <O> Type of model output.
     * @return Model whose {@code predict} returns a {@link Future} of the output.
     */
    @Override // org.apache.ignite.ml.inference.builder.AsyncModelBuilder
    public <I extends Serializable, O extends Serializable> Model<I, Future<O>> build(ModelReader modelReader, ModelParser<I, O, ?> modelParser) {
        String suffix = UUID.randomUUID().toString();

        // Diamond instead of a raw type keeps the construction type-checked.
        return new DistributedInfModel<>(this.ignite, suffix, modelReader, modelParser, this.instances, this.maxPerNode);
    }
}
