/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.table.distributed.expiration;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.ignite3.internal.catalog.Catalog;
import org.apache.ignite3.internal.catalog.CatalogService;
import org.apache.ignite3.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite3.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite3.internal.catalog.events.CatalogEvent;
import org.apache.ignite3.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite3.internal.catalog.events.DropTableEventParameters;
import org.apache.ignite3.internal.components.NodeProperties;
import org.apache.ignite3.internal.components.SystemPropertiesNodeProperties;
import org.apache.ignite3.internal.event.EventListener;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.hlc.HybridTimestampTracker;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metrics.MetricManager;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.placementdriver.PlacementDriver;
import org.apache.ignite3.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite3.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite3.internal.replicator.LocalReplicaEvent;
import org.apache.ignite3.internal.replicator.LocalReplicaEventParameters;
import org.apache.ignite3.internal.replicator.PartitionGroupId;
import org.apache.ignite3.internal.replicator.ReplicaManager;
import org.apache.ignite3.internal.replicator.ReplicationGroupId;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.replicator.ZonePartitionId;
import org.apache.ignite3.internal.schema.SchemaManager;
import org.apache.ignite3.internal.table.distributed.expiration.ExpiredRowsCleaner;
import org.apache.ignite3.internal.table.distributed.expiration.PartitionCleanerTask;
import org.apache.ignite3.internal.table.distributed.expiration.PartitionTaskFactory;
import org.apache.ignite3.internal.table.distributed.expiration.ScheduledTaskWrapper;
import org.apache.ignite3.internal.table.distributed.expiration.configuration.ExpirationConfiguration;
import org.apache.ignite3.internal.table.distributed.expiration.metrics.ExpirationMetricSource;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.internal.tx.TxManager;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.utils.InternalTableProvider;
import org.gridgain.internal.license.LicenseFeatureChecker;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class ExpiredRowsCleanerImpl
implements ExpiredRowsCleaner {
    private static final IgniteLogger LOG = Loggers.forClass(ExpiredRowsCleanerImpl.class);
    private final TopologyService topologyService;
    private final PlacementDriver placementDriver;
    private final ExpirationConfiguration expirationConfiguration;
    private final PartitionTaskFactory partitionTaskFactory;
    private final ScheduledExecutorService scheduledExecutorService;
    private final MetricManager metricManager;
    private final ReplicaManager replicaManager;
    private final CatalogService catalogService;
    private final NodeProperties nodeProperties;
    private volatile Semaphore semaphore;
    private final ExpirationMetricSource metrics = new ExpirationMetricSource();
    private final Map<PartitionGroupId, ScheduledTaskWrapper> monitoredTablePartitions = new ConcurrentHashMap<PartitionGroupId, ScheduledTaskWrapper>();
    private final Map<PartitionGroupId, Map<Integer, ScheduledTaskWrapper>> monitoredZonePartitions = new ConcurrentHashMap<PartitionGroupId, Map<Integer, ScheduledTaskWrapper>>();
    private final EventListener<CreateTableEventParameters> onTableCreate = EventListener.fromConsumer(this::onTableCreate);
    private final EventListener<DropTableEventParameters> onTableDrop = EventListener.fromConsumer(this::onTableDrop);
    private final EventListener<PrimaryReplicaEventParameters> startMonitorPartition = EventListener.fromConsumer(this::startMonitorPartition);
    private final EventListener<PrimaryReplicaEventParameters> stopMonitorPartition = EventListener.fromConsumer(this::stopMonitorPartition);
    private final EventListener<LocalReplicaEventParameters> removePartitionMetric = EventListener.fromConsumer(this::removePartitionMetric);

    public ExpiredRowsCleanerImpl(String igniteInstanceName, TopologyService topologyService, ClockService clockService, HybridTimestampTracker observableTimestampTracker, InternalTableProvider internalTableProvider, CatalogService catalogService, SchemaManager schemaManager, TxManager txManager, ExpirationConfiguration expirationConfiguration, PlacementDriver placementDriver, MetricManager metricManager, ReplicaManager replicaManager, LicenseFeatureChecker licenseFeatureChecker, NodeProperties nodeProperties) {
        this.topologyService = topologyService;
        this.placementDriver = placementDriver;
        this.expirationConfiguration = expirationConfiguration;
        this.metricManager = metricManager;
        this.replicaManager = replicaManager;
        this.catalogService = catalogService;
        this.nodeProperties = nodeProperties;
        this.scheduledExecutorService = ExpiredRowsCleanerImpl.createRowsCleanerExecutor(igniteInstanceName);
        this.partitionTaskFactory = (tableId, partitionKey) -> new PartitionCleanerTask(tableId, partitionKey, clockService, observableTimestampTracker, internalTableProvider, catalogService, schemaManager, txManager, topologyService.localMember(), (Integer)expirationConfiguration.batchSize().value(), this.semaphore, this.metrics, licenseFeatureChecker);
    }

    @TestOnly
    public ExpiredRowsCleanerImpl(String igniteInstanceName, TopologyService topologyService, PlacementDriver placementDriver, CatalogService catalogService, ExpirationConfiguration expirationConfiguration, PartitionTaskFactory partitionTaskFactory, MetricManager metricManager, ReplicaManager replicaManager) {
        this.topologyService = topologyService;
        this.placementDriver = placementDriver;
        this.expirationConfiguration = expirationConfiguration;
        this.partitionTaskFactory = partitionTaskFactory;
        this.metricManager = metricManager;
        this.replicaManager = replicaManager;
        this.catalogService = catalogService;
        this.nodeProperties = new SystemPropertiesNodeProperties();
        this.scheduledExecutorService = ExpiredRowsCleanerImpl.createRowsCleanerExecutor(igniteInstanceName);
    }

    private static ScheduledExecutorService createRowsCleanerExecutor(String igniteInstanceName) {
        return Executors.newSingleThreadScheduledExecutor(IgniteThreadFactory.create(igniteInstanceName, "expired-rows-cleaner", LOG, ThreadOperation.STORAGE_READ));
    }

    @Override
    public void start() {
        this.semaphore = new Semaphore((Integer)this.expirationConfiguration.parallelismLevel().value());
        if (this.nodeProperties.colocationEnabled()) {
            this.catalogService.listen(CatalogEvent.TABLE_CREATE, this.onTableCreate);
            this.catalogService.listen(CatalogEvent.TABLE_DROP, this.onTableDrop);
        }
        this.placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this.startMonitorPartition);
        this.placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this.stopMonitorPartition);
        if (!this.nodeProperties.colocationEnabled()) {
            this.replicaManager.listen(LocalReplicaEvent.BEFORE_REPLICA_STOPPED, this.removePartitionMetric);
        }
        this.metricManager.registerSource(this.metrics);
        this.metricManager.enable(this.metrics);
    }

    private void startMonitorPartition(PrimaryReplicaEventParameters primaryReplicaEventParameters) {
        if (!primaryReplicaEventParameters.leaseholderId().equals(this.topologyService.localMember().id())) {
            return;
        }
        PartitionGroupId partitionKey = (PartitionGroupId)primaryReplicaEventParameters.groupId();
        if (!this.nodeProperties.colocationEnabled() && partitionKey instanceof ZonePartitionId) {
            return;
        }
        if (this.nodeProperties.colocationEnabled()) {
            this.startMonitorZonePartition((ZonePartitionId)partitionKey);
        } else {
            this.startMonitorPartition((TablePartitionId)partitionKey);
        }
    }

    private void startMonitorPartition(TablePartitionId partitionKey) {
        try {
            this.monitoredTablePartitions.computeIfAbsent(partitionKey, key -> this.createAndScheduleTableCleanerTask(partitionKey.tableId(), partitionKey));
        }
        catch (RejectedExecutionException rejectedExecutionException) {
            // empty catch block
        }
    }

    private ScheduledTaskWrapper createAndScheduleTableCleanerTask(int tableId, PartitionGroupId partitionKey) {
        PartitionCleanerTask partitionCleanerTask = this.partitionTaskFactory.create(tableId, partitionKey);
        ScheduledFuture<?> scheduledFuture = this.scheduledExecutorService.scheduleWithFixedDelay(partitionCleanerTask, 0L, (Long)this.expirationConfiguration.checkInterval().value(), TimeUnit.MILLISECONDS);
        return new ScheduledTaskWrapper(scheduledFuture, partitionCleanerTask);
    }

    private void stopMonitorPartition(PrimaryReplicaEventParameters primaryReplicaEventParameters) {
        PartitionGroupId partitionKey = (PartitionGroupId)primaryReplicaEventParameters.groupId();
        if (!this.nodeProperties.colocationEnabled() && partitionKey instanceof ZonePartitionId) {
            return;
        }
        if (!primaryReplicaEventParameters.leaseholderId().equals(this.topologyService.localMember().id())) {
            return;
        }
        if (this.nodeProperties.colocationEnabled()) {
            this.stopMonitorZonePartition((ZonePartitionId)partitionKey);
        } else {
            this.monitoredTablePartitions.computeIfPresent(partitionKey, (key, task) -> {
                task.cancel();
                return null;
            });
        }
    }

    private void removePartitionMetric(LocalReplicaEventParameters primaryReplicaEventParameters) {
        ReplicationGroupId partitionKey = primaryReplicaEventParameters.groupId();
        if (!(partitionKey instanceof TablePartitionId)) {
            return;
        }
        TablePartitionId tablePartitionKey = (TablePartitionId)partitionKey;
        int tableId = tablePartitionKey.tableId();
        int partitionId = tablePartitionKey.partitionId();
        this.removeTableMetric(tableId, partitionId, this.monitoredTablePartitions.get(partitionKey));
    }

    private void removeTableMetric(int tableId, int partitionId, @Nullable ScheduledTaskWrapper tableCleanupTaskWrapper) {
        if (tableCleanupTaskWrapper != null) {
            tableCleanupTaskWrapper.stopped().whenComplete((v, e) -> this.metrics.remove(tableId, partitionId));
        } else {
            this.metrics.remove(tableId, partitionId);
        }
    }

    private void startMonitorZonePartition(ZonePartitionId partitionKey) {
        assert (!this.monitoredZonePartitions.containsKey(partitionKey));
        int zoneId = partitionKey.zoneId();
        ConcurrentHashMap<Integer, ScheduledTaskWrapper> tasks = new ConcurrentHashMap<Integer, ScheduledTaskWrapper>();
        for (CatalogTableDescriptor tableDesc : this.catalogService.catalog(this.catalogService.latestCatalogVersion()).tables(zoneId)) {
            int tableId = tableDesc.id();
            try {
                tasks.put(tableId, this.createAndScheduleTableCleanerTask(tableId, partitionKey));
            }
            catch (RejectedExecutionException rejectedExecutionException) {}
        }
        this.monitoredZonePartitions.put(partitionKey, tasks);
    }

    private void stopMonitorZonePartition(ZonePartitionId partitionKey) {
        Map<Integer, ScheduledTaskWrapper> tasks = this.monitoredZonePartitions.remove(partitionKey);
        for (Map.Entry<Integer, ScheduledTaskWrapper> entry : tasks.entrySet()) {
            ExpiredRowsCleanerImpl.cancelTask(tasks, entry.getKey());
        }
    }

    private void onTableCreate(CreateTableEventParameters parameters) {
        int zoneId = parameters.tableDescriptor().zoneId();
        int tableId = parameters.tableId();
        CatalogZoneDescriptor zoneDesc = this.catalogService.catalog(parameters.catalogVersion()).zone(zoneId);
        assert (zoneDesc != null);
        int partitionsCount = zoneDesc.partitions();
        for (int partId = 0; partId < partitionsCount; ++partId) {
            this.monitoredZonePartitions.computeIfPresent(new ZonePartitionId(zoneId, partId), (partitionKey, tasks) -> {
                try {
                    tasks.put(tableId, this.createAndScheduleTableCleanerTask(tableId, (PartitionGroupId)partitionKey));
                }
                catch (RejectedExecutionException rejectedExecutionException) {
                    // empty catch block
                }
                return tasks;
            });
        }
    }

    private void onTableDrop(DropTableEventParameters parameters) {
        int tableId = parameters.tableId();
        Catalog catalog = this.catalogService.catalog(parameters.catalogVersion() - 1);
        CatalogTableDescriptor droppedTableDesc = catalog.table(tableId);
        assert (droppedTableDesc != null);
        int zoneId = droppedTableDesc.zoneId();
        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
        assert (zoneDescriptor != null);
        int partitionsCount = zoneDescriptor.partitions();
        for (int partId = 0; partId < partitionsCount; ++partId) {
            this.monitoredZonePartitions.computeIfPresent(new ZonePartitionId(zoneId, partId), (partitionKey, tasks) -> {
                this.removeTableMetric(tableId, partitionKey.partitionId(), ExpiredRowsCleanerImpl.cancelTask(tasks, tableId));
                return tasks;
            });
        }
    }

    @Nullable
    private static ScheduledTaskWrapper cancelTask(Map<Integer, ScheduledTaskWrapper> tasks, int tableId) {
        ScheduledTaskWrapper task = tasks.get(tableId);
        if (task != null) {
            task.cancel();
            task.stopped().thenAccept(v -> tasks.remove(tableId));
        }
        return task;
    }

    @Override
    public void stop() {
        IgniteUtils.shutdownAndAwaitTermination(this.scheduledExecutorService, 10L, TimeUnit.SECONDS);
        this.metricManager.unregisterSource(this.metrics);
        if (!this.nodeProperties.colocationEnabled()) {
            this.replicaManager.removeListener(LocalReplicaEvent.BEFORE_REPLICA_STOPPED, this.removePartitionMetric);
        }
        this.placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this.stopMonitorPartition);
        this.placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this.startMonitorPartition);
        if (this.nodeProperties.colocationEnabled()) {
            this.catalogService.removeListener(CatalogEvent.TABLE_DROP, this.onTableDrop);
            this.catalogService.removeListener(CatalogEvent.TABLE_CREATE, this.onTableCreate);
        }
    }

    @TestOnly
    public Map<PartitionGroupId, ?> monitoredPartitions() {
        return this.nodeProperties.colocationEnabled() ? this.monitoredZonePartitions : this.monitoredTablePartitions;
    }
}

