/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.tx.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.hlc.ClockService;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.network.ChannelType;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.MessagingService;
import org.apache.ignite3.internal.network.NetworkMessage;
import org.apache.ignite3.internal.replicator.ZonePartitionId;
import org.apache.ignite3.internal.replicator.message.ReplicaMessageUtils;
import org.apache.ignite3.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite3.internal.replicator.message.ZonePartitionIdMessage;
import org.apache.ignite3.internal.tx.LockManager;
import org.apache.ignite3.internal.tx.impl.EnlistedPartitionGroup;
import org.apache.ignite3.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite3.internal.tx.impl.TxCleanupExceptionUtils;
import org.apache.ignite3.internal.tx.impl.WriteIntentSwitchProcessor;
import org.apache.ignite3.internal.tx.message.CleanupReplicatedInfo;
import org.apache.ignite3.internal.tx.message.CleanupReplicatedInfoMessage;
import org.apache.ignite3.internal.tx.message.EnlistedPartitionGroupMessage;
import org.apache.ignite3.internal.tx.message.TxCleanupMessage;
import org.apache.ignite3.internal.tx.message.TxMessageGroup;
import org.apache.ignite3.internal.tx.message.TxMessagesFactory;
import org.apache.ignite3.internal.tx.message.WriteIntentSwitchReplicatedInfo;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;

public class TxCleanupRequestHandler {
    private static final IgniteLogger LOG = Loggers.forClass(TxCleanupRequestHandler.class);
    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
    private final MessagingService messagingService;
    private final LockManager lockManager;
    private final ClockService clockService;
    private final Executor cleanupExecutor;
    private final WriteIntentSwitchProcessor writeIntentSwitchProcessor;
    private final RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry;
    private final ConcurrentMap<UUID, CleanupContext> writeIntentsReplicated = new ConcurrentHashMap<UUID, CleanupContext>();

    public TxCleanupRequestHandler(MessagingService messagingService, LockManager lockManager, ClockService clockService, WriteIntentSwitchProcessor writeIntentSwitchProcessor, RemotelyTriggeredResourceRegistry resourcesRegistry, Executor cleanupExecutor) {
        this.messagingService = messagingService;
        this.lockManager = lockManager;
        this.clockService = clockService;
        this.writeIntentSwitchProcessor = writeIntentSwitchProcessor;
        this.remotelyTriggeredResourceRegistry = resourcesRegistry;
        this.cleanupExecutor = cleanupExecutor;
    }

    public void start() {
        this.messagingService.addMessageHandler(TxMessageGroup.class, (msg, sender, correlationId) -> {
            if (msg instanceof TxCleanupMessage) {
                this.cleanupExecutor.execute(() -> this.processTxCleanup((TxCleanupMessage)msg, sender, correlationId));
            }
        });
    }

    public void stop() {
    }

    private void processTxCleanup(TxCleanupMessage txCleanupMessage, InternalClusterNode sender, @Nullable Long correlationId) {
        assert (correlationId != null);
        HashMap<EnlistedPartitionGroup, CompletionStage> writeIntentSwitches = new HashMap<EnlistedPartitionGroup, CompletionStage>();
        @Nullable List<EnlistedPartitionGroupMessage> partitionMessages = txCleanupMessage.groups();
        if (partitionMessages != null) {
            List<EnlistedPartitionGroup> partitions = TxCleanupRequestHandler.asPartitionsList(partitionMessages);
            this.trackPartitions(txCleanupMessage.txId(), partitions.stream().map(EnlistedPartitionGroup::groupId).collect(Collectors.toSet()), sender);
            for (EnlistedPartitionGroup partition : partitions) {
                CompletionStage future = this.writeIntentSwitchProcessor.switchLocalWriteIntents(partition, txCleanupMessage.txId(), txCleanupMessage.commit(), txCleanupMessage.commitTimestamp()).thenAccept(this::processWriteIntentSwitchResponse);
                writeIntentSwitches.put(partition, future);
            }
        }
        CompletableFuture.allOf(writeIntentSwitches.values().toArray(new CompletableFuture[0])).whenComplete((unused, ex) -> {
            NetworkMessage msg;
            this.releaseTxLocks(txCleanupMessage.txId());
            this.remotelyTriggeredResourceRegistry.close(txCleanupMessage.txId());
            if (ex == null) {
                msg = this.prepareResponse();
            } else {
                msg = this.prepareErrorResponse(txCleanupMessage.txId(), (Throwable)ex);
                writeIntentSwitches.forEach((groupId, future) -> {
                    if (future.isCompletedExceptionally()) {
                        ((CompletableFuture)this.writeIntentSwitchProcessor.switchWriteIntentsWithRetry(txCleanupMessage.commit(), txCleanupMessage.commitTimestamp(), txCleanupMessage.txId(), (EnlistedPartitionGroup)groupId).thenAccept(this::processWriteIntentSwitchResponse)).whenComplete((retryRes, retryEx) -> {
                            if (retryEx != null && TxCleanupExceptionUtils.writeIntentSwitchFailureShouldBeLogged(retryEx)) {
                                LOG.warn("Second cleanup attempt failed (the transaction outcome is not affected) [txId={}]", (Throwable)retryEx, (Object)txCleanupMessage.txId());
                            }
                        });
                    }
                });
            }
            this.messagingService.respond(sender, msg, (long)correlationId);
        });
    }

