/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.replicator;

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.internal.event.Event;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
import org.apache.ignite.internal.raft.LeaderElectionListener;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import org.apache.ignite.internal.replicator.Member;
import org.apache.ignite.internal.replicator.PlacementDriverMessageProcessor;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;

public class ReplicaImpl
implements Replica {
    private static final IgniteLogger LOG = Loggers.forClass(ReplicaManager.class);
    private final ReplicationGroupId replicaGrpId;
    private final ReplicaListener listener;
    private final TopologyAwareRaftGroupService raftClient;
    private final InternalClusterNode localNode;
    private final PlacementDriver placementDriver;
    private final Function<ReplicationGroupId, CompletableFuture<byte[]>> getPendingAssignmentsSupplier;
    private LeaderElectionListener onLeaderElectedFailoverCallback;
    private final FailureProcessor failureProcessor;
    private final PlacementDriverMessageProcessor placementDriverMessageProcessor;
    private final EventListener<PrimaryReplicaEventParameters> onPrimaryReplicaElected = this::registerFailoverCallback;
    private final EventListener<PrimaryReplicaEventParameters> onPrimaryReplicaExpired = this::unregisterFailoverCallback;

    public ReplicaImpl(ReplicationGroupId replicaGrpId, ReplicaListener listener, InternalClusterNode localNode, PlacementDriver placementDriver, Function<ReplicationGroupId, CompletableFuture<byte[]>> getPendingAssignmentsSupplier, FailureProcessor failureProcessor, PlacementDriverMessageProcessor placementDriverMessageProcessor) {
        this.replicaGrpId = replicaGrpId;
        this.listener = listener;
        this.raftClient = this.raftClient();
        this.localNode = localNode;
        this.placementDriver = placementDriver;
        this.getPendingAssignmentsSupplier = getPendingAssignmentsSupplier;
        this.failureProcessor = failureProcessor;
        this.placementDriverMessageProcessor = placementDriverMessageProcessor;
        placementDriver.listen((Event)PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this.onPrimaryReplicaElected);
        placementDriver.listen((Event)PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this.onPrimaryReplicaExpired);
    }

    @Override
    public ReplicaListener listener() {
        return this.listener;
    }

    @Override
    public final TopologyAwareRaftGroupService raftClient() {
        return (TopologyAwareRaftGroupService)this.listener.raftClient();
    }

    @Override
    public CompletableFuture<ReplicaResult> processRequest(ReplicaRequest request, UUID senderId) {
        assert (this.replicaGrpId.equals(request.groupId().asReplicationGroupId())) : IgniteStringFormatter.format((String)"Partition mismatch: request does not match the replica [reqReplicaGrpId={}, replicaGrpId={}]", (Object[])new Object[]{request.groupId(), this.replicaGrpId});
        return this.listener.invoke(request, senderId);
    }

    @Override
    public ReplicationGroupId groupId() {
        return this.replicaGrpId;
    }

    @Override
    public CompletableFuture<? extends NetworkMessage> processPlacementDriverMessage(PlacementDriverReplicaMessage msg) {
        return this.placementDriverMessageProcessor.processPlacementDriverMessage(msg);
    }

    @Override
    public CompletableFuture<Void> shutdown() {
        this.placementDriver.removeListener((Event)PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this.onPrimaryReplicaElected);
        this.placementDriver.removeListener((Event)PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this.onPrimaryReplicaExpired);
        this.listener.onShutdown();
        return this.raftClient.unsubscribeLeader().thenAccept(v -> this.raftClient.shutdown());
    }

    @Override
    public void updatePeersAndLearners(PeersAndLearners peersAndLearners) {
        this.raftClient.updateConfiguration(peersAndLearners);
    }

    @Override
    public CompletableFuture<Void> createSnapshotOn(Member targetMember, boolean forced) {
        Peer peer = targetMember.isVotingMember() ? new Peer(targetMember.consistentId(), 0) : new Peer(targetMember.consistentId(), 1);
        return this.raftClient.snapshot(peer, forced);
    }

    @Override
    public CompletableFuture<Void> transferLeadershipTo(String targetConsistentId) {
        return this.raftClient.transferLeadership(new Peer(targetConsistentId));
    }

    private CompletableFuture<Boolean> registerFailoverCallback(PrimaryReplicaEventParameters parameters) {
        if (!parameters.leaseholderId().equals(this.localNode.id()) || !this.replicaGrpId.equals(parameters.groupId())) {
            return CompletableFutures.falseCompletedFuture();
        }
        assert (this.onLeaderElectedFailoverCallback == null) : IgniteStringFormatter.format((String)"We already have failover subscription [thisGrpId={}, thisNode={}, givenExpiredPrimaryId={}, givenExpiredPrimaryNode={}", (Object[])new Object[]{this.replicaGrpId, this.localNode.name(), parameters.groupId(), parameters.leaseholder()});
        this.onLeaderElectedFailoverCallback = (leaderNode, term) -> this.changePeersAndLearnersAsyncIfPendingExists(term);
        return ((CompletableFuture)this.raftClient.subscribeLeader(this.onLeaderElectedFailoverCallback).exceptionally(e -> {
            if (!ExceptionUtils.hasCause((Throwable)e, (Class[])new Class[]{NodeStoppingException.class})) {
                String errorMessage = "Rebalance failover subscription on elected primary replica failed [groupId=" + this.replicaGrpId + "].";
                this.failureProcessor.process(new FailureContext(e, errorMessage));
            }
            return null;
        })).thenApply(v -> false);
    }

    private void changePeersAndLearnersAsyncIfPendingExists(long term) {
        ((CompletableFuture)((CompletableFuture)this.getPendingAssignmentsSupplier.apply(this.replicaGrpId).exceptionally(e -> {
            LOG.error("Couldn't fetch pending assignments for rebalance failover [groupId={}, term={}].", e, new Object[]{this.replicaGrpId, term});
            return null;
        })).thenCompose(pendingsBytes -> {
            if (pendingsBytes == null) {
                return CompletableFutures.nullCompletedFuture();
            }
            PeersAndLearners newConfiguration = PeersAndLearners.fromAssignments((Collection)AssignmentsQueue.fromBytes((byte[])pendingsBytes).poll().nodes());
            LOG.info("New leader elected. Going to apply new configuration [tablePartitionId={}, peers={}, learners={}]", new Object[]{this.replicaGrpId, newConfiguration.peers(), newConfiguration.learners()});
            return this.raftClient.changePeersAndLearnersAsync(newConfiguration, term);
        })).exceptionally(e -> {
            LOG.error("Failover ChangePeersAndLearners failed [groupId={}, term={}].", e, new Object[]{this.replicaGrpId, term});
            return null;
        });
    }

    private CompletableFuture<Boolean> unregisterFailoverCallback(PrimaryReplicaEventParameters parameters) {
        if (!parameters.leaseholderId().equals(this.localNode.id()) || !this.replicaGrpId.equals(parameters.groupId())) {
            return CompletableFutures.falseCompletedFuture();
        }
        if (this.onLeaderElectedFailoverCallback == null) {
            return CompletableFutures.falseCompletedFuture();
        }
        assert (this.onLeaderElectedFailoverCallback != null) : IgniteStringFormatter.format((String)"We have no failover subscription [thisGrpId={}, thisNode={}, givenExpiredPrimaryId={}, givenExpiredPrimaryNode={}", (Object[])new Object[]{this.replicaGrpId, this.localNode.name(), parameters.groupId(), parameters.leaseholder()});
        this.raftClient.unsubscribeLeader(this.onLeaderElectedFailoverCallback);
        this.onLeaderElectedFailoverCallback = null;
        return CompletableFutures.falseCompletedFuture();
    }
}

