/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.dr.ist;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridBusyLock;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.gridgain.grid.cache.dr.CacheDrSenderConfiguration;
import org.gridgain.grid.internal.processors.cache.dr.ist.PartitionDrState;
import org.gridgain.grid.internal.processors.cache.dr.ist.messages.DrPartitionStatesMessage;
import org.gridgain.grid.internal.processors.cache.dr.ist.messages.DrStateRecord;
import org.gridgain.grid.internal.processors.dr.DrProcessor;
import org.gridgain.grid.internal.processors.dr.DrUtils;

public class CachePartitionStateManager
implements CheckpointListener,
GridMessageListener {
    /** Context of the cache this manager serves. */
    private final GridCacheContext cctx;
    /** Logger obtained from the cache context for this class. */
    private final IgniteLogger log;
    /** Communication topic ("DR-STATE-" + cache name) on which partition state messages are sent and received. */
    private final Object topic;
    /** Delay and period passed to the scheduler for syncing state to backups; 5000 when configured as 0 (presumably milliseconds — confirm). */
    private final long backupSyncFreq;
    /** DR processor to which update-log cleanup jobs are submitted. */
    private final DrProcessor drProc;
    /** Busy lock supplied by the owner; entered while an incoming state message is processed. */
    private final GridBusyLock busyLock;
    /** DR state per partition id. The whole map is replaced by {@code restoreState()}. */
    private ConcurrentMap<Integer, PartitionDrState> partStates = new ConcurrentHashMap<Integer, PartitionDrState>();
    /** Set to {@code true} by {@code stop()}; checked before state is sent to backups. */
    private volatile boolean stopping = false;
    /**
     * Handle of the periodic sync task. Assigned inside the affinity-ready listener registered by {@code start()},
     * so it may still be {@code null} when {@code stop()} runs.
     * NOTE(review): not volatile although it is written from a future listener and read in stop() — confirm threading.
     */
    private GridTimeoutProcessor.CancelableTask scheduledTask;

    /**
     * Creates a partition state manager for the given cache.
     *
     * @param cctx Cache context; also supplies the logger and the cache name used to build the topic.
     * @param sndCfg Sender configuration from which the backup sync frequency is read.
     * @param drProc DR processor.
     * @param busyLock Busy lock shared with the owner of this manager.
     */
    public CachePartitionStateManager(GridCacheContext cctx, CacheDrSenderConfiguration sndCfg, DrProcessor drProc, GridBusyLock busyLock) {
        this.cctx = cctx;
        this.drProc = drProc;
        this.log = cctx.logger(CachePartitionStateManager.class);
        this.busyLock = busyLock;
        this.topic = GridTopic.TOPIC_CACHE.topic("DR-STATE-" + cctx.name());

        long cfgFreq = sndCfg.getBackupSyncFrequency();

        // A configured value of 0 is replaced with 5000.
        this.backupSyncFreq = cfgFreq != 0L ? cfgFreq : 5000L;
    }

    /**
     * Starts the manager.
     * <p>
     * When persistence is enabled the previously saved state is restored and this instance is registered
     * as a checkpoint listener. The message listener is then registered on the DR state topic, and once
     * affinity is ready for the initial version of the last topology future, {@code sendStateToBackups()}
     * is scheduled to run periodically with {@code backupSyncFreq} as both delay and period.
     *
     * @throws IllegalStateException If the persisted DR state cannot be restored; the original failure
     *      is attached as the cause.
     */
    void start() {
        if (log.isDebugEnabled()) {
            log.debug("Start DR partition state manager: cache=" + cctx.name());
        }

        GridCacheSharedContext sCtx = cctx.shared();

        if (CU.isPersistenceEnabled(cctx.kernalContext().config())) {
            try {
                restoreState();
            }
            catch (IgniteCheckedException e) {
                log.error("Failed to restore DR state.", e);

                // Keep the cause so that the caller can see why the restore failed.
                throw new IllegalStateException("Failed to restore DR state: cache=" + cctx.name(), e);
            }

            ((GridCacheDatabaseSharedManager)sCtx.database()).addCheckpointListener(this);
        }

        cctx.gridIO().addMessageListener(topic, (GridMessageListener)this);

        GridDhtPartitionsExchangeFuture topFut = sCtx.exchange().lastTopologyFuture();

        assert (topFut != null);

        IgniteInternalFuture<AffinityTopologyVersion> affFut = sCtx.exchange().affinityReadyFuture(topFut.initialVersion());

        affFut.listen(f -> {
            try {
                f.get();

                assert (((AffinityTopologyVersion)f.result()).initialized());

                if (log.isDebugEnabled()) {
                    log.debug("DR partition state manager initialized: cache=" + cctx.name());
                }

                scheduledTask = cctx.time().schedule(this::sendStateToBackups, backupSyncFreq, backupSyncFreq);
            }
            catch (IgniteCheckedException ex) {
                boolean nodeStopping = ex instanceof NodeStoppingException;

                if (!stopping && !nodeStopping) {
                    // Not related to a stop: report the real failure instead of blaming a concurrent stop.
                    log.error("Failed to start DR partition state manager: cache=" + cctx.name(), ex);

                    return;
                }

                if (!stopping) {
                    log.warning("Failed to start DR partition state manager due to node is stopped concurrently.", ex);
                }

                if (log.isDebugEnabled()) {
                    log.debug("Failed to start DR partition state manager due to node is stopped concurrently.");
                }
            }
        });
    }

    /**
     * Stops the manager: raises the {@code stopping} flag, closes the periodic task if one was scheduled,
     * unregisters the message listener and, when persistence is enabled, unregisters the checkpoint listener
     * and makes a final attempt to save the state. A failure of that last save is only logged.
     */
    public void stop() {
        if (log.isDebugEnabled()) {
            log.debug("Stop DR partition state manager: cache=" + cctx.name());
        }

        stopping = true;

        if (scheduledTask != null) {
            scheduledTask.close();
        }

        cctx.gridIO().removeMessageListener(topic);

        if (!CU.isPersistenceEnabled(cctx.kernalContext().config())) {
            return;
        }

        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)cctx.shared().database();

        db.removeCheckpointListener(this);

        try {
            saveState();
        }
        catch (Exception e) {
            // Best effort: a failed save must not break cache stop.
            log.warning("Failed to save DR progress on cache stop. ", e);
        }
    }

    /**
     * Returns the DR state of the partition, registering a new one (created with {@code 0} as its
     * second constructor argument) if none is present yet.
     *
     * @param part Partition id.
     * @return Existing or newly created state.
     */
    public PartitionDrState getOrCreateState(int part) {
        Integer key = part;

        return partStates.computeIfAbsent(key, id -> new PartitionDrState(id.intValue(), 0L));
    }

    /**
     * Removes the DR state entry of this cache from the metastorage. Does nothing when no metastorage
     * is available. The removal is performed under the checkpoint read lock.
     *
     * @throws IgniteException If the metastorage removal fails (wraps the checked exception).
     */
    void onCacheDestroy() {
        IgniteCacheDatabaseSharedManager db = cctx.shared().database();
        MetaStorage metaStorage = db.metaStorage();

        if (metaStorage == null) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Drop IncrementalDR state: cache=" + cctx.cache().name());
        }

        db.checkpointReadLock();

        try {
            metaStorage.removeData(DrUtils.drStateMetastorageKey(cctx.name()));
        }
        catch (IgniteCheckedException e) {
            throw new IgniteException(e);
        }
        finally {
            db.checkpointReadUnlock();
        }
    }

    /**
     * Moves the last update counter of the partition from {@code 0} to the given value.
     * The partition state must already exist (checked by assertion only).
     *
     * @param part Partition id.
     * @param newUpdCntr New update counter value.
     */
    void initUpdateCounter(int part, long newUpdCntr) {
        PartitionDrState partState = partStates.get(part);

        assert partState != null;

        partState.lastUpdCntr(0L, newUpdCntr);
    }

    /**
     * Advances the last update counter of the partition by one step, from {@code to - 1} to {@code to}.
     * The partition state must already exist (checked by assertion only).
     *
     * @param part Partition id.
     * @param to Target counter value.
     * @return Result reported by the partition state for the transition.
     */
    public boolean updateCounter(int part, long to) {
        PartitionDrState partState = partStates.get(part);

        assert partState != null;

        long from = to - 1L;

        return partState.lastUpdCntr(from, to);
    }

    /**
     * Advances the last update counter of the partition from {@code from} to {@code from + delta}.
     * The partition state must already exist (checked by assertion only).
     *
     * @param part Partition id.
     * @param from Expected current counter value.
     * @param delta Increment added to {@code from}.
     * @return Result reported by the partition state for the transition.
     */
    public boolean updateCounter(int part, long from, long delta) {
        PartitionDrState partState = partStates.get(part);

        assert partState != null;

        long to = from + delta;

        return partState.lastUpdCntr(from, to);
    }

    /**
     * Reads the last update counter of the partition.
     * The partition state must already exist (checked by assertion only).
     *
     * @param part Partition id.
     * @return Last update counter.
     */
    public long updateCounter(int part) {
        PartitionDrState partState = partStates.get(part);

        assert partState != null;

        return partState.lastUpdCntr();
    }

    /**
     * Reads the {@code lwm} value of the partition.
     * The partition state must already exist (checked by assertion only).
     *
     * @param part Partition id.
     * @return Current {@code lwm} of the partition state.
     */
    public long lwm(int part) {
        PartitionDrState partState = partStates.get(part);

        assert partState != null;

        return partState.lwm();
    }

    /**
     * Reads the {@code lwm} value of the partition, falling back to a default when the partition has no state.
     *
     * @param part Partition id.
     * @param defVal Value returned when no state is registered for the partition.
     * @return Current {@code lwm}, or {@code defVal}.
     */
    public long lwmOrDefault(int part, long defVal) {
        PartitionDrState partState = partStates.get(part);

        return partState != null ? partState.lwm() : defVal;
    }

    /**
     * Asks the partition state to move its {@code lwm} from {@code from} to {@code to} and, when the state
     * reports {@code true}, submits an asynchronous cleanup for the partition. Silently ignored if the
     * partition has no state.
     *
     * @param part Partition id.
     * @param from Value passed as the first argument of the state's {@code lwm} update.
     * @param to Value passed as the second argument of the state's {@code lwm} update.
     */
    public void lwm(int part, long from, long to) {
        PartitionDrState partState = partStates.get(part);

        if (partState != null && partState.lwm(from, to)) {
            cleanupAsync(part);
        }
    }

    /**
     * Reads the {@code fstWM} value of the partition.
     * The partition state must already exist (checked by assertion only).
     *
     * @param part Partition id.
     * @return Current {@code fstWM} of the partition state.
     */
    public long fstWM(int part) {
        PartitionDrState partState = partStates.get(part);

        assert partState != null;

        return partState.fstWM();
    }

    /**
     * Asks the partition state to move its {@code fstWM} from {@code from} to {@code to}.
     * The partition state must already exist (checked by assertion only).
     *
     * @param part Partition id.
     * @param from Value passed as the first argument of the state's {@code fstWM} update.
     * @param to Value passed as the second argument of the state's {@code fstWM} update.
     */
    public void fstWM(int part, long from, long to) {
        PartitionDrState partState = partStates.get(part);

        assert partState != null;

        partState.fstWM(from, to);
    }

    /**
     * Calls {@code resetFstWM()} on every registered partition state.
     */
    public void cleanFstWM() {
        for (PartitionDrState partState : partStates.values()) {
            partState.resetFstWM();
        }
    }

    /**
     * Makes sure a DR state exists for every given partition, primaries first and then backups, and submits
     * an asynchronous cleanup for each partition whose {@code lwm} is greater than its {@code cwm}.
     *
     * @param primaryParts Partition ids for which the local node is primary.
     * @param backupParts Partition ids for which the local node is backup.
     */
    public void onPartitionAssignment(Set<Integer> primaryParts, Set<Integer> backupParts) {
        Stream<Integer> assignedParts = Stream.concat(primaryParts.stream(), backupParts.stream());

        assignedParts.forEach(partId -> {
            PartitionDrState partState = getOrCreateState(partId);

            long lwm = partState.lwm();
            long cwm = partState.cwm();

            if (lwm > cwm) {
                cleanupAsync(partId);
            }
        });
    }

    /**
     * Drops the DR state registered for the partition, if any.
     *
     * @param part Partition id.
     */
    public void onPartitionEvicted(int part) {
        Integer key = part;

        partStates.remove(key);
    }

    /**
     * Handles a message received on the DR state topic. Only {@link DrPartitionStatesMessage} instances are
     * processed; anything else is ignored.
     *
     * @param nodeId Sender node id.
     * @param msg Received message.
     * @param plc Policy byte supplied by the communication layer (unused here).
     */
    @Override
    public void onMessage(UUID nodeId, Object msg, byte plc) {
        if (!(msg instanceof DrPartitionStatesMessage)) {
            return;
        }

        onPrimaryDrStateMessage(nodeId, (DrPartitionStatesMessage)msg);
    }

    /**
     * Applies partition states received from another node to the local states.
     * <p>
     * The message is dropped when the busy lock cannot be entered or when the cache start topology version
     * is after the message topology version. For each record, a positive {@code lwm} is applied through
     * {@link #lwm(int, long, long)} and an {@code fstWM} greater than {@link Long#MIN_VALUE} is applied to
     * the local partition state.
     * <p>
     * A record for a partition without local state is skipped for both watermarks. The {@code lwm} path
     * already behaved that way; the {@code fstWM} path previously went through an assert-only accessor and
     * failed the listener for such records.
     * <p>
     * NOTE: this method was produced by a decompiler, which warned that it restructured the original
     * try/finally ("Removed try catching itself - possible behaviour change").
     *
     * @param nodeId Sender node id; must not be the local node (checked by assertion only).
     * @param msg Message carrying partition state records.
     */
    private void onPrimaryDrStateMessage(UUID nodeId, DrPartitionStatesMessage msg) {
        assert (!nodeId.equals(this.cctx.localNode().id()));

        if (!this.busyLock.enterBusy()) {
            return;
        }

        try {
            if (this.cctx.startTopologyVersion().after(msg.topVer())) {
                return;
            }

            for (DrStateRecord rec : msg.partStates()) {
                if (rec.lwm() > 0L) {
                    this.lwm(rec.part(), 0L, rec.lwm());
                }

                if (rec.fstWM() <= Long.MIN_VALUE) {
                    continue;
                }

                PartitionDrState state = this.partStates.get(rec.part());

                if (state == null) {
                    // No local state for this partition: skip, as the lwm path does.
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Skip DR first watermark update, no local partition state: part=" + rec.part());
                    }

                    continue;
                }

                state.fstWM(Long.MIN_VALUE, rec.fstWM());
            }
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Checkpoint listener callback; intentionally does nothing in this manager.
     *
     * @param ctx Checkpoint context (unused).
     * @throws IgniteCheckedException Declared by the listener contract; never thrown here.
     */
    @Override
    public void onMarkCheckpointBegin(CheckpointListener.Context ctx) throws IgniteCheckedException {
        // No-op.
    }

    /**
     * Checkpoint listener callback; intentionally does nothing in this manager.
     *
     * @param ctx Checkpoint context (unused).
     * @throws IgniteCheckedException Declared by the listener contract; never thrown here.
     */
    @Override
    public void onCheckpointBegin(CheckpointListener.Context ctx) throws IgniteCheckedException {
        // No-op.
    }

    /**
     * Checkpoint listener callback that persists the current DR state via {@code saveState()}.
     *
     * @param ctx Checkpoint context (unused).
     * @throws IgniteCheckedException If saving the state fails.
     */
    @Override
    public void beforeCheckpointBegin(CheckpointListener.Context ctx) throws IgniteCheckedException {
        saveState();
    }

    /**
     * Submits an update-log cleanup job for the partition to the DR processor.
     *
     * @param part Partition id.
     */
    public void cleanupAsync(int part) {
        drProc.submit(new UpdateLogCleanupJob(part));
    }

    /**
     * Sends the {@code lwm} and {@code fstWM} of every locally primary partition to the other nodes
     * assigned to that partition.
     * <p>
     * Under the topology read lock, one message per target node is assembled from the affinity assignment
     * at the ready affinity version; the local node and nodes that do not support
     * {@code IgniteFeatures.INCREMENTAL_DR} are skipped. The messages are sent after the lock is released.
     * Send failures are logged and do not stop delivery to the remaining nodes.
     * <p>
     * Nothing is sent when the manager or the topology is stopping, when the affinity version is not
     * initialized, or when the local node owns no primary partitions.
     * <p>
     * NOTE: this method was produced by a decompiler, which warned that it restructured the original
     * try/finally ("Removed try catching itself - possible behaviour change").
     */
    void sendStateToBackups() {
        if (stopping) {
            return;
        }

        Map<ClusterNode, DrPartitionStatesMessage> msgsByNode = new HashMap<>();

        cctx.topology().readLock();

        try {
            if (cctx.topology().stopping()) {
                return;
            }

            AffinityTopologyVersion topVer = cctx.shared().exchange().readyAffinityVersion();

            if (!topVer.initialized()) {
                return;
            }

            AffinityAssignment assignment = cctx.affinity().assignment(topVer);
            Set<Integer> primaryParts = assignment.primaryPartitions(cctx.localNodeId());

            if (primaryParts.isEmpty()) {
                return;
            }

            GridKernalContext kctx = cctx.kernalContext();

            for (Integer part : primaryParts) {
                List<ClusterNode> owners = assignment.get(part);
                PartitionDrState partState = partStates.get(part);

                assert partState != null;

                for (ClusterNode owner : owners) {
                    if (!owner.isLocal() && IgniteFeatures.nodeSupports(kctx, owner, IgniteFeatures.INCREMENTAL_DR)) {
                        DrPartitionStatesMessage nodeMsg = msgsByNode.computeIfAbsent(owner, n -> new DrPartitionStatesMessage(topVer));

                        nodeMsg.addPartSate(new DrStateRecord(part, partState.lwm(), partState.fstWM()));
                    }
                }
            }
        }
        finally {
            cctx.topology().readUnlock();
        }

        if (msgsByNode.isEmpty()) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Sending cache DR state to backups: locNode=" + cctx.localNode().id());
        }

        for (Map.Entry<ClusterNode, DrPartitionStatesMessage> entry : msgsByNode.entrySet()) {
            ClusterNode target = entry.getKey();

            try {
                // Policy byte 2 is kept verbatim (presumably the system pool policy — confirm).
                cctx.gridIO().sendToCustomTopic(target, topic, (Message)entry.getValue(), (byte)2);
            }
            catch (ClusterTopologyCheckedException ex) {
                if (log.isDebugEnabled()) {
                    log.debug("Failed to sync DR state with backup node (is it gone?): " + target);
                }
            }
            catch (IgniteCheckedException ex) {
                log.warning("Failed to sync DR state with backup node (is it gone?): " + target, ex);
            }
        }
    }

    /**
     * Writes the {@code lwm} of every registered partition state to the metastorage under this cache's
     * DR state key. Does nothing when no metastorage is available.
     *
     * @throws IgniteCheckedException If writing the state fails.
     */
    void saveState() throws IgniteCheckedException {
        IgniteCacheDatabaseSharedManager db = cctx.shared().database();

        if (db.metaStorage() == null) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Persist IncrementalDR state: cache=" + cctx.cache().name());
        }

        Map<Integer, Long> lwmByPart = new HashMap<>();

        for (Map.Entry<Integer, PartitionDrState> entry : partStates.entrySet()) {
            lwmByPart.put(entry.getKey(), entry.getValue().lwm());
        }

        DrUtils.writeDrState(db, DrUtils.drStateMetastorageKey(cctx.name()), lwmByPart);
    }

    /**
     * Reads the DR state records stored for this cache in the metastorage and replaces the partition state
     * map with states built from them (partition id and {@code lwm} of each record). Does nothing when no
     * metastorage is available.
     *
     * @throws IgniteCheckedException If reading the state fails.
     */
    void restoreState() throws IgniteCheckedException {
        IgniteCacheDatabaseSharedManager db = cctx.shared().database();

        if (db.metaStorage() == null) {
            return;
        }

        if (log.isInfoEnabled()) {
            log.info("Restore IncrementalDR state: cache=" + cctx.cache().name());
        }

        Collection<DrStateRecord> records = DrUtils.readDrState(db, DrUtils.drStateMetastorageKey(cctx.name()));

        ConcurrentMap<Integer, PartitionDrState> restored = records.stream()
            .collect(Collectors.toConcurrentMap(DrStateRecord::part, rec -> new PartitionDrState(rec.part(), rec.lwm())));

        partStates = restored;
    }

    /** Compiler-generated accessor exposing the private {@code busyLock} field (decompiler output). */
    static /* synthetic */ GridBusyLock access$100(CachePartitionStateManager x0) {
        return x0.busyLock;
    }

    /** Compiler-generated accessor exposing the private {@code cctx} field (decompiler output). */
    static /* synthetic */ GridCacheContext access$200(CachePartitionStateManager x0) {
        return x0.cctx;
    }

    /** Compiler-generated accessor exposing the private {@code log} field (decompiler output). */
    static /* synthetic */ IgniteLogger access$300(CachePartitionStateManager x0) {
        return x0.log;
    }

    /**
     * Job submitted to the DR processor by {@code cleanupAsync(int)} to clean up a single partition.
     */
    private class UpdateLogCleanupJob
    implements Runnable {
        /** Id of the partition to clean up. */
        private final int part;

        /**
         * @param part Id of the partition to clean up.
         */
        public UpdateLogCleanupJob(int part) {
            this.part = part;
        }

        /**
         * Looks up the partition's DR state and returns if there is none. Otherwise reads its {@code lwm},
         * runs the cleanup up to that value and, if the cleanup reports success, records the value
         * through {@code cwm(long)}.
         */
        @Override
        public void run() {
            PartitionDrState drState = (PartitionDrState)CachePartitionStateManager.this.partStates.get(this.part);
            if (drState == null) {
                return;
            }
            // Snapshot the value so the cleanup bound and the recorded cwm are the same number.
            long toCntr = drState.lwm();
            if (this.cleanupPartition(toCntr)) {
                drState.cwm(toCntr);
            }
        }

        /**
         * Cleans up the partition up to the given counter.
         * <p>
         * WARNING: the real body of this method could not be recovered by the decompiler; the statement
         * below is a placeholder that always throws. The original logic must be restored from the class
         * file or the upstream sources before this code can be used.
         *
         * @param toCntr Counter up to which the cleanup is performed.
         * @return The caller treats {@code true} as "cleanup done, {@code cwm} may be advanced".
         */
        private boolean cleanupPartition(long toCntr) {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             *
             * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [2[TRYBLOCK]], but top level block is 5[TRYBLOCK]
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        }
    }
}

