/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.dcr;

import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite3.client.IgniteClient;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metrics.MetricManager;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.table.IgniteTables;
import org.gridgain.internal.dcr.AuthConfig;
import org.gridgain.internal.dcr.ReplicationStartOptions;
import org.gridgain.internal.dcr.SslConfig;
import org.gridgain.internal.dcr.event.FstProgressEvent;
import org.gridgain.internal.dcr.event.ReplicationEvent;
import org.gridgain.internal.dcr.event.ReplicationEventHandler;
import org.gridgain.internal.dcr.event.TableEvent;
import org.gridgain.internal.dcr.exception.ReplicationException;
import org.gridgain.internal.dcr.exception.ReplicationStartupException;
import org.gridgain.internal.dcr.message.ReplicationNodeInfo;
import org.gridgain.internal.dcr.metrics.DcrMetricSource;
import org.gridgain.internal.dcr.table.TableManager;
import org.gridgain.internal.dcr.table.TableReplication;
import org.jetbrains.annotations.Nullable;

public class Replication {
    private static final IgniteLogger LOG = Loggers.forClass(Replication.class);
    private final String name;
    private final String nodeName;
    private final IgniteTables tables;
    private final ReplicationStartOptions options;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final MetricManager metricManager;
    private final ClockService clockService;
    private final Map<String, DcrMetricSource> metricSourceByTableName = new ConcurrentHashMap<String, DcrMetricSource>();
    @Nullable
    private Map<String, TableReplication> tableReplications;
    @Nullable
    private Map<String, FstProgressEvent> progress;
    @Nullable
    private volatile IgniteClient client;
    @Nullable
    private volatile ExecutorService fstExecutor;

    public Replication(String nodeName, String name, IgniteTables tables, ReplicationStartOptions options, MetricManager metricManager, ClockService clockService) {
        this.name = name;
        this.nodeName = nodeName;
        this.tables = tables;
        this.options = options;
        this.metricManager = metricManager;
        this.clockService = clockService;
    }

