/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.replicator;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.replicator.exception.AwaitReplicaTimeoutException;
import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import org.apache.ignite.internal.replicator.message.AwaitReplicaRequest;
import org.apache.ignite.internal.replicator.message.AwaitReplicaResponse;
import org.apache.ignite.internal.replicator.message.ErrorReplicaResponse;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
 * Sends {@link ReplicaRequest}s to a target node through the {@link MessagingService} and exposes the
 * outcome as a {@link CompletableFuture}.
 *
 * <p>Besides plain delivery, the service:
 * <ul>
 *     <li>propagates the timestamp of {@link TimestampAware} responses to the {@link ClockService};</li>
 *     <li>when the replica answers with {@link ReplicaUnavailableException}, sends an
 *     {@link AwaitReplicaRequest} (a single in-flight one per target node) and re-sends the original
 *     request once an {@link AwaitReplicaResponse} arrives;</li>
 *     <li>delays delivery of some error responses to the caller by the configured retry interval.</li>
 * </ul>
 */
public class ReplicaService {
    /** Factory used to build replica protocol messages. */
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();

    /** Transport used to deliver requests. */
    private final MessagingService messagingService;

    /** Clock that is updated with timestamps carried by {@link TimestampAware} responses. */
    private final ClockService clockService;

    /** Executor on which timeout failures and post-await processing are run. */
    private final Executor partitionOperationsExecutor;

    /** Source of the RPC timeout and of the retry interval. */
    private final ReplicationConfiguration replicationConfiguration;

    /** Scheduler used to delay delivery of retriable errors; {@code null} disables the delay. */
    @Nullable
    private final ScheduledExecutorService retryExecutor;

    /** In-flight await-replica futures, keyed by the consistent id of the target node. */
    private final Map<String, CompletableFuture<NetworkMessage>> pendingInvokes = new ConcurrentHashMap<>();

    /**
     * Test-only constructor: uses {@link ForkJoinPool#commonPool()} as the partition operations executor
     * and no retry executor.
     *
     * @param messagingService Messaging service.
     * @param clockService Clock service.
     * @param replicationConfiguration Replication configuration.
     */
    @TestOnly
    public ReplicaService(
            MessagingService messagingService,
            ClockService clockService,
            ReplicationConfiguration replicationConfiguration
    ) {
        this(messagingService, clockService, ForkJoinPool.commonPool(), replicationConfiguration, null);
    }

    /**
     * Creates the service.
     *
     * @param messagingService Messaging service.
     * @param clockService Clock service.
     * @param partitionOperationsExecutor Executor for timeout failures and post-await processing.
     * @param replicationConfiguration Replication configuration.
     * @param retryExecutor Scheduler used to delay delivery of retriable errors, or {@code null} to deliver
     *         them immediately.
     */
    public ReplicaService(
            MessagingService messagingService,
            ClockService clockService,
            Executor partitionOperationsExecutor,
            ReplicationConfiguration replicationConfiguration,
            @Nullable ScheduledExecutorService retryExecutor
    ) {
        this.messagingService = messagingService;
        this.clockService = clockService;
        this.partitionOperationsExecutor = partitionOperationsExecutor;
        this.replicationConfiguration = replicationConfiguration;
        this.retryExecutor = retryExecutor;
    }

    /**
     * Sends the request and unwraps {@link ReplicaResponse#result()} from the response.
     *
     * @param targetNodeConsistentId Consistent id of the target node.
     * @param req Request to send.
     * @param <R> Expected result type; the narrowing is unchecked.
     * @return Future with the response result.
     */
    @SuppressWarnings("unchecked")
    private <R> CompletableFuture<R> sendToReplica(String targetNodeConsistentId, ReplicaRequest req) {
        return sendToReplicaRaw(targetNodeConsistentId, req).thenApply(res -> (R) res.result());
    }

    /**
     * Sends the request and returns the raw response.
     *
     * <p>The returned future fails with:
     * <ul>
     *     <li>{@link ReplicationTimeoutException} if the invocation timed out;</li>
     *     <li>{@link ReplicationException} if the invocation failed for another reason;</li>
     *     <li>{@link AwaitReplicaTimeoutException} if waiting for the replica timed out;</li>
     *     <li>the throwable carried by an {@link ErrorReplicaResponse} otherwise.</li>
     * </ul>
     *
     * @param targetNodeConsistentId Consistent id of the target node.
     * @param req Request to send.
     * @return Future with the replica response.
     */
    private CompletableFuture<ReplicaResponse> sendToReplicaRaw(String targetNodeConsistentId, ReplicaRequest req) {
        CompletableFuture<ReplicaResponse> res = new CompletableFuture<>();

        long timeoutMillis = replicationConfiguration.rpcTimeoutMillis().value();

        messagingService.invoke(targetNodeConsistentId, req, timeoutMillis)
                .whenComplete((response, throwable) -> {
                    try {
                        if (throwable != null) {
                            onInvokeFailure(req, throwable, res);
                        } else {
                            onInvokeResponse(targetNodeConsistentId, req, response, res);
                        }
                    } catch (Throwable t) {
                        // A throwable escaping this callback would only fail the discarded dependent stage
                        // and leave the caller waiting on 'res' forever.
                        res.completeExceptionally(t);
                    }
                });

        return res;
    }

