package org.gridgain.control.agent.processor;

import java.lang.reflect.Type;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.cluster.IgniteClusterImpl;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.gridgain.control.agent.ControlCenterAgent;
import org.gridgain.control.agent.StompDestinationsUtils;
import org.gridgain.control.agent.configuration.DistributedWebSocketConfiguration;
import org.gridgain.control.agent.processor.lifecycle.ClusterLifecycleProcessor;
import org.gridgain.control.agent.transport.ControlCenterClient;
import org.gridgain.control.agent.transport.ws.ConnectionStatus;
import org.gridgain.control.agent.transport.ws.StompRouter;
import org.gridgain.control.agent.transport.ws.WebSocketConnectionFactory;
import org.gridgain.control.agent.transport.ws.WebSocketControlCenterClient;
import org.gridgain.control.agent.utils.AgentUtils;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/gridgain/control/agent/processor/AgentCoordinatorProcessor.class */
/**
 * Processor that owns the WebSocket connection from the coordinator node to Control Center.
 *
 * <p>{@link #start()} creates a {@link WebSocketControlCenterClient}, subscribes to its connection
 * status changes and to configuration updates. Every {@link ControlCenterClient} operation except
 * {@link #onConnectionStatusChanged(BiConsumer)} is delegated to that client.
 *
 * <p>The last monitoring URI that was built is kept in a {@code volatile} field because it is written
 * from the token-request callback and read from the discovery listener task.
 */
public class AgentCoordinatorProcessor extends GridProcessorAdapter implements ControlCenterClient {
    /** Identifier under which this processor subscribes to configuration updates. */
    private static final String AGENT_COORDINATOR_LISTENER_ID = AgentCoordinatorProcessor.class.getName();

    /**
     * Discovery event type the join listener is registered for.
     * NOTE(review): presumably {@code EventType.EVT_NODE_JOINED}; confirm against the Ignite version in use.
     */
    private static final int NODE_JOINED_EVT_TYPE = 10;

    /** Listener bound to {@link #nodeJoinedListener(DiscoveryEvent, DiscoCache)}. */
    private final DiscoveryEventListener nodeJoinedListener;

    /** Underlying client; assigned in {@link #start()}, so it is {@code null} before that call. */
    private WebSocketControlCenterClient controlCenterClient;

    private final ClusterLifecycleProcessor clusterLifecycleProc;
    private final IgniteClusterImpl cluster;
    private final IgniteEx ignite;
    private final DistributedWebSocketConfiguration wsCfg;
    private final AgentConfigurationProcessor agentCfgProc;
    private final StompRouter stompRouter;

    /** Last known monitoring URI, or {@code null} if none has been provided or built yet. */
    @Nullable
    private volatile URI monitoringUri;

    /**
     * @param gridKernalContext Kernal context; also the source of the {@code IgniteEx} and cluster instances.
     * @param clusterLifecycleProcessor Processor notified about agent connection status changes.
     * @param agentConfigurationProcessor Agent configuration processor.
     * @param distributedWebSocketConfiguration WebSocket configuration.
     * @param stompRouter Router handed to the WebSocket client.
     * @param uri Initial monitoring URI, may be {@code null}.
     */
    public AgentCoordinatorProcessor(GridKernalContext gridKernalContext, ClusterLifecycleProcessor clusterLifecycleProcessor, AgentConfigurationProcessor agentConfigurationProcessor, DistributedWebSocketConfiguration distributedWebSocketConfiguration, StompRouter stompRouter, @Nullable URI uri) {
        super(gridKernalContext);
        this.nodeJoinedListener = this::nodeJoinedListener;
        this.clusterLifecycleProc = clusterLifecycleProcessor;
        this.ignite = gridKernalContext.grid();
        this.cluster = gridKernalContext.cluster().get();
        this.agentCfgProc = agentConfigurationProcessor;
        this.wsCfg = distributedWebSocketConfiguration;
        this.stompRouter = stompRouter;
        this.monitoringUri = uri;
    }

