package org.apache.ignite.internal.table.distributed.replicator;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
import org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlySingleRowPkReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyStorageOperationReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadSecondaryStorageLatestReplicatedRowInfoRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteMultipleRowsSecondaryReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
import org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.replicator.CommandApplicationResult;
import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
import org.apache.ignite.internal.replicator.message.SecondaryReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowImpl;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.storage.operation.StorageOptimizedOperation;
import org.apache.ignite.internal.storage.secondary.SecondaryStorage;
import org.apache.ignite.internal.storage.secondary.TimestampAndRowId;
import org.apache.ignite.internal.tx.impl.FullyQualifiedResourceId;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/table/distributed/replicator/SecondaryPartitionReplicaListener.class */
public class SecondaryPartitionReplicaListener implements ReplicaTableProcessor {
    private static final IgniteLogger LOG;
    private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY;
    private final int tableId;
    private final int partitionId;
    private final ReplicationGroupId replicationGroupId;
    private final SecondaryStorage secondaryStorage;
    private final SchemaSyncService schemaSyncService;
    private final SchemaCompatibilityValidator schemaCompatValidator;
    private final PendingComparableValuesTracker<HybridTimestamp, Void> safeTime;
    private final RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry;
    private final LowWatermark lowWatermark;
    private final ReplicationRaftCommandApplicator raftCommandApplicator;
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final AtomicBoolean stopGuard = new AtomicBoolean();
    static final /* synthetic */ boolean $assertionsDisabled;

