/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.database.recovery;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.persistence.RecoveryDebug;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.IgniteThreadFactory;
import org.gridgain.grid.internal.processors.cache.database.recovery.PITRFuture;
import org.gridgain.grid.internal.processors.cache.database.recovery.PITRRecoveryContext;
import org.gridgain.grid.internal.processors.cache.database.recovery.PitrEntryPredicate;
import org.gridgain.grid.internal.processors.cache.database.recovery.PitrWalScanFilter;
import org.gridgain.grid.internal.processors.cache.database.recovery.RecoveryCoordinatorLeftException;
import org.gridgain.grid.internal.processors.cache.database.recovery.RecoveryMessageWrapper;
import org.gridgain.grid.internal.processors.cache.database.recovery.Sender;
import org.gridgain.grid.internal.processors.cache.database.recovery.TxConvergenceMessage;
import org.gridgain.grid.internal.processors.cache.database.recovery.TxStateCommunicationProgressCalculator;
import org.gridgain.grid.internal.processors.cache.database.recovery.TxStateRequest;
import org.gridgain.grid.internal.processors.cache.database.recovery.TxStateResponse;
import org.gridgain.grid.internal.processors.cache.database.recovery.WALPointerIntervalProgressCalculator;
import org.gridgain.grid.internal.processors.cache.database.snapshot.ConsistentCutMeta;
import org.gridgain.grid.internal.processors.cache.database.snapshot.SnapshotMetadataV2;
import org.gridgain.grid.internal.processors.cache.database.snapshot.SnapshotOperationContext;
import org.gridgain.grid.internal.processors.cache.database.snapshot.SnapshotProgressCalculator;
import org.gridgain.grid.internal.processors.cache.database.snapshot.file.SnapshotPath;
import org.gridgain.grid.persistentstore.snapshot.file.remote.wal.GridGainWalIteratorFactory;
import org.gridgain.grid.persistentstore.snapshot.file.remote.wal.IteratorParametersBuilder;
import org.jetbrains.annotations.Nullable;

