package org.apache.ignite.internal.partition.replicator;

import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.rebalance.PartitionMover;
import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceRaftGroupEventsListener;
import org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.partition.replicator.snapshot.FailFastSnapshotStorageFactory;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.class */
public class PartitionReplicaLifecycleManager implements IgniteComponent {
    /** Name of the feature flag; the literal value is {@code IGNITE_ZONE_BASED_REPLICATION}. */
    public static final String FEATURE_FLAG_NAME = "IGNITE_ZONE_BASED_REPLICATION";
    // No initializer here, so the value is assigned elsewhere (presumably a static initializer driven by
    // FEATURE_FLAG_NAME - confirm). startAsync() returns an already completed future when this is false.
    public static final boolean ENABLED;
    // Collaborators injected through the constructor.
    private final CatalogManager catalogMgr;
    private final ReplicaManager replicaMgr;
    private final DistributionZoneManager distributionZoneMgr;
    private final MetaStorageManager metaStorageMgr;
    private final TopologyService topologyService;
    private final LowWatermark lowWatermark;
    // Assigned outside this declaration (no initializer in view).
    private static final IgniteLogger LOG;
    // Runs the asynchronous stages of stable/pending assignment handling.
    private final ExecutorService ioExecutor;
    // Handed to ZoneRebalanceRaftGroupEventsListener when a zone partition replica is created.
    private final ScheduledExecutorService rebalanceScheduler;
    // Handed to ExecutorInclinedRaftCommandRunner when a zone partition replica is created.
    private final Executor partitionOperationsExecutor;
    private final ClockService clockService;
    private final PlacementDriver placementDriver;
    // Compiler-generated assertion switch; "!$assertionsDisabled && cond" guards are the decompiled form of assert statements.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Ids of replication groups for which this node attempted to start a replica; read by hasLocalPartition().
    private final Set<ReplicationGroupId> replicationGroupIds = ConcurrentHashMap.newKeySet();
    // Guards component operations against running after beforeNodeStop() has blocked the lock.
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    // True when the assignment's consistent id equals the local node name.
    private final Predicate<Assignment> isLocalNodeAssignment = assignment -> {
        return assignment.consistentId().equals(localNode().name());
    };
    // Meta storage listeners; registered in startAsync() and unregistered in beforeNodeStop().
    private final WatchListener pendingAssignmentsRebalanceListener = createPendingAssignmentsRebalanceListener();
    private final WatchListener stableAssignmentsRebalanceListener = createStableAssignmentsRebalanceListener();
    private final WatchListener assignmentsSwitchRebalanceListener = createAssignmentsSwitchRebalanceListener();

    /**
     * Creates the manager. The constructor only stores the supplied collaborators; nothing is started here.
     *
     * @param catalogManager Catalog manager, used to read zones and to subscribe to zone creation events.
     * @param replicaManager Replica manager, used to start replicas and to look them up.
     * @param distributionZoneManager Distribution zone manager, used to obtain zone data nodes.
     * @param metaStorageManager Meta storage manager, used to read/write assignments and to register watches.
     * @param topologyService Topology service, used to resolve the local cluster node.
     * @param lowWatermark Low watermark, read on start to pick the earliest catalog version to process.
     * @param executorService Executor for the asynchronous stages of assignment handling.
     * @param scheduledExecutorService Scheduler passed to the rebalance raft group events listener.
     * @param executor Executor passed to the raft command runner of a zone partition replica.
     * @param clockService Clock service.
     * @param placementDriver Placement driver.
     */
    public PartitionReplicaLifecycleManager(
            CatalogManager catalogManager,
            ReplicaManager replicaManager,
            DistributionZoneManager distributionZoneManager,
            MetaStorageManager metaStorageManager,
            TopologyService topologyService,
            LowWatermark lowWatermark,
            ExecutorService executorService,
            ScheduledExecutorService scheduledExecutorService,
            Executor executor,
            ClockService clockService,
            PlacementDriver placementDriver
    ) {
        // Executors.
        this.ioExecutor = executorService;
        this.rebalanceScheduler = scheduledExecutorService;
        this.partitionOperationsExecutor = executor;

        // Managers and services.
        this.catalogMgr = catalogManager;
        this.replicaMgr = replicaManager;
        this.distributionZoneMgr = distributionZoneManager;
        this.metaStorageMgr = metaStorageManager;
        this.topologyService = topologyService;
        this.placementDriver = placementDriver;

        // Time-related collaborators.
        this.lowWatermark = lowWatermark;
        this.clockService = clockService;
    }

    /**
     * Starts the component.
     *
     * <p>When {@link #ENABLED} is {@code false} nothing is done. Otherwise the zones known to the catalog are
     * processed using the meta storage recovery revision, then stable and pending assignments found at that
     * revision are recovered. The assignment watches and the zone creation listener are registered before this
     * method returns.
     *
     * @param componentContext Component context (not used by this implementation).
     * @return Future that completes when zone processing and assignments recovery are finished.
     */
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        if (!ENABLED) {
            return CompletableFutures.nullCompletedFuture();
        }

        CompletableFuture<Long> recoveryFuture = this.metaStorageMgr.recoveryFinishedFuture();

        // Meta storage recovery is expected to be finished by the time this component starts.
        if (!$assertionsDisabled && !recoveryFuture.isDone()) {
            throw new AssertionError();
        }

        long recoveryRevision = recoveryFuture.join();

        cleanUpResourcesForDroppedZonesOnRecovery();

        CompletableFuture<Void> startFuture = processZonesOnStart(recoveryRevision, this.lowWatermark.getLowWatermark())
                .thenCompose(unused -> processAssignmentsOnRecovery(recoveryRevision));

        this.metaStorageMgr.registerPrefixWatch(
                ByteArray.fromString("zone.assignments.pending."),
                this.pendingAssignmentsRebalanceListener
        );
        this.metaStorageMgr.registerPrefixWatch(
                ByteArray.fromString("zone.assignments.stable."),
                this.stableAssignmentsRebalanceListener
        );
        this.metaStorageMgr.registerPrefixWatch(
                ByteArray.fromString("zone.assignments.switch.reduce."),
                this.assignmentsSwitchRebalanceListener
        );

        // The listener always answers false once the zone has been handled.
        this.catalogMgr.listen(CatalogEvent.ZONE_CREATE, zoneCreateParams -> (CompletableFuture) IgniteUtils.inBusyLock(
                this.busyLock,
                () -> onCreateZone(zoneCreateParams).thenApply(ignored -> false)
        ));

