/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.table.distributed;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.partitiondistribution.AssignmentsChain;
import org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.util.ByteUtils;

/**
 * Creates partition assignments for tables and stores them in the meta storage.
 *
 * <p>For a new table the assignments are either copied from another table of the same zone that already has
 * stable assignments, or taken from the local meta storage / calculated from the zone data nodes and then
 * written with a conditional {@code invoke}.
 */
public class TableAssignmentsService {
    private static final IgniteLogger LOG = Loggers.forClass(TableAssignmentsService.class);

    private final MetaStorageManager metaStorageMgr;

    private final CatalogService catalogService;

    private final DistributionZoneManager distributionZoneManager;

    private final FailureProcessor failureProcessor;

    /**
     * Constructor.
     *
     * @param metaStorageMgr Meta storage manager used to read and write assignments.
     * @param catalogService Catalog service used to look up catalog versions and the tables of a zone.
     * @param distributionZoneManager Distribution zone manager used to obtain zone data nodes.
     * @param failureProcessor Failure processor notified when a meta storage operation fails.
     */
    public TableAssignmentsService(
            MetaStorageManager metaStorageMgr,
            CatalogService catalogService,
            DistributionZoneManager distributionZoneManager,
            FailureProcessor failureProcessor
    ) {
        this.metaStorageMgr = metaStorageMgr;
        this.catalogService = catalogService;
        this.distributionZoneManager = distributionZoneManager;
        this.failureProcessor = failureProcessor;
    }

    /**
     * Produces assignments for the given table and makes sure they are present in the meta storage.
     *
     * <p>If another table of the same zone already has locally visible stable assignments for partition 0,
     * its assignments are copied. Otherwise the assignments are obtained via
     * {@link #getOrCreateAssignments} and written via {@link #writeTableAssignmentsToMetastore}.
     *
     * @param tableId Table ID.
     * @param zoneDescriptor Descriptor of the zone the table belongs to.
     * @param tableDescriptor Table descriptor.
     * @param causalityToken Meta storage revision used for local reads of the table's own assignments.
     * @param catalogVersion Catalog version used to list the zone tables and to compute data nodes.
     * @return Future with the assignments of every partition, in partition order.
     */
    CompletableFuture<List<Assignments>> createAndWriteTableAssignmentsToMetastorage(
            int tableId,
            CatalogZoneDescriptor zoneDescriptor,
            CatalogTableDescriptor tableDescriptor,
            long causalityToken,
            int catalogVersion
    ) {
        // Any other table of the zone whose partition 0 already has stable assignments is a valid copy source.
        Optional<Integer> tableToCopyAssignmentsFrom = this.catalogService.catalog(catalogVersion)
                .tables(zoneDescriptor.id())
                .stream()
                .map(CatalogObjectDescriptor::id)
                .filter(id -> id != tableId
                        && RebalanceUtil.partitionAssignmentsGetLocally(this.metaStorageMgr, id, 0, Long.MAX_VALUE) != null)
                .findFirst();

        if (tableToCopyAssignmentsFrom.isPresent()) {
            return this.copyTableAssignments(
                    tableId,
                    tableToCopyAssignmentsFrom.get(),
                    zoneDescriptor.partitions(),
                    zoneDescriptor.consistencyMode()
            );
        }

        CompletableFuture<List<Assignments>> assignments =
                this.getOrCreateAssignments(tableId, zoneDescriptor, tableDescriptor, causalityToken, catalogVersion);

        return this.writeTableAssignmentsToMetastore(tableId, zoneDescriptor.consistencyMode(), assignments);
    }

    /**
     * Writes the given assignments to the meta storage unless assignments for the table already exist there.
     *
     * <p>The write is a single conditional {@code invoke} guarded by "the key of the first put operation does
     * not exist". When the condition does not hold, the assignments currently stored in the meta storage are
     * read and returned instead. An exceptional completion of the invoke is reported to the failure processor.
     *
     * @param tableId Table ID.
     * @param consistencyMode Zone consistency mode; in {@link ConsistencyMode#HIGH_AVAILABILITY} an assignments
     *         chain is written alongside each partition's assignments.
     * @param assignmentsFuture Future with non-empty assignments, one element per partition.
     * @return Future with the assignments that ended up in the meta storage.
     */
    public CompletableFuture<List<Assignments>> writeTableAssignmentsToMetastore(
            int tableId,
            ConsistencyMode consistencyMode,
            CompletableFuture<List<Assignments>> assignmentsFuture
    ) {
        return assignmentsFuture.thenCompose(newAssignments -> {
            assert !newAssignments.isEmpty();

            List<Operation> partitionAssignments =
                    TableAssignmentsService.getTableAssignmentsOperations(tableId, newAssignments, consistencyMode);

            // The first operation is the put of partition 0 stable assignments; its absence means nobody wrote yet.
            SimpleCondition condition =
                    Conditions.notExists(new ByteArray(ByteUtils.toByteArray(partitionAssignments.get(0).key())));

            return this.metaStorageMgr.invoke(condition, partitionAssignments, Collections.emptyList())
                    .whenComplete((invokeResult, e) -> {
                        if (e != null) {
                            String errorMessage = String.format(
                                    "Couldn't write assignments [assignmentsList=%s] to metastore during invoke.",
                                    Assignments.assignmentListToString(newAssignments)
                            );

                            this.failureProcessor.process(new FailureContext(e, errorMessage));
                        }
                    })
                    .thenCompose(invokeResult -> {
                        if (invokeResult) {
                            LOG.info(
                                    "Assignments calculated from data nodes are successfully written to meta storage [tableId={}, assignments={}].",
                                    tableId,
                                    Assignments.assignmentListToString(newAssignments)
                            );

                            return CompletableFuture.completedFuture(newAssignments);
                        }

                        // Condition failed: assignments already exist, so use the stored ones.
                        return this.getAssignmentsFromMetastorage(tableId, newAssignments.size());
                    });
        });
    }

