/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.table.distributed.replicator;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.catalog.Catalog;
import org.apache.ignite3.internal.catalog.CatalogService;
import org.apache.ignite3.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite3.internal.components.NodeProperties;
import org.apache.ignite3.internal.continuousquery.ContinuousQueryScanResult;
import org.apache.ignite3.internal.continuousquery.ContinuousQueryScanResultStatus;
import org.apache.ignite3.internal.continuousquery.ContinuousQueryUtils;
import org.apache.ignite3.internal.continuousquery.RowUpdateInfo;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.IgniteBiTuple;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.lang.IgniteSystemProperties;
import org.apache.ignite3.internal.lang.IgniteTriFunction;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.lowwatermark.LowWatermark;
import org.apache.ignite3.internal.network.ClusterNodeResolver;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.NetworkMessage;
import org.apache.ignite3.internal.partition.replicator.FuturesCleanupResult;
import org.apache.ignite3.internal.partition.replicator.ReliableCatalogVersions;
import org.apache.ignite3.internal.partition.replicator.ReplicaPrimacy;
import org.apache.ignite3.internal.partition.replicator.ReplicaPrimacyEngine;
import org.apache.ignite3.internal.partition.replicator.ReplicaTableProcessor;
import org.apache.ignite3.internal.partition.replicator.ReplicaTxFinishMarker;
import org.apache.ignite3.internal.partition.replicator.ReplicationRaftCommandApplicator;
import org.apache.ignite3.internal.partition.replicator.TableAwareReplicaRequestPreProcessor;
import org.apache.ignite3.internal.partition.replicator.TxRecoveryEngine;
import org.apache.ignite3.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler;
import org.apache.ignite3.internal.partition.replicator.handlers.TxCleanupRecoveryRequestHandler;
import org.apache.ignite3.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler;
import org.apache.ignite3.internal.partition.replicator.handlers.TxRecoveryMessageHandler;
import org.apache.ignite3.internal.partition.replicator.handlers.TxStateCommitPartitionReplicaRequestHandler;
import org.apache.ignite3.internal.partition.replicator.handlers.VacuumTxStateReplicaRequestHandler;
import org.apache.ignite3.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite3.internal.partition.replicator.network.TimedBinaryRow;
import org.apache.ignite3.internal.partition.replicator.network.UserDetailsAware;
import org.apache.ignite3.internal.partition.replicator.network.UserDetailsMessage;
import org.apache.ignite3.internal.partition.replicator.network.command.ArchiveAllCommand;
import org.apache.ignite3.internal.partition.replicator.network.command.TimedBinaryRowMessage;
import org.apache.ignite3.internal.partition.replicator.network.command.TimedBinaryRowMessageBuilder;
import org.apache.ignite3.internal.partition.replicator.network.command.UpdateAllCommand;
import org.apache.ignite3.internal.partition.replicator.network.command.UpdateCommand;
import org.apache.ignite3.internal.partition.replicator.network.command.UpdateCommandV2Builder;
import org.apache.ignite3.internal.partition.replicator.network.command.WriteIntentSwitchCommandV2;
import org.apache.ignite3.internal.partition.replicator.network.replication.BinaryRowMessage;
import org.apache.ignite3.internal.partition.replicator.network.replication.BinaryTupleMessage;
import org.apache.ignite3.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ChangePeersAndLearnersAsyncReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ContinuousQueryScanRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.DcrWriteMultiRowReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.GetEstimatedSizeRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadOnlyIntervalScanRetrieveBatchReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadOnlyRowVersionsScanRetrieveBatchReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadOnlyScanRetrieveBatchReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadOnlySingleRowPkReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadWriteMultiRowPkReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadWriteMultiRowReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadWriteReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadWriteScanRetrieveBatchReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadWriteSingleRowPkReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadWriteSingleRowReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadWriteSwapRowReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.RequestType;
import org.apache.ignite3.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite3.internal.partition.replicator.schemacompat.IncompatibleSchemaVersionException;
import org.apache.ignite3.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
import org.apache.ignite3.internal.partitiondistribution.Assignments;
import org.apache.ignite3.internal.placementdriver.LeasePlacementDriver;
import org.apache.ignite3.internal.raft.Command;
import org.apache.ignite3.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite3.internal.raft.PeersAndLearners;
import org.apache.ignite3.internal.raft.service.LeaderWithTerm;
import org.apache.ignite3.internal.raft.service.RaftCommandRunner;
import org.apache.ignite3.internal.raft.service.RaftGroupService;
import org.apache.ignite3.internal.replicator.CommandApplicationResult;
import org.apache.ignite3.internal.replicator.PartitionGroupId;
import org.apache.ignite3.internal.replicator.ReplicaResult;
import org.apache.ignite3.internal.replicator.ReplicationGroupId;
import org.apache.ignite3.internal.replicator.ReplicatorRecoverableExceptions;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.replicator.ZonePartitionId;
import org.apache.ignite3.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite3.internal.replicator.exception.ReplicationException;
import org.apache.ignite3.internal.replicator.exception.UnsupportedReplicaRequestException;
import org.apache.ignite3.internal.replicator.listener.ReplicaListener;
import org.apache.ignite3.internal.replicator.message.ReadOnlyDirectReplicaRequest;
import org.apache.ignite3.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite3.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite3.internal.replicator.message.ReplicaRequest;
import org.apache.ignite3.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite3.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite3.internal.replicator.message.SchemaVersionAwareReplicaRequest;
import org.apache.ignite3.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite3.internal.replicator.message.ZonePartitionIdMessage;
import org.apache.ignite3.internal.schema.BinaryRow;
import org.apache.ignite3.internal.schema.BinaryRowImpl;
import org.apache.ignite3.internal.schema.BinaryRowUpgrader;
import org.apache.ignite3.internal.schema.BinaryTuple;
import org.apache.ignite3.internal.schema.BinaryTupleComparator;
import org.apache.ignite3.internal.schema.BinaryTuplePrefix;
import org.apache.ignite3.internal.schema.NullBinaryRow;
import org.apache.ignite3.internal.schema.SchemaRegistry;
import org.apache.ignite3.internal.schema.SchemaSyncService;
import org.apache.ignite3.internal.storage.BinaryRowAndRowId;
import org.apache.ignite3.internal.storage.MvPartitionStorage;
import org.apache.ignite3.internal.storage.PartitionTimestampCursor;
import org.apache.ignite3.internal.storage.ReadResult;
import org.apache.ignite3.internal.storage.RowId;
import org.apache.ignite3.internal.storage.TimedBinaryRowAndRowId;
import org.apache.ignite3.internal.storage.index.IndexRow;
import org.apache.ignite3.internal.storage.index.IndexRowImpl;
import org.apache.ignite3.internal.storage.index.IndexStorage;
import org.apache.ignite3.internal.storage.index.SortedIndexStorage;
import org.apache.ignite3.internal.storage.util.StorageUtils;
import org.apache.ignite3.internal.table.RowIdGenerator;
import org.apache.ignite3.internal.table.distributed.IndexLocker;
import org.apache.ignite3.internal.table.distributed.SortedIndexLocker;
import org.apache.ignite3.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite3.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite3.internal.table.distributed.TableUtils;
import org.apache.ignite3.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite3.internal.table.distributed.replicator.BinaryRowColumnExtractor;
import org.apache.ignite3.internal.table.distributed.replicator.CursorResource;
import org.apache.ignite3.internal.table.distributed.replicator.PartitionReplicaBuildIndexProcessor;
import org.apache.ignite3.internal.table.distributed.replicator.RemoteResourceIds;
import org.apache.ignite3.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite3.internal.table.distributed.replicator.handlers.BuildIndexReplicaRequestHandler;
import org.apache.ignite3.internal.table.metrics.TableMetricSource;
import org.apache.ignite3.internal.table.policy.RlsChecker;
import org.apache.ignite3.internal.table.policy.RlsContext;
import org.apache.ignite3.internal.table.policy.RlsPolicyManager;
import org.apache.ignite3.internal.tx.Lock;
import org.apache.ignite3.internal.tx.LockException;
import org.apache.ignite3.internal.tx.LockKey;
import org.apache.ignite3.internal.tx.LockManager;
import org.apache.ignite3.internal.tx.LockMode;
import org.apache.ignite3.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite3.internal.tx.TransactionIds;
import org.apache.ignite3.internal.tx.TransactionMeta;
import org.apache.ignite3.internal.tx.TxManager;
import org.apache.ignite3.internal.tx.TxPriority;
import org.apache.ignite3.internal.tx.TxState;
import org.apache.ignite3.internal.tx.TxStateMeta;
import org.apache.ignite3.internal.tx.UpdateCommandResult;
import org.apache.ignite3.internal.tx.impl.FullyQualifiedResourceId;
import org.apache.ignite3.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite3.internal.tx.message.TableWriteIntentSwitchReplicaRequest;
import org.apache.ignite3.internal.tx.message.TxCleanupRecoveryRequest;
import org.apache.ignite3.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite3.internal.tx.message.TxRecoveryMessage;
import org.apache.ignite3.internal.tx.message.TxStateCommitPartitionRequest;
import org.apache.ignite3.internal.tx.message.VacuumTxStateReplicaRequest;
import org.apache.ignite3.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite3.internal.tx.message.WriteIntentSwitchReplicaRequestBase;
import org.apache.ignite3.internal.tx.message.WriteIntentSwitchReplicatedInfo;
import org.apache.ignite3.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.Cursor;
import org.apache.ignite3.internal.util.CursorUtils;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.util.Lazy;
import org.apache.ignite3.internal.util.PendingComparableValuesTracker;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.lang.IgniteException;
import org.apache.ignite3.lang.TableNotFoundException;
import org.apache.ignite3.table.TableRowEventType;
import org.apache.ignite3.tx.TransactionException;
import org.gridgain.internal.license.LicenseFeature;
import org.gridgain.internal.license.LicenseFeatureChecker;
import org.gridgain.lang.GridgainErrorGroups;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class PartitionReplicaListener
implements ReplicaListener,
ReplicaTableProcessor {
    /** Always {@code null}. NOTE(review): not referenced in the visible part of this file - confirm it is still needed. */
    private static final Object INTERNAL_DOC_PLACEHOLDER = null;
    /** Logger of this class. */
    private static final IgniteLogger LOG = Loggers.forClass(PartitionReplicaListener.class);
    /** Factory for partition replication messages. */
    private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
    /** Factory for replica messages. */
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    /**
     * RLS context built for the {@code "gridgain-system-user"} name with the single {@code "gridgain-system-bypass"} entry.
     * Presumably used for internal operations that must not be restricted by row-level security policies - TODO confirm.
     */
    private static final RlsContext SYSTEM_CONTEXT = new RlsContext("gridgain-system-user", Set.of("gridgain-system-bypass"));
    /** Replication group this listener was created for. */
    private final PartitionGroupId replicationGroupId;
    /** ID of the table this listener was created for. */
    private final int tableId;
    /** Built in the constructor from {@link #tableId} and the partition ID of {@link #replicationGroupId}. */
    private final TablePartitionId tableLockKey;
    /** Lazily resolved primary key index storage. */
    private final Lazy<TableSchemaAwareIndexStorage> pkIndexStorage;
    /** Supplier of secondary index storages; the map key is presumably the index ID - TODO confirm. */
    private final Supplier<Map<Integer, TableSchemaAwareIndexStorage>> secondaryIndexStorages;
    /** Multi-version storage of the partition. */
    private final MvPartitionStorage mvDataStorage;
    /** Raft command runner; may be an {@link ExecutorInclinedRaftCommandRunner} decorator (see {@link #raftClient()}). */
    private final RaftCommandRunner raftCommandRunner;
    /** Transaction manager. */
    private final TxManager txManager;
    /** Lock manager. */
    private final LockManager lockManager;
    /** Storage update handler. */
    private final StorageUpdateHandler storageUpdateHandler;
    /** Registry of remotely triggered resources. */
    private final RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry;
    /** Clock service. */
    private final ClockService clockService;
    /** Safe time tracker. */
    private final PendingComparableValuesTracker<HybridTimestamp, Void> safeTime;
    /** Transaction state resolver. */
    private final TransactionStateResolver transactionStateResolver;
    /** Executor passed to the constructor as {@code scanRequestExecutor}. */
    private final Executor scanRequestExecutor;
    /** Executor passed to the constructor as {@code partitionOperationsExecutor}. */
    private final Executor partitionOperationsExecutor;
    /** Supplier of index lockers; the map key is presumably the index ID - TODO confirm. */
    private final Supplier<Map<Integer, IndexLocker>> indexesLockers;
    /** Cleanup-ready future lists keyed by UUID (presumably the transaction ID - TODO confirm). */
    private final ConcurrentMap<UUID, TxCleanupReadyFutureList> txCleanupReadyFutures = new ConcurrentHashMap<UUID, TxCleanupReadyFutureList>();
    /** Futures keyed by row ID; judging by the name, tracks in-flight row cleanups - TODO confirm. */
    private final ConcurrentHashMap<RowId, CompletableFuture<?>> rowCleanupMap = new ConcurrentHashMap();
    /** Futures keyed by UUID and kept in key order (presumably transaction IDs - TODO confirm). */
    private final ConcurrentNavigableMap<UUID, CompletableFuture<Void>> pendingTransactions = new ConcurrentSkipListMap<UUID, CompletableFuture<Void>>();
    /** Schema compatibility validator, created in the constructor. */
    private final SchemaCompatibilityValidator schemaCompatValidator;
    /** Schema synchronization service. */
    private final SchemaSyncService schemaSyncService;
    /** Catalog service. */
    private final CatalogService catalogService;
    /** Busy lock; also handed to the index building processor in the constructor. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    /** Stop guard flag, initially {@code false}. */
    private final AtomicBoolean stopGuard = new AtomicBoolean();
    /** Index building processor, created in the constructor. */
    private final PartitionReplicaBuildIndexProcessor indexBuildingProcessor;
    /** Schema registry. */
    private final SchemaRegistry schemaRegistry;
    /** Low watermark. */
    private final LowWatermark lowWatermark;
    /** Node properties; consulted for {@code colocationEnabled()} when routing requests. */
    private final NodeProperties nodeProperties;
    /** Value of the {@code IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK} system property, read once at class initialization. */
    private static final boolean SKIP_UPDATES = IgniteSystemProperties.getBoolean("IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK");
    /** Table metric source. */
    private final TableMetricSource metrics;
    /** Validates replica primacy for incoming requests (see {@link #invoke}). */
    private final ReplicaPrimacyEngine replicaPrimacyEngine;
    /** Pre-processor of table-aware requests; also provides the operation timestamp of a request. */
    private final TableAwareReplicaRequestPreProcessor tableAwareReplicaRequestPreProcessor;
    /** Reliable catalog versions helper, created in the constructor. */
    private final ReliableCatalogVersions reliableCatalogVersions;
    /** Raft command applicator shared by several request handlers created in the constructor. */
    private final ReplicationRaftCommandApplicator raftCommandApplicator;
    /** Transaction finish marker, created in the constructor. */
    private final ReplicaTxFinishMarker replicaTxFinishMarker;
    /** Handler of transaction finish requests. */
    private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler;
    /** Handler of transaction state requests addressed to the commit partition. */
    private final TxStateCommitPartitionReplicaRequestHandler txStateCommitPartitionReplicaRequestHandler;
    /** Handler of {@link TxRecoveryMessage}. */
    private final TxRecoveryMessageHandler txRecoveryMessageHandler;
    /** Handler of {@link TxCleanupRecoveryRequest}. */
    private final TxCleanupRecoveryRequestHandler txCleanupRecoveryRequestHandler;
    /** Handler of minimum active transaction time requests. */
    private final MinimumActiveTxTimeReplicaRequestHandler minimumActiveTxTimeReplicaRequestHandler;
    /** Handler of transaction state vacuum requests. */
    private final VacuumTxStateReplicaRequestHandler vacuumTxStateReplicaRequestHandler;
    /** Handler of index build requests. */
    private final BuildIndexReplicaRequestHandler buildIndexReplicaRequestHandler;
    /** License feature checker. */
    private final LicenseFeatureChecker licenseFeatureChecker;
    /** RLS policy manager. */
    private final RlsPolicyManager policyManager;

    public PartitionReplicaListener(MvPartitionStorage mvDataStorage, RaftCommandRunner raftCommandRunner, TxManager txManager, LockManager lockManager, Executor scanRequestExecutor, Executor partitionOperationsExecutor, PartitionGroupId replicationGroupId, int tableId, Supplier<Map<Integer, IndexLocker>> indexesLockers, Lazy<TableSchemaAwareIndexStorage> pkIndexStorage, Supplier<Map<Integer, TableSchemaAwareIndexStorage>> secondaryIndexStorages, ClockService clockService, PendingComparableValuesTracker<HybridTimestamp, Void> safeTime, TxStatePartitionStorage txStatePartitionStorage, TransactionStateResolver transactionStateResolver, StorageUpdateHandler storageUpdateHandler, ValidationSchemasSource validationSchemasSource, InternalClusterNode localNode, SchemaSyncService schemaSyncService, CatalogService catalogService, LeasePlacementDriver placementDriver, ClusterNodeResolver clusterNodeResolver, RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry, SchemaRegistry schemaRegistry, IndexMetaStorage indexMetaStorage, LowWatermark lowWatermark, LicenseFeatureChecker licenseFeatureChecker, FailureProcessor failureProcessor, NodeProperties nodeProperties, TableMetricSource metrics, RlsPolicyManager policyManager) {
        this.mvDataStorage = mvDataStorage;
        this.raftCommandRunner = raftCommandRunner;
        this.txManager = txManager;
        this.lockManager = lockManager;
        this.scanRequestExecutor = scanRequestExecutor;
        this.partitionOperationsExecutor = partitionOperationsExecutor;
        this.indexesLockers = indexesLockers;
        this.pkIndexStorage = pkIndexStorage;
        this.secondaryIndexStorages = secondaryIndexStorages;
        this.clockService = clockService;
        this.safeTime = safeTime;
        this.transactionStateResolver = transactionStateResolver;
        this.storageUpdateHandler = storageUpdateHandler;
        this.schemaSyncService = schemaSyncService;
        this.catalogService = catalogService;
        this.remotelyTriggeredResourceRegistry = remotelyTriggeredResourceRegistry;
        this.schemaRegistry = schemaRegistry;
        this.lowWatermark = lowWatermark;
        this.licenseFeatureChecker = licenseFeatureChecker;
        this.nodeProperties = nodeProperties;
        this.replicationGroupId = replicationGroupId;
        this.tableId = tableId;
        this.policyManager = policyManager;
        this.tableLockKey = new TablePartitionId(tableId, replicationGroupId.partitionId());
        this.metrics = metrics;
        this.schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService);
        this.indexBuildingProcessor = new PartitionReplicaBuildIndexProcessor(this.busyLock, tableId, indexMetaStorage, catalogService);
        this.replicaPrimacyEngine = new ReplicaPrimacyEngine(placementDriver, clockService, replicationGroupId, localNode);
        this.tableAwareReplicaRequestPreProcessor = new TableAwareReplicaRequestPreProcessor(clockService, this.schemaCompatValidator, schemaSyncService, nodeProperties);
        this.reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService);
        this.raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
        this.replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager);
        TxRecoveryEngine txRecoveryEngine = new TxRecoveryEngine(txManager, clusterNodeResolver, replicationGroupId, this::createAbandonedTxRecoveryEnlistment);
        this.txFinishReplicaRequestHandler = new TxFinishReplicaRequestHandler(txStatePartitionStorage, clockService, txManager, validationSchemasSource, schemaSyncService, catalogService, raftCommandRunner, replicationGroupId);
        this.txStateCommitPartitionReplicaRequestHandler = new TxStateCommitPartitionReplicaRequestHandler(txStatePartitionStorage, txManager, clusterNodeResolver, localNode, txRecoveryEngine);
        this.txRecoveryMessageHandler = new TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId, txRecoveryEngine);
        this.txCleanupRecoveryRequestHandler = new TxCleanupRecoveryRequestHandler(txStatePartitionStorage, txManager, failureProcessor, replicationGroupId);
        this.minimumActiveTxTimeReplicaRequestHandler = new MinimumActiveTxTimeReplicaRequestHandler(clockService, this.raftCommandApplicator);
        this.vacuumTxStateReplicaRequestHandler = new VacuumTxStateReplicaRequestHandler(this.raftCommandApplicator);
        this.buildIndexReplicaRequestHandler = new BuildIndexReplicaRequestHandler(indexMetaStorage, this.indexBuildingProcessor.tracker(), safeTime, this.raftCommandApplicator);
    }

    /**
     * Creates the enlistment used for abandoned transaction recovery on the given node.
     *
     * <p>Must only be called when colocation is disabled: the replication group ID is cast to
     * {@link TablePartitionId} to obtain the table ID.
     *
     * @param node Node whose name is put into the enlistment.
     * @return Enlistment built from the node name, a {@code 0L} second argument (presumably a consistency
     *     token placeholder - TODO confirm) and the table ID of the replication group.
     */
    private PendingTxPartitionEnlistment createAbandonedTxRecoveryEnlistment(InternalClusterNode node) {
        assert (!this.nodeProperties.colocationEnabled()) : "Unexpected method call within colocation enabled.";

        String nodeName = node.name();
        TablePartitionId tablePartitionId = (TablePartitionId) this.replicationGroupId;

        return new PendingTxPartitionEnlistment(nodeName, 0L, tablePartitionId.tableId());
    }

    /**
     * Validates replica primacy for the request and, once it is resolved, processes the request.
     *
     * @param request Replica request.
     * @param senderId ID of the sender, passed through to request processing.
     * @return Future with the processing result wrapped into a {@link ReplicaResult}.
     */
    @Override
    public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) {
        CompletableFuture<ReplicaPrimacy> primacyFuture = this.replicaPrimacyEngine.validatePrimacy(request);

        return primacyFuture.thenCompose(primacy -> this.processRequestInContext(request, primacy, senderId));
    }

    /**
     * Processes a request using the primacy supplied by the caller; unlike {@link #invoke}, no primacy
     * validation is performed here.
     *
     * @param request Replica request.
     * @param replicaPrimacy Primacy information to process the request with.
     * @param senderId ID of the sender.
     * @return Future with the processing result wrapped into a {@link ReplicaResult}.
     */
    @Override
    public CompletableFuture<ReplicaResult> process(ReplicaRequest request, ReplicaPrimacy replicaPrimacy, UUID senderId) {
        CompletableFuture<ReplicaResult> resultFuture = this.processRequestInContext(request, replicaPrimacy, senderId);

        return resultFuture;
    }

    /**
     * Processes the request and normalizes whatever it produced into a {@link ReplicaResult}.
     *
     * @param request Replica request.
     * @param replicaPrimacy Primacy information.
     * @param senderId ID of the sender.
     * @return Future with the result; values that are not already a {@link ReplicaResult} get wrapped.
     */
    private CompletableFuture<ReplicaResult> processRequestInContext(ReplicaRequest request, ReplicaPrimacy replicaPrimacy, UUID senderId) {
        CompletableFuture<?> rawResultFuture = this.processRequest(request, replicaPrimacy, senderId);

        return rawResultFuture.thenApply(PartitionReplicaListener::wrapInReplicaResultIfNeeded);
    }

    /**
     * Returns the argument itself when it already is a {@link ReplicaResult}; otherwise wraps it into a new
     * {@link ReplicaResult} whose second constructor argument is {@code null}.
     *
     * @param res Processing result; {@code null} is wrapped like any other value.
     * @return Replica result.
     */
    private static ReplicaResult wrapInReplicaResultIfNeeded(Object res) {
        return res instanceof ReplicaResult
                ? (ReplicaResult) res
                : new ReplicaResult(res, null);
    }

    /**
     * Returns the raft command runner, unwrapping it first if it is an {@link ExecutorInclinedRaftCommandRunner}.
     *
     * @return The decorated runner for an executor-inclined wrapper, otherwise the runner as is.
     */
    @Override
    public RaftCommandRunner raftClient() {
        RaftCommandRunner runner = this.raftCommandRunner;

        if (!(runner instanceof ExecutorInclinedRaftCommandRunner)) {
            return runner;
        }

        return ((ExecutorInclinedRaftCommandRunner) runner).decoratedCommandRunner();
    }

    /**
     * Routes a replica request to the matching handler.
     *
     * <p>Steps, in order:
     * <ol>
     *     <li>For a non-full read-write request, the transaction meta is switched to {@link TxState#PENDING},
     *     keeping the previous {@code tx} and "finished due to timeout" values if a previous meta exists.</li>
     *     <li>Recovery, estimated size and change-peers requests are handed to their dedicated handlers.</li>
     *     <li>Any other request goes to the operation processing path: directly when colocation is enabled or
     *     the request has no operation timestamp, otherwise after table-aware pre-processing.</li>
     * </ol>
     *
     * @param request Replica request.
     * @param replicaPrimacy Primacy information.
     * @param senderId ID of the sender.
     * @return Future with the handler specific result.
     */
    private CompletableFuture<?> processRequest(ReplicaRequest request, ReplicaPrimacy replicaPrimacy, UUID senderId) {
        assert !(request instanceof SchemaVersionAwareReplicaRequest)
                || ((SchemaVersionAwareReplicaRequest) request).schemaVersion() > 0
                : "No schema version passed for request: " + request.getClass();

        if (request instanceof ReadWriteReplicaRequest) {
            ReadWriteReplicaRequest rwRequest = (ReadWriteReplicaRequest) request;

            if (!rwRequest.full()) {
                this.txManager.updateTxMeta(
                        rwRequest.transactionId(),
                        old -> new TxStateMeta(
                                TxState.PENDING,
                                rwRequest.coordinatorId(),
                                rwRequest.commitPartitionId().asReplicationGroupId(),
                                null,
                                old == null ? null : old.tx(),
                                old == null ? null : old.isFinishedDueToTimeout()
                        )
                );
            }
        }

        // Requests served by dedicated handlers.
        if (request instanceof TxRecoveryMessage) {
            assert (!this.nodeProperties.colocationEnabled()) : "Unexpected method call within colocation enabled.";

            return this.txRecoveryMessageHandler.handle((TxRecoveryMessage) request, senderId);
        }

        if (request instanceof TxCleanupRecoveryRequest) {
            assert (!this.nodeProperties.colocationEnabled()) : "Unexpected method call within colocation enabled.";

            return this.txCleanupRecoveryRequestHandler.handle((TxCleanupRecoveryRequest) request);
        }

        if (request instanceof GetEstimatedSizeRequest) {
            return this.processGetEstimatedSizeRequest();
        }

        if (request instanceof ChangePeersAndLearnersAsyncReplicaRequest) {
            return this.processChangePeersAndLearnersReplicaRequest((ChangePeersAndLearnersAsyncReplicaRequest) request);
        }

        @Nullable HybridTimestamp opTs = this.tableAwareReplicaRequestPreProcessor.getOperationTimestamp(request);

        // The operation timestamp is passed further only for direct read-only requests.
        final HybridTimestamp opTsIfDirectRo = request instanceof ReadOnlyDirectReplicaRequest ? opTs : null;

        if (this.nodeProperties.colocationEnabled()) {
            return this.processOperationRequestWithTxOperationManagementLogic(senderId, request, replicaPrimacy, opTsIfDirectRo);
        }

        if (opTs != null) {
            return this.tableAwareReplicaRequestPreProcessor
                    .preProcessTableAwareRequest(request, replicaPrimacy, senderId)
                    .thenCompose(ignored -> this.processOperationRequestWithTxOperationManagementLogic(senderId, request, replicaPrimacy, opTsIfDirectRo));
        }

        assert (opTsIfDirectRo == null);

        return this.processOperationRequestWithTxOperationManagementLogic(senderId, request, replicaPrimacy, null);
    }

    /**
     * Reports the estimated size of the partition storage.
     *
     * @return Already completed future with the value of {@code mvDataStorage.estimatedSize()}.
     */
    private CompletableFuture<Long> processGetEstimatedSizeRequest() {
        long estimatedSize = this.mvDataStorage.estimatedSize();

        return CompletableFuture.completedFuture(estimatedSize);
    }

    /**
     * Handles a request to change peers and learners of the partition raft group.
     *
     * <p>The leader and its term are refreshed first. The change is skipped (the returned future completes with
     * {@code null}) when no leader could be obtained within the timeout, when the leader information is empty, or
     * when the enlistment consistency token from the request no longer matches the primary replica. Otherwise the
     * peers configuration from the request is applied asynchronously with the obtained term.
     *
     * @param request Request carrying the pending assignments, the enlistment consistency token and the
     *     sequence token.
     * @return Future of the change; completes exceptionally with {@link IgniteInternalException} if the leader
     *     refresh fails for a reason other than a timeout.
     */
    private CompletableFuture<Void> processChangePeersAndLearnersReplicaRequest(ChangePeersAndLearnersAsyncReplicaRequest request) {
        TablePartitionId replicaGrpId = (TablePartitionId) request.groupId().asReplicationGroupId();

        // The runner is either the raft group service itself or an executor-inclined decorator around it.
        RaftGroupService raftClient = this.raftCommandRunner instanceof RaftGroupService
                ? (RaftGroupService) this.raftCommandRunner
                : (RaftGroupService) ((ExecutorInclinedRaftCommandRunner) this.raftCommandRunner).decoratedCommandRunner();

        return raftClient.refreshAndGetLeaderWithTerm()
                .exceptionally(throwable -> {
                    Throwable cause = ExceptionUtils.unwrapCause(throwable);

                    if (cause instanceof TimeoutException) {
                        LOG.info("Node couldn't get the leader within timeout so the changing peers is skipped [grp={}].", replicaGrpId);

                        return LeaderWithTerm.NO_LEADER;
                    }

                    throw new IgniteInternalException(
                            ErrorGroups.Common.INTERNAL_ERR,
                            "Failed to get a leader for the RAFT replication group [grp=" + replicaGrpId + "].",
                            cause
                    );
                })
                .thenCompose(leaderWithTerm -> {
                    boolean stillPrimary = !leaderWithTerm.isEmpty()
                            && this.replicaPrimacyEngine.tokenStillMatchesPrimary(request.enlistmentConsistencyToken());

                    if (!stillPrimary) {
                        return CompletableFutures.nullCompletedFuture();
                    }

                    PeersAndLearners peersAndLearners = PartitionReplicaListener.peersConfigurationFromMessage(request);

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Current node={} is the leader of partition raft group={}. Initiate rebalance process for partition={}, table={}, peersAndLearners={}", leaderWithTerm.leader(), replicaGrpId, replicaGrpId.partitionId(), replicaGrpId.tableId(), peersAndLearners);
                    }

                    return raftClient.changePeersAndLearnersAsync(peersAndLearners, leaderWithTerm.term(), request.sequenceToken());
                });
    }

    /**
     * Builds the peers configuration from the pending assignments carried by the request.
     *
     * @param request Request whose {@code pendingAssignments()} bytes are deserialized.
     * @return Peers and learners built from the nodes of the deserialized assignments.
     */
    private static PeersAndLearners peersConfigurationFromMessage(ChangePeersAndLearnersAsyncReplicaRequest request) {
        return PeersAndLearners.fromAssignments(
                Assignments.fromBytes(request.pendingAssignments()).nodes()
        );
    }

    /**
     * Assigns the delayed ack processor to the result; does nothing when there is no result.
     *
     * @param result Replica result to update, or {@code null}.
     * @param proc Processor to store in the result, may be {@code null}.
     */
    private static void setDelayedAckProcessor(@Nullable ReplicaResult result, @Nullable BiConsumer<Object, Throwable> proc) {
        if (result == null) {
            return;
        }

        result.delayedAckProcessor = proc;
    }

    /**
     * Routes a replica request to the handler that matches its concrete type.
     *
     * <p>The request is tested against the supported request types in a fixed order and the first match wins. Read-write row
     * requests are wrapped in {@code appendTxCommand} together with an {@code OperationId} built from the sender id and the
     * request timestamp; the remaining requests are passed straight to their handlers.
     *
     * @param senderId Id of the sender; becomes a part of the operation id of read-write requests.
     * @param request Request to process.
     * @param replicaPrimacy Primacy information; supplies the lease start time for read-write requests and the primary flag for
     *     read-only ones.
     * @param opStartTsIfDirectRo Timestamp that is only passed on to the direct read-only handlers; may be {@code null}.
     * @return Future produced by the selected handler.
     * @throws UnsupportedReplicaRequestException If none of the checked request types matches.
     */
    private CompletableFuture<?> processOperationRequest(UUID senderId, ReplicaRequest request, ReplicaPrimacy replicaPrimacy, @Nullable HybridTimestamp opStartTsIfDirectRo) {
        // Read-write row operations: after the action completes, the request's delayed-ack processor is copied into the
        // replica result (when there is one).
        if (request instanceof ReadWriteSingleRowReplicaRequest) {
            ReadWriteSingleRowReplicaRequest req = (ReadWriteSingleRowReplicaRequest)request;
            OperationId opId = new OperationId(senderId, req.timestamp().longValue());
            return this.appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> this.processSingleEntryAction(req, replicaPrimacy.leaseStartTime()).whenComplete((r, e) -> PartitionReplicaListener.setDelayedAckProcessor(r, req.delayedAckProcessor())));
        }
        if (request instanceof ReadWriteSingleRowPkReplicaRequest) {
            ReadWriteSingleRowPkReplicaRequest req = (ReadWriteSingleRowPkReplicaRequest)request;
            OperationId opId = new OperationId(senderId, req.timestamp().longValue());
            return this.appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> this.processSingleEntryAction(req, replicaPrimacy.leaseStartTime()).whenComplete((r, e) -> PartitionReplicaListener.setDelayedAckProcessor(r, req.delayedAckProcessor())));
        }
        // NOTE(review): presumably tested ahead of ReadWriteMultiRowReplicaRequest because it is a more specific type - confirm.
        // Unlike the neighbouring branches, no delayed-ack processor is attached here.
        if (request instanceof DcrWriteMultiRowReplicaRequest) {
            DcrWriteMultiRowReplicaRequest req = (DcrWriteMultiRowReplicaRequest)request;
            OperationId opId = new OperationId(senderId, req.timestamp().longValue());
            return this.appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> this.processDcrMultiEntryAction(req, replicaPrimacy.leaseStartTime()));
        }
        if (request instanceof ReadWriteMultiRowReplicaRequest) {
            ReadWriteMultiRowReplicaRequest req = (ReadWriteMultiRowReplicaRequest)request;
            OperationId opId = new OperationId(senderId, req.timestamp().longValue());
            return this.appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> this.processMultiEntryAction(req, (Long)replicaPrimacy.leaseStartTime()).whenComplete((r, e) -> PartitionReplicaListener.setDelayedAckProcessor(r, req.delayedAckProcessor())));
        }
        if (request instanceof ReadWriteMultiRowPkReplicaRequest) {
            ReadWriteMultiRowPkReplicaRequest req = (ReadWriteMultiRowPkReplicaRequest)request;
            OperationId opId = new OperationId(senderId, req.timestamp().longValue());
            return this.appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> this.processMultiEntryAction(req, replicaPrimacy.leaseStartTime()).whenComplete((r, e) -> PartitionReplicaListener.setDelayedAckProcessor(r, req.delayedAckProcessor())));
        }
        if (request instanceof ReadWriteSwapRowReplicaRequest) {
            ReadWriteSwapRowReplicaRequest req = (ReadWriteSwapRowReplicaRequest)request;
            OperationId opId = new OperationId(senderId, req.timestamp().longValue());
            return this.appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> this.processTwoEntriesAction(req, replicaPrimacy.leaseStartTime()).whenComplete((r, e) -> PartitionReplicaListener.setDelayedAckProcessor(r, req.delayedAckProcessor())));
        }
        if (request instanceof ReadWriteScanRetrieveBatchReplicaRequest) {
            ReadWriteScanRetrieveBatchReplicaRequest req = (ReadWriteScanRetrieveBatchReplicaRequest)request;
            // Record the transaction as PENDING; tx() and isFinishedDueToTimeout() are carried over from the previous meta
            // when one exists.
            this.txManager.updateTxMeta(req.transactionId(), old -> new TxStateMeta(TxState.PENDING, req.coordinatorId(), req.commitPartitionId().asReplicationGroupId(), null, old == null ? null : old.tx(), old == null ? null : old.isFinishedDueToTimeout()));
            OperationId opId = new OperationId(senderId, req.timestamp().longValue());
            return ((CompletableFuture)this.appendTxCommand(req.transactionId(), opId, RequestType.RW_SCAN, false, () -> this.processScanRetrieveBatchAction(req)).thenCompose(rows -> {
                // Schema validation and the read metric are skipped when every returned element is null.
                if (PartitionReplicaListener.allElementsAreNull(rows)) {
                    return CompletableFuture.completedFuture(rows);
                }
                return this.validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId()).thenApply(ignored -> {
                    this.metrics.onRead(rows.size(), false);
                    return rows;
                });
            })).whenComplete((rows, err) -> {
                // For a full request the transaction locks are released once the scan failed or returned fewer rows than
                // the requested batch size.
                if (req.full() && (err != null || rows.size() < req.batchSize())) {
                    this.releaseTxLocks(req.transactionId());
                }
            });
        }
        if (request instanceof ScanCloseReplicaRequest) {
            this.processScanCloseAction((ScanCloseReplicaRequest)request);
            return CompletableFutures.nullCompletedFuture();
        }
        // The asserts below state that the asserted request types are not expected here when colocation is enabled.
        if (request instanceof TxFinishReplicaRequest) {
            assert (!this.nodeProperties.colocationEnabled()) : request;
            return this.txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest)request);
        }
        if (request instanceof WriteIntentSwitchReplicaRequest) {
            return this.processWriteIntentSwitchAction((WriteIntentSwitchReplicaRequest)request);
        }
        if (request instanceof TableWriteIntentSwitchReplicaRequest) {
            return this.processTableWriteIntentSwitchAction((TableWriteIntentSwitchReplicaRequest)request);
        }
        // Read-only requests only need to know whether this replica is primary.
        if (request instanceof ReadOnlySingleRowPkReplicaRequest) {
            return this.processReadOnlySingleEntryAction((ReadOnlySingleRowPkReplicaRequest)request, replicaPrimacy.isPrimary());
        }
        if (request instanceof ReadOnlyMultiRowPkReplicaRequest) {
            return this.processReadOnlyMultiEntryAction((ReadOnlyMultiRowPkReplicaRequest)request, replicaPrimacy.isPrimary());
        }
        if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) {
            return this.processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)request, replicaPrimacy.isPrimary());
        }
        if (request instanceof ReplicaSafeTimeSyncRequest) {
            return this.processReplicaSafeTimeSyncRequest(replicaPrimacy.isPrimary());
        }
        if (request instanceof BuildIndexReplicaRequest) {
            return this.buildIndexReplicaRequestHandler.handle((BuildIndexReplicaRequest)request);
        }
        // Direct read-only requests are the only consumers of opStartTsIfDirectRo.
        if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
            return this.processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest)request, opStartTsIfDirectRo);
        }
        if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
            return this.processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest)request, opStartTsIfDirectRo);
        }
        if (request instanceof TxStateCommitPartitionRequest) {
            assert (!this.nodeProperties.colocationEnabled()) : request;
            return this.txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest)request);
        }
        if (request instanceof VacuumTxStateReplicaRequest) {
            assert (!this.nodeProperties.colocationEnabled()) : request;
            return this.vacuumTxStateReplicaRequestHandler.handle((VacuumTxStateReplicaRequest)request);
        }
        if (request instanceof UpdateMinimumActiveTxBeginTimeReplicaRequest) {
            assert (!this.nodeProperties.colocationEnabled()) : request;
            return this.minimumActiveTxTimeReplicaRequestHandler.handle((UpdateMinimumActiveTxBeginTimeReplicaRequest)request);
        }
        if (request instanceof ContinuousQueryScanRequest) {
            return this.beginProcessContinuousQueryScanRequest((ContinuousQueryScanRequest)request);
        }
        if (request instanceof ReadOnlyIntervalScanRetrieveBatchReplicaRequest) {
            return this.processReadOnlyIntervalScanRetrieveBatchAction((ReadOnlyIntervalScanRetrieveBatchReplicaRequest)request, replicaPrimacy.isPrimary());
        }
        if (request instanceof ReadOnlyRowVersionsScanRetrieveBatchReplicaRequest) {
            return this.processReadOnlyRowVersionsScanRetrieveBatchAction((ReadOnlyRowVersionsScanRetrieveBatchReplicaRequest)request, replicaPrimacy.isPrimary());
        }
        // Nothing matched: the request type is not supported by this listener.
        throw new UnsupportedReplicaRequestException(request.getClass());
    }

    /**
     * Handles a read-only scan batch request.
     *
     * <p>The read starts immediately when this replica is primary and its clock is already past the read timestamp; otherwise it
     * starts once the safe time reaches the read timestamp. Depending on the request, rows come from an index lookup (an exact
     * key is given), from a sorted index scan (an index is given without an exact key), or from a partition scan (no index is
     * given). The number of returned rows is reported to the metrics as a read-only read.
     *
     * @param request Scan batch request.
     * @param isPrimary Whether this replica is primary.
     * @return Future with the batch of rows.
     * @throws AssertionError If the requested index has no storage registered.
     */
    private CompletableFuture<List<BinaryRow>> processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest request, boolean isPrimary) {
        UUID txId = request.transactionId();
        int batchCount = request.batchSize();
        HybridTimestamp readTimestamp = request.readTimestamp();
        FullyQualifiedResourceId cursorId = RemoteResourceIds.cursorId(txId, request.scanId());

        CompletableFuture<?> safeReadFuture = this.isPrimaryInTimestamp(isPrimary, readTimestamp)
                ? CompletableFutures.nullCompletedFuture()
                : this.safeTime.waitFor(readTimestamp);

        CompletableFuture<List<BinaryRow>> rowsFuture;
        if (request.indexToUse() == null) {
            // No index requested: plain partition scan.
            rowsFuture = safeReadFuture.thenCompose(unused -> this.retrieveExactEntriesUntilCursorEmpty(txId, request.coordinatorId(), readTimestamp, cursorId, batchCount));
        } else {
            TableSchemaAwareIndexStorage indexStorage = this.secondaryIndexStorages.get().get(request.indexToUse());
            if (indexStorage == null) {
                throw new AssertionError("Index not found: uuid=" + request.indexToUse());
            }

            if (request.exactKey() != null) {
                assert (request.lowerBoundPrefix() == null && request.upperBoundPrefix() == null) : "Index lookup doesn't allow bounds.";
                rowsFuture = safeReadFuture.thenCompose(unused -> this.lookupIndex(request, indexStorage));
            } else {
                assert (indexStorage.storage() instanceof SortedIndexStorage);
                rowsFuture = safeReadFuture.thenCompose(unused -> this.scanSortedIndex(request, indexStorage));
            }
        }

        // Single place that records the read metric for all three sources.
        return rowsFuture.thenApply(rows -> {
            this.metrics.onRead(rows.size(), true);
            return rows;
        });
    }

    /**
     * Handles a read-only scan batch request that is bounded by a timestamp interval.
     *
     * <p>The interval goes from the request's lower bound timestamp to its read timestamp. The scan starts immediately when this
     * replica is primary and its clock is already past the read timestamp; otherwise it starts once the safe time reaches the
     * read timestamp.
     *
     * @param request Interval scan batch request.
     * @param isPrimary Whether this replica is primary.
     * @return Future with the batch of rows paired with their row ids.
     */
    private CompletableFuture<List<BinaryRowAndRowId>> processReadOnlyIntervalScanRetrieveBatchAction(ReadOnlyIntervalScanRetrieveBatchReplicaRequest request, boolean isPrimary) {
        UUID txId = request.transactionId();
        int batchSize = request.batchSize();
        HybridTimestamp lowerBound = request.lowerBoundTimestamp();
        HybridTimestamp upperBound = request.readTimestamp();
        FullyQualifiedResourceId cursorId = RemoteResourceIds.cursorId(txId, request.scanId());

        CompletableFuture<?> readinessFuture;
        if (this.isPrimaryInTimestamp(isPrimary, upperBound)) {
            readinessFuture = CompletableFutures.nullCompletedFuture();
        } else {
            readinessFuture = this.safeTime.waitFor(upperBound);
        }

        return readinessFuture.thenCompose(ignored -> this.retrieveExactEntriesUntilCursorEmpty(txId, request.coordinatorId(), lowerBound, upperBound, cursorId, batchSize));
    }

    /**
     * Handles a read-only batch request that scans row versions.
     *
     * <p>The scan starts at the row id built from this partition's id and the request's lower bound row id, and is limited to the
     * interval between the request's lower bound timestamp and its read timestamp. It starts immediately when this replica is
     * primary and its clock is already past the read timestamp; otherwise it starts once the safe time reaches the read
     * timestamp. After the batch is collected, {@code closeCursorIfBatchNotFull} is invoked for it.
     *
     * @param request Row versions scan batch request.
     * @param isPrimary Whether this replica is primary.
     * @return Future with the list of collected row versions.
     */
    private CompletableFuture<?> processReadOnlyRowVersionsScanRetrieveBatchAction(ReadOnlyRowVersionsScanRetrieveBatchReplicaRequest request, boolean isPrimary) {
        UUID txId = request.transactionId();
        int batchSize = request.batchSize();
        RowId startRowId = new RowId(this.partId(), request.lowerBoundRowId());
        HybridTimestamp lowerBound = request.lowerBoundTimestamp();
        HybridTimestamp upperBound = request.readTimestamp();
        FullyQualifiedResourceId cursorId = RemoteResourceIds.cursorId(txId, request.scanId());

        CompletableFuture<?> readinessFuture;
        if (this.isPrimaryInTimestamp(isPrimary, upperBound)) {
            readinessFuture = CompletableFutures.nullCompletedFuture();
        } else {
            readinessFuture = this.safeTime.waitFor(upperBound);
        }

        List<TimedBinaryRowAndRowId> versions = new ArrayList<>(batchSize);

        return readinessFuture
                .thenCompose(ignored -> this.retrieveRowVersionsUntilCursorEmpty(request.coordinatorId(), startRowId, lowerBound, upperBound, cursorId, batchSize, versions))
                .thenApply(ignored -> {
                    this.closeCursorIfBatchNotFull(versions, batchSize, cursorId);
                    return versions;
                });
    }

    /**
     * Collects up to {@code count} rows from the scan cursor identified by {@code cursorId} into a fresh list and then invokes
     * {@code closeCursorIfBatchNotFull} for the collected batch.
     *
     * @param txId Transaction id.
     * @param txCoordinatorId Transaction coordinator id.
     * @param readTimestamp Read timestamp; may be {@code null}.
     * @param cursorId Cursor id.
     * @param count Requested number of rows; also used as the initial capacity of the batch.
     * @return Future with the collected batch.
     */
    private CompletableFuture<List<BinaryRow>> retrieveExactEntriesUntilCursorEmpty(UUID txId, UUID txCoordinatorId, @Nullable HybridTimestamp readTimestamp, FullyQualifiedResourceId cursorId, int count) {
        List<BinaryRow> batch = new ArrayList<>(count);

        CompletableFuture<Void> filled = this.retrieveExactEntriesUntilCursorEmpty(txId, txCoordinatorId, readTimestamp, cursorId, count, batch);

        return filled.thenApply(ignored -> {
            this.closeCursorIfBatchNotFull(batch, count, cursorId);
            return batch;
        });
    }

    /**
     * Collects up to {@code count} rows (paired with their row ids) from the interval scan cursor identified by {@code cursorId}
     * into a fresh list and then invokes {@code closeCursorIfBatchNotFull} for the collected batch.
     *
     * @param txId Transaction id.
     * @param txCoordinatorId Transaction coordinator id.
     * @param fromTs Lower bound timestamp of the scan.
     * @param toTs Upper bound timestamp of the scan.
     * @param cursorId Cursor id.
     * @param count Requested number of rows; also used as the initial capacity of the batch.
     * @return Future with the collected batch.
     */
    private CompletableFuture<List<BinaryRowAndRowId>> retrieveExactEntriesUntilCursorEmpty(UUID txId, UUID txCoordinatorId, HybridTimestamp fromTs, HybridTimestamp toTs, FullyQualifiedResourceId cursorId, int count) {
        List<BinaryRowAndRowId> batch = new ArrayList<>(count);

        CompletableFuture<Void> filled = this.retrieveExactEntriesUntilCursorEmpty(txId, txCoordinatorId, fromTs, toTs, cursorId, count, batch);

        return filled.thenApply(ignored -> {
            this.closeCursorIfBatchNotFull(batch, count, cursorId);
            return batch;
        });
    }

    /**
     * Fills {@code result} with rows read from the partition scan cursor registered under {@code cursorId}, until {@code count}
     * rows are collected or the cursor has no more entries.
     *
     * <p>Entries that are not write intents are added directly, and entries without a row are skipped. Write intents of
     * transaction {@code txId} are treated the same way when {@code readTimestamp} is {@code null}. Every other write intent is
     * resolved asynchronously: a {@code null} placeholder is put into {@code result} and is replaced, or removed, once all
     * resolutions of the current pass have completed. If the batch is still not full after that, the scan continues on
     * {@code scanRequestExecutor}.
     *
     * @param txId Transaction id.
     * @param txCoordinatorId Transaction coordinator id, passed to the resource registry together with the cursor id.
     * @param readTimestamp Read timestamp; when {@code null} the cursor supplier scans at {@code HybridTimestamp.MAX_VALUE}.
     * @param cursorId Cursor id.
     * @param count Requested number of rows.
     * @param result List that receives the rows.
     * @return Future that completes when this pass, and any follow-up passes, are done.
     */
    private CompletableFuture<Void> retrieveExactEntriesUntilCursorEmpty(UUID txId, UUID txCoordinatorId, @Nullable HybridTimestamp readTimestamp, FullyQualifiedResourceId cursorId, int count, List<BinaryRow> result) {
        CursorResource resource = (CursorResource)this.remotelyTriggeredResourceRegistry.register(cursorId, txCoordinatorId, () -> new CursorResource(this.mvDataStorage.scan(readTimestamp == null ? HybridTimestamp.MAX_VALUE : readTimestamp)));
        PartitionTimestampCursor cursor = (PartitionTimestampCursor)resource.cursor();

        // Placeholders added during this pass live at or after this index.
        int firstNewIndex = result.size();
        List<CompletableFuture<BinaryRow>> pendingResolutions = new ArrayList<>();

        while (result.size() < count && cursor.hasNext()) {
            ReadResult readResult = (ReadResult)cursor.next();
            UUID writerTxId = readResult.transactionId();

            boolean readableAsIs = !readResult.isWriteIntent() || (readTimestamp == null && txId.equals(writerTxId));
            if (readableAsIs) {
                BinaryRow row = readResult.binaryRow();
                if (row != null) {
                    result.add(row);
                }
                continue;
            }

            // Foreign write intent: prepare the newest committed version (if any) as the candidate for the resolution.
            HybridTimestamp newestCommitTs = readResult.newestCommitTimestamp();
            TimedBinaryRow lastCommitted = null;
            if (newestCommitTs != null) {
                BinaryRow committedRow = cursor.committed(newestCommitTs);
                if (committedRow != null) {
                    lastCommitted = new TimedBinaryRow(committedRow, newestCommitTs);
                }
            }
            TimedBinaryRow candidate = lastCommitted;

            pendingResolutions.add(this.resolveWriteIntentAsync(readResult, readTimestamp, () -> candidate).thenApply(timedBinaryRow -> timedBinaryRow == null ? null : timedBinaryRow.binaryRow()));
            result.add(null);
        }

        if (pendingResolutions.isEmpty()) {
            return CompletableFutures.nullCompletedFuture();
        }

        CompletableFuture<Void> allResolved = CompletableFuture.allOf(pendingResolutions.toArray(CompletableFuture[]::new));

        return allResolved.thenComposeAsync(ignored -> {
            PartitionReplicaListener.mergeRowsWithResolvedWriteIntents(result, firstNewIndex, pendingResolutions);

            // Removed placeholders may have left room in the batch.
            if (result.size() >= count || !cursor.hasNext()) {
                return CompletableFutures.nullCompletedFuture();
            }
            return this.retrieveExactEntriesUntilCursorEmpty(txId, txCoordinatorId, readTimestamp, cursorId, count, result);
        }, this.scanRequestExecutor);
    }

    /**
     * Fills {@code result} with rows, paired with their row ids, read from the interval scan cursor registered under
     * {@code cursorId}, until {@code count} entries are collected or the cursor has no more entries.
     *
     * <p>Entries that are not write intents, and write intents of transaction {@code txId}, are added directly. Every other
     * write intent is resolved asynchronously against {@code toTs}: a {@code null} placeholder is put into {@code result} and is
     * replaced, or removed, once all resolutions of the current pass have completed. If the batch is still not full after that,
     * the scan continues on {@code scanRequestExecutor}.
     *
     * @param txId Transaction id.
     * @param txCoordinatorId Transaction coordinator id, passed to the resource registry together with the cursor id.
     * @param fromTs Lower bound timestamp given to the storage scan.
     * @param toTs Upper bound timestamp given to the storage scan and to the write intent resolution.
     * @param cursorId Cursor id.
     * @param count Requested number of entries.
     * @param result List that receives the entries.
     * @return Future that completes when this pass, and any follow-up passes, are done.
     */
    private CompletableFuture<Void> retrieveExactEntriesUntilCursorEmpty(UUID txId, UUID txCoordinatorId, HybridTimestamp fromTs, HybridTimestamp toTs, FullyQualifiedResourceId cursorId, int count, List<BinaryRowAndRowId> result) {
        CursorResource resource = (CursorResource)this.remotelyTriggeredResourceRegistry.register(cursorId, txCoordinatorId, () -> new CursorResource(this.mvDataStorage.scan(fromTs, toTs)));
        PartitionTimestampCursor cursor = (PartitionTimestampCursor)resource.cursor();

        // Placeholders added during this pass live at or after this index.
        int firstNewIndex = result.size();
        List<CompletableFuture<BinaryRowAndRowId>> pendingResolutions = new ArrayList<>();

        while (result.size() < count && cursor.hasNext()) {
            ReadResult readResult = (ReadResult)cursor.next();
            UUID writerTxId = readResult.transactionId();
            RowId rowId = readResult.rowId();

            boolean readableAsIs = !readResult.isWriteIntent() || txId.equals(writerTxId);
            if (readableAsIs) {
                result.add(new BinaryRowAndRowId(readResult.binaryRow(), rowId));
                continue;
            }

            // Foreign write intent: prepare the newest committed version (if any) as the candidate for the resolution.
            HybridTimestamp newestCommitTs = readResult.newestCommitTimestamp();
            TimedBinaryRow lastCommitted = null;
            if (newestCommitTs != null) {
                BinaryRow committedRow = cursor.committed(newestCommitTs);
                if (committedRow != null) {
                    lastCommitted = new TimedBinaryRow(committedRow, newestCommitTs);
                }
            }
            TimedBinaryRow candidate = lastCommitted;

            pendingResolutions.add(this.resolveWriteIntentAsync(readResult, toTs, () -> candidate).thenApply(timedBinaryRow -> timedBinaryRow == null ? null : new BinaryRowAndRowId(timedBinaryRow.binaryRow(), rowId)));
            result.add(null);
        }

        if (pendingResolutions.isEmpty()) {
            return CompletableFutures.nullCompletedFuture();
        }

        CompletableFuture<Void> allResolved = CompletableFuture.allOf(pendingResolutions.toArray(CompletableFuture[]::new));

        return allResolved.thenComposeAsync(ignored -> {
            PartitionReplicaListener.mergeRowsWithResolvedWriteIntents(result, firstNewIndex, pendingResolutions);

            // Removed placeholders may have left room in the batch.
            if (result.size() >= count || !cursor.hasNext()) {
                return CompletableFutures.nullCompletedFuture();
            }
            return this.retrieveExactEntriesUntilCursorEmpty(txId, txCoordinatorId, fromTs, toTs, cursorId, count, result);
        }, this.scanRequestExecutor);
    }

    /**
     * Collects up to {@code count} rows from the scan cursor without a read timestamp and validates every returned row with
     * {@code validateBackwardCompatibility} for the given transaction.
     *
     * @param txId Transaction id.
     * @param txCoordinatorId Transaction coordinator id.
     * @param cursorId Cursor id.
     * @param count Requested number of rows.
     * @return Future with the collected rows (an empty list when nothing was collected); it completes after all validations.
     */
    private CompletableFuture<List<BinaryRow>> retrieveExactEntriesUntilCursorEmpty(UUID txId, UUID txCoordinatorId, FullyQualifiedResourceId cursorId, int count) {
        return this.retrieveExactEntriesUntilCursorEmpty(txId, txCoordinatorId, null, cursorId, count).thenCompose(rows -> {
            if (CollectionUtils.nullOrEmpty(rows)) {
                return CompletableFutures.emptyListCompletedFuture();
            }

            CompletableFuture<?>[] validations = new CompletableFuture<?>[rows.size()];
            int next = 0;
            for (BinaryRow row : rows) {
                validations[next++] = this.validateBackwardCompatibility(row, txId);
            }

            return CompletableFuture.allOf(validations).thenApply(ignored -> rows);
        });
    }

    /**
     * Fills {@code result} with row versions read from the cursor registered under {@code cursorId}, until {@code count}
     * entries are collected or the cursor has no more entries.
     *
     * <p>Archived read results are skipped. Results that are not write intents are added via {@code addRowVersionToResult}.
     * Write intents are resolved asynchronously against {@code toTs}: a {@code null} placeholder is put into {@code result} and
     * is replaced, or removed, once all resolutions of the current pass have completed. If the batch is still not full after
     * that, the method calls itself again on {@code scanRequestExecutor}.
     *
     * @param txCoordinatorId Transaction coordinator id, passed to the resource registry together with the cursor id.
     * @param fromRowId Row id given to the storage scan as its starting point.
     * @param fromTs Lower bound timestamp given to the storage scan.
     * @param toTs Upper bound timestamp given to the storage scan and to the write intent resolution.
     * @param cursorId Cursor id.
     * @param count Requested number of entries.
     * @param result List that receives the entries.
     * @return Future that completes when this pass, and any follow-up passes, are done.
     */
    private CompletableFuture<Void> retrieveRowVersionsUntilCursorEmpty(UUID txCoordinatorId, RowId fromRowId, HybridTimestamp fromTs, HybridTimestamp toTs, FullyQualifiedResourceId cursorId, int count, List<TimedBinaryRowAndRowId> result) {
        // NOTE(review): presumably the registry hands back an already registered cursor on repeated calls, so that the
        // recursive call below continues the same scan - confirm against the registry implementation.
        CursorResource resource = (CursorResource)this.remotelyTriggeredResourceRegistry.register(cursorId, txCoordinatorId, () -> new CursorResource(this.mvDataStorage.scanAllVersions(fromRowId, fromTs, toTs)));
        Object cursor = resource.cursor();
        // Placeholders added during this pass live at or after this index.
        int resultStartIndex = result.size();
        ArrayList<CompletionStage> resolutionFutures = new ArrayList<CompletionStage>();
        while (result.size() < count && cursor.hasNext()) {
            ReadResult readResult = (ReadResult)cursor.next();
            // The primary key is taken from the last collected entry when the current version carries no row; it is computed
            // before the archived check below.
            BinaryTuple pkTuple = this.resolveCurrentRowPk(readResult, result);
            if (readResult.isArchived()) continue;
            if (!readResult.isWriteIntent()) {
                this.addRowVersionToResult(result, readResult, pkTuple);
                continue;
            }
            HybridTimestamp newestCommitTimestamp = readResult.newestCommitTimestamp();
            RowId rowId = readResult.rowId();
            // Write intent: resolve it asynchronously and reserve its position in the result with a null placeholder.
            CompletionStage resolutionFuture = this.resolveWriteIntentAsync(readResult, toTs, () -> this.resolveRowVersionCandidate(rowId, newestCommitTimestamp, toTs)).thenApply(timedBinaryRow -> timedBinaryRow == null ? null : this.mapRowVersionCandidate((TimedBinaryRow)timedBinaryRow, rowId, pkTuple));
            resolutionFutures.add(resolutionFuture);
            result.add(null);
        }
        if (resolutionFutures.isEmpty()) {
            return CompletableFutures.nullCompletedFuture();
        }
        return CompletableFuture.allOf((CompletableFuture[])resolutionFutures.toArray(CompletableFuture[]::new)).thenComposeAsync(unused -> {
            // Replace placeholders with resolved entries and drop the ones that resolved to null.
            PartitionReplicaListener.mergeRowsWithResolvedWriteIntents(result, resultStartIndex, resolutionFutures);
            // Dropped placeholders may have left room in the batch, so keep reading while the cursor has entries.
            if (result.size() < count && cursor.hasNext()) {
                return this.retrieveRowVersionsUntilCursorEmpty(txCoordinatorId, fromRowId, fromTs, toTs, cursorId, count, result);
            }
            return CompletableFutures.nullCompletedFuture();
        }, this.scanRequestExecutor);
    }

    /**
     * Extracts a primary key for a row version that carries no row, using the last entry collected so far.
     *
     * @param rowVersion Row version that is being processed.
     * @param result Entries collected so far.
     * @return Primary key extracted from the row of the last element of {@code result}, or {@code null} when
     *     {@code rowVersion} has a row or {@code result} is empty.
     */
    @Nullable
    private BinaryTuple resolveCurrentRowPk(ReadResult rowVersion, List<TimedBinaryRowAndRowId> result) {
        if (rowVersion.binaryRow() != null || result.isEmpty()) {
            return null;
        }

        TimedBinaryRowAndRowId lastCollected = result.get(result.size() - 1);

        return this.extractPkFromNullable(lastCollected.binaryRow());
    }

    /**
     * Reads the version of a row committed at {@code commitTimestamp}, provided that timestamp is not after {@code toTs}.
     *
     * @param rowId Row id.
     * @param commitTimestamp Commit timestamp to read at; may be {@code null}.
     * @param toTs Upper bound for the commit timestamp.
     * @return The committed row together with its commit timestamp, or {@code null} when there is no commit timestamp, it is
     *     greater than {@code toTs}, the storage read returns nothing or an empty result, or the result has no row.
     */
    @Nullable
    private TimedBinaryRow resolveRowVersionCandidate(RowId rowId, @Nullable HybridTimestamp commitTimestamp, HybridTimestamp toTs) {
        if (commitTimestamp == null) {
            return null;
        }
        if (commitTimestamp.compareTo(toTs) > 0) {
            return null;
        }

        ReadResult committedVersion = this.mvDataStorage.read(rowId, commitTimestamp);
        if (committedVersion == null || committedVersion.isEmpty()) {
            return null;
        }

        BinaryRow row = committedVersion.binaryRow();
        if (row == null) {
            return null;
        }

        return new TimedBinaryRow(row, commitTimestamp);
    }

    /**
     * Converts a resolved committed row into a scan result entry.
     *
     * <p>When the committed row has no binary row, the entry is built from the primary key obtained through
     * {@code resolvePkForTombstone} and its last constructor flag is {@code true}; otherwise the entry wraps the binary row
     * itself and the flag is {@code false}.
     *
     * @param committedRow Resolved committed row; may be {@code null}.
     * @param rowId Row id.
     * @param pkTuple Already known primary key; may be {@code null}.
     * @return Result entry, or {@code null} when {@code committedRow} is {@code null}.
     */
    @Nullable
    private TimedBinaryRowAndRowId mapRowVersionCandidate(@Nullable TimedBinaryRow committedRow, RowId rowId, @Nullable BinaryTuple pkTuple) {
        if (committedRow == null) {
            return null;
        }

        HybridTimestamp commitTs = committedRow.commitTimestamp();
        assert (commitTs != null);

        BinaryRow payload = committedRow.binaryRow();
        boolean keyOnly = payload == null;
        if (keyOnly) {
            BinaryTuple pk = this.resolvePkForTombstone(rowId, pkTuple, commitTs);
            payload = new BinaryRowImpl(-1, pk.byteBuffer());
        }

        return new TimedBinaryRowAndRowId(payload, rowId, commitTs, keyOnly);
    }

    /**
     * Appends a committed row version to the scan result.
     *
     * <p>When the read result has no binary row, the appended entry is built from the primary key obtained through
     * {@code resolvePkForTombstone} and its last constructor flag is {@code true}; otherwise the entry wraps the binary row
     * itself and the flag is {@code false}.
     *
     * @param result List to append to.
     * @param readResult Read result to convert; its commit timestamp is asserted to be non-null.
     * @param pkTuple Already known primary key; may be {@code null}.
     */
    private void addRowVersionToResult(List<TimedBinaryRowAndRowId> result, ReadResult readResult, @Nullable BinaryTuple pkTuple) {
        HybridTimestamp commitTs = readResult.commitTimestamp();
        assert (commitTs != null);

        RowId rowId = readResult.rowId();
        BinaryRow payload = readResult.binaryRow();
        boolean keyOnly = payload == null;
        if (keyOnly) {
            BinaryTuple pk = this.resolvePkForTombstone(rowId, pkTuple, commitTs);
            payload = new BinaryRowImpl(-1, pk.byteBuffer());
        }

        result.add(new TimedBinaryRowAndRowId(payload, rowId, commitTs, keyOnly));
    }

    /**
     * Returns the primary key to use for a row version that carries no row.
     *
     * @param rowId Row id.
     * @param pkTuple Already known primary key; returned as is when not {@code null}.
     * @param commitTimestamp Commit timestamp of the version without a row.
     * @return {@code pkTuple} when it is given; otherwise the key extracted from the row that the storage returns for
     *     {@code commitTimestamp.subtractPhysicalTime(1L)}.
     */
    private BinaryTuple resolvePkForTombstone(RowId rowId, @Nullable BinaryTuple pkTuple, HybridTimestamp commitTimestamp) {
        if (pkTuple != null) {
            return pkTuple;
        }

        HybridTimestamp justBeforeCommit = commitTimestamp.subtractPhysicalTime(1L);
        ReadResult earlierVersion = this.mvDataStorage.read(rowId, justBeforeCommit);

        return this.extractPkFromNullable(earlierVersion.binaryRow());
    }

    /**
     * Replaces the {@code null} placeholders in {@code result} with the values of the resolution futures.
     *
     * <p>Starting at {@code resultStartIndex}, every {@code null} element takes the value of the next future in order. When that
     * value is itself {@code null}, the element is removed from the list instead. Non-null elements are left untouched.
     *
     * @param result List that contains rows and {@code null} placeholders; modified in place.
     * @param resultStartIndex Index of the first element to inspect.
     * @param resolutionFutures Futures, one per placeholder, in placeholder order; asserted to be non-empty, already completed,
     *     and fully consumed.
     * @param <T> Element type.
     */
    private static <T> void mergeRowsWithResolvedWriteIntents(List<T> result, int resultStartIndex, List<CompletableFuture<T>> resolutionFutures) {
        assert (!resolutionFutures.isEmpty());

        int consumed = 0;
        for (ListIterator<T> cursor = result.listIterator(resultStartIndex); cursor.hasNext(); ) {
            if (cursor.next() != null) {
                continue;
            }

            CompletableFuture<T> resolution = resolutionFutures.get(consumed++);
            assert (resolution.isDone());

            T resolved = resolution.join();
            if (resolved != null) {
                cursor.set(resolved);
            } else {
                cursor.remove();
            }
        }

        assert (consumed == resolutionFutures.size());
    }

    /**
     * Validates the schema version of a row with the schema compatibility validator for the given transaction.
     *
     * @param row Row whose schema version is validated.
     * @param txId Transaction id.
     * @return Future that completes normally when the validation succeeds and completes with
     *     {@code IncompatibleSchemaVersionException} when the validator reports a failure.
     */
    private CompletableFuture<Void> validateBackwardCompatibility(BinaryRow row, UUID txId) {
        return this.schemaCompatValidator.validateBackwards(row.schemaVersion(), this.tableId(), txId).thenAccept(validationResult -> {
            if (validationResult.isSuccessful()) {
                return;
            }

            String message = String.format("Operation failed because it tried to access a row with newer schema version than transaction's [table=%s, txSchemaVersion=%d, rowSchemaVersion=%d]", validationResult.failedTableName(), validationResult.fromSchemaVersion(), validationResult.toSchemaVersion());

            throw new IncompatibleSchemaVersionException(message);
        });
    }

    /**
     * Handles a read-only single row request by primary key.
     *
     * <p>Only {@code RO_GET} is accepted. The row is resolved immediately when this replica is primary and its clock is already
     * past the read timestamp; otherwise it is resolved once the safe time reaches the read timestamp.
     *
     * @param request Read-only single row request.
     * @param isPrimary Whether this replica is primary.
     * @return Future with the resolved row.
     * @throws IgniteInternalException If the request type is not {@code RO_GET}.
     */
    private CompletableFuture<BinaryRow> processReadOnlySingleEntryAction(ReadOnlySingleRowPkReplicaRequest request, boolean isPrimary) {
        BinaryTuple pk = this.resolvePk(request.primaryKey());
        HybridTimestamp readTs = request.readTimestamp();

        RequestType requestType = request.requestType();
        if (requestType != RequestType.RO_GET) {
            throw new IgniteInternalException(ErrorGroups.Replicator.REPLICA_COMMON_ERR, IgniteStringFormatter.format("Unknown single request [actionType={}]", requestType));
        }

        CompletableFuture<?> readinessFuture;
        if (this.isPrimaryInTimestamp(isPrimary, readTs)) {
            readinessFuture = CompletableFutures.nullCompletedFuture();
        } else {
            readinessFuture = this.safeTime.waitFor(request.readTimestamp());
        }

        RlsContext rlsContext = PartitionReplicaListener.extractRlsContext(request);

        return readinessFuture.thenCompose(ignored -> this.resolveRowByPkForReadOnly(pk, readTs, rlsContext));
    }

    /**
     * Tells whether this replica is primary and the current time of its clock service is strictly greater than the timestamp.
     *
     * @param isPrimary Whether this replica is primary.
     * @param timestamp Timestamp to compare the current time with; the clock is only consulted when {@code isPrimary} is true.
     * @return {@code true} if both conditions hold.
     */
    private boolean isPrimaryInTimestamp(boolean isPrimary, HybridTimestamp timestamp) {
        if (!isPrimary) {
            return false;
        }

        HybridTimestamp now = this.clockService.now();

        return now.compareTo(timestamp) > 0;
    }

    /**
     * Handles a read-only multi row request by primary keys.
     *
     * <p>Only {@code RO_GET_ALL} is accepted. The rows are resolved immediately when this replica is primary and its clock is
     * already past the read timestamp; otherwise they are resolved once the safe time reaches the read timestamp.
     *
     * <p>NOTE(review): the rows are resolved with {@code SYSTEM_CONTEXT}, whereas the single row handler extracts the context
     * from the request - confirm this is intended.
     *
     * @param request Read-only multi row request.
     * @param isPrimary Whether this replica is primary.
     * @return Future with the resolved rows, in the order of the requested primary keys.
     * @throws IgniteInternalException If the request type is not {@code RO_GET_ALL}.
     */
    private CompletableFuture<List<BinaryRow>> processReadOnlyMultiEntryAction(ReadOnlyMultiRowPkReplicaRequest request, boolean isPrimary) {
        List<BinaryTuple> primaryKeys = this.resolvePks(request.primaryKeys());
        HybridTimestamp readTimestamp = request.readTimestamp();
        if (request.requestType() != RequestType.RO_GET_ALL) {
            // The message names the multi row request kind; it used to say "single" (copied from the single row handler).
            throw new IgniteInternalException(ErrorGroups.Replicator.REPLICA_COMMON_ERR, IgniteStringFormatter.format("Unknown multi request [actionType={}]", request.requestType()));
        }

        CompletableFuture<?> safeReadFuture = this.isPrimaryInTimestamp(isPrimary, readTimestamp)
                ? CompletableFutures.nullCompletedFuture()
                : this.safeTime.waitFor(request.readTimestamp());

        return safeReadFuture.thenCompose(unused -> {
            // Generic arrays cannot be created directly, hence the unchecked creation.
            @SuppressWarnings("unchecked")
            CompletableFuture<BinaryRow>[] resolutionFuts = new CompletableFuture[primaryKeys.size()];
            for (int i = 0; i < primaryKeys.size(); ++i) {
                resolutionFuts[i] = this.resolveRowByPkForReadOnly(primaryKeys.get(i), readTimestamp, SYSTEM_CONTEXT);
            }
            return CompletableFutures.allOfToList(resolutionFuts);
        });
    }

    /**
     * Handles a safe time sync request.
     *
     * <p>A safe time sync command carrying the current clock time as the initiator time is applied only when this replica is
     * primary and colocation is disabled; otherwise nothing is done.
     *
     * @param isPrimary Whether this replica is primary.
     * @return Future of the command application, or a future completed with {@code null} when no command is applied.
     */
    private CompletableFuture<?> processReplicaSafeTimeSyncRequest(boolean isPrimary) {
        boolean syncRequired = isPrimary && !this.nodeProperties.colocationEnabled();
        if (syncRequired) {
            return this.applyCmdWithExceptionHandling(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().initiatorTime(this.clockService.now()).build());
        }

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Starts processing of a continuous query scan request.
     *
     * <p>The continuous queries license feature is checked first. The scan is then started with the current safe time, and a
     * scan failure is converted into a scan result that carries the request id and the error. The returned future is already
     * completed; the scan future travels inside the {@code CommandApplicationResult} of the replica result.
     *
     * @param request Continuous query scan request.
     * @return Completed future with the replica result wrapping the scan future.
     */
    private CompletableFuture<ReplicaResult> beginProcessContinuousQueryScanRequest(ContinuousQueryScanRequest request) {
        this.licenseFeatureChecker.checkFeature(LicenseFeature.CONTINUOUS_QUERIES);

        CompletableFuture<ContinuousQueryScanResult<BinaryRow>> scanFuture = this.processContinuousQueryScanRequest(request, this.safeTime.current())
                .exceptionally(err -> new ContinuousQueryScanResult<>(request.requestId(), err));

        CommandApplicationResult applicationResult = new CommandApplicationResult(null, scanFuture);

        return CompletableFuture.completedFuture(new ReplicaResult(null, applicationResult));
    }

    /**
     * Processes a continuous query scan request once the safe time has reached the request's lower bound timestamp and the
     * pending transactions selected below have completed.
     *
     * @param request Continuous query scan request; its group id is asserted to belong to this partition.
     * @param currentSafeTime Safe time value to scan up to.
     * @return Future with the scan result.
     */
    private CompletableFuture<ContinuousQueryScanResult<BinaryRow>> processContinuousQueryScanRequest(ContinuousQueryScanRequest request, HybridTimestamp currentSafeTime) {
        assert (PartitionReplicaListener.partitionIdFromGroupIdMessage(request.groupId()) == this.partId()) : "Unexpected partition id: " + request.groupId() + ", expected: " + this.partId();
        // Safe time is still behind the lower bound: wait for it and start over with a fresh safe time value.
        if (currentSafeTime.compareTo(request.lowerBoundTimestamp()) < 0) {
            return this.safeTime.waitFor(request.lowerBoundTimestamp()).thenComposeAsync(unused -> this.processContinuousQueryScanRequest(request, this.safeTime.current()), this.partitionOperationsExecutor);
        }
        assert (currentSafeTime.compareTo(request.lowerBoundTimestamp()) >= 0) : "currentSafeTime < lowerBoundTimestamp";
        assert (currentSafeTime.compareTo(this.safeTime.current()) <= 0) : "currentSafeTime > safeTime";
        // Transaction id built from currentSafeTime, Integer.MAX_VALUE and normal priority; used as the inclusive upper bound
        // for selecting pending transactions. NOTE(review): presumably the largest id a transaction that began at
        // currentSafeTime can have - confirm against TransactionIds.
        UUID uppedBoundTxId = TransactionIds.transactionId(currentSafeTime, Integer.MAX_VALUE, TxPriority.NORMAL);
        NavigableMap txToWait = this.pendingTransactions.headMap((Object)uppedBoundTxId, true);
        if (!txToWait.isEmpty()) {
            // Collect only the futures that are not done yet; the list is created lazily.
            Collection futs = null;
            for (CompletableFuture olderTxFut : txToWait.values()) {
                if (olderTxFut.isDone()) continue;
                futs = futs == null ? new ArrayList() : futs;
                futs.add(olderTxFut);
            }
            if (futs != null) {
                // Continue on the partition operations executor after all collected futures complete.
                return CompletableFuture.allOf((CompletableFuture[])futs.toArray(CompletableFuture[]::new)).thenComposeAsync(unused -> this.processContinuousQueryScanRequestInternal(request, currentSafeTime), this.partitionOperationsExecutor);
            }
        }
        return this.processContinuousQueryScanRequestInternal(request, currentSafeTime);
    }

    /**
     * Extracts the partition id from a replication group id message.
     *
     * @param message Zone partition id message or table partition id message.
     * @return Partition id carried by the message.
     * @throws IllegalArgumentException If the message is of neither supported type.
     */
    private static int partitionIdFromGroupIdMessage(ReplicationGroupIdMessage message) {
        if (message instanceof ZonePartitionIdMessage) {
            ZonePartitionIdMessage zonePartitionId = (ZonePartitionIdMessage)message;

            return zonePartitionId.partitionId();
        }

        if (message instanceof TablePartitionIdMessage) {
            TablePartitionIdMessage tablePartitionId = (TablePartitionIdMessage)message;

            return tablePartitionId.partitionId();
        }

        throw new IllegalArgumentException(message + " is not supported");
    }

    /**
     * Waits for the schema metadata to be complete at {@code currentSafeTime} and then performs the continuous query scan.
     *
     * @param request Continuous query scan request.
     * @param currentSafeTime Safe time value to scan up to.
     * @return Future with the scan result.
     */
    private CompletableFuture<ContinuousQueryScanResult<BinaryRow>> processContinuousQueryScanRequestInternal(ContinuousQueryScanRequest request, HybridTimestamp currentSafeTime) {
        CompletableFuture<Void> metadataReady = this.schemaSyncService.waitForMetadataCompleteness(currentSafeTime);

        return metadataReady.thenCompose(ignored -> this.processContinuousQueryScanRequestAfterSchemaSync(request, currentSafeTime));
    }

    /**
     * Scans the update log for a continuous query request and builds the scan result.
     *
     * <p>Steps, in order: check whether the table exists in the catalog active at {@code currentSafeTime}; reject the request
     * if its lower bound timestamp is below the low watermark; scan the update log; check the low watermark again; upgrade the
     * rows and extract the requested columns; validate primacy before handing out the result.
     *
     * @param request Continuous query scan request.
     * @param currentSafeTime Safe time value to scan up to.
     * @return Future with the scan result; a failed future when the request is below the low watermark.
     */
    private CompletableFuture<ContinuousQueryScanResult<BinaryRow>> processContinuousQueryScanRequestAfterSchemaSync(ContinuousQueryScanRequest request, HybridTimestamp currentSafeTime) {
        HybridTimestamp currentLwm;
        boolean tableExists;
        Catalog catalogForSafeTime = this.catalogService.activeCatalog(currentSafeTime.longValue());
        boolean bl = tableExists = catalogForSafeTime.table(this.tableId()) != null;
        // With colocation enabled, a table missing at currentSafeTime may appear in a later catalog version: if so, wait for
        // the safe time to reach that catalog's time and restart the processing with a fresh safe time value.
        if (!tableExists && this.nodeProperties.colocationEnabled()) {
            for (int version = catalogForSafeTime.version() + 1; version <= this.catalogService.latestCatalogVersion(); ++version) {
                Catalog catalog = this.catalogService.catalog(version);
                if (catalog.table(this.tableId()) == null) continue;
                return this.safeTime.waitFor(HybridTimestamp.hybridTimestamp(catalog.time())).thenComposeAsync(unused -> this.processContinuousQueryScanRequest(request, this.safeTime.current()), this.partitionOperationsExecutor);
            }
        }
        // First low watermark check, before the scan.
        if (PartitionReplicaListener.isCqRequestBelowLowWatermark(request, currentLwm = this.lowWatermark.getLowWatermark())) {
            return CompletableFuture.failedFuture(PartitionReplicaListener.cqRequestBelowLowWatermarkException(request, currentLwm, tableExists));
        }
        RowId lowerBoundRowId = new RowId(this.partId(), request.lowerBoundRowId());
        EnumSet<TableRowEventType> eventTypes = ContinuousQueryUtils.decodeEventTypes(request.eventTypes());
        List<RowUpdateInfo<BinaryRow>> rows = this.storageUpdateHandler.storage().scanUpdateLog(request.lowerBoundTimestamp(), lowerBoundRowId, currentSafeTime, request.maxItems(), eventTypes, request.skipOldEntries());
        assert (rows.size() <= request.maxItems()) : "rows.size()=" + rows.size() + ", maxItems=" + request.maxItems();
        // Second low watermark check with a freshly read value, after the scan.
        // NOTE(review): presumably guards against the low watermark advancing while the scan was running - confirm.
        currentLwm = this.lowWatermark.getLowWatermark();
        if (PartitionReplicaListener.isCqRequestBelowLowWatermark(request, currentLwm)) {
            return CompletableFuture.failedFuture(PartitionReplicaListener.cqRequestBelowLowWatermarkException(request, currentLwm, tableExists));
        }
        assert (currentLwm == null || currentLwm.longValue() < currentSafeTime.longValue()) : "LWM should be less than safeTime: lwm=" + currentLwm + ", safeTime=" + currentSafeTime;
        int schemaVersion = this.upgrade(rows);
        BinaryRowColumnExtractor.extractColumns(rows, request.columnNames(), this.schemaRegistry);
        // The "table dropped" status is reported only when the table does not exist and fewer rows than requested were read.
        ContinuousQueryScanResultStatus status = !tableExists && rows.size() < request.maxItems() ? ContinuousQueryScanResultStatus.END_OF_LOG_REACHED_TABLE_DROPPED : ContinuousQueryScanResultStatus.OK;
        ContinuousQueryScanResult<BinaryRow> cqRes = new ContinuousQueryScanResult<BinaryRow>(currentSafeTime.longValue(), rows, schemaVersion, request.requestId(), status);
        // The result is handed out only after the primacy validation completes.
        return this.replicaPrimacyEngine.validatePrimacy(request).thenApply(unused -> cqRes);
    }

    /**
     * Tells whether a continuous query scan request starts strictly below the given low watermark.
     *
     * @param request Scan request whose lower bound timestamp is inspected.
     * @param currentLwm Low watermark to compare against; may be {@code null}.
     * @return {@code true} only when {@code currentLwm} is not {@code null} and the request's lower bound timestamp
     *     compares as less than it.
     */
    private static boolean isCqRequestBelowLowWatermark(ContinuousQueryScanRequest request, @Nullable HybridTimestamp currentLwm) {
        if (currentLwm == null) {
            // Without a watermark nothing can be below it.
            return false;
        }

        return request.lowerBoundTimestamp().compareTo(currentLwm) < 0;
    }

    /**
     * Builds the exception reported when a continuous query scan request is below the low watermark.
     *
     * @param request Offending scan request; its lower bound timestamp is put into the message.
     * @param currentLwm Low watermark value put into the message.
     * @param tableExists Selects the exception flavour: {@code true} produces an {@code IgniteException} with
     *     {@code WATERMARK_TOO_OLD_ERR}, {@code false} produces a {@code TableNotFoundException} with
     *     {@code WATERMARK_TOO_OLD_TABLE_DOES_NOT_EXIST_ERR}.
     * @return The exception instance (not thrown here).
     */
    private static Exception cqRequestBelowLowWatermarkException(ContinuousQueryScanRequest request, HybridTimestamp currentLwm, boolean tableExists) {
        if (!tableExists) {
            String droppedTableMsg = String.format("Scan query request below Low Watermark for dropped table [lowerBoundTs=%s, lowWatermark=%s]", request.lowerBoundTimestamp(), currentLwm);

            return new TableNotFoundException(UUID.randomUUID(), GridgainErrorGroups.ContinuousQuery.WATERMARK_TOO_OLD_TABLE_DOES_NOT_EXIST_ERR, droppedTableMsg, null);
        }

        String tooOldMsg = String.format("Scan query request below Low Watermark (too far back in the past) [lowerBoundTs=%s, lowWatermark=%s]", request.lowerBoundTimestamp(), currentLwm);

        return new IgniteException(GridgainErrorGroups.ContinuousQuery.WATERMARK_TOO_OLD_ERR, tooOldMsg);
    }

    /**
     * Closes the remote cursor identified by the request's transaction id and scan id.
     *
     * @param request Scan close request.
     * @throws ReplicationException If closing the cursor fails with an {@code IgniteException}.
     */
    private void processScanCloseAction(ScanCloseReplicaRequest request) {
        FullyQualifiedResourceId cursorId = RemoteResourceIds.cursorId(request.transactionId(), request.scanId());

        try {
            this.remotelyTriggeredResourceRegistry.close(cursorId);
        }
        catch (IgniteException closeFailure) {
            throw this.wrapCursorCloseException(closeFailure);
        }
    }

    /**
     * Closes the given cursor when the produced batch holds fewer rows than requested.
     *
     * @param rows Rows collected for the batch.
     * @param batchSize Requested batch size.
     * @param cursorId Identifier of the cursor resource to close.
     * @throws ReplicationException If closing the cursor fails with an {@code IgniteException}.
     */
    private void closeCursorIfBatchNotFull(List<?> rows, int batchSize, FullyQualifiedResourceId cursorId) {
        if (rows.size() >= batchSize) {
            // Batch is full: the cursor is left open.
            return;
        }

        try {
            this.remotelyTriggeredResourceRegistry.close(cursorId);
        }
        catch (IgniteException closeFailure) {
            throw this.wrapCursorCloseException(closeFailure);
        }
    }

    /**
     * Wraps a cursor close failure into a {@code ReplicationException} with {@code CURSOR_CLOSE_ERR}.
     *
     * @param e Original failure; kept as the cause and its message is embedded in the new message.
     * @return The wrapping exception (not thrown here).
     */
    private ReplicationException wrapCursorCloseException(IgniteException e) {
        String message = IgniteStringFormatter.format("Close cursor exception [replicaGrpId={}, msg={}]", this.replicationGroupId, e.getMessage());

        return new ReplicationException(ErrorGroups.Replicator.CURSOR_CLOSE_ERR, message, (Throwable)e);
    }

    /**
     * Dispatches a read-write scan batch request to the matching retrieval strategy.
     *
     * <ul>
     *     <li>No index requested: takes an S lock on the table lock key and reads from the partition cursor.</li>
     *     <li>Index with an exact key: index lookup (bounds must not be set).</li>
     *     <li>Index without an exact key: sorted index scan.</li>
     * </ul>
     *
     * @param request Batch retrieval request.
     * @return Future completed with the rows of the batch.
     * @throws AssertionError If the requested index is not present in {@code secondaryIndexStorages}.
     */
    private CompletableFuture<List<BinaryRow>> processScanRetrieveBatchAction(ReadWriteScanRetrieveBatchReplicaRequest request) {
        if (request.indexToUse() == null) {
            UUID txId = request.transactionId();
            int batchCount = request.batchSize();
            FullyQualifiedResourceId cursorId = RemoteResourceIds.cursorId(txId, request.scanId());

            return this.lockManager.acquire(txId, new LockKey(this.tableLockKey), LockMode.S)
                    .thenCompose(tblLock -> this.retrieveExactEntriesUntilCursorEmpty(txId, request.coordinatorId(), cursorId, batchCount));
        }

        TableSchemaAwareIndexStorage indexStorage = this.secondaryIndexStorages.get().get(request.indexToUse());
        if (indexStorage == null) {
            throw new AssertionError((Object)("Index not found: uuid=" + request.indexToUse()));
        }

        if (request.exactKey() == null) {
            assert (indexStorage.storage() instanceof SortedIndexStorage);
            return this.scanSortedIndex(request, indexStorage);
        }

        assert (request.lowerBoundPrefix() == null && request.upperBoundPrefix() == null) : "Index lookup doesn't allow bounds.";
        return this.lookupIndex(request, indexStorage.storage(), request.coordinatorId());
    }

    /**
     * Retrieves the next batch of a read-only exact-key index lookup.
     *
     * <p>The cursor over row ids for the exact key is obtained through {@code remotelyTriggeredResourceRegistry.register}
     * and mapped to index rows carrying that key. After the batch is assembled, the cursor is closed if fewer than
     * {@code batchSize} rows were collected.
     *
     * @param request Read-only batch request; supplies transaction id, scan id, exact key, read timestamp and batch size.
     * @param schemaAwareIndexStorage Index storage wrapper used both for the lookup and for row/index matching.
     * @return Future completed with the collected rows.
     */
    private CompletableFuture<List<BinaryRow>> lookupIndex(ReadOnlyScanRetrieveBatchReplicaRequest request, TableSchemaAwareIndexStorage schemaAwareIndexStorage) {
        IndexStorage storage = schemaAwareIndexStorage.storage();
        FullyQualifiedResourceId cursorId = RemoteResourceIds.cursorId(request.transactionId(), request.scanId());
        BinaryTuple lookupKey = request.exactKey().asBinaryTuple();

        Object cursor = ((CursorResource)this.remotelyTriggeredResourceRegistry.register(cursorId, request.coordinatorId(), () -> new CursorResource(storage.get(lookupKey)))).cursor();
        Cursor<IndexRow> indexRows = CursorUtils.map(cursor, rowId -> new IndexRowImpl(lookupKey, (RowId)rowId));

        int batchCount = request.batchSize();
        ArrayList<BinaryRow> collected = new ArrayList<BinaryRow>(batchCount);
        HybridTimestamp readTimestamp = request.readTimestamp();

        return this.continueReadOnlyIndexScan(schemaAwareIndexStorage, indexRows, readTimestamp, batchCount, collected, this.tableVersionByTs(readTimestamp))
                .thenApply(ignore -> {
                    this.closeCursorIfBatchNotFull(collected, batchCount, cursorId);
                    return collected;
                });
    }

    /**
     * Retrieves the next batch of a read-write exact-key index lookup.
     *
     * <p>An S lock keyed by the index id and the key's byte buffer is acquired first; only then is the cursor obtained
     * from the resource registry and the rows collected. The cursor is closed if fewer than {@code batchSize} rows
     * were collected.
     *
     * @param request Read-write batch request; supplies transaction id, scan id, index id, exact key and batch size.
     * @param indexStorage Index storage to look the key up in.
     * @param txCoordinatorId Coordinator id passed to the resource registry when registering the cursor.
     * @return Future completed with the collected rows.
     */
    private CompletableFuture<List<BinaryRow>> lookupIndex(ReadWriteScanRetrieveBatchReplicaRequest request, IndexStorage indexStorage, UUID txCoordinatorId) {
        UUID txId = request.transactionId();
        int batchCount = request.batchSize();
        FullyQualifiedResourceId cursorId = RemoteResourceIds.cursorId(txId, request.scanId());
        Integer indexId = request.indexToUse();
        BinaryTuple exactKey = request.exactKey().asBinaryTuple();
        LockKey indexKeyLock = new LockKey(indexId, exactKey.byteBuffer());

        return this.lockManager.acquire(txId, indexKeyLock, LockMode.S).thenCompose(indRowLock -> {
            Object cursor = ((CursorResource)this.remotelyTriggeredResourceRegistry.register(cursorId, txCoordinatorId, () -> new CursorResource(indexStorage.get(exactKey)))).cursor();
            ArrayList<BinaryRow> collected = new ArrayList<BinaryRow>(batchCount);

            return this.continueIndexLookup(txId, (Cursor<RowId>)cursor, batchCount, (List<BinaryRow>)collected)
                    .thenApply(ignore -> {
                        this.closeCursorIfBatchNotFull(collected, batchCount, cursorId);
                        return collected;
                    });
        });
    }

    /**
     * Retrieves the next batch of rows for a read-write scan over a sorted index.
     *
     * <p>The storage cursor is opened with the lower bound only ({@code scan(lowerBound, null, flags)}); the upper bound
     * is enforced by the {@code isUpperBoundAchieved} predicate handed to {@code continueIndexScan}. Once the batch is
     * assembled, the cursor is closed if fewer than {@code batchSize} rows were collected.
     *
     * @param request Batch request; supplies transaction id, scan id, index id, optional bounds, flags and batch size.
     * @param schemaAwareIndexStorage Index storage wrapper; its {@code storage()} is cast to {@code SortedIndexStorage}.
     * @return Future completed with the collected rows.
     */
    private CompletableFuture<List<BinaryRow>> scanSortedIndex(ReadWriteScanRetrieveBatchReplicaRequest request, TableSchemaAwareIndexStorage schemaAwareIndexStorage) {
        SortedIndexStorage indexStorage = (SortedIndexStorage)schemaAwareIndexStorage.storage();
        UUID txId = request.transactionId();
        FullyQualifiedResourceId cursorId = RemoteResourceIds.cursorId(txId, request.scanId());
        Integer indexId = request.indexToUse();
        BinaryTupleMessage lowerBoundMessage = request.lowerBoundPrefix();
        BinaryTupleMessage upperBoundMessage = request.upperBoundPrefix();
        // Either bound may be absent; absent bounds stay null.
        BinaryTuplePrefix lowerBound = lowerBoundMessage == null ? null : lowerBoundMessage.asBinaryTuplePrefix();
        BinaryTuplePrefix upperBound = upperBoundMessage == null ? null : upperBoundMessage.asBinaryTuplePrefix();
        int flags = request.flags();
        BinaryTupleComparator comparator = StorageUtils.binaryTupleComparator(indexStorage.indexDescriptor().columns());
        // Stop condition for the scan: a null row, or a row that compares >= the upper bound.
        Predicate<IndexRow> isUpperBoundAchieved = indexRow -> {
            if (indexRow == null) {
                return true;
            }
            if (upperBound == null) {
                // No upper bound: the scan is only limited by the cursor and the batch size.
                return false;
            }
            ByteBuffer buffer = upperBound.byteBuffer();
            // NOTE(review): 2 and 0x10 look like inlined constants (presumably an "inclusive upper bound" scan flag and
            // a tuple-header equality flag) - confirm against SortedIndexStorage / BinaryTupleCommon.
            // The OR below is idempotent, so repeating it on every predicate call does not change the outcome.
            if ((flags & 2) != 0) {
                byte boundFlags = buffer.get(0);
                buffer.put(0, (byte)(boundFlags | 0x10));
            }
            return comparator.compare(indexRow.indexColumns().byteBuffer(), buffer) >= 0;
        };
        // The upper bound is deliberately passed as null here; it is checked by the predicate above instead.
        Object cursor = ((CursorResource)this.remotelyTriggeredResourceRegistry.register(cursorId, request.coordinatorId(), () -> new CursorResource(indexStorage.scan(lowerBound, null, flags)))).cursor();
        SortedIndexLocker indexLocker = (SortedIndexLocker)this.indexesLockers.get().get(indexId);
        int batchCount = request.batchSize();
        ArrayList<BinaryRow> result = new ArrayList<BinaryRow>(batchCount);
        // The table version used for row upgrade is derived from the transaction's begin timestamp.
        return this.continueIndexScan(txId, schemaAwareIndexStorage, indexLocker, (Cursor<IndexRow>)cursor, batchCount, (List<BinaryRow>)result, isUpperBoundAchieved, this.tableVersionByTs(TransactionIds.beginTimestamp(txId))).thenApply(ignore -> {
            this.closeCursorIfBatchNotFull(result, batchCount, cursorId);
            return result;
        });
    }

    /**
     * Retrieves the next batch of rows for a read-only scan over a sorted index.
     *
     * <p>Both bounds are handed to {@code readOnlyScan}; rows are then resolved at the request's read timestamp.
     * The cursor is closed if fewer than {@code batchSize} rows were collected.
     *
     * @param request Read-only batch request; supplies transaction id, scan id, optional bounds, flags, read timestamp
     *     and batch size.
     * @param schemaAwareIndexStorage Index storage wrapper; its {@code storage()} is cast to {@code SortedIndexStorage}.
     * @return Future completed with the collected rows.
     */
    private CompletableFuture<List<BinaryRow>> scanSortedIndex(ReadOnlyScanRetrieveBatchReplicaRequest request, TableSchemaAwareIndexStorage schemaAwareIndexStorage) {
        SortedIndexStorage sortedStorage = (SortedIndexStorage)schemaAwareIndexStorage.storage();
        FullyQualifiedResourceId cursorId = RemoteResourceIds.cursorId(request.transactionId(), request.scanId());

        BinaryTupleMessage lowerMsg = request.lowerBoundPrefix();
        BinaryTupleMessage upperMsg = request.upperBoundPrefix();
        BinaryTuplePrefix lowerBound = lowerMsg == null ? null : lowerMsg.asBinaryTuplePrefix();
        BinaryTuplePrefix upperBound = upperMsg == null ? null : upperMsg.asBinaryTuplePrefix();
        int flags = request.flags();

        Object cursor = ((CursorResource)this.remotelyTriggeredResourceRegistry.register(cursorId, request.coordinatorId(), () -> new CursorResource(sortedStorage.readOnlyScan(lowerBound, upperBound, flags)))).cursor();

        int batchCount = request.batchSize();
        ArrayList<BinaryRow> collected = new ArrayList<BinaryRow>(batchCount);
        HybridTimestamp readTimestamp = request.readTimestamp();

        return this.continueReadOnlyIndexScan(schemaAwareIndexStorage, (Cursor<IndexRow>)cursor, readTimestamp, batchCount, (List<BinaryRow>)collected, this.tableVersionByTs(readTimestamp))
                .thenApply(ignore -> {
                    this.closeCursorIfBatchNotFull(collected, batchCount, cursorId);
                    return collected;
                });
    }

    /**
     * Recursively fills {@code result} with rows of a read-only index scan.
     *
     * <p>Each step takes one index row from the cursor, resolves the table row at {@code readTimestamp}, upgrades it
     * to {@code tableVersion} and adds it when it is non-null and still matches the index row. Recursion continues on
     * {@code scanRequestExecutor} until the batch is full or the cursor is exhausted.
     *
     * @param schemaAwareIndexStorage Index storage wrapper used to verify that a row still matches its index row.
     * @param cursor Cursor over index rows.
     * @param readTimestamp Timestamp to read rows at.
     * @param batchSize Maximum number of rows to collect.
     * @param result Accumulator for collected rows; mutated in place.
     * @param tableVersion Table version rows are upgraded to.
     * @return Future completed when the batch is full or the cursor has no more rows.
     */
    private CompletableFuture<Void> continueReadOnlyIndexScan(TableSchemaAwareIndexStorage schemaAwareIndexStorage, Cursor<IndexRow> cursor, HybridTimestamp readTimestamp, int batchSize, List<BinaryRow> result, int tableVersion) {
        if (result.size() >= batchSize) {
            return CompletableFutures.nullCompletedFuture();
        }
        if (!cursor.hasNext()) {
            return CompletableFutures.nullCompletedFuture();
        }

        IndexRow indexRow = (IndexRow)cursor.next();
        RowId rowId = indexRow.rowId();

        return this.resolvePlainReadResult(rowId, null, readTimestamp).thenComposeAsync(resolvedReadResult -> {
            BinaryRow upgraded = this.upgrade(PartitionReplicaListener.binaryRow(resolvedReadResult), tableVersion);
            boolean accept = upgraded != null && PartitionReplicaListener.indexRowMatches(indexRow, upgraded, schemaAwareIndexStorage);

            if (accept) {
                result.add(upgraded);
            }

            return this.continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, readTimestamp, batchSize, result, tableVersion);
        }, this.scanRequestExecutor);
    }

    /**
     * Recursively fills {@code result} with rows of a read-write sorted index scan.
     *
     * <p>Each step asks {@code indexLocker.locksForScan} for the current index row, stops if the upper bound predicate
     * accepts it, otherwise acquires an S lock keyed by the table lock key and the row id, resolves the row for the
     * transaction and adds it when it still matches the index row. Recursion continues on {@code scanRequestExecutor}.
     *
     * @param txId Transaction id used for locking and row resolution.
     * @param schemaAwareIndexStorage Index storage wrapper used to verify that a row still matches its index row.
     * @param indexLocker Locker that advances the cursor and takes index-level scan locks.
     * @param indexCursor Cursor over index rows.
     * @param batchSize Number of rows at which collection stops.
     * @param result Accumulator for collected rows; mutated in place.
     * @param isUpperBoundAchieved Predicate that ends the scan when it returns {@code true} for the current row.
     * @param tableVersion Table version rows are upgraded to before the index match check.
     * @return Future completed when the batch is full or the predicate ended the scan.
     */
    private CompletableFuture<Void> continueIndexScan(UUID txId, TableSchemaAwareIndexStorage schemaAwareIndexStorage, SortedIndexLocker indexLocker, Cursor<IndexRow> indexCursor, int batchSize, List<BinaryRow> result, Predicate<IndexRow> isUpperBoundAchieved, int tableVersion) {
        // Batch is full: stop recursing.
        if (result.size() == batchSize) {
            return CompletableFutures.nullCompletedFuture();
        }
        return indexLocker.locksForScan(txId, indexCursor).thenCompose(currentRow -> {
            // The predicate decides the end of the range (the caller's predicate also treats a null row as the end).
            if (isUpperBoundAchieved.test((IndexRow)currentRow)) {
                return CompletableFutures.nullCompletedFuture();
            }
            RowId rowId = currentRow.rowId();
            return this.lockManager.acquire(txId, new LockKey(this.tableLockKey, rowId), LockMode.S).thenComposeAsync(rowLock -> this.resolvePlainReadResult(rowId, txId).thenCompose(resolvedReadResult -> {
                BinaryRow binaryRow = this.upgrade(PartitionReplicaListener.binaryRow(resolvedReadResult), tableVersion);
                if (binaryRow != null && PartitionReplicaListener.indexRowMatches(currentRow, binaryRow, schemaAwareIndexStorage)) {
                    // NOTE(review): the row as resolved is added here, not the upgraded `binaryRow` used for the match
                    // check; continueReadOnlyIndexScan adds the upgraded row instead - confirm this is intentional.
                    result.add(resolvedReadResult.binaryRow());
                }
                return this.continueIndexScan(txId, schemaAwareIndexStorage, indexLocker, indexCursor, batchSize, result, isUpperBoundAchieved, tableVersion);
            }), this.scanRequestExecutor);
        });
    }

    /**
     * Checks that the index columns extracted from a table row are byte-equal to those of an index row.
     *
     * @param indexRow Index row read from the index.
     * @param binaryRow Table row to extract index columns from.
     * @param schemaAwareIndexStorage Provides the resolver that extracts index columns from a table row.
     * @return {@code true} if both byte buffers are equal.
     */
    private static boolean indexRowMatches(IndexRow indexRow, BinaryRow binaryRow, TableSchemaAwareIndexStorage schemaAwareIndexStorage) {
        BinaryTuple columnsFromRow = schemaAwareIndexStorage.indexRowResolver().extractColumns(binaryRow);
        ByteBuffer expected = indexRow.indexColumns().byteBuffer();
        ByteBuffer actual = columnsFromRow.byteBuffer();

        return expected.equals(actual);
    }

    /**
     * Recursively fills {@code result} with rows of a read-write exact-key index lookup.
     *
     * <p>Each step takes one row id from the cursor, acquires an S lock keyed by the table lock key and the row id,
     * resolves the row for the transaction and adds it when it is present. Recursion continues on
     * {@code scanRequestExecutor} until the batch is full or the cursor is exhausted.
     *
     * @param txId Transaction id used for locking and row resolution.
     * @param indexCursor Cursor over row ids matching the key.
     * @param batchSize Maximum number of rows to collect.
     * @param result Accumulator for collected rows; mutated in place.
     * @return Future completed when the batch is full or the cursor has no more row ids.
     */
    private CompletableFuture<Void> continueIndexLookup(UUID txId, Cursor<RowId> indexCursor, int batchSize, List<BinaryRow> result) {
        if (result.size() >= batchSize) {
            return CompletableFutures.nullCompletedFuture();
        }
        if (!indexCursor.hasNext()) {
            return CompletableFutures.nullCompletedFuture();
        }

        RowId rowId = (RowId)indexCursor.next();
        LockKey rowLockKey = new LockKey(this.tableLockKey, rowId);

        return this.lockManager.acquire(txId, rowLockKey, LockMode.S).thenComposeAsync(rowLock -> this.resolvePlainReadResult(rowId, txId).thenCompose(resolved -> {
            boolean rowPresent = resolved != null && resolved.binaryRow() != null;

            if (rowPresent) {
                result.add(resolved.binaryRow());
            }

            return this.continueIndexLookup(txId, indexCursor, batchSize, result);
        }), this.scanRequestExecutor);
    }

    /**
     * Reads a row from the MV storage and resolves the read result, supplying a fallback to the newest committed version.
     *
     * <p>The storage is read at {@code timestamp}, or at {@code HybridTimestamp.MAX_VALUE} when {@code timestamp} is
     * {@code null}. The supplier handed to {@code resolveReadResult} re-reads the row at the read result's newest commit
     * timestamp, or yields {@code null} when there is none.
     *
     * @param rowId Row id to read.
     * @param txId Transaction id, or {@code null}.
     * @param timestamp Read timestamp, or {@code null}.
     * @return Future with the resolved row, possibly {@code null}.
     */
    private CompletableFuture<@Nullable TimedBinaryRow> resolvePlainReadResult(RowId rowId, @Nullable UUID txId, @Nullable HybridTimestamp timestamp) {
        HybridTimestamp storageReadTs = timestamp == null ? HybridTimestamp.MAX_VALUE : timestamp;
        ReadResult readResult = this.mvDataStorage.read(rowId, storageReadTs);

        return this.resolveReadResult(readResult, txId, timestamp, () -> {
            HybridTimestamp newestCommitTimestamp = readResult.newestCommitTimestamp();

            if (newestCommitTimestamp != null) {
                ReadResult committedReadResult = this.mvDataStorage.read(rowId, newestCommitTimestamp);
                assert (!committedReadResult.isWriteIntent()) : "The result is not committed [rowId=" + rowId + ", timestamp=" + newestCommitTimestamp + "]";
                return new TimedBinaryRow(committedReadResult.binaryRow(), committedReadResult.commitTimestamp());
            }

            return null;
        });
    }

    /**
     * Resolves a row for a read-write transaction and validates it for backward compatibility.
     *
     * @param rowId Row id to read.
     * @param txId Transaction id.
     * @return Future with the resolved row once {@code validateBackwardCompatibility} has completed, or a
     *     {@code null}-completed future when no row (or a row without a binary row) was resolved.
     */
    private CompletableFuture<@Nullable TimedBinaryRow> resolvePlainReadResult(RowId rowId, UUID txId) {
        return this.resolvePlainReadResult(rowId, txId, null).thenCompose(row -> {
            boolean rowPresent = row != null && row.binaryRow() != null;

            if (rowPresent) {
                return this.validateBackwardCompatibility(row.binaryRow(), txId).thenApply(unused -> row);
            }

            return CompletableFutures.nullCompletedFuture();
        });
    }

    /**
     * Handles a write intent switch request when colocation is disabled.
     *
     * <p>Marks the transaction as finished ({@code COMMITTED} or {@code ABORTED} depending on {@code request.commit()}),
     * waits for the transaction's cleanup-ready futures and then either applies the switch command locally and to the
     * group, or answers immediately with the replicated info. Whatever the outcome, the transaction's entry in
     * {@code pendingTransactions} is removed and completed on {@code partitionOperationsExecutor}.
     *
     * @param request Write intent switch request.
     * @return Future with the replica result.
     */
    private CompletableFuture<ReplicaResult> processWriteIntentSwitchAction(WriteIntentSwitchReplicaRequest request) {
        // This path is only valid for the non-colocated mode.
        assert (!this.nodeProperties.colocationEnabled()) : request;
        this.replicaTxFinishMarker.markFinished(request.txId(), request.commit() ? TxState.COMMITTED : TxState.ABORTED, request.commitTimestamp());
        return ((CompletableFuture)this.awaitCleanupReadyFutures(request.txId(), request.commit()).thenApply(res -> {
            if (res.shouldApplyWriteIntent()) {
                // The result carries no immediate value; the replication future is exposed via the application result.
                CompletableFuture<WriteIntentSwitchReplicatedInfo> commandReplicatedFuture = this.applyWriteIntentSwitchCommandLocallyAndToGroup(request);
                return new ReplicaResult(null, new CommandApplicationResult(null, commandReplicatedFuture));
            }
            return new ReplicaResult(this.writeIntentSwitchReplicatedInfoFor(request), null);
        })).whenComplete((res, ex) -> this.partitionOperationsExecutor.execute(() -> {
            // Runs on both success and failure of the stage above.
            CompletableFuture pendingTxFut = (CompletableFuture)this.pendingTransactions.remove(request.txId());
            if (pendingTxFut != null) {
                pendingTxFut.complete(null);
            }
        }));
    }

    /**
     * Handles a table-level write intent switch request when colocation is enabled.
     *
     * <p>Waits for the transaction's cleanup-ready futures, applies the switch locally when the cleanup result asks for
     * it, then schedules removal and completion of the transaction's {@code pendingTransactions} entry on
     * {@code partitionOperationsExecutor}.
     *
     * @param request Table write intent switch request.
     * @return Future with a replica result wrapping the cleanup result.
     */
    private CompletableFuture<ReplicaResult> processTableWriteIntentSwitchAction(TableWriteIntentSwitchReplicaRequest request) {
        // This path is only valid for the colocated mode.
        assert (this.nodeProperties.colocationEnabled()) : request;
        return this.awaitCleanupReadyFutures(request.txId(), request.commit()).thenApply(res -> {
            if (res.shouldApplyWriteIntent()) {
                this.applyWriteIntentSwitchCommandLocally(request);
            }
            // NOTE(review): this cleanup sits inside thenApply, so it is skipped if the stage above or the local apply
            // fails; processWriteIntentSwitchAction uses whenComplete for the same cleanup - confirm the difference
            // is intentional.
            this.partitionOperationsExecutor.execute(() -> {
                CompletableFuture pendingTxFut = (CompletableFuture)this.pendingTransactions.remove(request.txId());
                if (pendingTxFut != null) {
                    pendingTxFut.complete(null);
                }
            });
            return new ReplicaResult(res, null);
        });
    }

    /**
     * Creates the replicated info describing a write intent switch of the request's transaction on this replication group.
     *
     * @param request Write intent switch request; only its transaction id is used.
     * @return New info object built from the transaction id and {@code replicationGroupId}.
     */
    private WriteIntentSwitchReplicatedInfo writeIntentSwitchReplicatedInfoFor(WriteIntentSwitchReplicaRequest request) {
        UUID txId = request.txId();

        return new WriteIntentSwitchReplicatedInfo(txId, this.replicationGroupId);
    }

    /**
     * Collects the cleanup-ready futures registered for a transaction and waits for all of them.
     *
     * <p>Futures are split by operation type into read futures ({@code opType.isRwRead()}) and update futures. Update
     * futures are awaited first, then read futures; failures of individual futures are ignored by
     * {@code allOfFuturesExceptionIgnored}.
     *
     * @param txId Transaction id.
     * @param commit Whether the transaction is committing; forwarded to {@code allOfFuturesExceptionIgnored}.
     * @return Future with a {@code FuturesCleanupResult} built from: whether any read futures were found, whether any
     *     update futures were found, and the {@code forceCleanup} flag.
     */
    private CompletableFuture<FuturesCleanupResult> awaitCleanupReadyFutures(UUID txId, boolean commit) {
        ArrayList txUpdateFutures = new ArrayList();
        ArrayList txReadFutures = new ArrayList();
        // Stays true when no entry exists for the transaction; otherwise reflects whether the entry had no futures.
        AtomicBoolean forceCleanup = new AtomicBoolean(true);
        // The remapping function returns null on every path - presumably this removes the entry (standard
        // Map.compute contract); verify against the declared type of txCleanupReadyFutures.
        this.txCleanupReadyFutures.compute(txId, (id, txOps) -> {
            if (txOps == null) {
                return null;
            }
            forceCleanup.set(txOps.futures.isEmpty());
            txOps.futures.forEach((opType, futures) -> {
                if (opType.isRwRead()) {
                    txReadFutures.addAll(futures.values());
                } else {
                    txUpdateFutures.addAll(futures.values());
                }
            });
            txOps.futures.clear();
            return null;
        });
        return ((CompletableFuture)PartitionReplicaListener.allOfFuturesExceptionIgnored(txUpdateFutures, commit, txId).thenCompose(v -> PartitionReplicaListener.allOfFuturesExceptionIgnored(txReadFutures, commit, txId))).thenApply(v -> new FuturesCleanupResult(!txReadFutures.isEmpty(), !txUpdateFutures.isEmpty(), forceCleanup.get()));
    }

    /**
     * Switches write intents locally, then replicates the switch command to the group.
     *
     * <p>The catalog version required by the command is resolved for the commit timestamp, or for the transaction's
     * begin timestamp when the request carries no commit timestamp.
     *
     * @param request Write intent switch request.
     * @return Future completed with the replicated info once the group command has been applied.
     */
    private CompletableFuture<WriteIntentSwitchReplicatedInfo> applyWriteIntentSwitchCommandLocallyAndToGroup(WriteIntentSwitchReplicaRequest request) {
        // The local switch happens synchronously, before any asynchronous work is started.
        this.applyWriteIntentSwitchCommandLocally(request);
        WriteIntentSwitchReplicatedInfo result = this.writeIntentSwitchReplicatedInfoFor(request);
        assert (!this.nodeProperties.colocationEnabled()) : request;
        @Nullable HybridTimestamp commitTimestamp = request.commitTimestamp();
        // No commit timestamp: fall back to the begin timestamp encoded in the tx id.
        HybridTimestamp commandTimestamp = commitTimestamp != null ? commitTimestamp : TransactionIds.beginTimestamp(request.txId());
        return ((CompletableFuture)this.reliableCatalogVersions.safeReliableCatalogVersionFor(commandTimestamp).thenCompose(catalogVersion -> this.applyWriteIntentSwitchCommandToGroup(request, (int)catalogVersion))).thenApply(res -> result);
    }

    /**
     * Switches the transaction's write intents in the local storage through {@code storageUpdateHandler}.
     *
     * @param request Request supplying the transaction id, the commit flag and the commit timestamp.
     */
    private void applyWriteIntentSwitchCommandLocally(WriteIntentSwitchReplicaRequestBase request) {
        this.storageUpdateHandler.switchWriteIntents(
                request.txId(),
                request.commit(),
                request.commitTimestamp(),
                this.indexIdsAtRwTxBeginTsOrNull(request.txId())
        );
    }

    /**
     * Builds a {@code WriteIntentSwitchCommandV2} for the request and applies it to the replication group.
     *
     * <p>A failure is always rethrown; it is additionally logged as a warning when
     * {@code ReplicatorRecoverableExceptions.isRecoverable} reports it as not recoverable.
     *
     * @param request Write intent switch request.
     * @param catalogVersion Catalog version the command requires.
     * @return Future of the command application.
     */
    private CompletableFuture<?> applyWriteIntentSwitchCommandToGroup(WriteIntentSwitchReplicaRequest request, int catalogVersion) {
        WriteIntentSwitchCommandV2 switchCommand = PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
                .txId(request.txId())
                .commit(request.commit())
                .commitTimestamp(request.commitTimestamp())
                .initiatorTime(this.clockService.current())
                .tableIds(request.tableIds())
                .requiredCatalogVersion(catalogVersion)
                .build();

        return this.applyCmdWithExceptionHandling(switchCommand).exceptionally(e -> {
            boolean recoverable = ReplicatorRecoverableExceptions.isRecoverable(e);

            if (!recoverable) {
                LOG.warn("Failed to complete transaction cleanup command [txId=" + request.txId() + "]", (Throwable)e);
            }

            ExceptionUtils.sneakyThrow(e);
            return null;
        });
    }

    /**
     * Waits for all given futures and swallows any failure.
     *
     * <p>With assertions enabled, a failure during commit must be, or be caused by, a {@code LockException}.
     *
     * @param txFutures Futures to wait for.
     * @param commit Whether the transaction is committing.
     * @param txId Transaction id, used only in the assertion message.
     * @return Future that completes normally once all futures are done, whether or not any of them failed.
     */
    private static CompletableFuture<Void> allOfFuturesExceptionIgnored(List<CompletableFuture<?>> txFutures, boolean commit, UUID txId) {
        CompletableFuture<Void> all = CompletableFuture.allOf(txFutures.toArray(new CompletableFuture[0]));

        return all.exceptionally(e -> {
            assert (ExceptionUtils.isOrCausedBy(LockException.class, e) || !commit) : "Transaction is committing, but an operation has completed with exception [txId=" + txId + ", err=" + e.getMessage() + "]";
            return null;
        });
    }

    /**
     * Releases all locks held by the given transaction by delegating to {@code lockManager.releaseAll}.
     *
     * @param txId Transaction id.
     */
    private void releaseTxLocks(UUID txId) {
        this.lockManager.releaseAll(txId);
    }

    /**
     * Finds the row for a primary key within a read-write transaction and passes it to {@code action}.
     *
     * <p>Lookup locks on the primary key index are taken first. Then a cursor over the PK index is opened and handed to
     * {@code continueResolvingByPk}; the cursor is closed when the resolving future completes.
     *
     * @param pk Primary key tuple.
     * @param txId Transaction id.
     * @param action Callback receiving the row id, the row and its commit timestamp (all {@code null} when no row is
     *     found, as done by {@code continueResolvingByPk}).
     * @param <T> Result type of the action.
     * @return Future produced by {@code action}.
     */
    private <T> CompletableFuture<T> resolveRowByPk(BinaryTuple pk, UUID txId, IgniteTriFunction<@Nullable RowId, @Nullable BinaryRow, @Nullable HybridTimestamp, CompletableFuture<T>> action) {
        IndexLocker pkLocker = this.indexesLockers.get().get(this.pkIndexStorage.get().id());
        assert (pkLocker != null);
        CompletableFuture<Void> lockFut = pkLocker.locksForLookupByKey(txId, pk);
        Supplier<CompletableFuture> sup = () -> {
            // Tracks whether the whenComplete hook that closes the cursor has been installed.
            boolean cursorClosureSetUp = false;
            Cursor<RowId> cursor = null;
            try {
                Cursor<RowId> finalCursor = cursor = this.getFromPkIndex(pk);
                CompletionStage resolvingFuture = this.continueResolvingByPk(cursor, txId, action).whenComplete((res, ex) -> finalCursor.close());
                cursorClosureSetUp = true;
                CompletionStage completionStage = resolvingFuture;
                return completionStage;
            }
            finally {
                // An exception was thrown before the hook was installed: close the cursor here to avoid a leak.
                if (!cursorClosureSetUp && cursor != null) {
                    cursor.close();
                }
            }
        };
        // Locks already acquired: run the lookup directly on the calling thread instead of chaining.
        if (CompletableFutures.isCompletedSuccessfully(lockFut)) {
            return sup.get();
        }
        return lockFut.thenCompose(ignored -> (CompletionStage)sup.get());
    }

    /**
     * Walks the PK index cursor until a row id resolves to an existing row, then invokes {@code action} with it.
     *
     * @param cursor Cursor over row ids from the PK index.
     * @param txId Transaction id used for row resolution.
     * @param action Callback receiving the row id, the row and its commit timestamp; invoked with three {@code null}s
     *     when the cursor is exhausted without a match.
     * @param <T> Result type of the action.
     * @return Future produced by {@code action}.
     */
    private <T> CompletableFuture<T> continueResolvingByPk(Cursor<RowId> cursor, UUID txId, IgniteTriFunction<@Nullable RowId, @Nullable BinaryRow, @Nullable HybridTimestamp, CompletableFuture<T>> action) {
        if (cursor.hasNext()) {
            RowId rowId = (RowId)cursor.next();

            return this.resolvePlainReadResult(rowId, txId).thenCompose(row -> {
                boolean rowMissing = row == null || row.binaryRow() == null;

                if (rowMissing) {
                    // Nothing visible under this row id: try the next one.
                    return this.continueResolvingByPk(cursor, txId, action);
                }

                return (CompletionStage)action.apply(rowId, row.binaryRow(), row.commitTimestamp());
            });
        }

        return action.apply(null, null, null);
    }

    /**
     * Runs a transactional operation while registering it for later transaction cleanup.
     *
     * <p>For command types that are not RW reads, a future is registered in {@code pendingTransactions} under
     * {@code txId}. When {@code full} is set, the operation runs immediately; on its completion the transaction's locks
     * are released and the pending future is completed and removed. Otherwise a cleanup-ready future is registered in
     * {@code txCleanupReadyFutures} under the command type and operation id, and is completed once the operation (and,
     * for a {@code ReplicaResult} with a replication future, that replication) has finished.
     *
     * @param txId Transaction id.
     * @param opId Operation id used as the key of the cleanup-ready future.
     * @param cmdType Request type of the operation.
     * @param full Selects the immediate path described above.
     * @param op Supplier that starts the operation.
     * @param <T> Operation result type.
     * @return The operation's future, or a failed future with a {@code TransactionException} when the transaction's
     *     state meta is missing, final or {@code FINISHING} (in which case {@code op} is not invoked).
     */
    private <T> CompletableFuture<T> appendTxCommand(UUID txId, OperationId opId, RequestType cmdType, boolean full, Supplier<CompletableFuture<T>> op) {
        CompletableFuture txFut;
        // RW reads are not tracked in pendingTransactions, so txFut is null for them.
        CompletableFuture completableFuture = txFut = cmdType.isRwRead() ? null : this.pendingTransactions.computeIfAbsent(txId, id -> new CompletableFuture());
        if (full) {
            return op.get().whenComplete((v, th) -> {
                this.releaseTxLocks(txId);
                this.partitionOperationsExecutor.execute(() -> {
                    if (!cmdType.isRwRead()) {
                        txFut.complete(null);
                        this.pendingTransactions.remove(txId);
                    }
                });
            });
        }
        CompletableFuture cleanupReadyFut = new CompletableFuture();
        this.txCleanupReadyFutures.compute(txId, (id, txOps) -> {
            TxStateMeta txStateMeta = this.txManager.stateMeta(txId);
            if (txStateMeta == null || TxState.isFinalState(txStateMeta.txState()) || txStateMeta.txState() == TxState.FINISHING) {
                // The exceptional completion acts as a marker checked right after compute(); the exception itself
                // is not propagated to the caller.
                cleanupReadyFut.completeExceptionally(new Exception());
                return txOps;
            }
            if (txOps == null) {
                txOps = new TxCleanupReadyFutureList();
            }
            txOps.futures.computeIfAbsent(cmdType, type -> new HashMap()).put(opId, cleanupReadyFut);
            return txOps;
        });
        if (cleanupReadyFut.isCompletedExceptionally()) {
            // The state is re-read here only to build the error code and message.
            TxStateMeta txStateMeta = this.txManager.stateMeta(txId);
            TxState txState = txStateMeta == null ? null : txStateMeta.txState();
            boolean isFinishedDueToTimeout = txStateMeta != null && txStateMeta.isFinishedDueToTimeout() != null && txStateMeta.isFinishedDueToTimeout() != false;
            return CompletableFuture.failedFuture(new TransactionException(isFinishedDueToTimeout ? ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR : ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR, "Transaction is already finished txId=[" + txId + ", txState=" + txState + "]."));
        }
        CompletableFuture<T> fut = op.get();
        fut.whenComplete((v, th) -> {
            if (th != null) {
                cleanupReadyFut.completeExceptionally((Throwable)th);
            } else if (v instanceof ReplicaResult) {
                ReplicaResult res = (ReplicaResult)v;
                // NOTE(review): assumes applyResult() never returns null - confirm in ReplicaResult.
                if (res.applyResult().replicationFuture() != null) {
                    // Cleanup readiness follows the outcome of the replication, not just of the operation.
                    res.applyResult().replicationFuture().whenComplete((v0, th0) -> {
                        if (th0 != null) {
                            cleanupReadyFut.completeExceptionally((Throwable)th0);
                        } else {
                            cleanupReadyFut.complete(null);
                        }
                    });
                } else {
                    cleanupReadyFut.complete(null);
                }
            } else {
                cleanupReadyFut.complete(null);
            }
        });
        return fut;
    }

    /**
     * Resolves the row for a primary key at the given timestamp for a read-only operation.
     *
     * <p>All row ids found in the PK index are read at {@code ts} and split into write intents and non-empty regular
     * entries. The result is chosen as follows:
     * <ul>
     *     <li>nothing found: {@code null};</li>
     *     <li>no write intents: the first regular entry's row;</li>
     *     <li>write intents that are readable: the row of any non-empty write intent;</li>
     *     <li>write intents that are not readable: the committed row read at the newest commit timestamp of the first
     *     write intent that has one, otherwise the first regular entry's row.</li>
     * </ul>
     * Every candidate row is additionally filtered through the RLS checker; an invisible row yields {@code null}.
     *
     * @param pk Primary key tuple.
     * @param ts Read timestamp.
     * @param rlsContext Context passed to the RLS checker's visibility check.
     * @return Future with the row, or with {@code null} when there is no visible row.
     */
    private CompletableFuture<@Nullable BinaryRow> resolveRowByPkForReadOnly(BinaryTuple pk, HybridTimestamp ts, RlsContext rlsContext) {
        try (Cursor<RowId> cursor = this.getFromPkIndex(pk);){
            CompletableFuture<BinaryRow> completableFuture;
            ArrayList<ReadResult> writeIntents = new ArrayList<ReadResult>();
            ArrayList<ReadResult> regularEntries = new ArrayList<ReadResult>();
            // Classify every version reachable through the PK index; empty non-intent results are dropped.
            for (RowId rowId : cursor) {
                ReadResult readResult = this.mvDataStorage.read(rowId, ts);
                if (readResult.isWriteIntent()) {
                    writeIntents.add(readResult);
                    continue;
                }
                if (readResult.isEmpty()) continue;
                regularEntries.add(readResult);
            }
            if (writeIntents.isEmpty() && regularEntries.isEmpty()) {
                this.metrics.onRead(true);
                CompletableFuture completableFuture2 = CompletableFutures.nullCompletedFuture();
                return completableFuture2;
            }
            RlsChecker rlsChecker = this.policyManager.checkerForTable(this.tableId, ts);
            if (writeIntents.isEmpty()) {
                this.metrics.onRead(true);
                BinaryRow row = ((ReadResult)regularEntries.get(0)).binaryRow();
                if (row != null && rlsChecker.isRowVisible(rlsContext, ts, row)) {
                    completableFuture = CompletableFuture.completedFuture(row);
                    return completableFuture;
                }
                completableFuture = CompletableFutures.nullCompletedFuture();
                return completableFuture;
            }
            // Readability is resolved for the first write intent only; the check below asserts that all write
            // intents share the same transaction, commit table/zone and commit partition.
            ReadResult writeIntent = (ReadResult)writeIntents.get(0);
            PartitionReplicaListener.checkWriteIntentsBelongSameTx(writeIntents);
            completableFuture = IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.resolveWriteIntentReadability(writeIntent, ts).thenApply(writeIntentReadable -> IgniteUtils.inBusyLock(this.busyLock, () -> {
                this.metrics.onRead(true);
                if (writeIntentReadable.booleanValue()) {
                    return IgniteUtils.findAny(writeIntents, wi -> !wi.isEmpty()).map(ReadResult::binaryRow).filter(row -> rlsChecker.isRowVisible(rlsContext, ts, (BinaryRow)row)).orElse(null);
                }
                // Write intents are not readable: fall back to the latest committed version, if any.
                for (ReadResult wi2 : writeIntents) {
                    HybridTimestamp newestCommitTimestamp = wi2.newestCommitTimestamp();
                    if (newestCommitTimestamp == null) continue;
                    ReadResult committedReadResult = this.mvDataStorage.read(wi2.rowId(), newestCommitTimestamp);
                    assert (!committedReadResult.isWriteIntent()) : "The result is not committed [rowId=" + wi2.rowId() + ", timestamp=" + newestCommitTimestamp + "]";
                    BinaryRow row2 = committedReadResult.binaryRow();
                    if (row2 != null && rlsChecker.isRowVisible(rlsContext, ts, row2)) {
                        return row2;
                    }
                    // The first write intent with a committed version decides the outcome; later ones are not examined.
                    return null;
                }
                return IgniteUtils.findFirst(regularEntries).map(ReadResult::binaryRow).filter(row -> rlsChecker.isRowVisible(rlsContext, ts, (BinaryRow)row)).orElse(null);
            })));
            return completableFuture;
        }
    }

    /**
     * Asserts that all write intents share the transaction id, the commit table/zone id and the commit partition id.
     *
     * <p>The comparisons are {@code assert} statements, so they take effect only when assertions are enabled.
     *
     * @param writeIntents Write intents to check; must not be empty.
     * @throws java.util.NoSuchElementException If {@code writeIntents} is empty.
     */
    private static void checkWriteIntentsBelongSameTx(Collection<ReadResult> writeIntents) {
        ReadResult reference = IgniteUtils.findAny(writeIntents).orElseThrow();

        for (ReadResult candidate : writeIntents) {
            assert (Objects.equals(candidate.transactionId(), reference.transactionId())) : "Unexpected write intent, tx1=" + reference.transactionId() + ", tx2=" + candidate.transactionId();
            assert (Objects.equals(candidate.commitTableOrZoneId(), reference.commitTableOrZoneId())) : "Unexpected write intent, commitTableOrZoneId1=" + reference.commitTableOrZoneId() + ", commitTableId2=" + candidate.commitTableOrZoneId();
            assert (candidate.commitPartitionId() == reference.commitPartitionId()) : "Unexpected write intent, commitPartitionId1=" + reference.commitPartitionId() + ", commitPartitionId2=" + candidate.commitPartitionId();
        }
    }

    /**
     * Compares two rows by their tuple slices.
     *
     * @param row First row.
     * @param row2 Second row.
     * @return {@code true} if the tuple slices compare as equal.
     */
    private static boolean equalValues(BinaryRow row, BinaryRow row2) {
        int comparison = row.tupleSlice().compareTo(row2.tupleSlice());

        return comparison == 0;
    }

    /**
     * Handles a direct read-only multi-row get: resolves every requested primary key at {@code opStartTimestamp}.
     *
     * <p>After all rows are resolved, the read timestamp is checked against the low watermark; a read below the
     * watermark fails with {@code TX_READ_ONLY_TOO_OLD_ERR}.
     *
     * @param request Multi-row request; its type must be {@code RO_GET_ALL}.
     * @param opStartTimestamp Timestamp the rows are read at.
     * @return Future with one entry per requested key, in request order; an entry is {@code null} when no visible row
     *     exists for that key.
     */
    private CompletableFuture<List<BinaryRow>> processReadOnlyDirectMultiEntryAction(ReadOnlyDirectMultiRowReplicaRequest request, HybridTimestamp opStartTimestamp) {
        List<BinaryTuple> primaryKeys = this.resolvePks(request.primaryKeys());
        assert (request.requestType() == RequestType.RO_GET_ALL);
        CompletableFuture[] resolutionFuts = new CompletableFuture[primaryKeys.size()];
        for (int i = 0; i < primaryKeys.size(); ++i) {
            resolutionFuts[i] = this.resolveRowByPkForReadOnly(primaryKeys.get(i), opStartTimestamp, SYSTEM_CONTEXT);
        }
        return CompletableFutures.allOfToList(resolutionFuts).thenApply(rows -> {
            // Read the watermark once so the value reported in the error is exactly the value that was compared.
            HybridTimestamp lwm = this.lowWatermark.getLowWatermark();
            if (lwm != null && opStartTimestamp.compareTo(lwm) < 0) {
                throw new IgniteInternalException(ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR, "Attempted to read data below the garbage collection watermark: [readTimestamp={}, gcTimestamp={}]", opStartTimestamp, lwm);
            }
            return rows;
        });
    }

    /**
     * Unpacks a DCR multi-row write request and forwards it to the generic multi-row handler.
     *
     * @param request DCR multi-row write request.
     * @param leaseStartTime Lease start time forwarded as is.
     * @return Future with the replica result of the generic handler.
     */
    private CompletableFuture<ReplicaResult> processDcrMultiEntryAction(DcrWriteMultiRowReplicaRequest request, Long leaseStartTime) {
        return this.processMultiEntryAction(
                request.requestType(),
                request.transactionId(),
                request.full(),
                request.commitPartitionId(),
                request.binaryRows(),
                request.coordinatorId(),
                request.skipDelayedAck(),
                request.deleted(),
                leaseStartTime
        );
    }

    /**
     * Handles a {@code ReadWriteMultiRowReplicaRequest} by unpacking its fields and delegating to the generic
     * multi-row read-write handler.
     *
     * @param request Multi-row read-write request.
     * @param leaseStartTime Lease start time forwarded to the generic handler.
     * @return Future with the replica result produced by the generic handler.
     */
    private CompletableFuture<ReplicaResult> processMultiEntryAction(ReadWriteMultiRowReplicaRequest request, Long leaseStartTime) {
        RequestType type = request.requestType();
        UUID txId = request.transactionId();
        boolean full = request.full();
        ReplicationGroupIdMessage commitPartitionId = request.commitPartitionId();
        List<BinaryRow> rows = request.binaryRows();
        UUID coordinatorId = request.coordinatorId();
        boolean skipDelayedAck = request.skipDelayedAck();
        BitSet deleted = request.deleted();

        return this.processMultiEntryAction(type, txId, full, commitPartitionId, rows, coordinatorId, skipDelayedAck, deleted, leaseStartTime);
    }

    /**
     * Generic handler for multi-row read-write requests that carry full rows.
     *
     * <p>Supported request types are {@code RW_ARCHIVE_EXACT_ALL}, {@code RW_DELETE_EXACT_ALL},
     * {@code RW_INSERT_ALL} and {@code RW_UPSERT_ALL}; any other type ends in an
     * {@code IgniteInternalException} with {@code REPLICA_COMMON_ERR}.
     *
     * <p>Every branch follows the same outline: resolve each row by primary key, acquire locks, validate the
     * write against the schema, submit one "update all" command, then update metrics and build the result.
     *
     * @param requestType Type of the operation.
     * @param txId Transaction ID.
     * @param full Forwarded to {@code applyUpdateAllCommand} as the command's {@code full} flag.
     * @param commitPartitionId Commit partition ID, must not be {@code null} (asserted).
     * @param searchRows Rows of the request.
     * @param coordinatorId Transaction coordinator ID.
     * @param skipDelayedAck Forwarded to {@code applyUpdateAllCommand}.
     * @param deleted Only read by {@code RW_UPSERT_ALL}: when not {@code null}, bit {@code i} marks
     *     {@code searchRows.get(i)} as a delete instead of an upsert.
     * @param leaseStartTime Lease start time. NOTE(review): this is a boxed {@code Long} that is passed to an
     *     {@code applyUpdateAllCommand} overload taking a primitive {@code long}, so a {@code null} here would
     *     fail on unboxing - confirm callers never pass {@code null}.
     * @return Future with the replica result.
     */
    private CompletableFuture<ReplicaResult> processMultiEntryAction(RequestType requestType, UUID txId, boolean full, ReplicationGroupIdMessage commitPartitionId, List<BinaryRow> searchRows, UUID coordinatorId, boolean skipDelayedAck, BitSet deleted, Long leaseStartTime) {
        assert (commitPartitionId != null) : "Commit partition is null [type=" + requestType + "]";
        switch (requestType) {
            // Both types share one implementation; they differ only in the archivation flag passed to the command below.
            case RW_ARCHIVE_EXACT_ALL: 
            case RW_DELETE_EXACT_ALL: {
                CompletableFuture[] deleteExactLockFuts = new CompletableFuture[searchRows.size()];
                // Last commit timestamps keyed by row UUID; filled from the resolution callbacks.
                ConcurrentHashMap lastCommitTimes = new ConcurrentHashMap();
                for (int i = 0; i < searchRows.size(); ++i) {
                    BinaryRow searchRow = searchRows.get(i);
                    deleteExactLockFuts[i] = this.resolveRowByPk(this.extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                        // No row for this key: nothing to lock, the slot resolves to null.
                        if (rowId == null) {
                            return CompletableFutures.nullCompletedFuture();
                        }
                        if (lastCommitTime != null) {
                            lastCommitTimes.put(rowId.uuid(), lastCommitTime);
                        }
                        return this.takeLocksForDeleteExact(searchRow, (RowId)rowId, (BinaryRow)row, txId);
                    });
                }
                return CompletableFuture.allOf(deleteExactLockFuts).thenCompose(ignore -> {
                    HashMap<UUID, TimedBinaryRowMessage> rowIdsToDelete = new HashMap<UUID, TimedBinaryRowMessage>();
                    // One entry per search row: a NullBinaryRow marker for rows that will be deleted, null otherwise.
                    ArrayList<NullBinaryRow> result = new ArrayList<NullBinaryRow>();
                    ArrayList<RowId> rows = new ArrayList<RowId>();
                    for (int i = 0; i < searchRows.size(); ++i) {
                        // All futures are already complete here, so join() does not block.
                        RowId lockedRowId = (RowId)deleteExactLockFuts[i].join();
                        if (lockedRowId != null) {
                            // The message carries only the last commit timestamp and no row payload.
                            rowIdsToDelete.put(lockedRowId.uuid(), PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage().timestamp((HybridTimestamp)lastCommitTimes.get(lockedRowId.uuid())).build());
                            result.add(new NullBinaryRow());
                            rows.add(lockedRowId);
                            continue;
                        }
                        result.add(null);
                    }
                    // Nothing to delete: only the reads are counted and no command is submitted.
                    if (rowIdsToDelete.isEmpty()) {
                        this.metrics.onRead(searchRows.size(), false);
                        return CompletableFuture.completedFuture(new ReplicaResult(result, null));
                    }
                    return ((CompletableFuture)((CompletableFuture)this.validateWriteAgainstSchemaAfterTakingLocks(txId).thenCompose(catalogVersion -> this.awaitCleanup(rows, catalogVersion))).thenCompose(catalogVersion -> this.applyUpdateAllCommand((Map<UUID, TimedBinaryRowMessage>)rowIdsToDelete, commitPartitionId, txId, full, coordinatorId, (int)catalogVersion, skipDelayedAck, leaseStartTime, requestType == RequestType.RW_ARCHIVE_EXACT_ALL))).thenApply(res -> {
                        this.metrics.onRead(searchRows.size(), false);
                        this.metrics.onWrite(rowIdsToDelete.size());
                        return new ReplicaResult(result, (CommandApplicationResult)res);
                    });
                });
            }
            case RW_INSERT_ALL: {
                ArrayList<BinaryTuple> pks = new ArrayList<BinaryTuple>(searchRows.size());
                CompletableFuture[] pkReadLockFuts = new CompletableFuture[searchRows.size()];
                for (int i = 0; i < searchRows.size(); ++i) {
                    BinaryTuple pk = this.extractPk(searchRows.get(i));
                    pks.add(pk);
                    // The callback only reports the ID of an already existing row (null when there is none).
                    pkReadLockFuts[i] = this.resolveRowByPk(pk, txId, (rowId, row, lastCommitTime) -> CompletableFuture.completedFuture(rowId));
                }
                return CompletableFuture.allOf(pkReadLockFuts).thenCompose(ignore -> {
                    // One entry per search row: a NullBinaryRow marker for rows that will be inserted, null otherwise.
                    ArrayList<NullBinaryRow> result = new ArrayList<NullBinaryRow>();
                    HashMap<RowId, BinaryRow> rowsToInsert = new HashMap<RowId, BinaryRow>();
                    // Keys already accepted from this batch; a repeated key is not inserted again.
                    HashSet<ByteBuffer> uniqueKeys = new HashSet<ByteBuffer>();
                    for (int i = 0; i < searchRows.size(); ++i) {
                        BinaryRow row = (BinaryRow)searchRows.get(i);
                        RowId lockedRow = (RowId)pkReadLockFuts[i].join();
                        // Insert only when no row exists for the key and the key was not seen earlier in the batch.
                        if (lockedRow == null && uniqueKeys.add(((BinaryTuple)pks.get(i)).byteBuffer())) {
                            rowsToInsert.put(new RowId(this.partId(), RowIdGenerator.next()), row);
                            result.add(new NullBinaryRow());
                            continue;
                        }
                        result.add(null);
                    }
                    // Nothing to insert: only the reads are counted and no command is submitted.
                    if (rowsToInsert.isEmpty()) {
                        this.metrics.onRead(searchRows.size(), false);
                        return CompletableFuture.completedFuture(new ReplicaResult(result, null));
                    }
                    CompletableFuture[] insertLockFuts = new CompletableFuture[rowsToInsert.size()];
                    int idx = 0;
                    for (Map.Entry entry : rowsToInsert.entrySet()) {
                        insertLockFuts[idx++] = this.takeLocksForInsert((BinaryRow)entry.getValue(), (RowId)entry.getKey(), txId);
                    }
                    // Re-key the rows by UUID and wrap them into messages for the command.
                    Map<UUID, TimedBinaryRowMessage> convertedMap = rowsToInsert.entrySet().stream().collect(Collectors.toMap(e -> ((RowId)e.getKey()).uuid(), e -> PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage().binaryRowMessage(PartitionReplicaListener.binaryRowMessage((BinaryRow)e.getValue())).build()));
                    return ((CompletableFuture)((CompletableFuture)CompletableFuture.allOf(insertLockFuts).thenCompose(ignored -> this.validateWriteAgainstSchemaAfterTakingLocks(txId))).thenCompose(catalogVersion -> this.applyUpdateAllCommand(convertedMap, commitPartitionId, txId, full, coordinatorId, (int)catalogVersion, skipDelayedAck, leaseStartTime))).thenApply(res -> {
                        this.metrics.onRead(searchRows.size(), false);
                        this.metrics.onWrite(rowsToInsert.size());
                        // Release the locks returned as the second element of each lock tuple
                        // (presumably short-term locks - verify against takeLocksForInsert).
                        for (CompletableFuture insertLockFut : insertLockFuts) {
                            ((Collection)((IgniteBiTuple)insertLockFut.join()).get2()).forEach(lock -> this.lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
                        }
                        return new ReplicaResult(result, (CommandApplicationResult)res);
                    });
                });
            }
            case RW_UPSERT_ALL: {
                CompletableFuture[] rowIdFuts = new CompletableFuture[searchRows.size()];
                BinaryTuple[] pks = new BinaryTuple[searchRows.size()];
                // Last commit timestamps keyed by row UUID; filled from the resolution callbacks.
                ConcurrentHashMap lastCommitTimes = new ConcurrentHashMap();
                // Maps a key to the index of its latest occurrence in the batch.
                HashMap<ByteBuffer, Integer> prevRowIdx = new HashMap<ByteBuffer, Integer>();
                // First pass: compute keys and neutralize earlier duplicates so only the last occurrence of a key is processed.
                for (int i = 0; i < searchRows.size(); ++i) {
                    BinaryTuple pk;
                    BinaryRow searchRow = searchRows.get(i);
                    boolean isDelete = deleted != null && deleted.get(i);
                    // For delete-marked entries the key is resolved from the row's tuple slice, otherwise it is extracted from the row.
                    pks[i] = pk = isDelete ? this.resolvePk(searchRow.tupleSlice()) : this.extractPk(searchRow);
                    Integer prevRowIdx0 = prevRowIdx.put(pk.byteBuffer(), i);
                    if (prevRowIdx0 == null) continue;
                    rowIdFuts[prevRowIdx0.intValue()] = CompletableFutures.nullCompletedFuture();
                }
                int uniqueKeysCount = 0;
                // Second pass: resolve and lock every slot that was not neutralized above.
                for (int i = 0; i < searchRows.size(); ++i) {
                    if (rowIdFuts[i] != null) continue;
                    ++uniqueKeysCount;
                    BinaryRow searchRow = searchRows.get(i);
                    boolean isDelete = deleted != null && deleted.get(i);
                    rowIdFuts[i] = this.resolveRowByPk(pks[i], txId, (rowId, row, lastCommitTime) -> {
                        // Deleting a key that has no row is a no-op.
                        if (isDelete && rowId == null) {
                            return CompletableFutures.nullCompletedFuture();
                        }
                        if (lastCommitTime != null) {
                            lastCommitTimes.put(rowId.uuid(), lastCommitTime);
                        }
                        if (isDelete) {
                            assert (row != null);
                            // Wrap the row ID into a tuple with a null second element so all slots share one result shape.
                            return this.takeLocksForDelete((BinaryRow)row, (RowId)rowId, txId).thenApply(id -> new IgniteBiTuple<RowId, Object>((RowId)id, null));
                        }
                        boolean insert = rowId == null;
                        RowId rowId0 = insert ? new RowId(this.partId(), RowIdGenerator.next()) : rowId;
                        return insert ? this.takeLocksForInsert(searchRow, rowId0, txId) : this.takeLocksForUpdate(searchRow, rowId0, txId);
                    });
                }
                int uniqueKeysCountFinal = uniqueKeysCount;
                return CompletableFuture.allOf(rowIdFuts).thenCompose(ignore -> {
                    HashMap<UUID, TimedBinaryRowMessage> rowsToUpdate = IgniteUtils.newHashMap(searchRows.size());
                    ArrayList<RowId> rows = new ArrayList<RowId>();
                    for (int i = 0; i < searchRows.size(); ++i) {
                        IgniteBiTuple locks = (IgniteBiTuple)rowIdFuts[i].join();
                        // Null means a neutralized duplicate or a delete of a missing row.
                        if (locks == null) continue;
                        RowId lockedRow = (RowId)locks.get1();
                        TimedBinaryRowMessageBuilder timedBinaryRowMessageBuilder = PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage().timestamp((HybridTimestamp)lastCommitTimes.get(lockedRow.uuid()));
                        // Only non-delete entries carry a row payload.
                        if (deleted == null || !deleted.get(i)) {
                            timedBinaryRowMessageBuilder.binaryRowMessage(PartitionReplicaListener.binaryRowMessage((BinaryRow)searchRows.get(i)));
                        }
                        rowsToUpdate.put(lockedRow.uuid(), timedBinaryRowMessageBuilder.build());
                        rows.add(lockedRow);
                    }
                    // Nothing to write: only the reads are counted and no command is submitted.
                    if (rowsToUpdate.isEmpty()) {
                        this.metrics.onRead(uniqueKeysCountFinal, false);
                        return CompletableFuture.completedFuture(new ReplicaResult(null, null));
                    }
                    return ((CompletableFuture)((CompletableFuture)this.validateWriteAgainstSchemaAfterTakingLocks(txId).thenCompose(catalogVersion -> this.awaitCleanup(rows, catalogVersion))).thenCompose(catalogVersion -> this.applyUpdateAllCommand((Map<UUID, TimedBinaryRowMessage>)rowsToUpdate, commitPartitionId, txId, full, coordinatorId, (int)catalogVersion, skipDelayedAck, leaseStartTime))).thenApply(res -> {
                        // Both metrics use the number of unique keys, not rowsToUpdate.size().
                        this.metrics.onRead(uniqueKeysCountFinal, false);
                        this.metrics.onWrite(uniqueKeysCountFinal);
                        // Release the locks found in the second tuple element, where present.
                        for (CompletableFuture rowIdFut : rowIdFuts) {
                            Collection locks;
                            IgniteBiTuple futRes = (IgniteBiTuple)rowIdFut.join();
                            Collection collection = locks = futRes == null ? null : (Collection)futRes.get2();
                            if (locks == null) continue;
                            locks.forEach(lock -> this.lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
                        }
                        return new ReplicaResult(null, (CommandApplicationResult)res);
                    });
                });
            }
        }
        // Reached for every request type without a case above.
        throw new IgniteInternalException(ErrorGroups.Replicator.REPLICA_COMMON_ERR, IgniteStringFormatter.format("Unknown multi request [actionType={}]", new Object[]{requestType}));
    }

    /**
     * Handles multi-row read-write requests that address rows by primary key only.
     *
     * <p>Supported request types are {@code RW_GET_ALL} and {@code RW_DELETE_ALL}; any other type ends in an
     * {@code IgniteInternalException} with {@code REPLICA_COMMON_ERR}.
     *
     * @param request Multi-row primary-key request.
     * @param leaseStartTime Lease start time forwarded to {@code applyUpdateAllCommand}.
     * @return Future with the replica result. The result list has one entry per requested key: the row itself
     *     for {@code RW_GET_ALL}, a {@code NullBinaryRow} marker for each deleted row for {@code RW_DELETE_ALL},
     *     and {@code null} where no row was found.
     */
    private CompletableFuture<ReplicaResult> processMultiEntryAction(ReadWriteMultiRowPkReplicaRequest request, long leaseStartTime) {
        UUID txId = request.transactionId();
        // NOTE(review): request.commitPartitionId() is dereferenced here, before the assertion below that
        // tolerates a null commit partition for RW_GET_ALL - confirm the message field is never null.
        ReplicationGroupId commitPartitionId = request.commitPartitionId().asReplicationGroupId();
        List<BinaryTuple> primaryKeys = this.resolvePks(request.primaryKeys());
        assert (commitPartitionId != null || request.requestType() == RequestType.RW_GET_ALL) : "Commit partition is null [type=" + request.requestType() + "]";
        switch (request.requestType()) {
            case RW_GET_ALL: {
                CompletableFuture[] rowFuts = new CompletableFuture[primaryKeys.size()];
                for (int i = 0; i < primaryKeys.size(); ++i) {
                    rowFuts[i] = this.resolveRowByPk(primaryKeys.get(i), txId, (rowId, row, lastCommitTime) -> {
                        // No row for this key: nothing to lock, the slot resolves to null.
                        if (rowId == null) {
                            return CompletableFutures.nullCompletedFuture();
                        }
                        // Lock the row first, then hand out the resolved row.
                        return this.takeLocksForGet((RowId)rowId, txId).thenApply(ignored -> row);
                    });
                }
                return CompletableFuture.allOf(rowFuts).thenCompose(ignored -> {
                    ArrayList<BinaryRow> result = new ArrayList<BinaryRow>(primaryKeys.size());
                    for (CompletableFuture rowFut : rowFuts) {
                        // All futures are already complete here, so join() does not block.
                        result.add((BinaryRow)rowFut.join());
                    }
                    // When no row was found, the schema validation step is skipped.
                    if (PartitionReplicaListener.allElementsAreNull(result)) {
                        this.metrics.onRead(result.size(), false);
                        return CompletableFuture.completedFuture(new ReplicaResult(result, null));
                    }
                    return this.validateRwReadAgainstSchemaAfterTakingLocks(txId).thenApply(unused -> {
                        this.metrics.onRead(result.size(), false);
                        return new ReplicaResult(result, null);
                    });
                });
            }
            case RW_DELETE_ALL: {
                CompletableFuture[] rowIdLockFuts = new CompletableFuture[primaryKeys.size()];
                // Last commit timestamps keyed by row UUID; filled from the resolution callbacks.
                ConcurrentHashMap lastCommitTimes = new ConcurrentHashMap();
                for (int i = 0; i < primaryKeys.size(); ++i) {
                    rowIdLockFuts[i] = this.resolveRowByPk(primaryKeys.get(i), txId, (rowId, row, lastCommitTime) -> {
                        // No row for this key: nothing to delete, the slot resolves to null.
                        if (rowId == null) {
                            return CompletableFutures.nullCompletedFuture();
                        }
                        if (lastCommitTime != null) {
                            lastCommitTimes.put(rowId.uuid(), lastCommitTime);
                        }
                        return this.takeLocksForDelete((BinaryRow)row, (RowId)rowId, txId);
                    });
                }
                return CompletableFuture.allOf(rowIdLockFuts).thenCompose(ignore -> {
                    HashMap<UUID, TimedBinaryRowMessage> rowIdsToDelete = new HashMap<UUID, TimedBinaryRowMessage>();
                    // One entry per key: a NullBinaryRow marker for rows that will be deleted, null otherwise.
                    ArrayList<NullBinaryRow> result = new ArrayList<NullBinaryRow>();
                    ArrayList<RowId> rows = new ArrayList<RowId>();
                    for (CompletableFuture lockFut : rowIdLockFuts) {
                        RowId lockedRowId = (RowId)lockFut.join();
                        if (lockedRowId != null) {
                            // The message carries only the last commit timestamp and no row payload.
                            rowIdsToDelete.put(lockedRowId.uuid(), PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage().timestamp((HybridTimestamp)lastCommitTimes.get(lockedRowId.uuid())).build());
                            rows.add(lockedRowId);
                            result.add(new NullBinaryRow());
                            continue;
                        }
                        result.add(null);
                    }
                    // Nothing to delete: no command is submitted and no metrics are recorded on this path.
                    if (rowIdsToDelete.isEmpty()) {
                        return CompletableFuture.completedFuture(new ReplicaResult(result, null));
                    }
                    return ((CompletableFuture)((CompletableFuture)this.validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()).thenCompose(catalogVersion -> this.awaitCleanup(rows, catalogVersion))).thenCompose(catalogVersion -> this.applyUpdateAllCommand((Map<UUID, TimedBinaryRowMessage>)rowIdsToDelete, request.commitPartitionId(), request.transactionId(), request.full(), request.coordinatorId(), (int)catalogVersion, request.skipDelayedAck(), leaseStartTime))).thenApply(res -> {
                        // Only writes are counted for this request type.
                        this.metrics.onWrite(rowIdsToDelete.size());
                        return new ReplicaResult(result, (CommandApplicationResult)res);
                    });
                });
            }
        }
        // Reached for every request type without a case above.
        throw new IgniteInternalException(ErrorGroups.Replicator.REPLICA_COMMON_ERR, IgniteStringFormatter.format("Unknown multi request [actionType={}]", new Object[]{request.requestType()}));
    }

    /**
     * Checks that the given list holds nothing but {@code null} elements.
     *
     * @param list List to inspect.
     * @param <T> Element type.
     * @return {@code true} if the list is empty or every element is {@code null}, {@code false} as soon as
     *     a non-null element is found.
     */
    private static <T> boolean allElementsAreNull(List<T> list) {
        for (T item : list) {
            if (item != null) {
                return false;
            }
        }

        return true;
    }

    /**
     * Submits a command through {@code raftCommandApplicator}, using its exception-handling variant.
     *
     * @param cmd Command to apply.
     * @return Future returned by the applicator.
     */
    private CompletableFuture<Object> applyCmdWithExceptionHandling(Command cmd) {
        CompletableFuture<Object> applyFuture = this.raftCommandApplicator.applyCommandWithExceptionHandling(cmd);

        return applyFuture;
    }

    /**
     * Builds an update command for a single row and applies it.
     *
     * <p>For a non-full command the storage is updated locally first (unless {@code SKIP_UPDATES} is set) and
     * the command is then submitted; with {@code skipDelayedAck} the returned future completes with
     * {@code null} after the command is applied, otherwise an already completed future is returned whose
     * result carries the pending replication future.
     *
     * <p>For a full command the command is submitted first and its result is inspected: a primary replica
     * mismatch fails the future with {@code PrimaryReplicaMissException}; if the result reports
     * {@code isPrimaryInPeersAndLearners()} the method waits for the safe time, otherwise it updates the
     * storage itself (unless {@code SKIP_UPDATES} is set). A {@code null} command result is tolerated and
     * treated as "no safe timestamp".
     *
     * @param commitPartitionId Commit partition ID.
     * @param rowUuid Row UUID.
     * @param row Row to write, {@code null} is passed through to the command.
     * @param lastCommitTimestamp Last commit timestamp of the row, may be {@code null}.
     * @param txId Transaction ID.
     * @param full Value of the command's {@code full} flag; the lease start time is attached only when it is set.
     * @param txCoordinatorId Transaction coordinator ID.
     * @param catalogVersion Catalog version required by the command.
     * @param skipDelayedAck Whether to wait for the command instead of returning the replication future.
     * @param leaseStartTime Lease start time.
     * @return Future with the command application result.
     */
    private CompletableFuture<CommandApplicationResult> applyUpdateCommand(ReplicationGroupId commitPartitionId, UUID rowUuid, @Nullable BinaryRow row, @Nullable HybridTimestamp lastCommitTimestamp, UUID txId, boolean full, UUID txCoordinatorId, int catalogVersion, boolean skipDelayedAck, long leaseStartTime) {
        UpdateCommand cmd = this.updateCommand(commitPartitionId, rowUuid, row, lastCommitTimestamp, txId, full, txCoordinatorId, this.clockService.current(), catalogVersion, full ? Long.valueOf(leaseStartTime) : null);

        if (cmd.full()) {
            return this.applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
                UpdateCommandResult outcome = (UpdateCommandResult)res;

                if (outcome != null && !outcome.isPrimaryReplicaMatch()) {
                    throw new PrimaryReplicaMissException(txId, cmd.leaseStartTime(), outcome.currentLeaseStartTime());
                }

                boolean primaryInPeersAndLearners = outcome != null && outcome.isPrimaryInPeersAndLearners();
                HybridTimestamp safeTs = outcome == null ? null : HybridTimestamp.hybridTimestamp(outcome.safeTimestamp());

                if (primaryInPeersAndLearners) {
                    // No local storage update on this path; only wait until the safe time is reached.
                    return this.safeTime.waitFor(safeTs).thenApply(ignored -> new CommandApplicationResult(safeTs, null));
                }

                if (!SKIP_UPDATES) {
                    this.storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(), cmd.commitPartitionId().asReplicationGroupId(), cmd.rowToUpdate(), false, null, safeTs, null, this.indexIdsAtRwTxBeginTs(txId));
                }

                return CompletableFuture.completedFuture(new CommandApplicationResult(safeTs, null));
            });
        }

        // Non-full command: the local storage update always precedes the command submission.
        if (!SKIP_UPDATES) {
            this.storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(), cmd.commitPartitionId().asReplicationGroupId(), cmd.rowToUpdate(), true, null, null, null, this.indexIdsAtRwTxBeginTs(txId));
        }

        CompletableFuture<Object> replication = this.applyCmdWithExceptionHandling(cmd);

        if (skipDelayedAck) {
            return replication.thenApply(res -> null);
        }

        CompletableFuture<?> repFut = replication.thenApply(res -> cmd.txId());

        return CompletableFuture.completedFuture(new CommandApplicationResult(null, repFut));
    }

    /**
     * Convenience overload that takes the transaction attributes from a single-row read-write request and
     * delegates to the full-argument {@code applyUpdateCommand}.
     *
     * @param request Single-row read-write request.
     * @param rowUuid Row UUID.
     * @param row Row to write, may be {@code null}.
     * @param lastCommitTimestamp Last commit timestamp of the row, may be {@code null}.
     * @param catalogVersion Catalog version required by the command.
     * @param leaseStartTime Lease start time.
     * @return Future with the command application result.
     */
    private CompletableFuture<CommandApplicationResult> applyUpdateCommand(ReadWriteSingleRowReplicaRequest request, UUID rowUuid, @Nullable BinaryRow row, @Nullable HybridTimestamp lastCommitTimestamp, int catalogVersion, long leaseStartTime) {
        ReplicationGroupId commitPartitionId = request.commitPartitionId().asReplicationGroupId();
        UUID txId = request.transactionId();
        boolean full = request.full();
        UUID coordinatorId = request.coordinatorId();
        boolean skipDelayedAck = request.skipDelayedAck();

        return this.applyUpdateCommand(commitPartitionId, rowUuid, row, lastCommitTimestamp, txId, full, coordinatorId, catalogVersion, skipDelayedAck, leaseStartTime);
    }

    /**
     * Builds a regular (non-archiving) "update all" command for the given rows and applies it.
     *
     * <p>Equivalent to calling the overload with the archivation flag set to {@code false}.
     *
     * @param rowsToUpdate Row messages keyed by row UUID.
     * @param commitPartitionId Commit partition ID.
     * @param txId Transaction ID.
     * @param full Value of the command's {@code full} flag; the lease start time is attached only when it is set.
     * @param txCoordinatorId Transaction coordinator ID.
     * @param catalogVersion Catalog version required by the command.
     * @param skipDelayedAck Forwarded to the command application step.
     * @param leaseStartTime Lease start time.
     * @return Future with the command application result.
     */
    private CompletableFuture<CommandApplicationResult> applyUpdateAllCommand(Map<UUID, TimedBinaryRowMessage> rowsToUpdate, ReplicationGroupIdMessage commitPartitionId, UUID txId, boolean full, UUID txCoordinatorId, int catalogVersion, boolean skipDelayedAck, long leaseStartTime) {
        boolean isArchivation = false;

        return this.applyUpdateAllCommand(rowsToUpdate, commitPartitionId, txId, full, txCoordinatorId, catalogVersion, skipDelayedAck, leaseStartTime, isArchivation);
    }

    /**
     * Builds an "update all" command, or an "archive all" command when {@code isArchivation} is set, for the
     * given rows and applies it.
     *
     * @param rowsToUpdate Row messages keyed by row UUID.
     * @param commitPartitionId Commit partition ID.
     * @param txId Transaction ID.
     * @param full Value of the command's {@code full} flag; the lease start time is attached only when it is set.
     * @param txCoordinatorId Transaction coordinator ID.
     * @param catalogVersion Catalog version required by the command.
     * @param skipDelayedAck Forwarded to the command application step.
     * @param leaseStartTime Lease start time.
     * @param isArchivation Whether to build an archive command instead of a regular update command.
     * @return Future with the command application result.
     */
    private CompletableFuture<CommandApplicationResult> applyUpdateAllCommand(Map<UUID, TimedBinaryRowMessage> rowsToUpdate, ReplicationGroupIdMessage commitPartitionId, UUID txId, boolean full, UUID txCoordinatorId, int catalogVersion, boolean skipDelayedAck, long leaseStartTime, boolean isArchivation) {
        // The lease start time accompanies full commands only.
        Long commandLeaseStartTime = full ? Long.valueOf(leaseStartTime) : null;

        UpdateAllCommand cmd;
        if (isArchivation) {
            cmd = PARTITION_REPLICATION_MESSAGES_FACTORY.archiveAllCommand()
                    .tableId(this.tableId)
                    .commitPartitionId(commitPartitionId)
                    .messageRowsToUpdate(rowsToUpdate)
                    .txId(txId)
                    .initiatorTime(this.clockService.current())
                    .full(full)
                    .txCoordinatorId(txCoordinatorId)
                    .requiredCatalogVersion(catalogVersion)
                    .leaseStartTime(commandLeaseStartTime)
                    .build();
        } else {
            cmd = this.updateAllCommand(rowsToUpdate, commitPartitionId, txId, this.clockService.current(), full, txCoordinatorId, catalogVersion, commandLeaseStartTime);
        }

        return this.applyUpdateAllCommand(cmd, skipDelayedAck);
    }

    /**
     * Applies an already built "update all" command.
     *
     * <p>For a non-full command the storage is updated locally first and the command is then submitted; with
     * {@code skipDelayedAck} the returned future completes with {@code null} after the command is applied,
     * otherwise an already completed future is returned whose result carries the pending replication future.
     *
     * <p>For a full command the command is submitted first and its result is inspected: a primary replica
     * mismatch fails the future with {@code PrimaryReplicaMissException}; if the result reports
     * {@code isPrimaryInPeersAndLearners()} the method waits for the safe time, otherwise it updates the
     * storage itself. The command result is dereferenced without a null check.
     *
     * @param cmd Command to apply.
     * @param skipDelayedAck Whether to wait for the command instead of returning the replication future.
     * @return Future with the command application result.
     */
    private CompletableFuture<CommandApplicationResult> applyUpdateAllCommand(UpdateAllCommand cmd, boolean skipDelayedAck) {
        if (cmd.full()) {
            return this.applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
                UpdateCommandResult outcome = (UpdateCommandResult)res;

                if (!outcome.isPrimaryReplicaMatch()) {
                    throw new PrimaryReplicaMissException(cmd.txId(), cmd.leaseStartTime(), outcome.currentLeaseStartTime());
                }

                boolean primaryInPeersAndLearners = outcome.isPrimaryInPeersAndLearners();
                HybridTimestamp safeTs = HybridTimestamp.hybridTimestamp(outcome.safeTimestamp());

                if (primaryInPeersAndLearners) {
                    // No local storage update on this path; only wait until the safe time is reached.
                    return this.safeTime.waitFor(safeTs).thenApply(ignored -> new CommandApplicationResult(safeTs, null));
                }

                this.storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(), cmd.commitPartitionId().asReplicationGroupId(), false, null, safeTs, this.indexIdsAtRwTxBeginTs(cmd.txId()), cmd instanceof ArchiveAllCommand);

                return CompletableFuture.completedFuture(new CommandApplicationResult(safeTs, null));
            });
        }

        // Non-full command: the local storage update always precedes the command submission.
        this.storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(), cmd.commitPartitionId().asReplicationGroupId(), true, null, null, this.indexIdsAtRwTxBeginTs(cmd.txId()), cmd instanceof ArchiveAllCommand);

        CompletableFuture<Object> replication = this.applyCmdWithExceptionHandling(cmd);

        if (skipDelayedAck) {
            return replication.thenApply(res -> null);
        }

        CompletableFuture<?> repFut = replication.thenApply(res -> cmd.txId());

        return CompletableFuture.completedFuture(new CommandApplicationResult(null, repFut));
    }

    /**
     * Processes a direct read-only single-row get ({@code RO_GET}).
     *
     * <p>The primary key is resolved before the request type is checked; any type other than
     * {@code RO_GET} is rejected with an {@code IgniteInternalException} ({@code REPLICA_COMMON_ERR}).
     * The row is then read at {@code opStartTimestamp} under the RLS context extracted from the request.
     *
     * @param request Read-only direct single-row request.
     * @param opStartTimestamp Timestamp at which the row is read.
     * @return Future with the resolved row.
     */
    private CompletableFuture<BinaryRow> processReadOnlyDirectSingleEntryAction(ReadOnlyDirectSingleRowReplicaRequest request, HybridTimestamp opStartTimestamp) {
        BinaryTuple primaryKey = this.resolvePk(request.primaryKey());

        if (request.requestType() == RequestType.RO_GET) {
            RlsContext rlsContext = PartitionReplicaListener.extractRlsContext(request);

            return this.resolveRowByPkForReadOnly(primaryKey, opStartTimestamp, rlsContext);
        }

        throw new IgniteInternalException(ErrorGroups.Replicator.REPLICA_COMMON_ERR, IgniteStringFormatter.format("Unknown single request [actionType={}]", new Object[]{request.requestType()}));
    }

    /**
     * Derives the RLS context for a request.
     *
     * <p>{@code SYSTEM_CONTEXT} is returned when the request is not {@code UserDetailsAware} or carries no
     * user details; otherwise a context is built from the user name and the user roles.
     *
     * @param request Incoming network message.
     * @return RLS context to use for the request.
     */
    private static RlsContext extractRlsContext(NetworkMessage request) {
        UserDetailsMessage details = request instanceof UserDetailsAware
                ? ((UserDetailsAware)request).userDetails()
                : null;

        if (details == null) {
            return SYSTEM_CONTEXT;
        }

        return new RlsContext(details.userName(), details.userRoles());
    }

    private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingleRowReplicaRequest request, long leaseStartTime) {
        UUID txId = request.transactionId();
        BinaryRow searchRow = request.binaryRow();
        ReplicationGroupId commitPartitionId = request.commitPartitionId().asReplicationGroupId();
        assert (commitPartitionId != null) : "Commit partition is null [type=" + request.requestType() + "]";
        switch (request.requestType()) {
            case RW_DELETE_EXACT: {
                return this.resolveRowByPk(this.extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                    if (rowId == null) {
                        this.metrics.onRead(false);
                        return CompletableFuture.completedFuture(new ReplicaResult(false, null));
                    }
                    return this.takeLocksForDeleteExact(searchRow, (RowId)rowId, (BinaryRow)row, txId).thenCompose(validatedRowId -> {
                        if (validatedRowId == null) {
                            this.metrics.onRead(false);
                            return CompletableFuture.completedFuture(new ReplicaResult(false, null));
                        }
                        return ((CompletableFuture)((CompletableFuture)this.validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()).thenCompose(catalogVersion -> this.awaitCleanup((RowId)validatedRowId, (Object)catalogVersion))).thenCompose(catalogVersion -> this.applyUpdateCommand(request, validatedRowId.uuid(), null, (HybridTimestamp)lastCommitTime, (int)catalogVersion, leaseStartTime))).thenApply(res -> {
                            this.metrics.onRead(false);
                            this.metrics.onWrite();
                            return new ReplicaResult(true, (CommandApplicationResult)res);
                        });
                    });
                });
            }
            case RW_INSERT: {
                return this.resolveRowByPk(this.extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                    if (rowId != null) {
                        this.metrics.onRead(false);
                        return CompletableFuture.completedFuture(new ReplicaResult(false, null));
                    }
                    RowId rowId0 = new RowId(this.partId(), RowIdGenerator.next());
                    return ((CompletableFuture)this.takeLocksForInsert(searchRow, rowId0, txId).thenCompose(rowIdLock -> ((CompletableFuture)this.validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()).thenCompose(catalogVersion -> this.applyUpdateCommand(request, rowId0.uuid(), searchRow, (HybridTimestamp)lastCommitTime, (int)catalogVersion, leaseStartTime))).thenApply(res -> new IgniteBiTuple<CommandApplicationResult, IgniteBiTuple>((CommandApplicationResult)res, (IgniteBiTuple)rowIdLock)))).thenApply(tuple -> {
                        this.metrics.onRead(false);
                        this.metrics.onWrite();
                        ((Collection)((IgniteBiTuple)tuple.get2()).get2()).forEach(lock -> this.lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
                        return new ReplicaResult(true, (CommandApplicationResult)tuple.get1());
                    });
                });
            }
            case RW_UPSERT: {
                return this.resolveRowByPk(this.extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                    boolean insert = rowId == null;
                    RowId rowId0 = insert ? new RowId(this.partId(), RowIdGenerator.next()) : rowId;
                    CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> lockFut = insert ? this.takeLocksForInsert(searchRow, rowId0, txId) : this.takeLocksForUpdate(searchRow, rowId0, txId);
                    return ((CompletableFuture)lockFut.thenCompose(rowIdLock -> ((CompletableFuture)((CompletableFuture)this.validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()).thenCompose(catalogVersion -> this.awaitCleanup((RowId)rowId, (Object)catalogVersion))).thenCompose(catalogVersion -> this.applyUpdateCommand(request, rowId0.uuid(), searchRow, (HybridTimestamp)lastCommitTime, (int)catalogVersion, leaseStartTime))).thenApply(res -> new IgniteBiTuple<CommandApplicationResult, IgniteBiTuple>((CommandApplicationResult)res, (IgniteBiTuple)rowIdLock)))).thenApply(tuple -> {
                        this.metrics.onWrite();
                        ((Collection)((IgniteBiTuple)tuple.get2()).get2()).forEach(lock -> this.lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
                        return new ReplicaResult(null, (CommandApplicationResult)tuple.get1());
                    });
                });
            }
            case RW_GET_AND_UPSERT: {
                return this.resolveRowByPk(this.extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                    boolean insert = rowId == null;
                    RowId rowId0 = insert ? new RowId(this.partId(), RowIdGenerator.next()) : rowId;
                    CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> lockFut = insert ? this.takeLocksForInsert(searchRow, rowId0, txId) : this.takeLocksForUpdate(searchRow, rowId0, txId);
                    return ((CompletableFuture)lockFut.thenCompose(rowIdLock -> ((CompletableFuture)((CompletableFuture)this.validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()).thenCompose(catalogVersion -> this.awaitCleanup((RowId)rowId, (Object)catalogVersion))).thenCompose(catalogVersion -> this.applyUpdateCommand(request, rowId0.uuid(), searchRow, (HybridTimestamp)lastCommitTime, (int)catalogVersion, leaseStartTime))).thenApply(res -> new IgniteBiTuple<CommandApplicationResult, IgniteBiTuple>((CommandApplicationResult)res, (IgniteBiTuple)rowIdLock)))).thenApply(tuple -> {
                        this.metrics.onRead(false);
                        this.metrics.onWrite();
                        ((Collection)((IgniteBiTuple)tuple.get2()).get2()).forEach(lock -> this.lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
                        return new ReplicaResult(row, (CommandApplicationResult)tuple.get1());
                    });
                });
            }
            case RW_GET_AND_REPLACE: {
                return this.resolveRowByPk(this.extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                    if (rowId == null) {
                        this.metrics.onRead(false);
                        return CompletableFuture.completedFuture(new ReplicaResult(null, null));
                    }
                    return ((CompletableFuture)this.takeLocksForUpdate(searchRow, (RowId)rowId, txId).thenCompose(rowIdLock -> ((CompletableFuture)((CompletableFuture)this.validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()).thenCompose(catalogVersion -> this.awaitCleanup((RowId)rowId, (Object)catalogVersion))).thenCompose(catalogVersion -> this.applyUpdateCommand(request, rowId.uuid(), searchRow, (HybridTimestamp)lastCommitTime, (int)catalogVersion, leaseStartTime))).thenApply(res -> new IgniteBiTuple<CommandApplicationResult, IgniteBiTuple>((CommandApplicationResult)res, (IgniteBiTuple)rowIdLock)))).thenApply(tuple -> {
                        this.metrics.onRead(false);
                        this.metrics.onWrite();
                        ((Collection)((IgniteBiTuple)tuple.get2()).get2()).forEach(lock -> this.lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
                        return new ReplicaResult(row, (CommandApplicationResult)tuple.get1());
                    });
                });
            }
            case RW_REPLACE_IF_EXIST: {
                return this.resolveRowByPk(this.extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                    if (rowId == null) {
                        this.metrics.onRead(false);
                        return CompletableFuture.completedFuture(new ReplicaResult(false, null));
                    }
                    return ((CompletableFuture)this.takeLocksForUpdate(searchRow, (RowId)rowId, txId).thenCompose(rowIdLock -> ((CompletableFuture)((CompletableFuture)this.validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()).thenCompose(catalogVersion -> this.awaitCleanup((RowId)rowId, (Object)catalogVersion))).thenCompose(catalogVersion -> this.applyUpdateCommand(request, rowId.uuid(), searchRow, (HybridTimestamp)lastCommitTime, (int)catalogVersion, leaseStartTime))).thenApply(res -> new IgniteBiTuple<CommandApplicationResult, IgniteBiTuple>((CommandApplicationResult)res, (IgniteBiTuple)rowIdLock)))).thenApply(tuple -> {
                        this.metrics.onRead(false);
                        this.metrics.onWrite();
                        ((Collection)((IgniteBiTuple)tuple.get2()).get2()).forEach(lock -> this.lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
                        return new ReplicaResult(true, (CommandApplicationResult)tuple.get1());
                    });
                });
            }
        }
        throw new IgniteInternalException(ErrorGroups.Replicator.REPLICA_COMMON_ERR, IgniteStringFormatter.format("Unknown single request [actionType={}]", new Object[]{request.requestType()}));
    }

    /**
     * Handles a read-write request that addresses a single row by its primary key.
     *
     * <p>The request types handled here are {@code RW_GET}, {@code RW_DELETE} and {@code RW_GET_AND_DELETE}. In every case the row is
     * first located through {@code resolveRowByPk}; the rest of the work happens in the callback that receives the row ID, the row
     * and the last commit time.
     *
     * @param request Request carrying the primary key bytes and the transaction attributes.
     * @param leaseStartTime Lease start time that is passed through to {@code applyUpdateCommand}.
     * @return Future with the result of the operation.
     * @throws IgniteInternalException If the request type is not one of the types listed above.
     */
    private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingleRowPkReplicaRequest request, long leaseStartTime) {
        UUID txId = request.transactionId();
        BinaryTuple primaryKey = this.resolvePk(request.primaryKey());
        ReplicationGroupId commitPartitionId = request.commitPartitionId().asReplicationGroupId();
        // Only RW_GET is allowed to come without a commit partition.
        assert (commitPartitionId != null || request.requestType() == RequestType.RW_GET) : "Commit partition is null [type=" + request.requestType() + "]";
        switch (request.requestType()) {
            case RW_GET: {
                return this.resolveRowByPk(primaryKey, txId, (rowId, row, lastCommitTime) -> {
                    if (rowId == null) {
                        // Nothing found for the key: record the read and complete with null.
                        this.metrics.onRead(false);
                        return CompletableFutures.nullCompletedFuture();
                    }
                    // Lock the row for reading, validate the schema, then return the row that was resolved above.
                    return ((CompletableFuture)this.takeLocksForGet((RowId)rowId, txId).thenCompose(ignored -> this.validateRwReadAgainstSchemaAfterTakingLocks(txId))).thenApply(ignored -> {
                        this.metrics.onRead(false);
                        return new ReplicaResult(row, null);
                    });
                });
            }
            case RW_DELETE: {
                return this.resolveRowByPk(primaryKey, txId, (rowId, row, lastCommitTime) -> {
                    if (rowId == null) {
                        // NOTE(review): unlike the other "row not found" branches of this method, no read metric is recorded here - confirm intended.
                        return CompletableFuture.completedFuture(new ReplicaResult(false, null));
                    }
                    // Take delete locks, validate the schema, wait for a pending cleanup of the row, then apply an update with a null row.
                    return ((CompletableFuture)((CompletableFuture)((CompletableFuture)this.takeLocksForDelete((BinaryRow)row, (RowId)rowId, txId).thenCompose(rowLock -> this.validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()))).thenCompose(catalogVersion -> this.awaitCleanup((RowId)rowId, (Object)catalogVersion))).thenCompose(catalogVersion -> this.applyUpdateCommand(request.commitPartitionId().asReplicationGroupId(), rowId.uuid(), null, (HybridTimestamp)lastCommitTime, request.transactionId(), request.full(), request.coordinatorId(), (int)catalogVersion, request.skipDelayedAck(), leaseStartTime))).thenApply(res -> {
                        this.metrics.onWrite();
                        return new ReplicaResult(true, (CommandApplicationResult)res);
                    });
                });
            }
            case RW_GET_AND_DELETE: {
                return this.resolveRowByPk(primaryKey, txId, (rowId, row, lastCommitTime) -> {
                    if (rowId == null) {
                        // Nothing found for the key: record the read and complete with null.
                        this.metrics.onRead(false);
                        return CompletableFutures.nullCompletedFuture();
                    }
                    // Same pipeline as RW_DELETE, but the result carries the row that was read before the removal.
                    return ((CompletableFuture)((CompletableFuture)((CompletableFuture)this.takeLocksForDelete((BinaryRow)row, (RowId)rowId, txId).thenCompose(ignored -> this.validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()))).thenCompose(catalogVersion -> this.awaitCleanup((RowId)rowId, (Object)catalogVersion))).thenCompose(catalogVersion -> this.applyUpdateCommand(request.commitPartitionId().asReplicationGroupId(), rowId.uuid(), null, (HybridTimestamp)lastCommitTime, request.transactionId(), request.full(), request.coordinatorId(), (int)catalogVersion, request.skipDelayedAck(), leaseStartTime))).thenApply(res -> {
                        this.metrics.onRead(false);
                        this.metrics.onWrite();
                        return new ReplicaResult(row, (CommandApplicationResult)res);
                    });
                });
            }
        }
        // Any other request type is not supported by this handler.
        throw new IgniteInternalException(ErrorGroups.Replicator.REPLICA_COMMON_ERR, IgniteStringFormatter.format("Unknown single request [actionType={}]", new Object[]{request.requestType()}));
    }

    /**
     * Waits for the cleanup future registered for the given row, if there is one, and then yields {@code result}.
     *
     * @param rowId Row ID, or {@code null} when there is nothing to wait for.
     * @param result Value to complete the returned future with.
     * @return Future that completes with {@code result} after the cleanup future found in {@code rowCleanupMap} (if any) completes.
     */
    private <T> CompletableFuture<T> awaitCleanup(@Nullable RowId rowId, T result) {
        CompletableFuture<?> pendingCleanup = rowId == null
                ? CompletableFutures.nullCompletedFuture()
                : rowCleanupMap.getOrDefault(rowId, CompletableFutures.nullCompletedFuture());

        return pendingCleanup.thenApply(ignored -> result);
    }

    /**
     * Waits for the cleanup futures registered for the given rows and then yields {@code result}.
     *
     * @param rowIds Row IDs whose pending cleanups must finish first.
     * @param result Value to complete the returned future with.
     * @return Future that completes with {@code result} once every cleanup future found in {@code rowCleanupMap} for the given rows
     *         has completed; already completed when no such futures exist.
     */
    private <T> CompletableFuture<T> awaitCleanup(Collection<RowId> rowIds, T result) {
        // Fast path: nothing is being cleaned up at all.
        if (this.rowCleanupMap.isEmpty()) {
            return CompletableFuture.completedFuture(result);
        }

        // A typed list is required here: with a raw list, toArray(T[]) is erased to Object[] and cannot be passed to allOf().
        List<CompletableFuture<?>> pendingCleanups = new ArrayList<>(rowIds.size());
        for (RowId rowId : rowIds) {
            CompletableFuture<?> cleanup = this.rowCleanupMap.get(rowId);
            if (cleanup != null) {
                pendingCleanups.add(cleanup);
            }
        }

        if (pendingCleanups.isEmpty()) {
            return CompletableFuture.completedFuture(result);
        }

        return CompletableFuture.allOf(pendingCleanups.toArray(new CompletableFuture[0])).thenApply(unused -> result);
    }

    /**
     * Extracts the primary key from a row whose declared type is nullable but which is expected to be present.
     *
     * @param row Row to extract the key from; an assertion checks that it is not {@code null}.
     * @return Primary key tuple of the row.
     */
    private BinaryTuple extractPkFromNullable(@Nullable BinaryRow row) {
        assert row != null;

        return extractPk(row);
    }

    /**
     * Extracts the primary key columns of a row using the row resolver of the primary key index.
     *
     * @param row Row to extract the key from.
     * @return Primary key tuple of the row.
     */
    private BinaryTuple extractPk(BinaryRow row) {
        return pkIndexStorage.get()
                .indexRowResolver()
                .extractColumns(row);
    }

    /**
     * Converts the byte representation of a primary key into a tuple by delegating to the primary key index storage.
     *
     * @param bytes Primary key bytes.
     * @return Primary key tuple.
     */
    private BinaryTuple resolvePk(ByteBuffer bytes) {
        return pkIndexStorage.get()
                .resolve(bytes);
    }

    /**
     * Converts a list of primary keys in byte form into tuples, preserving the order of the input.
     *
     * @param bytesList Primary keys as byte buffers.
     * @return New list with one tuple per input buffer.
     */
    private List<BinaryTuple> resolvePks(List<ByteBuffer> bytesList) {
        return bytesList.stream()
                .map(this::resolvePk)
                .collect(Collectors.toCollection(() -> new ArrayList<>(bytesList.size())));
    }

    /**
     * Looks up row IDs in the primary key index storage by key.
     *
     * @param key Primary key tuple.
     * @return Cursor over the row IDs returned by the index storage for the key.
     */
    private Cursor<RowId> getFromPkIndex(BinaryTuple key) {
        return pkIndexStorage.get()
                .storage()
                .get(key);
    }

    /**
     * Takes the locks needed to update an existing row: IX on the table, X on the row, then the put locks on indexes.
     *
     * @param binaryRow New row content, used for the index locks.
     * @param rowId ID of the row being updated.
     * @param txId Transaction ID.
     * @return Future with the row ID paired with the locks returned by {@code takePutLockOnIndexes}.
     */
    private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForUpdate(BinaryRow binaryRow, RowId rowId, UUID txId) {
        return lockManager.acquire(txId, new LockKey(tableLockKey), LockMode.IX)
                .thenCompose(ignored -> lockManager.acquire(txId, new LockKey(tableLockKey, rowId), LockMode.X))
                .thenCompose(ignored -> takePutLockOnIndexes(binaryRow, rowId, txId))
                .thenApply(shortTermLocks -> new IgniteBiTuple<>(rowId, shortTermLocks));
    }

    /**
     * Takes the locks needed to insert a new row: IX on the table, then the put locks on indexes. No row lock is taken here.
     *
     * @param binaryRow Row being inserted, used for the index locks.
     * @param rowId ID assigned to the new row.
     * @param txId Transaction ID.
     * @return Future with the row ID paired with the locks returned by {@code takePutLockOnIndexes}.
     */
    private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForInsert(BinaryRow binaryRow, RowId rowId, UUID txId) {
        return lockManager.acquire(txId, new LockKey(tableLockKey), LockMode.IX)
                .thenCompose(ignored -> takePutLockOnIndexes(binaryRow, rowId, txId))
                .thenApply(shortTermLocks -> new IgniteBiTuple<>(rowId, shortTermLocks));
    }

    /**
     * Asks every index locker for its insert locks and collects the non-null locks they return.
     *
     * @param binaryRow Row being written.
     * @param rowId ID of the row being written.
     * @param txId Transaction ID.
     * @return Future with the non-null locks produced by the index lockers; an empty collection when there are no lockers.
     */
    private CompletableFuture<Collection<Lock>> takePutLockOnIndexes(BinaryRow binaryRow, RowId rowId, UUID txId) {
        Collection<IndexLocker> lockers = indexesLockers.get().values();

        if (CollectionUtils.nullOrEmpty(lockers)) {
            return CompletableFutures.emptyCollectionCompletedFuture();
        }

        CompletableFuture<?>[] lockFutures = new CompletableFuture<?>[lockers.size()];
        int next = 0;
        for (IndexLocker locker : lockers) {
            lockFutures[next] = locker.locksForInsert(txId, binaryRow, rowId);
            next++;
        }

        return CompletableFuture.allOf(lockFutures).thenApply(unused -> {
            ArrayList<Lock> acquired = new ArrayList<>();

            // All futures are complete at this point, so join() does not block.
            for (CompletableFuture<?> lockFuture : lockFutures) {
                Lock lock = (Lock) lockFuture.join();
                if (lock != null) {
                    acquired.add(lock);
                }
            }

            return acquired;
        });
    }

    /**
     * Asks every index locker for its remove locks.
     *
     * @param binaryRow Row being removed.
     * @param rowId ID of the row being removed.
     * @param txId Transaction ID.
     * @return Future that completes when all index lockers have completed; already completed when there are no lockers.
     */
    private CompletableFuture<?> takeRemoveLockOnIndexes(BinaryRow binaryRow, RowId rowId, UUID txId) {
        Collection<IndexLocker> lockers = indexesLockers.get().values();

        if (CollectionUtils.nullOrEmpty(lockers)) {
            return CompletableFutures.nullCompletedFuture();
        }

        CompletableFuture<?>[] lockFutures = new CompletableFuture<?>[lockers.size()];
        int next = 0;
        for (IndexLocker locker : lockers) {
            lockFutures[next] = locker.locksForRemove(txId, binaryRow, rowId);
            next++;
        }

        return CompletableFuture.allOf(lockFutures);
    }

    /**
     * Takes the locks for a conditional delete: IX on the table and S on the row first; if the stored row equals the expected one,
     * the row lock is then acquired in X mode and the remove locks on indexes are taken.
     *
     * @param expectedRow Row the caller expects to be stored.
     * @param rowId ID of the row.
     * @param actualRow Row that is actually stored.
     * @param txId Transaction ID.
     * @return Future with the row ID when the rows are equal, or with {@code null} when they differ.
     */
    private CompletableFuture<RowId> takeLocksForDeleteExact(BinaryRow expectedRow, RowId rowId, BinaryRow actualRow, UUID txId) {
        return lockManager.acquire(txId, new LockKey(tableLockKey), LockMode.IX)
                .thenCompose(ignored -> lockManager.acquire(txId, new LockKey(tableLockKey, rowId), LockMode.S))
                .thenCompose(ignored -> {
                    if (!PartitionReplicaListener.equalValues(actualRow, expectedRow)) {
                        return CompletableFutures.nullCompletedFuture();
                    }

                    return lockManager.acquire(txId, new LockKey(tableLockKey, rowId), LockMode.X)
                            .thenCompose(ignored0 -> takeRemoveLockOnIndexes(actualRow, rowId, txId))
                            .thenApply(exclusiveRowLock -> rowId);
                });
    }

    /**
     * Takes the locks needed to delete a row: IX on the table, X on the row, then the remove locks on indexes.
     *
     * @param binaryRow Row being removed, used for the index locks.
     * @param rowId ID of the row being removed.
     * @param txId Transaction ID.
     * @return Future with the row ID, completed after all locks are taken.
     */
    private CompletableFuture<RowId> takeLocksForDelete(BinaryRow binaryRow, RowId rowId, UUID txId) {
        return lockManager.acquire(txId, new LockKey(tableLockKey), LockMode.IX)
                .thenCompose(ignored -> lockManager.acquire(txId, new LockKey(tableLockKey, rowId), LockMode.X))
                .thenCompose(ignored -> takeRemoveLockOnIndexes(binaryRow, rowId, txId))
                .thenApply(ignored -> rowId);
    }

    /**
     * Takes an S lock on a row for reading.
     *
     * @param rowId ID of the row being read.
     * @param txId Transaction ID.
     * @return Future with the row ID, completed after the lock is taken.
     */
    private CompletableFuture<RowId> takeLocksForGet(RowId rowId, UUID txId) {
        LockKey rowLockKey = new LockKey(tableLockKey, rowId);

        return lockManager.acquire(txId, rowLockKey, LockMode.S)
                .thenApply(ignored -> rowId);
    }

    /**
     * Handles a read-write request that carries two rows: the expected old row and the new row.
     *
     * <p>Only {@code RW_REPLACE} is handled. The stored row is located by the primary key of the new row; the replacement is applied
     * only when {@code takeLocksForReplace} yields a non-null result, otherwise the operation completes with a {@code false} result.
     *
     * @param request Request carrying the old and new rows and the transaction attributes.
     * @param leaseStartTime Lease start time that is passed through to {@code applyUpdateCommand}.
     * @return Future with the result of the operation.
     * @throws IgniteInternalException If the request type is not {@code RW_REPLACE}.
     */
    private CompletableFuture<ReplicaResult> processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request, long leaseStartTime) {
        BinaryRow newRow = request.newBinaryRow();
        BinaryRow expectedRow = request.oldBinaryRow();
        ReplicationGroupIdMessage commitPartitionId = request.commitPartitionId();
        assert (commitPartitionId != null) : "Commit partition is null [type=" + request.requestType() + "]";
        UUID txId = request.transactionId();
        if (request.requestType() == RequestType.RW_REPLACE) {
            return this.resolveRowByPk(this.extractPk(newRow), txId, (rowId, row, lastCommitTime) -> {
                if (rowId == null) {
                    // No row for the key: nothing to replace.
                    this.metrics.onRead(false);
                    return CompletableFuture.completedFuture(new ReplicaResult(false, null));
                }
                return this.takeLocksForReplace(expectedRow, (BinaryRow)row, newRow, (RowId)rowId, txId).thenCompose(rowIdLock -> {
                    if (rowIdLock == null) {
                        // takeLocksForReplace yields null when the stored row does not equal the expected one.
                        this.metrics.onRead(false);
                        return CompletableFuture.completedFuture(new ReplicaResult(false, null));
                    }
                    // Validate the schema, wait for a pending cleanup of the row, apply the update, then release the locks carried in the tuple.
                    return ((CompletableFuture)((CompletableFuture)((CompletableFuture)this.validateWriteAgainstSchemaAfterTakingLocks(txId).thenCompose(catalogVersion -> this.awaitCleanup((RowId)rowIdLock.get1(), catalogVersion))).thenCompose(catalogVersion -> this.applyUpdateCommand(commitPartitionId.asReplicationGroupId(), ((RowId)rowIdLock.get1()).uuid(), newRow, (HybridTimestamp)lastCommitTime, txId, request.full(), request.coordinatorId(), (int)catalogVersion, request.skipDelayedAck(), leaseStartTime))).thenApply(res -> new IgniteBiTuple<CommandApplicationResult, IgniteBiTuple>((CommandApplicationResult)res, (IgniteBiTuple)rowIdLock))).thenApply(tuple -> {
                        this.metrics.onRead(false);
                        this.metrics.onWrite();
                        ((Collection)((IgniteBiTuple)tuple.get2()).get2()).forEach(lock -> this.lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
                        return new ReplicaResult(true, (CommandApplicationResult)tuple.get1());
                    });
                });
            });
        }
        // Any other request type is not supported by this handler.
        throw new IgniteInternalException(ErrorGroups.Replicator.REPLICA_COMMON_ERR, IgniteStringFormatter.format("Unknown two actions operation [actionType={}]", new Object[]{request.requestType()}));
    }

    /**
     * Takes the locks for a conditional replace: IX on the table and S on the row first; if the stored row is present and equals the
     * expected one, the row lock is then acquired in X mode and the put locks on indexes are taken for the new row.
     *
     * @param expectedRow Row the caller expects to be stored.
     * @param oldRow Row that is actually stored, may be {@code null}.
     * @param newRow Replacement row.
     * @param rowId ID of the row.
     * @param txId Transaction ID.
     * @return Future with the row ID paired with the locks returned by {@code takePutLockOnIndexes}, or with {@code null} when the
     *         stored row is absent or differs from the expected one.
     */
    private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForReplace(BinaryRow expectedRow, @Nullable BinaryRow oldRow, BinaryRow newRow, RowId rowId, UUID txId) {
        return lockManager.acquire(txId, new LockKey(tableLockKey), LockMode.IX)
                .thenCompose(ignored -> lockManager.acquire(txId, new LockKey(tableLockKey, rowId), LockMode.S))
                .thenCompose(ignored -> {
                    if (oldRow == null || !PartitionReplicaListener.equalValues(oldRow, expectedRow)) {
                        return CompletableFutures.nullCompletedFuture();
                    }

                    return lockManager.acquire(txId, new LockKey(tableLockKey, rowId), LockMode.X)
                            .thenCompose(ignored1 -> takePutLockOnIndexes(newRow, rowId, txId))
                            .thenApply(shortTermLocks -> new IgniteBiTuple<>(rowId, shortTermLocks));
                });
    }

    /**
     * Turns a storage read result into the row that should be visible to the reader.
     *
     * <p>A committed value is returned as is, together with its commit timestamp. A write intent is returned directly when no
     * timestamp is given and the intent belongs to the reading transaction; in all other cases write intent resolution is started.
     *
     * @param readResult Result read from the storage.
     * @param txId ID of the reading transaction; dereferenced when {@code timestamp} is {@code null}.
     * @param timestamp Read timestamp, or {@code null}.
     * @param lastCommitted Supplier handed to write intent resolution.
     * @return Future with the resolved row.
     */
    private CompletableFuture<@Nullable TimedBinaryRow> resolveReadResult(ReadResult readResult, @Nullable UUID txId, @Nullable HybridTimestamp timestamp, Supplier<@Nullable TimedBinaryRow> lastCommitted) {
        if (!readResult.isWriteIntent()) {
            TimedBinaryRow committed = new TimedBinaryRow(readResult.binaryRow(), readResult.commitTimestamp());

            return CompletableFuture.completedFuture(committed);
        }

        if (timestamp == null) {
            UUID retrievedResultTxId = readResult.transactionId();

            // The write intent was made by the reading transaction itself, so it can be returned without resolution.
            if (txId.equals(retrievedResultTxId)) {
                return CompletableFuture.completedFuture(new TimedBinaryRow(readResult.binaryRow()));
            }
        }

        return resolveWriteIntentAsync(readResult, timestamp, lastCommitted);
    }

    /**
     * Resolves a write intent under the busy lock: determines whether it is readable and maps the answer to a row.
     *
     * @param readResult Write intent read from the storage.
     * @param timestamp Read timestamp, or {@code null}.
     * @param lastCommitted Supplier used when the write intent is not readable.
     * @return Future with the resolved row.
     */
    private CompletableFuture<@Nullable TimedBinaryRow> resolveWriteIntentAsync(ReadResult readResult, @Nullable HybridTimestamp timestamp, Supplier<@Nullable TimedBinaryRow> lastCommitted) {
        return IgniteUtils.inBusyLockAsync(
                busyLock,
                () -> resolveWriteIntentReadability(readResult, timestamp)
                        .thenApply(readable -> lambda$resolveWriteIntentAsync$228(readResult, lastCommitted, readable))
        );
    }

    /**
     * Schedules an asynchronous switch of the write intents of a finished transaction, triggered by reading one of its rows.
     *
     * <p>At most one switch future per row is kept in {@code rowCleanupMap}; the entry is removed when that future completes.
     *
     * @param txId ID of the transaction that produced the write intent.
     * @param rowId ID of the row that was read.
     * @param meta Transaction meta; its state must be final (checked by an assertion).
     */
    private void scheduleAsyncWriteIntentSwitch(UUID txId, RowId rowId, TransactionMeta meta) {
        TxState txState = meta.txState();

        assert TxState.isFinalState(txState) : "Unexpected state [txId=" + txId + ", txState=" + txState + "]";

        HybridTimestamp commitTimestamp = meta.commitTimestamp();

        storageUpdateHandler.handleWriteIntentRead(txId, rowId);

        CompletableFuture<?> cleanupFuture = rowCleanupMap.computeIfAbsent(rowId, k -> txManager
                .executeWriteIntentSwitchAsync(() -> IgniteUtils.inBusyLock(
                        busyLock,
                        () -> storageUpdateHandler.switchWriteIntents(txId, txState == TxState.COMMITTED, commitTimestamp, indexIdsAtRwTxBeginTsOrNull(txId))
                ))
                .whenComplete((unused, e) -> {
                    // Recoverable failures are not logged.
                    if (e != null && !ReplicatorRecoverableExceptions.isRecoverable(e)) {
                        LOG.warn("Failed to complete transaction cleanup command [txId=" + txId + "]", (Throwable) e);
                    }
                }));

        // Remove the entry only if it still maps to this very future.
        cleanupFuture.handle((v, e) -> rowCleanupMap.remove(rowId, cleanupFuture));
    }

    /**
     * Determines whether a write intent may be read, resolving the state of the transaction that produced it.
     *
     * <p>When the commit table or zone ID equals that of {@code TablePartitionId.NOT_EXISTING}, the transaction is discarded through
     * the storage update handler and the intent is reported as not readable. When the resolved state is final, an asynchronous
     * write intent switch is scheduled for the row.
     *
     * @param writeIntent Write intent read from the storage.
     * @param timestamp Read timestamp, or {@code null}.
     * @return Future with {@code true} when the write intent may be read.
     */
    private CompletableFuture<Boolean> resolveWriteIntentReadability(ReadResult writeIntent, @Nullable HybridTimestamp timestamp) {
        UUID txId = writeIntent.transactionId();
        Integer commitTableOrZoneId = writeIntent.commitTableOrZoneId();

        assert txId != null && commitTableOrZoneId != null : "Expecting write intent";

        boolean refersToNotExistingTable = commitTableOrZoneId.intValue() == TablePartitionId.NOT_EXISTING.tableId();
        if (refersToNotExistingTable) {
            IgniteUtils.inBusyLock(busyLock, () -> storageUpdateHandler.discardTransaction(txId));

            return CompletableFutures.falseCompletedFuture();
        }

        return transactionStateResolver
                .resolveTxState(txId, replicationGroupId(commitTableOrZoneId, writeIntent.commitPartitionId()), timestamp)
                .thenApply(transactionMeta -> {
                    if (TxState.isFinalState(transactionMeta.txState())) {
                        scheduleAsyncWriteIntentSwitch(txId, writeIntent.rowId(), transactionMeta);
                    }

                    return PartitionReplicaListener.canReadFromWriteIntent(txId, transactionMeta, timestamp);
                });
    }

    /**
     * Builds a replication group ID of the kind that matches the colocation setting of the node.
     *
     * @param tableOrZoneId Table ID or zone ID.
     * @param partitionId Partition ID.
     * @return {@code ZonePartitionId} when colocation is enabled, {@code TablePartitionId} otherwise.
     */
    private ReplicationGroupId replicationGroupId(int tableOrZoneId, int partitionId) {
        return nodeProperties.colocationEnabled()
                ? new ZonePartitionId(tableOrZoneId, partitionId)
                : new TablePartitionId(tableOrZoneId, partitionId);
    }

    /**
     * Decides whether a write intent is readable given the resolved state of its transaction.
     *
     * @param txId ID of the transaction that produced the write intent (used in the assertion message only).
     * @param txMeta Resolved transaction meta; its state must be final or {@code PENDING} (checked by an assertion).
     * @param timestamp Read timestamp, or {@code null}.
     * @return {@code true} only for a committed transaction when either no timestamp is given or the commit timestamp is not after
     *         the given timestamp.
     */
    private static Boolean canReadFromWriteIntent(UUID txId, TransactionMeta txMeta, @Nullable HybridTimestamp timestamp) {
        assert TxState.isFinalState(txMeta.txState()) || txMeta.txState() == TxState.PENDING
                : IgniteStringFormatter.format("Unexpected state defined by write intent resolution [txId={}, txMeta={}].", txId, txMeta);

        if (txMeta.txState() != TxState.COMMITTED) {
            return false;
        }

        if (timestamp == null) {
            return true;
        }

        return txMeta.commitTimestamp().compareTo(timestamp) <= 0;
    }

    /**
     * Validates a read of a read-write transaction against the schema: takes a timestamp from {@code clockService.now()}, waits for
     * metadata completeness at that timestamp and then runs the "schema changed since transaction start" check.
     *
     * @param txId Transaction ID.
     * @return Future that completes when the check has run.
     */
    private CompletableFuture<Void> validateRwReadAgainstSchemaAfterTakingLocks(UUID txId) {
        HybridTimestamp operationTimestamp = clockService.now();
        Runnable schemaCheck = () -> failIfSchemaChangedSinceTxStart(txId, operationTimestamp);

        return schemaSyncService.waitForMetadataCompleteness(operationTimestamp)
                .thenRun(schemaCheck);
    }

    /**
     * Validates a write against the schema: takes a timestamp from {@code clockService.current()}, obtains the reliable catalog
     * version for it and then runs the "schema changed since transaction start" check.
     *
     * @param txId Transaction ID.
     * @return Future with the catalog version obtained for the operation timestamp.
     */
    private CompletableFuture<Integer> validateWriteAgainstSchemaAfterTakingLocks(UUID txId) {
        HybridTimestamp operationTimestamp = clockService.current();

        return reliableCatalogVersionFor(operationTimestamp)
                .thenApply(catalogVersion -> {
                    failIfSchemaChangedSinceTxStart(txId, operationTimestamp);

                    return catalogVersion;
                });
    }

    /**
     * Builds a single-row update command.
     *
     * <p>The "row to update" message is attached only when a last commit timestamp or a row is given; each of the two is set on that
     * message only when it is not {@code null}.
     *
     * @param commitPartitionId Commit partition ID.
     * @param rowUuid UUID of the row.
     * @param row New row content, or {@code null}.
     * @param lastCommitTimestamp Last commit timestamp of the row, or {@code null}.
     * @param txId Transaction ID.
     * @param full Value for the {@code full} attribute of the command.
     * @param txCoordinatorId ID of the transaction coordinator.
     * @param initiatorTime Initiator time, or {@code null}.
     * @param catalogVersion Required catalog version.
     * @param leaseStartTime Lease start time, or {@code null}.
     * @return Built command.
     */
    private UpdateCommand updateCommand(ReplicationGroupId commitPartitionId, UUID rowUuid, @Nullable BinaryRow row, @Nullable HybridTimestamp lastCommitTimestamp, UUID txId, boolean full, UUID txCoordinatorId, @Nullable HybridTimestamp initiatorTime, int catalogVersion, @Nullable Long leaseStartTime) {
        UpdateCommandV2Builder commandBuilder = PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
                .tableId(tableId)
                .commitPartitionId(PartitionReplicaListener.replicationGroupIdMessage(commitPartitionId))
                .rowUuid(rowUuid)
                .txId(txId)
                .full(full)
                .initiatorTime(initiatorTime)
                .txCoordinatorId(txCoordinatorId)
                .requiredCatalogVersion(catalogVersion)
                .leaseStartTime(leaseStartTime);

        if (lastCommitTimestamp == null && row == null) {
            return commandBuilder.build();
        }

        TimedBinaryRowMessageBuilder rowMessageBuilder = PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage();

        if (lastCommitTimestamp != null) {
            rowMessageBuilder.timestamp(lastCommitTimestamp);
        }

        if (row != null) {
            rowMessageBuilder.binaryRowMessage(PartitionReplicaListener.binaryRowMessage(row));
        }

        commandBuilder.messageRowToUpdate(rowMessageBuilder.build());

        return commandBuilder.build();
    }

    /**
     * Converts a binary row into its network message, copying the tuple slice and the schema version.
     *
     * @param row Row to convert.
     * @return Row message.
     */
    private static BinaryRowMessage binaryRowMessage(BinaryRow row) {
        return PARTITION_REPLICATION_MESSAGES_FACTORY.binaryRowMessage()
                .binaryTuple(row.tupleSlice())
                .schemaVersion(row.schemaVersion())
                .build();
    }

    /**
     * Builds a multi-row update command.
     *
     * @param rowsToUpdate Row messages keyed by row UUID.
     * @param commitPartitionId Commit partition ID message.
     * @param transactionId Transaction ID.
     * @param initiatorTime Initiator time.
     * @param full Value for the {@code full} attribute of the command.
     * @param txCoordinatorId ID of the transaction coordinator.
     * @param catalogVersion Required catalog version.
     * @param leaseStartTime Lease start time, or {@code null}.
     * @return Built command.
     */
    private UpdateAllCommand updateAllCommand(Map<UUID, TimedBinaryRowMessage> rowsToUpdate, ReplicationGroupIdMessage commitPartitionId, UUID transactionId, HybridTimestamp initiatorTime, boolean full, UUID txCoordinatorId, int catalogVersion, @Nullable Long leaseStartTime) {
        return PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllCommandV2()
                .tableId(tableId)
                .commitPartitionId(commitPartitionId)
                .messageRowsToUpdate(rowsToUpdate)
                .txId(transactionId)
                .initiatorTime(initiatorTime)
                .full(full)
                .txCoordinatorId(txCoordinatorId)
                .requiredCatalogVersion(catalogVersion)
                .leaseStartTime(leaseStartTime)
                .build();
    }

    /**
     * Delegates to the schema compatibility validator to fail when the schema of this table changed after the transaction started.
     *
     * @param txId Transaction ID.
     * @param operationTimestamp Timestamp of the operation being validated.
     */
    private void failIfSchemaChangedSinceTxStart(UUID txId, HybridTimestamp operationTimestamp) {
        int validatedTableId = tableId();

        schemaCompatValidator.failIfSchemaChangedAfterTxStart(txId, operationTimestamp, validatedTableId);
    }

    /**
     * Obtains the reliable catalog version for a timestamp from {@code reliableCatalogVersions}.
     *
     * @param ts Timestamp.
     * @return Future with the catalog version.
     */
    private CompletableFuture<Integer> reliableCatalogVersionFor(HybridTimestamp ts) {
        return reliableCatalogVersions
                .reliableCatalogVersionFor(ts);
    }

    /**
     * Converts a table partition ID into its network message.
     *
     * @param tablePartId Table partition ID; when {@code null}, an ID built from table 0 and partition 0 is used instead.
     * @return Table partition ID message.
     */
    public static TablePartitionIdMessage tablePartitionId(TablePartitionId tablePartId) {
        TablePartitionId effectiveId = tablePartId == null ? new TablePartitionId(0, 0) : tablePartId;

        return ReplicaMessageUtils.toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, effectiveId);
    }

    /**
     * Converts a replication group ID into its network message.
     *
     * @param groupId Replication group ID.
     * @return Replication group ID message.
     */
    private static ReplicationGroupIdMessage replicationGroupIdMessage(ReplicationGroupId groupId) {
        return ReplicaMessageUtils
                .toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, groupId);
    }

    /**
     * Stops the listener: blocks the busy lock and shuts down the index building processor. Only the first call has an effect.
     */
    @Override
    public void onShutdown() {
        boolean firstCall = stopGuard.compareAndSet(false, true);

        if (firstCall) {
            busyLock.block();
            indexBuildingProcessor.onShutdown();
        }
    }

    /**
     * Returns the partition ID taken from the replication group ID of this listener.
     *
     * @return Partition ID.
     */
    private int partId() {
        return replicationGroupId.partitionId();
    }

    /**
     * Returns the ID of the table this listener serves.
     *
     * @return Table ID.
     */
    private int tableId() {
        return tableId;
    }

    /**
     * Processes an operation request, surrounding it with the bookkeeping that must be undone when the operation ends: the RW
     * operation counter of the index building processor and the low watermark lock.
     *
     * <p>The bookkeeping is undone both when the returned future completes and when {@code processOperationRequest} throws
     * synchronously. In both paths the counter is decremented even if unlocking the low watermark fails.
     *
     * @param senderId ID of the node that sent the request.
     * @param request Request to process.
     * @param replicaPrimacy Primacy information of this replica.
     * @param opStartTsIfDirectRo Operation start timestamp for direct read-only requests, otherwise {@code null}.
     * @return Future with the result of the operation.
     */
    private CompletableFuture<?> processOperationRequestWithTxOperationManagementLogic(UUID senderId, ReplicaRequest request, ReplicaPrimacy replicaPrimacy, @Nullable HybridTimestamp opStartTsIfDirectRo) {
        this.indexBuildingProcessor.incrementRwOperationCountIfNeeded(request);
        UUID txIdLockingLwm = this.tryToLockLwmIfNeeded(request, opStartTsIfDirectRo);
        try {
            return this.processOperationRequest(senderId, request, replicaPrimacy, opStartTsIfDirectRo).whenComplete((unused, throwable) -> {
                // Decrement in 'finally' so that a failing unlock cannot leave the RW operation counter incremented forever.
                try {
                    this.unlockLwmIfNeeded(txIdLockingLwm, request);
                } finally {
                    this.indexBuildingProcessor.decrementRwOperationCountIfNeeded(request);
                }
            });
        }
        catch (Throwable e) {
            // Synchronous failure: undo both steps independently and keep secondary failures as suppressed exceptions.
            try {
                this.unlockLwmIfNeeded(txIdLockingLwm, request);
            }
            catch (Throwable unlockProblem) {
                e.addSuppressed(unlockProblem);
            }
            try {
                this.indexBuildingProcessor.decrementRwOperationCountIfNeeded(request);
            }
            catch (Throwable decrementProblem) {
                e.addSuppressed(decrementProblem);
            }
            throw e;
        }
    }

    /**
     * Generates a random UUID to be used as a transaction ID for an operation that has no real transaction.
     *
     * @return Random UUID.
     */
    private static UUID newFakeTxId() {
        UUID fakeTxId = UUID.randomUUID();

        return fakeTxId;
    }

    /**
     * Locks the low watermark for requests that need it.
     *
     * <p>A direct read-only multi-row request with more than one key is locked under a freshly generated ID at the operation start
     * timestamp. Any other read-only request is locked under its transaction ID at its read timestamp. Other requests are not
     * locked.
     *
     * @param request Request being processed.
     * @param opStartTsIfDirectRo Operation start timestamp for direct read-only requests, otherwise {@code null}.
     * @return ID under which the low watermark was locked, or {@code null} when no lock was taken.
     * @throws TransactionException If the low watermark cannot be locked at the required timestamp.
     */
    @Nullable
    private UUID tryToLockLwmIfNeeded(ReplicaRequest request, @Nullable HybridTimestamp opStartTsIfDirectRo) {
        UUID txIdToLockLwm = null;
        HybridTimestamp tsToLockLwm = null;

        boolean directMultiKeyRead = request instanceof ReadOnlyDirectMultiRowReplicaRequest
                && ((ReadOnlyDirectMultiRowReplicaRequest) request).primaryKeys().size() > 1;

        if (directMultiKeyRead) {
            assert opStartTsIfDirectRo != null;

            txIdToLockLwm = PartitionReplicaListener.newFakeTxId();
            tsToLockLwm = opStartTsIfDirectRo;
        } else if (request instanceof ReadOnlyReplicaRequest) {
            ReadOnlyReplicaRequest readOnlyRequest = (ReadOnlyReplicaRequest) request;

            txIdToLockLwm = readOnlyRequest.transactionId();
            tsToLockLwm = readOnlyRequest.readTimestamp();
        }

        if (txIdToLockLwm == null) {
            return null;
        }

        if (!lowWatermark.tryLock(txIdToLockLwm, tsToLockLwm)) {
            throw new TransactionException(ErrorGroups.Transactions.TX_STALE_READ_ONLY_OPERATION_ERR, "Read timestamp is not available anymore.");
        }

        registerAutoLwmUnlockOnCoordinatorLeaveIfNeeded(request, txIdToLockLwm);

        return txIdToLockLwm;
    }

    /**
     * For a read-only request that carries a coordinator ID, registers a remotely triggered resource whose closing unlocks the low
     * watermark for the given ID. Other requests are ignored.
     *
     * @param request Request being processed.
     * @param txIdToLockLwm ID under which the low watermark was locked.
     */
    private void registerAutoLwmUnlockOnCoordinatorLeaveIfNeeded(ReplicaRequest request, UUID txIdToLockLwm) {
        if (!(request instanceof ReadOnlyReplicaRequest)) {
            return;
        }

        UUID coordinatorId = ((ReadOnlyReplicaRequest) request).coordinatorId();
        if (coordinatorId == null) {
            return;
        }

        FullyQualifiedResourceId resourceId = new FullyQualifiedResourceId(txIdToLockLwm, txIdToLockLwm);

        remotelyTriggeredResourceRegistry.register(resourceId, coordinatorId, () -> () -> lowWatermark.unlock(txIdToLockLwm));
    }

    /**
     * Unlocks the low watermark for a direct read-only request that locked it. Does nothing for other requests or a {@code null} ID.
     *
     * @param txIdToUnlockLwm ID under which the low watermark was locked, or {@code null}.
     * @param request Request that has been processed.
     */
    private void unlockLwmIfNeeded(@Nullable UUID txIdToUnlockLwm, ReplicaRequest request) {
        if (txIdToUnlockLwm == null || !(request instanceof ReadOnlyDirectReplicaRequest)) {
            return;
        }

        lowWatermark.unlock(txIdToUnlockLwm);
    }

    /**
     * Delegates to {@code TableUtils.indexIdsAtRwTxBeginTs} for this table.
     *
     * @param txId ID of the read-write transaction.
     * @return Index IDs.
     */
    private List<Integer> indexIdsAtRwTxBeginTs(UUID txId) {
        int ownTableId = tableId();

        return TableUtils.indexIdsAtRwTxBeginTs(catalogService, txId, ownTableId);
    }

    /**
     * Delegates to {@code TableUtils.indexIdsAtRwTxBeginTsOrNull} for this table.
     *
     * @param txId ID of the read-write transaction.
     * @return Index IDs, or {@code null}.
     */
    @Nullable
    private List<Integer> indexIdsAtRwTxBeginTsOrNull(UUID txId) {
        int ownTableId = tableId();

        return TableUtils.indexIdsAtRwTxBeginTsOrNull(catalogService, txId, ownTableId);
    }

    /**
     * Returns the latest schema version of this table in the catalog that is active at the given timestamp.
     *
     * @param ts Timestamp.
     * @return Latest schema version of the table descriptor found in that catalog.
     */
    private int tableVersionByTs(HybridTimestamp ts) {
        Catalog activeCatalog = catalogService.activeCatalog(ts.longValue());
        CatalogTableDescriptor tableDescriptor = activeCatalog.table(tableId());

        assert tableDescriptor != null : "tableId=" + tableId() + ", catalogVersion=" + activeCatalog.version();

        return tableDescriptor.latestSchemaVersion();
    }

    /**
     * Unwraps the binary row from a timed row.
     *
     * @param timedBinaryRow Timed row, or {@code null}.
     * @return Binary row of the timed row, or {@code null} when the timed row is {@code null}.
     */
    @Nullable
    private static BinaryRow binaryRow(@Nullable TimedBinaryRow timedBinaryRow) {
        if (timedBinaryRow == null) {
            return null;
        }

        return timedBinaryRow.binaryRow();
    }

    /**
     * Upgrades a row to the target schema version when its own version is lower.
     *
     * @param source Row to upgrade, or {@code null}.
     * @param targetSchemaVersion Schema version to upgrade to.
     * @return The same instance when it is {@code null} or already at or above the target version, otherwise the upgraded row.
     */
    @Nullable
    private BinaryRow upgrade(@Nullable BinaryRow source, int targetSchemaVersion) {
        if (source == null || source.schemaVersion() >= targetSchemaVersion) {
            return source;
        }

        BinaryRowUpgrader upgrader = new BinaryRowUpgrader(schemaRegistry, targetSchemaVersion);

        return upgrader.upgrade(source);
    }

    /**
     * Brings a batch of row updates to a single schema version, modifying the list in place.
     *
     * <p>The batch version is the table schema version at the commit timestamp of the first element. The list is truncated at the
     * first later element whose table version, old row or new row is newer than the batch version; the remaining elements have their
     * rows upgraded to the batch version.
     *
     * @param rows Row updates; may be truncated and have elements replaced.
     * @return Schema version of the batch, or {@code -1} when the list is empty.
     */
    private int upgrade(List<RowUpdateInfo<BinaryRow>> rows) {
        int batchSchemaVer = -1;
        int idx = 0;

        while (idx < rows.size()) {
            RowUpdateInfo<BinaryRow> current = rows.get(idx);
            int rowSchemaVer = tableVersionByTs(current.commitTs());

            if (idx == 0) {
                batchSchemaVer = rowSchemaVer;
            } else {
                boolean newerThanBatch = rowSchemaVer > batchSchemaVer
                        || PartitionReplicaListener.isSchemaNewer(current.oldRow(), batchSchemaVer)
                        || PartitionReplicaListener.isSchemaNewer(current.row(), batchSchemaVer);

                if (newerThanBatch) {
                    // Drop this element and everything after it from the batch.
                    rows.subList(idx, rows.size()).clear();

                    return batchSchemaVer;
                }
            }

            BinaryRow upgradedRow = upgrade(current.row(), batchSchemaVer);
            BinaryRow upgradedOldRow = upgrade(current.oldRow(), batchSchemaVer);

            // Replace the element only when at least one of the rows was actually upgraded.
            if (upgradedRow != current.row() || upgradedOldRow != current.oldRow()) {
                rows.set(idx, new RowUpdateInfo<BinaryRow>(current.rowUuid(), current.timestamp(), upgradedRow, upgradedOldRow, current.commitTs(), current.oldCommitTs(), current.eventType()));
            }

            idx++;
        }

        return batchSchemaVer;
    }

    /**
     * Tells whether a row is written with a schema version greater than the given one.
     *
     * @param row Row to check, or {@code null}.
     * @param schemaVer Schema version to compare with.
     * @return {@code true} when the row is not {@code null} and its schema version is greater than {@code schemaVer}.
     */
    private static boolean isSchemaNewer(@Nullable BinaryRow row, int schemaVer) {
        if (row == null) {
            return false;
        }

        return row.schemaVersion() > schemaVer;
    }

    /**
     * Switches the write intents of a transaction directly through the storage update handler, passing {@code null} as the last
     * argument of {@code switchWriteIntents}.
     *
     * @param txId Transaction ID.
     * @param commit Whether the transaction is committed.
     * @param commitTimestamp Commit timestamp, or {@code null}.
     */
    @TestOnly
    public void cleanupLocally(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp) {
        storageUpdateHandler.switchWriteIntents(
                txId,
                commit,
                commitTimestamp,
                null
        );
    }

    /**
     * Synthetic lambda body recovered by the decompiler: maps the readability of a write intent to a row under the busy lock.
     *
     * @param readResult Write intent read from the storage.
     * @param lastCommitted Supplier used when the write intent is not readable.
     * @param writeIntentReadable Whether the write intent may be read.
     * @return Resolved row.
     */
    private /* synthetic */ TimedBinaryRow lambda$resolveWriteIntentAsync$228(ReadResult readResult, Supplier lastCommitted, Boolean writeIntentReadable) {
        return IgniteUtils.inBusyLock(
                busyLock,
                () -> lambda$resolveWriteIntentAsync$227(writeIntentReadable, readResult, lastCommitted)
        );
    }

    /**
     * Synthetic lambda body recovered by the decompiler: picks the row to return for a resolved write intent.
     *
     * @param writeIntentReadable Whether the write intent may be read.
     * @param readResult Write intent read from the storage.
     * @param lastCommitted Supplier used when the write intent is not readable.
     * @return The write intent row with the commit timestamp taken from the transaction state meta when readable, otherwise the
     *         value provided by {@code lastCommitted}.
     */
    private /* synthetic */ TimedBinaryRow lambda$resolveWriteIntentAsync$227(Boolean writeIntentReadable, ReadResult readResult, Supplier lastCommitted) {
        if (!writeIntentReadable.booleanValue()) {
            return (TimedBinaryRow) lastCommitted.get();
        }

        HybridTimestamp commitTimestamp = txManager.stateMeta(readResult.transactionId()).commitTimestamp();

        return new TimedBinaryRow(readResult.binaryRow(), commitTimestamp);
    }

    /**
     * Identifier of an operation: the ID of its initiator plus a timestamp.
     *
     * <p>Instances are used as map keys, so the class is immutable and defines value-based {@code equals} and {@code hashCode}.
     */
    private static class OperationId {
        /** ID of the initiator of the operation. */
        private final UUID initiatorId;

        /** Timestamp of the operation. */
        private final long ts;

        /**
         * Creates an operation ID.
         *
         * @param initiatorId ID of the initiator of the operation.
         * @param ts Timestamp of the operation.
         */
        public OperationId(UUID initiatorId, long ts) {
            this.initiatorId = initiatorId;
            this.ts = ts;
        }

        /** Two IDs are equal when they are of the same class and both the timestamp and the initiator ID are equal. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            OperationId that = (OperationId)o;
            return this.ts == that.ts && this.initiatorId.equals(that.initiatorId);
        }

        /** Combines the hash of the initiator ID with the hash of the timestamp. */
        @Override
        public int hashCode() {
            // Long.hashCode(ts) is (int)(ts ^ ts >>> 32), so hash values are the same as before.
            return 31 * this.initiatorId.hashCode() + Long.hashCode(this.ts);
        }
    }

    /**
     * Holder of operation futures, grouped first by request type and then by operation ID.
     */
    private static class TxCleanupReadyFutureList {
        /** Operation futures keyed by request type and then by operation ID. */
        final Map<RequestType, Map<OperationId, CompletableFuture<?>>> futures = new EnumMap<>(RequestType.class);

        /** Instances are created by the enclosing class only. */
        private TxCleanupReadyFutureList() {
        }
    }
}

