/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.table;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.Function;
import org.apache.ignite3.internal.lang.IgniteExceptionMapperUtil;
import org.apache.ignite3.internal.marshaller.MarshallersProvider;
import org.apache.ignite3.internal.nearcache.EmbeddedNearCacheKeyImpl;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.schema.BinaryRow;
import org.apache.ignite3.internal.schema.BinaryRowEx;
import org.apache.ignite3.internal.schema.Column;
import org.apache.ignite3.internal.schema.SchemaDescriptor;
import org.apache.ignite3.internal.schema.SchemaRegistry;
import org.apache.ignite3.internal.schema.marshaller.TupleMarshaller;
import org.apache.ignite3.internal.schema.row.Row;
import org.apache.ignite3.internal.streamer.StreamerBatchSender;
import org.apache.ignite3.internal.table.AbstractTableView;
import org.apache.ignite3.internal.table.DataStreamer;
import org.apache.ignite3.internal.table.InternalTable;
import org.apache.ignite3.internal.table.RecordViewInternal;
import org.apache.ignite3.internal.table.TableRow;
import org.apache.ignite3.internal.table.TupleMarshallerCache;
import org.apache.ignite3.internal.table.TupleStreamerPartitionAwarenessProvider;
import org.apache.ignite3.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite3.internal.thread.PublicApiThreading;
import org.apache.ignite3.internal.tx.InternalTransaction;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.Pair;
import org.apache.ignite3.internal.util.ViewUtils;
import org.apache.ignite3.sql.IgniteSql;
import org.apache.ignite3.table.ContinuousQueryOptions;
import org.apache.ignite3.table.DataStreamerItem;
import org.apache.ignite3.table.DataStreamerOptions;
import org.apache.ignite3.table.DataStreamerReceiverDescriptor;
import org.apache.ignite3.table.RecordView;
import org.apache.ignite3.table.TableRowEventBatch;
import org.apache.ignite3.table.TableRowEventType;
import org.apache.ignite3.table.Tuple;
import org.apache.ignite3.table.TupleHelper;
import org.apache.ignite3.tx.Transaction;
import org.gridgain.internal.table.nearcache.NearCacheEntriesProvider;
import org.gridgain.internal.table.nearcache.NearCacheKey;
import org.gridgain.internal.table.nearcache.NearCacheRecordRowEventSubscriber;
import org.gridgain.internal.table.nearcache.NearCacheSchemaVersionAwareSubscriber;
import org.gridgain.internal.table.nearcache.NearCacheValue;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
import org.jetbrains.annotations.VisibleForTesting;

/**
 * Record view of a table whose records are represented as {@link Tuple}s.
 *
 * <p>Every data operation follows the same shape: the incoming tuples are marshalled into binary rows for the
 * schema version supplied by {@code doOperation}, the rows are handed to the underlying {@link InternalTable},
 * and any binary rows that come back are converted to tuples through {@code rowConverter}.
 *
 * <p>The class also implements {@link NearCacheEntriesProvider}, producing near-cache keys (original tuple plus
 * serialized key row) and near-cache values for the same table.
 */
