package org.gridgain.internal.dcr;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite3.client.IgniteClient;
import org.apache.ignite3.internal.client.TcpIgniteClient;
import org.apache.ignite3.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.metastorage.MetaStorageManager;
import org.apache.ignite3.internal.metrics.MetricManager;
import org.apache.ignite3.internal.network.MessagingService;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.lang.IgniteException;
import org.apache.ignite3.table.IgniteTables;
import org.apache.ignite3.table.QualifiedName;
import org.gridgain.internal.dcr.exception.ReplicationAlreadyExistsException;
import org.gridgain.internal.dcr.exception.ReplicationAlreadyStartedForTableException;
import org.gridgain.internal.dcr.exception.ReplicationCreationException;
import org.gridgain.internal.dcr.exception.ReplicationException;
import org.gridgain.internal.dcr.exception.ReplicationInProgressException;
import org.gridgain.internal.dcr.exception.ReplicationNoSourceTableException;
import org.gridgain.internal.dcr.exception.ReplicationNoSourceTablesException;
import org.gridgain.internal.dcr.exception.ReplicationNotFoundException;
import org.gridgain.internal.dcr.exception.ReplicationSchemaSyncException;
import org.gridgain.internal.dcr.exception.ReplicationStartupException;
import org.gridgain.internal.dcr.exception.ReplicationStopException;
import org.gridgain.internal.dcr.message.DcrMessaging;
import org.gridgain.internal.dcr.message.ReplicationNodeInfo;
import org.gridgain.internal.dcr.metastorage.AuthConfigEntry;
import org.gridgain.internal.dcr.metastorage.DcrStorage;
import org.gridgain.internal.dcr.metastorage.Disposable;
import org.gridgain.internal.dcr.metastorage.ExceptionEntry;
import org.gridgain.internal.dcr.metastorage.ReplicationEntry;
import org.gridgain.internal.dcr.metastorage.SslConfigEntry;
import org.gridgain.internal.dcr.metrics.DcrMetricSource;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/gridgain/internal/dcr/DcrManagerImpl.class */
public class DcrManagerImpl implements DcrManager {
    /** Logger for this class. */
    private static final IgniteLogger LOG = Loggers.forClass(DcrManagerImpl.class);
    /** Node name given to the constructor; used as the fallback worker node and passed to the exceptions raised here. */
    private final String nodeName;
    /** Future whose value is reported as the target cluster name when building {@link ReplicationInfo}. */
    private final CompletableFuture<String> clusterNameFuture;
    /** Replication entry storage, created in the constructor on top of the meta storage manager. */
    private final DcrStorage store;
    /** Failover component; started in {@code startAsync} and stopped in {@code stopAsync}. */
    private final DcrFailover failover;
    /** Local replication manager; shared with the storage listener and {@link DcrMessaging}, stopped in {@code stopAsync}. */
    private final LocalReplicationManager dcrLocal;
    /** Messaging facade used to ask a worker node for its replication status. */
    private final DcrMessaging dcrMessaging;
    /** Metric manager where {@link #metricSource} is registered and enabled. */
    private final MetricManager metricManager;
    /** Tables API of the node this manager was created with; used to validate tables on the target side. */
    private final IgniteTables tables;

    /** Handle returned by the store listener registration; {@code null} until {@code startAsync} assigns it. */
    @Nullable
    private Disposable listenerDisposable;
    /** Flipped once by {@code stopAsync} so that the stop sequence runs at most one time. */
    private final AtomicBoolean stopGuard = new AtomicBoolean();
    /** Metric source handed to the local replication manager and registered with the metric manager. */
    private final DcrMetricSource metricSource = new DcrMetricSource();