    /** Translates a failed invocation into the exception reported through {@code res}. */
    private void onInvokeFailure(ReplicaRequest req, Throwable throwable, CompletableFuture<ReplicaResponse> res) {
        Throwable cause = ExceptionUtils.unwrapCause(throwable);

        if (cause instanceof TimeoutException) {
            // Timeout failures are handed over to the partition operations executor.
            partitionOperationsExecutor.execute(
                    () -> res.completeExceptionally(new ReplicationTimeoutException(req.groupId().asReplicationGroupId()))
            );
        } else {
            res.completeExceptionally(ExceptionUtils.withCause(
                    ReplicationException::new,
                    ErrorGroups.Replicator.REPLICA_COMMON_ERR,
                    "Failed to process replica request [replicaGroupId=" + req.groupId() + "]",
                    cause
            ));
        }
    }

    /** Processes a received response: updates the clock, then completes {@code res} or starts error handling. */
    private void onInvokeResponse(
            String targetNodeConsistentId,
            ReplicaRequest req,
            NetworkMessage response,
            CompletableFuture<ReplicaResponse> res
    ) {
        assert response instanceof ReplicaResponse : "Unexpected message response [resp=" + response + "]";

        if (response instanceof TimestampAware) {
            clockService.updateClock(((TimestampAware) response).timestamp());
        }

        if (!(response instanceof ErrorReplicaResponse)) {
            res.complete((ReplicaResponse) response);

            return;
        }

        ErrorReplicaResponse errResp = (ErrorReplicaResponse) response;

        if (errResp.throwable() instanceof ReplicaUnavailableException) {
            awaitReplicaAndResend(targetNodeConsistentId, req, res);
        } else {
            failOrDelayFailure(errResp, res);
        }
    }

    /**
     * Waits for the replica on the target node and then re-sends the original request.
     *
     * <p>Only the caller that manages to register its future in {@link #pendingInvokes} sends the
     * {@link AwaitReplicaRequest}; concurrent callers for the same node attach to the registered future.
     */
    private void awaitReplicaAndResend(
            String targetNodeConsistentId,
            ReplicaRequest req,
            CompletableFuture<ReplicaResponse> res
    ) {
        CompletableFuture<NetworkMessage> requestFuture = new CompletableFuture<>();

        CompletableFuture<NetworkMessage> awaitReplicaFut =
                pendingInvokes.computeIfAbsent(targetNodeConsistentId, consistentId -> requestFuture);

        if (awaitReplicaFut == requestFuture) {
            try {
                AwaitReplicaRequest awaitReplicaReq = REPLICA_MESSAGES_FACTORY.awaitReplicaRequest()
                        .groupId(req.groupId())
                        .build();

                long timeoutMillis = replicationConfiguration.rpcTimeoutMillis().value();

                messagingService.invoke(targetNodeConsistentId, awaitReplicaReq, timeoutMillis)
                        .whenComplete((networkMessage, e) -> {
                            if (e != null) {
                                awaitReplicaFut.completeExceptionally(e);
                            } else {
                                awaitReplicaFut.complete(networkMessage);
                            }
                        });
            } catch (Throwable t) {
                // The future is already visible to other callers through 'pendingInvokes'; if the await
                // request could not even be sent, fail it so that nobody waits on it forever.
                awaitReplicaFut.completeExceptionally(t);
            }
        }

        awaitReplicaFut.handleAsync((awaitResponse, awaitError) -> {
            pendingInvokes.remove(targetNodeConsistentId, awaitReplicaFut);

            try {
                onAwaitReplicaCompleted(targetNodeConsistentId, req, awaitResponse, awaitError, res);
            } catch (Throwable t) {
                // Same reasoning as in sendToReplicaRaw: never leave 'res' incomplete.
                res.completeExceptionally(t);
            }

            return null;
        }, partitionOperationsExecutor);
    }

