package org.apache.ignite.internal.table.distributed.replicator.secondary;

import java.time.Instant;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.continuousquery.ContinuousQueryEventWatermark;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.storage.secondary.TimestampAndRowId;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.table.ContinuousQueryOptions;
import org.apache.ignite.table.ContinuousQueryWatermark;

/* loaded from: input_file:org/apache/ignite/internal/table/distributed/replicator/secondary/SecondaryReplicationManager.class */
/**
 * Starts and stops per-partition replication into a secondary storage.
 *
 * <p>For every table partition the manager keeps a busy lock (used to fence start-up work against a concurrent stop) and,
 * once replication has been started, the continuous-query subscriber that was registered for that partition.
 */
public class SecondaryReplicationManager implements ManuallyCloseable {
    private static final IgniteLogger LOG = Loggers.forClass(SecondaryReplicationManager.class);

    private static final PartitionReplicationMessagesFactory TABLE_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();

    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();

    private final SchemaManager schemaManager;

    private final ReplicaService replicaService;

    private final LowWatermark lowWatermark;

    private final ClockService clockService;

    private final TopologyService topologyService;

    /** Busy locks by partition; an entry exists from {@link #startReplication} until {@link #stopReplication(TablePartitionId)}. */
    private final Map<TablePartitionId, IgniteSpinBusyLock> locks = new ConcurrentHashMap<>();

    /** Continuous-query subscribers by partition, registered once the continuous query has been issued. */
    private final Map<TablePartitionId, SecondaryStorageReplicationSubscriber> subscribers = new ConcurrentHashMap<>();

    /**
     * Creates the manager.
     *
     * @param schemaManager Schema manager handed to every subscriber.
     * @param replicaService Replica service used to send replica requests.
     * @param lowWatermark Low watermark source used to pick the initial continuous-query watermark.
     * @param clockService Clock service used for read timestamps and handed to every subscriber.
     * @param topologyService Topology service handed to the replica service wrapper.
     */
    public SecondaryReplicationManager(
            SchemaManager schemaManager,
            ReplicaService replicaService,
            LowWatermark lowWatermark,
            ClockService clockService,
            TopologyService topologyService
    ) {
        this.schemaManager = schemaManager;
        this.replicaService = replicaService;
        this.lowWatermark = lowWatermark;
        this.clockService = clockService;
        this.topologyService = topologyService;
    }

    /**
     * Asynchronously starts replication for a single partition of the given table.
     *
     * <p>The latest replicated timestamp/row id is read first; when it arrives, a continuous query is started from the
     * matching watermark. The method returns immediately and never throws: failures are logged.
     *
     * @param internalTable Table whose partition is replicated.
     * @param partitionId Partition index.
     * @param replicationGroupId Replication group the requests are addressed to.
     */
    public void startReplication(InternalTable internalTable, int partitionId, ReplicationGroupId replicationGroupId) {
        int tableId = internalTable.tableId();

        IgniteSpinBusyLock busyLock = locks.computeIfAbsent(
                new TablePartitionId(tableId, partitionId),
                key -> new IgniteSpinBusyLock()
        );

        ReplicaServiceWrapper replicaServiceWrapper =
                new ReplicaServiceWrapper(partitionId, internalTable, replicaService, topologyService, busyLock);

        try {
            // A failed read is already logged by latestProcessedTimestampAndRowId(); the callback below only runs on success.
            latestProcessedTimestampAndRowId(replicaServiceWrapper, tableId, replicationGroupId).thenAccept(timestampAndRowId -> {
                if (!busyLock.enterBusy()) {
                    // stopReplication() has blocked the lock in the meantime: nothing to start.
                    return;
                }

                try {
                    startCqReplication(internalTable, replicaServiceWrapper, partitionId, replicationGroupId, timestampAndRowId, busyLock);
                } catch (Exception e) {
                    // The resulting future is not observed by anyone, so the failure must be reported here or it is lost.
                    LOG.error("Failed to start replication", e);
                } finally {
                    busyLock.leaveBusy();
                }
            });
        } catch (Exception e) {
            // Covers synchronous failures while building/sending the request.
            LOG.error("Failed to start replication", e);
        }
    }