    /**
     * Factory for the {@link ReplicationException} raised when a client to the source cluster cannot be built.
     *
     * <p>The implementations in this class are the {@code handle*ClientException} methods, which differ only in
     * the concrete exception type they create.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:org/gridgain/internal/dcr/DcrManagerImpl$ClientExceptionHandler.class */
    public interface ClientExceptionHandler {
        /**
         * Creates the exception to throw.
         *
         * @param str Replication name.
         * @param str2 Error message.
         * @param th Original failure, used as the cause.
         * @return Exception to be thrown to the caller.
         */
        ReplicationException createException(String str, String str2, Throwable th);
    }

    public DcrManagerImpl(String str, CompletableFuture<String> completableFuture, IgniteTables igniteTables, MetaStorageManager metaStorageManager, LogicalTopologyService logicalTopologyService, MessagingService messagingService, TopologyService topologyService, MetricManager metricManager) {
        this.nodeName = str;
        this.clusterNameFuture = completableFuture;
        this.store = new DcrStorage(metaStorageManager);
        this.dcrLocal = new LocalReplicationManager(str, igniteTables, this.metricSource);
        this.failover = new DcrFailover(str, logicalTopologyService, topologyService, this.store);
        this.dcrMessaging = new DcrMessaging(this.dcrLocal, messagingService, topologyService);
        this.metricManager = metricManager;
        this.tables = igniteTables;
    }

    /**
     * Creates a new replication: builds its entry from the options and stores it under the replication name.
     *
     * @param replicationOptions Options describing the replication (name, source addresses, credentials, nodes).
     * @param z When {@code true} the entry is created in the {@code REPLICATING} status, otherwise {@code STOPPED}.
     * @return Future with the info of the created replication; fails with
     *         {@link ReplicationAlreadyExistsException} when the store reports that the entry was not put.
     */
    @Override // org.gridgain.internal.dcr.DcrManager
    public CompletableFuture<ReplicationInfo> createReplication(ReplicationOptions replicationOptions, boolean z) {
        return createReplicationEntryAsync(replicationOptions, z).thenCompose(newEntry ->
                this.store.put(replicationOptions.name(), newEntry).thenApply(stored -> {
                    if (!stored.booleanValue()) {
                        LOG.error("Failed to create replication {}", replicationOptions.name());
                        throw new ReplicationAlreadyExistsException(replicationOptions.name());
                    }
                    return toReplicationInfo(replicationOptions.name(), newEntry, null);
                }));
    }

    /**
     * Connects to the source cluster and builds the storage entry for a new replication.
     *
     * <p>The client is needed only to read the source cluster name, so it is closed as soon as the entry is
     * built (or building fails). Connection failures are turned into a {@link ReplicationCreationException}
     * by {@link #handleCreateClientException}.
     *
     * @param replicationOptions Options describing the replication.
     * @param z When {@code true} the entry status is {@code REPLICATING}, otherwise {@code STOPPED}.
     * @return Future with the built entry.
     */
    private CompletableFuture<ReplicationEntry> createReplicationEntryAsync(ReplicationOptions replicationOptions, boolean z) {
        return buildClient(replicationOptions, this::handleCreateClientException).thenApply(igniteClient -> {
            // try-with-resources closes the client on both paths and records a close failure as suppressed
            // when the body has already failed.
            try (IgniteClient client = igniteClient) {
                return ReplicationEntry.builder()
                        .sourceClusterAddresses(replicationOptions.sourceClusterAddresses())
                        .authConfig(toAuthConfigEntry(replicationOptions.authConfig()))
                        .sslConfig(toSslConfigEntry(replicationOptions.sslConfig()))
                        .sourceClusterName(((TcpIgniteClient) client).clusterName())
                        .replicationNodes(replicationOptions.replicationNodes())
                        .workerNode(chooseWorkerNode(replicationOptions.replicationNodes()))
                        .status(z ? ReplicationStatus.REPLICATING : ReplicationStatus.STOPPED)
                        .build();
            }
        });
    }

