package org.apache.ignite.internal.streamer;

import java.util.BitSet;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.gridgain.shaded.org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/streamer/StreamerSubscriber.class */
public class StreamerSubscriber<T, E, V, R, P> implements Flow.Subscriber<E> {
    private final StreamerBatchSender<V, P, R> batchSender;

    @Nullable
    private final Flow.Subscriber<R> resultSubscriber;
    private final Function<E, T> keyFunc;
    private final Function<E, V> payloadFunc;
    private final Function<E, Boolean> deleteFunc;
    private final StreamerPartitionAwarenessProvider<T, P> partitionAwarenessProvider;
    private final StreamerOptions options;
    private final CompletableFuture<Void> completionFut = new CompletableFuture<>();
    private final AtomicInteger pendingItemCount = new AtomicInteger();
    private final AtomicInteger inFlightItemCount = new AtomicInteger();
    private final ConcurrentHashMap<P, StreamerBuffer<V>> buffers = new ConcurrentHashMap<>();
    private final ConcurrentMap<P, CompletableFuture<Collection<R>>> pendingRequests = new ConcurrentHashMap();
    private final IgniteLogger log;
    private final StreamerMetricSink metrics;
    private final ScheduledExecutorService flushExecutor;

    @Nullable
    private Flow.Subscription subscription;

    @Nullable
    private ResultSubscription resultSubscription;

    @Nullable
    private ScheduledFuture<?> flushTask;
    private boolean closed;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/streamer/StreamerSubscriber$ResultSubscription.class */
    public static class ResultSubscription implements Flow.Subscription {
        AtomicBoolean cancelled = new AtomicBoolean();

        private ResultSubscription() {
        }

        @Override // java.util.concurrent.Flow.Subscription
        public void request(long j) {
        }

        @Override // java.util.concurrent.Flow.Subscription
        public void cancel() {
            this.cancelled.set(true);
        }
    }

