/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.table.distributed.replicator.secondary;

import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogSecondaryStorageState;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.replicator.secondary.SecondaryFullStateTransferHandler;
import org.apache.ignite.internal.table.distributed.replicator.secondary.SecondaryReplicationFailureException;
import org.apache.ignite.internal.table.distributed.replicator.secondary.SecondaryReplicationManager;
import org.apache.ignite.internal.table.distributed.replicator.secondary.SecondaryStorageReplicator;
import org.apache.ignite.internal.table.distributed.replicator.secondary.ZoneSecondaryStorageReplicator;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.StringUtils;
import org.jetbrains.annotations.Nullable;

public class ZoneSecondaryReplicationManager
implements SecondaryReplicationManager {
    /** Logger of this manager. */
    private static final IgniteLogger LOG = Loggers.forClass(ZoneSecondaryReplicationManager.class);

    // Collaborators handed in through the constructor.
    private final SchemaManager schemaManager;
    private final TxManager txManager;
    private final ReplicaService replicaService;
    private final LowWatermark lowWatermark;
    private final ClockService clockService;
    private final TopologyService topologyService;
    private final CatalogService catalogService;
    private final PlacementDriver placementDriver;
    private final FailureProcessor failureProcessor;
    private final SecondaryZoneTablesProvider zoneTablesProvider;
    private final MetaStorageManager metaStorageManager;
    private final Executor ioExecutor;

    /** Replication state per zone partition: a future of the zone replicator plus the enlistment consistency token. */
    private final ConcurrentMap<ZonePartitionId, ZoneReplication> replicators = new ConcurrentHashMap<>();

    /** Listener that {@code start()} registers for the {@code secondary.sync.state.} key prefix and {@code stop()} unregisters. */
    private final WatchListener syncStateListener = new SecondarySyncStatePrefixWatch();

    /** Full state transfer handlers keyed by table id. */
    private final ConcurrentMap<Integer, SecondaryFullStateTransferHandler> fstHandlers = new ConcurrentHashMap<>();

    /**
     * Creates the manager. Every argument is stored in the field of the same name; nothing is started here.
     *
     * @param schemaManager Schema manager, passed to created table partition replicators.
     * @param txManager Transaction manager, passed to full state transfer handlers.
     * @param replicaService Replica service, passed to replicators and full state transfer handlers.
     * @param lowWatermark Low watermark, passed to created table partition replicators.
     * @param clockService Clock service used to read the current time and to wait for catalog activation.
     * @param topologyService Topology service used to obtain the local member.
     * @param catalogService Catalog service used to look up catalog versions.
     * @param placementDriver Placement driver used to look up the current primary replica.
     * @param failureProcessor Receiver of failures that are not caused by node stop.
     * @param zoneTablesProvider Provider of the tables that belong to a zone.
     * @param metaStorageManager Meta storage manager on which the sync state watch is registered.
     * @param ioExecutor Executor on which replicators are started and stopped.
     */
    public ZoneSecondaryReplicationManager(
            SchemaManager schemaManager,
            TxManager txManager,
            ReplicaService replicaService,
            LowWatermark lowWatermark,
            ClockService clockService,
            TopologyService topologyService,
            CatalogService catalogService,
            PlacementDriver placementDriver,
            FailureProcessor failureProcessor,
            SecondaryZoneTablesProvider zoneTablesProvider,
            MetaStorageManager metaStorageManager,
            Executor ioExecutor
    ) {
        // Services consulted when deciding whether and when to replicate.
        this.catalogService = catalogService;
        this.placementDriver = placementDriver;
        this.topologyService = topologyService;
        this.clockService = clockService;
        this.zoneTablesProvider = zoneTablesProvider;
        this.metaStorageManager = metaStorageManager;

        // Services handed over to replicators and full state transfer handlers.
        this.schemaManager = schemaManager;
        this.txManager = txManager;
        this.replicaService = replicaService;
        this.lowWatermark = lowWatermark;
        this.failureProcessor = failureProcessor;
        this.ioExecutor = ioExecutor;
    }

    /**
     * Registers the sync state listener on the meta storage for all keys that start with
     * {@code secondary.sync.state.} (UTF-8 encoded).
     */
    @Override
    public void start() {
        byte[] prefixBytes = "secondary.sync.state.".getBytes(StandardCharsets.UTF_8);
        ByteArray syncStatePrefix = new ByteArray(prefixBytes);

        metaStorageManager.registerPrefixWatch(syncStatePrefix, syncStateListener);
    }

    /**
     * Unregisters the sync state listener from the meta storage. Replicators and full state transfer
     * handlers tracked by this manager are not touched here.
     */
    @Override
    public void stop() {
        WatchListener registeredListener = syncStateListener;

        metaStorageManager.unregisterWatch(registeredListener);
    }

    /**
     * Starts secondary storage replication for every partition of the given table, provided the
     * secondary storage of the table is {@link CatalogSecondaryStorageState#AVAILABLE} in the given
     * catalog version. Otherwise the call only logs and returns.
     *
     * <p>Partitions whose zone partition lease is not held by the local node are skipped by
     * {@code startReplicationForTablePartition}.
     *
     * @param table Table to replicate; must have a secondary storage.
     * @param catalogVersion Catalog version used to resolve the secondary storage state and the activation time.
     */
    @Override
    public void startReplicationForTable(TableViewInternal table, int catalogVersion) {
        assert table.internalTable().hasSecondaryStorage() : table.qualifiedName() + " has no secondary storage";

        LOG.info("Starting secondary replication for table [tableName={}].", table.qualifiedName());

        Catalog catalog = catalogService.catalog(catalogVersion);
        CatalogSecondaryStorageState secondaryStorageState = resolveSecondaryStorageState(catalog, table.tableId());

        if (secondaryStorageState != CatalogSecondaryStorageState.AVAILABLE) {
            // The logged value is the qualified table name, so it is labelled as such.
            LOG.info(
                    "Skip starting secondary replication for table, since secondary storage is not available yet "
                            + "[tableName={}, state={}].",
                    table.qualifiedName(),
                    secondaryStorageState
            );

            return;
        }

        // Shared by all partitions: completes with the catalog activation time once the clock has reached it.
        CompletableFuture<HybridTimestamp> catalogActivationFuture = waitForCatalogActivation(catalog);

        // A single timestamp is used for the primary replica lookup of every partition.
        HybridTimestamp now = clockService.current();

        int partitions = table.internalTable().partitions();

        for (int partitionId = 0; partitionId < partitions; partitionId++) {
            startReplicationForTablePartition(table, partitionId, catalogActivationFuture, now);
        }
    }

    /**
     * Starts replication of one table partition if the local node currently holds the lease of the
     * corresponding secondary zone partition, and records it in {@code replicators}.
     *
     * @param table Table being replicated; its secondary zone id must be set.
     * @param partitionId Partition number.
     * @param catalogActivationFuture Future the replicator start is chained after.
     * @param now Timestamp used for the primary replica lookup.
     */
    private void startReplicationForTablePartition(TableViewInternal table, int partitionId, CompletableFuture<HybridTimestamp> catalogActivationFuture, HybridTimestamp now) {
        Integer secondaryZoneId = table.internalTable().secondaryZoneId();
        assert secondaryZoneId != null : "Secondary zone id is missing for table " + table.qualifiedName();

        ZonePartitionId zonePartitionId = new ZonePartitionId(secondaryZoneId.intValue(), partitionId);
        ReplicaMeta primaryReplica = placementDriver.getCurrentPrimaryReplica((ReplicationGroupId) zonePartitionId, now);

        UUID leaseholderId = null;
        if (primaryReplica != null) {
            leaseholderId = primaryReplica.getLeaseholderId();
        }

        // A missing primary replica yields a null leaseholder id, which never matches the local node.
        if (!thisNodeHoldsLease(leaseholderId)) {
            return;
        }

        long enlistmentConsistencyToken = primaryReplica.getStartTime().longValue();

        replicators.compute(zonePartitionId, (id, current) -> {
            CompletableFuture<SecondaryStorageReplicator> tableReplication =
                    startTablePartitionReplicationAfterCatalogActivation(table, partitionId, catalogActivationFuture);

            if (current != null) {
                // Append the table to the zone replicator once the existing future completes.
                CompletableFuture<ZoneSecondaryStorageReplicator> extended = current.replicatorFuture.thenApply(zoneReplicator -> {
                    zoneReplicator.addReplication(table.tableId(), tableReplication);

                    return zoneReplicator;
                });

                return new ZoneReplication(extended, enlistmentConsistencyToken);
            }

            // First table for this zone partition: create the zone replicator right away.
            ZoneSecondaryStorageReplicator created = new ZoneSecondaryStorageReplicator();
            created.addReplication(table.tableId(), tableReplication);

            return new ZoneReplication(CompletableFuture.completedFuture(created), enlistmentConsistencyToken);
        });
    }

    /**
     * Creates and starts a table partition replicator on {@code ioExecutor} once the given catalog
     * activation future completes.
     *
     * <p>A failure is handed to the failure processor unless it has {@link NodeStoppingException}
     * among its causes; in either case the returned future carries the failure.
     *
     * @param table Table to replicate.
     * @param partitionId Partition number.
     * @param catalogActivationFuture Future to chain the start after.
     * @return Future of the started replicator.
     */
    private CompletableFuture<SecondaryStorageReplicator> startTablePartitionReplicationAfterCatalogActivation(TableViewInternal table, int partitionId, CompletableFuture<HybridTimestamp> catalogActivationFuture) {
        CompletableFuture<SecondaryStorageReplicator> startedReplicator = catalogActivationFuture.thenApplyAsync(activationTime -> {
            SecondaryStorageReplicator replicator = createReplicator(table, partitionId);
            replicator.start();

            return replicator;
        }, ioExecutor);

        return startedReplicator.whenComplete((replicator, error) -> {
            if (error == null) {
                return;
            }

            if (ExceptionUtils.hasCause(error, NodeStoppingException.class)) {
                return;
            }

            String message = String.format(
                    "Unabled to start secondary storage replication for [tableName=%s, partitionId=%d].",
                    table.qualifiedName(),
                    partitionId
            );

            failureProcessor.process(new FailureContext(error, message));
        });
    }

    /**
     * Stops replication of the given table on every partition.
     *
     * @param table Table whose replication is stopped.
     * @return Future that completes when the stop futures of all partitions have completed.
     */
    @Override
    public CompletableFuture<Void> stopReplicationForTable(TableViewInternal table) {
        LOG.info("Stopping secondary replication for table [tableName={}].", table.qualifiedName());

        CompletableFuture<?>[] partitionStops = new CompletableFuture<?>[table.internalTable().partitions()];

        for (int partitionId = 0; partitionId < partitionStops.length; partitionId++) {
            partitionStops[partitionId] = stopReplicationForTablePartition(table, partitionId);
        }

        return CompletableFuture.allOf(partitionStops);
    }

    /**
     * Removes the given table from the zone replicator of one partition and stops its table
     * partition replicator on {@code ioExecutor}.
     *
     * <p>The returned future completes normally when there is nothing to stop (no zone replication or
     * no replication registered for the table) or when the replicator has been stopped. It completes
     * exceptionally when stopping fails, or when the zone replicator future itself has failed, so
     * that callers waiting on it are always released.
     *
     * @param table Table whose replication is stopped; its secondary zone id must be set.
     * @param partitionId Partition number.
     * @return Future of the stop operation.
     */
    private CompletableFuture<Void> stopReplicationForTablePartition(TableViewInternal table, int partitionId) {
        CompletableFuture<Void> stopFuture = new CompletableFuture<>();

        Integer secondaryZoneId = table.internalTable().secondaryZoneId();
        assert secondaryZoneId != null : "Secondary zone id is missing for table " + table.qualifiedName();

        ZonePartitionId zonePartitionId = new ZonePartitionId(secondaryZoneId.intValue(), partitionId);

        replicators.compute(zonePartitionId, (id, zoneReplication) -> {
            if (zoneReplication == null) {
                stopFuture.complete(null);

                return null;
            }

            CompletableFuture<ZoneSecondaryStorageReplicator> replicatorFuture = zoneReplication.replicatorFuture
                    .thenApply(zoneReplicator -> {
                        CompletableFuture<SecondaryStorageReplicator> tableReplicationFuture =
                                zoneReplicator.removeReplication(table.tableId());

                        if (tableReplicationFuture == null) {
                            stopFuture.complete(null);
                        } else {
                            tableReplicationFuture
                                    .thenAcceptAsync(SecondaryStorageReplicator::stop, ioExecutor)
                                    .whenComplete(CompletableFutures.copyStateTo(stopFuture));
                        }

                        return zoneReplicator;
                    })
                    .whenComplete((zoneReplicator, error) -> {
                        if (error == null) {
                            return;
                        }

                        // The callback above did not run to completion (the zone replicator future failed or
                        // the removal threw), so nothing else will complete stopFuture. This is a no-op if it
                        // has already been completed.
                        stopFuture.completeExceptionally(error);

                        if (!ExceptionUtils.hasCause(error, NodeStoppingException.class)) {
                            String errorMessage = String.format(
                                    "Unabled to stop secondary storage replication for table [table=%s, partitionId=%d].",
                                    table.qualifiedName(),
                                    partitionId
                            );

                            failureProcessor.process(new FailureContext(error, errorMessage));
                        }
                    });

            // The mapping is kept with the same token; only the future now reflects the removal.
            return new ZoneReplication(replicatorFuture, zoneReplication.enlistmentConsistencyToken);
        });

        return stopFuture;
    }

    /**
     * Starts replication for a zone partition unless it has already been started with the same
     * enlistment consistency token.
     *
     * @param zonePartitionId Zone partition to replicate.
     * @param enlistmentConsistencyToken Token stored with the replication state.
     * @throws SecondaryReplicationFailureException If replication for the partition is already
     *         registered with a different token.
     */
    @Override
    public void startReplicationForPartition(ZonePartitionId zonePartitionId, long enlistmentConsistencyToken) {
        int zoneId = zonePartitionId.zoneId();
        int partitionId = zonePartitionId.partitionId();

        replicators.compute(zonePartitionId, (id, current) -> {
            if (current == null) {
                return new ZoneReplication(startZoneReplicator(zoneId, partitionId), enlistmentConsistencyToken);
            }

            if (current.enlistmentConsistencyToken == enlistmentConsistencyToken) {
                LOG.info("Skipping secondary replication start as it has already been started [partitionId={}].", zonePartitionId);

                return current;
            }

            throw new SecondaryReplicationFailureException("Replication already started for " + id);
        });
    }

    /**
     * Builds a zone replicator for one partition: for every table of the zone whose secondary storage
     * is {@link CatalogSecondaryStorageState#AVAILABLE} in the latest catalog, a table partition
     * replication is started after that catalog's activation time and added to the zone replicator.
     * Failures not caused by node stop are passed to the failure processor.
     */
    private CompletableFuture<ZoneSecondaryStorageReplicator> startZoneReplicator(int zoneId, int partitionId) {
        CompletableFuture<ZoneSecondaryStorageReplicator> zoneReplicatorFuture = zoneTablesProvider.secondaryZoneTables(zoneId).thenApply(zoneTables -> {
            ZoneSecondaryStorageReplicator zoneReplicator = new ZoneSecondaryStorageReplicator();

            if (!zoneTables.isEmpty()) {
                Catalog latestCatalog = catalogService.catalog(catalogService.latestCatalogVersion());
                CompletableFuture<HybridTimestamp> catalogActivationFuture = waitForCatalogActivation(latestCatalog);

                for (TableViewInternal table : zoneTables) {
                    CatalogSecondaryStorageState state = resolveSecondaryStorageState(latestCatalog, table.tableId());

                    if (state != CatalogSecondaryStorageState.AVAILABLE) {
                        continue;
                    }

                    CompletableFuture<SecondaryStorageReplicator> tableReplication =
                            startTablePartitionReplicationAfterCatalogActivation(table, partitionId, catalogActivationFuture);

                    zoneReplicator.addReplication(table.tableId(), tableReplication);
                }
            }

            return zoneReplicator;
        });

        return zoneReplicatorFuture.whenComplete((zoneReplicator, error) -> {
            if (error == null || ExceptionUtils.hasCause(error, NodeStoppingException.class)) {
                return;
            }

            String message = String.format(
                    "Unabled to start secondary storage replication for zone [zoneId=%d, partitionId=%d].",
                    zoneId,
                    partitionId
            );

            failureProcessor.process(new FailureContext(error, message));
        });
    }

    /**
     * Removes the replication state of a zone partition and stops its zone replicator on
     * {@code ioExecutor}. Failures not caused by node stop are passed to the failure processor.
     *
     * @param zonePartitionId Zone partition to stop replicating.
     * @return Future of the stop operation; already completed when nothing was registered.
     */
    @Override
    public CompletableFuture<Void> stopReplicationForPartition(ZonePartitionId zonePartitionId) {
        ZoneReplication removed = replicators.remove(zonePartitionId);

        if (removed == null) {
            return CompletableFutures.nullCompletedFuture();
        }

        LOG.info("Stopping secondary replication for zone partition [partitionId={}].", zonePartitionId);

        return removed.replicatorFuture
                .thenComposeAsync(ZoneSecondaryStorageReplicator::stop, ioExecutor)
                .whenComplete((ignored, error) -> {
                    if (error == null || ExceptionUtils.hasCause(error, NodeStoppingException.class)) {
                        return;
                    }

                    String message = String.format(
                            "Unabled to stop secondary storage replication for zone [zoneId=%d, partitionId=%d].",
                            zonePartitionId.zoneId(),
                            zonePartitionId.partitionId()
                    );

                    failureProcessor.process(new FailureContext(error, message));
                });
    }

    /**
     * Creates a full state transfer handler for the table, registers it under the table id and
     * starts the transfer with the time of the given catalog.
     *
     * @param table Table to transfer; its secondary storage is expected to be in the SYNCING state.
     * @param catalog Catalog used to check the state and to obtain the catalog time.
     * @return Future returned by the handler's {@code startFullStateTransfer}.
     */
    @Override
    public CompletableFuture<?> startFullStateTransferForTable(TableViewInternal table, Catalog catalog) {
        LOG.info("Starting full state transfer for table [tableId={}, tableName={}].", table.tableId(), table.qualifiedName());

        int tableId = table.tableId();

        assert resolveSecondaryStorageState(catalog, tableId) == CatalogSecondaryStorageState.SYNCING
                : String.format(
                        "Secondary storage is in incorrect state to start full state transfer [tableId=%d, state=%s]",
                        tableId,
                        resolveSecondaryStorageState(catalog, tableId)
                );

        long catalogTime = catalog.time();

        SecondaryFullStateTransferHandler handler = new SecondaryFullStateTransferHandler(
                table,
                txManager,
                clockService,
                catalogService,
                replicaService,
                metaStorageManager,
                topologyService,
                failureProcessor,
                ioExecutor
        );

        SecondaryFullStateTransferHandler previous = fstHandlers.put(tableId, handler);
        assert previous == null : "Handler should be created exactly once for a table";

        return handler.startFullStateTransfer(catalogTime);
    }

    /**
     * Unregisters the full state transfer handler of the table, if one exists, and aborts it.
     *
     * @param table Table whose full state transfer is aborted.
     */
    @Override
    public void abortFullStateTransferForTable(TableViewInternal table) {
        LOG.info("Aborting full state transfer for table [tableName={}].", table.qualifiedName());

        SecondaryFullStateTransferHandler removed = fstHandlers.remove(table.tableId());

        if (removed == null) {
            return;
        }

        removed.abortFullStateTransfer();
    }

    /**
     * Tells whether the zone replicator of the given zone partition has a replication for the table.
     *
     * @param zoneId Zone id.
     * @param tableId Table id.
     * @param partitionId Partition number.
     * @return Future of the answer; completed with {@code false} when no replication state exists for
     *         the zone partition.
     */
    @Override
    public CompletableFuture<Boolean> hasOngoingReplication(int zoneId, int tableId, int partitionId) {
        ZoneReplication replication = replicators.get(new ZonePartitionId(zoneId, partitionId));

        if (replication != null) {
            return replication.replicatorFuture.thenApply(zoneReplicator -> zoneReplicator.hasReplication(tableId));
        }

        return CompletableFutures.falseCompletedFuture();
    }

    /**
     * Tells whether a full state transfer handler is registered for the table.
     *
     * @param tableId Table id.
     * @param partitionId Partition number; not consulted, handlers are tracked per table.
     * @return {@code true} if a handler is registered for {@code tableId}.
     */
    @Override
    public boolean hasOngoingFullStateTransfer(int tableId, int partitionId) {
        return fstHandlers.containsKey(tableId);
    }

    /**
     * Waits on the clock service for the time of the given catalog.
     *
     * @param catalog Catalog whose time is awaited.
     * @return Future completed with that time as a {@link HybridTimestamp} once the wait is over.
     */
    private CompletableFuture<HybridTimestamp> waitForCatalogActivation(Catalog catalog) {
        HybridTimestamp activationTime = HybridTimestamp.hybridTimestamp(catalog.time());

        return clockService
                .waitFor(activationTime)
                .thenApply(ignored -> activationTime);
    }

    /**
     * Compares the id of the local topology member with the given leaseholder id.
     *
     * @param leaseholderId Leaseholder id, or {@code null} when there is no known leaseholder.
     * @return Result of {@code equals} between the local member id and {@code leaseholderId}.
     */
    private boolean thisNodeHoldsLease(@Nullable UUID leaseholderId) {
        boolean heldByLocalNode = topologyService.localMember().id().equals(leaseholderId);

        return heldByLocalNode;
    }

    /**
     * Creates (but does not start) a replicator for one partition of the given table.
     *
     * @param table Table to replicate.
     * @param partitionId Partition number.
     * @return New replicator instance.
     */
    private SecondaryStorageReplicator createReplicator(TableViewInternal table, int partitionId) {
        return new SecondaryStorageReplicator(
                replicaService,
                schemaManager,
                clockService,
                lowWatermark,
                failureProcessor,
                table,
                partitionId,
                ioExecutor
        );
    }

    /**
     * Looks up the secondary storage state of a table in the given catalog.
     *
     * @param catalog Catalog to search.
     * @param tableId Table id.
     * @return State from the table descriptor, or {@link CatalogSecondaryStorageState#NOT_AVAILABLE}
     *         when the catalog has no descriptor for the table.
     */
    private static CatalogSecondaryStorageState resolveSecondaryStorageState(Catalog catalog, int tableId) {
        CatalogTableDescriptor descriptor = catalog.table(tableId);

        return descriptor != null
                ? descriptor.secondaryStorageState()
                : CatalogSecondaryStorageState.NOT_AVAILABLE;
    }

    /**
     * Meta storage watch listener for keys under the {@code secondary.sync.state.} prefix. It
     * forwards per-partition sync state updates to the full state transfer handler of the table.
     */
    private final class SecondarySyncStatePrefixWatch
    implements WatchListener {
        private SecondarySyncStatePrefixWatch() {
        }

        /**
         * Handles a watch event. Only single-entry events are processed; any other event is ignored.
         *
         * <p>The key remainder after the prefix is parsed as a {@link TablePartitionId}. When the
         * first byte of the new value equals {@code 1} (presumably the "synced" marker; confirm
         * against the writer of these keys), the handler registered for the table is told that the
         * partition has completed, and it is unregistered when it reports {@code true}.
         *
         * @param event Watch event.
         * @return Already completed future.
         */
        public CompletableFuture<Void> onUpdate(WatchEvent event) {
            if (!event.single()) {
                return CompletableFutures.nullCompletedFuture();
            }

            byte[] key = event.entryEvent().newEntry().key();
            assert key != null
                    : "Keys for full state transfer sync states should be removed in batches, but a single-entry event without a key was received";

            // The prefix is pure ASCII, so its character count equals its length in the UTF-8 encoded key.
            TablePartitionId tablePartitionId = TablePartitionId.fromString(
                    StringUtils.toStringWithoutPrefix(key, "secondary.sync.state.".length())
            );
            int tableId = tablePartitionId.tableId();
            int partitionId = tablePartitionId.partitionId();

            byte[] value = event.entryEvent().newEntry().value();

            if (value != null && value.length > 0 && value[0] == 1) {
                // Returning null from the remapping function removes the handler from the map.
                fstHandlers.computeIfPresent(
                        tableId,
                        (tblId, handler) -> handler.completeFullStateTransferForPartition(partitionId) ? null : handler
                );
            }

            return CompletableFutures.nullCompletedFuture();
        }
    }

    @FunctionalInterface
    public static interface SecondaryZoneTablesProvider {
        public CompletableFuture<Set<? extends TableViewInternal>> secondaryZoneTables(int var1);
    }

    /**
     * Immutable value stored in {@code replicators}: the future of a zone replicator together with
     * the enlistment consistency token it was registered with.
     */
    private static class ZoneReplication {
        /** Enlistment consistency token compared when a start is requested for an already registered partition. */
        final long enlistmentConsistencyToken;

        /** Future of the zone replicator. */
        final CompletableFuture<ZoneSecondaryStorageReplicator> replicatorFuture;

        ZoneReplication(CompletableFuture<ZoneSecondaryStorageReplicator> replicatorFuture, long enlistmentConsistencyToken) {
            this.enlistmentConsistencyToken = enlistmentConsistencyToken;
            this.replicatorFuture = replicatorFuture;
        }
    }
}

