/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.checker.processor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.cache.expiry.EternalExpiryPolicy;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.checker.objects.CachePartitionRequest;
import org.apache.ignite.internal.processors.cache.checker.objects.ExecutionResult;
import org.apache.ignite.internal.processors.cache.checker.objects.PartitionBatchRequest;
import org.apache.ignite.internal.processors.cache.checker.objects.RecheckRequest;
import org.apache.ignite.internal.processors.cache.checker.objects.ReconciliationAffectedEntries;
import org.apache.ignite.internal.processors.cache.checker.objects.RepairRequest;
import org.apache.ignite.internal.processors.cache.checker.objects.VersionedKey;
import org.apache.ignite.internal.processors.cache.checker.objects.VersionedValue;
import org.apache.ignite.internal.processors.cache.checker.processor.AbstractPipelineProcessor;
import org.apache.ignite.internal.processors.cache.checker.processor.PipelineWorkload;
import org.apache.ignite.internal.processors.cache.checker.processor.ReconciliationEventListener;
import org.apache.ignite.internal.processors.cache.checker.processor.ReconciliationResultCollector;
import org.apache.ignite.internal.processors.cache.checker.processor.workload.Batch;
import org.apache.ignite.internal.processors.cache.checker.processor.workload.Recheck;
import org.apache.ignite.internal.processors.cache.checker.processor.workload.Repair;
import org.apache.ignite.internal.processors.cache.checker.tasks.CollectPartitionKeysByBatchTask;
import org.apache.ignite.internal.processors.cache.checker.tasks.CollectPartitionKeysByRecheckRequestTask;
import org.apache.ignite.internal.processors.cache.checker.tasks.RepairRequestTask;
import org.apache.ignite.internal.processors.cache.checker.util.ConsistencyCheckUtils;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.verify.RepairAlgorithm;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.diagnostic.ReconciliationExecutionContext;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;

