package org.apache.ignite3.internal.table.distributed;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.metastorage.MetaStorageManager;
import org.apache.ignite3.internal.network.MessagingService;
import org.apache.ignite3.internal.network.TopologyEventHandler;
import org.apache.ignite3.internal.network.TopologyService;
import org.apache.ignite3.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import org.apache.ignite3.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite3.internal.partition.replicator.network.message.DataPresence;
import org.apache.ignite3.internal.partition.replicator.network.message.HasDataRequest;
import org.apache.ignite3.internal.partition.replicator.network.message.HasDataResponse;
import org.apache.ignite3.internal.partitiondistribution.Assignment;
import org.apache.ignite3.internal.raft.Peer;
import org.apache.ignite3.internal.raft.PeersAndLearners;
import org.apache.ignite3.internal.replicator.TablePartitionId;
import org.apache.ignite3.internal.storage.MvPartitionStorage;
import org.apache.ignite3.internal.storage.RowId;
import org.apache.ignite3.internal.storage.StorageClosedException;
import org.apache.ignite3.internal.storage.StorageRebalanceException;
import org.apache.ignite3.internal.table.InternalTable;
import org.apache.ignite3.internal.table.TableViewInternal;
import org.apache.ignite3.internal.util.CompletableFutures;
import org.apache.ignite3.internal.utils.RebalanceUtilEx;
import org.apache.ignite3.network.ClusterNode;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/ignite3/internal/table/distributed/PartitionReplicatorNodeRecovery.class */
/**
 * Decides, for tables backed by volatile storage, how a partition replication group should be re-entered
 * after a node (re)start, by asking the group's peers whether they still hold data for the partition.
 *
 * <p>The class plays two roles:
 * <ul>
 *     <li>responder: after {@link #start()} it answers incoming {@link HasDataRequest}s with the
 *     {@link DataPresence} of the local partition storage;</li>
 *     <li>requester: {@link #initiateGroupReentryIfNeeded} sends {@link HasDataRequest}s to the peers and
 *     interprets the replies.</li>
 * </ul>
 */
public class PartitionReplicatorNodeRecovery {
    /** Timeout, in milliseconds, passed to the messaging service for each {@link HasDataRequest} invocation. */
    private static final long QUERY_DATA_NODES_COUNT_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(3L);

    /** How long, in milliseconds, to wait for all peers to show up in the topology before proceeding anyway. */
    private static final long PEERS_IN_TOPOLOGY_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(3L);

    /** Factory for the request/response messages exchanged by this class. */
    private static final PartitionReplicationMessagesFactory TABLE_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();

    private final MetaStorageManager metaStorageManager;

    private final MessagingService messagingService;

    private final TopologyService topologyService;

    /** Executor on which the local storage is inspected when serving a {@link HasDataRequest}. */
    private final Executor storageAccessExecutor;

    /** Resolves a table by its ID; a {@code null} result is treated as "table unknown on this node". */
    private final IntFunction<TableViewInternal> tableById;

    /** Tally of the peers' answers: how many reported having data and how many reported being empty. */
    public static class DataNodesCounts {
        /** Number of peers that answered {@link DataPresence#HAS_DATA}. */
        private final long nonEmptyNodes;

        /** Number of peers that answered {@link DataPresence#EMPTY}. */
        private final long emptyNodes;

        private DataNodesCounts(long nonEmptyNodes, long emptyNodes) {
            this.nonEmptyNodes = nonEmptyNodes;
            this.emptyNodes = emptyNodes;
        }
    }

    /**
     * Creates the recovery component; no handlers are registered until {@link #start()} is called.
     *
     * @param metaStorageManager Meta storage manager handed to {@code RebalanceUtilEx.startPeerRemoval}.
     * @param messagingService Messaging service used both to serve and to send {@link HasDataRequest}s.
     * @param topologyService Topology service used to resolve peers to cluster nodes.
     * @param executor Executor on which incoming {@link HasDataRequest}s are processed.
     * @param intFunction Function resolving a table by its ID (may return {@code null}).
     */
    public PartitionReplicatorNodeRecovery(MetaStorageManager metaStorageManager, MessagingService messagingService, TopologyService topologyService, Executor executor, IntFunction<TableViewInternal> intFunction) {
        this.metaStorageManager = metaStorageManager;
        this.messagingService = messagingService;
        this.topologyService = topologyService;
        this.storageAccessExecutor = executor;
        this.tableById = intFunction;
    }

    /** Starts the component by registering the handler that answers {@link HasDataRequest}s. */
    public void start() {
        addMessageHandler();
    }