    /**
     * Resolves the tables an operation applies to.
     *
     * <p>Never throws: any synchronous failure (for example a {@code null} or unparsable table list when
     * {@code z} is {@code false}) is reported through the returned future. Callers chain the client
     * {@code close()} onto that future, so a synchronous throw would skip the close and leak the client.
     *
     * @param igniteClient Client connected to the source cluster; used only when {@code z} is {@code true}.
     * @param z When {@code true}, all tables reported by the source cluster are used and {@code list} is ignored.
     * @param list Table names to parse when {@code z} is {@code false}.
     * @return Future with the qualified names of the chosen tables.
     */
    private static CompletableFuture<List<QualifiedName>> chooseTablesAsync(IgniteClient igniteClient, boolean z, @Nullable List<String> list) {
        try {
            if (!z) {
                return CompletableFuture.completedFuture(parseTableNames(list));
            }
            return igniteClient.tables().tablesAsync().thenApply(sourceTables ->
                    sourceTables.stream()
                            .map(table -> table.qualifiedName())
                            .collect(Collectors.toList()));
        } catch (Throwable th) {
            return CompletableFuture.failedFuture(th);
        }
    }

    /**
     * Starts replication of the given tables for an existing replication.
     *
     * <p>Steps: load the entry, connect to the source cluster, resolve and validate the source tables (the
     * client is closed right after that), reject tables that are already replicating, validate the tables on
     * the target side and finally mark them as {@code REPLICATING} in the store.
     *
     * @param str Replication name.
     * @param str2 Schema name written to the stored entry.
     * @param list Table names to start; used only when {@code z} is {@code false}.
     * @param z When {@code true}, all tables of the source cluster are started.
     * @return Future with the result of the store update. Any failure is additionally reported through
     *         {@code DcrErrorHandler.handleFail}.
     */
    @Override // org.gridgain.internal.dcr.DcrManager
    public CompletableFuture<Boolean> startReplication(String str, String str2, @Nullable List<String> list, boolean z) {
        return this.store.get(str).thenCompose(entry -> {
            if (entry == null) {
                throw new ReplicationNotFoundException(str);
            }
            return buildClient(str, entry, this::handleStartClientException)
                    .thenCompose(client -> chooseTablesAsync(client, z, list)
                            .thenCompose(requested -> validateSourceTables(client, requested, entry.workerNode()))
                            // The source-cluster client is needed only for table discovery and validation.
                            .whenComplete((ignored, failure) -> client.close()))
                    .thenApply(requested -> requireNotAlreadyReplicating(str, entry, requested))
                    .thenCompose(this::validateTargetTables)
                    .thenCompose(requested ->
                            this.store.update(str, current -> markReplicating(current, str2, requested), false));
        }).whenComplete((updated, failure) -> {
            if (failure != null) {
                DcrErrorHandler.handleFail(this.store, this.nodeName, str, failure);
            }
        });
    }

    /**
     * Fails when the replication is running and at least one requested table already has the
     * {@code REPLICATING} status in the given entry; otherwise returns the requested tables unchanged.
     */
    private List<QualifiedName> requireNotAlreadyReplicating(String replicationName, ReplicationEntry entry, List<QualifiedName> requested) {
        if (entry.tableStatuses().isEmpty() || !entry.isRunning()) {
            return requested;
        }
        List<String> alreadyReplicating = entry.tableStatuses().entrySet().stream()
                .filter(status -> status.getValue() == ReplicationStatus.REPLICATING
                        && requested.contains(QualifiedName.parse(status.getKey())))
                .map(status -> status.getKey())
                .collect(Collectors.toList());
        if (alreadyReplicating.isEmpty()) {
            return requested;
        }
        throw new ReplicationAlreadyStartedForTableException("Table '" + QualifiedName.parse(alreadyReplicating.get(0)).toCanonicalForm() + "' is already replicating.", this.nodeName, replicationName, null);
    }

