/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.dr.ist;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.gridgain.grid.internal.processors.cache.dr.Cancellable;
import org.gridgain.grid.internal.processors.cache.dr.ist.CacheIncrementalDrHandler;
import org.gridgain.grid.internal.processors.cache.dr.ist.CachePartitionStateManager;
import org.gridgain.grid.internal.processors.cache.dr.ist.DrPartitionAwareJob;
import org.gridgain.grid.internal.processors.cache.dr.ist.Watermark;

/**
 * Per-partition driver of incremental state transfer.
 * <p>
 * The handler keeps a high watermark ({@link #hwm}) for its partition and, whenever the
 * partition update counter reported by {@link CachePartitionStateManager} is ahead of that
 * watermark, schedules a transfer job through {@link CacheIncrementalDrHandler}.
 * <p>
 * The lifecycle is tracked by {@link State}. Every state transition and every access to
 * {@link #activeJob} happens while holding this object's monitor; the state itself is kept in
 * an {@link AtomicReference} so that it can also be read without the monitor.
 */
public class PartitionDrHandler implements Cancellable {
    /** Logger obtained from the cache context. */
    private final IgniteLogger log;

    /** Cache context; used for the logger and for the cache ID in log messages. */
    private final GridCacheContext cctx;

    /** Source of the partition's low watermark and update counter. */
    private final CachePartitionStateManager partStateMgr;

    /** Owner handler used to create and submit transfer jobs. */
    private final CacheIncrementalDrHandler drHnd;

    /** Partition number served by this handler. */
    private final int part;

    /** Current lifecycle state; starts as {@link State#PAUSED}. */
    private final AtomicReference<State> state = new AtomicReference<State>(State.PAUSED);

    /** High watermark; initialised from the low watermark and reset to it on every pause. */
    private volatile Watermark hwm;

    /** Job currently associated with this partition, or {@code null}. Guarded by {@code this}. */
    private CacheIncrementalDrHandler.IncrementalStateTransferJob activeJob;

    /**
     * Creates a handler in the {@link State#PAUSED} state with the high watermark initialised
     * from the partition's current low watermark.
     *
     * @param cctx Cache context.
     * @param part Partition number; must not be {@code null} (it is unboxed).
     * @param drHnd Handler used to create and submit transfer jobs.
     * @param partStateMgr Partition state manager.
     */
    public PartitionDrHandler(GridCacheContext cctx, Integer part, CacheIncrementalDrHandler drHnd, CachePartitionStateManager partStateMgr) {
        this.cctx = cctx;
        this.log = cctx.logger(PartitionDrHandler.class);
        this.drHnd = drHnd;
        this.partStateMgr = partStateMgr;
        this.part = part;
        this.hwm = new Watermark(partStateMgr.lwm(part));
    }

    /**
     * @return Partition number served by this handler.
     */
    public int part() {
        return this.part;
    }

    /**
     * Tries to continue the transfer; see {@link #continueTransfer()}.
     * NOTE(review): presumably invoked when the partition update counter advances - confirm
     * against callers.
     */
    public void onUpdateCounterChanged() {
        this.continueTransfer();
    }

    /**
     * Records the counter range of a batch in the high watermark. Calls made on behalf of a
     * job other than the current active job are ignored.
     *
     * @param job Job reporting the batch.
     * @param startCntr Start of the counter range.
     * @param endCntr End of the counter range.
     */
    public synchronized void onBatchUpdated(DrPartitionAwareJob job, long startCntr, long endCntr) {
        if (this.activeJob != job) {
            return;
        }

        if (this.log.isTraceEnabled()) {
            this.log.trace("HWM updated: part=" + this.part + ", from=" + startCntr + ", to=" + endCntr);
        }

        this.hwm.update(startCntr, endCntr);
    }

    /**
     * Restarts the transfer after a batch of the active job was rejected: the job is cancelled,
     * the high watermark is reset to the low watermark and a new transfer is attempted.
     * Calls made on behalf of a job other than the current active job are ignored.
     *
     * @param job Job whose batch was rejected.
     */
    public synchronized void onBatchRejected(DrPartitionAwareJob job) {
        if (this.activeJob != job) {
            return;
        }

        this.pauseTransfer();
        this.continueTransfer();
    }

    /**
     * Forwards an acknowledged counter range to the partition state manager as a low watermark
     * update. Does nothing once this handler is cancelled.
     *
     * @param fromCntr Start of the acknowledged counter range.
     * @param toCntr End of the acknowledged counter range.
     */
    public synchronized void onBatchAcknowledged(long fromCntr, long toCntr) {
        if (this.isCancelled()) {
            return;
        }

        if (this.log.isTraceEnabled()) {
            this.log.trace("LWM updated: part=" + this.part + ", from=" + fromCntr + ", to=" + toCntr);
        }

        this.partStateMgr.lwm(this.part, fromCntr, toCntr);
    }