    /**
     * Registers a handler for the partition replication message group that reacts only to
     * {@link HasDataRequest}s and hands them over to {@link #storageAccessExecutor}.
     */
    private void addMessageHandler() {
        messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, (message, sender, correlationId) -> {
            if (message instanceof HasDataRequest) {
                // The value is needed to respond, so a request without it cannot be served.
                assert correlationId != null;

                HasDataRequest request = (HasDataRequest) message;

                storageAccessExecutor.execute(() -> handleHasDataRequest(request, sender, correlationId));
            }
        });
    }

    /**
     * Answers a {@link HasDataRequest} with the data presence of the requested local partition.
     *
     * <p>The answer is {@link DataPresence#UNKNOWN} when the table or the partition storage cannot be found, or
     * when the storage is closed or being rebalanced while it is inspected.
     *
     * @param request Request naming the table and the partition.
     * @param sender Node to respond to.
     * @param correlationId Third argument of the message handler, passed back to
     *     {@code MessagingService.respond} (presumably the request's correlation ID; confirm against the
     *     messaging API).
     */
    private void handleHasDataRequest(HasDataRequest request, ClusterNode sender, Long correlationId) {
        int tableId = request.tableId();
        int partitionId = request.partitionId();

        DataPresence dataPresence = DataPresence.UNKNOWN;

        TableViewInternal table = tableById.apply(tableId);

        if (table != null) {
            MvPartitionStorage mvPartition = table.internalTable().storage().getMvPartition(partitionId);

            if (mvPartition != null) {
                try {
                    // Any row at or after the lowest possible row ID means the partition is not empty.
                    boolean hasRows = mvPartition.closestRowId(RowId.lowestRowId(partitionId)) != null;

                    dataPresence = hasRows ? DataPresence.HAS_DATA : DataPresence.EMPTY;
                } catch (StorageClosedException | StorageRebalanceException ignored) {
                    // Deliberately ignored: the storage cannot be inspected right now, so UNKNOWN is reported.
                }
            }
        }

        HasDataResponse response = TABLE_MESSAGES_FACTORY.hasDataResponse()
                .presenceString(dataPresence.name())
                .build();

        messagingService.respond(sender, response, correlationId.longValue());
    }

    /**
     * Checks whether the partition group needs recovery and, if so, performs it.
     *
     * <p>Recovery is only considered for tables whose storage is volatile; for any other table the returned
     * future is already completed with {@code true}.
     *
     * @param tablePartitionId ID of the table partition.
     * @param internalTable Table the partition belongs to.
     * @param peersAndLearners Group configuration whose peers are queried.
     * @param assignment Assignment forwarded to {@code RebalanceUtilEx.startPeerRemoval}.
     * @param j Value forwarded unchanged to {@code RebalanceUtilEx.startPeerRemoval}; its meaning is not visible
     *     in this class.
     * @return Future completed with {@code true} when no recovery is needed or every peer reported an empty
     *     partition, with {@code false} when a majority of peers reported data and peer removal was started, or
     *     completed exceptionally with {@link IgniteInternalException} when neither condition holds.
     */
    public CompletableFuture<Boolean> initiateGroupReentryIfNeeded(TablePartitionId tablePartitionId, InternalTable internalTable, PeersAndLearners peersAndLearners, Assignment assignment, long j) {
        if (!mightNeedGroupRecovery(internalTable)) {
            return CompletableFutures.trueCompletedFuture();
        }

        return performGroupRecovery(tablePartitionId, peersAndLearners, assignment, j);
    }

    /** Returns {@code true} if the table's storage is volatile, which is the only case recovery is attempted for. */
    private static boolean mightNeedGroupRecovery(InternalTable internalTable) {
        return internalTable.storage().isVolatile();
    }

    /**
     * Queries the peers for data presence and decides how to proceed; see
     * {@link #initiateGroupReentryIfNeeded} for the meaning of the result.
     */
    private CompletableFuture<Boolean> performGroupRecovery(TablePartitionId tablePartitionId, PeersAndLearners peersAndLearners, Assignment assignment, long j) {
        int tableId = tablePartitionId.tableId();
        int partitionId = tablePartitionId.partitionId();

        return waitForPeersAndQueryDataNodesCounts(tableId, partitionId, peersAndLearners.peers()).thenApply(counts -> {
            long peerCount = peersAndLearners.peers().size();

            if (counts.emptyNodes == peerCount) {
                // Every peer answered, and every answer was EMPTY.
                return true;
            }

            long majority = peerCount / 2 + 1;

            if (counts.nonEmptyNodes < majority) {
                throw new IgniteInternalException("Unable to start partition " + partitionId + ". Majority not available.");
            }

            // NOTE(review): whatever startPeerRemoval returns is not observed here, exactly as before.
            RebalanceUtilEx.startPeerRemoval(tablePartitionId, assignment, metaStorageManager, j);

            return false;
        });
    }

    /**
     * Waits (for a bounded time) until all peers are in the topology and then asks them about data presence.
     *
     * @param tableId ID of the table.
     * @param partitionId ID of the partition.
     * @param peers Peers to query.
     * @return Future with the tally of the peers' answers.
     */
    private CompletableFuture<DataNodesCounts> waitForPeersAndQueryDataNodesCounts(int tableId, int partitionId, Collection<Peer> peers) {
        HasDataRequest request = TABLE_MESSAGES_FACTORY.hasDataRequest()
                .tableId(tableId)
                .partitionId(partitionId)
                .build();

        return allPeersAreInTopology(peers).thenCompose(unused -> queryDataNodesCounts(peers, request));
    }

    /**
     * Returns a future that completes once every peer has been seen in the topology, or once
     * {@link #PEERS_IN_TOPOLOGY_TIMEOUT_MILLIS} have elapsed, whichever happens first.
     *
     * @param peers Peers that are expected to appear.
     */
    private CompletableFuture<?> allPeersAreInTopology(final Collection<Peer> peers) {
        final Set<String> peerConsistentIds = peers.stream()
                .map(Peer::consistentId)
                .collect(Collectors.toSet());

        final ConcurrentHashMap<String, ClusterNode> seenNodes = new ConcurrentHashMap<>();

        collectPeersPresentInTopology(peers, seenNodes);

        if (seenNodes.size() >= peers.size()) {
            return CompletableFutures.nullCompletedFuture();
        }

        final CompletableFuture<Void> allPeersSeen = new CompletableFuture<>();

        // NOTE(review): this handler is never deregistered; no removal call is made anywhere in this class.
        topologyService.addEventHandler(new TopologyEventHandler() {
            @Override
            public void onAppeared(ClusterNode clusterNode) {
                if (peerConsistentIds.contains(clusterNode.name())) {
                    seenNodes.put(clusterNode.name(), clusterNode);
                }

                if (seenNodes.size() >= peers.size()) {
                    allPeersSeen.complete(null);
                }
            }
        });

        // Look again: a peer may have appeared after the first scan but before the handler was installed.
        collectPeersPresentInTopology(peers, seenNodes);

        if (seenNodes.size() >= peers.size()) {
            return CompletableFutures.nullCompletedFuture();
        }

        return withTimeout(allPeersSeen);
    }

    /** Adds to {@code seenNodes} every not-yet-recorded peer that the topology service can currently resolve. */
    private void collectPeersPresentInTopology(Collection<Peer> peers, ConcurrentHashMap<String, ClusterNode> seenNodes) {
        for (Peer peer : peers) {
            String consistentId = peer.consistentId();

            if (seenNodes.containsKey(consistentId)) {
                continue;
            }

            ClusterNode node = topologyService.getByConsistentId(consistentId);

            if (node != null) {
                seenNodes.put(consistentId, node);
            }
        }
    }

    /**
     * Bounds the wait on the given future by {@link #PEERS_IN_TOPOLOGY_TIMEOUT_MILLIS}.
     *
     * <p>A timeout is not an error: the resulting future then completes normally with {@code null}. Any other
     * failure is propagated.
     */
    private static CompletableFuture<Void> withTimeout(CompletableFuture<Void> future) {
        return future
                .orTimeout(PEERS_IN_TOPOLOGY_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
                .handle((result, error) -> {
                    if (error != null && !(error instanceof TimeoutException)) {
                        return CompletableFuture.<Void>failedFuture(error);
                    }

                    return CompletableFuture.<Void>completedFuture(result);
                })
                .thenCompose(Function.identity());
    }

    /**
     * Sends the request to every peer currently resolvable in the topology and tallies the answers.
     *
     * <p>Peers that cannot be resolved are skipped; a failed invocation counts as {@link DataPresence#UNKNOWN},
     * which contributes to neither counter.
     *
     * @param peers Peers to query.
     * @param request Request to send.
     */
    private CompletableFuture<DataNodesCounts> queryDataNodesCounts(Collection<Peer> peers, HasDataRequest request) {
        List<CompletableFuture<DataPresence>> responses = peers.stream()
                .map(Peer::consistentId)
                .map(topologyService::getByConsistentId)
                .filter(Objects::nonNull)
                .map(node -> queryDataPresence(node, request))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(responses.toArray(new CompletableFuture<?>[0])).thenApply(unused -> {
            long nonEmptyNodes = 0;
            long emptyNodes = 0;

            for (CompletableFuture<DataPresence> response : responses) {
                // All futures are complete at this point, so join() does not block.
                DataPresence presence = response.join();

                if (presence == DataPresence.HAS_DATA) {
                    nonEmptyNodes++;
                } else if (presence == DataPresence.EMPTY) {
                    emptyNodes++;
                }
            }

            return new DataNodesCounts(nonEmptyNodes, emptyNodes);
        });
    }

    /** Asks a single node for its data presence, mapping any failure to {@link DataPresence#UNKNOWN}. */
    private CompletableFuture<DataPresence> queryDataPresence(ClusterNode node, HasDataRequest request) {
        return messagingService.invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT_MILLIS)
                .thenApply(message -> {
                    assert message instanceof HasDataResponse : message;

                    return ((HasDataResponse) message).presence();
                })
                .exceptionally(error -> DataPresence.UNKNOWN);
    }
}