    /**
     * Store update function: marks the requested tables as {@code REPLICATING} in the current entry.
     * Returns {@code null} when every requested table already has that status (presumably read by the
     * store as "nothing to change" - confirm against {@code DcrStorage.update}).
     */
    @Nullable
    private static ReplicationEntry markReplicating(ReplicationEntry current, String schema, List<QualifiedName> requested) {
        boolean allReplicating = requested.stream().allMatch(table ->
                current.tableStatuses().get(table.toCanonicalForm()) == ReplicationStatus.REPLICATING);
        if (allReplicating) {
            return null;
        }
        requested.forEach(table -> current.tableStatuses().put(table.toCanonicalForm(), ReplicationStatus.REPLICATING));
        return current.toBuilder()
                .schema(schema)
                .tableStatuses(current.tableStatuses())
                .status(ReplicationStatus.REPLICATING)
                .build();
    }

    /**
     * Stops replication of the given tables for an existing replication.
     *
     * <p>Steps: load the entry, connect to the source cluster, resolve the tables (the client is closed right
     * after that) and mark them as {@code STOPPED} in the store.
     *
     * @param str Replication name.
     * @param str2 Schema name written to the stored entry.
     * @param list Table names to stop; used only when {@code z} is {@code false}.
     * @param z When {@code true}, all tables of the source cluster are stopped.
     * @return Future with the result of the store update; fails with {@link ReplicationNotFoundException}
     *         when there is no replication with the given name.
     */
    @Override // org.gridgain.internal.dcr.DcrManager
    public CompletableFuture<Boolean> stopReplication(String str, String str2, @Nullable List<String> list, boolean z) {
        return this.store.get(str).thenCompose(entry -> {
            if (entry == null) {
                throw new ReplicationNotFoundException(str);
            }
            return buildClient(str, entry, this::handleStopClientException)
                    .thenCompose(client -> chooseTablesAsync(client, z, list)
                            // The source-cluster client is needed only to resolve the table list.
                            .whenComplete((ignored, failure) -> client.close()))
                    .thenCompose(requested ->
                            this.store.update(str, current -> markStopped(current, str2, requested), false));
        });
    }

    /**
     * Store update function: marks the requested tables as {@code STOPPED} in the current entry and derives
     * the overall status from the remaining table statuses. Returns {@code null} when the replication is not
     * running or none of the tracked tables is among the requested ones (presumably read by the store as
     * "nothing to change" - confirm against {@code DcrStorage.update}).
     */
    @Nullable
    private static ReplicationEntry markStopped(ReplicationEntry current, String schema, List<QualifiedName> requested) {
        if (!current.isRunning()
                || current.tableStatuses().keySet().stream().noneMatch(tracked -> requested.contains(QualifiedName.parse(tracked)))) {
            return null;
        }
        requested.forEach(table -> current.tableStatuses().put(table.toCanonicalForm(), ReplicationStatus.STOPPED));
        // The replication as a whole keeps running while at least one table is still replicating.
        ReplicationStatus overall = current.tableStatuses().containsValue(ReplicationStatus.REPLICATING)
                ? ReplicationStatus.REPLICATING
                : ReplicationStatus.STOPPED;
        return current.toBuilder()
                .schema(schema)
                .status(overall)
                .tableStatuses(current.tableStatuses())
                .build();
    }

    /**
     * Removes a replication from the store, refusing to do so while it is running.
     *
     * @param str Replication name.
     * @return Future with the result of the store removal; fails with {@link ReplicationInProgressException}
     *         when the stored entry reports that it is running.
     */
    @Override // org.gridgain.internal.dcr.DcrManager
    public CompletableFuture<Boolean> removeReplication(String str) {
        return this.store.remove(str, candidate -> {
            if (!candidate.isRunning()) {
                return true;
            }
            throw new ReplicationInProgressException("Removing running replication is not supported. It's required to stop it before.", this.nodeName, str);
        });
    }

