/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.network.scalecube;

import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import java.util.Map;
import java.util.Objects;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessageTypes;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.message.ScaleCubeMessage;
import org.apache.ignite.internal.network.message.ScaleCubeMessageBuilder;
import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/**
 * ScaleCube {@link Transport} implementation that delegates the actual message delivery to a
 * {@link MessagingService}.
 *
 * <p>Outgoing ScaleCube {@link Message}s are wrapped into {@link ScaleCubeMessage}s and sent over
 * {@link #SCALE_CUBE_CHANNEL_TYPE}; incoming {@link ScaleCubeMessage}s are unwrapped and published to the
 * subscribers of {@link #listen()}.
 */
class ScaleCubeDirectMarshallerTransport implements Transport {
    /** Logger (registered under the {@link Transport} interface). */
    private static final IgniteLogger LOG = Loggers.forClass(Transport.class);

    /** Channel type passed to the messaging service for every message sent by this transport. */
    static final ChannelType SCALE_CUBE_CHANNEL_TYPE = new ChannelType(1, "ScaleCube");

    /** Processor through which incoming messages are published to listeners. */
    private final DirectProcessor<Message> subject = DirectProcessor.create();

    /** Sink used to push incoming messages into {@link #subject}. */
    private final FluxSink<Message> sink = subject.sink();

    /** Completed by {@link #stop()} to trigger the stop procedure. */
    private final MonoProcessor<Void> stop = MonoProcessor.create();

    /** Completed once the stop procedure has finished; returned to the callers of {@link #stop()}. */
    private final MonoProcessor<Void> onStop = MonoProcessor.create();

    /** Service that performs the actual sending and receiving of network messages. */
    private final MessagingService messagingService;

    /** Factory used to build outgoing {@link ScaleCubeMessage}s. */
    private final NetworkMessagesFactory messageFactory;

    /** Address reported by {@link #address()}. */
    private final Address address;

    /**
     * Creates the transport, registers it as a message handler and wires up the stop procedure.
     *
     * @param localAddress Address returned by {@link #address()}.
     * @param messagingService Messaging service used to send and receive messages.
     * @param messageFactory Factory for the network messages created by this transport.
     */
    ScaleCubeDirectMarshallerTransport(
            Address localAddress,
            MessagingService messagingService,
            NetworkMessagesFactory messageFactory
    ) {
        this.address = localAddress;
        this.messagingService = messagingService;
        this.messageFactory = messageFactory;

        // Every message of the registered group is offered to this transport; non-ScaleCube ones are dropped
        // inside onMessage.
        this.messagingService.addMessageHandler(
                NetworkMessageTypes.class,
                (msg, sender, correlationId) -> onMessage(msg)
        );

        // Once "stop" completes, run the stop procedure and then complete "onStop" regardless of the outcome.
        stop.then(doStop())
                .doFinally(signal -> onStop.onComplete())
                .subscribe(
                        null,
                        err -> LOG.warn("Failed to stop [address={}, reason={}]", address, err.toString())
                );
    }

    /** {@inheritDoc} */
    @Override
    public Address address() {
        return address;
    }

    /** {@inheritDoc} */
    @Override
    public Mono<Transport> start() {
        // Nothing to start: the handler registration already happened in the constructor.
        return Mono.just(this);
    }

    /** {@inheritDoc} */
    @Override
    public Mono<Void> stop() {
        return Mono.defer(() -> {
            // Triggers the chain assembled in the constructor.
            stop.onComplete();

            return onStop;
        });
    }

    /** {@inheritDoc} */
    @Override
    public boolean isStopped() {
        return onStop.isDisposed();
    }

    /** {@inheritDoc} */
    @Override
    public Mono<Void> send(Address address, Message message) {
        NetworkAddress target = new NetworkAddress(address.host(), address.port());

        // The supplier form defers both the conversion and the send until subscription time.
        return Mono.fromFuture(() -> messagingService.send(target, SCALE_CUBE_CHANNEL_TYPE, fromMessage(message)));
    }

    /** {@inheritDoc} */
    @Override
    public Mono<Message> requestResponse(Address address, Message request) {
        return Mono.create(responseSink -> {
            Objects.requireNonNull(request, "request must be not null");
            Objects.requireNonNull(request.correlationId(), "correlationId must be not null");

            // Start listening before sending, then wait for the first message carrying the request's
            // correlation ID.
            Disposable receive = listen()
                    .filter(response -> response.correlationId() != null
                            && response.correlationId().equals(request.correlationId()))
                    .take(1)
                    .subscribe(responseSink::success, responseSink::error, responseSink::success);

            Disposable send = send(address, request).subscribe(null, err -> {
                // Sending failed: stop waiting for a response and propagate the failure.
                receive.dispose();
                responseSink.error(err);
            });

            responseSink.onDispose(Disposables.composite(send, receive));
        });
    }

    /** {@inheritDoc} */
    @Override
    public final Flux<Message> listen() {
        return subject.onBackpressureBuffer();
    }

    /**
     * Builds the mono that performs the stop procedure: completes the message sink, logging before and after.
     *
     * @return Mono that executes the stop procedure when subscribed to.
     */
    private Mono<Void> doStop() {
        return Mono.defer(() -> {
            LOG.info("Stopping [address={}]", address);

            sink.complete();

            LOG.info("Stopped [address={}]", address);

            return Mono.empty();
        });
    }

    /**
     * Handles a message delivered by the messaging service: converts it and, if the conversion produced a
     * result, publishes it to the listeners.
     *
     * @param msg Incoming network message.
     */
    private void onMessage(NetworkMessage msg) {
        Message converted = fromNetworkMessage(msg);

        if (converted != null) {
            sink.next(converted);
        }
    }

    /**
     * Wraps a ScaleCube message into a network message.
     *
     * <p>A payload that is itself a {@link NetworkMessage} is stored in the {@code message} field, any other
     * payload goes into the {@code data} field. Headers are copied as is.
     *
     * @param message ScaleCube message to wrap.
     * @return Network message carrying the payload and the headers of {@code message}.
     * @throws IgniteInternalException Declared by the original signature.
     */
    private NetworkMessage fromMessage(Message message) throws IgniteInternalException {
        Object payload = message.data();

        ScaleCubeMessageBuilder builder = messageFactory.scaleCubeMessage();

        if (payload instanceof NetworkMessage) {
            builder.message((NetworkMessage) payload);
        } else {
            builder.data(payload);
        }

        return builder.headers(message.headers()).build();
    }

    /**
     * Unwraps a network message into a ScaleCube message.
     *
     * <p>The {@code data} field is used as the payload when it is not {@code null}; otherwise the
     * {@code message} field is used.
     *
     * @param networkMessage Incoming network message.
     * @return ScaleCube message, or {@code null} if {@code networkMessage} is not a {@link ScaleCubeMessage}.
     * @throws IgniteInternalException Declared by the original signature.
     */
    @Nullable
    private static Message fromNetworkMessage(NetworkMessage networkMessage) throws IgniteInternalException {
        if (!(networkMessage instanceof ScaleCubeMessage)) {
            return null;
        }

        ScaleCubeMessage scaleCubeMessage = (ScaleCubeMessage) networkMessage;

        Map<String, String> headers = scaleCubeMessage.headers();
        Object rawData = scaleCubeMessage.data();
        NetworkMessage nested = scaleCubeMessage.message();

        Object payload;

        if (rawData != null) {
            payload = rawData;
        } else {
            payload = nested;
        }

        return Message.withHeaders(headers).data(payload).build();
    }
}

