package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryImpl;
import org.apache.ignite.thread.IgniteThread;

/* loaded from: input_file:org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.class */
public class FileHandleManagerImpl implements FileHandleManager {
    public static final long DFLT_WAL_SEGMENT_SYNC_TIMEOUT = 500;
    private final WALWriter walWriter;
    private final WalSegmentSyncer walSegmentSyncWorker;
    protected final GridCacheSharedContext cctx;
    private final IgniteLogger log;
    private final WALMode mode;
    private final DataStorageMetricsImpl metrics;
    private final boolean mmap;
    private final RecordSerializer serializer;
    private final Supplier<FileWriteHandle> currentHandleSupplier;
    private final int walBufferSize;
    private final long maxWalSegmentSize;
    private final long fsyncDelay;

    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl$WALWriter.class */
    public class WALWriter extends GridWorker {
        private static final long UNCONDITIONAL_FLUSH = -1;
        private static final long FILE_CLOSE = -2;
        private static final long FILE_FORCE = -3;
        private volatile Throwable err;
        final Map<Thread, Long> waiters;
        static final /* synthetic */ boolean $assertionsDisabled;

        WALWriter(IgniteLogger igniteLogger) {
            super(FileHandleManagerImpl.this.cctx.igniteInstanceName(), "wal-write-worker%" + FileHandleManagerImpl.this.cctx.igniteInstanceName(), igniteLogger, FileHandleManagerImpl.this.cctx.kernalContext().workersRegistry());
            this.waiters = new ConcurrentHashMap();
        }