    /**
     * Returns the info of a replication, enriched with the status reported by its worker node.
     *
     * <p>A failure to obtain the worker node status is not fatal: it is logged together with its cause and the
     * info is built without the node data.
     *
     * @param str Replication name.
     * @return Future with the replication info, or with {@code null} when there is no such replication.
     */
    @Override // org.gridgain.internal.dcr.DcrManager
    public CompletableFuture<ReplicationInfo> showReplication(String str) {
        return this.store.get(str).thenCompose(entry -> {
            if (entry == null) {
                return CompletableFutures.nullCompletedFuture();
            }
            return this.dcrMessaging.status(str, entry.workerNode())
                    .handle((nodeInfo, failure) -> {
                        if (failure == null) {
                            return nodeInfo;
                        }
                        // Keep the cause in the log; otherwise the reason for the missing info is lost.
                        LOG.error("No replication node info on node {}", failure, entry.workerNode());
                        return null;
                    })
                    .thenApply(nodeInfo -> toReplicationInfo(str, entry, nodeInfo));
        });
    }

    /**
     * Lists all stored replications. Worker node status is not requested, so the per-node data of every
     * returned info is built from a {@code null} node info.
     *
     * @return Future with the infos of all replications found in the store.
     */
    @Override // org.gridgain.internal.dcr.DcrManager
    public CompletableFuture<Collection<ReplicationInfo>> listReplications() {
        return this.store.getAll().thenApply(entriesByName -> {
            Collection<ReplicationInfo> infos = entriesByName.entrySet().stream()
                    .map(named -> toReplicationInfo(named.getKey(), named.getValue(), null))
                    .collect(Collectors.toSet());
            return infos;
        });
    }

    /**
     * Writes a new flush point into the stored entry of a replication.
     *
     * @param str Replication name.
     * @param instant Flush point to store.
     * @return Future with the result of the store update.
     */
    @Override // org.gridgain.internal.dcr.DcrManager
    public CompletableFuture<Boolean> flushReplication(String str, Instant instant) {
        return this.store.update(
                str,
                current -> current.toBuilder().flushPoint(instant).build(),
                true
        );
    }

    /**
     * Starts the component: subscribes to store changes, starts failover and messaging, and registers and
     * enables the DCR metric source.
     *
     * @param componentContext Component context (not used here).
     * @return An already completed future.
     */
    @Override // org.apache.ignite3.internal.manager.IgniteComponent
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        // Keep the registration handle so that stopAsync can dispose the listener.
        this.listenerDisposable = this.store.registerStoreListener(new DcrStorageListenerImpl(this.nodeName, this.dcrLocal, this.store));
        this.failover.start();
        this.dcrMessaging.start();
        // The source is registered first and then enabled.
        this.metricManager.registerSource(this.metricSource);
        this.metricManager.enable(this.metricSource);
        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Stops the component. Only the first call does the work; later calls return immediately.
     *
     * @param componentContext Component context (not used here).
     * @return An already completed future.
     */
    @Override // org.apache.ignite3.internal.manager.IgniteComponent
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        boolean firstStop = this.stopGuard.compareAndSet(false, true);
        if (firstStop) {
            this.dcrLocal.stop();
            this.failover.stop();
            // Dispose the store listener and keep whatever handle dispose() hands back.
            this.listenerDisposable = Disposable.dispose(this.listenerDisposable);
            this.metricManager.unregisterSource(this.metricSource);
        }
        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Picks the node that will run a replication.
     *
     * @param set Candidate node names.
     * @return A random candidate, or the local node name when there are no candidates.
     */
    private String chooseWorkerNode(Set<String> set) {
        if (set.isEmpty()) {
            return this.nodeName;
        }
        return randomNode(set);
    }

    /**
     * Returns a randomly chosen element of a non-empty set.
     *
     * @param set Node names to choose from; must not be empty, because the random bound is the set size.
     * @return The element at a random position of the set's iteration order.
     */
    private static String randomNode(Set<String> set) {
        int toSkip = ThreadLocalRandom.current().nextInt(set.size());
        Iterator<String> cursor = set.iterator();
        // Walk past the elements before the chosen position.
        while (toSkip-- > 0) {
            cursor.next();
        }
        return cursor.next();
    }

