/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.database.txdr;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.events.Event;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgnitePredicate;
import org.gridgain.grid.internal.processors.cache.database.txdr.TransactionalDrProcessorImpl;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * Worker that copies ready WAL segments from the WAL archive directory into a transfer directory.
 *
 * <p>A sending session is opened with {@link #startSending(long)} and closed with
 * {@link #stopSending(long)} or {@link #stopSending()}. While a session is active the worker copies
 * segments one by one, starting from the session's initial index, as soon as
 * {@link #lastReadySegmentIndex()} reports them ready. The worker is woken up either by a local WAL
 * event or by a periodic timeout.
 */
public class WalSender
extends GridWorker {
    /** Enables extra "waiting for segment" log output; read from the system property of the same name. */
    private static final boolean TX_DR_DEBUG_OUTPUT_ENABLED = Boolean.getBoolean("TX_DR_DEBUG_OUTPUT_ENABLED");

    /** Worker name passed to the {@link GridWorker} constructor. */
    static final String THREAD_NAME_PREFIX = "wal-sender";

    /** File name suffix appended to a segment name when WAL compaction is enabled. */
    static final String COMPACTED_SEGMENT_SUFFIX = ".zip";

    /** Maximum time the worker stays parked before re-checking its state, in nanoseconds. */
    private static final long WAIT_TIMEOUT_NS = 2000000000L;

    /** Marker meaning "no sending session has been picked up by the worker yet". */
    private static final long NO_SEGMENT = -1L;

    /** Terminal index of a session that has not been asked to stop. */
    private static final long NO_TERMINAL_SEGMENT = Long.MAX_VALUE;

    /** Event type listened to when compaction is disabled (presumably EVT_WAL_SEGMENT_ARCHIVED - TODO confirm). */
    private static final int EVT_TYPE_WITHOUT_COMPACTION = 128;

    /** Event type listened to when compaction is enabled (presumably EVT_WAL_SEGMENT_COMPACTED - TODO confirm). */
    private static final int EVT_TYPE_WITH_COMPACTION = 134;

    /** WAL manager used to query ready segments and to reserve/release them while copying. */
    private final IgniteWriteAheadLogManager walMgr;

    /** Local events facade used to subscribe to WAL segment events. */
    private final IgniteEvents evts;

    /** Directory the segments are read from. */
    private final File walArchiveDir;

    /** Directory the segments are copied to. */
    private final File dstDir;

    /** Failure processor notified when the worker body fails; may be {@code null}. */
    private final FailureProcessor failureProcessor;

    /** Processor informed about the last sent segment index; may be {@code null}. */
    private final TransactionalDrProcessorImpl txdrProc;

    /** Whether compacted segments (rather than plain archived ones) are sent. */
    private final boolean walCompactionEnabled;

    /** Current sending session, or {@code null} when sending is not started. */
    private final AtomicReference<SendInProgressFuture> sndInProgress = new AtomicReference<>();

    /** Listener that wakes the worker thread up on every event it is subscribed to. */
    private final IgnitePredicate<Event> evtLsnr = (IgnitePredicate<Event> & Serializable)evt -> {
        LockSupport.unpark(this.runner());
        return true;
    };

    /**
     * @param igniteInstanceName Ignite instance name passed to the worker.
     * @param log Logger.
     * @param walMgr WAL manager, must not be {@code null}.
     * @param evts Events facade, must not be {@code null}.
     * @param walArchiveDir Source directory with WAL segments, must not be {@code null}.
     * @param dstDir Transfer directory, must not be {@code null}.
     * @param failureProcessor Failure processor, may be {@code null}.
     * @param txdrProc Transactional DR processor, may be {@code null}.
     * @param walCompactionEnabled Whether compacted segments should be sent.
     * @throws NullPointerException If any of the mandatory arguments checked here is {@code null}.
     */
    WalSender(@Nullable String igniteInstanceName, @NotNull IgniteLogger log, @NotNull IgniteWriteAheadLogManager walMgr, @NotNull IgniteEvents evts, @NotNull File walArchiveDir, @NotNull File dstDir, @Nullable FailureProcessor failureProcessor, @Nullable TransactionalDrProcessorImpl txdrProc, boolean walCompactionEnabled) {
        super(igniteInstanceName, THREAD_NAME_PREFIX, log);
        this.walMgr = Objects.requireNonNull(walMgr);
        this.evts = Objects.requireNonNull(evts);
        this.walArchiveDir = Objects.requireNonNull(walArchiveDir);
        this.dstDir = Objects.requireNonNull(dstDir);
        this.failureProcessor = failureProcessor;
        this.txdrProc = txdrProc;
        this.walCompactionEnabled = walCompactionEnabled;
    }

    /**
     * Main loop: waits for a sending session, copies segments in order and completes the session
     * future once the terminal segment has been passed. Any throwable aborts pending stop requests
     * and is reported to the failure processor (if one was supplied).
     */
    protected void body() {
        try {
            long segToSnd = NO_SEGMENT;

            while (!this.isCancelled()) {
                SendInProgressFuture sndFut = this.sndInProgress.get();

                if (sndFut == null) {
                    // Nothing to send until startSending() installs a session and unparks us.
                    LockSupport.parkNanos(WAIT_TIMEOUT_NS);
                    continue;
                }

                if (segToSnd == NO_SEGMENT) {
                    // First iteration of a new session: pick the start index and subscribe to WAL events.
                    segToSnd = sndFut.initSeg;

                    int evtType = this.walCompactionEnabled ? EVT_TYPE_WITH_COMPACTION : EVT_TYPE_WITHOUT_COMPACTION;

                    this.evts.localListen(this.evtLsnr, new int[]{evtType});
                }

                if (segToSnd > sndFut.termSeg) {
                    // Terminal segment has been passed: close the session.
                    segToSnd = NO_SEGMENT;

                    this.evts.stopLocalListen(this.evtLsnr, new int[0]);

                    if (!this.sndInProgress.compareAndSet(sndFut, null)) {
                        throw new IgniteException("Internal error: WAL sender was stopped more than once.");
                    }

                    try {
                        sndFut.onDone();
                    }
                    catch (Throwable t) {
                        // A failure while completing the future must not kill the worker.
                        this.log.error(t.getMessage(), t);
                    }

                    continue;
                }

                long readySegIdx = this.lastReadySegmentIndex();
                boolean segmentReady = segToSnd <= readySegIdx;

                if (segmentReady && this.copySegment(segToSnd)) {
                    if (this.txdrProc != null) {
                        this.txdrProc.lastSentWalSegment(segToSnd);
                    }

                    ++segToSnd;
                    continue;
                }

                if (TX_DR_DEBUG_OUTPUT_ENABLED && !segmentReady) {
                    LT.info(this.log, "The last archived/compacted segment is " + readySegIdx + ", will wait for " + segToSnd);
                }

                // Either the segment is not ready yet or the copy failed: retry after an event or timeout.
                LockSupport.parkNanos(WAIT_TIMEOUT_NS);
            }
        }
        catch (Throwable t) {
            // NOTE(review): the event listener is not unregistered on this path, nor when the
            // worker is cancelled during an active session - confirm whether that is intended.
            this.abortSession(t);

            if (this.failureProcessor != null) {
                this.failureProcessor.process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, t));
            }
        }
    }

    /**
     * Clears the current session after a worker failure. Only sessions that already carry a
     * terminal index (i.e. a stop was requested) have their future completed with an error.
     *
     * @param cause Failure that terminated the worker body.
     */
    private void abortSession(Throwable cause) {
        SendInProgressFuture fut;

        // Re-read after every attempt: a concurrent stop request may have swapped the future.
        while ((fut = this.sndInProgress.get()) != null) {
            if (fut.termSeg != NO_TERMINAL_SEGMENT) {
                fut.onDone(new IgniteException("Aborted due to WAL sender failure", cause));
            }

            this.sndInProgress.compareAndSet(fut, null);
        }
    }

    /**
     * @return WAL manager this sender works with.
     */
    IgniteWriteAheadLogManager walManager() {
        return this.walMgr;
    }

    /**
     * @return Index of the last compacted segment when compaction is enabled, otherwise index of
     *      the last archived segment, as reported by the WAL manager.
     */
    long lastReadySegmentIndex() {
        return this.walCompactionEnabled ? this.walMgr.lastCompactedSegment() : this.walMgr.lastArchivedSegment();
    }

    /**
     * Opens a sending session starting from the given segment.
     *
     * @param initSegIdx Index of the first segment to send.
     * @return {@code true} if the session was opened, {@code false} if another session is already active.
     * @throws IgniteCheckedException Declared for callers; not thrown directly by the code in this method.
     */
    boolean startSending(long initSegIdx) throws IgniteCheckedException {
        if (this.log.isInfoEnabled()) {
            this.log.info("Start sending wal segments [initSegIdx=" + initSegIdx + "]");
        }

        boolean res = this.sndInProgress.compareAndSet(null, new SendInProgressFuture(initSegIdx, NO_TERMINAL_SEGMENT));

        if (res) {
            if (this.txdrProc != null) {
                // Nothing has been sent in this session yet, so report the segment preceding the first one.
                this.txdrProc.lastSentWalSegment(initSegIdx - 1L);
            }

            LockSupport.unpark(this.runner());
        }

        return res;
    }

    /**
     * Requests the session to stop after the given segment has been sent.
     *
     * @param terminalSegIdx Index of the last segment to send; must not be less than the initial index.
     * @return Future completed by the worker when the session is closed, or an already failed future
     *      if the request was rejected.
     */
    GridFutureAdapter<Void> stopSending(long terminalSegIdx) {
        return this.stopSending0(terminalSegIdx, true);
    }

    /**
     * Requests the session to stop with terminal index {@code -1} and without the index sanity check.
     *
     * @return Future completed by the worker when the session is closed, or an already failed future
     *      if the request was rejected.
     */
    GridFutureAdapter<Void> stopSending() {
        return this.stopSending0(-1L, false);
    }

    /**
     * Replaces the active session future with one that carries a terminal index.
     *
     * @param terminalSegIdx Index of the last segment to send.
     * @param checkIdxSanity Whether to reject a terminal index lower than the session's initial index.
     * @return New session future on success, otherwise a future failed with {@link IllegalStateException}.
     */
    private GridFutureAdapter<Void> stopSending0(long terminalSegIdx, boolean checkIdxSanity) {
        if (this.log.isInfoEnabled()) {
            this.log.info("Stop sending wal segments [terminalSegIdx=" + terminalSegIdx + ", checkIdxSanity=" + checkIdxSanity + "]");
        }

        SendInProgressFuture oldFut = this.sndInProgress.get();

        if (oldFut == null) {
            return WalSender.failedFuture("WAL sending is not started.");
        }

        if (checkIdxSanity && terminalSegIdx < oldFut.initSeg) {
            return WalSender.failedFuture("Bad terminal WAL index [initial=" + oldFut.initSeg + ", terminal=" + terminalSegIdx + ']');
        }

        if (oldFut.termSeg == NO_TERMINAL_SEGMENT) {
            SendInProgressFuture newFut = new SendInProgressFuture(oldFut.initSeg, terminalSegIdx);

            if (this.sndInProgress.compareAndSet(oldFut, newFut)) {
                return newFut;
            }
        }

        // Either a terminal index is already set or the session future changed concurrently.
        return WalSender.failedFuture("WAL sending is already stopping.");
    }

    /**
     * @param msg Error message.
     * @return Future already completed with an {@link IllegalStateException} carrying the message.
     */
    private static GridFutureAdapter<Void> failedFuture(String msg) {
        GridFutureAdapter<Void> res = new GridFutureAdapter<>();
        Throwable err = new IllegalStateException(msg);

        res.onDone(err);

        return res;
    }

    /**
     * Copies one segment into the transfer directory through a temporary {@code .tmp} file. The
     * segment is reserved in the WAL manager for the duration of the copy and released exactly once
     * on every exit path.
     *
     * @param idx Segment index.
     * @return {@code true} if the segment is present in the transfer directory after the call
     *      (copied now or already there), {@code false} if the copy failed with an I/O error.
     * @throws IgniteException If the segment cannot be reserved or no source file is found.
     */
    private boolean copySegment(long idx) {
        WALPointer segPtr = new FileWALPointer(idx, 0, 0);

        if (!this.walMgr.reserve(segPtr)) {
            throw new IgniteException("Failed to reserve WAL segment [idx=" + idx + ']');
        }

        try {
            String plainName = FileDescriptor.fileName(idx);
            String compactedName = plainName + COMPACTED_SEGMENT_SUFFIX;

            String fileName = this.walCompactionEnabled ? compactedName : plainName;
            File srcFile = new File(this.walArchiveDir, fileName);

            if (!srcFile.exists()) {
                U.warn(this.log, (Object)("WAL archive segment not found [fileName=" + fileName + ']'));

                // Fall back to the other representation of the same segment.
                String anotherFileName = this.walCompactionEnabled ? plainName : compactedName;

                srcFile = new File(this.walArchiveDir, anotherFileName);

                if (!srcFile.exists()) {
                    throw new IgniteException("WAL archive segment not found [fileName=" + anotherFileName + ']');
                }
            }

            // NOTE(review): the destination keeps the primary name even when the fallback source
            // file is used - confirm this is what the consumer of the transfer directory expects.
            File dstFile = new File(this.dstDir, fileName);

            if (dstFile.exists()) {
                this.log.warning("WAL segment already exists in transfer directory [path=" + dstFile.getAbsolutePath() + ']');

                return true;
            }

            File dstTmpFile = new File(this.dstDir, fileName + ".tmp");

            try {
                // Copy to a temporary name first so the final name only appears after the copy finished.
                Files.deleteIfExists(dstTmpFile.toPath());
                Files.copy(srcFile.toPath(), dstTmpFile.toPath());
                Files.move(dstTmpFile.toPath(), dstFile.toPath());

                if (this.log.isInfoEnabled()) {
                    this.log.info("WAL segment copied to transfer directory [path=" + dstFile.getAbsolutePath() + ']');
                }

                return true;
            }
            catch (IOException e) {
                this.log.error("Failed to copy to transfer directory [src=" + srcFile + ", dst=" + dstFile + ']', e);

                // The reservation is released by the enclosing finally; releasing here as well
                // would release a single reservation twice.
                return false;
            }
        }
        finally {
            this.walMgr.release(segPtr);
        }
    }

    /**
     * Future representing one sending session. It is completed by the worker when the session is
     * closed, or with an error if the worker fails after a stop was requested.
     */
    private static class SendInProgressFuture
    extends GridFutureAdapter<Void> {
        /** Index of the first segment of the session. */
        private final long initSeg;

        /** Index of the last segment of the session; {@link Long#MAX_VALUE} until a stop is requested. */
        private final long termSeg;

        /**
         * @param initSeg Index of the first segment to send.
         * @param termSeg Index of the last segment to send.
         */
        private SendInProgressFuture(long initSeg, long termSeg) {
            this.initSeg = initSeg;
            this.termSeg = termSeg;
        }

        /** {@inheritDoc} */
        public String toString() {
            return S.toString(SendInProgressFuture.class, this, super.toString());
        }
    }
}