    /**
     * Creates the WebSocket client and registers the connection-status and configuration listeners.
     * This method itself does not initiate a connection.
     *
     * @throws IgniteCheckedException If resource resolution of the client or its connection factory fails.
     */
    public void start() throws IgniteCheckedException {
        U.log(this.log, "Starting Control Center agent on coordinator");
        GridResourceProcessor resource = this.ctx.resource();
        WebSocketConnectionFactory connFactory = (WebSocketConnectionFactory) resource.resolve(new WebSocketConnectionFactory(this.ctx, this.log, this.wsCfg));
        this.controlCenterClient = (WebSocketControlCenterClient) resource.resolve(new WebSocketControlCenterClient(this.log, this.stompRouter, connFactory, this.agentCfgProc, this.wsCfg));
        this.controlCenterClient.onConnectionStatusChanged(this::handleConnectionStatus);
        this.wsCfg.onConfigurationUpdated(AGENT_COORDINATOR_LISTENER_ID, () -> {
            U.log(this.log, "Cluster secret or websocket config has been changed, will reconnect to Control Center");
            this.controlCenterClient.connectOrReconnect();
        });
        // NOTE(review): this listener only logs; no reconnect/shutdown is triggered from here.
        // Presumably another component reacts to the same update - confirm.
        this.agentCfgProc.onConfigurationUpdated(AGENT_COORDINATOR_LISTENER_ID, (cfg, otherCfg) -> {
            if (cfg.isConnectionChanged(otherCfg)) {
                if (otherCfg.isEnabled()) {
                    U.log(this.log, "The agent connection config has been changed, will reconnect to Control Center");
                } else {
                    U.log(this.log, "The agent was disabled in the configuration, shutdown");
                }
            }
        });
    }

    /**
     * Closes the client and removes the configuration subscriptions made in {@link #start()}.
     *
     * @param z Not used by this implementation.
     * @throws IgniteCheckedException Declared by the signature; not thrown by the visible code.
     */
    public void stop(boolean z) throws IgniteCheckedException {
        U.closeQuiet(this.controlCenterClient);
        this.wsCfg.unsubscribe(AGENT_COORDINATOR_LISTENER_ID);
        this.agentCfgProc.unsubscribe(AGENT_COORDINATOR_LISTENER_ID);
    }

    /**
     * Dispatches a connection status change reported by the WebSocket client.
     *
     * @param status New connection status.
     * @param connectedUrl String passed along with the status; used as the agent URL on {@code CONNECTED}.
     */
    private void handleConnectionStatus(ConnectionStatus status, String connectedUrl) {
        switch (status) {
            case CONNECTED:
                onConnected(connectedUrl);
                break;
            case CONNECTING:
                this.clusterLifecycleProc.onAgentConnectedStatusChanged(false);
                break;
            case DISCONNECTED:
                this.clusterLifecycleProc.onAgentConnectedStatusChanged(false);
                this.ctx.event().removeDiscoveryEventListener(this.nodeJoinedListener, new int[]{NODE_JOINED_EVT_TYPE});
                break;
            default:
                break;
        }
    }

    /**
     * Handles the {@code CONNECTED} status: stores the agent URL, registers the join listener and
     * publishes the monitoring URI to agent nodes, requesting a cluster token first when the known
     * URI is absent or does not match the connected URL.
     *
     * @param connectedUrl URL reported together with the {@code CONNECTED} status.
     */
    private void onConnected(String connectedUrl) {
        this.agentCfgProc.updateAgentUrl(connectedUrl);
        this.ctx.event().addDiscoveryEventListener(this.nodeJoinedListener, NODE_JOINED_EVT_TYPE, new int[0]);

        // Single volatile read so the check and the subsequent use observe the same value.
        URI knownUri = this.monitoringUri;

        if (knownUri != null && connectedUrl.equals(AgentUtils.extractConnectedUri(knownUri))) {
            sendMonitoringUri(AgentUtils.forAgentNodes(this.cluster), knownUri);
        } else {
            requestClusterToken().thenAccept(token -> {
                // NOTE(review): argument order (url, token) is assumed; the decompiled source named both
                // values "str". Verify against AgentUtils.monitoringUri.
                URI freshUri = AgentUtils.monitoringUri(connectedUrl, token);
                sendMonitoringUri(AgentUtils.forAgentNodes(this.cluster), freshUri);
                this.monitoringUri = freshUri;
            }).exceptionally(err -> {
                // Without this stage a failed token request would go completely unnoticed.
                U.error(this.log, "Failed to request cluster token or to publish monitoring URI", err);
                return null;
            });
        }

        this.clusterLifecycleProc.onAgentConnectedStatusChanged(true);
    }

