/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.snapshots;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.impl.MetaStorageServiceImpl;
import org.apache.ignite3.internal.network.InternalClusterNode;
import org.apache.ignite3.internal.network.NetworkMessage;
import org.apache.ignite3.internal.network.NetworkMessageHandler;
import org.gridgain.internal.snapshots.SnapshotException;
import org.gridgain.internal.snapshots.SnapshotManagerContext;
import org.gridgain.internal.snapshots.SnapshotMessageCallback;
import org.gridgain.internal.snapshots.communication.messages.CreateSnapshotMessage;
import org.gridgain.internal.snapshots.communication.messages.DeleteSnapshotMessage;
import org.gridgain.internal.snapshots.communication.messages.ErrorResponseMessage;
import org.gridgain.internal.snapshots.communication.messages.RestoreSnapshotMessage;
import org.gridgain.internal.snapshots.communication.messages.SnapshotMessagesFactory;
import org.jetbrains.annotations.Nullable;

class SnapshotMessageHandler
implements NetworkMessageHandler {
    private static final IgniteLogger LOG = Loggers.forClass(SnapshotMessageHandler.class);
    private final SnapshotManagerContext context;
    private final SnapshotMessageCallback callback;
    private final SnapshotMessagesFactory messagesFactory = new SnapshotMessagesFactory();

    SnapshotMessageHandler(SnapshotManagerContext context, SnapshotMessageCallback callback) {
        this.context = context;
        this.callback = callback;
    }

    @Override
    public void onReceived(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) {
        if (message instanceof CreateSnapshotMessage) {
            assert (correlationId != null);
            this.context.threadPool().execute(() -> this.handleSnapshotCreateMessage((CreateSnapshotMessage)message, sender, correlationId));
        } else if (message instanceof RestoreSnapshotMessage) {
            assert (correlationId != null);
            this.context.threadPool().execute(() -> this.handleSnapshotRestoreMessage((RestoreSnapshotMessage)message, sender, correlationId));
        } else if (message instanceof DeleteSnapshotMessage) {
            assert (correlationId != null);
            this.context.threadPool().execute(() -> this.handleSnapshotDeleteMessage((DeleteSnapshotMessage)message, sender, correlationId));
        } else assert (false) : "Unsupported message type: " + message.getClass();
    }

    private void handleSnapshotCreateMessage(CreateSnapshotMessage message, InternalClusterNode sender, long correlationId) {
        this.executeOnSnapshotCoordinator(message, sender, correlationId, this.callback::onCreateSnapshotMessageReceived);
    }

    private void handleSnapshotRestoreMessage(RestoreSnapshotMessage message, InternalClusterNode sender, long correlationId) {
        this.executeOnSnapshotCoordinator(message, sender, correlationId, this.callback::onRestoreSnapshotMessageReceived);
    }

    private void handleSnapshotDeleteMessage(DeleteSnapshotMessage message, InternalClusterNode sender, long correlationId) {
        this.executeOnSnapshotCoordinator(message, sender, correlationId, this.callback::onDeleteSnapshotMessageReceived);
    }

    private <T extends NetworkMessage> void executeOnSnapshotCoordinator(T message, InternalClusterNode sender, long correlationId, BiFunction<Long, T, CompletableFuture<UUID>> callback) {
        CompletableFuture<MetaStorageServiceImpl> serviceFuture = this.context.metaStorageManager().metaStorageService();
        if (!serviceFuture.isDone()) {
            ErrorResponseMessage response2 = this.messagesFactory.errorResponseMessage().error(new SnapshotException("Meta Storage Raft group is not ready")).build();
            this.context.messagingService().respond(sender, (NetworkMessage)response2, correlationId);
            return;
        }
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)serviceFuture.thenComposeAsync(service -> service.raftGroupService().refreshAndGetLeaderWithTerm(), (Executor)this.context.threadPool())).thenComposeAsync(leaderWithTerm -> {
            String leaderName = leaderWithTerm.leader().consistentId();
            if (!leaderName.equals(this.context.nodeName())) {
                return CompletableFuture.completedFuture(this.messagesFactory.notCoordinatorMessage().build());
            }
            return ((CompletableFuture)callback.apply(leaderWithTerm.term(), message)).handle((operationId, e) -> {
                if (e == null) {
                    return this.messagesFactory.successResponseMessage().operationId((UUID)operationId).build();
                }
                LOG.debug("Error when processing message: {}.", (Throwable)e, (Object)message);
                return this.buildErrorResponseMessage((Throwable)e);
            });
        }, (Executor)this.context.threadPool())).exceptionally(e -> {
            LOG.error("Error when processing message: {}.", (Throwable)e, (Object)message);
            return this.buildErrorResponseMessage((Throwable)e);
        })).thenAcceptAsync(response -> this.context.messagingService().respond(sender, (NetworkMessage)response, correlationId), (Executor)this.context.threadPool());
    }

    private ErrorResponseMessage buildErrorResponseMessage(Throwable e) {
        return this.messagesFactory.errorResponseMessage().error(e).build();
    }
}