    /**
     * Converts a stored entry into the public {@link ReplicationInfo}.
     *
     * <p>NOTE(review): the target cluster name is obtained with {@code join()}, which waits for
     * {@code clusterNameFuture} if it is not complete yet.
     *
     * @param str Replication name.
     * @param replicationEntry Stored entry.
     * @param replicationNodeInfo Status reported by the worker node, or {@code null}; when {@code null} the
     *        progress is reported as {@code 0}.
     * @return The replication info.
     */
    private ReplicationInfo toReplicationInfo(String str, ReplicationEntry replicationEntry, @Nullable ReplicationNodeInfo replicationNodeInfo) {
        ExceptionEntry storedFailure = replicationEntry.exceptionEntry();
        return ReplicationInfo.builder()
                .name(str)
                .sourceClusterAddresses(replicationEntry.sourceClusterAddresses())
                .authConfig(AuthConfig.fromEntry(replicationEntry.authConfig()))
                .sslConfig(SslConfig.fromEntry(replicationEntry.sslConfig()))
                .sourceClusterName(replicationEntry.sourceClusterName())
                .targetClusterName(this.clusterNameFuture.join())
                .workerNodeName(replicationEntry.workerNode())
                .schemaName(replicationEntry.schema())
                .tableStatuses(replicationEntry.tableStatuses())
                .status(replicationEntry.replicationStatus())
                .fstProgress(replicationNodeInfo == null ? 0 : replicationNodeInfo.progress())
                .exception(storedFailure == null ? null : storedFailure.buildException())
                .build();
    }

    /**
     * Converts authentication settings into their storage form.
     *
     * @param authConfig Authentication settings, or {@code null}.
     * @return Entry holding the same username and password, or {@code null} for a {@code null} input.
     */
    @Nullable
    private static AuthConfigEntry toAuthConfigEntry(@Nullable AuthConfig authConfig) {
        return authConfig == null
                ? null
                : AuthConfigEntry.builder()
                        .username(authConfig.username())
                        .password(authConfig.password())
                        .build();
    }

    /**
     * Converts SSL settings into their storage form.
     *
     * @param sslConfig SSL settings, or {@code null}.
     * @return Entry holding the same ciphers and key/trust store settings, or {@code null} for a {@code null} input.
     */
    @Nullable
    private static SslConfigEntry toSslConfigEntry(@Nullable SslConfig sslConfig) {
        return sslConfig == null
                ? null
                : SslConfigEntry.builder()
                        .ciphers(sslConfig.ciphers())
                        .keyStorePath(sslConfig.keyStorePath())
                        .keyStorePassword(sslConfig.keyStorePassword())
                        .trustStorePath(sslConfig.trustStorePath())
                        .trustStorePassword(sslConfig.trustStorePassword())
                        .build();
    }

    /**
     * {@link ClientExceptionHandler} used while creating a replication.
     *
     * @param str Replication name.
     * @param str2 Error message.
     * @param th Original failure.
     * @return A {@link ReplicationCreationException} for the local node.
     */
    private ReplicationException handleCreateClientException(String str, String str2, Throwable th) {
        ReplicationException creationFailure = new ReplicationCreationException(str2, this.nodeName, str, th);
        return creationFailure;
    }

    /**
     * {@link ClientExceptionHandler} used while starting a replication.
     *
     * @param str Replication name.
     * @param str2 Error message.
     * @param th Original failure.
     * @return A {@link ReplicationStartupException} for the local node.
     */
    private ReplicationException handleStartClientException(String str, String str2, Throwable th) {
        ReplicationException startupFailure = new ReplicationStartupException(str2, this.nodeName, str, th);
        return startupFailure;
    }

    /**
     * {@link ClientExceptionHandler} used while stopping a replication.
     *
     * @param str Replication name.
     * @param str2 Error message.
     * @param th Original failure.
     * @return A {@link ReplicationStopException} for the local node.
     */
    private ReplicationException handleStopClientException(String str, String str2, Throwable th) {
        ReplicationException stopFailure = new ReplicationStopException(str2, this.nodeName, str, th);
        return stopFailure;
    }

