/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.metastorage.server.raft;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import org.apache.ignite3.internal.hlc.HybridClock;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.lang.ByteArray;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.CommandId;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.command.CompactionCommand;
import org.apache.ignite3.internal.metastorage.command.EvictIdempotentCommandsCacheCommand;
import org.apache.ignite3.internal.metastorage.command.IdempotentCommand;
import org.apache.ignite3.internal.metastorage.command.InvokeCommand;
import org.apache.ignite3.internal.metastorage.command.MetaStorageWriteCommand;
import org.apache.ignite3.internal.metastorage.command.MultiInvokeCommand;
import org.apache.ignite3.internal.metastorage.command.PutAllCommand;
import org.apache.ignite3.internal.metastorage.command.PutCommand;
import org.apache.ignite3.internal.metastorage.command.RemoveAllCommand;
import org.apache.ignite3.internal.metastorage.command.RemoveByPrefixCommand;
import org.apache.ignite3.internal.metastorage.command.RemoveCommand;
import org.apache.ignite3.internal.metastorage.command.SyncTimeCommand;
import org.apache.ignite3.internal.metastorage.dsl.CompoundCondition;
import org.apache.ignite3.internal.metastorage.dsl.ConditionType;
import org.apache.ignite3.internal.metastorage.dsl.Iif;
import org.apache.ignite3.internal.metastorage.dsl.MetaStorageMessagesFactory;
import org.apache.ignite3.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite3.internal.metastorage.dsl.Statement;
import org.apache.ignite3.internal.metastorage.server.AndCondition;
import org.apache.ignite3.internal.metastorage.server.Condition;
import org.apache.ignite3.internal.metastorage.server.ExistenceCondition;
import org.apache.ignite3.internal.metastorage.server.If;
import org.apache.ignite3.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite3.internal.metastorage.server.KeyValueUpdateContext;
import org.apache.ignite3.internal.metastorage.server.OrCondition;
import org.apache.ignite3.internal.metastorage.server.RevisionCondition;
import org.apache.ignite3.internal.metastorage.server.Statement;
import org.apache.ignite3.internal.metastorage.server.TombstoneCondition;
import org.apache.ignite3.internal.metastorage.server.ValueCondition;
import org.apache.ignite3.internal.metastorage.server.raft.CommandResultAndTimestamp;
import org.apache.ignite3.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite3.internal.raft.Command;
import org.apache.ignite3.internal.raft.WriteCommand;
import org.apache.ignite3.internal.raft.service.CommandClosure;
import org.apache.ignite3.internal.util.ByteUtils;
import org.apache.ignite3.internal.util.Cursor;
import org.apache.ignite3.internal.util.StringUtils;
import org.jetbrains.annotations.Nullable;

public class MetaStorageWriteHandler {
    private static final IgniteLogger LOG = Loggers.forClass(MetaStorageWriteHandler.class);
    public static final String IDEMPOTENT_COMMAND_PREFIX = "icp.";
    public static final byte[] IDEMPOTENT_COMMAND_PREFIX_BYTES = "icp.".getBytes(StandardCharsets.UTF_8);
    private static final MetaStorageMessagesFactory MSG_FACTORY = new MetaStorageMessagesFactory();
    private final KeyValueStorage storage;
    private final HybridClock clock;
    private final ClusterTimeImpl clusterTime;
    private final IntConsumer idempotentCacheSizeListener;
    private final Map<CommandId, CommandResultAndTimestamp> idempotentCommandCache = new ConcurrentHashMap<CommandId, CommandResultAndTimestamp>();

    MetaStorageWriteHandler(KeyValueStorage storage, HybridClock clock, ClusterTimeImpl clusterTime, IntConsumer idempotentCacheSizeListener) {
        this.storage = storage;
        this.clock = clock;
        this.clusterTime = clusterTime;
        this.idempotentCacheSizeListener = idempotentCacheSizeListener;
    }

    void handleWriteCommand(CommandClosure<WriteCommand> clo) {
        ResultCachingClosure resultClosure;
        WriteCommand command = clo.command();
        if (command instanceof IdempotentCommand) {
            IdempotentCommand idempotentCommand = (IdempotentCommand)command;
            CommandId commandId = idempotentCommand.id();
            CommandResultAndTimestamp cachedResult = this.idempotentCommandCache.get(commandId);
            if (cachedResult != null) {
                Serializable commandResult = cachedResult.commandResult;
                if (commandResult instanceof Boolean && command instanceof MultiInvokeCommand) {
                    Boolean booleanResult = (Boolean)commandResult;
                    clo.result(MSG_FACTORY.statementResult().result(ByteBuffer.wrap(booleanResult != false ? KeyValueStorage.INVOKE_RESULT_TRUE_BYTES : KeyValueStorage.INVOKE_RESULT_FALSE_BYTES)).build());
                } else {
                    clo.result(commandResult);
                }
                return;
            }
            resultClosure = new ResultCachingClosure(clo);
        } else {
            resultClosure = clo;
        }
        this.handleNonCachedWriteCommand(resultClosure);
    }

