package org.apache.ignite.internal.continuousquery;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.IntStream;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.MarshallerException;
import org.apache.ignite.lang.util.IgniteNameUtils;
import org.apache.ignite.table.ContinuousQueryOptions;
import org.apache.ignite.table.ContinuousQueryPhysicalTimeWatermark;
import org.apache.ignite.table.ContinuousQueryWatermark;
import org.apache.ignite.table.TableRowEventBatch;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/continuousquery/ContinuousQuery.class */
/**
 * Polling continuous query that feeds a {@link Flow.Subscriber} with batches of table row events.
 *
 * <p>For every partition in the partition set a {@link ContinuousQueryRequest} is sent through the
 * {@link ContinuousQueryRequestSender}. Once all responses have arrived, each non-empty response is mapped with
 * the supplied mapper, wrapped into a batch and passed to {@link Flow.Subscriber#onNext}. One unit of
 * subscriber demand is consumed per delivered batch. A per-partition lower bound (timestamp + row id) is
 * advanced as rows are processed and is used as the starting point of the next request.
 *
 * @param <RowT> Row type returned by the request sender.
 * @param <SchemaT> Schema type returned by the request sender together with the rows.
 * @param <EntryT> Entry type produced by the mapper and exposed to the subscriber.
 */
public class ContinuousQuery<RowT, SchemaT, EntryT> {
    /** Receiver of the event batches and of the terminal signals. */
    private final Flow.Subscriber<TableRowEventBatch<EntryT>> subscriber;

    /** Effective options; never {@code null} ({@link ContinuousQueryOptions#DEFAULT} is substituted). */
    private final ContinuousQueryOptions options;

    /** Event types from the options, encoded by {@code ContinuousQueryUtils.encodeEventTypes}. */
    private final byte eventTypes;

    /** Parsed column names from the options, or {@code null} when the options carry no column set. */
    private final String[] columnNames;

    /** Converts a (row, schema) pair into the entry handed to the subscriber. */
    private final BiFunction<RowT, SchemaT, EntryT> mapper;

    /** Sends per-partition requests. */
    private final ContinuousQueryRequestSender<RowT, SchemaT> requestSender;

    /** Outstanding subscriber demand, counted in batches. */
    private final AtomicLong requested = new AtomicLong(0L);

    /** Set once the subscription has been cancelled. */
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    /** Partition ids to poll. {@link #futs} is indexed by position in this array. */
    private final int[] partitionSet;

    /** In-flight (or completed) responses, one slot per entry of {@link #partitionSet}. */
    private final CompletableFuture<ContinuousQueryScanResultWithSchema<RowT, SchemaT>>[] futs;

    /** Lower bound row ids, indexed by partition id (not by position in {@link #partitionSet}). */
    private final UUID[] lowerBoundRowIds;

    /** Lower bound timestamps, indexed by partition id (not by position in {@link #partitionSet}). */
    private final long[] lowerBoundTimestamps;

    /** Executor that runs a submitted task after the configured poll interval has elapsed. */
    private final Executor delayedExecutor;

    /** Metrics sink for sent requests and received events. */
    private final ContinuousQueryMetricSink metrics;

    /**
     * Creates a query over all partitions {@code 0 .. i - 1}.
     *
     * @param subscriber Subscriber that receives the batches.
     * @param continuousQueryOptions Options, or {@code null} to use {@link ContinuousQueryOptions#DEFAULT}.
     * @param biFunction Mapper from (row, schema) to the subscriber-facing entry.
     * @param continuousQueryRequestSender Request sender.
     * @param continuousQueryMetricSink Metrics sink.
     * @param i Total partition count.
     */
    public ContinuousQuery(Flow.Subscriber<TableRowEventBatch<EntryT>> subscriber, @Nullable ContinuousQueryOptions continuousQueryOptions, BiFunction<RowT, SchemaT, EntryT> biFunction, ContinuousQueryRequestSender<RowT, SchemaT> continuousQueryRequestSender, ContinuousQueryMetricSink continuousQueryMetricSink, int i) {
        this(subscriber, continuousQueryOptions, biFunction, continuousQueryRequestSender, continuousQueryMetricSink, i, IntStream.range(0, i).toArray());
    }

