package org.apache.ignite3.internal.metrics.messaging;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.manager.ComponentContext;
import org.apache.ignite3.internal.manager.IgniteComponent;
import org.apache.ignite3.internal.metrics.MetricManager;
import org.apache.ignite3.internal.metrics.message.MetricDisableRequest;
import org.apache.ignite3.internal.metrics.message.MetricDisableResponse;
import org.apache.ignite3.internal.metrics.message.MetricEnableRequest;
import org.apache.ignite3.internal.metrics.message.MetricEnableResponse;
import org.apache.ignite3.internal.metrics.message.MetricSourceDto;
import org.apache.ignite3.internal.metrics.message.MetricSourcesRequest;
import org.apache.ignite3.internal.metrics.message.MetricSourcesResponse;
import org.apache.ignite3.internal.network.MessagingService;
import org.apache.ignite3.internal.network.NetworkMessage;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.network.ClusterNode;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite3/internal/metrics/messaging/MetricMessaging.class */
public class MetricMessaging implements IgniteComponent {
    private static final long NETWORK_TIMEOUT_MILLIS = Long.MAX_VALUE;
    private final MetricManager metricManager;
    private final MessagingService messagingService;
    private final TopologyService topologyService;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final MetricMessagesFactory messagesFactory = new MetricMessagesFactory();
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    public MetricMessaging(MetricManager metricManager, MessagingService messagingService, TopologyService topologyService) {
        this.metricManager = metricManager;
        this.messagingService = messagingService;
        this.topologyService = topologyService;
    }