    /**
     * Returns the table assignments visible locally at {@code causalityToken}, or calculates new ones from
     * the zone data nodes when partition 0 has no locally visible assignments.
     *
     * @param tableId Table ID.
     * @param zoneDescriptor Zone descriptor providing partitions, replicas and consensus group size.
     * @param tableDescriptor Table descriptor whose update timestamp is used to request data nodes.
     * @param causalityToken Meta storage revision used for the local read.
     * @param catalogVersion Catalog version used to request data nodes and to stamp new assignments.
     * @return Future with assignments, one element per partition.
     */
    private CompletableFuture<List<Assignments>> getOrCreateAssignments(
            int tableId,
            CatalogZoneDescriptor zoneDescriptor,
            CatalogTableDescriptor tableDescriptor,
            long causalityToken,
            int catalogVersion
    ) {
        if (RebalanceUtil.partitionAssignmentsGetLocally(this.metaStorageMgr, tableId, 0, causalityToken) != null) {
            return CompletableFuture.completedFuture(
                    RebalanceUtil.tableAssignmentsGetLocally(
                            this.metaStorageMgr,
                            tableId,
                            zoneDescriptor.partitions(),
                            causalityToken
                    )
            );
        }

        Catalog catalog = this.catalogService.catalog(catalogVersion);

        // Newly calculated assignments are stamped with the time of the catalog version.
        long assignmentsTimestamp = catalog.time();

        CompletableFuture<List<Assignments>> assignmentsFuture = this.distributionZoneManager
                .dataNodes(tableDescriptor.updateTimestamp(), catalogVersion, zoneDescriptor.id())
                .thenApply(dataNodes -> PartitionDistributionUtils.calculateAssignments(
                                dataNodes,
                                Collections.emptyList(),
                                zoneDescriptor.partitions(),
                                zoneDescriptor.replicas(),
                                zoneDescriptor.consensusGroupSize()
                        )
                        .stream()
                        .map(nodes -> Assignments.of(nodes, assignmentsTimestamp))
                        .collect(Collectors.toList()));

        // Logging is attached as a side branch; the returned future is the calculation itself.
        assignmentsFuture.thenAccept(assignmentsList -> LOG.info(
                "Assignments calculated from data nodes [tableId={}, assignments={}, revision={}]",
                tableId,
                Assignments.assignmentListToString(assignmentsList),
                causalityToken
        ));

        return assignmentsFuture;
    }

    /**
     * Reads stable assignments of partitions {@code [0, partitions)} of the table from the meta storage.
     *
     * <p>Every partition is expected to have a non-empty, non-tombstone entry (checked with an assertion).
     * An exceptional completion is reported to the failure processor.
     *
     * @param tableId Table ID.
     * @param partitions Number of partitions to read.
     * @return Future with the stored assignments, in partition order.
     */
    private CompletableFuture<List<Assignments>> getAssignmentsFromMetastorage(int tableId, int partitions) {
        Set<ByteArray> partKeys = IntStream.range(0, partitions)
                .mapToObj(p -> RebalanceUtil.stablePartAssignmentsKey(new TablePartitionId(tableId, p)))
                .collect(Collectors.toSet());

        return this.metaStorageMgr.getAll(partKeys)
                .thenApply(metaStorageAssignments -> {
                    List<Assignments> realAssignments = new ArrayList<>(partitions);

                    for (int p = 0; p < partitions; p++) {
                        TablePartitionId partId = new TablePartitionId(tableId, p);

                        Entry assignmentsEntry = metaStorageAssignments.get(RebalanceUtil.stablePartAssignmentsKey(partId));

                        assert assignmentsEntry != null && !assignmentsEntry.empty() && !assignmentsEntry.tombstone()
                                : "Unexpected assignments for partition [" + partId + ", entry=" + assignmentsEntry + "].";

                        realAssignments.add(Assignments.fromBytes(assignmentsEntry.value()));
                    }

                    LOG.info(
                            "Assignments picked up from meta storage [tableId={}, assignments={}].",
                            tableId,
                            Assignments.assignmentListToString(realAssignments)
                    );

                    return realAssignments;
                })
                .whenComplete((realAssignments, e) -> {
                    if (e != null) {
                        String errorMessage =
                                String.format("Couldn't get assignments from metastore for table [tableId=%s].", tableId);

                        this.failureProcessor.process(new FailureContext(e, errorMessage));
                    }
                });
    }

