package org.apache.ignite.internal.network.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.future.OrderingFuture;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.recovery.DescriptorAcquiry;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManagerFactory;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
import org.apache.ignite.internal.network.recovery.StaleIdDetector;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/* loaded from: input_file:org/apache/ignite/internal/network/netty/ConnectionManager.class */
/**
 * Manages the network connections of a node.
 *
 * <p>Owns the {@link NettyServer} used for incoming connections and the {@link NettyClient}s used for outgoing
 * ones, and tracks established {@link NettySender}s keyed by the remote consistent ID and channel type.
 *
 * <p>Lifecycle, as visible from this class: {@link #start()} starts the server; {@link #initiateStopping()}
 * marks the manager as stopping (after which no new outgoing connections are created); {@link #stop()} closes
 * everything. {@link #stop()} asserts that {@link #initiateStopping()} has been called before it.
 */
public class ConnectionManager implements ChannelCreationListener {
    /** Message factory handed to server-side handshake managers. */
    private static final NetworkMessagesFactory FACTORY = new NetworkMessagesFactory();

    /** Logger. */
    private static final IgniteLogger LOG = Loggers.forClass(ConnectionManager.class);

    /** Direct protocol version. */
    public static final byte DIRECT_PROTOCOL_VERSION = 1;

    /** Highest retry counter value for which {@link #getChannelWithRetry} still tries to open a channel. */
    private static final int MAX_RETRIES_TO_OPEN_CHANNEL = 10;

    /** Bootstrap used to start outgoing (client) connections. */
    private final Bootstrap clientBootstrap;

    /** Server accepting incoming connections. */
    private final NettyServer server;

    /** Established senders by (remote consistent ID, channel type). */
    private final Map<ConnectorKey<String>, NettySender> channels = new ConcurrentHashMap<>();

    /** Outgoing clients by (remote address, channel type). */
    private final Map<ConnectorKey<InetSocketAddress>, NettyClient> clients = new ConcurrentHashMap<>();

    /** Serialization service passed to the server and to every client. */
    private final SerializationService serializationService;

    /** Listeners notified about every incoming network object. */
    private final List<Consumer<InNetworkObject>> listeners = new CopyOnWriteArrayList<>();

    /** Consistent ID of the local node. */
    private final String consistentId;

    /** Completed by {@link #setLocalNode(ClusterNode)}; handshake managers need the local node. */
    private final CompletableFuture<ClusterNode> localNodeFuture = new CompletableFuture<>();

    private final NettyBootstrapFactory bootstrapFactory;

    private final StaleIdDetector staleIdDetector;

    private final ClusterIdSupplier clusterIdSupplier;

    /** Optional factory overriding creation of client handshake managers; {@code null} means the default is used. */
    @Nullable
    private final RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactory;

    /** Set by {@link #start()}. */
    private final AtomicBoolean started = new AtomicBoolean(false);

    /** Set by {@link #initiateStopping()}. */
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    /** Set by {@link #stop()}. */
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    /** Source of recovery descriptors. */
    private final RecoveryDescriptorProvider descriptorProvider = new DefaultRecoveryDescriptorProvider();

    private final NetworkView networkConfiguration;

    /** Single-threaded executor on which channel closing and recovery descriptor disposal are scheduled. */
    private final ExecutorService connectionMaintenanceExecutor;

    private final FailureProcessor failureProcessor;

    /**
     * Creates a connection manager that uses the default client handshake manager.
     *
     * @param networkView Network configuration.
     * @param serializationService Serialization service.
     * @param str Consistent ID of the local node (also used to name the maintenance thread).
     * @param nettyBootstrapFactory Bootstrap factory.
     * @param staleIdDetector Detector of stale launch IDs.
     * @param clusterIdSupplier Cluster ID supplier passed to handshake managers.
     * @param failureProcessor Failure processor passed to handshake managers.
     */
    public ConnectionManager(NetworkView networkView, SerializationService serializationService, String str, NettyBootstrapFactory nettyBootstrapFactory, StaleIdDetector staleIdDetector, ClusterIdSupplier clusterIdSupplier, FailureProcessor failureProcessor) {
        this(networkView, serializationService, str, nettyBootstrapFactory, staleIdDetector, clusterIdSupplier, null, failureProcessor);
    }

