package org.apache.ignite.internal.table.distributed.storage;

import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher.class */
/**
 * Reactive publisher that streams the rows of one scan to a single subscriber.
 *
 * <p>Rows are fetched lazily, in batches, through {@link #retrieveBatch(Long, Integer)} as the subscriber signals
 * demand. When the subscription ends (subscriber cancellation, invalid demand, end of data or a failure)
 * {@link #onClose(Boolean, Long, Throwable)} is invoked once and its outcome decides the terminal signal sent to
 * the subscriber.
 *
 * <p>Only one subscription per publisher instance is supported; any further subscriber is rejected with an
 * {@link IllegalStateException} delivered through {@code onError}.
 *
 * @param <T> Type of the published rows.
 */
public abstract class PartitionScanPublisher<T> implements Flow.Publisher<T> {
    /** Generator of scan ids; static, hence shared by all publisher instances. */
    private static final AtomicLong CURSOR_ID_GENERATOR = new AtomicLong();

    /** Set to {@code true} by the first successful {@link #subscribe(Flow.Subscriber)} call. */
    private final AtomicBoolean subscribed = new AtomicBoolean(false);

    /** Notified around every batch request issued by the subscription. */
    private final InflightBatchRequestTracker inflightBatchRequestTracker;

    /**
     * Callback pair invoked around every {@link #retrieveBatch(Long, Integer)} call made by a subscription.
     */
    public interface InflightBatchRequestTracker {
        /** Called right before a batch request is issued. */
        void onRequestBegin();

        /** Called once the batch request issued after {@link #onRequestBegin()} has finished, successfully or not. */
        void onRequestEnd();
    }

    /**
     * The only subscription this publisher hands out. Keeps track of the outstanding demand and pulls batches
     * until the demand is satisfied, the data is exhausted or the subscription is cancelled.
     */
    private class PartitionScanSubscription implements Flow.Subscription {
        /** Maximum number of rows asked for in a single {@link #retrieveBatch(Long, Integer)} call. */
        private static final int INTERNAL_BATCH_SIZE = 10000;

        private final Flow.Subscriber<? super T> subscriber;

        /** Flipped exactly once by {@link #cancel(Throwable, boolean)}; guards against repeated closing. */
        private final AtomicBoolean canceled = new AtomicBoolean(false);

        /** Identifier of this scan; kept boxed because the abstract callbacks take a {@link Long}. */
        private final Long scanId = CURSOR_ID_GENERATOR.getAndIncrement();

        /** Number of rows requested by the subscriber and not yet delivered (saturates at {@link Long#MAX_VALUE}). */
        private final AtomicLong requestedItemsCnt = new AtomicLong(0);

        private PartitionScanSubscription(Flow.Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
        }

        /**
         * Adds {@code n} to the outstanding demand and starts fetching if no fetch loop is currently running.
         *
         * <p>A non-positive {@code n} closes the subscription and reports an {@link IllegalArgumentException}
         * through {@code onError}.
         *
         * @param n Number of additional rows the subscriber is ready to receive.
         */
        @Override
        public void request(long n) {
            if (n <= 0) {
                cancel(null, true);

                // NOTE(review): cancel() above also signals onComplete/onError once onClose finishes, so the
                // subscriber may observe two terminal signals here - confirm whether that is intended.
                subscriber.onError(new IllegalArgumentException(
                        IgniteStringFormatter.format("Invalid requested amount of items [requested={}, minValue=1].", n)));

                return;
            }

            if (canceled.get()) {
                return;
            }

            long previousDemand = requestedItemsCnt.getAndUpdate(current -> {
                try {
                    return Math.addExact(current, n);
                } catch (ArithmeticException ignored) {
                    // Demand overflow: saturate instead of wrapping around.
                    return Long.MAX_VALUE;
                }
            });

            // A non-zero previous demand means a fetch loop is already in progress and will pick up the new demand.
            if (previousDemand == 0) {
                scanBatch((int) Math.min(n, INTERNAL_BATCH_SIZE));
            }
        }

        /** Closes the subscription on behalf of the subscriber. */
        @Override
        public void cancel() {
            cancel(null, true);
        }