    /**
     * Copies the stable assignments of {@code sourceTableId} to {@code tableId}.
     *
     * <p>The source assignments are read locally and written with a conditional {@code invoke} that requires
     * that partition 0 of the target table has no stable assignments yet and that the revision of every source
     * partition entry is the one that was read. If the condition does not hold, the target's partition 0 key is
     * read: when it is still empty the copy is retried, otherwise the assignments already stored for the target
     * table are returned.
     *
     * @param tableId Target table ID.
     * @param sourceTableId Table ID to copy assignments from.
     * @param partitions Number of partitions.
     * @param consistencyMode Zone consistency mode.
     * @return Future with the assignments of the target table.
     */
    private CompletableFuture<List<Assignments>> copyTableAssignments(
            int tableId,
            int sourceTableId,
            int partitions,
            ConsistencyMode consistencyMode
    ) {
        List<Assignments> assignments = new ArrayList<>(partitions);

        SimpleCondition condition = Conditions.notExists(RebalanceUtil.stablePartAssignmentsKey(new TablePartitionId(tableId, 0)));

        for (int p = 0; p < partitions; p++) {
            ByteArray sourceTablePartitionKey = RebalanceUtil.stablePartAssignmentsKey(new TablePartitionId(sourceTableId, p));

            Entry sourceAssignmentEntry = this.metaStorageMgr.getLocally(sourceTablePartitionKey);

            assert !sourceAssignmentEntry.empty() && !sourceAssignmentEntry.tombstone()
                    : "Unexpected assignments for partition [" + p + ", entry=" + sourceAssignmentEntry + "].";

            assignments.add(Assignments.fromBytes(sourceAssignmentEntry.value()));

            // Guard against the source assignments changing between the local read and the invoke.
            condition = condition.and(Conditions.revision(sourceTablePartitionKey).eq(sourceAssignmentEntry.revision()));
        }

        List<Operation> operations = TableAssignmentsService.getTableAssignmentsOperations(tableId, assignments, consistencyMode);

        return this.metaStorageMgr.invoke(condition, operations, Collections.emptyList())
                .whenComplete((invokeResult, e) -> {
                    if (e != null) {
                        LOG.error(
                                "Error while copying assignments to metastore during invoke [tableId={}, sourceTableId={}]",
                                e,
                                tableId,
                                sourceTableId
                        );
                    }
                })
                .thenCompose(invokeResult -> {
                    if (invokeResult) {
                        return CompletableFuture.completedFuture(assignments);
                    }

                    LOG.info(
                            "Couldn't copy assignments to metastorage during invoke [tableId={}, sourceTableId={}]",
                            tableId,
                            sourceTableId
                    );

                    // Distinguish "source changed" (target still empty -> retry) from "target already written".
                    return this.metaStorageMgr.get(RebalanceUtil.stablePartAssignmentsKey(new TablePartitionId(tableId, 0)))
                            .thenCompose(entry -> entry.empty()
                                    ? this.copyTableAssignments(tableId, sourceTableId, partitions, consistencyMode)
                                    : this.getAssignmentsFromMetastorage(tableId, assignments.size()));
                });
    }

    /**
     * Builds the meta storage put operations that store the given assignments for the table.
     *
     * <p>For every partition {@code i} a put of the stable assignments is produced; in
     * {@link ConsistencyMode#HIGH_AVAILABILITY} it is immediately followed by a put of an assignments chain
     * created from the same assignments.
     *
     * @param tableId Table ID.
     * @param assignments Assignments, where the element index is the partition number.
     * @param consistencyMode Zone consistency mode.
     * @return Operations in partition order; the first one is the stable assignments put of partition 0.
     */
    private static List<Operation> getTableAssignmentsOperations(
            int tableId,
            List<Assignments> assignments,
            ConsistencyMode consistencyMode
    ) {
        boolean haMode = consistencyMode == ConsistencyMode.HIGH_AVAILABILITY;

        int partitions = assignments.size();

        // HA mode emits two operations per partition.
        List<Operation> partitionAssignments = new ArrayList<>(haMode ? 2 * partitions : partitions);

        for (int i = 0; i < partitions; i++) {
            TablePartitionId tablePartitionId = new TablePartitionId(tableId, i);
            Assignments assignment = assignments.get(i);

            partitionAssignments.add(
                    Operations.put(RebalanceUtil.stablePartAssignmentsKey(tablePartitionId), assignment.toBytes())
            );

            if (haMode) {
                byte[] assignmentChain = AssignmentsChain.of(-1L, -1L, new Assignments[] {assignment}).toBytes();

                partitionAssignments.add(Operations.put(RebalanceUtil.assignmentsChainKey(tablePartitionId), assignmentChain));
            }
        }

        return partitionAssignments;
    }
}

