package org.apache.ignite.internal.table.distributed.storage;

import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher.class */
public abstract class PartitionScanPublisher<T> implements Flow.Publisher<T> {
    private static final AtomicLong CURSOR_ID_GENERATOR = new AtomicLong();
    private final AtomicBoolean subscribed = new AtomicBoolean(false);
    private final InflightBatchRequestTracker inflightBatchRequestTracker;

    /* loaded from: input_file:org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher$InflightBatchRequestTracker.class */
    public interface InflightBatchRequestTracker {
        void onRequestBegin();

        void onRequestEnd();
    }

    /* loaded from: input_file:org/apache/ignite/internal/table/distributed/storage/PartitionScanPublisher$PartitionScanSubscription.class */
    private class PartitionScanSubscription implements Flow.Subscription {
        private static final int INTERNAL_BATCH_SIZE = 10000;
        private final Flow.Subscriber<? super T> subscriber;
        private boolean canceled;
        private long requestedItemsCnt;
        static final /* synthetic */ boolean $assertionsDisabled;
        private final Object lock = new Object();
        private CompletableFuture<Void> serializationFuture = CompletableFutures.nullCompletedFuture();
        private final long scanId = PartitionScanPublisher.CURSOR_ID_GENERATOR.getAndIncrement();

        private PartitionScanSubscription(Flow.Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
        }

        @Override // java.util.concurrent.Flow.Subscription
        public void request(long j) {
            synchronized (this.lock) {
                if (this.canceled) {
                    return;
                }
                if (j <= 0) {
                    this.serializationFuture = this.serializationFuture.thenRun(() -> {
                        completeSubscription(new IllegalArgumentException(IgniteStringFormatter.format("Invalid amount of items requested [requested={}, minValue=1].", new Object[]{Long.valueOf(j)})));
                    });
                    return;
                }
                boolean z = this.requestedItemsCnt == 0;
                this.requestedItemsCnt += j;
                if (this.requestedItemsCnt < 0) {
                    this.requestedItemsCnt = Long.MAX_VALUE;
                }
                if (z) {
                    this.serializationFuture = this.serializationFuture.thenCompose(r3 -> {
                        return retrieveAndProcessBatch();
                    });
                }
            }
        }

        @Override // java.util.concurrent.Flow.Subscription
        public void cancel() {
            synchronized (this.lock) {
                if (this.canceled) {
                    return;
                }
                this.canceled = true;
                this.serializationFuture = this.serializationFuture.thenCompose(r7 -> {
                    return PartitionScanPublisher.this.onClose(true, this.scanId, null);
                });
            }
        }

        private void completeSubscription(@Nullable Throwable th) {
            synchronized (this.lock) {
                if (this.canceled) {
                    return;
                }
                this.canceled = true;
                PartitionScanPublisher.this.onClose(false, this.scanId, th).whenComplete((r5, th2) -> {
                    if (th != null) {
                        this.subscriber.onError(th);
                    } else if (th2 == null) {
                        this.subscriber.onComplete();
                    } else {
                        this.subscriber.onError(th2);
                    }
                });
            }
        }

        private CompletableFuture<Void> retrieveAndProcessBatch() {
            synchronized (this.lock) {
                if (this.canceled) {
                    return CompletableFutures.nullCompletedFuture();
                }
                int min = (int) Math.min(this.requestedItemsCnt, 10000L);
                if (!$assertionsDisabled && min <= 0) {
                    throw new AssertionError(min);
                }
                try {
                    PartitionScanPublisher.this.inflightBatchRequestTracker.onRequestBegin();
                    return PartitionScanPublisher.this.retrieveBatch(this.scanId, min).whenComplete((collection, th) -> {
                        PartitionScanPublisher.this.inflightBatchRequestTracker.onRequestEnd();
                    }).thenAccept(collection2 -> {
                        processBatch(collection2, min);
                    }).whenComplete((r4, th2) -> {
                        if (th2 != null) {
                            completeSubscription(th2);
                        }
                    });
                } catch (TransactionException e) {
                    completeSubscription(e);
                    return CompletableFutures.nullCompletedFuture();
                }
            }
        }

        private void processBatch(Collection<T> collection, int i) {
            if (!$assertionsDisabled && collection == null) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && collection.size() > i) {
                throw new AssertionError("Rows more than requested " + collection.size() + " " + i);
            }
            Flow.Subscriber<? super T> subscriber = this.subscriber;
            Objects.requireNonNull(subscriber);
            collection.forEach(subscriber::onNext);
            synchronized (this.lock) {
                if (this.canceled) {
                    return;
                }
                if (collection.size() < i) {
                    completeSubscription(null);
                } else {
                    this.requestedItemsCnt -= collection.size();
                    if (this.requestedItemsCnt > 0) {
                        this.serializationFuture = this.serializationFuture.thenCompose(r3 -> {
                            return retrieveAndProcessBatch();
                        });
                    }
                }
            }
        }

        static {
            $assertionsDisabled = !PartitionScanPublisher.class.desiredAssertionStatus();
        }
    }

    public PartitionScanPublisher(InflightBatchRequestTracker inflightBatchRequestTracker) {
        this.inflightBatchRequestTracker = inflightBatchRequestTracker;
    }

    @Override // java.util.concurrent.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException("Subscriber is null");
        }
        if (!this.subscribed.compareAndSet(false, true)) {
            subscriber.onError(new IllegalStateException("Scan publisher does not support multiple subscriptions."));
        }
        subscriber.onSubscribe(new PartitionScanSubscription(subscriber));
    }

    protected abstract CompletableFuture<Collection<T>> retrieveBatch(long j, int i);

    protected abstract CompletableFuture<Void> onClose(boolean z, long j, @Nullable Throwable th);
}
