/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.network.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.handler.ssl.SslContext;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.future.OrderingFuture;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.network.ChannelType;
import org.apache.ignite3.internal.network.ChannelTypeRegistry;
import org.apache.ignite3.internal.network.ClusterIdSupplier;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.NettyBootstrapFactory;
import org.apache.ignite3.internal.network.NetworkMessagesFactory;
import org.apache.ignite3.internal.network.RecipientLeftException;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.network.configuration.NetworkView;
import org.apache.ignite3.internal.network.configuration.SslView;
import org.apache.ignite3.internal.network.handshake.ChannelAlreadyExistsException;
import org.apache.ignite3.internal.network.handshake.HandshakeManager;
import org.apache.ignite3.internal.network.netty.ChannelCreationListener;
import org.apache.ignite3.internal.network.netty.ConnectorKey;
import org.apache.ignite3.internal.network.netty.DefaultRecoveryDescriptorProvider;
import org.apache.ignite3.internal.network.netty.InNetworkObject;
import org.apache.ignite3.internal.network.netty.NettyClient;
import org.apache.ignite3.internal.network.netty.NettySender;
import org.apache.ignite3.internal.network.netty.NettyServer;
import org.apache.ignite3.internal.network.netty.NettyUtils;
import org.apache.ignite3.internal.network.recovery.DescriptorAcquiry;
import org.apache.ignite3.internal.network.recovery.RecoveryAcceptorHandshakeManager;
import org.apache.ignite3.internal.network.recovery.RecoveryAcceptorHandshakeManagerV2;
import org.apache.ignite3.internal.network.recovery.RecoveryDescriptor;
import org.apache.ignite3.internal.network.recovery.RecoveryDescriptorProvider;
import org.apache.ignite3.internal.network.recovery.RecoveryInitiatorHandshakeManager;
import org.apache.ignite3.internal.network.recovery.RecoveryInitiatorHandshakeManagerV2;
import org.apache.ignite3.internal.network.recovery.StaleIdDetector;
import org.apache.ignite3.internal.network.serialization.SerializationService;
import org.apache.ignite3.internal.network.ssl.SslContextProvider;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.internal.version.IgniteProductVersionSource;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
 * Owns the inbound {@link NettyServer}, the outbound {@link NettyClient}s and the {@link NettySender}s registered
 * after a handshake finishes, and hands out senders to callers on demand.
 *
 * <p>Lifecycle as visible in this class: {@link #setLocalNode(InternalClusterNode)} and {@link #start()} bring the
 * manager up; {@link #initiateStopping()} must precede {@link #stop()} (asserted there).
 */
public class ConnectionManager
implements ChannelCreationListener {
    /** Message factory handed to acceptor handshake managers. */
    protected static final NetworkMessagesFactory FACTORY = new NetworkMessagesFactory();
    private static final IgniteLogger LOG = Loggers.forClass(ConnectionManager.class);
    public static final byte DIRECT_PROTOCOL_VERSION = 1;
    /** Upper bound on the attempt counter used by {@link #getChannelWithRetry}. */
    private static final int MAX_RETRIES_TO_OPEN_CHANNEL = 10;
    /** Bootstrap used to start every outbound client. */
    private final Bootstrap clientBootstrap;
    /** Inbound server. */
    private final NettyServer server;
    /** Senders registered by {@link #handshakeFinished(NettySender)}, keyed by (launch id, channel type). */
    private final Map<ConnectorKey<UUID>, NettySender> channels = new ConcurrentHashMap<>();
    /** Outbound clients, keyed by (remote address, channel type). */
    private final Map<ConnectorKey<InetSocketAddress>, NettyClient> clients = new ConcurrentHashMap<>();
    private final SerializationService serializationService;
    /** Consumers notified about every incoming network object. */
    private final List<Consumer<InNetworkObject>> listeners = new CopyOnWriteArrayList<>();
    private final UUID nodeId;
    /** Completed by {@link #setLocalNode(InternalClusterNode)}; handshake managers need the local node. */
    private final CompletableFuture<InternalClusterNode> localNodeFuture = new CompletableFuture<>();
    protected final NettyBootstrapFactory bootstrapFactory;
    protected final StaleIdDetector staleIdDetector;
    protected final ClusterIdSupplier clusterIdSupplier;
    private final AtomicBoolean started = new AtomicBoolean(false);
    /** Set by {@link #initiateStopping()}; once set, no new outgoing connections are created. */
    protected final AtomicBoolean stopping = new AtomicBoolean(false);
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    protected final RecoveryDescriptorProvider descriptorProvider = new DefaultRecoveryDescriptorProvider();
    /** Single-threaded executor on which channel closing and descriptor disposal are scheduled. */
    private final ExecutorService connectionMaintenanceExecutor;
    private final ChannelTypeRegistry channelTypeRegistry;
    protected final IgniteProductVersionSource productVersionSource;
    /** Client-side SSL context, or {@code null} when SSL is disabled in the configuration. */
    @Nullable
    private final SslContext clientSslContext;
    protected final TopologyService topologyService;
    protected final FailureProcessor failureProcessor;

    /**
     * Creates the manager: builds the server, the outbound bootstrap, the optional SSL contexts and the
     * maintenance executor. Nothing is started here; see {@link #start()}.
     *
     * @param networkConfiguration Network configuration (its SSL section decides whether SSL contexts are created).
     * @param serializationService Serialization service passed to the server and to every client.
     * @param nodeName Node name used when naming the maintenance thread.
     * @param nodeId Local node ID returned by {@link #nodeId()}.
     * @param bootstrapFactory Source of the outbound bootstrap and of the handshake event loop switcher.
     * @param staleIdDetector Detector consulted to find out whether a launch ID is stale.
     * @param clusterIdSupplier Passed through to the handshake managers.
     * @param channelTypeRegistry Registry resolving a channel ID to its {@link ChannelType}.
     * @param productVersionSource Passed through to the handshake managers.
     * @param topologyService Passed through to the handshake managers.
     * @param failureProcessor Passed through to the initiator handshake manager.
     */
    public ConnectionManager(
            NetworkView networkConfiguration,
            SerializationService serializationService,
            String nodeName,
            UUID nodeId,
            NettyBootstrapFactory bootstrapFactory,
            StaleIdDetector staleIdDetector,
            ClusterIdSupplier clusterIdSupplier,
            ChannelTypeRegistry channelTypeRegistry,
            IgniteProductVersionSource productVersionSource,
            TopologyService topologyService,
            FailureProcessor failureProcessor
    ) {
        this.serializationService = serializationService;
        this.nodeId = nodeId;
        this.bootstrapFactory = bootstrapFactory;
        this.staleIdDetector = staleIdDetector;
        this.clusterIdSupplier = clusterIdSupplier;
        this.channelTypeRegistry = channelTypeRegistry;
        this.productVersionSource = productVersionSource;
        this.topologyService = topologyService;
        this.failureProcessor = failureProcessor;
        SslView ssl = networkConfiguration.ssl();
        this.clientSslContext = ssl.enabled() ? SslContextProvider.createClientSslContext(ssl) : null;
        this.server = new NettyServer(networkConfiguration, this::createAcceptorHandshakeManager, this::onMessage, serializationService, bootstrapFactory, ssl.enabled() ? SslContextProvider.createServerSslContext(ssl) : null);
        this.clientBootstrap = bootstrapFactory.createOutboundBootstrap();
        // One thread at most; the core thread is allowed to time out after 1 second of idleness.
        ThreadPoolExecutor maintenanceExecutor = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), IgniteThreadFactory.create(nodeName, "connection-maintenance", LOG, new ThreadOperation[0]));
        maintenanceExecutor.allowCoreThreadTimeOut(true);
        this.connectionMaintenanceExecutor = maintenanceExecutor;
    }

    /**
     * Starts the server and blocks until it is started.
     *
     * @throws IgniteInternalException If the manager was already started or stopped, if the server fails to start,
     *     or if the calling thread is interrupted while waiting (the interrupt flag is restored in that case).
     */
    public void start() throws IgniteInternalException {
        try {
            boolean wasStarted = this.started.getAndSet(true);
            if (wasStarted) {
                throw new IgniteInternalException("Attempted to start an already started connection manager");
            }
            if (this.stopped.get()) {
                throw new IgniteInternalException("Attempted to start an already stopped connection manager");
            }
            this.server.start().get();
            LOG.info("Server started [address={}]", this.server.address());
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            throw new IgniteInternalException(ExceptionUtils.extractCodeFrom(cause), "Failed to start the connection manager: " + cause.getMessage(), cause);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IgniteInternalException("Interrupted while starting the connection manager", (Throwable)e);
        }
    }

    /**
     * Returns the address reported by the server.
     *
     * @return Server address cast to {@link InetSocketAddress}.
     */
    public InetSocketAddress localBindAddress() {
        return (InetSocketAddress)this.server.address();
    }

    /**
     * Obtains a sender for the given node and channel type, opening a connection to {@code address} if no open
     * sender is registered yet.
     *
     * @param nodeId ID of the target node (used as the key in the registered senders map).
     * @param type Channel type.
     * @param address Address to connect to when a new client is needed.
     * @return Future of an open sender; fails with {@link IllegalStateException} when the retry limit is exceeded.
     */
    public OrderingFuture<NettySender> channel(UUID nodeId, ChannelType type, InetSocketAddress address) {
        return this.getChannelWithRetry(nodeId, type, address, 0);
    }

    /**
     * Tries to get a sender, retrying when the handshake reports that a channel already exists or when the obtained
     * sender turns out to be closed.
     *
     * @param nodeId ID of the target node.
     * @param type Channel type.
     * @param address Address to connect to.
     * @param attempt Zero-based attempt counter; the call fails once it exceeds {@link #MAX_RETRIES_TO_OPEN_CHANNEL}.
     * @return Future of an open sender.
     */
    private OrderingFuture<NettySender> getChannelWithRetry(UUID nodeId, ChannelType type, InetSocketAddress address, int attempt) {
        if (attempt > MAX_RETRIES_TO_OPEN_CHANNEL) {
            return OrderingFuture.failedFuture(new IllegalStateException("Too many attempts to open channel to node \"" + nodeId + "\", address=" + address));
        }
        return this.doGetChannel(nodeId, type, address).handle((res, ex) -> {
            // The exception may arrive either directly or wrapped; in both cases retry with the node ID it carries.
            if (ex instanceof ChannelAlreadyExistsException) {
                return this.getChannelWithRetry(((ChannelAlreadyExistsException)ex).nodeId(), type, address, attempt + 1);
            }
            if (ex != null && ex.getCause() instanceof ChannelAlreadyExistsException) {
                return this.getChannelWithRetry(((ChannelAlreadyExistsException)ex.getCause()).nodeId(), type, address, attempt + 1);
            }
            if (ex != null) {
                return OrderingFuture.failedFuture(ex);
            }
            assert (res != null);
            if (res.isOpen()) {
                return OrderingFuture.completedFuture(res);
            }
            // The sender was closed in the meantime: try again.
            return this.getChannelWithRetry(nodeId, type, address, attempt + 1);
        }).thenCompose(Function.identity());
    }

    /**
     * Performs a single attempt to get a sender: returns a registered open sender if there is one, otherwise reuses
     * a connected client for the address or creates a new one.
     *
     * @param nodeId ID of the target node.
     * @param type Channel type.
     * @param address Address to connect to.
     * @return Future of the sender; fails with {@link NodeStoppingException} when no client could be created
     *     because the manager is stopping.
     */
    private OrderingFuture<NettySender> doGetChannel(UUID nodeId, ChannelType type, InetSocketAddress address) {
        // Returning null from compute() drops a missing or closed sender from the map.
        NettySender channel = this.channels.compute(new ConnectorKey<UUID>(nodeId, type), (key, sender) -> sender == null || !sender.isOpen() ? null : sender);
        if (channel != null) {
            return OrderingFuture.completedFuture(channel);
        }
        @Nullable NettyClient client = this.clients.compute(new ConnectorKey<InetSocketAddress>(address, type), (key, existingClient) -> ConnectionManager.isClientConnected(existingClient) ? existingClient : this.connect((InetSocketAddress)key.id(), key.type()));
        if (client == null) {
            return OrderingFuture.failedFuture(new NodeStoppingException("No outgoing connections are allowed as the node is stopping"));
        }
        return client.sender();
    }

    /**
     * Tells whether the client can be reused.
     *
     * @param client Client to check, may be {@code null}.
     * @return {@code true} if the client is non-null, has not failed to connect and is not disconnected.
     */
    private static boolean isClientConnected(@Nullable NettyClient client) {
        return client != null && !client.failedToConnect() && !client.isDisconnected();
    }

    /**
     * Dispatches an incoming object to every registered listener.
     *
     * @param message Incoming network object.
     */
    private void onMessage(InNetworkObject message) {
        this.listeners.forEach(consumer -> consumer.accept(message));
    }

    /**
     * Registers the sender under its (launch ID, channel type) key. If the launch ID is stale or the manager is
     * stopping, the sender is scheduled for closing and its registration is removed again.
     *
     * @param channel Sender produced by a finished handshake.
     */
    @Override
    public void handshakeFinished(NettySender channel) {
        ConnectorKey<UUID> key = new ConnectorKey<UUID>(channel.launchId(), this.channelTypeRegistry.get(channel.channelId()));
        NettySender oldChannel = this.channels.put(key, channel);
        assert (oldChannel == null || !oldChannel.isOpen()) : "Incorrect channel creation flow";
        // The checks happen after put(); remove(key, channel) only evicts this very sender.
        if (this.staleIdDetector.isIdStale(channel.launchId())) {
            this.closeSenderAndDisposeDescriptor(channel, new RecipientLeftException("Recipient is stale [id=" + channel.launchId() + "]"));
            this.channels.remove(key, channel);
        } else if (this.stopping.get()) {
            this.closeSenderAndDisposeDescriptor(channel, new NodeStoppingException());
            this.channels.remove(key, channel);
        }
    }

    /**
     * Schedules, on the maintenance executor, closing of the sender followed by blocking and disposal of its
     * recovery descriptor.
     *
     * @param sender Sender to close.
     * @param exceptionToFailSendFutures Exception passed to the descriptor when it is blocked and disposed.
     */
    private void closeSenderAndDisposeDescriptor(NettySender sender, Exception exceptionToFailSendFutures) {
        this.connectionMaintenanceExecutor.submit(() -> sender.closeAsync().whenCompleteAsync((res, ex) -> {
            RecoveryDescriptor recoveryDescriptor = this.descriptorProvider.getRecoveryDescriptor(sender.consistentId(), sender.launchId(), sender.channelId());
            this.blockAndDisposeDescriptor(recoveryDescriptor, exceptionToFailSendFutures);
        }, this.connectionMaintenanceExecutor));
    }

    /**
     * Creates and starts a client for the address unless the manager is stopping.
     *
     * @param address Address to connect to.
     * @param channelType Channel type of the connection.
     * @return Started client, or {@code null} if the manager is stopping.
     */
    @Nullable
    private NettyClient connect(InetSocketAddress address, ChannelType channelType) {
        if (this.stopping.get()) {
            return null;
        }
        NettyClient client = new NettyClient(address, this.serializationService, this.createInitiatorHandshakeManager(channelType.id()), this::onMessage, this.clientSslContext);
        client.start(this.clientBootstrap).whenComplete((sender, throwable) -> {
            // A client that failed to start is dropped so that the next request creates a fresh one.
            if (throwable != null) {
                this.clients.remove(new ConnectorKey<InetSocketAddress>(address, channelType));
            }
        });
        return client;
    }

    /**
     * Adds a consumer of incoming network objects.
     *
     * @param listener Listener to add.
     */
    public void addListener(Consumer<InNetworkObject> listener) {
        this.listeners.add(listener);
    }

    /**
     * Stops clients, the server and registered senders, disposes all recovery descriptors, waits for all of that to
     * finish and then shuts the maintenance executor down. Subsequent calls are no-ops.
     *
     * <p>Failures while waiting are logged and not rethrown. If the calling thread is interrupted while waiting,
     * the interrupt flag is restored before this method returns.
     */
    public void stop() {
        boolean wasStopped = this.stopped.getAndSet(true);
        if (wasStopped) {
            return;
        }
        assert (this.stopping.get());
        List<CompletableFuture<Void>> stopFutures = new ArrayList<>();
        for (NettyClient client : this.clients.values()) {
            stopFutures.add(client.stop());
        }
        stopFutures.add(this.server.stop());
        for (NettySender sender : this.channels.values()) {
            stopFutures.add(sender.closeAsync());
        }
        stopFutures.add(this.disposeDescriptors());
        CompletableFuture<Void> finalStopFuture = CompletableFuture.allOf(stopFutures.toArray(CompletableFuture[]::new));
        boolean interrupted = false;
        try {
            finalStopFuture.get();
        }
        catch (InterruptedException e) {
            // Remember the interruption; it is re-asserted below so that callers can observe it.
            interrupted = true;
            LOG.warn("Failed to stop connection manager [reason={}]", e.getMessage());
        }
        catch (Exception e) {
            LOG.warn("Failed to stop connection manager [reason={}]", e.getMessage());
        }
        try {
            IgniteUtils.shutdownAndAwaitTermination(this.connectionMaintenanceExecutor, 10L, TimeUnit.SECONDS);
        }
        finally {
            // Restored only after the executor shutdown so that the shutdown wait itself is not cut short.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks and disposes every recovery descriptor known to the provider, using a {@link NodeStoppingException}.
     *
     * @return Future that completes when all disposals complete.
     */
    private CompletableFuture<Void> disposeDescriptors() {
        NodeStoppingException exceptionToFailSendFutures = new NodeStoppingException();
        Collection<RecoveryDescriptor> descriptors = this.descriptorProvider.getAllRecoveryDescriptors();
        List<CompletableFuture<Void>> disposeFutures = new ArrayList<>(descriptors.size());
        for (RecoveryDescriptor descriptor : descriptors) {
            disposeFutures.add(this.blockAndDisposeDescriptor(descriptor, exceptionToFailSendFutures));
        }
        return CompletableFuture.allOf(disposeFutures.toArray(CompletableFuture[]::new));
    }

    /**
     * Tells whether {@link #stop()} has been called.
     *
     * @return {@code true} if the manager is stopped.
     */
    public boolean isStopped() {
        return this.stopped.get();
    }

    /**
     * Creates a handshake manager for an outgoing connection. The local node must already be set.
     *
     * @param connectionId Connection (channel) ID.
     * @return Initiator handshake manager.
     * @throws NullPointerException If the local node has not been set yet.
     */
    private HandshakeManager createInitiatorHandshakeManager(short connectionId) {
        InternalClusterNode localNode = Objects.requireNonNull((InternalClusterNode)this.localNodeFuture.getNow(null), "localNode not set");
        return this.newRecoveryInitiatorHandshakeManager(connectionId, localNode);
    }

    /**
     * Factory method for the initiator handshake manager; overridable by subclasses.
     *
     * @param connectionId Connection (channel) ID.
     * @param localNode Local node.
     * @return New initiator handshake manager.
     */
    protected RecoveryInitiatorHandshakeManager newRecoveryInitiatorHandshakeManager(short connectionId, InternalClusterNode localNode) {
        return new RecoveryInitiatorHandshakeManagerV2(localNode, connectionId, this.descriptorProvider, this.bootstrapFactory.handshakeEventLoopSwitcher(), this.staleIdDetector, this.clusterIdSupplier, this, this.stopping::get, this.productVersionSource, this.topologyService, this.failureProcessor);
    }

    /**
     * Creates a handshake manager for an incoming connection, waiting for the local node to be set if necessary.
     *
     * @return Acceptor handshake manager.
     */
    private HandshakeManager createAcceptorHandshakeManager() {
        InternalClusterNode localNode = this.waitForLocalNodeToBeSet();
        return this.newRecoveryAcceptorHandshakeManager(localNode);
    }

    /**
     * Factory method for the acceptor handshake manager; overridable by subclasses.
     *
     * @param localNode Local node.
     * @return New acceptor handshake manager.
     */
    protected RecoveryAcceptorHandshakeManager newRecoveryAcceptorHandshakeManager(InternalClusterNode localNode) {
        return new RecoveryAcceptorHandshakeManagerV2(localNode, FACTORY, this.descriptorProvider, this.bootstrapFactory.handshakeEventLoopSwitcher(), this.staleIdDetector, this.clusterIdSupplier, this, this.stopping::get, this.productVersionSource, this.topologyService);
    }

    /**
     * Waits up to 10 seconds for {@link #setLocalNode(InternalClusterNode)} to be called.
     *
     * @return Local node.
     * @throws RuntimeException If the wait is interrupted (the interrupt flag is restored), times out or fails.
     */
    private InternalClusterNode waitForLocalNodeToBeSet() {
        try {
            return this.localNodeFuture.get(10L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for local node to be set", e);
        }
        catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException("Could not finish awaiting for local node", e);
        }
    }

    /** Returns the server (for tests). */
    @TestOnly
    public NettyServer server() {
        return this.server;
    }

    /** Returns the serialization service (for tests). */
    @TestOnly
    public SerializationService serializationService() {
        return this.serializationService;
    }

    /**
     * Returns the local node ID passed to the constructor.
     *
     * @return Local node ID.
     */
    public UUID nodeId() {
        return this.nodeId;
    }

    /** Returns an unmodifiable view of the current clients (for tests). */
    @TestOnly
    public Collection<NettyClient> clients() {
        return Collections.unmodifiableCollection(this.clients.values());
    }

    /** Returns an immutable copy of the registered senders (for tests). */
    @TestOnly
    public Map<ConnectorKey<UUID>, NettySender> channels() {
        return Map.copyOf(this.channels);
    }

    /** Marks the manager as stopping; must be called before {@link #stop()}. */
    public void initiateStopping() {
        this.stopping.set(true);
    }

    /**
     * Closes all registered senders of the node that left and disposes its recovery descriptors. The work is
     * performed on the maintenance executor.
     *
     * @param id Launch ID of the node that left; it must already be reported stale by the detector (asserted).
     * @return Future completed when the descriptors have been handed over for disposal; completed exceptionally if
     *     the cleanup could not be scheduled or threw.
     */
    public CompletableFuture<Void> handleNodeLeft(UUID id) {
        assert (this.staleIdDetector.isIdStale(id)) : id + " is not stale yet";
        CompletableFuture<Void> future = new CompletableFuture<>();
        this.connectionMaintenanceExecutor.execute(() -> {
            try {
                // Descriptors are disposed regardless of whether closing the channels succeeded.
                this.closeChannelsWith(id).whenCompleteAsync((res, ex) -> {
                    try {
                        this.disposeRecoveryDescriptorsOfLeftNode(id);
                        future.complete(null);
                    }
                    catch (Throwable t) {
                        // Never leave the caller with a future that hangs forever.
                        future.completeExceptionally(t);
                    }
                }, this.connectionMaintenanceExecutor);
            }
            catch (Throwable t) {
                // Covers synchronous failures, e.g. the continuation being rejected by the executor.
                future.completeExceptionally(t);
            }
        });
        return future;
    }

    /**
     * Closes and unregisters every sender whose launch ID equals the given one.
     *
     * @param id Launch ID.
     * @return Future that completes when all the close operations complete.
     */
    private CompletableFuture<Void> closeChannelsWith(UUID id) {
        List<Map.Entry<ConnectorKey<UUID>, NettySender>> entriesToRemove = this.channels.entrySet().stream()
                .filter(entry -> entry.getValue().launchId().equals(id))
                .collect(Collectors.toList());
        List<CompletableFuture<Void>> closeFutures = new ArrayList<>(entriesToRemove.size());
        for (Map.Entry<ConnectorKey<UUID>, NettySender> entry : entriesToRemove) {
            closeFutures.add(entry.getValue().closeAsync());
            // Remove only the sender that was just closed, not one registered under the same key meanwhile.
            this.channels.remove(entry.getKey(), entry.getValue());
        }
        return CompletableFuture.allOf(closeFutures.toArray(CompletableFuture[]::new));
    }

    /**
     * Blocks and disposes all recovery descriptors associated with the given launch ID, using a
     * {@link RecipientLeftException}.
     *
     * @param id Launch ID of the node that left.
     */
    private void disposeRecoveryDescriptorsOfLeftNode(UUID id) {
        RecipientLeftException exceptionToFailSendFutures = new RecipientLeftException("Recipient left [id=" + id + "]");
        for (RecoveryDescriptor descriptor : this.descriptorProvider.getRecoveryDescriptorsByLaunchId(id)) {
            this.blockAndDisposeDescriptor(descriptor, exceptionToFailSendFutures);
        }
    }

    /**
     * Blocks the descriptor forever and disposes it. If the descriptor is currently held by a channel, that channel
     * is closed first and the attempt is repeated on the maintenance executor.
     *
     * @param descriptor Descriptor to block and dispose.
     * @param exceptionToFailSendFutures Exception handed to the descriptor on blocking and disposal.
     * @return Future completed when the descriptor is disposed or found to be already blocked forever.
     */
    private CompletableFuture<Void> blockAndDisposeDescriptor(RecoveryDescriptor descriptor, Exception exceptionToFailSendFutures) {
        while (!descriptor.tryBlockForever(exceptionToFailSendFutures)) {
            if (descriptor.isBlockedForever()) {
                // Someone else has already blocked it; nothing more to do here.
                return CompletableFutures.nullCompletedFuture();
            }
            DescriptorAcquiry acquiry = descriptor.holder();
            // No holder observed: loop and try to block again.
            if (acquiry == null) continue;
            Channel channel = acquiry.channel();
            assert (channel != null);
            // Close the holding channel, then retry regardless of the close outcome.
            return ((CompletableFuture)NettyUtils.toCompletableFuture(channel.close()).handleAsync((res, e) -> this.blockAndDisposeDescriptor(descriptor, exceptionToFailSendFutures), this.connectionMaintenanceExecutor)).thenCompose(Function.identity());
        }
        descriptor.dispose(exceptionToFailSendFutures);
        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Sets the local node, unblocking creation of handshake managers.
     *
     * @param localNode Local node.
     */
    public void setLocalNode(InternalClusterNode localNode) {
        this.localNodeFuture.complete(localNode);
    }
}

