/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.table.distributed.storage;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.continuousquery.ContinuousQuery;
import org.apache.ignite.internal.continuousquery.ContinuousQueryMetricSink;
import org.apache.ignite.internal.continuousquery.ContinuousQueryRequest;
import org.apache.ignite.internal.continuousquery.ContinuousQueryRequestSender;
import org.apache.ignite.internal.continuousquery.ContinuousQueryScanResult;
import org.apache.ignite.internal.continuousquery.ContinuousQueryScanResultWithSchema;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.IgniteTriFunction;
import org.apache.ignite.internal.network.ClusterNodeResolver;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.UnresolvableConsistentIdException;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.UserDetailsMessage;
import org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
import org.apache.ignite.internal.partition.replicator.network.replication.ContinuousQueryScanRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.DcrWriteMultiRowReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyIntervalScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyRowVersionsScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlySingleRowPkReplicaRequestV2;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyStorageOperationReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteMultiRowPkReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteMultiRowReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
import org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import org.apache.ignite.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTimedRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.row.TimedRow;
import org.apache.ignite.internal.storage.BinaryRowAndRowId;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.TimedBinaryRowAndRowId;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.operation.StorageOptimizedOperation;
import org.apache.ignite.internal.storage.secondary.SecondaryTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.distributed.TableUtils;
import org.apache.ignite.internal.table.distributed.storage.ContinuousQueryResponseHandler;
import org.apache.ignite.internal.table.distributed.storage.PartitionScanPublisher;
import org.apache.ignite.internal.table.distributed.storage.RowBatch;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.TransactionIds;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.FastTimestamps;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.table.ContinuousQueryOptions;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.QualifiedNameHelper;
import org.apache.ignite.table.TableRowEventBatch;
import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.TransactionException;
import org.gridgain.internal.license.LicenseFeature;
import org.gridgain.internal.license.LicenseFeatureChecker;
import org.gridgain.internal.security.context.SecurityContext;
import org.gridgain.internal.security.context.SecurityContextHolder;
import org.gridgain.lang.GridgainErrorGroups;
import org.jetbrains.annotations.Nullable;

