/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.raft.jraft.rpc.impl;

import java.net.ConnectException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.TopologyEventHandler;
import org.apache.ignite3.internal.raft.PeerUnavailableException;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.raft.jraft.Status;
import org.apache.ignite3.raft.jraft.entity.PeerId;
import org.apache.ignite3.raft.jraft.error.InvokeTimeoutException;
import org.apache.ignite3.raft.jraft.error.RaftError;
import org.apache.ignite3.raft.jraft.error.RemotingException;
import org.apache.ignite3.raft.jraft.option.RpcOptions;
import org.apache.ignite3.raft.jraft.rpc.ClientService;
import org.apache.ignite3.raft.jraft.rpc.InvokeCallback;
import org.apache.ignite3.raft.jraft.rpc.InvokeContext;
import org.apache.ignite3.raft.jraft.rpc.Message;
import org.apache.ignite3.raft.jraft.rpc.NetworkInvoker;
import org.apache.ignite3.raft.jraft.rpc.RpcClient;
import org.apache.ignite3.raft.jraft.rpc.RpcRequests;
import org.apache.ignite3.raft.jraft.rpc.RpcResponseClosure;
import org.apache.ignite3.raft.jraft.rpc.impl.FutureImpl;
import org.apache.ignite3.raft.jraft.util.Utils;