    private void handleNonCachedWriteCommand(CommandClosure<WriteCommand> clo) {
        WriteCommand command = clo.command();
        long commandIndex = clo.index();
        long commandTerm = clo.term();
        try {
            if (command instanceof MetaStorageWriteCommand) {
                SyncTimeCommand syncTimeCommand;
                MetaStorageWriteCommand cmdWithTime = (MetaStorageWriteCommand)command;
                if (command instanceof SyncTimeCommand && commandTerm != (syncTimeCommand = (SyncTimeCommand)command).initiatorTerm()) {
                    LOG.info("Sync time command closure term {}, initiator term {}, ignoring the command", commandTerm, syncTimeCommand.initiatorTerm());
                    this.storage.setIndexAndTerm(commandIndex, commandTerm);
                    clo.result(null);
                    return;
                }
                this.handleWriteWithTime(clo, cmdWithTime, commandIndex, commandTerm);
            } else assert (false) : "Command was not found [cmd=" + command + "]";
        }
        catch (Throwable e) {
            LOG.error("Unknown error while processing command [commandIndex={}, commandTerm={}, command={}]", e, commandIndex, commandTerm, command);
            clo.result(e);
            throw e;
        }
    }

    private void handleWriteWithTime(CommandClosure<WriteCommand> clo, MetaStorageWriteCommand command, long index, long term) {
        HybridTimestamp opTime = command.safeTime();
        KeyValueUpdateContext context = new KeyValueUpdateContext(index, term, opTime);
        if (command instanceof PutCommand) {
            PutCommand putCmd = (PutCommand)command;
            this.storage.put(ByteUtils.toByteArray(putCmd.key()), ByteUtils.toByteArray(putCmd.value()), context);
            clo.result(null);
        } else if (command instanceof PutAllCommand) {
            PutAllCommand putAllCmd = (PutAllCommand)command;
            this.storage.putAll(ByteUtils.toByteArrayList(putAllCmd.keys()), ByteUtils.toByteArrayList(putAllCmd.values()), context);
            clo.result(null);
        } else if (command instanceof RemoveCommand) {
            RemoveCommand rmvCmd = (RemoveCommand)command;
            this.storage.remove(ByteUtils.toByteArray(rmvCmd.key()), context);
            clo.result(null);
        } else if (command instanceof RemoveAllCommand) {
            RemoveAllCommand rmvAllCmd = (RemoveAllCommand)command;
            this.storage.removeAll(ByteUtils.toByteArrayList(rmvAllCmd.keys()), context);
            clo.result(null);
        } else if (command instanceof RemoveByPrefixCommand) {
            RemoveByPrefixCommand rmvByPrefixCmd = (RemoveByPrefixCommand)command;
            this.storage.removeByPrefix(ByteUtils.toByteArray(rmvByPrefixCmd.prefix()), context);
            clo.result(null);
        } else if (command instanceof InvokeCommand) {
            InvokeCommand cmd = (InvokeCommand)command;
            clo.result(Boolean.valueOf(this.storage.invoke(MetaStorageWriteHandler.toCondition(cmd.condition()), cmd.success(), cmd.failure(), context, cmd.id())));
        } else if (command instanceof MultiInvokeCommand) {
            MultiInvokeCommand cmd = (MultiInvokeCommand)command;
            clo.result(this.storage.invoke(MetaStorageWriteHandler.toIf(cmd.iif()), context, cmd.id()));
        } else if (command instanceof SyncTimeCommand) {
            this.storage.advanceSafeTime(context);
            clo.result(null);
        } else if (command instanceof EvictIdempotentCommandsCacheCommand) {
            EvictIdempotentCommandsCacheCommand cmd = (EvictIdempotentCommandsCacheCommand)command;
            this.evictIdempotentCommandsCache(cmd.evictionTimestamp(), context);
            clo.result(null);
        } else if (command instanceof CompactionCommand) {
            CompactionCommand cmd = (CompactionCommand)command;
            this.storage.updateCompactionRevision(cmd.compactionRevision(), context);
            clo.result(null);
        } else {
            throw new AssertionError((Object)String.format("Unsupported command: [context=%s, command=%s]", context, command));
        }
    }