        /**
         * Closes the subscription at most once and forwards the outcome of
         * {@link PartitionScanPublisher#onClose(Boolean, Long, Throwable)} to the subscriber: {@code onError} if the
         * returned future failed, {@code onComplete} otherwise.
         *
         * <p>NOTE(review): {@code t} is only handed to {@code onClose}; the subscriber sees an error only when the
         * future returned by {@code onClose} fails - presumably implementations propagate {@code t}, confirm.
         *
         * @param t Failure that caused the close, or {@code null}.
         * @param intentionallyClose {@code true} when the close was triggered via the subscription API
         *      ({@link #cancel()} or an invalid {@link #request(long)}), {@code false} on end of data or failure.
         */
        private void cancel(@Nullable Throwable t, boolean intentionallyClose) {
            if (!canceled.compareAndSet(false, true)) {
                return;
            }

            onClose(intentionallyClose, scanId, t).whenComplete((ignored, closeError) -> {
                if (closeError != null) {
                    subscriber.onError(closeError);
                } else {
                    subscriber.onComplete();
                }
            });
        }

        /**
         * Requests up to {@code batchSize} rows, pushes them to the subscriber and keeps fetching while demand
         * remains. A batch shorter than requested is treated as the end of the data and closes the subscription.
         *
         * @param batchSize Number of rows to ask for.
         */
        private void scanBatch(int batchSize) {
            if (canceled.get()) {
                return;
            }

            inflightBatchRequestTracker.onRequestBegin();

            CompletableFuture<Collection<T>> batchFuture;

            try {
                batchFuture = retrieveBatch(scanId, batchSize);
            } catch (RuntimeException | Error e) {
                // No future to hang the completion callback on: balance the tracker here, then propagate.
                inflightBatchRequestTracker.onRequestEnd();

                throw e;
            }

            batchFuture
                    // Single place that ends the request, so a failure while processing the rows below cannot
                    // report the end of the same request a second time.
                    .whenComplete((ignoredBatch, ignoredError) -> inflightBatchRequestTracker.onRequestEnd())
                    .thenAccept(batch -> {
                        assert batch != null;
                        assert batch.size() <= batchSize : "Rows more then requested " + batch.size() + " " + batchSize;

                        batch.forEach(subscriber::onNext);

                        if (batch.size() < batchSize) {
                            cancel(null, false);

                            return;
                        }

                        long remainingDemand = requestedItemsCnt.addAndGet(-batch.size());

                        if (remainingDemand > 0) {
                            scanBatch((int) Math.min(remainingDemand, INTERNAL_BATCH_SIZE));
                        }
                    })
                    .exceptionally(error -> {
                        cancel(error, false);

                        return null;
                    });
        }
    }

    /**
     * Creates the publisher.
     *
     * @param inflightBatchRequestTracker Tracker notified around every batch request.
     */
    public PartitionScanPublisher(InflightBatchRequestTracker inflightBatchRequestTracker) {
        this.inflightBatchRequestTracker = inflightBatchRequestTracker;
    }

    /**
     * Registers the one and only subscriber of this publisher.
     *
     * @param subscriber Subscriber to register.
     * @throws NullPointerException If {@code subscriber} is {@code null}.
     */
    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Objects.requireNonNull(subscriber, "Subscriber is null");

        if (!subscribed.compareAndSet(false, true)) {
            subscriber.onError(new IllegalStateException("Scan publisher does not support multiple subscriptions."));

            // A rejected subscriber must not also receive a working subscription.
            return;
        }

        subscriber.onSubscribe(new PartitionScanSubscription(subscriber));
    }

    /**
     * Fetches the next batch of rows for the given scan.
     *
     * <p>The subscription treats a result with fewer rows than {@code batchSize} as the end of the data.
     *
     * @param scanId Identifier of the scan, generated per subscription.
     * @param batchSize Maximum number of rows to return.
     * @return Future completed with the batch; the subscription asserts it is non-null and holds at most
     *      {@code batchSize} rows.
     */
    protected abstract CompletableFuture<Collection<T>> retrieveBatch(Long scanId, Integer batchSize);

    /**
     * Invoked once when the subscription is closed.
     *
     * @param intentionallyClose {@code true} if the close was triggered through the subscription API (cancellation
     *      or invalid demand), {@code false} if the scan ran out of data or failed.
     * @param scanId Identifier of the scan being closed.
     * @param th Failure that caused the close, or {@code null}.
     * @return Future whose outcome is forwarded to the subscriber: {@code onError} if it fails, {@code onComplete}
     *      otherwise.
     */
    protected abstract CompletableFuture<Void> onClose(Boolean intentionallyClose, Long scanId, @Nullable Throwable th);
}
