package org.apache.ignite3.internal.table.distributed.replicator.secondary;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import org.apache.ignite3.internal.continuousquery.ContinuousQueryEventWatermark;
import org.apache.ignite3.internal.continuousquery.ContinuousQueryRequest;
import org.apache.ignite3.internal.continuousquery.ContinuousQueryUtils;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.lowwatermark.LowWatermark;
import org.apache.ignite3.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite3.internal.partition.replicator.network.replication.BinaryRowWithTombstoneMessage;
import org.apache.ignite3.internal.partition.replicator.network.replication.RequestType;
import org.apache.ignite3.internal.replicator.ReplicationGroupId;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite3.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite3.internal.schema.BinaryRow;
import org.apache.ignite3.internal.schema.BinaryRowConverter;
import org.apache.ignite3.internal.schema.SchemaManager;
import org.apache.ignite3.internal.storage.RowId;
import org.apache.ignite3.internal.storage.secondary.TimestampAndRowId;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.lang.TableNotFoundException;
import org.apache.ignite3.table.QualifiedName;
import org.apache.ignite3.table.TableRowEvent;
import org.apache.ignite3.table.TableRowEventBatch;
import org.apache.ignite3.table.TableRowEventType;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite3/internal/table/distributed/replicator/secondary/SecondaryStorageReplicationSubscriber.class */
/**
 * {@link Flow.Subscriber} that replicates row events of a single table partition into secondary storage.
 *
 * <p>Every received batch is converted into {@link BinaryRowWithTombstoneMessage}s and sent through the
 * {@link ReplicaServiceWrapper} as one multi-row request. Requests are chained on a single future, so they are issued one after
 * another in arrival order. Batches are requested from the publisher one at a time.
 *
 * <p>In addition, a delayed task sends a continuous query request starting at the latest known position; when that request
 * returns no rows, the reported safe time is forwarded to the secondary replica and becomes the new position.
 *
 * <p>NOTE(review): {@code subscription}, {@code replicationFuture} and {@code safeTimeReplicationFuture} are plain (non-volatile)
 * fields that are reassigned from subscriber callbacks, from the delayed executor and from future completion callbacks. Confirm
 * that the callers provide the required happens-before ordering.
 *
 * <p>NOTE(review): every {@link #onNext} call schedules one more safe-time poll, and each poll reschedules itself after its request
 * completes, so the number of concurrently rescheduling polls appears to grow with the number of received batches. Confirm this is
 * intended.
 */
public class SecondaryStorageReplicationSubscriber implements Flow.Subscriber<TableRowEventBatch<BinaryRow>> {
    private static final IgniteLogger LOG = Loggers.forClass(SecondaryStorageReplicationSubscriber.class);

    private static final PartitionReplicationMessagesFactory TABLE_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();

    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();

    /** Table name, used in log messages only. */
    private final QualifiedName tableName;

    /** Identifies the table and the partition whose events are replicated. */
    private final TablePartitionId tablePartitionId;

    /** Replication group the outgoing replica requests are addressed to. */
    private final ReplicationGroupId replicationGroupId;

    /** Sends replica requests and continuous query requests. */
    private final ReplicaServiceWrapper replicaService;

    /** Source of the low watermark used as a floor for the continuous query lower bound. */
    private final LowWatermark lowWatermark;

    /** Provides table schemas needed to extract primary key columns of removed rows. */
    private final SchemaManager schemaManager;

    /** Subscription received in {@link #onSubscribe}; {@code null} until then. */
    private Flow.Subscription subscription;

    /** Position (commit timestamp and row id) of the latest processed row, or the latest forwarded safe time. */
    private volatile TimestampAndRowId latestTimestampAndRowId;

    /** Executor that runs safe-time polls after the configured delay. */
    private final Executor delayedExecutor;

    /** Clock used to timestamp outgoing write requests. */
    private final ClockService clockService;

    /** Tail of the chain of replica requests (row batches and safe-time updates). */
    private CompletableFuture<Void> replicationFuture = CompletableFutures.nullCompletedFuture();