    public static If toIf(Iif iif) {
        return new If(MetaStorageWriteHandler.toCondition(iif.condition()), MetaStorageWriteHandler.toConditionBranch(iif.andThen()), MetaStorageWriteHandler.toConditionBranch(iif.orElse()));
    }

    private static Statement toConditionBranch(org.apache.ignite3.internal.metastorage.dsl.Statement statement) {
        if (statement instanceof Statement.IfStatement) {
            return new Statement(MetaStorageWriteHandler.toIf(((Statement.IfStatement)statement).iif()));
        }
        if (statement instanceof Statement.UpdateStatement) {
            return new Statement(((Statement.UpdateStatement)statement).update());
        }
        throw new IllegalArgumentException("Unexpected statement type: " + statement);
    }

    private static Condition toCondition(org.apache.ignite3.internal.metastorage.dsl.Condition condition) {
        if (condition instanceof SimpleCondition.ValueCondition) {
            SimpleCondition.ValueCondition valueCondition = (SimpleCondition.ValueCondition)condition;
            return new ValueCondition(MetaStorageWriteHandler.toValueConditionType(valueCondition.type()), ByteUtils.toByteArray(valueCondition.key()), ByteUtils.toByteArray(valueCondition.value()));
        }
        if (condition instanceof SimpleCondition.RevisionCondition) {
            SimpleCondition.RevisionCondition revisionCondition = (SimpleCondition.RevisionCondition)condition;
            return new RevisionCondition(MetaStorageWriteHandler.toRevisionConditionType(revisionCondition.type()), ByteUtils.toByteArray(revisionCondition.key()), revisionCondition.revision());
        }
        if (condition instanceof SimpleCondition) {
            SimpleCondition simpleCondition = (SimpleCondition)condition;
            switch (simpleCondition.type()) {
                case KEY_EXISTS: {
                    return new ExistenceCondition(ExistenceCondition.Type.EXISTS, ByteUtils.toByteArray(simpleCondition.key()));
                }
                case KEY_NOT_EXISTS: {
                    return new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, ByteUtils.toByteArray(simpleCondition.key()));
                }
                case TOMBSTONE: {
                    return new TombstoneCondition(TombstoneCondition.Type.TOMBSTONE, ByteUtils.toByteArray(simpleCondition.key()));
                }
                case NOT_TOMBSTONE: {
                    return new TombstoneCondition(TombstoneCondition.Type.NOT_TOMBSTONE, ByteUtils.toByteArray(simpleCondition.key()));
                }
            }
            throw new IllegalArgumentException("Unexpected simple condition type " + simpleCondition.type());
        }
        if (condition instanceof CompoundCondition) {
            CompoundCondition compoundCondition = (CompoundCondition)condition;
            Condition leftCondition = MetaStorageWriteHandler.toCondition(compoundCondition.leftCondition());
            Condition rightCondition = MetaStorageWriteHandler.toCondition(compoundCondition.rightCondition());
            switch (compoundCondition.type()) {
                case AND: {
                    return new AndCondition(leftCondition, rightCondition);
                }
                case OR: {
                    return new OrCondition(leftCondition, rightCondition);
                }
            }
            throw new IllegalArgumentException("Unexpected compound condition type " + compoundCondition.type());
        }
        throw new IllegalArgumentException("Unknown condition " + condition);
    }

    private static ValueCondition.Type toValueConditionType(ConditionType type) {
        switch (type) {
            case VAL_EQUAL: {
                return ValueCondition.Type.EQUAL;
            }
            case VAL_NOT_EQUAL: {
                return ValueCondition.Type.NOT_EQUAL;
            }
            case VAL_GREATER: {
                return ValueCondition.Type.GREATER;
            }
            case VAL_GREATER_OR_EQUAL: {
                return ValueCondition.Type.GREATER_OR_EQUAL;
            }
            case VAL_LESS: {
                return ValueCondition.Type.LESS;
            }
            case VAL_LESS_OR_EQUAL: {
                return ValueCondition.Type.LESS_OR_EQUAL;
            }
        }
        throw new IllegalArgumentException("Unexpected value condition type " + type);
    }

    private static RevisionCondition.Type toRevisionConditionType(ConditionType type) {
        switch (type) {
            case REV_EQUAL: {
                return RevisionCondition.Type.EQUAL;
            }
            case REV_NOT_EQUAL: {
                return RevisionCondition.Type.NOT_EQUAL;
            }
            case REV_GREATER: {
                return RevisionCondition.Type.GREATER;
            }
            case REV_GREATER_OR_EQUAL: {
                return RevisionCondition.Type.GREATER_OR_EQUAL;
            }
            case REV_LESS: {
                return RevisionCondition.Type.LESS;
            }
            case REV_LESS_OR_EQUAL: {
                return RevisionCondition.Type.LESS_OR_EQUAL;
            }
        }
        throw new IllegalArgumentException("Unexpected revision condition type " + type);
    }

    Command beforeApply(Command command) {
        if (command instanceof MetaStorageWriteCommand) {
            MetaStorageWriteCommand writeCommand = (MetaStorageWriteCommand)command.clone();
            this.clusterTime.adjustClock(writeCommand.initiatorTime());
            writeCommand.safeTime(this.clock.now());
            return writeCommand;
        }
        return command;
    }

    void onSnapshotLoad() {
        byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
        byte[] keyTo = this.storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
        try (Cursor<Entry> cursor = this.storage.range(keyFrom, keyTo);){
            for (Entry entry : cursor) {
                if (entry.tombstone()) continue;
                String commandIdString = StringUtils.toStringWithoutPrefix(entry.key(), IDEMPOTENT_COMMAND_PREFIX_BYTES.length);
                CommandId commandId = CommandId.fromString(commandIdString);
                byte[] entryValue = entry.value();
                assert (entryValue != null);
                Serializable result = entryValue.length == 1 && (entryValue[0] | 1) == 1 ? Boolean.valueOf(ByteUtils.byteToBoolean(entryValue[0])) : MSG_FACTORY.statementResult().result(ByteBuffer.wrap(entryValue)).build();
                this.idempotentCommandCache.put(commandId, new CommandResultAndTimestamp(result, entry.timestamp()));
            }
            this.idempotentCacheSizeListener.accept(this.idempotentCommandCache.size());
        }
    }

    private void evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp, KeyValueUpdateContext context) {
        List<CommandId> evictedCommandIds = this.evictCommandsFromCache(evictionTimestamp);
        if (evictedCommandIds.isEmpty()) {
            return;
        }
        this.storage.removeAll(MetaStorageWriteHandler.toIdempotentCommandKeyBytes(evictedCommandIds), context);
    }

    private List<CommandId> evictCommandsFromCache(HybridTimestamp evictionTimestamp) {
        Iterator<Map.Entry<CommandId, CommandResultAndTimestamp>> iterator = this.idempotentCommandCache.entrySet().iterator();
        ArrayList<CommandId> result = new ArrayList<CommandId>();
        while (iterator.hasNext()) {
            Map.Entry<CommandId, CommandResultAndTimestamp> entry = iterator.next();
            if (evictionTimestamp.compareTo(entry.getValue().commandTimestamp) < 0) continue;
            iterator.remove();
            result.add(entry.getKey());
        }
        this.idempotentCacheSizeListener.accept(this.idempotentCommandCache.size());
        return result;
    }

    private static List<byte[]> toIdempotentCommandKeyBytes(List<CommandId> commandIds) {
        return commandIds.stream().map(MetaStorageWriteHandler::toIdempotentCommandKey).map(ByteArray::bytes).collect(Collectors.toList());
    }

    public static ByteArray toIdempotentCommandKey(CommandId commandId) {
        return new ByteArray(IDEMPOTENT_COMMAND_PREFIX + commandId.toMgKeyAsString());
    }

    private class ResultCachingClosure
    implements CommandClosure<WriteCommand> {
        final CommandClosure<WriteCommand> closure;

        ResultCachingClosure(CommandClosure<WriteCommand> closure) {
            assert (closure.command() instanceof IdempotentCommand);
            this.closure = closure;
        }

        @Override
        public long index() {
            return this.closure.index();
        }

        @Override
        public long term() {
            return this.closure.term();
        }

        @Override
        public WriteCommand command() {
            return this.closure.command();
        }

        @Override
        public void result(@Nullable Serializable res) {
            IdempotentCommand command = (IdempotentCommand)this.closure.command();
            if (!(res instanceof Throwable)) {
                MetaStorageWriteHandler.this.idempotentCommandCache.put(command.id(), new CommandResultAndTimestamp(res, command.safeTime()));
                MetaStorageWriteHandler.this.idempotentCacheSizeListener.accept(MetaStorageWriteHandler.this.idempotentCommandCache.size());
            }
            this.closure.result(res);
        }
    }
}

