package org.apache.ignite3.internal.table.distributed.storage;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite3.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite3.internal.continuousquery.ContinuousQuery;
import org.apache.ignite3.internal.continuousquery.ContinuousQueryMetricSink;
import org.apache.ignite3.internal.continuousquery.ContinuousQueryRequest;
import org.apache.ignite3.internal.continuousquery.ContinuousQueryScanResult;
import org.apache.ignite3.internal.continuousquery.ContinuousQueryScanResultWithSchema;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.hlc.HybridTimestampTracker;
import org.apache.ignite3.internal.lang.IgnitePentaFunction;
import org.apache.ignite3.internal.lang.IgniteStringFormatter;
import org.apache.ignite3.internal.lang.IgniteSystemProperties;
import org.apache.ignite3.internal.lang.IgniteTriFunction;
import org.apache.ignite3.internal.network.ClusterNodeResolver;
import org.apache.ignite3.internal.network.UnresolvableConsistentIdException;
import org.apache.ignite3.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite3.internal.partition.replicator.network.replication.BinaryTupleMessage;
import org.apache.ignite3.internal.partition.replicator.network.replication.DcrWriteMultiRowReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.MultipleRowPkReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.MultipleRowReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadWriteMultiRowPkReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.ReadWriteMultiRowReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.RequestType;
import org.apache.ignite3.internal.partition.replicator.network.replication.SingleRowPkReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.SingleRowReplicaRequest;
import org.apache.ignite3.internal.partition.replicator.network.replication.SwapRowReplicaRequest;
import org.apache.ignite3.internal.placementdriver.PlacementDriver;
import org.apache.ignite3.internal.placementdriver.ReplicaMeta;
import org.apache.ignite3.internal.replicator.ReplicaService;
import org.apache.ignite3.internal.replicator.ReplicationGroupId;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.replicator.ZonePartitionId;
import org.apache.ignite3.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite3.internal.replicator.exception.ReplicationException;
import org.apache.ignite3.internal.replicator.exception.ReplicationTimeoutException;
import org.apache.ignite3.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite3.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite3.internal.replicator.message.ReplicaRequest;
import org.apache.ignite3.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite3.internal.replicator.message.TimestampAware;
import org.apache.ignite3.internal.schema.BinaryRow;
import org.apache.ignite3.internal.schema.BinaryRowEx;
import org.apache.ignite3.internal.schema.BinaryTimedRow;
import org.apache.ignite3.internal.schema.BinaryTuple;
import org.apache.ignite3.internal.schema.BinaryTuplePrefix;
import org.apache.ignite3.internal.schema.SchemaDescriptor;
import org.apache.ignite3.internal.schema.SchemaRegistry;
import org.apache.ignite3.internal.schema.row.TimedRow;
import org.apache.ignite3.internal.storage.BinaryRowAndRowId;
import org.apache.ignite3.internal.storage.engine.MvTableStorage;
import org.apache.ignite3.internal.storage.operation.StorageOptimizedOperation;
import org.apache.ignite3.internal.storage.secondary.SecondaryTableStorage;
import org.apache.ignite3.internal.table.InternalTable;
import org.apache.ignite3.internal.table.StreamerReceiverRunner;
import org.apache.ignite3.internal.table.distributed.TableUtils;
import org.apache.ignite3.internal.table.distributed.storage.PartitionScanPublisher;
import org.apache.ignite3.internal.tx.InternalTransaction;
import org.apache.ignite3.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite3.internal.tx.TransactionIds;
import org.apache.ignite3.internal.tx.TxManager;
import org.apache.ignite3.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite3.internal.tx.impl.TransactionInflights;
import org.apache.ignite3.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.FastTimestamps;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.util.Pair;
import org.apache.ignite3.internal.util.PendingComparableValuesTracker;
import org.apache.ignite3.internal.utils.PrimaryReplica;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.lang.IgniteException;
import org.apache.ignite3.lang.TableNotFoundException;
import org.apache.ignite3.network.ClusterNode;
import org.apache.ignite3.table.ContinuousQueryOptions;
import org.apache.ignite3.table.QualifiedName;
import org.apache.ignite3.table.QualifiedNameHelper;
import org.apache.ignite3.table.TableRowEventBatch;
import org.apache.ignite3.tx.IgniteTransactions;
import org.apache.ignite3.tx.TransactionException;
import org.gridgain.internal.license.LicenseFeatureChecker;
import org.gridgain.lang.GridgainErrorGroups;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite3/internal/table/distributed/storage/InternalTableImpl.class */
public class InternalTableImpl implements InternalTable {
    /**
     * Timeout used when awaiting a primary replica.
     * NOTE(review): the unit is not visible in this chunk (presumably seconds) - confirm at the usage site.
     */
    public static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 30;
    // The three blank static finals below have no initializer here; they must be assigned in a static
    // initializer that lies outside this chunk.
    private static final ReadWriteInflightBatchRequestTracker READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER;
    /** Factory used to build partition replication messages (scan requests, binary tuple messages). */
    private static final PartitionReplicationMessagesFactory TABLE_MESSAGES_FACTORY;
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY;
    /**
     * Default timeout for read-write operations.
     * NOTE(review): presumably milliseconds - confirm against the code that consumes it.
     */
    public static final int DEFAULT_RW_TIMEOUT = 10000;
    /** Number of partitions of the table, as passed to the constructor. */
    private final int partitions;
    private final Supplier<ScheduledExecutorService> streamerFlushExecutor;
    private final ContinuousQueryResponseHandler continuousQueryResponseHandler;
    private final StreamerReceiverRunner streamerReceiverRunner;
    /** Qualified table name; volatile because {@link #name(String)} replaces it after construction. */
    private volatile QualifiedName tableName;
    /** Table identifier; stamped on scan requests and registered on partition enlistments. */
    private final int tableId;
    /** Identifier of the zone the table belongs to, as passed to the constructor. */
    private final int zoneId;
    /** Resolves cluster nodes by id; used to check that a primary replica's leaseholder is resolvable. */
    private final ClusterNodeResolver clusterNodeResolver;
    /** Transaction manager used to begin implicit transactions. */
    protected final TxManager txManager;
    /** Registry of in-flight requests per transaction. */
    private final TransactionInflights transactionInflights;
    private final MvTableStorage tableStorage;
    private final TxStateStorage txStateStorage;
    /** Service used to send requests to replicas. */
    private final ReplicaService replicaSvc;
    private final ClockService clockService;
    /** Timestamp tracker handed to the transaction manager when an implicit transaction is started. */
    private final HybridTimestampTracker observableTimestampTracker;
    /** Source of primary replica metadata for read-only requests. */
    private final PlacementDriver placementDriver;
    /** Initial retry budget passed to {@code trackingInvoke} for lock acquisition failures. */
    private final int attemptsObtainLock;

    /** Optional secondary storage; replaced by {@code secondaryStorage(...)} and cleared by {@code removeSecondaryStorage()}. */
    @Nullable
    private volatile SecondaryTableStorage secondaryTableStorage;
    private final SchemaRegistry schemaRegistry;
    // NOTE(review): presumably marks this table as a cache; the accessor is outside this chunk - confirm.
    private final boolean cache;
    private final LicenseFeatureChecker licenseFeatureChecker;
    // Decompiler-exposed flag that guards the assertion checks throughout this class.
    static final /* synthetic */ boolean $assertionsDisabled;
    // NOTE(review): presumably guards updates of the two tracker maps below; usage is not visible in this chunk.
    private final Object updatePartitionMapsMux = new Object();
    private volatile Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp, Void>> safeTimeTrackerByPartitionId = Int2ObjectMaps.emptyMap();
    private volatile Int2ObjectMap<PendingComparableValuesTracker<Long, Void>> storageIndexTrackerByPartitionId = Int2ObjectMaps.emptyMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite3/internal/table/distributed/storage/InternalTableImpl$EmbeddedContinuousQueryMetricSink.class */
    /** {@link ContinuousQueryMetricSink} implementation that discards every reported value. */
    public static class EmbeddedContinuousQueryMetricSink implements ContinuousQueryMetricSink {
        /** Not instantiable from outside the enclosing class. */
        private EmbeddedContinuousQueryMetricSink() {
            // Nothing to initialize.
        }

        /** Ignores the reported number of sent requests. */
        @Override
        public void continuousQueryRequestsSentAdd(long delta) {
            // Intentionally a no-op.
        }