    private void releaseTxLocks(UUID txId) {
        this.lockManager.releaseAll(txId);
    }

    private NetworkMessage prepareResponse() {
        return TX_MESSAGES_FACTORY.txCleanupMessageResponse().timestamp(this.clockService.now()).build();
    }

    private NetworkMessage prepareResponse(CleanupReplicatedInfo result) {
        return TX_MESSAGES_FACTORY.txCleanupMessageResponse().result(TxCleanupRequestHandler.toCleanupReplicatedInfoMessage(result)).timestamp(this.clockService.now()).build();
    }

    private NetworkMessage prepareErrorResponse(UUID txId, Throwable th) {
        return TX_MESSAGES_FACTORY.txCleanupMessageErrorResponse().txId(txId).throwable(th).timestamp(this.clockService.now()).build();
    }

    private void trackPartitions(UUID txId, Set<ZonePartitionId> groups, InternalClusterNode sender) {
        this.writeIntentsReplicated.put(txId, new CleanupContext(sender, groups, groups));
    }

    private void processWriteIntentSwitchResponse(WriteIntentSwitchReplicatedInfo result) {
        if (result == null) {
            return;
        }
        this.writeIntentSwitchReplicated(result);
    }

    void writeIntentSwitchReplicated(WriteIntentSwitchReplicatedInfo info) {
        CleanupContext cleanupContext = this.writeIntentsReplicated.computeIfPresent(info.txId(), (uuid, context) -> {
            HashSet<ZonePartitionId> partitions = new HashSet<ZonePartitionId>(context.partitions);
            partitions.remove(info.partitionId());
            return new CleanupContext(context.sender, partitions, context.initialPartitions);
        });
        if (cleanupContext != null && cleanupContext.partitions.isEmpty()) {
            this.sendCleanupReplicatedResponse(info.txId(), cleanupContext.sender, cleanupContext.initialPartitions);
            this.writeIntentsReplicated.remove(info.txId());
        }
    }

    private void sendCleanupReplicatedResponse(UUID txId, InternalClusterNode sender, Collection<ZonePartitionId> partitions) {
        this.messagingService.send(sender, ChannelType.DEFAULT, this.prepareResponse(new CleanupReplicatedInfo(txId, partitions)));
    }

    private static CleanupReplicatedInfoMessage toCleanupReplicatedInfoMessage(CleanupReplicatedInfo info) {
        Collection<ZonePartitionId> partitions = info.partitions();
        ArrayList<ZonePartitionIdMessage> partitionMessages = new ArrayList<ZonePartitionIdMessage>(partitions.size());
        for (ZonePartitionId partition : partitions) {
            partitionMessages.add(ReplicaMessageUtils.toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, partition));
        }
        return TX_MESSAGES_FACTORY.cleanupReplicatedInfoMessage().txId(info.txId()).partitions(partitionMessages).build();
    }

    private static List<EnlistedPartitionGroup> asPartitionsList(List<EnlistedPartitionGroupMessage> messages) {
        ArrayList<EnlistedPartitionGroup> list = new ArrayList<EnlistedPartitionGroup>(IgniteUtils.capacity(messages.size()));
        for (EnlistedPartitionGroupMessage message : messages) {
            list.add(message.asPartitionInfo());
        }
        return list;
    }

    private static class CleanupContext {
        private final InternalClusterNode sender;
        private final Set<ZonePartitionId> partitions;
        private final Set<ZonePartitionId> initialPartitions;

        public CleanupContext(InternalClusterNode sender, Set<ZonePartitionId> partitions, Set<ZonePartitionId> initialPartitions) {
            this.sender = sender;
            this.partitions = partitions;
            this.initialPartitions = initialPartitions;
        }
    }
}