    /** Tail of the chain of scheduled safe-time polls. */
    private CompletableFuture<Void> safeTimeReplicationFuture = CompletableFutures.nullCompletedFuture();

    /** Set by {@link #stop()}; once set, {@link #onNext} no longer requests further batches. */
    private volatile boolean stopped;

    /** {@code true} while a batch is being handled in {@link #onNext}; safe-time polls are skipped while it is set. */
    private volatile boolean hasNext;

    /** Guards the subscriber callbacks against running after {@link #stop()}. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    /**
     * Constructor.
     *
     * @param qualifiedName Table name, used for logging.
     * @param tablePartitionId Table partition whose events are replicated.
     * @param replicationGroupId Replication group the replica requests are sent to.
     * @param timestampAndRowId Initial position; it is the starting point of the first safe-time poll.
     * @param replicaServiceWrapper Service used to send replica and continuous query requests.
     * @param lowWatermark Low watermark provider.
     * @param j Delay, in milliseconds, applied before each safe-time poll runs.
     * @param executor Executor the delayed safe-time polls run on.
     * @param schemaManager Schema manager.
     * @param clockService Clock service.
     */
    public SecondaryStorageReplicationSubscriber(QualifiedName qualifiedName, TablePartitionId tablePartitionId, ReplicationGroupId replicationGroupId, TimestampAndRowId timestampAndRowId, ReplicaServiceWrapper replicaServiceWrapper, LowWatermark lowWatermark, long j, Executor executor, SchemaManager schemaManager, ClockService clockService) {
        this.tableName = qualifiedName;
        this.tablePartitionId = tablePartitionId;
        this.replicationGroupId = replicationGroupId;
        this.replicaService = replicaServiceWrapper;
        this.lowWatermark = lowWatermark;
        this.schemaManager = schemaManager;
        this.latestTimestampAndRowId = timestampAndRowId;
        this.clockService = clockService;
        this.delayedExecutor = CompletableFuture.delayedExecutor(j, TimeUnit.MILLISECONDS, executor);
    }

    /**
     * Stores the subscription, requests the first batch and schedules the first safe-time poll.
     *
     * @param subscription Subscription provided by the publisher.
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;

        subscription.request(1L);

        scheduleSafeTimeUpdate();
    }

    /**
     * Queues the batch for replication, advances the latest known position to the last row of the batch and, unless the subscriber
     * has been stopped, requests the next batch and schedules a safe-time poll.
     *
     * <p>An empty batch carries nothing to replicate and no new position, so it only triggers the request for the next batch.
     *
     * @param tableRowEventBatch Batch of row events.
     */
    @Override
    public void onNext(TableRowEventBatch<BinaryRow> tableRowEventBatch) {
        IgniteUtils.inBusyLock(busyLock, () -> {
            hasNext = true;

            int eventCount = tableRowEventBatch.rows().size();

            // Guard against an empty batch: there would be no "last row" to take the new position from.
            if (eventCount > 0) {
                List<BinaryRowWithTombstoneMessage> rowMessages = new ArrayList<>(eventCount);

                for (TableRowEvent<BinaryRow> event : tableRowEventBatch.rows()) {
                    rowMessages.add(createRowMessage(event));
                }

                BinaryRowWithTombstoneMessage lastRow = rowMessages.get(rowMessages.size() - 1);

                latestTimestampAndRowId = new TimestampAndRowId(
                        HybridTimestamp.hybridTimestamp(lastRow.commitTimestamp()),
                        new RowId(tablePartitionId.partitionId(), lastRow.rowId())
                );

                // Chained rather than sent directly, so that batches reach the replica in arrival order.
                replicationFuture = replicationFuture.thenCompose(unused -> replicateInBusyLock(rowMessages));
            }

            if (stopped) {
                // hasNext deliberately stays set here, which keeps safe-time polls disabled.
                return;
            }

            hasNext = false;

            subscription.request(1L);

            scheduleSafeTimeUpdate();
        });
    }

