/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.raft.jraft.rpc.impl;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.raft.Command;
import org.apache.ignite3.internal.raft.Marshaller;
import org.apache.ignite3.internal.raft.ReadCommand;
import org.apache.ignite3.internal.raft.WriteCommand;
import org.apache.ignite3.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite3.internal.raft.service.BeforeApplyHandler;
import org.apache.ignite3.internal.raft.service.CommandClosure;
import org.apache.ignite3.internal.raft.service.RaftGroupListener;
import org.apache.ignite3.internal.raft.service.SafeTimeAwareCommandClosure;
import org.apache.ignite3.raft.jraft.Closure;
import org.apache.ignite3.raft.jraft.Node;
import org.apache.ignite3.raft.jraft.RaftMessagesFactory;
import org.apache.ignite3.raft.jraft.Status;
import org.apache.ignite3.raft.jraft.closure.ReadIndexClosure;
import org.apache.ignite3.raft.jraft.entity.PeerId;
import org.apache.ignite3.raft.jraft.entity.Task;
import org.apache.ignite3.raft.jraft.error.RaftError;
import org.apache.ignite3.raft.jraft.rpc.ActionRequest;
import org.apache.ignite3.raft.jraft.rpc.Message;
import org.apache.ignite3.raft.jraft.rpc.RaftRpcFactory;
import org.apache.ignite3.raft.jraft.rpc.ReadActionRequest;
import org.apache.ignite3.raft.jraft.rpc.RpcContext;
import org.apache.ignite3.raft.jraft.rpc.RpcProcessor;
import org.apache.ignite3.raft.jraft.rpc.RpcRequests;
import org.apache.ignite3.raft.jraft.rpc.WriteActionRequest;
import org.apache.ignite3.raft.jraft.rpc.impl.SMCompactedThrowable;
import org.apache.ignite3.raft.jraft.rpc.impl.SMFullThrowable;
import org.apache.ignite3.raft.jraft.util.BytesUtil;
import org.jetbrains.annotations.Nullable;