    /**
     * Creates a connection manager.
     *
     * @param networkView Network configuration.
     * @param serializationService Serialization service.
     * @param str Consistent ID of the local node (also used to name the maintenance thread).
     * @param nettyBootstrapFactory Bootstrap factory.
     * @param staleIdDetector Detector of stale launch IDs.
     * @param clusterIdSupplier Cluster ID supplier passed to handshake managers.
     * @param recoveryClientHandshakeManagerFactory Factory of client handshake managers, or {@code null} to use
     *     {@link RecoveryClientHandshakeManager}.
     * @param failureProcessor Failure processor passed to handshake managers.
     */
    public ConnectionManager(NetworkView networkView, SerializationService serializationService, String str, NettyBootstrapFactory nettyBootstrapFactory, StaleIdDetector staleIdDetector, ClusterIdSupplier clusterIdSupplier, @Nullable RecoveryClientHandshakeManagerFactory recoveryClientHandshakeManagerFactory, FailureProcessor failureProcessor) {
        this.serializationService = serializationService;
        this.consistentId = str;
        this.bootstrapFactory = nettyBootstrapFactory;
        this.staleIdDetector = staleIdDetector;
        this.clusterIdSupplier = clusterIdSupplier;
        this.clientHandshakeManagerFactory = recoveryClientHandshakeManagerFactory;
        this.networkConfiguration = networkView;
        this.failureProcessor = failureProcessor;

        this.server = new NettyServer(networkView, this::createServerHandshakeManager, this::onMessage, serializationService, nettyBootstrapFactory);
        this.clientBootstrap = nettyBootstrapFactory.createClientBootstrap();

        BlockingQueue<Runnable> maintenanceQueue = new LinkedBlockingQueue<>();
        ThreadFactory maintenanceThreadFactory = NamedThreadFactory.create(str, "connection-maintenance", LOG);

        // One thread at most; it is allowed to die after a second of idleness.
        ThreadPoolExecutor maintenanceExecutor = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.SECONDS, maintenanceQueue, maintenanceThreadFactory);
        maintenanceExecutor.allowCoreThreadTimeOut(true);