    public CompletableFuture<Void> start(ReplicationEventHandler<ReplicationEvent> handler, List<String> tablesToStart) {
        if (this.client != null) {
            if (this.tableReplications != null && this.tableReplications.keySet().containsAll(tablesToStart)) {
                LOG.info("Resume replication for tables {}", tablesToStart);
                AtomicInteger counter = new AtomicInteger(tablesToStart.size());
                tablesToStart.forEach(tableToStart -> this.tableReplications.get(tableToStart).start(Replication.tableEventReplicationEventHandler(handler, counter)));
            } else {
                return CompletableFutures.nullCompletedFuture();
            }
        }
        try {
            LOG.info("Start replication for tables {}", tablesToStart);
            return IgniteClient.builder().addresses((String[])this.options.sourceClusterAddresses().toArray(String[]::new)).authenticator(AuthConfig.createAuthenticator(this.options.authConfig())).ssl(SslConfig.createSslConfiguration(this.options.sslConfig())).buildAsync().thenAccept(client -> this.doStartReplication((IgniteClient)client, handler, tablesToStart));
        }
        catch (Exception e) {
            LOG.warn("Failed to start replication {}.", e, this.name);
            handler.handle(ReplicationEvent.failed(new ReplicationStartupException(e.getMessage(), this.nodeName, this.name, (Throwable)e)));
            return CompletableFutures.nullCompletedFuture();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doStartReplication(IgniteClient client, ReplicationEventHandler<ReplicationEvent> handler, List<String> tablesToStart) {
        this.lock.writeLock().lock();
        try {
            ExecutorService fstExecutor;
            this.client = client;
            this.fstExecutor = fstExecutor = Executors.newFixedThreadPool(tablesToStart.size(), IgniteThreadFactory.create(this.nodeName, "dcr-" + this.name, LOG, new ThreadOperation[0]));
            HashMap<String, TableReplication> replications = new HashMap<String, TableReplication>();
            for (String tableName : tablesToStart) {
                if (this.progress == null) {
                    this.progress = new ConcurrentHashMap<String, FstProgressEvent>();
                }
                TableManager tableManager = new TableManager(this.nodeName, tableName, client, this.tables);
                ReplicationEventHandler<FstProgressEvent> progressHandler = event -> this.progress.put(tableName, (FstProgressEvent)event);
                DcrMetricSource dcrMetricSource = this.registerMetricSource(this.name, tableName);
                TableReplication tableReplication = new TableReplication(this.nodeName, tableManager, fstExecutor, this.options.flushPoint(), progressHandler, dcrMetricSource, this.clockService);
                replications.put(tableName, tableReplication);
            }
            this.tableReplications = Collections.unmodifiableMap(replications);
            AtomicInteger counter = new AtomicInteger(this.tableReplications.size());
            for (TableReplication tableReplication : this.tableReplications.values()) {
                tableReplication.start(Replication.tableEventReplicationEventHandler(handler, counter));
            }
        }
        catch (Exception e) {
            this.stop(tablesToStart);
            LOG.warn("Failed to start replication {}.", e, this.name);
            handler.handle(ReplicationEvent.failed(e instanceof ReplicationException ? e : new ReplicationStartupException(e.getMessage(), this.nodeName, this.name, (Throwable)e)));
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void flush(Instant flushPoint) {
        this.lock.readLock().lock();
        try {
            if (this.tableReplications != null) {
                for (TableReplication tableReplication : this.tableReplications.values()) {
                    tableReplication.updateFlushPoint(flushPoint);
                }
            }
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stopAll() {
        this.lock.writeLock().lock();
        try {
            try {
                if (this.tableReplications != null) {
                    this.tableReplications.values().forEach(TableReplication::stop);
                    this.tableReplications.keySet().forEach(this::unregisterMetricSource);
                }
            }
            finally {
                ExecutorService fstExecutor = this.fstExecutor;
                if (fstExecutor != null) {
                    IgniteUtils.shutdownAndAwaitTermination(fstExecutor, 10L, TimeUnit.SECONDS);
                }
                this.fstExecutor = null;
                this.closeClient();
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    public void stop(List<String> tablesToStop) {
        this.lock.writeLock().lock();
        try {
            if (this.tableReplications != null) {
                if (new HashSet<String>(tablesToStop).containsAll(this.tableReplications.keySet())) {
                    this.stopAll();
                } else {
                    tablesToStop.forEach(tableName -> {
                        TableReplication tableReplication = this.tableReplications.get(tableName);
                        if (tableReplication != null) {
                            tableReplication.stop();
                            this.unregisterMetricSource((String)tableName);
                        }
                    });
                }
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    public ReplicationNodeInfo replicationNodeInfo() {
        return new ReplicationNodeInfo(this.progress());
    }

    private void closeClient() {
        IgniteClient client = this.client;
        if (client == null) {
            return;
        }
        try {
            client.close();
            this.client = null;
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private int progress() {
        if (this.progress == null || this.tableReplications == null || this.tableReplications.isEmpty()) {
            return 0;
        }
        long totalCount = 0L;
        long replicatedCount = 0L;
        for (FstProgressEvent fstProgressEvent : this.progress.values()) {
            replicatedCount += fstProgressEvent.replicatedEntries();
            totalCount += fstProgressEvent.totalEntriesCount();
        }
        if (totalCount == 0L) {
            return 100;
        }
        return (int)(replicatedCount * 100L / totalCount);
    }

    private DcrMetricSource registerMetricSource(String replicationName, String fullQualifiedTableName) {
        return this.metricSourceByTableName.computeIfAbsent(fullQualifiedTableName, unused -> {
            DcrMetricSource metricSource = new DcrMetricSource(replicationName, fullQualifiedTableName);
            this.metricManager.registerSource(metricSource);
            this.metricManager.enable(metricSource);
            return metricSource;
        });
    }

    private void unregisterMetricSource(String fullQualifiedTableName) {
        DcrMetricSource metricSource = this.metricSourceByTableName.get(fullQualifiedTableName);
        if (metricSource != null) {
            this.metricManager.unregisterSource(metricSource);
            this.metricSourceByTableName.remove(fullQualifiedTableName);
        }
    }

    private static ReplicationEventHandler<TableEvent> tableEventReplicationEventHandler(ReplicationEventHandler<ReplicationEvent> handler, AtomicInteger counter) {
        return event -> {
            switch (event.state()) {
                case FINISHED: {
                    if (counter.decrementAndGet() != 0) break;
                    handler.handle(ReplicationEvent.finished());
                    break;
                }
                case FST_FINISHED: {
                    break;
                }
                case FAILED: {
                    LOG.error("Replication failed with error: ", event.error());
                    handler.handle(ReplicationEvent.failed(event.error()));
                    break;
                }
                default: {
                    LOG.error("Replication received unrecognized table event type: {}", new Object[]{event.state()});
                    handler.handle(ReplicationEvent.failed(new IllegalStateException("Unrecognized replication table event type.")));
                }
            }
        };
    }
}