    public StreamerSubscriber(StreamerBatchSender<V, P, R> streamerBatchSender, @Nullable Flow.Subscriber<R> subscriber, Function<E, T> function, Function<E, V> function2, Function<E, Boolean> function3, StreamerPartitionAwarenessProvider<T, P> streamerPartitionAwarenessProvider, StreamerOptions streamerOptions, ScheduledExecutorService scheduledExecutorService, IgniteLogger igniteLogger, @Nullable StreamerMetricSink streamerMetricSink) {
        if (!$assertionsDisabled && streamerBatchSender == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && function == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && function2 == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && streamerPartitionAwarenessProvider == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && streamerOptions == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && scheduledExecutorService == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && igniteLogger == null) {
            throw new AssertionError();
        }
        this.batchSender = streamerBatchSender;
        this.resultSubscriber = subscriber;
        this.keyFunc = function;
        this.payloadFunc = function2;
        this.deleteFunc = function3;
        this.partitionAwarenessProvider = streamerPartitionAwarenessProvider;
        this.options = streamerOptions;
        this.flushExecutor = scheduledExecutorService;
        this.log = igniteLogger;
        this.metrics = getMetrics(streamerMetricSink);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public synchronized void onSubscribe(Flow.Subscription subscription) {
        if (this.subscription != null) {
            throw new IllegalStateException("Subscription is already set.");
        }
        this.subscription = subscription;
        if (this.resultSubscriber != null) {
            this.resultSubscription = new ResultSubscription();
            this.resultSubscriber.onSubscribe(this.resultSubscription);
        }
        this.partitionAwarenessProvider.refreshAsync().whenComplete((r5, th) -> {
            if (th != null) {
                this.log.error("Failed to refresh schemas and partition assignment: " + th.getMessage(), th);
                close(th);
            } else {
                initFlushTimer();
                requestMore();
            }
        });
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(E e) {
        this.pendingItemCount.decrementAndGet();
        this.buffers.computeIfAbsent(this.partitionAwarenessProvider.partition(this.keyFunc.apply(e)), obj -> {
            return new StreamerBuffer(this.options.pageSize(), (list, bitSet) -> {
                enlistBatch(obj, list, bitSet);
            });
        }).add(this.payloadFunc.apply(e), this.deleteFunc.apply(e).booleanValue());
        this.metrics.streamerItemsQueuedAdd(1L);
        requestMore();
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        close(th);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        close(null);
    }

    public CompletableFuture<Void> completionFuture() {
        return this.completionFut;
    }

    private void enlistBatch(P p, Collection<V> collection, BitSet bitSet) {
        int size = collection.size();
        if (!$assertionsDisabled && size <= 0) {
            throw new AssertionError("Batch size must be positive.");
        }
        if (!$assertionsDisabled && p == null) {
            throw new AssertionError("Partition must not be null.");
        }
        this.inFlightItemCount.addAndGet(size);
        this.metrics.streamerBatchesActiveAdd(1L);
        this.pendingRequests.compute(p, (obj, completableFuture) -> {
            return completableFuture == null ? sendBatch(obj, collection, bitSet) : completableFuture.thenCompose(collection2 -> {
                return sendBatch(obj, collection, bitSet);
            });
        });
    }

    private CompletableFuture<Collection<R>> sendBatch(P p, Collection<V> collection, BitSet bitSet) {
        try {
            return this.batchSender.sendAsync(p, collection, bitSet).whenComplete((collection2, th) -> {
                if (th != null) {
                    this.log.error("Failed to send batch to partition " + p + ": " + th.getMessage(), th);
                    close(th);
                    return;
                }
                int size = collection.size();
                this.metrics.streamerBatchesSentAdd(1L);
                this.metrics.streamerBatchesActiveAdd(-1L);
                this.metrics.streamerItemsSentAdd(size);
                this.metrics.streamerItemsQueuedAdd(-size);
                this.inFlightItemCount.addAndGet(-size);
                requestMore();
                this.partitionAwarenessProvider.refreshAsync().exceptionally(th -> {
                    this.log.error("Failed to refresh schemas and partition assignment: " + th.getMessage(), th);
                    close(th);
                    return null;
                });
                invokeResultSubscriber(collection2);
            });
        } catch (Exception e) {
            this.log.error("Failed to send batch to partition " + p + ": " + e.getMessage(), e);
            close(e);
            return CompletableFuture.failedFuture(e);
        }
    }

    private void invokeResultSubscriber(Collection<R> collection) {
        ResultSubscription resultSubscription;
        if (collection == null || this.resultSubscriber == null || (resultSubscription = resultSubscription()) == null) {
            return;
        }
        for (R r : collection) {
            if (resultSubscription.cancelled.get()) {
                return;
            } else {
                this.resultSubscriber.onNext(r);
            }
        }
    }

    @Nullable
    private synchronized ResultSubscription resultSubscription() {
        return this.resultSubscription;
    }

    private synchronized void close(@Nullable Throwable th) {
        if (this.flushTask != null) {
            this.flushTask.cancel(false);
        }
        Flow.Subscription subscription = this.subscription;
        if (subscription != null) {
            subscription.cancel();
        }
        if (th == null) {
            this.buffers.values().forEach((v0) -> {
                v0.flushAndClose();
            });
            CompletableFuture.allOf((CompletableFuture[]) this.pendingRequests.values().toArray(new CompletableFuture[0])).whenComplete((r4, th2) -> {
                if (th2 != null) {
                    if (this.resultSubscriber != null) {
                        this.resultSubscriber.onError(th2);
                    }
                    this.completionFut.completeExceptionally(th2);
                } else {
                    if (this.resultSubscriber != null) {
                        this.resultSubscriber.onComplete();
                    }
                    this.completionFut.complete(null);
                }
            });
        } else {
            this.completionFut.completeExceptionally(th);
            if (this.resultSubscriber != null) {
                this.resultSubscriber.onError(th);
            }
        }
        this.closed = true;
    }

    private synchronized void requestMore() {
        if (this.closed || this.subscription == null) {
            return;
        }
        int max = (((Math.max(1, this.buffers.size()) * this.options.pageSize()) * this.options.perPartitionParallelOperations()) - this.inFlightItemCount.get()) - this.pendingItemCount.get();
        if (max <= 0) {
            return;
        }
        this.subscription.request(max);
        this.pendingItemCount.addAndGet(max);
    }

    private synchronized void initFlushTimer() {
        int autoFlushInterval;
        if (!this.closed && (autoFlushInterval = this.options.autoFlushInterval()) > 0) {
            this.flushTask = this.flushExecutor.scheduleAtFixedRate(this::flushBuffers, autoFlushInterval, autoFlushInterval, TimeUnit.MILLISECONDS);
        }
    }

    private void flushBuffers() {
        this.buffers.values().forEach((v0) -> {
            v0.flush();
        });
    }

    private static StreamerMetricSink getMetrics(@Nullable StreamerMetricSink streamerMetricSink) {
        return streamerMetricSink != null ? streamerMetricSink : new StreamerMetricSink() { // from class: org.apache.ignite.internal.streamer.StreamerSubscriber.1
            @Override // org.apache.ignite.internal.streamer.StreamerMetricSink
            public void streamerBatchesSentAdd(long j) {
            }

            @Override // org.apache.ignite.internal.streamer.StreamerMetricSink
            public void streamerItemsSentAdd(long j) {
            }

            @Override // org.apache.ignite.internal.streamer.StreamerMetricSink
            public void streamerBatchesActiveAdd(long j) {
            }

            @Override // org.apache.ignite.internal.streamer.StreamerMetricSink
            public void streamerItemsQueuedAdd(long j) {
            }
        };
    }

    static {
        $assertionsDisabled = !StreamerSubscriber.class.desiredAssertionStatus();
    }
}
