/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.table;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.ignite3.internal.lang.IgniteBiTuple;
import org.apache.ignite3.internal.lang.IgniteExceptionMapperUtil;
import org.apache.ignite3.internal.marshaller.MarshallersProvider;
import org.apache.ignite3.internal.nearcache.EmbeddedNearCacheKeyImpl;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.schema.BinaryRow;
import org.apache.ignite3.internal.schema.BinaryRowEx;
import org.apache.ignite3.internal.schema.Column;
import org.apache.ignite3.internal.schema.SchemaDescriptor;
import org.apache.ignite3.internal.schema.SchemaRegistry;
import org.apache.ignite3.internal.schema.row.Row;
import org.apache.ignite3.internal.streamer.StreamerBatchSender;
import org.apache.ignite3.internal.table.AbstractTableView;
import org.apache.ignite3.internal.table.DataStreamer;
import org.apache.ignite3.internal.table.InternalTable;
import org.apache.ignite3.internal.table.KeyValueTupleStreamerPartitionAwarenessProvider;
import org.apache.ignite3.internal.table.TableRow;
import org.apache.ignite3.internal.table.TupleMarshallerCache;
import org.apache.ignite3.internal.table.criteria.SqlRowProjection;
import org.apache.ignite3.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite3.internal.thread.PublicApiThreading;
import org.apache.ignite3.internal.tx.InternalTransaction;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.util.Pair;
import org.apache.ignite3.internal.util.ViewUtils;
import org.apache.ignite3.lang.NullableValue;
import org.apache.ignite3.sql.IgniteSql;
import org.apache.ignite3.sql.ResultSetMetadata;
import org.apache.ignite3.sql.SqlRow;
import org.apache.ignite3.table.ContinuousQueryOptions;
import org.apache.ignite3.table.DataStreamerItem;
import org.apache.ignite3.table.DataStreamerOptions;
import org.apache.ignite3.table.DataStreamerReceiverDescriptor;
import org.apache.ignite3.table.KeyValueView;
import org.apache.ignite3.table.TableRowEventBatch;
import org.apache.ignite3.table.TableRowEventType;
import org.apache.ignite3.table.Tuple;
import org.apache.ignite3.table.TupleHelper;
import org.apache.ignite3.tx.Transaction;
import org.gridgain.internal.table.nearcache.NearCacheEntriesProvider;
import org.gridgain.internal.table.nearcache.NearCacheKey;
import org.gridgain.internal.table.nearcache.NearCacheSchemaVersionAwareSubscriber;
import org.gridgain.internal.table.nearcache.NearCacheValue;
import org.jetbrains.annotations.Nullable;

/**
 * Key-value view over a table in which both keys and values are represented as {@link Tuple}s.
 *
 * <p>Each operation marshals its tuple arguments into binary rows for the schema version handed to the
 * operation by {@code doOperation}, delegates to the underlying {@link InternalTable}, and converts any
 * returned rows back into tuples. The class additionally implements {@link NearCacheEntriesProvider} so
 * that near-cache keys and values can be produced from the same table.
 */
