/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.dcr;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite3.client.IgniteClient;
import org.apache.ignite3.internal.client.TcpIgniteClient;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.metastorage.MetaStorageManager;
import org.apache.ignite3.internal.metrics.MetricManager;
import org.apache.ignite3.internal.network.MessagingService;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.lang.IgniteException;
import org.apache.ignite3.table.IgniteTables;
import org.apache.ignite3.table.QualifiedName;
import org.apache.ignite3.table.Table;
import org.gridgain.internal.dcr.AuthConfig;
import org.gridgain.internal.dcr.DcrErrorHandler;
import org.gridgain.internal.dcr.DcrFailover;
import org.gridgain.internal.dcr.DcrManager;
import org.gridgain.internal.dcr.DcrStorageListenerImpl;
import org.gridgain.internal.dcr.LocalReplicationManager;
import org.gridgain.internal.dcr.ReplicationInfo;
import org.gridgain.internal.dcr.ReplicationOptions;
import org.gridgain.internal.dcr.ReplicationStatus;
import org.gridgain.internal.dcr.SslConfig;
import org.gridgain.internal.dcr.exception.ReplicationAlreadyExistsException;
import org.gridgain.internal.dcr.exception.ReplicationAlreadyStartedForTableException;
import org.gridgain.internal.dcr.exception.ReplicationCreationException;
import org.gridgain.internal.dcr.exception.ReplicationException;
import org.gridgain.internal.dcr.exception.ReplicationInProgressException;
import org.gridgain.internal.dcr.exception.ReplicationNoSourceTableException;
import org.gridgain.internal.dcr.exception.ReplicationNoSourceTablesException;
import org.gridgain.internal.dcr.exception.ReplicationNotFoundException;
import org.gridgain.internal.dcr.exception.ReplicationSchemaSyncException;
import org.gridgain.internal.dcr.exception.ReplicationStartupException;
import org.gridgain.internal.dcr.exception.ReplicationStopException;
import org.gridgain.internal.dcr.exception.ReplicationToSelfException;
import org.gridgain.internal.dcr.message.DcrMessaging;
import org.gridgain.internal.dcr.message.ReplicationNodeInfo;
import org.gridgain.internal.dcr.metastorage.AuthConfigEntry;
import org.gridgain.internal.dcr.metastorage.DcrStorage;
import org.gridgain.internal.dcr.metastorage.ExceptionEntry;
import org.gridgain.internal.dcr.metastorage.ReplicationEntry;
import org.gridgain.internal.dcr.metastorage.SslConfigEntry;
import org.gridgain.internal.lang.Disposable;
import org.jetbrains.annotations.Nullable;