    /** Handles the outcome of the await-replica request: reports the failure or re-sends the original request. */
    private void onAwaitReplicaCompleted(
            String targetNodeConsistentId,
            ReplicaRequest req,
            @Nullable NetworkMessage awaitResponse,
            @Nullable Throwable awaitError,
            CompletableFuture<ReplicaResponse> res
    ) {
        if (awaitError != null) {
            Throwable cause = ExceptionUtils.unwrapCause(awaitError);

            if (cause instanceof TimeoutException) {
                res.completeExceptionally(ExceptionUtils.withCause(
                        AwaitReplicaTimeoutException::new,
                        ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR,
                        IgniteStringFormatter.format(
                                "Could not wait for the replica readiness due to timeout [replicaGroupId={}, req={}]",
                                req.groupId(),
                                req.getClass().getSimpleName()
                        ),
                        cause
                ));
            } else {
                res.completeExceptionally(ExceptionUtils.withCause(
                        ReplicationException::new,
                        ErrorGroups.Replicator.REPLICA_COMMON_ERR,
                        IgniteStringFormatter.format(
                                "Failed to process replica request [replicaGroupId={}, req={}]",
                                req.groupId(),
                                req.getClass().getSimpleName()
                        ),
                        cause
                ));
            }
        } else if (awaitResponse instanceof ErrorReplicaResponse) {
            res.completeExceptionally(((ErrorReplicaResponse) awaitResponse).throwable());
        } else {
            assert awaitResponse instanceof AwaitReplicaResponse
                    : "Incorrect response type [type=" + awaitResponse.getClass().getSimpleName() + "]";

            // The replica is reported ready: repeat the original request and forward its outcome.
            sendToReplicaRaw(targetNodeConsistentId, req).whenComplete((r, e) -> {
                if (e != null) {
                    res.completeExceptionally(e);
                } else {
                    res.complete(r);
                }
            });
        }
    }

    /**
     * Fails {@code res} with the throwable of the error response.
     *
     * <p>If a retry executor is configured, the retry interval is positive and the error matches one of the
     * lock-acquisition / replica-miss / group-overloaded codes, the failure is delivered only after the
     * retry interval has elapsed. The request itself is not re-sent here.
     */
    private void failOrDelayFailure(ErrorReplicaResponse errResp, CompletableFuture<ReplicaResponse> res) {
        int retryIntervalMillis = replicationConfiguration.replicaOperationRetryIntervalMillis().value();

        boolean delayFailure = retryExecutor != null
                && ExceptionUtils.matchAny(
                        ExceptionUtils.unwrapCause(errResp.throwable()),
                        ErrorGroups.Transactions.ACQUIRE_LOCK_ERR,
                        new int[] {ErrorGroups.Replicator.REPLICA_MISS_ERR, ErrorGroups.Replicator.GROUP_OVERLOADED_ERR}
                )
                && retryIntervalMillis > 0;

        if (delayFailure) {
            retryExecutor.schedule(
                    () -> partitionOperationsExecutor.execute(() -> res.completeExceptionally(errResp.throwable())),
                    retryIntervalMillis,
                    TimeUnit.MILLISECONDS
            );
        } else {
            res.completeExceptionally(errResp.throwable());
        }
    }

    /**
     * Sends a request to the replica hosted on the given node.
     *
     * @param node Target node; its {@code name()} is used as the consistent id.
     * @param request Request to send.
     * @param <R> Expected result type; the narrowing is unchecked.
     * @return Future with the response result.
     */
    @SuppressWarnings("unchecked")
    public <R> CompletableFuture<R> invoke(InternalClusterNode node, ReplicaRequest request) {
        return invokeRaw(node, request).thenApply(r -> (R) r.result());
    }

    /**
     * Sends a request to the replica hosted on the node with the given consistent id.
     *
     * @param replicaConsistentId Consistent id of the target node.
     * @param request Request to send.
     * @param <R> Expected result type; the narrowing is unchecked.
     * @return Future with the response result.
     */
    public <R> CompletableFuture<R> invoke(String replicaConsistentId, ReplicaRequest request) {
        return sendToReplica(replicaConsistentId, request);
    }

    /**
     * Sends a request to the replica hosted on the given node.
     *
     * @param node Target node; its {@code name()} is used as the consistent id.
     * @param request Request to send.
     * @param storageId Not used by this implementation.
     * @param <R> Expected result type; the narrowing is unchecked.
     * @return Future with the response result.
     */
    public <R> CompletableFuture<R> invoke(InternalClusterNode node, ReplicaRequest request, String storageId) {
        return sendToReplica(node.name(), request);
    }

    /**
     * Sends a request to the replica hosted on the given node and returns the raw response.
     *
     * @param node Target node; its {@code name()} is used as the consistent id.
     * @param request Request to send.
     * @return Future with the replica response.
     */
    public CompletableFuture<ReplicaResponse> invokeRaw(InternalClusterNode node, ReplicaRequest request) {
        return invokeRaw(node.name(), request);
    }

    /**
     * Sends a request to the replica hosted on the node with the given consistent id and returns the raw
     * response.
     *
     * @param targetNodeConsistentId Consistent id of the target node.
     * @param request Request to send.
     * @return Future with the replica response.
     */
    public CompletableFuture<ReplicaResponse> invokeRaw(String targetNodeConsistentId, ReplicaRequest request) {
        return sendToReplicaRaw(targetNodeConsistentId, request);
    }

    /**
     * Returns the messaging service this replica service sends requests through.
     *
     * @return Messaging service.
     */
    public MessagingService messagingService() {
        return messagingService;
    }
}