        this.connectionMaintenanceExecutor = maintenanceExecutor;
    }

    /**
     * Starts the server and blocks until it is started.
     *
     * @throws IgniteInternalException If the manager was already started or stopped, if the calling thread is
     *     interrupted while waiting, or if the server fails to start.
     */
    public void start() throws IgniteInternalException {
        try {
            if (started.getAndSet(true)) {
                throw new IgniteInternalException("Attempted to start an already started connection manager");
            }

            if (stopped.get()) {
                throw new IgniteInternalException("Attempted to start an already stopped connection manager");
            }

            server.start().get();

            LOG.info("Server started [address={}]", server.address());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new IgniteInternalException("Interrupted while starting the connection manager", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();

            throw new IgniteInternalException("Failed to start the connection manager: " + cause.getMessage(), cause);
        }
    }

    /**
     * Returns the address of the server.
     *
     * @return Server address.
     */
    public InetSocketAddress localAddress() {
        return (InetSocketAddress) server.address();
    }

    /**
     * Returns a future of an open sender to the given node, creating an outgoing connection if needed.
     *
     * @param str Consistent ID of the remote node, or {@code null} if it is not known.
     * @param channelType Channel type.
     * @param inetSocketAddress Address of the remote node.
     * @return Future of the sender.
     */
    public OrderingFuture<NettySender> channel(@Nullable String str, ChannelType channelType, InetSocketAddress inetSocketAddress) {
        return getChannelWithRetry(str, channelType, inetSocketAddress, 0);
    }

    /**
     * Obtains a sender, retrying when the handshake reports that a channel already exists or when the obtained
     * sender turns out to be closed.
     *
     * @param remoteConsistentId Consistent ID of the remote node, or {@code null} if it is not known.
     * @param channelType Channel type.
     * @param address Address of the remote node.
     * @param attempt Zero-based retry counter; the call fails once it exceeds {@link #MAX_RETRIES_TO_OPEN_CHANNEL}.
     * @return Future of the sender.
     */
    private OrderingFuture<NettySender> getChannelWithRetry(@Nullable String remoteConsistentId, ChannelType channelType, InetSocketAddress address, int attempt) {
        if (attempt > MAX_RETRIES_TO_OPEN_CHANNEL) {
            return OrderingFuture.failedFuture(new IllegalStateException("Too many attempts to open channel to " + remoteConsistentId));
        }

        return doGetChannel(remoteConsistentId, channelType, address)
                .handle((sender, failure) -> {
                    // The exception carries the consistent ID of the remote node; retry with it.
                    if (failure instanceof ChannelAlreadyExistsException) {
                        String existingId = ((ChannelAlreadyExistsException) failure).consistentId();

                        return getChannelWithRetry(existingId, channelType, address, attempt + 1);
                    }

                    if (failure != null && failure.getCause() instanceof ChannelAlreadyExistsException) {
                        String existingId = ((ChannelAlreadyExistsException) failure.getCause()).consistentId();

                        return getChannelWithRetry(existingId, channelType, address, attempt + 1);
                    }

                    if (failure != null) {
                        return OrderingFuture.<NettySender>failedFuture(failure);
                    }

                    assert sender != null;

                    if (sender.isOpen()) {
                        return OrderingFuture.completedFuture(sender);
                    }

                    // The sender got closed in the meantime, try again.
                    return getChannelWithRetry(sender.consistentId(), channelType, address, attempt + 1);
                })
                .thenCompose(Function.identity());
    }

    /**
     * Makes a single attempt to obtain a sender: first looks for an open registered sender (only possible when the
     * consistent ID is known), then falls back to an existing or a new outgoing client.
     *
     * @param remoteConsistentId Consistent ID of the remote node, or {@code null} if it is not known.
     * @param channelType Channel type.
     * @param address Address of the remote node.
     * @return Future of the sender; failed with {@link NodeStoppingException} if no client could be created
     *     because the node is stopping.
     */
    private OrderingFuture<NettySender> doGetChannel(@Nullable String remoteConsistentId, ChannelType channelType, InetSocketAddress address) {
        if (remoteConsistentId != null) {
            // Returning null from compute() also drops a registered sender that is no longer open.
            NettySender openSender = channels.compute(
                    new ConnectorKey<>(remoteConsistentId, channelType),
                    (key, sender) -> (sender == null || !sender.isOpen()) ? null : sender
            );

            if (openSender != null) {
                return OrderingFuture.completedFuture(openSender);
            }
        }

        NettyClient client = clients.compute(
                new ConnectorKey<>(address, channelType),
                (key, existing) -> isClientConnected(existing) ? existing : connect(key.id(), key.type())
        );

        if (client == null) {
            return OrderingFuture.failedFuture(new NodeStoppingException("No outgoing connections are allowed as the node is stopping"));
        }

        return client.sender();
    }

    /**
     * Tells whether the client can be reused.
     *
     * @param nettyClient Client, may be {@code null}.
     * @return {@code true} if the client is not {@code null}, has not failed to connect and is not disconnected.
     */
    private static boolean isClientConnected(@Nullable NettyClient nettyClient) {
        return nettyClient != null && !nettyClient.failedToConnect() && !nettyClient.isDisconnected();
    }

    /**
     * Passes an incoming network object to every registered listener.
     *
     * @param inNetworkObject Incoming network object.
     */
    private void onMessage(InNetworkObject inNetworkObject) {
        for (Consumer<InNetworkObject> listener : listeners) {
            listener.accept(inNetworkObject);
        }
    }

    /**
     * Registers the sender produced by a finished handshake. If the remote launch ID is already stale or this
     * manager is stopping, the sender is closed, its recovery descriptor is disposed and it is unregistered.
     *
     * @param nettySender Sender created by the handshake.
     */
    @Override // org.apache.ignite.internal.network.netty.ChannelCreationListener
    public void handshakeFinished(NettySender nettySender) {
        ConnectorKey<String> key = new ConnectorKey<>(nettySender.consistentId(), ChannelType.getChannel(nettySender.channelId()));

        NettySender previous = channels.put(key, nettySender);

        assert previous == null || !previous.isOpen() : "Incorrect channel creation flow";

        if (staleIdDetector.isIdStale(nettySender.launchId())) {
            closeSenderAndDisposeDescriptor(nettySender, new RecipientLeftException());

            // Conditional removal: never evict a different sender registered under the same key.
            channels.remove(key, nettySender);
        } else if (stopping.get()) {
            closeSenderAndDisposeDescriptor(nettySender, new NodeStoppingException());

            channels.remove(key, nettySender);
        }
    }

    /**
     * Schedules, on the maintenance executor, closing of the sender followed by blocking and disposal of the
     * recovery descriptor that corresponds to it.
     *
     * @param nettySender Sender to close.
     * @param exc Exception passed to the descriptor when blocking/disposing it.
     */
    private void closeSenderAndDisposeDescriptor(NettySender nettySender, Exception exc) {
        connectionMaintenanceExecutor.submit(() -> {
            nettySender.closeAsync().whenCompleteAsync((ignoredResult, ignoredError) -> {
                RecoveryDescriptor descriptor = descriptorProvider.getRecoveryDescriptor(
                        nettySender.consistentId(),
                        UUID.fromString(nettySender.launchId()),
                        nettySender.channelId()
                );

                blockAndDisposeDescriptor(descriptor, exc);
            }, connectionMaintenanceExecutor);
        });
    }

    /**
     * Creates and starts a client connecting to the given address.
     *
     * @param inetSocketAddress Remote address.
     * @param channelType Channel type.
     * @return Started client, or {@code null} if the manager is stopping.
     */
    @Nullable
    private NettyClient connect(InetSocketAddress inetSocketAddress, ChannelType channelType) {
        if (stopping.get()) {
            return null;
        }

        NettyClient client = new NettyClient(
                inetSocketAddress,
                serializationService,
                createClientHandshakeManager(channelType.id()),
                this::onMessage,
                networkConfiguration.ssl()
        );

        client.start(clientBootstrap).whenComplete((sender, failure) -> {
            if (failure != null) {
                // NOTE(review): removal is by key only, so a client registered later under the same key would be
                // removed as well - confirm whether a conditional remove(key, client) is wanted here.
                clients.remove(new ConnectorKey<>(inetSocketAddress, channelType));
            }
        });

        return client;
    }

    /**
     * Adds a listener that is invoked for every incoming network object.
     *
     * @param consumer Listener.
     */
    public void addListener(Consumer<InNetworkObject> consumer) {
        listeners.add(consumer);
    }

    /**
     * Stops the clients, the server and the registered senders, disposes all recovery descriptors, waits for all
     * of that to finish and then shuts down the maintenance executor. Repeated invocations do nothing.
     *
     * <p>A failure while waiting is logged and does not prevent the executor shutdown. If the calling thread is
     * interrupted while waiting, its interrupt status is restored before this method returns.
     */
    public void stop() {
        if (stopped.getAndSet(true)) {
            return;
        }

        assert stopping.get();

        List<CompletableFuture<?>> stopFutures = new ArrayList<>();

        for (NettyClient client : clients.values()) {
            stopFutures.add(client.stop());
        }

        stopFutures.add(server.stop());

        for (NettySender sender : channels.values()) {
            stopFutures.add(sender.closeAsync());
        }

        stopFutures.add(disposeDescriptors());

        boolean interrupted = false;

        try {
            CompletableFuture.allOf(stopFutures.toArray(new CompletableFuture<?>[0])).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Remember the interruption; the flag is restored only after the executor shutdown below
                // so that the shutdown wait itself is not cut short.
                interrupted = true;
            }

            LOG.warn("Failed to stop connection manager [reason={}]", e.getMessage());
        }

        try {
            IgniteUtils.shutdownAndAwaitTermination(connectionMaintenanceExecutor, 10L, TimeUnit.SECONDS);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks and disposes every recovery descriptor known to the provider, using a {@link NodeStoppingException}.
     *
     * @return Future that completes when all the disposals complete.
     */
    private CompletableFuture<Void> disposeDescriptors() {
        NodeStoppingException stoppingException = new NodeStoppingException();

        Collection<RecoveryDescriptor> descriptors = descriptorProvider.getAllRecoveryDescriptors();

        List<CompletableFuture<Void>> disposals = new ArrayList<>(descriptors.size());

        for (RecoveryDescriptor descriptor : descriptors) {
            disposals.add(blockAndDisposeDescriptor(descriptor, stoppingException));
        }

        return CompletableFuture.allOf(disposals.toArray(new CompletableFuture<?>[0]));
    }

    /**
     * Tells whether {@link #stop()} has been invoked.
     *
     * @return {@code true} if the manager is stopped.
     */
    public boolean isStopped() {
        return stopped.get();
    }

    /**
     * Creates a handshake manager for an outgoing connection.
     *
     * @param channelId Channel ID.
     * @return Handshake manager produced by the custom factory if one was supplied, otherwise a
     *     {@link RecoveryClientHandshakeManager}.
     * @throws NullPointerException If the local node has not been set yet.
     */
    private HandshakeManager createClientHandshakeManager(short channelId) {
        ClusterNode localNode = Objects.requireNonNull(localNodeFuture.getNow(null), "localNode not set");

        if (clientHandshakeManagerFactory != null) {
            return clientHandshakeManagerFactory.create(localNode, channelId, descriptorProvider);
        }

        return new RecoveryClientHandshakeManager(
                localNode,
                channelId,
                descriptorProvider,
                bootstrapFactory,
                staleIdDetector,
                clusterIdSupplier,
                this,
                stopping::get,
                failureProcessor
        );
    }

    /**
     * Creates a handshake manager for an incoming connection, waiting for the local node to be set first.
     *
     * @return Server handshake manager.
     */
    private HandshakeManager createServerHandshakeManager() {
        waitForLocalNodeToBeSet();

        return new RecoveryServerHandshakeManager(
                localNodeFuture.join(),
                FACTORY,
                descriptorProvider,
                bootstrapFactory,
                staleIdDetector,
                clusterIdSupplier,
                this,
                stopping::get,
                failureProcessor
        );
    }

    /**
     * Waits up to 10 seconds for the local node to be set.
     *
     * @throws RuntimeException If the wait is interrupted (the interrupt status is restored), times out or fails.
     */
    private void waitForLocalNodeToBeSet() {
        try {
            localNodeFuture.get(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new RuntimeException("Interrupted while waiting for local node to be set", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException("Could not finish awaiting for local node", e);
        }
    }

    /**
     * Returns the server.
     *
     * @return Server.
     */
    @TestOnly
    public NettyServer server() {
        return server;
    }

    /**
     * Returns the serialization service.
     *
     * @return Serialization service.
     */
    @TestOnly
    public SerializationService serializationService() {
        return serializationService;
    }

    /**
     * Returns the consistent ID of the local node.
     *
     * @return Consistent ID.
     */
    public String consistentId() {
        return consistentId;
    }

    /**
     * Returns an unmodifiable view of the outgoing clients.
     *
     * @return Clients.
     */
    @TestOnly
    public Collection<NettyClient> clients() {
        return Collections.unmodifiableCollection(clients.values());
    }

    /**
     * Returns an immutable copy of the registered senders.
     *
     * @return Senders by connector key.
     */
    @TestOnly
    public Map<ConnectorKey<String>, NettySender> channels() {
        return Map.copyOf(channels);
    }

    /** Marks this manager as stopping; from then on {@link #connect} creates no new clients. */
    public void initiateStopping() {
        stopping.set(true);
    }

    /**
     * Reacts to a node leaving: on the maintenance executor, closes the channels established with the given
     * launch ID and then disposes the recovery descriptors of that launch ID.
     *
     * @param str Launch ID of the node that left (string form of a UUID); it must already be stale.
     */
    public void handleNodeLeft(String str) {
        assert staleIdDetector.isIdStale(str) : str + " is not stale yet";

        connectionMaintenanceExecutor.execute(() -> {
            closeChannelsWith(str).whenCompleteAsync((ignoredResult, ignoredError) -> {
                disposeRecoveryDescriptorsOfLeftNode(str);
            }, connectionMaintenanceExecutor);
        });
    }

    /**
     * Closes and unregisters every registered sender whose launch ID equals the given one.
     *
     * @param launchId Launch ID of the remote node.
     * @return Future that completes when all the matching senders are closed.
     */
    private CompletableFuture<Void> closeChannelsWith(String launchId) {
        List<Map.Entry<ConnectorKey<String>, NettySender>> matching = channels.entrySet().stream()
                .filter(entry -> entry.getValue().launchId().equals(launchId))
                .collect(Collectors.toList());

        List<CompletableFuture<?>> closeFutures = new ArrayList<>(matching.size());

        for (Map.Entry<ConnectorKey<String>, NettySender> entry : matching) {
            NettySender sender = entry.getValue();

            closeFutures.add(sender.closeAsync());

            // Conditional removal (same as in handshakeFinished()): a sender registered under this key after the
            // snapshot above was taken must not be evicted.
            channels.remove(entry.getKey(), sender);
        }

        return CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture<?>[0]));
    }

    /**
     * Blocks and disposes the recovery descriptors of the given launch ID, using a {@link RecipientLeftException}.
     *
     * @param launchId Launch ID of the node that left (string form of a UUID).
     */
    private void disposeRecoveryDescriptorsOfLeftNode(String launchId) {
        RecipientLeftException leftException = new RecipientLeftException();

        for (RecoveryDescriptor descriptor : descriptorProvider.getRecoveryDescriptorsByLaunchId(UUID.fromString(launchId))) {
            blockAndDisposeDescriptor(descriptor, leftException);
        }
    }

    /**
     * Blocks the descriptor forever and disposes it. If the descriptor is currently held, the holder's channel is
     * closed and the attempt is repeated on the maintenance executor once the close completes.
     *
     * @param recoveryDescriptor Descriptor to block and dispose.
     * @param exc Exception passed to the descriptor when blocking and disposing.
     * @return Future that completes when this invocation has nothing more to do for the descriptor.
     */
    private CompletableFuture<Void> blockAndDisposeDescriptor(RecoveryDescriptor recoveryDescriptor, Exception exc) {
        while (!recoveryDescriptor.tryBlockForever(exc)) {
            if (recoveryDescriptor.isBlockedForever()) {
                // Already blocked forever; nothing to do in this invocation.
                return CompletableFutures.nullCompletedFuture();
            }

            DescriptorAcquiry holder = recoveryDescriptor.holder();

            if (holder == null) {
                // No holder is visible at the moment; try to block again.
                continue;
            }

            Channel holderChannel = holder.channel();

            assert holderChannel != null;

            return NettyUtils.toCompletableFuture(holderChannel.close())
                    .handleAsync((ignoredResult, ignoredError) -> blockAndDisposeDescriptor(recoveryDescriptor, exc), connectionMaintenanceExecutor)
                    .thenCompose(Function.identity());
        }

        recoveryDescriptor.dispose(exc);

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Sets the local node, which unblocks creation of handshake managers.
     *
     * @param clusterNode Local node.
     */
    public void setLocalNode(ClusterNode clusterNode) {
        localNodeFuture.complete(clusterNode);
    }
}
