/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.metastorage.server.persistence;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import org.apache.ignite3.internal.components.NoOpLogSyncer;
import org.apache.ignite3.internal.failure.FailureProcessor;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.metastorage.CommandId;
import org.apache.ignite3.internal.metastorage.Entry;
import org.apache.ignite3.internal.metastorage.dsl.Operation;
import org.apache.ignite3.internal.metastorage.dsl.OperationType;
import org.apache.ignite3.internal.metastorage.dsl.Operations;
import org.apache.ignite3.internal.metastorage.dsl.StatementResult;
import org.apache.ignite3.internal.metastorage.dsl.Update;
import org.apache.ignite3.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite3.internal.metastorage.exceptions.MetaStorageException;
import org.apache.ignite3.internal.metastorage.impl.EntryImpl;
import org.apache.ignite3.internal.metastorage.server.AbstractKeyValueStorage;
import org.apache.ignite3.internal.metastorage.server.ChecksumAndRevisions;
import org.apache.ignite3.internal.metastorage.server.Condition;
import org.apache.ignite3.internal.metastorage.server.If;
import org.apache.ignite3.internal.metastorage.server.KeyValueStorageUtils;
import org.apache.ignite3.internal.metastorage.server.KeyValueUpdateContext;
import org.apache.ignite3.internal.metastorage.server.MetastorageChecksum;
import org.apache.ignite3.internal.metastorage.server.NotifyWatchProcessorEvent;
import org.apache.ignite3.internal.metastorage.server.ReadOperationForCompactionTracker;
import org.apache.ignite3.internal.metastorage.server.Statement;
import org.apache.ignite3.internal.metastorage.server.UpdateEntriesEvent;
import org.apache.ignite3.internal.metastorage.server.UpdateOnlyRevisionEvent;
import org.apache.ignite3.internal.metastorage.server.Value;
import org.apache.ignite3.internal.metastorage.server.WatchEventHandlingCallback;
import org.apache.ignite3.internal.metastorage.server.persistence.CompactionStatisticsHolder;
import org.apache.ignite3.internal.metastorage.server.persistence.RocksStorageUtils;
import org.apache.ignite3.internal.metastorage.server.persistence.StorageColumnFamilyType;
import org.apache.ignite3.internal.metastorage.server.persistence.WriteBatchProtector;
import org.apache.ignite3.internal.metastorage.server.raft.MetaStorageWriteHandler;
import org.apache.ignite3.internal.raft.IndexWithTerm;
import org.apache.ignite3.internal.rocksdb.ColumnFamily;
import org.apache.ignite3.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite3.internal.rocksdb.RocksUtils;
import org.apache.ignite3.internal.rocksdb.flush.RocksDbFlusher;
import org.apache.ignite3.internal.rocksdb.snapshot.ColumnFamilyRange;
import org.apache.ignite3.internal.rocksdb.snapshot.RocksSnapshotManager;
import org.apache.ignite3.internal.thread.IgniteThreadFactory;
import org.apache.ignite3.internal.thread.ThreadOperation;
import org.apache.ignite3.internal.util.ArrayUtils;
import org.apache.ignite3.internal.util.ByteUtils;
import org.apache.ignite3.internal.util.Cursor;
import org.apache.ignite3.internal.util.IgniteBusyLock;
import org.apache.ignite3.internal.util.IgniteSpinBusyLock;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.lang.ErrorGroups;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
import org.rocksdb.AbstractNativeReference;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public class RocksDbKeyValueStorage
extends AbstractKeyValueStorage {
    private static final IgniteLogger LOG = Loggers.forClass(RocksDbKeyValueStorage.class);
    private static final long SYSTEM_REVISION_MARKER_VALUE = 0L;
    private static final byte[] REVISION_KEY = RocksStorageUtils.keyToRocksKey(0L, "SYSTEM_REVISION_KEY".getBytes(StandardCharsets.UTF_8));
    private static final byte[] COMPACTION_REVISION_KEY = RocksStorageUtils.keyToRocksKey(0L, "SYSTEM_COMPACTION_REVISION_KEY".getBytes(StandardCharsets.UTF_8));
    private static final byte[] INDEX_AND_TERM_KEY = RocksStorageUtils.keyToRocksKey(0L, "SYSTEM_INDEX_AND_TERM_KEY".getBytes(StandardCharsets.UTF_8));
    private static final byte[] CONFIGURATION_KEY = RocksStorageUtils.keyToRocksKey(0L, "SYSTEM_CONFIGURATION_KEY".getBytes(StandardCharsets.UTF_8));
    private static final int COMPACT_BATCH_SIZE = 10;
    private static final int KV_STORAGE_FLUSH_DELAY = 100;
    private final LongSupplier revSupplier = () -> this.rev;
    private final LongSupplier compactedRevSupplier = () -> this.compactionRevision;
    private final WriteBatchProtector writeBatchProtector = new WriteBatchProtector();
    private final ExecutorService executor;
    private final ScheduledExecutorService scheduledExecutor;
    private final Path dbPath;
    private final String nodeName;
    private volatile DBOptions options;
    private volatile RocksDB db;
    private volatile ColumnFamily data;
    private volatile ColumnFamily index;
    private volatile ColumnFamily tsToRevision;
    private volatile ColumnFamily revisionToTs;
    private volatile ColumnFamily revisionToChecksum;
    private volatile RocksSnapshotManager snapshotManager;
    private volatile MetastorageChecksum checksum;
    private final AtomicReference<RecoveryStatus> recoveryStatus = new AtomicReference<RecoveryStatus>(RecoveryStatus.INITIAL);
    private final UpdatedEntries updatedEntries = new UpdatedEntries();
    protected List<AbstractNativeReference> rocksResources = new ArrayList<AbstractNativeReference>();
    private volatile WriteOptions writeOptions;
    private volatile RocksDbFlusher flusher;
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final AtomicBoolean closeGuard = new AtomicBoolean();

    public RocksDbKeyValueStorage(String nodeName, Path dbPath, FailureProcessor failureProcessor, ReadOperationForCompactionTracker readOperationForCompactionTracker, ScheduledExecutorService scheduledExecutor) {
        super(nodeName, failureProcessor, readOperationForCompactionTracker);
        this.dbPath = dbPath;
        this.scheduledExecutor = scheduledExecutor;
        this.nodeName = nodeName;
        this.executor = Executors.newFixedThreadPool(2, IgniteThreadFactory.create(nodeName, "metastorage-rocksdb-kv-storage-executor", this.log, new ThreadOperation[0]));
    }

    @Override
    public void start() {
        IgniteUtils.inBusyLock((IgniteBusyLock)this.busyLock, this::startBusy);
    }

    private void startBusy() {
        try {
            Files.createDirectories(this.dbPath, new FileAttribute[0]);
            this.createDb();
        }
        catch (IOException | RocksDBException e) {
            this.closeRocksResources();
            throw new MetaStorageException(ErrorGroups.MetaStorage.STARTING_STORAGE_ERR, "Failed to start the storage", (Throwable)e);
        }
    }

    private List<ColumnFamilyDescriptor> cfDescriptors() {
        Options baseOptions = new Options().setCreateIfMissing(true).setNumLevels(4).setMaxWriteBufferNumber(4).setTableFormatConfig(new BlockBasedTableConfig().setPinL0FilterAndIndexBlocksInCache(true).setFilterPolicy(new BloomFilter(12.0)).setBlockCache(new LRUCache(0x4000000L)));
        ColumnFamilyOptions dataFamilyOptions = new ColumnFamilyOptions(baseOptions).useFixedLengthPrefixExtractor(8);
        this.rocksResources.add(dataFamilyOptions);
        ColumnFamilyOptions indexFamilyOptions = new ColumnFamilyOptions(baseOptions);
        this.rocksResources.add(indexFamilyOptions);
        ColumnFamilyOptions tsToRevFamilyOptions = new ColumnFamilyOptions(baseOptions);
        this.rocksResources.add(tsToRevFamilyOptions);
        ColumnFamilyOptions revToTsFamilyOptions = new ColumnFamilyOptions(baseOptions);
        this.rocksResources.add(revToTsFamilyOptions);
        ColumnFamilyOptions revToChecksumFamilyOptions = new ColumnFamilyOptions(baseOptions);
        this.rocksResources.add(revToChecksumFamilyOptions);
        return List.of(new ColumnFamilyDescriptor(StorageColumnFamilyType.DATA.nameAsBytes(), dataFamilyOptions), new ColumnFamilyDescriptor(StorageColumnFamilyType.INDEX.nameAsBytes(), indexFamilyOptions), new ColumnFamilyDescriptor(StorageColumnFamilyType.TS_TO_REVISION.nameAsBytes(), tsToRevFamilyOptions), new ColumnFamilyDescriptor(StorageColumnFamilyType.REVISION_TO_TS.nameAsBytes(), revToTsFamilyOptions), new ColumnFamilyDescriptor(StorageColumnFamilyType.REVISION_TO_CHECKSUM.nameAsBytes(), revToChecksumFamilyOptions));
    }

    protected DBOptions createDbOptions() {
        DBOptions options = ((DBOptions)new DBOptions().setAtomicFlush(true).setCreateMissingColumnFamilies(true).setListeners((List)List.of(this.flusher.listener()))).setCreateIfMissing(true);
        this.rocksResources.add(options);
        return options;
    }

    private void createDb() throws RocksDBException {
        this.writeOptions = new WriteOptions().setDisableWAL(true);
        this.rocksResources.add(this.writeOptions);
        List<ColumnFamilyDescriptor> descriptors = this.cfDescriptors();
        descriptors.stream().map(ColumnFamilyDescriptor::getOptions).distinct().forEach(this.rocksResources::add);
        assert (descriptors.size() == 5) : descriptors.size();
        ArrayList<ColumnFamilyHandle> handles = new ArrayList<ColumnFamilyHandle>(descriptors.size());
        this.flusher = new RocksDbFlusher("rocksdb metastorage kv storage", this.nodeName, this.busyLock, this.scheduledExecutor, this.executor, () -> 100, new NoOpLogSyncer(), this.failureProcessor, () -> {});
        this.options = this.createDbOptions();
        this.db = RocksDB.open(this.options, this.dbPath.toAbsolutePath().toString(), descriptors, handles);
        this.rocksResources.add(this.db);
        this.rocksResources.addAll(handles);
        this.data = ColumnFamily.wrap(this.db, handles.get(0));
        this.index = ColumnFamily.wrap(this.db, handles.get(1));
        this.tsToRevision = ColumnFamily.wrap(this.db, handles.get(2));
        this.revisionToTs = ColumnFamily.wrap(this.db, handles.get(3));
        this.revisionToChecksum = ColumnFamily.wrap(this.db, handles.get(4));
        this.snapshotManager = new RocksSnapshotManager(this.db, List.of(ColumnFamilyRange.fullRange(this.data), ColumnFamilyRange.fullRange(this.index), ColumnFamilyRange.fullRange(this.tsToRevision), ColumnFamilyRange.fullRange(this.revisionToTs), ColumnFamilyRange.fullRange(this.revisionToChecksum)), this.executor);
        this.flusher.init(this.db, handles);
        byte[] revision = this.data.get(REVISION_KEY);
        if (revision != null) {
            this.rev = ByteUtils.bytesToLong(revision);
        }
        this.checksum = new MetastorageChecksum(revision == null ? 0L : this.checksumByRevision(this.rev));
        byte[] compactionRevisionBytes = this.data.get(COMPACTION_REVISION_KEY);
        if (compactionRevisionBytes != null) {
            this.compactionRevision = ByteUtils.bytesToLong(compactionRevisionBytes);
        }
    }

    private long checksumByRevision(long revision) throws RocksDBException {
        byte[] bytes = this.revisionToChecksum.get(RocksStorageUtils.longToBytes(revision));
        if (bytes == null) {
            throw new CompactedException(revision, this.compactionRevision);
        }
        return RocksStorageUtils.bytesToLong(bytes);
    }

    private long checksumByRevisionOrZero(long revision) throws RocksDBException {
        byte[] bytes = this.revisionToChecksum.get(RocksStorageUtils.longToBytes(revision));
        if (bytes == null) {
            return 0L;
        }
        return RocksStorageUtils.bytesToLong(bytes);
    }

    protected void destroyRocksDb() throws IOException {
        IgniteUtils.deleteIfExists(this.dbPath);
        Files.createDirectories(this.dbPath, new FileAttribute[0]);
    }

    @Override
    public void close() throws Exception {
        if (!this.closeGuard.compareAndSet(false, true)) {
            return;
        }
        this.stopCompaction();
        this.busyLock.block();
        this.watchProcessor.close();
        this.flusher.stop();
        IgniteUtils.shutdownAndAwaitTermination(this.executor, 10L, TimeUnit.SECONDS);
        this.closeRocksResources();
    }

    private void closeRocksResources() {
        Collections.reverse(this.rocksResources);
        RocksUtils.closeAll(this.rocksResources);
        this.rocksResources = new ArrayList<AbstractNativeReference>();
    }

    @Override
    public CompletableFuture<Void> snapshot(Path snapshotPath) {
        return this.snapshotManager.createSnapshot(snapshotPath).thenCompose(unused -> this.flush());
    }

    @Override
    public void restoreSnapshot(Path path) {
        try {
            this.clear();
            this.snapshotManager.restoreSnapshot(path);
            this.rev = RocksStorageUtils.bytesToLong(this.data.get(REVISION_KEY));
            byte[] compactionRevisionBytes = this.data.get(COMPACTION_REVISION_KEY);
            if (compactionRevisionBytes != null) {
                this.compactionRevision = RocksStorageUtils.bytesToLong(compactionRevisionBytes);
            }
            this.notifyRevisionsUpdate();
        }
        catch (MetaStorageException e) {
            throw e;
        }
        catch (Exception e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.RESTORING_STORAGE_ERR, "Failed to restore snapshot", (Throwable)e);
        }
    }

    @Override
    public Entry get(byte[] key) {
        try (ReadOperationForCompactionTracker.TrackingToken unused = this.readOperationForCompactionTracker.track(-1L, this.revSupplier, this.compactedRevSupplier);){
            Entry entry = super.get(key);
            return entry;
        }
    }

    @Override
    public List<Entry> getAll(List<byte[]> keys) {
        try (ReadOperationForCompactionTracker.TrackingToken unused = this.readOperationForCompactionTracker.track(-1L, this.revSupplier, this.compactedRevSupplier);){
            List<Entry> list = super.getAll(keys);
            return list;
        }
    }

    @Override
    public void put(byte[] key, byte[] value, KeyValueUpdateContext context) {
        try (WriteBatch batch = new WriteBatch();){
            long newChecksum = this.checksum.wholePut(key, value);
            long curRev = this.rev + 1L;
            this.addDataToBatch(batch, key, value, curRev, context.timestamp);
            this.updateKeysIndex(batch, key, curRev);
            this.completeAndWriteBatch(batch, curRev, context, newChecksum);
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
        }
    }

    @Override
    public void setIndexAndTerm(long index, long term) {
        try {
            this.db.put(this.data.handle(), this.writeOptions, INDEX_AND_TERM_KEY, RocksStorageUtils.longsToBytes(0, index, term));
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
        }
    }

    @Override
    @Nullable
    public IndexWithTerm getIndexWithTerm() {
        try {
            byte[] bytes = this.data.get(INDEX_AND_TERM_KEY);
            if (bytes == null) {
                return null;
            }
            return new IndexWithTerm(RocksStorageUtils.bytesToLong(bytes, 0), RocksStorageUtils.bytesToLong(bytes, 8));
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
        }
    }

    @Override
    public void saveConfiguration(byte[] configuration, long lastAppliedIndex, long lastAppliedTerm) {
        try (WriteBatch batch = new WriteBatch();){
            this.data.put(batch, INDEX_AND_TERM_KEY, RocksStorageUtils.longsToBytes(0, lastAppliedIndex, lastAppliedTerm));
            this.data.put(batch, CONFIGURATION_KEY, configuration);
            this.writeBatch(batch);
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
        }
    }

    @Override
    public byte @Nullable [] getConfiguration() {
        try {
            return this.data.get(CONFIGURATION_KEY);
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
        }
    }

    private void updateKeysIndex(WriteBatch batch, byte[] key, long curRev) {
        try {
            byte @Nullable [] array = this.index.get(key);
            this.index.put(batch, key, RocksStorageUtils.appendLong(array, curRev));
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
        }
    }

    private void completeAndWriteBatch(WriteBatch batch, long newRev, KeyValueUpdateContext context, long newChecksum) throws RocksDBException {
        byte[] revisionBytes = RocksStorageUtils.longToBytes(newRev);
        boolean sameChecksumAlreadyExists = this.validateNoChecksumConflict(newRev, newChecksum);
        if (!sameChecksumAlreadyExists) {
            this.revisionToChecksum.put(batch, revisionBytes, RocksStorageUtils.longToBytes(newChecksum));
        }
        HybridTimestamp ts = context.timestamp;
        this.data.put(batch, REVISION_KEY, revisionBytes);
        byte[] tsBytes = RocksDbKeyValueStorage.hybridTsToArray(ts);
        this.tsToRevision.put(batch, tsBytes, revisionBytes);
        this.revisionToTs.put(batch, revisionBytes, tsBytes);
        this.addIndexAndTermToWriteBatch(batch, context);
        this.writeBatch(batch);
        this.rev = newRev;
        this.checksum.commitRound(newChecksum);
        this.updatedEntries.ts = ts;
        this.queueWatchEvent();
        this.notifyRevisionsUpdate();
    }

    private boolean validateNoChecksumConflict(long newRev, long newChecksum) throws RocksDBException {
        long existingChecksum;
        byte[] existingChecksumBytes = this.revisionToChecksum.get(RocksStorageUtils.longToBytes(newRev));
        if (existingChecksumBytes != null && (existingChecksum = RocksStorageUtils.bytesToLong(existingChecksumBytes)) != newChecksum) {
            throw new MetaStorageException(ErrorGroups.Common.INTERNAL_ERR, String.format("Metastorage revision checksum differs from a checksum for the same revision saved earlier. This probably means that the Metastorage has diverged. [revision=%d, existingChecksum=%d, newChecksum=%d]", newRev, existingChecksum, newChecksum));
        }
        return existingChecksumBytes != null;
    }

    private static byte[] hybridTsToArray(HybridTimestamp ts) {
        return RocksStorageUtils.longToBytes(ts.longValue());
    }

    @Override
    public void putAll(List<byte[]> keys, List<byte[]> values, KeyValueUpdateContext context) {
        try (WriteBatch batch = new WriteBatch();){
            long newChecksum = this.checksum.wholePutAll(keys, values);
            long curRev = this.rev + 1L;
            this.addAllToBatch(batch, keys, values, curRev, context.timestamp);
            for (byte[] key : keys) {
                this.updateKeysIndex(batch, key, curRev);
            }
            this.completeAndWriteBatch(batch, curRev, context, newChecksum);
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
        }
    }

    @Override
    public void remove(byte[] key, KeyValueUpdateContext context) {
        try (WriteBatch batch = new WriteBatch();){
            long newChecksum = this.checksum.wholeRemove(key);
            long curRev = this.rev + 1L;
            if (this.addToBatchForRemoval(batch, key, curRev, context.timestamp)) {
                this.updateKeysIndex(batch, key, curRev);
            }
            this.completeAndWriteBatch(batch, curRev, context, newChecksum);
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
        }
    }

    @Override
    public void removeAll(List<byte[]> keys, KeyValueUpdateContext context) {
        try (WriteBatch batch = new WriteBatch();){
            long newChecksum = this.checksum.wholeRemoveAll(keys);
            long curRev = this.rev + 1L;
            ArrayList<byte[]> existingKeys = new ArrayList<byte[]>(keys.size());
            for (byte[] key : keys) {
                if (!this.addToBatchForRemoval(batch, key, curRev, context.timestamp)) continue;
                existingKeys.add(key);
            }
            for (byte[] key : existingKeys) {
                this.updateKeysIndex(batch, key, curRev);
            }
            this.completeAndWriteBatch(batch, curRev, context, newChecksum);
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
        }
    }

    @Override
    public void removeByPrefix(byte[] prefix, KeyValueUpdateContext context) {
        try (WriteBatch batch = new WriteBatch();
             Cursor<Entry> entryCursor = this.range(prefix, this.nextKey(prefix));){
            long curRev = this.rev + 1L;
            for (Entry entry : entryCursor) {
                byte[] key = entry.key();
                if (!this.addToBatchForRemoval(batch, key, curRev, context.timestamp)) continue;
                this.updateKeysIndex(batch, key, curRev);
            }
            this.completeAndWriteBatch(batch, curRev, context, this.checksum.wholeRemoveByPrefix(prefix));
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
        }
    }

    @Override
    public boolean invoke(Condition condition, List<Operation> success, List<Operation> failure, KeyValueUpdateContext context, CommandId commandId) {
        try {
            Entry[] entries = this.getAll(Arrays.asList(condition.keys())).toArray(new Entry[0]);
            boolean branch = condition.test(entries);
            ByteBuffer updateResult = ByteBuffer.wrap(branch ? INVOKE_RESULT_TRUE_BYTES : INVOKE_RESULT_FALSE_BYTES);
            ArrayList<Operation> ops = new ArrayList<Operation>(branch ? success : failure);
            ops.add(Operations.put(MetaStorageWriteHandler.toIdempotentCommandKey(commandId), updateResult));
            this.applyOperations(ops, context, false, updateResult);
            return branch;
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
        }
    }

    @Override
    public StatementResult invoke(If iif, KeyValueUpdateContext context, CommandId commandId) {
        try {
            If currIf = iif;
            byte maximumNumOfNestedBranch = 100;
            while (true) {
                Statement branch;
                byte by = maximumNumOfNestedBranch;
                maximumNumOfNestedBranch = (byte)(maximumNumOfNestedBranch - 1);
                if (by <= 0) {
                    throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, "Too many nested (" + maximumNumOfNestedBranch + ") statements in multi-invoke command.");
                }
                Entry[] entries = this.getAll(Arrays.asList(currIf.cond().keys())).toArray(new Entry[0]);
                Statement statement = branch = currIf.cond().test(entries) ? currIf.andThen() : currIf.orElse();
                if (branch.isTerminal()) {
                    Update update = branch.update();
                    ByteBuffer updateResult = update.result().result();
                    ArrayList<Operation> ops = new ArrayList<Operation>(update.operations());
                    ops.add(Operations.put(MetaStorageWriteHandler.toIdempotentCommandKey(commandId), updateResult));
                    this.applyOperations(ops, context, true, updateResult);
                    return update.result();
                }
                currIf = branch.iif();
            }
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
        }
    }

    private void applyOperations(List<Operation> ops, KeyValueUpdateContext context, boolean multiInvoke, ByteBuffer updateResult) throws RocksDBException {
        HybridTimestamp opTs = context.timestamp;
        long curRev = this.rev + 1L;
        ArrayList<byte[]> updatedKeys = new ArrayList<byte[]>();
        int nonDummyOps = (int)ops.stream().filter(op -> op.type() != OperationType.NO_OP).count();
        this.checksum.prepareForInvoke(multiInvoke, nonDummyOps, ByteUtils.toByteArray(updateResult));
        try (WriteBatch batch = new WriteBatch();){
            block10: for (Operation op2 : ops) {
                byte @Nullable [] key = op2.key() == null ? null : ByteUtils.toByteArray(op2.key());
                switch (op2.type()) {
                    case PUT: {
                        byte[] value = ByteUtils.toByteArray(op2.value());
                        this.addDataToBatch(batch, key, value, curRev, opTs);
                        updatedKeys.add(key);
                        this.checksum.appendPutAsPart(key, value);
                        continue block10;
                    }
                    case REMOVE: {
                        if (this.addToBatchForRemoval(batch, key, curRev, opTs)) {
                            updatedKeys.add(key);
                        }
                        this.checksum.appendRemoveAsPart(key);
                        continue block10;
                    }
                    case NO_OP: {
                        continue block10;
                    }
                }
                throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, "Unknown operation type: " + op2.type());
            }
            for (byte[] key : updatedKeys) {
                this.updateKeysIndex(batch, key, curRev);
            }
            this.completeAndWriteBatch(batch, curRev, context, this.checksum.roundValue());
        }
    }

    @Override
    public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
        return this.doRange(keyFrom, keyTo, this.rev);
    }

    @Override
    public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long revUpperBound) {
        return this.doRange(keyFrom, keyTo, revUpperBound);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void startWatches(long startRevision, WatchEventHandlingCallback callback) {
        long currentRevision;
        assert (startRevision > 0L) : startRevision;
        Object object = this.watchProcessorMutex;
        synchronized (object) {
            this.watchProcessor.setWatchEventHandlingCallback(callback);
            currentRevision = this.rev;
            if (currentRevision == 0L) {
                this.recoveryStatus.set(RecoveryStatus.DONE);
            } else {
                this.recoveryStatus.set(RecoveryStatus.IN_PROGRESS);
            }
        }
        if (currentRevision != 0L) {
            Set<UpdateEntriesEvent> updateEntriesEvents = this.collectUpdateEntriesEventsFromStorage(startRevision, currentRevision);
            Set<UpdateOnlyRevisionEvent> updateOnlyRevisionEvents = this.collectUpdateRevisionEventsFromStorage(startRevision, currentRevision);
            Object object2 = this.watchProcessorMutex;
            synchronized (object2) {
                this.notifyWatchProcessorEventsBeforeStartingWatches.addAll(updateEntriesEvents);
                this.notifyWatchProcessorEventsBeforeStartingWatches.addAll(updateOnlyRevisionEvents);
                this.drainNotifyWatchProcessorEventsBeforeStartingWatches();
                this.recoveryStatus.set(RecoveryStatus.DONE);
            }
        }
    }

    @Override
    public void compact(long revision) {
        assert (revision >= 0L) : revision;
        LOG.info("Metastore compaction has started. [revision={}]", revision);
        CompactionStatisticsHolder statHolder = new CompactionStatisticsHolder(revision);
        try {
            this.compactKeys(revision, statHolder);
            this.compactAuxiliaryMappings(revision, statHolder);
            statHolder.onFinished();
            LOG.info("Metastore compaction completed successfully. [" + statHolder.info() + "]", new Object[0]);
        }
        catch (Throwable t) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.COMPACTION_ERR, "Error during compaction: " + revision, t);
        }
    }

    private boolean addToBatchForRemoval(WriteBatch batch, byte[] key, long curRev, HybridTimestamp opTs) throws RocksDBException {
        Entry e = this.doGet(key, curRev);
        if (e.empty() || e.tombstone()) {
            return false;
        }
        this.addDataToBatch(batch, key, Value.TOMBSTONE, curRev, opTs);
        return true;
    }

    private void compactForKey(WriteBatch batch, List<byte[]> batchKeys, byte[] key, long[] revs, long compactionRevision, CompactionStatisticsHolder statHolder) {
        try {
            statHolder.onKeyEncountered();
            int indexToCompact = KeyValueStorageUtils.indexToCompact(revs, compactionRevision, revision -> this.isTombstoneForCompaction(key, revision));
            if (-1 == indexToCompact) {
                return;
            }
            batchKeys.add(key);
            for (int revisionIndex = 0; revisionIndex <= indexToCompact; ++revisionIndex) {
                statHolder.onKeyRevisionCompacted();
                this.data.delete(batch, RocksStorageUtils.keyToRocksKey(revs[revisionIndex], key));
            }
            if (indexToCompact == revs.length - 1) {
                statHolder.onTombstoneCompacted();
                this.index.delete(batch, key);
            } else {
                statHolder.onKeyCompacted();
                this.index.put(batch, key, RocksStorageUtils.longsToBytes(indexToCompact + 1, revs));
            }
        }
        catch (Throwable t) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.COMPACTION_ERR, String.format("Error during compaction of key: [KeyBytes=%s, keyBytesToUtf8String=%s]", Arrays.toString(key), KeyValueStorageUtils.toUtf8String(key)), t);
        }
    }

    private long[] getRevisions(byte[] key) throws RocksDBException {
        byte[] revisions = this.index.get(key);
        if (revisions == null) {
            return ArrayUtils.LONG_EMPTY_ARRAY;
        }
        return RocksStorageUtils.getAsLongs(revisions);
    }

    private void addDataToBatch(WriteBatch batch, byte[] key, byte[] value, long curRev, HybridTimestamp opTs) throws RocksDBException {
        byte[] rocksKey = RocksStorageUtils.keyToRocksKey(curRev, key);
        byte[] rocksValue = RocksStorageUtils.valueToBytes(value, opTs);
        this.data.put(batch, rocksKey, rocksValue);
        this.updatedEntries.add(EntryImpl.toEntry(key, curRev, new Value(value, opTs)));
    }

    private void addAllToBatch(WriteBatch batch, List<byte[]> keys, List<byte[]> values, long curRev, HybridTimestamp opTs) throws RocksDBException {
        for (int i = 0; i < keys.size(); ++i) {
            byte[] key = keys.get(i);
            byte[] bytes = values.get(i);
            this.addDataToBatch(batch, key, bytes, curRev, opTs);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void queueWatchEvent() {
        Object object = this.watchProcessorMutex;
        synchronized (object) {
            if (this.recoveryStatus.get() == RecoveryStatus.INITIAL) {
                this.updatedEntries.clear();
            } else {
                this.notifyWatchProcessor(this.updatedEntries.toNotifyWatchProcessorEvent(this.rev));
            }
        }
    }

    private Set<UpdateEntriesEvent> collectUpdateEntriesEventsFromStorage(long lowerRevision, long upperRevision) {
        long minWatchRevision = Math.max(lowerRevision, this.watchProcessor.minWatchRevision().orElse(-1L));
        if (minWatchRevision > upperRevision) {
            return Set.of();
        }
        ArrayList<Entry> updatedEntries = new ArrayList<Entry>();
        HybridTimestamp ts = null;
        TreeSet<UpdateEntriesEvent> events = new TreeSet<UpdateEntriesEvent>();
        try (Slice upperBound = new Slice(RocksStorageUtils.longToBytes(upperRevision + 1L));
             ReadOptions options = new ReadOptions().setIterateUpperBound(upperBound);
             RocksIterator it = this.data.newIterator(options);){
            it.seek(RocksStorageUtils.longToBytes(minWatchRevision));
            long lastSeenRevision = minWatchRevision;
            while (it.isValid()) {
                byte[] rocksKey = it.key();
                byte[] rocksValue = it.value();
                long revision = RocksStorageUtils.revisionFromRocksKey(rocksKey);
                if (revision != lastSeenRevision) {
                    if (!updatedEntries.isEmpty()) {
                        List<Entry> updatedEntriesCopy = List.copyOf(updatedEntries);
                        assert (ts != null) : revision;
                        UpdateEntriesEvent event = new UpdateEntriesEvent(updatedEntriesCopy, ts);
                        boolean added = events.add(event);
                        assert (added) : event;
                        updatedEntries.clear();
                        ts = HybridTimestamp.hybridTimestamp(RocksStorageUtils.timestampFromRocksValue(rocksValue));
                    }
                    lastSeenRevision = revision;
                }
                if (ts == null) {
                    ts = HybridTimestamp.hybridTimestamp(RocksStorageUtils.timestampFromRocksValue(rocksValue));
                }
                updatedEntries.add(EntryImpl.toEntry(RocksStorageUtils.rocksKeyToBytes(rocksKey), revision, RocksStorageUtils.bytesToValue(rocksValue)));
                it.next();
            }
            try {
                it.status();
            }
            catch (RocksDBException e) {
                throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
            }
            if (!updatedEntries.isEmpty()) {
                assert (ts != null);
                UpdateEntriesEvent event = new UpdateEntriesEvent(updatedEntries, ts);
                boolean added = events.add(event);
                assert (added) : event;
            }
        }
        return events;
    }

    private Set<UpdateOnlyRevisionEvent> collectUpdateRevisionEventsFromStorage(long lowerRevision, long upperRevision) {
        TreeSet<UpdateOnlyRevisionEvent> events = new TreeSet<UpdateOnlyRevisionEvent>();
        try (Slice upperBound = new Slice(RocksStorageUtils.longToBytes(upperRevision + 1L));
             ReadOptions options = new ReadOptions().setIterateUpperBound(upperBound);
             RocksIterator it = this.revisionToTs.newIterator(options);){
            it.seek(RocksStorageUtils.longToBytes(lowerRevision));
            while (it.isValid()) {
                byte[] rocksKey = it.key();
                byte[] rocksValue = it.value();
                long revision = RocksStorageUtils.bytesToLong(rocksKey);
                HybridTimestamp time = HybridTimestamp.hybridTimestamp(RocksStorageUtils.bytesToLong(rocksValue));
                UpdateOnlyRevisionEvent event = new UpdateOnlyRevisionEvent(revision, time);
                boolean added = events.add(event);
                assert (added) : event;
                try {
                    it.status();
                }
                catch (RocksDBException e) {
                    throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
                }
                it.next();
            }
        }
        return events;
    }

    @Override
    public HybridTimestamp timestampByRevision(long revision) {
        try {
            KeyValueStorageUtils.assertRequestedRevisionLessThanOrEqualToCurrent(revision, this.rev);
            byte[] tsBytes = this.revisionToTs.get(RocksStorageUtils.longToBytes(revision));
            if (tsBytes == null) {
                throw new CompactedException("Requested revision has already been compacted: " + revision);
            }
            return HybridTimestamp.hybridTimestamp(RocksStorageUtils.bytesToLong(tsBytes));
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, "Error reading revision timestamp: " + revision, (Throwable)e);
        }
    }

    @Override
    public long revisionByTimestamp(HybridTimestamp timestamp) {
        long l;
        block9: {
            RocksIterator rocksIterator = this.tsToRevision.newIterator();
            try {
                rocksIterator.seekForPrev(RocksDbKeyValueStorage.hybridTsToArray(timestamp));
                rocksIterator.status();
                if (!rocksIterator.isValid()) {
                    throw new CompactedException("Revisions less than or equal to the requested one are already compacted: " + timestamp);
                }
                l = RocksStorageUtils.bytesToLong(rocksIterator.value());
                if (rocksIterator == null) break block9;
            }
            catch (Throwable throwable) {
                try {
                    if (rocksIterator != null) {
                        try {
                            rocksIterator.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                catch (RocksDBException e) {
                    throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, (Throwable)e);
                }
            }
            rocksIterator.close();
        }
        return l;
    }

    @TestOnly
    public Path getDbPath() {
        return this.dbPath;
    }

    @Override
    protected void saveCompactionRevision(long revision, KeyValueUpdateContext context, boolean advanceSafeTime) {
        try (WriteBatch batch = new WriteBatch();){
            this.data.put(batch, COMPACTION_REVISION_KEY, RocksStorageUtils.longToBytes(revision));
            this.addIndexAndTermToWriteBatch(batch, context);
            this.writeBatch(batch);
            if (advanceSafeTime && this.areWatchesStarted()) {
                this.watchProcessor.advanceSafeTime(() -> {}, context.timestamp);
            }
        }
        catch (Throwable t) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.COMPACTION_ERR, "Error saving compaction revision: " + revision, t);
        }
    }

    @Override
    public long checksum(long revision) {
        try {
            KeyValueStorageUtils.assertRequestedRevisionLessThanOrEqualToCurrent(revision, this.rev);
            return this.checksumByRevision(revision);
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.Common.INTERNAL_ERR, "Cannot get checksum by revision: " + revision, (Throwable)e);
        }
    }

    @Override
    public ChecksumAndRevisions checksumAndRevisions(long revision) {
        try {
            long currentRevision = this.rev;
            return new ChecksumAndRevisions(this.checksumByRevisionOrZero(revision), this.minChecksummedRevisionOrZero(), currentRevision);
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.Common.INTERNAL_ERR, "Cannot get checksum by revision: " + revision, (Throwable)e);
        }
    }

    private long minChecksummedRevisionOrZero() throws RocksDBException {
        try (ReadOptions options = new ReadOptions().setTailing(true);){
            long l;
            block16: {
                RocksIterator it;
                block14: {
                    long l2;
                    block15: {
                        it = this.revisionToChecksum.newIterator(options);
                        try {
                            it.seekToFirst();
                            if (!it.isValid()) break block14;
                            l2 = RocksStorageUtils.bytesToLong(it.key());
                            if (it == null) break block15;
                        }
                        catch (Throwable throwable) {
                            if (it != null) {
                                try {
                                    it.close();
                                }
                                catch (Throwable throwable2) {
                                    throwable.addSuppressed(throwable2);
                                }
                            }
                            throw throwable;
                        }
                        it.close();
                    }
                    return l2;
                }
                it.status();
                l = 0L;
                if (it == null) break block16;
                it.close();
            }
            return l;
        }
    }

    @Override
    public void clear() {
        try {
            this.closeRocksResources();
            this.destroyRocksDb();
            this.rev = 0L;
            this.compactionRevision = -1L;
            this.updatedEntries.clear();
            this.createDb();
        }
        catch (Exception e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.RESTORING_STORAGE_ERR, "Failed to restore snapshot", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     * Converted monitor instructions to comments
     * Lifted jumps to return sites
     */
    private void compactKeys(long compactionRevision, CompactionStatisticsHolder statHolder) throws RocksDBException {
        KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent(this.compactionRevision, this.rev);
        WriteBatchProtector writeBatchProtector = this.writeBatchProtector;
        // MONITORENTER : writeBatchProtector
        this.writeBatchProtector.clear();
        // MONITOREXIT : writeBatchProtector
        if (!this.busyLock.enterBusy()) {
            statHolder.onCancelled();
            return;
        }
        try (RocksIterator iterator = this.index.newIterator();){
            byte[] key = null;
            ArrayList<byte[]> batchKeys = new ArrayList<byte[]>(10);
            iterator.seekToFirst();
            while (iterator.isValid()) {
                try (WriteBatch batch = new WriteBatch();){
                    byte[] retryPositionKey = key;
                    batchKeys.clear();
                    for (int i = 0; i < 10 && iterator.isValid(); ++i) {
                        if (this.stopCompaction.get()) {
                            statHolder.onCancelled();
                            return;
                        }
                        iterator.status();
                        key = iterator.key();
                        this.compactForKey(batch, batchKeys, key, RocksStorageUtils.getAsLongs(iterator.value()), compactionRevision, statHolder);
                        iterator.next();
                    }
                    if (this.writeCompactedBatch(batchKeys, batch, statHolder)) continue;
                    key = retryPositionKey;
                    RocksDbKeyValueStorage.refreshIterator(iterator, key);
                }
            }
            return;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void compactAuxiliaryMappings(long compactionRevision, CompactionStatisticsHolder statHolder) throws RocksDBException {
        KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent(compactionRevision, this.rev);
        if (!this.busyLock.enterBusy()) {
            statHolder.onCancelled();
            return;
        }
        try (RocksIterator iterator = this.revisionToTs.newIterator();){
            boolean continueIterating = true;
            iterator.seekToFirst();
            while (continueIterating) {
                if (!iterator.isValid()) return;
                try (WriteBatch batch = new WriteBatch();){
                    for (int i = 0; i < 10 && iterator.isValid(); ++i) {
                        if (this.stopCompaction.get()) {
                            statHolder.onCancelled();
                            return;
                        }
                        iterator.status();
                        statHolder.onAuxiliaryMappingCompacted();
                        if (!this.deleteAuxiliaryMapping(compactionRevision, iterator, batch)) {
                            continueIterating = false;
                            break;
                        }
                        iterator.next();
                    }
                    this.db.write(this.writeOptions, batch);
                }
            }
            return;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    private boolean deleteAuxiliaryMapping(long compactionRevision, RocksIterator iterator, WriteBatch batch) throws RocksDBException {
        byte[] key = iterator.key();
        long revision = RocksStorageUtils.bytesToLong(key);
        if (revision > compactionRevision) {
            return false;
        }
        this.revisionToTs.delete(batch, key);
        this.tsToRevision.delete(batch, iterator.value());
        this.revisionToChecksum.delete(batch, key);
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writeBatch(WriteBatch batch) throws RocksDBException {
        WriteBatchProtector writeBatchProtector = this.writeBatchProtector;
        synchronized (writeBatchProtector) {
            for (Entry updatedEntry : this.updatedEntries.updatedEntries) {
                this.writeBatchProtector.onUpdate(updatedEntry.key());
            }
            this.db.write(this.writeOptions, batch);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean writeCompactedBatch(List<byte[]> batchKeys, WriteBatch batch, CompactionStatisticsHolder statHolder) throws RocksDBException {
        statHolder.onBeforeWriteBatchLock();
        WriteBatchProtector writeBatchProtector = this.writeBatchProtector;
        synchronized (writeBatchProtector) {
            statHolder.onAfterWriteBatchLock();
            for (byte[] key : batchKeys) {
                if (!this.writeBatchProtector.maybeUpdated(key)) continue;
                this.writeBatchProtector.clear();
                statHolder.onBatchAborted();
                return false;
            }
            this.db.write(this.writeOptions, batch);
            statHolder.onBatchCommitted();
        }
        return true;
    }

    private static void refreshIterator(RocksIterator iterator, byte @Nullable [] key) throws RocksDBException {
        iterator.refresh();
        if (key == null) {
            iterator.seekToFirst();
        } else {
            iterator.seekForPrev(key);
            if (iterator.isValid()) {
                iterator.next();
            } else {
                iterator.seekToFirst();
            }
        }
        iterator.status();
    }

    private boolean isTombstone(byte[] key, long revision) throws RocksDBException {
        byte[] rocksKey = RocksStorageUtils.keyToRocksKey(revision, key);
        byte[] valueBytes = this.data.get(rocksKey);
        return valueBytes == null || RocksStorageUtils.bytesToValue(valueBytes).tombstone();
    }

    private boolean isTombstoneForCompaction(byte[] key, long revision) {
        try {
            return this.isTombstone(key, revision);
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.COMPACTION_ERR, String.format("Error getting key value by revision: [KeyBytes=%s, keyBytesToUtf8String=%s, revision=%s]", Arrays.toString(key), KeyValueStorageUtils.toUtf8String(key), revision), (Throwable)e);
        }
    }

    @Override
    protected long[] keyRevisionsForOperation(byte[] key) {
        try {
            return this.getRevisions(key);
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, "Failed to get revisions for the key: " + KeyValueStorageUtils.toUtf8String(key), (Throwable)e);
        }
    }

    @Override
    @Nullable
    protected Value valueForOperation(byte[] key, long revision) {
        return this.getValueForOperation(key, revision);
    }

    @Override
    protected boolean areWatchesStarted() {
        return this.recoveryStatus.get() == RecoveryStatus.DONE;
    }

    @Nullable
    private Value getValueForOperation(byte[] key, long revision) {
        try {
            byte[] valueBytes = this.data.get(RocksStorageUtils.keyToRocksKey(revision, key));
            if (valueBytes == null) {
                return null;
            }
            return RocksStorageUtils.bytesToValue(valueBytes);
        }
        catch (RocksDBException e) {
            throw new MetaStorageException(ErrorGroups.MetaStorage.OP_EXECUTION_ERR, String.format("Failed to get value: [key=%s, revision=%s]", KeyValueStorageUtils.toUtf8String(key), revision), (Throwable)e);
        }
    }

    private Cursor<Entry> doRange(byte[] keyFrom, byte @Nullable [] keyTo, final long revUpperBound) {
        ReadOperationForCompactionTracker.TrackingToken token;
        assert (revUpperBound >= 0L) : revUpperBound;
        final ReadOptions readOpts = new ReadOptions();
        final Slice upperBound = keyTo == null ? null : new Slice(keyTo);
        readOpts.setIterateUpperBound(upperBound);
        RocksIterator iterator = this.index.newIterator(readOpts);
        iterator.seek(keyFrom);
        try {
            token = this.readOperationForCompactionTracker.track(revUpperBound, this::revision, this::getCompactionRevision);
        }
        catch (Throwable e) {
            RocksUtils.closeAll(iterator, upperBound, readOpts);
            throw e;
        }
        return new RocksIteratorAdapter<Entry>(iterator){
            @Nullable
            private Entry next;

            @Override
            public boolean hasNext() {
                if (this.next != null) {
                    return true;
                }
                while (this.next == null && super.hasNext()) {
                    Entry nextCandidate = this.decodeEntry(this.it.key(), this.it.value());
                    this.it.next();
                    if (nextCandidate.empty()) continue;
                    this.next = nextCandidate;
                    return true;
                }
                return false;
            }

            @Override
            public Entry next() {
                if (!this.hasNext()) {
                    throw new NoSuchElementException();
                }
                Entry result = this.next;
                assert (result != null);
                this.next = null;
                return result;
            }

            @Override
            protected Entry decodeEntry(byte[] key, byte[] keyRevisionsBytes) {
                long[] keyRevisions = RocksStorageUtils.getAsLongs(keyRevisionsBytes);
                int maxRevisionIndex = KeyValueStorageUtils.maxRevisionIndex(keyRevisions, revUpperBound);
                if (maxRevisionIndex == -1) {
                    return EntryImpl.empty(key);
                }
                long revision = keyRevisions[maxRevisionIndex];
                Value value = RocksDbKeyValueStorage.this.getValueForOperation(key, revision);
                if (value == null || value.tombstone()) {
                    return EntryImpl.empty(key);
                }
                return EntryImpl.toEntry(key, revision, value);
            }

            @Override
            public void close() {
                token.close();
                super.close();
                RocksUtils.closeAll(readOpts, upperBound);
            }
        };
    }

    private void addIndexAndTermToWriteBatch(WriteBatch batch, KeyValueUpdateContext context) throws RocksDBException {
        this.data.put(batch, INDEX_AND_TERM_KEY, RocksStorageUtils.longsToBytes(0, context.index, context.term));
    }

    @Override
    public CompletableFuture<Void> flush() {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> this.flusher.awaitFlush(true));
    }

    static {
        RocksDB.loadLibrary();
    }

    private static enum RecoveryStatus {
        INITIAL,
        IN_PROGRESS,
        DONE;

    }

    private static class UpdatedEntries {
        private final List<Entry> updatedEntries;
        @Nullable
        private HybridTimestamp ts;

        private UpdatedEntries() {
            this.updatedEntries = new ArrayList<Entry>();
        }

        private UpdatedEntries(List<Entry> updatedEntries, HybridTimestamp ts) {
            this.updatedEntries = updatedEntries;
            this.ts = Objects.requireNonNull(ts);
        }

        boolean isEmpty() {
            return this.updatedEntries.isEmpty();
        }

        void add(Entry entry) {
            this.updatedEntries.add(entry);
        }

        void clear() {
            this.updatedEntries.clear();
            this.ts = null;
        }

        UpdatedEntries transfer() {
            assert (this.ts != null);
            UpdatedEntries transferredValue = new UpdatedEntries(new ArrayList<Entry>(this.updatedEntries), this.ts);
            this.clear();
            return transferredValue;
        }

        NotifyWatchProcessorEvent toNotifyWatchProcessorEvent(long newRevision) {
            UpdatedEntries copy = this.transfer();
            return copy.updatedEntries.isEmpty() ? new UpdateOnlyRevisionEvent(newRevision, copy.ts) : new UpdateEntriesEvent(copy.updatedEntries, copy.ts);
        }
    }
}