public class InternalTableImpl
implements InternalTable {
    /** Primary replica await timeout; the unit is not visible in this declaration (presumably seconds — TODO confirm at the usage site). */
    public static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 30;
    /** Retry budget for primary replica misses; usages are outside the visible part of the file. */
    private static final int PRIMARY_REPLICA_MISS_RETRY_COUNT = 16;
    /** Shared tracker instance for read-write inflight batch requests. */
    private static final ReadWriteInflightBatchRequestTracker READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER = new ReadWriteInflightBatchRequestTracker();
    /** Factory used to build partition replication (table) messages. */
    private static final PartitionReplicationMessagesFactory TABLE_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
    /** Factory used to build generic replica messages. */
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    /** Number of partitions, as passed to the constructor. */
    private final int partitions;
    private final Supplier<ScheduledExecutorService> streamerFlushExecutor;
    private final ContinuousQueryResponseHandler continuousQueryResponseHandler;
    private final StreamerReceiverRunner streamerReceiverRunner;
    /** Qualified table name; replaced by {@link #name(String)}, hence volatile. */
    private volatile QualifiedName tableName;
    private final int tableId;
    private final int zoneId;
    /** Secondary zone id, or {@code null}; {@link #hasSecondaryStorage()} is defined in terms of this field. */
    @Nullable
    private volatile Integer secondaryZoneId;
    private final ClusterNodeResolver clusterNodeResolver;
    protected final TxManager txManager;
    private final TransactionInflights transactionInflights;
    private final MvTableStorage tableStorage;
    private final TxStateStorage txStateStorage;
    private final ReplicaService replicaSvc;
    // Monitor objects; presumably guard updates of the partition tracker maps below — usages are not visible here.
    private final Object updatePartitionMapsMux = new Object();
    private final Object updateSecondaryPartitionMapsMux = new Object();
    private final ClockService clockService;
    private final HybridTimestampTracker observableTimestampTracker;
    private final PlacementDriver placementDriver;
    // Per-partition trackers; all start as empty immutable maps and are volatile references.
    private volatile Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp, Void>> safeTimeTrackerByPartitionId = Int2ObjectMaps.emptyMap();
    private volatile Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp, Void>> safeTimeTrackerBySecondaryPartitionId = Int2ObjectMaps.emptyMap();
    private volatile Int2ObjectMap<PendingComparableValuesTracker<Long, Void>> storageIndexTrackerByPartitionId = Int2ObjectMaps.emptyMap();
    private final Supplier<Long> defaultRwTxTimeout;
    private final Supplier<Long> defaultReadTxTimeout;
    private final boolean colocationEnabled;
    private final TableMetricSource metrics;
    /** Secondary storage, or {@code null}; set by {@link #setSecondaryStorage} and cleared by {@link #removeSecondaryStorage()}. */
    @Nullable
    private volatile SecondaryTableStorage secondaryTableStorage;
    private final SchemaRegistry schemaRegistry;
    /** Flag passed to the constructor as {@code cache}. */
    private final boolean cache;
    private final LicenseFeatureChecker licenseFeatureChecker;

    /**
     * Creates the internal table and stores the supplied collaborators in the corresponding fields.
     *
     * <p>{@code secondaryZoneId} and {@code secondaryTableStorage} may be {@code null}. The {@code cmgManager}
     * argument is accepted but is neither stored nor used by this constructor.
     */
    public InternalTableImpl(
            QualifiedName tableName,
            int zoneId,
            int tableId,
            @Nullable Integer secondaryZoneId,
            int partitions,
            ClusterNodeResolver clusterNodeResolver,
            TxManager txManager,
            MvTableStorage tableStorage,
            TxStateStorage txStateStorage,
            ReplicaService replicaSvc,
            ClockService clockService,
            HybridTimestampTracker observableTimestampTracker,
            PlacementDriver placementDriver,
            TransactionInflights transactionInflights,
            Supplier<ScheduledExecutorService> streamerFlushExecutor,
            StreamerReceiverRunner streamerReceiverRunner,
            @Nullable SecondaryTableStorage secondaryTableStorage,
            SchemaRegistry schemaRegistry,
            ContinuousQueryResponseHandler continuousQueryResponseHandler,
            boolean cache,
            LicenseFeatureChecker licenseFeatureChecker,
            Supplier<Long> defaultRwTxTimeout,
            Supplier<Long> defaultReadTxTimeout,
            boolean colocationEnabled,
            TableMetricSource metrics,
            ClusterManagementGroupManager cmgManager
    ) {
        // Identity and layout.
        this.tableName = tableName;
        this.tableId = tableId;
        this.zoneId = zoneId;
        this.secondaryZoneId = secondaryZoneId;
        this.partitions = partitions;
        this.cache = cache;
        this.colocationEnabled = colocationEnabled;

        // Storages and schema.
        this.tableStorage = tableStorage;
        this.txStateStorage = txStateStorage;
        this.secondaryTableStorage = secondaryTableStorage;
        this.schemaRegistry = schemaRegistry;

        // Transactions, time and replication.
        this.txManager = txManager;
        this.transactionInflights = transactionInflights;
        this.defaultRwTxTimeout = defaultRwTxTimeout;
        this.defaultReadTxTimeout = defaultReadTxTimeout;
        this.clockService = clockService;
        this.observableTimestampTracker = observableTimestampTracker;
        this.placementDriver = placementDriver;
        this.replicaSvc = replicaSvc;
        this.clusterNodeResolver = clusterNodeResolver;

        // Streaming, continuous queries, licensing and metrics.
        this.streamerFlushExecutor = streamerFlushExecutor;
        this.streamerReceiverRunner = streamerReceiverRunner;
        this.continuousQueryResponseHandler = continuousQueryResponseHandler;
        this.licenseFeatureChecker = licenseFeatureChecker;
        this.metrics = metrics;
    }

    /** Returns the MV table storage supplied at construction time. */
    @Override
    public MvTableStorage storage() {
        return tableStorage;
    }

    /** Returns the current secondary table storage, or {@code null} when none is set. */
    @Override
    @Nullable
    public SecondaryTableStorage secondaryStorage() {
        return secondaryTableStorage;
    }

    /**
     * Installs the given secondary storage and records its zone id.
     *
     * <p>The zone id is written before the storage reference, in that order.
     */
    @Override
    public void setSecondaryStorage(SecondaryTableStorage secondaryTableStorage) {
        Integer storageZoneId = secondaryTableStorage.zoneId();

        this.secondaryZoneId = storageZoneId;
        this.secondaryTableStorage = secondaryTableStorage;
    }

    /** Records the secondary zone id without touching the secondary storage reference. */
    @Override
    public void setSecondaryZoneId(int zoneId) {
        secondaryZoneId = Integer.valueOf(zoneId);
    }

    /** Clears the secondary zone id first and then the secondary storage reference. */
    @Override
    public void removeSecondaryStorage() {
        secondaryZoneId = null;
        secondaryTableStorage = null;
    }

    /** Returns {@code true} when a secondary zone id is currently set (the storage reference itself is not consulted). */
    @Override
    public boolean hasSecondaryStorage() {
        Integer currentSecondaryZoneId = secondaryZoneId;

        return currentSecondaryZoneId != null;
    }

    /** Returns the partition count supplied at construction time. */
    @Override
    public int partitions() {
        return partitions;
    }

    /** Returns the table id supplied at construction time. */
    @Override
    public int tableId() {
        return tableId;
    }

    /** Returns the zone id supplied at construction time. */
    @Override
    public int zoneId() {
        return zoneId;
    }

    /** Returns the current secondary zone id, or {@code null} when none is set. */
    @Override
    @Nullable
    public Integer secondaryZoneId() {
        return secondaryZoneId;
    }

    /** Returns the current qualified name of the table. */
    @Override
    public QualifiedName name() {
        return tableName;
    }

    /**
     * Renames the table: keeps the schema part of the current qualified name and replaces the object part
     * with {@code newName}.
     *
     * @param newName New table name, passed as is to {@code QualifiedNameHelper.fromNormalized}.
     */
    @Override
    public synchronized void name(String newName) {
        String schemaName = tableName.schemaName();

        tableName = QualifiedNameHelper.fromNormalized(schemaName, newName);
    }

    /**
     * Single-row enlistment entry point without a known transaction start timestamp.
     *
     * <p>Delegates to the five-argument overload, passing {@code null} as {@code txStartTs}.
     */
    private <R> CompletableFuture<R> enlistInTx(
            BinaryRowEx row,
            @Nullable InternalTransaction tx,
            IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> fac,
            BiPredicate<R, ReplicaRequest> noWriteChecker
    ) {
        return enlistInTx(row, tx, fac, noWriteChecker, null);
    }

    /**
     * Enlists the partition that owns {@code row} into a read-write transaction and sends the request produced by
     * {@code fac} to it.
     *
     * <p>A read-only {@code tx}, or one whose {@code external()} flag differs from {@code cache()}, yields a failed
     * future. When {@code tx} is {@code null} an implicit transaction is started. If the operation fails, the
     * transaction is implicit, and {@code canRetry} allows it, the whole operation is retried with a fresh implicit
     * transaction while carrying over the first attempt's start timestamp.
     *
     * @param row Row used to pick the partition.
     * @param tx Transaction, or {@code null} to run in an implicit one.
     * @param fac Builds the replica request from the actual transaction, replication group id and consistency token.
     * @param noWriteChecker Predicate forwarded to the invoke helpers.
     * @param txStartTs Start timestamp of the first attempt, or {@code null} on the first attempt.
     * @return Future with the operation result.
     */
    private <R> CompletableFuture<R> enlistInTx(
            BinaryRowEx row,
            @Nullable InternalTransaction tx,
            IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> fac,
            BiPredicate<R, ReplicaRequest> noWriteChecker,
            @Nullable Long txStartTs
    ) {
        if (tx != null && tx.isReadOnly()) {
            return CompletableFuture.failedFuture(new TransactionException(
                    ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR,
                    "Failed to enlist read-write operation into read-only transaction txId={" + tx.id() + "}"
            ));
        }

        if (tx != null && tx.external() != cache()) {
            return CompletableFuture.failedFuture(new TransactionException(
                    GridgainErrorGroups.Cache.TX_INCOMPATIBLE_OPERATION_ERR,
                    IgniteStringFormatter.format(
                            "Requested operation is incompatible with cache or table type [txId={}, cache={}]",
                            tx.id(),
                            cache()
                    )
            ));
        }

        InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx);
        int partId = partitionId(row);
        ReplicationGroupId partGroupId = targetReplicationGroupId(partId);
        PendingTxPartitionEnlistment enlistment = actualTx.enlistedPartition(partGroupId);

        Function<Long, ReplicaRequest> requestFactory = token -> fac.apply(actualTx, partGroupId, token);

        CompletableFuture<R> fut;

        if (enlistment == null) {
            fut = enlistAndInvoke(actualTx, partId, requestFactory, actualTx.implicit(), noWriteChecker);
        } else {
            // An already enlisted partition implies an explicit transaction.
            assert !actualTx.implicit();

            fut = trackingInvoke(actualTx, partId, requestFactory, false, enlistment, noWriteChecker, txManager.lockRetryCount());
        }

        return postEnlist(fut, false, actualTx, actualTx.implicit())
                .<CompletableFuture<R>>handle((result, error) -> {
                    if (error != null) {
                        if (actualTx.implicit()) {
                            long timeout = actualTx.getTimeout();
                            long ts = txStartTs == null ? actualTx.schemaTimestamp().getPhysical() : txStartTs.longValue();

                            if (canRetry(error, ts, timeout)) {
                                return enlistInTx(row, null, fac, noWriteChecker, ts);
                            }
                        }

                        ExceptionUtils.sneakyThrow(error);
                    }

                    return CompletableFuture.completedFuture(result);
                })
                .thenCompose(Function.identity());
    }

    /**
     * Multi-row enlistment entry point without a known transaction start timestamp.
     *
     * <p>Delegates to the six-argument overload, passing {@code null} as {@code txStartTs}.
     */
    private <T> CompletableFuture<T> enlistInTx(
            Collection<BinaryRowEx> keyRows,
            @Nullable InternalTransaction tx,
            ReplicaRequestFactory fac,
            Function<Collection<RowBatch>, CompletableFuture<T>> reducer,
            BiPredicate<T, ReplicaRequest> noOpChecker
    ) {
        return enlistInTx(keyRows, tx, fac, reducer, noOpChecker, null);
    }

    /**
     * Splits {@code keyRows} into per-partition batches, enlists every touched partition into a read-write
     * transaction, sends one request per batch and combines the per-batch futures with {@code reducer}.
     *
     * <p>A read-only {@code tx}, or one whose {@code external()} flag differs from {@code cache()}, yields a failed
     * future. When {@code tx} is {@code null} an implicit transaction is started. Requests are sent as "full" only
     * for an implicit transaction that touches exactly one partition; an implicit transaction spanning several
     * partitions is auto-committed by {@code postEnlist} instead. If the operation fails, the transaction is
     * implicit, and {@code canRetry} allows it, the whole operation is retried with a fresh implicit transaction.
     *
     * @param keyRows Rows to process.
     * @param tx Transaction, or {@code null} to run in an implicit one.
     * @param fac Builds the replica request for one batch.
     * @param reducer Combines the batches (with their result futures set) into the final future.
     * @param noOpChecker Predicate forwarded to the invoke helpers.
     * @param txStartTs Start timestamp of the first attempt, or {@code null} on the first attempt.
     * @return Future with the reduced result.
     */
    private <T> CompletableFuture<T> enlistInTx(
            Collection<BinaryRowEx> keyRows,
            @Nullable InternalTransaction tx,
            ReplicaRequestFactory fac,
            Function<Collection<RowBatch>, CompletableFuture<T>> reducer,
            BiPredicate<T, ReplicaRequest> noOpChecker,
            @Nullable Long txStartTs
    ) {
        if (tx != null && tx.isReadOnly()) {
            return CompletableFuture.failedFuture(new TransactionException(
                    ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR,
                    "Failed to enlist read-write operation into read-only transaction txId={" + tx.id() + "}"
            ));
        }

        if (tx != null && tx.external() != cache()) {
            return CompletableFuture.failedFuture(new TransactionException(
                    GridgainErrorGroups.Cache.TX_INCOMPATIBLE_OPERATION_ERR,
                    "Requested operation is incompatible with cache or table type txId={" + tx.id() + "}"
            ));
        }

        InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx);
        Int2ObjectMap<RowBatch> batchesByPartition = toRowBatchByPartitionId(keyRows);

        boolean singlePart = batchesByPartition.size() == 1;
        boolean full = actualTx.implicit() && singlePart;

        for (Int2ObjectMap.Entry<RowBatch> entry : batchesByPartition.int2ObjectEntrySet()) {
            int partitionId = entry.getIntKey();
            RowBatch rowBatch = entry.getValue();

            ReplicationGroupId replicationGroupId = targetReplicationGroupId(partitionId);
            PendingTxPartitionEnlistment enlistment = actualTx.enlistedPartition(replicationGroupId);

            CompletableFuture<T> batchFuture;

            if (enlistment == null) {
                batchFuture = enlistAndInvoke(
                        actualTx,
                        partitionId,
                        token -> fac.create(rowBatch.requestedRows, actualTx, replicationGroupId, token, full),
                        full,
                        noOpChecker
                );
            } else {
                // An already enlisted partition implies an explicit transaction.
                assert !actualTx.implicit();

                batchFuture = trackingInvoke(
                        actualTx,
                        partitionId,
                        token -> fac.create(rowBatch.requestedRows, actualTx, replicationGroupId, token, false),
                        false,
                        enlistment,
                        noOpChecker,
                        txManager.lockRetryCount()
                );
            }

            rowBatch.resultFuture = batchFuture;
        }

        CompletableFuture<T> reduced = reducer.apply(batchesByPartition.values());
        boolean autoCommit = actualTx.implicit() && !singlePart;

        return postEnlist(reduced, autoCommit, actualTx, full)
                .<CompletableFuture<T>>handle((result, error) -> {
                    if (error != null) {
                        if (actualTx.implicit()) {
                            long timeout = actualTx.getTimeout();
                            long ts = txStartTs == null ? actualTx.schemaTimestamp().getPhysical() : txStartTs.longValue();

                            if (canRetry(error, ts, timeout)) {
                                return enlistInTx(keyRows, null, fac, reducer, noOpChecker, ts);
                            }
                        }

                        ExceptionUtils.sneakyThrow(error);
                    }

                    return CompletableFuture.completedFuture(result);
                })
                .thenCompose(Function.identity());
    }

    /**
     * Returns the transaction to run a read-write operation in.
     *
     * <p>Without a supplied transaction a new one is begun: via {@code beginExternal} when {@code cache()} is true,
     * via {@code beginImplicitRw} otherwise. A supplied transaction is returned unchanged, except that an implicit,
     * non-external one is converted with {@code castToExternal} when {@code cache()} is true.
     */
    private InternalTransaction startImplicitRwTxIfNeeded(@Nullable InternalTransaction tx) {
        if (tx != null) {
            boolean needsCast = cache() && tx.implicit() && !tx.external();

            return needsCast ? txManager.castToExternal(tx) : tx;
        }

        if (cache()) {
            return txManager.beginExternal(observableTimestampTracker, true);
        }

        return txManager.beginImplicitRw(observableTimestampTracker);
    }

    /** Returns {@code tx} when it is non-null, otherwise begins and returns a new implicit read-only transaction. */
    private InternalTransaction startImplicitRoTxIfNeeded(@Nullable InternalTransaction tx) {
        if (tx != null) {
            return tx;
        }

        return txManager.beginImplicitRo(observableTimestampTracker);
    }

    /**
     * Requests the next batch of a read-write scan over partition {@code partId} within {@code tx}.
     *
     * <p>If the partition is already enlisted in the transaction, the request is sent straight to the enlisted
     * primary node; otherwise the partition is enlisted first. The outcome is then passed through
     * {@code postEnlist} with both {@code autoCommit} and {@code full} disabled.
     *
     * @param tx Transaction the scan runs in.
     * @param partId Partition to scan.
     * @param scanId Scan identifier placed into the request.
     * @param batchSize Batch size placed into the request.
     * @param indexId Index to use, or {@code null}.
     * @param exactKey Exact key, or {@code null}.
     * @param lowerBound Lower bound prefix, or {@code null}.
     * @param upperBound Upper bound prefix, or {@code null}.
     * @param flags Flags placed into the request.
     * @param columnsToInclude Columns to include, or {@code null}.
     * @return Future with the batch of rows.
     */
    private CompletableFuture<Collection<BinaryRow>> enlistCursorInTx(
            InternalTransaction tx,
            int partId,
            long scanId,
            int batchSize,
            @Nullable Integer indexId,
            @Nullable BinaryTuple exactKey,
            @Nullable BinaryTuplePrefix lowerBound,
            @Nullable BinaryTuplePrefix upperBound,
            int flags,
            @Nullable BitSet columnsToInclude
    ) {
        ReplicationGroupId groupId = targetReplicationGroupId(partId);
        PendingTxPartitionEnlistment existing = tx.enlistedPartition(groupId);

        Function<Long, ReplicaRequest> requestFactory = token -> TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                .groupId(serializeReplicationGroupId(groupId))
                .tableId(tableId)
                .timestamp(tx.schemaTimestamp())
                .transactionId(tx.id())
                .scanId(scanId)
                .indexToUse(indexId)
                .exactKey(binaryTupleMessage(exactKey))
                .lowerBoundPrefix(binaryTupleMessage(lowerBound))
                .upperBoundPrefix(binaryTupleMessage(upperBound))
                .flags(flags)
                .columnsToInclude(columnsToInclude)
                .full(tx.implicit())
                .batchSize(batchSize)
                .enlistmentConsistencyToken(token)
                .commitPartitionId(serializeReplicationGroupId(tx.commitPartition()))
                .coordinatorId(tx.coordinatorId())
                .build();

        CompletableFuture<Collection<BinaryRow>> batchFuture;

        if (existing == null) {
            batchFuture = enlistAndInvoke(tx, partId, requestFactory, false, null);
        } else {
            existing.addTableId(tableId);

            batchFuture = replicaSvc.invoke(existing.primaryNodeConsistentId(), requestFactory.apply(existing.consistencyToken()));
        }

        return postEnlist(batchFuture, false, tx, false);
    }

    /**
     * Wraps a binary tuple into a network message carrying its byte buffer and element count.
     *
     * @param binaryTuple Tuple to wrap, or {@code null}.
     * @return The message, or {@code null} when {@code binaryTuple} is {@code null}.
     */
    @Nullable
    private static BinaryTupleMessage binaryTupleMessage(@Nullable BinaryTupleReader binaryTuple) {
        return binaryTuple == null
                ? null
                : TABLE_MESSAGES_FACTORY.binaryTupleMessage()
                        .tuple(binaryTuple.byteBuffer())
                        .elementCount(binaryTuple.elementCount())
                        .build();
    }

    /**
     * Tells whether a failed implicit-transaction operation may be retried.
     *
     * @param e Failure of the previous attempt.
     * @param ts Start time of the first attempt, compared against {@code FastTimestamps.coarseCurrentTimeMillis()}.
     * @param timeout Retry window, in the same unit as {@code ts}.
     * @return {@code true} when the exception allows an implicit transaction retry and less than {@code timeout}
     *     has elapsed since {@code ts}.
     */
    private static boolean canRetry(Throwable e, long ts, long timeout) {
        if (!exceptionAllowsImplicitTxRetry(e)) {
            return false;
        }

        long elapsed = FastTimestamps.coarseCurrentTimeMillis() - ts;

        return elapsed < timeout;
    }

    /**
     * Enlists partition {@code partId} into {@code tx} and, once enlisted, sends the request through
     * {@code trackingInvoke} with the transaction manager's lock retry count.
     *
     * @param tx Transaction.
     * @param partId Partition to enlist.
     * @param mapFunc Builds the replica request from the enlistment consistency token.
     * @param full Forwarded to {@code trackingInvoke}.
     * @param noWriteChecker Forwarded to {@code trackingInvoke}; may be {@code null}.
     * @return Future with the operation result.
     */
    private <R> CompletableFuture<R> enlistAndInvoke(
            InternalTransaction tx,
            int partId,
            Function<Long, ReplicaRequest> mapFunc,
            boolean full,
            @Nullable BiPredicate<R, ReplicaRequest> noWriteChecker
    ) {
        return enlist(partId, tx).thenCompose(enlistment -> {
            int lockRetries = txManager.lockRetryCount();

            return trackingInvoke(tx, partId, mapFunc, full, enlistment, noWriteChecker, lockRetries);
        });
    }

    /**
     * Sends a read-write replica request to the primary node recorded in {@code enlistment}.
     *
     * <p>Three paths are taken depending on the request:
     * <ul>
     *     <li>{@code full}: the request is sent with {@code invokeRaw} and the transaction is finished right in the
     *     response handler;</li>
     *     <li>write request: an inflight is registered for a non-remote transaction before sending, and released
     *     again when {@code noWriteChecker} reports that nothing was written or when the request is retried;</li>
     *     <li>any other request: sent as is.</li>
     * </ul>
     * In the two non-full paths an {@code ACQUIRE_LOCK_ERR} failure re-sends the same request while
     * {@code retryOnLockConflict} is positive.
     *
     * @param tx Read-write transaction (asserted).
     * @param partId Partition id, used for the error message and forwarded on retry.
     * @param mapFunc Builds the replica request from the enlistment consistency token.
     * @param full Whether to take the {@code invokeRaw} path that also finishes the transaction.
     * @param enlistment Enlistment providing the primary node and consistency token; this table's id is added to it.
     * @param noWriteChecker Tested against the response of a write request; asserted non-null on that path.
     * @param retryOnLockConflict Remaining number of retries on a lock acquisition failure.
     * @return Future with the operation result.
     */
    private <R> CompletableFuture<R> trackingInvoke(InternalTransaction tx, int partId, Function<Long, ReplicaRequest> mapFunc, boolean full, PendingTxPartitionEnlistment enlistment, @Nullable BiPredicate<R, ReplicaRequest> noWriteChecker, int retryOnLockConflict) {
        assert (!tx.isReadOnly()) : IgniteStringFormatter.format((String)"Tracking invoke is available only for read-write transactions [tx={}].", (Object[])new Object[]{tx});
        enlistment.addTableId(this.tableId);
        ReplicaRequest request = mapFunc.apply(enlistment.consistencyToken());
        if (full) {
            return this.replicaSvc.invokeRaw(enlistment.primaryNodeConsistentId(), request).handle((r, e) -> {
                boolean hasError;
                boolean bl = hasError = e != null;
                assert (hasError || r instanceof TimestampAware);
                // Finish the transaction with the outcome of the request; the timestamp comes from the response on success.
                tx.finish(!hasError, hasError ? null : ((TimestampAware)r).timestamp(), true, false);
                if (e != null) {
                    ExceptionUtils.sneakyThrow((Throwable)e);
                }
                return r.result();
            });
        }
        ReadWriteReplicaRequest req = (ReadWriteReplicaRequest)request;
        if (req.isWrite()) {
            // A failed inflight registration is reported as an already finished transaction.
            if (!tx.remote() && !this.transactionInflights.addInflight(tx.id())) {
                int code = ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
                if (tx.isRolledBackWithTimeoutExceeded()) {
                    code = ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;
                }
                return CompletableFuture.failedFuture((Throwable)new TransactionException(code, IgniteStringFormatter.format((String)"Transaction is already finished [tableName={}, partId={}, txState={}, timeoutExceeded={}].", (Object[])new Object[]{this.tableName, partId, tx.state(), tx.isRolledBackWithTimeoutExceeded()})));
            }
            return ((CompletableFuture)((CompletableFuture)this.replicaSvc.invoke(enlistment.primaryNodeConsistentId(), request).thenApply(res -> {
                assert (noWriteChecker != null);
                // When the checker reports a no-op, release the inflight (local tx) or kill the tx (remote tx).
                if (noWriteChecker.test(res, request)) {
                    if (!tx.remote()) {
                        this.transactionInflights.removeInflight(tx.id());
                    } else {
                        tx.kill();
                    }
                }
                return res;
            })).handle((r, e) -> {
                if (e != null) {
                    if (retryOnLockConflict > 0 && ExceptionUtils.matchAny((Throwable)ExceptionUtils.unwrapCause((Throwable)e), (int)ErrorGroups.Transactions.ACQUIRE_LOCK_ERR, (int[])new int[0])) {
                        // The retry below registers its own inflight, so release the one taken for this attempt.
                        if (!tx.remote()) {
                            this.transactionInflights.removeInflight(tx.id());
                        }
                        // Re-send the very same request with one retry less.
                        return this.trackingInvoke(tx, partId, ignored -> request, false, enlistment, noWriteChecker, retryOnLockConflict - 1);
                    }
                    ExceptionUtils.sneakyThrow((Throwable)e);
                }
                return CompletableFuture.completedFuture(r);
            })).thenCompose(Function.identity());
        }
        // Non-write request: no inflight bookkeeping, only the lock conflict retry.
        return ((CompletableFuture)this.replicaSvc.invoke(enlistment.primaryNodeConsistentId(), request).handle((r, e) -> {
            if (e != null) {
                if (retryOnLockConflict > 0 && ExceptionUtils.matchAny((Throwable)ExceptionUtils.unwrapCause((Throwable)e), (int)ErrorGroups.Transactions.ACQUIRE_LOCK_ERR, (int[])new int[0])) {
                    return this.trackingInvoke(tx, partId, ignored -> request, false, enlistment, noWriteChecker, retryOnLockConflict - 1);
                }
                ExceptionUtils.sneakyThrow((Throwable)e);
            }
            return CompletableFuture.completedFuture(r);
        })).thenCompose(Function.identity());
    }

    /**
     * Applies the transaction follow-up to the outcome of an enlisted operation.
     *
     * <p>For a {@code full} request or a remote transaction the outcome is passed through untouched. Otherwise a
     * failure rolls the transaction back (using the timeout-specific rollback when the failure indicates the
     * transaction finished due to a timeout) and is then rethrown, with any rollback error attached as suppressed;
     * a success commits the transaction first when {@code autoCommit} is set.
     *
     * @param fut Future of the enlisted operation.
     * @param autoCommit Whether to commit {@code tx0} after a successful operation.
     * @param tx0 Transaction.
     * @param full Whether the request was sent as a full one; must not be combined with {@code autoCommit}.
     * @return Future completed after the follow-up.
     */
    private static <T> CompletableFuture<T> postEnlist(CompletableFuture<T> fut, boolean autoCommit, InternalTransaction tx0, boolean full) {
        assert !(autoCommit && full) : "Invalid combination of flags";

        return fut.<CompletableFuture<T>>handle((result, failure) -> {
            if (full || tx0.remote()) {
                if (failure != null) {
                    return CompletableFuture.failedFuture(failure);
                }

                return CompletableFuture.completedFuture(result);
            }

            if (failure == null) {
                if (autoCommit) {
                    return tx0.commitAsync().thenApply(ignored -> result);
                }

                return CompletableFuture.completedFuture(result);
            }

            CompletableFuture<?> rollback = isFinishedDueToTimeout(failure)
                    ? tx0.rollbackTimeoutExceededAsync()
                    : tx0.rollbackAsync();

            // Keep the failed state: the original failure is rethrown once the rollback is over.
            return rollback.<T>handle((ignored, rollbackError) -> {
                if (rollbackError != null) {
                    failure.addSuppressed(rollbackError);
                }

                ExceptionUtils.sneakyThrow(failure);

                return null;
            });
        }).thenCompose(Function.identity());
    }

    /**
     * Sends a read-only request for a single row to the primary replica of the row's partition.
     *
     * @param tx Transaction, or {@code null} to begin an implicit read-only one.
     * @param row Row used to pick the partition.
     * @param op Builds the replica request from the replication group id and the consistency token.
     * @return Future with the operation result.
     */
    private <R> CompletableFuture<R> evaluateReadOnlyPrimaryNode(
            @Nullable InternalTransaction tx,
            BinaryRowEx row,
            BiFunction<ReplicationGroupId, Long, ReplicaRequest> op
    ) {
        InternalTransaction roTx = startImplicitRoTxIfNeeded(tx);
        ReplicationGroupId groupId = targetReplicationGroupId(partitionId(row));

        return sendReadOnlyToPrimaryReplica(roTx, groupId, op);
    }

    /**
     * Sends a read-only request for a collection of rows to the primary replica, inside a new implicit read-only
     * transaction.
     *
     * <p>The target partition is derived from the first row only.
     * NOTE(review): presumably callers guarantee that all rows belong to the same partition — confirm at call sites.
     *
     * @param rows Rows of the request; must not be empty.
     * @param op Builds the replica request from the replication group id and the consistency token.
     * @return Future with the operation result.
     * @throws java.util.NoSuchElementException If {@code rows} is empty.
     */
    private <R> CompletableFuture<R> evaluateReadOnlyPrimaryNode(Collection<BinaryRowEx> rows, BiFunction<ReplicationGroupId, Long, ReplicaRequest> op) {
        // Resolve the target group before beginning the transaction: if the collection is empty (or partition
        // resolution fails) nothing has been started yet, so no implicit transaction is left unfinished.
        int partId = this.partitionId(rows.iterator().next());
        ReplicationGroupId replicationGroupId = this.targetReplicationGroupId(partId);

        InternalTransaction actualTx = this.txManager.beginImplicitRo(this.observableTimestampTracker);

        return this.sendReadOnlyToPrimaryReplica(actualTx, replicationGroupId, op);
    }

    /**
     * Sends a read-only request to the primary replica of {@code replicationGroupId} and finishes {@code tx} once the
     * outcome is known.
     *
     * <p>If the placement driver already knows the primary replica and its leaseholder is resolvable, the request is
     * sent immediately; otherwise the primary replica is awaited first. Any failure while resolving the node or
     * sending the request is wrapped into a {@link TransactionException} with {@code INTERNAL_ERR}.
     *
     * <p>Every outcome, including a synchronous failure of the immediate path, goes through {@code postEvaluate},
     * so the transaction is always finished before the returned future completes.
     *
     * @param tx Transaction the request runs in; it is finished by this method.
     * @param replicationGroupId Target replication group.
     * @param op Builds the replica request from the replication group id and the consistency token.
     * @return Future with the operation result.
     */
    private <R> CompletableFuture<R> sendReadOnlyToPrimaryReplica(InternalTransaction tx, ReplicationGroupId replicationGroupId, BiFunction<ReplicationGroupId, Long, ReplicaRequest> op) {
        ReplicaMeta meta = this.placementDriver.getCurrentPrimaryReplica(replicationGroupId, tx.schemaTimestamp());

        Function<ReplicaMeta, CompletableFuture<R>> evaluateClo = primaryReplica -> {
            try {
                InternalClusterNode node = this.getClusterNode(primaryReplica);

                return this.replicaSvc.invoke(node, op.apply(replicationGroupId, InternalTableImpl.enlistmentConsistencyToken(primaryReplica)));
            } catch (Throwable e) {
                String canonicalName = this.tableName.toCanonicalForm();

                throw new TransactionException(
                        ErrorGroups.Common.INTERNAL_ERR,
                        IgniteStringFormatter.format("Failed to invoke the replica request [tableName={}, grp={}].", canonicalName, replicationGroupId),
                        e
                );
            }
        };

        CompletableFuture<R> fut;

        if (meta != null && this.clusterNodeResolver.getById(meta.getLeaseholderId()) != null) {
            try {
                fut = evaluateClo.apply(meta);
            } catch (IgniteException e) {
                // Do not return right away: the failure must pass through postEvaluate so that the transaction is
                // finished, exactly as it is when the asynchronous path below fails.
                fut = CompletableFuture.failedFuture(e);
            }
        } else {
            fut = this.awaitPrimaryReplica(replicationGroupId, tx.schemaTimestamp()).thenCompose(evaluateClo);
        }

        return this.postEvaluate(fut, tx);
    }

    /**
     * Finishes {@code tx} after a read-only operation and propagates the operation outcome.
     *
     * <p>On success the transaction is finished with the commit flag set and the result is returned. On failure it is
     * finished with the commit flag cleared and the original failure is rethrown, with any error from finishing
     * attached as suppressed.
     *
     * @param fut Future of the read-only operation.
     * @param tx Transaction to finish.
     * @return Future completed once the transaction has been finished.
     */
    private <R> CompletableFuture<R> postEvaluate(CompletableFuture<R> fut, InternalTransaction tx) {
        return fut.<CompletableFuture<R>>handle((result, failure) -> {
            if (failure == null) {
                return tx.finish(true, clockService.current(), false, false).thenApply(ignored -> result);
            }

            return tx.finish(false, clockService.current(), false, false).<R>handle((ignored, finishError) -> {
                if (finishError != null) {
                    failure.addSuppressed(finishError);
                }

                ExceptionUtils.sneakyThrow(failure);

                return null;
            });
        }).thenCompose(Function.identity());
    }

    /**
     * Reads a single row by its primary key.
     *
     * <p>Three paths are possible: a direct read-only request to the primary replica when
     * {@code TableUtils.isDirectFlowApplicable(tx)} holds, a timestamp-based read for a read-only transaction,
     * and otherwise an {@code RW_GET} request enlisted into the transaction.
     *
     * @param keyRow Row holding the primary key.
     * @param tx Transaction, may be {@code null}.
     * @return Future with the row found.
     */
    @Override
    public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable InternalTransaction tx) {
        InternalTableImpl.checkTransactionFinishStarted(tx);

        if (TableUtils.isDirectFlowApplicable(tx)) {
            SecurityContext securityContext = SecurityContextHolder.getOrNull();
            UserDetailsMessage userDetails = InternalTableImpl.isValidContext(securityContext)
                    ? TABLE_MESSAGES_FACTORY.userDetailsMessage()
                            .userName(securityContext.authentication().username())
                            .userRoles(securityContext.authentication().roles())
                            .build()
                    : null;

            return this.evaluateReadOnlyPrimaryNode(tx, keyRow, (groupId, consistencyToken) ->
                    TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequestV2()
                            .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                            .tableId(this.tableId)
                            .enlistmentConsistencyToken(consistencyToken)
                            .schemaVersion(keyRow.schemaVersion())
                            .primaryKey(keyRow.tupleSlice())
                            .requestType(RequestType.RO_GET)
                            .userDetails(userDetails)
                            .build());
        }

        if (tx.isReadOnly()) {
            return this.evaluateReadOnlyRecipientNode(this.partitionId(keyRow), tx.readTimestamp())
                    .thenCompose(recipientNode -> this.get(keyRow, tx.readTimestamp(), tx.id(), tx.coordinatorId(), (InternalClusterNode)recipientNode));
        }

        IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> requestFactory = (txo, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .schemaVersion(keyRow.schemaVersion())
                        .primaryKey(keyRow.tupleSlice())
                        .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(txo.commitPartition()))
                        .transactionId(txo.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_GET)
                        .timestamp(txo.schemaTimestamp())
                        .full(false)
                        .coordinatorId(txo.coordinatorId())
                        .build();

        return this.enlistInTx(keyRow, tx, requestFactory, (res, req) -> false);
    }

    /**
     * Reads a single row by primary key at the given read timestamp. Delegates to the overload that accepts
     * a {@code usePrimary} flag, passing {@code true}.
     */
    @Override
    public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, HybridTimestamp readTimestamp, @Nullable UUID transactionId, @Nullable UUID coordinatorId, InternalClusterNode recipientNode) {
        final boolean usePrimary = true;

        return this.get(keyRow, readTimestamp, transactionId, coordinatorId, recipientNode, usePrimary);
    }

    /**
     * Sends an {@code RO_GET} request for a single row to the given recipient node.
     *
     * @param keyRow Row holding the primary key.
     * @param readTimestamp Read timestamp put into the request.
     * @param transactionId Transaction ID put into the request, may be {@code null}.
     * @param coordinatorId Coordinator ID put into the request, may be {@code null}.
     * @param recipientNode Node the request is sent to.
     * @param usePrimary Selects the primary ({@code true}) or the secondary ({@code false}) replication group
     *     of the row's partition and is also put into the request.
     * @return Future with the replica response.
     */
    @Override
    public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, HybridTimestamp readTimestamp, @Nullable UUID transactionId, @Nullable UUID coordinatorId, InternalClusterNode recipientNode, boolean usePrimary) {
        int partId = this.partitionId(keyRow);

        ReplicationGroupId replicationGroupId;
        if (usePrimary) {
            replicationGroupId = this.targetReplicationGroupId(partId);
        } else {
            replicationGroupId = this.targetSecondaryReplicationGroupId(partId);
        }

        SecurityContext securityContext = SecurityContextHolder.getOrNull();
        UserDetailsMessage userDetails = InternalTableImpl.isValidContext(securityContext)
                ? TABLE_MESSAGES_FACTORY.userDetailsMessage()
                        .userName(securityContext.authentication().username())
                        .userRoles(securityContext.authentication().roles())
                        .build()
                : null;

        ReadOnlySingleRowPkReplicaRequestV2 request = TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequestV2()
                .groupId(InternalTableImpl.serializeReplicationGroupId(replicationGroupId))
                .tableId(this.tableId)
                .schemaVersion(keyRow.schemaVersion())
                .primaryKey(keyRow.tupleSlice())
                .requestType(RequestType.RO_GET)
                .usePrimary(usePrimary)
                .readTimestamp(readTimestamp)
                .transactionId(transactionId)
                .coordinatorId(coordinatorId)
                .userDetails(userDetails)
                .build();

        return this.replicaSvc.invoke(recipientNode, (ReplicaRequest)request);
    }

    /**
     * Tells whether every row of the batch maps to the same partition as the first row.
     *
     * @param rows Batch of rows; the first element is read unconditionally, so the collection must not be empty.
     * @return {@code true} if all rows share one partition.
     */
    private boolean isSinglePartitionBatch(Collection<BinaryRowEx> rows) {
        Iterator<BinaryRowEx> it = rows.iterator();
        final int expectedPartition = this.partitionId(it.next());

        boolean samePartition = true;
        while (samePartition && it.hasNext()) {
            samePartition = this.partitionId(it.next()) == expectedPartition;
        }

        return samePartition;
    }

    /**
     * Reads several rows by their primary keys.
     *
     * <p>An empty or {@code null} key collection yields an empty list. Without a transaction and with all keys in
     * one partition, a direct {@code RO_GET_ALL} request is sent to the primary replica. For a read-only
     * transaction the recipient node is chosen by the partition of the first key and the timestamp-based overload
     * is used. Otherwise {@code RW_GET_ALL} requests are enlisted into the transaction.
     *
     * @param keyRows Rows holding the primary keys.
     * @param tx Transaction, may be {@code null}.
     * @return Future with the rows found.
     */
    @Override
    public CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> keyRows, InternalTransaction tx) {
        InternalTableImpl.checkTransactionFinishStarted(tx);

        if (CollectionUtils.nullOrEmpty(keyRows)) {
            return CompletableFutures.emptyListCompletedFuture();
        }

        if (tx == null && this.isSinglePartitionBatch(keyRows)) {
            return this.evaluateReadOnlyPrimaryNode(keyRows, (groupId, consistencyToken) ->
                    TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
                            .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                            .tableId(this.tableId)
                            .enlistmentConsistencyToken(consistencyToken)
                            .schemaVersion(keyRows.iterator().next().schemaVersion())
                            .primaryKeys(InternalTableImpl.serializeBinaryTuples(keyRows))
                            .requestType(RequestType.RO_GET_ALL)
                            .build());
        }

        if (tx != null && tx.isReadOnly()) {
            assert (!tx.implicit()) : "implicit RO getAll not supported";

            BinaryRowEx firstRow = keyRows.iterator().next();

            return this.evaluateReadOnlyRecipientNode(this.partitionId(firstRow), tx.readTimestamp())
                    .thenCompose(recipientNode -> this.getAll(keyRows, tx.readTimestamp(), tx.id(), tx.coordinatorId(), (InternalClusterNode)recipientNode));
        }

        // The last lambda uses inferred parameter types: this method declares no type variable that an
        // explicit parameter type could refer to.
        return this.enlistInTx(
                keyRows,
                tx,
                (Collection<? extends BinaryRow> keyRows0, InternalTransaction txo, ReplicationGroupId groupId, Long enlistmentConsistencyToken, Boolean full) ->
                        this.readWriteMultiRowPkReplicaRequest(RequestType.RW_GET_ALL, keyRows0, txo, groupId, enlistmentConsistencyToken, full),
                InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder,
                (res, req) -> false);
    }

    /**
     * Reads several rows by primary key at the given read timestamp. Delegates to the overload that accepts
     * a {@code usePrimary} flag, passing {@code true}.
     */
    @Override
    public CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> keyRows, HybridTimestamp readTimestamp, @Nullable UUID transactionId, @Nullable UUID coordinatorId, InternalClusterNode recipientNode) {
        final boolean usePrimary = true;

        return this.getAll(keyRows, readTimestamp, transactionId, coordinatorId, recipientNode, usePrimary);
    }

    /**
     * Groups the keys by partition and sends one {@code RO_GET_ALL} request per partition to the given
     * recipient node, then combines the per-partition responses.
     *
     * @param keyRows Rows holding the primary keys.
     * @param readTimestamp Read timestamp put into every request.
     * @param transactionId Transaction ID put into every request, may be {@code null}.
     * @param coordinatorId Coordinator ID put into every request, may be {@code null}.
     * @param recipientNode Node all requests are sent to.
     * @param usePrimary Selects the primary ({@code true}) or the secondary ({@code false}) replication group
     *     of each partition and is also put into every request.
     * @return Future with the combined rows.
     */
    @Override
    public CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> keyRows, HybridTimestamp readTimestamp, @Nullable UUID transactionId, @Nullable UUID coordinatorId, InternalClusterNode recipientNode, boolean usePrimary) {
        Int2ObjectMap<RowBatch> batchesByPartition = this.toRowBatchByPartitionId(keyRows);

        for (Int2ObjectMap.Entry<RowBatch> entry : batchesByPartition.int2ObjectEntrySet()) {
            RowBatch batch = entry.getValue();

            ReplicationGroupId replicationGroupId;
            if (usePrimary) {
                replicationGroupId = this.targetReplicationGroupId(entry.getIntKey());
            } else {
                replicationGroupId = this.targetSecondaryReplicationGroupId(entry.getIntKey());
            }

            ReadOnlyMultiRowPkReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
                    .groupId(InternalTableImpl.serializeReplicationGroupId(replicationGroupId))
                    .tableId(this.tableId)
                    .schemaVersion(batch.requestedRows.get(0).schemaVersion())
                    .primaryKeys(InternalTableImpl.serializeBinaryTuples(batch.requestedRows))
                    .requestType(RequestType.RO_GET_ALL)
                    .usePrimary(usePrimary)
                    .readTimestamp(readTimestamp)
                    .transactionId(transactionId)
                    .coordinatorId(coordinatorId)
                    .build();

            batch.resultFuture = this.replicaSvc.invoke(recipientNode, (ReplicaRequest)request);
        }

        return InternalTableImpl.collectMultiRowsResponsesWithRestoreOrder(batchesByPartition.values());
    }

    /**
     * Builds a read-write multi-row request addressed by primary keys.
     *
     * <p>The schema version is taken from the first row; with assertions enabled, all non-null rows are checked
     * to share one schema version. A delayed-ack processor is set only when {@code tx.remote()} is true.
     *
     * @param requestType Request type to put into the message.
     * @param rows Rows holding the primary keys.
     * @param tx Transaction the request belongs to.
     * @param groupId Target replication group.
     * @param enlistmentConsistencyToken Enlistment consistency token.
     * @param full Value for the {@code full} field of the request.
     * @return Built request.
     */
    private ReadWriteMultiRowPkReplicaRequest readWriteMultiRowPkReplicaRequest(RequestType requestType, Collection<? extends BinaryRow> rows, InternalTransaction tx, ReplicationGroupId groupId, Long enlistmentConsistencyToken, boolean full) {
        assert (InternalTableImpl.allSchemaVersionsSame(rows)) : "Different schema versions encountered: " + InternalTableImpl.uniqueSchemaVersions(rows);

        ReadWriteMultiRowPkReplicaRequest request = TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
                .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                .tableId(this.tableId)
                .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(tx.commitPartition()))
                .schemaVersion(rows.iterator().next().schemaVersion())
                .primaryKeys(InternalTableImpl.serializeBinaryTuples(rows))
                .transactionId(tx.id())
                .enlistmentConsistencyToken(enlistmentConsistencyToken)
                .requestType(requestType)
                .timestamp(tx.schemaTimestamp())
                .full(full)
                .coordinatorId(tx.coordinatorId())
                .delayedAckProcessor(tx.remote() ? (first, second) -> tx.processDelayedAck(first, second) : null)
                .build();

        return request;
    }

    /**
     * Checks that all non-null rows carry the same schema version. Null rows are ignored; a collection with
     * no non-null rows is considered consistent.
     *
     * @param rows Rows to inspect.
     * @return {@code true} if no two non-null rows differ in schema version.
     */
    private static boolean allSchemaVersionsSame(Collection<? extends BinaryRow> rows) {
        Integer expectedVersion = null;

        for (BinaryRow row : rows) {
            if (row == null) {
                continue;
            }

            int version = row.schemaVersion();

            if (expectedVersion == null) {
                // The first non-null row defines the version the rest is compared against.
                expectedVersion = version;
            } else if (expectedVersion != version) {
                return false;
            }
        }

        return true;
    }

    /**
     * Collects the distinct schema versions of the given rows.
     *
     * <p>Null rows are skipped, matching {@code allSchemaVersionsSame}. The result is used to build the
     * assertion message when that check fails, so it must not throw on input the check itself accepts.
     *
     * @param rows Rows to inspect; may contain {@code null} elements.
     * @return Set of schema versions of the non-null rows.
     */
    private static Set<Integer> uniqueSchemaVersions(Collection<? extends BinaryRow> rows) {
        Set<Integer> versions = new HashSet<>();

        for (BinaryRow row : rows) {
            if (row != null) {
                versions.add(row.schemaVersion());
            }
        }

        return versions;
    }

    /**
     * Converts rows to the list of their tuple slices, preserving iteration order.
     *
     * @param keys Rows to convert.
     * @return Mutable list with one {@code tupleSlice()} result per row.
     */
    private static List<ByteBuffer> serializeBinaryTuples(Collection<? extends BinaryRow> keys) {
        List<ByteBuffer> slices = new ArrayList<>(keys.size());

        Iterator<? extends BinaryRow> it = keys.iterator();
        while (it.hasNext()) {
            slices.add(it.next().tupleSlice());
        }

        return slices;
    }

    /**
     * Enlists an {@code RW_UPSERT} request for a single row into the transaction.
     *
     * @param row Row to upsert.
     * @param tx Transaction, may be {@code null}.
     * @return Future completed when the operation is done.
     */
    @Override
    public CompletableFuture<Void> upsert(BinaryRowEx row, @Nullable InternalTransaction tx) {
        IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> requestFactory = (txo, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(txo.commitPartition()))
                        .schemaVersion(row.schemaVersion())
                        .binaryTuple(row.tupleSlice())
                        .transactionId(txo.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_UPSERT)
                        .timestamp(txo.schemaTimestamp())
                        .full(txo.implicit())
                        .coordinatorId(txo.coordinatorId())
                        .delayedAckProcessor(txo.remote() ? (first, second) -> txo.processDelayedAck(first, second) : null)
                        .build();

        return this.enlistInTx(row, tx, requestFactory, (res, req) -> false);
    }

    /**
     * Enlists an upsert of several rows into the transaction, building the per-batch requests with
     * {@code upsertAllInternal} and combining the results with {@code RowBatch::allResultFutures}.
     *
     * @param rows Rows to upsert.
     * @param tx Transaction, may be {@code null}.
     * @return Future completed when the operation is done.
     */
    @Override
    public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
        // Parameter types of the predicate are inferred: this method declares no type variable that an
        // explicit parameter type could refer to.
        return this.enlistInTx(rows, tx, this::upsertAllInternal, RowBatch::allResultFutures, (res, req) -> false);
    }

    /**
     * Applies a batch of row updates to one partition. Delegates to {@code updateAllWithRetry} with no
     * previously recorded start timestamp.
     */
    @Override
    public CompletableFuture<Void> updateAll(Collection<BinaryRowEx> rows, @Nullable BitSet deleted, int partition) {
        final Long noStartTimestamp = null;

        return this.updateAllWithRetry(rows, deleted, partition, noStartTimestamp);
    }

    /**
     * Applies a batch of row updates to one partition in a newly started transaction and starts over with a
     * new transaction when the failure is reported as retriable by {@code canRetry}.
     *
     * @param rows Rows of the batch; with assertions enabled, all are checked to belong to {@code partition}.
     * @param deleted Bit set passed through to {@code upsertAllInternal}, may be {@code null}.
     * @param partition Target partition.
     * @param txStartTs Timestamp recorded by the first attempt, or {@code null} on the first attempt; in that
     *     case the physical part of the transaction's schema timestamp is used.
     * @return Future completed when the batch is applied.
     */
    private CompletableFuture<Void> updateAllWithRetry(Collection<BinaryRowEx> rows, @Nullable BitSet deleted, int partition, @Nullable Long txStartTs) {
        InternalTransaction tx;
        if (this.cache()) {
            tx = this.txManager.beginExternal(this.observableTimestampTracker, true);
        } else {
            tx = this.txManager.beginImplicitRw(this.observableTimestampTracker);
        }

        ReplicationGroupId replicationGroupId = this.targetReplicationGroupId(partition);

        assert (rows.stream().allMatch(row -> this.partitionId(row) == partition)) : "Invalid batch for partition " + partition;

        CompletableFuture<Void> enlisted = this.enlistAndInvoke(
                tx,
                partition,
                token -> this.upsertAllInternal(rows, deleted, tx, replicationGroupId, token, true),
                true,
                null);

        return InternalTableImpl.postEnlist(enlisted, false, tx, true).handle((result, failure) -> {
            if (failure == null) {
                return CompletableFuture.completedFuture(result);
            }

            long timeout = tx.getTimeout();
            long ts = txStartTs == null ? tx.schemaTimestamp().getPhysical() : txStartTs.longValue();

            if (InternalTableImpl.canRetry(failure, ts, timeout)) {
                return this.updateAllWithRetry(rows, deleted, partition, ts);
            }

            ExceptionUtils.sneakyThrow(failure);

            // Not reached at run time: the call above always throws.
            return CompletableFuture.completedFuture(result);
        }).thenCompose(Function.identity());
    }

    /**
     * Returns the default timeout matching the kind of the transaction.
     *
     * @param tx Transaction.
     * @return Value of {@code defaultReadTxTimeout} for a read-only transaction, of {@code defaultRwTxTimeout}
     *     otherwise.
     */
    private long getDefaultTimeout(InternalTransaction tx) {
        if (tx.isReadOnly()) {
            return this.defaultReadTxTimeout.get();
        }

        return this.defaultRwTxTimeout.get();
    }

    /**
     * Enlists an {@code RW_GET_AND_UPSERT} request for a single row into the transaction.
     *
     * @param row Row to upsert.
     * @param tx Transaction.
     * @return Future with the replica response.
     */
    @Override
    public CompletableFuture<BinaryRow> getAndUpsert(BinaryRowEx row, InternalTransaction tx) {
        InternalTableImpl.checkTransactionFinishStarted(tx);

        IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> requestFactory = (txo, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(txo.commitPartition()))
                        .schemaVersion(row.schemaVersion())
                        .binaryTuple(row.tupleSlice())
                        .transactionId(txo.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_GET_AND_UPSERT)
                        .timestamp(txo.schemaTimestamp())
                        .full(txo.implicit())
                        .coordinatorId(txo.coordinatorId())
                        .delayedAckProcessor(txo.remote() ? (first, second) -> txo.processDelayedAck(first, second) : null)
                        .build();

        return this.enlistInTx(row, tx, requestFactory, (res, req) -> false);
    }

    /**
     * Enlists an {@code RW_INSERT} request for a single row into the transaction. The predicate handed to
     * {@code enlistInTx} holds when the response is {@code false}.
     *
     * @param row Row to insert.
     * @param tx Transaction.
     * @return Future with the boolean replica response.
     */
    @Override
    public CompletableFuture<Boolean> insert(BinaryRowEx row, InternalTransaction tx) {
        IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> requestFactory = (txo, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(txo.commitPartition()))
                        .schemaVersion(row.schemaVersion())
                        .binaryTuple(row.tupleSlice())
                        .transactionId(txo.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_INSERT)
                        .timestamp(txo.schemaTimestamp())
                        .full(txo.implicit())
                        .coordinatorId(txo.coordinatorId())
                        .delayedAckProcessor(txo.remote() ? (first, second) -> txo.processDelayedAck(first, second) : null)
                        .build();

        return this.enlistInTx(row, tx, requestFactory, (res, req) -> !res);
    }

    /**
     * Enlists {@code RW_INSERT_ALL} requests for several rows into the transaction and combines the responses
     * with {@code collectRejectedRowsResponses}. The predicate handed to {@code enlistInTx} holds when the
     * response contains no non-null row.
     *
     * @param rows Rows to insert.
     * @param tx Transaction.
     * @return Future with the combined replica responses.
     */
    @Override
    public CompletableFuture<List<BinaryRow>> insertAll(Collection<BinaryRowEx> rows, InternalTransaction tx) {
        // Parameter types of the predicate are inferred: this method declares no type variable that an
        // explicit parameter type could refer to.
        return this.enlistInTx(
                rows,
                tx,
                (Collection<? extends BinaryRow> keyRows, InternalTransaction txo, ReplicationGroupId groupId, Long enlistmentConsistencyToken, Boolean full) ->
                        this.readWriteMultiRowReplicaRequest(RequestType.RW_INSERT_ALL, keyRows, null, txo, groupId, enlistmentConsistencyToken, full),
                InternalTableImpl::collectRejectedRowsResponses,
                (res, req) -> {
                    for (BinaryRow row : res) {
                        if (row != null) {
                            return false;
                        }
                    }
                    return true;
                });
    }

    /**
     * Builds a read-write multi-row request carrying full rows.
     *
     * <p>The schema version is taken from the first row; with assertions enabled, all non-null rows are checked
     * to share one schema version. A delayed-ack processor is set only when {@code tx.remote()} is true.
     *
     * @param requestType Request type to put into the message.
     * @param rows Rows to send.
     * @param deleted Bit set put into the {@code deleted} field, may be {@code null}.
     * @param tx Transaction the request belongs to.
     * @param groupId Target replication group.
     * @param enlistmentConsistencyToken Enlistment consistency token.
     * @param full Value for the {@code full} field of the request.
     * @return Built request.
     */
    private ReadWriteMultiRowReplicaRequest readWriteMultiRowReplicaRequest(RequestType requestType, Collection<? extends BinaryRow> rows, @Nullable BitSet deleted, InternalTransaction tx, ReplicationGroupId groupId, Long enlistmentConsistencyToken, boolean full) {
        assert (InternalTableImpl.allSchemaVersionsSame(rows)) : "Different schema versions encountered: " + InternalTableImpl.uniqueSchemaVersions(rows);

        ReadWriteMultiRowReplicaRequest request = TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                .tableId(this.tableId)
                .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(tx.commitPartition()))
                .schemaVersion(rows.iterator().next().schemaVersion())
                .binaryTuples(InternalTableImpl.serializeBinaryTuples(rows))
                .deleted(deleted)
                .transactionId(tx.id())
                .enlistmentConsistencyToken(enlistmentConsistencyToken)
                .requestType(requestType)
                .timestamp(tx.schemaTimestamp())
                .full(full)
                .coordinatorId(tx.coordinatorId())
                .delayedAckProcessor(tx.remote() ? (first, second) -> tx.processDelayedAck(first, second) : null)
                .build();

        return request;
    }

    /**
     * Builds a DCR multi-row write request.
     *
     * <p>Unlike the read-write multi-row request, the timestamp comes from {@code clockService.now()} and the
     * per-row last commit timestamps are attached. The schema version is taken from the first row; with
     * assertions enabled, all non-null rows are checked to share one schema version.
     *
     * @param requestType Request type to put into the message.
     * @param rows Rows to send.
     * @param deleted Bit set put into the {@code deleted} field, may be {@code null}.
     * @param tx Transaction the request belongs to.
     * @param groupId Target replication group.
     * @param enlistmentConsistencyToken Enlistment consistency token.
     * @param full Value for the {@code full} field of the request.
     * @return Built request.
     */
    private DcrWriteMultiRowReplicaRequest dcrWriteMultiRowReplicaRequest(RequestType requestType, Collection<? extends BinaryRow> rows, @Nullable BitSet deleted, InternalTransaction tx, ReplicationGroupId groupId, Long enlistmentConsistencyToken, boolean full) {
        assert (InternalTableImpl.allSchemaVersionsSame(rows)) : "Different schema versions encountered: " + InternalTableImpl.uniqueSchemaVersions(rows);

        DcrWriteMultiRowReplicaRequest request = TABLE_MESSAGES_FACTORY.dcrWriteMultiRowReplicaRequest()
                .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                .tableId(this.tableId)
                .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(tx.commitPartition()))
                .schemaVersion(rows.iterator().next().schemaVersion())
                .binaryTuples(InternalTableImpl.serializeBinaryTuples(rows))
                .deleted(deleted)
                .transactionId(tx.id())
                .enlistmentConsistencyToken(enlistmentConsistencyToken)
                .requestType(requestType)
                .timestamp(this.clockService.now())
                .full(full)
                .coordinatorId(tx.coordinatorId())
                .lastCommitTimestamps(InternalTableImpl.lastCommitTimestamps(rows))
                .build();

        return request;
    }

    /**
     * Extracts the last commit timestamp of every row, in iteration order.
     *
     * @param rows Rows to inspect.
     * @return List with the {@code long} value of the last commit timestamp for each {@code TimedRow} and
     *     {@code null} for every other row.
     */
    private static List<Long> lastCommitTimestamps(Collection<? extends BinaryRow> rows) {
        List<Long> result = new ArrayList<>(rows.size());

        for (BinaryRow row : rows) {
            Long timestamp = null;

            if (row instanceof TimedRow) {
                timestamp = Long.valueOf(((TimedRow)row).lastCommitTimestamp().longValue());
            }

            result.add(timestamp);
        }

        return result;
    }

    /**
     * Enlists an {@code RW_REPLACE_IF_EXIST} request for a single row into the transaction. The predicate
     * handed to {@code enlistInTx} holds when the response is {@code false}.
     *
     * @param row Replacement row.
     * @param tx Transaction.
     * @return Future with the boolean replica response.
     */
    @Override
    public CompletableFuture<Boolean> replace(BinaryRowEx row, InternalTransaction tx) {
        IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> requestFactory = (txo, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(txo.commitPartition()))
                        .schemaVersion(row.schemaVersion())
                        .binaryTuple(row.tupleSlice())
                        .transactionId(txo.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_REPLACE_IF_EXIST)
                        .timestamp(txo.schemaTimestamp())
                        .full(txo.implicit())
                        .coordinatorId(txo.coordinatorId())
                        .delayedAckProcessor(txo.remote() ? (first, second) -> txo.processDelayedAck(first, second) : null)
                        .build();

        return this.enlistInTx(row, tx, requestFactory, (res, req) -> !res);
    }

    /**
     * Enlists an {@code RW_REPLACE} request that carries both the expected old row and the new row. The
     * operation is enlisted by the new row. The predicate handed to {@code enlistInTx} holds when the response
     * is {@code false}.
     *
     * @param oldRow Expected current row; with assertions enabled, it must have the schema version of
     *     {@code newRow}.
     * @param newRow Replacement row.
     * @param tx Transaction.
     * @return Future with the boolean replica response.
     */
    @Override
    public CompletableFuture<Boolean> replace(BinaryRowEx oldRow, BinaryRowEx newRow, InternalTransaction tx) {
        assert (oldRow.schemaVersion() == newRow.schemaVersion()) : "Mismatching schema versions: old " + oldRow.schemaVersion() + ", new " + newRow.schemaVersion();

        IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> requestFactory = (txo, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(txo.commitPartition()))
                        .schemaVersion(oldRow.schemaVersion())
                        .oldBinaryTuple(oldRow.tupleSlice())
                        .newBinaryTuple(newRow.tupleSlice())
                        .transactionId(txo.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_REPLACE)
                        .timestamp(txo.schemaTimestamp())
                        .full(txo.implicit())
                        .coordinatorId(txo.coordinatorId())
                        .delayedAckProcessor(txo.remote() ? (first, second) -> txo.processDelayedAck(first, second) : null)
                        .build();

        return this.enlistInTx(newRow, tx, requestFactory, (res, req) -> !res);
    }

    /**
     * Enlists an {@code RW_GET_AND_REPLACE} request for a single row into the transaction. The predicate
     * handed to {@code enlistInTx} holds when the response is {@code null}.
     *
     * @param row Replacement row.
     * @param tx Transaction.
     * @return Future with the replica response.
     */
    @Override
    public CompletableFuture<BinaryRow> getAndReplace(BinaryRowEx row, InternalTransaction tx) {
        InternalTableImpl.checkTransactionFinishStarted(tx);

        IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> requestFactory = (txo, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(txo.commitPartition()))
                        .schemaVersion(row.schemaVersion())
                        .binaryTuple(row.tupleSlice())
                        .transactionId(txo.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_GET_AND_REPLACE)
                        .timestamp(txo.schemaTimestamp())
                        .full(txo.implicit())
                        .coordinatorId(txo.coordinatorId())
                        .delayedAckProcessor(txo.remote() ? (first, second) -> txo.processDelayedAck(first, second) : null)
                        .build();

        return this.enlistInTx(row, tx, requestFactory, (res, req) -> res == null);
    }

    /**
     * Enlists an {@code RW_DELETE} request addressed by primary key into the transaction. The predicate handed
     * to {@code enlistInTx} holds when the response is {@code false}.
     *
     * @param keyRow Row holding the primary key.
     * @param tx Transaction.
     * @return Future with the boolean replica response.
     */
    @Override
    public CompletableFuture<Boolean> delete(BinaryRowEx keyRow, InternalTransaction tx) {
        IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> requestFactory = (txo, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(txo.commitPartition()))
                        .schemaVersion(keyRow.schemaVersion())
                        .primaryKey(keyRow.tupleSlice())
                        .transactionId(txo.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_DELETE)
                        .timestamp(txo.schemaTimestamp())
                        .full(txo.implicit())
                        .coordinatorId(txo.coordinatorId())
                        .delayedAckProcessor(txo.remote() ? (first, second) -> txo.processDelayedAck(first, second) : null)
                        .build();

        return this.enlistInTx(keyRow, tx, requestFactory, (res, req) -> !res);
    }

    /**
     * Enlists an {@code RW_DELETE_EXACT} request carrying the full row into the transaction. The predicate
     * handed to {@code enlistInTx} holds when the response is {@code false}.
     *
     * @param oldRow Row to delete.
     * @param tx Transaction.
     * @return Future with the boolean replica response.
     */
    @Override
    public CompletableFuture<Boolean> deleteExact(BinaryRowEx oldRow, InternalTransaction tx) {
        IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> requestFactory = (txo, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(txo.commitPartition()))
                        .schemaVersion(oldRow.schemaVersion())
                        .binaryTuple(oldRow.tupleSlice())
                        .transactionId(txo.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_DELETE_EXACT)
                        .timestamp(txo.schemaTimestamp())
                        .full(txo.implicit())
                        .coordinatorId(txo.coordinatorId())
                        .delayedAckProcessor(txo.remote() ? (first, second) -> txo.processDelayedAck(first, second) : null)
                        .build();

        return this.enlistInTx(oldRow, tx, requestFactory, (res, req) -> !res);
    }

    /**
     * Enlists an {@code RW_GET_AND_DELETE} request addressed by primary key into the transaction. The
     * predicate handed to {@code enlistInTx} holds when the response is {@code null}.
     *
     * @param row Row holding the primary key.
     * @param tx Transaction.
     * @return Future with the replica response.
     */
    @Override
    public CompletableFuture<BinaryRow> getAndDelete(BinaryRowEx row, InternalTransaction tx) {
        InternalTableImpl.checkTransactionFinishStarted(tx);

        IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> requestFactory = (txo, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(txo.commitPartition()))
                        .schemaVersion(row.schemaVersion())
                        .primaryKey(row.tupleSlice())
                        .transactionId(txo.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_GET_AND_DELETE)
                        .timestamp(txo.schemaTimestamp())
                        .full(txo.implicit())
                        .coordinatorId(txo.coordinatorId())
                        .delayedAckProcessor(txo.remote() ? (first, second) -> txo.processDelayedAck(first, second) : null)
                        .build();

        return this.enlistInTx(row, tx, requestFactory, (res, req) -> res == null);
    }

    /**
     * Enlists {@code RW_DELETE_ALL} requests addressed by primary keys into the transaction and combines the
     * responses with {@code collectRejectedRowsResponses}. The predicate handed to {@code enlistInTx} holds
     * when the response contains no non-null row.
     *
     * @param rows Rows holding the primary keys.
     * @param tx Transaction.
     * @return Future with the combined replica responses.
     */
    @Override
    public CompletableFuture<List<BinaryRow>> deleteAll(Collection<BinaryRowEx> rows, InternalTransaction tx) {
        // Parameter types of the predicate are inferred: this method declares no type variable that an
        // explicit parameter type could refer to.
        return this.enlistInTx(
                rows,
                tx,
                (Collection<? extends BinaryRow> keyRows0, InternalTransaction txo, ReplicationGroupId groupId, Long enlistmentConsistencyToken, Boolean full) ->
                        this.readWriteMultiRowPkReplicaRequest(RequestType.RW_DELETE_ALL, keyRows0, txo, groupId, enlistmentConsistencyToken, full),
                InternalTableImpl::collectRejectedRowsResponses,
                (res, req) -> {
                    for (BinaryRow row : res) {
                        if (row != null) {
                            return false;
                        }
                    }
                    return true;
                });
    }

    /**
     * Enlists {@code RW_DELETE_EXACT_ALL} requests carrying full rows into the transaction and combines the
     * responses with {@code collectRejectedRowsResponses}. The predicate handed to {@code enlistInTx} holds
     * when the response contains no non-null row.
     *
     * @param rows Rows to delete.
     * @param tx Transaction.
     * @return Future with the combined replica responses.
     */
    @Override
    public CompletableFuture<List<BinaryRow>> deleteAllExact(Collection<BinaryRowEx> rows, InternalTransaction tx) {
        // Parameter types of the predicate are inferred: this method declares no type variable that an
        // explicit parameter type could refer to.
        return this.enlistInTx(
                rows,
                tx,
                (Collection<? extends BinaryRow> keyRows0, InternalTransaction txo, ReplicationGroupId groupId, Long enlistmentConsistencyToken, Boolean full) ->
                        this.readWriteMultiRowReplicaRequest(RequestType.RW_DELETE_EXACT_ALL, keyRows0, null, txo, groupId, enlistmentConsistencyToken, full),
                InternalTableImpl::collectRejectedRowsResponses,
                (res, req) -> {
                    for (BinaryRow row : res) {
                        if (row != null) {
                            return false;
                        }
                    }
                    return true;
                });
    }

    /**
     * Enlists {@code RW_ARCHIVE_EXACT_ALL} requests carrying full rows into the transaction.
     *
     * <p>Fails immediately with {@link IllegalStateException} when the table has no secondary storage.
     * Otherwise the {@code EXTENDED_SECONDARY_STORAGE} license feature is checked asynchronously before the
     * requests are enlisted. The predicate handed to {@code enlistInTx} holds when the response contains no
     * non-null row.
     *
     * @param rows Rows to archive.
     * @param tx Transaction, may be {@code null}.
     * @return Future with the combined replica responses.
     */
    @Override
    public CompletableFuture<List<BinaryRow>> archiveAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
        if (!this.hasSecondaryStorage()) {
            return CompletableFuture.failedFuture(new IllegalStateException("Archive operation is not allowed on a table without secondary storage"));
        }

        // Parameter types of the predicate are inferred: this method declares no type variable that an
        // explicit parameter type could refer to.
        return CompletableFuture.runAsync(() -> this.licenseFeatureChecker.checkFeature(LicenseFeature.EXTENDED_SECONDARY_STORAGE))
                .thenComposeAsync(ignored -> this.enlistInTx(
                        rows,
                        tx,
                        (Collection<? extends BinaryRow> rowInBatch, InternalTransaction txo, ReplicationGroupId groupId, Long enlistmentConsistencyToken, Boolean full) ->
                                this.readWriteMultiRowReplicaRequest(RequestType.RW_ARCHIVE_EXACT_ALL, rowInBatch, null, txo, groupId, enlistmentConsistencyToken, full),
                        InternalTableImpl::collectRejectedRowsResponses,
                        (res, req) -> {
                            for (BinaryRow row : res) {
                                if (row != null) {
                                    return false;
                                }
                            }
                            return true;
                        }));
    }

    /**
     * Read-only index lookup by an exact key. Delegates to {@code readOnlyScan} with no bounds, zero flags and
     * {@code usePrimary} set to {@code true}.
     */
    @Override
    public Flow.Publisher<BinaryRow> lookup(int partId, UUID txId, HybridTimestamp readTimestamp, InternalClusterNode recipientNode, int indexId, BinaryTuple key, @Nullable BitSet columnsToInclude, UUID txCoordinatorId) {
        final BinaryTuplePrefix noLowerBound = null;
        final BinaryTuplePrefix noUpperBound = null;
        final int noFlags = 0;
        final boolean usePrimary = true;

        return this.readOnlyScan(partId, txId, readTimestamp, recipientNode, indexId, key, noLowerBound, noUpperBound, noFlags, usePrimary, columnsToInclude, txCoordinatorId);
    }

    /**
     * Read-write index lookup by an exact key. Delegates to {@code readWriteScan} with no bounds and zero
     * flags.
     */
    @Override
    public Flow.Publisher<BinaryRow> lookup(int partId, UUID txId, ReplicationGroupId commitPartition, UUID coordinatorId, PrimaryReplica recipient, int indexId, BinaryTuple key, @Nullable BitSet columnsToInclude) {
        final int noFlags = 0;

        Flow.Publisher<BinaryRow> publisher = this.readWriteScan(partId, txId, commitPartition, coordinatorId, recipient, indexId, key, null, null, noFlags, columnsToInclude);

        return publisher;
    }

    /**
     * Creates a publisher that scans a partition for a timestamp interval using read-only batch requests sent
     * to the given recipient node.
     *
     * @param partId Partition to scan; validated with {@code validatePartitionIndex}.
     * @param txId Transaction ID put into every batch request.
     * @param fromTs Put into the request as the lower bound timestamp.
     * @param toTs Put into the request as the read timestamp.
     * @param recipientNode Node the batch requests are sent to.
     * @param txCoordinatorId Coordinator ID put into every batch request.
     * @return Publisher of rows with their row IDs.
     */
    @Override
    public Flow.Publisher<BinaryRowAndRowId> scanInterval(int partId, final UUID txId, final HybridTimestamp fromTs, final HybridTimestamp toTs, final InternalClusterNode recipientNode, final UUID txCoordinatorId) {
        this.validatePartitionIndex(partId);

        final ReplicationGroupId replicationGroupId = this.targetReplicationGroupId(partId);
        PartitionScanPublisher.InflightBatchRequestTracker inflightTracker = new ReadOnlyInflightBatchRequestTracker(this.transactionInflights, txId);

        return new PartitionScanPublisher<BinaryRowAndRowId>(inflightTracker){

            @Override
            protected CompletableFuture<Collection<BinaryRowAndRowId>> retrieveBatch(long scanId, int batchSize) {
                ReadOnlyIntervalScanRetrieveBatchReplicaRequest batchRequest = TABLE_MESSAGES_FACTORY.readOnlyIntervalScanRetrieveBatchReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(replicationGroupId))
                        .tableId(InternalTableImpl.this.tableId)
                        .lowerBoundTimestamp(fromTs)
                        .readTimestamp(toTs)
                        .usePrimary(true)
                        .transactionId(txId)
                        .scanId(scanId)
                        .batchSize(batchSize)
                        .coordinatorId(txCoordinatorId)
                        .build();

                return InternalTableImpl.this.replicaSvc.invoke(recipientNode, (ReplicaRequest)batchRequest);
            }

            @Override
            protected CompletableFuture<Void> onClose(boolean intentionallyClose, long scanId, @Nullable Throwable th) {
                boolean closedExplicitlyOrFailed = intentionallyClose || th != null;

                return InternalTableImpl.this.completeScan(txId, replicationGroupId, scanId, th, recipientNode.name(), closedExplicitlyOrFailed);
            }
        };
    }

    /**
     * Creates a publisher that scans row versions of a partition using read-only batch requests sent to the
     * given recipient node.
     *
     * @param partitionId Partition to scan; validated with {@code validatePartitionIndex}.
     * @param txId Transaction ID put into every batch request.
     * @param fromRowId Row ID whose UUID is put into the request as the lower bound row ID.
     * @param fromTs Put into the request as the lower bound timestamp.
     * @param toTs Put into the request as the read timestamp.
     * @param recipientNode Node the batch requests are sent to.
     * @param txCoordinatorId Coordinator ID put into every batch request.
     * @return Publisher of timed rows with their row IDs.
     */
    @Override
    public Flow.Publisher<TimedBinaryRowAndRowId> scanAllVersions(int partitionId, final UUID txId, final RowId fromRowId, final HybridTimestamp fromTs, final HybridTimestamp toTs, final InternalClusterNode recipientNode, final UUID txCoordinatorId) {
        this.validatePartitionIndex(partitionId);

        final ReplicationGroupId replicationGroupId = this.targetReplicationGroupId(partitionId);
        PartitionScanPublisher.InflightBatchRequestTracker inflightTracker = new ReadOnlyInflightBatchRequestTracker(this.transactionInflights, txId);

        return new PartitionScanPublisher<TimedBinaryRowAndRowId>(inflightTracker){

            @Override
            protected CompletableFuture<Collection<TimedBinaryRowAndRowId>> retrieveBatch(long scanId, int batchSize) {
                ReadOnlyRowVersionsScanRetrieveBatchReplicaRequest batchRequest = TABLE_MESSAGES_FACTORY.readOnlyRowVersionsScanRetrieveBatchReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(replicationGroupId))
                        .tableId(InternalTableImpl.this.tableId)
                        .lowerBoundRowId(fromRowId.uuid())
                        .lowerBoundTimestamp(fromTs)
                        .readTimestamp(toTs)
                        .usePrimary(true)
                        .transactionId(txId)
                        .scanId(scanId)
                        .batchSize(batchSize)
                        .coordinatorId(txCoordinatorId)
                        .build();

                return InternalTableImpl.this.replicaSvc.invoke(recipientNode, (ReplicaRequest)batchRequest);
            }

            @Override
            protected CompletableFuture<Void> onClose(boolean intentionallyClose, long scanId, @Nullable Throwable th) {
                boolean closedExplicitlyOrFailed = intentionallyClose || th != null;

                return InternalTableImpl.this.completeScan(txId, replicationGroupId, scanId, th, recipientNode.name(), closedExplicitlyOrFailed);
            }
        };
    }

    /**
     * Read-only scan of a whole partition. Delegates to {@code readOnlyScan} with no index, no exact key,
     * no bounds and zero flags.
     */
    @Override
    public Flow.Publisher<BinaryRow> scan(int partId, UUID txId, HybridTimestamp readTimestamp, InternalClusterNode recipientNode, UUID txCoordinatorId, boolean usePrimary, @Nullable BitSet columnsToInclude) {
        final Integer noIndex = null;
        final BinaryTuple noExactKey = null;
        final BinaryTuplePrefix noLowerBound = null;
        final BinaryTuplePrefix noUpperBound = null;
        final int noFlags = 0;

        return this.readOnlyScan(partId, txId, readTimestamp, recipientNode, noIndex, noExactKey, noLowerBound, noUpperBound, noFlags, usePrimary, columnsToInclude, txCoordinatorId);
    }

    /**
     * Read-only scan, optionally over an index range. Delegates to {@code readOnlyScan} with no exact key and
     * {@code usePrimary} set to {@code true}.
     */
    @Override
    public Flow.Publisher<BinaryRow> scan(int partId, UUID txId, HybridTimestamp readTimestamp, InternalClusterNode recipientNode, @Nullable Integer indexId, @Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags, @Nullable BitSet columnsToInclude, UUID txCoordinatorId) {
        final BinaryTuple noExactKey = null;
        final boolean usePrimary = true;

        return this.readOnlyScan(partId, txId, readTimestamp, recipientNode, indexId, noExactKey, lowerBound, upperBound, flags, usePrimary, columnsToInclude, txCoordinatorId);
    }

    /**
     * Read-write scan within a transaction, optionally over an index range. Delegates to
     * {@code readWriteScan} with no exact key.
     */
    @Override
    public Flow.Publisher<BinaryRow> scan(int partId, @Nullable InternalTransaction tx, @Nullable Integer indexId, @Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags, @Nullable BitSet columnsToInclude) {
        Flow.Publisher<BinaryRow> publisher = this.readWriteScan(partId, tx, indexId, null, lowerBound, upperBound, flags, columnsToInclude);

        return publisher;
    }

    /**
     * Opens a read-write scan of a partition against an explicitly supplied primary replica.
     *
     * <p>Delegates to the id-based {@code readWriteScan} without an exact key.
     *
     * @param partId Partition index.
     * @param txId Transaction id.
     * @param commitPartition Commit partition of the transaction.
     * @param coordinatorId Transaction coordinator id.
     * @param recipient Primary replica the scan requests are sent to.
     * @param indexId Optional index id; may be {@code null}.
     * @param lowerBound Optional lower bound prefix; may be {@code null}.
     * @param upperBound Optional upper bound prefix; may be {@code null}.
     * @param flags Scan flags, passed through unchanged.
     * @param columnsToInclude Optional set of columns to include; may be {@code null}.
     * @return Publisher of the scanned rows.
     */
    @Override
    public Flow.Publisher<BinaryRow> scan(int partId, UUID txId, ReplicationGroupId commitPartition, UUID coordinatorId, PrimaryReplica recipient, @Nullable Integer indexId, @Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags, @Nullable BitSet columnsToInclude) {
        return readWriteScan(
                partId,
                txId,
                commitPartition,
                coordinatorId,
                recipient,
                indexId,
                null, // exactKey
                lowerBound,
                upperBound,
                flags,
                columnsToInclude
        );
    }

    /**
     * Opens a read-write scan of a whole partition, using neither an index nor key bounds.
     *
     * <p>Delegates to the transaction-based {@code readWriteScan} with a {@code null} index, exact key and bounds,
     * and zero flags.
     *
     * @param partId Partition index.
     * @param tx Transaction to scan in; may be {@code null}.
     * @param columnsToInclude Optional set of columns to include; may be {@code null}.
     * @return Publisher of the scanned rows.
     */
    @Override
    public Flow.Publisher<BinaryRow> scan(int partId, @Nullable InternalTransaction tx, @Nullable BitSet columnsToInclude) {
        return readWriteScan(
                partId,
                tx,
                null, // indexId
                null, // exactKey
                null, // lowerBound
                null, // upperBound
                0, // flags
                columnsToInclude
        );
    }

    /**
     * Builds a publisher that scans a single partition on behalf of a read-only transaction.
     *
     * <p>The partition index is validated first. Batches are fetched with a
     * {@code ReadOnlyScanRetrieveBatchReplicaRequest} sent to {@code recipientNode}; in-flight batch requests are
     * tracked through a {@code ReadOnlyInflightBatchRequestTracker} bound to {@code txId}.
     *
     * @param partId Partition index.
     * @param txId Transaction id.
     * @param readTimestamp Read timestamp placed into every batch request.
     * @param recipientNode Node the requests are sent to.
     * @param indexId Optional index id; may be {@code null}.
     * @param exactKey Optional exact key; may be {@code null}.
     * @param lowerBound Optional lower bound prefix; may be {@code null}.
     * @param upperBound Optional upper bound prefix; may be {@code null}.
     * @param flags Scan flags, passed through unchanged.
     * @param usePrimary When {@code true} the primary replication group is targeted, otherwise the secondary one.
     * @param columnsToInclude Optional set of columns to include; may be {@code null}.
     * @param txCoordinatorId Transaction coordinator id.
     * @return Publisher of the scanned rows.
     */
    private Flow.Publisher<BinaryRow> readOnlyScan(int partId, final UUID txId, final HybridTimestamp readTimestamp, final InternalClusterNode recipientNode, final @Nullable Integer indexId, final @Nullable BinaryTuple exactKey, final @Nullable BinaryTuplePrefix lowerBound, final @Nullable BinaryTuplePrefix upperBound, final int flags, final boolean usePrimary, final @Nullable BitSet columnsToInclude, final UUID txCoordinatorId) {
        validatePartitionIndex(partId);

        final ReplicationGroupId replicationGroupId;
        if (usePrimary) {
            replicationGroupId = targetReplicationGroupId(partId);
        } else {
            replicationGroupId = targetSecondaryReplicationGroupId(partId);
        }

        PartitionScanPublisher.InflightBatchRequestTracker inflightTracker =
                new ReadOnlyInflightBatchRequestTracker(transactionInflights, txId);

        return new PartitionScanPublisher<BinaryRow>(inflightTracker) {
            @Override
            protected CompletableFuture<Collection<BinaryRow>> retrieveBatch(long scanId, int batchSize) {
                ReadOnlyScanRetrieveBatchReplicaRequest batchRequest = TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(replicationGroupId))
                        .tableId(InternalTableImpl.this.tableId)
                        .readTimestamp(readTimestamp)
                        .transactionId(txId)
                        .scanId(scanId)
                        .batchSize(batchSize)
                        .usePrimary(usePrimary)
                        .indexToUse(indexId)
                        .exactKey(InternalTableImpl.binaryTupleMessage((BinaryTupleReader) exactKey))
                        .lowerBoundPrefix(InternalTableImpl.binaryTupleMessage((BinaryTupleReader) lowerBound))
                        .upperBoundPrefix(InternalTableImpl.binaryTupleMessage((BinaryTupleReader) upperBound))
                        .flags(flags)
                        .columnsToInclude(columnsToInclude)
                        .coordinatorId(txCoordinatorId)
                        .build();

                return InternalTableImpl.this.replicaSvc.invoke(recipientNode, (ReplicaRequest) batchRequest);
            }

            @Override
            protected CompletableFuture<Void> onClose(boolean intentionallyClose, long scanId, @Nullable Throwable th) {
                // A close request is sent both on an intentional close and when the scan ended with an error.
                boolean sendCloseRequest = intentionallyClose || th != null;

                return InternalTableImpl.this.completeScan(txId, replicationGroupId, scanId, th, recipientNode.name(), sendCloseRequest);
            }
        };
    }

    /**
     * Builds a publisher that scans a single partition inside a read-write transaction.
     *
     * <p>A non-null {@code tx} is rejected when it is read-only or when its {@code external()} flag differs from
     * {@link #cache()}. After the partition index is validated, {@code startImplicitRwTxIfNeeded} supplies the
     * transaction that is actually used; every batch is fetched through {@code enlistCursorInTx}.
     *
     * @param partId Partition index.
     * @param tx Transaction supplied by the caller; may be {@code null}.
     * @param indexId Optional index id; may be {@code null}.
     * @param exactKey Optional exact key; may be {@code null}.
     * @param lowerBound Optional lower bound prefix; may be {@code null}.
     * @param upperBound Optional upper bound prefix; may be {@code null}.
     * @param flags Scan flags, passed through unchanged.
     * @param columnsToInclude Optional set of columns to include; may be {@code null}.
     * @return Publisher of the scanned rows.
     * @throws TransactionException If {@code tx} is read-only or incompatible with this cache/table type.
     */
    private Flow.Publisher<BinaryRow> readWriteScan(final int partId, final @Nullable InternalTransaction tx, final @Nullable Integer indexId, final @Nullable BinaryTuple exactKey, final @Nullable BinaryTuplePrefix lowerBound, final @Nullable BinaryTuplePrefix upperBound, final int flags, final @Nullable BitSet columnsToInclude) {
        if (tx != null && tx.isReadOnly()) {
            throw new TransactionException(ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, "Failed to enlist read-write operation into read-only transaction txId={" + tx.id() + "}");
        }

        if (tx != null && tx.external() != cache()) {
            throw new TransactionException(GridgainErrorGroups.Cache.TX_INCOMPATIBLE_OPERATION_ERR, "Requested operation is incompatible with cache or table type txId={" + tx.id() + "}");
        }

        validatePartitionIndex(partId);

        final InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx);

        return new PartitionScanPublisher<BinaryRow>((PartitionScanPublisher.InflightBatchRequestTracker) READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER) {
            @Override
            protected CompletableFuture<Collection<BinaryRow>> retrieveBatch(long scanId, int batchSize) {
                return InternalTableImpl.this.enlistCursorInTx(actualTx, partId, scanId, batchSize, indexId, exactKey, lowerBound, upperBound, flags, columnsToInclude);
            }

            @Override
            protected CompletableFuture<Void> onClose(boolean intentionallyClose, long scanId, @Nullable Throwable th) {
                CompletableFuture<Void> opFut;

                if (actualTx.implicit()) {
                    // Implicit transaction: no scan-close request is issued here, only the scan outcome is propagated.
                    opFut = CompletableFutures.completedOrFailedFuture(null, (Throwable) th);
                } else {
                    ReplicationGroupId replicationGrpId = InternalTableImpl.this.targetReplicationGroupId(partId);
                    PendingTxPartitionEnlistment enlistment = tx.enlistedPartition(replicationGrpId);

                    if (enlistment != null) {
                        opFut = InternalTableImpl.this.completeScan(tx.id(), replicationGrpId, scanId, th, enlistment.primaryNodeConsistentId(), intentionallyClose);
                    } else {
                        // The partition was never enlisted, so there is no primary node to send a close request to.
                        opFut = CompletableFutures.completedOrFailedFuture(null, (Throwable) th);
                    }
                }

                boolean implicitAndNotClosedIntentionally = actualTx.implicit() && !intentionallyClose;

                return InternalTableImpl.postEnlist(opFut, intentionallyClose, actualTx, implicitAndNotClosedIntentionally);
            }
        };
    }

    /**
     * Builds a publisher that scans a single partition in a read-write transaction identified by its id, sending
     * requests to an explicitly supplied primary replica.
     *
     * <p>Batches are fetched with a {@code ReadWriteScanRetrieveBatchReplicaRequest} sent to {@code recipient.node()}.
     * On close, {@code completeScan} is invoked with the same node's name.
     *
     * @param partId Partition index.
     * @param txId Transaction id; its begin timestamp is used as the request timestamp.
     * @param commitPartition Commit partition of the transaction.
     * @param coordinatorId Transaction coordinator id.
     * @param recipient Primary replica that provides the target node and the enlistment consistency token.
     * @param indexId Optional index id; may be {@code null}.
     * @param exactKey Optional exact key; may be {@code null}.
     * @param lowerBound Optional lower bound prefix; may be {@code null}.
     * @param upperBound Optional upper bound prefix; may be {@code null}.
     * @param flags Scan flags, passed through unchanged.
     * @param columnsToInclude Optional set of columns to include; may be {@code null}.
     * @return Publisher of the scanned rows.
     */
    private Flow.Publisher<BinaryRow> readWriteScan(int partId, final UUID txId, final ReplicationGroupId commitPartition, final UUID coordinatorId, final PrimaryReplica recipient, final @Nullable Integer indexId, final @Nullable BinaryTuple exactKey, final @Nullable BinaryTuplePrefix lowerBound, final @Nullable BinaryTuplePrefix upperBound, final int flags, final @Nullable BitSet columnsToInclude) {
        final ReplicationGroupId replicationGroupId = targetReplicationGroupId(partId);

        return new PartitionScanPublisher<BinaryRow>((PartitionScanPublisher.InflightBatchRequestTracker) READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER) {
            @Override
            protected CompletableFuture<Collection<BinaryRow>> retrieveBatch(long scanId, int batchSize) {
                ReadWriteScanRetrieveBatchReplicaRequest batchRequest = TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(replicationGroupId))
                        .tableId(InternalTableImpl.this.tableId)
                        .timestamp(TransactionIds.beginTimestamp((UUID) txId))
                        .transactionId(txId)
                        .scanId(scanId)
                        .indexToUse(indexId)
                        .exactKey(InternalTableImpl.binaryTupleMessage((BinaryTupleReader) exactKey))
                        .lowerBoundPrefix(InternalTableImpl.binaryTupleMessage((BinaryTupleReader) lowerBound))
                        .upperBoundPrefix(InternalTableImpl.binaryTupleMessage((BinaryTupleReader) upperBound))
                        .flags(flags)
                        .columnsToInclude(columnsToInclude)
                        .batchSize(batchSize)
                        .enlistmentConsistencyToken(Long.valueOf(recipient.enlistmentConsistencyToken()))
                        .full(false)
                        .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(commitPartition))
                        .coordinatorId(coordinatorId)
                        .build();

                return InternalTableImpl.this.replicaSvc.invoke(recipient.node(), (ReplicaRequest) batchRequest);
            }

            @Override
            protected CompletableFuture<Void> onClose(boolean intentionallyClose, long scanId, @Nullable Throwable th) {
                String recipientName = recipient.node().name();

                return InternalTableImpl.this.completeScan(txId, replicationGroupId, scanId, th, recipientName, intentionallyClose);
            }
        };
    }

    /**
     * Finishes a scan: optionally sends a {@code ScanCloseReplicaRequest} and merges its outcome with the outcome
     * of the scan itself.
     *
     * <p>The returned future fails with {@code th} when it is non-null (a failure of the close request is attached
     * to it as a suppressed exception); otherwise it fails with the close request failure, if any; otherwise it
     * completes with {@code null}.
     *
     * @param txId Transaction id.
     * @param replicaGrpId Replication group that served the scan.
     * @param scanId Scan id.
     * @param th Error the scan ended with, or {@code null}.
     * @param recipientConsistentId Consistent id of the node the close request is sent to.
     * @param explicitCloseCursor Whether a close request has to be sent.
     * @return Future of the combined result.
     */
    private CompletableFuture<Void> completeScan(UUID txId, ReplicationGroupId replicaGrpId, long scanId, Throwable th, String recipientConsistentId, boolean explicitCloseCursor) {
        CompletableFuture<Void> closeFut;

        if (explicitCloseCursor) {
            ScanCloseReplicaRequest closeRequest = TABLE_MESSAGES_FACTORY.scanCloseReplicaRequest()
                    .groupId(InternalTableImpl.serializeReplicationGroupId(replicaGrpId))
                    .tableId(this.tableId)
                    .transactionId(txId)
                    .scanId(scanId)
                    .timestamp(TransactionIds.beginTimestamp((UUID) txId))
                    .build();

            closeFut = this.replicaSvc.invoke(recipientConsistentId, (ReplicaRequest) closeRequest);
        } else {
            closeFut = CompletableFutures.nullCompletedFuture();
        }

        return closeFut
                .<CompletableFuture<Void>>handle((unused, closeError) -> {
                    if (th != null) {
                        // The scan error wins; the close failure is only recorded as suppressed.
                        if (closeError != null) {
                            th.addSuppressed(closeError);
                        }

                        return CompletableFuture.failedFuture(th);
                    }

                    if (closeError != null) {
                        return CompletableFuture.failedFuture(closeError);
                    }

                    return CompletableFutures.nullCompletedFuture();
                })
                .thenCompose(Function.identity());
    }

    /**
     * Builds a publisher that runs a storage-optimized read-only operation against the secondary replication group
     * of a partition.
     *
     * <p>The partition index is validated first. Batches are fetched with a
     * {@code ReadOnlyStorageOperationReplicaRequest} (with {@code usePrimary(false)}) sent to {@code recipientNode};
     * in-flight batch requests are tracked through a {@code ReadOnlyInflightBatchRequestTracker} bound to
     * {@code txId}.
     *
     * @param partId Partition index.
     * @param txId Transaction id.
     * @param txCoordinatorId Transaction coordinator id.
     * @param readTimestamp Read timestamp placed into every batch request.
     * @param recipientNode Node the requests are sent to.
     * @param storageOptimizedOperation Operation to execute on the storage.
     * @return Publisher of the resulting rows.
     */
    @Override
    public Flow.Publisher<BinaryRow> performSecondaryStorageOperation(final int partId, final UUID txId, final UUID txCoordinatorId, final HybridTimestamp readTimestamp, final InternalClusterNode recipientNode, final StorageOptimizedOperation storageOptimizedOperation) {
        this.validatePartitionIndex(partId);

        // Parameterized like the sibling scan publishers instead of using the raw type.
        return new PartitionScanPublisher<BinaryRow>(new ReadOnlyInflightBatchRequestTracker(this.transactionInflights, txId)) {
            @Override
            protected CompletableFuture<Collection<BinaryRow>> retrieveBatch(long scanId, int batchSize) {
                // Resolved per batch, exactly as before, so a missing secondary zone still surfaces when a batch is requested.
                ReplicationGroupId partGroupId = InternalTableImpl.this.targetSecondaryReplicationGroupId(partId);

                ReadOnlyStorageOperationReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyStorageOperationReplicaRequest()
                        .groupId(InternalTableImpl.serializeReplicationGroupId(partGroupId))
                        .tableId(InternalTableImpl.this.tableId)
                        .readTimestamp(readTimestamp)
                        .scanId(scanId)
                        .transactionId(txId)
                        .coordinatorId(txCoordinatorId)
                        .batchSize(batchSize)
                        .usePrimary(false)
                        .storageOperation(storageOptimizedOperation)
                        .build();

                return InternalTableImpl.this.replicaSvc.invoke(recipientNode, (ReplicaRequest) request);
            }

            @Override
            protected CompletableFuture<Void> onClose(boolean intentionallyClose, long scanId, @Nullable Throwable th) {
                // NOTE(review): unlike the other publishers in this class, no scan-close request is sent and the
                // scan error is not propagated here - confirm this is intentional.
                return CompletableFutures.nullCompletedFuture();
            }
        };
    }

    /**
     * Checks that a partition index lies in {@code [0, partitions)}.
     *
     * @param p Partition index to check.
     * @throws IllegalArgumentException If the index is out of range.
     */
    private void validatePartitionIndex(int p) {
        boolean withinRange = p >= 0 && p < partitions;

        if (withinRange) {
            return;
        }

        String message = IgniteStringFormatter.format("Invalid partition [partition={}, minValue={}, maxValue={}].", p, 0, partitions - 1);

        throw new IllegalArgumentException(message);
    }

    /**
     * Groups rows into per-partition batches, remembering each row's position in the incoming collection.
     *
     * @param rows Rows to group.
     * @return Map from partition id to the batch of rows that belong to that partition.
     */
    Int2ObjectMap<RowBatch> toRowBatchByPartitionId(Collection<BinaryRowEx> rows) {
        Int2ObjectMap<RowBatch> batches = new Int2ObjectOpenHashMap<>();

        int originalIndex = 0;

        for (BinaryRowEx row : rows) {
            RowBatch batch = batches.computeIfAbsent(partitionId(row), ignored -> new RowBatch());

            batch.add((BinaryRow) row, originalIndex);

            originalIndex++;
        }

        return batches;
    }

    /**
     * Returns the transaction state storage held by this table.
     *
     * @return Transaction state storage.
     */
    @Override
    public TxStateStorage txStateStorage() {
        return txStateStorage;
    }

    /**
     * Computes the partition a row belongs to from its colocation hash.
     *
     * @param row Row to locate.
     * @return {@code IgniteUtils.safeAbs} of the colocation hash, modulo the number of partitions.
     */
    @Override
    public int partitionId(BinaryRowEx row) {
        int colocationHash = (int) row.colocationHash();
        int nonNegativeHash = IgniteUtils.safeAbs(colocationHash);

        return nonNegativeHash % partitions;
    }

    /**
     * Collects the requested rows whose corresponding response entry is {@code null}.
     *
     * <p>Waits for all batch result futures, then walks every batch and compares its response list with the
     * requested rows position by position.
     *
     * @param rowBatches Batches with requested rows and their result futures.
     * @return Future with the requested rows for which the response contains {@code null}, in batch iteration order.
     */
    public static CompletableFuture<List<BinaryRow>> collectRejectedRowsResponses(Collection<RowBatch> rowBatches) {
        return RowBatch.allResultFutures(rowBatches).thenApply(ignored -> {
            ArrayList<BinaryRow> result = new ArrayList<BinaryRow>();
            for (RowBatch batch : rowBatches) {
                List response = (List)batch.getCompletedResult();
                // The message previously lacked the ", " separator, so the two sizes ran together in the output.
                assert (batch.requestedRows.size() == response.size()) : "Replication response does not fit to request [requestRows=" + batch.requestedRows.size() + ", responseRows=" + response.size() + "]";
                for (int i = 0; i < response.size(); ++i) {
                    // Only positions with a null response are reported back.
                    if (response.get(i) != null) continue;
                    result.add(batch.requestedRows.get(i));
                }
            }
            return result;
        });
    }

    /**
     * Gathers the results of all batches into one list ordered like the original request, keeping {@code null}
     * entries.
     *
     * <p>Each batch contributes its completed result as is.
     *
     * @param rowBatches Batches with requested rows and their result futures.
     * @return Future with the rows in the original request order.
     */
    static CompletableFuture<List<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(Collection<RowBatch> rowBatches) {
        return InternalTableImpl.collectMultiRowsResponsesWithRestoreOrder(
                rowBatches,
                rowBatch -> (Collection) rowBatch.getCompletedResult(),
                false // skipNull
        );
    }

    /**
     * Gathers the results of all batches into one list ordered like the original request.
     *
     * <p>Waits for all batch result futures, places every result row at the original index recorded in its batch,
     * and then copies the slots into a list.
     *
     * @param rowBatches Batches with requested rows and their result futures.
     * @param bathResultMapper Extracts the result rows of a batch; must return as many rows as were requested.
     * @param skipNull When {@code true}, {@code null} slots are left out of the returned list.
     * @return Future with the rows in the original request order.
     */
    private static CompletableFuture<List<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(Collection<RowBatch> rowBatches, Function<RowBatch, Collection<BinaryRow>> bathResultMapper, boolean skipNull) {
        return RowBatch.allResultFutures(rowBatches).thenApply(ignored -> {
            BinaryRow[] ordered = new BinaryRow[RowBatch.getTotalRequestedRowSize(rowBatches)];

            for (RowBatch batch : rowBatches) {
                Collection<BinaryRow> rowsOfBatch = bathResultMapper.apply(batch);

                assert rowsOfBatch != null;
                assert rowsOfBatch.size() == batch.requestedRows.size() : "batchResult=" + rowsOfBatch.size() + ", requestedRows=" + batch.requestedRows.size();

                int positionInBatch = 0;

                for (BinaryRow row : rowsOfBatch) {
                    ordered[batch.getOriginalRowIndex(positionInBatch)] = row;

                    positionInBatch++;
                }
            }

            List<BinaryRow> restored = new ArrayList<>();

            for (BinaryRow row : ordered) {
                if (row != null || !skipNull) {
                    restored.add(row);
                }
            }

            return restored;
        });
    }

    /**
     * Enlists a partition into a transaction.
     *
     * <p>The commit partition is assigned first: the target replication group for a non-external transaction, a
     * {@code NOT_EXISTING} id otherwise. If the placement driver already knows the current primary replica and its
     * leaseholder resolves to a cluster node, the enlistment happens synchronously; otherwise it waits for the
     * primary replica through {@code partitionMeta}.
     *
     * @param partId Partition index.
     * @param tx Transaction to enlist the partition into.
     * @return Future with the enlistment of the partition.
     */
    protected CompletableFuture<PendingTxPartitionEnlistment> enlist(int partId, InternalTransaction tx) {
        HybridTimestamp schemaTs = tx.schemaTimestamp();
        ReplicationGroupId replicationGroupId = targetReplicationGroupId(partId);

        if (tx.external()) {
            tx.assignCommitPartition((ReplicationGroupId) (this.colocationEnabled ? ZonePartitionId.NOT_EXISTING : TablePartitionId.NOT_EXISTING));
        } else {
            tx.assignCommitPartition(replicationGroupId);
        }

        ReplicaMeta currentPrimary = this.placementDriver.getCurrentPrimaryReplica(replicationGroupId, schemaTs);

        Function<ReplicaMeta, PendingTxPartitionEnlistment> enlistClo = primaryMeta -> {
            ReplicationGroupId partGroupId = targetReplicationGroupId(partId);
            String leaseHolderNodeId = primaryMeta.getLeaseholder();

            assert leaseHolderNodeId != null;

            tx.enlist(partGroupId, this.tableId, leaseHolderNodeId, InternalTableImpl.enlistmentConsistencyToken(primaryMeta));

            return tx.enlistedPartition(partGroupId);
        };

        boolean primaryResolvable = currentPrimary != null
                && this.clusterNodeResolver.getById(currentPrimary.getLeaseholderId()) != null;

        if (!primaryResolvable) {
            return partitionMeta(replicationGroupId, schemaTs).thenApply(enlistClo);
        }

        try {
            return CompletableFuture.completedFuture(enlistClo.apply(currentPrimary));
        } catch (IgniteException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Resolves the cluster node that holds the primary replica of a partition at the current clock time.
     *
     * @param partitionIndex Partition index.
     * @return Future with the node resolved from the primary replica meta.
     */
    @Override
    public CompletableFuture<InternalClusterNode> partitionLocation(int partitionIndex) {
        ReplicationGroupId groupId = targetReplicationGroupId(partitionIndex);
        HybridTimestamp now = this.clockService.current();

        return partitionMeta(groupId, now).thenApply(this::getClusterNode);
    }

    /**
     * Obtains the primary replica meta of the secondary replication group of a partition at the current clock time.
     *
     * @param partitionIndex Partition index.
     * @return Future with the replica meta.
     */
    @Override
    public CompletableFuture<ReplicaMeta> secondaryPartitionLocation(int partitionIndex) {
        ReplicationGroupId secondaryGroupId = targetSecondaryReplicationGroupId(partitionIndex);
        HybridTimestamp now = this.clockService.current();

        return partitionMeta(secondaryGroupId, now);
    }

    /**
     * Waits for the primary replica of a replication group and converts any failure into a
     * {@code TransactionException} with the {@code REPLICA_UNAVAILABLE_ERR} code.
     *
     * @param replicationGroupId Replication group id.
     * @param at Timestamp to await the primary replica for.
     * @return Future with the primary replica meta.
     */
    private CompletableFuture<ReplicaMeta> partitionMeta(ReplicationGroupId replicationGroupId, HybridTimestamp at) {
        return awaitPrimaryReplica(replicationGroupId, at).exceptionally(failure -> {
            String message = "Failed to get the primary replica [replicationGroupId=" + replicationGroupId + ", awaitTimestamp=" + at + "]";

            throw (TransactionException) ExceptionUtils.withCause(TransactionException::new, (int) ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR, (String) message, (Throwable) failure);
        });
    }

    /**
     * Resolves the cluster node of the leaseholder recorded in a replica meta.
     *
     * @param replicaMeta Primary replica meta.
     * @return Node of the leaseholder.
     * @throws TransactionException If the leaseholder id is {@code null} or cannot be resolved to a node.
     */
    private InternalClusterNode getClusterNode(ReplicaMeta replicaMeta) {
        UUID leaseHolderId = replicaMeta.getLeaseholderId();

        InternalClusterNode node = null;
        if (leaseHolderId != null) {
            node = this.clusterNodeResolver.getById(leaseHolderId);
        }

        if (node != null) {
            return node;
        }

        throw new TransactionException(ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR, String.format("Failed to resolve the primary replica node [id=%s]", leaseHolderId));
    }

    /**
     * Tells whether an error, once unwrapped, is a {@code TransactionException} carrying the
     * {@code TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR} code.
     *
     * @param e Error to inspect.
     * @return {@code true} if the unwrapped error has that type and code.
     */
    private static boolean isFinishedDueToTimeout(Throwable e) {
        Throwable cause = ExceptionUtils.unwrapCause((Throwable) e);

        return cause instanceof TransactionException
                && ((TransactionException) cause).code() == ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;
    }

    /**
     * Does nothing: this implementation performs no action on close.
     */
    @Override
    public void close() {
        // No-op.
    }

    /**
     * Returns the value of the {@code cache} flag of this table.
     *
     * @return The {@code cache} flag.
     */
    @Override
    public boolean cache() {
        return cache;
    }

    /**
     * Determines the node a read-only request for a partition should be sent to: the leaseholder of the primary
     * replica awaited for the given read timestamp.
     *
     * @param partId Partition index.
     * @param readTimestamp Timestamp to await the primary replica for.
     * @return Future with the recipient node; fails with a {@code TransactionException} when the primary replica
     *     cannot be awaited, is {@code null}, or its leaseholder cannot be resolved.
     */
    protected CompletableFuture<InternalClusterNode> evaluateReadOnlyRecipientNode(int partId, HybridTimestamp readTimestamp) {
        ReplicationGroupId replicationGroupId = targetReplicationGroupId(partId);

        return awaitPrimaryReplica(replicationGroupId, readTimestamp).handle((primaryMeta, failure) -> {
            if (failure != null) {
                throw (TransactionException) ExceptionUtils.withCause(TransactionException::new, (int) ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR, (Throwable) failure);
            }

            if (primaryMeta == null) {
                throw InternalTableImpl.createFailedGetPrimaryReplicaTransactionException(replicationGroupId, readTimestamp);
            }

            return getClusterNode((ReplicaMeta) primaryMeta);
        });
    }

    /**
     * Creates the exception reported when no primary replica could be obtained for a replication group.
     *
     * @param replicationGroupId Replication group id included in the message.
     * @param readTimestamp Awaited timestamp included in the message.
     * @return {@code TransactionException} with the {@code REPLICA_UNAVAILABLE_ERR} code.
     */
    private static TransactionException createFailedGetPrimaryReplicaTransactionException(ReplicationGroupId replicationGroupId, HybridTimestamp readTimestamp) {
        // The closing bracket was missing, leaving the "[...]" details section of the message unterminated.
        String errorMessage = IgniteStringFormatter.format((String)"Failed to get the primary replica [replicationGroupId={}, awaitTimestamp={}]", (Object[])new Object[]{replicationGroupId, readTimestamp});
        return new TransactionException(ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR, errorMessage);
    }

    /**
     * Looks up the safe time tracker stored for a partition.
     *
     * @param partitionId Partition id.
     * @return Tracker stored under that id; may be {@code null}.
     */
    @Override
    @Nullable
    public PendingComparableValuesTracker<HybridTimestamp, Void> getPartitionSafeTimeTracker(int partitionId) {
        return (PendingComparableValuesTracker) safeTimeTrackerByPartitionId.get(partitionId);
    }

    /**
     * Looks up the safe time tracker stored for a secondary partition.
     *
     * @param partitionId Partition id.
     * @return Tracker stored under that id; may be {@code null}.
     */
    @Override
    @Nullable
    public PendingComparableValuesTracker<HybridTimestamp, Void> getSecondaryPartitionSafeTimeTracker(int partitionId) {
        return (PendingComparableValuesTracker) safeTimeTrackerBySecondaryPartitionId.get(partitionId);
    }

    /**
     * Looks up the storage index tracker stored for a partition.
     *
     * @param partitionId Partition id.
     * @return Tracker stored under that id; may be {@code null}.
     */
    @Override
    @Nullable
    public PendingComparableValuesTracker<Long, Void> getPartitionStorageIndexTracker(int partitionId) {
        return (PendingComparableValuesTracker) storageIndexTrackerByPartitionId.get(partitionId);
    }

    /**
     * Returns the executor obtained from the {@code streamerFlushExecutor} supplier.
     *
     * @return Executor provided by the supplier.
     */
    @Override
    public ScheduledExecutorService streamerFlushExecutor() {
        return streamerFlushExecutor.get();
    }

    /**
     * Estimates the table size by asking the primary replica of every partition and adding up the answers.
     *
     * <p>One {@code getEstimatedSizeRequest} is sent per partition through {@code sendToPrimaryWithRetry} with
     * 16 retries; all requests carry the same timestamp taken once at the start.
     *
     * @return Future with the sum of the per-partition estimates.
     */
    @Override
    public CompletableFuture<Long> estimatedSize() {
        HybridTimestamp now = this.clockService.current();

        CompletableFuture[] partitionSizeFutures = new CompletableFuture[this.partitions];

        for (int partId = 0; partId < this.partitions; partId++) {
            ReplicationGroupId replicaGroupId = targetReplicationGroupId(partId);
            ReplicationGroupIdMessage groupIdMessage = InternalTableImpl.serializeReplicationGroupId(replicaGroupId);

            Function<ReplicaMeta, ReplicaRequest> requestFactory = replicaMeta -> TABLE_MESSAGES_FACTORY.getEstimatedSizeRequest()
                    .groupId(groupIdMessage)
                    .tableId(this.tableId)
                    .enlistmentConsistencyToken(Long.valueOf(InternalTableImpl.enlistmentConsistencyToken(replicaMeta)))
                    .timestamp(now)
                    .build();

            partitionSizeFutures[partId] = sendToPrimaryWithRetry(replicaGroupId, now, 16, requestFactory);
        }

        return CompletableFuture.allOf(partitionSizeFutures).thenApply(ignored -> {
            long total = 0L;

            // All futures are complete at this point, so join() does not block.
            for (CompletableFuture partitionSizeFuture : partitionSizeFutures) {
                total += (Long) partitionSizeFuture.join();
            }

            return total;
        });
    }

    /**
     * Creates the replication group id that serves a partition of this table.
     *
     * @param partitionIndex Partition index.
     * @return A {@code ZonePartitionId} built from the zone id when colocation is enabled, otherwise a
     *     {@code TablePartitionId} built from the table id.
     */
    @Override
    public final ReplicationGroupId targetReplicationGroupId(int partitionIndex) {
        if (!colocationEnabled) {
            return new TablePartitionId(tableId, partitionIndex);
        }

        return new ZonePartitionId(zoneId, partitionIndex);
    }

    /**
     * Creates the replication group id of a partition in the secondary zone.
     *
     * <p>Asserts that a secondary zone id is set.
     *
     * @param partId Partition index.
     * @return {@code ZonePartitionId} built from the secondary zone id and the partition index.
     */
    private ReplicationGroupId targetSecondaryReplicationGroupId(int partId) {
        assert secondaryZoneId != null
                : IgniteStringFormatter.format("Missing secondary zone id for tableId={}, partitionId={}.", tableId, partId);

        int secondaryZone = secondaryZoneId.intValue();

        return new ZonePartitionId(secondaryZone, partId);
    }

    /**
     * Converts a replication group id into its network message form using {@code REPLICA_MESSAGES_FACTORY}.
     *
     * @param replicationGroupId Replication group id.
     * @return Message representation of the id.
     */
    private static ReplicationGroupIdMessage serializeReplicationGroupId(ReplicationGroupId replicationGroupId) {
        ReplicaMessagesFactory messagesFactory = (ReplicaMessagesFactory) REPLICA_MESSAGES_FACTORY;

        return ReplicaMessageUtils.toReplicationGroupIdMessage(messagesFactory, (ReplicationGroupId) replicationGroupId);
    }

    /**
     * Returns the streamer receiver runner held by this table.
     *
     * @return Streamer receiver runner.
     */
    @Override
    public StreamerReceiverRunner streamerReceiverRunner() {
        return streamerReceiverRunner;
    }

    /**
     * Sends a request to the primary replica of a replication group, with the retry behaviour of
     * {@code sendToPrimaryWithRetryInner}.
     *
     * <p>For every attempt the request is built from the current replica meta and invoked on the leaseholder
     * named in that meta.
     *
     * @param replicationGroupId Replication group id.
     * @param hybridTimestamp Timestamp to await the primary replica for.
     * @param numRetries Number of retries left.
     * @param requestFactory Builds the request from the primary replica meta.
     * @param <T> Response type.
     * @return Future with the response.
     */
    private <T> CompletableFuture<T> sendToPrimaryWithRetry(ReplicationGroupId replicationGroupId, HybridTimestamp hybridTimestamp, int numRetries, Function<ReplicaMeta, ReplicaRequest> requestFactory) {
        Function<ReplicaMeta, CompletableFuture<T>> attempt = primaryMeta -> {
            ReplicaRequest request = requestFactory.apply(primaryMeta);

            String leaseholderName = primaryMeta.getLeaseholder();

            assert leaseholderName != null;

            return this.replicaSvc.invoke(leaseholderName, request);
        };

        return sendToPrimaryWithRetryInner(replicationGroupId, hybridTimestamp, numRetries, attempt);
    }

    /**
     * Awaits the primary replica and runs an asynchronous action against it, retrying on primary-related failures.
     *
     * <p>A failure is unwrapped (and, for a {@code ReplicationException} with a cause, replaced by that cause).
     * When it is a {@code PrimaryReplicaMissException}, {@code UnresolvableConsistentIdException} or
     * {@code ReplicationTimeoutException}, the action is retried with the primary awaited for the tick after the
     * expiration time of the replica meta just used; once no retries are left an {@code IgniteException} with
     * {@code REPLICA_MISS_ERR} is thrown. Any other failure is rethrown as is.
     *
     * @param replicationGroupId Replication group id.
     * @param hybridTimestamp Timestamp to await the primary replica for.
     * @param numRetries Number of retries left.
     * @param requestFactory Action to run against the primary replica meta.
     * @param <T> Result type.
     * @return Future with the result of the action.
     */
    private <T> CompletableFuture<T> sendToPrimaryWithRetryInner(ReplicationGroupId replicationGroupId, HybridTimestamp hybridTimestamp, int numRetries, Function<ReplicaMeta, CompletableFuture<T>> requestFactory) {
        return awaitPrimaryReplica(replicationGroupId, hybridTimestamp)
                .<CompletableFuture<T>>thenCompose(replicaMeta -> requestFactory.apply(replicaMeta).<CompletableFuture<T>>handle((response, failure) -> {
                    if (failure == null) {
                        return CompletableFuture.completedFuture(response);
                    }

                    Throwable cause = ExceptionUtils.unwrapCause((Throwable) failure);

                    if (cause instanceof ReplicationException && cause.getCause() != null) {
                        cause = cause.getCause();
                    }

                    boolean retriable = cause instanceof PrimaryReplicaMissException
                            || cause instanceof UnresolvableConsistentIdException
                            || cause instanceof ReplicationTimeoutException;

                    if (!retriable) {
                        throw (RuntimeException) ExceptionUtils.sneakyThrow((Throwable) cause);
                    }

                    if (numRetries == 0) {
                        throw new IgniteException(ErrorGroups.Replicator.REPLICA_MISS_ERR, cause);
                    }

                    return sendToPrimaryWithRetryInner(replicationGroupId, replicaMeta.getExpirationTime().tick(), numRetries - 1, requestFactory);
                }))
                .thenCompose(Function.identity());
    }

    /**
     * Starts a continuous query over this table and delivers its event batches to a subscriber.
     *
     * <p>Any {@code Throwable} raised while creating or running the query is unwrapped, mapped to a public
     * exception and passed to {@code subscriber.onError}.
     *
     * @param subscriber Receiver of event batches; must not be {@code null}.
     * @param options Continuous query options; may be {@code null}.
     * @param mapper Converts a binary row with its schema into the subscriber's row type; must not be {@code null}.
     * @param <T> Row type seen by the subscriber.
     */
    @Override
    public <T> void queryContinuously(Flow.Subscriber<TableRowEventBatch<T>> subscriber, @Nullable ContinuousQueryOptions options, BiFunction<BinaryRow, SchemaDescriptor, T> mapper) {
        assert subscriber != null : "subscriber != null";
        assert mapper != null : "mapper != null";

        EmbeddedContinuousQueryMetricSink metricSink = new EmbeddedContinuousQueryMetricSink();

        try {
            new ContinuousQuery(
                    subscriber,
                    options,
                    mapper,
                    (ContinuousQueryRequestSender) this,
                    (ContinuousQueryMetricSink) metricSink,
                    this.partitions,
                    this.name(),
                    this.observableTimestampTracker.get()
            ).run();
        } catch (Throwable t) {
            Throwable cause = ExceptionUtils.unwrapCause((Throwable) t);

            subscriber.onError(IgniteExceptionMapperUtil.mapToPublicException((Throwable) cause));
        }
    }

    /**
     * Sends a continuous query request to the primary replica of the partition named in the request, with up to
     * 16 retries through {@code sendToPrimaryWithRetryInner}.
     *
     * @param request Continuous query request.
     * @return Future with the scan result and the schema attached to it.
     */
    public CompletableFuture<ContinuousQueryScanResultWithSchema<BinaryRow, SchemaDescriptor>> sendContinuousQueryRequest(ContinuousQueryRequest request) {
        ReplicationGroupId replicationGroupId = targetReplicationGroupId(request.partId());
        HybridTimestamp now = this.clockService.current();

        return sendToPrimaryWithRetryInner(
                replicationGroupId,
                now,
                16,
                primaryMeta -> sendContinuousQueryRequestInner(request, (ReplicaMeta) primaryMeta, replicationGroupId)
        );
    }

    /**
     * Performs one attempt of a continuous query scan against the given primary replica.
     *
     * <p>A pending request is registered with {@code continuousQueryResponseHandler}, a
     * {@code ContinuousQueryScanRequest} carrying the generated request id is invoked on the leaseholder, and the
     * result is taken from the future returned by the registration. A non-empty result is paired with the schema
     * of the version reported by the result; an empty result is paired with a {@code null} schema.
     *
     * @param request Continuous query request.
     * @param replicaMeta Primary replica meta that provides the leaseholder and the enlistment consistency token.
     * @param replicationGroupId Replication group the request is addressed to.
     * @return Future with the scan result and its schema.
     */
    private CompletableFuture<ContinuousQueryScanResultWithSchema<BinaryRow, SchemaDescriptor>> sendContinuousQueryRequestInner(ContinuousQueryRequest request, ReplicaMeta replicaMeta, ReplicationGroupId replicationGroupId) {
        // Registration yields a pair: the future of the scan result and the id to put into the outgoing request.
        Pair<CompletableFuture<ContinuousQueryScanResult<BinaryRow>>, Long> pendingRequest = this.continuousQueryResponseHandler.addPendingRequest();
        CompletableFuture responseFut = (CompletableFuture)pendingRequest.getFirst();
        long requestId = (Long)pendingRequest.getSecond();
        String leaseholderName = replicaMeta.getLeaseholder();
        assert (leaseholderName != null);
        ContinuousQueryScanRequest req = TABLE_MESSAGES_FACTORY.continuousQueryScanRequest().groupId(InternalTableImpl.serializeReplicationGroupId(replicationGroupId)).tableId(this.tableId).lowerBoundTimestampLong(request.lowerBoundTs()).lowerBoundRowId(request.lowerBoundRowId()).maxItems(request.maxItems()).eventTypes(request.eventTypes()).columnNames(request.columnNames()).requestId(requestId).enlistmentConsistencyToken(Long.valueOf(InternalTableImpl.enlistmentConsistencyToken(replicaMeta))).skipOldEntries(request.skipOldEntries()).build();
        // The direct reply of invoke() is ignored; the result comes from responseFut
        // (presumably completed by the response handler - confirm).
        return ((CompletableFuture)((CompletableFuture)this.replicaSvc.invoke(leaseholderName, (ReplicaRequest)req).thenCompose(ignored -> responseFut)).thenCompose(cqRes -> {
            assert (cqRes.error() == null) : "Error not handled in ContinuousQueryResponseHandler: " + cqRes.error();
            // Nothing to decode, so no schema lookup is needed.
            if (cqRes.rows().isEmpty()) {
                return CompletableFuture.completedFuture(new ContinuousQueryScanResultWithSchema(cqRes, (Object)null));
            }
            // Both the old and the new row of every event are checked for a common schema version.
            assert (InternalTableImpl.allSchemaVersionsSame(cqRes.rows().stream().flatMap(r -> Stream.of((BinaryRow)r.oldRow(), (BinaryRow)r.row())).collect(Collectors.toList())));
            return this.schemaRegistry.schemaAsync(cqRes.schemaVersion()).thenApply(schema -> new ContinuousQueryScanResultWithSchema(cqRes, schema));
        })).exceptionally(e -> {
            // On any failure the registration is dropped before the error is translated.
            this.continuousQueryResponseHandler.removePendingRequest(requestId);
            Throwable cause = ExceptionUtils.unwrapCause((Throwable)e);
            // Rethrown unwrapped and unchanged; sendToPrimaryWithRetryInner treats this type as retriable.
            if (cause instanceof PrimaryReplicaMissException) {
                throw (RuntimeException)ExceptionUtils.sneakyThrow((Throwable)cause);
            }
            // Re-created with this table's name, keeping the original error as the cause.
            if (cause instanceof TableNotFoundException) {
                throw new TableNotFoundException(this.tableName, e);
            }
            // Everything else becomes a ReplicationException: the code of an IgniteException is kept,
            // other errors get REPLICA_COMMON_ERR.
            int code = cause instanceof IgniteException ? ((IgniteException)cause).code() : ErrorGroups.Replicator.REPLICA_COMMON_ERR;
            String message = "Failed to send continuous query request: " + e.getMessage();
            throw new ReplicationException(code, message, e);
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Installs new safe time and storage index trackers for a partition and closes the trackers they replace.
     *
     * <p>While holding {@code updatePartitionMapsMux}, both tracker maps are copied, updated and swapped in as new
     * map instances. The replaced trackers are closed after the lock is released.
     *
     * @param partitionId Partition id.
     * @param newSafeTimeTracker New safe time tracker.
     * @param newStorageIndexTracker New storage index tracker.
     */
    public void updatePartitionTrackers(int partitionId, PendingComparableValuesTracker<HybridTimestamp, Void> newSafeTimeTracker, PendingComparableValuesTracker<Long, Void> newStorageIndexTracker) {
        PendingComparableValuesTracker replacedSafeTimeTracker;
        PendingComparableValuesTracker replacedStorageIndexTracker;

        synchronized (this.updatePartitionMapsMux) {
            Int2ObjectOpenHashMap safeTimeTrackersCopy = new Int2ObjectOpenHashMap(this.partitions);
            Int2ObjectOpenHashMap storageIndexTrackersCopy = new Int2ObjectOpenHashMap(this.partitions);

            safeTimeTrackersCopy.putAll(this.safeTimeTrackerByPartitionId);
            storageIndexTrackersCopy.putAll(this.storageIndexTrackerByPartitionId);

            replacedSafeTimeTracker = (PendingComparableValuesTracker) safeTimeTrackersCopy.put(partitionId, newSafeTimeTracker);
            replacedStorageIndexTracker = (PendingComparableValuesTracker) storageIndexTrackersCopy.put(partitionId, newStorageIndexTracker);

            this.safeTimeTrackerByPartitionId = safeTimeTrackersCopy;
            this.storageIndexTrackerByPartitionId = storageIndexTrackersCopy;
        }

        // Closing happens outside the synchronized section.
        if (replacedSafeTimeTracker != null) {
            replacedSafeTimeTracker.close();
        }

        if (replacedStorageIndexTracker != null) {
            replacedStorageIndexTracker.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Installs a new safe time tracker for a secondary partition and closes the tracker it replaces.
     *
     * <p>While holding {@code updateSecondaryPartitionMapsMux}, the tracker map is copied, updated and swapped in
     * as a new map instance. The replaced tracker is closed after the lock is released.
     *
     * @param partitionId Partition id.
     * @param newSafeTimeTracker New safe time tracker.
     */
    public void updateSecondaryPartitionTrackers(int partitionId, PendingComparableValuesTracker<HybridTimestamp, Void> newSafeTimeTracker) {
        PendingComparableValuesTracker replacedTracker;

        synchronized (this.updateSecondaryPartitionMapsMux) {
            Int2ObjectOpenHashMap trackersCopy = new Int2ObjectOpenHashMap(this.partitions);

            trackersCopy.putAll(this.safeTimeTrackerBySecondaryPartitionId);

            replacedTracker = (PendingComparableValuesTracker) trackersCopy.put(partitionId, newSafeTimeTracker);

            this.safeTimeTrackerBySecondaryPartitionId = trackersCopy;
        }

        // Closing happens outside the synchronized section.
        if (replacedTracker != null) {
            replacedTracker.close();
        }
    }

    /**
     * Builds an {@code RW_UPSERT_ALL} multi-row replica request without a deletion bit set.
     *
     * @param keyRows0 Rows to upsert.
     * @param txo Transaction.
     * @param groupId Replication group id.
     * @param enlistmentConsistencyToken Enlistment consistency token.
     * @param full Passed through to the request builder as the {@code full} flag.
     * @return Replica request.
     */
    private ReplicaRequest upsertAllInternal(Collection<? extends BinaryRow> keyRows0, InternalTransaction txo, ReplicationGroupId groupId, Long enlistmentConsistencyToken, boolean full) {
        return readWriteMultiRowReplicaRequest(
                RequestType.RW_UPSERT_ALL,
                keyRows0,
                null, // deleted
                txo,
                groupId,
                enlistmentConsistencyToken,
                full
        );
    }

    /**
     * Builds an {@code RW_UPSERT_ALL} multi-row replica request.
     *
     * <p>When the collection is non-empty and its first row is a {@code BinaryTimedRow}, the request is built by
     * {@code dcrWriteMultiRowReplicaRequest}; otherwise by {@code readWriteMultiRowReplicaRequest}.
     *
     * @param keyRows0 Rows to upsert.
     * @param deleted Optional bit set passed through to the request builder; may be {@code null}.
     * @param txo Transaction.
     * @param groupId Replication group id.
     * @param enlistmentConsistencyToken Enlistment consistency token.
     * @param full Passed through to the request builder as the {@code full} flag.
     * @return Replica request.
     */
    private ReplicaRequest upsertAllInternal(Collection<? extends BinaryRow> keyRows0, @Nullable BitSet deleted, InternalTransaction txo, ReplicationGroupId groupId, Long enlistmentConsistencyToken, boolean full) {
        // Only the first row is inspected to pick the request flavour.
        boolean startsWithTimedRow = keyRows0 != null
                && !keyRows0.isEmpty()
                && keyRows0.iterator().next() instanceof BinaryTimedRow;

        if (!startsWithTimedRow) {
            return readWriteMultiRowReplicaRequest(RequestType.RW_UPSERT_ALL, keyRows0, deleted, txo, groupId, enlistmentConsistencyToken, full);
        }

        return dcrWriteMultiRowReplicaRequest(RequestType.RW_UPSERT_ALL, keyRows0, deleted, txo, groupId, enlistmentConsistencyToken, full);
    }

    /**
     * Tells whether the unwrapped error matches one of the codes {@code ACQUIRE_LOCK_ERR},
     * {@code REPLICA_MISS_ERR} or {@code GROUP_OVERLOADED_ERR}.
     *
     * @param e Error to inspect.
     * @return Result of {@code ExceptionUtils.matchAny} for those codes.
     */
    private static boolean exceptionAllowsImplicitTxRetry(Throwable e) {
        Throwable cause = ExceptionUtils.unwrapCause((Throwable) e);

        int[] otherCodes = new int[]{ErrorGroups.Replicator.REPLICA_MISS_ERR, ErrorGroups.Replicator.GROUP_OVERLOADED_ERR};

        return ExceptionUtils.matchAny((Throwable) cause, (int) ErrorGroups.Transactions.ACQUIRE_LOCK_ERR, (int[]) otherCodes);
    }

    /**
     * Asks the placement driver for the primary replica of a replication group, with a 30 second timeout.
     *
     * @param replicationGroupId Replication group id.
     * @param timestamp Timestamp to await the primary replica for.
     * @return Future with the primary replica meta.
     */
    private CompletableFuture<ReplicaMeta> awaitPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp timestamp) {
        long timeout = 30L;
        TimeUnit timeoutUnit = TimeUnit.SECONDS;

        return placementDriver.awaitPrimaryReplica(replicationGroupId, timestamp, timeout, timeoutUnit);
    }

    /**
     * Derives the enlistment consistency token from a replica meta: the {@code long} value of its start time.
     *
     * @param replicaMeta Replica meta.
     * @return Enlistment consistency token.
     */
    private static long enlistmentConsistencyToken(ReplicaMeta replicaMeta) {
        HybridTimestamp leaseStartTime = replicaMeta.getStartTime();

        return leaseStartTime.longValue();
    }

    /**
     * Creates a new transactions facade on every call.
     *
     * @return New {@code IgniteTransactionsImpl} built from the transaction manager, the observable timestamp
     *     tracker and the license feature checker of this table.
     */
    @Override
    public IgniteTransactions transactions() {
        return new IgniteTransactionsImpl(txManager, observableTimestampTracker, licenseFeatureChecker);
    }

    /**
     * Rejects a transaction that is finishing or already finished.
     *
     * <p>A {@code null} transaction passes the check.
     *
     * @param transaction Transaction to check; may be {@code null}.
     * @throws TransactionException With {@code TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR} when the transaction was rolled
     *     back because its timeout was exceeded, otherwise with {@code TX_ALREADY_FINISHED_ERR}.
     */
    private static void checkTransactionFinishStarted(@Nullable InternalTransaction transaction) {
        if (transaction == null || !transaction.isFinishingOrFinished()) {
            return;
        }

        int errorCode;
        if (transaction.isRolledBackWithTimeoutExceeded()) {
            errorCode = ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;
        } else {
            errorCode = ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
        }

        String message = IgniteStringFormatter.format("Transaction is already finished () [txId={}, readOnly={}].", transaction.id(), transaction.isReadOnly());

        throw new TransactionException(errorCode, message);
    }

    /**
     * Returns the metric source held by this instance.
     *
     * @return The {@code metrics} field.
     */
    @Override
    public TableMetricSource metrics() {
        return metrics;
    }

    /**
     * Checks that a security context is fully populated: the context itself, its authentication, and the
     * authentication's username and roles must all be non-null.
     *
     * @param context Security context to check, may be {@code null}.
     * @return {@code true} only if none of the checked values is {@code null}.
     */
    private static boolean isValidContext(@Nullable SecurityContext context) {
        if (context == null) {
            return false;
        }

        if (context.authentication() == null) {
            return false;
        }

        if (context.authentication().username() == null) {
            return false;
        }

        return context.authentication().roles() != null;
    }

    @FunctionalInterface
    private static interface ReplicaRequestFactory {
        public ReplicaRequest create(Collection<? extends BinaryRow> var1, InternalTransaction var2, ReplicationGroupId var3, Long var4, Boolean var5);
    }

    /**
     * Batch request tracker that registers every scan batch request of one transaction in
     * {@code TransactionInflights}.
     */
    private static class ReadOnlyInflightBatchRequestTracker
            implements PartitionScanPublisher.InflightBatchRequestTracker {
        /** Registry in which scan inflights are added and removed. */
        private final TransactionInflights transactionInflights;

        /** ID of the transaction whose requests are tracked. */
        private final UUID txId;

        /**
         * Creates a tracker for a single transaction.
         *
         * @param inflights Registry used to add and remove inflights.
         * @param transactionId ID of the tracked transaction.
         */
        ReadOnlyInflightBatchRequestTracker(TransactionInflights inflights, UUID transactionId) {
            this.transactionInflights = inflights;
            this.txId = transactionId;
        }

        /**
         * Registers a scan inflight for the transaction.
         *
         * @throws TransactionException With {@code TX_ALREADY_FINISHED_ERR} if {@code addScanInflight} returns
         *         {@code false}.
         */
        @Override
        public void onRequestBegin() {
            boolean registered = transactionInflights.addScanInflight(txId);

            if (registered) {
                return;
            }

            String message = IgniteStringFormatter.format(
                    "Transaction is already finished () [txId={}, readOnly=true].",
                    new Object[]{txId}
            );

            throw new TransactionException(ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR, message);
        }

        /** Removes the inflight of the transaction from the registry. */
        @Override
        public void onRequestEnd() {
            transactionInflights.removeInflight(txId);
        }
    }

    /**
     * Batch request tracker whose callbacks do nothing.
     *
     * <p>No constructor is declared: the implicit default constructor of a private class is itself private, which
     * matches the explicit empty private constructor this class previously spelled out.
     */
    private static class ReadWriteInflightBatchRequestTracker
            implements PartitionScanPublisher.InflightBatchRequestTracker {
        /** Does nothing. */
        @Override
        public void onRequestBegin() {
            // Intentionally empty.
        }

        /** Does nothing. */
        @Override
        public void onRequestEnd() {
            // Intentionally empty.
        }
    }

    /**
     * Continuous query metric sink that discards every value it is given.
     *
     * <p>No constructor is declared: the implicit default constructor of a private class is itself private, which
     * matches the explicit empty private constructor this class previously spelled out.
     *
     * <p>NOTE(review): both methods look like implementations of {@code ContinuousQueryMetricSink} methods but
     * carry no {@code @Override}; confirm against the interface and annotate if so.
     */
    private static class EmbeddedContinuousQueryMetricSink
            implements ContinuousQueryMetricSink {
        /**
         * Ignores the reported number of sent requests.
         *
         * @param requests Ignored.
         */
        public void continuousQueryRequestsSentAdd(long requests) {
            // Intentionally empty.
        }

        /**
         * Ignores the reported number of received events.
         *
         * @param events Ignored.
         */
        public void continuousQueryEventsReceivedAdd(long events) {
            // Intentionally empty.
        }
    }
}

