package org.apache.ignite3.internal.distributionzones.rebalance;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite3.internal.configuration.utils.SystemDistributedConfigurationPropertyHolder;
import org.apache.ignite3.internal.failure.FailureContext;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.lang.NodeStoppingException;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.MetaStorageManager;
import org.apache.ignite3.internal.metastorage.dsl.CompoundCondition;
import org.apache.ignite3.internal.metastorage.dsl.Conditions;
import org.apache.ignite3.internal.metastorage.dsl.Operation;
import org.apache.ignite3.internal.metastorage.dsl.Operations;
import org.apache.ignite3.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite3.internal.metastorage.dsl.Statements;
import org.apache.ignite3.internal.metastorage.dsl.Update;
import org.apache.ignite3.internal.partitiondistribution.Assignment;
import org.apache.ignite3.internal.partitiondistribution.Assignments;
import org.apache.ignite3.internal.partitiondistribution.AssignmentsChain;
import org.apache.ignite3.internal.partitiondistribution.AssignmentsQueue;
import org.apache.ignite3.internal.raft.PeersAndLearners;
import org.apache.ignite3.internal.raft.RaftError;
import org.apache.ignite3.internal.raft.RaftGroupEventsListener;
import org.apache.ignite3.internal.raft.Status;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.util.CollectionUtils;
import org.apache.ignite3.internal.util.ExceptionUtils;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.TestOnly;