public class DcrManagerImpl
implements DcrManager {
    /** Class logger. */
    private static final IgniteLogger LOG = Loggers.forClass(DcrManagerImpl.class);
    /** Flipped once by {@code stopAsync} so that the shutdown sequence runs at most one time. */
    private final AtomicBoolean stopGuard = new AtomicBoolean();
    /** Node name supplied at construction; used as the default worker node and passed to replication exceptions. */
    private final String nodeName;
    /** Future with the cluster name that is compared against the source cluster name and reported as the target cluster name. */
    private final CompletableFuture<String> clusterNameFuture;
    /** Storage of replication entries, built on top of the meta storage manager. */
    private final DcrStorage store;
    /** Failover component; started in {@code startAsync} and stopped in {@code stopAsync}. */
    private final DcrFailover failover;
    /** Local replication manager; stopped in {@code stopAsync} and handed to the storage listener and messaging. */
    private final LocalReplicationManager dcrLocal;
    /** Messaging facade used to request replication status from a worker node. */
    private final DcrMessaging dcrMessaging;
    /** Tables facade used to check that replicated tables exist on this side. */
    private final IgniteTables tables;
    /** Handle returned by the storage listener registration; {@code null} until {@code startAsync} is called. */
    @Nullable
    private Disposable listenerDisposable;

    public DcrManagerImpl(String nodeName, CompletableFuture<String> clusterNameFuture, IgniteTables tables, MetaStorageManager metaStorageManager, LogicalTopologyService logicalTopology, MessagingService messagingService, TopologyService topologyService, MetricManager metricManager, ClockService clockService) {
        this.nodeName = nodeName;
        this.clusterNameFuture = clusterNameFuture;
        this.store = new DcrStorage(metaStorageManager);
        this.dcrLocal = new LocalReplicationManager(nodeName, tables, metricManager, clockService);
        this.failover = new DcrFailover(nodeName, logicalTopology, topologyService, this.store);
        this.dcrMessaging = new DcrMessaging(this.dcrLocal, messagingService, topologyService);
        this.tables = tables;
    }

    /**
     * Creates a replication entry for the given options and stores it under {@code options.name()}.
     *
     * <p>The entry is first built by connecting to the source cluster, then rejected if the source cluster
     * name equals the name of this cluster, and finally put into the storage.
     *
     * @param options Replication options.
     * @param instantStart Whether the new entry is created in the {@code REPLICATING} status instead of {@code STOPPED}.
     * @return Future with the info of the created replication. It fails with {@link ReplicationToSelfException}
     *     when source and local cluster names match, and with {@link ReplicationAlreadyExistsException} when the
     *     storage reports that the entry was not put.
     */
    @Override
    public CompletableFuture<ReplicationInfo> createReplication(ReplicationOptions options, boolean instantStart) {
        CompletableFuture<ReplicationEntry> validatedEntry = this.createReplicationEntryAsync(options, instantStart)
                .thenCompose(candidate -> this.clusterNameFuture.thenApply(localClusterName -> {
                    if (Objects.equals(candidate.sourceClusterName(), localClusterName)) {
                        throw new ReplicationToSelfException("Replication to self is not allowed.");
                    }
                    return candidate;
                }));

        return validatedEntry.thenCompose(candidate -> this.store.put(options.name(), candidate).thenApply(stored -> {
            if (!stored) {
                LOG.error("Failed to create replication {}", options.name());
                throw new ReplicationAlreadyExistsException(options.name());
            }
            return this.toReplicationInfo(options.name(), candidate, null);
        }));
    }

    /**
     * Connects to the source cluster and builds a replication entry from the given options.
     *
     * <p>The client is used only to read the source cluster name and is closed before the future completes.
     *
     * @param options Replication options.
     * @param instantStart Whether the entry status is {@code REPLICATING} ({@code true}) or {@code STOPPED} ({@code false}).
     * @return Future with the built (not yet stored) entry.
     */
    private CompletableFuture<ReplicationEntry> createReplicationEntryAsync(ReplicationOptions options, boolean instantStart) {
        return DcrManagerImpl.buildClient(options, this::handleCreateClientException).thenApply(client -> {
            try (client) {
                String sourceClusterName = ((TcpIgniteClient) client).clusterName();
                String worker = this.chooseWorkerNode(options.replicationNodes());

                return ReplicationEntry.builder()
                        .sourceClusterAddresses(options.sourceClusterAddresses())
                        .authConfig(DcrManagerImpl.toAuthConfigEntry(options.authConfig()))
                        .sslConfig(DcrManagerImpl.toSslConfigEntry(options.sslConfig()))
                        .sourceClusterName(sourceClusterName)
                        .replicationNodes(options.replicationNodes())
                        .workerNode(worker)
                        .status(instantStart ? ReplicationStatus.REPLICATING : ReplicationStatus.STOPPED)
                        .build();
            }
        });
    }

    /**
     * Resolves the set of tables an operation applies to.
     *
     * <p>This method never throws synchronously: every failure (including a {@code null} table list or a table
     * name that cannot be parsed) is reported through the returned future. Callers attach the client clean-up to
     * the returned future, so a synchronous throw would skip that clean-up and leave the client open.
     *
     * @param client Client connected to the source cluster; only used when {@code allTables} is {@code true}.
     * @param allTables When {@code true}, all tables reported by the client are returned and {@code tableList} is ignored.
     * @param tableList Table names to parse when {@code allTables} is {@code false}.
     * @return Future with the qualified names of the chosen tables.
     */
    private static CompletableFuture<List<QualifiedName>> chooseTablesAsync(IgniteClient client, boolean allTables, @Nullable List<String> tableList) {
        try {
            if (allTables) {
                return client.tables().tablesAsync()
                        .thenApply(tables -> tables.stream().map(Table::qualifiedName).collect(Collectors.toList()));
            }

            // Still a NullPointerException as before, but with a message and delivered through the future.
            Objects.requireNonNull(tableList, "Table list must be provided when 'allTables' is false.");

            return CompletableFuture.completedFuture(DcrManagerImpl.parseTableNames(tableList));
        }
        catch (Throwable e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Marks the chosen tables of an existing replication as {@code REPLICATING}.
     *
     * <p>Steps: load the entry, connect to the source cluster, resolve and validate the tables on the source side
     * (the client is closed afterwards), reject tables that the loaded entry already reports as replicating,
     * validate the tables on the target side, and finally update the stored entry.
     *
     * <p>The stored entry is updated from the state handed to the updater by the storage, not from the snapshot
     * loaded at the beginning of the call. The snapshot may be outdated by the time the update runs (several remote
     * calls happen in between), and writing it back would discard table statuses changed concurrently.
     *
     * <p>Any failure of the returned future is additionally passed to {@link DcrErrorHandler#handleFail}.
     *
     * @param name Replication name.
     * @param schema Schema name written to the entry.
     * @param tables Table names; used when {@code allTables} is {@code false}.
     * @param allTables Whether all tables of the source cluster are chosen.
     * @return Future with the result of the storage update. It fails with {@link ReplicationNotFoundException}
     *     when there is no entry with the given name, and with {@link ReplicationAlreadyStartedForTableException}
     *     when a chosen table is already replicating.
     */
    @Override
    public CompletableFuture<Boolean> startReplication(String name, String schema, @Nullable List<String> tables, boolean allTables) {
        return this.store.get(name).thenCompose(replicationEntry -> {
            if (replicationEntry == null) {
                throw new ReplicationNotFoundException(name);
            }

            return DcrManagerImpl.buildClient(name, replicationEntry, this::handleStartClientException)
                    .thenCompose(client -> DcrManagerImpl.chooseTablesAsync(client, allTables, tables)
                            .thenCompose(tableNames -> DcrManagerImpl.validateSourceTables(client, tableNames, replicationEntry.workerNode()))
                            // The client is only needed for the source-side checks.
                            .whenComplete((tableList, throwable) -> client.close()))
                    .thenApply(tableList -> {
                        if (replicationEntry.tableStatuses().isEmpty() || !replicationEntry.isRunning()) {
                            return tableList;
                        }

                        String alreadyReplicating = replicationEntry.tableStatuses().entrySet().stream()
                                .filter(entry -> entry.getValue() == ReplicationStatus.REPLICATING
                                        && tableList.contains(QualifiedName.parse(entry.getKey())))
                                .map(Map.Entry::getKey)
                                .findFirst()
                                .orElse(null);

                        if (alreadyReplicating != null) {
                            throw new ReplicationAlreadyStartedForTableException(
                                    "Table '" + QualifiedName.parse(alreadyReplicating).toCanonicalForm() + "' is already replicating.",
                                    this.nodeName,
                                    name,
                                    null
                            );
                        }

                        return tableList;
                    })
                    .thenCompose(this::validateTargetTables)
                    .thenCompose(tableList -> this.store.update(name, replication -> {
                        Map<String, ReplicationStatus> currentStatuses = replication.tableStatuses();

                        // Nothing to change when every chosen table is already replicating.
                        if (tableList.stream().allMatch(
                                tableName -> currentStatuses.get(tableName.toCanonicalForm()) == ReplicationStatus.REPLICATING)) {
                            return null;
                        }

                        // Work on a copy of the current statuses so the updater input stays untouched.
                        Map<String, ReplicationStatus> updatedStatuses = new HashMap<>(currentStatuses);
                        tableList.forEach(table -> updatedStatuses.put(table.toCanonicalForm(), ReplicationStatus.REPLICATING));

                        return replication.toBuilder()
                                .schema(schema)
                                .tableStatuses(updatedStatuses)
                                .status(ReplicationStatus.REPLICATING)
                                .build();
                    }, false));
        }).whenComplete((updated, throwable) -> {
            if (throwable != null) {
                DcrErrorHandler.handleFail(this.store, this.nodeName, name, throwable);
            }
        });
    }

    /**
     * Marks the chosen tables of an existing replication as {@code STOPPED}.
     *
     * <p>The updater returns {@code null} (no new entry is produced) when the replication is not running or when
     * none of the tables tracked by the entry is among the chosen tables. Otherwise every chosen table is set to
     * {@code STOPPED}, and the overall status stays {@code REPLICATING} only while at least one table is still
     * replicating.
     *
     * <p>The new table statuses are derived from the entry handed to the updater by the storage, not from the
     * snapshot loaded at the beginning of the call, so statuses changed concurrently are not overwritten.
     *
     * @param name Replication name.
     * @param schema Schema name written to the entry.
     * @param tables Table names; used when {@code allTables} is {@code false}.
     * @param allTables Whether all tables of the source cluster are chosen.
     * @return Future with the result of the storage update. It fails with {@link ReplicationNotFoundException}
     *     when there is no entry with the given name.
     */
    @Override
    public CompletableFuture<Boolean> stopReplication(String name, String schema, @Nullable List<String> tables, boolean allTables) {
        return this.store.get(name).thenCompose(replicationEntry -> {
            if (replicationEntry == null) {
                throw new ReplicationNotFoundException(name);
            }

            return DcrManagerImpl.buildClient(name, replicationEntry, this::handleStopClientException)
                    .thenCompose(client -> DcrManagerImpl.chooseTablesAsync(client, allTables, tables)
                            // The client is only needed to resolve the table list.
                            .whenComplete((tableList, throwable) -> client.close()))
                    .thenCompose(tableList -> this.store.update(name, replication -> {
                        if (!replication.isRunning()) {
                            return null;
                        }

                        Map<String, ReplicationStatus> currentStatuses = replication.tableStatuses();

                        if (currentStatuses.keySet().stream()
                                .noneMatch(tableName -> tableList.contains(QualifiedName.parse(tableName)))) {
                            return null;
                        }

                        // Work on a copy of the current statuses so the updater input stays untouched.
                        Map<String, ReplicationStatus> updatedStatuses = new HashMap<>(currentStatuses);
                        tableList.forEach(table -> updatedStatuses.put(table.toCanonicalForm(), ReplicationStatus.STOPPED));

                        ReplicationStatus overallStatus = updatedStatuses.containsValue(ReplicationStatus.REPLICATING)
                                ? ReplicationStatus.REPLICATING
                                : ReplicationStatus.STOPPED;

                        return replication.toBuilder()
                                .schema(schema)
                                .status(overallStatus)
                                .tableStatuses(updatedStatuses)
                                .build();
                    }, false));
        });
    }

    /**
     * Removes a replication from the storage, refusing to remove one that is running.
     *
     * @param name Replication name.
     * @return Future with the result of the storage removal. The removal callback throws
     *     {@link ReplicationInProgressException} when the entry is running.
     */
    @Override
    public CompletableFuture<Boolean> removeReplication(String name) {
        return this.store.remove(name, candidate -> {
            if (!candidate.isRunning()) {
                return true;
            }
            throw new ReplicationInProgressException("Removing running replication is not supported. It's required to stop it before.", this.nodeName, name);
        });
    }

    /**
     * Returns the info of a single replication, enriched with the status reported by its worker node.
     *
     * <p>A failed status request does not fail the call: the failure is logged together with its cause and the
     * info is built without node information.
     *
     * @param name Replication name.
     * @return Future with the replication info, or with {@code null} when there is no entry with the given name.
     */
    @Override
    public CompletableFuture<@Nullable ReplicationInfo> showReplication(String name) {
        return this.store.get(name).thenCompose(replicationEntry -> {
            if (replicationEntry == null) {
                return CompletableFutures.nullCompletedFuture();
            }

            return this.dcrMessaging.status(name, replicationEntry.workerNode())
                    .handle((status, throwable) -> {
                        if (throwable != null) {
                            // Keep the cause in the log; without it the reason of the missing info is lost.
                            LOG.error("No replication node info on node {}", throwable, replicationEntry.workerNode());
                            return null;
                        }
                        return status;
                    })
                    .thenApply(nodeInfo -> this.toReplicationInfo(name, replicationEntry, nodeInfo));
        });
    }

    /**
     * Returns the info of all stored replications.
     *
     * <p>For running replications the worker node is asked for its status; entries that are not running are
     * reported without node information. As in {@code showReplication}, a failed status request does not fail
     * the whole listing: the failure is logged and that replication is reported without node information.
     *
     * @return Future with the collection of replication infos. The collection is populated from status callbacks,
     *     so its iteration order is not defined.
     */
    @Override
    public CompletableFuture<Collection<ReplicationInfo>> listReplications() {
        return this.store.getAll().thenCompose(entries -> {
            // Filled from callbacks that may run on different threads, hence a concurrent set.
            Collection<ReplicationInfo> result = ConcurrentHashMap.newKeySet();
            List<CompletableFuture<Void>> futures = new ArrayList<>();

            entries.forEach((name, entry) -> {
                if (!entry.isRunning()) {
                    result.add(this.toReplicationInfo(name, entry, null));
                    return;
                }

                futures.add(this.dcrMessaging.status(name, entry.workerNode())
                        .handle((info, throwable) -> {
                            if (throwable != null) {
                                LOG.error("No replication node info on node {}", throwable, entry.workerNode());
                                return null;
                            }
                            return info;
                        })
                        .thenAccept(info -> result.add(this.toReplicationInfo(name, entry, info))));
            });

            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenApply(unused -> result);
        });
    }

    /**
     * Writes the given flush point into the stored replication entry.
     *
     * @param name Replication name.
     * @param timestamp Flush point to store.
     * @return Future with the result of the storage update.
     */
    @Override
    public CompletableFuture<Boolean> flushReplication(String name, Instant timestamp) {
        return this.store.update(
                name,
                current -> {
                    return current.toBuilder().flushPoint(timestamp).build();
                },
                true
        );
    }

    /**
     * Starts the component: registers the storage listener, then starts failover and messaging.
     *
     * @param componentContext Component context (not used by this implementation).
     * @return Already completed future.
     */
    @Override
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        DcrStorageListenerImpl storageListener = new DcrStorageListenerImpl(this.nodeName, this.dcrLocal, this.store);

        // Keep the registration handle so it can be disposed on stop.
        this.listenerDisposable = this.store.registerStoreListener(storageListener);

        this.failover.start();
        this.dcrMessaging.start();

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Stops the component. Only the first call does any work; later calls return immediately.
     *
     * <p>NOTE(review): {@code dcrMessaging} is started in {@code startAsync} but is not stopped here; confirm
     * whether it requires an explicit teardown.
     *
     * @param componentContext Component context (not used by this implementation).
     * @return Already completed future.
     */
    @Override
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        boolean firstStop = this.stopGuard.compareAndSet(false, true);

        if (firstStop) {
            this.dcrLocal.stop();
            this.failover.stop();
            this.listenerDisposable = Disposable.dispose(this.listenerDisposable);
        }

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Picks the node that will run a replication.
     *
     * @param replicationNodes Candidate node names.
     * @return A randomly chosen candidate, or this node's name when there are no candidates.
     */
    private String chooseWorkerNode(Set<String> replicationNodes) {
        return replicationNodes.isEmpty() ? this.nodeName : DcrManagerImpl.randomNode(replicationNodes);
    }

    /**
     * Returns a pseudo-randomly chosen element of the given set.
     *
     * @param nodes Non-empty set of node names; an empty set makes the random bound invalid.
     * @return The element found after skipping a random number of elements in the set's iteration order.
     */
    private static String randomNode(Set<String> nodes) {
        int remainingSkips = ThreadLocalRandom.current().nextInt(nodes.size());
        Iterator<String> cursor = nodes.iterator();

        while (remainingSkips > 0) {
            cursor.next();
            remainingSkips--;
        }

        return cursor.next();
    }

    /**
     * Converts a stored replication entry into the public {@link ReplicationInfo}.
     *
     * <p>The target cluster name is read with {@code clusterNameFuture.join()}, which blocks the calling thread
     * until that future is complete.
     *
     * @param name Replication name.
     * @param replicationEntry Stored entry.
     * @param replicationNodeInfo Status reported by the worker node; when {@code null}, progress is reported as {@code 0}.
     * @return Replication info.
     */
    private ReplicationInfo toReplicationInfo(String name, ReplicationEntry replicationEntry, @Nullable ReplicationNodeInfo replicationNodeInfo) {
        ExceptionEntry storedFailure = replicationEntry.exceptionEntry();

        return ReplicationInfo.builder()
                .name(name)
                .sourceClusterAddresses(replicationEntry.sourceClusterAddresses())
                .authConfig(AuthConfig.fromEntry(replicationEntry.authConfig()))
                .sslConfig(SslConfig.fromEntry(replicationEntry.sslConfig()))
                .sourceClusterName(replicationEntry.sourceClusterName())
                .targetClusterName(this.clusterNameFuture.join())
                .workerNodeName(replicationEntry.workerNode())
                .schemaName(replicationEntry.schema())
                .tableStatuses(replicationEntry.tableStatuses())
                .status(replicationEntry.replicationStatus())
                .fstProgress(replicationNodeInfo == null ? 0 : replicationNodeInfo.progress())
                .exception(storedFailure == null ? null : storedFailure.buildException())
                .build();
    }

    /**
     * Converts an authentication config into its storage representation.
     *
     * @param authConfig Authentication config, may be {@code null}.
     * @return Entry carrying the same username and password, or {@code null} for a {@code null} input.
     */
    @Nullable
    private static AuthConfigEntry toAuthConfigEntry(@Nullable AuthConfig authConfig) {
        return authConfig == null
                ? null
                : AuthConfigEntry.builder()
                        .username(authConfig.username())
                        .password(authConfig.password())
                        .build();
    }

    /**
     * Converts an SSL config into its storage representation.
     *
     * @param sslConfig SSL config, may be {@code null}.
     * @return Entry carrying the same ciphers, key store and trust store settings, or {@code null} for a {@code null} input.
     */
    @Nullable
    private static SslConfigEntry toSslConfigEntry(@Nullable SslConfig sslConfig) {
        return sslConfig == null
                ? null
                : SslConfigEntry.builder()
                        .ciphers(sslConfig.ciphers())
                        .keyStorePath(sslConfig.keyStorePath())
                        .keyStorePassword(sslConfig.keyStorePassword())
                        .trustStorePath(sslConfig.trustStorePath())
                        .trustStorePassword(sslConfig.trustStorePassword())
                        .build();
    }

    /**
     * Wraps a client connection failure that happened while creating a replication.
     *
     * @param name Replication name.
     * @param message Error message.
     * @param cause Original failure.
     * @return {@link ReplicationCreationException} carrying the message, this node's name, the replication name and the cause.
     */
    private ReplicationException handleCreateClientException(String name, String message, Throwable cause) {
        return new ReplicationCreationException(message, this.nodeName, name, cause);
    }

    /**
     * Wraps a client connection failure that happened while starting a replication.
     *
     * @param name Replication name.
     * @param message Error message.
     * @param cause Original failure.
     * @return {@link ReplicationStartupException} carrying the message, this node's name, the replication name and the cause.
     */
    private ReplicationException handleStartClientException(String name, String message, Throwable cause) {
        return new ReplicationStartupException(message, this.nodeName, name, cause);
    }

    /**
     * Wraps a client connection failure that happened while stopping a replication.
     *
     * @param name Replication name.
     * @param message Error message.
     * @param cause Original failure.
     * @return {@link ReplicationStopException} carrying the message, this node's name, the replication name and the cause.
     */
    private ReplicationException handleStopClientException(String name, String message, Throwable cause) {
        return new ReplicationStopException(message, this.nodeName, name, cause);
    }

    /**
     * Parses table names into qualified names, preserving the input order.
     *
     * @param tableList Table names to parse; must not be {@code null}.
     * @return New list with one parsed name per input element.
     */
    private static List<QualifiedName> parseTableNames(List<String> tableList) {
        List<QualifiedName> parsedNames = new ArrayList<>();

        for (String rawName : tableList) {
            parsedNames.add(QualifiedName.parse(rawName));
        }

        return parsedNames;
    }

    /**
     * Builds a client for the source cluster described by a stored replication entry.
     *
     * @param name Replication name, used for error reporting.
     * @param replicationEntry Entry providing addresses, authentication and SSL settings.
     * @param errorHandler Factory of the exception reported on connection failure.
     * @return Future with the connected client.
     */
    private static CompletableFuture<IgniteClient> buildClient(String name, ReplicationEntry replicationEntry, ClientExceptionHandler errorHandler) {
        String[] addresses = replicationEntry.sourceClusterAddresses().toArray(String[]::new);
        AuthConfig authConfig = AuthConfig.fromEntry(replicationEntry.authConfig());
        SslConfig sslConfig = SslConfig.fromEntry(replicationEntry.sslConfig());

        return DcrManagerImpl.buildClient(name, addresses, authConfig, sslConfig, errorHandler);
    }

    /**
     * Builds a client for the source cluster described by replication options.
     *
     * @param options Options providing the replication name, addresses, authentication and SSL settings.
     * @param errorHandler Factory of the exception reported on connection failure.
     * @return Future with the connected client.
     */
    private static CompletableFuture<IgniteClient> buildClient(ReplicationOptions options, ClientExceptionHandler errorHandler) {
        String replicationName = options.name();
        String[] addresses = options.sourceClusterAddresses().toArray(String[]::new);

        return DcrManagerImpl.buildClient(replicationName, addresses, options.authConfig(), options.sslConfig(), errorHandler);
    }

    /**
     * Asynchronously builds a client for the given source cluster addresses.
     *
     * <p>When the asynchronous build fails, the unwrapped cause is logged and the future fails with the exception
     * produced by {@code errorHandler}.
     *
     * @param name Replication name, used for logging and error reporting.
     * @param sourceClusterAddresses Addresses of the source cluster.
     * @param authConfig Authentication settings, may be {@code null}.
     * @param sslConfig SSL settings, may be {@code null}.
     * @param errorHandler Factory of the exception reported on connection failure.
     * @return Future with the connected client.
     */
    private static CompletableFuture<IgniteClient> buildClient(String name, String[] sourceClusterAddresses, @Nullable AuthConfig authConfig, @Nullable SslConfig sslConfig, ClientExceptionHandler errorHandler) {
        CompletableFuture<IgniteClient> connecting = IgniteClient.builder()
                .addresses(sourceClusterAddresses)
                .authenticator(AuthConfig.createAuthenticator(authConfig))
                .ssl(SslConfig.createSslConfiguration(sslConfig))
                .buildAsync();

        return connecting.exceptionally(failure -> {
            Throwable rootCause = ExceptionUtils.unwrapCause(failure);
            LOG.error("Failed to create client for replication {}: ", rootCause, name);
            throw DcrManagerImpl.handleClientException(name, rootCause, errorHandler);
        });
    }

    /**
     * Translates a client connection failure into a replication exception.
     *
     * <p>The message is taken from the first error in the cause chain with the invalid-credentials code; if there
     * is none, from the first one with the client SSL configuration code; otherwise from {@code throwable} itself.
     * The original {@code throwable} is always passed on as the cause.
     *
     * @param name Replication name.
     * @param throwable Connection failure.
     * @param errorHandler Factory of the resulting exception.
     * @return Exception created by {@code errorHandler}.
     */
    private static ReplicationException handleClientException(String name, Throwable throwable, ClientExceptionHandler errorHandler) {
        String prefix = "Cannot connect to the source cluster";

        Throwable messageSource = DcrManagerImpl.findErrorByCode(ErrorGroups.Authentication.INVALID_CREDENTIALS_ERR, throwable);
        if (messageSource == null) {
            messageSource = DcrManagerImpl.findErrorByCode(ErrorGroups.Client.CLIENT_SSL_CONFIGURATION_ERR, throwable);
        }
        if (messageSource == null) {
            messageSource = throwable;
        }

        return errorHandler.createException(name, prefix + ": " + messageSource.getMessage(), throwable);
    }

    /**
     * Searches a cause chain for an {@link IgniteException} with the given error code.
     *
     * @param code Error code to look for.
     * @param ex Head of the cause chain, may be {@code null}.
     * @return The first matching exception, starting with {@code ex} itself, or {@code null} if there is none.
     */
    @Nullable
    private static Throwable findErrorByCode(int code, @Nullable Throwable ex) {
        if (ex == null) {
            return null;
        }

        boolean codeMatches = ex instanceof IgniteException && ((IgniteException) ex).code() == code;

        return codeMatches ? ex : DcrManagerImpl.findErrorByCode(code, ex.getCause());
    }

    /**
     * Checks that all given tables exist on the source cluster.
     *
     * @param client Client connected to the source cluster.
     * @param tableNames Tables to check, may be {@code null}.
     * @param workerNode Worker node name passed to the reported exceptions.
     * @return Future with {@code tableNames}. It fails with {@link ReplicationNoSourceTablesException} when
     *     {@code tableNames} is {@code null}, and with {@link ReplicationNoSourceTableException} when some table is missing.
     */
    private static CompletableFuture<List<QualifiedName>> validateSourceTables(IgniteClient client, @Nullable List<QualifiedName> tableNames, String workerNode) {
        if (tableNames != null) {
            Function<String, RuntimeException> missingOnSource = missedTablesStr -> {
                String message = String.format("Tables %s does not exist on the source side.", missedTablesStr);
                return new ReplicationNoSourceTableException(message, workerNode, missedTablesStr);
            };

            return DcrManagerImpl.validateTables(client.tables(), tableNames, missingOnSource);
        }

        return CompletableFuture.failedFuture(new ReplicationNoSourceTablesException("No tables to replicate.", workerNode));
    }

    /**
     * Checks that all given tables exist on this (target) cluster.
     *
     * @param tableNames Tables to check.
     * @return Future with {@code tableNames}. It fails with {@link ReplicationSchemaSyncException} when some table is missing.
     */
    private CompletableFuture<List<QualifiedName>> validateTargetTables(List<QualifiedName> tableNames) {
        Function<String, RuntimeException> missingOnTarget = missedTablesStr -> {
            String message = String.format("Tables %s does not exist on the target side.", missedTablesStr);
            return new ReplicationSchemaSyncException(message, this.nodeName, missedTablesStr);
        };

        return DcrManagerImpl.validateTables(this.tables, tableNames, missingOnTarget);
    }

    /**
     * Checks that every given table can be resolved through {@code tables}.
     *
     * <p>Each lookup reports its own result through its own future, and the results are collected only after all
     * lookups have completed. No collection is shared between the lookup callbacks, which may run on different
     * threads, and the missing tables are reported in the order of {@code tableNames}.
     *
     * @param tables Tables facade used for the lookups.
     * @param tableNames Tables to check.
     * @param error Factory of the exception to throw; receives the comma-separated canonical names of the missing tables.
     * @return Future with {@code tableNames}. It fails with the exception produced by {@code error} when at least
     *     one table is missing, or with the lookup failure when a lookup itself fails.
     */
    private static CompletableFuture<List<QualifiedName>> validateTables(IgniteTables tables, List<QualifiedName> tableNames, Function<String, RuntimeException> error) {
        // One future per table: completes with the table name when the table is missing, with null otherwise.
        List<CompletableFuture<QualifiedName>> lookups = new ArrayList<>(tableNames.size());

        for (QualifiedName tableName : tableNames) {
            CompletableFuture<QualifiedName> lookup = tables.tableAsync(tableName)
                    .thenApply(table -> table == null ? tableName : null);

            lookups.add(lookup);
        }

        return CompletableFuture.allOf(lookups.toArray(new CompletableFuture[0])).thenApply(unused -> {
            // All lookups completed normally at this point, so join() does not block.
            String missedTablesStr = lookups.stream()
                    .map(CompletableFuture::join)
                    .filter(Objects::nonNull)
                    .map(QualifiedName::toCanonicalForm)
                    .collect(Collectors.joining(","));

            if (!missedTablesStr.isEmpty()) {
                throw error.apply(missedTablesStr);
            }

            return tableNames;
        });
    }

    @FunctionalInterface
    static interface ClientExceptionHandler {
        public ReplicationException createException(String var1, String var2, Throwable var3);
    }
}