public class KeyValueBinaryViewImpl
        extends AbstractTableView<Map.Entry<Tuple, Tuple>>
        implements KeyValueView<Tuple, Tuple>, NearCacheEntriesProvider<Tuple, Tuple, BinaryRowEx> {
    /** Supplies tuple marshallers per schema version. */
    private final TupleMarshallerCache marshallerCache;

    /**
     * Creates the view.
     *
     * @param tbl Internal table the operations are delegated to.
     * @param schemaReg Schema registry; also used to build the tuple marshaller cache.
     * @param schemaVersions Schema versions, passed to the parent view.
     * @param sql SQL facade, passed to the parent view.
     * @param marshallers Marshallers provider, passed to the parent view.
     */
    public KeyValueBinaryViewImpl(
            InternalTable tbl,
            SchemaRegistry schemaReg,
            SchemaVersions schemaVersions,
            IgniteSql sql,
            MarshallersProvider marshallers
    ) {
        super(tbl, schemaVersions, schemaReg, sql, marshallers);

        this.marshallerCache = new TupleMarshallerCache(schemaReg);
    }

    /** Blocking counterpart of {@link #getAsync(Transaction, Tuple)}. */
    @Override
    public Tuple get(@Nullable Transaction tx, Tuple key) {
        return ViewUtils.sync(getAsync(tx, key));
    }

    /**
     * Reads the row for the given key and returns its value part.
     *
     * @param tx Transaction or {@code null}.
     * @param key Key tuple, must not be {@code null}.
     * @return Future with the value tuple, or with {@code null} when the table returned no row.
     * @throws NullPointerException If {@code key} is {@code null}.
     */
    @Override
    public CompletableFuture<Tuple> getAsync(@Nullable Transaction tx, Tuple key) {
        Objects.requireNonNull(key, "key");

        return doOperation(tx, schemaVersion -> {
            Row keyRow = marshal(key, null, schemaVersion);

            return this.tbl.get(keyRow, (InternalTransaction) tx)
                    .thenApply(row -> unmarshalValue(row, schemaVersion));
        });
    }

    /** Blocking counterpart of {@link #getNullableAsync(Transaction, Tuple)}. */
    @Override
    public NullableValue<Tuple> getNullable(@Nullable Transaction tx, Tuple key) {
        return ViewUtils.sync(getNullableAsync(tx, key));
    }

    /**
     * Same as {@link #getAsync(Transaction, Tuple)}, but wraps a non-null result into a {@link NullableValue}.
     *
     * @return Future with the wrapped value, or with {@code null} when no value was found.
     */
    @Override
    public CompletableFuture<NullableValue<Tuple>> getNullableAsync(@Nullable Transaction tx, Tuple key) {
        return getAsync(tx, key).thenApply(r -> r == null ? null : NullableValue.of(r));
    }

    /** Blocking counterpart of {@link #getOrDefaultAsync(Transaction, Tuple, Tuple)}. */
    @Override
    public Tuple getOrDefault(@Nullable Transaction tx, Tuple key, Tuple defaultValue) {
        return ViewUtils.sync(getOrDefaultAsync(tx, key, defaultValue));
    }

    /**
     * Reads the value for the given key, substituting {@code defaultValue} when no value was found.
     *
     * @param tx Transaction or {@code null}.
     * @param key Key tuple, must not be {@code null}.
     * @param defaultValue Value to return when the lookup yields {@code null}.
     * @return Future with the found value or {@code defaultValue}.
     * @throws NullPointerException If {@code key} is {@code null}.
     */
    @Override
    public CompletableFuture<Tuple> getOrDefaultAsync(@Nullable Transaction tx, Tuple key, Tuple defaultValue) {
        Objects.requireNonNull(key, "key");

        return doOperation(tx, schemaVersion -> {
            Row keyRow = marshal(key, null, schemaVersion);

            return this.tbl.get(keyRow, (InternalTransaction) tx)
                    .thenApply(r -> IgniteUtils.nonNullOrElse(unmarshalValue(r, schemaVersion), defaultValue));
        });
    }

    /** Blocking counterpart of {@link #getAllAsync(Transaction, Collection)}. */
    @Override
    public Map<Tuple, Tuple> getAll(@Nullable Transaction tx, Collection<Tuple> keys) {
        return ViewUtils.sync(getAllAsync(tx, keys));
    }

    /**
     * Reads the rows for the given keys.
     *
     * @param tx Transaction or {@code null}.
     * @param keys Keys; validated with {@link ViewUtils#checkKeysForNulls}.
     * @return Future with a map from key tuple to value tuple containing only the rows that were found.
     */
    @Override
    public CompletableFuture<Map<Tuple, Tuple>> getAllAsync(@Nullable Transaction tx, Collection<Tuple> keys) {
        ViewUtils.checkKeysForNulls(keys);

        return doOperation(tx, schemaVersion -> {
            List<BinaryRowEx> keyRows = marshalKeys(keys, schemaVersion);

            return this.tbl.getAll(keyRows, (InternalTransaction) tx)
                    .thenApply(rows -> unmarshalValues(rows, schemaVersion));
        });
    }

    /** Returns {@code true} when {@link #get(Transaction, Tuple)} yields a non-null value for the key. */
    @Override
    public boolean contains(@Nullable Transaction tx, Tuple key) {
        return get(tx, key) != null;
    }

    /** Asynchronous variant of {@link #contains(Transaction, Tuple)} built on {@link #getAsync(Transaction, Tuple)}. */
    @Override
    public CompletableFuture<Boolean> containsAsync(@Nullable Transaction tx, Tuple key) {
        return getAsync(tx, key).thenApply(Objects::nonNull);
    }

    /** Blocking counterpart of {@link #containsAllAsync(Transaction, Collection)}. */
    @Override
    public boolean containsAll(@Nullable Transaction tx, Collection<Tuple> keys) {
        return ViewUtils.sync(containsAllAsync(tx, keys));
    }

    /**
     * Checks that a row exists for every given key.
     *
     * @param tx Transaction or {@code null}.
     * @param keys Keys; validated with {@link ViewUtils#checkKeysForNulls}.
     * @return Future with {@code true} when {@code keys} is empty or every looked-up row is non-null,
     *     {@code false} otherwise.
     */
    @Override
    public CompletableFuture<Boolean> containsAllAsync(@Nullable Transaction tx, Collection<Tuple> keys) {
        ViewUtils.checkKeysForNulls(keys);

        // Nothing to look up: answer without touching the table.
        if (keys.isEmpty()) {
            return CompletableFutures.trueCompletedFuture();
        }

        return doOperation(tx, schemaVersion -> {
            List<BinaryRowEx> keyRows = marshalKeys(keys, schemaVersion);

            return this.tbl.getAll(keyRows, (InternalTransaction) tx).thenApply(rows -> {
                for (BinaryRow row : rows) {
                    if (row == null) {
                        return false;
                    }
                }

                return true;
            });
        });
    }

    /** Blocking counterpart of {@link #putAsync(Transaction, Tuple, Tuple)}. */
    @Override
    public void put(@Nullable Transaction tx, Tuple key, Tuple val) {
        ViewUtils.sync(putAsync(tx, key, val));
    }

    /**
     * Marshals the pair into a row and upserts it.
     *
     * @throws NullPointerException If {@code key} or {@code val} is {@code null}.
     */
    @Override
    public CompletableFuture<Void> putAsync(@Nullable Transaction tx, Tuple key, Tuple val) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(val, "val");

        return doOperation(tx, schemaVersion -> {
            Row row = marshal(key, val, schemaVersion);

            return this.tbl.upsert(row, (InternalTransaction) tx);
        });
    }

    /** Blocking counterpart of {@link #putAllAsync(Transaction, Map)}. */
    @Override
    public void putAll(@Nullable Transaction tx, Map<Tuple, Tuple> pairs) {
        ViewUtils.sync(putAllAsync(tx, pairs));
    }

    /**
     * Marshals all pairs into rows and upserts them in one call.
     *
     * @throws NullPointerException If {@code pairs}, or any key or value in it, is {@code null}.
     */
    @Override
    public CompletableFuture<Void> putAllAsync(@Nullable Transaction tx, Map<Tuple, Tuple> pairs) {
        Objects.requireNonNull(pairs, "pairs");

        // Validate eagerly so that bad input fails before the operation is started.
        for (Map.Entry<Tuple, Tuple> entry : pairs.entrySet()) {
            Objects.requireNonNull(entry.getKey(), "key");
            Objects.requireNonNull(entry.getValue(), "val");
        }

        return doOperation(tx, schemaVersion ->
                this.tbl.upsertAll(marshalPairs(pairs.entrySet(), schemaVersion, null), (InternalTransaction) tx));
    }

    /** Blocking counterpart of {@link #getAndPutAsync(Transaction, Tuple, Tuple)}. */
    @Override
    public Tuple getAndPut(@Nullable Transaction tx, Tuple key, Tuple val) {
        return ViewUtils.sync(getAndPutAsync(tx, key, val));
    }

    /**
     * Upserts the pair and returns the value part of the row returned by the table's {@code getAndUpsert}.
     *
     * @return Future with that value tuple, or with {@code null} when the table returned no row.
     * @throws NullPointerException If {@code key} or {@code val} is {@code null}.
     */
    @Override
    public CompletableFuture<Tuple> getAndPutAsync(@Nullable Transaction tx, Tuple key, Tuple val) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(val, "val");

        return doOperation(tx, schemaVersion -> {
            Row row = marshal(key, val, schemaVersion);

            return this.tbl.getAndUpsert(row, (InternalTransaction) tx)
                    .thenApply(resultRow -> unmarshalValue(resultRow, schemaVersion));
        });
    }

    /** Blocking counterpart of {@link #getNullableAndPutAsync(Transaction, Tuple, Tuple)}. */
    @Override
    public NullableValue<Tuple> getNullableAndPut(@Nullable Transaction tx, Tuple key, Tuple val) {
        return ViewUtils.sync(getNullableAndPutAsync(tx, key, val));
    }

    /**
     * Same as {@link #getAndPutAsync(Transaction, Tuple, Tuple)}, but wraps a non-null result into a
     * {@link NullableValue}; a {@code null} result stays {@code null}.
     */
    @Override
    public CompletableFuture<NullableValue<Tuple>> getNullableAndPutAsync(@Nullable Transaction tx, Tuple key, Tuple val) {
        return getAndPutAsync(tx, key, val).thenApply(r -> r == null ? null : NullableValue.of(r));
    }

    /** Blocking counterpart of {@link #putIfAbsentAsync(Transaction, Tuple, Tuple)}. */
    @Override
    public boolean putIfAbsent(@Nullable Transaction tx, Tuple key, Tuple val) {
        return ViewUtils.sync(putIfAbsentAsync(tx, key, val));
    }

    /**
     * Marshals the pair into a row and passes it to the table's {@code insert}.
     *
     * @return Future with the boolean result reported by the table.
     * @throws NullPointerException If {@code key} or {@code val} is {@code null}.
     */
    @Override
    public CompletableFuture<Boolean> putIfAbsentAsync(@Nullable Transaction tx, Tuple key, Tuple val) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(val, "val");

        return doOperation(tx, schemaVersion -> {
            Row row = marshal(key, val, schemaVersion);

            return this.tbl.insert(row, (InternalTransaction) tx);
        });
    }

    /** Blocking counterpart of {@link #removeAsync(Transaction, Tuple)}. */
    @Override
    public boolean remove(@Nullable Transaction tx, Tuple key) {
        return ViewUtils.sync(removeAsync(tx, key));
    }

    /** Blocking counterpart of {@link #removeAsync(Transaction, Tuple, Tuple)}. */
    @Override
    public boolean remove(@Nullable Transaction tx, Tuple key, Tuple val) {
        return ViewUtils.sync(removeAsync(tx, key, val));
    }

    /**
     * Marshals the key and passes it to the table's {@code delete}.
     *
     * @return Future with the boolean result reported by the table.
     * @throws NullPointerException If {@code key} is {@code null}.
     */
    @Override
    public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, Tuple key) {
        Objects.requireNonNull(key, "key");

        return doOperation(tx, schemaVersion -> {
            Row row = marshal(key, null, schemaVersion);

            return this.tbl.delete(row, (InternalTransaction) tx);
        });
    }

    /**
     * Marshals the full pair and passes it to the table's {@code deleteExact}.
     *
     * @return Future with the boolean result reported by the table.
     * @throws NullPointerException If {@code key} or {@code val} is {@code null}.
     */
    @Override
    public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, Tuple key, Tuple val) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(val, "val");

        return doOperation(tx, schemaVersion -> {
            Row row = marshal(key, val, schemaVersion);

            return this.tbl.deleteExact(row, (InternalTransaction) tx);
        });
    }

    /** Blocking counterpart of {@link #removeAllAsync(Transaction)}. */
    @Override
    public void removeAll(@Nullable Transaction tx) {
        ViewUtils.sync(removeAllAsync(tx));
    }

    /** Blocking counterpart of {@link #removeAllAsync(Transaction, Collection)}. */
    @Override
    public Collection<Tuple> removeAll(@Nullable Transaction tx, Collection<Tuple> keys) {
        return ViewUtils.sync(removeAllAsync(tx, keys));
    }

    /**
     * Executes a SQL {@code DELETE FROM} statement against the table's canonical name.
     *
     * @return Future completed with {@code null} once the statement has completed.
     */
    @Override
    public CompletableFuture<Void> removeAllAsync(@Nullable Transaction tx) {
        return this.sql.executeAsync(tx, "DELETE FROM " + this.tbl.name().toCanonicalForm(), new Object[0])
                .thenApply(r -> null);
    }

    /**
     * Passes the marshalled keys to the table's {@code deleteAll}.
     *
     * @param tx Transaction or {@code null}.
     * @param keys Keys; validated with {@link ViewUtils#checkKeysForNulls}.
     * @return Future with the key tuples of the rows the table returned from {@code deleteAll}.
     */
    @Override
    public CompletableFuture<Collection<Tuple>> removeAllAsync(@Nullable Transaction tx, Collection<Tuple> keys) {
        ViewUtils.checkKeysForNulls(keys);

        return doOperation(tx, schemaVersion -> {
            List<BinaryRowEx> keyRows = marshalKeys(keys, schemaVersion);

            return this.tbl.deleteAll(keyRows, (InternalTransaction) tx)
                    .thenApply(rows -> unmarshalKeys(rows, schemaVersion));
        });
    }

    /**
     * Blocking counterpart of {@link #getAndRemoveAsync(Transaction, Tuple)}; the key null check is
     * performed by the asynchronous method before any asynchronous work starts.
     */
    @Override
    public Tuple getAndRemove(@Nullable Transaction tx, Tuple key) {
        return ViewUtils.sync(getAndRemoveAsync(tx, key));
    }

    /**
     * Passes the marshalled key to the table's {@code getAndDelete} and returns the value part of the
     * returned row.
     *
     * @return Future with that value tuple, or with {@code null} when the table returned no row.
     * @throws NullPointerException If {@code key} is {@code null}.
     */
    @Override
    public CompletableFuture<Tuple> getAndRemoveAsync(@Nullable Transaction tx, Tuple key) {
        Objects.requireNonNull(key, "key");

        return doOperation(tx, schemaVersion ->
                this.tbl.getAndDelete(marshal(key, null, schemaVersion), (InternalTransaction) tx)
                        .thenApply(row -> unmarshalValue(row, schemaVersion)));
    }

    /** Blocking counterpart of {@link #getNullableAndRemoveAsync(Transaction, Tuple)}. */
    @Override
    public NullableValue<Tuple> getNullableAndRemove(@Nullable Transaction tx, Tuple key) {
        return ViewUtils.sync(getNullableAndRemoveAsync(tx, key));
    }

    /**
     * Same as {@link #getAndRemoveAsync(Transaction, Tuple)}, but wraps a non-null result into a
     * {@link NullableValue}; a {@code null} result stays {@code null}.
     */
    @Override
    public CompletableFuture<NullableValue<Tuple>> getNullableAndRemoveAsync(@Nullable Transaction tx, Tuple key) {
        return getAndRemoveAsync(tx, key).thenApply(r -> r == null ? null : NullableValue.of(r));
    }

    /** Blocking counterpart of {@link #replaceAsync(Transaction, Tuple, Tuple)}. */
    @Override
    public boolean replace(@Nullable Transaction tx, Tuple key, Tuple val) {
        return ViewUtils.sync(replaceAsync(tx, key, val));
    }

    /** Blocking counterpart of {@link #replaceAsync(Transaction, Tuple, Tuple, Tuple)}. */
    @Override
    public boolean replace(@Nullable Transaction tx, Tuple key, Tuple oldVal, Tuple newVal) {
        return ViewUtils.sync(replaceAsync(tx, key, oldVal, newVal));
    }

    /**
     * Marshals the pair into a row and passes it to the table's single-row {@code replace}.
     *
     * @return Future with the boolean result reported by the table.
     * @throws NullPointerException If {@code key} or {@code val} is {@code null}.
     */
    @Override
    public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, Tuple key, Tuple val) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(val, "val");

        return doOperation(tx, schemaVersion -> {
            Row row = marshal(key, val, schemaVersion);

            return this.tbl.replace(row, (InternalTransaction) tx);
        });
    }

    /**
     * Marshals {@code (key, oldVal)} and {@code (key, newVal)} into rows and passes both to the table's
     * two-row {@code replace}.
     *
     * @return Future with the boolean result reported by the table.
     * @throws NullPointerException If {@code key}, {@code oldVal} or {@code newVal} is {@code null}.
     */
    @Override
    public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, Tuple key, Tuple oldVal, Tuple newVal) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(oldVal, "oldVal");
        Objects.requireNonNull(newVal, "newVal");

        return doOperation(tx, schemaVersion -> {
            Row oldRow = marshal(key, oldVal, schemaVersion);
            Row newRow = marshal(key, newVal, schemaVersion);

            return this.tbl.replace(oldRow, newRow, (InternalTransaction) tx);
        });
    }

    /** Blocking counterpart of {@link #getAndReplaceAsync(Transaction, Tuple, Tuple)}. */
    @Override
    public Tuple getAndReplace(@Nullable Transaction tx, Tuple key, Tuple val) {
        return ViewUtils.sync(getAndReplaceAsync(tx, key, val));
    }

    /**
     * Passes the marshalled pair to the table's {@code getAndReplace} and returns the value part of the
     * returned row.
     *
     * @return Future with that value tuple, or with {@code null} when the table returned no row.
     * @throws NullPointerException If {@code key} or {@code val} is {@code null}.
     */
    @Override
    public CompletableFuture<Tuple> getAndReplaceAsync(@Nullable Transaction tx, Tuple key, Tuple val) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(val, "val");

        return doOperation(tx, schemaVersion ->
                this.tbl.getAndReplace(marshal(key, val, schemaVersion), (InternalTransaction) tx)
                        .thenApply(row -> unmarshalValue(row, schemaVersion)));
    }

    /** Blocking counterpart of {@link #getNullableAndReplaceAsync(Transaction, Tuple, Tuple)}. */
    @Override
    public NullableValue<Tuple> getNullableAndReplace(@Nullable Transaction tx, Tuple key, Tuple val) {
        return ViewUtils.sync(getNullableAndReplaceAsync(tx, key, val));
    }

    /**
     * Same as {@link #getAndReplaceAsync(Transaction, Tuple, Tuple)}, but wraps a non-null result into a
     * {@link NullableValue}; a {@code null} result stays {@code null}.
     */
    @Override
    public CompletableFuture<NullableValue<Tuple>> getNullableAndReplaceAsync(@Nullable Transaction tx, Tuple key, Tuple val) {
        return getAndReplaceAsync(tx, key, val).thenApply(r -> r == null ? null : NullableValue.of(r));
    }

    /**
     * Marshals a key tuple and an optional value tuple into a row using the marshaller cached for the
     * given schema version.
     *
     * @param key Key tuple.
     * @param val Value tuple, or {@code null} to marshal the key only.
     * @param schemaVersion Schema version to marshal for.
     * @return Marshalled row.
     */
    private Row marshal(Tuple key, @Nullable Tuple val, int schemaVersion) {
        return this.marshallerCache.marshaller(this.tbl::name, schemaVersion).marshal(key, val);
    }

    /**
     * Marshals a single key tuple into a key-only row.
     *
     * @throws NullPointerException If {@code key} is {@code null}.
     */
    private BinaryRowEx marshalKeyRow(Tuple key, int schemaVersion) {
        return marshal(Objects.requireNonNull(key, "keyRec"), null, schemaVersion);
    }

    /**
     * Extracts the value tuple from a binary row.
     *
     * @return Value tuple, or {@code null} when {@code row} is {@code null}.
     */
    @Nullable
    private Tuple unmarshalValue(BinaryRow row, int schemaVersion) {
        if (row == null) {
            return null;
        }

        return TableRow.valueTuple(this.rowConverter.resolveRow(row, schemaVersion));
    }

    /**
     * Converts binary rows into a key-to-value tuple map, skipping rows that resolve to {@code null}.
     */
    private Map<Tuple, Tuple> unmarshalValues(Collection<BinaryRow> rows, int schemaVersion) {
        Map<Tuple, Tuple> pairs = IgniteUtils.newHashMap(rows.size());

        for (Row row : this.rowConverter.resolveRows(rows, schemaVersion)) {
            if (row != null) {
                pairs.put(TableRow.keyTuple(row), TableRow.valueTuple(row));
            }
        }

        return pairs;
    }

    /**
     * Marshals every key tuple into a key-only row, preserving iteration order.
     *
     * @return Marshalled rows; an empty list when {@code keys} is empty.
     * @throws NullPointerException If any key is {@code null}.
     */
    private List<BinaryRowEx> marshalKeys(Collection<Tuple> keys, int schemaVersion) {
        if (keys.isEmpty()) {
            return Collections.emptyList();
        }

        List<BinaryRowEx> keyRows = new ArrayList<>(keys.size());

        for (Tuple keyRec : keys) {
            keyRows.add(marshalKeyRow(keyRec, schemaVersion));
        }

        return keyRows;
    }

    /**
     * Converts binary rows into key tuples.
     *
     * @return Key tuples; an empty list when {@code rows} is empty.
     */
    private Collection<Tuple> unmarshalKeys(Collection<BinaryRow> rows, int schemaVersion) {
        if (rows.isEmpty()) {
            return Collections.emptyList();
        }

        List<Tuple> tuples = new ArrayList<>(rows.size());

        for (Row row : this.rowConverter.resolveKeys(rows, schemaVersion)) {
            tuples.add(TableRow.keyTuple(row));
        }

        return tuples;
    }

    /**
     * Streams key-value entries into the table.
     *
     * <p>Each batch is marshalled for the schema version supplied by {@code withSchemaSync} and passed to the
     * table's {@code updateAll} together with the batch's deletion bit set and partition.
     *
     * @param publisher Source of items, must not be {@code null}.
     * @param options Streamer options or {@code null}.
     * @return Future returned by the streamer, converted with
     *     {@link IgniteExceptionMapperUtil#convertToPublicFuture}.
     * @throws NullPointerException If {@code publisher} is {@code null}.
     */
    @Override
    public CompletableFuture<Void> streamData(Flow.Publisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>> publisher, @Nullable DataStreamerOptions options) {
        Objects.requireNonNull(publisher, "publisher");

        KeyValueTupleStreamerPartitionAwarenessProvider partitioner =
                new KeyValueTupleStreamerPartitionAwarenessProvider(this.rowConverter.registry(), this.tbl.partitions());

        // NOTE(review): the raw StreamerBatchSender and the casts are kept exactly as produced by the
        // decompiler; the interface's type parameters are not visible from this file.
        StreamerBatchSender batchSender = (partitionId, items, deleted) -> PublicApiThreading.execUserAsyncOperation(() -> this.withSchemaSync(null, schemaVersion -> this.tbl.updateAll((Collection<BinaryRowEx>)this.marshalPairs(items, schemaVersion, deleted), deleted, (int)partitionId)));

        CompletableFuture<Void> future =
                DataStreamer.streamData(publisher, options, batchSender, partitioner, this.tbl.streamerFlushExecutor());

        return IgniteExceptionMapperUtil.convertToPublicFuture(future);
    }

    /**
     * Streams items to a receiver.
     *
     * <p>For each batch the partition's location is looked up with the table's {@code partitionLocation} and the
     * receiver is run on that node through the table's streamer receiver runner.
     *
     * @param publisher Source of items, must not be {@code null}.
     * @param receiver Receiver descriptor, must not be {@code null}.
     * @param keyFunc Maps an item to the entry passed to the streamer as its key, must not be {@code null}.
     * @param payloadFunc Maps an item to the receiver payload, must not be {@code null}.
     * @param receiverArg Optional receiver argument.
     * @param resultSubscriber Optional subscriber for receiver results.
     * @param options Streamer options or {@code null}.
     * @return Future returned by the streamer, converted with
     *     {@link IgniteExceptionMapperUtil#convertToPublicFuture}.
     * @throws NullPointerException If any of the required arguments is {@code null}.
     */
    @Override
    public <E, V, A, R> CompletableFuture<Void> streamData(Flow.Publisher<E> publisher, DataStreamerReceiverDescriptor<V, A, R> receiver, Function<E, Map.Entry<Tuple, Tuple>> keyFunc, Function<E, V> payloadFunc, @Nullable A receiverArg, @Nullable Flow.Subscriber<R> resultSubscriber, @Nullable DataStreamerOptions options) {
        Objects.requireNonNull(publisher);
        Objects.requireNonNull(keyFunc);
        Objects.requireNonNull(payloadFunc);
        Objects.requireNonNull(receiver);

        KeyValueTupleStreamerPartitionAwarenessProvider partitioner =
                new KeyValueTupleStreamerPartitionAwarenessProvider(this.rowConverter.registry(), this.tbl.partitions());

        // NOTE(review): raw StreamerBatchSender and casts kept exactly as produced by the decompiler;
        // the interface's type parameters are not visible from this file.
        StreamerBatchSender batchSender = (partitionIndex, rows, deleted) -> PublicApiThreading.execUserAsyncOperation(() -> this.tbl.partitionLocation((int)partitionIndex).thenCompose(node -> this.tbl.streamerReceiverRunner().runReceiverAsync(receiver, receiverArg, rows, (InternalClusterNode)node, receiver.units())));

        CompletableFuture<Void> future = DataStreamer.streamData(
                publisher, keyFunc, payloadFunc, x -> false, options, batchSender, resultSubscriber, partitioner,
                this.tbl.streamerFlushExecutor());

        return IgniteExceptionMapperUtil.convertToPublicFuture(future);
    }

    /**
     * Marshals key-value pairs into rows, preserving iteration order.
     *
     * @param pairs Pairs to marshal.
     * @param schemaVersion Schema version to marshal for.
     * @param deleted Optional bit set indexed by position in {@code pairs}; a set bit makes the pair
     *     marshal as a key-only row and its value is ignored.
     * @return Marshalled rows.
     * @throws NullPointerException If a key is {@code null}, or a value is {@code null} for a pair that
     *     is not flagged in {@code deleted}.
     */
    private List<BinaryRowEx> marshalPairs(Collection<Map.Entry<Tuple, Tuple>> pairs, int schemaVersion, @Nullable BitSet deleted) {
        List<BinaryRowEx> rows = new ArrayList<>(pairs.size());

        for (Map.Entry<Tuple, Tuple> pair : pairs) {
            // rows.size() is the index of the pair currently being processed.
            boolean isDeleted = deleted != null && deleted.get(rows.size());

            Tuple key = Objects.requireNonNull(pair.getKey(), "key");
            Tuple val = isDeleted ? null : Objects.requireNonNull(pair.getValue(), "val");

            rows.add(marshal(key, val, schemaVersion));
        }

        return rows;
    }

    /**
     * Builds a mapper that turns an SQL row into an entry of two projections of that row: one over the
     * schema's key columns and one over its value columns.
     */
    @Override
    protected Function<SqlRow, Map.Entry<Tuple, Tuple>> queryMapper(ResultSetMetadata meta, SchemaDescriptor schema) {
        return row -> new IgniteBiTuple<>(
                new SqlRowProjection(row, meta, columnNames(schema.keyColumns())),
                new SqlRowProjection(row, meta, columnNames(schema.valueColumns())));
    }

    /**
     * Starts a continuous query on the table, mapping each binary row to a key/value tuple entry.
     *
     * <p>Rows are resolved using the schema version stored in the row itself; the schema descriptor passed
     * to the mapper is not used.
     *
     * @param subscriber Event subscriber, must not be {@code null}.
     * @param options Continuous query options or {@code null}.
     * @throws NullPointerException If {@code subscriber} is {@code null}.
     */
    @Override
    public void queryContinuously(Flow.Subscriber<TableRowEventBatch<Map.Entry<Tuple, Tuple>>> subscriber, @Nullable ContinuousQueryOptions options) {
        Objects.requireNonNull(subscriber);

        BiFunction<BinaryRow, SchemaDescriptor, Map.Entry<Tuple, Tuple>> mapper = (binaryRow, schema) -> {
            Row row = this.rowConverter.resolveRow(binaryRow, binaryRow.schemaVersion());

            return new IgniteBiTuple<Tuple, Tuple>(TableRow.keyTuple(row), TableRow.valueTuple(row));
        };

        this.tbl.queryContinuously(subscriber, options, mapper);
    }

    /** Returns {@code Tuple.class}, the value type of this view. */
    @Override
    public Class<Tuple> valueType() {
        return Tuple.class;
    }

    /**
     * Builds a near-cache key for the given tuple.
     *
     * <p>When {@code enforceKeyOnly} is {@code false} and the tuple's column count differs from the number of
     * key columns in the last known schema, only the key columns are copied out of the tuple before
     * marshalling; otherwise the tuple is marshalled as is. The resulting near-cache key holds the
     * original {@code key} tuple together with the marshalled key row.
     *
     * <p>NOTE(review): the flag's name reads as if {@code true} should trigger the key-column projection,
     * while the code projects only when it is {@code false} — confirm against callers.
     *
     * @param key Key tuple (possibly carrying extra columns).
     * @param enforceKeyOnly See above.
     * @return Future with the near-cache key.
     */
    @Override
    public CompletableFuture<NearCacheKey<Tuple, BinaryRowEx>> getNearCacheKeyAsync(Tuple key, boolean enforceKeyOnly) {
        return doOperation(null, schemaVersion -> {
            List<Column> keyColumns = this.rowConverter.registry().lastKnownSchema().keyColumns();

            Tuple keyRec = key;

            if (!enforceKeyOnly && key.columnCount() != keyColumns.size()) {
                keyRec = TupleHelper.copyColumns(key, keyColumnNamesOf(keyColumns));
            }

            BinaryRowEx keyRow = marshalKeyRow(keyRec, schemaVersion);

            NearCacheKey<Tuple, BinaryRowEx> cacheKey = new EmbeddedNearCacheKeyImpl<Tuple>(key, keyRow);

            return CompletableFuture.completedFuture(cacheKey);
        });
    }

    /**
     * Builds near-cache keys for the given tuples, in iteration order, applying the same rule as
     * {@link #getNearCacheKeyAsync(Tuple, boolean)} to each of them.
     *
     * @param keys Key tuples (possibly carrying extra columns).
     * @param enforceKeyOnly See {@link #getNearCacheKeyAsync(Tuple, boolean)}.
     * @return Future with the near-cache keys.
     */
    @Override
    public CompletableFuture<Collection<NearCacheKey<Tuple, BinaryRowEx>>> getNearCacheKeysAsync(Iterable<? extends Tuple> keys, boolean enforceKeyOnly) {
        return doOperation(null, schemaVersion -> {
            Collection<NearCacheKey<Tuple, BinaryRowEx>> cacheKeys = new ArrayList<>();

            List<Column> keyColumns = this.rowConverter.registry().lastKnownSchema().keyColumns();

            // The name list is the same for every key; build it at most once, and only if it is needed.
            List<String> keyColumnNames = null;

            for (Tuple rec : keys) {
                Tuple keyRec = rec;

                if (!enforceKeyOnly && rec.columnCount() != keyColumns.size()) {
                    if (keyColumnNames == null) {
                        keyColumnNames = keyColumnNamesOf(keyColumns);
                    }

                    keyRec = TupleHelper.copyColumns(rec, keyColumnNames);
                }

                BinaryRowEx keyRow = marshalKeyRow(keyRec, schemaVersion);

                cacheKeys.add(new EmbeddedNearCacheKeyImpl<Tuple>(rec, keyRow));
            }

            return CompletableFuture.completedFuture(cacheKeys);
        });
    }

    /**
     * Reads the value for a near-cache key.
     *
     * <p>The key's serialized row is reused when it is present and its schema version equals the
     * operation's schema version; otherwise the key tuple is marshalled again. The returned pair holds a
     * new near-cache key built from the key tuple and the row actually used for the lookup.
     *
     * @param tx Transaction or {@code null}.
     * @param key Near-cache key.
     * @return Future with the near-cache key and the value read for it.
     */
    @Override
    public CompletableFuture<Pair<NearCacheKey<Tuple, BinaryRowEx>, NearCacheValue<Tuple>>> getNearCacheValueAsync(@Nullable Transaction tx, NearCacheKey<Tuple, BinaryRowEx> key) {
        return doOperation(tx, schemaVersion -> {
            BinaryRowEx keyRow = (BinaryRowEx) key.serializedKey();

            if (keyRow == null || schemaVersion != keyRow.schemaVersion()) {
                keyRow = marshalKeyRow((Tuple) key.key(), schemaVersion);
            }

            NearCacheKey<Tuple, BinaryRowEx> finalCacheKey = new EmbeddedNearCacheKeyImpl<Tuple>((Tuple) key.key(), keyRow);

            return this.tbl.get(keyRow, (InternalTransaction) tx).thenApply(binaryRow -> {
                NearCacheValue<Tuple> cacheVal = nearCacheValueOf(binaryRow, schemaVersion);

                return new Pair<NearCacheKey<Tuple, BinaryRowEx>, NearCacheValue<Tuple>>(finalCacheKey, cacheVal);
            });
        });
    }

    /**
     * Reads the values for several near-cache keys with a single {@code getAll} call.
     *
     * <p>Each key's serialized row is reused or re-marshalled as in
     * {@link #getNearCacheValueAsync(Transaction, NearCacheKey)}. Result rows are matched to keys by
     * position.
     *
     * @param tx Transaction or {@code null}.
     * @param nearCacheKeys Near-cache keys.
     * @return Future with a map from (rebuilt) near-cache key to the value read for it.
     */
    @Override
    public CompletableFuture<Map<NearCacheKey<Tuple, BinaryRowEx>, NearCacheValue<Tuple>>> getNearCacheValuesAsync(@Nullable Transaction tx, Collection<NearCacheKey<Tuple, BinaryRowEx>> nearCacheKeys) {
        return doOperation(tx, schemaVersion -> {
            List<NearCacheKey<Tuple, BinaryRowEx>> ncKeys = new ArrayList<>(nearCacheKeys.size());
            List<BinaryRowEx> keyRows = new ArrayList<>(nearCacheKeys.size());

            for (NearCacheKey<Tuple, BinaryRowEx> nearCacheKey : nearCacheKeys) {
                BinaryRowEx keyRow = (BinaryRowEx) nearCacheKey.serializedKey();

                if (keyRow == null || schemaVersion != keyRow.schemaVersion()) {
                    keyRow = marshalKeyRow((Tuple) nearCacheKey.key(), schemaVersion);
                }

                keyRows.add(keyRow);
                ncKeys.add(new EmbeddedNearCacheKeyImpl<Tuple>((Tuple) nearCacheKey.key(), keyRow));
            }

            return this.tbl.getAll(keyRows, (InternalTransaction) tx).thenApply(rows -> {
                // Rows are paired with keys by index below, so the sizes have to agree.
                assert rows.size() == keyRows.size();

                Map<NearCacheKey<Tuple, BinaryRowEx>, NearCacheValue<Tuple>> ret = new HashMap<>(rows.size());

                for (int i = 0; i < rows.size(); ++i) {
                    ret.put(ncKeys.get(i), nearCacheValueOf(rows.get(i), schemaVersion));
                }

                return ret;
            });
        });
    }

    /**
     * Subscribes to near-cache updates through a continuous query.
     *
     * <p>The query uses {@code updateInterval} as its poll interval in milliseconds and listens for
     * {@code CREATED}, {@code UPDATED} and {@code REMOVED} events. The subscriber is wrapped into a
     * {@link NearCacheSchemaVersionAwareSubscriber} that is given a supplier of the schema version
     * {@code doOperation} currently hands out.
     *
     * @param subscriber Event subscriber.
     * @param updateInterval Poll interval in milliseconds.
     */
    @Override
    public void subscribeToNearCacheUpdates(Flow.Subscriber<TableRowEventBatch<Map.Entry<Tuple, Tuple>>> subscriber, int updateInterval) {
        ContinuousQueryOptions opts = ContinuousQueryOptions.builder()
                .pollIntervalMs(updateInterval)
                .eventTypes(TableRowEventType.CREATED, TableRowEventType.UPDATED, TableRowEventType.REMOVED)
                .build();

        this.queryContinuously((Flow.Subscriber<TableRowEventBatch<Map.Entry<Tuple, Tuple>>>)new NearCacheSchemaVersionAwareSubscriber<Map.Entry<Tuple, Tuple>>(subscriber, () -> this.doOperation(null, CompletableFuture::completedFuture)), opts);
    }

    /** No-op: this view holds nothing that needs closing. */
    @Override
    public void close() throws Exception {
    }

    /** Collects the names of the given columns, preserving their order. */
    private static List<String> keyColumnNamesOf(List<Column> keyColumns) {
        List<String> names = new ArrayList<>(keyColumns.size());

        for (Column column : keyColumns) {
            names.add(column.name());
        }

        return names;
    }

    /**
     * Wraps a looked-up row into a near-cache value: a value created with the no-arg constructor when
     * the row is {@code null}, otherwise a value holding the row's value tuple.
     */
    private NearCacheValue<Tuple> nearCacheValueOf(@Nullable BinaryRow binaryRow, int schemaVersion) {
        if (binaryRow == null) {
            return new NearCacheValue<Tuple>();
        }

        return new NearCacheValue<Tuple>(unmarshalValue(binaryRow, schemaVersion));
    }
}