    @Override // org.apache.ignite3.internal.manager.IgniteComponent
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        this.messagingService.addMessageHandler(MetricMessageTypes.class, (networkMessage, clusterNode, l) -> {
            if (!$assertionsDisabled && l == null) {
                throw new AssertionError();
            }
            if (!this.busyLock.enterBusy()) {
                sendException(networkMessage, clusterNode, ((Long) Objects.requireNonNull(l, "correlationId is null")).longValue(), new IgniteInternalException(ErrorGroups.Common.NODE_STOPPING_ERR, new NodeStoppingException()));
                return;
            }
            try {
                processRequest(networkMessage, clusterNode, ((Long) Objects.requireNonNull(l)).longValue());
                this.busyLock.leaveBusy();
            } catch (Throwable th) {
                this.busyLock.leaveBusy();
                throw th;
            }
        });
        return CompletableFutures.nullCompletedFuture();
    }

    @Override // org.apache.ignite3.internal.manager.IgniteComponent
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        this.busyLock.block();
        return CompletableFutures.nullCompletedFuture();
    }

    private void sendException(NetworkMessage networkMessage, ClusterNode clusterNode, long j, IgniteInternalException igniteInternalException) {
        if (networkMessage instanceof MetricEnableRequest) {
            sendEnableResponse(igniteInternalException, clusterNode, j);
        } else if (networkMessage instanceof MetricDisableRequest) {
            sendDisableResponse(igniteInternalException, clusterNode, j);
        } else if (networkMessage instanceof MetricSourcesRequest) {
            sendSourcesResponse(null, igniteInternalException, clusterNode, j);
        }
    }

    private void processRequest(NetworkMessage networkMessage, ClusterNode clusterNode, long j) {
        if (networkMessage instanceof MetricEnableRequest) {
            processEnableRequest((MetricEnableRequest) networkMessage, clusterNode, j);
        } else if (networkMessage instanceof MetricDisableRequest) {
            processDisableRequest((MetricDisableRequest) networkMessage, clusterNode, j);
        } else if (networkMessage instanceof MetricSourcesRequest) {
            processSourcesRequest(clusterNode, j);
        }
    }

    private void processEnableRequest(MetricEnableRequest metricEnableRequest, ClusterNode clusterNode, long j) {
        try {
            this.metricManager.enable(metricEnableRequest.sourceName());
            sendEnableResponse(null, clusterNode, j);
        } catch (IllegalStateException e) {
            sendEnableResponse(e, clusterNode, j);
        }
    }

    private void sendEnableResponse(@Nullable Throwable th, ClusterNode clusterNode, long j) {
        respond(clusterNode, this.messagesFactory.metricEnableResponse().throwable(th).build(), j);
    }

    private void processDisableRequest(MetricDisableRequest metricDisableRequest, ClusterNode clusterNode, long j) {
        try {
            this.metricManager.disable(metricDisableRequest.sourceName());
            sendDisableResponse(null, clusterNode, j);
        } catch (IllegalStateException e) {
            sendDisableResponse(e, clusterNode, j);
        }
    }

    private void sendDisableResponse(@Nullable Throwable th, ClusterNode clusterNode, long j) {
        respond(clusterNode, this.messagesFactory.metricDisableResponse().throwable(th).build(), j);
    }

    private void processSourcesRequest(ClusterNode clusterNode, long j) {
        sendSourcesResponse((List) this.metricManager.metricSources().stream().map(metricSource -> {
            return new MetricSourceDto(metricSource.name(), metricSource.enabled());
        }).collect(Collectors.toList()), null, clusterNode, j);
    }

    private void sendSourcesResponse(@Nullable Collection<MetricSourceDto> collection, @Nullable Throwable th, ClusterNode clusterNode, long j) {
        respond(clusterNode, this.messagesFactory.metricSourcesResponse().sources(collection).throwable(th).build(), j);
    }

    public CompletableFuture<Void> broadcastMetricEnableAsync(String str) {
        return broadcastAsync(clusterNode -> {
            return remoteMetricEnableAsync(clusterNode, str);
        });
    }

    private CompletableFuture<Void> remoteMetricEnableAsync(ClusterNode clusterNode, String str) {
        return invoke(clusterNode, this.messagesFactory.metricEnableRequest().sourceName(str).build()).thenCompose(MetricMessaging::fromEnableResponse);
    }

    private static CompletableFuture<Void> fromEnableResponse(NetworkMessage networkMessage) {
        Throwable throwable = ((MetricEnableResponse) networkMessage).throwable();
        return throwable != null ? CompletableFuture.failedFuture(throwable) : CompletableFutures.nullCompletedFuture();
    }

    public CompletableFuture<Void> broadcastMetricDisableAsync(String str) {
        return broadcastAsync(clusterNode -> {
            return remoteMetricDisableAsync(clusterNode, str);
        });
    }

    private CompletableFuture<Void> remoteMetricDisableAsync(ClusterNode clusterNode, String str) {
        return invoke(clusterNode, this.messagesFactory.metricDisableRequest().sourceName(str).build()).thenCompose(MetricMessaging::fromDisableResponse);
    }

    private static CompletableFuture<Void> fromDisableResponse(NetworkMessage networkMessage) {
        Throwable throwable = ((MetricDisableResponse) networkMessage).throwable();
        return throwable != null ? CompletableFuture.failedFuture(throwable) : CompletableFutures.nullCompletedFuture();
    }

    public CompletableFuture<Map<String, Collection<MetricSourceDto>>> broadcastMetricSourcesAsync() {
        ArrayList arrayList = new ArrayList(this.topologyService.allMembers());
        return CompletableFutures.allOfToList((CompletableFuture[]) arrayList.stream().map(this::remoteMetricSourcesAsync).toArray(i -> {
            return new CompletableFuture[i];
        })).thenApply(list -> {
            HashMap hashMap = new HashMap();
            for (int i2 = 0; i2 < arrayList.size(); i2++) {
                hashMap.put(((ClusterNode) arrayList.get(i2)).name(), (Collection) list.get(i2));
            }
            return hashMap;
        });
    }

    private CompletableFuture<Collection<MetricSourceDto>> remoteMetricSourcesAsync(ClusterNode clusterNode) {
        return invoke(clusterNode, this.messagesFactory.metricSourcesRequest().build()).thenCompose(networkMessage -> {
            return sourcesFromSourcesResponse((MetricSourcesResponse) networkMessage);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static CompletableFuture<Collection<MetricSourceDto>> sourcesFromSourcesResponse(MetricSourcesResponse metricSourcesResponse) {
        Throwable throwable = metricSourcesResponse.throwable();
        return throwable != null ? CompletableFuture.failedFuture(throwable) : CompletableFuture.completedFuture(metricSourcesResponse.sources());
    }

    private CompletableFuture<Void> broadcastAsync(Function<ClusterNode, CompletableFuture<Void>> function) {
        return CompletableFuture.allOf((CompletableFuture[]) this.topologyService.allMembers().stream().map(function).toArray(i -> {
            return new CompletableFuture[i];
        }));
    }

    private CompletableFuture<NetworkMessage> invoke(ClusterNode clusterNode, NetworkMessage networkMessage) {
        return this.messagingService.invoke(clusterNode.name(), networkMessage, Long.MAX_VALUE);
    }

    private void respond(ClusterNode clusterNode, NetworkMessage networkMessage, long j) {
        this.messagingService.respond(clusterNode.name(), networkMessage, j);
    }

    static {
        $assertionsDisabled = !MetricMessaging.class.desiredAssertionStatus();
    }
}
