/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.replicator;

import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.apache.ignite.internal.event.Event;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaReservationFailedException;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TransientReplicaStartException;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;

class ReplicaStateManager {
    private static final IgniteLogger LOG = Loggers.forClass(ReplicaStateManager.class);
    private final Map<ReplicationGroupId, ReplicaStateContext> replicaContexts = new ConcurrentHashMap<ReplicationGroupId, ReplicaStateContext>();
    private final Executor replicaStartStopExecutor;
    private final PlacementDriver placementDriver;
    private final ReplicaManager replicaManager;
    private final FailureProcessor failureProcessor;
    private volatile UUID localNodeId;
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    ReplicaStateManager(Executor replicaStartStopExecutor, PlacementDriver placementDriver, ReplicaManager replicaManager, FailureProcessor failureProcessor) {
        this.replicaStartStopExecutor = replicaStartStopExecutor;
        this.placementDriver = placementDriver;
        this.replicaManager = replicaManager;
        this.failureProcessor = failureProcessor;
    }

    void start(UUID localNodeId) {
        this.localNodeId = localNodeId;
        this.placementDriver.listen((Event)PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this::onPrimaryElected);
        this.placementDriver.listen((Event)PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this::onPrimaryExpired);
    }

    void stop() {
        this.busyLock.block();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Boolean> onPrimaryElected(PrimaryReplicaEventParameters parameters) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new NodeStoppingException());
        }
        try {
            ReplicationGroupId replicationGroupId = parameters.groupId();
            ReplicaStateContext context = this.getContext(replicationGroupId);
            Object object = context;
            synchronized (object) {
                if (this.localNodeId.equals(parameters.leaseholderId())) {
                    assert (context.replicaState != ReplicaState.STOPPED) : "Unexpected primary replica state STOPPED [groupId=" + replicationGroupId + ", leaseStartTime=" + parameters.startTime() + ", reservedForPrimary=" + context.reservedForPrimary + ", contextLeaseStartTime=" + context.leaseStartTime + "].";
                } else if (context.reservedForPrimary) {
                    context.assertReservation(replicationGroupId);
                    if (parameters.startTime().compareTo(context.leaseStartTime) > 0) {
                        context.releaseReservation();
                    }
                }
            }
            object = CompletableFutures.falseCompletedFuture();
            return object;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Boolean> onPrimaryExpired(PrimaryReplicaEventParameters parameters) {
        if (!this.busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new NodeStoppingException());
        }
        try {
            ReplicaStateContext context;
            if (this.localNodeId.equals(parameters.leaseholderId()) && (context = this.replicaContexts.get(parameters.groupId())) != null) {
                ReplicaStateContext replicaStateContext = context;
                synchronized (replicaStateContext) {
                    if (context.reservedForPrimary) {
                        context.assertReservation(parameters.groupId());
                        if (parameters.startTime().equals((Object)context.leaseStartTime)) {
                            context.releaseReservation();
                        }
                    }
                }
            }
            CompletableFuture completableFuture = CompletableFutures.falseCompletedFuture();
            return completableFuture;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    private ReplicaStateContext getContext(ReplicationGroupId groupId) {
        return this.replicaContexts.computeIfAbsent(groupId, k -> new ReplicaStateContext(ReplicaState.STOPPED, CompletableFutures.nullCompletedFuture()));
    }

    CompletableFuture<Boolean> weakStartReplica(ReplicationGroupId groupId, Supplier<CompletableFuture<Boolean>> startOperation, @Nullable Assignments forcedAssignments) {
        ReplicaStateContext context;
        ReplicaStateContext replicaStateContext = context = this.getContext(groupId);
        synchronized (replicaStateContext) {
            ReplicaState state = context.replicaState;
            LOG.debug("Weak replica start [grp={}, state={}, future={}].", new Object[]{groupId, state, context.previousOperationFuture});
            if (state == ReplicaState.STOPPED || state == ReplicaState.STOPPING) {
                return this.startReplica(groupId, context, startOperation);
            }
            if (state == ReplicaState.ASSIGNED) {
                if (forcedAssignments != null) {
                    assert (forcedAssignments.force()) : IgniteStringFormatter.format((String)"Unexpected assignments to force [assignments={}, groupId={}].", (Object[])new Object[]{forcedAssignments, groupId});
                    this.replicaManager.resetPeers(groupId, PeersAndLearners.fromAssignments((Collection)forcedAssignments.nodes()));
                }
                return CompletableFutures.trueCompletedFuture();
            }
            if (state == ReplicaState.PRIMARY_ONLY) {
                context.replicaState = ReplicaState.ASSIGNED;
                LOG.debug("Weak replica start complete [state={}].", new Object[]{context.replicaState});
                return CompletableFutures.trueCompletedFuture();
            }
            if (state == ReplicaState.RESTART_PLANNED) {
                throw new AssertionError((Object)("Replica start cannot begin before stop on replica restart is completed [groupId=" + groupId + "]."));
            }
            throw new AssertionError((Object)("Replica start cannot begin while the replica is being started [groupId=" + groupId + "]."));
        }
    }

    private CompletableFuture<Boolean> startReplica(ReplicationGroupId groupId, ReplicaStateContext context, Supplier<CompletableFuture<Boolean>> startOperation) {
        context.replicaState = ReplicaState.STARTING;
        context.previousOperationFuture = ((CompletableFuture)((CompletableFuture)context.previousOperationFuture.handleAsync((v, e) -> (CompletableFuture)startOperation.get(), this.replicaStartStopExecutor)).thenCompose(startOperationFuture -> startOperationFuture.thenApply(partitionStarted -> {
            ReplicaStateContext replicaStateContext = context;
            synchronized (replicaStateContext) {
                if (partitionStarted.booleanValue()) {
                    context.replicaState = ReplicaState.ASSIGNED;
                } else {
                    context.replicaState = ReplicaState.STOPPED;
                    this.replicaContexts.remove(groupId);
                }
            }
            LOG.debug("Weak replica start complete [state={}, partitionStarted={}].", new Object[]{context.replicaState, partitionStarted});
            return partitionStarted;
        }))).whenComplete((res, ex) -> {
            if (ex != null && !ExceptionUtils.hasCause((Throwable)ex, (Class[])new Class[]{NodeStoppingException.class, TransientReplicaStartException.class})) {
                this.failureProcessor.process(new FailureContext(ex, String.format("Replica start failed [groupId=%s]", groupId)));
            }
        });
        return context.previousOperationFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    CompletableFuture<Void> weakStopReplica(ReplicationGroupId groupId, ReplicaManager.WeakReplicaStopReason reason, Supplier<CompletableFuture<Void>> stopOperation) {
        ReplicaStateContext context;
        ReplicaStateContext replicaStateContext = context = this.getContext(groupId);
        synchronized (replicaStateContext) {
            ReplicaState state = context.replicaState;
            LOG.debug("Weak replica stop [grpId={}, state={}, reason={}, reservedForPrimary={}, future={}].", new Object[]{groupId, state, reason, context.reservedForPrimary, context.previousOperationFuture});
            if (reason == ReplicaManager.WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS) {
                if (state == ReplicaState.ASSIGNED) {
                    if (!context.reservedForPrimary) {
                        return this.stopReplica(groupId, context, stopOperation);
                    }
                    context.replicaState = ReplicaState.PRIMARY_ONLY;
                    this.planDeferredReplicaStop(groupId, context, stopOperation);
                } else {
                    if (state == ReplicaState.STARTING) {
                        return this.stopReplica(groupId, context, stopOperation);
                    }
                    if (state == ReplicaState.STOPPED) {
                        return this.stopReplica(groupId, context, stopOperation);
                    }
                }
            } else {
                if (reason == ReplicaManager.WeakReplicaStopReason.RESTART) {
                    if (context.reservedForPrimary) {
                        context.replicaState = ReplicaState.RESTART_PLANNED;
                        return this.replicaManager.stopLeaseProlongation(groupId, null).thenCompose(unused -> this.planDeferredReplicaStop(groupId, context, stopOperation));
                    }
                    return this.stopReplica(groupId, context, stopOperation);
                }
                assert (reason == ReplicaManager.WeakReplicaStopReason.PRIMARY_EXPIRED) : "Unknown replica stop reason: " + reason;
                if (state == ReplicaState.PRIMARY_ONLY) {
                    return this.stopReplica(groupId, context, stopOperation);
                }
            }
            LOG.debug("Weak replica stop (sync part) complete [grpId={}, state={}].", new Object[]{groupId, context.replicaState});
            return CompletableFutures.nullCompletedFuture();
        }
    }

    private CompletableFuture<Void> stopReplica(ReplicationGroupId groupId, ReplicaStateContext context, Supplier<CompletableFuture<Void>> stopOperation) {
        if (context.reservedForPrimary) {
            return this.replicaManager.stopLeaseProlongation(groupId, null).thenCompose(unused -> this.planDeferredReplicaStop(groupId, context, stopOperation));
        }
        context.replicaState = ReplicaState.STOPPING;
        context.previousOperationFuture = ((CompletableFuture)((CompletableFuture)context.previousOperationFuture.handleAsync((v, e) -> (CompletableFuture)stopOperation.get(), this.replicaStartStopExecutor)).thenCompose(stopOperationFuture -> stopOperationFuture.thenApply(v -> {
            ReplicaStateContext replicaStateContext = context;
            synchronized (replicaStateContext) {
                context.replicaState = ReplicaState.STOPPED;
            }
            LOG.debug("Weak replica stop complete [grpId={}, state={}].", new Object[]{groupId, context.replicaState});
            return true;
        }))).whenComplete((res, ex) -> {
            if (ex != null && !ExceptionUtils.hasCause((Throwable)ex, (Class[])new Class[]{NodeStoppingException.class})) {
                this.failureProcessor.process(new FailureContext(ex, String.format("Replica stop failed [groupId=%s]", groupId)));
            }
        });
        return context.previousOperationFuture.thenApply(v -> null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Void> planDeferredReplicaStop(ReplicationGroupId groupId, ReplicaStateContext context, Supplier<CompletableFuture<Void>> deferredStopOperation) {
        ReplicaStateContext replicaStateContext = context;
        synchronized (replicaStateContext) {
            LOG.debug("Planning deferred replica stop [groupId={}, reservedForPrimary={}].", new Object[]{groupId, context.reservedForPrimary});
            context.deferredStopReadyFuture = context.reservedForPrimary ? new CompletableFuture() : CompletableFutures.nullCompletedFuture();
            return context.deferredStopReadyFuture.thenCompose(unused -> {
                ReplicaStateContext replicaStateContext = context;
                synchronized (replicaStateContext) {
                    return this.stopReplica(groupId, context, deferredStopOperation);
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void reserveReplica(ReplicationGroupId groupId, HybridTimestamp leaseStartTime) {
        ReplicaStateContext context;
        ReplicaStateContext replicaStateContext = context = this.getContext(groupId);
        synchronized (replicaStateContext) {
            LOG.debug("Trying to reserve replica [groupId={}, leaseStartTime={}, replicaState={}, reservedForPrimary={}].", new Object[]{groupId, leaseStartTime, context.replicaState, context.reservedForPrimary});
            ReplicaState state = context.replicaState;
            if (state == ReplicaState.STOPPING || state == ReplicaState.STOPPED) {
                if (state == ReplicaState.STOPPED) {
                    this.replicaContexts.remove(groupId);
                }
                if (context.reservedForPrimary) {
                    throw new AssertionError((Object)("Unexpected replica reservation with " + state + " state [groupId=" + groupId + "]."));
                }
            } else if (state == ReplicaState.RESTART_PLANNED) {
                context.releaseReservation();
            } else {
                context.reserve(groupId, leaseStartTime);
            }
            if (!context.reservedForPrimary) {
                throw new ReplicaReservationFailedException(IgniteStringFormatter.format((String)"Replica reservation failed [groupId={}, leaseStartTime={}, currentReplicaState={}].", (Object[])new Object[]{groupId, leaseStartTime, state}));
            }
        }
    }

    private static class ReplicaStateContext {
        ReplicaState replicaState;
        CompletableFuture<Boolean> previousOperationFuture;
        boolean reservedForPrimary;
        @Nullable
        HybridTimestamp leaseStartTime;
        @Nullable
        CompletableFuture<Void> deferredStopReadyFuture;

        ReplicaStateContext(ReplicaState replicaState, CompletableFuture<Boolean> previousOperationFuture) {
            this.replicaState = replicaState;
            this.previousOperationFuture = previousOperationFuture;
        }

        void reserve(ReplicationGroupId groupId, HybridTimestamp leaseStartTime) {
            if (this.reservedForPrimary && this.leaseStartTime != null && leaseStartTime.compareTo(this.leaseStartTime) < 0) {
                throw new ReplicaReservationFailedException(IgniteStringFormatter.format((String)"Replica reservation failed: newer lease has already reserved this replica [groupId={}, requestedLeaseStartTime={}, newerLeaseStartTime={}].", (Object[])new Object[]{groupId, leaseStartTime, this.leaseStartTime}));
            }
            LOG.debug("Reserving replica [groupId={}, leaseStartTime={}].", new Object[]{groupId, leaseStartTime});
            this.leaseStartTime = leaseStartTime;
            this.reservedForPrimary = true;
        }

        void releaseReservation() {
            this.reservedForPrimary = false;
            this.leaseStartTime = null;
            if (this.deferredStopReadyFuture != null && (this.replicaState == ReplicaState.PRIMARY_ONLY || this.replicaState == ReplicaState.RESTART_PLANNED)) {
                this.deferredStopReadyFuture.complete(null);
                this.deferredStopReadyFuture = null;
            }
        }

        void assertReservation(ReplicationGroupId groupId) {
            assert (this.reservedForPrimary) : "Replica is elected as primary but not reserved [groupId=" + groupId + ", leaseStartTime=" + this.leaseStartTime + "].";
            assert (this.leaseStartTime != null) : "Replica is reserved but lease start time is null [groupId=" + groupId + ", leaseStartTime=" + this.leaseStartTime + "].";
        }
    }

    private static enum ReplicaState {
        STARTING,
        ASSIGNED,
        PRIMARY_ONLY,
        RESTART_PLANNED,
        STOPPING,
        STOPPED;

    }
}