    /**
     * Parses table names into qualified names, keeping the input order.
     *
     * @param list Table names; must not be {@code null}.
     * @return A new mutable list with one parsed name per input element.
     */
    private static List<QualifiedName> parseTableNames(List<String> list) {
        List<QualifiedName> parsed = new ArrayList<>();
        for (String rawName : list) {
            parsed.add(QualifiedName.parse(rawName));
        }
        return parsed;
    }

    /**
     * Builds a client to the source cluster described by a stored replication entry.
     *
     * @param str Replication name (used for logging and for the created exception).
     * @param replicationEntry Stored entry providing addresses, authentication and SSL settings.
     * @param clientExceptionHandler Factory of the exception reported on connection failure.
     * @return Future with the connected client.
     */
    private static CompletableFuture<IgniteClient> buildClient(String str, ReplicationEntry replicationEntry, ClientExceptionHandler clientExceptionHandler) {
        String[] addresses = replicationEntry.sourceClusterAddresses().toArray(String[]::new);
        AuthConfig auth = AuthConfig.fromEntry(replicationEntry.authConfig());
        SslConfig ssl = SslConfig.fromEntry(replicationEntry.sslConfig());
        return buildClient(str, addresses, auth, ssl, clientExceptionHandler);
    }

    /**
     * Builds a client to the source cluster described by the options of a replication being created.
     *
     * @param replicationOptions Options providing the replication name, addresses, authentication and SSL settings.
     * @param clientExceptionHandler Factory of the exception reported on connection failure.
     * @return Future with the connected client.
     */
    private static CompletableFuture<IgniteClient> buildClient(ReplicationOptions replicationOptions, ClientExceptionHandler clientExceptionHandler) {
        String replicationName = replicationOptions.name();
        String[] addresses = replicationOptions.sourceClusterAddresses().toArray(String[]::new);
        return buildClient(replicationName, addresses, replicationOptions.authConfig(), replicationOptions.sslConfig(), clientExceptionHandler);
    }

    /**
     * Asynchronously connects to the source cluster.
     *
     * <p>On failure the unwrapped cause is logged and the future fails with the exception produced by
     * {@link #handleClientException}.
     *
     * @param str Replication name (used for logging and for the created exception).
     * @param strArr Source cluster addresses.
     * @param authConfig Authentication settings, or {@code null}.
     * @param sslConfig SSL settings, or {@code null}.
     * @param clientExceptionHandler Factory of the exception reported on connection failure.
     * @return Future with the connected client.
     */
    private static CompletableFuture<IgniteClient> buildClient(String str, String[] strArr, @Nullable AuthConfig authConfig, @Nullable SslConfig sslConfig, ClientExceptionHandler clientExceptionHandler) {
        return IgniteClient.builder()
                .addresses(strArr)
                .authenticator(AuthConfig.createAuthenticator(authConfig))
                .ssl(SslConfig.createSslConfiguration(sslConfig))
                .buildAsync()
                .exceptionally(failure -> {
                    Throwable rootCause = ExceptionUtils.unwrapCause(failure);
                    LOG.error("Failed to create client for replication {}: ", rootCause, str);
                    throw handleClientException(str, rootCause, clientExceptionHandler);
                });
    }

    /**
     * Creates the exception for a failed connection to the source cluster.
     *
     * <p>The message suffix is taken from the most specific failure found in the cause chain: an invalid
     * credentials error first, then a client SSL configuration error, and the given throwable itself when
     * neither is present. The given throwable is always passed on as the cause.
     *
     * @param str Replication name.
     * @param th Connection failure.
     * @param clientExceptionHandler Factory of the resulting exception.
     * @return The exception to throw.
     */
    private static ReplicationException handleClientException(String str, Throwable th, ClientExceptionHandler clientExceptionHandler) {
        Throwable messageSource = findErrorByCode(ErrorGroups.Authentication.INVALID_CREDENTIALS_ERR, th);
        if (messageSource == null) {
            messageSource = findErrorByCode(ErrorGroups.Client.CLIENT_SSL_CONFIGURATION_ERR, th);
        }
        if (messageSource == null) {
            messageSource = th;
        }
        return clientExceptionHandler.createException(str, "Cannot connect to the source cluster" + ": " + messageSource.getMessage(), th);
    }