    /**
     * Tries to continue the transfer unless this handler is cancelled.
     */
    public synchronized void resumeTransfer() {
        if (this.isCancelled()) {
            return;
        }

        this.continueTransfer();
    }

    /**
     * Handles completion of the active job: moves the state from {@link State#ACTIVE} back to
     * {@link State#PAUSED} and immediately tries to schedule the next run. Calls made on
     * behalf of a job other than the current active job are ignored.
     *
     * @param job Job that has finished.
     */
    public synchronized void onTransferJobFinished(DrPartitionAwareJob job) {
        if (this.activeJob != job) {
            return;
        }

        boolean wasActive = this.state.compareAndSet(State.ACTIVE, State.PAUSED);

        // The active job is only ever submitted in ACTIVE state, so any other state here
        // indicates a broken invariant (e.g. the same job reporting completion twice).
        assert wasActive : "Unexpected state on transfer job finish: part=" + this.part + ", state=" + this.state.get();

        if (wasActive) {
            this.continueTransfer();
        }
    }

    /**
     * Pauses the transfer: moves {@link State#ACTIVE} to {@link State#PAUSED} (other states are
     * left untouched), cancels and drops the active job if there is one, and resets the high
     * watermark to the partition's current low watermark.
     */
    public synchronized void pauseTransfer() {
        this.state.compareAndSet(State.ACTIVE, State.PAUSED);

        if (this.activeJob != null) {
            this.activeJob.cancel();
            this.activeJob = null;
        }

        this.hwm = new Watermark(this.partStateMgr.lwm(this.part));
    }

    /**
     * Schedules a transfer job if the handler is {@link State#PAUSED} and there is something to
     * transfer (see {@link #canContinue()}). An existing {@link #activeJob} is re-submitted;
     * otherwise a new job is created starting from the current high watermark.
     * <p>
     * The {@code PAUSED -> ACTIVE} transition is performed while holding the monitor. Winning
     * that transition before acquiring the monitor would let another thread pause and
     * re-activate the handler in between, after which both threads would observe
     * {@code ACTIVE} and submit the same job twice.
     * <p>
     * NOTE(review): if {@code createJob} or {@code submit} throws, the state stays
     * {@code ACTIVE} with no job scheduled - confirm whether those calls can fail.
     */
    public void continueTransfer() {
        // Lock-free pre-check so that frequent notifications do not contend on the monitor
        // while a job is already running or there is nothing to transfer.
        if (!this.canContinue() || this.state.get() != State.PAUSED) {
            return;
        }

        synchronized (this) {
            // Re-validate under the monitor; only the thread that wins the transition submits.
            if (!this.canContinue() || !this.state.compareAndSet(State.PAUSED, State.ACTIVE)) {
                return;
            }

            if (this.log.isDebugEnabled()) {
                this.log.debug("Schedule incremental state transfer job: cache=" + this.cctx.cacheId() + ", part=" + this.part);
            }

            if (this.activeJob == null) {
                this.activeJob = this.drHnd.createJob(this, this.hwm.get());
            }

            // Cancelled jobs are always dropped (set to null) together with the cancellation.
            assert !this.activeJob.isCancelled();

            this.drHnd.submit(this.activeJob);
        }
    }

    /**
     * @return {@code true} if the owning handler is active, this handler is not cancelled and
     *      the partition update counter is ahead of the high watermark.
     */
    private boolean canContinue() {
        return this.drHnd.isActive()
            && this.state.get() != State.CANCELLED
            && this.partStateMgr.updateCounter(this.part) > this.hwm.get();
    }

    /**
     * Moves this handler to {@link State#CANCELLED} and cancels and drops the active job, if
     * there is one. No code in this class moves the state out of {@code CANCELLED}.
     */
    @Override
    public synchronized void cancel() {
        this.state.set(State.CANCELLED);

        if (this.activeJob != null) {
            this.activeJob.cancel();
            this.activeJob = null;
        }
    }

    /**
     * @return {@code true} if {@link #cancel()} has been called.
     */
    @Override
    public boolean isCancelled() {
        return this.state.get() == State.CANCELLED;
    }

    /** Handler lifecycle state. */
    private enum State {
        /** A transfer job has been submitted. */
        ACTIVE,

        /** No transfer job is scheduled; a new one may be submitted. */
        PAUSED,

        /** Handler was cancelled. */
        CANCELLED
    }
}