    /**
     * Logs the replication error unless it was caused by the node stopping or the table being absent.
     *
     * @param th Error reported by the publisher.
     */
    @Override
    public void onError(Throwable th) {
        if (ExceptionUtils.hasCauseOrSuppressed(th, NodeStoppingException.class, TableNotFoundException.class)) {
            return;
        }

        LOG.error("Replication error for table {} and partition {}:", th, tableName, tablePartitionId.partitionId());
    }

    /** Logs that the replication stream has completed. */
    @Override
    public void onComplete() {
        LOG.info("Replication into secondary storage for table {} and partition {} stopped.", tableName, tablePartitionId.partitionId());
    }

    /**
     * Converts a row event into a replication message.
     *
     * <p>For a {@link TableRowEventType#REMOVED} event the message is a tombstone built from the old entry and carries only its
     * primary key columns; for any other event it carries the tuple of the current entry.
     *
     * @param event Row event.
     * @return Message describing the row change.
     */
    private BinaryRowWithTombstoneMessage createRowMessage(TableRowEvent<BinaryRow> event) {
        ContinuousQueryEventWatermark watermark = (ContinuousQueryEventWatermark) event.watermark();

        // The watermark arrays are indexed by partition id.
        int partitionId = tablePartitionId.partitionId();
        UUID rowUuid = watermark.rowIds()[partitionId];
        long commitTs = watermark.timestamps()[partitionId];

        TableRowEventType type = event.type();
        boolean removed = type == TableRowEventType.REMOVED;

        BinaryRow row = removed ? event.oldEntry() : event.entry();

        assert row != null : "Row must not be null";

        ByteBuffer tuple = removed ? extractPkColumnsBytes(row) : row.tupleSlice();

        assert tuple != null : "Binary tuple is not expected to be null for operation: " + type;

        return TABLE_MESSAGES_FACTORY.binaryRowWithTombstoneMessage()
                .rowId(rowUuid)
                .binaryTuple(tuple)
                .schemaVersion(row.schemaVersion())
                .tombstone(removed)
                .commitTimestamp(commitTs)
                .build();
    }

    /**
     * Polls the partition with a continuous query request that starts at the latest known position. If the response contains no
     * rows, the safe time it reports is forwarded to the secondary replica. The poll reschedules itself once the request
     * completes, whether it succeeded or failed.
     *
     * <p>Does nothing, and does not reschedule itself, while a batch is being handled.
     */
    private void waitAndUpdateSafeTime() {
        IgniteUtils.inBusyLock(busyLock, () -> {
            if (hasNext) {
                return;
            }

            // Read the volatile field once so that the timestamp and the row id come from the same position.
            TimestampAndRowId position = latestTimestampAndRowId;

            ContinuousQueryRequest request = new ContinuousQueryRequest(
                    tablePartitionId.partitionId(),
                    calculateLowerBoundTs(position),
                    position.rowId().uuid(),
                    1,
                    ContinuousQueryUtils.encodeEventTypes(EnumSet.allOf(TableRowEventType.class)),
                    null
            );

            replicaService.sendContinuousQueryRequest(request)
                    .thenAccept(scanResult -> {
                        IgniteUtils.inBusyLock(busyLock, () -> {
                            // Rows in the response are ignored here; only an empty response advances the safe time.
                            if (scanResult.result().rows().isEmpty()) {
                                doOnSafeTimeUpdate(scanResult.result().safeTime());
                            }
                        });
                    })
                    .whenComplete((unused, th) -> {
                        if (th != null && !ExceptionUtils.hasCauseOrSuppressed(th, NodeStoppingException.class, TableNotFoundException.class)) {
                            LOG.error("Failed to send safe time sync request for secondary storage for table {} and partition {}.", th, tableName, tablePartitionId.partitionId());
                        }

                        scheduleSafeTimeUpdate();
                    });
        });
    }

    /** Appends one more delayed safe-time poll to the chain of scheduled polls. */
    private void scheduleSafeTimeUpdate() {
        safeTimeReplicationFuture = safeTimeReplicationFuture.thenRunAsync(this::waitAndUpdateSafeTime, delayedExecutor);
    }