        @Override // org.apache.ignite.internal.util.worker.GridWorker
        protected void body() {
            Throwable th = null;
            while (!isCancelled()) {
                try {
                    onIdle();
                    while (this.waiters.isEmpty()) {
                        if (isCancelled()) {
                            unparkWaiters(Long.MAX_VALUE);
                            this.err = th;
                            unparkWaiters(Long.MAX_VALUE);
                            if (th == null && !this.isCancelled) {
                                th = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
                            }
                            if (th instanceof OutOfMemoryError) {
                                FileHandleManagerImpl.this.cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, th));
                                return;
                            } else {
                                if (th != null) {
                                    FileHandleManagerImpl.this.cctx.kernalContext().failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, th));
                                    return;
                                }
                                return;
                            }
                        }
                        blockingSectionBegin();
                        try {
                            LockSupport.park();
                            blockingSectionEnd();
                        } catch (Throwable th2) {
                            blockingSectionEnd();
                            throw th2;
                        }
                    }
                    Long l = null;
                    for (Long l2 : this.waiters.values()) {
                        if (l2.longValue() > Long.MIN_VALUE) {
                            l = l2;
                        }
                    }
                    updateHeartbeat();
                    if (l != null) {
                        if (l.longValue() < -1) {
                            try {
                                if (!$assertionsDisabled && l.longValue() != -2 && l.longValue() != -3) {
                                    throw new AssertionError(l);
                                }
                                if (l.longValue() == -2) {
                                    FileHandleManagerImpl.this.currentHandle().fileIO.close();
                                } else if (l.longValue() == -3) {
                                    FileHandleManagerImpl.this.currentHandle().fileIO.force();
                                }
                                unparkWaiters(l.longValue());
                            } catch (IOException e) {
                                this.log.error("Exception in WAL writer thread: ", e);
                                Throwable th3 = e;
                                unparkWaiters(Long.MAX_VALUE);
                                this.err = th3;
                                unparkWaiters(Long.MAX_VALUE);
                                if (th3 == null && !this.isCancelled) {
                                    th3 = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
                                }
                                if (th3 instanceof OutOfMemoryError) {
                                    FileHandleManagerImpl.this.cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, th3));
                                    return;
                                } else {
                                    if (th3 != null) {
                                        FileHandleManagerImpl.this.cctx.kernalContext().failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, th3));
                                        return;
                                    }
                                    return;
                                }
                            }
                        }
                        updateHeartbeat();
                        List<SegmentedRingByteBuffer.ReadSegment> poll = FileHandleManagerImpl.this.currentHandle().buf.poll(l.longValue());
                        if (poll == null) {
                            unparkWaiters(l.longValue());
                        } else {
                            for (int i = 0; i < poll.size(); i++) {
                                SegmentedRingByteBuffer.ReadSegment readSegment = poll.get(i);
                                updateHeartbeat();
                                try {
                                    try {
                                        writeBuffer(readSegment.position(), readSegment.buffer());
                                        readSegment.release();
                                        unparkWaiters((l.longValue() > (-1L) ? 1 : (l.longValue() == (-1L) ? 0 : -1)) == 0 || (l.longValue() > (-2L) ? 1 : (l.longValue() == (-2L) ? 0 : -1)) == 0 || th != null ? Long.MAX_VALUE : FileHandleManagerImpl.this.currentHandle().written);
                                    } catch (Throwable th4) {
                                        this.log.error("Exception in WAL writer thread:", th4);
                                        th = th4;
                                        readSegment.release();
                                        unparkWaiters((l.longValue() > (-1L) ? 1 : (l.longValue() == (-1L) ? 0 : -1)) == 0 || (l.longValue() > (-2L) ? 1 : (l.longValue() == (-2L) ? 0 : -1)) == 0 || th != null ? Long.MAX_VALUE : FileHandleManagerImpl.this.currentHandle().written);
                                    }
                                } catch (Throwable th5) {
                                    readSegment.release();
                                    unparkWaiters((l.longValue() > (-1L) ? 1 : (l.longValue() == (-1L) ? 0 : -1)) == 0 || (l.longValue() > (-2L) ? 1 : (l.longValue() == (-2L) ? 0 : -1)) == 0 || th != null ? Long.MAX_VALUE : FileHandleManagerImpl.this.currentHandle().written);
                                    throw th5;
                                }
                            }
                        }
                    }
                } catch (Throwable th6) {
                    Throwable th7 = th6;
                    this.err = th7;
                    unparkWaiters(Long.MAX_VALUE);
                    if (th7 == null && !this.isCancelled) {
                        th7 = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
                    }
                    if (th7 instanceof OutOfMemoryError) {
                        FileHandleManagerImpl.this.cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, th7));
                        return;
                    } else {
                        if (th7 != null) {
                            FileHandleManagerImpl.this.cctx.kernalContext().failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, th7));
                            return;
                        }
                        return;
                    }
                }
            }
            this.err = th;
            unparkWaiters(Long.MAX_VALUE);
            if (th == null && !this.isCancelled) {
                th = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
            }
            if (th instanceof OutOfMemoryError) {
                FileHandleManagerImpl.this.cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, th));
            } else if (th != null) {
                FileHandleManagerImpl.this.cctx.kernalContext().failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, th));
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void shutdown() throws IgniteInterruptedCheckedException {
            U.cancel(this);
            Thread runner = runner();
            if (runner != null) {
                LockSupport.unpark(runner);
                U.join(runner);
            }
            if (!$assertionsDisabled && FileHandleManagerImpl.this.walWriter.runner() != null) {
                throw new AssertionError("WALWriter should be stopped.");
            }
        }

        private void unparkWaiters(long j) {
            if (!$assertionsDisabled && j <= Long.MIN_VALUE) {
                throw new AssertionError(j);
            }
            for (Map.Entry<Thread, Long> entry : this.waiters.entrySet()) {
                Long value = entry.getValue();
                if (value.longValue() <= j) {
                    if (value.longValue() != Long.MIN_VALUE) {
                        this.waiters.put(entry.getKey(), Long.MIN_VALUE);
                    }
                    LockSupport.unpark(entry.getKey());
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void force() throws IgniteCheckedException {
            flushBuffer(-3L);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void close() throws IgniteCheckedException {
            flushBuffer(-2L);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void flushAll() throws IgniteCheckedException {
            flushBuffer(-1L);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void flushBuffer(long j) throws IgniteCheckedException {
            if (FileHandleManagerImpl.this.mmap) {
                return;
            }
            Throwable th = FileHandleManagerImpl.this.walWriter.err;
            if (th != null) {
                FileHandleManagerImpl.this.cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, th));
            }
            if (j == -1) {
                j = FileHandleManagerImpl.this.currentHandle().buf.tail();
            }
            Thread currentThread = Thread.currentThread();
            this.waiters.put(currentThread, Long.valueOf(j));
            LockSupport.unpark(FileHandleManagerImpl.this.walWriter.runner());
            while (true) {
                Long l = this.waiters.get(currentThread);
                if (!$assertionsDisabled && l == null) {
                    throw new AssertionError("Only this thread can remove thread from waiters");
                }
                if (l.longValue() == Long.MIN_VALUE) {
                    this.waiters.remove(currentThread);
                    Throwable th2 = FileHandleManagerImpl.this.walWriter.err;
                    if (th2 != null) {
                        throw new IgniteCheckedException("Flush buffer failed.", th2);
                    }
                    return;
                }
                LockSupport.park();
            }
        }

        private void writeBuffer(long j, ByteBuffer byteBuffer) throws StorageException, IgniteCheckedException {
            FileWriteHandleImpl currentHandle = FileHandleManagerImpl.this.currentHandle();
            if (!$assertionsDisabled && currentHandle.fileIO == null) {
                throw new AssertionError("Writing to a closed segment.");
            }
            FileHandleManagerImpl.this.checkNode();
            long currentTimeMillis = U.currentTimeMillis();
            long j2 = 2000;
            while (currentHandle.written != j) {
                if (!$assertionsDisabled && currentHandle.written >= j) {
                    throw new AssertionError("written = " + currentHandle.written + ", pos = " + j);
                }
                long currentTimeMillis2 = U.currentTimeMillis();
                if (currentTimeMillis2 - currentTimeMillis >= j2) {
                    if (j2 < TcpDiscoveryImpl.LOG_WARN_MSG_TIMEOUT) {
                        j2 *= 2;
                    }
                    U.warn(this.log, "Still waiting for a concurrent write to complete [written=" + currentHandle.written + ", pos=" + j + ", lastFsyncPos=" + currentHandle.lastFsyncPos + ", stop=" + currentHandle.stop.get() + ", actualPos=" + currentHandle.safePosition() + ']');
                    currentTimeMillis = currentTimeMillis2;
                }
                FileHandleManagerImpl.this.checkNode();
            }
            int remaining = byteBuffer.remaining();
            if (!$assertionsDisabled && remaining <= 0) {
                throw new AssertionError(remaining);
            }
            try {
                if (!$assertionsDisabled && currentHandle.written != currentHandle.fileIO.position()) {
                    throw new AssertionError();
                }
                currentHandle.written += currentHandle.fileIO.writeFully(byteBuffer);
                FileHandleManagerImpl.this.metrics.onWalBytesWritten(remaining);
                if (!$assertionsDisabled && currentHandle.written != currentHandle.fileIO.position()) {
                    throw new AssertionError();
                }
            } catch (IOException e) {
                this.err = e;
                StorageException storageException = new StorageException("Failed to write buffer.", e);
                FileHandleManagerImpl.this.cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, storageException));
                throw storageException;
            }
        }

        public void restart() {
            if (!$assertionsDisabled && runner() != null) {
                throw new AssertionError("WALWriter is still running.");
            }
            this.isCancelled = false;
            new IgniteThread(this).start();
        }

        static {
            $assertionsDisabled = !FileHandleManagerImpl.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl$WalSegmentSyncer.class */
    private class WalSegmentSyncer extends GridWorker {
        private final long syncTimeout;
        static final /* synthetic */ boolean $assertionsDisabled;

        private WalSegmentSyncer(String str, IgniteLogger igniteLogger) {
            super(str, "wal-segment-syncer", igniteLogger);
            this.syncTimeout = Math.max(IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_WAL_SEGMENT_SYNC_TIMEOUT, 500L), 100L);
        }

        @Override // org.apache.ignite.internal.util.worker.GridWorker
        protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            while (!isCancelled()) {
                IgniteUtils.sleep(this.syncTimeout);
                try {
                    FileHandleManagerImpl.this.flush(null, true);
                } catch (IgniteCheckedException e) {
                    U.error(this.log, "Exception when flushing WAL.", e);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void shutdown() throws IgniteInterruptedCheckedException {
            synchronized (this) {
                U.cancel(this);
            }
            U.join(runner());
        }

        public void restart() {
            if (!$assertionsDisabled && runner() != null) {
                throw new AssertionError("WalSegmentSyncer is running.");
            }
            this.isCancelled = false;
            new IgniteThread(FileHandleManagerImpl.this.walSegmentSyncWorker).start();
        }

        static {
            $assertionsDisabled = !FileHandleManagerImpl.class.desiredAssertionStatus();
        }
    }

    public FileHandleManagerImpl(GridCacheSharedContext gridCacheSharedContext, DataStorageMetricsImpl dataStorageMetricsImpl, boolean z, RecordSerializer recordSerializer, Supplier<FileWriteHandle> supplier, WALMode wALMode, int i, long j, long j2) {
        this.cctx = gridCacheSharedContext;
        this.log = gridCacheSharedContext.logger(FileHandleManagerImpl.class);
        this.mode = wALMode;
        this.metrics = dataStorageMetricsImpl;
        this.mmap = z;
        this.serializer = recordSerializer;
        this.currentHandleSupplier = supplier;
        this.walBufferSize = i;
        this.maxWalSegmentSize = j;
        this.fsyncDelay = j2;
        this.walWriter = new WALWriter(this.log);
        if (wALMode == WALMode.NONE || wALMode == WALMode.FSYNC) {
            U.quietAndWarn(this.log, "Initialized write-ahead log manager in NONE mode, persisted data may be lost in a case of unexpected node failure. Make sure to deactivate the cluster before shutdown.");
            this.walSegmentSyncWorker = null;
        } else {
            this.walSegmentSyncWorker = new WalSegmentSyncer(gridCacheSharedContext.igniteInstanceName(), gridCacheSharedContext.kernalContext().log(WalSegmentSyncer.class));
            if (this.log.isInfoEnabled()) {
                this.log.info("Initialized write-ahead log manager [mode=" + wALMode + ']');
            }
        }
    }

    @Override // org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManager
    public FileWriteHandle initHandle(SegmentIO segmentIO, long j, RecordSerializer recordSerializer) throws IOException {
        SegmentedRingByteBuffer segmentedRingByteBuffer = this.mmap ? new SegmentedRingByteBuffer(segmentIO.map((int) this.maxWalSegmentSize), this.metrics) : new SegmentedRingByteBuffer(this.walBufferSize, this.maxWalSegmentSize, SegmentedRingByteBuffer.BufferMode.DIRECT, this.metrics);
        segmentedRingByteBuffer.init(j);
        return new FileWriteHandleImpl(this.cctx, segmentIO, segmentedRingByteBuffer, recordSerializer, this.metrics, this.walWriter, j, this.mode, this.mmap, true, this.fsyncDelay, this.maxWalSegmentSize);
    }

    @Override // org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManager
    public FileWriteHandle nextHandle(SegmentIO segmentIO, RecordSerializer recordSerializer) throws IOException {
        SegmentedRingByteBuffer segmentedRingByteBuffer = this.mmap ? new SegmentedRingByteBuffer(segmentIO.map((int) this.maxWalSegmentSize), this.metrics) : currentHandle().buf.reset();
        try {
            return new FileWriteHandleImpl(this.cctx, segmentIO, segmentedRingByteBuffer, recordSerializer, this.metrics, this.walWriter, 0L, this.mode, this.mmap, false, this.fsyncDelay, this.maxWalSegmentSize);
        } catch (ClosedByInterruptException e) {
            if (segmentedRingByteBuffer == null) {
                return null;
            }
            segmentedRingByteBuffer.free();
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public FileWriteHandleImpl currentHandle() {
        return (FileWriteHandleImpl) this.currentHandleSupplier.get();
    }

    @Override // org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManager
    public void onDeactivate() throws IgniteCheckedException {
        FileWriteHandleImpl currentHandle = currentHandle();
        try {
            if (this.mode == WALMode.BACKGROUND && currentHandle != null) {
                currentHandle.flush(null);
            }
            if (currentHandle != null) {
                currentHandle.close(false);
            }
        } finally {
            if (this.walSegmentSyncWorker != null) {
                this.walSegmentSyncWorker.shutdown();
            }
            this.walWriter.shutdown();
        }
    }

    @Override // org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManager
    public void resumeLogging() {
        if (!this.mmap) {
            this.walWriter.restart();
        }
        if (this.cctx.kernalContext().clientNode() || this.walSegmentSyncWorker == null) {
            return;
        }
        this.walSegmentSyncWorker.restart();
    }

    @Override // org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManager
    public WALPointer flush(WALPointer wALPointer, boolean z) throws IgniteCheckedException, StorageException {
        FileWriteHandleImpl currentHandle;
        if (this.serializer == null || this.mode == WALMode.NONE || (currentHandle = currentHandle()) == null) {
            return null;
        }
        FileWALPointer fileWALPointer = wALPointer == null ? new FileWALPointer(currentHandle.getSegmentId(), (int) currentHandle.buf.tail(), 0) : (FileWALPointer) wALPointer;
        if (this.mode == WALMode.LOG_ONLY) {
            currentHandle.flushOrWait(fileWALPointer);
        }
        if ((z || this.mode == WALMode.FSYNC) && currentHandle.needFsync(fileWALPointer)) {
            currentHandle.fsync(fileWALPointer);
            return fileWALPointer;
        }
        return fileWALPointer;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkNode() throws StorageException {
        if (this.cctx.kernalContext().invalid()) {
            throw new StorageException("Failed to perform WAL operation (environment was invalidated by a previous error)");
        }
    }
}