    /**
     * Creates a query over an explicit set of partitions.
     *
     * @param subscriber Subscriber that receives the batches.
     * @param continuousQueryOptions Options, or {@code null} to use {@link ContinuousQueryOptions#DEFAULT}.
     * @param biFunction Mapper from (row, schema) to the subscriber-facing entry.
     * @param continuousQueryRequestSender Request sender.
     * @param continuousQueryMetricSink Metrics sink.
     * @param i Total partition count; sizes the lower bound arrays and is checked against an event watermark.
     * @param iArr Partition ids to poll; each id is used as an index into arrays of length {@code i}.
     * @throws IllegalArgumentException If an event watermark does not match the partition count, or the
     *         watermark type is not supported.
     * @throws NullPointerException If an event watermark has no row ids or no timestamps.
     */
    public ContinuousQuery(Flow.Subscriber<TableRowEventBatch<EntryT>> subscriber, @Nullable ContinuousQueryOptions continuousQueryOptions, BiFunction<RowT, SchemaT, EntryT> biFunction, ContinuousQueryRequestSender<RowT, SchemaT> continuousQueryRequestSender, ContinuousQueryMetricSink continuousQueryMetricSink, int i, int[] iArr) {
        assert subscriber != null : "Subscriber != null";
        assert biFunction != null : "Mapper != null";
        assert continuousQueryRequestSender != null : "Request sender != null";

        this.subscriber = subscriber;
        this.options = continuousQueryOptions == null ? ContinuousQueryOptions.DEFAULT : continuousQueryOptions;
        this.eventTypes = ContinuousQueryUtils.encodeEventTypes(this.options.eventTypes());
        this.mapper = biFunction;
        this.requestSender = continuousQueryRequestSender;
        this.metrics = continuousQueryMetricSink;
        this.partitionSet = iArr;
        this.columnNames = parsedColumnNames(this.options.columnNames());

        // Java cannot create an array of a parameterized type directly; the raw array is only ever
        // filled with futures returned by the request sender.
        @SuppressWarnings("unchecked")
        CompletableFuture<ContinuousQueryScanResultWithSchema<RowT, SchemaT>>[] pending = new CompletableFuture[iArr.length];
        this.futs = pending;

        this.lowerBoundRowIds = new UUID[i];
        this.lowerBoundTimestamps = new long[i];

        ContinuousQueryWatermark watermark = this.options.watermark();

        if (watermark instanceof ContinuousQueryEventWatermark) {
            // Resume from explicit per-partition positions.
            ContinuousQueryEventWatermark eventWatermark = (ContinuousQueryEventWatermark) watermark;

            Objects.requireNonNull(eventWatermark.rowIds());
            Objects.requireNonNull(eventWatermark.timestamps());

            if (eventWatermark.rowIds().length != i) {
                throw new IllegalArgumentException("Partition count mismatch in rowIds: expected=" + i + ", actual=" + eventWatermark.rowIds().length);
            }

            if (eventWatermark.timestamps().length != i) {
                throw new IllegalArgumentException("Partition count mismatch in timestamps: expected=" + i + ", actual=" + eventWatermark.timestamps().length);
            }

            for (int partId : iArr) {
                this.lowerBoundRowIds[partId] = eventWatermark.rowIds()[partId];
                this.lowerBoundTimestamps[partId] = eventWatermark.timestamps()[partId];
            }
        } else {
            // Start from a physical point in time: "now" when no watermark is given, otherwise the
            // start time carried by the physical time watermark.
            long startMillis;

            if (watermark == null) {
                startMillis = System.currentTimeMillis();
            } else if (watermark instanceof ContinuousQueryPhysicalTimeWatermark) {
                startMillis = ((ContinuousQueryPhysicalTimeWatermark) watermark).startTime().toEpochMilli();
            } else {
                throw new IllegalArgumentException("Unsupported watermark type: " + watermark.getClass());
            }

            UUID zeroRowId = new UUID(0L, 0L);
            long startTimestamp = HybridTimestamp.physicalToLong(startMillis);

            for (int partId : iArr) {
                this.lowerBoundRowIds[partId] = zeroRowId;
                this.lowerBoundTimestamps[partId] = startTimestamp;
            }
        }

        this.delayedExecutor = CompletableFuture.delayedExecutor(this.options.pollIntervalMs(), TimeUnit.MILLISECONDS);
    }