    public SecondaryPartitionReplicaListener(ReplicationGroupId replicationGroupId, RaftCommandRunner raftCommandRunner, SecondaryStorage secondaryStorage, ValidationSchemasSource validationSchemasSource, SchemaSyncService schemaSyncService, CatalogService catalogService, PendingComparableValuesTracker<HybridTimestamp, Void> pendingComparableValuesTracker, RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry, LowWatermark lowWatermark) {
        this.tableId = secondaryStorage.tableId();
        this.partitionId = secondaryStorage.partitionId();
        this.replicationGroupId = replicationGroupId;
        this.secondaryStorage = secondaryStorage;
        this.safeTime = pendingComparableValuesTracker;
        this.remotelyTriggeredResourceRegistry = remotelyTriggeredResourceRegistry;
        this.schemaSyncService = schemaSyncService;
        this.lowWatermark = lowWatermark;
        this.raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
        this.schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService);
    }

    private CompletableFuture<?> processRequest(ReplicaRequest replicaRequest, ReplicaPrimacy replicaPrimacy) {
        boolean z = replicaRequest instanceof SchemaVersionAwareReplicaRequest;
        if (!$assertionsDisabled && z && ((SchemaVersionAwareReplicaRequest) replicaRequest).schemaVersion() <= 0) {
            throw new AssertionError("No schema version passed?");
        }
        HybridTimestamp txTimestamp = getTxTimestamp(replicaRequest);
        if (!z) {
            return processOperationRequestWithTxOperationManagementLogic(replicaRequest, replicaPrimacy);
        }
        if (!$assertionsDisabled && txTimestamp == null) {
            throw new AssertionError("Operation timestamp is null for " + replicaRequest.getClass());
        }
        return this.schemaSyncService.waitForMetadataCompleteness(txTimestamp).thenRun(() -> {
            this.schemaCompatValidator.failIfTableDoesNotExistAt(txTimestamp, this.tableId);
            this.schemaCompatValidator.failIfRequestSchemaDiffersFromTxTs(txTimestamp, ((SchemaVersionAwareReplicaRequest) replicaRequest).schemaVersion(), this.tableId);
        }).thenCompose(r7 -> {
            return processOperationRequestWithTxOperationManagementLogic(replicaRequest, replicaPrimacy);
        });
    }

    @Nullable
    private static HybridTimestamp getTxTimestamp(ReplicaRequest replicaRequest) {
        if (replicaRequest instanceof ReadOnlyReplicaRequest) {
            return ((ReadOnlyReplicaRequest) replicaRequest).readTimestamp();
        }
        return null;
    }

    private CompletableFuture<?> processOperationRequest(ReplicaRequest replicaRequest, ReplicaPrimacy replicaPrimacy) {
        if (replicaRequest instanceof ScanCloseReplicaRequest) {
            return processScanCloseAction((ScanCloseReplicaRequest) replicaRequest);
        }
        if (replicaRequest instanceof ReadOnlySingleRowPkReplicaRequest) {
            return processReadOnlySingleEntryAction((ReadOnlySingleRowPkReplicaRequest) replicaRequest);
        }
        if (replicaRequest instanceof ReadOnlyMultiRowPkReplicaRequest) {
            return processReadOnlyMultiEntryAction((ReadOnlyMultiRowPkReplicaRequest) replicaRequest);
        }
        if (replicaRequest instanceof ReadOnlyScanRetrieveBatchReplicaRequest) {
            return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) replicaRequest);
        }
        if (replicaRequest instanceof ReadOnlyStorageOperationReplicaRequest) {
            return processReadOnlyStorageOperationBatchAction((ReadOnlyStorageOperationReplicaRequest) replicaRequest);
        }
        if (replicaRequest instanceof ReadSecondaryStorageLatestReplicatedRowInfoRequest) {
            return processSecondaryReadLatestReplicatedRowRequest();
        }
        if (replicaRequest instanceof ReadWriteMultipleRowsSecondaryReplicaRequest) {
            return processReadWriteMultipleRowSecondaryReplicaRequest((ReadWriteMultipleRowsSecondaryReplicaRequest) replicaRequest, replicaPrimacy);
        }
        if (replicaRequest instanceof SecondaryReplicaSafeTimeSyncRequest) {
            return processSecondaryReplicaSafeTimeSyncRequest((SecondaryReplicaSafeTimeSyncRequest) replicaRequest);
        }
        throw new UnsupportedReplicaRequestException(replicaRequest.getClass());
    }

    private CompletableFuture<Void> processScanCloseAction(ScanCloseReplicaRequest scanCloseReplicaRequest) {
        try {
            this.remotelyTriggeredResourceRegistry.close(RemoteResourceIds.cursorId(scanCloseReplicaRequest.transactionId(), scanCloseReplicaRequest.scanId()));
            return CompletableFutures.nullCompletedFuture();
        } catch (IgniteException e) {
            throw wrapCursorCloseException(e);
        }
    }

    private ReplicationException wrapCursorCloseException(IgniteException igniteException) {
        return new ReplicationException(ErrorGroups.Replicator.CURSOR_CLOSE_ERR, IgniteStringFormatter.format("Close cursor exception [replicaGrpId={}, msg={}]", new Object[]{this.replicationGroupId, igniteException.getMessage()}), igniteException);
    }

    public CompletableFuture<ReplicaResult> process(ReplicaRequest replicaRequest, ReplicaPrimacy replicaPrimacy, UUID uuid) {
        return processRequestInContext(replicaRequest, replicaPrimacy);
    }

    private CompletableFuture<ReplicaResult> processRequestInContext(ReplicaRequest replicaRequest, ReplicaPrimacy replicaPrimacy) {
        return processRequest(replicaRequest, replicaPrimacy).thenApply(SecondaryPartitionReplicaListener::wrapInReplicaResultIfNeeded);
    }

    private static ReplicaResult wrapInReplicaResultIfNeeded(Object obj) {
        return obj instanceof ReplicaResult ? (ReplicaResult) obj : new ReplicaResult(obj, (CommandApplicationResult) null);
    }

    public void onShutdown() {
        if (this.stopGuard.compareAndSet(false, true)) {
            this.busyLock.block();
        }
    }

    private CompletableFuture<?> processOperationRequestWithTxOperationManagementLogic(ReplicaRequest replicaRequest, ReplicaPrimacy replicaPrimacy) {
        tryToLockLwmIfNeeded(replicaRequest);
        return processOperationRequest(replicaRequest, replicaPrimacy);
    }

    private void tryToLockLwmIfNeeded(ReplicaRequest replicaRequest) {
        UUID uuid;
        HybridTimestamp hybridTimestamp = null;
        if (replicaRequest instanceof ReadOnlyReplicaRequest) {
            ReadOnlyReplicaRequest readOnlyReplicaRequest = (ReadOnlyReplicaRequest) replicaRequest;
            uuid = readOnlyReplicaRequest.transactionId();
            hybridTimestamp = readOnlyReplicaRequest.readTimestamp();
        } else {
            uuid = null;
        }
        if (uuid != null) {
            if (!this.lowWatermark.tryLock(uuid, hybridTimestamp)) {
                throw new TransactionException(ErrorGroups.Transactions.TX_STALE_READ_ONLY_OPERATION_ERR, "Read timestamp is not available anymore.");
            }
            registerAutoLwmUnlockOnCoordinatorLeaveIfNeeded(replicaRequest, uuid);
        }
    }

    private void registerAutoLwmUnlockOnCoordinatorLeaveIfNeeded(ReplicaRequest replicaRequest, UUID uuid) {
        UUID coordinatorId;
        if (!(replicaRequest instanceof ReadOnlyReplicaRequest) || (coordinatorId = ((ReadOnlyReplicaRequest) replicaRequest).coordinatorId()) == null) {
            return;
        }
        this.remotelyTriggeredResourceRegistry.register(new FullyQualifiedResourceId(uuid, uuid), coordinatorId, () -> {
            return () -> {
                this.lowWatermark.unlock(uuid);
            };
        });
    }

    private CompletableFuture<TimestampAndRowId> processSecondaryReadLatestReplicatedRowRequest() {
        if ($assertionsDisabled || this.secondaryStorage != null) {
            return CompletableFuture.completedFuture(this.secondaryStorage.getLastPersistedRow());
        }
        throw new AssertionError("Secondary storage engine should be initialized");
    }

    private CompletableFuture<?> processReadWriteMultipleRowSecondaryReplicaRequest(ReadWriteMultipleRowsSecondaryReplicaRequest readWriteMultipleRowsSecondaryReplicaRequest, ReplicaPrimacy replicaPrimacy) {
        return this.raftCommandApplicator.applyCommandWithExceptionHandling(PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllSecondaryStorageCommand().tableId(this.tableId).rows(readWriteMultipleRowsSecondaryReplicaRequest.rows()).leaseStartTime(HybridTimestamp.hybridTimestamp(replicaPrimacy.leaseStartTime())).build()).exceptionally(th -> {
            LOG.warn("Failed to write batch into secondary storage for [tableId={}, partitionId={}]", th, new Object[]{Integer.valueOf(this.tableId), Integer.valueOf(this.partitionId)});
            ExceptionUtils.sneakyThrow(th);
            return null;
        });
    }

    private CompletableFuture<?> processSecondaryReplicaSafeTimeSyncRequest(SecondaryReplicaSafeTimeSyncRequest secondaryReplicaSafeTimeSyncRequest) {
        return this.raftCommandApplicator.applyCommandWithExceptionHandling(PARTITION_REPLICATION_MESSAGES_FACTORY.secondarySafeTimeSyncCommand().tableId(this.tableId).proposedSafeTime(secondaryReplicaSafeTimeSyncRequest.proposedSafeTime()).build()).exceptionally(th -> {
            LOG.warn("Failed to sync safe time for secondary storage of [tableId={}, partitionId={}]", th, new Object[]{Integer.valueOf(this.tableId), Integer.valueOf(this.partitionId)});
            ExceptionUtils.sneakyThrow(th);
            return null;
        });
    }

    private CompletableFuture<BinaryRow> processReadOnlySingleEntryAction(ReadOnlySingleRowPkReplicaRequest readOnlySingleRowPkReplicaRequest) {
        BinaryRowImpl binaryRowImpl = new BinaryRowImpl(0, readOnlySingleRowPkReplicaRequest.primaryKey());
        HybridTimestamp readTimestamp = readOnlySingleRowPkReplicaRequest.readTimestamp();
        return awaitDataPropagationToSecondaryStorage(readTimestamp).thenCompose(obj -> {
            return readFromSecondaryStorage(binaryRowImpl, readTimestamp);
        });
    }

    private CompletableFuture<List<BinaryRow>> processReadOnlyMultiEntryAction(ReadOnlyMultiRowPkReplicaRequest readOnlyMultiRowPkReplicaRequest) {
        if (readOnlyMultiRowPkReplicaRequest.requestType() != RequestType.RO_GET_ALL) {
            throw new IgniteInternalException(ErrorGroups.Replicator.REPLICA_COMMON_ERR, IgniteStringFormatter.format("Unknown single request [actionType={}]", new Object[]{readOnlyMultiRowPkReplicaRequest.requestType()}));
        }
        Collection collection = (Collection) readOnlyMultiRowPkReplicaRequest.primaryKeys().stream().map(byteBuffer -> {
            return new BinaryRowImpl(readOnlyMultiRowPkReplicaRequest.schemaVersion(), byteBuffer);
        }).collect(Collectors.toList());
        HybridTimestamp readTimestamp = readOnlyMultiRowPkReplicaRequest.readTimestamp();
        return awaitDataPropagationToSecondaryStorage(readTimestamp).thenCompose(obj -> {
            return readFromSecondaryStorage((Collection<BinaryRow>) collection, readTimestamp);
        });
    }

    private CompletableFuture<List<BinaryRow>> processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest readOnlyScanRetrieveBatchReplicaRequest) {
        LOG.debug("Processing RO scan retrieve batch action on secondary storage for {}", new Object[]{readOnlyScanRetrieveBatchReplicaRequest.groupId()});
        FullyQualifiedResourceId cursorId = RemoteResourceIds.cursorId(readOnlyScanRetrieveBatchReplicaRequest.transactionId(), readOnlyScanRetrieveBatchReplicaRequest.scanId());
        int batchSize = readOnlyScanRetrieveBatchReplicaRequest.batchSize();
        HybridTimestamp readTimestamp = readOnlyScanRetrieveBatchReplicaRequest.readTimestamp();
        BitSet columnsToInclude = readOnlyScanRetrieveBatchReplicaRequest.columnsToInclude();
        return awaitDataPropagationToSecondaryStorage(readTimestamp).thenCompose(obj -> {
            return retrieveExactEntries(readOnlyScanRetrieveBatchReplicaRequest.coordinatorId(), readTimestamp, cursorId, batchSize, columnsToInclude);
        });
    }

    private CompletableFuture<List<BinaryRow>> processReadOnlyStorageOperationBatchAction(ReadOnlyStorageOperationReplicaRequest readOnlyStorageOperationReplicaRequest) {
        FullyQualifiedResourceId cursorId = RemoteResourceIds.cursorId(readOnlyStorageOperationReplicaRequest.transactionId(), readOnlyStorageOperationReplicaRequest.scanId());
        int batchSize = readOnlyStorageOperationReplicaRequest.batchSize();
        HybridTimestamp readTimestamp = readOnlyStorageOperationReplicaRequest.readTimestamp();
        return awaitDataPropagationToSecondaryStorage(readTimestamp).thenCompose(obj -> {
            return performStorageOperation(readOnlyStorageOperationReplicaRequest.coordinatorId(), readTimestamp, cursorId, batchSize, readOnlyStorageOperationReplicaRequest.storageOperation());
        });
    }

    private CompletableFuture<?> awaitDataPropagationToSecondaryStorage(HybridTimestamp hybridTimestamp) {
        return this.safeTime.waitFor(hybridTimestamp);
    }

    private CompletableFuture<BinaryRow> readFromSecondaryStorage(BinaryRow binaryRow, HybridTimestamp hybridTimestamp) {
        return CompletableFuture.completedFuture(this.secondaryStorage.read(binaryRow, hybridTimestamp));
    }

    private CompletableFuture<List<BinaryRow>> readFromSecondaryStorage(Collection<BinaryRow> collection, HybridTimestamp hybridTimestamp) {
        ArrayList arrayList = new ArrayList();
        Iterator<BinaryRow> it = collection.iterator();
        while (it.hasNext()) {
            arrayList.add(this.secondaryStorage.read(it.next(), hybridTimestamp));
        }
        return CompletableFuture.completedFuture(arrayList);
    }

    private CompletableFuture<List<BinaryRow>> retrieveExactEntries(UUID uuid, HybridTimestamp hybridTimestamp, FullyQualifiedResourceId fullyQualifiedResourceId, int i, @Nullable BitSet bitSet) {
        Cursor cursor = ((CursorResource) this.remotelyTriggeredResourceRegistry.register(fullyQualifiedResourceId, uuid, () -> {
            return new CursorResource(this.secondaryStorage.scan(hybridTimestamp, bitSet));
        })).cursor();
        ArrayList arrayList = new ArrayList(i);
        while (arrayList.size() < i && cursor.hasNext()) {
            arrayList.add((BinaryRow) cursor.next());
        }
        return CompletableFuture.completedFuture(arrayList);
    }

    private CompletableFuture<List<BinaryRow>> performStorageOperation(UUID uuid, HybridTimestamp hybridTimestamp, FullyQualifiedResourceId fullyQualifiedResourceId, int i, StorageOptimizedOperation storageOptimizedOperation) {
        Cursor cursor = ((CursorResource) this.remotelyTriggeredResourceRegistry.register(fullyQualifiedResourceId, uuid, () -> {
            return new CursorResource(this.secondaryStorage.scanWithOperation(hybridTimestamp, storageOptimizedOperation));
        })).cursor();
        ArrayList arrayList = new ArrayList(i);
        while (arrayList.size() < i && cursor.hasNext()) {
            arrayList.add((BinaryRow) cursor.next());
        }
        return CompletableFuture.completedFuture(arrayList);
    }

    static {
        $assertionsDisabled = !SecondaryPartitionReplicaListener.class.desiredAssertionStatus();
        LOG = Loggers.forClass(SecondaryPartitionReplicaListener.class);
        PARTITION_REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
    }
}