public class PartitionReconciliationProcessor
extends AbstractPipelineProcessor {
    public static final String SESSION_CHANGE_MSG = "Reconciliation session has changed.";
    public static final String TOPOLOGY_CHANGE_MSG = "Topology has changed. Partition reconciliation task was stopped.";
    private static final String START_EXECUTION_MSG = "Partition reconciliation has started [sesId=%s, repair=%s, repairAlgorithm=%s, fastCheck=%s, batchSize=%s, recheckAttempts=%s, parallelismLevel=%s, caches=%s]";
    private static final String STOP_EXECUTION_MSG = "Partition reconciliation has finished locally [sesId=%s, conflicts=%s, repaired=%s, totalTime=%s(sec)]";
    private static final String SESSION_PROGRESS_MSG = "Partition reconciliation status [sesId=%s, caches=%s, parts= %s, conflicts=%s, repaired=%s , caches=[";
    private static final String CACHE_PROGRESS_MSG = "[name=%s, primaryParts=%s, processedPrimaryKeys=%s, currPrimaryKeysSize=%s, processedBackupKeys=%s, currBackupKeysSize=%s, conflicts=%s, repaired=%s, completed=%s]";
    public static final String ERROR_REASON = "Reason [msg=%s, exception=%s]";
    private final int recheckDelay;
    private final Collection<String> caches;
    private final boolean repair;
    private final Map<Integer, Set<Integer>> partsToValidate;
    private final int batchSize;
    private final int recheckAttempts;
    private final RepairAlgorithm repairAlg;
    private final WorkloadTracker workloadTracker = new WorkloadTracker();
    final ReconciliationResultCollector collector;
    private AtomicLong lastWorkProgressUpdateTime = new AtomicLong(0L);

    public PartitionReconciliationProcessor(long sesId, IgniteEx ignite, Collection<String> caches, Map<Integer, Set<Integer>> partsToValidate, boolean repair, RepairAlgorithm repairAlg, int parallelismLevel, int batchSize, int recheckAttempts, int recheckDelay, boolean compact, boolean includeSensitive) throws IgniteCheckedException {
        super(sesId, ignite, parallelismLevel);
        this.recheckDelay = recheckDelay;
        this.caches = caches;
        this.repair = repair;
        this.partsToValidate = partsToValidate;
        this.batchSize = batchSize;
        this.recheckAttempts = recheckAttempts;
        this.repairAlg = repairAlg;
        this.registerListener(this.workloadTracker.andThen(this.evtLsnr));
        this.collector = compact ? new ReconciliationResultCollector.Compact(ignite, this.log, sesId, includeSensitive) : new ReconciliationResultCollector.Simple(ignite, this.log, includeSensitive);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    public ExecutionResult<ReconciliationAffectedEntries> execute() {
        ExecutionResult<ReconciliationAffectedEntries> executionResult;
        long startTime = U.currentTimeMillis();
        if (this.log.isInfoEnabled()) {
            this.log.info(String.format(START_EXECUTION_MSG, new Object[]{this.sessionId(), this.repair, this.repairAlg, this.partsToValidate != null, this.batchSize, this.recheckAttempts, this.parallelismLevel, this.caches}));
        }
        try {
            for (String cache : this.caches) {
                int[] partitions;
                IgniteInternalCache cachex = this.ignite.cachex(cache);
                if (cachex == null) {
                    if (!this.log.isInfoEnabled()) continue;
                    this.log.info("The cache '" + cache + "' was skipped because it does not exist.");
                    continue;
                }
                ExpiryPolicy expPlc = cachex.context().expiry();
                if (expPlc != null && !(expPlc instanceof EternalExpiryPolicy)) {
                    if (!this.log.isInfoEnabled()) continue;
                    this.log.info("The cache '" + cache + "' was skipped because CacheConfiguration#setExpiryPolicyFactory is set.");
                    continue;
                }
                for (int partId : partitions = this.primaryPartitionsToValidate(cache)) {
                    Batch workload = new Batch(this.sesId, UUID.randomUUID(), cache, partId, null);
                    this.workloadTracker.addTrackingChain(workload);
                    this.schedule(workload);
                }
            }
            this.ctx.diagnostic().reconciliationExecutionContext().listenMetricsUpdates(this.sessionId(), this.workloadTracker);
            boolean live = false;
            this.lastWorkProgressUpdateTime.set(U.currentTimeMillis());
            while (!this.isEmpty() || (live = this.hasLiveHandlers())) {
                if (this.topologyChanged()) {
                    throw new IgniteException(TOPOLOGY_CHANGE_MSG);
                }
                if (this.isSessionExpired()) {
                    throw new IgniteException(SESSION_CHANGE_MSG);
                }
                if (this.isInterrupted()) {
                    throw new IgniteException((String)this.error.get());
                }
                this.printStatistics();
                if (this.isEmpty() && live) {
                    U.sleep(100L);
                    continue;
                }
                PipelineWorkload workload = this.pollTask(this.workProgressPrintIntervalSec / 5L, TimeUnit.SECONDS);
                if (workload == null) continue;
                if (workload instanceof Batch) {
                    this.handle((Batch)workload);
                    continue;
                }
                if (workload instanceof Recheck) {
                    this.handle((Recheck)workload);
                    continue;
                }
                if (workload instanceof Repair) {
                    this.handle((Repair)workload);
                    continue;
                }
                String err = "Unsupported workload type: " + workload;
                this.log.error(err);
                throw new IgniteException(err);
            }
            String errMsg = (String)this.error.get();
            executionResult = errMsg == null ? new ExecutionResult<ReconciliationAffectedEntries>(this.collector.result()) : new ExecutionResult<ReconciliationAffectedEntries>(this.collector.result(), "Partition reconciliation finished with error. " + String.format(ERROR_REASON, errMsg, "N/A"));
        }
        catch (InterruptedException | IgniteException e) {
            ExecutionResult<ReconciliationAffectedEntries> executionResult2;
            block21: {
                String errMsg = "Partition reconciliation was interrupted.";
                this.waitWorkFinish();
                this.log.warning(errMsg, e);
                executionResult2 = new ExecutionResult<ReconciliationAffectedEntries>(this.collector.result(), errMsg + ' ' + String.format(ERROR_REASON, e.getMessage(), e.getClass()));
                this.ctx.diagnostic().reconciliationExecutionContext().removeMetricsUpdateListener(this.sessionId());
                this.lastWorkProgressUpdateTime.set(0L);
                this.printStatistics();
                if (!this.log.isInfoEnabled()) break block21;
                this.log.info(String.format(STOP_EXECUTION_MSG, this.sessionId(), this.collector.conflictedEntriesSize(), this.collector.repairedEntriesSize(), (U.currentTimeMillis() - startTime) / 1000L));
            }
            return executionResult2;
        }
        catch (Exception e2) {
            ExecutionResult<ReconciliationAffectedEntries> executionResult3;
            block22: {
                String errMsg = "Unexpected error.";
                this.log.error(errMsg, e2);
                executionResult3 = new ExecutionResult<ReconciliationAffectedEntries>(this.collector.result(), errMsg + ' ' + String.format(ERROR_REASON, e2.getMessage(), e2.getClass()));
                this.ctx.diagnostic().reconciliationExecutionContext().removeMetricsUpdateListener(this.sessionId());
                this.lastWorkProgressUpdateTime.set(0L);
                this.printStatistics();
                if (!this.log.isInfoEnabled()) break block22;
                {
                    catch (Throwable throwable) {
                        this.ctx.diagnostic().reconciliationExecutionContext().removeMetricsUpdateListener(this.sessionId());
                        this.lastWorkProgressUpdateTime.set(0L);
                        this.printStatistics();
                        if (this.log.isInfoEnabled()) {
                            this.log.info(String.format(STOP_EXECUTION_MSG, this.sessionId(), this.collector.conflictedEntriesSize(), this.collector.repairedEntriesSize(), (U.currentTimeMillis() - startTime) / 1000L));
                        }
                        throw throwable;
                    }
                }
                this.log.info(String.format(STOP_EXECUTION_MSG, this.sessionId(), this.collector.conflictedEntriesSize(), this.collector.repairedEntriesSize(), (U.currentTimeMillis() - startTime) / 1000L));
            }
            return executionResult3;
        }
        this.ctx.diagnostic().reconciliationExecutionContext().removeMetricsUpdateListener(this.sessionId());
        this.lastWorkProgressUpdateTime.set(0L);
        this.printStatistics();
        if (this.log.isInfoEnabled()) {
            this.log.info(String.format(STOP_EXECUTION_MSG, this.sessionId(), this.collector.conflictedEntriesSize(), this.collector.repairedEntriesSize(), (U.currentTimeMillis() - startTime) / 1000L));
        }
        return executionResult;
    }

    @Override
    public void printStatistics() {
        long lastUpdate;
        long currTimeMillis = U.currentTimeMillis();
        if (currTimeMillis < (lastUpdate = this.lastWorkProgressUpdateTime.get()) + TimeUnit.SECONDS.toMillis(this.workProgressPrintIntervalSec) || !this.log.isInfoEnabled()) {
            return;
        }
        if (!this.lastWorkProgressUpdateTime.compareAndSet(lastUpdate, currTimeMillis)) {
            return;
        }
        try {
            StringBuilder sb = new StringBuilder();
            sb.append(String.format(SESSION_PROGRESS_MSG, this.sesId, this.workloadTracker.totalCaches(), this.workloadTracker.totalChains(), this.collector.conflictedEntriesSize(), this.collector.repairedEntriesSize()));
            boolean first = true;
            for (Map.Entry e : this.workloadTracker.cacheStatistics.entrySet()) {
                WorkloadTracker.CacheStatisticsDescriptor cacheDesc = (WorkloadTracker.CacheStatisticsDescriptor)e.getValue();
                long currPrimaryKeysCnt = 0L;
                long currBackupKeysCnt = 0L;
                int cacheId = CU.cacheId(cacheDesc.cacheName);
                GridCacheContext cctx = this.ignite.context().cache().context().cacheContext(cacheId);
                if (cctx != null) {
                    List<GridDhtLocalPartition> parts = cctx.group().topology().localPartitions();
                    for (GridDhtLocalPartition part : parts) {
                        if (this.partsToValidate != null && !this.partsToValidate.getOrDefault(cctx.group().groupId(), Collections.emptySet()).contains(part.id())) continue;
                        long partSize = part.dataStore().cacheSize(cacheId);
                        if (part.primary(this.startTopVer)) {
                            currPrimaryKeysCnt += partSize;
                            continue;
                        }
                        if (!part.backup(this.startTopVer)) continue;
                        currBackupKeysCnt += partSize;
                    }
                }
                if (!first) {
                    sb.append(", ");
                }
                sb.append(String.format(CACHE_PROGRESS_MSG, cacheDesc.cacheName, cacheDesc.chainsCnt.get(), cacheDesc.scannedPrimaryKeysCnt.get(), currPrimaryKeysCnt, cacheDesc.scannedBackupKeysCnt.get(), currBackupKeysCnt, this.collector.conflictedEntriesSize(cacheDesc.cacheName), this.collector.repairedEntriesSize(cacheDesc.cacheName), cacheDesc.chainsCnt.get() == cacheDesc.processedChainsCnt.get()));
                first = false;
            }
            sb.append(']');
            if (this.log.isInfoEnabled()) {
                this.log.info(sb.toString());
            }
        }
        catch (Exception e) {
            this.log.warning("Failed to calculate partition reconciliation statistics [err=" + e + ']', e);
        }
    }

    public ReconciliationResultCollector collector() {
        return this.collector;
    }

    private int[] primaryPartitionsToValidate(String name) {
        int[] cacheParts = this.ignite.affinity(name).primaryPartitions(this.ignite.localNode());
        if (this.partsToValidate == null) {
            return cacheParts;
        }
        DynamicCacheDescriptor desc = this.ctx.cache().cacheDescriptor(name);
        Set parts = desc != null ? this.partsToValidate.getOrDefault(desc.groupId(), Collections.emptySet()) : Collections.emptySet();
        return IntStream.of(cacheParts).filter(parts::contains).toArray();
    }

    private void handle(Batch workload) throws InterruptedException {
        this.compute(CollectPartitionKeysByBatchTask.class, new PartitionBatchRequest(workload.sessionId(), workload.workloadChainId(), workload.cacheName(), workload.partitionId(), this.batchSize, workload.lowerKey(), this.startTopVer), res -> {
            KeyCacheObject nextBatchKey = (KeyCacheObject)res.get1();
            Map recheckKeys = (Map)res.get2();
            assert (nextBatchKey != null || recheckKeys.isEmpty());
            if (nextBatchKey != null) {
                this.schedule(new Batch(workload.sessionId(), workload.workloadChainId(), workload.cacheName(), workload.partitionId(), nextBatchKey));
            }
            if (!recheckKeys.isEmpty()) {
                this.schedule(new Recheck(workload.sessionId(), workload.workloadChainId(), recheckKeys, workload.cacheName(), workload.partitionId(), 0, 0), this.recheckDelay, TimeUnit.SECONDS);
            }
        });
    }

    private void handle(Recheck workload) throws InterruptedException {
        this.compute(CollectPartitionKeysByRecheckRequestTask.class, new RecheckRequest(workload.sessionId(), workload.workloadChainId(), new ArrayList<KeyCacheObject>(workload.recheckKeys().keySet()), workload.cacheName(), workload.partitionId(), this.startTopVer), actualKeys -> {
            IgniteInternalCache cachex = this.ignite.cachex(workload.cacheName());
            if (cachex == null) {
                throw new IgniteException("Cache not found (was stopped) [name=" + workload.cacheName() + ']');
            }
            Map<KeyCacheObject, Map<UUID, GridCacheVersion>> conflicts = ConsistencyCheckUtils.checkConflicts(workload.recheckKeys(), actualKeys, cachex.context(), this.startTopVer);
            if (!conflicts.isEmpty()) {
                if (workload.recheckAttempt() < this.recheckAttempts) {
                    this.schedule(new Recheck(workload.sessionId(), workload.workloadChainId(), conflicts, workload.cacheName(), workload.partitionId(), workload.recheckAttempt() + 1, workload.repairAttempt()), this.recheckDelay, TimeUnit.SECONDS);
                } else if (this.repair) {
                    this.scheduleHighPriority(this.repair(workload.sessionId(), workload.workloadChainId(), workload.cacheName(), workload.partitionId(), conflicts, (Map<KeyCacheObject, Map<UUID, VersionedValue>>)actualKeys, workload.repairAttempt()));
                } else {
                    this.collector.appendConflictedEntries(workload.cacheName(), workload.partitionId(), conflicts, (Map<KeyCacheObject, Map<UUID, VersionedValue>>)actualKeys);
                }
            }
        });
    }

    private void handle(Repair workload) throws InterruptedException {
        this.compute(RepairRequestTask.class, new RepairRequest(workload.sessionId(), workload.workloadChainId(), workload.data(), workload.cacheName(), workload.partitionId(), this.startTopVer, this.repairAlg, workload.repairAttempt()), repairRes -> {
            if (!repairRes.repairedKeys().isEmpty()) {
                this.collector.appendRepairedEntries(workload.cacheName(), workload.partitionId(), repairRes.repairedKeys());
            }
            if (!repairRes.keysToRepair().isEmpty()) {
                IgniteInternalCache cachex = this.ignite.cachex(workload.cacheName());
                if (cachex == null) {
                    throw new IgniteException("Cache not found (was stopped) [name=" + workload.cacheName() + ']');
                }
                HashMap<KeyCacheObject, Map<UUID, GridCacheVersion>> recheckKeys = new HashMap<KeyCacheObject, Map<UUID, GridCacheVersion>>();
                for (Map.Entry<VersionedKey, Map<UUID, VersionedValue>> dataEntry : repairRes.keysToRepair().entrySet()) {
                    KeyCacheObject keyCacheObj;
                    try {
                        keyCacheObj = ConsistencyCheckUtils.unmarshalKey(dataEntry.getKey().key(), cachex.context());
                    }
                    catch (IgniteCheckedException e) {
                        U.error(this.log, "Unable to unmarshal key=[" + dataEntry.getKey().key() + "], key is skipped.");
                        continue;
                    }
                    recheckKeys.put(keyCacheObj, dataEntry.getValue().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e2 -> ((VersionedValue)e2.getValue()).version())));
                }
                if (workload.repairAttempt() < 3) {
                    this.schedule(new Recheck(workload.sessionId(), workload.workloadChainId(), recheckKeys, workload.cacheName(), workload.partitionId(), this.recheckAttempts, workload.repairAttempt() + 1));
                }
            }
        });
    }

    private Repair repair(long sesId, UUID workloadChainId, String cacheName, int partId, Map<KeyCacheObject, Map<UUID, GridCacheVersion>> notResolvingConflicts, Map<KeyCacheObject, Map<UUID, VersionedValue>> actualKeys, int repairAttempts) {
        HashMap<KeyCacheObject, Map<UUID, VersionedValue>> res = new HashMap<KeyCacheObject, Map<UUID, VersionedValue>>();
        for (KeyCacheObject key : notResolvingConflicts.keySet()) {
            Map<UUID, VersionedValue> versionedByNodes = actualKeys.get(key);
            if (versionedByNodes == null) continue;
            res.put(key, versionedByNodes);
        }
        return new Repair(sesId, workloadChainId, cacheName, partId, res, repairAttempts);
    }

    private class WorkloadTracker
    implements ReconciliationEventListener,
    ReconciliationExecutionContext.ReconciliationStatisticsUpdateListener {
        private final Map<UUID, ChainDescriptor> activeChains = new ConcurrentHashMap<UUID, ChainDescriptor>();
        private final Map<String, CacheStatisticsDescriptor> cacheStatistics = new ConcurrentHashMap<String, CacheStatisticsDescriptor>();
        private final AtomicInteger trackedChainsCnt = new AtomicInteger();
        private final AtomicInteger completedChainsCnt = new AtomicInteger();

        private WorkloadTracker() {
        }

        @Override
        public void onEvent(ReconciliationEventListener.WorkLoadStage stage, PipelineWorkload workload) {
            switch (stage) {
                case SCHEDULED: {
                    this.attachWorkload(workload);
                    break;
                }
                case SKIPPED: {
                    this.skipWorkload(workload);
                    break;
                }
                case FINISHED: {
                    this.detachWorkload(workload);
                    break;
                }
            }
        }

        @Override
        public void updateScannedPartition(long sesId, String cacheName, int partId, boolean primary, long keysCnt) {
            assert (sesId == PartitionReconciliationProcessor.this.sessionId()) : "Update partition scan metrics does not correspond to the current session [currSesId=" + PartitionReconciliationProcessor.this.sessionId() + ", updateSesId=" + sesId + ']';
            CacheStatisticsDescriptor desc = this.cacheStatistics.get(cacheName);
            if (desc != null) {
                if (primary) {
                    desc.scannedPrimaryKeysCnt.addAndGet(keysCnt);
                } else {
                    desc.scannedBackupKeysCnt.addAndGet(keysCnt);
                }
            }
        }

        public Integer totalChains() {
            return this.trackedChainsCnt.get();
        }

        public Integer remaningChains() {
            return this.trackedChainsCnt.get() - this.completedChainsCnt.get();
        }

        public Integer totalCaches() {
            return this.cacheStatistics.size();
        }

        public void addTrackingChain(Batch batch) {
            assert (batch.sessionId() == PartitionReconciliationProcessor.this.sesId) : "New tracking chain does not correspond to the current session [currSesId=" + PartitionReconciliationProcessor.this.sesId + ", chainSesId=" + batch.sessionId() + ", chainId=" + batch.workloadChainId() + ']';
            this.activeChains.putIfAbsent(batch.workloadChainId(), new ChainDescriptor(batch.workloadChainId(), batch.cacheName(), batch.partitionId()));
            CacheStatisticsDescriptor stat = this.cacheStatistics.computeIfAbsent(batch.cacheName(), x$0 -> new CacheStatisticsDescriptor((String)x$0));
            stat.chainsCnt.incrementAndGet();
            this.trackedChainsCnt.incrementAndGet();
        }

        private void attachWorkload(PipelineWorkload workload) {
            Optional.ofNullable(this.activeChains.get(workload.workloadChainId())).map(d -> ((ChainDescriptor)d).workloadCnt.incrementAndGet());
        }

        private void detachWorkload(PipelineWorkload workload) {
            ChainDescriptor desc = this.activeChains.get(workload.workloadChainId());
            if (desc != null && desc.workloadCnt.decrementAndGet() == 0) {
                this.completedChainsCnt.incrementAndGet();
                this.activeChains.remove(desc.chainId);
                this.onChainCompleted(desc.chainId, desc.cacheName, desc.partId);
                CacheStatisticsDescriptor stat = this.cacheStatistics.get(desc.cacheName);
                stat.processedChainsCnt.incrementAndGet();
            }
        }

        private void skipWorkload(PipelineWorkload workload) {
            if (workload instanceof CachePartitionRequest) {
                CachePartitionRequest partReqWorkload = (CachePartitionRequest)workload;
                PartitionReconciliationProcessor.this.collector.appendSkippedPartition(partReqWorkload.cacheName(), partReqWorkload.partitionId());
            }
        }

        private void onChainCompleted(UUID chainId, String cacheName, int partId) {
            PartitionReconciliationProcessor.this.collector.onPartitionProcessed(cacheName, partId);
        }

        private class CacheStatisticsDescriptor {
            final String cacheName;
            final AtomicInteger chainsCnt = new AtomicInteger();
            final AtomicInteger processedChainsCnt = new AtomicInteger();
            final AtomicLong scannedPrimaryKeysCnt = new AtomicLong();
            final AtomicLong scannedBackupKeysCnt = new AtomicLong();

            CacheStatisticsDescriptor(String cacheName) {
                this.cacheName = cacheName;
            }
        }

        private class ChainDescriptor {
            private final UUID chainId;
            private final String cacheName;
            private final int partId;
            private final AtomicInteger workloadCnt = new AtomicInteger();

            ChainDescriptor(UUID chainId, String cacheName, int partId) {
                this.chainId = chainId;
                this.cacheName = cacheName;
                this.partId = partId;
            }
        }
    }
}