/* loaded from: input_file:org/apache/ignite3/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.class */
public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener {
    private static final IgniteLogger LOG;
    private static final int REBALANCE_RETRY_THRESHOLD = 10;
    private static final int SWITCH_APPEND_SUCCESS = 1;
    private static final int SWITCH_REDUCE_SUCCESS = 2;
    private static final int SCHEDULE_PENDING_REBALANCE_SUCCESS = 3;
    private static final int FINISH_REBALANCE_SUCCESS = 4;
    private static final int SWITCH_APPEND_FAIL = -1;
    private static final int SWITCH_REDUCE_FAIL = -2;
    private static final int SCHEDULE_PENDING_REBALANCE_FAIL = -3;
    private static final int FINISH_REBALANCE_FAIL = -4;
    private final MetaStorageManager metaStorageMgr;
    private final FailureProcessor failureProcessor;
    private final TablePartitionId tablePartitionId;
    private final IgniteSpinBusyLock busyLock;
    private final ScheduledExecutorService rebalanceScheduler;
    private final PartitionMover partitionMover;
    private final AtomicInteger rebalanceAttempts = new AtomicInteger(0);
    private final SystemDistributedConfigurationPropertyHolder<Integer> retryDelayConfiguration;
    private final BiFunction<TablePartitionId, Long, CompletableFuture<Set<Assignment>>> calculateAssignmentsFn;
    static final /* synthetic */ boolean $assertionsDisabled;

    public RebalanceRaftGroupEventsListener(MetaStorageManager metaStorageManager, FailureProcessor failureProcessor, TablePartitionId tablePartitionId, IgniteSpinBusyLock igniteSpinBusyLock, PartitionMover partitionMover, BiFunction<TablePartitionId, Long, CompletableFuture<Set<Assignment>>> biFunction, ScheduledExecutorService scheduledExecutorService, SystemDistributedConfigurationPropertyHolder<Integer> systemDistributedConfigurationPropertyHolder) {
        this.metaStorageMgr = metaStorageManager;
        this.failureProcessor = failureProcessor;
        this.tablePartitionId = tablePartitionId;
        this.busyLock = igniteSpinBusyLock;
        this.partitionMover = partitionMover;
        this.calculateAssignmentsFn = biFunction;
        this.rebalanceScheduler = scheduledExecutorService;
        this.retryDelayConfiguration = systemDistributedConfigurationPropertyHolder;
    }

    @Override // org.apache.ignite3.internal.raft.RaftGroupEventsListener
    public void onLeaderElected(long j) {
    }

    @Override // org.apache.ignite3.internal.raft.RaftGroupEventsListener
    public void onNewPeersConfigurationApplied(PeersAndLearners peersAndLearners, long j, long j2) {
        if (this.busyLock.enterBusy()) {
            try {
                this.rebalanceScheduler.schedule(() -> {
                    if (this.busyLock.enterBusy()) {
                        try {
                            doStableKeySwitch(createAssignments(peersAndLearners), this.tablePartitionId, this.metaStorageMgr, j, j2, this.calculateAssignmentsFn);
                            this.busyLock.leaveBusy();
                        } catch (Throwable th) {
                            this.busyLock.leaveBusy();
                            throw th;
                        }
                    }
                }, 0L, TimeUnit.MILLISECONDS);
                this.busyLock.leaveBusy();
            } catch (Throwable th) {
                this.busyLock.leaveBusy();
                throw th;
            }
        }
    }

    @Override // org.apache.ignite3.internal.raft.RaftGroupEventsListener
    public void onReconfigurationError(Status status, PeersAndLearners peersAndLearners, long j) {
        if (this.busyLock.enterBusy()) {
            try {
                if (!$assertionsDisabled && status == null) {
                    throw new AssertionError();
                }
                if (status.equals(Status.LEADER_STEPPED_DOWN)) {
                    LOG.info("Leader stepped down during rebalance [partId={}]", this.tablePartitionId);
                    this.busyLock.leaveBusy();
                    return;
                }
                RaftError error = status.error();
                if (!$assertionsDisabled && error != RaftError.ECATCHUP) {
                    throw new AssertionError("According to the JRaft protocol, " + RaftError.ECATCHUP + " is expected, got " + error);
                }
                LOG.debug("Error occurred during rebalance [partId={}]", this.tablePartitionId);
                if (this.rebalanceAttempts.incrementAndGet() < 10) {
                    scheduleChangePeersAndLearners(peersAndLearners, j);
                } else {
                    LOG.info("Number of retries for rebalance exceeded the threshold [partId={}, threshold={}]", this.tablePartitionId, 10);
                    scheduleChangePeersAndLearners(peersAndLearners, j);
                }
            } finally {
                this.busyLock.leaveBusy();
            }
        }
    }

    @TestOnly
    public int currentRetryDelay() {
        return this.retryDelayConfiguration.currentValue().intValue();
    }

    private void scheduleChangePeersAndLearners(PeersAndLearners peersAndLearners, long j) {
        this.rebalanceScheduler.schedule(() -> {
            if (this.busyLock.enterBusy()) {
                LOG.info("Going to retry rebalance [attemptNo={}, partId={}]", Integer.valueOf(this.rebalanceAttempts.get()), this.tablePartitionId);
                try {
                    this.partitionMover.movePartition(peersAndLearners, j).join();
                    this.busyLock.leaveBusy();
                } catch (Throwable th) {
                    this.busyLock.leaveBusy();
                    throw th;
                }
            }
        }, this.retryDelayConfiguration.currentValue().intValue(), TimeUnit.MILLISECONDS);
    }

    private void doStableKeySwitch(Set<Assignment> set, TablePartitionId tablePartitionId, MetaStorageManager metaStorageManager, long j, long j2, BiFunction<TablePartitionId, Long, CompletableFuture<Set<Assignment>>> biFunction) {
        SimpleCondition notExists;
        Update yield;
        Update yield2;
        try {
            ByteArray pendingPartAssignmentsQueueKey = RebalanceUtil.pendingPartAssignmentsQueueKey(tablePartitionId);
            ByteArray stablePartAssignmentsKey = RebalanceUtil.stablePartAssignmentsKey(tablePartitionId);
            ByteArray plannedPartAssignmentsKey = RebalanceUtil.plannedPartAssignmentsKey(tablePartitionId);
            ByteArray switchReduceKey = RebalanceUtil.switchReduceKey(tablePartitionId);
            ByteArray switchAppendKey = RebalanceUtil.switchAppendKey(tablePartitionId);
            ByteArray assignmentsChainKey = RebalanceUtil.assignmentsChainKey(tablePartitionId);
            Map<ByteArray, Entry> map = metaStorageManager.getAll(Set.of(plannedPartAssignmentsKey, pendingPartAssignmentsQueueKey, stablePartAssignmentsKey, switchReduceKey, switchAppendKey, assignmentsChainKey)).get();
            Entry entry = map.get(stablePartAssignmentsKey);
            Entry entry2 = map.get(pendingPartAssignmentsQueueKey);
            Entry entry3 = map.get(plannedPartAssignmentsKey);
            Entry entry4 = map.get(switchReduceKey);
            Entry entry5 = map.get(switchAppendKey);
            Entry entry6 = map.get(assignmentsChainKey);
            Set<Assignment> nodes = readAssignments(entry).nodes();
            Set<Assignment> nodes2 = readAssignments(entry4).nodes();
            Set<Assignment> nodes3 = readAssignments(entry5).nodes();
            Assignments poll = entry2.value() == null ? Assignments.EMPTY : AssignmentsQueue.fromBytes(entry2.value()).poll();
            if (poll.nodes().equals(set)) {
                Set<Assignment> set2 = biFunction.apply(tablePartitionId, Long.valueOf(poll.timestamp())).get();
                Set difference = CollectionUtils.difference(nodes2, set);
                Set difference2 = CollectionUtils.difference(set, nodes);
                Set difference3 = CollectionUtils.difference(nodes2, difference);
                Set intersect = CollectionUtils.intersect(set2, CollectionUtils.difference(RebalanceUtil.union(nodes3, difference), difference2));
                Set difference4 = CollectionUtils.difference(set, nodes2);
                Set intersect2 = CollectionUtils.intersect(set2, RebalanceUtil.union(set, difference));
                CompoundCondition and = Conditions.and(entry.empty() ? Conditions.notExists(stablePartAssignmentsKey) : Conditions.revision(stablePartAssignmentsKey).eq(entry.revision()), Conditions.and(Conditions.revision(pendingPartAssignmentsQueueKey).eq(entry2.revision()), Conditions.and(entry4.empty() ? Conditions.notExists(switchReduceKey) : Conditions.revision(switchReduceKey).eq(entry4.revision()), entry5.empty() ? Conditions.notExists(switchAppendKey) : Conditions.revision(switchAppendKey).eq(entry5.revision()))));
                long timestamp = poll.timestamp();
                Assignments of = Assignments.of(set, timestamp);
                Operation handleAssignmentsChainChange = handleAssignmentsChainChange(assignmentsChainKey, entry6, poll, of, j, j2);
                byte[] bytes = of.toBytes();
                byte[] bytes2 = AssignmentsQueue.toBytes(Assignments.of((Set<Assignment>) intersect2, timestamp));
                byte[] bytes3 = AssignmentsQueue.toBytes(Assignments.of((Set<Assignment>) difference4, timestamp));
                byte[] bytes4 = Assignments.toBytes(difference3, timestamp);
                byte[] bytes5 = Assignments.toBytes(intersect, timestamp);
                if (!intersect.isEmpty()) {
                    yield = Operations.ops(Operations.put(stablePartAssignmentsKey, bytes), Operations.put(pendingPartAssignmentsQueueKey, bytes2), Operations.put(switchReduceKey, bytes4), Operations.put(switchAppendKey, bytes5), handleAssignmentsChainChange).yield(1);
                    yield2 = Operations.ops(new Operation[0]).yield(-1);
                } else if (difference3.isEmpty()) {
                    if (entry3.value() != null) {
                        notExists = Conditions.revision(plannedPartAssignmentsKey).eq(entry3.revision());
                        yield = Operations.ops(Operations.put(stablePartAssignmentsKey, bytes), Operations.put(pendingPartAssignmentsQueueKey, AssignmentsQueue.toBytes(Assignments.fromBytes(entry3.value()))), Operations.remove(plannedPartAssignmentsKey), handleAssignmentsChainChange).yield(3);
                        yield2 = Operations.ops(new Operation[0]).yield(SCHEDULE_PENDING_REBALANCE_FAIL);
                    } else {
                        notExists = Conditions.notExists(plannedPartAssignmentsKey);
                        yield = Operations.ops(Operations.put(stablePartAssignmentsKey, bytes), Operations.remove(pendingPartAssignmentsQueueKey), handleAssignmentsChainChange).yield(4);
                        yield2 = Operations.ops(new Operation[0]).yield(FINISH_REBALANCE_FAIL);
                    }
                    and = Conditions.and(and, notExists);
                } else {
                    yield = Operations.ops(Operations.put(stablePartAssignmentsKey, bytes), Operations.put(pendingPartAssignmentsQueueKey, bytes3), Operations.put(switchReduceKey, bytes4), Operations.put(switchAppendKey, bytes5), handleAssignmentsChainChange).yield(2);
                    yield2 = Operations.ops(new Operation[0]).yield(SWITCH_REDUCE_FAIL);
                }
                int asInt = metaStorageManager.invoke(Statements.iif(and, yield, yield2)).get().getAsInt();
                if (asInt < 0) {
                    switch (asInt) {
                        case FINISH_REBALANCE_FAIL /* -4 */:
                        case SCHEDULE_PENDING_REBALANCE_FAIL /* -3 */:
                            LOG.info("Rebalance keys changed while trying to update rebalance information. Going to retry [tablePartitionId={}, appliedPeers={}]", tablePartitionId, set);
                            break;
                        case SWITCH_REDUCE_FAIL /* -2 */:
                            LOG.info("Rebalance keys changed while trying to update rebalance pending reduce information. Going to retry [tablePartitionID={}, appliedPeers={}]", tablePartitionId, set);
                            break;
                        case -1:
                            LOG.info("Rebalance keys changed while trying to update rebalance pending addition information. Going to retry [tablePartitionID={}, appliedPeers={}]", tablePartitionId, set);
                            break;
                        default:
                            if (!$assertionsDisabled) {
                                throw new AssertionError(asInt);
                            }
                            break;
                    }
                    doStableKeySwitch(set, tablePartitionId, metaStorageManager, j, j2, biFunction);
                    return;
                }
                switch (asInt) {
                    case 1:
                        LOG.info("Rebalance finished. Going to schedule next rebalance with addition [tablePartitionId={}, appliedPeers={}, plannedPeers={}]", tablePartitionId, set, intersect2);
                        break;
                    case 2:
                        LOG.info("Rebalance finished. Going to schedule next rebalance with reduction [tablePartitionId={}, appliedPeers={}, plannedPeers={}]", tablePartitionId, set, difference4);
                        break;
                    case 3:
                        LOG.info("Rebalance finished. Going to schedule next rebalance [tablePartitionId={}, appliedPeers={}, plannedPeers={}]", tablePartitionId, set, Assignments.fromBytes(entry3.value()).nodes());
                        break;
                    case 4:
                        LOG.info("Rebalance finished [tablePartitionId={}, appliedPeers={}]", tablePartitionId, set);
                        break;
                    default:
                        if (!$assertionsDisabled) {
                            throw new AssertionError(asInt);
                        }
                        break;
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            if (ExceptionUtils.hasCause(e, NodeStoppingException.class)) {
                return;
            }
            if (ExceptionUtils.hasCause(e, TimeoutException.class)) {
                LOG.error("Unable to commit partition configuration to metastore: {}", e, tablePartitionId);
            } else {
                this.failureProcessor.process(new FailureContext(e, String.format("Unable to commit partition configuration to metastore: %s", tablePartitionId)));
            }
        }
    }

    private static Operation handleAssignmentsChainChange(ByteArray byteArray, Entry entry, Assignments assignments, Assignments assignments2, long j, long j2) {
        return entry.value() != null ? Operations.put(byteArray, updateAssignmentsChain(AssignmentsChain.fromBytes(entry.value()), assignments2, assignments, j, j2).toBytes()) : Operations.noop();
    }

    private static AssignmentsChain updateAssignmentsChain(AssignmentsChain assignmentsChain, Assignments assignments, Assignments assignments2, long j, long j2) {
        if (!$assertionsDisabled && assignmentsChain == null) {
            throw new AssertionError("Assignments chain cannot be null in HA mode.");
        }
        if (!$assertionsDisabled && assignmentsChain.size() <= 0) {
            throw new AssertionError("Assignments chain cannot be empty on stable switch.");
        }
        if (!assignments2.force() && !assignments2.fromReset()) {
            return AssignmentsChain.of(j, j2, assignments);
        }
        if (assignments2.force() || !assignments2.fromReset()) {
            assignmentsChain.addLast(assignments, j, j2);
        } else {
            assignmentsChain.replaceLast(assignments, j, j2);
        }
        return assignmentsChain;
    }

    private static Set<Assignment> createAssignments(PeersAndLearners peersAndLearners) {
        return (Set) Stream.concat(peersAndLearners.peers().stream().map(peer -> {
            return Assignment.forPeer(peer.consistentId());
        }), peersAndLearners.learners().stream().map(peer2 -> {
            return Assignment.forLearner(peer2.consistentId());
        })).collect(Collectors.toSet());
    }

    private static Assignments readAssignments(Entry entry) {
        byte[] value = entry.value();
        return value == null ? Assignments.EMPTY : Assignments.fromBytes(value);
    }

    static {
        $assertionsDisabled = !RebalanceRaftGroupEventsListener.class.desiredAssertionStatus();
        LOG = Loggers.forClass(RebalanceRaftGroupEventsListener.class);
    }
}
