/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.shaded.org.apache.ignite.internal.client.table;

import java.util.ArrayList;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.BiFunction;
import org.gridgain.shaded.org.apache.ignite.internal.client.PayloadInputChannel;
import org.gridgain.shaded.org.apache.ignite.internal.client.PayloadOutputChannel;
import org.gridgain.shaded.org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.gridgain.shaded.org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.gridgain.shaded.org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
import org.gridgain.shaded.org.apache.ignite.internal.client.table.ClientSchema;
import org.gridgain.shaded.org.apache.ignite.internal.client.table.ClientTable;
import org.gridgain.shaded.org.apache.ignite.internal.continuousquery.ContinuousQuery;
import org.gridgain.shaded.org.apache.ignite.internal.continuousquery.ContinuousQueryRequest;
import org.gridgain.shaded.org.apache.ignite.internal.continuousquery.ContinuousQueryRequestSender;
import org.gridgain.shaded.org.apache.ignite.internal.continuousquery.ContinuousQueryScanResult;
import org.gridgain.shaded.org.apache.ignite.internal.continuousquery.ContinuousQueryScanResultStatus;
import org.gridgain.shaded.org.apache.ignite.internal.continuousquery.ContinuousQueryScanResultWithSchema;
import org.gridgain.shaded.org.apache.ignite.internal.continuousquery.RowUpdateInfo;
import org.gridgain.shaded.org.apache.ignite.internal.hlc.HybridTimestamp;
import org.gridgain.shaded.org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
import org.gridgain.shaded.org.apache.ignite.internal.util.ExceptionUtils;
import org.gridgain.shaded.org.apache.ignite.table.ContinuousQueryOptions;
import org.gridgain.shaded.org.apache.ignite.table.TableRowEventBatch;
import org.gridgain.shaded.org.apache.ignite.table.TableRowEventType;
import org.gridgain.shaded.org.jetbrains.annotations.Nullable;

class ClientContinuousQuery {
    private static final long REQUEST_ID_NOT_USED = -1L;

    ClientContinuousQuery() {
    }

    static <T> void queryContinuously(Flow.Subscriber<TableRowEventBatch<T>> subscriber, @Nullable ContinuousQueryOptions options, BiFunction<byte[], ClientSchema, T> mapper, ClientTable tbl) {
        Objects.requireNonNull(subscriber);
        ContinuousQueryRequestSender sender = req -> tbl.doPartitionOutInOpAsync(1001, w -> ClientContinuousQuery.writeRequest(w, tbl, req), ClientContinuousQuery::readResponse, req.partId()).thenCompose(res -> tbl.getSchema(res.schemaVersion()).thenApply(schema -> new ContinuousQueryScanResultWithSchema(res, (ClientSchema)schema)));
        ((CompletableFuture)tbl.getPartitionAssignment().thenAccept(partitions -> {
            ContinuousQuery cq = new ContinuousQuery(subscriber, options, mapper, sender, tbl.channel().metrics(), partitions.size(), tbl.qualifiedName(), tbl.channel().observableTimestamp().get());
            cq.run();
        })).exceptionally(e -> {
            subscriber.onError(IgniteExceptionMapperUtil.mapToPublicException(ExceptionUtils.unwrapCause(e)));
            return null;
        });
    }

    private static void writeRequest(PayloadOutputChannel out, ClientTable tbl, ContinuousQueryRequest req) {
        ClientMessagePacker w = out.out();
        w.packInt(tbl.tableId());
        w.packInt(req.partId());
        w.packLong(req.lowerBoundTs());
        w.packUuid(req.lowerBoundRowId());
        w.packInt(req.maxItems());
        w.packByte(req.eventTypes());
        String[] columnNames = req.columnNames();
        if (columnNames == null) {
            w.packNil();
        } else {
            w.packInt(columnNames.length);
            for (String colName : columnNames) {
                w.packString(colName);
            }
        }
        if (out.clientChannel().protocolContext().isFeatureSupported(ProtocolBitmaskFeature.CQ_SKIP_OLD_ENTRIES)) {
            w.packBoolean(req.skipOldEntries());
        }
    }

    private static ContinuousQueryScanResult<byte[]> readResponse(PayloadInputChannel in) {
        byte statusId;
        ContinuousQueryScanResultStatus status;
        boolean explicitEventType = in.clientChannel().protocolContext().isFeatureSupported(ProtocolBitmaskFeature.CQ_EVENT_TYPE);
        boolean partitionScanStatus = in.clientChannel().protocolContext().isFeatureSupported(ProtocolBitmaskFeature.CQ_PARTITION_SCAN_STATUS);
        ClientMessageUnpacker r = in.in();
        long safeTime = r.unpackLong();
        int schemaVer = r.unpackInt();
        int cnt = r.unpackInt();
        ArrayList<RowUpdateInfo<byte[]>> rows = new ArrayList<RowUpdateInfo<byte[]>>(cnt);
        for (int i = 0; i < cnt; ++i) {
            UUID rowUuid = r.unpackUuid();
            long timestamp = r.unpackLong();
            byte[] oldRow = ClientContinuousQuery.readRow(r);
            byte[] newRow = ClientContinuousQuery.readRow(r);
            Integer eventTypeId = explicitEventType ? Integer.valueOf(r.unpackInt()) : null;
            TableRowEventType eventType = ClientContinuousQuery.getEventType(eventTypeId, oldRow, newRow);
            rows.add(new RowUpdateInfo<byte[]>(rowUuid, HybridTimestamp.hybridTimestamp(timestamp), newRow, oldRow, HybridTimestamp.MIN_VALUE, null, eventType));
        }
        status = partitionScanStatus ? ((status = ContinuousQueryScanResultStatus.fromId(statusId = r.unpackByte())) == null ? ContinuousQueryScanResultStatus.ERROR : status) : ContinuousQueryScanResultStatus.OK;
        return new ContinuousQueryScanResult<byte[]>(safeTime, rows, schemaVer, -1L, status);
    }

    private static TableRowEventType getEventType(@Nullable Integer typeId, byte @Nullable [] oldRow, byte @Nullable [] newRow) {
        if (typeId != null) {
            return TableRowEventType.fromId(typeId);
        }
        if (newRow == null) {
            return TableRowEventType.REMOVED;
        }
        if (oldRow == null) {
            return TableRowEventType.CREATED;
        }
        return TableRowEventType.UPDATED;
    }

    private static byte @Nullable [] readRow(ClientMessageUnpacker r) {
        return r.tryUnpackNil() ? null : r.readBinary();
    }
}

