/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.raft.jraft.storage.impl;

import com.codahale.metrics.Metric;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.storage.TermCache;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
import org.apache.ignite.raft.jraft.core.NodeMetrics;
import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.RaftOutter;
import org.apache.ignite.raft.jraft.error.LogEntryCorruptedException;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.error.RaftException;
import org.apache.ignite.raft.jraft.option.LogManagerOptions;
import org.apache.ignite.raft.jraft.option.LogStorageOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.LogManager;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.apache.ignite.raft.jraft.util.ArrayDeque;
import org.apache.ignite.raft.jraft.util.Describer;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
import org.apache.ignite.raft.jraft.util.Requires;
import org.apache.ignite.raft.jraft.util.SegmentList;
import org.apache.ignite.raft.jraft.util.Utils;

public class LogManagerImpl
implements LogManager {
    /** Class logger. */
    private static final IgniteLogger LOG = Loggers.forClass(LogManagerImpl.class);
    /** Id of the owning node; assigned in init() and stamped onto every event published to the disk queue. */
    private NodeId nodeId;
    /** Persistent log storage; assigned and initialized in init(). */
    private LogStorage logStorage;
    /** Tracks configuration entries found in the log and in snapshots. */
    private ConfigurationManager configManager;
    /** Receives errors raised through reportError(). */
    private FSMCaller fsmCaller;
    /** Read/write lock guarding the mutable state of this manager. */
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock writeLock = this.lock.writeLock();
    private final Lock readLock = this.lock.readLock();
    /** Set to true by shutdown(); once set, offered events are rejected with ESTOP. */
    private volatile boolean stopped;
    /** Set to true by reportError(); once set, appendEntries() fails fast with EIO. */
    protected volatile boolean hasError;
    /** Next id handed out by notifyOnNewLog(); 0 is never used as a waiter id. */
    private long nextWaitId;
    /** Largest log id passed to setDiskId() so far; seeded in init() from the last index in storage. */
    private LogId diskId = new LogId(0L, 0L);
    /** Largest log id passed to setAppliedId() or installed by setSnapshot(). */
    private LogId appliedId = new LogId(0L, 0L);
    /** Entries kept in memory; trimmed up to min(diskId, appliedId) by clearMemoryLogs(). */
    private final SegmentList<LogEntry> logsInMemory = new SegmentList(true);
    /** Cache consulted for index -> term lookups before falling back to log storage. */
    private final TermCache termCache = new TermCache(8);
    private volatile long firstLogIndex;
    private volatile long lastLogIndex;
    /** Id of the last snapshot installed through setSnapshot(); (0, 0) until then. */
    private volatile LogId lastSnapshotId = new LogId(0L, 0L);
    /** Waiters registered by notifyOnNewLog(), keyed by wait id. */
    private final Map<Long, WaitMeta> waitMap = new HashMap<Long, WaitMeta>();
    /** Shared disruptor this manager subscribes to in init(). */
    private StripedDisruptor<StableClosureEvent> disruptor;
    /** Ring buffer returned by the disruptor subscription; all storage events are published here. */
    private RingBuffer<StableClosureEvent> diskQueue;
    private RaftOptions raftOptions;
    /** Created by stopDiskThread() and awaited by join(); null until shutdown starts. */
    private volatile CountDownLatch shutDownLatch;
    private NodeMetrics nodeMetrics;
    /** Listeners invoked by notifyLastLogIndexListeners(). */
    private final CopyOnWriteArrayList<LogManager.LastLogIndexListener> lastLogIndexListeners = new CopyOnWriteArrayList();
    /** Node options; supplies the common executor used to run closures and callbacks. */
    private NodeOptions nodeOptions;

    /**
     * Registers a listener that is called back from {@code notifyLastLogIndexListeners()}.
     *
     * @param listener Listener to add.
     */
    @Override
    public void addLastLogIndexListener(LogManager.LastLogIndexListener listener) {
        lastLogIndexListeners.add(listener);
    }

    /**
     * Unregisters a previously added last-log-index listener.
     *
     * @param listener Listener to remove.
     */
    @Override
    public void removeLastLogIndexListener(LogManager.LastLogIndexListener listener) {
        lastLogIndexListeners.remove(listener);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Initializes the manager from the given options: wires collaborators, initializes the log
     * storage, loads the first/last log indexes and subscribes to the log manager disruptor.
     * Runs entirely under the write lock.
     *
     * @param opts Log manager options.
     * @return {@code false} if the log storage is missing or fails to initialize, {@code true} otherwise.
     */
    @Override
    public boolean init(LogManagerOptions opts) {
        writeLock.lock();
        try {
            if (opts.getLogStorage() == null) {
                LOG.error("Fail to init log manager, log storage is null");
                return false;
            }

            raftOptions = opts.getRaftOptions();
            nodeMetrics = opts.getNodeMetrics();
            logStorage = opts.getLogStorage();
            configManager = opts.getConfigurationManager();
            nodeOptions = opts.getNode().getOptions();
            nodeId = opts.getNode().getNodeId();

            LogStorageOptions storageOptions = new LogStorageOptions();
            storageOptions.setConfigurationManager(configManager);
            storageOptions.setLogEntryCodecFactory(opts.getLogEntryCodecFactory());
            if (!logStorage.init(storageOptions)) {
                LOG.error("Fail to init logStorage");
                return false;
            }

            // Everything already in storage is on disk, so the disk id starts at the last stored entry.
            firstLogIndex = logStorage.getFirstLogIndex();
            lastLogIndex = logStorage.getLastLogIndex();
            diskId = new LogId(lastLogIndex, getTermFromLogStorage(lastLogIndex));

            fsmCaller = opts.getFsmCaller();
            disruptor = opts.getLogManagerDisruptor();
            diskQueue = disruptor.subscribe(
                nodeId,
                new StableClosureEventHandler(),
                (event, ex) -> reportError(-1, "LogManager handle event error")
            );

            if (nodeMetrics.getMetricRegistry() != null) {
                nodeMetrics.getMetricRegistry().register("jraft-log-manager-disruptor", (Metric) new DisruptorMetricSet(diskQueue));
            }

            return true;
        }
        finally {
            writeLock.unlock();
        }
    }

    /**
     * Tells whether the disk queue can currently accept the requested number of events.
     *
     * @param requiredCapacity Number of free slots required.
     * @return {@code false} if the manager is stopped or the ring buffer lacks the capacity.
     */
    @Override
    public boolean hasAvailableCapacityToAppendEntries(int requiredCapacity) {
        return !stopped && diskQueue.hasAvailableCapacity(requiredCapacity);
    }

    /**
     * Creates the shutdown latch and publishes a SHUTDOWN event to the disk queue from the common
     * executor. Does nothing if the disk queue was never created.
     */
    private void stopDiskThread() {
        if (diskQueue != null) {
            shutDownLatch = new CountDownLatch(1);
            Utils.runInThread(nodeOptions.getCommonExecutor(), () -> diskQueue.publishEvent((evt, seq) -> {
                evt.reset();
                evt.nodeId = nodeId;
                evt.type = EventType.SHUTDOWN;
            }));
        }
    }

    /**
     * Waits for the shutdown latch and then unsubscribes this node from the disruptor.
     * Returns immediately if shutdown has not created the latch.
     *
     * @throws InterruptedException If interrupted while waiting on the latch.
     */
    @Override
    public void join() throws InterruptedException {
        if (shutDownLatch != null) {
            shutDownLatch.await();
            disruptor.unsubscribe(nodeId);
        }
    }

    /**
     * Marks the manager as stopped, wakes all waiters (they observe ESTOP) and asks the disk
     * thread to stop. Calling it again after the first shutdown is a no-op.
     */
    @Override
    public void shutdown() {
        // Stays true only while this method is still responsible for releasing the write lock.
        boolean releaseHere = true;
        writeLock.lock();
        try {
            if (stopped) {
                return;
            }
            stopped = true;
            // wakeupAllWaiter() releases the lock on our behalf.
            releaseHere = false;
            wakeupAllWaiter(writeLock);
        }
        finally {
            if (releaseHere) {
                writeLock.unlock();
            }
        }
        stopDiskThread();
    }

    /**
     * Drops from the head of the in-memory log every entry whose id is not greater than {@code id}.
     *
     * @param id Upper bound (inclusive) of the entries to drop.
     */
    private void clearMemoryLogs(LogId id) {
        writeLock.lock();
        try {
            logsInMemory.removeFromFirstWhen(logEntry -> logEntry.getId().compareTo(id) <= 0);
        }
        finally {
            writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Appends entries to the in-memory log and schedules them to be written to storage.
     *
     * <p>If the manager is already in the error state, {@code entries} is cleared and {@code done}
     * is run with an EIO status. If conflict resolution rejects the batch, {@code entries} is
     * cleared and the method returns ({@code checkAndResolveConflict} has already scheduled
     * {@code done}). Otherwise the entries are attached to {@code done} and an event carrying
     * {@code done} is published to the disk queue.
     *
     * @param entries Entries to append; the list may be modified (cleared or trimmed).
     * @param done Closure attached to the published event; must not be null.
     */
    @Override
    public void appendEntries(List<LogEntry> entries, LogManager.StableClosure done) {
        assert (done != null);
        Requires.requireNonNull(done, "done");
        if (this.hasError) {
            entries.clear();
            Utils.runClosureInThread(this.nodeOptions.getCommonExecutor(), done, new Status(RaftError.EIO, "Corrupted LogStorage", new Object[0]));
            return;
        }
        // True while this method still owns the write lock and must release it in finally.
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (!entries.isEmpty() && !this.checkAndResolveConflict(entries, done, this.writeLock)) {
                entries.clear();
                return;
            }
            for (int i = 0; i < entries.size(); ++i) {
                LogEntry entry = entries.get(i);
                if (this.raftOptions.isEnableLogEntryChecksum()) {
                    entry.setChecksum(entry.checksum());
                }
                // Only configuration entries need to be registered with the configuration manager.
                if (entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) continue;
                Configuration oldConf = new Configuration();
                if (entry.getOldPeers() != null) {
                    oldConf = new Configuration(entry.getOldPeers(), entry.getOldLearners());
                }
                ConfigurationEntry conf = new ConfigurationEntry(entry.getId(), new Configuration(entry.getPeers(), entry.getLearners()), oldConf);
                this.configManager.add(conf);
            }
            if (!entries.isEmpty()) {
                done.setFirstLogIndex(entries.get(0).getId().getIndex());
                this.logsInMemory.addAll(entries);
                for (LogEntry entry : entries) {
                    this.termCache.append(entry.getId());
                }
            }
            done.setEntries(entries);
            // wakeupAllWaiter() releases the write lock in every path, so finally must not unlock again.
            doUnlock = false;
            if (!this.wakeupAllWaiter(this.writeLock)) {
                this.notifyLastLogIndexListeners();
            }
            // Published after the write lock has been released.
            this.diskQueue.publishEvent((event, sequence) -> {
                event.reset();
                event.nodeId = this.nodeId;
                event.type = EventType.OTHER;
                event.done = done;
            });
        }
        finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }

    /**
     * Publishes an event of the given type carrying {@code done} to the disk queue. If the manager
     * is stopped, {@code done} is instead run on the common executor with an ESTOP status.
     *
     * @param done Closure to attach to the event; must not be null.
     * @param type Event type.
     */
    private void offerEvent(LogManager.StableClosure done, EventType type) {
        assert done != null;

        if (stopped) {
            Status stoppedStatus = new Status(RaftError.ESTOP, "Log manager is stopped.");
            Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, stoppedStatus);
            return;
        }

        diskQueue.publishEvent((evt, seq) -> {
            evt.reset();
            evt.nodeId = nodeId;
            evt.type = type;
            evt.done = done;
        });
    }

    /**
     * Invokes every registered last-log-index listener with the current last log index.
     * A listener that throws does not prevent the remaining listeners from being notified;
     * the failure is logged together with its cause.
     */
    private void notifyLastLogIndexListeners() {
        for (LogManager.LastLogIndexListener listener : this.lastLogIndexListeners) {
            if (listener == null) {
                continue;
            }
            try {
                listener.onLastLogIndexChanged(this.lastLogIndex);
            }
            catch (Exception e) {
                // Pass the exception to the logger so the cause and stack trace are not lost.
                LOG.error("Fail to notify LastLogIndexListener, listener={}, index={}", e, listener, this.lastLogIndex);
            }
        }
    }

    /**
     * Removes all registered waiters and schedules their callbacks on the common executor.
     * The given lock is released in every path before any callback is scheduled.
     *
     * @param lock Lock currently held by the caller; released by this method.
     * @return {@code true} if at least one waiter was registered, {@code false} otherwise.
     */
    private boolean wakeupAllWaiter(Lock lock) {
        if (waitMap.isEmpty()) {
            lock.unlock();
            return false;
        }

        // Snapshot the waiters and pick the error code while still under the lock.
        List<WaitMeta> pending = new ArrayList<>(waitMap.values());
        final int code = stopped ? RaftError.ESTOP.getNumber() : RaftError.SUCCESS.getNumber();
        waitMap.clear();
        lock.unlock();

        for (WaitMeta waiter : pending) {
            waiter.errorCode = code;
            Utils.runInThread(nodeOptions.getCommonExecutor(), () -> runOnNewLog(waiter));
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes the given entries to log storage and records count, size and latency metrics.
     * If fewer entries than requested were written, the manager is put into the error state.
     *
     * @param toAppend Entries to write.
     * @return Id of the last entry that was written, or {@code null} if the manager is already in
     *     the error state or nothing was written.
     */
    protected LogId appendToStorage(List<LogEntry> toAppend) {
        if (hasError) {
            return null;
        }

        final long startMs = Utils.monotonicMs();
        final int entriesCount = toAppend.size();
        nodeMetrics.recordSize("append-logs-count", entriesCount);

        LogId lastId = null;
        try {
            int writtenSize = 0;
            for (int idx = 0; idx < entriesCount; ++idx) {
                LogEntry entry = toAppend.get(idx);
                if (entry.getData() != null) {
                    writtenSize += entry.getData().remaining();
                }
            }
            nodeMetrics.recordSize("append-logs-bytes", writtenSize);

            final int nAppent = appendToLogStorage(toAppend);
            if (nAppent != entriesCount) {
                LOG.error("**Critical error**, fail to appendEntries, nAppent={}, toAppend={}", nAppent, toAppend.size());
                reportError(RaftError.EIO.getNumber(), "Fail to append log entries");
            }
            if (nAppent > 0) {
                lastId = toAppend.get(nAppent - 1).getId();
            }
        }
        finally {
            nodeMetrics.recordLatency("append-logs", Utils.monotonicMs() - startMs);
        }
        return lastId;
    }

    /**
     * Delegates the write to the underlying log storage.
     *
     * @param toAppend Entries to write.
     * @return Value returned by {@code LogStorage.appendEntries}; the caller compares it with the batch size.
     */
    protected int appendToLogStorage(List<LogEntry> toAppend) {
        return logStorage.appendEntries(toAppend);
    }

    /**
     * Creates an append batcher backed by a fresh, empty entry buffer.
     *
     * @param storages Closures passed to the batcher.
     * @param cap Capacity passed to the batcher.
     * @param diskId Disk id passed to the batcher.
     * @return New batcher instance.
     */
    protected AppendBatcher newAppendBatcher(List<LogManager.StableClosure> storages, int cap, LogId diskId) {
        return new AppendBatcher(storages, cap, new ArrayList<>(), diskId);
    }

    /**
     * Puts the manager into the error state and forwards a log-type error to the FSM caller.
     *
     * @param code Error code for the status.
     * @param fmt Message format for the status.
     * @param args Message format arguments.
     */
    protected void reportError(int code, String fmt, Object ... args) {
        hasError = true;

        final RaftException failure = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_LOG);
        final Status status = new Status(code, fmt, args);
        failure.setStatus(status);

        fsmCaller.onError(failure);
    }

    /**
     * Advances the disk id and trims the in-memory log up to the smaller of the disk id and the
     * applied id. A {@code null} id or an id older than the current disk id is ignored.
     *
     * @param id New disk id, may be {@code null}.
     */
    protected void setDiskId(LogId id) {
        if (id == null) {
            return;
        }

        final LogId trimUpTo;
        writeLock.lock();
        try {
            if (id.compareTo(diskId) < 0) {
                return;
            }
            diskId = id;
            if (diskId.compareTo(appliedId) <= 0) {
                trimUpTo = diskId;
            } else {
                trimUpTo = appliedId;
            }
        }
        finally {
            writeLock.unlock();
        }

        // Trimming re-acquires the write lock inside clearMemoryLogs().
        if (trimUpTo != null) {
            clearMemoryLogs(trimUpTo);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Installs snapshot metadata: records the snapshot configuration, advances the last snapshot id
     * (and the applied id if it is behind) and then truncates or resets the log.
     *
     * <p>A snapshot whose last included index is not greater than the current last snapshot index
     * is ignored.
     *
     * @param meta Snapshot metadata.
     * @param useLastSnapshotIndex If {@code true}, the log prefix is always truncated right after
     *     the snapshot's last included index.
     */
    @Override
    public void setSnapshot(RaftOutter.SnapshotMeta meta, boolean useLastSnapshotIndex) {
        LOG.debug("set snapshot: {}.", new Object[]{meta});
        // True while this method still owns the write lock; truncatePrefix() takes over the unlock.
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (meta.lastIncludedIndex() <= this.lastSnapshotId.getIndex()) {
                return;
            }
            Configuration conf = this.confFromMeta(meta);
            Configuration oldConf = this.oldConfFromMeta(meta);
            ConfigurationEntry entry = new ConfigurationEntry(new LogId(meta.lastIncludedIndex(), meta.lastIncludedTerm()), conf, oldConf);
            this.configManager.setSnapshot(entry);
            // Term is looked up before lastSnapshotId is updated, so it reflects the log, not the new snapshot.
            long term = this.unsafeGetTerm(meta.lastIncludedIndex());
            long savedLastSnapshotIndex = this.lastSnapshotId.getIndex();
            this.lastSnapshotId.setIndex(meta.lastIncludedIndex());
            this.lastSnapshotId.setTerm(meta.lastIncludedTerm());
            if (this.lastSnapshotId.compareTo(this.appliedId) > 0) {
                this.appliedId = this.lastSnapshotId.copy();
            }
            if (useLastSnapshotIndex || term == 0L) {
                // Forced, or the local log has no entry at the snapshot index: drop everything the snapshot covers.
                doUnlock = false;
                this.truncatePrefix(meta.lastIncludedIndex() + 1L, this.writeLock);
            } else if (term == meta.lastIncludedTerm()) {
                // Local log agrees with the snapshot: only drop what the previous snapshot covered, if there was one.
                if (savedLastSnapshotIndex > 0L) {
                    doUnlock = false;
                    this.truncatePrefix(savedLastSnapshotIndex + 1L, this.writeLock);
                }
            } else if (!this.reset(meta.lastIncludedIndex() + 1L)) {
                // Terms differ: the log is reset to start right after the snapshot.
                LOG.warn("Reset log manager failed, nextLogIndex={}.", new Object[]{meta.lastIncludedIndex() + 1L});
            }
        }
        finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }

    /**
     * Builds the old configuration (peers and learners) described by the snapshot metadata.
     *
     * @param meta Snapshot metadata.
     * @return Configuration holding the parsed old peers and old learners; empty if the lists are absent.
     */
    private Configuration oldConfFromMeta(RaftOutter.SnapshotMeta meta) {
        final Configuration result = new Configuration();

        if (meta.oldPeersList() != null) {
            for (String peerStr : meta.oldPeersList()) {
                PeerId parsed = new PeerId();
                parsed.parse(peerStr);
                result.addPeer(parsed);
            }
        }

        if (meta.oldLearnersList() != null) {
            for (String learnerStr : meta.oldLearnersList()) {
                PeerId parsed = new PeerId();
                parsed.parse(learnerStr);
                result.addLearner(parsed);
            }
        }

        return result;
    }

    /**
     * Builds the current configuration (peers and learners) described by the snapshot metadata.
     *
     * @param meta Snapshot metadata.
     * @return Configuration holding the parsed peers and learners; empty if the lists are absent.
     */
    private Configuration confFromMeta(RaftOutter.SnapshotMeta meta) {
        final Configuration result = new Configuration();

        if (meta.peersList() != null) {
            for (String peerStr : meta.peersList()) {
                PeerId parsed = new PeerId();
                parsed.parse(peerStr);
                result.addPeer(parsed);
            }
        }

        if (meta.learnersList() != null) {
            for (String learnerStr : meta.learnersList()) {
                PeerId parsed = new PeerId();
                parsed.parse(learnerStr);
                result.addLearner(parsed);
            }
        }

        return result;
    }

    /**
     * Truncates the log prefix up to and including the last snapshot index.
     * Does nothing when no snapshot has been installed (last snapshot index is 0).
     */
    @Override
    public void clearBufferedLogs() {
        // Stays true only while this method is still responsible for releasing the write lock.
        boolean releaseHere = true;
        writeLock.lock();
        try {
            if (lastSnapshotId.getIndex() != 0L) {
                // truncatePrefix() releases the lock on our behalf.
                releaseHere = false;
                truncatePrefix(lastSnapshotId.getIndex() + 1L, writeLock);
            }
        }
        finally {
            if (releaseHere) {
                writeLock.unlock();
            }
        }
    }

    /**
     * Renders the in-memory entries as a comma-separated list of
     * {@code <id:(term,index),type:TYPE>} items, for diagnostics.
     *
     * @return Textual description of the in-memory log.
     */
    private String descLogsInMemory() {
        final StringBuilder out = new StringBuilder();
        for (int pos = 0; pos < logsInMemory.size(); ++pos) {
            if (pos > 0) {
                out.append(",");
            }
            final LogEntry current = logsInMemory.get(pos);
            out.append("<id:(");
            out.append(current.getId().getTerm());
            out.append(",");
            out.append(current.getId().getIndex());
            out.append("),type:");
            out.append((Object) current.getType());
            out.append(">");
        }
        return out.toString();
    }

    /**
     * Looks up an entry in the in-memory log.
     *
     * @param index Log index to look up.
     * @return The entry, or {@code null} if the in-memory log is empty or does not cover the index.
     * @throws IllegalStateException If the in-memory log is not contiguous, i.e. its size does not
     *     match the span between its first and last index.
     */
    protected LogEntry getEntryFromMemory(long index) {
        if (logsInMemory.isEmpty()) {
            return null;
        }

        final long headIndex = logsInMemory.peekFirst().getId().getIndex();
        final long tailIndex = logsInMemory.peekLast().getId().getIndex();
        if (tailIndex - headIndex + 1L != (long) logsInMemory.size()) {
            throw new IllegalStateException(String.format("lastIndex=%d,firstIndex=%d,logsInMemory=[%s]", tailIndex, headIndex, descLogsInMemory()));
        }

        if (index < headIndex || index > tailIndex) {
            return null;
        }
        return logsInMemory.get((int) (index - headIndex));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the entry at the given index, looking in memory first (under the read lock) and
     * then in log storage (outside the lock).
     *
     * @param index Log index.
     * @return The entry, or {@code null} if the index is outside [firstLogIndex, lastLogIndex] or
     *     storage has no such entry (the latter also puts the manager into the error state).
     * @throws LogEntryCorruptedException If checksums are enabled and the stored entry is corrupted.
     */
    @Override
    public LogEntry getEntry(long index) {
        readLock.lock();
        try {
            if (index > lastLogIndex || index < firstLogIndex) {
                return null;
            }
            final LogEntry inMemory = getEntryFromMemory(index);
            if (inMemory != null) {
                return inMemory;
            }
        }
        finally {
            readLock.unlock();
        }

        final LogEntry stored = logStorage.getEntry(index);
        if (stored == null) {
            // The index is within range, so a missing entry means the storage is damaged.
            reportError(RaftError.EIO.getNumber(), "Corrupted entry at index=%d, not found", index);
            return null;
        }

        if (raftOptions.isEnableLogEntryChecksum() && stored.isCorrupted()) {
            final String msg = String.format("Corrupted entry at index=%d, term=%d, expectedChecksum=%d, realChecksum=%d", index, stored.getId().getTerm(), stored.getChecksum(), stored.checksum());
            reportError(RaftError.EIO.getNumber(), msg);
            throw new LogEntryCorruptedException(msg);
        }
        return stored;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the term of the entry at the given index.
     *
     * @param index Log index.
     * @return 0 for index 0 or an index outside [firstLogIndex, lastLogIndex]; the snapshot term if
     *     the index equals the last snapshot index; otherwise the term from the term cache or,
     *     on a cache miss, from log storage.
     */
    @Override
    public long getTerm(long index) {
        if (index == 0L) {
            return 0L;
        }

        readLock.lock();
        try {
            if (index == lastSnapshotId.getIndex()) {
                return lastSnapshotId.getTerm();
            }
            if (index > lastLogIndex || index < firstLogIndex) {
                return 0L;
            }
            final long cachedTerm = termCache.lookup(index);
            if (cachedTerm != -1L) {
                return cachedTerm;
            }
        }
        finally {
            readLock.unlock();
        }

        // Cache miss: read from storage outside the read lock.
        return getTermFromLogStorage(index);
    }

    /**
     * Reads the entry at the given index from log storage and returns its term.
     *
     * @param index Log index.
     * @return The entry's term, or 0 if storage has no entry at that index.
     * @throws LogEntryCorruptedException If checksums are enabled and the entry is corrupted;
     *     the manager is put into the error state first.
     */
    private long getTermFromLogStorage(long index) {
        final LogEntry stored = logStorage.getEntry(index);
        if (stored == null) {
            return 0L;
        }

        if (raftOptions.isEnableLogEntryChecksum() && stored.isCorrupted()) {
            final String msg = String.format("The log entry is corrupted, index=%d, term=%d, expectedChecksum=%d, realChecksum=%d", stored.getId().getIndex(), stored.getId().getTerm(), stored.getChecksum(), stored.checksum());
            reportError(RaftError.EIO.getNumber(), msg);
            throw new LogEntryCorruptedException(msg);
        }
        return stored.getId().getTerm();
    }

    /**
     * Returns the first log index, read under the read lock.
     *
     * @return Current first log index.
     */
    @Override
    public long getFirstLogIndex() {
        readLock.lock();
        try {
            return firstLogIndex;
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the last log index without waiting for pending writes to be flushed.
     *
     * @return Current last log index.
     */
    @Override
    public long getLastLogIndex() {
        return getLastLogIndex(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the last log index.
     *
     * <p>Without flushing, or when the last log index equals the last snapshot index, the
     * in-memory value is returned directly. Otherwise a LAST_LOG_ID event is offered to the disk
     * queue and the calling thread blocks until its closure completes.
     *
     * @param isFlush Whether to go through the disk queue.
     * @return Last log index.
     * @throws IllegalStateException If the wait is interrupted or the closure completed without a log id.
     */
    @Override
    public long getLastLogIndex(boolean isFlush) {
        final LastLogIdClosure closure;
        readLock.lock();
        try {
            if (!isFlush || lastLogIndex == lastSnapshotId.getIndex()) {
                return lastLogIndex;
            }
            closure = new LastLogIdClosure();
        }
        finally {
            readLock.unlock();
        }

        offerEvent(closure, EventType.LAST_LOG_ID);
        try {
            closure.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        if (closure.lastLogId == null) {
            assert stopped : "Last log id can be null only when node is stopping.";
            throw new IllegalStateException("Node is shutting down");
        }
        return closure.lastLogId.getIndex();
    }

    /**
     * Same lookup as {@code getTerm(long)} but without taking the read lock.
     *
     * @param index Log index.
     * @return 0 for index 0 or an out-of-range index; the snapshot term if the index equals the
     *     last snapshot index; otherwise the cached term or, on a cache miss, the term from storage.
     */
    private long unsafeGetTerm(long index) {
        if (index == 0L) {
            return 0L;
        }

        // Read the volatile reference once so index and term come from the same object.
        final LogId snapshotId = lastSnapshotId;
        if (index == snapshotId.getIndex()) {
            return snapshotId.getTerm();
        }

        if (index > lastLogIndex || index < firstLogIndex) {
            return 0L;
        }

        final long cachedTerm = termCache.lookup(index);
        return cachedTerm != -1L ? cachedTerm : getTermFromLogStorage(index);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the id of the last log entry.
     *
     * <p>Without flushing, the id is built from the in-memory last log index, or the last snapshot
     * id is returned when the log is empty (last index below first index). With flushing, the last
     * snapshot id is returned if it matches the last log index; otherwise a LAST_LOG_ID event is
     * offered to the disk queue and the calling thread blocks until its closure completes.
     *
     * @param isFlush Whether to go through the disk queue.
     * @return Last log id.
     * @throws IllegalStateException If the wait is interrupted or the closure completed without a log id.
     */
    @Override
    public LogId getLastLogId(boolean isFlush) {
        final LastLogIdClosure closure;
        readLock.lock();
        try {
            if (!isFlush) {
                return lastLogIndex >= firstLogIndex
                    ? new LogId(lastLogIndex, unsafeGetTerm(lastLogIndex))
                    : lastSnapshotId;
            }
            if (lastLogIndex == lastSnapshotId.getIndex()) {
                return lastSnapshotId;
            }
            closure = new LastLogIdClosure();
        }
        finally {
            readLock.unlock();
        }

        offerEvent(closure, EventType.LAST_LOG_ID);
        try {
            closure.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        if (closure.lastLogId == null) {
            assert stopped : "Last log id can be null only when node is stopping.";
            throw new IllegalStateException("Node is shutting down");
        }
        return closure.lastLogId;
    }

    /**
     * Drops all entries before {@code firstIndexKept} from memory, advances the first log index,
     * and offers a TRUNCATE_PREFIX event to the disk queue.
     *
     * <p>The caller must hold {@code lock} and hands its ownership to this method: the lock is
     * always released here, including when a precondition fails or the configuration manager
     * throws, so the caller must not unlock it again.
     *
     * @param firstIndexKept First index to keep; must not be less than the current first log index.
     * @param lock Lock held by the caller; released by this method.
     * @return Always {@code true}.
     */
    private boolean truncatePrefix(long firstIndexKept, Lock lock) {
        try {
            this.logsInMemory.removeFromFirstWhen(entry -> entry.getId().getIndex() < firstIndexKept);
            Requires.requireTrue(firstIndexKept >= this.firstLogIndex, "Try to truncate logs before %d, but the firstLogIndex is %d", firstIndexKept, this.firstLogIndex);
            this.firstLogIndex = firstIndexKept;
            if (firstIndexKept > this.lastLogIndex) {
                // The whole log was dropped: keep lastLogIndex == firstLogIndex - 1.
                this.lastLogIndex = firstIndexKept - 1L;
            }
            LOG.debug("Truncate prefix, firstIndexKept is :{}", firstIndexKept);
            this.configManager.truncatePrefix(firstIndexKept);
        }
        finally {
            // Callers never unlock after delegating to this method, so a failure above must not leak the lock.
            lock.unlock();
        }
        // The event is offered after the lock has been released.
        TruncatePrefixClosure c = new TruncatePrefixClosure(firstIndexKept);
        this.offerEvent(c, EventType.TRUNCATE_PREFIX);
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Discards the whole in-memory log and term cache, makes the log start at
     * {@code nextLogIndex}, and offers a RESET event to the disk queue after releasing the write lock.
     *
     * @param nextLogIndex Index the next appended entry will get.
     * @return Always {@code true}.
     */
    private boolean reset(long nextLogIndex) {
        writeLock.lock();
        try {
            logsInMemory.clear();
            termCache.reset();
            firstLogIndex = nextLogIndex;
            lastLogIndex = nextLogIndex - 1L;
            configManager.truncatePrefix(firstLogIndex);
            configManager.truncateSuffix(lastLogIndex);
            return true;
        }
        finally {
            writeLock.unlock();
            // Offered from finally, so it is sent even if the in-memory reset above threw.
            offerEvent(new ResetClosure(nextLogIndex), EventType.RESET);
        }
    }

    /**
     * Drops all entries after {@code lastIndexKept} from memory and the term cache, moves the last
     * log index back, and offers a TRUNCATE_SUFFIX event to the disk queue.
     *
     * <p>The caller must hold {@code lock}. The lock is released only while the event is offered
     * and is always held again when this method returns or throws, so the caller's own unlock
     * stays valid.
     *
     * <p>If {@code lastIndexKept} is below the applied index the request is logged and ignored.
     *
     * @param lastIndexKept Last index to keep.
     * @param lock Lock held by the caller.
     */
    private void unsafeTruncateSuffix(long lastIndexKept, Lock lock) {
        if (lastIndexKept < this.appliedId.getIndex()) {
            LOG.error("FATAL ERROR: Can't truncate logs before appliedId={}, lastIndexKept={}", this.appliedId, lastIndexKept);
            return;
        }
        this.logsInMemory.removeFromLastWhen(entry -> entry.getId().getIndex() > lastIndexKept);
        this.termCache.truncateTail(lastIndexKept + 1L);
        this.lastLogIndex = lastIndexKept;
        long lastTermKept = this.unsafeGetTerm(lastIndexKept);
        Requires.requireTrue(this.lastLogIndex == 0L || lastTermKept != 0L);
        LOG.debug("Truncate suffix :{}", lastIndexKept);
        this.configManager.truncateSuffix(lastIndexKept);
        lock.unlock();
        try {
            TruncateSuffixClosure c = new TruncateSuffixClosure(lastIndexKept, lastTermKept);
            this.offerEvent(c, EventType.TRUNCATE_SUFFIX);
        }
        finally {
            // Re-acquire even if offering failed: the caller unlocks in its own finally block.
            lock.lock();
        }
    }

    /**
     * Validates a batch of entries against the current log and prepares it for appending.
     * Must be called with the write lock held; {@code entries} must not be empty.
     *
     * <ul>
     * <li>If the first entry has index 0, consecutive indexes following the last log index are
     * assigned to all entries.</li>
     * <li>If the batch would leave a gap after the last log index, {@code done} is run with EINVAL
     * and the batch is rejected.</li>
     * <li>If the whole batch is at or below the applied index, {@code done} is run without a status
     * argument and the batch is rejected.</li>
     * <li>Otherwise entries that already match the local log (same term at the same index) are
     * removed from the head of {@code entries}, and the conflicting local suffix, if any, is truncated.</li>
     * </ul>
     *
     * @param entries Entries to append; may have indexes assigned or its matching head removed.
     * @param done Closure run on the common executor when the batch is rejected.
     * @param lock Write lock held by the caller, passed through to suffix truncation.
     * @return {@code true} if the (possibly trimmed) batch should be appended, {@code false} if it was rejected.
     */
    private boolean checkAndResolveConflict(List<LogEntry> entries, LogManager.StableClosure done, Lock lock) {
        LogEntry firstLogEntry = ArrayDeque.peekFirst(entries);
        if (firstLogEntry.getId().getIndex() == 0L) {
            // Index 0 on the first entry means indexes are not assigned yet; assign them here.
            for (int i = 0; i < entries.size(); ++i) {
                entries.get(i).getId().setIndex(++this.lastLogIndex);
            }
            return true;
        }
        if (firstLogEntry.getId().getIndex() > this.lastLogIndex + 1L) {
            Utils.runClosureInThread(this.nodeOptions.getCommonExecutor(), done, new Status(RaftError.EINVAL, "There's gap between first_index=%d and last_log_index=%d", firstLogEntry.getId().getIndex(), this.lastLogIndex));
            return false;
        }
        long appliedIndex = this.appliedId.getIndex();
        LogEntry lastLogEntry = ArrayDeque.peekLast(entries);
        if (lastLogEntry.getId().getIndex() <= appliedIndex) {
            LOG.warn("Received entries of which the lastLog={} is not greater than appliedIndex={}, return immediately with nothing changed.", new Object[]{lastLogEntry.getId().getIndex(), appliedIndex});
            Utils.runClosureInThread(this.nodeOptions.getCommonExecutor(), done);
            return false;
        }
        if (firstLogEntry.getId().getIndex() == this.lastLogIndex + 1L) {
            // Fast path: the batch directly continues the log.
            this.lastLogIndex = lastLogEntry.getId().getIndex();
        } else {
            // The batch overlaps the local log: find the first entry whose term differs from the local one.
            int conflictingIndex;
            for (conflictingIndex = 0; conflictingIndex < entries.size() && this.unsafeGetTerm(entries.get(conflictingIndex).getId().getIndex()) == entries.get(conflictingIndex).getId().getTerm(); ++conflictingIndex) {
            }
            if (conflictingIndex != entries.size()) {
                if (entries.get(conflictingIndex).getId().getIndex() <= this.lastLogIndex) {
                    // Drop the local suffix starting at the conflicting index.
                    this.unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1L, lock);
                }
                this.lastLogIndex = lastLogEntry.getId().getIndex();
            }
            if (conflictingIndex > 0) {
                // Entries before the conflict are already in the local log; do not append them again.
                entries.subList(0, conflictingIndex).clear();
            }
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns what the configuration manager reports for the given index, read under the read lock.
     *
     * @param index Log index.
     * @return Result of {@code ConfigurationManager.get(index)}.
     */
    @Override
    public ConfigurationEntry getConfiguration(long index) {
        readLock.lock();
        try {
            return configManager.get(index);
        }
        finally {
            readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Compares the given configuration entry with the latest one known to the configuration manager.
     *
     * @param current Configuration entry held by the caller, may be {@code null}.
     * @return {@code null} if {@code current} is {@code null}; the manager's last configuration if
     *     it is non-empty and has a different id than {@code current}; otherwise {@code current}.
     */
    @Override
    public ConfigurationEntry checkAndSetConfiguration(ConfigurationEntry current) {
        if (current == null) {
            return null;
        }

        readLock.lock();
        try {
            final ConfigurationEntry latest = configManager.getLastConfiguration();
            final boolean outdated = latest != null && !latest.isEmpty() && !current.getId().equals(latest.getId());
            return outdated ? latest : current;
        }
        finally {
            readLock.unlock();
        }
    }

    /**
     * Registers a callback to be run when the last log index moves past {@code expectedLastLogIndex}.
     *
     * @param expectedLastLogIndex Last log index the caller currently knows about.
     * @param cb Callback to run.
     * @param arg Argument handed back to the callback.
     * @return Waiter id, or 0 if the callback was scheduled immediately.
     */
    @Override
    public long wait(long expectedLastLogIndex, LogManager.NewLogCallback cb, Object arg) {
        return notifyOnNewLog(expectedLastLogIndex, new WaitMeta(cb, arg, 0));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Either schedules the waiter's callback right away (the log already differs from what the
     * caller expects, or the manager is stopped) or parks the waiter in the wait map.
     *
     * @param expectedLastLogIndex Last log index the caller currently knows about.
     * @param wm Waiter to notify.
     * @return 0 if the callback was scheduled immediately, otherwise the non-zero id under which
     *     the waiter was registered.
     */
    private long notifyOnNewLog(long expectedLastLogIndex, WaitMeta wm) {
        writeLock.lock();
        try {
            final boolean runNow = expectedLastLogIndex != lastLogIndex || stopped;
            if (!runNow) {
                // 0 means "not registered", so skip it when handing out ids.
                if (nextWaitId == 0L) {
                    ++nextWaitId;
                }
                final long id = nextWaitId++;
                waitMap.put(id, wm);
                return id;
            }

            wm.errorCode = stopped ? RaftError.ESTOP.getNumber() : 0;
            Utils.runInThread(nodeOptions.getCommonExecutor(), () -> runOnNewLog(wm));
            return 0L;
        }
        finally {
            writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes a previously registered waiter, under the write lock.
     *
     * @param id waiter id as returned by {@link #wait(long, LogManager.NewLogCallback, Object)}
     * @return {@code true} if {@code waitMap} held a non-null value for {@code id}
     */
    @Override
    public boolean removeWaiter(long id) {
        this.writeLock.lock();
        try {
            return this.waitMap.remove(id) != null;
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Records a new applied log id and clears in-memory logs up to the smaller of the
     * disk id and the applied id.
     *
     * <p>An {@code appliedId} that compares lower than the currently stored one is ignored.
     * The stored value is a copy of the argument. The clearing itself happens after the
     * write lock has been released.
     *
     * @param appliedId the newly applied log id
     */
    @Override
    public void setAppliedId(LogId appliedId) {
        LogId clearUpTo;
        this.writeLock.lock();
        try {
            if (appliedId.compareTo(this.appliedId) < 0) {
                return;
            }
            this.appliedId = appliedId.copy();
            // Clear up to whichever of the two ids compares lower (disk id wins ties).
            if (this.diskId.compareTo(this.appliedId) <= 0) {
                clearUpTo = this.diskId;
            } else {
                clearUpTo = this.appliedId;
            }
        } finally {
            this.writeLock.unlock();
        }
        if (clearUpTo != null) {
            this.clearMemoryLogs(clearUpTo);
        }
    }

    /**
     * Invokes the waiter's callback with its stored argument and error code.
     *
     * @param wm waiter metadata holding the callback, its argument and the error code
     */
    void runOnNewLog(WaitMeta wm) {
        LogManager.NewLogCallback callback = wm.onNewLog;
        callback.onNewLog(wm.arg, wm.errorCode);
    }

    /**
     * Checks that the log range and the last snapshot id are consistent with each other.
     *
     * <p>Runs under the read lock. Requires {@code firstLogIndex > 0} and
     * {@code lastLogIndex >= 0}. The result is:
     * <ul>
     *   <li>when the last snapshot id equals {@code LogId(0, 0)}: OK if the first log index
     *       is {@code 1}, otherwise an {@code EIO} status reporting the missing logs;</li>
     *   <li>otherwise: OK if the snapshot index lies in
     *       {@code [firstLogIndex - 1, lastLogIndex]}, else an {@code EIO} status
     *       describing the gap.</li>
     * </ul>
     *
     * @return {@code Status.OK()} when consistent, an {@code EIO} status otherwise
     */
    @Override
    public Status checkConsistency() {
        this.readLock.lock();
        try {
            Requires.requireTrue(this.firstLogIndex > 0L);
            Requires.requireTrue(this.lastLogIndex >= 0L);
            if (this.lastSnapshotId.equals(new LogId(0L, 0L))) {
                if (this.firstLogIndex == 1L) {
                    return Status.OK();
                }
                return new Status(RaftError.EIO, "Missing logs in (0, %d)", this.firstLogIndex);
            }
            long snapshotIndex = this.lastSnapshotId.getIndex();
            if (snapshotIndex >= this.firstLogIndex - 1L && snapshotIndex <= this.lastLogIndex) {
                return Status.OK();
            }
            // Every placeholder is %d, so every argument must be numeric. The first argument
            // used to be lastSnapshotId.toString(), which java.util.Formatter rejects for %d
            // (presumably Status formats via String.format - confirm against Status).
            return new Status(RaftError.EIO, "There's a gap between snapshot={%d, %d} and log=[%d, %d] ", snapshotIndex, this.lastSnapshotId.getTerm(), this.firstLogIndex, this.lastLogIndex);
        } finally {
            this.readLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Prints a snapshot of the log manager state to the given printer.
     *
     * <p>The values are captured under the read lock; printing happens after the lock
     * has been released.
     *
     * @param out printer receiving the description
     */
    @Override
    public void describe(Describer.Printer out) {
        final long firstIdx;
        final long lastIdx;
        final String disk;
        final String applied;
        final String snapshot;
        this.readLock.lock();
        try {
            firstIdx = this.firstLogIndex;
            lastIdx = this.lastLogIndex;
            disk = String.valueOf(this.diskId);
            applied = String.valueOf(this.appliedId);
            snapshot = String.valueOf(this.lastSnapshotId);
        } finally {
            this.readLock.unlock();
        }
        out.print("  storage: [").print(firstIdx).print(", ").print(lastIdx).println(Character.valueOf(']'));
        out.print("  diskId: ").println(disk);
        out.print("  appliedId: ").println(applied);
        out.print("  lastSnapshotId: ").println(snapshot);
    }

    /**
     * Disruptor event handler that processes {@link StableClosureEvent}s for this log manager.
     *
     * <p>Closures that carry log entries are accumulated in an {@link AppendBatcher}; closures
     * without entries first force a flush of the batcher and are then dispatched on their
     * {@link EventType}. After each flush the resulting id is published via {@code setDiskId}.
     */
    private class StableClosureEventHandler
    implements EventHandler<StableClosureEvent> {
        /** Id returned by the most recent batcher flush; initialised from the manager's disk id. */
        LogId lastId;
        /** Backing list handed to the batcher for pending closures. */
        List<LogManager.StableClosure> storage;
        /** Batcher that collects closures with entries until they are flushed. */
        AppendBatcher ab;

        /**
         * Creates the handler with a 256-element closure buffer and a batcher of capacity 256
         * seeded with the manager's current disk id.
         */
        private StableClosureEventHandler() {
            this.lastId = LogManagerImpl.this.diskId;
            this.storage = new ArrayList<LogManager.StableClosure>(256);
            this.ab = LogManagerImpl.this.newAppendBatcher(this.storage, 256, LogManagerImpl.this.diskId);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Handles one event.
         *
         * <ul>
         *   <li>{@code SHUTDOWN}: flushes the batcher, publishes the disk id, counts down the
         *       shutdown latch, resets the event and returns.</li>
         *   <li>Closure with a non-empty entry list: appended to the batcher.</li>
         *   <li>Any other closure: the batcher is flushed and the disk id published, then the
         *       type-specific storage operation (if any) runs. If that operation reports
         *       failure, an {@code EIO} error is reported and the closure is not run;
         *       otherwise the closure runs with {@code Status.OK()}.</li>
         * </ul>
         * When {@code endOfBatch} is set, the batcher is flushed once more and the disk id
         * published.
         *
         * @param event the event to process; it is reset before the closure is handled
         * @param sequence sequence number of the event (unused here)
         * @param endOfBatch whether this is the last event of the current batch
         */
        public void onEvent(StableClosureEvent event, long sequence, boolean endOfBatch) throws Exception {
            if (event.type == EventType.SHUTDOWN) {
                this.lastId = this.ab.flush();
                LogManagerImpl.this.setDiskId(this.lastId);
                LogManagerImpl.this.shutDownLatch.countDown();
                event.reset();
                return;
            }
            // Capture the payload before resetting the event, since reset() nulls both fields.
            LogManager.StableClosure done = event.done;
            EventType eventType = event.type;
            event.reset();
            if (done.getEntries() != null && !done.getEntries().isEmpty()) {
                this.ab.append(done);
            } else {
                // Flush pending appends before running the entry-less operation.
                this.lastId = this.ab.flush();
                LogManagerImpl.this.setDiskId(this.lastId);
                // Stays true for event types without a storage operation (no matching case).
                boolean ret = true;
                switch (eventType) {
                    case LAST_LOG_ID: {
                        // Hand the closure a copy of the id produced by the flush above.
                        ((LastLogIdClosure)done).setLastLogId(this.lastId.copy());
                        break;
                    }
                    case TRUNCATE_PREFIX: {
                        long startMs = Utils.monotonicMs();
                        try {
                            TruncatePrefixClosure tpc = (TruncatePrefixClosure)done;
                            LOG.info("Truncating log storage prefix [groupId={}, firstIndexKept={}]", new Object[]{LogManagerImpl.this.nodeId.getGroupId(), tpc.firstIndexKept});
                            ret = LogManagerImpl.this.logStorage.truncatePrefix(tpc.firstIndexKept);
                            break;
                        }
                        finally {
                            // Runs on the break above as well as on exceptions.
                            LogManagerImpl.this.nodeMetrics.recordLatency("truncate-log-prefix", Utils.monotonicMs() - startMs);
                        }
                    }
                    case TRUNCATE_SUFFIX: {
                        long startMs = Utils.monotonicMs();
                        try {
                            TruncateSuffixClosure tsc = (TruncateSuffixClosure)done;
                            LOG.info("Truncating log storage suffix [groupId={}, lastIndexKept={}]", new Object[]{LogManagerImpl.this.nodeId.getGroupId(), tsc.lastIndexKept});
                            ret = LogManagerImpl.this.logStorage.truncateSuffix(tsc.lastIndexKept);
                            if (!ret) break;
                            // On success, lastId is updated in place to the last kept index/term.
                            this.lastId.setIndex(tsc.lastIndexKept);
                            this.lastId.setTerm(tsc.lastTermKept);
                            // A non-zero index must come with a non-zero term.
                            Requires.requireTrue(this.lastId.getIndex() == 0L || this.lastId.getTerm() != 0L);
                            break;
                        }
                        finally {
                            // Runs on either break above as well as on exceptions.
                            LogManagerImpl.this.nodeMetrics.recordLatency("truncate-log-suffix", Utils.monotonicMs() - startMs);
                        }
                    }
                    case RESET: {
                        ResetClosure rc = (ResetClosure)done;
                        LOG.info("Resetting log storage [groupId={}, nextLogIndex={}]", new Object[]{LogManagerImpl.this.nodeId.getGroupId(), rc.nextLogIndex});
                        ret = LogManagerImpl.this.logStorage.reset(rc.nextLogIndex);
                        break;
                    }
                }
                if (!ret) {
                    // Storage operation failed: report EIO; the closure is not run in this case.
                    LogManagerImpl.this.reportError(RaftError.EIO.getNumber(), "Failed operation in LogStorage", new Object[0]);
                } else {
                    done.run(Status.OK());
                }
            }
            if (endOfBatch) {
                this.lastId = this.ab.flush();
                LogManagerImpl.this.setDiskId(this.lastId);
            }
        }
    }

    private static enum EventType {
        OTHER,
        RESET,
        TRUNCATE_PREFIX,
        TRUNCATE_SUFFIX,
        SHUTDOWN,
        LAST_LOG_ID;

    }

    /**
     * Bookkeeping for one registered new-log waiter: the callback, the argument to pass
     * back to it, and the error code to report.
     */
    private static class WaitMeta {
        /** Callback invoked when the waiter fires. */
        LogManager.NewLogCallback onNewLog;
        /** Error code passed to the callback. */
        int errorCode;
        /** Opaque argument passed back to the callback. */
        Object arg;

        /**
         * @param onNewLog callback to invoke
         * @param arg opaque argument passed back to the callback
         * @param errorCode initial error code
         */
        WaitMeta(LogManager.NewLogCallback onNewLog, Object arg, int errorCode) {
            this.errorCode = errorCode;
            this.arg = arg;
            this.onNewLog = onNewLog;
        }
    }

    /**
     * Collects closures together with their log entries and writes them to storage in batches.
     *
     * <p>A batch is flushed when it holds {@code cap} closures or when the accumulated payload
     * size reaches the configured maximum append buffer size, and whenever {@link #flush()} is
     * called explicitly.
     */
    protected class AppendBatcher {
        /** Closures waiting for the next flush. */
        protected final List<LogManager.StableClosure> storage;
        /** Maximum number of closures per batch. */
        protected final int cap;
        /** Number of closures currently batched. */
        protected int size;
        /** Sum of the remaining bytes of the batched entries' data buffers. */
        protected int bufferSize;
        /** Entries of all batched closures, in append order. */
        protected final List<LogEntry> toAppend;
        /** Id returned by the last storage append (or the initial id). */
        protected LogId lastId;

        /**
         * @param storage list used to hold pending closures
         * @param cap maximum number of closures per batch
         * @param toAppend list used to hold pending entries
         * @param lastId initial last id
         */
        protected AppendBatcher(List<LogManager.StableClosure> storage, int cap, List<LogEntry> toAppend, LogId lastId) {
            this.storage = storage;
            this.cap = cap;
            this.toAppend = toAppend;
            this.lastId = lastId;
        }

        /**
         * Writes all pending entries to storage and runs the pending closures.
         *
         * <p>Each closure's entry list is cleared before it is run. The closure receives an
         * {@code EIO} status when the manager's error flag is set and {@code Status.OK()}
         * otherwise. Throwables raised while running a closure are logged and do not stop
         * the remaining closures.
         *
         * @return the id of the last entry as returned by the storage append, or the previous
         *         last id when nothing was pending
         */
        protected LogId flush() {
            if (this.size > 0) {
                this.lastId = LogManagerImpl.this.appendToStorage(this.toAppend);
                for (int idx = 0; idx < this.size; ++idx) {
                    LogManager.StableClosure closure = this.storage.get(idx);
                    closure.getEntries().clear();
                    Status st = null;
                    try {
                        if (LogManagerImpl.this.hasError) {
                            st = new Status(RaftError.EIO, "Corrupted LogStorage", new Object[0]);
                        } else {
                            st = Status.OK();
                        }
                        closure.run(st);
                    } catch (Throwable t) {
                        LOG.error("Fail to run closure with status: {}.", t, new Object[]{st});
                    }
                }
                this.toAppend.clear();
                this.storage.clear();
            }
            this.size = 0;
            this.bufferSize = 0;
            return this.lastId;
        }

        /**
         * Adds a closure and its entries to the current batch, flushing first if the batch
         * is already full.
         *
         * @param done closure whose entries should be appended
         */
        protected void append(LogManager.StableClosure done) {
            boolean batchFull = this.size == this.cap
                || this.bufferSize >= LogManagerImpl.this.raftOptions.getMaxAppendBufferSize();
            if (batchFull) {
                this.flush();
            }
            this.storage.add(done);
            ++this.size;
            this.toAppend.addAll(done.getEntries());
            for (LogEntry entry : done.getEntries()) {
                // Entries without a data buffer contribute nothing to the buffer size.
                if (entry.getData() != null) {
                    this.bufferSize += entry.getData().remaining();
                }
            }
        }
    }

    /**
     * Closure used to obtain the last log id: the id is stored via {@link #setLastLogId(LogId)}
     * and a waiting thread is released once the closure is run.
     */
    private static class LastLogIdClosure
    extends LogManager.StableClosure {
        /** Released when the closure has been run. */
        private final CountDownLatch latch = new CountDownLatch(1);
        /** The last log id supplied via {@link #setLastLogId(LogId)}. */
        private LogId lastLogId;

        LastLogIdClosure() {
            super(null);
        }

        /**
         * Stores the last log id.
         *
         * @param logId the id to store; a non-zero index must come with a non-zero term
         */
        void setLastLogId(LogId logId) {
            boolean indexWithoutTerm = logId.getIndex() != 0L && logId.getTerm() == 0L;
            Requires.requireTrue(!indexWithoutTerm);
            this.lastLogId = logId;
        }

        /** Releases the thread blocked in {@link #await()}; the status is ignored. */
        @Override
        public void run(Status status) {
            this.latch.countDown();
        }

        /**
         * Blocks until the closure has been run.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        void await() throws InterruptedException {
            this.latch.await();
        }
    }

    /** Closure describing a prefix truncation; carries the first index to keep. */
    private static class TruncatePrefixClosure
    extends LogManager.StableClosure {
        /** First log index that is kept by the truncation. */
        long firstIndexKept;

        /**
         * @param firstIndexKept first log index that is kept
         */
        TruncatePrefixClosure(long firstIndexKept) {
            super(null);
            this.firstIndexKept = firstIndexKept;
        }

        @Override
        public void run(Status status) {
            // Intentionally empty: nothing to do on completion.
        }
    }

    /** Closure describing a log storage reset; carries the next log index. */
    private static class ResetClosure
    extends LogManager.StableClosure {
        /** Index passed to the storage reset. */
        long nextLogIndex;

        /**
         * @param nextLogIndex index passed to the storage reset
         */
        ResetClosure(long nextLogIndex) {
            super(null);
            this.nextLogIndex = nextLogIndex;
        }

        @Override
        public void run(Status status) {
            // Intentionally empty: nothing to do on completion.
        }
    }

    /** Closure describing a suffix truncation; carries the last index and term to keep. */
    private static class TruncateSuffixClosure
    extends LogManager.StableClosure {
        /** Last log index that is kept by the truncation. */
        long lastIndexKept;
        /** Term associated with {@link #lastIndexKept}. */
        long lastTermKept;

        /**
         * @param lastIndexKept last log index that is kept
         * @param lastTermKept term associated with {@code lastIndexKept}
         */
        TruncateSuffixClosure(long lastIndexKept, long lastTermKept) {
            super(null);
            this.lastTermKept = lastTermKept;
            this.lastIndexKept = lastIndexKept;
        }

        @Override
        public void run(Status status) {
            // Intentionally empty: nothing to do on completion.
        }
    }

    /** Ring buffer event carrying a stable closure and the kind of work it represents. */
    public static class StableClosureEvent
    extends NodeIdAware {
        /** Closure to process; {@code null} after {@link #reset()}. */
        LogManager.StableClosure done;
        /** Kind of work; {@code null} after {@link #reset()}. */
        EventType type;

        /** Resets the inherited state and clears both payload fields. */
        @Override
        public void reset() {
            super.reset();
            this.type = null;
            this.done = null;
        }
    }
}

