package org.apache.ignite.internal.replicator;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import org.apache.ignite.internal.replicator.message.AwaitReplicaResponse;
import org.apache.ignite.internal.replicator.message.ErrorReplicaResponse;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/* loaded from: input_file:org/apache/ignite/internal/replicator/ReplicaService.class */
/**
 * Sends {@link ReplicaRequest}s to a node through the {@link MessagingService} and exposes the outcome as a
 * {@link CompletableFuture}.
 *
 * <p>Besides the plain request/response exchange, the service:
 * <ul>
 *     <li>feeds the timestamp of every {@link TimestampAware} response into the local {@link HybridClock};</li>
 *     <li>on a {@link ReplicaUnavailableException} response, sends an "await replica" request to the same node
 *     and re-sends the original request once that await completes successfully;</li>
 *     <li>when a retry executor is configured, postpones reporting of {@code ACQUIRE_LOCK_ERR} and
 *     {@code REPLICA_MISS_ERR} failures by {@link #RETRY_TIMEOUT_MILLIS} milliseconds.</li>
 * </ul>
 */
public class ReplicaService {
    /** Delay, in milliseconds, applied before an {@code ACQUIRE_LOCK_ERR} / {@code REPLICA_MISS_ERR} failure is reported. */
    private static final int RETRY_TIMEOUT_MILLIS = 10;

    /** Factory used to build the "await replica" request. */
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();

    private final MessagingService messagingService;

    private final HybridClock clock;

    /** Executor on which timeout failures, delayed failures and the post-await continuation are run. */
    private final Executor partitionOperationsExecutor;

    private final ReplicationConfiguration replicationConfiguration;

    /** Scheduler for delayed failure reporting; {@code null} disables the delay. */
    @Nullable
    private final ScheduledExecutorService retryExecutor;

    /**
     * In-flight "await replica" invocations, keyed by target node name.
     *
     * <p>NOTE(review): the key is the node name only, while the await request itself carries a single group id.
     * A concurrent request for a different group on the same node therefore joins the await that was issued
     * for the first group - confirm that this is intended.
     */
    private final Map<String, CompletableFuture<NetworkMessage>> pendingInvokes = new ConcurrentHashMap<>();

    /**
     * Creates a service that runs continuations on {@link ForkJoinPool#commonPool()} and never delays failures
     * (no retry executor).
     *
     * @param messagingService Messaging service used to reach replicas.
     * @param hybridClock Clock updated with response timestamps.
     * @param replicationConfiguration Source of the RPC timeout.
     */
    @TestOnly
    public ReplicaService(MessagingService messagingService, HybridClock hybridClock, ReplicationConfiguration replicationConfiguration) {
        this(messagingService, hybridClock, ForkJoinPool.commonPool(), replicationConfiguration, null);
    }

    /**
     * Creates the service.
     *
     * @param messagingService Messaging service used to reach replicas.
     * @param hybridClock Clock updated with response timestamps.
     * @param executor Executor for timeout failures, delayed failures and the post-await continuation.
     * @param replicationConfiguration Source of the RPC timeout.
     * @param scheduledExecutorService Scheduler for delayed failure reporting, or {@code null} to report immediately.
     */
    public ReplicaService(
            MessagingService messagingService,
            HybridClock hybridClock,
            Executor executor,
            ReplicationConfiguration replicationConfiguration,
            @Nullable ScheduledExecutorService scheduledExecutorService
    ) {
        this.messagingService = messagingService;
        this.clock = hybridClock;
        this.partitionOperationsExecutor = executor;
        this.replicationConfiguration = replicationConfiguration;
        this.retryExecutor = scheduledExecutorService;
    }