    /**
     * Returns the timestamp to start the continuous query from: the timestamp of the given position, raised to the low watermark
     * when the position is older than it.
     *
     * @param position Latest known position.
     * @return Lower bound timestamp as a {@code long}.
     */
    private long calculateLowerBoundTs(TimestampAndRowId position) {
        HybridTimestamp positionTs = position.timestamp();
        HybridTimestamp lwm = lowWatermark.getLowWatermark();

        if (lwm != null && positionTs.compareTo(lwm) < 0) {
            return lwm.longValue();
        }

        return positionTs.longValue();
    }

    /**
     * Queues a safe-time update for the secondary replica and moves the latest known position to the given safe time paired with
     * the lowest row id of the partition.
     *
     * @param safeTime Safe time reported by the continuous query response.
     */
    private void doOnSafeTimeUpdate(long safeTime) {
        replicationFuture = replicationFuture.thenCompose(unused -> updateSafeTime(safeTime));

        latestTimestampAndRowId = new TimestampAndRowId(
                HybridTimestamp.hybridTimestamp(safeTime),
                RowId.lowestRowId(tablePartitionId.partitionId())
        );
    }

    /**
     * Sends the given rows to the secondary replica from inside the busy lock.
     *
     * @param rows Row messages to replicate.
     * @return Future of the replica request.
     */
    private CompletableFuture<Void> replicateInBusyLock(List<BinaryRowWithTombstoneMessage> rows) {
        return IgniteUtils.inBusyLock(busyLock, () -> doOnNext(rows));
    }

    /**
     * Sends the given rows to the secondary replica as a single {@link RequestType#RW_INSERT} request.
     *
     * @param rows Row messages to replicate.
     * @return Future of the replica request.
     */
    private CompletableFuture<Void> doOnNext(List<BinaryRowWithTombstoneMessage> rows) {
        // The request is built inside the lambda, so the clock is read each time the lambda is invoked.
        return replicaService.invokeWithRetry(() -> TABLE_MESSAGES_FACTORY.readWriteMultipleRowsSecondaryReplicaRequest()
                .groupId(ReplicaMessageUtils.toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, replicationGroupId))
                .tableId(tablePartitionId.tableId())
                .requestType(RequestType.RW_INSERT)
                .timestamp(clockService.current())
                .rows(rows)
                .build());
    }

    /**
     * Sends a safe-time sync request with the given proposed safe time to the secondary replica.
     *
     * @param safeTime Proposed safe time.
     * @return Future of the replica request.
     */
    private CompletableFuture<Void> updateSafeTime(long safeTime) {
        return replicaService.invokeWithRetry(() -> REPLICA_MESSAGES_FACTORY.secondaryReplicaSafeTimeSyncRequest()
                .groupId(ReplicaMessageUtils.toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, replicationGroupId))
                .tableId(tablePartitionId.tableId())
                .proposedSafeTime(HybridTimestamp.hybridTimestamp(safeTime))
                .build());
    }

    /**
     * Stops the replication: marks the subscriber as stopped, blocks the busy lock and cancels the subscription.
     *
     * <p>Safe to call before {@link #onSubscribe}; in that case there is no subscription to cancel.
     */
    public void stop() {
        LOG.info("Stopping replication into secondary storage for table {} and partition {}", tableName, tablePartitionId.partitionId());

        stopped = true;

        // NOTE(review): presumably waits for in-flight busy sections and rejects new ones; confirm against IgniteSpinBusyLock.
        busyLock.block();

        Flow.Subscription current = subscription;

        if (current != null) {
            current.cancel();
        }
    }

    /**
     * Extracts the primary key columns of the given row using the schema version the row was written with.
     *
     * @param binaryRow Row, may be {@code null}.
     * @return Buffer with the key columns, or {@code null} if the row is {@code null}.
     */
    @Nullable
    private ByteBuffer extractPkColumnsBytes(@Nullable BinaryRow binaryRow) {
        if (binaryRow == null) {
            return null;
        }

        return BinaryRowConverter.keyExtractor(schemaManager.schemaRegistry(tablePartitionId.tableId()).schema(binaryRow.schemaVersion()))
                .extractColumns(binaryRow)
                .byteBuffer();
    }
}