public abstract class AbstractClientService
implements ClientService,
TopologyEventHandler {
    protected static final IgniteLogger LOG = Loggers.forClass(AbstractClientService.class);
    protected volatile RpcClient rpcClient;
    protected ExecutorService rpcExecutor;
    protected RpcOptions rpcOptions;
    private Set<PeerId> deadPeers = ConcurrentHashMap.newKeySet();
    private final Set<String> readyConsistentIds = ConcurrentHashMap.newKeySet();

    public RpcClient getRpcClient() {
        return this.rpcClient;
    }

    @Override
    public synchronized boolean init(RpcOptions rpcOptions) {
        if (this.rpcClient != null) {
            return true;
        }
        this.rpcOptions = rpcOptions;
        return this.initRpcClient(this.rpcOptions.getRpcProcessorThreadPoolSize());
    }

    @Override
    public void onAppeared(InternalClusterNode member) {
    }

    @Override
    public void onDisappeared(InternalClusterNode member) {
        this.readyConsistentIds.remove(member.name());
    }

    protected void configRpcClient(RpcClient rpcClient) {
        rpcClient.registerConnectEventListener(this);
    }

    protected boolean initRpcClient(int rpcProcessorThreadPoolSize) {
        this.rpcClient = this.rpcOptions.getRpcClient();
        this.configRpcClient(this.rpcClient);
        this.rpcClient.init(this.rpcOptions);
        this.rpcExecutor = this.rpcOptions.getClientExecutor();
        return true;
    }

    @Override
    public synchronized void shutdown() {
        if (this.rpcClient != null) {
            this.rpcClient.shutdown();
            this.rpcClient = null;
        }
    }

    @Override
    public boolean connect(PeerId peerId) {
        block3: {
            try {
                return this.connectAsync(peerId).get();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Interrupted while connecting to {}, exception: {}.", peerId, e.getMessage());
            }
            catch (ExecutionException e) {
                if (this.deadPeers.contains(peerId)) break block3;
                this.deadPeers.add(peerId);
                LOG.info("Fail to connect {}, exception: {}.", peerId, e.getMessage());
            }
        }
        return false;
    }

    @Override
    public CompletableFuture<Boolean> connectAsync(PeerId peerId) {
        RpcClient rc = this.rpcClient;
        if (rc == null) {
            return CompletableFutures.falseCompletedFuture();
        }
        if (this.readyConsistentIds.contains(peerId.getConsistentId())) {
            return CompletableFutures.trueCompletedFuture();
        }
        RpcRequests.PingRequest req = this.rpcOptions.getRaftMessagesFactory().pingRequest().sendTimestamp(System.currentTimeMillis()).build();
        CompletableFuture<Message> fut = this.invokeWithDone(peerId, req, null, null, this.rpcOptions.getRpcConnectTimeoutMs(), this.rpcExecutor, this.rpcClient);
        return fut.thenApply(msg -> {
            RpcRequests.ErrorResponse resp = (RpcRequests.ErrorResponse)msg;
            if (resp != null && resp.errorCode() == 0) {
                this.readyConsistentIds.add(peerId.getConsistentId());
                this.deadPeers.remove(peerId);
                return true;
            }
            return false;
        });
    }

    @Override
    public <T extends Message> CompletableFuture<Message> invokeWithDone(PeerId peerId, Message request, RpcResponseClosure<T> done, int timeoutMs) {
        return this.invokeWithDone(peerId, request, done, timeoutMs, this.rpcExecutor);
    }

    public <T extends Message> CompletableFuture<Message> invokeWithDone(PeerId peerId, Message request, RpcResponseClosure<T> done, int timeoutMs, Executor rpcExecutor) {
        return this.invokeWithDone(peerId, request, null, done, timeoutMs, rpcExecutor, this.rpcClient);
    }

    public <T extends Message> CompletableFuture<Message> invokeWithDone(PeerId peerId, Message request, InvokeContext ctx, RpcResponseClosure<T> done, int timeoutMs) {
        return this.invokeWithDone(peerId, request, ctx, done, timeoutMs, this.rpcExecutor, this.rpcClient);
    }

    public <T extends Message> CompletableFuture<Message> invokeWithDone(final PeerId peerId, final Message request, InvokeContext ctx, final RpcResponseClosure<T> done, int timeoutMs, Executor rpcExecutor, NetworkInvoker rc) {
        final FutureImpl<Message> future = new FutureImpl<Message>();
        final Executor currExecutor = rpcExecutor != null ? rpcExecutor : this.rpcExecutor;
        try {
            if (rc == null) {
                future.completeExceptionally(new IllegalStateException("Client service is uninitialized."));
                Utils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL, "Client service is uninitialized.", new Object[0]));
                return future;
            }
            return rc.invokeAsync(peerId, request, ctx, new InvokeCallback(){

                @Override
                public void complete(Object result, Throwable err) {
                    if (err == null) {
                        Message msg;
                        Status status = Status.OK();
                        if (result instanceof RpcRequests.ErrorResponse) {
                            status = AbstractClientService.handleErrorResponse((RpcRequests.ErrorResponse)result);
                            msg = (Message)result;
                        } else {
                            msg = (Message)result;
                        }
                        if (done != null) {
                            try {
                                if (status.isOk()) {
                                    done.setResponse(msg);
                                }
                                done.run(status);
                            }
                            catch (Throwable t) {
                                LOG.error("Fail to run RpcResponseClosure, the request is {}.", t, request);
                            }
                        }
                        if (!future.isDone()) {
                            future.complete(msg);
                        }
                    } else {
                        if (ExceptionUtils.hasCauseOrSuppressed(err, PeerUnavailableException.class, ConnectException.class)) {
                            AbstractClientService.this.readyConsistentIds.remove(peerId.getConsistentId());
                        }
                        if (done != null) {
                            try {
                                done.run(new Status(AbstractClientService.errorCodeByException(err), "RPC exception:" + err.getMessage(), new Object[0]));
                            }
                            catch (Throwable t) {
                                LOG.error("Fail to run RpcResponseClosure, the request is {}.", t, request);
                            }
                        }
                        if (!future.isDone()) {
                            future.completeExceptionally(err);
                        }
                    }
                }

                @Override
                public Executor executor() {
                    return currExecutor;
                }
            }, timeoutMs <= 0 ? (long)this.rpcOptions.getRpcDefaultTimeout() : (long)timeoutMs);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.completeExceptionally(e);
            Utils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTR, "Sending rpc was interrupted", new Object[0]));
        }
        catch (RemotingException e) {
            future.completeExceptionally(e);
            Utils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL, "Fail to send a RPC request:" + e.getMessage(), new Object[0]));
        }
        return future;
    }

    private static RaftError errorCodeByException(Throwable err) {
        if (ExceptionUtils.hasCauseOrSuppressed(err, NodeStoppingException.class)) {
            return RaftError.ESHUTDOWN;
        }
        return err instanceof InvokeTimeoutException ? RaftError.ETIMEDOUT : RaftError.EINTERNAL;
    }

    private static Status handleErrorResponse(RpcRequests.ErrorResponse eResp) {
        Status status = new Status();
        status.setCode(eResp.errorCode());
        status.setErrorMsg(eResp.errorMsg());
        return status;
    }
}