    /**
     * Sends the request to the given node and translates the response (or the transport failure) into the
     * returned future.
     *
     * @param targetNodeName Name of the node the request is sent to.
     * @param request Request to send.
     * @param <R> Expected result type; the response payload is cast to it unchecked.
     * @return Future completed with the response payload, or exceptionally with the failure.
     */
    private <R> CompletableFuture<R> sendToReplica(String targetNodeName, ReplicaRequest request) {
        CompletableFuture<R> result = new CompletableFuture<>();

        messagingService.invoke(targetNodeName, request, rpcTimeout()).whenComplete((response, invokeError) -> {
            try {
                if (invokeError != null) {
                    failOnInvokeError(result, request, ExceptionUtils.unwrapCause(invokeError));

                    return;
                }

                assert response instanceof ReplicaResponse : "Unexpected message response [resp=" + response + "]";

                if (response instanceof TimestampAware) {
                    clock.update(((TimestampAware) response).timestamp());
                }

                if (!(response instanceof ErrorReplicaResponse)) {
                    @SuppressWarnings("unchecked") // The caller chooses R; the wire payload carries no static type.
                    R payload = (R) ((ReplicaResponse) response).result();

                    result.complete(payload);

                    return;
                }

                ErrorReplicaResponse errorResponse = (ErrorReplicaResponse) response;

                if (errorResponse.throwable() instanceof ReplicaUnavailableException) {
                    awaitReplicaAndResend(targetNodeName, request, result);
                } else {
                    failOnReplicaError(result, errorResponse);
                }
            } catch (Throwable unexpected) {
                // The stage returned by whenComplete() is discarded, so anything thrown here would otherwise be
                // lost and leave the caller's future incomplete forever.
                result.completeExceptionally(unexpected);
            }
        });

        return result;
    }

    /**
     * Fails the result after the messaging-level invocation itself failed.
     *
     * <p>A timeout is reported from {@link #partitionOperationsExecutor}; any other failure is wrapped into a
     * {@link ReplicationException} and reported on the calling thread.
     */
    private void failOnInvokeError(CompletableFuture<?> result, ReplicaRequest request, Throwable cause) {
        if (cause instanceof TimeoutException) {
            partitionOperationsExecutor.execute(() -> {
                result.completeExceptionally(new ReplicationTimeoutException(request.groupId()));
            });

            return;
        }

        result.completeExceptionally(ExceptionUtils.withCause(
                ReplicationException::new,
                ErrorGroups.Replicator.REPLICA_COMMON_ERR,
                "Failed to process replica request [replicaGroupId=" + request.groupId() + "]",
                cause
        ));
    }

    /**
     * Handles a {@link ReplicaUnavailableException} response: waits for the "await replica" invocation registered
     * for the target node (issuing it if none is pending) and then re-sends the original request.
     */
    private <R> void awaitReplicaAndResend(String targetNodeName, ReplicaRequest request, CompletableFuture<R> result) {
        CompletableFuture<NetworkMessage> candidate = new CompletableFuture<>();
        CompletableFuture<NetworkMessage> awaitFuture = pendingInvokes.computeIfAbsent(targetNodeName, key -> candidate);

        // Only the caller whose future was actually registered issues the await request; the others just join it.
        if (awaitFuture == candidate) {
            try {
                messagingService.invoke(
                        targetNodeName,
                        REPLICA_MESSAGES_FACTORY.awaitReplicaRequest().groupId(request.groupId()).build(),
                        rpcTimeout()
                ).whenComplete((awaitResponse, awaitError) -> {
                    if (awaitError != null) {
                        awaitFuture.completeExceptionally(awaitError);
                    } else {
                        awaitFuture.complete(awaitResponse);
                    }
                });
            } catch (Throwable sendFailure) {
                // Without this the registered future would never complete and every later caller for this node
                // would join a dead entry in pendingInvokes.
                awaitFuture.completeExceptionally(sendFailure);
            }
        }

        awaitFuture.handleAsync((awaitResponse, awaitError) -> {
            pendingInvokes.remove(targetNodeName, awaitFuture);

            try {
                if (awaitError != null) {
                    failOnAwaitError(result, request, ExceptionUtils.unwrapCause(awaitError));

                    return null;
                }

                if (awaitResponse instanceof ErrorReplicaResponse) {
                    result.completeExceptionally(((ErrorReplicaResponse) awaitResponse).throwable());

                    return null;
                }

                assert awaitResponse instanceof AwaitReplicaResponse
                        : "Incorrect response type [type=" + awaitResponse.getClass().getSimpleName() + "]";

                // The await succeeded: repeat the original request and forward its outcome.
                this.<R>sendToReplica(targetNodeName, request).whenComplete((payload, resendError) -> {
                    if (resendError != null) {
                        result.completeExceptionally(resendError);
                    } else {
                        result.complete(payload);
                    }
                });
            } catch (Throwable unexpected) {
                // Same reasoning as in sendToReplica(): the handleAsync() stage is discarded.
                result.completeExceptionally(unexpected);
            }

            return null;
        }, partitionOperationsExecutor);
    }