public class RecordBinaryViewImpl extends AbstractTableView<Tuple>
        implements RecordView<Tuple>, RecordViewInternal<Tuple>, NearCacheEntriesProvider<Tuple, Tuple, BinaryRowEx> {
    /** Supplies a {@link TupleMarshaller} per schema version; built on the schema registry given to the constructor. */
    private final TupleMarshallerCache marshallerCache;

    /**
     * Creates the view.
     *
     * @param tbl Internal table the operations are forwarded to.
     * @param schemaRegistry Schema registry; also used to create the marshaller cache.
     * @param schemaVersions Schema versions facility passed to the parent view.
     * @param sql SQL facade passed to the parent view (used by {@code deleteAllAsync(tx)}).
     * @param marshallers Marshallers provider passed to the parent view.
     */
    public RecordBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaRegistry, SchemaVersions schemaVersions, IgniteSql sql, MarshallersProvider marshallers) {
        super(tbl, schemaVersions, schemaRegistry, sql, marshallers);
        this.marshallerCache = new TupleMarshallerCache(schemaRegistry);
    }

    /** Synchronous form of {@code getAsync}: hands its future to {@code ViewUtils.sync} and returns the outcome. */
    @Override
    public Tuple get(@Nullable Transaction tx, Tuple keyRec) {
        return ViewUtils.sync(this.getAsync(tx, keyRec));
    }

    /**
     * Marshals {@code keyRec} as a key and looks the row up in the table.
     *
     * @param tx Transaction, or {@code null}.
     * @param keyRec Key tuple; must not be {@code null}.
     * @return Future with the row converted to a tuple, or {@code null} when the table returned no row.
     * @throws NullPointerException If {@code keyRec} is {@code null}.
     */
    @Override
    public CompletableFuture<Tuple> getAsync(@Nullable Transaction tx, Tuple keyRec) {
        Objects.requireNonNull(keyRec);
        return this.doOperation(tx, schemaVersion -> {
            Row keyRow = this.marshal(keyRec, schemaVersion, true);
            return this.tbl.get(keyRow, (InternalTransaction)tx).thenApply(row -> this.wrap((BinaryRow)row, schemaVersion));
        });
    }

    /**
     * Returns the tuple marshaller for the given schema version, as provided by the marshaller cache.
     *
     * @param schemaVersion Schema version.
     * @return Marshaller for that version.
     */
    public TupleMarshaller marshaller(int schemaVersion) {
        return this.marshallerCache.marshaller(schemaVersion);
    }

    /** Synchronous form of {@code getAllAsync}: hands its future to {@code ViewUtils.sync} and returns the outcome. */
    @Override
    public List<Tuple> getAll(@Nullable Transaction tx, Collection<Tuple> keyRecs) {
        return ViewUtils.sync(this.getAllAsync(tx, keyRecs));
    }

    /**
     * Marshals every tuple of {@code keyRecs} as a key and fetches the rows in one {@code tbl.getAll} call.
     *
     * @param tx Transaction, or {@code null}.
     * @param keyRecs Key tuples; validated with {@code ViewUtils.checkCollectionForNulls}.
     * @return Future with the converted rows; rows that resolve to {@code null} are kept as {@code null} entries.
     */
    @Override
    public CompletableFuture<List<Tuple>> getAllAsync(@Nullable Transaction tx, Collection<Tuple> keyRecs) {
        ViewUtils.checkCollectionForNulls(keyRecs, "keyRecs", "key");
        return this.doOperation(tx, schemaVersion -> this.tbl.getAll(this.mapToBinary(keyRecs, schemaVersion, true), (InternalTransaction)tx)
                .thenApply(binaryRows -> this.wrap((Collection<BinaryRow>)binaryRows, schemaVersion, true)));
    }

    /** Synchronous form of {@code containsAsync}: hands its future to {@code ViewUtils.sync} and returns the outcome. */
    @Override
    public boolean contains(@Nullable Transaction tx, Tuple keyRec) {
        return ViewUtils.sync(this.containsAsync(tx, keyRec));
    }

    /**
     * Marshals {@code keyRec} as a key, looks the row up and reports whether a non-null row came back.
     *
     * @param tx Transaction, or {@code null}.
     * @param keyRec Key tuple; must not be {@code null}.
     * @return Future with {@code true} when {@code tbl.get} produced a non-null row.
     * @throws NullPointerException If {@code keyRec} is {@code null}.
     */
    @Override
    public CompletableFuture<Boolean> containsAsync(@Nullable Transaction tx, Tuple keyRec) {
        Objects.requireNonNull(keyRec);
        return this.doOperation(tx, schemaVersion -> {
            Row keyRow = this.marshal(keyRec, schemaVersion, true);
            return this.tbl.get(keyRow, (InternalTransaction)tx).thenApply(Objects::nonNull);
        });
    }

    /** Synchronous form of {@code containsAllAsync}: hands its future to {@code ViewUtils.sync} and returns the outcome. */
    @Override
    public boolean containsAll(@Nullable Transaction tx, Collection<Tuple> keys) {
        return ViewUtils.sync(this.containsAllAsync(tx, keys));
    }

    /**
     * Reports whether {@code tbl.getAll} returns a non-null row for every given key.
     *
     * @param tx Transaction, or {@code null}.
     * @param keys Key tuples; validated with {@code ViewUtils.checkKeysForNulls}.
     * @return Future with {@code false} as soon as one returned row is {@code null}, {@code true} otherwise. An
     *     empty {@code keys} collection short-circuits to {@code CompletableFutures.trueCompletedFuture()} without
     *     touching the table.
     */
    @Override
    public CompletableFuture<Boolean> containsAllAsync(@Nullable Transaction tx, Collection<Tuple> keys) {
        ViewUtils.checkKeysForNulls(keys);
        if (keys.isEmpty()) {
            return CompletableFutures.trueCompletedFuture();
        }
        return this.doOperation(tx, schemaVersion -> {
            Collection<BinaryRowEx> keysRows = this.mapToBinary(keys, schemaVersion, true);
            return this.tbl.getAll(keysRows, (InternalTransaction)tx).thenApply(rows -> {
                for (BinaryRow row : rows) {
                    if (row == null) {
                        return false;
                    }
                }
                return true;
            });
        });
    }

    /** Synchronous form of {@code upsertAsync}: hands its future to {@code ViewUtils.sync}. */
    @Override
    public void upsert(@Nullable Transaction tx, Tuple rec) {
        ViewUtils.sync(this.upsertAsync(tx, rec));
    }

    /**
     * Marshals the full record and forwards it to {@code tbl.upsert}.
     *
     * @param tx Transaction, or {@code null}.
     * @param rec Record tuple; must not be {@code null}.
     * @return Future of the table operation.
     * @throws NullPointerException If {@code rec} is {@code null}.
     */
    @Override
    public CompletableFuture<Void> upsertAsync(@Nullable Transaction tx, Tuple rec) {
        Objects.requireNonNull(rec);
        return this.doOperation(tx, schemaVersion -> {
            Row row = this.marshal(rec, schemaVersion, false);
            return this.tbl.upsert(row, (InternalTransaction)tx);
        });
    }

    /** Synchronous form of {@code upsertAllAsync}: hands its future to {@code ViewUtils.sync}. */
    @Override
    public void upsertAll(@Nullable Transaction tx, Collection<Tuple> recs) {
        ViewUtils.sync(this.upsertAllAsync(tx, recs));
    }

    /**
     * Marshals every full record and forwards the batch to {@code tbl.upsertAll}.
     *
     * @param tx Transaction, or {@code null}.
     * @param recs Record tuples; validated with {@code ViewUtils.checkCollectionForNulls}.
     * @return Future of the table operation.
     */
    @Override
    public CompletableFuture<Void> upsertAllAsync(@Nullable Transaction tx, Collection<Tuple> recs) {
        ViewUtils.checkCollectionForNulls(recs, "recs", "rec");
        return this.doOperation(tx, schemaVersion -> this.tbl.upsertAll(this.mapToBinary(recs, schemaVersion, false), (InternalTransaction)tx));
    }

    /** Synchronous form of {@code getAndUpsertAsync}: hands its future to {@code ViewUtils.sync} and returns the outcome. */
    @Override
    public Tuple getAndUpsert(@Nullable Transaction tx, Tuple rec) {
        return ViewUtils.sync(this.getAndUpsertAsync(tx, rec));
    }

    /**
     * Marshals the full record, forwards it to {@code tbl.getAndUpsert} and converts the row it returns.
     *
     * @param tx Transaction, or {@code null}.
     * @param rec Record tuple; must not be {@code null}.
     * @return Future with the returned row as a tuple, or {@code null} when the table returned no row.
     * @throws NullPointerException If {@code rec} is {@code null}.
     */
    @Override
    public CompletableFuture<Tuple> getAndUpsertAsync(@Nullable Transaction tx, Tuple rec) {
        Objects.requireNonNull(rec);
        return this.doOperation(tx, schemaVersion -> {
            Row row = this.marshal(rec, schemaVersion, false);
            return this.tbl.getAndUpsert(row, (InternalTransaction)tx).thenApply(resultRow -> this.wrap((BinaryRow)resultRow, schemaVersion));
        });
    }

    /** Synchronous form of {@code insertAsync}: hands its future to {@code ViewUtils.sync} and returns the outcome. */
    @Override
    public boolean insert(@Nullable Transaction tx, Tuple rec) {
        return ViewUtils.sync(this.insertAsync(tx, rec));
    }

    /**
     * Marshals the full record and forwards it to {@code tbl.insert}.
     *
     * @param tx Transaction, or {@code null}.
     * @param rec Record tuple; must not be {@code null}.
     * @return Future with the boolean result of the table operation.
     * @throws NullPointerException If {@code rec} is {@code null}.
     */
    @Override
    public CompletableFuture<Boolean> insertAsync(@Nullable Transaction tx, Tuple rec) {
        Objects.requireNonNull(rec);
        return this.doOperation(tx, schemaVersion -> {
            Row row = this.marshal(rec, schemaVersion, false);
            return this.tbl.insert(row, (InternalTransaction)tx);
        });
    }

    /** Synchronous form of {@code insertAllAsync}: hands its future to {@code ViewUtils.sync} and returns the outcome. */
    @Override
    public List<Tuple> insertAll(@Nullable Transaction tx, Collection<Tuple> recs) {
        return ViewUtils.sync(this.insertAllAsync(tx, recs));
    }

    /**
     * Marshals every full record, forwards the batch to {@code tbl.insertAll} and converts the rows it returns.
     *
     * @param tx Transaction, or {@code null}.
     * @param recs Record tuples; validated with {@code ViewUtils.checkCollectionForNulls}.
     * @return Future with the returned rows as tuples; rows that resolve to {@code null} are dropped.
     */
    @Override
    public CompletableFuture<List<Tuple>> insertAllAsync(@Nullable Transaction tx, Collection<Tuple> recs) {
        ViewUtils.checkCollectionForNulls(recs, "recs", "rec");
        return this.doOperation(tx, schemaVersion -> this.tbl.insertAll(this.mapToBinary(recs, schemaVersion, false), (InternalTransaction)tx)
                .thenApply(rows -> this.wrap((Collection<BinaryRow>)rows, schemaVersion, false)));
    }

    /** Synchronous form of the single-record {@code replaceAsync}: hands its future to {@code ViewUtils.sync}. */
    @Override
    public boolean replace(@Nullable Transaction tx, Tuple rec) {
        return ViewUtils.sync(this.replaceAsync(tx, rec));
    }

    /** Synchronous form of the old/new {@code replaceAsync}: hands its future to {@code ViewUtils.sync}. */
    @Override
    public boolean replace(@Nullable Transaction tx, Tuple oldRec, Tuple newRec) {
        return ViewUtils.sync(this.replaceAsync(tx, oldRec, newRec));
    }

    /**
     * Marshals the full record and forwards it to the single-row {@code tbl.replace}.
     *
     * @param tx Transaction, or {@code null}.
     * @param rec Record tuple; must not be {@code null}.
     * @return Future with the boolean result of the table operation.
     * @throws NullPointerException If {@code rec} is {@code null}.
     */
    @Override
    public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, Tuple rec) {
        Objects.requireNonNull(rec);
        return this.doOperation(tx, schemaVersion -> {
            Row row = this.marshal(rec, schemaVersion, false);
            return this.tbl.replace(row, (InternalTransaction)tx);
        });
    }

    /**
     * Marshals both full records and forwards them to the two-row {@code tbl.replace}.
     *
     * @param tx Transaction, or {@code null}.
     * @param oldRec Expected record; must not be {@code null}.
     * @param newRec Replacement record; must not be {@code null}.
     * @return Future with the boolean result of the table operation.
     * @throws NullPointerException If {@code oldRec} or {@code newRec} is {@code null}.
     */
    @Override
    public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, Tuple oldRec, Tuple newRec) {
        Objects.requireNonNull(oldRec);
        Objects.requireNonNull(newRec);
        return this.doOperation(tx, schemaVersion -> {
            Row oldRow = this.marshal(oldRec, schemaVersion, false);
            Row newRow = this.marshal(newRec, schemaVersion, false);
            return this.tbl.replace(oldRow, newRow, (InternalTransaction)tx);
        });
    }

    /** Synchronous form of {@code getAndReplaceAsync}: hands its future to {@code ViewUtils.sync} and returns the outcome. */
    @Override
    public Tuple getAndReplace(@Nullable Transaction tx, Tuple rec) {
        return ViewUtils.sync(this.getAndReplaceAsync(tx, rec));
    }

    /**
     * Marshals the full record, forwards it to {@code tbl.getAndReplace} and converts the row it returns.
     *
     * @param tx Transaction, or {@code null}.
     * @param rec Record tuple; must not be {@code null}.
     * @return Future with the returned row as a tuple, or {@code null} when the table returned no row.
     * @throws NullPointerException If {@code rec} is {@code null}.
     */
    @Override
    public CompletableFuture<Tuple> getAndReplaceAsync(@Nullable Transaction tx, Tuple rec) {
        Objects.requireNonNull(rec);
        return this.doOperation(tx, schemaVersion -> {
            Row row = this.marshal(rec, schemaVersion, false);
            return this.tbl.getAndReplace(row, (InternalTransaction)tx).thenApply(resultRow -> this.wrap((BinaryRow)resultRow, schemaVersion));
        });
    }

    /** Synchronous form of {@code deleteAsync}: hands its future to {@code ViewUtils.sync} and returns the outcome. */
    @Override
    public boolean delete(@Nullable Transaction tx, Tuple keyRec) {
        return ViewUtils.sync(this.deleteAsync(tx, keyRec));
    }

    /**
     * Marshals {@code keyRec} as a key and forwards it to {@code tbl.delete}.
     *
     * @param tx Transaction, or {@code null}.
     * @param keyRec Key tuple; must not be {@code null}.
     * @return Future with the boolean result of the table operation.
     * @throws NullPointerException If {@code keyRec} is {@code null}.
     */
    @Override
    public CompletableFuture<Boolean> deleteAsync(@Nullable Transaction tx, Tuple keyRec) {
        Objects.requireNonNull(keyRec);
        return this.doOperation(tx, schemaVersion -> {
            Row keyRow = this.marshal(keyRec, schemaVersion, true);
            return this.tbl.delete(keyRow, (InternalTransaction)tx);
        });
    }

    /** Synchronous form of {@code deleteExactAsync}: hands its future to {@code ViewUtils.sync} and returns the outcome. */
    @Override
    public boolean deleteExact(@Nullable Transaction tx, Tuple rec) {
        return ViewUtils.sync(this.deleteExactAsync(tx, rec));
    }

    /**
     * Marshals the full record and forwards it to {@code tbl.deleteExact}.
     *
     * @param tx Transaction, or {@code null}.
     * @param rec Record tuple; must not be {@code null}.
     * @return Future with the boolean result of the table operation.
     * @throws NullPointerException If {@code rec} is {@code null}.
     */
    @Override
    public CompletableFuture<Boolean> deleteExactAsync(@Nullable Transaction tx, Tuple rec) {
        Objects.requireNonNull(rec);
        return this.doOperation(tx, schemaVersion -> {
            Row row = this.marshal(rec, schemaVersion, false);
            return this.tbl.deleteExact(row, (InternalTransaction)tx);
        });
    }

    /** Synchronous form of {@code getAndDeleteAsync}: hands its future to {@code ViewUtils.sync} and returns the outcome. */
    @Override
    public Tuple getAndDelete(@Nullable Transaction tx, Tuple keyRec) {
        return ViewUtils.sync(this.getAndDeleteAsync(tx, keyRec));
    }

    /**
     * Marshals {@code keyRec} as a key, forwards it to {@code tbl.getAndDelete} and converts the row it returns.
     *
     * @param tx Transaction, or {@code null}.
     * @param keyRec Key tuple; must not be {@code null}.
     * @return Future with the returned row as a tuple, or {@code null} when the table returned no row.
     * @throws NullPointerException If {@code keyRec} is {@code null}.
     */
    @Override
    public CompletableFuture<Tuple> getAndDeleteAsync(@Nullable Transaction tx, Tuple keyRec) {
        Objects.requireNonNull(keyRec);
        return this.doOperation(tx, schemaVersion -> {
            Row keyRow = this.marshal(keyRec, schemaVersion, true);
            return this.tbl.getAndDelete(keyRow, (InternalTransaction)tx).thenApply(row -> this.wrap((BinaryRow)row, schemaVersion));
        });
    }

    /** Synchronous form of the keyed {@code deleteAllAsync}: hands its future to {@code ViewUtils.sync} and returns the outcome. */
    @Override
    public List<Tuple> deleteAll(@Nullable Transaction tx, Collection<Tuple> keyRecs) {
        return ViewUtils.sync(this.deleteAllAsync(tx, keyRecs));
    }

    /** Synchronous form of the whole-table {@code deleteAllAsync}: hands its future to {@code ViewUtils.sync}. */
    @Override
    public void deleteAll(@Nullable Transaction tx) {
        ViewUtils.sync(this.deleteAllAsync(tx));
    }

    /**
     * Marshals every tuple as a key, forwards the batch to {@code tbl.deleteAll} and converts the returned rows
     * with the key-only converter.
     *
     * @param tx Transaction, or {@code null}.
     * @param keyRecs Key tuples; must not be {@code null}.
     * @return Future with the returned rows as key tuples; rows that resolve to {@code null} are dropped.
     * @throws NullPointerException If {@code keyRecs} is {@code null}.
     */
    @Override
    public CompletableFuture<List<Tuple>> deleteAllAsync(@Nullable Transaction tx, Collection<Tuple> keyRecs) {
        // NOTE(review): only the collection itself is null-checked here, while getAllAsync/upsertAllAsync/insertAllAsync
        // go through ViewUtils.checkCollectionForNulls. Left unchanged to keep the existing failure behavior.
        Objects.requireNonNull(keyRecs);
        return this.doOperation(tx, schemaVersion -> this.tbl.deleteAll(this.mapToBinary(keyRecs, schemaVersion, true), (InternalTransaction)tx)
                .thenApply(rows -> this.wrapKeys((Collection<BinaryRow>)rows, schemaVersion)));
    }

    /**
     * Removes all rows by executing {@code DELETE FROM <canonical table name>} through the SQL facade.
     *
     * @param tx Transaction, or {@code null}.
     * @return Future completed with {@code null} once the statement future completes; the SQL result is discarded.
     */
    @Override
    public CompletableFuture<Void> deleteAllAsync(@Nullable Transaction tx) {
        return this.sql.executeAsync(tx, "DELETE FROM " + this.tbl.name().toCanonicalForm(), new Object[0]).thenApply(r -> null);
    }

    /** Synchronous form of {@code deleteAllExactAsync}: hands its future to {@code ViewUtils.sync} and returns the outcome. */
    @Override
    public List<Tuple> deleteAllExact(@Nullable Transaction tx, Collection<Tuple> recs) {
        return ViewUtils.sync(this.deleteAllExactAsync(tx, recs));
    }

    /**
     * Marshals every full record, forwards the batch to {@code tbl.deleteAllExact} and converts the returned rows.
     *
     * @param tx Transaction, or {@code null}.
     * @param recs Record tuples; must not be {@code null}.
     * @return Future with the returned rows as tuples; rows that resolve to {@code null} are dropped.
     * @throws NullPointerException If {@code recs} is {@code null}.
     */
    @Override
    public CompletableFuture<List<Tuple>> deleteAllExactAsync(@Nullable Transaction tx, Collection<Tuple> recs) {
        // NOTE(review): same element-validation gap as the keyed deleteAllAsync; see the note there.
        Objects.requireNonNull(recs);
        return this.doOperation(tx, schemaVersion -> this.tbl.deleteAllExact(this.mapToBinary(recs, schemaVersion, false), (InternalTransaction)tx)
                .thenApply(rows -> this.wrap((Collection<BinaryRow>)rows, schemaVersion, false)));
    }

    /**
     * Marshals a tuple with the marshaller of the given schema version.
     *
     * @param tuple Tuple to marshal.
     * @param schemaVersion Schema version selecting the marshaller.
     * @param keyOnly {@code true} to marshal as a key, {@code false} to marshal the full record.
     * @return Marshalled row.
     */
    private Row marshal(Tuple tuple, int schemaVersion, boolean keyOnly) {
        TupleMarshaller marshaller = this.marshaller(schemaVersion);
        return RecordBinaryViewImpl.marshal(tuple, marshaller, keyOnly);
    }

    /**
     * Marshals a tuple with an already resolved marshaller.
     *
     * @param tuple Tuple to marshal.
     * @param marshaller Marshaller to use.
     * @param keyOnly {@code true} selects {@code marshalKey}, {@code false} selects {@code marshal}.
     * @return Marshalled row.
     */
    private static Row marshal(Tuple tuple, TupleMarshaller marshaller, boolean keyOnly) {
        if (keyOnly) {
            return marshaller.marshalKey(tuple);
        }
        return marshaller.marshal(tuple);
    }

    /**
     * Test helper: marshals a full record and returns it in an already completed future.
     *
     * @param tx Transaction, or {@code null}.
     * @param rec Record tuple; must not be {@code null}.
     * @return Future with the marshalled row.
     * @throws NullPointerException If {@code rec} is {@code null}.
     */
    @TestOnly
    @VisibleForTesting
    public CompletableFuture<BinaryRowEx> tupleToBinaryRow(@Nullable Transaction tx, Tuple rec) {
        Objects.requireNonNull(rec);
        return this.doOperation(tx, schemaVersion -> {
            Row row = this.marshal(rec, schemaVersion, false);
            return CompletableFuture.completedFuture(row);
        });
    }

    /**
     * Test helper: converts a binary row to a tuple and returns it in an already completed future.
     *
     * @param tx Transaction, or {@code null}.
     * @param row Binary row to convert.
     * @return Future with the tuple, or {@code null} when {@code row} is {@code null}.
     */
    @TestOnly
    @VisibleForTesting
    public CompletableFuture<Tuple> binaryRowToTuple(@Nullable Transaction tx, BinaryRow row) {
        return this.doOperation(tx, schemaVersion -> CompletableFuture.completedFuture(this.wrap(row, schemaVersion)));
    }

    /**
     * Converts one binary row to a tuple resolved against the target schema version.
     *
     * @param row Binary row, or {@code null}.
     * @param targetSchemaVersion Schema version passed to {@code rowConverter.resolveRow}.
     * @return Tuple for the row, or {@code null} when {@code row} is {@code null}.
     */
    @Nullable
    private Tuple wrap(@Nullable BinaryRow row, int targetSchemaVersion) {
        return row == null ? null : TableRow.tuple(this.rowConverter.resolveRow(row, targetSchemaVersion));
    }

    /**
     * Converts a batch of binary rows to tuples resolved against the target schema version.
     *
     * @param rows Binary rows.
     * @param targetSchemaVersion Schema version passed to {@code rowConverter.resolveRows}.
     * @param addNull Whether a row that resolves to {@code null} is kept as a {@code null} entry ({@code true}) or
     *     skipped ({@code false}).
     * @return Converted tuples; {@code Collections.emptyList()} when {@code rows} is empty.
     */
    private List<Tuple> wrap(Collection<BinaryRow> rows, int targetSchemaVersion, boolean addNull) {
        if (rows.isEmpty()) {
            return Collections.emptyList();
        }
        List<Tuple> wrapped = new ArrayList<>(rows.size());
        for (Row row : this.rowConverter.resolveRows(rows, targetSchemaVersion)) {
            if (row != null) {
                wrapped.add(TableRow.tuple(row));
            } else if (addNull) {
                wrapped.add(null);
            }
        }
        return wrapped;
    }

    /**
     * Converts a batch of binary rows to tuples using {@code rowConverter.resolveKeys}.
     *
     * @param rows Binary rows.
     * @param targetSchemaVersion Schema version passed to {@code rowConverter.resolveKeys}.
     * @return Converted tuples with {@code null} rows skipped; {@code Collections.emptyList()} when {@code rows} is empty.
     */
    private List<Tuple> wrapKeys(Collection<BinaryRow> rows, int targetSchemaVersion) {
        if (rows.isEmpty()) {
            return Collections.emptyList();
        }
        List<Tuple> wrapped = new ArrayList<>(rows.size());
        for (Row row : this.rowConverter.resolveKeys(rows, targetSchemaVersion)) {
            if (row != null) {
                wrapped.add(TableRow.tuple(row));
            }
        }
        return wrapped;
    }

    /**
     * Marshals a batch of tuples, all as keys or all as full records.
     *
     * @param rows Tuples to marshal.
     * @param schemaVersion Schema version selecting the marshaller.
     * @param key {@code true} to marshal every tuple as a key, {@code false} as a full record.
     * @return Marshalled rows in iteration order.
     */
    private Collection<BinaryRowEx> mapToBinary(Collection<Tuple> rows, int schemaVersion, boolean key) {
        List<BinaryRowEx> mapped = new ArrayList<>(rows.size());
        // Resolve the marshaller once per batch rather than once per row, as the BitSet overload does.
        TupleMarshaller marshaller = this.marshaller(schemaVersion);
        for (Tuple row : rows) {
            mapped.add(RecordBinaryViewImpl.marshal(row, marshaller, key));
        }
        return mapped;
    }

    /**
     * Marshals a batch of tuples where a bit set decides, per position, between key and full-record marshalling.
     *
     * @param rows Tuples to marshal.
     * @param schemaVersion Schema version selecting the marshaller.
     * @param deleted Bit set indexed by position in {@code rows}; a set bit marshals that tuple as a key. When
     *     {@code null}, every tuple is marshalled as a full record.
     * @return Marshalled rows in iteration order.
     */
    private Collection<BinaryRowEx> mapToBinary(Collection<Tuple> rows, int schemaVersion, @Nullable BitSet deleted) {
        List<BinaryRowEx> mapped = new ArrayList<>(rows.size());
        TupleMarshaller marshaller = this.marshaller(schemaVersion);
        for (Tuple row : rows) {
            // mapped.size() is the zero-based position of the tuple currently being processed.
            boolean key = deleted != null && deleted.get(mapped.size());
            mapped.add(RecordBinaryViewImpl.marshal(row, marshaller, key));
        }
        return mapped;
    }

    /**
     * Streams tuples from the publisher into the table.
     *
     * <p>Each batch handed to the sender is marshalled (positions flagged in {@code deleted} as keys) and forwarded
     * to {@code tbl.updateAll} for the batch's partition.
     *
     * @param publisher Source of streamer items; must not be {@code null}.
     * @param options Streamer options, or {@code null}.
     * @return Future of the streaming operation, passed through {@code IgniteExceptionMapperUtil.convertToPublicFuture}.
     * @throws NullPointerException If {@code publisher} is {@code null}.
     */
    @Override
    public CompletableFuture<Void> streamData(Flow.Publisher<DataStreamerItem<Tuple>> publisher, @Nullable DataStreamerOptions options) {
        Objects.requireNonNull(publisher);
        TupleStreamerPartitionAwarenessProvider partitioner = new TupleStreamerPartitionAwarenessProvider(this.rowConverter.registry(), this.tbl.partitions());
        StreamerBatchSender batchSender = (partitionId, rows, deleted) -> PublicApiThreading.execUserAsyncOperation(() -> this.withSchemaSync(null, schemaVersion -> this.tbl.updateAll(this.mapToBinary((Collection<Tuple>)rows, schemaVersion, deleted), deleted, (int)partitionId)));
        CompletableFuture<Void> future = DataStreamer.streamData(publisher, options, batchSender, partitioner, this.tbl.streamerFlushExecutor());
        return IgniteExceptionMapperUtil.convertToPublicFuture(future);
    }

    /**
     * Streams items from the publisher to a receiver.
     *
     * <p>For each batch the sender resolves the partition's location via {@code tbl.partitionLocation} and runs the
     * receiver on that node through {@code tbl.streamerReceiverRunner()}.
     *
     * @param publisher Source of items; must not be {@code null}.
     * @param receiver Receiver descriptor; must not be {@code null}.
     * @param keyFunc Maps an item to its key tuple; must not be {@code null}.
     * @param payloadFunc Maps an item to the receiver payload; must not be {@code null}.
     * @param receiverArg Receiver argument, or {@code null}.
     * @param resultSubscriber Subscriber for receiver results, or {@code null}.
     * @param options Streamer options, or {@code null}.
     * @return Future of the streaming operation, passed through {@code IgniteExceptionMapperUtil.convertToPublicFuture}.
     * @throws NullPointerException If {@code publisher}, {@code keyFunc}, {@code payloadFunc} or {@code receiver} is {@code null}.
     */
    @Override
    public <E, V, A, R> CompletableFuture<Void> streamData(Flow.Publisher<E> publisher, DataStreamerReceiverDescriptor<V, A, R> receiver, Function<E, Tuple> keyFunc, Function<E, V> payloadFunc, @Nullable A receiverArg, @Nullable Flow.Subscriber<R> resultSubscriber, @Nullable DataStreamerOptions options) {
        Objects.requireNonNull(publisher);
        Objects.requireNonNull(keyFunc);
        Objects.requireNonNull(payloadFunc);
        Objects.requireNonNull(receiver);
        TupleStreamerPartitionAwarenessProvider partitioner = new TupleStreamerPartitionAwarenessProvider(this.rowConverter.registry(), this.tbl.partitions());
        StreamerBatchSender batchSender = (partitionIndex, rows, deleted) -> PublicApiThreading.execUserAsyncOperation(() -> this.tbl.partitionLocation((int)partitionIndex).thenCompose(node -> this.tbl.streamerReceiverRunner().runReceiverAsync(receiver, receiverArg, rows, (InternalClusterNode)node, receiver.units())));
        // The predicate argument is a constant false for this receiver-based variant.
        CompletableFuture<Void> future = DataStreamer.streamData(publisher, keyFunc, payloadFunc, x -> false, options, batchSender, resultSubscriber, partitioner, this.tbl.streamerFlushExecutor());
        return IgniteExceptionMapperUtil.convertToPublicFuture(future);
    }

    /**
     * Marshals the rows (positions flagged in {@code deleted} as keys) and forwards them to {@code tbl.updateAll}
     * for the given partition, without a transaction.
     *
     * @param partitionId Target partition.
     * @param rows Tuples to apply.
     * @param deleted Bit set indexed by position in {@code rows}, or {@code null}.
     * @return Future of the table operation.
     */
    @Override
    public CompletableFuture<Void> updateAll(int partitionId, Collection<Tuple> rows, @Nullable BitSet deleted) {
        return this.doOperation(null, schemaVersion -> this.tbl.updateAll(this.mapToBinary(rows, schemaVersion, deleted), deleted, partitionId));
    }

    /**
     * Registers a continuous query on the table; each row is converted to a tuple at the row's own schema version.
     *
     * @param subscriber Receiver of event batches; must not be {@code null}.
     * @param options Continuous query options, or {@code null}.
     * @throws NullPointerException If {@code subscriber} is {@code null}.
     */
    @Override
    public void queryContinuously(Flow.Subscriber<TableRowEventBatch<Tuple>> subscriber, @Nullable ContinuousQueryOptions options) {
        Objects.requireNonNull(subscriber);
        this.tbl.queryContinuously(subscriber, options, (row, schema) -> this.wrap((BinaryRow)row, row.schemaVersion()));
    }

    /** Returns {@code Tuple.class}, the value type handled by this view. */
    @Override
    public Class<Tuple> valueType() {
        return Tuple.class;
    }

    /**
     * Builds a near-cache key for one tuple.
     *
     * <p>When {@code enforceKeyOnly} is {@code false} and the tuple's column count differs from the number of key
     * columns in the last known schema, the key columns are first copied into a separate tuple; otherwise the
     * tuple is marshalled as given. The resulting cache key always carries the original {@code key} tuple.
     *
     * @param key Tuple to derive the key from.
     * @param enforceKeyOnly {@code true} to marshal {@code key} as-is, without projecting it onto the key columns.
     * @return Future with the near-cache key holding {@code key} and its marshalled key row.
     */
    @Override
    public CompletableFuture<NearCacheKey<Tuple, BinaryRowEx>> getNearCacheKeyAsync(Tuple key, boolean enforceKeyOnly) {
        return this.doOperation(null, schemaVersion -> {
            SchemaDescriptor schemaDscr = this.rowConverter.registry().lastKnownSchema();
            List<Column> keyColumns = schemaDscr.keyColumns();
            Tuple keyRec = key;
            if (!enforceKeyOnly && key.columnCount() != keyColumns.size()) {
                keyRec = TupleHelper.copyColumns(key, RecordBinaryViewImpl.keyColumnNamesOf(keyColumns));
            }
            Row keyRow = this.marshal(keyRec, schemaVersion, true);
            NearCacheKey<Tuple, BinaryRowEx> cacheKey = new EmbeddedNearCacheKeyImpl<Tuple>(key, keyRow);
            return CompletableFuture.completedFuture(cacheKey);
        });
    }

    /**
     * Builds near-cache keys for several tuples, applying the same projection rule as {@code getNearCacheKeyAsync}
     * to each of them.
     *
     * @param keys Tuples to derive the keys from.
     * @param enforceKeyOnly {@code true} to marshal every tuple as-is, without projecting it onto the key columns.
     * @return Future with one near-cache key per input tuple, in iteration order.
     */
    @Override
    public CompletableFuture<Collection<NearCacheKey<Tuple, BinaryRowEx>>> getNearCacheKeysAsync(Iterable<? extends Tuple> keys, boolean enforceKeyOnly) {
        return this.doOperation(null, schemaVersion -> {
            Collection<NearCacheKey<Tuple, BinaryRowEx>> cacheKeys = new ArrayList<>();
            SchemaDescriptor schemaDscr = this.rowConverter.registry().lastKnownSchema();
            List<Column> keyColumns = schemaDscr.keyColumns();
            // The name list is the same for every tuple, so it is built at most once, on first need.
            List<String> keyColumnNames = null;
            for (Tuple rec : keys) {
                Tuple keyRec = rec;
                if (!enforceKeyOnly && rec.columnCount() != keyColumns.size()) {
                    if (keyColumnNames == null) {
                        keyColumnNames = RecordBinaryViewImpl.keyColumnNamesOf(keyColumns);
                    }
                    keyRec = TupleHelper.copyColumns(rec, keyColumnNames);
                }
                Row keyRow = this.marshal(keyRec, schemaVersion, true);
                cacheKeys.add(new EmbeddedNearCacheKeyImpl<Tuple>(rec, keyRow));
            }
            return CompletableFuture.completedFuture(cacheKeys);
        });
    }

    /**
     * Loads the value for one near-cache key.
     *
     * <p>The key's serialized row is reused when present and marshalled at the current schema version; otherwise the
     * key tuple is marshalled again. The returned pair carries a fresh cache key built from that row.
     *
     * @param tx Transaction, or {@code null}.
     * @param key Near-cache key to load.
     * @return Future with the (possibly re-marshalled) key and its value; the value is created with the no-arg
     *     {@code NearCacheValue} constructor when the table returned no row.
     */
    @Override
    public CompletableFuture<Pair<NearCacheKey<Tuple, BinaryRowEx>, NearCacheValue<Tuple>>> getNearCacheValueAsync(@Nullable Transaction tx, NearCacheKey<Tuple, BinaryRowEx> key) {
        return this.doOperation(tx, schemaVersion -> {
            BinaryRowEx keyRow = this.resolveNearCacheKeyRow(key, schemaVersion);
            NearCacheKey<Tuple, BinaryRowEx> finalCacheKey = new EmbeddedNearCacheKeyImpl<Tuple>(key.key(), keyRow);
            return this.tbl.get(keyRow, (InternalTransaction)tx).thenApply(binaryRow -> {
                NearCacheValue<Tuple> cacheVal = this.toNearCacheValue((BinaryRow)binaryRow, schemaVersion);
                return new Pair<NearCacheKey<Tuple, BinaryRowEx>, NearCacheValue<Tuple>>(finalCacheKey, cacheVal);
            });
        });
    }

    /**
     * Loads the values for several near-cache keys with a single {@code tbl.getAll} call.
     *
     * @param tx Transaction, or {@code null}.
     * @param nearCacheKeys Near-cache keys to load.
     * @return Future with an insertion-ordered map from the (possibly re-marshalled) keys to their values, paired by
     *     position with the rows returned by the table.
     */
    @Override
    public CompletableFuture<Map<NearCacheKey<Tuple, BinaryRowEx>, NearCacheValue<Tuple>>> getNearCacheValuesAsync(@Nullable Transaction tx, Collection<NearCacheKey<Tuple, BinaryRowEx>> nearCacheKeys) {
        return this.doOperation(tx, schemaVersion -> {
            List<NearCacheKey<Tuple, BinaryRowEx>> ncKeys = new ArrayList<>(nearCacheKeys.size());
            List<BinaryRowEx> keyRows = new ArrayList<>(nearCacheKeys.size());
            for (NearCacheKey<Tuple, BinaryRowEx> nearCacheKey : nearCacheKeys) {
                BinaryRowEx keyRow = this.resolveNearCacheKeyRow(nearCacheKey, schemaVersion);
                keyRows.add(keyRow);
                ncKeys.add(new EmbeddedNearCacheKeyImpl<Tuple>(nearCacheKey.key(), keyRow));
            }
            return this.tbl.getAll(keyRows, (InternalTransaction)tx).thenApply(rows -> {
                // Keys and rows are matched by index below, so the sizes have to agree.
                assert (rows.size() == keyRows.size());
                Map<NearCacheKey<Tuple, BinaryRowEx>, NearCacheValue<Tuple>> ret = new LinkedHashMap<>(rows.size());
                for (int i = 0; i < rows.size(); ++i) {
                    NearCacheValue<Tuple> cacheVal = this.toNearCacheValue((BinaryRow)rows.get(i), schemaVersion);
                    ret.put(ncKeys.get(i), cacheVal);
                }
                return ret;
            });
        });
    }

    /**
     * Subscribes to created, updated and removed row events through {@code queryContinuously}.
     *
     * @param subscriber Receiver of near-cache update batches.
     * @param updateInterval Value passed to {@code pollIntervalMs} of the continuous query options.
     */
    @Override
    public void subscribeToNearCacheUpdates(Flow.Subscriber<TableRowEventBatch<Map.Entry<Tuple, Tuple>>> subscriber, int updateInterval) {
        ContinuousQueryOptions opts = ContinuousQueryOptions.builder().pollIntervalMs(updateInterval).eventTypes(TableRowEventType.CREATED, TableRowEventType.UPDATED, TableRowEventType.REMOVED).build();
        // The supplier yields the schema version that doOperation hands to its action.
        this.queryContinuously((Flow.Subscriber<TableRowEventBatch<Tuple>>)new NearCacheSchemaVersionAwareSubscriber<Tuple>(new NearCacheRecordRowEventSubscriber(subscriber), () -> this.doOperation(null, CompletableFuture::completedFuture)), opts);
    }

    /** Does nothing; this view has nothing to release. */
    @Override
    public void close() throws Exception {
    }

    /** Collects the names of the given columns, in list order. */
    private static List<String> keyColumnNamesOf(List<Column> keyColumns) {
        List<String> names = new ArrayList<>(keyColumns.size());
        for (Column column : keyColumns) {
            names.add(column.name());
        }
        return names;
    }

    /**
     * Returns the key row to use for a near-cache key: its serialized row when that row exists and was marshalled at
     * {@code schemaVersion}, otherwise the key tuple marshalled anew at {@code schemaVersion}.
     */
    private BinaryRowEx resolveNearCacheKeyRow(NearCacheKey<Tuple, BinaryRowEx> key, int schemaVersion) {
        BinaryRowEx keyRow = key.serializedKey();
        if (keyRow == null || schemaVersion != keyRow.schemaVersion()) {
            keyRow = this.marshal(key.key(), schemaVersion, true);
        }
        return keyRow;
    }

    /**
     * Wraps a table row into a near-cache value: the no-arg {@code NearCacheValue} when the row is {@code null},
     * otherwise a value holding the row converted to a tuple.
     */
    private NearCacheValue<Tuple> toNearCacheValue(@Nullable BinaryRow binaryRow, int schemaVersion) {
        if (binaryRow == null) {
            return new NearCacheValue<Tuple>();
        }
        @Nullable Tuple val = this.wrap(binaryRow, schemaVersion);
        return new NearCacheValue<Tuple>(val);
    }
}