    /**
     * Discovery callback: if the event node has an agent, asynchronously sends it the cluster state
     * and the current monitoring URI using the system executor.
     *
     * @param discoveryEvent Discovery event.
     * @param discoCache Discovery cache (unused).
     */
    private void nodeJoinedListener(DiscoveryEvent discoveryEvent, DiscoCache discoCache) {
        ClusterNode evtNode = discoveryEvent.eventNode();
        if (!AgentUtils.hasAgent(evtNode)) {
            return;
        }
        ClusterGroup target = this.cluster.forNode(evtNode, new ClusterNode[0]);
        this.ctx.pools().getSystemExecutorService().submit(() -> {
            this.clusterLifecycleProc.sendState(target);
            sendCurrentMonitoringUri(target);
        });
    }

    /**
     * Sends the monitoring URI to the given nodes on both agent topics. Delivery is best-effort:
     * an {@link IgniteException} is logged and not rethrown.
     *
     * @param clusterGroup Target nodes.
     * @param uri Monitoring URI to send.
     */
    private void sendMonitoringUri(ClusterGroup clusterGroup, URI uri) {
        try {
            this.ignite.message(clusterGroup).send(ControlCenterAgent.TOPIC_CONTROL_CENTER_CONNECTED, uri);
            this.ignite.message(clusterGroup).sendOrdered(ControlCenterAgent.TOPIC_CONTROL_CENTER_MONITORING_URI, uri, 0L);
        } catch (IgniteException e) {
            U.error(this.log, "Failed to send monitoring URI to agent nodes", e);
        }
    }

    /**
     * Sends the currently known monitoring URI to the given nodes; does nothing when no URI is known.
     *
     * @param clusterGroup Target nodes.
     */
    public void sendCurrentMonitoringUri(ClusterGroup clusterGroup) {
        // Single volatile read: the null check and the send must use the same value.
        URI currentUri = this.monitoringUri;
        if (currentUri != null) {
            sendMonitoringUri(clusterGroup, currentUri);
        }
    }

    /**
     * Requests a cluster token from Control Center on the token topic built from the cluster id.
     *
     * @return Future completed with the token string.
     */
    public CompletableFuture<String> requestClusterToken() {
        return this.controlCenterClient.request(StompDestinationsUtils.buildClusterTokenTopic(this.cluster.id()), String.class);
    }

    /** Delegates to the underlying client. */
    @Override // org.gridgain.control.agent.transport.ControlCenterClient
    public ConnectionStatus getState() {
        return this.controlCenterClient.getState();
    }

    /** Delegates to the underlying client. */
    @Override // org.gridgain.control.agent.transport.ControlCenterClient
    public boolean send(String str, Object obj) {
        return this.controlCenterClient.send(str, obj);
    }

    /** Delegates to the underlying client. */
    @Override // org.gridgain.control.agent.transport.ControlCenterClient
    public <R> CompletableFuture<R> request(String str, Type type) {
        return this.controlCenterClient.request(str, type);
    }

    /**
     * Not supported on this processor.
     *
     * @throws UnsupportedOperationException Always.
     */
    @Override // org.gridgain.control.agent.transport.ControlCenterClient
    public void onConnectionStatusChanged(BiConsumer<ConnectionStatus, String> biConsumer) {
        throw new UnsupportedOperationException();
    }

    /** Delegates to the underlying client. */
    @Override // org.gridgain.control.agent.transport.ControlCenterClient
    public void connectOrReconnect() {
        this.controlCenterClient.connectOrReconnect();
    }

    /** Equivalent to {@code stop(true)}. */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        stop(true);
    }
}