public class ActionRequestProcessor
implements RpcProcessor<ActionRequest> {
    private static final IgniteLogger LOG = Loggers.forClass(ActionRequestProcessor.class);
    private final Executor executor;
    private final RaftMessagesFactory factory;
    private final Map<String, Object> groupIdsToMonitors = new ConcurrentHashMap<String, Object>();

    public ActionRequestProcessor(Executor executor, RaftMessagesFactory factory) {
        this.executor = executor;
        this.factory = factory;
    }

    @Override
    public final void handleRequest(RpcContext rpcCtx, ActionRequest request) {
        Node node = rpcCtx.getNodeManager().get(request.groupId(), new PeerId(rpcCtx.getLocalConsistentId()));
        if (node == null) {
            rpcCtx.sendResponse(this.factory.errorResponse().errorCode(RaftError.UNKNOWN.getNumber()).build());
            return;
        }
        Marshaller commandsMarshaller = node.getOptions().getCommandsMarshaller();
        assert (commandsMarshaller != null) : "Marshaller for group " + request.groupId() + " is not found.";
        this.handleRequestInternal(rpcCtx, node, request, commandsMarshaller);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleRequestInternal(RpcContext rpcCtx, Node node, ActionRequest request, Marshaller commandsMarshaller) {
        JraftServerImpl.DelegatingStateMachine fsm = (JraftServerImpl.DelegatingStateMachine)node.getOptions().getFsm();
        RaftGroupListener listener = fsm.getListener();
        if (request instanceof WriteActionRequest) {
            WriteActionRequest writeRequest = (WriteActionRequest)request;
            WriteCommand command = writeRequest.deserializedCommand();
            if (command == null) {
                command = (WriteCommand)commandsMarshaller.unmarshall(writeRequest.command());
            }
            if (listener instanceof BeforeApplyHandler) {
                Object object = this.groupIdSyncMonitor(request.groupId());
                synchronized (object) {
                    Command patchedCommand = ((BeforeApplyHandler)((Object)listener)).onBeforeApply(command);
                    writeRequest = this.patchRequest(writeRequest, command, patchedCommand, commandsMarshaller);
                    this.applyWrite(node, writeRequest, (WriteCommand)patchedCommand, rpcCtx);
                }
            } else {
                this.applyWrite(node, writeRequest, command, rpcCtx);
            }
        } else {
            ReadActionRequest readRequest = (ReadActionRequest)request;
            if (listener instanceof BeforeApplyHandler) {
                ReadCommand command = readRequest.command();
                Command patchedCommand = ((BeforeApplyHandler)((Object)listener)).onBeforeApply(command);
                readRequest = this.patchRequest(readRequest, command, patchedCommand, commandsMarshaller);
            }
            this.applyRead(node, readRequest, rpcCtx);
        }
    }

    private <AR extends ActionRequest> AR patchRequest(AR request, Command originalCommand, Command patchedCommand, Marshaller commandsMarshaller) {
        if (originalCommand == patchedCommand) {
            return request;
        }
        if (request instanceof WriteActionRequest) {
            return (AR)this.factory.writeActionRequest().groupId(request.groupId()).command(commandsMarshaller.marshall(patchedCommand)).deserializedCommand((WriteCommand)patchedCommand).build();
        }
        return (AR)this.factory.readActionRequest().groupId(request.groupId()).command((ReadCommand)patchedCommand).readOnlySafe(((ReadActionRequest)request).readOnlySafe()).build();
    }

    private Object groupIdSyncMonitor(String groupId) {
        assert (groupId != null);
        return this.groupIdsToMonitors.computeIfAbsent(groupId, k -> groupId);
    }

    private void applyWrite(final Node node, WriteActionRequest request, final WriteCommand command, final RpcContext rpcCtx) {
        final ByteBuffer wrapper = ByteBuffer.wrap(request.command());
        node.apply(new Task(wrapper, new LocalAwareWriteCommandClosure(){
            private HybridTimestamp safeTs;

            @Override
            public void result(Serializable res) {
                ActionRequestProcessor.this.sendResponse(res, rpcCtx);
            }

            @Override
            public WriteCommand command() {
                return command;
            }

            @Override
            @Nullable
            public HybridTimestamp safeTimestamp() {
                return this.safeTs;
            }

            @Override
            public void run(Status status) {
                assert (!status.isOk()) : status;
                ActionRequestProcessor.this.sendRaftError(rpcCtx, status, node);
            }

            @Override
            public void safeTimestamp(HybridTimestamp safeTs) {
                assert (this.safeTs == null) : "Safe time can be set only once";
                node.getOptions().getCommandsMarshaller().patch(wrapper, safeTs);
                this.safeTs = safeTs;
            }
        }));
    }

    private void applyRead(final Node node, final ReadActionRequest request, final RpcContext rpcCtx) {
        if (request.readOnlySafe()) {
            node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure(){

                @Override
                public void run(Status status, long index, byte[] reqCtx) {
                    if (status.isOk()) {
                        JraftServerImpl.DelegatingStateMachine fsm = (JraftServerImpl.DelegatingStateMachine)node.getOptions().getFsm();
                        try {
                            fsm.getListener().onRead(List.of(new CommandClosure<ReadCommand>(){

                                @Override
                                public ReadCommand command() {
                                    return request.command();
                                }

                                @Override
                                public void result(Serializable res) {
                                    ActionRequestProcessor.this.sendResponse(res, rpcCtx);
                                }
                            }).iterator());
                        }
                        catch (Exception e) {
                            ActionRequestProcessor.this.sendRaftError(rpcCtx, RaftError.ESTATEMACHINE, e.getMessage());
                        }
                    } else {
                        ActionRequestProcessor.this.sendRaftError(rpcCtx, status, node);
                    }
                }
            });
        } else {
            JraftServerImpl.DelegatingStateMachine fsm = (JraftServerImpl.DelegatingStateMachine)node.getOptions().getFsm();
            try {
                fsm.getListener().onRead(List.of(new CommandClosure<ReadCommand>(){

                    @Override
                    public ReadCommand command() {
                        return request.command();
                    }

                    @Override
                    public void result(Serializable res) {
                        ActionRequestProcessor.this.sendResponse(res, rpcCtx);
                    }
                }).iterator());
            }
            catch (Exception e) {
                this.sendRaftError(rpcCtx, RaftError.ESTATEMACHINE, e.getMessage());
            }
        }
    }

    private void sendResponse(Serializable res, RpcContext rpcCtx) {
        if (res instanceof RaftGroupListener.ShutdownException) {
            rpcCtx.sendResponse(this.factory.errorResponse().errorCode(RaftError.ESHUTDOWN.getNumber()).build());
        } else if (res instanceof Throwable) {
            this.sendSMError(rpcCtx, (Throwable)res, false);
        } else {
            rpcCtx.sendResponse(this.factory.actionResponse().result(res).build());
        }
    }

    @Override
    public String interest() {
        return ActionRequest.class.getName();
    }

    @Override
    public Executor executor() {
        return this.executor;
    }

    private void sendRaftError(RpcContext ctx, RaftError error, String msg) {
        RpcRequests.ErrorResponse resp = this.factory.errorResponse().errorCode(error.getNumber()).errorMsg(msg).build();
        ctx.sendResponse(resp);
    }

    private void sendSMError(RpcContext ctx, Throwable th, boolean compacted) {
        RpcRequests.SMErrorResponse resp = this.factory.sMErrorResponse().error(compacted ? new SMCompactedThrowable(th) : new SMFullThrowable(th)).build();
        ctx.sendResponse(resp);
        LOG.warn("Error occurred on a user's state machine", th);
    }

    private void sendRaftError(RpcContext ctx, Status status, Node node) {
        PeerId leaderId;
        RaftError raftError = status.getRaftError();
        Message response = null;
        if (raftError == RaftError.EPERM && (leaderId = node.getLeaderId()) != null) {
            response = RaftRpcFactory.DEFAULT.newResponse(leaderId.toString(), this.factory, RaftError.EPERM, status.getErrorMsg(), new Object[0]);
        }
        if (response == null) {
            response = RaftRpcFactory.DEFAULT.newResponse(this.factory, raftError, status.getErrorMsg(), new Object[0]);
        }
        ctx.sendResponse(response);
    }

    private static interface LocalAwareWriteCommandClosure
    extends Closure,
    SafeTimeAwareCommandClosure {
    }
}