    /**
     * Hands a subscription to the subscriber and starts the polling loop.
     */
    public void run() {
        this.subscriber.onSubscribe(new Flow.Subscription() {
            /**
             * Adds {@code n} to the outstanding demand.
             *
             * <p>NOTE(review): {@code n} is added as is; non-positive values and overflow of the counter
             * are not guarded against here. Confirm whether callers rely on that before tightening it.
             */
            @Override
            public void request(long n) {
                ContinuousQuery.this.requested.addAndGet(n);
            }

            /** Marks the query as cancelled; the polling loop observes the flag on its next check. */
            @Override
            public void cancel() {
                ContinuousQuery.this.cancelled.set(true);
            }
        });

        iterate(-1);
    }

    /**
     * Runs one step of the polling loop.
     *
     * @param startIdx Position in {@link #partitionSet} to resume delivery from, or a negative value to send
     *        a fresh round of requests for every partition first.
     */
    private void iterate(int startIdx) {
        if (this.cancelled.get()) {
            this.subscriber.onComplete();
            return;
        }

        try {
            if (this.requested.get() == 0) {
                // No demand: check again after the poll interval, keeping the resume position.
                CompletableFuture.runAsync(() -> iterate(startIdx), this.delayedExecutor);
                return;
            }

            if (startIdx < 0) {
                for (int idx = 0; idx < this.partitionSet.length; idx++) {
                    int partId = this.partitionSet[idx];

                    this.futs[idx] = this.requestSender.sendContinuousQueryRequest(new ContinuousQueryRequest(partId, this.lowerBoundTimestamps[partId], this.lowerBoundRowIds[partId], this.options.pageSize(), this.eventTypes, this.columnNames));
                    this.metrics.continuousQueryRequestsSentAdd(1L);
                }
            }

            CompletableFuture.allOf(this.futs).handle((ignored, requestError) -> {
                if (requestError != null) {
                    this.subscriber.onError(IgniteExceptionMapperUtil.mapToPublicException(ExceptionUtils.unwrapCause(requestError)));
                    return null;
                }

                try {
                    iterateInner(startIdx);
                } catch (Throwable deliveryError) {
                    this.subscriber.onError(IgniteExceptionMapperUtil.mapToPublicException(deliveryError));
                }

                return null;
            });
        } catch (Throwable sendError) {
            this.subscriber.onError(IgniteExceptionMapperUtil.mapToPublicException(sendError));
        }
    }

    /**
     * Delivers the completed responses to the subscriber, starting at the given position, and schedules the
     * next step of the loop.
     *
     * @param startIdx Position in {@link #partitionSet} to start from; negative values are treated as 0.
     * @throws Exception If mapping, delivery or closing of a batch fails.
     */
    // The generic signatures of TableRowEventBatchImpl / TableRowEventImpl are not visible from this file,
    // so they are used as raw types here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void iterateInner(int startIdx) throws Exception {
        if (this.cancelled.get()) {
            this.subscriber.onComplete();
            return;
        }

        boolean fullPageSeen = false;

        // Reused for every batch; cleared before the next batch is built.
        ArrayList events = new ArrayList();