    /**
     * Issues the continuous query for the partition and remembers its subscriber so that it can be stopped later.
     *
     * <p>Expected to be called while holding {@code busyLock}.
     */
    private void startCqReplication(
            InternalTable internalTable,
            ReplicaServiceWrapper replicaServiceWrapper,
            int partitionId,
            ReplicationGroupId replicationGroupId,
            TimestampAndRowId timestampAndRowId,
            IgniteSpinBusyLock busyLock
    ) {
        ContinuousQueryWatermark watermark = createCqWatermark(timestampAndRowId, partitionId, internalTable.partitions());

        ContinuousQueryOptions options = ContinuousQueryOptions.builder()
                .watermark(watermark)
                .enableEmptyBatches(true)
                .build();

        TablePartitionId tablePartitionId = new TablePartitionId(internalTable.tableId(), partitionId);

        SecondaryStorageReplicationSubscriber subscriber = new SecondaryStorageReplicationSubscriber(
                internalTable.name(),
                tablePartitionId,
                replicationGroupId,
                replicaServiceWrapper,
                schemaManager,
                clockService,
                busyLock
        );

        // Rows are passed through unchanged; only the single requested partition is queried.
        internalTable.queryContinuously(subscriber, new int[]{partitionId}, options, (binaryRow, schemaDescriptor) -> binaryRow);

        subscribers.put(tablePartitionId, subscriber);
    }

    /**
     * Builds the watermark the continuous query starts from.
     *
     * <p>If nothing has been replicated yet (the stored timestamp equals {@link HybridTimestamp#MIN_VALUE}), the query starts
     * from the current low watermark, or from just after the minimal timestamp when no low watermark is set. Otherwise it
     * resumes from the stored timestamp/row id of this partition; slots of other partitions are left unset.
     *
     * @param timestampAndRowId Latest replicated position read from the secondary storage.
     * @param partitionId Partition index.
     * @param partitions Total number of partitions of the table.
     */
    private ContinuousQueryWatermark createCqWatermark(TimestampAndRowId timestampAndRowId, int partitionId, int partitions) {
        if (timestampAndRowId.timestamp().longValue() == HybridTimestamp.MIN_VALUE.longValue()) {
            // Read once so that the null check and the dereference observe the same value.
            HybridTimestamp lwm = lowWatermark.getLowWatermark();

            long startPhysical = lwm != null ? lwm.getPhysical() : HybridTimestamp.MIN_VALUE.getPhysical() + 1;

            return ContinuousQueryWatermark.ofInstant(Instant.ofEpochMilli(startPhysical));
        }

        UUID[] rowIds = new UUID[partitions];
        rowIds[partitionId] = timestampAndRowId.rowId().uuid();

        long[] timestamps = new long[partitions];
        timestamps[partitionId] = timestampAndRowId.timestamp().longValue();

        return ContinuousQueryEventWatermark.cqEventWatermark(rowIds, timestamps);
    }

    /**
     * Stops replication of the given partition. Does nothing if replication was not started (or was already stopped).
     *
     * @param tablePartitionId Partition to stop.
     */
    public void stopReplication(TablePartitionId tablePartitionId) {
        IgniteSpinBusyLock busyLock = locks.remove(tablePartitionId);

        if (busyLock == null) {
            return;
        }

        // Block first so that a start that is still in flight cannot register a subscriber after we looked it up.
        busyLock.block();

        SecondaryStorageReplicationSubscriber subscriber = subscribers.remove(tablePartitionId);

        if (subscriber != null) {
            subscriber.stop();
        }
    }

    /**
     * Stops replication of partitions {@code 0..partitions-1} of the given table.
     *
     * @param tableId Table ID.
     * @param partitions Number of partitions to stop.
     */
    public void stopReplication(int tableId, int partitions) {
        for (int partitionId = 0; partitionId < partitions; partitionId++) {
            stopReplication(new TablePartitionId(tableId, partitionId));
        }
    }

    /** Stops replication of every partition that is currently tracked. */
    @Override
    public void close() throws Exception {
        LOG.info("Shutting down Secondary Replication Manager.");

        IgniteUtils.closeAll(locks.keySet().stream().map(tablePartitionId -> () -> stopReplication(tablePartitionId)));
    }

    /**
     * Requests the latest replicated timestamp/row id for the table from the replica, logging the error if the read fails.
     *
     * @param replicaServiceWrapper Wrapper used to send the request.
     * @param tableId Table ID.
     * @param replicationGroupId Replication group the request is addressed to.
     * @return Future with the latest replicated position; completes exceptionally if the read fails.
     */
    private CompletableFuture<TimestampAndRowId> latestProcessedTimestampAndRowId(
            ReplicaServiceWrapper replicaServiceWrapper,
            int tableId,
            ReplicationGroupId replicationGroupId
    ) {
        return replicaServiceWrapper.invokeWithRetry(replicaMeta ->
                TABLE_MESSAGES_FACTORY.readSecondaryStorageLatestReplicatedRowInfoRequest()
                        .tableId(tableId)
                        .groupId(ReplicaMessageUtils.toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, replicationGroupId))
                        .readTimestamp(clockService.current())
                        .build()
        ).whenComplete((timestampAndRowId, th) -> {
            if (th != null) {
                LOG.error("Failed to read latest replicated row info for replication group {}", th, replicationGroupId);
            }
        });
    }
}
