package org.apache.ignite.internal.network.scalecube;

import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessageTypes;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.message.ScaleCubeMessage;
import org.apache.ignite.internal.network.message.ScaleCubeMessageBuilder;
import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransport.class */
public class ScaleCubeDirectMarshallerTransport implements Transport {
    private static final IgniteLogger LOG = Loggers.forClass(Transport.class);
    private static final ChannelType SCALECUBE_CHANNEL_TYPE = ChannelType.register(1, "ScaleCube");
    private final DirectProcessor<Message> subject = DirectProcessor.create();
    private final FluxSink<Message> sink = this.subject.sink();
    private final MonoProcessor<Void> stop = MonoProcessor.create();
    private final MonoProcessor<Void> onStop = MonoProcessor.create();
    private final MessagingService messagingService;
    private final NetworkMessagesFactory messageFactory;
    private final ScaleCubeTopologyService topologyService;
    private final Address address;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ScaleCubeDirectMarshallerTransport(Address address, MessagingService messagingService, ScaleCubeTopologyService scaleCubeTopologyService, NetworkMessagesFactory networkMessagesFactory) {
        this.address = address;
        this.messagingService = messagingService;
        this.topologyService = scaleCubeTopologyService;
        this.messageFactory = networkMessagesFactory;
        this.messagingService.addMessageHandler(NetworkMessageTypes.class, (networkMessage, clusterNode, l) -> {
            onMessage(networkMessage);
        });
        this.stop.then(doStop()).doFinally(signalType -> {
            this.onStop.onComplete();
        }).subscribe((Consumer) null, th -> {
            LOG.warn("Failed to stop [address={}, reason={}]", new Object[]{this.address, th.toString()});
        });
    }

    private Mono<Void> doStop() {
        return Mono.defer(() -> {
            LOG.info("Stopping [address={}]", new Object[]{this.address});
            this.sink.complete();
            LOG.info("Stopped [address={}]", new Object[]{this.address});
            return Mono.empty();
        });
    }

    public Address address() {
        return this.address;
    }

    public Mono<Transport> start() {
        return Mono.just(this);
    }

    public Mono<Void> stop() {
        return Mono.defer(() -> {
            this.stop.onComplete();
            return this.onStop;
        });
    }

    public boolean isStopped() {
        return this.onStop.isDisposed();
    }

    public Mono<Void> send(Address address, Message message) {
        NetworkAddress networkAddress = new NetworkAddress(address.host(), address.port());
        return Mono.fromFuture(() -> {
            ClusterNodeImpl byAddress = this.topologyService.getByAddress(networkAddress);
            if (byAddress == null) {
                byAddress = new ClusterNodeImpl((String) null, (String) null, networkAddress);
            }
            return this.messagingService.send(byAddress, SCALECUBE_CHANNEL_TYPE, fromMessage(message));
        });
    }

    private void onMessage(NetworkMessage networkMessage) {
        Message fromNetworkMessage = fromNetworkMessage(networkMessage);
        if (fromNetworkMessage != null) {
            this.sink.next(fromNetworkMessage);
        }
    }

    private NetworkMessage fromMessage(Message message) throws IgniteInternalException {
        Object data = message.data();
        ScaleCubeMessageBuilder scaleCubeMessage = this.messageFactory.scaleCubeMessage();
        if (data instanceof NetworkMessage) {
            scaleCubeMessage.message((NetworkMessage) data);
        } else {
            scaleCubeMessage.data(data);
        }
        return scaleCubeMessage.headers(message.headers()).build();
    }

    @Nullable
    private static Message fromNetworkMessage(NetworkMessage networkMessage) throws IgniteInternalException {
        if (!(networkMessage instanceof ScaleCubeMessage)) {
            return null;
        }
        ScaleCubeMessage scaleCubeMessage = (ScaleCubeMessage) networkMessage;
        Map<String, String> headers = scaleCubeMessage.headers();
        Object data = scaleCubeMessage.data();
        return Message.withHeaders(headers).data(data != null ? data : scaleCubeMessage.message()).build();
    }

    public Mono<Message> requestResponse(Address address, Message message) {
        return Mono.create(monoSink -> {
            Objects.requireNonNull(message, "request must be not null");
            Objects.requireNonNull(message.correlationId(), "correlationId must be not null");
            Flux take = listen().filter(message2 -> {
                return message2.correlationId() != null;
            }).filter(message3 -> {
                return message3.correlationId().equals(message.correlationId());
            }).take(1L);
            Objects.requireNonNull(monoSink);
            Consumer consumer = (v1) -> {
                r1.success(v1);
            };
            Objects.requireNonNull(monoSink);
            Consumer consumer2 = monoSink::error;
            Objects.requireNonNull(monoSink);
            Disposable subscribe = take.subscribe(consumer, consumer2, monoSink::success);
            monoSink.onDispose(Disposables.composite(new Disposable[]{send(address, message).subscribe((Consumer) null, th -> {
                subscribe.dispose();
                monoSink.error(th);
            }), subscribe}));
        });
    }

    public final Flux<Message> listen() {
        return this.subject.onBackpressureBuffer();
    }
}
