/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.cluster;

import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.fdetector.FailureDetectorImpl;
import io.scalecube.cluster.gossip.GossipConfig;
import io.scalecube.cluster.gossip.GossipProtocolImpl;
import io.scalecube.cluster.membership.MembershipConfig;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.membership.MembershipProtocolImpl;
import io.scalecube.cluster.metadata.MetadataCodec;
import io.scalecube.cluster.metadata.MetadataStore;
import io.scalecube.cluster.metadata.MetadataStoreImpl;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.cluster.transport.api.TransportFactory;
import io.scalecube.net.Address;
import io.scalecube.reactor.RetryNonSerializedEmitFailureHandler;
import io.scalecube.utils.ServiceLoaderUtil;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public final class ClusterImpl
implements Cluster {
    private static final Logger LOGGER = LoggerFactory.getLogger(Cluster.class);
    private static final Pattern NAMESPACE_PATTERN = Pattern.compile("^(\\w+[\\w\\-./]*\\w)+");
    private static final Set<String> SYSTEM_MESSAGES = Collections.unmodifiableSet(Stream.of("sc/fdetector/ping", "sc/fdetector/pingReq", "sc/fdetector/pingAck", "sc/membership/sync", "sc/membership/syncAck", "sc/gossip/req", "sc/metadata/req", "sc/metadata/resp").collect(Collectors.toSet()));
    private static final Set<String> SYSTEM_GOSSIPS = Collections.singleton("sc/membership/gossip");
    private ClusterConfig config;
    private Function<Cluster, ? extends ClusterMessageHandler> handler = cluster -> new ClusterMessageHandler(){};
    private final Sinks.Many<MembershipEvent> membershipSink = Sinks.many().multicast().directBestEffort();
    private final Disposable.Composite actionsDisposables = Disposables.composite();
    private final Sinks.One<Void> start = Sinks.one();
    private final Sinks.One<Void> onStart = Sinks.one();
    private final Sinks.One<Void> shutdown = Sinks.one();
    private final Sinks.One<Void> onShutdown = Sinks.one();
    private Transport transport;
    private Member localMember;
    private FailureDetectorImpl failureDetector;
    private GossipProtocolImpl gossip;
    private MembershipProtocolImpl membership;
    private MetadataStore metadataStore;
    private Scheduler scheduler;

    public ClusterImpl() {
        this(ClusterConfig.defaultConfig());
    }

    public ClusterImpl(ClusterConfig config) {
        this.config = Objects.requireNonNull(config);
        this.initLifecycle();
    }

    private ClusterImpl(ClusterImpl that) {
        this.config = that.config.clone();
        this.handler = that.handler;
        this.initLifecycle();
    }

    private void initLifecycle() {
        this.start.asMono().then(this.doStart()).doOnSuccess(avoid -> this.onStart.emitEmpty(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED)).doOnError(th -> this.onStart.emitError((Throwable)th, RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED)).subscribe(null, th -> LOGGER.error("[{}][doStart] Exception occurred:", (Object)this.localMember, th));
        this.shutdown.asMono().then(this.doShutdown()).doFinally(s -> this.onShutdown.emitEmpty(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED)).subscribe(null, th -> LOGGER.warn("[{}][doShutdown] Exception occurred: {}", (Object)this.localMember, (Object)th.toString()));
    }

    public ClusterImpl config(UnaryOperator<ClusterConfig> options) {
        Objects.requireNonNull(options);
        ClusterImpl cluster = new ClusterImpl(this);
        cluster.config = (ClusterConfig)options.apply(this.config);
        return cluster;
    }

    public ClusterImpl transport(UnaryOperator<TransportConfig> options) {
        Objects.requireNonNull(options);
        ClusterImpl cluster = new ClusterImpl(this);
        cluster.config = this.config.transport(options);
        return cluster;
    }

    public ClusterImpl transportFactory(Supplier<TransportFactory> supplier) {
        Objects.requireNonNull(supplier);
        ClusterImpl cluster = new ClusterImpl(this);
        cluster.config = this.config.transport(opts -> opts.transportFactory((TransportFactory)supplier.get()));
        return cluster;
    }

    public ClusterImpl failureDetector(UnaryOperator<FailureDetectorConfig> options) {
        Objects.requireNonNull(options);
        ClusterImpl cluster = new ClusterImpl(this);
        cluster.config = this.config.failureDetector(options);
        return cluster;
    }

    public ClusterImpl gossip(UnaryOperator<GossipConfig> options) {
        Objects.requireNonNull(options);
        ClusterImpl cluster = new ClusterImpl(this);
        cluster.config = this.config.gossip(options);
        return cluster;
    }

    public ClusterImpl membership(UnaryOperator<MembershipConfig> options) {
        Objects.requireNonNull(options);
        ClusterImpl cluster = new ClusterImpl(this);
        cluster.config = this.config.membership(options);
        return cluster;
    }

    public ClusterImpl handler(Function<Cluster, ClusterMessageHandler> handler) {
        Objects.requireNonNull(handler);
        ClusterImpl cluster = new ClusterImpl(this);
        cluster.handler = handler;
        return cluster;
    }

    public Mono<Cluster> start() {
        return Mono.defer(() -> {
            this.start.emitEmpty(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
            return this.onStart.asMono().thenReturn(this);
        });
    }

    public Cluster startAwait() {
        return this.start().block();
    }

    private Mono<Cluster> doStart() {
        return Mono.fromRunnable(this::validateConfiguration).then(Mono.defer(this::doStart0));
    }

    private Mono<Cluster> doStart0() {
        return Transport.bind(this.config.transportConfig()).flatMap(boundTransport -> {
            this.localMember = this.createLocalMember(boundTransport.address());
            this.transport = new SenderAwareTransport((Transport)boundTransport, this.localMember.address());
            this.scheduler = Schedulers.newSingle("sc-cluster-" + this.localMember.address().port(), true);
            this.failureDetector = new FailureDetectorImpl(this.localMember, this.transport, this.membershipSink.asFlux().onBackpressureBuffer(), this.config.failureDetectorConfig(), this.scheduler);
            this.gossip = new GossipProtocolImpl(this.localMember, this.transport, this.membershipSink.asFlux().onBackpressureBuffer(), this.config.gossipConfig(), this.scheduler);
            this.metadataStore = new MetadataStoreImpl(this.localMember, this.transport, this.config.metadata(), this.config, this.scheduler);
            this.membership = new MembershipProtocolImpl(this.localMember, this.transport, this.failureDetector, this.gossip, this.metadataStore, this.config, this.scheduler);
            this.actionsDisposables.add(this.membership.listen().subscribe(event -> this.membershipSink.emitNext((MembershipEvent)event, RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED), ex -> LOGGER.error("[{}][membership][error] cause:", (Object)this.localMember, ex), () -> this.membershipSink.emitComplete(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED)));
            return Mono.fromRunnable(() -> this.failureDetector.start()).then(Mono.fromRunnable(() -> this.gossip.start())).then(Mono.fromRunnable(() -> this.metadataStore.start())).then(Mono.fromRunnable(this::startHandler)).then(this.membership.start()).then();
        }).doOnSubscribe(s -> LOGGER.info("[{}][doStart] Starting, config: {}", (Object)this.localMember, (Object)this.config)).doOnSuccess(avoid -> LOGGER.info("[{}][doStart] Started", (Object)this.localMember)).thenReturn(this);
    }

    private void validateConfiguration() {
        Object metadata;
        MetadataCodec metadataCodec = ServiceLoaderUtil.findFirst(MetadataCodec.class).orElse(null);
        if (metadataCodec == null && (metadata = this.config.metadata()) != null && !(metadata instanceof Serializable)) {
            throw new IllegalArgumentException("Invalid cluster config: metadata must be Serializable");
        }
        Objects.requireNonNull(this.config.transportConfig().transportFactory(), "Invalid cluster config: transportFactory must be specified");
        Objects.requireNonNull(this.config.transportConfig().messageCodec(), "Invalid cluster config: messageCodec must be specified");
        Objects.requireNonNull(this.config.membershipConfig().namespace(), "Invalid cluster config: membership namespace must be specified");
        if (!NAMESPACE_PATTERN.matcher(this.config.membershipConfig().namespace()).matches()) {
            throw new IllegalArgumentException("Invalid cluster config: membership namespace format is invalid");
        }
    }

    private void startHandler() {
        ClusterMessageHandler handler = this.handler.apply(this);
        this.actionsDisposables.add(this.listenMessage().subscribe(handler::onMessage, ex -> LOGGER.error("[{}][onMessage][error] cause:", (Object)this.localMember, ex)));
        this.actionsDisposables.add(this.listenMembership().subscribe(handler::onMembershipEvent, ex -> LOGGER.error("[{}][onMembershipEvent][error] cause:", (Object)this.localMember, ex)));
        this.actionsDisposables.add(this.listenGossip().subscribe(handler::onGossip, ex -> LOGGER.error("[{}][onGossip][error] cause:", (Object)this.localMember, ex)));
    }

    private Flux<Message> listenMessage() {
        return this.transport.listen().filter(msg -> !SYSTEM_MESSAGES.contains(msg.qualifier()));
    }

    private Flux<Message> listenGossip() {
        return this.gossip.listen().filter(msg -> !SYSTEM_GOSSIPS.contains(msg.qualifier()));
    }

    private Flux<MembershipEvent> listenMembership() {
        return this.membershipSink.asFlux().onBackpressureBuffer();
    }

    private Member createLocalMember(Address address) {
        int port = Optional.ofNullable(this.config.externalPort()).orElse(address.port());
        Address memberAddress = Optional.ofNullable(this.config.externalHost()).map(host -> Address.create(host, port)).orElseGet(() -> Address.create(address.host(), port));
        return new Member(this.config.memberId() != null ? this.config.memberId() : UUID.randomUUID().toString(), this.config.memberAlias(), memberAddress, this.config.membershipConfig().namespace());
    }

    @Override
    public Address address() {
        return this.member().address();
    }

    @Override
    public Mono<Void> send(Member member, Message message) {
        return this.send(member.address(), message);
    }

    @Override
    public Mono<Void> send(Address address, Message message) {
        return this.transport.send(address, message);
    }

    @Override
    public Mono<Message> requestResponse(Address address, Message request) {
        return this.transport.requestResponse(address, request);
    }

    @Override
    public Mono<Message> requestResponse(Member member, Message request) {
        return this.transport.requestResponse(member.address(), request);
    }

    @Override
    public Mono<String> spreadGossip(Message message) {
        return this.gossip.spread(message);
    }

    @Override
    public Collection<Member> members() {
        return this.membership.members();
    }

    @Override
    public Collection<Member> otherMembers() {
        return this.membership.otherMembers();
    }

    @Override
    public <T> Optional<T> metadata() {
        return this.metadataStore.metadata();
    }

    @Override
    public <T> Optional<T> metadata(Member member) {
        if (this.member().equals(member)) {
            return this.metadata();
        }
        return this.metadataStore.metadata(member).map(this::toMetadata);
    }

    private <T> T toMetadata(ByteBuffer buffer) {
        return (T)this.config.metadataCodec().deserialize(buffer);
    }

    @Override
    public Member member() {
        return this.localMember;
    }

    @Override
    public Optional<Member> member(String id) {
        return this.membership.member(id);
    }

    @Override
    public Optional<Member> member(Address address) {
        return this.membership.member(address);
    }

    @Override
    public <T> Mono<Void> updateMetadata(T metadata) {
        return Mono.fromRunnable(() -> this.metadataStore.updateMetadata(metadata)).then(this.membership.updateIncarnation()).subscribeOn(this.scheduler);
    }

    @Override
    public void shutdown() {
        this.shutdown.emitEmpty(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
    }

    private Mono<Void> doShutdown() {
        return Mono.defer(() -> {
            LOGGER.info("[{}][doShutdown] Shutting down", (Object)this.localMember);
            return Flux.concatDelayError(this.leaveCluster(), this.dispose(), this.transport.stop()).then().doFinally(s -> this.scheduler.dispose()).doOnSuccess(avoid -> LOGGER.info("[{}][doShutdown] Shutdown", (Object)this.localMember));
        });
    }

    private Mono<Void> leaveCluster() {
        return this.membership.leaveCluster().subscribeOn(this.scheduler).doOnSubscribe(s -> LOGGER.info("[{}][leaveCluster] Leaving cluster", (Object)this.localMember)).doOnSuccess(s -> LOGGER.info("[{}][leaveCluster] Left cluster", (Object)this.localMember)).doOnError(ex -> LOGGER.warn("[{}][leaveCluster] Exception occurred: {}", (Object)this.localMember, (Object)ex.toString())).then();
    }

    private Mono<Void> dispose() {
        return Mono.fromRunnable(() -> {
            this.actionsDisposables.dispose();
            this.metadataStore.stop();
            this.membership.stop();
            this.gossip.stop();
            this.failureDetector.stop();
        });
    }

    @Override
    public Mono<Void> onShutdown() {
        return this.onShutdown.asMono();
    }

    private static class SenderAwareTransport
    implements Transport {
        private final Transport transport;
        private final Address address;

        private SenderAwareTransport(Transport transport, Address address) {
            this.transport = Objects.requireNonNull(transport);
            this.address = Objects.requireNonNull(address);
        }

        @Override
        public Address address() {
            return this.transport.address();
        }

        @Override
        public Mono<Transport> start() {
            return this.transport.start();
        }

        @Override
        public Mono<Void> stop() {
            return this.transport.stop();
        }

        @Override
        public boolean isStopped() {
            return this.transport.isStopped();
        }

        @Override
        public Mono<Void> send(Address address, Message message) {
            return Mono.defer(() -> this.transport.send(address, this.enhanceWithSender(message)));
        }

        @Override
        public Mono<Message> requestResponse(Address address, Message request) {
            return Mono.defer(() -> this.transport.requestResponse(address, this.enhanceWithSender(request)));
        }

        @Override
        public Flux<Message> listen() {
            return this.transport.listen();
        }

        private Message enhanceWithSender(Message message) {
            return Message.with(message).sender(this.address).build();
        }
    }
}