class PITRSharedFolderRecoveryContext
extends PITRRecoveryContext {
    /** Worker that takes wrappers from {@link #queue} and dispatches them to the matching entry of {@link #scanFutures}. */
    private final MessageWorker messageWorker;
    /**
     * Work description keyed the same way as {@link #scanFutures}. Each value maps a cache group id
     * (passed to {@code cacheGroup(int)}) to the partition ids to restore for that group.
     */
    private final Map<Object, Map<Integer, Set<Integer>>> partsToRestore;
    /** Factory used to open a WAL iterator over each per-node WAL folder. */
    private final GridGainWalIteratorFactory iteratorFactory;
    /** Template iterator parameters; every iterator is opened with a {@code copy()} of it. */
    private final IteratorParametersBuilder iteratorParams;
    /** Fixed pool (at least one thread, otherwise one per key of {@link #partsToRestore}) that runs the scan and recovery jobs. */
    private final ExecutorService jobExecutor;
    /** One future per key of {@link #partsToRestore}; populated by {@code init()} and cleared by {@code onComplete}. */
    private final Map<Object, PITRSharedFolderFuture> scanFutures = new ConcurrentHashMap<Object, PITRSharedFolderFuture>();
    /** Not assigned or read anywhere in the visible code of this class. */
    private GridMessageListener lsnr;
    /**
     * Lookup used to pick the send target for a message whose receiver has no local future.
     * NOTE(review): the name suggests "original consistent id -> current consistent id" - confirm with the caller.
     */
    private final Map<Object, Object> oldCId2CurCId;
    /** Root path under which each node's WAL folder is resolved as {@code <masked consistent id>/wal}. */
    private final SnapshotPath snapshotDir;
    /** Queue written by {@code SharedFolderSender} and drained by {@link #messageWorker}. */
    private final BlockingQueue<RecoveryMessageWrapper> queue;
    /** Id stamped on every outgoing wrapper and checked on every incoming one. */
    private final IgniteUuid recoveryId;
    /** Single predicate instance handed to {@code applyUpdate} for every WAL that is replayed. */
    private final PITRRecoveryContext.MultiWalApplyPredicate multiWalApplyPredicate;

    /**
     * Builds the recovery context: stores the work description, creates the job pool and the
     * message worker, and prepares the WAL iterator parameters from the node configuration.
     *
     * @param log Logger.
     * @param snpId Snapshot id, forwarded to the parent context.
     * @param time Recovery time, forwarded to the parent context.
     * @param ctx Kernal context used to read the instance name, work directory and storage configuration.
     * @param metadata Snapshot metadata; supplies the baseline topology and the page size.
     * @param locNodeConstId Consistent id of the local node.
     * @param topVer Topology version, forwarded to the parent context.
     * @param partsToRestore Per-node map of cache group id to partition ids.
     * @param walOnNodes Stored as {@code oldCId2CurCId}.
     * @param snapshotDir Root path under which per-node WAL folders are resolved.
     * @param queue Queue shared by the senders and the message worker.
     * @param recoveryId Id of this recovery operation.
     * @throws IgniteCheckedException If folder or work-directory resolution fails.
     */
    PITRSharedFolderRecoveryContext(IgniteLogger log, long snpId, long time, GridKernalContext ctx, SnapshotMetadataV2 metadata, Object locNodeConstId, AffinityTopologyVersion topVer, Map<Object, Map<Integer, Set<Integer>>> partsToRestore, Map<Object, Object> walOnNodes, SnapshotPath snapshotDir, BlockingQueue<RecoveryMessageWrapper> queue, IgniteUuid recoveryId) throws IgniteCheckedException {
        super(ctx, log, metadata.baselineTopology(), metadata, locNodeConstId, topVer, time, snpId);

        this.partsToRestore = partsToRestore;
        this.oldCId2CurCId = walOnNodes;
        this.snapshotDir = snapshotDir;
        this.queue = queue;
        this.recoveryId = recoveryId;

        // One worker thread per node to restore, but never an empty pool.
        int poolSize = Math.max(1, partsToRestore.size());
        ThreadFactory jobThreads = new IgniteThreadFactory(ctx.igniteInstanceName(), "pitr-job-executor");
        this.jobExecutor = Executors.newFixedThreadPool(poolSize, jobThreads);

        Marshaller marsh = this.ig.config().getMarshaller();
        this.messageWorker = new MessageWorker(this.queue, marsh, recoveryId, ctx.igniteInstanceName(), "pitr-recovery-worker", log);

        DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration();
        String subFolder = ctx.pdsFolderResolver().resolveFolders().folderName();
        String workDir = ctx.config().getWorkDirectory();

        File binaryMetaDir = new File(U.resolveWorkDirectory(workDir, "db/binary_meta", false), subFolder);
        File marshallerDir = U.resolveWorkDirectory(workDir, "db/marshaller", false);

        IteratorParametersBuilder params = new IteratorParametersBuilder();
        params.binaryMetadataFileStoreDir(binaryMetaDir)
            .marshallerMappingFileStoreDir(marshallerDir)
            .pageSize(metadata.pageSize())
            .ioFactory(dsCfg.getFileIOFactory())
            .bufferSize(dsCfg.getWalRecordIteratorBufferSize());

        this.iteratorParams = params;
        this.iteratorFactory = new GridGainWalIteratorFactory(log);
        this.multiWalApplyPredicate = new PITRRecoveryContext.MultiWalApplyPredicate();
    }

    /**
     * Starts the message worker thread and creates one scan future per node listed in
     * {@code partsToRestore}.
     * <p>
     * The first node returned by {@code metadata.topology()} is passed to every future as the
     * coordinator consistent id.
     *
     * @return Always {@code null}; this context registers no message listener.
     * @throws IgniteCheckedException Declared by the overridden method.
     */
    @Override
    public GridMessageListener init() throws IgniteCheckedException {
        this.messageWorker.init(this);
        IgniteThread workerThread = new IgniteThread((GridWorker)this.messageWorker);
        workerThread.start();

        Set<Object> originalNodes = new HashSet<Object>();
        Object crd = null;
        for (ClusterNode topNode : this.metadata.topology()) {
            Object cId = topNode.consistentId();
            if (crd == null) {
                crd = cId;
            }
            originalNodes.add(cId);
        }

        for (Object oldNodeConstId : this.partsToRestore.keySet()) {
            boolean crdLocal = oldNodeConstId.equals(crd);
            this.scanFutures.put(oldNodeConstId, new PITRSharedFolderFuture(this.time, this.snpId, this.snpBlt, crdLocal, crd, originalNodes, oldNodeConstId, this.log, this.rd, this.snapshotDir));
        }
        return null;
    }

    /**
     * Scans the WAL of every node in {@code partsToRestore} on the job executor and collects the
     * union of the sets returned by the individual scans.
     * <p>
     * When consistent-cut metadata exists for a node, the scan is filtered by a
     * {@link PitrWalScanFilter} built from it; otherwise {@link PitrWalScanFilter#ALWAYS_TRUE} is used.
     *
     * @return Future completed with the merged scan results once every scan has finished, or with
     *      the first error raised by any scan. Completes immediately with an empty set when there
     *      is nothing to scan.
     */
    @Override
    public IgniteInternalFuture<Set<Object>> scanForLeftNodes() {
        final GridFutureAdapter<Set<Object>> finishFut = new GridFutureAdapter<Set<Object>>();
        final int expectedScans = this.partsToRestore.size();

        // Without this guard no job would ever complete the future and callers would wait forever.
        if (expectedScans == 0) {
            Set<Object> nothingFound = new HashSet<Object>();
            finishFut.onDone(nothingFound);
            return finishFut;
        }

        final Map<Object, Object> resultOfScans = new ConcurrentHashMap<Object, Object>();
        for (final Object nodeConstId : this.partsToRestore.keySet()) {
            this.jobExecutor.submit(new Runnable(){

                @Override
                public void run() {
                    try {
                        PITRSharedFolderFuture scanFut = PITRSharedFolderRecoveryContext.this.scanFutures.get(nodeConstId);
                        try (WALIterator it = PITRSharedFolderRecoveryContext.this.iteratorFactory.iterator(PITRSharedFolderRecoveryContext.this.iteratorParams.copy(), scanFut.getWalFolder());){
                            ConsistentCutMeta cutMeta = null;
                            if (!F.isEmpty((Map<?, ?>)PITRSharedFolderRecoveryContext.this.metadata.consistentCutMetas())) {
                                Short shortConsistentId = PITRSharedFolderRecoveryContext.this.metadata.baselineTopology().resolveShortConsistentId(nodeConstId);
                                cutMeta = shortConsistentId != null ? (ConsistentCutMeta)PITRSharedFolderRecoveryContext.this.metadata.consistentCutMetas().get(shortConsistentId) : null;
                            }
                            PitrWalScanFilter filter = cutMeta == null ? PitrWalScanFilter.ALWAYS_TRUE : new PitrWalScanFilter(cutMeta);
                            resultOfScans.put(nodeConstId, scanFut.scan(it, filter));

                            // The job that stores the last result merges everything; onDone is a no-op
                            // if another job got there first.
                            if (resultOfScans.size() == expectedScans) {
                                Set<Object> res = new HashSet<Object>();
                                for (Object cIds : resultOfScans.values()) {
                                    res.addAll((Set<?>)cIds);
                                }
                                finishFut.onDone(res);
                            }
                        }
                    }
                    catch (Throwable e) {
                        PITRSharedFolderRecoveryContext.this.log.error("Fail scan wal log for recovery localNodeConstId=" + nodeConstId, e);
                        finishFut.onDone(e);
                    }
                }
            });
        }
        return finishFut;
    }

    /**
     * Runs the transaction-state communication phase for every node in {@code partsToRestore}.
     * <p>
     * First, every scan future generates its requests and the totals are accumulated into a
     * progress calculator that is installed on the operation context. Then one job per node is
     * submitted to the job executor to drive the communication.
     *
     * @param allLeftNodes Nodes handed to each future's {@code generateRequestsAndGetSize}.
     * @param snapshotOperationContext Operation context that receives the progress calculator and
     *      is passed to each future's {@code continueTxStateCommunication}.
     * @return Compound future that completes when all per-node jobs have completed.
     */
    @Override
    public IgniteInternalFuture<?> continueScan(Set<Object> allLeftNodes, final SnapshotOperationContext snapshotOperationContext) {
        TxStateCommunicationProgressCalculator progress = new TxStateCommunicationProgressCalculator();
        for (Object cId : this.partsToRestore.keySet()) {
            PITRSharedFolderFuture fut = this.scanFutures.get(cId);
            progress.addTotal(fut.generateRequestsAndGetSize(allLeftNodes));
        }
        snapshotOperationContext.setProgressCalculator((SnapshotProgressCalculator)progress);

        GridCompoundFuture<Object, Object> allJobs = new GridCompoundFuture<Object, Object>();
        for (final Object nodeConstId : this.partsToRestore.keySet()) {
            final GridFutureAdapter<Object> jobFut = new GridFutureAdapter<Object>();
            Runnable job = new Runnable(){

                @Override
                public void run() {
                    try {
                        PITRSharedFolderRecoveryContext.this.scanFutures.get(nodeConstId).continueTxStateCommunication(snapshotOperationContext);
                        jobFut.onDone();
                    }
                    catch (Throwable err) {
                        PITRSharedFolderRecoveryContext.this.log.error("Fail scan wal log for recovery localNodeConstId=" + nodeConstId, err);
                        jobFut.onDone(err);
                    }
                }
            };
            this.jobExecutor.submit(job);
            allJobs.add(jobFut);
        }
        allJobs.markInitialized();
        return allJobs;
    }

    /**
     * Applies WAL updates for every node in {@code partsToRestore}.
     * <p>
     * Steps:
     * <ol>
     * <li>Every partition to restore is created if missing and moved to the owning state.</li>
     * <li>The WAL pointer interval of each finished scan is registered with a progress calculator,
     *     which is then installed on the operation context.</li>
     * <li>One job per node is submitted to replay that node's WAL through {@code applyUpdate}.</li>
     * </ol>
     * If any scan result cannot be obtained, the returned future is failed and no replay job is
     * submitted.
     *
     * @param snapshotOperationContext Operation context that receives the progress calculator.
     * @return Compound future completed when all replay jobs finish, or failed with the scan error.
     */
    @Override
    public IgniteInternalFuture<?> recovery(SnapshotOperationContext snapshotOperationContext) {
        GridCompoundFuture<Object, Object> finishFut = new GridCompoundFuture<Object, Object>();

        this.ownSharedFolderPartitions();

        final WALPointerIntervalProgressCalculator recoveryProgressCalculator = new WALPointerIntervalProgressCalculator(this.ig.config().getDataStorageConfiguration().getWalSegmentSize(), snapshotOperationContext, this.log);
        Map<Object, Integer> iteratorIndexes = new HashMap<Object, Integer>();
        for (Object nodeConstId : this.partsToRestore.keySet()) {
            try {
                PITRFuture.Result result = (PITRFuture.Result)this.scanFutures.get(nodeConstId).get();
                iteratorIndexes.put(nodeConstId, recoveryProgressCalculator.addWALPointers((FileWALPointer)result.getBeginWalPointer(), (FileWALPointer)result.getEndWalPointer()));
            }
            catch (IgniteCheckedException e) {
                // Stop here: replaying the remaining WALs would apply a partial recovery and would
                // run the jobs without a valid iterator index.
                this.log.error("Failed to obtain wal scan result for recovery nodeConstId=" + nodeConstId, (Throwable)e);
                finishFut.onDone((Throwable)e);
                return finishFut;
            }
        }
        recoveryProgressCalculator.calculateStep();
        snapshotOperationContext.setProgressCalculator((SnapshotProgressCalculator)recoveryProgressCalculator);

        for (Map.Entry<Object, Map<Integer, Set<Integer>>> entry : this.partsToRestore.entrySet()) {
            final Object nodeConstId = entry.getKey();
            final Map<Integer, Set<Integer>> nodeParts = entry.getValue();
            final Integer currentIteratorIndex = iteratorIndexes.get(nodeConstId);
            final GridFutureAdapter<Object> adapter = new GridFutureAdapter<Object>();
            this.jobExecutor.submit(new Runnable(){

                @Override
                public void run() {
                    try {
                        PITRSharedFolderFuture scanFut = PITRSharedFolderRecoveryContext.this.scanFutures.get(nodeConstId);
                        PITRFuture.Result res = (PITRFuture.Result)scanFut.get();
                        if (PITRSharedFolderRecoveryContext.this.log.isInfoEnabled()) {
                            Set<GridCacheVersion> skipTxs = res.getSkipTxs();
                            // The set is treated as nullable below, so its size must be read null-safely too.
                            int skipTxCnt = skipTxs == null ? 0 : skipTxs.size();
                            PITRSharedFolderRecoveryContext.this.log.info("Start shared folder recovery [thread=" + Thread.currentThread().getName() + " nodeConstId=" + nodeConstId + ", localNodeId=" + PITRSharedFolderRecoveryContext.this.locNodeConstId + " transactions with id will be skipped:" + skipTxCnt + " [" + res.getBeginWalPointer() + " -> " + res.getEndWalPointer() + "]" + (!F.isEmpty(skipTxs) ? Arrays.toString(skipTxs.toArray()) : "") + ']');
                        }
                        try (WALIterator it = PITRSharedFolderRecoveryContext.this.iteratorFactory.iterator(PITRSharedFolderRecoveryContext.this.iteratorParams.copy(), scanFut.getWalFolder());){
                            PITRSharedFolderRecoveryContext.this.applyUpdate(it, res, nodeParts, PITRSharedFolderRecoveryContext.this.multiWalApplyPredicate, recoveryProgressCalculator, currentIteratorIndex);
                        }
                        adapter.onDone();
                    }
                    catch (Throwable e) {
                        PITRSharedFolderRecoveryContext.this.log.error("Fail scan wal log for recovery localNodeConstId=" + PITRSharedFolderRecoveryContext.this.locNodeConstId, e);
                        adapter.onDone(e);
                    }
                }
            });
            finishFut.add(adapter);
        }
        finishFut.markInitialized();
        return finishFut;
    }

    /**
     * Ensures every partition listed in {@code partsToRestore} exists locally and marks it as owned.
     * Each partition is handled under its own checkpoint read lock.
     *
     * @throws IgniteException If a missing partition cannot be created.
     */
    private void ownSharedFolderPartitions() {
        for (Map<Integer, Set<Integer>> grpParts : this.partsToRestore.values()) {
            for (Map.Entry<Integer, Set<Integer>> grpEntry : grpParts.entrySet()) {
                GridDhtPartitionTopology top = this.ig.cache().cacheGroup(grpEntry.getKey().intValue()).topology();
                for (Integer part : grpEntry.getValue()) {
                    this.ig.cache().context().database().checkpointReadLock();
                    try {
                        GridDhtLocalPartition locPart = top.localPartition(part.intValue());
                        if (locPart == null) {
                            try {
                                locPart = top.forceCreatePartition(part.intValue());
                            }
                            catch (IgniteCheckedException e) {
                                throw new IgniteException((Throwable)e);
                            }
                        }
                        top.own(locPart);
                    }
                    finally {
                        this.ig.cache().context().database().checkpointReadUnlock();
                    }
                }
            }
        }
    }

    /**
     * Notifies every scan future about every key of {@code oldCId2CurCId}.
     * <p>
     * NOTE(review): the {@code node} argument is not consulted; each future is told that every
     * mapped original consistent id has left - confirm this is intended.
     *
     * @param node Node that left (unused here).
     * @param initCrd Whether the node that left was the coordinator.
     * @throws RecoveryCoordinatorLeftException If {@code initCrd} is set and a scan future is not yet done.
     */
    @Override
    public void onNodeLeft(ClusterNode node, boolean initCrd) {
        for (Object origCId : this.oldCId2CurCId.keySet()) {
            for (PITRFuture scanFut : this.scanFutures.values()) {
                boolean crdLostBeforeResult = initCrd && !scanFut.isDone();
                if (crdLostBeforeResult) {
                    throw new RecoveryCoordinatorLeftException("Coordinator left during recovery before sending aggregated scan result, can't finish recovery.");
                }
                scanFut.onNodeLeft(origCId);
            }
        }
    }

    /**
     * Finishes the recovery: fails outstanding scan futures when an error is supplied, clears them,
     * cancels the message worker and shuts the job executor down.
     * <p>
     * Waits up to 10 seconds for running jobs and then joins the message worker. If the calling
     * thread is interrupted while waiting, the wait is abandoned and the interrupt flag is restored
     * so callers can still observe the interruption.
     *
     * @param e Failure cause, or {@code null} on success.
     */
    @Override
    public void onComplete(Throwable e) {
        super.onComplete(e);
        if (e != null) {
            for (PITRSharedFolderFuture fut : this.scanFutures.values()) {
                fut.onDone(e);
            }
        }
        this.scanFutures.clear();
        this.messageWorker.cancel();
        this.jobExecutor.shutdown();
        try {
            boolean terminated = this.jobExecutor.awaitTermination(10L, TimeUnit.SECONDS);
            if (!terminated) {
                this.log.warning("PITR job executor did not terminate within 10 seconds, recoveryId=" + this.recoveryId);
            }
            U.join((GridWorker)this.messageWorker);
        }
        catch (InterruptedException | IgniteInterruptedCheckedException ignored) {
            // Do not lose the interruption: put the flag back for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the parent's entry predicate, wrapped with the consistent-cut metadata of the node
     * the result originates from when such metadata is available.
     *
     * @param res Scan result; its original consistent id selects the cut metadata.
     * @param multiWalApplyPred Predicate forwarded to the parent implementation.
     * @param appliedEntries Counter forwarded to the parent implementation.
     * @return The parent predicate, or a {@link PitrEntryPredicate} combining it with the cut metadata.
     */
    @Override
    protected IgniteBiPredicate<WALRecord, DataEntry> getEntryPredicate(PITRFuture.Result res, PITRRecoveryContext.MultiWalApplyPredicate multiWalApplyPred, AtomicLong appliedEntries) {
        PitrEntryPredicate basePred = super.getEntryPredicate(res, multiWalApplyPred, appliedEntries);

        if (F.isEmpty((Map<?, ?>)this.metadata.consistentCutMetas())) {
            return basePred;
        }

        Short shortCId = this.metadata.baselineTopology().resolveShortConsistentId(res.originalConsistentId());
        if (shortCId == null) {
            return basePred;
        }

        ConsistentCutMeta cutMeta = (ConsistentCutMeta)this.metadata.consistentCutMetas().get(shortCId);
        if (cutMeta == null) {
            return basePred;
        }
        return new PitrEntryPredicate(basePred, cutMeta);
    }

    static class MessageWorker
    extends GridWorker {
        private Map<Object, PITRSharedFolderFuture> futures;
        private PITRSharedFolderRecoveryContext rctx;
        private final BlockingQueue<RecoveryMessageWrapper> queue;
        private final Marshaller marsh;
        private final IgniteUuid recoveryId;

        MessageWorker(BlockingQueue<RecoveryMessageWrapper> queue, Marshaller marsh, IgniteUuid recoveryId, String igniteInstanceName, String threadName, IgniteLogger log) {
            super(igniteInstanceName, threadName, log);
            this.queue = queue;
            this.marsh = marsh;
            this.recoveryId = recoveryId;
        }

        private void init(PITRSharedFolderRecoveryContext rctx) {
            this.futures = rctx.scanFutures;
            this.rctx = rctx;
        }

        protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            while (!Thread.currentThread().isInterrupted() && !this.isCancelled.get()) {
                RecoveryMessageWrapper wrapper = this.queue.take();
                try {
                    wrapper.unmarshal(this.marsh);
                }
                catch (IgniteCheckedException e) {
                    U.error((IgniteLogger)this.log, (Object)("Error while unmarshalling RecoveryMessageWrapper, " + wrapper), (Throwable)e);
                }
                if (!wrapper.recoveryId().equals((Object)this.recoveryId)) {
                    this.log.warning("Not expected operation id in msg - " + wrapper);
                    continue;
                }
                if (this.isCancelled.get()) {
                    return;
                }
                if (!this.futures.containsKey(wrapper.receiverNodeConstId())) {
                    try {
                        wrapper.marshal(this.marsh);
                        Object constId = this.rctx.oldCId2CurCId.get(wrapper.receiverNodeConstId());
                        this.rctx.sender().sendRequest(constId, wrapper);
                    }
                    catch (IgniteCheckedException e) {
                        U.error((IgniteLogger)this.log, (Object)("Error while sending message, " + wrapper), (Throwable)e);
                    }
                    continue;
                }
                PITRSharedFolderFuture fut = this.futures.get(wrapper.receiverNodeConstId());
                Object fromNodeConstId = wrapper.senderNodeConstId();
                Message msg = wrapper.msg();
                if (msg instanceof TxStateRequest) {
                    fut.processRequest(fromNodeConstId, (TxStateRequest)msg);
                    continue;
                }
                if (msg instanceof TxStateResponse) {
                    fut.processResponse(fromNodeConstId, (TxStateResponse)msg);
                    continue;
                }
                if (!(msg instanceof TxConvergenceMessage)) continue;
                fut.processConvergenceMessage(fromNodeConstId, (TxConvergenceMessage)msg);
            }
        }

        public void cancel() {
            this.futures.clear();
            super.cancel();
        }
    }

    /**
     * Sender that delivers messages by putting them on the context's in-process queue instead of
     * sending them over the network.
     */
    private class SharedFolderSender
    implements Sender {
        /** Consistent id recorded as the sender of every wrapper this instance enqueues. */
        private final Object oldNodeConstId;

        /**
         * @param oldNodeConstId Consistent id to record as the sender.
         */
        private SharedFolderSender(Object oldNodeConstId) {
            this.oldNodeConstId = oldNodeConstId;
        }

        /**
         * Wraps the message together with the recovery id, sender and receiver and enqueues it.
         * If the thread is interrupted while waiting for queue space, the message is not enqueued,
         * the error is logged and the interrupt flag is restored.
         *
         * @param originalCIdToSend Consistent id of the receiver; may be {@code null}.
         * @param req Message to deliver.
         * @throws IgniteCheckedException Declared by the interface.
         */
        @Override
        public void sendRequest(@Nullable Object originalCIdToSend, Message req) throws IgniteCheckedException {
            RecoveryMessageWrapper wrapper = new RecoveryMessageWrapper(PITRSharedFolderRecoveryContext.this.recoveryId, this.oldNodeConstId, originalCIdToSend, req);
            try {
                PITRSharedFolderRecoveryContext.this.queue.put(wrapper);
            }
            catch (InterruptedException e) {
                PITRSharedFolderRecoveryContext.this.log.error("Interrupted while adding message to queue", (Throwable)e);
                // Keep the interruption visible to the caller instead of silently clearing it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Scan future for one node. It reads that node's WAL from the snapshot directory and exchanges
     * messages through a {@link SharedFolderSender}.
     */
    private class PITRSharedFolderFuture
    extends PITRFuture {
        /** WAL folder of the node: {@code <snapshotDir>/<masked consistent id>/wal}. */
        private final SnapshotPath walFolder;

        /**
         * @param time Recovery time, forwarded to the parent.
         * @param snapshotId Snapshot id, forwarded to the parent.
         * @param snpBlt Snapshot baseline topology, forwarded to the parent.
         * @param crdLocal Whether this node is the coordinator, forwarded to the parent.
         * @param crdConstId Coordinator consistent id, forwarded to the parent.
         * @param originalNodes Consistent ids of the nodes in the snapshot topology.
         * @param oldNodeConstId Consistent id of the node this future represents; also names its folder.
         * @param log Logger.
         * @param rd Recovery debug helper (not used by this constructor).
         * @param snapshotDir Root under which the WAL folder is resolved.
         */
        private PITRSharedFolderFuture(long time, long snapshotId, BaselineTopology snpBlt, boolean crdLocal, Object crdConstId, Set<Object> originalNodes, @Nullable Object oldNodeConstId, @Nullable IgniteLogger log, RecoveryDebug rd, SnapshotPath snapshotDir) {
            super(time, snapshotId, snpBlt, crdLocal, crdConstId, originalNodes, oldNodeConstId, new SharedFolderSender(oldNodeConstId), log);
            String nodeFolderName = U.maskForFileName((CharSequence)oldNodeConstId.toString());
            this.walFolder = snapshotDir.resolve(nodeFolderName).resolve("wal");
        }

        /**
         * Every node is reported as alive.
         *
         * @param originalCId Consistent id being checked (ignored).
         * @return Always {@code true}.
         */
        @Override
        protected boolean isAlive(Object originalCId) {
            return true;
        }

        /**
         * @return Path of the WAL folder this future scans.
         */
        public SnapshotPath getWalFolder() {
            return this.walFolder;
        }
    }
}