        return startFuture;
    }

    /**
     * Starts replication nodes for every zone visible in the catalog versions between the version active at the
     * given low watermark and the latest version.
     *
     * <p>Versions are walked from the latest one downwards, and each zone id is handled only once, so a zone is
     * processed with the descriptor from the most recent version that contains it.
     *
     * @param j Revision used for assignments calculation.
     * @param hybridTimestamp Low watermark timestamp, may be {@code null}.
     * @return Future that completes when all zones have been processed.
     */
    private CompletableFuture<Void> processZonesOnStart(long j, @Nullable HybridTimestamp hybridTimestamp) {
        int earliestVersion = this.catalogMgr.activeCatalogVersion(HybridTimestamp.hybridTimestampToLong(hybridTimestamp));
        int latestVersion = this.catalogMgr.latestCatalogVersion();

        IntOpenHashSet startedZoneIds = new IntOpenHashSet();
        List<CompletableFuture<Void>> zoneStartFutures = new ArrayList<>();

        for (int version = latestVersion; version >= earliestVersion; version--) {
            for (CatalogZoneDescriptor zone : this.catalogMgr.zones(version)) {
                // add() returns false for zones already seen in a newer catalog version.
                if (startedZoneIds.add(zone.id())) {
                    zoneStartFutures.add(calculateZoneAssignmentsAndCreateReplicationNodes(j, version, zone));
                }
            }
        }

        return CompletableFuture.allOf(zoneStartFutures.toArray(new CompletableFuture[0]))
                .whenComplete((unused, error) -> {
                    if (error == null) {
                        LOG.debug(
                                "Zones started successfully [earliestCatalogVersion={}, latestCatalogVersion={}, startedZoneIds={}]",
                                earliestVersion,
                                latestVersion,
                                startedZoneIds
                        );
                    } else {
                        LOG.error("Error starting zones", error);
                    }
                });
    }

    /**
     * Re-processes stable and pending zone assignments that exist in the meta storage at the given revision.
     *
     * @param j Recovery revision.
     * @return Future that completes when both stable and pending assignments have been handled.
     */
    private CompletableFuture<Void> processAssignmentsOnRecovery(long j) {
        CompletableFuture<Void> stableRecovery = handleAssignmentsOnRecovery(
                new ByteArray("zone.assignments.stable."),
                j,
                (stableEntry, revision) -> handleChangeStableAssignmentEvent(stableEntry, revision.longValue(), true),
                "stable"
        );

        CompletableFuture<Void> pendingRecovery = handleAssignmentsOnRecovery(
                new ByteArray("zone.assignments.pending."),
                j,
                (pendingEntry, revision) -> handleChangePendingAssignmentEvent(pendingEntry, revision.longValue(), true),
                "pending"
        );

        return CompletableFuture.allOf(stableRecovery, pendingRecovery);
    }

    /**
     * Applies the given handler to every meta storage entry found locally under the prefix at the given revision.
     *
     * <p>The cursor is fully consumed (the handler is invoked for every entry) before it is closed; the returned
     * future tracks only the handler futures.
     *
     * @param byteArray Key prefix to scan.
     * @param j Revision at which the prefix is read; also passed to the handler.
     * @param biFunction Handler invoked with the entry and the revision.
     * @param str Assignments kind used in the log message (for example {@code stable} or {@code pending}).
     * @return Future that completes when all handler futures complete; a failure is logged and propagated.
     */
    private CompletableFuture<Void> handleAssignmentsOnRecovery(ByteArray byteArray, long j, BiFunction<Entry, Long, CompletableFuture<Void>> biFunction, String str) {
        // try-with-resources closes the cursor on both the normal and the exceptional path.
        try (Cursor<Entry> cursor = this.metaStorageMgr.prefixLocally(byteArray, j)) {
            CompletableFuture<?>[] recoveryFutures = cursor.stream()
                    .map(entry -> {
                        if (LOG.isInfoEnabled()) {
                            LOG.info(
                                    "Non handled {} assignments for key '{}' discovered, performing recovery",
                                    str,
                                    new String(entry.key(), StandardCharsets.UTF_8)
                            );
                        }

                        return biFunction.apply(entry, j);
                    })
                    .toArray(CompletableFuture[]::new);

            return CompletableFuture.allOf(recoveryFutures).whenComplete((unused, error) -> {
                if (error != null) {
                    LOG.error("Error when performing assignments recovery", error);
                }
            });
        }
    }

    /**
     * Hook invoked from {@code startAsync} before zones are processed. The body is currently empty, so no
     * resources are cleaned up for dropped zones.
     */
    private void cleanUpResourcesForDroppedZonesOnRecovery() {
        // Intentionally a no-op at the moment. NOTE(review): presumably a placeholder for future clean-up logic - confirm.
    }

    /**
     * Handles a zone creation event by calculating the zone assignments and starting replication nodes.
     *
     * @param createZoneEventParameters Parameters of the zone creation event.
     * @return Future that completes when the replication nodes of the zone have been processed.
     */
    private CompletableFuture<Void> onCreateZone(CreateZoneEventParameters createZoneEventParameters) {
        long causalityToken = createZoneEventParameters.causalityToken();
        int catalogVersion = createZoneEventParameters.catalogVersion();
        CatalogZoneDescriptor createdZone = createZoneEventParameters.zoneDescriptor();

        return calculateZoneAssignmentsAndCreateReplicationNodes(causalityToken, catalogVersion, createdZone);
    }

    /**
     * Obtains (or calculates) the assignments of the zone, writes them to the meta storage and starts the zone
     * replication nodes, all under the busy lock.
     *
     * @param j Revision used to read existing assignments and data nodes.
     * @param i Catalog version.
     * @param catalogZoneDescriptor Zone descriptor.
     * @return Future that completes when the zone replication nodes have been processed.
     */
    private CompletableFuture<Void> calculateZoneAssignmentsAndCreateReplicationNodes(long j, int i, CatalogZoneDescriptor catalogZoneDescriptor) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            int zoneId = catalogZoneDescriptor.id();

            CompletableFuture<List<Assignments>> calculated = getOrCreateAssignments(catalogZoneDescriptor, j, i);
            CompletableFuture<List<Assignments>> persisted = writeZoneAssignmentsToMetastore(zoneId, calculated);

            return createZoneReplicationNodes(persisted, zoneId);
        });
    }

    /**
     * Starts a replication node for every partition of the zone once its assignments are available.
     *
     * <p>The index of an element in the assignments list is used as the partition id.
     *
     * @param completableFuture Future with the per-partition assignments of the zone.
     * @param i Zone id.
     * @return Future that completes when all partition replication nodes have been processed.
     */
    private CompletableFuture<Void> createZoneReplicationNodes(CompletableFuture<List<Assignments>> completableFuture, int i) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> completableFuture.thenCompose(zoneAssignments -> {
            if (!$assertionsDisabled && zoneAssignments == null) {
                throw new AssertionError(IgniteStringFormatter.format("Zone has empty assignments [id={}].", i));
            }

            int partitionCount = zoneAssignments.size();
            CompletableFuture<?>[] partitionStarts = new CompletableFuture<?>[partitionCount];

            for (int partId = 0; partId < partitionCount; partId++) {
                Assignments partitionAssignments = zoneAssignments.get(partId);

                partitionStarts[partId] = createZonePartitionReplicationNode(
                        i,
                        partId,
                        localMemberAssignment(partitionAssignments),
                        partitionAssignments
                );
            }

            return CompletableFuture.allOf(partitionStarts);
        }));
    }

    /**
     * Starts a replica of the given zone partition on the local node.
     *
     * <p>Nothing is started when the local node has no assignment. The partition id is recorded in
     * {@code replicationGroupIds} before the replica start is requested.
     *
     * @param i Zone id.
     * @param i2 Partition id.
     * @param assignment Assignment of the local node, or {@code null} if the local node is not assigned.
     * @param assignments Assignments that form the peers and learners of the replication group.
     * @return Future that completes when the replica is started, or fails with {@link NodeStoppingException}
     *         if the node is stopping.
     */
    private CompletableFuture<Void> createZonePartitionReplicationNode(int i, int i2, @Nullable Assignment assignment, Assignments assignments) {
        if (assignment == null) {
            return CompletableFutures.nullCompletedFuture();
        }

        PeersAndLearners groupConfiguration = PeersAndLearners.fromAssignments(assignments.nodes());
        ZonePartitionId replicaGrpId = new ZonePartitionId(i, i2);
        ZonePartitionRaftListener raftListener = new ZonePartitionRaftListener();
        ZoneRebalanceRaftGroupEventsListener rebalanceEventsListener = new ZoneRebalanceRaftGroupEventsListener(
                this.metaStorageMgr,
                replicaGrpId,
                this.busyLock,
                createPartitionMover(replicaGrpId),
                this.rebalanceScheduler,
                this.catalogMgr,
                this.distributionZoneMgr
        );

        this.replicationGroupIds.add(replicaGrpId);

        try {
            return this.replicaMgr
                    .startReplica(
                            replicaGrpId,
                            raftClient -> new ZonePartitionReplicaListener(
                                    new ExecutorInclinedRaftCommandRunner(raftClient, this.partitionOperationsExecutor)
                            ),
                            new FailFastSnapshotStorageFactory(),
                            groupConfiguration,
                            raftListener,
                            rebalanceEventsListener,
                            this.busyLock
                    )
                    .thenApply(ignored -> null);
        } catch (NodeStoppingException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Creates a partition mover whose raft client supplier resolves the client from the replica that is
     * currently registered for the partition.
     *
     * @param zonePartitionId Zone partition id.
     * @return Partition mover; its supplier yields a failed future when no replica is registered.
     */
    private PartitionMover createPartitionMover(ZonePartitionId zonePartitionId) {
        return new PartitionMover(this.busyLock, () -> {
            CompletableFuture replicaFuture = this.replicaMgr.replica(zonePartitionId);

            if (replicaFuture != null) {
                return replicaFuture.thenApply((v0) -> {
                    return v0.raftClient();
                });
            }

            String message = "No such replica for partition " + zonePartitionId.partitionId() + " in zone " + zonePartitionId.zoneId();

            return CompletableFuture.failedFuture(new IgniteInternalException(message));
        });
    }

    /**
     * Returns the local cluster node as reported by the topology service.
     *
     * @return Local node.
     */
    private ClusterNode localNode() {
        ClusterNode localMember = this.topologyService.localMember();

        return localMember;
    }

    /**
     * Prepares the component for node stop: blocks the busy lock, unregisters the three assignment watch
     * listeners and cleans up the resources of the known replication groups.
     */
    public void beforeNodeStop() {
        this.busyLock.block();

        WatchListener[] registeredListeners = {
                this.pendingAssignmentsRebalanceListener,
                this.stableAssignmentsRebalanceListener,
                this.assignmentsSwitchRebalanceListener
        };

        for (WatchListener listener : registeredListeners) {
            this.metaStorageMgr.unregisterWatch(listener);
        }

        cleanUpPartitionsResources(this.replicationGroupIds);
    }

    /**
     * Writes the stable assignments of a zone to the meta storage unless they already exist there.
     *
     * <p>All partitions are written in one conditional invoke that checks only for the absence of the first
     * partition key. If the condition fails, the assignments already present in the meta storage are read and
     * returned instead of the supplied ones.
     *
     * @param i Zone id.
     * @param completableFuture Future with the assignments to write; the list index is the partition id.
     * @return Future with the assignments that are effectively stored in the meta storage.
     */
    private CompletableFuture<List<Assignments>> writeZoneAssignmentsToMetastore(int i, CompletableFuture<List<Assignments>> completableFuture) {
        return completableFuture.thenCompose(newAssignments -> {
            if (!$assertionsDisabled && newAssignments.isEmpty()) {
                throw new AssertionError();
            }

            int partitionCount = newAssignments.size();

            List<Operation> putOperations = new ArrayList<>(partitionCount);
            for (int partId = 0; partId < partitionCount; partId++) {
                putOperations.add(Operations.put(
                        ZoneRebalanceUtil.stablePartAssignmentsKey(new ZonePartitionId(i, partId)),
                        newAssignments.get(partId).toBytes()
                ));
            }

            return this.metaStorageMgr
                    .invoke(
                            // The first partition key stands for the whole zone: either all keys are written or none.
                            Conditions.notExists(new ByteArray(ByteUtils.toByteArray(putOperations.get(0).key()))),
                            putOperations,
                            Collections.emptyList()
                    )
                    .handle((invokeResult, invokeError) -> {
                        if (invokeError == null) {
                            return invokeResult;
                        }

                        LOG.error("Couldn't write assignments [assignmentsList={}] to metastore during invoke.", invokeError, new Object[]{Assignments.assignmentListToString(newAssignments)});

                        throw ((RuntimeException) ExceptionUtils.sneakyThrow(invokeError));
                    })
                    .thenCompose(written -> {
                        if (written.booleanValue()) {
                            LOG.info(
                                    "Assignments calculated from data nodes are successfully written to meta storage [zoneId={}, assignments={}].",
                                    i,
                                    Assignments.assignmentListToString(newAssignments)
                            );

                            return CompletableFuture.completedFuture(newAssignments);
                        }

                        // The keys already exist, so pick up whatever is stored.
                        Set<ByteArray> partitionKeys = IntStream.range(0, partitionCount)
                                .mapToObj(partId -> ZoneRebalanceUtil.stablePartAssignmentsKey(new ZonePartitionId(i, partId)))
                                .collect(Collectors.toSet());

                        return this.metaStorageMgr.getAll(partitionKeys).thenApply(storedEntries -> {
                            List<Assignments> storedAssignments = new ArrayList<>();

                            for (int partId = 0; partId < partitionCount; partId++) {
                                ZonePartitionId zonePartitionId = new ZonePartitionId(i, partId);
                                Entry entry = storedEntries.get(ZoneRebalanceUtil.stablePartAssignmentsKey(zonePartitionId));

                                if (!$assertionsDisabled && (entry == null || entry.empty() || entry.tombstone())) {
                                    throw new AssertionError("Unexpected assignments for partition [" + zonePartitionId + ", entry=" + entry + "].");
                                }

                                storedAssignments.add(Assignments.fromBytes(entry.value()));
                            }

                            LOG.info(
                                    "Assignments picked up from meta storage [zoneId={}, assignments={}].",
                                    i,
                                    Assignments.assignmentListToString(storedAssignments)
                            );

                            return storedAssignments;
                        });
                    })
                    // Distinct parameter names: a lambda parameter must not redeclare one from an enclosing lambda.
                    .handle((resolvedAssignments, resolveError) -> {
                        if (resolveError == null) {
                            return resolvedAssignments;
                        }

                        LOG.error("Couldn't get assignments from metastore for zone [zoneId={}].", resolveError, new Object[]{Integer.valueOf(i)});

                        throw ((RuntimeException) ExceptionUtils.sneakyThrow(resolveError));
                    });
        });
    }

    /**
     * Returns the assignments of the zone: the ones stored locally in the meta storage at the given revision if
     * partition 0 has them, otherwise freshly calculated from the zone data nodes.
     *
     * @param catalogZoneDescriptor Zone descriptor.
     * @param j Revision used for the local meta storage read and for the data nodes request.
     * @param i Catalog version used for the data nodes request.
     * @return Future with per-partition assignments.
     */
    private CompletableFuture<List<Assignments>> getOrCreateAssignments(CatalogZoneDescriptor catalogZoneDescriptor, long j, int i) {
        int zoneId = catalogZoneDescriptor.id();

        // Presence of assignments for partition 0 is used as the marker that the zone already has assignments.
        boolean storedLocally = ZoneRebalanceUtil.zonePartitionAssignmentsGetLocally(this.metaStorageMgr, zoneId, 0, j) != null;

        if (storedLocally) {
            return CompletableFuture.completedFuture(
                    ZoneRebalanceUtil.zoneAssignmentsGetLocally(this.metaStorageMgr, zoneId, catalogZoneDescriptor.partitions(), j)
            );
        }

        CompletableFuture<List<Assignments>> calculated = this.distributionZoneMgr.dataNodes(j, i, zoneId).thenApply(dataNodes -> {
            return (List) AffinityUtils.calculateAssignments(dataNodes, catalogZoneDescriptor.partitions(), catalogZoneDescriptor.replicas())
                    .stream()
                    .map(Assignments::of)
                    .collect(Collectors.toList());
        });

        // Logging hangs off a side branch, so it does not affect the returned future.
        calculated.thenAccept(assignmentsList -> LOG.info(
                "Assignments calculated from data nodes [zone={}, zoneId={}, assignments={}, revision={}]",
                catalogZoneDescriptor.name(),
                zoneId,
                Assignments.assignmentListToString(assignmentsList),
                j
        ));

        return calculated;
    }

    /**
     * Tells whether the given zone partition is among the replication groups recorded by this manager.
     *
     * @param zonePartitionId Zone partition id.
     * @return {@code true} if the id is present in the set of known replication group ids.
     */
    public boolean hasLocalPartition(ZonePartitionId zonePartitionId) {
        boolean known = this.replicationGroupIds.contains(zonePartitionId);

        return known;
    }

    /**
     * Creates the meta storage listener that reacts to updates of pending assignments.
     *
     * @return Listener that delegates to the pending assignments handler under the busy lock and fails with
     *         {@link NodeStoppingException} when the lock cannot be entered.
     */
    private WatchListener createPendingAssignmentsRebalanceListener() {
        return new WatchListener() {
            @Override
            public CompletableFuture<Void> onUpdate(WatchEvent watchEvent) {
                if (!busyLock.enterBusy()) {
                    return CompletableFuture.failedFuture(new NodeStoppingException());
                }

                try {
                    Entry pendingEntry = watchEvent.entryEvent().newEntry();

                    return handleChangePendingAssignmentEvent(pendingEntry, watchEvent.revision(), false);
                } finally {
                    busyLock.leaveBusy();
                }
            }

            @Override
            public void onError(Throwable th) {
                LOG.warn("Unable to process pending assignments event", th);
            }
        };
    }

    /**
     * Creates the meta storage listener that reacts to updates of stable assignments.
     *
     * @return Listener that delegates to the stable assignments handler under the busy lock and fails with
     *         {@link NodeStoppingException} when the lock cannot be entered.
     */
    private WatchListener createStableAssignmentsRebalanceListener() {
        return new WatchListener() {
            @Override
            public CompletableFuture<Void> onUpdate(WatchEvent watchEvent) {
                boolean entered = busyLock.enterBusy();

                if (entered) {
                    try {
                        return handleChangeStableAssignmentEvent(watchEvent);
                    } finally {
                        busyLock.leaveBusy();
                    }
                }

                return CompletableFuture.failedFuture(new NodeStoppingException());
            }

            @Override
            public void onError(Throwable th) {
                LOG.warn("Unable to process stable assignments event", th);
            }
        };
    }

    /**
     * Creates the meta storage listener that reacts to updates of the switch-reduce assignments key.
     *
     * <p>The zone and partition are parsed from the updated key; the zone descriptor is taken from the latest
     * catalog version and its data nodes are passed to the rebalance listener's reduce handling.
     *
     * @return Listener for the {@code zone.assignments.switch.reduce.} prefix.
     */
    private WatchListener createAssignmentsSwitchRebalanceListener() {
        return new WatchListener() {
            @Override
            public CompletableFuture<Void> onUpdate(WatchEvent watchEvent) {
                return IgniteUtils.inBusyLockAsync(busyLock, () -> {
                    byte[] switchReduceKey = watchEvent.entryEvent().newEntry().key();

                    int partId = ZoneRebalanceUtil.extractPartitionNumber(switchReduceKey);
                    int zoneId = ZoneRebalanceUtil.extractZoneId(switchReduceKey, "zone.assignments.switch.reduce.");
                    ZonePartitionId replicaGrpId = new ZonePartitionId(zoneId, partId);

                    int catalogVersion = catalogMgr.latestCatalogVersion();
                    CatalogZoneDescriptor zoneDescriptor = catalogMgr.zone(zoneId, catalogVersion);

                    return distributionZoneMgr
                            .dataNodes(zoneDescriptor.updateToken(), catalogVersion, zoneId)
                            .thenCompose(dataNodes -> ZoneRebalanceRaftGroupEventsListener.handleReduceChanged(
                                    metaStorageMgr,
                                    dataNodes,
                                    zoneDescriptor.replicas(),
                                    replicaGrpId,
                                    watchEvent
                            ));
                });
            }

            @Override
            public void onError(Throwable th) {
                LOG.warn("Unable to process switch reduce event", th);
            }
        };
    }

    /**
     * Handles a watch event on stable assignments.
     *
     * <p>The event is ignored (an already completed future is returned) when:
     * <ul>
     *     <li>no entry in the event has a previous value (the old entry is absent or carries no value), which is
     *     the case for the initial write of assignments;</li>
     *     <li>the event contains several entries - they are then expected to be tombstones;</li>
     *     <li>the new entry carries no value.</li>
     * </ul>
     * Otherwise the single updated entry is passed to the entry-level handler.
     *
     * @param watchEvent Meta storage watch event.
     * @return Future that completes when the event has been handled.
     */
    protected CompletableFuture<Void> handleChangeStableAssignmentEvent(WatchEvent watchEvent) {
        // The old entry is null-checked before it is dereferenced, so an absent old entry counts as "no previous value".
        boolean noPreviousValues = watchEvent.entryEvents().stream().allMatch(entryEvent -> {
            Entry previous = entryEvent.oldEntry();

            return previous == null || previous.value() == null;
        });

        if (noPreviousValues) {
            return CompletableFutures.nullCompletedFuture();
        }

        if (!watchEvent.single()) {
            // A multi-entry update is expected to consist of tombstones only.
            if (!$assertionsDisabled && !watchEvent.entryEvents().stream().allMatch(entryEvent -> entryEvent.newEntry().tombstone())) {
                throw new AssertionError(watchEvent);
            }

            return CompletableFutures.nullCompletedFuture();
        }

        // From here on the event holds exactly one entry and that entry has a previous value.
        Entry newEntry = watchEvent.entryEvent().newEntry();
        long revision = watchEvent.revision();

        if (!$assertionsDisabled && newEntry.revision() != revision) {
            throw new AssertionError(newEntry);
        }

        if (newEntry.value() == null) {
            return CompletableFutures.nullCompletedFuture();
        }

        return handleChangeStableAssignmentEvent(newEntry, revision, false);
    }

    /**
     * Handles a change of the stable assignments of a single zone partition.
     *
     * <p>The partition is parsed from the entry key. The pending assignments are then read locally at the given
     * revision on the IO executor, and the partition is stopped and/or its clients are updated accordingly.
     *
     * @param entry Stable assignments entry.
     * @param j Revision at which pending assignments are read.
     * @param z Recovery flag, forwarded as is to the stop/update step.
     * @return Future that completes when the stop/update step completes.
     */
    protected CompletableFuture<Void> handleChangeStableAssignmentEvent(Entry entry, long j, boolean z) {
        int zoneId = ZoneRebalanceUtil.extractZoneId(entry.key(), "zone.assignments.stable.");
        int partId = ZoneRebalanceUtil.extractPartitionNumber(entry.key());
        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, partId);

        Set<Assignment> stableNodes;
        if (entry.value() == null) {
            stableNodes = Collections.emptySet();
        } else {
            stableNodes = Assignments.fromBytes(entry.value()).nodes();
        }

        return CompletableFuture
                .supplyAsync(() -> {
                    byte[] pendingBytes = this.metaStorageMgr
                            .getLocally(ZoneRebalanceUtil.pendingPartAssignmentsKey(zonePartitionId), j)
                            .value();

                    Assignments pending = pendingBytes == null ? Assignments.EMPTY : Assignments.fromBytes(pendingBytes);

                    return stopAndDestroyPartitionAndUpdateClients(zonePartitionId, stableNodes, pending, z);
                }, this.ioExecutor)
                .thenCompose(Function.identity());
    }

    /**
     * Updates the peers and learners of the local replica to the given stable assignments, provided the local
     * node is either part of those assignments or is the primary for the partition.
     *
     * @param zonePartitionId Zone partition id.
     * @param set Stable assignments.
     * @return Future that completes when the replica has been updated, or immediately if no update is needed.
     */
    private CompletableFuture<Void> updatePartitionClients(ZonePartitionId zonePartitionId, Set<Assignment> set) {
        return isLocalNodeIsPrimary(zonePartitionId).thenCompose(isLeaseholder -> {
            return (CompletionStage) IgniteUtils.inBusyLock(this.busyLock, () -> {
                boolean localNodeInvolved = isLocalNodeInAssignments(set) || isLeaseholder.booleanValue();

                if (!localNodeInvolved) {
                    return CompletableFutures.nullCompletedFuture();
                }

                // An involved node is expected to have its replica started.
                if (!$assertionsDisabled && !this.replicaMgr.isReplicaStarted(zonePartitionId)) {
                    throw new AssertionError("The local node is outside of the replication group [stable=" + set + ", isLeaseholder=" + isLeaseholder + "].");
                }

                return this.replicaMgr.replica(zonePartitionId).thenAccept(replica -> {
                    replica.updatePeersAndLearners(PeersAndLearners.fromAssignments(set));
                });
            });
        });
    }

    /**
     * Updates the partition clients (unless running in recovery mode) and stops the local partition when the
     * local node is no longer among the relevant assignments.
     *
     * <p>The relevant assignments are the pending ones alone if they are forced, otherwise the union of stable
     * and pending assignments.
     *
     * @param zonePartitionId Zone partition id.
     * @param set Stable assignments.
     * @param assignments Pending assignments.
     * @param z Recovery flag; when {@code true} the client update step is skipped.
     * @return Future that completes after the client update and, if required, the partition stop.
     */
    private CompletableFuture<Void> stopAndDestroyPartitionAndUpdateClients(ZonePartitionId zonePartitionId, Set<Assignment> set, Assignments assignments, boolean z) {
        CompletableFuture<Void> clientUpdateFuture = z
                ? CompletableFutures.nullCompletedFuture()
                : updatePartitionClients(zonePartitionId, set);

        Stream<Assignment> relevantAssignments = assignments.force()
                ? assignments.nodes().stream()
                : Stream.concat(set.stream(), assignments.nodes().stream());

        // The shared predicate replaces an inline copy of the same "is this the local node" check.
        boolean localNodeUnassigned = relevantAssignments.noneMatch(this.isLocalNodeAssignment);

        if (!localNodeUnassigned) {
            return clientUpdateFuture;
        }

        return clientUpdateFuture
                .thenCompose(unused -> stopAndDestroyPartition(zonePartitionId))
                // Discards the stop result so the chain yields Void.
                .thenAccept(ignored -> {
                });
    }

    /**
     * Stops the partition of the given replication group. Currently this only delegates to the stop routine.
     *
     * @param replicationGroupId Replication group id.
     * @return Future of the stop operation.
     */
    private CompletableFuture<?> stopAndDestroyPartition(ReplicationGroupId replicationGroupId) {
        CompletableFuture<?> stopFuture = stopPartition(replicationGroupId);

        return stopFuture;
    }

    /**
     * Handles a pending assignments entry of a single zone partition.
     *
     * <p>Entries without a value are ignored. Otherwise the partition is parsed from the key, the stable
     * assignments are read for the given revision and the partition-level handler is invoked under the busy
     * lock. After it completes, a peers change is requested if the local node is in the stable or pending
     * assignments.
     *
     * @param entry Pending assignments entry.
     * @param j Revision.
     * @param z Recovery flag (not used by this method).
     * @return Future that completes when the event has been handled, or fails with
     *         {@link NodeStoppingException} if the busy lock cannot be entered.
     */
    private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry entry, long j, boolean z) {
        boolean nothingPending = entry.value() == null || entry.empty();
        if (nothingPending) {
            return CompletableFutures.nullCompletedFuture();
        }

        int partId = ZoneRebalanceUtil.extractPartitionNumber(entry.key());
        int zoneId = ZoneRebalanceUtil.extractZoneId(entry.key(), "zone.assignments.pending.");
        ZonePartitionId replicaGrpId = new ZonePartitionId(zoneId, partId);

        Assignments stable = stableAssignments(replicaGrpId, j);
        Assignments pending = Assignments.fromBytes(entry.value());

        if (!this.busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new NodeStoppingException());
        }

        try {
            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Received update on pending assignments. Check if new replication node should be started [key={}, partition={}, zoneId={}, localMemberAddress={}, pendingAssignments={}, revision={}]",
                        new String(entry.key(), StandardCharsets.UTF_8),
                        partId,
                        zoneId,
                        localNode().address(),
                        pending,
                        j
                );
            }

            return handleChangePendingAssignmentEvent(replicaGrpId, stable, pending, j).thenCompose(unused -> {
                if (isNodeInReducedStableOrPendingAssignments(replicaGrpId, stable, pending, j)) {
                    return changePeersOnRebalance(this.replicaMgr, replicaGrpId, pending.nodes(), j);
                }

                return CompletableFutures.nullCompletedFuture();
            });
        } finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Applies new pending assignments to a zone partition on the local node.
     *
     * <p>Steps, in order:
     * <ol>
     *     <li>If the local node is in the pending assignments but not in the stable ones, a replica is started.
     *     Otherwise, if the pending assignments are forced and include the local node, the peers of the existing
     *     replica are reset on the IO executor. Otherwise nothing is done at this step.</li>
     *     <li>On the IO executor, it is determined whether the local node is the primary for the partition.</li>
     *     <li>On the IO executor, if the local node is in the stable/pending assignments or is the primary, the
     *     peers and learners of the local replica are updated.</li>
     * </ol>
     *
     * @param zonePartitionId Zone partition id.
     * @param assignments Stable assignments, may be {@code null}.
     * @param assignments2 Pending assignments.
     * @param j Revision.
     * @return Future that completes when step 3 has been executed.
     */
    private CompletableFuture<Void> handleChangePendingAssignmentEvent(ZonePartitionId zonePartitionId, @Nullable Assignments assignments, Assignments assignments2, long j) {
        CompletableFuture<Void> nullCompletedFuture;
        boolean force = assignments2.force();
        Set nodes = assignments2.nodes();
        Assignment localMemberAssignment = localMemberAssignment(assignments2);
        // True when the local node is in pending assignments but absent from stable ones, i.e. a replica has to be started.
        boolean z = localMemberAssignment != null && (assignments == null || !assignments.nodes().contains(localMemberAssignment));
        // Assignments used to form the group: forced pending nodes if there are no stable ones,
        // the pending assignments if they are forced, the stable assignments otherwise.
        Assignments forced = (assignments == null || assignments.nodes().isEmpty()) ? Assignments.forced(nodes) : force ? assignments2 : assignments;
        int partitionId = zonePartitionId.partitionId();
        int zoneId = zonePartitionId.zoneId();
        if (z) {
            nullCompletedFuture = createZonePartitionReplicationNode(zoneId, partitionId, localMemberAssignment, forced);
        } else if (!force || localMemberAssignment == null) {
            nullCompletedFuture = CompletableFutures.nullCompletedFuture();
        } else {
            // Forced pending assignments that include the local node: reset the peers of the already running replica.
            Assignments assignments3 = forced;
            nullCompletedFuture = CompletableFuture.runAsync(() -> {
                IgniteUtils.inBusyLock(this.busyLock, () -> {
                    this.replicaMgr.resetPeers(zonePartitionId, PeersAndLearners.fromAssignments(assignments3.nodes()));
                });
            }, this.ioExecutor);
        }
        return nullCompletedFuture.thenComposeAsync(r6 -> {
            return (CompletionStage) IgniteUtils.inBusyLock(this.busyLock, () -> {
                return isLocalNodeIsPrimary(zonePartitionId);
            });
        }, (Executor) this.ioExecutor).thenAcceptAsync((Consumer<? super U>) bool -> {
            IgniteUtils.inBusyLock(this.busyLock, () -> {
                boolean isNodeInReducedStableOrPendingAssignments = isNodeInReducedStableOrPendingAssignments(zonePartitionId, assignments, assignments2, j);
                if (isNodeInReducedStableOrPendingAssignments || bool.booleanValue()) {
                    // NOTE(review): this assertion repeats the enclosing condition, so it can never fire here.
                    if (!$assertionsDisabled && !isNodeInReducedStableOrPendingAssignments && !bool.booleanValue()) {
                        throw new AssertionError("The local node is outside of the replication group [inStableOrPending=" + isNodeInReducedStableOrPendingAssignments + ", isLeaseholder=" + bool + "].");
                    }
                    // Forced (or no stable) assignments: use pending nodes only; otherwise the union of pending and stable nodes.
                    Set union = (force || assignments == null) ? nodes : RebalanceUtil.union(nodes, assignments.nodes());
                    // NOTE(review): the future returned by thenAccept is not chained into the result of this method,
                    // so the caller does not wait for the peers update and does not observe its failure.
                    this.replicaMgr.replica(zonePartitionId).thenAccept(replica -> {
                        replica.updatePeersAndLearners(PeersAndLearners.fromAssignments(union));
                    });
                }
            });
        }, (Executor) this.ioExecutor);
    }

    /**
     * Asks the RAFT group of the given zone partition to switch to a new set of peers and learners, provided that the local node
     * is the current leader of the group.
     *
     * <p>The change is skipped when the leader cannot be obtained within the timeout, when the local node is not the leader, or when
     * the pending assignments key in the meta storage has a revision greater than {@code j}.
     *
     * @param replicaManager Replica manager used to look up the replica of the partition.
     * @param zonePartitionId Zone partition whose RAFT group configuration has to be changed.
     * @param set Assignments the new peers and learners are built from.
     * @param j Revision compared against the revision of the pending assignments entry.
     * @return Future that completes when the change has been requested or skipped; a failure of the change request itself is logged
     *         and does not fail the future.
     */
    private CompletableFuture<Void> changePeersOnRebalance(ReplicaManager replicaManager, ZonePartitionId zonePartitionId, Set<Assignment> set, long j) {
        return replicaManager.replica(zonePartitionId).thenApply(replica -> {
            return replica.raftClient();
        }).thenCompose(raftClient -> {
            return raftClient.refreshAndGetLeaderWithTerm().exceptionally(th -> {
                Throwable cause = ExceptionUtils.unwrapCause(th);
                if (!(cause instanceof TimeoutException)) {
                    throw new IgniteInternalException(ErrorGroups.Common.INTERNAL_ERR, "Failed to get a leader for the RAFT replication group [grp=" + zonePartitionId + "].", cause);
                }
                // A timeout is not fatal: without a known leader the peers change is simply not initiated from this node.
                LOG.info("Node couldn't get the leader within timeout so the changing peers is skipped [grp={}].", new Object[]{zonePartitionId});
                return LeaderWithTerm.NO_LEADER;
            }).thenCompose(leaderWithTerm -> {
                if (leaderWithTerm.isEmpty() || !isLocalPeer(leaderWithTerm.leader())) {
                    return CompletableFutures.nullCompletedFuture();
                }
                LOG.info("Current node={} is the leader of partition raft group={}. Initiate rebalance process for partition={}, zoneId={}", new Object[]{leaderWithTerm.leader(), zonePartitionId, Integer.valueOf(zonePartitionId.partitionId()), Integer.valueOf(zonePartitionId.zoneId())});
                return this.metaStorageMgr.get(ZoneRebalanceUtil.pendingPartAssignmentsKey(zonePartitionId)).thenCompose(entry -> {
                    // The pending key was updated after revision j, so this request is skipped.
                    if (j < entry.revision()) {
                        return CompletableFutures.nullCompletedFuture();
                    }
                    return raftClient.changePeersAndLearnersAsync(PeersAndLearners.fromAssignments(set), leaderWithTerm.term()).exceptionally(th -> {
                        // Best effort: the failure must not break the caller's chain, but it should not vanish silently either.
                        LOG.error("Failed to change peers and learners of the RAFT replication group [grp=" + zonePartitionId + "].", th);
                        return null;
                    });
                });
            });
        });
    }

    /**
     * Tells whether the given peer designates the local node.
     *
     * @param peer Peer to check.
     * @return {@code true} if the consistent ID of the peer equals the name of the local node.
     */
    private boolean isLocalPeer(Peer peer) {
        final boolean matchesLocalName = peer.consistentId().equals(localNode().name());

        return matchesLocalName;
    }

    /**
     * Tells whether at least one of the given assignments satisfies the {@code isLocalNodeAssignment} predicate.
     *
     * @param collection Assignments to inspect.
     * @return {@code true} if a matching assignment is found, {@code false} otherwise (including an empty collection).
     */
    private boolean isLocalNodeInAssignments(Collection<Assignment> collection) {
        for (Assignment candidate : collection) {
            if (this.isLocalNodeAssignment.test(candidate)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Checks whether the local node holds the primary replica lease of the given replication group.
     *
     * <p>The primary replica is requested for the current meta storage safe time reduced by the maximum clock skew. If the safe time
     * still equals {@link HybridTimestamp#MIN_VALUE}, the check is not performed and {@code false} is returned.
     *
     * @param replicationGroupId Replication group to check.
     * @return Future with {@code true} if the leaseholder ID of the primary replica equals the ID of the local node.
     * @throws AssertionError If an {@link IllegalArgumentException} is thrown while preparing or issuing the request; the error
     *         carries the values involved in the timestamp computation.
     */
    private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId replicationGroupId) {
        HybridTimestamp currentSafeTime = this.metaStorageMgr.clusterTime().currentSafeTime();
        if (HybridTimestamp.MIN_VALUE.equals(currentSafeTime)) {
            return CompletableFutures.falseCompletedFuture();
        }

        long maxClockSkewMillis = this.clockService.maxClockSkewMillis();
        try {
            return this.placementDriver.getPrimaryReplica(replicationGroupId, currentSafeTime.subtractPhysicalTime(maxClockSkewMillis))
                    .thenApply(replicaMeta -> replicaMeta != null
                            && replicaMeta.getLeaseholderId() != null
                            && replicaMeta.getLeaseholderId().equals(localNode().id()));
        } catch (IllegalArgumentException e) {
            long currentSafeTimeMs = currentSafeTime.longValue();
            // Diagnostic value: the raw timestamp with the skew applied as a 16-bit left-shifted offset.
            long internal = currentSafeTimeMs + ((-maxClockSkewMillis) << 16);

            throw new AssertionError("Got a negative time [currentSafeTime=" + currentSafeTime + ", currentSafeTimeMs=" + currentSafeTimeMs + ", skewMs=" + maxClockSkewMillis + ", internal=" + internal + "]", e);
        }
    }

    /**
     * Checks whether the local node belongs to the pending assignments or to the stable assignments reduced by the switch-reduce
     * set read from the local meta storage.
     *
     * <p>When assertions are enabled and the local node is found, the replica of the partition is additionally required to be
     * started.
     *
     * @param zonePartitionId Zone partition being checked.
     * @param assignments Stable assignments; {@code null} is treated as an empty set of nodes.
     * @param assignments2 Pending assignments.
     * @param j Revision at which the switch-reduce key is read from the local meta storage.
     * @return {@code true} if the local node is in the reduced stable or pending assignments.
     */
    private boolean isNodeInReducedStableOrPendingAssignments(ZonePartitionId zonePartitionId, @Nullable Assignments assignments, Assignments assignments2, long j) {
        Entry reduceEntry = this.metaStorageMgr.getLocally(ZoneRebalanceUtil.switchReduceKey(zonePartitionId), j);
        Assignments reduce = reduceEntry != null ? Assignments.fromBytes(reduceEntry.value()) : null;

        // Stable assignments may legitimately be absent (the parameter is nullable), in which case only pending nodes matter.
        Set<Assignment> stableNodes = assignments != null ? assignments.nodes() : Collections.emptySet();
        Set<Assignment> reducedStableNodes = reduce != null ? RebalanceUtil.subtract(stableNodes, reduce.nodes()) : stableNodes;

        if (!isLocalNodeInAssignments(RebalanceUtil.union(reducedStableNodes, assignments2.nodes()))) {
            return false;
        }

        // Explicit form of an assert statement: the replica is expected to be started on a node that is part of the group.
        if ($assertionsDisabled || this.replicaMgr.isReplicaStarted(zonePartitionId)) {
            return true;
        }
        throw new AssertionError("The local node is outside of the replication group [, stable=" + assignments + ", pending=" + assignments2 + ", reduce=" + reduce + ", localName=" + localNode().name() + "].");
    }

    /**
     * Looks up the peer assignment of the local node in the given assignments.
     *
     * @param assignments Assignments to search in.
     * @return Peer assignment built from the local node name if the assignments contain it, {@code null} otherwise.
     */
    @Nullable
    private Assignment localMemberAssignment(Assignments assignments) {
        Assignment localPeerAssignment = Assignment.forPeer(localNode().name());

        return assignments.nodes().contains(localPeerAssignment) ? localPeerAssignment : null;
    }

    /**
     * Stops the component. No asynchronous work is performed here, so an already completed future is returned whether or not
     * the feature is enabled.
     *
     * @param componentContext Component context (not used).
     * @return Completed future with a {@code null} result.
     */
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Builds a short textual description of a zone in the {@code id/name} form.
     *
     * @param catalogZoneDescriptor Zone descriptor.
     * @return Zone ID and zone name separated by a slash.
     */
    private static String zoneInfo(CatalogZoneDescriptor catalogZoneDescriptor) {
        StringBuilder info = new StringBuilder();

        info.append(catalogZoneDescriptor.id());
        info.append("/");
        info.append(catalogZoneDescriptor.name());

        return info.toString();
    }

    /**
     * Reads the stable assignments of a zone partition from the local meta storage.
     *
     * @param zonePartitionId Zone partition whose stable assignments are requested.
     * @param j Revision at which the stable assignments key is read.
     * @return Deserialized stable assignments, or {@code null} if no entry is returned or the entry yields no assignments.
     */
    @Nullable
    private Assignments stableAssignments(ZonePartitionId zonePartitionId, long j) {
        Entry stableEntry = this.metaStorageMgr.getLocally(ZoneRebalanceUtil.stablePartAssignmentsKey(zonePartitionId), j);

        // Same guard as for the switch-reduce entry: a missing entry means there are no stable assignments to report.
        return stableEntry == null ? null : Assignments.fromBytes(stableEntry.value());
    }

    /**
     * Requests the replica manager to stop the replica of the given replication group.
     *
     * @param replicationGroupId Replication group whose replica has to be stopped.
     * @return Future returned by the replica manager, or a future completed with {@code false} if the node is already stopping.
     */
    private CompletableFuture<?> stopPartition(ReplicationGroupId replicationGroupId) {
        try {
            return this.replicaMgr.stopReplica(replicationGroupId);
        } catch (NodeStoppingException e) {
            // The node is shutting down, so there is nothing left to stop.
            return CompletableFutures.falseCompletedFuture();
        }
    }

    /**
     * Stops the replicas of the given replication groups on the I/O executor and waits for the result.
     *
     * <p>Stopping of all replicas is awaited for up to 10 seconds inside the task, and the task itself is awaited for up to
     * 30 seconds. This is a best-effort operation: failures are logged and never propagated to the caller.
     *
     * @param set Replication groups whose replicas have to be stopped.
     */
    private void cleanUpPartitionsResources(Set<ReplicationGroupId> set) {
        try {
            CompletableFuture.runAsync(() -> {
                // NOTE(review): the builder is untyped in this source; declare its element type once the closeable type
                // expected by closeAllManually is confirmed.
                Stream.Builder builder = Stream.builder();
                builder.add(() -> {
                    CompletableFuture<?>[] stopFutures = new CompletableFuture<?>[set.size()];
                    int idx = 0;
                    for (ReplicationGroupId groupId : set) {
                        stopFutures[idx++] = stopPartition(groupId);
                    }
                    CompletableFuture.allOf(stopFutures).get(10L, TimeUnit.SECONDS);
                });
                try {
                    IgniteUtils.closeAllManually(builder.build());
                } catch (Throwable th) {
                    // Keep the cause in the log, otherwise a failed stop cannot be diagnosed.
                    LOG.error("Unable to stop partition", th);
                }
            }, this.ioExecutor).get(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error("Unable to clean zones resources", e);
        } catch (ExecutionException | TimeoutException e) {
            LOG.error("Unable to clean zones resources", e);
        }
    }

    static {
        $assertionsDisabled = !PartitionReplicaLifecycleManager.class.desiredAssertionStatus();
        ENABLED = IgniteSystemProperties.getBoolean(FEATURE_FLAG_NAME, false);
        LOG = Loggers.forClass(PartitionReplicaLifecycleManager.class);
    }
}
