/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.secondarystoragebridge.rocksdb;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
import org.apache.ignite.internal.secondarystoragebridge.SecondaryStorageBridge;
import org.apache.ignite.internal.secondarystoragebridge.SecondaryStorageBridgeException;
import org.apache.ignite.internal.secondarystoragebridge.UpdatesStorage;
import org.apache.ignite.internal.secondarystoragebridge.rocksdb.RocksDbUpdatesStorage;
import org.apache.ignite.internal.secondarystoragebridge.rocksdb.UpdatesStorageColumnFamily;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.ThreadOperation;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.gridgain.lang.GridgainErrorGroups;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDbSecondaryStorageBridge
implements SecondaryStorageBridge {
    private static final IgniteLogger LOG = Loggers.forClass(RocksDbSecondaryStorageBridge.class);
    private static final int FLUSH_DELAY = 100;
    private final ConcurrentMap<TablePartitionId, RocksDbUpdatesStorage> updatesStorages = new ConcurrentHashMap<TablePartitionId, RocksDbUpdatesStorage>();
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final AtomicBoolean stopGuard = new AtomicBoolean();
    private final Path storagePath;
    private final List<AutoCloseable> resources = new CopyOnWriteArrayList<AutoCloseable>();
    private final ExecutorService rocksDbFlusherThreadPool;
    private final ScheduledExecutorService rocksDbFlusherScheduledPool;
    private final RocksDbFlusher flusher;
    private volatile ColumnFamily dataCf;
    private volatile ColumnFamily committedCf;
    private volatile ColumnFamily ongoingCf;
    private volatile ColumnFamily metaCf;

    public RocksDbSecondaryStorageBridge(String nodeName, Path storagePath, LogSyncer logSyncer, FailureProcessor failureProcessor) {
        this.storagePath = storagePath;
        this.rocksDbFlusherThreadPool = Executors.newCachedThreadPool((ThreadFactory)IgniteThreadFactory.create((String)nodeName, (String)"secondary-bridge-flusher-executor", (IgniteLogger)LOG, (ThreadOperation[])new ThreadOperation[0]));
        this.rocksDbFlusherScheduledPool = Executors.newSingleThreadScheduledExecutor((ThreadFactory)IgniteThreadFactory.create((String)nodeName, (String)"secondary-bridge-flusher-schedule", (IgniteLogger)LOG, (ThreadOperation[])new ThreadOperation[0]));
        this.flusher = new RocksDbFlusher("secondary storage bridge", nodeName, this.busyLock, this.rocksDbFlusherScheduledPool, (Executor)this.rocksDbFlusherThreadPool, () -> 100, logSyncer, failureProcessor, this::onFlushCompleted);
    }

    ColumnFamily metaColumnFamily() {
        return this.metaCf;
    }

    ColumnFamily dataColumnFamily() {
        return this.dataCf;
    }

    ColumnFamily committedTransactionsColumnFamily() {
        return this.committedCf;
    }

    ColumnFamily ongoingTransactionsColumnFamily() {
        return this.ongoingCf;
    }

    @Override
    public void start() throws SecondaryStorageBridgeException {
        try {
            Files.createDirectories(this.storagePath, new FileAttribute[0]);
        }
        catch (IOException e) {
            throw new SecondaryStorageBridgeException(GridgainErrorGroups.SecondaryStorage.SECONDARY_STORAGE_BRIDGE_STARTUP_ERR, "Failed to create directory: " + this.storagePath, e);
        }
        List<ColumnFamilyDescriptor> cfDescriptors = RocksDbSecondaryStorageBridge.getExistingCfDescriptors();
        ArrayList cfHandles = new ArrayList(cfDescriptors.size());
        DBOptions dbOptions = new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true).setAtomicFlush(true).setListeners(List.of(this.flusher.listener()));
        this.resources.add((AutoCloseable)dbOptions);
        ColumnFamily dataCf = null;
        ColumnFamily committedCf = null;
        ColumnFamily ongoingCf = null;
        ColumnFamily metaCf = null;
        try {
            RocksDB db = RocksDB.open((DBOptions)dbOptions, (String)this.storagePath.toAbsolutePath().toString(), cfDescriptors, cfHandles);
            this.resources.add((AutoCloseable)db);
            block12: for (ColumnFamilyHandle cfHandle : cfHandles) {
                ColumnFamily cf = ColumnFamily.wrap((RocksDB)db, (ColumnFamilyHandle)cfHandle);
                switch (UpdatesStorageColumnFamily.ColumnFamilyType.fromCfName(cf.name())) {
                    case META: {
                        metaCf = cf;
                        continue block12;
                    }
                    case DATA: {
                        dataCf = cf;
                        continue block12;
                    }
                    case COMMITTED: {
                        committedCf = cf;
                        continue block12;
                    }
                    case ONGOING: {
                        ongoingCf = cf;
                        continue block12;
                    }
                }
                throw new IllegalArgumentException("Unidentified column family [name=" + cf.name() + "]");
            }
            this.flusher.init(db, cfHandles);
        }
        catch (RocksDBException ex) {
            try {
                this.close();
            }
            catch (Exception closeException) {
                ex.addSuppressed((Throwable)closeException);
            }
            throw new SecondaryStorageBridgeException(GridgainErrorGroups.SecondaryStorage.SECONDARY_STORAGE_BRIDGE_STARTUP_ERR, "Failed to start rocksdb", ex);
        }
        assert (metaCf != null);
        assert (dataCf != null);
        assert (committedCf != null);
        assert (ongoingCf != null);
        this.dataCf = dataCf;
        this.committedCf = committedCf;
        this.ongoingCf = ongoingCf;
        this.metaCf = metaCf;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public UpdatesStorage getOrCreateUpdatesStorage(int tableId, int partitionId) {
        if (!this.busyLock.enterBusy()) {
            throw new SecondaryStorageBridgeException(GridgainErrorGroups.SecondaryStorage.SECONDARY_STORAGE_BRIDGE_STARTUP_ERR, "Failed to access updates storage, component is stopped", null);
        }
        try {
            TablePartitionId key = new TablePartitionId(tableId, partitionId);
            UpdatesStorage updatesStorage = this.updatesStorages.computeIfAbsent(key, unused -> new RocksDbUpdatesStorage(this, tableId, partitionId));
            return updatesStorage;
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    @Override
    public CompletableFuture<Void> flush() {
        return this.flusher.awaitFlush(true);
    }

    private static List<String> getExistingCfNames() {
        return List.of(UpdatesStorageColumnFamily.META_CF_NAME, "cf-data", "cf-committed", "cf-ongoing");
    }

    private static List<ColumnFamilyDescriptor> getExistingCfDescriptors() {
        return RocksDbSecondaryStorageBridge.getExistingCfNames().stream().map(RocksDbSecondaryStorageBridge::cfDescriptorFromName).collect(Collectors.toList());
    }

    private static ColumnFamilyDescriptor cfDescriptorFromName(String cfName) {
        int prefixSize;
        switch (UpdatesStorageColumnFamily.ColumnFamilyType.fromCfName(cfName)) {
            case META: {
                prefixSize = 6;
                break;
            }
            case DATA: {
                prefixSize = 22;
                break;
            }
            case COMMITTED: {
                prefixSize = 6;
                break;
            }
            case ONGOING: {
                prefixSize = 6;
                break;
            }
            default: {
                throw new IllegalArgumentException("Unidentified column family [name=" + cfName + "]");
            }
        }
        return new ColumnFamilyDescriptor(cfName.getBytes(StandardCharsets.UTF_8), new ColumnFamilyOptions().useFixedLengthPrefixExtractor(prefixSize));
    }

    private void onFlushCompleted() {
        if (!this.busyLock.enterBusy()) {
            return;
        }
        try {
            this.updatesStorages.values().forEach(RocksDbUpdatesStorage::refreshPersistedIndex);
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    public void close() throws Exception {
        if (!this.stopGuard.compareAndSet(false, true)) {
            return;
        }
        this.busyLock.block();
        AutoCloseable[] autoCloseableArray = new AutoCloseable[3];
        autoCloseableArray[0] = () -> ((RocksDbFlusher)this.flusher).stop();
        autoCloseableArray[1] = () -> IgniteUtils.shutdownAndAwaitTermination((ExecutorService)this.rocksDbFlusherScheduledPool, (long)10L, (TimeUnit)TimeUnit.SECONDS);
        autoCloseableArray[2] = () -> IgniteUtils.shutdownAndAwaitTermination((ExecutorService)this.rocksDbFlusherThreadPool, (long)10L, (TimeUnit)TimeUnit.SECONDS);
        Stream<AutoCloseable> closeables = Stream.concat(this.updatesStorages.values().stream(), Stream.of(autoCloseableArray));
        Collections.reverse(this.resources);
        IgniteUtils.closeAll(Stream.concat(closeables, this.resources.stream()));
    }
}