        for (int idx = Math.max(startIdx, 0); idx < this.partitionSet.length; idx++) {
            int partId = this.partitionSet[idx];
            CompletableFuture<ContinuousQueryScanResultWithSchema<RowT, SchemaT>> fut = this.futs[idx];

            assert fut.isDone() : "fut.isDone()";

            ContinuousQueryScanResultWithSchema<RowT, SchemaT> cqRes = fut.join();

            assert cqRes != null : "cqRes != null";

            if (cqRes.result().rows().isEmpty()) {
                // Nothing to deliver: move the lower bound forward to the reported safe time, if newer.
                if (cqRes.result().safeTime() > this.lowerBoundTimestamps[partId]) {
                    this.lowerBoundTimestamps[partId] = cqRes.result().safeTime();
                    this.lowerBoundRowIds[partId] = new UUID(0L, 0L);
                }

                continue;
            }

            List<? extends RowUpdateInfo<RowT>> rows = cqRes.result().rows();

            events.clear();
            this.metrics.continuousQueryEventsReceivedAdd(rows.size());

            try {
                TableRowEventBatchImpl batch = new TableRowEventBatchImpl(events, rows, this.lowerBoundRowIds, this.lowerBoundTimestamps, partId);

                try {
                    for (int rowIdx = 0; rowIdx < rows.size(); rowIdx++) {
                        if (this.cancelled.get()) {
                            this.subscriber.onComplete();
                            return;
                        }

                        RowUpdateInfo<RowT> update = rows.get(rowIdx);
                        EntryT newEntry = map(update.row(), cqRes.schema());
                        EntryT oldEntry = map(update.oldRow(), cqRes.schema());

                        this.lowerBoundTimestamps[partId] = update.timestamp().longValue();
                        this.lowerBoundRowIds[partId] = update.rowUuid();

                        events.add(new TableRowEventImpl(newEntry, oldEntry, batch, rowIdx, update.timestamp().longValue()));
                    }

                    this.subscriber.onNext(batch);

                    // Demand exhausted with partitions still pending: resume from the next position
                    // once more demand arrives, reusing the already completed responses.
                    if (this.requested.decrementAndGet() <= 0 && idx < this.partitionSet.length - 1) {
                        iterate(idx + 1);
                        return;
                    }
                } finally {
                    // Close on every exit, including failures in the mapper or in the subscriber.
                    batch.close();
                }

                fullPageSeen = fullPageSeen || rows.size() == this.options.pageSize();
            } finally {
                // Whatever happened above, the next request for this partition starts after the last
                // row of this response.
                RowUpdateInfo<RowT> lastUpdate = rows.get(rows.size() - 1);

                this.lowerBoundTimestamps[partId] = lastUpdate.timestamp().longValue();
                this.lowerBoundRowIds[partId] = lastUpdate.rowUuid();
            }
        }

        if (fullPageSeen && this.requested.get() > 0) {
            // At least one partition returned a full page and there is demand: poll again right away.
            iterate(-1);
        } else {
            CompletableFuture.runAsync(() -> iterate(-1), this.delayedExecutor);
        }
    }

    /**
     * Applies the mapper to a row.
     *
     * @param rowt Row to map, may be {@code null}.
     * @param schemat Schema passed to the mapper together with the row.
     * @return Mapped entry, or {@code null} when the row is {@code null}.
     * @throws MarshallerException If the mapper fails; any other throwable is wrapped.
     */
    @Nullable
    private EntryT map(@Nullable RowT rowt, SchemaT schemat) {
        if (rowt == null) {
            return null;
        }

        try {
            return this.mapper.apply(rowt, schemat);
        } catch (MarshallerException e) {
            throw e;
        } catch (Throwable t) {
            throw new MarshallerException(t);
        }
    }

    /**
     * Parses every column name with {@link IgniteNameUtils#parseIdentifier}.
     *
     * @param set Column names, may be {@code null}.
     * @return Parsed names in the iteration order of the set, or {@code null} when the set is {@code null}.
     */
    private static String[] parsedColumnNames(@Nullable Set<String> set) {
        if (set == null) {
            return null;
        }

        String[] parsed = new String[set.size()];
        int idx = 0;

        for (String name : set) {
            parsed[idx++] = IgniteNameUtils.parseIdentifier(name);
        }

        return parsed;
    }
}