        /** Ignores the reported number of received events. */
        @Override
        public void continuousQueryEventsReceivedAdd(long delta) {
            // Intentionally a no-op.
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite3/internal/table/distributed/storage/InternalTableImpl$ReadOnlyInflightBatchRequestTracker.class */
    /**
     * Batch request tracker that registers every scan batch request as an in-flight operation of a
     * read-only transaction.
     */
    public static class ReadOnlyInflightBatchRequestTracker implements PartitionScanPublisher.InflightBatchRequestTracker {
        /** Identifier of the tracked transaction. */
        private final UUID txId;

        /** Registry in which the in-flight requests are counted. */
        private final TransactionInflights transactionInflights;

        /**
         * Creates a tracker.
         *
         * @param transactionInflights In-flight registry.
         * @param txId Transaction identifier.
         */
        ReadOnlyInflightBatchRequestTracker(TransactionInflights transactionInflights, UUID txId) {
            this.txId = txId;
            this.transactionInflights = transactionInflights;
        }

        /**
         * Registers an in-flight request.
         *
         * @throws TransactionException If the registry refuses the in-flight.
         */
        @Override
        public void onRequestBegin() {
            boolean registered = transactionInflights.addInflight(txId, true);

            if (registered) {
                return;
            }

            String message = IgniteStringFormatter.format("Transaction is already finished () [txId={}, readOnly=true].", txId);

            throw new TransactionException(ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR, message);
        }

        /** Unregisters the in-flight request. */
        @Override
        public void onRequestEnd() {
            transactionInflights.removeInflight(txId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite3/internal/table/distributed/storage/InternalTableImpl$ReadWriteInflightBatchRequestTracker.class */
    /** Batch request tracker whose callbacks do nothing. */
    public static class ReadWriteInflightBatchRequestTracker implements PartitionScanPublisher.InflightBatchRequestTracker {
        /** Not instantiable from outside the enclosing class. */
        private ReadWriteInflightBatchRequestTracker() {
            // Nothing to initialize.
        }

        /** Does nothing. */
        @Override
        public void onRequestBegin() {
            // Intentionally a no-op.
        }

        /** Does nothing. */
        @Override
        public void onRequestEnd() {
            // Intentionally a no-op.
        }
    }

    /**
     * Creates the table and stores every collaborator in the field of the same name.
     *
     * @param tableName Qualified table name.
     * @param zoneId Zone identifier.
     * @param tableId Table identifier.
     * @param partitions Number of partitions.
     * @param clusterNodeResolver Cluster node resolver.
     * @param txManager Transaction manager.
     * @param tableStorage Table storage.
     * @param txStateStorage Transaction state storage.
     * @param replicaSvc Replica service.
     * @param clockService Clock service.
     * @param observableTimestampTracker Observable timestamp tracker.
     * @param placementDriver Placement driver.
     * @param transactionInflights Transaction in-flight registry.
     * @param attemptsObtainLock Retry budget for lock acquisition failures.
     * @param streamerFlushExecutor Supplier of the streamer flush executor.
     * @param secondaryTableStorage Secondary storage, may be {@code null}.
     * @param schemaRegistry Schema registry.
     * @param continuousQueryResponseHandler Continuous query response handler.
     * @param cache Value of the {@code cache} flag.
     * @param streamerReceiverRunner Streamer receiver runner.
     * @param licenseFeatureChecker License feature checker.
     */
    public InternalTableImpl(
            QualifiedName tableName,
            int zoneId,
            int tableId,
            int partitions,
            ClusterNodeResolver clusterNodeResolver,
            TxManager txManager,
            MvTableStorage tableStorage,
            TxStateStorage txStateStorage,
            ReplicaService replicaSvc,
            ClockService clockService,
            HybridTimestampTracker observableTimestampTracker,
            PlacementDriver placementDriver,
            TransactionInflights transactionInflights,
            int attemptsObtainLock,
            Supplier<ScheduledExecutorService> streamerFlushExecutor,
            @Nullable SecondaryTableStorage secondaryTableStorage,
            SchemaRegistry schemaRegistry,
            ContinuousQueryResponseHandler continuousQueryResponseHandler,
            boolean cache,
            StreamerReceiverRunner streamerReceiverRunner,
            LicenseFeatureChecker licenseFeatureChecker
    ) {
        // Identity.
        this.tableName = tableName;
        this.zoneId = zoneId;
        this.tableId = tableId;
        this.partitions = partitions;
        this.cache = cache;

        // Storage.
        this.tableStorage = tableStorage;
        this.txStateStorage = txStateStorage;
        this.secondaryTableStorage = secondaryTableStorage;
        this.schemaRegistry = schemaRegistry;

        // Transactions and replication.
        this.clusterNodeResolver = clusterNodeResolver;
        this.txManager = txManager;
        this.replicaSvc = replicaSvc;
        this.clockService = clockService;
        this.observableTimestampTracker = observableTimestampTracker;
        this.placementDriver = placementDriver;
        this.transactionInflights = transactionInflights;
        this.attemptsObtainLock = attemptsObtainLock;

        // Streaming, continuous queries, licensing.
        this.streamerFlushExecutor = streamerFlushExecutor;
        this.continuousQueryResponseHandler = continuousQueryResponseHandler;
        this.streamerReceiverRunner = streamerReceiverRunner;
        this.licenseFeatureChecker = licenseFeatureChecker;
    }

    /** Returns the table storage supplied to the constructor. */
    @Override
    public MvTableStorage storage() {
        return tableStorage;
    }

    /** Returns the currently attached secondary storage, or {@code null} when there is none. */
    @Override
    @Nullable
    public SecondaryTableStorage secondaryStorage() {
        return secondaryTableStorage;
    }

    /**
     * Replaces the secondary storage reference.
     *
     * @param secondaryTableStorage New secondary storage, may be {@code null}.
     */
    @Override
    public void secondaryStorage(@Nullable SecondaryTableStorage secondaryTableStorage) {
        // The field is volatile, so the new reference is published to other threads immediately.
        this.secondaryTableStorage = secondaryTableStorage;
    }

    /** Clears the secondary storage reference. */
    @Override
    public void removeSecondaryStorage() {
        secondaryTableStorage = null;
    }

    /** Returns {@code true} when a secondary storage is currently attached. */
    @Override
    public boolean hasSecondaryStorage() {
        SecondaryTableStorage current = secondaryTableStorage;

        return current != null;
    }

    /** Returns the number of partitions supplied to the constructor. */
    @Override
    public int partitions() {
        return partitions;
    }

    /** Returns the table identifier supplied to the constructor. */
    @Override
    public int tableId() {
        return tableId;
    }

    /** Returns the zone identifier supplied to the constructor. */
    @Override
    public int zoneId() {
        return zoneId;
    }

    /** Returns the current qualified name of the table. */
    @Override
    public QualifiedName name() {
        return tableName;
    }

    /**
     * Renames the table, keeping the schema part of the current qualified name.
     *
     * @param str New object name, passed to {@link QualifiedNameHelper#fromNormalized} as is.
     */
    @Override
    public synchronized void name(String str) {
        String currentSchema = tableName.schemaName();

        tableName = QualifiedNameHelper.fromNormalized(currentSchema, str);
    }

    /**
     * Enlists a single-row read-write operation without a previously recorded start time.
     *
     * <p>Delegates to the five-argument overload with a {@code null} start time.
     *
     * @param binaryRowEx Row the operation targets.
     * @param internalTransaction Transaction, or {@code null} to use an implicit one.
     * @param igniteTriFunction Request factory (transaction, replication group, consistency token).
     * @param biPredicate Predicate handed over to the invocation logic together with the result and the request.
     * @param <R> Result type.
     * @return Future of the operation result.
     */
    private <R> CompletableFuture<R> enlistInTx(
            BinaryRowEx binaryRowEx,
            @Nullable InternalTransaction internalTransaction,
            IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> igniteTriFunction,
            BiPredicate<R, ReplicaRequest> biPredicate
    ) {
        return enlistInTx(binaryRowEx, internalTransaction, igniteTriFunction, biPredicate, (Long) null);
    }

    /**
     * Enlists a single-row read-write operation into a transaction and invokes it on the target partition.
     *
     * <p>An explicit transaction is rejected when it is read-only or when its {@code external()} flag does not
     * match {@code cache()}. When the operation fails inside an implicit transaction and {@link #canRetry}
     * allows it, the whole operation is re-run in a fresh implicit transaction, carrying over the start time
     * of the first attempt.
     *
     * @param binaryRowEx Row the operation targets; it determines the partition.
     * @param internalTransaction Transaction, or {@code null} to use an implicit one.
     * @param igniteTriFunction Request factory (transaction, replication group, consistency token).
     * @param biPredicate Predicate handed over to the invocation logic together with the result and the request.
     * @param l Physical start time of the first attempt, or {@code null} on the first attempt.
     * @param <R> Result type.
     * @return Future of the operation result.
     */
    private <R> CompletableFuture<R> enlistInTx(BinaryRowEx binaryRowEx, @Nullable InternalTransaction internalTransaction, IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> igniteTriFunction, BiPredicate<R, ReplicaRequest> biPredicate, @Nullable Long l) {
        if (internalTransaction != null) {
            if (internalTransaction.isReadOnly()) {
                return CompletableFuture.failedFuture(new TransactionException(ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, "Failed to enlist read-write operation into read-only transaction txId={" + internalTransaction.id() + "}"));
            }

            if (internalTransaction.external() != cache()) {
                return CompletableFuture.failedFuture(new TransactionException(GridgainErrorGroups.Cache.TX_INCOMPATIBLE_OPERATION_ERR, IgniteStringFormatter.format("Requested operation is incompatible with cache or table type [txId={}, cache={}]", internalTransaction.id(), Boolean.valueOf(cache()))));
            }
        }

        InternalTransaction actualTx = startImplicitRwTxIfNeeded(internalTransaction);
        int partId = partitionId(binaryRowEx);
        ReplicationGroupId groupId = targetReplicationGroupId(partId);
        PendingTxPartitionEnlistment enlistment = actualTx.enlistedPartition(groupId);

        // Both invocation paths build the request the same way; only the consistency token source differs.
        Function<Long, ReplicaRequest> requestFactory = token -> igniteTriFunction.apply(actualTx, groupId, token);

        CompletableFuture<R> invocation;

        if (enlistment != null) {
            // An implicit transaction is fresh and cannot have an enlisted partition yet.
            if (!$assertionsDisabled && actualTx.implicit()) {
                throw new AssertionError();
            }

            invocation = trackingInvoke(actualTx, partId, requestFactory, false, enlistment, biPredicate, this.attemptsObtainLock);
        } else {
            invocation = enlistAndInvoke(actualTx, partId, requestFactory, actualTx.implicit(), biPredicate);
        }

        return postEnlist(invocation, false, actualTx, actualTx.implicit()).handle((result, failure) -> {
            if (failure != null) {
                if (actualTx.implicit()) {
                    long retryWindow = actualTx.isReadOnly() ? actualTx.timeout() : DEFAULT_RW_TIMEOUT;
                    long firstAttemptTs = l != null ? l.longValue() : actualTx.startTimestamp().getPhysical();

                    if (canRetry(failure, firstAttemptTs, retryWindow)) {
                        return enlistInTx(binaryRowEx, (InternalTransaction) null, igniteTriFunction, biPredicate, Long.valueOf(firstAttemptTs));
                    }
                }

                ExceptionUtils.sneakyThrow(failure);
            }

            return CompletableFuture.completedFuture(result);
        }).thenCompose(Function.identity());
    }

    /**
     * Enlists a multi-row read-write operation without a previously recorded start time.
     *
     * <p>Delegates to the six-argument overload with a {@code null} start time.
     *
     * @param collection Rows the operation targets.
     * @param internalTransaction Transaction, or {@code null} to use an implicit one.
     * @param ignitePentaFunction Request factory (rows, transaction, replication group, consistency token, full flag).
     * @param function Reducer that combines the per-partition batches into the final result.
     * @param biPredicate Predicate handed over to the invocation logic together with the result and the request.
     * @param <T> Result type.
     * @return Future of the operation result.
     */
    private <T> CompletableFuture<T> enlistInTx(
            Collection<BinaryRowEx> collection,
            @Nullable InternalTransaction internalTransaction,
            IgnitePentaFunction<Collection<? extends BinaryRow>, InternalTransaction, ReplicationGroupId, Long, Boolean, ReplicaRequest> ignitePentaFunction,
            Function<Collection<RowBatch>, CompletableFuture<T>> function,
            BiPredicate<T, ReplicaRequest> biPredicate
    ) {
        return enlistInTx(collection, internalTransaction, ignitePentaFunction, function, biPredicate, null);
    }

    /**
     * Enlists a multi-row read-write operation into a transaction, splitting the rows by partition and invoking
     * one request per partition.
     *
     * <p>An explicit transaction is rejected when it is read-only or when its {@code external()} flag does not
     * match {@code cache()}. When the operation fails inside an implicit transaction and {@link #canRetry}
     * allows it, the whole operation is re-run in a fresh implicit transaction, carrying over the start time
     * of the first attempt. The retry window for read-write transactions is {@link #DEFAULT_RW_TIMEOUT}, the
     * same value the single-row overload uses.
     *
     * @param collection Rows the operation targets.
     * @param internalTransaction Transaction, or {@code null} to use an implicit one.
     * @param ignitePentaFunction Request factory (rows, transaction, replication group, consistency token, full flag).
     * @param function Reducer that combines the per-partition batches into the final result.
     * @param biPredicate Predicate handed over to the invocation logic together with the result and the request.
     * @param l Physical start time of the first attempt, or {@code null} on the first attempt.
     * @param <T> Result type.
     * @return Future of the operation result.
     */
    private <T> CompletableFuture<T> enlistInTx(Collection<BinaryRowEx> collection, @Nullable InternalTransaction internalTransaction, IgnitePentaFunction<Collection<? extends BinaryRow>, InternalTransaction, ReplicationGroupId, Long, Boolean, ReplicaRequest> ignitePentaFunction, Function<Collection<RowBatch>, CompletableFuture<T>> function, BiPredicate<T, ReplicaRequest> biPredicate, @Nullable Long l) {
        if (internalTransaction != null) {
            if (internalTransaction.isReadOnly()) {
                return CompletableFuture.failedFuture(new TransactionException(ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, "Failed to enlist read-write operation into read-only transaction txId={" + internalTransaction.id() + "}"));
            }

            if (internalTransaction.external() != cache()) {
                return CompletableFuture.failedFuture(new TransactionException(GridgainErrorGroups.Cache.TX_INCOMPATIBLE_OPERATION_ERR, "Requested operation is incompatible with cache or table type txId={" + internalTransaction.id() + "}"));
            }
        }

        InternalTransaction actualTx = startImplicitRwTxIfNeeded(internalTransaction);
        Int2ObjectMap<RowBatch> rowBatchByPartitionId = toRowBatchByPartitionId(collection);

        boolean singlePartition = rowBatchByPartitionId.size() == 1;

        // An implicit transaction touching one partition is sent as a "full" request.
        boolean full = actualTx.implicit() && singlePartition;

        for (Int2ObjectMap.Entry<RowBatch> entry : rowBatchByPartitionId.int2ObjectEntrySet()) {
            int partId = entry.getIntKey();
            RowBatch rowBatch = entry.getValue();

            ReplicationGroupId groupId = targetReplicationGroupId(partId);
            PendingTxPartitionEnlistment enlistment = actualTx.enlistedPartition(groupId);

            CompletableFuture<?> invocation;

            if (enlistment == null) {
                invocation = enlistAndInvoke(
                        actualTx,
                        partId,
                        token -> ignitePentaFunction.apply(rowBatch.requestedRows, actualTx, groupId, token, full),
                        full,
                        biPredicate
                );
            } else {
                // An implicit transaction is fresh and cannot have an enlisted partition yet.
                if (!$assertionsDisabled && actualTx.implicit()) {
                    throw new AssertionError();
                }

                invocation = trackingInvoke(
                        actualTx,
                        partId,
                        token -> ignitePentaFunction.apply(rowBatch.requestedRows, actualTx, groupId, token, false),
                        false,
                        enlistment,
                        biPredicate,
                        this.attemptsObtainLock
                );
            }

            rowBatch.resultFuture = invocation;
        }

        // An implicit transaction spanning several partitions is committed by postEnlist.
        boolean commitAfterwards = actualTx.implicit() && !singlePartition;

        return postEnlist(function.apply(rowBatchByPartitionId.values()), commitAfterwards, actualTx, full)
                .handle((BiFunction<T, Throwable, CompletableFuture<T>>) (result, failure) -> {
                    if (failure != null) {
                        if (actualTx.implicit()) {
                            long retryWindow = actualTx.isReadOnly() ? actualTx.timeout() : DEFAULT_RW_TIMEOUT;
                            long firstAttemptTs = l == null ? actualTx.startTimestamp().getPhysical() : l.longValue();

                            if (canRetry(failure, firstAttemptTs, retryWindow)) {
                                return enlistInTx(collection, null, ignitePentaFunction, function, biPredicate, Long.valueOf(firstAttemptTs));
                            }
                        }

                        ExceptionUtils.sneakyThrow(failure);
                    }

                    return CompletableFuture.completedFuture(result);
                })
                .thenCompose(Function.identity());
    }

    /**
     * Returns the transaction a read-write operation should run in.
     *
     * <p>Without a caller transaction a new one is begun: via {@code beginExternal} when {@code cache()} is
     * {@code true}, via {@code beginImplicitRw} otherwise. A caller transaction is returned unchanged, except
     * that an implicit, non-external one is passed through {@code castToExternal} when {@code cache()} is
     * {@code true}.
     *
     * @param internalTransaction Caller transaction, or {@code null}.
     * @return Transaction to use.
     */
    private InternalTransaction startImplicitRwTxIfNeeded(@Nullable InternalTransaction internalTransaction) {
        if (internalTransaction == null) {
            if (cache()) {
                return this.txManager.beginExternal(this.observableTimestampTracker, true);
            }

            return this.txManager.beginImplicitRw(this.observableTimestampTracker);
        }

        boolean mustCast = cache() && internalTransaction.implicit() && !internalTransaction.external();

        if (mustCast) {
            return this.txManager.castToExternal(internalTransaction);
        }

        return internalTransaction;
    }

    /**
     * Returns the caller transaction, or begins an implicit read-only one when none was supplied.
     *
     * @param internalTransaction Caller transaction, or {@code null}.
     * @return Transaction to use.
     */
    private InternalTransaction startImplicitRoTxIfNeeded(@Nullable InternalTransaction internalTransaction) {
        if (internalTransaction != null) {
            return internalTransaction;
        }

        return this.txManager.beginImplicitRo(this.observableTimestampTracker);
    }

    /**
     * Requests the next batch of a read-write scan from the target partition.
     *
     * <p>When the partition is already enlisted, the table id is registered on the enlistment and the request is
     * sent straight to its primary node; otherwise the partition is enlisted first.
     *
     * @param internalTransaction Read-write transaction the scan runs in.
     * @param i Partition id.
     * @param j Scan id.
     * @param i2 Batch size.
     * @param num Index to use, may be {@code null}.
     * @param binaryTuple Exact key, may be {@code null}.
     * @param binaryTuplePrefix Lower bound prefix, may be {@code null}.
     * @param binaryTuplePrefix2 Upper bound prefix, may be {@code null}.
     * @param i3 Scan flags.
     * @param bitSet Columns to include, may be {@code null}.
     * @return Future of the retrieved batch.
     */
    private CompletableFuture<Collection<BinaryRow>> enlistCursorInTx(InternalTransaction internalTransaction, int i, long j, int i2, @Nullable Integer num, @Nullable BinaryTuple binaryTuple, @Nullable BinaryTuplePrefix binaryTuplePrefix, @Nullable BinaryTuplePrefix binaryTuplePrefix2, int i3, @Nullable BitSet bitSet) {
        ReplicationGroupId groupId = targetReplicationGroupId(i);
        PendingTxPartitionEnlistment enlistment = internalTransaction.enlistedPartition(groupId);

        Function<Long, ReplicaRequest> requestFactory = token -> TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                .groupId(serializeReplicationGroupId(groupId))
                .tableId(this.tableId)
                .timestamp(internalTransaction.startTimestamp())
                .transactionId(internalTransaction.id())
                .scanId(j)
                .indexToUse(num)
                .exactKey(binaryTupleMessage(binaryTuple))
                .lowerBoundPrefix(binaryTupleMessage(binaryTuplePrefix))
                .upperBoundPrefix(binaryTupleMessage(binaryTuplePrefix2))
                .flags(i3)
                .columnsToInclude(bitSet)
                .full(internalTransaction.implicit())
                .batchSize(i2)
                .enlistmentConsistencyToken(token)
                .commitPartitionId(serializeReplicationGroupId(internalTransaction.commitPartition()))
                .coordinatorId(internalTransaction.coordinatorId())
                .build();

        CompletableFuture<Collection<BinaryRow>> batchFuture;

        if (enlistment == null) {
            batchFuture = enlistAndInvoke(internalTransaction, i, requestFactory, false, null);
        } else {
            enlistment.addTableId(this.tableId);

            ReplicaRequest request = requestFactory.apply(Long.valueOf(enlistment.consistencyToken()));

            batchFuture = this.replicaSvc.invoke(enlistment.primaryNodeConsistentId(), request);
        }

        return postEnlist(batchFuture, false, internalTransaction, false);
    }

    /**
     * Wraps a binary tuple into a network message carrying its buffer and element count.
     *
     * @param binaryTupleReader Tuple to wrap, may be {@code null}.
     * @return Message, or {@code null} when no tuple was given.
     */
    @Nullable
    private static BinaryTupleMessage binaryTupleMessage(@Nullable BinaryTupleReader binaryTupleReader) {
        return binaryTupleReader == null
                ? null
                : TABLE_MESSAGES_FACTORY.binaryTupleMessage()
                        .tuple(binaryTupleReader.byteBuffer())
                        .elementCount(binaryTupleReader.elementCount())
                        .build();
    }

    /**
     * Tells whether a failed implicit-transaction operation may be attempted again.
     *
     * @param th Failure of the previous attempt.
     * @param j Start time of the first attempt, compared against {@code FastTimestamps.coarseCurrentTimeMillis()}.
     * @param j2 Retry window: retries are allowed while the elapsed time is strictly below it.
     * @return {@code true} when the failure permits a retry and the window has not elapsed.
     */
    private static boolean canRetry(Throwable th, long j, long j2) {
        if (!exceptionAllowsImplicitTxRetry(th)) {
            return false;
        }

        long elapsed = FastTimestamps.coarseCurrentTimeMillis() - j;

        return elapsed < j2;
    }

    /**
     * Enlists the partition into the transaction and then invokes the request on it.
     *
     * @param internalTransaction Read-write transaction.
     * @param i Partition id.
     * @param function Request factory taking the enlistment consistency token.
     * @param z Whether the request is a "full" one (forwarded to {@code trackingInvoke}).
     * @param biPredicate Predicate forwarded to {@code trackingInvoke}, may be {@code null}.
     * @param <R> Result type.
     * @return Future of the invocation result.
     */
    private <R> CompletableFuture<R> enlistAndInvoke(InternalTransaction internalTransaction, int i, Function<Long, ReplicaRequest> function, boolean z, @Nullable BiPredicate<R, ReplicaRequest> biPredicate) {
        return enlist(i, internalTransaction)
                .thenCompose(enlistment -> trackingInvoke(internalTransaction, i, function, z, enlistment, biPredicate, this.attemptsObtainLock));
    }

    /**
     * Invokes a request on the primary node of an enlisted partition.
     *
     * <p>Three paths exist:
     * <ul>
     *     <li>"full" request ({@code z == true}): sent with {@code invokeRaw}; the transaction is finished right
     *     after the response arrives, using the response timestamp on success;</li>
     *     <li>tracked request (single/multi row requests other than {@code RW_GET}/{@code RW_GET_ALL}, and swap
     *     requests): an in-flight is registered first, and removed again when {@code biPredicate} accepts the
     *     result or when the request is about to be retried;</li>
     *     <li>any other request: sent as is.</li>
     * </ul>
     * On the last two paths a failure matching {@code ACQUIRE_LOCK_ERR} re-sends the same request while
     * {@code i2} is positive.
     *
     * @param internalTransaction Read-write transaction.
     * @param i Partition id (used in the error message only).
     * @param function Request factory taking the enlistment consistency token.
     * @param z Whether the request is a "full" one.
     * @param pendingTxPartitionEnlistment Enlistment of the target partition.
     * @param biPredicate Tested against (result, request) on the tracked path; must be non-null there.
     * @param i2 Remaining number of retries on lock acquisition failure.
     * @param <R> Result type.
     * @return Future of the invocation result.
     */
    @SuppressWarnings("unchecked")
    private <R> CompletableFuture<R> trackingInvoke(InternalTransaction internalTransaction, int i, Function<Long, ReplicaRequest> function, boolean z, PendingTxPartitionEnlistment pendingTxPartitionEnlistment, @Nullable BiPredicate<R, ReplicaRequest> biPredicate, int i2) {
        if (!$assertionsDisabled && internalTransaction.isReadOnly()) {
            throw new AssertionError(IgniteStringFormatter.format("Tracking invoke is available only for read-write transactions [tx={}].", internalTransaction));
        }

        pendingTxPartitionEnlistment.addTableId(this.tableId);

        ReplicaRequest request = function.apply(Long.valueOf(pendingTxPartitionEnlistment.consistencyToken()));

        if (z) {
            return this.replicaSvc.invokeRaw(pendingTxPartitionEnlistment.primaryNodeConsistentId(), request).handle((response, failure) -> {
                boolean failed = failure != null;

                if (!$assertionsDisabled && !failed && !(response instanceof TimestampAware)) {
                    throw new AssertionError();
                }

                internalTransaction.finish(!failed, failed ? null : ((TimestampAware) response).timestamp(), true);

                if (failure != null) {
                    ExceptionUtils.sneakyThrow(failure);
                }

                return (R) response.result();
            });
        }

        boolean tracked = (request instanceof SingleRowReplicaRequest && ((SingleRowReplicaRequest) request).requestType() != RequestType.RW_GET)
                || (request instanceof MultipleRowReplicaRequest && ((MultipleRowReplicaRequest) request).requestType() != RequestType.RW_GET_ALL)
                || (request instanceof SingleRowPkReplicaRequest && ((SingleRowPkReplicaRequest) request).requestType() != RequestType.RW_GET)
                || (request instanceof MultipleRowPkReplicaRequest && ((MultipleRowPkReplicaRequest) request).requestType() != RequestType.RW_GET_ALL)
                || request instanceof SwapRowReplicaRequest;

        if (tracked) {
            if (!this.transactionInflights.addInflight(internalTransaction.id(), false)) {
                return CompletableFuture.failedFuture(new TransactionException(ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR, IgniteStringFormatter.format("Transaction is already finished [tableName={}, partId={}, txState={}].", this.tableName, Integer.valueOf(i), internalTransaction.state())));
            }

            return this.replicaSvc.<R>invoke(pendingTxPartitionEnlistment.primaryNodeConsistentId(), request).thenApply(result -> {
                if (!$assertionsDisabled && biPredicate == null) {
                    throw new AssertionError();
                }

                if (biPredicate.test(result, request)) {
                    this.transactionInflights.removeInflight(internalTransaction.id());
                }

                return result;
            }).handle((BiFunction<R, Throwable, CompletableFuture<R>>) (result, failure) -> {
                if (failure != null) {
                    if (i2 > 0 && ExceptionUtils.matchAny(ExceptionUtils.unwrapCause(failure), ErrorGroups.Transactions.ACQUIRE_LOCK_ERR, new int[0])) {
                        // The retry registers its own in-flight, so the current one is released first.
                        this.transactionInflights.removeInflight(internalTransaction.id());

                        return trackingInvoke(internalTransaction, i, ignored -> request, false, pendingTxPartitionEnlistment, biPredicate, i2 - 1);
                    }

                    ExceptionUtils.sneakyThrow(failure);
                }

                return CompletableFuture.completedFuture(result);
            }).thenCompose(Function.identity());
        }

        return this.replicaSvc.<R>invoke(pendingTxPartitionEnlistment.primaryNodeConsistentId(), request).handle((BiFunction<R, Throwable, CompletableFuture<R>>) (result, failure) -> {
            if (failure != null) {
                if (i2 > 0 && ExceptionUtils.matchAny(ExceptionUtils.unwrapCause(failure), ErrorGroups.Transactions.ACQUIRE_LOCK_ERR, new int[0])) {
                    return trackingInvoke(internalTransaction, i, ignored -> request, false, pendingTxPartitionEnlistment, biPredicate, i2 - 1);
                }

                ExceptionUtils.sneakyThrow(failure);
            }

            return CompletableFuture.completedFuture(result);
        }).thenCompose(Function.identity());
    }

    /**
     * Completes the transaction-related bookkeeping after an enlisted operation finished.
     *
     * <ul>
     *     <li>{@code z2 == true}: the outcome of the operation is passed through untouched;</li>
     *     <li>the operation failed: the transaction is rolled back and the original failure is rethrown, with a
     *     rollback failure (if any) attached to it as suppressed;</li>
     *     <li>the operation succeeded and {@code z == true}: the transaction is committed before the result is
     *     returned;</li>
     *     <li>otherwise the result is returned as is.</li>
     * </ul>
     *
     * @param completableFuture Future of the operation.
     * @param z Whether to commit the transaction after a successful operation.
     * @param internalTransaction Transaction the operation ran in.
     * @param z2 Whether the operation was a "full" one; must not be combined with {@code z}.
     * @param <T> Result type.
     * @return Future of the operation result that completes after the bookkeeping is done.
     */
    private <T> CompletableFuture<T> postEnlist(CompletableFuture<T> completableFuture, boolean z, InternalTransaction internalTransaction, boolean z2) {
        if (!$assertionsDisabled && z && z2) {
            throw new AssertionError("Invalid combination of flags");
        }

        return completableFuture.handle((BiFunction<T, Throwable, CompletableFuture<T>>) (result, failure) -> {
            if (z2) {
                return failure != null ? CompletableFuture.failedFuture(failure) : CompletableFuture.completedFuture(result);
            }

            if (failure != null) {
                return internalTransaction.rollbackAsync().handle((ignored, rollbackFailure) -> {
                    if (rollbackFailure != null) {
                        failure.addSuppressed(rollbackFailure);
                    }

                    // Always surface the operation failure, not the rollback outcome.
                    ExceptionUtils.sneakyThrow(failure);

                    return null;
                });
            }

            if (z) {
                return internalTransaction.commitAsync().thenApply(ignored -> result);
            }

            return CompletableFuture.completedFuture(result);
        }).thenCompose(Function.identity());
    }

    /**
     * Sends a single-row read-only request to the primary replica of the row's partition.
     *
     * @param internalTransaction Transaction, or {@code null} to begin an implicit read-only one.
     * @param binaryRowEx Row that determines the partition.
     * @param biFunction Request factory (replication group, consistency token).
     * @param <R> Result type.
     * @return Future of the request result.
     */
    private <R> CompletableFuture<R> evaluateReadOnlyPrimaryNode(@Nullable InternalTransaction internalTransaction, BinaryRowEx binaryRowEx, BiFunction<ReplicationGroupId, Long, ReplicaRequest> biFunction) {
        // The transaction is obtained before the partition is resolved, as in the one-expression form.
        InternalTransaction actualTx = startImplicitRoTxIfNeeded(internalTransaction);
        ReplicationGroupId groupId = targetReplicationGroupId(partitionId(binaryRowEx));

        return sendReadOnlyToPrimaryReplica(actualTx, groupId, biFunction);
    }

    /**
     * Sends a multi-row read-only request to the primary replica of the partition of the first row.
     *
     * @param internalTransaction Transaction, or {@code null} to begin an implicit read-only one.
     * @param collection Rows; only the first one yielded by the iterator determines the partition.
     * @param biFunction Request factory (replication group, consistency token).
     * @param <R> Result type.
     * @return Future of the request result.
     */
    private <R> CompletableFuture<R> evaluateReadOnlyPrimaryNode(@Nullable InternalTransaction internalTransaction, Collection<BinaryRowEx> collection, BiFunction<ReplicationGroupId, Long, ReplicaRequest> biFunction) {
        // The transaction is obtained before the partition is resolved, as in the one-expression form.
        InternalTransaction actualTx = startImplicitRoTxIfNeeded(internalTransaction);

        BinaryRowEx firstRow = collection.iterator().next();
        ReplicationGroupId groupId = targetReplicationGroupId(partitionId(firstRow));

        return sendReadOnlyToPrimaryReplica(actualTx, groupId, biFunction);
    }

    /**
     * Builds a read-only replica request and sends it to the primary replica of the given replication group.
     *
     * <p>If the placement driver has no current primary replica for the transaction start timestamp, or its leaseholder
     * cannot be resolved to a cluster node, the request is sent after {@code awaitPrimaryReplica} completes; otherwise it is
     * sent immediately. The resulting future is then handed to {@link #postEvaluate}, which finishes the transaction.
     *
     * @param internalTransaction Transaction the request is executed in.
     * @param replicationGroupId Target replication group.
     * @param biFunction Factory building the replica request from the replication group id and the enlistment consistency token.
     * @param <R> Response type.
     * @return Future with the replica response.
     */
    private <R> CompletableFuture<R> sendReadOnlyToPrimaryReplica(InternalTransaction internalTransaction, ReplicationGroupId replicationGroupId, BiFunction<ReplicationGroupId, Long, ReplicaRequest> biFunction) {
        ReplicaMeta currentPrimaryReplica = this.placementDriver.getCurrentPrimaryReplica(replicationGroupId, internalTransaction.startTimestamp());

        // Typed with the method's own type variable so that no raw cast is needed at the call sites below.
        Function<ReplicaMeta, CompletableFuture<R>> invokeOnPrimary = replicaMeta -> {
            try {
                return this.replicaSvc.invoke(getClusterNode(replicaMeta), biFunction.apply(replicationGroupId, Long.valueOf(enlistmentConsistencyToken(replicaMeta))));
            } catch (Throwable th) {
                throw new TransactionException(ErrorGroups.Common.INTERNAL_ERR, IgniteStringFormatter.format("Failed to invoke the replica request [tableName={}, grp={}].", this.tableName.toCanonicalForm(), replicationGroupId), th);
            }
        };

        CompletableFuture<R> responseFuture;
        if (currentPrimaryReplica == null || this.clusterNodeResolver.getById(currentPrimaryReplica.getLeaseholderId()) == null) {
            responseFuture = awaitPrimaryReplica(replicationGroupId, internalTransaction.startTimestamp()).thenCompose(invokeOnPrimary);
        } else {
            try {
                responseFuture = invokeOnPrimary.apply(currentPrimaryReplica);
            } catch (IgniteException e) {
                // NOTE(review): this path returns without going through postEvaluate, so the transaction is not finished
                // here - confirm that this is intended.
                return CompletableFuture.failedFuture(e);
            }
        }
        return postEvaluate(responseFuture, internalTransaction);
    }

    /**
     * Finishes the transaction once the given operation future completes and propagates the operation outcome.
     *
     * <p>On failure the transaction is finished with {@code false} as the first argument and the original failure is
     * rethrown; a failure of the finish itself, if any, is attached to it as a suppressed exception. On success the
     * transaction is finished with {@code true} and the operation result is returned after the finish completes.
     *
     * @param completableFuture Future of the operation.
     * @param internalTransaction Transaction to finish.
     * @param <R> Operation result type.
     * @return Future completed with the operation result or with the operation failure.
     */
    private <R> CompletableFuture<R> postEvaluate(CompletableFuture<R> completableFuture, InternalTransaction internalTransaction) {
        return completableFuture.handle((BiFunction<R, Throwable, CompletableFuture<R>>) (result, opError) -> {
            if (opError != null) {
                return internalTransaction.finish(false, this.clockService.current(), false).handle((ignored, finishError) -> {
                    // Keep the operation failure as the primary error. Self-suppression is rejected by Throwable#addSuppressed,
                    // so the same instance is never attached to itself.
                    if (finishError != null && finishError != opError) {
                        opError.addSuppressed(finishError);
                    }
                    ExceptionUtils.sneakyThrow(opError);
                    return null;
                });
            }
            return internalTransaction.finish(true, this.clockService.current(), false).thenApply(ignored -> result);
        }).thenCompose(Function.identity());
    }

    /**
     * Reads a row by its primary key.
     *
     * <p>Three flows are possible: a direct read-only request to the primary replica when
     * {@code TableUtils.isDirectFlowApplicableTx} accepts the transaction, a read-only request to a recipient node for a
     * read-only transaction, or an {@code RW_GET} request enlisted into a read-write transaction.
     *
     * @param binaryRowEx Row carrying the primary key.
     * @param internalTransaction Transaction, may be {@code null}.
     * @return Future with the replica response.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<BinaryRow> get(BinaryRowEx binaryRowEx, @Nullable InternalTransaction internalTransaction) {
        checkTransactionFinishStarted(internalTransaction);

        if (TableUtils.isDirectFlowApplicableTx(internalTransaction)) {
            return evaluateReadOnlyPrimaryNode(internalTransaction, binaryRowEx, (groupId, token) ->
                    TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest()
                            .groupId(serializeReplicationGroupId(groupId))
                            .tableId(this.tableId)
                            .enlistmentConsistencyToken(token)
                            .schemaVersion(binaryRowEx.schemaVersion())
                            .primaryKey(binaryRowEx.tupleSlice())
                            .requestType(RequestType.RO_GET)
                            .build());
        }

        if (internalTransaction.isReadOnly()) {
            return evaluateReadOnlyRecipientNode(partitionId(binaryRowEx), internalTransaction.readTimestamp())
                    .thenCompose(node -> get(binaryRowEx, internalTransaction.readTimestamp(), internalTransaction.id(), internalTransaction.coordinatorId(), node));
        }

        return enlistInTx(binaryRowEx, internalTransaction, (tx, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
                        .groupId(serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .schemaVersion(binaryRowEx.schemaVersion())
                        .primaryKey(binaryRowEx.tupleSlice())
                        .commitPartitionId(serializeReplicationGroupId(tx.commitPartition()))
                        .transactionId(tx.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_GET)
                        .timestamp(tx.startTimestamp())
                        .full(false)
                        .coordinatorId(tx.coordinatorId())
                        .build(), (row, request) -> false);
    }

    /**
     * Reads a row by its primary key at the given timestamp from the given node, with the {@code usePrimary} flag of
     * the request set to {@code true}.
     *
     * @param binaryRowEx Row carrying the primary key.
     * @param hybridTimestamp Read timestamp.
     * @param uuid Transaction id, may be {@code null}.
     * @param uuid2 Coordinator id, may be {@code null}.
     * @param clusterNode Node the request is sent to.
     * @return Future with the replica response.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<BinaryRow> get(BinaryRowEx binaryRowEx, HybridTimestamp hybridTimestamp, @Nullable UUID uuid, @Nullable UUID uuid2, ClusterNode clusterNode) {
        final boolean usePrimary = true;

        return get(binaryRowEx, hybridTimestamp, uuid, uuid2, clusterNode, usePrimary);
    }

    /**
     * Sends an {@code RO_GET} request for the given primary key to the given node.
     *
     * @param binaryRowEx Row carrying the primary key; its partition determines the replication group.
     * @param hybridTimestamp Read timestamp.
     * @param uuid Transaction id, may be {@code null}.
     * @param uuid2 Coordinator id, may be {@code null}.
     * @param clusterNode Node the request is sent to.
     * @param z Value of the {@code usePrimary} flag of the request.
     * @return Future with the replica response.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<BinaryRow> get(BinaryRowEx binaryRowEx, HybridTimestamp hybridTimestamp, @Nullable UUID uuid, @Nullable UUID uuid2, ClusterNode clusterNode, boolean z) {
        ReplicationGroupId groupId = targetReplicationGroupId(partitionId(binaryRowEx));

        return this.replicaSvc.invoke(
                clusterNode,
                TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
                        .groupId(serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .schemaVersion(binaryRowEx.schemaVersion())
                        .primaryKey(binaryRowEx.tupleSlice())
                        .requestType(RequestType.RO_GET)
                        .usePrimary(z)
                        .readTimestamp(hybridTimestamp)
                        .transactionId(uuid)
                        .coordinatorId(uuid2)
                        .build()
        );
    }

    /**
     * Checks whether all rows of the batch map to the same partition.
     *
     * <p>The batch must not be empty: the first row is read unconditionally.
     *
     * @param collection Rows of the batch.
     * @return {@code true} if every row maps to the partition of the first row.
     */
    private boolean isSinglePartitionBatch(Collection<BinaryRowEx> collection) {
        Iterator<BinaryRowEx> rows = collection.iterator();
        int expectedPartition = partitionId(rows.next());

        boolean samePartition = true;
        // Stops at the first row that maps to another partition.
        while (samePartition && rows.hasNext()) {
            samePartition = partitionId(rows.next()) == expectedPartition;
        }
        return samePartition;
    }

    /**
     * Reads a batch of rows by their primary keys.
     *
     * <p>A {@code null} or empty batch yields an empty list. Otherwise one of three flows is used: a direct read-only
     * request to the primary replica (when {@code TableUtils.isDirectFlowApplicableTx} accepts the transaction and all rows
     * map to one partition), a read-only request to a recipient node for a read-only transaction, or an
     * {@code RW_GET_ALL} request enlisted into a read-write transaction.
     *
     * @param collection Rows carrying the primary keys.
     * @param internalTransaction Transaction, may be {@code null}.
     * @return Future with the list of rows.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> collection, InternalTransaction internalTransaction) {
        checkTransactionFinishStarted(internalTransaction);

        if (CollectionUtils.nullOrEmpty((Collection<?>) collection)) {
            return CompletableFutures.emptyListCompletedFuture();
        }

        if (TableUtils.isDirectFlowApplicableTx(internalTransaction) && isSinglePartitionBatch(collection)) {
            return evaluateReadOnlyPrimaryNode(internalTransaction, collection, (groupId, token) ->
                    TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
                            .groupId(serializeReplicationGroupId(groupId))
                            .tableId(this.tableId)
                            .enlistmentConsistencyToken(token)
                            .schemaVersion(collection.iterator().next().schemaVersion())
                            .primaryKeys(serializeBinaryTuples(collection))
                            .requestType(RequestType.RO_GET_ALL)
                            .build());
        }

        if (internalTransaction != null && internalTransaction.isReadOnly()) {
            // The recipient node is chosen by the partition of the first row only.
            return evaluateReadOnlyRecipientNode(partitionId(collection.iterator().next()), internalTransaction.readTimestamp())
                    .thenCompose(node -> getAll(collection, internalTransaction.readTimestamp(), internalTransaction.id(), internalTransaction.coordinatorId(), node));
        }

        return enlistInTx(
                collection,
                internalTransaction,
                (rows, tx, groupId, token, full) -> readWriteMultiRowPkReplicaRequest(RequestType.RW_GET_ALL, rows, tx, groupId, token, full.booleanValue()),
                InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder,
                (rows, request) -> false
        );
    }

    /**
     * Reads a batch of rows by their primary keys at the given timestamp from the given node, with the
     * {@code usePrimary} flag of the requests set to {@code true}.
     *
     * @param collection Rows carrying the primary keys.
     * @param hybridTimestamp Read timestamp.
     * @param uuid Transaction id, may be {@code null}.
     * @param uuid2 Coordinator id, may be {@code null}.
     * @param clusterNode Node the requests are sent to.
     * @return Future with the list of rows.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> collection, HybridTimestamp hybridTimestamp, @Nullable UUID uuid, @Nullable UUID uuid2, ClusterNode clusterNode) {
        final boolean usePrimary = true;

        return getAll(collection, hybridTimestamp, uuid, uuid2, clusterNode, usePrimary);
    }

    /**
     * Splits the batch by partition, sends one {@code RO_GET_ALL} request per partition to the given node and collects
     * the responses with {@code collectMultiRowsResponsesWithRestoreOrder}.
     *
     * @param collection Rows carrying the primary keys.
     * @param hybridTimestamp Read timestamp.
     * @param uuid Transaction id, may be {@code null}.
     * @param uuid2 Coordinator id, may be {@code null}.
     * @param clusterNode Node every request is sent to.
     * @param z Value of the {@code usePrimary} flag of the requests.
     * @return Future with the list of rows.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> collection, HybridTimestamp hybridTimestamp, @Nullable UUID uuid, @Nullable UUID uuid2, ClusterNode clusterNode, boolean z) {
        Int2ObjectMap<RowBatch> batchesByPartition = toRowBatchByPartitionId(collection);

        for (Int2ObjectMap.Entry<RowBatch> partitionBatch : batchesByPartition.int2ObjectEntrySet()) {
            RowBatch batch = partitionBatch.getValue();

            ReadOnlyMultiRowPkReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
                    .groupId(serializeReplicationGroupId(targetReplicationGroupId(partitionBatch.getIntKey())))
                    .tableId(this.tableId)
                    // The schema version of the first row of the batch is used for the whole request.
                    .schemaVersion(batch.requestedRows.get(0).schemaVersion())
                    .primaryKeys(serializeBinaryTuples(batch.requestedRows))
                    .requestType(RequestType.RO_GET_ALL)
                    .usePrimary(z)
                    .readTimestamp(hybridTimestamp)
                    .transactionId(uuid)
                    .coordinatorId(uuid2)
                    .build();

            batch.resultFuture = this.replicaSvc.invoke(clusterNode, request);
        }

        return collectMultiRowsResponsesWithRestoreOrder(batchesByPartition.values());
    }

    /**
     * Builds a read-write multi-row request addressed by primary keys.
     *
     * <p>With assertions enabled, all non-null rows are required to share one schema version. The schema version of the
     * first row is put into the request.
     *
     * @param requestType Request type.
     * @param collection Rows carrying the primary keys.
     * @param internalTransaction Transaction the request belongs to.
     * @param replicationGroupId Target replication group.
     * @param l Enlistment consistency token.
     * @param z Value of the {@code full} flag of the request.
     * @return The request.
     */
    private ReadWriteMultiRowPkReplicaRequest readWriteMultiRowPkReplicaRequest(RequestType requestType, Collection<? extends BinaryRow> collection, InternalTransaction internalTransaction, ReplicationGroupId replicationGroupId, Long l, boolean z) {
        if (!$assertionsDisabled && !allSchemaVersionsSame(collection)) {
            throw new AssertionError("Different schema versions encountered: " + uniqueSchemaVersions(collection));
        }

        return TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
                .groupId(serializeReplicationGroupId(replicationGroupId))
                .tableId(this.tableId)
                .commitPartitionId(serializeReplicationGroupId(internalTransaction.commitPartition()))
                .schemaVersion(collection.iterator().next().schemaVersion())
                .primaryKeys(serializeBinaryTuples(collection))
                .transactionId(internalTransaction.id())
                .enlistmentConsistencyToken(l)
                .requestType(requestType)
                .timestamp(internalTransaction.startTimestamp())
                .full(z)
                .coordinatorId(internalTransaction.coordinatorId())
                .build();
    }

    /**
     * Checks that all non-null rows have the same schema version.
     *
     * @param collection Rows to check; {@code null} elements are skipped.
     * @return {@code true} if no two non-null rows differ in schema version (trivially {@code true} for an empty collection).
     */
    private static boolean allSchemaVersionsSame(Collection<? extends BinaryRow> collection) {
        // Stays null until the first non-null row is seen.
        Integer expectedVersion = null;

        for (BinaryRow row : collection) {
            if (row == null) {
                continue;
            }

            int version = row.schemaVersion();
            if (expectedVersion == null) {
                expectedVersion = version;
            } else if (expectedVersion != version) {
                return false;
            }
        }

        return true;
    }

    /**
     * Collects the distinct schema versions of the given rows.
     *
     * <p>{@code null} rows are skipped, in agreement with {@link #allSchemaVersionsSame}: this method is used to build the
     * message of the assertion raised when that check fails, and must not fail with a {@link NullPointerException} on input
     * that the check itself tolerates.
     *
     * @param collection Rows; {@code null} elements are ignored.
     * @return Set of distinct schema versions of the non-null rows.
     */
    private static Set<Integer> uniqueSchemaVersions(Collection<? extends BinaryRow> collection) {
        Set<Integer> versions = new HashSet<>();

        for (BinaryRow row : collection) {
            if (row != null) {
                versions.add(row.schemaVersion());
            }
        }

        return versions;
    }

    /**
     * Extracts the tuple slice of every row, preserving the iteration order.
     *
     * @param collection Rows.
     * @return List with one buffer per row.
     */
    private static List<ByteBuffer> serializeBinaryTuples(Collection<? extends BinaryRow> collection) {
        List<ByteBuffer> tuples = new ArrayList<>(collection.size());

        for (BinaryRow row : collection) {
            tuples.add(row.tupleSlice());
        }

        return tuples;
    }

    /**
     * Enlists an {@code RW_UPSERT} request for the given row into the transaction.
     *
     * @param binaryRowEx Row to upsert.
     * @param internalTransaction Transaction, may be {@code null}.
     * @return Future of the operation.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<Void> upsert(BinaryRowEx binaryRowEx, @Nullable InternalTransaction internalTransaction) {
        return enlistInTx(binaryRowEx, internalTransaction, (tx, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                        .groupId(serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(serializeReplicationGroupId(tx.commitPartition()))
                        .schemaVersion(binaryRowEx.schemaVersion())
                        .binaryTuple(binaryRowEx.tupleSlice())
                        .transactionId(tx.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_UPSERT)
                        .timestamp(tx.startTimestamp())
                        .full(tx.implicit())
                        .coordinatorId(tx.coordinatorId())
                        .build(), (response, request) -> false);
    }

    /**
     * Enlists an upsert of the given rows into the transaction; the per-partition requests are built by
     * {@code upsertAllInternal} and the per-batch futures are combined with {@code RowBatch::allResultFutures}.
     *
     * @param collection Rows to upsert.
     * @param internalTransaction Transaction, may be {@code null}.
     * @return Future of the operation.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> collection, @Nullable InternalTransaction internalTransaction) {
        return enlistInTx(
                collection,
                internalTransaction,
                (rows, tx, groupId, token, full) -> upsertAllInternal(rows, tx, groupId, token, full),
                RowBatch::allResultFutures,
                (response, request) -> false
        );
    }

    /**
     * Applies a batch of rows to a single partition, delegating to {@link #updateAllWithRetry} with no start time
     * recorded yet.
     *
     * @param collection Rows of the batch.
     * @param bitSet Passed as the {@code deleted} set of the request, may be {@code null}.
     * @param i Partition the batch belongs to.
     * @return Future of the operation.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<Void> updateAll(Collection<BinaryRowEx> collection, @Nullable BitSet bitSet, int i) {
        // null: this is the first attempt, so the retry start time is taken from the transaction that gets started.
        return updateAllWithRetry(collection, bitSet, i, null);
    }

    /**
     * Applies a batch of rows to a single partition in a freshly started transaction and retries the whole attempt in a
     * new transaction while {@code canRetry} allows it.
     *
     * @param collection Rows of the batch; with assertions enabled, all of them must map to partition {@code i}.
     * @param bitSet Passed through to {@code upsertAllInternal}, may be {@code null}.
     * @param i Target partition.
     * @param l Physical start time carried over from the first attempt, or {@code null} on the first attempt.
     * @return Future of the operation.
     */
    private CompletableFuture<Void> updateAllWithRetry(Collection<BinaryRowEx> collection, @Nullable BitSet bitSet, int i, @Nullable Long l) {
        final InternalTransaction attemptTx;
        if (cache()) {
            attemptTx = this.txManager.beginExternal(this.observableTimestampTracker, true);
        } else {
            attemptTx = this.txManager.beginImplicitRw(this.observableTimestampTracker);
        }

        ReplicationGroupId groupId = targetReplicationGroupId(i);

        if (!$assertionsDisabled && !collection.stream().allMatch(row -> partitionId(row) == i)) {
            throw new AssertionError("Invalid batch for partition " + i);
        }

        CompletableFuture<Void> enlisted = enlistAndInvoke(
                attemptTx,
                i,
                token -> upsertAllInternal(collection, bitSet, attemptTx, groupId, token, true),
                true,
                null
        );

        return postEnlist(enlisted, false, attemptTx, true).handle((ignored, err) -> {
            if (err == null) {
                return CompletableFuture.completedFuture(ignored);
            }

            long timeout = attemptTx.isReadOnly() ? attemptTx.timeout() : 3000L;
            // Every retry is measured against the start of the very first attempt.
            long firstAttemptPhysical = l == null ? attemptTx.startTimestamp().getPhysical() : l.longValue();

            if (canRetry(err, firstAttemptPhysical, timeout)) {
                return updateAllWithRetry(collection, bitSet, i, Long.valueOf(firstAttemptPhysical));
            }

            ExceptionUtils.sneakyThrow(err);
            return CompletableFuture.completedFuture(ignored);
        }).thenCompose(Function.identity());
    }

    /**
     * Enlists an {@code RW_GET_AND_UPSERT} request for the given row into the transaction.
     *
     * @param binaryRowEx Row to upsert.
     * @param internalTransaction Transaction.
     * @return Future with the replica response.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<BinaryRow> getAndUpsert(BinaryRowEx binaryRowEx, InternalTransaction internalTransaction) {
        checkTransactionFinishStarted(internalTransaction);

        return enlistInTx(binaryRowEx, internalTransaction, (tx, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                        .groupId(serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(serializeReplicationGroupId(tx.commitPartition()))
                        .schemaVersion(binaryRowEx.schemaVersion())
                        .binaryTuple(binaryRowEx.tupleSlice())
                        .transactionId(tx.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_GET_AND_UPSERT)
                        .timestamp(tx.startTimestamp())
                        .full(tx.implicit())
                        .coordinatorId(tx.coordinatorId())
                        .build(), (previous, request) -> false);
    }

    /**
     * Enlists an {@code RW_INSERT} request for the given row into the transaction.
     *
     * <p>The predicate handed to {@code enlistInTx} evaluates to {@code true} when the replica answers {@code false}.
     *
     * @param binaryRowEx Row to insert.
     * @param internalTransaction Transaction.
     * @return Future with the boolean replica response.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<Boolean> insert(BinaryRowEx binaryRowEx, InternalTransaction internalTransaction) {
        return enlistInTx(binaryRowEx, internalTransaction, (tx, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                        .groupId(serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(serializeReplicationGroupId(tx.commitPartition()))
                        .schemaVersion(binaryRowEx.schemaVersion())
                        .binaryTuple(binaryRowEx.tupleSlice())
                        .transactionId(tx.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_INSERT)
                        .timestamp(tx.startTimestamp())
                        .full(tx.implicit())
                        .coordinatorId(tx.coordinatorId())
                        .build(), (applied, request) -> !applied);
    }

    /**
     * Enlists {@code RW_INSERT_ALL} requests for the given rows into the transaction and collects the responses with
     * {@code collectRejectedRowsResponsesWithRestoreOrder}.
     *
     * <p>The predicate handed to {@code enlistInTx} evaluates to {@code true} only when every element of the response
     * list is {@code null}.
     *
     * @param collection Rows to insert.
     * @param internalTransaction Transaction.
     * @return Future with the collected response list.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<List<BinaryRow>> insertAll(Collection<BinaryRowEx> collection, InternalTransaction internalTransaction) {
        return enlistInTx(
                collection,
                internalTransaction,
                (rows, tx, groupId, token, full) -> readWriteMultiRowReplicaRequest(RequestType.RW_INSERT_ALL, rows, null, tx, groupId, token, full.booleanValue()),
                InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
                (response, request) -> {
                    for (Object row : response) {
                        if (row != null) {
                            return false;
                        }
                    }
                    return true;
                }
        );
    }

    /**
     * Builds a read-write multi-row request carrying full rows.
     *
     * <p>With assertions enabled, all non-null rows are required to share one schema version. The schema version of the
     * first row is put into the request.
     *
     * @param requestType Request type.
     * @param collection Rows.
     * @param bitSet Value of the {@code deleted} field of the request, may be {@code null}.
     * @param internalTransaction Transaction the request belongs to.
     * @param replicationGroupId Target replication group.
     * @param l Enlistment consistency token.
     * @param z Value of the {@code full} flag of the request.
     * @return The request.
     */
    private ReadWriteMultiRowReplicaRequest readWriteMultiRowReplicaRequest(RequestType requestType, Collection<? extends BinaryRow> collection, @Nullable BitSet bitSet, InternalTransaction internalTransaction, ReplicationGroupId replicationGroupId, Long l, boolean z) {
        if (!$assertionsDisabled && !allSchemaVersionsSame(collection)) {
            throw new AssertionError("Different schema versions encountered: " + uniqueSchemaVersions(collection));
        }

        return TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
                .groupId(serializeReplicationGroupId(replicationGroupId))
                .tableId(this.tableId)
                .commitPartitionId(serializeReplicationGroupId(internalTransaction.commitPartition()))
                .schemaVersion(collection.iterator().next().schemaVersion())
                .binaryTuples(serializeBinaryTuples(collection))
                .deleted(bitSet)
                .transactionId(internalTransaction.id())
                .enlistmentConsistencyToken(l)
                .requestType(requestType)
                .timestamp(internalTransaction.startTimestamp())
                .full(z)
                .coordinatorId(internalTransaction.coordinatorId())
                .build();
    }

    /**
     * Builds a DCR multi-row write request.
     *
     * <p>Unlike {@code readWriteMultiRowReplicaRequest}, the request timestamp is taken from {@code clockService.now()}
     * and the request additionally carries the last commit timestamps of the rows. With assertions enabled, all non-null
     * rows are required to share one schema version.
     *
     * @param requestType Request type.
     * @param collection Rows.
     * @param bitSet Value of the {@code deleted} field of the request, may be {@code null}.
     * @param internalTransaction Transaction the request belongs to.
     * @param replicationGroupId Target replication group.
     * @param l Enlistment consistency token.
     * @param z Value of the {@code full} flag of the request.
     * @return The request.
     */
    private DcrWriteMultiRowReplicaRequest dcrWriteMultiRowReplicaRequest(RequestType requestType, Collection<? extends BinaryRow> collection, @Nullable BitSet bitSet, InternalTransaction internalTransaction, ReplicationGroupId replicationGroupId, Long l, boolean z) {
        if (!$assertionsDisabled && !allSchemaVersionsSame(collection)) {
            throw new AssertionError("Different schema versions encountered: " + uniqueSchemaVersions(collection));
        }

        return TABLE_MESSAGES_FACTORY.dcrWriteMultiRowReplicaRequest()
                .groupId(serializeReplicationGroupId(replicationGroupId))
                .tableId(this.tableId)
                .commitPartitionId(serializeReplicationGroupId(internalTransaction.commitPartition()))
                .schemaVersion(collection.iterator().next().schemaVersion())
                .binaryTuples(serializeBinaryTuples(collection))
                .deleted(bitSet)
                .transactionId(internalTransaction.id())
                .enlistmentConsistencyToken(l)
                .requestType(requestType)
                .timestamp(this.clockService.now())
                .full(z)
                .coordinatorId(internalTransaction.coordinatorId())
                .lastCommitTimestamps(lastCommitTimestamps(collection))
                .build();
    }

    /**
     * Collects the last commit timestamp of every row, preserving the iteration order.
     *
     * @param collection Rows.
     * @return List with one element per row: the timestamp for {@code TimedRow} instances, {@code null} for any other row.
     */
    private static List<Long> lastCommitTimestamps(Collection<? extends BinaryRow> collection) {
        List<Long> timestamps = new ArrayList<>(collection.size());

        for (BinaryRow row : collection) {
            Long timestamp = null;
            if (row instanceof TimedRow) {
                timestamp = Long.valueOf(((TimedRow) row).lastCommitTimestamp().longValue());
            }
            timestamps.add(timestamp);
        }

        return timestamps;
    }

    /**
     * Enlists an {@code RW_REPLACE_IF_EXIST} request for the given row into the transaction.
     *
     * <p>The predicate handed to {@code enlistInTx} evaluates to {@code true} when the replica answers {@code false}.
     *
     * @param binaryRowEx Replacement row.
     * @param internalTransaction Transaction.
     * @return Future with the boolean replica response.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<Boolean> replace(BinaryRowEx binaryRowEx, InternalTransaction internalTransaction) {
        return enlistInTx(binaryRowEx, internalTransaction, (tx, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                        .groupId(serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(serializeReplicationGroupId(tx.commitPartition()))
                        .schemaVersion(binaryRowEx.schemaVersion())
                        .binaryTuple(binaryRowEx.tupleSlice())
                        .transactionId(tx.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_REPLACE_IF_EXIST)
                        .timestamp(tx.startTimestamp())
                        .full(tx.implicit())
                        .coordinatorId(tx.coordinatorId())
                        .build(), (applied, request) -> !applied);
    }

    /**
     * Enlists an {@code RW_REPLACE} request that swaps an expected old row for a new one.
     *
     * <p>With assertions enabled, both rows must have the same schema version. The operation is enlisted by the new row,
     * while the schema version of the request is taken from the old row. The predicate handed to {@code enlistInTx}
     * evaluates to {@code true} when the replica answers {@code false}.
     *
     * @param binaryRowEx Expected old row.
     * @param binaryRowEx2 New row.
     * @param internalTransaction Transaction.
     * @return Future with the boolean replica response.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<Boolean> replace(BinaryRowEx binaryRowEx, BinaryRowEx binaryRowEx2, InternalTransaction internalTransaction) {
        if (!$assertionsDisabled && binaryRowEx.schemaVersion() != binaryRowEx2.schemaVersion()) {
            throw new AssertionError("Mismatching schema versions: old " + binaryRowEx.schemaVersion() + ", new " + binaryRowEx2.schemaVersion());
        }

        return enlistInTx(binaryRowEx2, internalTransaction, (tx, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest()
                        .groupId(serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(serializeReplicationGroupId(tx.commitPartition()))
                        .schemaVersion(binaryRowEx.schemaVersion())
                        .oldBinaryTuple(binaryRowEx.tupleSlice())
                        .newBinaryTuple(binaryRowEx2.tupleSlice())
                        .transactionId(tx.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_REPLACE)
                        .timestamp(tx.startTimestamp())
                        .full(tx.implicit())
                        .coordinatorId(tx.coordinatorId())
                        .build(), (applied, request) -> !applied);
    }

    /**
     * Enlists an {@code RW_GET_AND_REPLACE} request for the given row into the transaction.
     *
     * <p>The predicate handed to {@code enlistInTx} evaluates to {@code true} when the replica returns {@code null}.
     *
     * @param binaryRowEx Replacement row.
     * @param internalTransaction Transaction.
     * @return Future with the replica response.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<BinaryRow> getAndReplace(BinaryRowEx binaryRowEx, InternalTransaction internalTransaction) {
        checkTransactionFinishStarted(internalTransaction);

        return enlistInTx(binaryRowEx, internalTransaction, (tx, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                        .groupId(serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(serializeReplicationGroupId(tx.commitPartition()))
                        .schemaVersion(binaryRowEx.schemaVersion())
                        .binaryTuple(binaryRowEx.tupleSlice())
                        .transactionId(tx.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_GET_AND_REPLACE)
                        .timestamp(tx.startTimestamp())
                        .full(tx.implicit())
                        .coordinatorId(tx.coordinatorId())
                        .build(), (previous, request) -> previous == null);
    }

    /**
     * Enlists an {@code RW_DELETE} request for the given primary key into the transaction.
     *
     * <p>The predicate handed to {@code enlistInTx} evaluates to {@code true} when the replica answers {@code false}.
     *
     * @param binaryRowEx Row carrying the primary key.
     * @param internalTransaction Transaction.
     * @return Future with the boolean replica response.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<Boolean> delete(BinaryRowEx binaryRowEx, InternalTransaction internalTransaction) {
        return enlistInTx(binaryRowEx, internalTransaction, (tx, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
                        .groupId(serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(serializeReplicationGroupId(tx.commitPartition()))
                        .schemaVersion(binaryRowEx.schemaVersion())
                        .primaryKey(binaryRowEx.tupleSlice())
                        .transactionId(tx.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_DELETE)
                        .timestamp(tx.startTimestamp())
                        .full(tx.implicit())
                        .coordinatorId(tx.coordinatorId())
                        .build(), (applied, request) -> !applied);
    }

    /**
     * Enlists an {@code RW_DELETE_EXACT} request carrying the full row into the transaction.
     *
     * <p>The predicate handed to {@code enlistInTx} evaluates to {@code true} when the replica answers {@code false}.
     *
     * @param binaryRowEx Row to delete.
     * @param internalTransaction Transaction.
     * @return Future with the boolean replica response.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<Boolean> deleteExact(BinaryRowEx binaryRowEx, InternalTransaction internalTransaction) {
        return enlistInTx(binaryRowEx, internalTransaction, (tx, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
                        .groupId(serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(serializeReplicationGroupId(tx.commitPartition()))
                        .schemaVersion(binaryRowEx.schemaVersion())
                        .binaryTuple(binaryRowEx.tupleSlice())
                        .transactionId(tx.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_DELETE_EXACT)
                        .timestamp(tx.startTimestamp())
                        .full(tx.implicit())
                        .coordinatorId(tx.coordinatorId())
                        .build(), (applied, request) -> !applied);
    }

    /**
     * Enlists an {@code RW_GET_AND_DELETE} request for the given primary key into the transaction.
     *
     * <p>The predicate handed to {@code enlistInTx} evaluates to {@code true} when the replica returns {@code null}.
     *
     * @param binaryRowEx Row carrying the primary key.
     * @param internalTransaction Transaction.
     * @return Future with the replica response.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<BinaryRow> getAndDelete(BinaryRowEx binaryRowEx, InternalTransaction internalTransaction) {
        checkTransactionFinishStarted(internalTransaction);

        return enlistInTx(binaryRowEx, internalTransaction, (tx, groupId, token) ->
                TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
                        .groupId(serializeReplicationGroupId(groupId))
                        .tableId(this.tableId)
                        .commitPartitionId(serializeReplicationGroupId(tx.commitPartition()))
                        .schemaVersion(binaryRowEx.schemaVersion())
                        .primaryKey(binaryRowEx.tupleSlice())
                        .transactionId(tx.id())
                        .enlistmentConsistencyToken(token)
                        .requestType(RequestType.RW_GET_AND_DELETE)
                        .timestamp(tx.startTimestamp())
                        .full(tx.implicit())
                        .coordinatorId(tx.coordinatorId())
                        .build(), (previous, request) -> previous == null);
    }

    /**
     * Enlists {@code RW_DELETE_ALL} requests for the given primary keys into the transaction and collects the responses
     * with {@code collectRejectedRowsResponsesWithRestoreOrder}.
     *
     * <p>The predicate handed to {@code enlistInTx} evaluates to {@code true} only when every element of the response
     * list is {@code null}.
     *
     * @param collection Rows carrying the primary keys.
     * @param internalTransaction Transaction.
     * @return Future with the collected response list.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<List<BinaryRow>> deleteAll(Collection<BinaryRowEx> collection, InternalTransaction internalTransaction) {
        return enlistInTx(
                collection,
                internalTransaction,
                (rows, tx, groupId, token, full) -> readWriteMultiRowPkReplicaRequest(RequestType.RW_DELETE_ALL, rows, tx, groupId, token, full.booleanValue()),
                InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
                (response, request) -> {
                    for (Object row : response) {
                        if (row != null) {
                            return false;
                        }
                    }
                    return true;
                }
        );
    }

    /**
     * Enlists {@code RW_DELETE_EXACT_ALL} requests carrying full rows into the transaction and collects the responses
     * with {@code collectRejectedRowsResponsesWithRestoreOrder}.
     *
     * <p>The predicate handed to {@code enlistInTx} evaluates to {@code true} only when every element of the response
     * list is {@code null}.
     *
     * @param collection Rows to delete.
     * @param internalTransaction Transaction.
     * @return Future with the collected response list.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<List<BinaryRow>> deleteAllExact(Collection<BinaryRowEx> collection, InternalTransaction internalTransaction) {
        return enlistInTx(
                collection,
                internalTransaction,
                (rows, tx, groupId, token, full) -> readWriteMultiRowReplicaRequest(RequestType.RW_DELETE_EXACT_ALL, rows, null, tx, groupId, token, full.booleanValue()),
                InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
                (response, request) -> {
                    for (Object row : response) {
                        if (row != null) {
                            return false;
                        }
                    }
                    return true;
                }
        );
    }

    /**
     * Looks up rows by an exact index key in a read-only scan with no range bounds, zero flags and the
     * {@code usePrimary} flag set to {@code true}.
     *
     * @param i Partition index.
     * @param uuid Transaction id.
     * @param hybridTimestamp Read timestamp.
     * @param clusterNode Node the scan requests are sent to.
     * @param i2 Id of the index to use.
     * @param binaryTuple Exact key to look up.
     * @param bitSet Columns to include, may be {@code null}.
     * @param uuid2 Coordinator id.
     * @return Publisher of the matching rows.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public Flow.Publisher<BinaryRow> lookup(int i, UUID uuid, HybridTimestamp hybridTimestamp, ClusterNode clusterNode, int i2, BinaryTuple binaryTuple, @Nullable BitSet bitSet, UUID uuid2) {
        Integer indexToUse = Integer.valueOf(i2);
        final int noFlags = 0;
        final boolean usePrimary = true;

        return readOnlyScan(i, uuid, hybridTimestamp, clusterNode, indexToUse, binaryTuple, null, null, noFlags, usePrimary, bitSet, uuid2);
    }

    /**
     * Looks up rows by an index key through {@code readWriteScan}, passing {@code null} for the two arguments that
     * follow the key and {@code 0} for the one after them.
     *
     * @param i Partition index.
     * @param uuid First id forwarded to {@code readWriteScan}.
     * @param replicationGroupId Replication group forwarded to {@code readWriteScan}.
     * @param uuid2 Second id forwarded to {@code readWriteScan}.
     * @param primaryReplica Primary replica to address.
     * @param i2 Id of the index to use.
     * @param binaryTuple Key to look up.
     * @param bitSet Columns to include, may be {@code null}.
     * @return Publisher of the matching rows.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public Flow.Publisher<BinaryRow> lookup(int i, UUID uuid, ReplicationGroupId replicationGroupId, UUID uuid2, PrimaryReplica primaryReplica, int i2, BinaryTuple binaryTuple, @Nullable BitSet bitSet) {
        Integer indexToUse = Integer.valueOf(i2);
        final int noFlags = 0;

        return readWriteScan(i, uuid, replicationGroupId, uuid2, primaryReplica, indexToUse, binaryTuple, null, null, noFlags, bitSet);
    }

    /**
     * Creates a publisher that scans a partition for a timestamp interval in read-only mode.
     *
     * <p>Batches are requested from the given node with the {@code usePrimary} flag set to {@code true}; when the publisher
     * is closed, the scan is completed through {@code completeScan}.
     *
     * @param i Partition index, validated with {@code validatePartitionIndex}.
     * @param uuid Transaction id.
     * @param hybridTimestamp Lower bound timestamp of the interval.
     * @param hybridTimestamp2 Read timestamp.
     * @param clusterNode Node the batch requests are sent to.
     * @param uuid2 Coordinator id.
     * @return Publisher of rows together with their row ids.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public Flow.Publisher<BinaryRowAndRowId> scanInterval(int i, final UUID uuid, final HybridTimestamp hybridTimestamp, final HybridTimestamp hybridTimestamp2, final ClusterNode clusterNode, final UUID uuid2) {
        validatePartitionIndex(i);

        final ReplicationGroupId groupId = targetReplicationGroupId(i);
        final ReadOnlyInflightBatchRequestTracker inflightTracker = new ReadOnlyInflightBatchRequestTracker(this.transactionInflights, uuid);

        return new PartitionScanPublisher<BinaryRowAndRowId>(inflightTracker) {
            @Override // org.apache.ignite3.internal.table.distributed.storage.PartitionScanPublisher
            protected CompletableFuture<Collection<BinaryRowAndRowId>> retrieveBatch(long scanId, int batchSize) {
                return InternalTableImpl.this.replicaSvc.invoke(
                        clusterNode,
                        InternalTableImpl.TABLE_MESSAGES_FACTORY.readOnlyIntervalScanRetrieveBatchReplicaRequest()
                                .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                                .lowerBoundTimestamp(hybridTimestamp)
                                .readTimestamp(hybridTimestamp2)
                                .usePrimary(true)
                                .transactionId(uuid)
                                .scanId(scanId)
                                .batchSize(batchSize)
                                .coordinatorId(uuid2)
                                .build()
                );
            }

            @Override // org.apache.ignite3.internal.table.distributed.storage.PartitionScanPublisher
            protected CompletableFuture<Void> onClose(boolean z, long j, @Nullable Throwable th) {
                // The last argument of completeScan is set when either the flag is raised or the scan has failed.
                boolean flagOrFailure = z || th != null;

                return InternalTableImpl.this.completeScan(uuid, groupId, j, th, clusterNode.name(), flagOrFailure);
            }
        };
    }

    /**
     * Scans a partition in read-only mode without an index, exact key or range bounds, and with zero flags.
     *
     * @param i Partition index.
     * @param uuid Transaction id.
     * @param hybridTimestamp Read timestamp.
     * @param clusterNode Node the scan requests are sent to.
     * @param uuid2 Coordinator id.
     * @param z Value of the {@code usePrimary} flag of the requests.
     * @param bitSet Columns to include, may be {@code null}.
     * @return Publisher of the rows.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public Flow.Publisher<BinaryRow> scan(int i, UUID uuid, HybridTimestamp hybridTimestamp, ClusterNode clusterNode, UUID uuid2, boolean z, @Nullable BitSet bitSet) {
        final int noFlags = 0;

        return readOnlyScan(i, uuid, hybridTimestamp, clusterNode, null, null, null, null, noFlags, z, bitSet, uuid2);
    }

    /**
     * Scans an index range of a partition in read-only mode with the {@code usePrimary} flag set to {@code true} and no
     * exact key.
     *
     * @param i Partition index.
     * @param uuid Transaction id.
     * @param hybridTimestamp Read timestamp.
     * @param clusterNode Node the scan requests are sent to.
     * @param num Id of the index to use, may be {@code null}.
     * @param binaryTuplePrefix Lower bound prefix, may be {@code null}.
     * @param binaryTuplePrefix2 Upper bound prefix, may be {@code null}.
     * @param i2 Scan flags.
     * @param bitSet Columns to include, may be {@code null}.
     * @param uuid2 Coordinator id.
     * @return Publisher of the rows.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public Flow.Publisher<BinaryRow> scan(int i, UUID uuid, HybridTimestamp hybridTimestamp, ClusterNode clusterNode, @Nullable Integer num, @Nullable BinaryTuplePrefix binaryTuplePrefix, @Nullable BinaryTuplePrefix binaryTuplePrefix2, int i2, @Nullable BitSet bitSet, UUID uuid2) {
        final boolean usePrimary = true;

        return readOnlyScan(i, uuid, hybridTimestamp, clusterNode, num, null, binaryTuplePrefix, binaryTuplePrefix2, i2, usePrimary, bitSet, uuid2);
    }

    /**
     * Scans an index range of a partition within the given transaction by delegating to {@code readWriteScan}.
     *
     * @param i Partition index.
     * @param internalTransaction Transaction, may be {@code null}.
     * @param num Id of the index to use, may be {@code null}.
     * @param binaryTuplePrefix First bound prefix forwarded to {@code readWriteScan}, may be {@code null}.
     * @param binaryTuplePrefix2 Second bound prefix forwarded to {@code readWriteScan}, may be {@code null}.
     * @param i2 Integer argument forwarded to {@code readWriteScan} (presumably scan flags - TODO confirm).
     * @param bitSet Bit set forwarded to {@code readWriteScan}, may be {@code null}.
     * @return Publisher of the rows.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public Flow.Publisher<BinaryRow> scan(int i, @Nullable InternalTransaction internalTransaction, @Nullable Integer num, @Nullable BinaryTuplePrefix binaryTuplePrefix, @Nullable BinaryTuplePrefix binaryTuplePrefix2, int i2, @Nullable BitSet bitSet) {
        // The null after the index id is presumably the exact key, which a range scan does not use - TODO confirm.
        return readWriteScan(i, internalTransaction, num, null, binaryTuplePrefix, binaryTuplePrefix2, i2, bitSet);
    }

    /**
     * Scans an index range of a partition by delegating to the {@code readWriteScan} variant that takes an explicit
     * replication group and primary replica.
     *
     * @param i Partition index.
     * @param uuid First id forwarded to {@code readWriteScan}.
     * @param replicationGroupId Replication group forwarded to {@code readWriteScan}.
     * @param uuid2 Second id forwarded to {@code readWriteScan}.
     * @param primaryReplica Primary replica to address.
     * @param num Id of the index to use, may be {@code null}.
     * @param binaryTuplePrefix First bound prefix forwarded to {@code readWriteScan}, may be {@code null}.
     * @param binaryTuplePrefix2 Second bound prefix forwarded to {@code readWriteScan}, may be {@code null}.
     * @param i2 Integer argument forwarded to {@code readWriteScan} (presumably scan flags - TODO confirm).
     * @param bitSet Bit set forwarded to {@code readWriteScan}, may be {@code null}.
     * @return Publisher of the rows.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public Flow.Publisher<BinaryRow> scan(int i, UUID uuid, ReplicationGroupId replicationGroupId, UUID uuid2, PrimaryReplica primaryReplica, @Nullable Integer num, @Nullable BinaryTuplePrefix binaryTuplePrefix, @Nullable BinaryTuplePrefix binaryTuplePrefix2, int i2, @Nullable BitSet bitSet) {
        // The null after the index id is presumably the exact key, which a range scan does not use - TODO confirm.
        return readWriteScan(i, uuid, replicationGroupId, uuid2, primaryReplica, num, null, binaryTuplePrefix, binaryTuplePrefix2, i2, bitSet);
    }

    /**
     * Starts a read-write scan over a whole partition, without an index, exact key or bound prefixes.
     *
     * @param i partition index
     * @param internalTransaction transaction to scan in, may be {@code null}
     * @param bitSet columns to include, may be {@code null}
     * @return publisher that emits the scanned rows
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public Flow.Publisher<BinaryRow> scan(int i, @Nullable InternalTransaction internalTransaction, @Nullable BitSet bitSet) {
        return readWriteScan(
                i,
                internalTransaction,
                null, // no index
                null, // no exact key
                null, // no lower bound prefix
                null, // no upper bound prefix
                0, // no flags
                bitSet
        );
    }

    /**
     * Creates the publisher behind all read-only scans.
     *
     * <p>The partition index is validated first. Every batch is fetched by sending a read-only scan request to
     * {@code clusterNode}; when the publisher is closed the scan is completed through {@code completeScan}, which is asked to
     * send a close request if the close flag is set or the scan ended with an error.
     *
     * @param i partition index
     * @param uuid transaction id
     * @param hybridTimestamp read timestamp
     * @param clusterNode node that receives the requests
     * @param num index to use, may be {@code null}
     * @param binaryTuple exact key, may be {@code null}
     * @param binaryTuplePrefix lower bound prefix, may be {@code null}
     * @param binaryTuplePrefix2 upper bound prefix, may be {@code null}
     * @param i2 scan flags
     * @param z value of the request's {@code usePrimary} flag
     * @param bitSet columns to include, may be {@code null}
     * @param uuid2 coordinator id
     * @return publisher that emits the scanned rows
     */
    private Flow.Publisher<BinaryRow> readOnlyScan(int i, final UUID uuid, final HybridTimestamp hybridTimestamp, final ClusterNode clusterNode, @Nullable final Integer num, @Nullable final BinaryTuple binaryTuple, @Nullable final BinaryTuplePrefix binaryTuplePrefix, @Nullable final BinaryTuplePrefix binaryTuplePrefix2, final int i2, final boolean z, @Nullable final BitSet bitSet, final UUID uuid2) {
        validatePartitionIndex(i);

        final ReplicationGroupId groupId = targetReplicationGroupId(i);
        final ReadOnlyInflightBatchRequestTracker inflightTracker = new ReadOnlyInflightBatchRequestTracker(this.transactionInflights, uuid);

        return new PartitionScanPublisher<BinaryRow>(inflightTracker) {
            @Override
            protected CompletableFuture<Collection<BinaryRow>> retrieveBatch(long scanId, int batchSize) {
                return InternalTableImpl.this.replicaSvc.invoke(
                        clusterNode,
                        InternalTableImpl.TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                                .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                                .tableId(InternalTableImpl.this.tableId)
                                .readTimestamp(hybridTimestamp)
                                .transactionId(uuid)
                                .scanId(scanId)
                                .batchSize(batchSize)
                                .usePrimary(z)
                                .indexToUse(num)
                                .exactKey(InternalTableImpl.binaryTupleMessage(binaryTuple))
                                .lowerBoundPrefix(InternalTableImpl.binaryTupleMessage(binaryTuplePrefix))
                                .upperBoundPrefix(InternalTableImpl.binaryTupleMessage(binaryTuplePrefix2))
                                .flags(i2)
                                .columnsToInclude(bitSet)
                                .coordinatorId(uuid2)
                                .build()
                );
            }

            @Override
            protected CompletableFuture<Void> onClose(boolean closeFlag, long scanId, @Nullable Throwable th) {
                // A failed scan is completed with a close request even when the close flag is not set.
                boolean sendClose = closeFlag || th != null;

                return InternalTableImpl.this.completeScan(uuid, groupId, scanId, th, clusterNode.name(), sendClose);
            }
        };
    }

    /**
     * Creates the publisher behind read-write scans that run inside a transaction object.
     *
     * <p>A non-null transaction is rejected when it is read-only or when its {@code external()} flag differs from
     * {@link #cache()}. The transaction actually used is whatever {@code startImplicitRwTxIfNeeded} returns for the argument.
     * Batches are fetched through {@code enlistCursorInTx}. On close, a scan of a non-implicit transaction with an enlisted
     * partition is completed through {@code completeScan}; in every other case the close future only reflects the scan error,
     * if any. The resulting future is then passed through {@code postEnlist}.
     *
     * @param i partition index
     * @param internalTransaction transaction to scan in, may be {@code null}
     * @param num index to use, may be {@code null}
     * @param binaryTuple exact key, may be {@code null}
     * @param binaryTuplePrefix lower bound prefix, may be {@code null}
     * @param binaryTuplePrefix2 upper bound prefix, may be {@code null}
     * @param i2 scan flags
     * @param bitSet columns to include, may be {@code null}
     * @return publisher that emits the scanned rows
     * @throws TransactionException if the given transaction is read-only or incompatible with this table
     */
    private Flow.Publisher<BinaryRow> readWriteScan(final int i, @Nullable final InternalTransaction internalTransaction, @Nullable final Integer num, @Nullable final BinaryTuple binaryTuple, @Nullable final BinaryTuplePrefix binaryTuplePrefix, @Nullable final BinaryTuplePrefix binaryTuplePrefix2, final int i2, @Nullable final BitSet bitSet) {
        if (internalTransaction != null && internalTransaction.isReadOnly()) {
            throw new TransactionException(ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, "Failed to enlist read-write operation into read-only transaction txId={" + internalTransaction.id() + "}");
        }
        if (internalTransaction != null && internalTransaction.external() != cache()) {
            throw new TransactionException(GridgainErrorGroups.Cache.TX_INCOMPATIBLE_OPERATION_ERR, "Requested operation is incompatible with cache or table type txId={" + internalTransaction.id() + "}");
        }

        validatePartitionIndex(i);

        final InternalTransaction actualTx = startImplicitRwTxIfNeeded(internalTransaction);

        return new PartitionScanPublisher<BinaryRow>(READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER) {
            @Override
            protected CompletableFuture<Collection<BinaryRow>> retrieveBatch(long scanId, int batchSize) {
                return InternalTableImpl.this.enlistCursorInTx(actualTx, i, scanId, batchSize, num, binaryTuple, binaryTuplePrefix, binaryTuplePrefix2, i2, bitSet);
            }

            @Override
            protected CompletableFuture<Void> onClose(boolean closeFlag, long scanId, @Nullable Throwable th) {
                CompletableFuture<Void> closeFuture;

                if (!actualTx.implicit()) {
                    ReplicationGroupId groupId = InternalTableImpl.this.targetReplicationGroupId(i);
                    PendingTxPartitionEnlistment enlistment = internalTransaction.enlistedPartition(groupId);

                    if (enlistment == null) {
                        // Nothing was enlisted for this partition, so only the scan outcome is reported.
                        closeFuture = CompletableFutures.completedOrFailedFuture(null, th);
                    } else {
                        closeFuture = InternalTableImpl.this.completeScan(internalTransaction.id(), groupId, scanId, th, enlistment.primaryNodeConsistentId(), closeFlag);
                    }
                } else {
                    closeFuture = CompletableFutures.completedOrFailedFuture(null, th);
                }

                boolean implicitAndNotFlagged = actualTx.implicit() && !closeFlag;

                return InternalTableImpl.this.postEnlist(closeFuture, closeFlag, actualTx, implicitAndNotFlagged);
            }
        };
    }

    /**
     * Creates the publisher behind read-write scans that address an explicitly given primary replica.
     *
     * <p>Every batch is fetched by sending a read-write scan request to {@code primaryReplica.node()}, stamped with the begin
     * timestamp derived from the transaction id and with the replica's enlistment consistency token. Closing the publisher
     * completes the scan through {@code completeScan}.
     *
     * <p>NOTE(review): unlike the other scan factories in this class, this one does not call
     * {@code validatePartitionIndex}; confirm that callers guarantee a valid index.
     *
     * @param i partition index
     * @param uuid transaction id
     * @param replicationGroupId group sent as the commit partition of the request
     * @param uuid2 coordinator id
     * @param primaryReplica primary replica that receives the requests
     * @param num index to use, may be {@code null}
     * @param binaryTuple exact key, may be {@code null}
     * @param binaryTuplePrefix lower bound prefix, may be {@code null}
     * @param binaryTuplePrefix2 upper bound prefix, may be {@code null}
     * @param i2 scan flags
     * @param bitSet columns to include, may be {@code null}
     * @return publisher that emits the scanned rows
     */
    private Flow.Publisher<BinaryRow> readWriteScan(int i, final UUID uuid, final ReplicationGroupId replicationGroupId, final UUID uuid2, final PrimaryReplica primaryReplica, @Nullable final Integer num, @Nullable final BinaryTuple binaryTuple, @Nullable final BinaryTuplePrefix binaryTuplePrefix, @Nullable final BinaryTuplePrefix binaryTuplePrefix2, final int i2, @Nullable final BitSet bitSet) {
        final ReplicationGroupId groupId = targetReplicationGroupId(i);

        return new PartitionScanPublisher<BinaryRow>(READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER) {
            @Override
            protected CompletableFuture<Collection<BinaryRow>> retrieveBatch(long scanId, int batchSize) {
                return InternalTableImpl.this.replicaSvc.invoke(
                        primaryReplica.node(),
                        InternalTableImpl.TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                                .groupId(InternalTableImpl.serializeReplicationGroupId(groupId))
                                .tableId(InternalTableImpl.this.tableId)
                                .timestamp(TransactionIds.beginTimestamp(uuid))
                                .transactionId(uuid)
                                .scanId(scanId)
                                .indexToUse(num)
                                .exactKey(InternalTableImpl.binaryTupleMessage(binaryTuple))
                                .lowerBoundPrefix(InternalTableImpl.binaryTupleMessage(binaryTuplePrefix))
                                .upperBoundPrefix(InternalTableImpl.binaryTupleMessage(binaryTuplePrefix2))
                                .flags(i2)
                                .columnsToInclude(bitSet)
                                .batchSize(batchSize)
                                .enlistmentConsistencyToken(Long.valueOf(primaryReplica.enlistmentConsistencyToken()))
                                .full(false)
                                .commitPartitionId(InternalTableImpl.serializeReplicationGroupId(replicationGroupId))
                                .coordinatorId(uuid2)
                                .build()
                );
            }

            @Override
            protected CompletableFuture<Void> onClose(boolean closeFlag, long scanId, @Nullable Throwable th) {
                return InternalTableImpl.this.completeScan(uuid, groupId, scanId, th, primaryReplica.node().name(), closeFlag);
            }
        };
    }

    /**
     * Completes a scan, optionally sending a scan-close request to the replica first.
     *
     * <p>The returned future fails with {@code th} when it is non-null (a failure of the close request is attached to it as a
     * suppressed exception), otherwise with the failure of the close request, and completes normally when neither exists.
     *
     * @param uuid transaction id
     * @param replicationGroupId replication group the scan ran against
     * @param j scan id
     * @param th error the scan finished with, or {@code null}
     * @param str consistent id of the node that receives the close request
     * @param z whether a close request has to be sent
     * @return future reflecting the combined outcome of the scan and of the close request
     */
    private CompletableFuture<Void> completeScan(UUID uuid, ReplicationGroupId replicationGroupId, long j, Throwable th, String str, boolean z) {
        // Typed (not raw) future: the handle() lambda below needs its second argument to be a Throwable.
        CompletableFuture<Void> closeFuture;
        if (z) {
            closeFuture = this.replicaSvc.invoke(str, TABLE_MESSAGES_FACTORY.scanCloseReplicaRequest().groupId(serializeReplicationGroupId(replicationGroupId)).tableId(this.tableId).transactionId(uuid).scanId(j).build());
        } else {
            closeFuture = CompletableFutures.nullCompletedFuture();
        }

        return closeFuture.handle((ignored, closeError) -> {
            CompletableFuture<Void> outcome;
            if (th != null) {
                // The scan error wins; the close error, if any, travels along as suppressed.
                if (closeError != null) {
                    th.addSuppressed(closeError);
                }
                outcome = CompletableFuture.failedFuture(th);
            } else if (closeError != null) {
                outcome = CompletableFuture.failedFuture(closeError);
            } else {
                outcome = CompletableFutures.nullCompletedFuture();
            }
            return outcome;
        }).thenCompose(Function.identity());
    }

    /**
     * Runs a storage-optimized read-only operation against one partition and publishes the rows it returns.
     *
     * <p>The partition index is validated first. Every batch is fetched by sending a read-only storage operation request
     * (with {@code usePrimary} set to {@code false}) to {@code clusterNode}. Closing the publisher sends nothing: the close
     * hook returns an already completed future.
     *
     * @param i partition index
     * @param uuid transaction id
     * @param uuid2 coordinator id
     * @param hybridTimestamp read timestamp
     * @param clusterNode node that receives the requests
     * @param storageOptimizedOperation operation to execute
     * @return publisher that emits the resulting rows
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public Flow.Publisher<BinaryRow> performSecondaryStorageOperation(final int i, final UUID uuid, final UUID uuid2, final HybridTimestamp hybridTimestamp, final ClusterNode clusterNode, final StorageOptimizedOperation storageOptimizedOperation) {
        validatePartitionIndex(i);
        // Parameterized like the other scan publishers of this class instead of using the raw type.
        return new PartitionScanPublisher<BinaryRow>(new ReadOnlyInflightBatchRequestTracker(this.transactionInflights, uuid)) {
            @Override
            protected CompletableFuture<Collection<BinaryRow>> retrieveBatch(long j, int i2) {
                return InternalTableImpl.this.replicaSvc.invoke(
                        clusterNode,
                        InternalTableImpl.TABLE_MESSAGES_FACTORY.readOnlyStorageOperationReplicaRequest()
                                .groupId(InternalTableImpl.serializeReplicationGroupId(InternalTableImpl.this.targetReplicationGroupId(i)))
                                .readTimestamp(hybridTimestamp)
                                .scanId(j)
                                .transactionId(uuid)
                                .coordinatorId(uuid2)
                                .batchSize(i2)
                                .usePrimary(false)
                                .storageOperation(storageOptimizedOperation)
                                .build()
                );
            }

            @Override
            protected CompletableFuture<Void> onClose(boolean z, long j, @Nullable Throwable th) {
                return CompletableFutures.nullCompletedFuture();
            }
        };
    }

    /**
     * Checks that a partition index lies in {@code [0, partitions)}.
     *
     * @param i partition index to check
     * @throws IllegalArgumentException if the index is negative or not smaller than the partition count
     */
    private void validatePartitionIndex(int i) {
        boolean inRange = i >= 0 && i < this.partitions;
        if (inRange) {
            return;
        }

        String message = IgniteStringFormatter.format("Invalid partition [partition={}, minValue={}, maxValue={}].", i, 0, this.partitions - 1);
        throw new IllegalArgumentException(message);
    }

    /**
     * Groups rows into one {@code RowBatch} per partition.
     *
     * <p>Each row is added to the batch of the partition returned by {@link #partitionId(BinaryRowEx)}, together with its
     * position in the iteration order of {@code collection}.
     *
     * @param collection rows to group
     * @return batches keyed by partition id
     */
    Int2ObjectMap<RowBatch> toRowBatchByPartitionId(Collection<BinaryRowEx> collection) {
        // Parameterized map: no raw type and no unchecked cast on the computeIfAbsent() result.
        Int2ObjectMap<RowBatch> batchesByPartition = new Int2ObjectOpenHashMap<>();
        int position = 0;
        for (BinaryRowEx row : collection) {
            batchesByPartition.computeIfAbsent(partitionId(row), ignoredPartition -> new RowBatch()).add(row, position++);
        }
        return batchesByPartition;
    }

    /**
     * Returns the transaction state storage held by this table.
     *
     * @return transaction state storage
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public TxStateStorage txStateStorage() {
        return txStateStorage;
    }

    /**
     * Maps a row to a partition: the row's colocation hash, passed through {@code IgniteUtils.safeAbs}, modulo the partition count.
     *
     * @param binaryRowEx row to locate
     * @return partition id of the row
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public int partitionId(BinaryRowEx binaryRowEx) {
        int nonNegativeHash = IgniteUtils.safeAbs(binaryRowEx.colocationHash());
        return nonNegativeHash % this.partitions;
    }

    /**
     * Collects the rows rejected by a multi-row operation, in the order the rows were originally requested.
     *
     * <p>For every batch the response list is expected to have one entry per requested row. A requested row is reported as
     * rejected when its response entry is {@code null}; rows with a non-null entry are left out of the result.
     *
     * @param collection batches whose result futures are awaited
     * @return future with the rejected rows in original request order
     */
    public static CompletableFuture<List<BinaryRow>> collectRejectedRowsResponsesWithRestoreOrder(Collection<RowBatch> collection) {
        return collectMultiRowsResponsesWithRestoreOrder(collection, rowBatch -> {
            List<?> responses = (List<?>) rowBatch.getCompletedResult();
            if (!$assertionsDisabled && rowBatch.requestedRows.size() != responses.size()) {
                throw new AssertionError("Replication response does not fit to request [requestRows=" + rowBatch.requestedRows.size() + ", responseRows=" + responses.size() + "]");
            }
            List<BinaryRow> rejected = new ArrayList<>(responses.size());
            for (int i = 0; i < responses.size(); i++) {
                // A null placeholder keeps positions aligned; the caller drops nulls afterwards (third argument is true).
                rejected.add(responses.get(i) != null ? null : rowBatch.requestedRows.get(i));
            }
            return rejected;
        }, true);
    }

    /**
     * Collects the rows returned by a multi-row operation, in the order the rows were originally requested.
     *
     * <p>The completed result of every batch is used as is and {@code null} entries are kept in the resulting list.
     *
     * @param collection batches whose result futures are awaited
     * @return future with the response rows in original request order
     */
    static CompletableFuture<List<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(Collection<RowBatch> collection) {
        return collectMultiRowsResponsesWithRestoreOrder(
                collection,
                batch -> (Collection) batch.getCompletedResult(),
                false
        );
    }

    /**
     * Waits for all batches to complete and merges their per-batch results back into the original request order.
     *
     * <p>Every batch result is expected to be non-null and to contain exactly one entry per requested row; the entry at
     * position {@code k} is stored at {@code rowBatch.getOriginalRowIndex(k)} of the merged array.
     *
     * @param collection batches whose result futures are awaited
     * @param function extracts the per-batch result, aligned with the rows requested in that batch
     * @param z whether {@code null} entries are dropped from the merged result
     * @return future with the merged rows
     */
    private static CompletableFuture<List<BinaryRow>> collectMultiRowsResponsesWithRestoreOrder(Collection<RowBatch> collection, Function<RowBatch, Collection<BinaryRow>> function, boolean z) {
        return RowBatch.allResultFutures(collection).thenApply(ignored -> {
            BinaryRow[] ordered = new BinaryRow[RowBatch.getTotalRequestedRowSize(collection)];

            for (RowBatch rowBatch : collection) {
                Collection<BinaryRow> batchResult = function.apply(rowBatch);
                if (!$assertionsDisabled && batchResult == null) {
                    throw new AssertionError();
                }
                if (!$assertionsDisabled && batchResult.size() != rowBatch.requestedRows.size()) {
                    throw new AssertionError("batchResult=" + batchResult.size() + ", requestedRows=" + rowBatch.requestedRows.size());
                }

                int position = 0;
                for (BinaryRow row : batchResult) {
                    ordered[rowBatch.getOriginalRowIndex(position++)] = row;
                }
            }

            // The merged array is an upper bound for the result size, so the list never has to grow.
            List<BinaryRow> merged = new ArrayList<>(ordered.length);
            for (BinaryRow row : ordered) {
                if (!z || row != null) {
                    merged.add(row);
                }
            }
            return merged;
        });
    }

    /**
     * Enlists a partition into a transaction.
     *
     * <p>The commit partition is assigned first: a {@code NOT_EXISTING} placeholder for external transactions, the target
     * replication group otherwise. If the placement driver already knows a primary replica whose leaseholder can be resolved
     * to a cluster node, the enlistment happens synchronously; otherwise the primary replica is awaited through
     * {@code partitionMeta}.
     *
     * @param i partition index
     * @param internalTransaction transaction to enlist the partition into
     * @return future with the enlistment of the partition; failed if the primary replica cannot be obtained or resolved
     */
    protected CompletableFuture<PendingTxPartitionEnlistment> enlist(int i, InternalTransaction internalTransaction) {
        HybridTimestamp startTimestamp = internalTransaction.startTimestamp();
        ReplicationGroupId targetReplicationGroupId = targetReplicationGroupId(i);
        if (internalTransaction.external()) {
            internalTransaction.assignCommitPartition(IgniteSystemProperties.enabledColocation() ? ZonePartitionId.NOT_EXISTING : TablePartitionId.NOT_EXISTING);
        } else {
            internalTransaction.assignCommitPartition(targetReplicationGroupId);
        }
        ReplicaMeta currentPrimaryReplica = this.placementDriver.getCurrentPrimaryReplica(targetReplicationGroupId, startTimestamp);
        // Declared with its real result type; no wildcard over an undeclared type variable and no cast at the call site.
        Function<ReplicaMeta, PendingTxPartitionEnlistment> enlistOnPrimary = replicaMeta -> {
            ReplicationGroupId targetReplicationGroupId2 = targetReplicationGroupId(i);
            internalTransaction.enlist(targetReplicationGroupId2, this.tableId, getClusterNode(replicaMeta), enlistmentConsistencyToken(replicaMeta));
            return internalTransaction.enlistedPartition(targetReplicationGroupId2);
        };
        if (currentPrimaryReplica == null || this.clusterNodeResolver.getById(currentPrimaryReplica.getLeaseholderId()) == null) {
            return partitionMeta(targetReplicationGroupId, startTimestamp).thenApply(enlistOnPrimary);
        }
        try {
            return CompletableFuture.completedFuture(enlistOnPrimary.apply(currentPrimaryReplica));
        } catch (IgniteException e) {
            // Keep the asynchronous contract: report the failure through the future instead of throwing.
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Resolves the cluster node that holds the primary replica of a replication group at the current clock time.
     *
     * @param replicationGroupId replication group to locate
     * @return future with the node of the primary replica
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<ClusterNode> partitionLocation(ReplicationGroupId replicationGroupId) {
        return partitionMeta(replicationGroupId, this.clockService.current())
                .thenApply(replicaMeta -> getClusterNode(replicaMeta));
    }

    /**
     * Awaits the primary replica of a replication group and converts any failure into a {@code TransactionException}
     * with the {@code REPLICA_UNAVAILABLE_ERR} code.
     *
     * @param replicationGroupId replication group whose primary replica is awaited
     * @param hybridTimestamp timestamp passed to the await call
     * @return future with the primary replica meta
     */
    private CompletableFuture<ReplicaMeta> partitionMeta(ReplicationGroupId replicationGroupId, HybridTimestamp hybridTimestamp) {
        return awaitPrimaryReplica(replicationGroupId, hybridTimestamp).exceptionally(failure -> {
            String message = "Failed to get the primary replica [replicationGroupId=" + replicationGroupId + ", awaitTimestamp=" + hybridTimestamp + "]";

            throw ((TransactionException) ExceptionUtils.withCause(
                    (arg1, arg2, arg3, arg4) -> new TransactionException(arg1, arg2, arg3, arg4),
                    ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR,
                    message,
                    failure
            ));
        });
    }

    /**
     * Resolves the leaseholder of a primary replica to a cluster node.
     *
     * @param replicaMeta primary replica meta
     * @return node of the leaseholder
     * @throws TransactionException if the leaseholder id is {@code null} or cannot be resolved to a node
     */
    private ClusterNode getClusterNode(ReplicaMeta replicaMeta) {
        UUID leaseholderId = replicaMeta.getLeaseholderId();

        if (leaseholderId != null) {
            ClusterNode node = this.clusterNodeResolver.getById(leaseholderId);
            if (node != null) {
                return node;
            }
        }

        throw new TransactionException(ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR, String.format("Failed to resolve the primary replica node [id=%s]", leaseholderId));
    }

    /**
     * Closes the table. This implementation does nothing.
     */
    @Override // org.apache.ignite3.internal.table.InternalTable, org.apache.ignite3.internal.close.ManuallyCloseable
    public void close() {
    }

    /**
     * Returns the {@code cache} flag of this table.
     *
     * @return value of the {@code cache} field
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public boolean cache() {
        return cache;
    }

    /**
     * Determines the node a read-only request for a partition is sent to: the node of the awaited primary replica.
     *
     * @param i partition index
     * @param hybridTimestamp timestamp passed to the await call
     * @return future with the recipient node; fails with a {@code TransactionException} carrying
     *     {@code REPLICA_UNAVAILABLE_ERR} if awaiting fails or yields no replica
     */
    protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int i, HybridTimestamp hybridTimestamp) {
        ReplicationGroupId groupId = targetReplicationGroupId(i);

        return awaitPrimaryReplica(groupId, hybridTimestamp).handle((primary, failure) -> {
            if (failure == null) {
                if (primary != null) {
                    return getClusterNode(primary);
                }
                throw createFailedGetPrimaryReplicaTransactionException(groupId, hybridTimestamp);
            }

            throw ((TransactionException) ExceptionUtils.withCause(
                    (arg1, arg2, arg3) -> new TransactionException(arg1, arg2, arg3),
                    ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR,
                    failure
            ));
        });
    }

    /**
     * Builds the exception reported when no primary replica could be obtained for a replication group.
     *
     * @param replicationGroupId replication group whose primary replica was requested
     * @param hybridTimestamp timestamp the request waited for
     * @return exception with the {@code REPLICA_UNAVAILABLE_ERR} code
     */
    private static TransactionException createFailedGetPrimaryReplicaTransactionException(ReplicationGroupId replicationGroupId, HybridTimestamp hybridTimestamp) {
        // The closing bracket was missing from the message template.
        return new TransactionException(ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR, IgniteStringFormatter.format("Failed to get the primary replica [replicationGroupId={}, awaitTimestamp={}]", replicationGroupId, hybridTimestamp));
    }

    /**
     * Looks up the safe time tracker registered for a partition.
     *
     * @param i partition index
     * @return tracker stored for the partition in the safe time tracker map
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    @Nullable
    public PendingComparableValuesTracker<HybridTimestamp, Void> getPartitionSafeTimeTracker(int i) {
        PendingComparableValuesTracker tracker = (PendingComparableValuesTracker) this.safeTimeTrackerByPartitionId.get(i);
        return tracker;
    }

    /**
     * Looks up the storage index tracker registered for a partition.
     *
     * @param i partition index
     * @return tracker stored for the partition in the storage index tracker map
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    @Nullable
    public PendingComparableValuesTracker<Long, Void> getPartitionStorageIndexTracker(int i) {
        PendingComparableValuesTracker tracker = (PendingComparableValuesTracker) this.storageIndexTrackerByPartitionId.get(i);
        return tracker;
    }

    /**
     * Returns the executor obtained from the {@code streamerFlushExecutor} holder of this table.
     *
     * @return executor returned by the holder's {@code get()}
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public ScheduledExecutorService streamerFlushExecutor() {
        ScheduledExecutorService executor = this.streamerFlushExecutor.get();
        return executor;
    }

    /**
     * Estimates the table size by asking the primary replica of every partition and summing the answers.
     *
     * <p>Each partition request goes through {@code sendToPrimaryWithRetry} with a retry budget of 5.
     *
     * @return future with the sum of the per-partition estimates
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public CompletableFuture<Long> estimatedSize() {
        HybridTimestamp now = this.clockService.current();
        CompletableFuture[] partitionSizes = new CompletableFuture[this.partitions];

        for (int partition = 0; partition < this.partitions; partition++) {
            ReplicationGroupId groupId = targetReplicationGroupId(partition);
            ReplicationGroupIdMessage groupIdMessage = serializeReplicationGroupId(groupId);

            partitionSizes[partition] = sendToPrimaryWithRetry(groupId, now, 5, replicaMeta ->
                    TABLE_MESSAGES_FACTORY.getEstimatedSizeRequest()
                            .groupId(groupIdMessage)
                            .tableId(this.tableId)
                            .enlistmentConsistencyToken(Long.valueOf(enlistmentConsistencyToken(replicaMeta)))
                            .build());
        }

        // All futures are complete once allOf() fires, so join() below does not block.
        return CompletableFuture.allOf(partitionSizes).thenApply(ignored ->
                Long.valueOf(Arrays.stream(partitionSizes).mapToLong(sizeFuture -> ((Long) sizeFuture.join()).longValue()).sum()));
    }

    /**
     * Builds the replication group id of a partition: a zone partition id when colocation is enabled, a table partition id
     * otherwise.
     *
     * @param i partition index
     * @return replication group id of the partition
     */
    private ReplicationGroupId targetReplicationGroupId(int i) {
        if (IgniteSystemProperties.enabledColocation()) {
            return new ZonePartitionId(this.zoneId, i);
        }
        return new TablePartitionId(this.tableId, i);
    }

    /**
     * Converts a replication group id into its network message form using the replica messages factory.
     *
     * @param replicationGroupId id to convert
     * @return message representation of the id
     */
    private static ReplicationGroupIdMessage serializeReplicationGroupId(ReplicationGroupId replicationGroupId) {
        ReplicationGroupIdMessage message = ReplicaMessageUtils.toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, replicationGroupId);
        return message;
    }

    /**
     * Returns the streamer receiver runner held by this table.
     *
     * @return streamer receiver runner
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public StreamerReceiverRunner streamerReceiverRunner() {
        return streamerReceiverRunner;
    }

    /**
     * Sends a request to the primary replica of a replication group, retrying when the primary could not be reached.
     *
     * <p>The primary replica is awaited, the request is built for it and sent to its leaseholder. A failure whose unwrapped
     * cause (or, for a {@code ReplicationException} with a cause, that cause) is a {@code PrimaryReplicaMissException},
     * {@code UnresolvableConsistentIdException} or {@code ReplicationTimeoutException} triggers another attempt that awaits
     * the primary at the tick after the expiration time of the replica just tried. Any other failure is rethrown as an
     * {@code IgniteException} with the {@code INTERNAL_ERR} code; an exhausted retry budget yields {@code REPLICA_MISS_ERR}.
     *
     * @param replicationGroupId replication group to address
     * @param hybridTimestamp timestamp the primary replica is awaited for
     * @param i number of retries left
     * @param function builds the request for the awaited primary replica
     * @param <T> response type
     * @return future with the response
     */
    private <T> CompletableFuture<T> sendToPrimaryWithRetry(ReplicationGroupId replicationGroupId, HybridTimestamp hybridTimestamp, int i, Function<ReplicaMeta, ReplicaRequest> function) {
        return awaitPrimaryReplica(replicationGroupId, hybridTimestamp).thenCompose(replicaMeta -> {
            String leaseholder = replicaMeta.getLeaseholder();
            if (!$assertionsDisabled && leaseholder == null) {
                throw new AssertionError();
            }

            CompletableFuture<T> invocation = this.replicaSvc.invoke(leaseholder, function.apply(replicaMeta));

            // handle() yields a future of a future (either the response or the next attempt); it is flattened below.
            CompletableFuture<CompletableFuture<T>> attempt = invocation.handle((response, failure) -> {
                if (failure == null) {
                    return CompletableFuture.completedFuture(response);
                }
                Throwable cause = ExceptionUtils.unwrapCause(failure);
                if ((cause instanceof ReplicationException) && cause.getCause() != null) {
                    cause = cause.getCause();
                }
                boolean retriable = (cause instanceof PrimaryReplicaMissException)
                        || (cause instanceof UnresolvableConsistentIdException)
                        || (cause instanceof ReplicationTimeoutException);
                if (!retriable) {
                    throw new IgniteException(ErrorGroups.Common.INTERNAL_ERR, cause);
                }
                if (i == 0) {
                    throw new IgniteException(ErrorGroups.Replicator.REPLICA_MISS_ERR, cause);
                }
                return sendToPrimaryWithRetry(replicationGroupId, replicaMeta.getExpirationTime().tick(), i - 1, function);
            });

            return attempt;
        }).thenCompose(Function.identity());
    }

    /**
     * Starts a continuous query over all partitions of the table.
     *
     * @param subscriber receiver of the event batches
     * @param continuousQueryOptions query options, may be {@code null}
     * @param biFunction maps a row and its schema to the event payload
     * @param <T> payload type
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public <T> void queryContinuously(Flow.Subscriber<TableRowEventBatch<T>> subscriber, @Nullable ContinuousQueryOptions continuousQueryOptions, BiFunction<BinaryRow, SchemaDescriptor, T> biFunction) {
        int[] allPartitions = IntStream.range(0, this.partitions).toArray();
        queryContinuously(subscriber, allPartitions, continuousQueryOptions, biFunction);
    }

    /**
     * Starts a continuous query over the given partitions by creating a {@code ContinuousQuery} and running it.
     *
     * @param subscriber receiver of the event batches; asserted to be non-null
     * @param iArr partitions handed to the query
     * @param continuousQueryOptions query options, may be {@code null}
     * @param biFunction maps a row and its schema to the event payload; asserted to be non-null
     * @param <T> payload type
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public <T> void queryContinuously(Flow.Subscriber<TableRowEventBatch<T>> subscriber, int[] iArr, @Nullable ContinuousQueryOptions continuousQueryOptions, BiFunction<BinaryRow, SchemaDescriptor, T> biFunction) {
        if (!$assertionsDisabled) {
            if (subscriber == null) {
                throw new AssertionError("subscriber != null");
            }
            if (biFunction == null) {
                throw new AssertionError("mapper != null");
            }
        }

        EmbeddedContinuousQueryMetricSink metricSink = new EmbeddedContinuousQueryMetricSink();
        ContinuousQuery query = new ContinuousQuery(subscriber, continuousQueryOptions, biFunction, this, metricSink, this.partitions, iArr);
        query.run();
    }

    /**
     * Sends one continuous query scan request to the node holding the primary replica of the requested partition.
     *
     * <p>A pending request is registered with the continuous query response handler before the request is sent; the scan
     * result is taken from the future of that pending request, not from the replica service response. A result without rows
     * is returned with a {@code null} schema; otherwise the schema of the result's schema version is loaded and attached.
     *
     * <p>If sending or processing fails, the pending request is removed and the failure is rethrown as a
     * {@code TableNotFoundException} (when that is the unwrapped cause) or as a {@code ReplicationException}. Failures of the
     * partition location lookup itself happen before this handling is attached and are not converted.
     *
     * @param continuousQueryRequest request describing the partition, lower bounds, limits and filters of the scan
     * @return future with the scan result and the schema of its rows
     */
    @Override // org.apache.ignite3.internal.continuousquery.ContinuousQueryRequestSender
    public CompletableFuture<ContinuousQueryScanResultWithSchema<BinaryRow, SchemaDescriptor>> sendContinuousQueryRequest(ContinuousQueryRequest continuousQueryRequest) {
        return partitionLocation(targetReplicationGroupId(continuousQueryRequest.partId())).thenCompose(clusterNode -> {
            // Register the pending request first: the pair holds the future of the scan result and the request id.
            Pair<CompletableFuture<ContinuousQueryScanResult<BinaryRow>>, Long> addPendingRequest = this.continuousQueryResponseHandler.addPendingRequest();
            CompletableFuture<ContinuousQueryScanResult<BinaryRow>> first = addPendingRequest.getFirst();
            long longValue = addPendingRequest.getSecond().longValue();
            return this.replicaSvc.invoke(clusterNode, TABLE_MESSAGES_FACTORY.continuousQueryScanRequest().groupId(serializeReplicationGroupId(targetReplicationGroupId(continuousQueryRequest.partId()))).lowerBoundTimestampLong(continuousQueryRequest.lowerBoundTs()).lowerBoundRowId(continuousQueryRequest.lowerBoundRowId()).maxItems(continuousQueryRequest.maxItems()).eventTypes(continuousQueryRequest.eventTypes()).columnNames(continuousQueryRequest.columnNames()).requestId(longValue).build()).thenCompose(obj -> {
                // The direct response is ignored; the actual result arrives through the pending request future.
                return first;
            }).thenCompose(continuousQueryScanResult -> {
                if (!$assertionsDisabled && continuousQueryScanResult.error() != null) {
                    throw new AssertionError("Error not handled in ContinuousQueryResponseHandler: " + continuousQueryScanResult.error());
                }
                if (continuousQueryScanResult.rows().isEmpty()) {
                    // No rows, hence no schema to resolve.
                    return CompletableFuture.completedFuture(new ContinuousQueryScanResultWithSchema(continuousQueryScanResult, (SchemaDescriptor) null));
                }
                // Assertion in inverted form: with assertions enabled, the old and new rows of all updates are checked via allSchemaVersionsSame.
                if ($assertionsDisabled || allSchemaVersionsSame((Collection) continuousQueryScanResult.rows().stream().flatMap(rowUpdateInfo -> {
                    return Stream.of((Object[]) new BinaryRow[]{(BinaryRow) rowUpdateInfo.oldRow(), (BinaryRow) rowUpdateInfo.row()});
                }).collect(Collectors.toList()))) {
                    return this.schemaRegistry.schemaAsync(continuousQueryScanResult.schemaVersion()).thenApply(schemaDescriptor -> {
                        return new ContinuousQueryScanResultWithSchema(continuousQueryScanResult, schemaDescriptor);
                    });
                }
                throw new AssertionError();
            }).exceptionally(th -> {
                // Drop the pending request so the handler does not keep an entry for a failed request.
                this.continuousQueryResponseHandler.removePendingRequest(longValue);
                Throwable unwrapCause = ExceptionUtils.unwrapCause(th);
                if (unwrapCause instanceof TableNotFoundException) {
                    throw new TableNotFoundException(this.tableName, th);
                }
                // Reuse the error code of an IgniteException cause; fall back to REPLICA_COMMON_ERR for anything else.
                throw new ReplicationException(unwrapCause instanceof IgniteException ? ((IgniteException) unwrapCause).code() : ErrorGroups.Replicator.REPLICA_COMMON_ERR, "Failed to send continuous query request: " + th.getMessage(), th);
            });
        });
    }

    /**
     * Replaces the safe time and storage index trackers of a partition.
     *
     * <p>Under the {@code updatePartitionMapsMux} lock both tracker maps are copied, the new trackers are put into the
     * copies and the copies are stored back into the fields. Trackers that were previously registered for the partition are
     * closed after the lock has been released.
     *
     * @param i partition index
     * @param pendingComparableValuesTracker new safe time tracker
     * @param pendingComparableValuesTracker2 new storage index tracker
     */
    public void updatePartitionTrackers(int i, PendingComparableValuesTracker<HybridTimestamp, Void> pendingComparableValuesTracker, PendingComparableValuesTracker<Long, Void> pendingComparableValuesTracker2) {
        PendingComparableValuesTracker previousSafeTime;
        PendingComparableValuesTracker previousStorageIndex;

        synchronized (this.updatePartitionMapsMux) {
            Int2ObjectOpenHashMap safeTimeCopy = new Int2ObjectOpenHashMap(this.partitions);
            Int2ObjectOpenHashMap storageIndexCopy = new Int2ObjectOpenHashMap(this.partitions);

            safeTimeCopy.putAll(this.safeTimeTrackerByPartitionId);
            storageIndexCopy.putAll(this.storageIndexTrackerByPartitionId);

            previousSafeTime = (PendingComparableValuesTracker) safeTimeCopy.put(i, pendingComparableValuesTracker);
            previousStorageIndex = (PendingComparableValuesTracker) storageIndexCopy.put(i, pendingComparableValuesTracker2);

            // The fields are only ever replaced by fresh copies, never modified in place.
            this.safeTimeTrackerByPartitionId = safeTimeCopy;
            this.storageIndexTrackerByPartitionId = storageIndexCopy;
        }

        if (previousSafeTime != null) {
            previousSafeTime.close();
        }
        if (previousStorageIndex != null) {
            previousStorageIndex.close();
        }
    }

    /**
     * Builds the replica request of an upsert-all operation without a column set.
     *
     * @param collection rows to upsert
     * @param internalTransaction transaction the request belongs to
     * @param replicationGroupId target replication group
     * @param l enlistment consistency token
     * @param z flag forwarded unchanged as the last argument of the request factory
     * @return read-write multi-row request of type {@code RW_UPSERT_ALL}
     */
    private ReplicaRequest upsertAllInternal(Collection<? extends BinaryRow> collection, InternalTransaction internalTransaction, ReplicationGroupId replicationGroupId, Long l, boolean z) {
        return readWriteMultiRowReplicaRequest(
                RequestType.RW_UPSERT_ALL,
                collection,
                null, // no column set
                internalTransaction,
                replicationGroupId,
                l,
                z
        );
    }

    /**
     * Builds the replica request of an upsert-all operation.
     *
     * <p>When the collection is non-null, non-empty and its first element is a {@code BinaryTimedRow}, the request is built
     * by {@code dcrWriteMultiRowReplicaRequest}; in every other case by {@code readWriteMultiRowReplicaRequest}.
     *
     * @param collection rows to upsert
     * @param bitSet column set forwarded to the request factory, may be {@code null}
     * @param internalTransaction transaction the request belongs to
     * @param replicationGroupId target replication group
     * @param l enlistment consistency token
     * @param z flag forwarded unchanged as the last argument of the request factory
     * @return multi-row request of type {@code RW_UPSERT_ALL}
     */
    private ReplicaRequest upsertAllInternal(Collection<? extends BinaryRow> collection, @Nullable BitSet bitSet, InternalTransaction internalTransaction, ReplicationGroupId replicationGroupId, Long l, boolean z) {
        // Only the first element is inspected to decide which kind of request is built.
        boolean startsWithTimedRow = collection != null
                && !collection.isEmpty()
                && (collection.iterator().next() instanceof BinaryTimedRow);

        if (startsWithTimedRow) {
            return dcrWriteMultiRowReplicaRequest(RequestType.RW_UPSERT_ALL, collection, bitSet, internalTransaction, replicationGroupId, l, z);
        }
        return readWriteMultiRowReplicaRequest(RequestType.RW_UPSERT_ALL, collection, bitSet, internalTransaction, replicationGroupId, l, z);
    }

    /**
     * Tells whether the unwrapped cause of a failure matches one of the error codes {@code ACQUIRE_LOCK_ERR} or
     * {@code REPLICA_MISS_ERR}.
     *
     * @param th failure to inspect
     * @return result of {@code ExceptionUtils.matchAny} for the unwrapped cause and those two codes
     */
    private static boolean exceptionAllowsImplicitTxRetry(Throwable th) {
        Throwable cause = ExceptionUtils.unwrapCause(th);
        return ExceptionUtils.matchAny(
                cause,
                ErrorGroups.Transactions.ACQUIRE_LOCK_ERR,
                ErrorGroups.Replicator.REPLICA_MISS_ERR
        );
    }

    /**
     * Awaits the primary replica of a replication group through the placement driver, with a timeout of 30 seconds.
     *
     * @param replicationGroupId replication group whose primary replica is awaited
     * @param hybridTimestamp timestamp passed to the placement driver
     * @return future with the primary replica meta
     */
    private CompletableFuture<ReplicaMeta> awaitPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp hybridTimestamp) {
        CompletableFuture<ReplicaMeta> primaryFuture = this.placementDriver.awaitPrimaryReplica(
                replicationGroupId,
                hybridTimestamp,
                30L,
                TimeUnit.SECONDS
        );
        return primaryFuture;
    }

    /**
     * Derives the enlistment consistency token of a primary replica: the long value of its start time.
     *
     * @param replicaMeta primary replica meta
     * @return enlistment consistency token
     */
    private static long enlistmentConsistencyToken(ReplicaMeta replicaMeta) {
        long token = replicaMeta.getStartTime().longValue();
        return token;
    }

    /**
     * Creates a new transactions facade from this table's transaction manager, observable timestamp tracker and license
     * feature checker.
     *
     * @return new {@code IgniteTransactionsImpl} instance
     */
    @Override // org.apache.ignite3.internal.table.InternalTable
    public IgniteTransactions transactions() {
        IgniteTransactionsImpl facade = new IgniteTransactionsImpl(this.txManager, this.observableTimestampTracker, this.licenseFeatureChecker);
        return facade;
    }

    /**
     * Rejects operations on a transaction that is finishing or already finished.
     *
     * @param internalTransaction transaction to check; {@code null} passes the check
     * @throws TransactionException with the {@code TX_ALREADY_FINISHED_ERR} code if the transaction is finishing or finished
     */
    private void checkTransactionFinishStarted(@Nullable InternalTransaction internalTransaction) {
        if (internalTransaction == null || !internalTransaction.isFinishingOrFinished()) {
            return;
        }

        String message = IgniteStringFormatter.format("Transaction is already finished () [txId={}, readOnly={}].", internalTransaction.id(), internalTransaction.isReadOnly());
        throw new TransactionException(ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR, message);
    }

    // Class initialisation: assigns the class-level constants used throughout this file.
    static {
        // True when assertions are disabled for this class; guards the explicit AssertionError checks in this file.
        $assertionsDisabled = !InternalTableImpl.class.desiredAssertionStatus();
        // Single tracker instance passed to every read-write scan publisher.
        READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER = new ReadWriteInflightBatchRequestTracker();
        // Factories used to build partition replication requests and replication group id messages.
        TABLE_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
        REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    }
}