    /** Fails the result after the "await replica" invocation failed, distinguishing timeouts from other errors. */
    private void failOnAwaitError(CompletableFuture<?> result, ReplicaRequest request, Throwable cause) {
        if (cause instanceof TimeoutException) {
            result.completeExceptionally(ExceptionUtils.withCause(
                    ReplicationTimeoutException::new,
                    ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR,
                    IgniteStringFormatter.format(
                            "Could not wait for the replica readiness due to timeout [replicaGroupId={}, req={}]",
                            request.groupId(),
                            request.getClass().getSimpleName()
                    ),
                    cause
            ));

            return;
        }

        result.completeExceptionally(ExceptionUtils.withCause(
                ReplicationException::new,
                ErrorGroups.Replicator.REPLICA_COMMON_ERR,
                IgniteStringFormatter.format(
                        "Failed to process replica request [replicaGroupId={}, req={}]",
                        request.groupId(),
                        request.getClass().getSimpleName()
                ),
                cause
        ));
    }

    /**
     * Reports an error carried by the replica's response.
     *
     * <p>With a retry executor present, {@code ACQUIRE_LOCK_ERR} and {@code REPLICA_MISS_ERR} failures are reported
     * after {@link #RETRY_TIMEOUT_MILLIS} milliseconds from {@link #partitionOperationsExecutor}; everything else is
     * reported immediately. NOTE(review): the delay presumably throttles callers that retry on these errors -
     * confirm against the callers.
     */
    private void failOnReplicaError(CompletableFuture<?> result, ErrorReplicaResponse errorResponse) {
        boolean delayed = retryExecutor != null && ExceptionUtils.matchAny(
                ExceptionUtils.unwrapCause(errorResponse.throwable()),
                ErrorGroups.Transactions.ACQUIRE_LOCK_ERR,
                ErrorGroups.Replicator.REPLICA_MISS_ERR
        );

        if (!delayed) {
            result.completeExceptionally(errorResponse.throwable());

            return;
        }

        retryExecutor.schedule(() -> {
            // Hop from the scheduler thread to the partition operations executor before completing the future.
            partitionOperationsExecutor.execute(() -> {
                result.completeExceptionally(errorResponse.throwable());
            });
        }, RETRY_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
    }

    /** Reads the current RPC timeout from the configuration; passed as-is to {@link MessagingService}. */
    private long rpcTimeout() {
        return (Long) replicationConfiguration.rpcTimeout().value();
    }

    /**
     * Sends a request to the replica hosted on the given node.
     *
     * @param clusterNode Target node; its {@code name()} is used as the destination.
     * @param replicaRequest Request to send.
     * @param <R> Expected result type.
     * @return Future with the response payload or the failure.
     */
    public <R> CompletableFuture<R> invoke(ClusterNode clusterNode, ReplicaRequest replicaRequest) {
        return sendToReplica(clusterNode.name(), replicaRequest);
    }

    /**
     * Sends a request to the replica hosted on the node with the given name.
     *
     * @param str Name of the target node.
     * @param replicaRequest Request to send.
     * @param <R> Expected result type.
     * @return Future with the response payload or the failure.
     */
    public <R> CompletableFuture<R> invoke(String str, ReplicaRequest replicaRequest) {
        return sendToReplica(str, replicaRequest);
    }

    /**
     * Sends a request to the replica hosted on the given node.
     *
     * @param clusterNode Target node; its {@code name()} is used as the destination.
     * @param replicaRequest Request to send.
     * @param str Not used by this implementation.
     * @param <R> Expected result type.
     * @return Future with the response payload or the failure.
     */
    public <R> CompletableFuture<R> invoke(ClusterNode clusterNode, ReplicaRequest replicaRequest, String str) {
        return sendToReplica(clusterNode.name(), replicaRequest);
    }

    /** Returns the messaging service this instance sends requests through. */
    public MessagingService messagingService() {
        return this.messagingService;
    }
}