    /**
     * Searches a throwable and its cause chain for an {@link IgniteException} with the given error code.
     *
     * @param i Error code to look for.
     * @param th Throwable to start from, or {@code null}.
     * @return The first matching throwable on the chain, or {@code null} when there is none.
     */
    @Nullable
    private static Throwable findErrorByCode(int i, @Nullable Throwable th) {
        for (Throwable current = th; current != null; current = current.getCause()) {
            if (current instanceof IgniteException && ((IgniteException) current).code() == i) {
                return current;
            }
        }
        return null;
    }

    /**
     * Checks that every given table exists on the source cluster.
     *
     * @param igniteClient Client connected to the source cluster.
     * @param list Tables to check, or {@code null}.
     * @param str Node name passed to the created exceptions.
     * @return Future with the same list when all tables exist; fails with
     *         {@link ReplicationNoSourceTablesException} for a {@code null} list and with
     *         {@link ReplicationNoSourceTableException} when some tables are missing.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public static CompletableFuture<List<QualifiedName>> validateSourceTables(IgniteClient igniteClient, @Nullable List<QualifiedName> list, String str) {
        if (list == null) {
            return CompletableFuture.failedFuture(new ReplicationNoSourceTablesException("No tables to replicate.", str));
        }
        return validateTables(
                igniteClient.tables(),
                list,
                missing -> new ReplicationNoSourceTableException(String.format("Tables %s does not exist on the source side.", missing), str, missing)
        );
    }

    /**
     * Checks that every given table exists on the local (target) side.
     *
     * @param list Tables to check.
     * @return Future with the same list when all tables exist; fails with
     *         {@link ReplicationSchemaSyncException} when some tables are missing.
     */
    private CompletableFuture<List<QualifiedName>> validateTargetTables(List<QualifiedName> list) {
        Function<String, RuntimeException> missingTablesError = missing ->
                new ReplicationSchemaSyncException(String.format("Tables %s does not exist on the target side.", missing), this.nodeName, missing);
        return validateTables(this.tables, list, missingTablesError);
    }

    /**
     * Checks that every given table can be found through the given tables API.
     *
     * <p>Each lookup yields its own result (the table name when the table is missing, {@code null} otherwise)
     * and the results are gathered only after all lookups have completed. No collection is shared between
     * the asynchronous callbacks, which may run on different threads. Missing tables are reported in the
     * order of the input list.
     *
     * @param igniteTables Tables API to query.
     * @param list Tables to check.
     * @param function Creates the exception to throw from the comma-separated canonical names of the missing tables.
     * @return Future with the same list when all tables exist; otherwise it fails with the exception
     *         produced by {@code function}.
     */
    private static CompletableFuture<List<QualifiedName>> validateTables(IgniteTables igniteTables, List<QualifiedName> list, Function<String, RuntimeException> function) {
        List<CompletableFuture<QualifiedName>> lookups = new ArrayList<>(list.size());
        for (QualifiedName name : list) {
            lookups.add(igniteTables.tableAsync(name).thenApply(table -> table == null ? name : null));
        }
        return CompletableFuture.allOf(lookups.toArray(new CompletableFuture[0])).thenApply(ignored -> {
            // Every lookup is complete here, so join() neither blocks nor throws.
            List<QualifiedName> missing = lookups.stream()
                    .map(CompletableFuture::join)
                    .filter(name -> name != null)
                    .collect(Collectors.toList());
            if (missing.isEmpty()) {
                return list;
            }
            throw function.apply(missing.stream()
                    .map(QualifiedName::toCanonicalForm)
                    .collect(Collectors.joining(",")));
        });
    }
}
