/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.table.distributed.replicator;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
import org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.command.SecondarySafeTimeSyncCommand;
import org.apache.ignite.internal.partition.replicator.network.command.UpdateAllSecondaryStorageCommand;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlySingleRowPkReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyStorageOperationReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadSecondaryStorageLatestReplicatedRowInfoRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteMultipleRowsSecondaryReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
import org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
import org.apache.ignite.internal.replicator.message.SecondaryReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowImpl;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.storage.operation.StorageOptimizedOperation;
import org.apache.ignite.internal.storage.secondary.SecondaryStorage;
import org.apache.ignite.internal.storage.secondary.TimestampAndRowId;
import org.apache.ignite.internal.table.distributed.replicator.CursorResource;
import org.apache.ignite.internal.table.distributed.replicator.RemoteResourceIds;
import org.apache.ignite.internal.tx.impl.FullyQualifiedResourceId;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class SecondaryPartitionReplicaListener
implements ReplicaTableProcessor {
    private static final IgniteLogger LOG = Loggers.forClass(SecondaryPartitionReplicaListener.class);
    private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
    private final int tableId;
    private final int partitionId;
    private final ReplicationGroupId replicationGroupId;
    private final SecondaryStorage secondaryStorage;
    private final SchemaSyncService schemaSyncService;
    private final SchemaCompatibilityValidator schemaCompatValidator;
    private final PendingComparableValuesTracker<HybridTimestamp, Void> safeTime;
    private final RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry;
    private final LowWatermark lowWatermark;
    private final ReplicationRaftCommandApplicator raftCommandApplicator;
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final AtomicBoolean stopGuard = new AtomicBoolean();

    public SecondaryPartitionReplicaListener(ReplicationGroupId replicationGroupId, RaftCommandRunner raftCommandRunner, SecondaryStorage secondaryStorage, ValidationSchemasSource validationSchemasSource, SchemaSyncService schemaSyncService, CatalogService catalogService, PendingComparableValuesTracker<HybridTimestamp, Void> safeTime, RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry, LowWatermark lowWatermark) {
        this.tableId = secondaryStorage.tableId();
        this.partitionId = secondaryStorage.partitionId();
        this.replicationGroupId = replicationGroupId;
        this.secondaryStorage = secondaryStorage;
        this.safeTime = safeTime;
        this.remotelyTriggeredResourceRegistry = remotelyTriggeredResourceRegistry;
        this.schemaSyncService = schemaSyncService;
        this.lowWatermark = lowWatermark;
        this.raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
        this.schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService);
    }

    private CompletableFuture<?> processRequest(ReplicaRequest request, ReplicaPrimacy replicaPrimacy) {
        boolean hasSchemaVersion = request instanceof SchemaVersionAwareReplicaRequest;
        assert (!hasSchemaVersion || ((SchemaVersionAwareReplicaRequest)request).schemaVersion() > 0) : "No schema version passed?";
        @Nullable HybridTimestamp operationTimestamp = SecondaryPartitionReplicaListener.getTxTimestamp(request);
        if (!hasSchemaVersion) {
            return this.processOperationRequestWithTxOperationManagementLogic(request, replicaPrimacy);
        }
        assert (operationTimestamp != null) : "Operation timestamp is null for " + request.getClass();
        Runnable validateClo = () -> {
            this.schemaCompatValidator.failIfTableDoesNotExistAt(operationTimestamp, this.tableId);
            SchemaVersionAwareReplicaRequest versionAwareRequest = (SchemaVersionAwareReplicaRequest)request;
            this.schemaCompatValidator.failIfRequestSchemaDiffersFromTxTs(operationTimestamp, versionAwareRequest.schemaVersion(), this.tableId);
        };
        return ((CompletableFuture)this.schemaSyncService.waitForMetadataCompleteness(operationTimestamp).thenRun(validateClo)).thenCompose(ignored -> this.processOperationRequestWithTxOperationManagementLogic(request, replicaPrimacy));
    }

    @Nullable
    private static HybridTimestamp getTxTimestamp(ReplicaRequest request) {
        if (request instanceof ReadOnlyReplicaRequest) {
            return ((ReadOnlyReplicaRequest)request).readTimestamp();
        }
        return null;
    }

    private CompletableFuture<?> processOperationRequest(ReplicaRequest request, ReplicaPrimacy replicaPrimacy) {
        if (request instanceof ScanCloseReplicaRequest) {
            return this.processScanCloseAction((ScanCloseReplicaRequest)request);
        }
        if (request instanceof ReadOnlySingleRowPkReplicaRequest) {
            return this.processReadOnlySingleEntryAction((ReadOnlySingleRowPkReplicaRequest)request);
        }
        if (request instanceof ReadOnlyMultiRowPkReplicaRequest) {
            return this.processReadOnlyMultiEntryAction((ReadOnlyMultiRowPkReplicaRequest)request);
        }
        if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) {
            return this.processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)request);
        }
        if (request instanceof ReadOnlyStorageOperationReplicaRequest) {
            return this.processReadOnlyStorageOperationBatchAction((ReadOnlyStorageOperationReplicaRequest)request);
        }
        if (request instanceof ReadSecondaryStorageLatestReplicatedRowInfoRequest) {
            return this.processSecondaryReadLatestReplicatedRowRequest();
        }
        if (request instanceof ReadWriteMultipleRowsSecondaryReplicaRequest) {
            ReadWriteMultipleRowsSecondaryReplicaRequest req = (ReadWriteMultipleRowsSecondaryReplicaRequest)request;
            return this.processReadWriteMultipleRowSecondaryReplicaRequest(req, replicaPrimacy);
        }
        if (request instanceof SecondaryReplicaSafeTimeSyncRequest) {
            SecondaryReplicaSafeTimeSyncRequest req = (SecondaryReplicaSafeTimeSyncRequest)request;
            return this.processSecondaryReplicaSafeTimeSyncRequest(req);
        }
        throw new UnsupportedReplicaRequestException(request.getClass());
    }

    private CompletableFuture<Void> processScanCloseAction(ScanCloseReplicaRequest request) {
        UUID txId = request.transactionId();
        FullyQualifiedResourceId cursorId = RemoteResourceIds.cursorId(txId, request.scanId());
        try {
            this.remotelyTriggeredResourceRegistry.close(cursorId);
        }
        catch (IgniteException e) {
            throw this.wrapCursorCloseException(e);
        }
        return CompletableFutures.nullCompletedFuture();
    }

    private ReplicationException wrapCursorCloseException(IgniteException e) {
        return new ReplicationException(ErrorGroups.Replicator.CURSOR_CLOSE_ERR, IgniteStringFormatter.format((String)"Close cursor exception [replicaGrpId={}, msg={}]", (Object[])new Object[]{this.replicationGroupId, e.getMessage()}), (Throwable)e);
    }

    public CompletableFuture<ReplicaResult> process(ReplicaRequest request, ReplicaPrimacy replicaPrimacy, UUID senderId) {
        return this.processRequestInContext(request, replicaPrimacy);
    }

    private CompletableFuture<ReplicaResult> processRequestInContext(ReplicaRequest request, ReplicaPrimacy replicaPrimacy) {
        return this.processRequest(request, replicaPrimacy).thenApply(SecondaryPartitionReplicaListener::wrapInReplicaResultIfNeeded);
    }

    private static ReplicaResult wrapInReplicaResultIfNeeded(Object res) {
        if (res instanceof ReplicaResult) {
            return (ReplicaResult)res;
        }
        return new ReplicaResult(res, null);
    }

    public void onShutdown() {
        if (!this.stopGuard.compareAndSet(false, true)) {
            return;
        }
        this.busyLock.block();
    }

    private CompletableFuture<?> processOperationRequestWithTxOperationManagementLogic(ReplicaRequest request, ReplicaPrimacy replicaPrimacy) {
        this.tryToLockLwmIfNeeded(request);
        return this.processOperationRequest(request, replicaPrimacy);
    }

    private void tryToLockLwmIfNeeded(ReplicaRequest request) {
        UUID txIdToLockLwm;
        HybridTimestamp tsToLockLwm = null;
        if (request instanceof ReadOnlyReplicaRequest) {
            ReadOnlyReplicaRequest readOnlyRequest = (ReadOnlyReplicaRequest)request;
            txIdToLockLwm = readOnlyRequest.transactionId();
            tsToLockLwm = readOnlyRequest.readTimestamp();
        } else {
            txIdToLockLwm = null;
        }
        if (txIdToLockLwm != null) {
            if (!this.lowWatermark.tryLock(txIdToLockLwm, tsToLockLwm)) {
                throw new TransactionException(ErrorGroups.Transactions.TX_STALE_READ_ONLY_OPERATION_ERR, "Read timestamp is not available anymore.");
            }
            this.registerAutoLwmUnlockOnCoordinatorLeaveIfNeeded(request, txIdToLockLwm);
        }
    }

    private void registerAutoLwmUnlockOnCoordinatorLeaveIfNeeded(ReplicaRequest request, UUID txIdToLockLwm) {
        ReadOnlyReplicaRequest readOnlyReplicaRequest;
        UUID coordinatorId;
        if (request instanceof ReadOnlyReplicaRequest && (coordinatorId = (readOnlyReplicaRequest = (ReadOnlyReplicaRequest)request).coordinatorId()) != null) {
            FullyQualifiedResourceId resourceId = new FullyQualifiedResourceId(txIdToLockLwm, txIdToLockLwm);
            this.remotelyTriggeredResourceRegistry.register(resourceId, coordinatorId, () -> () -> this.lowWatermark.unlock(txIdToLockLwm));
        }
    }

    private CompletableFuture<TimestampAndRowId> processSecondaryReadLatestReplicatedRowRequest() {
        assert (this.secondaryStorage != null) : "Secondary storage engine should be initialized";
        return CompletableFuture.completedFuture(this.secondaryStorage.getLastPersistedRow());
    }

    private CompletableFuture<?> processReadWriteMultipleRowSecondaryReplicaRequest(ReadWriteMultipleRowsSecondaryReplicaRequest request, ReplicaPrimacy replicaPrimacy) {
        UpdateAllSecondaryStorageCommand command = PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllSecondaryStorageCommand().tableId(this.tableId).rows(request.rows()).leaseStartTime(HybridTimestamp.hybridTimestamp((long)replicaPrimacy.leaseStartTime())).build();
        return this.raftCommandApplicator.applyCommandWithExceptionHandling((Command)command).whenComplete((v, e) -> {
            if (e != null) {
                LOG.warn("Failed to write batch into secondary storage for [tableId={}, partitionId={}]", e, new Object[]{this.tableId, this.partitionId});
            }
        });
    }

    private CompletableFuture<?> processSecondaryReplicaSafeTimeSyncRequest(SecondaryReplicaSafeTimeSyncRequest request) {
        SecondarySafeTimeSyncCommand command = PARTITION_REPLICATION_MESSAGES_FACTORY.secondarySafeTimeSyncCommand().tableId(this.tableId).proposedSafeTime(request.proposedSafeTime()).build();
        return this.raftCommandApplicator.applyCommandWithExceptionHandling((Command)command).whenComplete((v, e) -> {
            if (e != null) {
                LOG.warn("Failed to sync safe time for secondary storage of [tableId={}, partitionId={}]", e, new Object[]{this.tableId, this.partitionId});
            }
        });
    }

    private CompletableFuture<BinaryRow> processReadOnlySingleEntryAction(ReadOnlySingleRowPkReplicaRequest request) {
        BinaryRowImpl searchRow = new BinaryRowImpl(0, request.primaryKey());
        HybridTimestamp readTimestamp = request.readTimestamp();
        return this.awaitDataPropagationToSecondaryStorage(readTimestamp).thenApply(arg_0 -> this.lambda$processReadOnlySingleEntryAction$6((BinaryRow)searchRow, readTimestamp, arg_0));
    }

    private CompletableFuture<List<BinaryRow>> processReadOnlyMultiEntryAction(ReadOnlyMultiRowPkReplicaRequest request) {
        if (request.requestType() != RequestType.RO_GET_ALL) {
            throw new IgniteInternalException(ErrorGroups.Replicator.REPLICA_COMMON_ERR, IgniteStringFormatter.format((String)"Unknown request [actionType={}]", (Object[])new Object[]{request.requestType()}));
        }
        Collection searchRows = request.primaryKeys().stream().map(primaryKey -> new BinaryRowImpl(request.schemaVersion(), primaryKey)).collect(Collectors.toList());
        HybridTimestamp readTimestamp = request.readTimestamp();
        return this.awaitDataPropagationToSecondaryStorage(readTimestamp).thenApply(ignored -> this.readFromSecondaryStorage(searchRows, readTimestamp));
    }

    private CompletableFuture<List<BinaryRow>> processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest request) {
        LOG.debug("Processing RO scan retrieve batch action on secondary storage for {}", new Object[]{request.groupId()});
        UUID txId = request.transactionId();
        FullyQualifiedResourceId cursorId = RemoteResourceIds.cursorId(txId, request.scanId());
        int batchCount = request.batchSize();
        HybridTimestamp readTimestamp = request.readTimestamp();
        BitSet columnsToInclude = request.columnsToInclude();
        return this.awaitDataPropagationToSecondaryStorage(readTimestamp).thenApply(ignored -> this.retrieveExactEntries(request.coordinatorId(), readTimestamp, cursorId, batchCount, columnsToInclude));
    }

    private CompletableFuture<List<BinaryRow>> processReadOnlyStorageOperationBatchAction(ReadOnlyStorageOperationReplicaRequest request) {
        FullyQualifiedResourceId cursorId = RemoteResourceIds.cursorId(request.transactionId(), request.scanId());
        int batchCount = request.batchSize();
        HybridTimestamp readTimestamp = request.readTimestamp();
        return this.awaitDataPropagationToSecondaryStorage(readTimestamp).thenApply(ignored -> this.performStorageOperation(request.coordinatorId(), readTimestamp, cursorId, batchCount, request.storageOperation()));
    }

    private CompletableFuture<?> awaitDataPropagationToSecondaryStorage(HybridTimestamp readTimestamp) {
        return this.safeTime.waitFor((Comparable)readTimestamp);
    }

    @Nullable
    private BinaryRow readFromSecondaryStorage(BinaryRow searchRow, HybridTimestamp readTimestamp) {
        return this.secondaryStorage.read(searchRow, readTimestamp);
    }

    private List<BinaryRow> readFromSecondaryStorage(Collection<BinaryRow> searchRows, HybridTimestamp readTimestamp) {
        ArrayList<BinaryRow> results = new ArrayList<BinaryRow>();
        for (BinaryRow searchRow : searchRows) {
            BinaryRow readRow = this.secondaryStorage.read(searchRow, readTimestamp);
            results.add(readRow);
        }
        return results;
    }

    private List<BinaryRow> retrieveExactEntries(UUID txCoordinatorId, HybridTimestamp readTimestamp, FullyQualifiedResourceId cursorId, int count, @Nullable BitSet columnsToInclude) {
        Object cursor = ((CursorResource)this.remotelyTriggeredResourceRegistry.register(cursorId, txCoordinatorId, () -> new CursorResource(this.secondaryStorage.scan(readTimestamp, columnsToInclude)))).cursor();
        ArrayList<BinaryRow> result = new ArrayList<BinaryRow>(count);
        while (result.size() < count && cursor.hasNext()) {
            result.add((BinaryRow)cursor.next());
        }
        return result;
    }

    private List<BinaryRow> performStorageOperation(UUID txCoordinatorId, HybridTimestamp readTimestamp, FullyQualifiedResourceId cursorId, int count, StorageOptimizedOperation storageOptimizedOperation) {
        Object cursor = ((CursorResource)this.remotelyTriggeredResourceRegistry.register(cursorId, txCoordinatorId, () -> new CursorResource(this.secondaryStorage.scanWithOperation(readTimestamp, storageOptimizedOperation)))).cursor();
        ArrayList<BinaryRow> result = new ArrayList<BinaryRow>(count);
        while (result.size() < count && cursor.hasNext()) {
            result.add((BinaryRow)cursor.next());
        }
        return result;
    }

    @TestOnly
    public PendingComparableValuesTracker<HybridTimestamp, Void> safeTime() {
        return this.safeTime;
    }

    private /* synthetic */ BinaryRow lambda$processReadOnlySingleEntryAction$6(BinaryRow searchRow, HybridTimestamp readTimestamp, Object ignored) {
        return this.readFromSecondaryStorage(searchRow, readTimestamp);
    }
}

